001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor;
018    
019    import org.apache.camel.Exchange;
020    import org.apache.camel.Processor;
021    
022    /**
023     * A <a href="http://activemq.apache.org/camel/throttler.html">Throttler</a>
024     * will set a limit on the maximum number of message exchanges which can be sent
025     * to a processor within a specific time period. <p/> This pattern can be
026     * extremely useful if you have some external system which meters access; such
027     * as only allowing 100 requests per second; or if huge load can cause a
028     * particular system to malfunction or to reduce its throughput you might want
029     * to introduce some throttling.
030     * 
031     * @version $Revision: 65754 $
032     */
033    public class Throttler extends DelayProcessorSupport {
034        private long maximumRequestsPerPeriod;
035        private long timePeriodMillis;
036        private TimeSlot slot;
037    
038        public Throttler(Processor processor, long maximumRequestsPerPeriod) {
039            this(processor, maximumRequestsPerPeriod, 1000);
040        }
041    
042        public Throttler(Processor processor, long maximumRequestsPerPeriod, long timePeriodMillis) {
043            super(processor);
044            this.maximumRequestsPerPeriod = maximumRequestsPerPeriod;
045            this.timePeriodMillis = timePeriodMillis;
046        }
047    
048        @Override
049        public String toString() {
050            return "Throttler[requests: " + maximumRequestsPerPeriod + " per: " + timePeriodMillis + " (ms) to: "
051                   + getProcessor() + "]";
052        }
053    
054        // Properties
055        // -----------------------------------------------------------------------
056        public long getMaximumRequestsPerPeriod() {
057            return maximumRequestsPerPeriod;
058        }
059    
060        /**
061         * Sets the maximum number of requests per time period
062         */
063        public void setMaximumRequestsPerPeriod(long maximumRequestsPerPeriod) {
064            this.maximumRequestsPerPeriod = maximumRequestsPerPeriod;
065        }
066    
067        public long getTimePeriodMillis() {
068            return timePeriodMillis;
069        }
070    
071        /**
072         * Sets the time period during which the maximum number of requests apply
073         */
074        public void setTimePeriodMillis(long timePeriodMillis) {
075            this.timePeriodMillis = timePeriodMillis;
076        }
077    
078        // Implementation methods
079        // -----------------------------------------------------------------------
080        protected void delay(Exchange exchange) throws Exception {
081            TimeSlot slot = nextSlot();
082            if (!slot.isActive()) {
083                waitUntil(slot.startTime, exchange);
084            }
085        }
086        
087        /*
088         * Determine what the next available time slot is for handling an Exchange
089         */
090        protected synchronized TimeSlot nextSlot() {
091            if (slot == null) {
092                slot = new TimeSlot();
093            }
094            if (slot.isFull()) {
095                slot = slot.next();
096            }
097            slot.assign();
098            return slot;
099        }
100        
101        /*
102         * A time slot is capable of handling a number of exchanges within a certain period of time.
103         */
104        protected class TimeSlot {
105            
106            private long capacity = Throttler.this.maximumRequestsPerPeriod;
107            private final long duration = Throttler.this.timePeriodMillis;
108            private final long startTime;
109    
110            protected TimeSlot() {
111                this(System.currentTimeMillis());
112            }
113    
114            protected TimeSlot(long startTime) {
115                this.startTime = startTime;
116            }
117            
118            protected void assign() {
119                capacity--;
120            }
121            
122            /*
123             * Start the next time slot either now or in the future
124             * (no time slots are being created in the past)
125             */
126            protected TimeSlot next() {
127                return new TimeSlot(Math.max(System.currentTimeMillis(), this.startTime + this.duration));
128            }
129            
130            protected boolean isActive() {
131                return startTime <= System.currentTimeMillis();
132            }
133            
134            protected boolean isFull() {
135                return capacity <= 0;
136            }        
137        }
138    }