001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor;
018    
019    import java.util.concurrent.CountDownLatch;
020    import java.util.concurrent.TimeUnit;
021    
022    import org.apache.camel.AlreadyStoppedException;
023    import org.apache.camel.Exchange;
024    import org.apache.camel.Processor;
025    import org.apache.commons.logging.Log;
026    import org.apache.commons.logging.LogFactory;
027    
028    /**
029     * A useful base class for any processor which provides some kind of throttling
030     * or delayed processing
031     * 
032     * @version $Revision: 1257 $
033     */
034    public abstract class DelayProcessorSupport extends DelegateProcessor {
035        private static final transient Log LOG = LogFactory.getLog(Delayer.class);
036        private CountDownLatch stoppedLatch = new CountDownLatch(1);
037        private boolean fastStop = true;
038    
039        public DelayProcessorSupport(Processor processor) {
040            super(processor);
041        }
042    
043        public void process(Exchange exchange) throws Exception {
044            delay(exchange);
045            super.process(exchange);
046        }
047    
048        public boolean isFastStop() {
049            return fastStop;
050        }
051    
052        /**
053         * Enables & disables a fast stop; basically to avoid waiting a possibly
054         * long time for delays to complete before the context shuts down; instead
055         * the current processing method throws
056         * {@link org.apache.camel.AlreadyStoppedException} to terminate processing.
057         */
058        public void setFastStop(boolean fastStop) {
059            this.fastStop = fastStop;
060        }
061    
062        protected void doStop() throws Exception {
063            stoppedLatch.countDown();
064            super.doStop();
065        }
066    
067        protected abstract void delay(Exchange exchange) throws Exception;
068    
069        /**
070         * Wait until the given system time before continuing
071         * 
072         * @param time the system time to wait for
073         * @param exchange the exchange being processed
074         */
075        protected void waitUntil(long time, Exchange exchange) throws Exception {
076            while (true) {
077                long delay = time - currentSystemTime();
078                if (delay < 0) {
079                    return;
080                } else {
081                    if (isFastStop() && !isRunAllowed()) {
082                        throw new AlreadyStoppedException();
083                    }
084                    try {
085                        sleep(delay);
086                    } catch (InterruptedException e) {
087                        handleSleepInteruptedException(e);
088                    }
089                }
090            }
091        }
092    
093        protected void sleep(long delay) throws InterruptedException {
094            if (delay <= 0) {
095                return;
096            }
097            if (LOG.isTraceEnabled()) {
098                LOG.trace("Sleeping for: " + delay + " millis");
099            }
100            if (isFastStop()) {
101                stoppedLatch.await(delay, TimeUnit.MILLISECONDS);
102            } else {
103                Thread.sleep(delay);
104            }
105        }
106    
107        /**
108         * Called when a sleep is interupted; allows derived classes to handle this
109         * case differently
110         */
111        protected void handleSleepInteruptedException(InterruptedException e) {
112            LOG.debug("Sleep interupted: " + e, e);
113        }
114    
115        protected long currentSystemTime() {
116            return System.currentTimeMillis();
117        }
118    }