001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor;
018    
019    import org.apache.camel.Exchange;
020    import org.apache.camel.Processor;
021    
022    /**
023     * A <a href="http://activemq.apache.org/camel/throttler.html">Throttler</a>
024     * will set a limit on the maximum number of message exchanges which can be sent
025     * to a processor within a specific time period. <p/> This pattern can be
026     * extremely useful if you have some external system which meters access; such
027     * as only allowing 100 requests per second; or if huge load can cause a
028     * particular systme to malfunction or to reduce its throughput you might want
029     * to introduce some throttling.
030     * 
031     * @version $Revision: 35332 $
032     */
033    public class Throttler extends DelayProcessorSupport {
034        private long maximumRequestsPerPeriod;
035        private long timePeriodMillis;
036        private long startTimeMillis;
037        private long requestCount;
038    
039        public Throttler(Processor processor, long maximumRequestsPerPeriod) {
040            this(processor, maximumRequestsPerPeriod, 1000);
041        }
042    
043        public Throttler(Processor processor, long maximumRequestsPerPeriod, long timePeriodMillis) {
044            super(processor);
045            this.maximumRequestsPerPeriod = maximumRequestsPerPeriod;
046            this.timePeriodMillis = timePeriodMillis;
047        }
048    
049        @Override
050        public String toString() {
051            return "Throttler[requests: " + maximumRequestsPerPeriod + " per: " + timePeriodMillis + " (ms) to: "
052                   + getProcessor() + "]";
053        }
054    
055        // Properties
056        // -----------------------------------------------------------------------
057        public long getMaximumRequestsPerPeriod() {
058            return maximumRequestsPerPeriod;
059        }
060    
061        /**
062         * Sets the maximum number of requests per time period
063         */
064        public void setMaximumRequestsPerPeriod(long maximumRequestsPerPeriod) {
065            this.maximumRequestsPerPeriod = maximumRequestsPerPeriod;
066        }
067    
068        public long getTimePeriodMillis() {
069            return timePeriodMillis;
070        }
071    
072        /**
073         * Sets the time period during which the maximum number of requests apply
074         */
075        public void setTimePeriodMillis(long timePeriodMillis) {
076            this.timePeriodMillis = timePeriodMillis;
077        }
078    
079        /**
080         * The number of requests which have taken place so far within this time
081         * period
082         */
083        public long getRequestCount() {
084            return requestCount;
085        }
086    
087        /**
088         * The start time when this current period began
089         */
090        public long getStartTimeMillis() {
091            return startTimeMillis;
092        }
093    
094        // Implementation methods
095        // -----------------------------------------------------------------------
096        protected void delay(Exchange exchange) throws Exception {
097            long now = currentSystemTime();
098            if (startTimeMillis == 0) {
099                startTimeMillis = now;
100            }
101            if (now - startTimeMillis > timePeriodMillis) {
102                // we're at the start of a new time period
103                // so lets reset things
104                requestCount = 1;
105                startTimeMillis = now;
106            } else {
107                if (++requestCount > maximumRequestsPerPeriod) {
108                    // lets sleep until the start of the next time period
109                    long time = startTimeMillis + timePeriodMillis;
110                    waitUntil(time, exchange);
111                }
112            }
113        }
114    }