001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.impl;
018    
019    import java.util.concurrent.ArrayBlockingQueue;
020    import java.util.concurrent.BlockingQueue;
021    import java.util.concurrent.TimeUnit;
022    
023    import org.apache.camel.Consumer;
024    import org.apache.camel.Endpoint;
025    import org.apache.camel.Exchange;
026    import org.apache.camel.PollingConsumer;
027    import org.apache.camel.Processor;
028    import org.apache.camel.processor.Logger;
029    import org.apache.camel.spi.ExceptionHandler;
030    import org.apache.commons.logging.Log;
031    import org.apache.commons.logging.LogFactory;
032    
033    /**
034     * A default implementation of the {@link PollingConsumer} which uses the normal
035     * asynchronous consumer mechanism along with a {@link BlockingQueue} to allow
036     * the caller to pull messages on demand.
037     *
038     * @version $Revision: 55928 $
039     */
040    public class EventDrivenPollingConsumer<E extends Exchange> extends PollingConsumerSupport<E> implements Processor {
041        private static final transient Log LOG = LogFactory.getLog(EventDrivenPollingConsumer.class);
042        private BlockingQueue<E> queue;
043        private ExceptionHandler interuptedExceptionHandler = new LoggingExceptionHandler(new Logger(LOG));
044        private Consumer<E> consumer;
045    
046        public EventDrivenPollingConsumer(Endpoint<E> endpoint) {
047            this(endpoint, new ArrayBlockingQueue<E>(1000));
048        }
049    
050        public EventDrivenPollingConsumer(Endpoint<E> endpoint, BlockingQueue<E> queue) {
051            super(endpoint);
052            this.queue = queue;
053        }
054    
055        public E receiveNoWait() {
056            return receive(0);
057        }
058    
059        public E receive() {
060            while (isRunAllowed()) {
061                try {
062                    return queue.take();
063                } catch (InterruptedException e) {
064                    handleInteruptedException(e);
065                }
066            }
067            LOG.debug("Consumer is not running, so returning null");
068            return null;
069        }
070    
071        public E receive(long timeout) {
072            try {
073                return queue.poll(timeout, TimeUnit.MILLISECONDS);
074            } catch (InterruptedException e) {
075                handleInteruptedException(e);
076                return null;
077            }
078        }
079    
080        public void process(Exchange exchange) throws Exception {
081            queue.offer((E)exchange);
082        }
083    
084        public ExceptionHandler getInteruptedExceptionHandler() {
085            return interuptedExceptionHandler;
086        }
087    
088        public void setInteruptedExceptionHandler(ExceptionHandler interuptedExceptionHandler) {
089            this.interuptedExceptionHandler = interuptedExceptionHandler;
090        }
091    
092        protected void handleInteruptedException(InterruptedException e) {
093            getInteruptedExceptionHandler().handleException(e);
094        }
095    
096        protected void doStart() throws Exception {
097            // lets add ourselves as a consumer
098            consumer = getEndpoint().createConsumer(this);
099            consumer.start();
100        }
101    
102        protected void doStop() throws Exception {
103            if (consumer != null) {
104                try {
105                    consumer.stop();
106                } finally {
107                    consumer = null;
108                }
109            }
110        }
111    }