001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor;
018    
019    import java.util.concurrent.CountDownLatch;
020    import java.util.concurrent.TimeUnit;
021    
022    import org.apache.camel.AlreadyStoppedException;
023    import org.apache.camel.Exchange;
024    import org.apache.camel.Processor;
025    import org.apache.commons.logging.Log;
026    import org.apache.commons.logging.LogFactory;
027    
028    /**
029     * A useful base class for any processor which provides some kind of throttling
030     * or delayed processing
031     * 
032     * @version $Revision: 35332 $
033     */
034    public abstract class DelayProcessorSupport extends DelegateProcessor {
035        private static final transient Log LOG = LogFactory.getLog(Delayer.class);
036        private CountDownLatch stoppedLatch = new CountDownLatch(1);
037        private boolean fastStop = true;
038    
039        public DelayProcessorSupport(Processor processor) {
040            super(processor);
041        }
042    
043        public void process(Exchange exchange) throws Exception {
044            delay(exchange);
045            super.process(exchange);
046        }
047    
048        public boolean isFastStop() {
049            return fastStop;
050        }
051    
052        /**
053         * Enables & disables a fast stop; basically to avoid waiting a possibly
054         * long time for delays to complete before the context shuts down; instead
055         * the current processing method throws
056         * {@link org.apache.camel.AlreadyStoppedException} to terminate processing.
057         */
058        public void setFastStop(boolean fastStop) {
059            this.fastStop = fastStop;
060        }
061    
062        protected void doStop() throws Exception {
063            stoppedLatch.countDown();
064            super.doStop();
065        }
066    
067        protected abstract void delay(Exchange exchange) throws Exception;
068    
069        /**
070         * Wait until the given system time before continuing
071         * 
072         * @param time the system time to wait for
073         * @param exchange the exchange being processed
074         */
075        protected void waitUntil(long time, Exchange exchange) throws Exception {
076            while (true) {
077                long delay = time - currentSystemTime();
078                if (delay < 0) {
079                    return;
080                } else {
081                    if (isFastStop() && !isRunAllowed()) {
082                        throw new AlreadyStoppedException();
083                    }
084                    try {
085                        sleep(delay);
086                    } catch (InterruptedException e) {
087                        handleSleepInteruptedException(e);
088                    }
089                }
090            }
091        }
092    
093        protected void sleep(long delay) throws InterruptedException {
094            if (LOG.isDebugEnabled()) {
095                LOG.debug("Sleeping for: " + delay + " millis");
096            }
097            if (isFastStop()) {
098                stoppedLatch.await(delay, TimeUnit.MILLISECONDS);
099            } else {
100                Thread.sleep(delay);
101            }
102        }
103    
104        /**
105         * Called when a sleep is interupted; allows derived classes to handle this
106         * case differently
107         */
108        protected void handleSleepInteruptedException(InterruptedException e) {
109            LOG.debug("Sleep interupted: " + e, e);
110        }
111    
112        protected long currentSystemTime() {
113            return System.currentTimeMillis();
114        }
115    }