001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor;
018    
019    import org.apache.camel.AsyncCallback;
020    import org.apache.camel.AsyncProcessor;
021    import org.apache.camel.Endpoint;
022    import org.apache.camel.Exchange;
023    import org.apache.camel.Producer;
024    import org.apache.camel.Service;
025    import org.apache.camel.impl.ServiceSupport;
026    import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
027    import org.apache.commons.logging.Log;
028    import org.apache.commons.logging.LogFactory;
029    
030    /**
031     * @version $Revision: 35332 $
032     */
033    public class SendProcessor extends ServiceSupport implements AsyncProcessor, Service {
034        private static final transient Log LOG = LogFactory.getLog(SendProcessor.class);
035        private Endpoint destination;
036        private Producer producer;
037        private AsyncProcessor processor;
038    
039        public SendProcessor(Endpoint destination) {
040            if (destination == null) {
041                throw new IllegalArgumentException("Endpoint cannot be null!");
042            }
043            this.destination = destination;
044        }
045    
046        @Override
047        public String toString() {
048            return "sendTo(" + destination + ")";
049        }
050    
051        public void process(Exchange exchange) throws Exception {
052            if (producer == null) {
053                if (isStopped()) {
054                    LOG.warn("Ignoring exchange sent after processor is stopped: " + exchange);
055                } else {
056                    throw new IllegalStateException("No producer, this processor has not been started!");
057                }
058            } else {
059                producer.process(exchange);
060            }
061        }
062    
063        public boolean process(Exchange exchange, AsyncCallback callback) {
064            if (producer == null) {
065                if (isStopped()) {
066                    LOG.warn("Ignoring exchange sent after processor is stopped: " + exchange);
067                } else {
068                    exchange.setException(new IllegalStateException("No producer, this processor has not been started!"));
069                }
070                callback.done(true);
071                return true;
072            } else {
073                return processor.process(exchange, callback);
074            }
075        }
076    
077        
078        public Endpoint getDestination() {
079            return destination;
080        }
081    
082        protected void doStart() throws Exception {
083            this.producer = destination.createProducer();
084            this.producer.start();
085            this.processor = AsyncProcessorTypeConverter.convert(producer);
086        }
087    
088        protected void doStop() throws Exception {
089            if (producer != null) {
090                try {
091                    producer.stop();
092                } finally {
093                    producer = null;
094                    processor = null;
095                }
096            }
097        }
098    
099    }