001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor;
018    
019    import java.util.Collection;
020    import java.util.Iterator;
021    import java.util.List;
022    
023    import org.apache.camel.AsyncCallback;
024    import org.apache.camel.AsyncProcessor;
025    import org.apache.camel.Exchange;
026    import org.apache.camel.Message;
027    import org.apache.camel.Processor;
028    import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
029    import org.apache.camel.util.AsyncProcessorHelper;
030    import org.apache.camel.util.ExchangeHelper;
031    import org.apache.commons.logging.Log;
032    import org.apache.commons.logging.LogFactory;
033    
034    /**
035     * Creates a Pipeline pattern where the output of the previous step is sent as
036     * input to the next step, reusing the same message exchanges
037     *
038     * @version $Revision: 36321 $
039     */
040    public class Pipeline extends MulticastProcessor implements AsyncProcessor {
041        private static final transient Log LOG = LogFactory.getLog(Pipeline.class);
042    
043        public Pipeline(Collection<Processor> processors) {
044            super(processors);
045        }
046    
047        public static Processor newInstance(List<Processor> processors) {
048            if (processors.isEmpty()) {
049                return null;
050            } else if (processors.size() == 1) {
051                return processors.get(0);
052            }
053            return new Pipeline(processors);
054        }
055    
056        public void process(Exchange exchange) throws Exception {
057            AsyncProcessorHelper.process(this, exchange);
058        }
059    
060        public boolean process(Exchange original, AsyncCallback callback) {
061            Iterator<Processor> processors = getProcessors().iterator();
062            Exchange nextExchange = original;
063            boolean first = true;
064            while (true) {
065                if (nextExchange.isFailed()) {
066                    if (LOG.isDebugEnabled()) {
067                        LOG.debug("Mesage exchange has failed so breaking out of pipeline: " + nextExchange
068                                  + " exception: " + nextExchange.getException() + " fault: "
069                                  + nextExchange.getFault(false));
070                    }
071                    break;
072                }
073                if (!processors.hasNext()) {
074                    break;
075                }
076    
077                AsyncProcessor processor = AsyncProcessorTypeConverter.convert(processors.next());
078    
079                if (first) {
080                    first = false;
081                } else {
082                    nextExchange = createNextExchange(processor, nextExchange);
083                }
084    
085                boolean sync = process(original, nextExchange, callback, processors, processor);
086                // Continue processing the pipeline synchronously ...
087                if (!sync) {
088                    // The pipeline will be completed async...
089                    return false;
090                }
091            }
092    
093            // If we get here then the pipeline was processed entirely
094            // synchronously.
095            ExchangeHelper.copyResults(original, nextExchange);
096            callback.done(true);
097            return true;
098        }
099    
100        private boolean process(final Exchange original, final Exchange exchange, final AsyncCallback callback, final Iterator<Processor> processors, AsyncProcessor processor) {
101            return processor.process(exchange, new AsyncCallback() {
102                public void done(boolean sync) {
103    
104                    // We only have to handle async completion of
105                    // the pipeline..
106                    if (sync) {
107                        return;
108                    }
109    
110                    // Continue processing the pipeline...
111                    Exchange nextExchange = exchange;
112                    while (processors.hasNext()) {
113                        AsyncProcessor processor = AsyncProcessorTypeConverter.convert(processors.next());
114    
115                        if (nextExchange.isFailed()) {
116                            if (LOG.isDebugEnabled()) {
117                                LOG.debug("Mesage exchange has failed so breaking out of pipeline: " + nextExchange + " exception: " + nextExchange.getException() + " fault: "
118                                        + nextExchange.getFault(false));
119                            }
120                            break;
121                        }
122    
123                        nextExchange = createNextExchange(processor, nextExchange);
124                        sync = process(original, nextExchange, callback, processors, processor);
125                        if (!sync) {
126                            return;
127                        }
128                    }
129    
130                    ExchangeHelper.copyResults(original, nextExchange);
131                    callback.done(false);
132                }
133            });
134        }
135    
136        /**
137         * Strategy method to create the next exchange from the
138         *
139         * @param producer         the producer used to send to the endpoint
140         * @param previousExchange the previous exchange
141         * @return a new exchange
142         */
143        protected Exchange createNextExchange(Processor producer, Exchange previousExchange) {
144            Exchange answer = previousExchange.newInstance();
145    
146            answer.getProperties().putAll(previousExchange.getProperties());
147    
148            // now lets set the input of the next exchange to the output of the
149            // previous message if it is not null
150            Message previousOut = previousExchange.getOut(false);
151            Message in = answer.getIn();
152            if (previousOut != null) {
153                in.copyFrom(previousOut);
154            } else {
155                in.copyFrom(previousExchange.getIn());
156            }
157            return answer;
158        }
159    
160        @Override
161        public String toString() {
162            return "Pipeline" + getProcessors();
163        }
164    }