001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.impl;
018    
019    import java.util.concurrent.ScheduledExecutorService;
020    import java.util.concurrent.ScheduledFuture;
021    import java.util.concurrent.TimeUnit;
022    
023    import org.apache.camel.Endpoint;
024    import org.apache.camel.Exchange;
025    import org.apache.camel.Processor;
026    import org.apache.commons.logging.Log;
027    import org.apache.commons.logging.LogFactory;
028    
029    /**
030     * A useful base class for any consumer which is polling based
031     * 
032     * @version $Revision: 65756 $
033     */
034    public abstract class ScheduledPollConsumer<E extends Exchange> extends DefaultConsumer<E> implements
035        Runnable {
036        private static final transient Log LOG = LogFactory.getLog(ScheduledPollConsumer.class);
037    
038        private final ScheduledExecutorService executor;
039        private long initialDelay = 1000;
040        private long delay = 500;
041        private TimeUnit timeUnit = TimeUnit.MILLISECONDS;
042        private boolean useFixedDelay;
043        private ScheduledFuture<?> future;
044        private Exception firstExceptionThrown;
045        
046        public ScheduledPollConsumer(DefaultEndpoint<E> endpoint, Processor processor) {
047            this(endpoint, processor, endpoint.getExecutorService());
048        }
049    
050        public ScheduledPollConsumer(Endpoint<E> endpoint, Processor processor, ScheduledExecutorService executor) {
051            super(endpoint, processor);
052            this.executor = executor;
053            if (executor == null) {
054                throw new IllegalArgumentException("A non null ScheduledExecutorService must be provided.");
055            }
056        }
057    
058        /**
059         * Invoked whenever we should be polled
060         */
061        public void run() {
062            if (LOG.isTraceEnabled()) {
063                LOG.trace("Starting to poll: " + this.getEndpoint());
064            }
065            try {
066                poll();
067            } catch (Exception e) {
068                LOG.warn("An exception occured while polling: " + this.getEndpoint() + ": " + e.getMessage(), e);
069                if (firstExceptionThrown == null) {
070                    firstExceptionThrown = e;
071                } 
072            }
073        }
074    
075        // Properties
076        // -------------------------------------------------------------------------
077        public long getInitialDelay() {
078            return initialDelay;
079        }
080    
081        public void setInitialDelay(long initialDelay) {
082            this.initialDelay = initialDelay;
083        }
084    
085        public long getDelay() {
086            return delay;
087        }
088    
089        public void setDelay(long delay) {
090            this.delay = delay;
091        }
092    
093        public TimeUnit getTimeUnit() {
094            return timeUnit;
095        }
096    
097        public void setTimeUnit(TimeUnit timeUnit) {
098            this.timeUnit = timeUnit;
099        }
100    
101        public boolean isUseFixedDelay() {
102            return useFixedDelay;
103        }
104    
105        public void setUseFixedDelay(boolean useFixedDelay) {
106            this.useFixedDelay = useFixedDelay;
107        }
108    
109        // Implementation methods
110        // -------------------------------------------------------------------------
111    
112        /**
113         * The polling method which is invoked periodically to poll this consumer
114         * 
115         * @throws Exception can be thrown if an exception occurred during polling
116         */
117        protected abstract void poll() throws Exception;
118    
119        @Override
120        protected void doStart() throws Exception {
121            firstExceptionThrown = null;
122            super.doStart();
123            if (isUseFixedDelay()) {
124                future = executor.scheduleWithFixedDelay(this, getInitialDelay(), getDelay(), getTimeUnit());
125            } else {
126                future = executor.scheduleAtFixedRate(this, getInitialDelay(), getDelay(), getTimeUnit());
127            }
128        }
129    
130        @Override
131        protected void doStop() throws Exception {
132            if (future != null) {
133                future.cancel(false);
134            }
135            super.doStop();
136            
137            if (firstExceptionThrown != null) {
138                throw firstExceptionThrown;
139            }
140        }
141    }