001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.model;
018    
019    import java.util.ArrayList;
020    import java.util.Collection;
021    import java.util.Collections;
022    import java.util.LinkedList;
023    import java.util.List;
024    import java.util.concurrent.ThreadPoolExecutor;
025    
026    import javax.xml.bind.annotation.XmlAttribute;
027    import javax.xml.bind.annotation.XmlTransient;
028    
029    import org.apache.camel.CamelException;
030    import org.apache.camel.Endpoint;
031    import org.apache.camel.Exchange;
032    import org.apache.camel.Expression;
033    import org.apache.camel.Predicate;
034    import org.apache.camel.Processor;
035    import org.apache.camel.Route;
036    import org.apache.camel.RuntimeCamelException;
037    import org.apache.camel.builder.DataFormatClause;
038    import org.apache.camel.builder.DeadLetterChannelBuilder;
039    import org.apache.camel.builder.ErrorHandlerBuilder;
040    import org.apache.camel.builder.ExpressionClause;
041    import org.apache.camel.builder.NoErrorHandlerBuilder;
042    import org.apache.camel.builder.ProcessorBuilder;
043    import org.apache.camel.converter.ObjectConverter;
044    import org.apache.camel.impl.RouteContext;
045    import org.apache.camel.model.dataformat.DataFormatType;
046    import org.apache.camel.model.language.ExpressionType;
047    import org.apache.camel.model.language.LanguageExpression;
048    import org.apache.camel.processor.ConvertBodyProcessor;
049    import org.apache.camel.processor.DelegateProcessor;
050    import org.apache.camel.processor.MulticastProcessor;
051    import org.apache.camel.processor.Pipeline;
052    import org.apache.camel.processor.RecipientList;
053    import org.apache.camel.processor.aggregate.AggregationCollection;
054    import org.apache.camel.processor.aggregate.AggregationStrategy;
055    import org.apache.camel.processor.idempotent.IdempotentConsumer;
056    import org.apache.camel.processor.idempotent.MessageIdRepository;
057    import org.apache.camel.spi.DataFormat;
058    import org.apache.camel.spi.Policy;
059    import org.apache.camel.spi.Registry;
060    import org.apache.commons.logging.Log;
061    import org.apache.commons.logging.LogFactory;
062    
063    /**
064     * @version $Revision: 41168 $
065     */
066    public abstract class ProcessorType<Type extends ProcessorType> implements Block {
067        public static final String DEFAULT_TRACE_CATEGORY = "org.apache.camel.TRACE";
068        private ErrorHandlerBuilder errorHandlerBuilder;
069        private Boolean inheritErrorHandlerFlag = Boolean.TRUE; // TODO not sure how
070        private DelegateProcessor lastInterceptor;
071        private NodeFactory nodeFactory;
072        private LinkedList<Block> blocks = new LinkedList<Block>();
073        private ProcessorType<? extends ProcessorType> parent;
074    
075        // else to use an optional attribute in JAXB2
076        public abstract List<ProcessorType<?>> getOutputs();
077    
078    
079        public Processor createProcessor(RouteContext routeContext) throws Exception {
080            throw new UnsupportedOperationException("Not implemented yet for class: " + getClass().getName());
081        }
082    
083        public Processor createOutputsProcessor(RouteContext routeContext) throws Exception {
084            Collection<ProcessorType<?>> outputs = getOutputs();
085            return createOutputsProcessor(routeContext, outputs);
086        }
087    
088        public void addRoutes(RouteContext routeContext, Collection<Route> routes) throws Exception {
089            Processor processor = makeProcessor(routeContext);
090            routeContext.addEventDrivenProcessor(processor);
091        }
092    
093        /**
094         * Wraps the child processor in whatever necessary interceptors and error
095         * handlers
096         */
097        public Processor wrapProcessor(RouteContext routeContext, Processor processor) throws Exception {
098            processor = wrapProcessorInInterceptors(routeContext, processor);
099            return wrapInErrorHandler(processor);
100        }
101    
102        // Fluent API
103        // -------------------------------------------------------------------------
104    
105        /**
106         * Sends the exchange to the given endpoint URI
107         */
108        public Type to(String uri) {
109            addOutput(new ToType(uri));
110            return (Type) this;
111        }
112    
113        /**
114         * Sends the exchange to the given endpoint
115         */
116        public Type to(Endpoint endpoint) {
117            addOutput(new ToType(endpoint));
118            return (Type) this;
119        }
120    
121        /**
122         * Sends the exchange to a list of endpoints using the
123         * {@link MulticastProcessor} pattern
124         */
125        public Type to(String... uris) {
126            for (String uri : uris) {
127                addOutput(new ToType(uri));
128            }
129            return (Type) this;
130        }
131    
132        /**
133         * Sends the exchange to a list of endpoints using the
134         * {@link MulticastProcessor} pattern
135         */
136        public Type to(Endpoint... endpoints) {
137            for (Endpoint endpoint : endpoints) {
138                addOutput(new ToType(endpoint));
139            }
140            return (Type) this;
141        }
142    
143        /**
144         * Sends the exchange to a list of endpoint using the
145         * {@link MulticastProcessor} pattern
146         */
147        public Type to(Collection<Endpoint> endpoints) {
148            for (Endpoint endpoint : endpoints) {
149                addOutput(new ToType(endpoint));
150            }
151            return (Type) this;
152        }
153    
154        /**
155         * Multicasts messages to all its child outputs; so that each processor and
156         * destination gets a copy of the original message to avoid the processors
157         * interfering with each other.
158         */
159        public MulticastType multicast() {
160            MulticastType answer = new MulticastType();
161            addOutput(answer);
162            return answer;
163        }
164    
165        /**
166         * Multicasts messages to all its child outputs; so that each processor and
167         * destination gets a copy of the original message to avoid the processors
168         * interfering with each other.
169         * @param aggregationStrategy the strategy used to aggregate responses for
170         *          every part
171         * @param parallelProcessing if is <tt>true</tt> camel will fork thread to call the endpoint producer
172         * @return the multicast type
173         */
174        public MulticastType multicast(AggregationStrategy aggregationStrategy, boolean parallelProcessing) {
175            MulticastType answer = new MulticastType();
176            addOutput(answer);
177            answer.setAggregationStrategy(aggregationStrategy);
178            answer.setParallelProcessing(parallelProcessing);
179            return answer;
180        }
181    
182        /**
183         * Multicasts messages to all its child outputs; so that each processor and
184         * destination gets a copy of the original message to avoid the processors
185         * interfering with each other.
186         * @param aggregationStrategy the strategy used to aggregate responses for
187         *          every part
188         * @return the multicast type
189         */
190        public MulticastType multicast(AggregationStrategy aggregationStrategy) {
191            MulticastType answer = new MulticastType();
192            addOutput(answer);
193            answer.setAggregationStrategy(aggregationStrategy);
194            return answer;
195        }
196    
197        /**
198         * Creates a {@link Pipeline} of the list of endpoints so that the message
199         * will get processed by each endpoint in turn and for request/response the
200         * output of one endpoint will be the input of the next endpoint
201         */
202        public Type pipeline(String... uris) {
203            // TODO pipeline v mulicast
204            return to(uris);
205        }
206    
207        /**
208         * Creates a {@link Pipeline} of the list of endpoints so that the message
209         * will get processed by each endpoint in turn and for request/response the
210         * output of one endpoint will be the input of the next endpoint
211         */
212        public Type pipeline(Endpoint... endpoints) {
213            // TODO pipeline v mulicast
214            return to(endpoints);
215        }
216    
217        /**
218         * Creates a {@link Pipeline} of the list of endpoints so that the message
219         * will get processed by each endpoint in turn and for request/response the
220         * output of one endpoint will be the input of the next endpoint
221         */
222        public Type pipeline(Collection<Endpoint> endpoints) {
223            // TODO pipeline v mulicast
224            return to(endpoints);
225        }
226    
227        /**
228         * Ends the current block
229         */
230        public ProcessorType<? extends ProcessorType> end() {
231            if (blocks.isEmpty()) {
232                if (parent == null) {
233                    throw new IllegalArgumentException("Root node with no active block");
234                }
235                return parent;
236            }
237            popBlock();
238            return this;
239        }
240    
241        /**
242         * Causes subsequent processors to be called asynchronously
243         *
244         * @param coreSize the number of threads that will be used to process
245         *                 messages in subsequent processors.
246         * @return a ThreadType builder that can be used to further configure the
247         *         the thread pool.
248         */
249        public ThreadType thread(int coreSize) {
250            ThreadType answer = new ThreadType(coreSize);
251            addOutput(answer);
252            return answer;
253        }
254    
255        /**
256         * Causes subsequent processors to be called asynchronously
257         *
258         * @param executor the executor that will be used to process
259         *                 messages in subsequent processors.
260         * @return a ThreadType builder that can be used to further configure the
261         *         the thread pool.
262         */
263        public ProcessorType<Type> thread(ThreadPoolExecutor executor) {
264            ThreadType answer = new ThreadType(executor);
265            addOutput(answer);
266            return this;
267        }
268    
269        /**
270         * Creates an {@link IdempotentConsumer} to avoid duplicate messages
271         */
272        public IdempotentConsumerType idempotentConsumer(Expression messageIdExpression,
273                MessageIdRepository messageIdRepository) {
274            IdempotentConsumerType answer = new IdempotentConsumerType(messageIdExpression, messageIdRepository);
275            addOutput(answer);
276            return answer;
277        }
278    
279        /**
280         * Creates an {@link IdempotentConsumer} to avoid duplicate messages
281         *
282         * @return the builder used to create the expression
283         */
284        public ExpressionClause<IdempotentConsumerType> idempotentConsumer(MessageIdRepository messageIdRepository) {
285            IdempotentConsumerType answer = new IdempotentConsumerType();
286            answer.setMessageIdRepository(messageIdRepository);
287            addOutput(answer);
288            return ExpressionClause.createAndSetExpression(answer);
289        }
290    
291        /**
292         * Creates a predicate expression which only if it is true then the
293         * exchange is forwarded to the destination
294         *
295         * @return the clause used to create the filter expression
296         */
297        public ExpressionClause<FilterType> filter() {
298            FilterType filter = new FilterType();
299            addOutput(filter);
300            return ExpressionClause.createAndSetExpression(filter);
301        }
302    
303        /**
304         * Creates a predicate which is applied and only if it is true then the
305         * exchange is forwarded to the destination
306         *
307         * @return the builder for a predicate
308         */
309        public FilterType filter(Predicate predicate) {
310            FilterType filter = new FilterType(predicate);
311            addOutput(filter);
312            return filter;
313        }
314    
315        public FilterType filter(ExpressionType expression) {
316            FilterType filter = getNodeFactory().createFilter();
317            filter.setExpression(expression);
318            addOutput(filter);
319            return filter;
320        }
321    
322        public FilterType filter(String language, String expression) {
323            return filter(new LanguageExpression(language, expression));
324        }
325    
326        public LoadBalanceType loadBalance() {
327            LoadBalanceType answer = new LoadBalanceType();
328            addOutput(answer);
329            return answer;
330        }
331    
332    
333        /**
334         * Creates a choice of one or more predicates with an otherwise clause
335         *
336         * @return the builder for a choice expression
337         */
338        public ChoiceType choice() {
339            ChoiceType answer = new ChoiceType();
340            addOutput(answer);
341            return answer;
342        }
343    
344        /**
345         * Creates a try/catch block
346         *
347         * @return the builder for a tryBlock expression
348         */
349        public TryType tryBlock() {
350            TryType answer = new TryType();
351            addOutput(answer);
352            return answer;
353        }
354    
355        /**
356         * Creates a dynamic <a
357         * href="http://activemq.apache.org/camel/recipient-list.html">Recipient
358         * List</a> pattern.
359         *
360         * @param receipients is the builder of the expression used in the
361         *                    {@link RecipientList} to decide the destinations
362         */
363        public Type recipientList(Expression receipients) {
364            RecipientListType answer = new RecipientListType(receipients);
365            addOutput(answer);
366            return (Type) this;
367        }
368    
369        /**
370         * Creates a dynamic <a
371         * href="http://activemq.apache.org/camel/recipient-list.html">Recipient
372         * List</a> pattern.
373         *
374         * @return the expression clause for the expression used in the
375         *                    {@link RecipientList} to decide the destinations
376         */
377        public ExpressionClause<ProcessorType<Type>> recipientList() {
378            RecipientListType answer = new RecipientListType();
379            addOutput(answer);
380            ExpressionClause<ProcessorType<Type>> clause = new ExpressionClause<ProcessorType<Type>>((Type) this);
381            answer.setExpression(clause);
382            return clause;
383        }
384    
385        /**
386         * Creates a <a
387         * href="http://activemq.apache.org/camel/routing-slip.html">Routing
388         * Slip</a> pattern.
389         *
390         * @param header is the header that the {@link org.apache.camel.processor.RoutingSlip RoutingSlip}
391         * class will look in for the list of URIs to route the message to.
392         * @param uriDelimiter is the delimiter that will be used to split up
393         * the list of URIs in the routing slip.
394         */
395        public Type routingSlip(String header, String uriDelimiter) {
396            RoutingSlipType answer = new RoutingSlipType(header, uriDelimiter);
397            addOutput(answer);
398            return (Type) this;
399        }
400    
401        /**
402         * Creates a <a
403         * href="http://activemq.apache.org/camel/routing-slip.html">Routing
404         * Slip</a> pattern.
405         *
406         * @param header is the header that the {@link org.apache.camel.processor.RoutingSlip RoutingSlip}
407         * class will look in for the list of URIs to route the message to. The list of URIs
408         * will be split based on the default delimiter
409         * {@link RoutingSlipType#DEFAULT_DELIMITER}.
410         */
411        public Type routingSlip(String header) {
412            RoutingSlipType answer = new RoutingSlipType(header);
413            addOutput(answer);
414            return (Type) this;
415        }
416    
417        /**
418         * Creates a <a
419         * href="http://activemq.apache.org/camel/routing-slip.html">Routing
420         * Slip</a> pattern with the default header {@link RoutingSlipType#ROUTING_SLIP_HEADER}.
421         * The list of URIs in the header will be split based on the default delimiter
422         * {@link RoutingSlipType#DEFAULT_DELIMITER}.
423         */
424        public Type routingSlip() {
425            RoutingSlipType answer = new RoutingSlipType();
426            addOutput(answer);
427            return (Type) this;
428        }
429    
430        /**
431         * Creates the <a
432         * href="http://activemq.apache.org/camel/splitter.html">Splitter</a>
433         * pattern where an expression is evaluated to iterate through each of the
434         * parts of a message and then each part is then send to some endpoint.
435         * This splitter responds with the latest message returned from destination
436         * endpoint.
437         *
438         * @param receipients the expression on which to split
439         * @return the builder
440         */
441        public SplitterType splitter(Expression receipients) {
442            SplitterType answer = new SplitterType(receipients);
443            addOutput(answer);
444            return answer;
445        }
446    
447        /**
448         * Creates the <a
449         * href="http://activemq.apache.org/camel/splitter.html">Splitter</a>
450         * pattern where an expression is evaluated to iterate through each of the
451         * parts of a message and then each part is then send to some endpoint.
452         * This splitter responds with the latest message returned from destination
453         * endpoint.
454         *
455         * @return the expression clause for the expression on which to split
456         */
457        public ExpressionClause<SplitterType> splitter() {
458            SplitterType answer = new SplitterType();
459            addOutput(answer);
460            return ExpressionClause.createAndSetExpression(answer);
461        }
462    
463        /**
464         * Creates the <a
465         * href="http://activemq.apache.org/camel/splitter.html">Splitter</a>
466         * pattern where an expression is evaluated to iterate through each of the
467         * parts of a message and then each part is then send to some endpoint.
468         * Answer from the splitter is produced using given {@link AggregationStrategy}
469         * @param partsExpression the expression on which to split
470         * @param aggregationStrategy the strategy used to aggregate responses for
471         *          every part
472         * @return the builder
473         */
474        public SplitterType splitter(Expression partsExpression, AggregationStrategy aggregationStrategy) {
475            SplitterType answer = new SplitterType(partsExpression);
476            addOutput(answer);
477            answer.setAggregationStrategy(aggregationStrategy);
478            return answer;
479        }
480    
481        /**
482         * Creates the <a
483         * href="http://activemq.apache.org/camel/splitter.html">Splitter</a>
484         * pattern where an expression is evaluated to iterate through each of the
485         * parts of a message and then each part is then send to some endpoint.
486         * Answer from the splitter is produced using given {@link AggregationStrategy}
487         * @param aggregationStrategy the strategy used to aggregate responses for
488         *          every part
489         * @return the expression clause for the expression on which to split
490         */
491        public ExpressionClause<SplitterType> splitter(AggregationStrategy aggregationStrategy) {
492            SplitterType answer = new SplitterType();
493            addOutput(answer);
494            answer.setAggregationStrategy(aggregationStrategy);
495            return ExpressionClause.createAndSetExpression(answer);
496        }
497    
498        /**
499         * Creates the <a
500         * href="http://activemq.apache.org/camel/splitter.html">Splitter</a>
501         * pattern where an expression is evaluated to iterate through each of the
502         * parts of a message and then each part is then send to some endpoint.
503         * This splitter responds with the latest message returned from destination
504         * endpoint.
505         *
506         * @param receipients the expression on which to split
507         * @param parallelProcessing if is <tt>true</tt> camel will fork thread to call the endpoint producer
508         * @return the builder
509         */
510        public SplitterType splitter(Expression receipients, boolean parallelProcessing) {
511            SplitterType answer = new SplitterType(receipients);
512            addOutput(answer);
513            answer.setParallelProcessing(parallelProcessing);
514            return answer;
515        }
516    
517        /**
518         * Creates the <a
519         * href="http://activemq.apache.org/camel/splitter.html">Splitter</a>
520         * pattern where an expression is evaluated to iterate through each of the
521         * parts of a message and then each part is then send to some endpoint.
522         * This splitter responds with the latest message returned from destination
523         * endpoint.
524         *
525         * @param parallelProcessing if is <tt>true</tt> camel will fork thread to call the endpoint producer
526         * @return the expression clause for the expression on which to split
527         */
528        public ExpressionClause<SplitterType> splitter(boolean parallelProcessing) {
529            SplitterType answer = new SplitterType();
530            addOutput(answer);
531            answer.setParallelProcessing(parallelProcessing);
532            return ExpressionClause.createAndSetExpression(answer);
533        }
534    
535        /**
536         * Creates the <a
537         * href="http://activemq.apache.org/camel/splitter.html">Splitter</a>
538         * pattern where an expression is evaluated to iterate through each of the
539         * parts of a message and then each part is then send to some endpoint.
540         * Answer from the splitter is produced using given {@link AggregationStrategy}
541         * @param partsExpression the expression on which to split
542         * @param aggregationStrategy the strategy used to aggregate responses for
543         *          every part
544         * @param parallelProcessing if is <tt>true</tt> camel will fork thread to call the endpoint producer
545         * @return the builder
546         */
547        public SplitterType splitter(Expression partsExpression,
548                AggregationStrategy aggregationStrategy, boolean parallelProcessing) {
549            SplitterType answer = new SplitterType(partsExpression);
550            addOutput(answer);
551            answer.setAggregationStrategy(aggregationStrategy);
552            answer.setParallelProcessing(parallelProcessing);
553            return answer;
554        }
555    
556        /**
557         * Creates the <a
558         * href="http://activemq.apache.org/camel/splitter.html">Splitter</a>
559         * pattern where an expression is evaluated to iterate through each of the
560         * parts of a message and then each part is then send to some endpoint.
561         * Answer from the splitter is produced using given {@link AggregationStrategy}
562         * @param aggregationStrategy the strategy used to aggregate responses for
563         *          every part
564         * @param parallelProcessing if is <tt>true</tt> camel will fork thread to call the endpoint producer
565         * @return the expression clause for the expression on which to split
566         */
567        public ExpressionClause<SplitterType> splitter(AggregationStrategy aggregationStrategy, boolean parallelProcessing) {
568            SplitterType answer = new SplitterType();
569            addOutput(answer);
570            answer.setAggregationStrategy(aggregationStrategy);
571            answer.setParallelProcessing(parallelProcessing);
572            return ExpressionClause.createAndSetExpression(answer);
573        }
574    
575    
576        /**
577         * Creates the <a
578         * href="http://activemq.apache.org/camel/resequencer.html">Resequencer</a>
579         * pattern where a list of expressions are evaluated to be able to compare
580         * the message exchanges to reorder them. e.g. you may wish to sort by some
581         * headers
582         *
583         * @return the expression clause for the expressions on which to compare messages in order
584         */
585        public ExpressionClause<ResequencerType> resequencer() {
586            ResequencerType answer = new ResequencerType();
587            addOutput(answer);
588            ExpressionClause<ResequencerType> clause = new ExpressionClause<ResequencerType>(answer);
589            answer.expression(clause);
590            return clause;
591        }
592    
593        /**
594         * Creates the <a
595         * href="http://activemq.apache.org/camel/resequencer.html">Resequencer</a>
596         * pattern where an expression is evaluated to be able to compare the
597         * message exchanges to reorder them. e.g. you may wish to sort by some
598         * header
599         *
600         * @param expression the expression on which to compare messages in order
601         * @return the builder
602         */
603        public ResequencerType resequencer(Expression<Exchange> expression) {
604            return resequencer(Collections.<Expression>singletonList(expression));
605        }
606    
607        /**
608         * Creates the <a
609         * href="http://activemq.apache.org/camel/resequencer.html">Resequencer</a>
610         * pattern where a list of expressions are evaluated to be able to compare
611         * the message exchanges to reorder them. e.g. you may wish to sort by some
612         * headers
613         *
614         * @param expressions the expressions on which to compare messages in order
615         * @return the builder
616         */
617        public ResequencerType resequencer(List<Expression> expressions) {
618            ResequencerType answer = new ResequencerType(expressions);
619            addOutput(answer);
620            return answer;
621        }
622    
623        /**
624         * Creates the <a
625         * href="http://activemq.apache.org/camel/resequencer.html">Resequencer</a>
626         * pattern where a list of expressions are evaluated to be able to compare
627         * the message exchanges to reorder them. e.g. you may wish to sort by some
628         * headers
629         *
630         * @param expressions the expressions on which to compare messages in order
631         * @return the builder
632         */
633        public ResequencerType resequencer(Expression... expressions) {
634            List<Expression> list = new ArrayList<Expression>();
635            for (Expression expression : expressions) {
636                list.add(expression);
637            }
638            return resequencer(list);
639        }
640    
641        /**
642         * Creates an <a
643         * href="http://activemq.apache.org/camel/aggregator.html">Aggregator</a>
644         * pattern where a batch of messages are processed (up to a maximum amount
645         * or until some timeout is reached) and messages for the same correlation
646         * key are combined together using some kind of {@link AggregationStrategy}
647         * (by default the latest message is used) to compress many message exchanges
648         * into a smaller number of exchanges.
649         * <p/>
650         * A good example of this is stock market data; you may be receiving 30,000
651         * messages/second and you may want to throttle it right down so that multiple
652         * messages for the same stock are combined (or just the latest message is used
653         * and older prices are discarded). Another idea is to combine line item messages
654         * together into a single invoice message.
655         */
656        public ExpressionClause<AggregatorType> aggregator() {
657            AggregatorType answer = new AggregatorType();
658            addOutput(answer);
659            return ExpressionClause.createAndSetExpression(answer);
660        }
661    
662        /**
663         * Creates an <a
664         * href="http://activemq.apache.org/camel/aggregator.html">Aggregator</a>
665         * pattern where a batch of messages are processed (up to a maximum amount
666         * or until some timeout is reached) and messages for the same correlation
667         * key are combined together using some kind of {@link AggregationStrategy}
668         * (by default the latest message is used) to compress many message exchanges
669         * into a smaller number of exchanges.
670         * <p/>
671         * A good example of this is stock market data; you may be receiving 30,000
672         * messages/second and you may want to throttle it right down so that multiple
673         * messages for the same stock are combined (or just the latest message is used
674         * and older prices are discarded). Another idea is to combine line item messages
675         * together into a single invoice message.
676         *
677         * @param aggregationStrategy the strategy used for the aggregation
678         */
679        public ExpressionClause<AggregatorType> aggregator(AggregationStrategy aggregationStrategy) {
680            AggregatorType answer = new AggregatorType();
681            answer.setAggregationStrategy(aggregationStrategy);
682            addOutput(answer);
683            return ExpressionClause.createAndSetExpression(answer);
684        }
685    
686        /**
687         * Creates an <a
688         * href="http://activemq.apache.org/camel/aggregator.html">Aggregator</a>
689         * pattern using a custom aggregation collection implementation.
690         *
691         * @param aggregationCollection the collection used to perform the aggregation
692         */
693        public ExpressionClause<AggregatorType> aggregator(AggregationCollection aggregationCollection) {
694            AggregatorType answer = new AggregatorType();
695            answer.setAggregationCollection(aggregationCollection);
696            addOutput(answer);
697            return ExpressionClause.createAndSetExpression(answer);
698        }
699    
700        /**
701         * Creates an <a
702         * href="http://activemq.apache.org/camel/aggregator.html">Aggregator</a>
703         * pattern where a batch of messages are processed (up to a maximum amount
704         * or until some timeout is reached) and messages for the same correlation
705         * key are combined together using some kind of {@link AggregationStrategy}
706         * (by default the latest message is used) to compress many message exchanges
707         * into a smaller number of exchanges.
708         * <p/>
709         * A good example of this is stock market data; you may be receiving 30,000
710         * messages/second and you may want to throttle it right down so that multiple
711         * messages for the same stock are combined (or just the latest message is used
712         * and older prices are discarded). Another idea is to combine line item messages
713         * together into a single invoice message.
714         *
715         * @param correlationExpression the expression used to calculate the
716         *                              correlation key. For a JMS message this could be the
717         *                              expression <code>header("JMSDestination")</code> or
718         *                              <code>header("JMSCorrelationID")</code>
719         */
720        public AggregatorType aggregator(Expression correlationExpression) {
721            AggregatorType answer = new AggregatorType(correlationExpression);
722            addOutput(answer);
723            return answer;
724        }
725    
726        /**
727         * Creates an <a
728         * href="http://activemq.apache.org/camel/aggregator.html">Aggregator</a>
729         * pattern where a batch of messages are processed (up to a maximum amount
730         * or until some timeout is reached) and messages for the same correlation
731         * key are combined together using some kind of {@link AggregationStrategy}
732         * (by default the latest message is used) to compress many message exchanges
733         * into a smaller number of exchanges.
734         * <p/>
735         * A good example of this is stock market data; you may be receiving 30,000
736         * messages/second and you may want to throttle it right down so that multiple
737         * messages for the same stock are combined (or just the latest message is used
738         * and older prices are discarded). Another idea is to combine line item messages
739         * together into a single invoice message.
740         *
741         * @param correlationExpression the expression used to calculate the
742         *                              correlation key. For a JMS message this could be the
743         *                              expression <code>header("JMSDestination")</code> or
744         *                              <code>header("JMSCorrelationID")</code>
745         */
746        public AggregatorType aggregator(Expression correlationExpression, AggregationStrategy aggregationStrategy) {
747            AggregatorType answer = new AggregatorType(correlationExpression, aggregationStrategy);
748            addOutput(answer);
749            return answer;
750        }
751    
752        /**
753         * Creates the <a
754         * href="http://activemq.apache.org/camel/delayer.html">Delayer</a> pattern
755         * where an expression is used to calculate the time which the message will
756         * be dispatched on
757         *
758         * @param processAtExpression an expression to calculate the time at which
759         *                            the messages should be processed
760         * @return the builder
761         */
762        public DelayerType delayer(Expression<Exchange> processAtExpression) {
763            return delayer(processAtExpression, 0L);
764        }
765    
766        /**
767         * Creates the <a
768         * href="http://activemq.apache.org/camel/delayer.html">Delayer</a> pattern
769         * where an expression is used to calculate the time which the message will
770         * be dispatched on
771         *
772         * @param processAtExpression an expression to calculate the time at which
773         *                            the messages should be processed
774         * @param delay               the delay in milliseconds which is added to the
775         *                            processAtExpression to determine the time the message
776         *                            should be processed
777         * @return the builder
778         */
779        public DelayerType delayer(Expression<Exchange> processAtExpression, long delay) {
780            DelayerType answer = new DelayerType(processAtExpression, delay);
781            addOutput(answer);
782            return answer;
783        }
784    
785        /**
786         * Creates the <a
787         * href="http://activemq.apache.org/camel/delayer.html">Delayer</a> pattern
788         * where an expression is used to calculate the time which the message will
789         * be dispatched on
790         * @return the expression clause to create the expression
791         */
792        public ExpressionClause<DelayerType> delayer() {
793            DelayerType answer = new DelayerType();
794            addOutput(answer);
795            return ExpressionClause.createAndSetExpression(answer);
796        }
797    
798        /**
799         * Creates the <a
800         * href="http://activemq.apache.org/camel/delayer.html">Delayer</a> pattern
801         * where a fixed amount of milliseconds are used to delay processing of a
802         * message exchange
803         *
804         * @param delay the default delay in milliseconds
805         * @return the builder
806         */
807        public DelayerType delayer(long delay) {
808            return delayer(null, delay);
809        }
810    
811        /**
812         * Creates the <a
813         * href="http://activemq.apache.org/camel/delayer.html">Delayer</a> pattern
814         * where an expression is used to calculate the time which the message will
815         * be dispatched on
816         *
817         * @return the builder
818         */
819        public ThrottlerType throttler(long maximumRequestCount) {
820            ThrottlerType answer = new ThrottlerType(maximumRequestCount);
821            addOutput(answer);
822            return answer;
823        }
824    
825    
826        public Type throwFault(Throwable fault) {
827            ThrowFaultType answer = new ThrowFaultType();
828            answer.setFault(fault);
829            addOutput(answer);
830            return (Type) this;
831        }
832    
833        public Type throwFault(String message) {
834            return throwFault(new CamelException(message));
835        }
836    
837        public Type interceptor(String ref) {
838            InterceptorRef interceptor = new InterceptorRef(ref);
839            addInterceptor(interceptor);
840            return (Type) this;
841        }
842    
843    
844        public Type intercept(DelegateProcessor interceptor) {
845            addInterceptor(new InterceptorRef(interceptor));
846            lastInterceptor = interceptor;
847            return (Type) this;
848        }
849    
850        public InterceptType intercept() {
851            InterceptType answer = new InterceptType();
852            addOutput(answer);
853            return answer;
854        }
855    
856        public void addInterceptor(InterceptorType interceptor) {
857            addOutput(interceptor);
858            pushBlock(interceptor);
859        }
860    
861        protected void pushBlock(Block block) {
862            blocks.add(block);
863        }
864    
865        protected Block popBlock() {
866            return blocks.isEmpty() ? null : blocks.removeLast();
867        }
868    
869        public Type proceed() {
870            ProceedType proceed = null;
871            ProcessorType currentProcessor = this;
872    
873            if (currentProcessor instanceof InterceptType) {
874                proceed = ((InterceptType) currentProcessor).getProceed();
875            }
876            if (proceed == null) {
877                for (ProcessorType node = parent; node != null; node = node.getParent()) {
878                    if (node instanceof InterceptType) {
879                        InterceptType intercept = (InterceptType)node;
880                        proceed = intercept.getProceed();
881                        break;
882                    }
883                }
884            }
885    
886            if (currentProcessor instanceof InterceptType) {
887                proceed = ((InterceptType) currentProcessor).getProceed();
888            }
889    
890            if (proceed == null) {
891                throw new IllegalArgumentException("Cannot use proceed() without being within an intercept() block");
892            }
893    
894            addOutput(proceed);
895            return (Type) this;
896        }
897    
898        public ExceptionType exception(Class exceptionType) {
899            ExceptionType answer = new ExceptionType(exceptionType);
900            addOutput(answer);
901            return answer;
902        }
903    
904        /**
905         * Apply an interceptor route if the predicate is true
906         */
907        public ChoiceType intercept(Predicate predicate) {
908            InterceptType answer = new InterceptType();
909            addOutput(answer);
910            return answer.when(predicate);
911        }
912    
913        public Type interceptors(String... refs) {
914            for (String ref : refs) {
915                interceptor(ref);
916            }
917            return (Type) this;
918        }
919    
920        /**
921         * Trace logs the exchange before it goes to the next processing step using
922         * the {@link #DEFAULT_TRACE_CATEGORY} logging category.
923         */
924        public Type trace() {
925            return trace(DEFAULT_TRACE_CATEGORY);
926        }
927    
928        /**
929         * Trace logs the exchange before it goes to the next processing step using
930         * the specified logging category.
931         *
932         * @param category the logging category trace messages will sent to.
933         */
934        public Type trace(String category) {
935            final Log log = LogFactory.getLog(category);
936            return intercept(new DelegateProcessor() {
937                @Override
938                public void process(Exchange exchange) throws Exception {
939                    log.trace(exchange);
940                    processNext(exchange);
941                }
942            });
943        }
944    
945        public PolicyRef policies() {
946            PolicyRef answer = new PolicyRef();
947            addOutput(answer);
948            return answer;
949        }
950    
951        public PolicyRef policy(Policy policy) {
952            PolicyRef answer = new PolicyRef(policy);
953            addOutput(answer);
954            return answer;
955        }
956    
957        /**
958         * Forces handling of faults as exceptions
959         *
960         * @return the current builder with the fault handler configured
961         */
962        public Type handleFault() {
963            HandleFaultType interceptor = new HandleFaultType();
964            addInterceptor(interceptor);
965            return (Type) this;
966        }
967    
968        /**
969         * Installs the given error handler builder
970         *
971         * @param errorHandlerBuilder the error handler to be used by default for
972         *                            all child routes
973         * @return the current builder with the error handler configured
974         */
975        public Type errorHandler(ErrorHandlerBuilder errorHandlerBuilder) {
976            setErrorHandlerBuilder(errorHandlerBuilder);
977            return (Type) this;
978        }
979    
980        /**
981         * Configures whether or not the error handler is inherited by every
982         * processing node (or just the top most one)
983         *
984         * @param condition the flag as to whether error handlers should be
985         *                  inherited or not
986         * @return the current builder
987         */
988        public Type inheritErrorHandler(boolean condition) {
989            setInheritErrorHandlerFlag(condition);
990            return (Type) this;
991        }
992    
993        // Transformers
994        // -------------------------------------------------------------------------
995    
996        /**
997         * Adds the custom processor to this destination which could be a final
998         * destination, or could be a transformation in a pipeline
999         */
1000        public Type process(Processor processor) {
1001            ProcessorRef answer = new ProcessorRef(processor);
1002            addOutput(answer);
1003            return (Type) this;
1004        }
1005    
1006        /**
1007         * Adds the custom processor reference to this destination which could be a final
1008         * destination, or could be a transformation in a pipeline
1009         */
1010        public Type processRef(String ref) {
1011            ProcessorRef answer = new ProcessorRef();
1012            answer.setRef(ref);
1013            addOutput(answer);
1014            return (Type) this;
1015        }
1016    
1017        /**
1018         * Adds a bean which is invoked which could be a final destination, or could
1019         * be a transformation in a pipeline
1020         */
1021        public Type bean(Object bean) {
1022            BeanRef answer = new BeanRef();
1023            answer.setBean(bean);
1024            addOutput(answer);
1025            return (Type) this;
1026        }
1027    
1028        /**
1029         * Adds a bean and method which is invoked which could be a final
1030         * destination, or could be a transformation in a pipeline
1031         */
1032        public Type bean(Object bean, String method) {
1033            BeanRef answer = new BeanRef();
1034            answer.setBean(bean);
1035            answer.setMethod(method);
1036            addOutput(answer);
1037            return (Type) this;
1038        }
1039    
1040        /**
1041         * Adds a bean by type which is invoked which could be a final destination, or could
1042         * be a transformation in a pipeline
1043         */
1044        public Type bean(Class beanType) {
1045            BeanRef answer = new BeanRef();
1046            answer.setBeanType(beanType);
1047            addOutput(answer);
1048            return (Type) this;
1049        }
1050    
1051        /**
1052         * Adds a bean type and method which is invoked which could be a final
1053         * destination, or could be a transformation in a pipeline
1054         */
1055        public Type bean(Class beanType, String method) {
1056            BeanRef answer = new BeanRef();
1057            answer.setBeanType(beanType);
1058            answer.setMethod(method);
1059            addOutput(answer);
1060            return (Type) this;
1061        }
1062    
1063        /**
1064         * Adds a bean which is invoked which could be a final destination, or could
1065         * be a transformation in a pipeline
1066         */
1067        public Type beanRef(String ref) {
1068            BeanRef answer = new BeanRef(ref);
1069            addOutput(answer);
1070            return (Type) this;
1071        }
1072    
1073        /**
1074         * Adds a bean and method which is invoked which could be a final
1075         * destination, or could be a transformation in a pipeline
1076         */
1077        public Type beanRef(String ref, String method) {
1078            BeanRef answer = new BeanRef(ref, method);
1079            addOutput(answer);
1080            return (Type) this;
1081        }
1082    
1083        /**
1084         * Adds a processor which sets the body on the IN message
1085         */
1086        public ExpressionClause<ProcessorType<Type>> setBody() {
1087            ExpressionClause<ProcessorType<Type>> clause = new ExpressionClause<ProcessorType<Type>>((Type) this);
1088            process(ProcessorBuilder.setBody(clause));
1089            return clause;
1090        }
1091    
1092        /**
1093         * Adds a processor which sets the body on the IN message
1094         */
1095        public Type setBody(Expression expression) {
1096            return process(ProcessorBuilder.setBody(expression));
1097        }
1098    
1099        /**
1100         * Adds a processor which sets the body on the OUT message
1101         *
1102         * @deprecated Please use {@link #transform(Expression)} instead
1103         */
1104        public Type setOutBody(Expression expression) {
1105            return transform(expression);
1106        }
1107    
1108        /**
1109         * Adds a processor which sets the body on the OUT message
1110         *
1111         * @deprecated Please use {@link #transform()} instead
1112         */
1113        public ExpressionClause<ProcessorType<Type>> setOutBody() {
1114            return transform();
1115        }
1116    
1117        /**
1118         * Adds a processor which sets the body on the OUT message
1119         */
1120        public Type transform(Expression expression) {
1121            TransformType answer = new TransformType(expression);
1122            addOutput(answer);
1123            return (Type) this;
1124        }
1125    
1126        /**
1127         * Adds a processor which sets the body on the OUT message
1128         */
1129        public ExpressionClause<ProcessorType<Type>> transform() {
1130            ExpressionClause<ProcessorType<Type>> clause = new ExpressionClause<ProcessorType<Type>>(
1131                    (Type) this);
1132            TransformType answer = new TransformType(clause);
1133            addOutput(answer);
1134            return clause;
1135        }
1136    
1137        /**
1138         * Adds a processor which sets the body on the FAULT message
1139         */
1140        public Type setFaultBody(Expression expression) {
1141            return process(ProcessorBuilder.setFaultBody(expression));
1142        }
1143    
1144        /**
1145         * Adds a processor which sets the header on the IN message
1146         */
1147        public ExpressionClause<ProcessorType<Type>> setHeader(String name) {
1148            ExpressionClause<ProcessorType<Type>> clause = new ExpressionClause<ProcessorType<Type>>((Type) this);
1149            SetHeaderType answer = new SetHeaderType(name, clause);
1150            addOutput(answer);
1151            return clause;
1152        }
1153    
1154        /**
1155         * Adds a processor which sets the header on the IN message
1156         */
1157        public Type setHeader(String name, Expression expression) {
1158            SetHeaderType answer = new SetHeaderType(name, expression);
1159            addOutput(answer);
1160            return (Type) this;
1161        }
1162    
1163        /**
1164         * Adds a processor which sets the header on the IN message to the given value
1165         */
1166        public Type setHeader(String name, String value) {
1167            SetHeaderType answer = new SetHeaderType(name, value);
1168            addOutput(answer);
1169            return (Type) this;
1170        }
1171    
1172        /**
1173         * Adds a processor which sets the header on the OUT message
1174         */
1175        public ExpressionClause<ProcessorType<Type>> setOutHeader(String name) {
1176            ExpressionClause<ProcessorType<Type>> clause = new ExpressionClause<ProcessorType<Type>>((Type) this);
1177            process(ProcessorBuilder.setOutHeader(name, clause));
1178            return clause;
1179        }
1180    
1181        /**
1182         * Adds a processor which sets the header on the OUT message
1183         */
1184        public Type setOutHeader(String name, Expression expression) {
1185            return process(ProcessorBuilder.setOutHeader(name, expression));
1186        }
1187    
1188        /**
1189         * Adds a processor which sets the header on the OUT message
1190         */
1191        public Type setOutHeader(String name, String value) {
1192            return (Type) setOutHeader(name).constant(value);
1193        }
1194    
1195        /**
1196         * Adds a processor which sets the header on the FAULT message
1197         */
1198        public Type setFaultHeader(String name, Expression expression) {
1199            return process(ProcessorBuilder.setFaultHeader(name, expression));
1200        }
1201    
1202        /**
1203         * Adds a processor which sets the exchange property
1204         */
1205        public Type setProperty(String name, Expression expression) {
1206            return process(ProcessorBuilder.setProperty(name, expression));
1207        }
1208    
1209    
1210        /**
1211         * Adds a processor which sets the exchange property
1212         */
1213        public ExpressionClause<ProcessorType<Type>> setProperty(String name) {
1214            ExpressionClause<ProcessorType<Type>> clause = new ExpressionClause<ProcessorType<Type>>((Type) this);
1215            process(ProcessorBuilder.setProperty(name, clause));
1216            return clause;
1217        }
1218    
1219        /**
1220         * Adds a processor which removes the header on the IN message
1221         */
1222        public Type removeHeader(String name) {
1223            return process(ProcessorBuilder.removeHeader(name));
1224        }
1225    
1226        /**
1227         * Adds a processor which removes the header on the OUT message
1228         */
1229        public Type removeOutHeader(String name) {
1230            return process(ProcessorBuilder.removeOutHeader(name));
1231        }
1232    
1233        /**
1234         * Adds a processor which removes the header on the FAULT message
1235         */
1236        public Type removeFaultHeader(String name) {
1237            return process(ProcessorBuilder.removeFaultHeader(name));
1238        }
1239    
1240        /**
1241         * Adds a processor which removes the exchange property
1242         */
1243        public Type removeProperty(String name) {
1244            return process(ProcessorBuilder.removeProperty(name));
1245        }
1246    
1247        /**
1248         * Converts the IN message body to the specified type
1249         */
1250        public Type convertBodyTo(Class type) {
1251            addOutput(new ConvertBodyType(type));
1252            return (Type) this;
1253        }
1254    
1255        /**
1256         * Converts the OUT message body to the specified type
1257         *
1258         * @deprecated Please use {@link #convertBodyTo(Class)} instead
1259         */
1260        public Type convertOutBodyTo(Class type) {
1261            // TODO deprecate method?
1262            //return process(ProcessorBuilder.setOutBody(Builder.outBody().convertTo(type)));
1263            return process(new ConvertBodyProcessor(type));
1264        }
1265    
1266        /**
1267         * Converts the FAULT message body to the specified type
1268         */
1269        public Type convertFaultBodyTo(Class type) {
1270            // TODO deprecate method?
1271            //return process(ProcessorBuilder.setFaultBody(Builder.faultBody().convertTo(type)));
1272            return process(new ConvertBodyProcessor(type));
1273        }
1274    
1275        // DataFormat support
1276        // -------------------------------------------------------------------------
1277    
1278        /**
1279         * Unmarshals the in body using a {@link DataFormat} expression to define
1280         * the format of the input message and the output will be set on the out message body.
1281         *
1282         * @return the expression to create the {@link DataFormat}
1283         */
1284        public DataFormatClause<ProcessorType<Type>> unmarshal() {
1285            return new DataFormatClause<ProcessorType<Type>>(this, DataFormatClause.Operation.Unmarshal);
1286        }
1287    
1288        /**
1289         * Unmarshals the in body using the specified {@link DataFormat}
1290         * and sets the output on the out message body.
1291         *
1292         * @return this object
1293         */
1294        public Type unmarshal(DataFormatType dataFormatType) {
1295            addOutput(new UnmarshalType(dataFormatType));
1296            return (Type) this;
1297        }
1298    
1299        /**
1300         * Unmarshals the in body using the specified {@link DataFormat}
1301         * and sets the output on the out message body.
1302         *
1303         * @return this object
1304         */
1305        public Type unmarshal(DataFormat dataFormat) {
1306            return unmarshal(new DataFormatType(dataFormat));
1307        }
1308    
1309        /**
1310         * Unmarshals the in body using the specified {@link DataFormat}
1311         * reference in the {@link Registry} and sets the output on the out message body.
1312         *
1313         * @return this object
1314         */
1315        public Type unmarshal(String dataTypeRef) {
1316            addOutput(new UnmarshalType(dataTypeRef));
1317            return (Type) this;
1318        }
1319    
1320        /**
1321         * Marshals the in body using a {@link DataFormat} expression to define
1322         * the format of the output which will be added to the out body.
1323         *
1324         * @return the expression to create the {@link DataFormat}
1325         */
1326        public DataFormatClause<ProcessorType<Type>> marshal() {
1327            return new DataFormatClause<ProcessorType<Type>>(this, DataFormatClause.Operation.Marshal);
1328        }
1329    
1330        /**
1331         * Marshals the in body using the specified {@link DataFormat}
1332         * and sets the output on the out message body.
1333         *
1334         * @return this object
1335         */
1336        public Type marshal(DataFormatType dataFormatType) {
1337            addOutput(new MarshalType(dataFormatType));
1338            return (Type) this;
1339        }
1340    
1341        /**
1342         * Marshals the in body using the specified {@link DataFormat}
1343         * and sets the output on the out message body.
1344         *
1345         * @return this object
1346         */
1347        public Type marshal(DataFormat dataFormat) {
1348            return marshal(new DataFormatType(dataFormat));
1349        }
1350    
1351        /**
1352         * Marshals the in body the specified {@link DataFormat}
1353         * reference in the {@link Registry} and sets the output on the out message body.
1354         *
1355         * @return this object
1356         */
1357        public Type marshal(String dataTypeRef) {
1358            addOutput(new MarshalType(dataTypeRef));
1359            return (Type) this;
1360        }
1361    
1362        // Properties
1363        // -------------------------------------------------------------------------
1364        @XmlTransient
1365        public ProcessorType<? extends ProcessorType> getParent() {
1366            return parent;
1367        }
1368    
1369        public void setParent(ProcessorType<? extends ProcessorType> parent) {
1370            this.parent = parent;
1371        }
1372    
1373        @XmlTransient
1374        public ErrorHandlerBuilder getErrorHandlerBuilder() {
1375            if (errorHandlerBuilder == null) {
1376                errorHandlerBuilder = createErrorHandlerBuilder();
1377            }
1378            return errorHandlerBuilder;
1379        }
1380    
1381        /**
1382         * Sets the error handler to use with processors created by this builder
1383         */
1384        public void setErrorHandlerBuilder(ErrorHandlerBuilder errorHandlerBuilder) {
1385            this.errorHandlerBuilder = errorHandlerBuilder;
1386        }
1387    
1388        @XmlTransient
1389        public boolean isInheritErrorHandler() {
1390            return ObjectConverter.toBoolean(getInheritErrorHandlerFlag());
1391        }
1392    
1393        @XmlAttribute(name = "inheritErrorHandler", required = false)
1394        public Boolean getInheritErrorHandlerFlag() {
1395            return inheritErrorHandlerFlag;
1396        }
1397    
1398        public void setInheritErrorHandlerFlag(Boolean inheritErrorHandlerFlag) {
1399            this.inheritErrorHandlerFlag = inheritErrorHandlerFlag;
1400        }
1401    
1402        @XmlTransient
1403        public NodeFactory getNodeFactory() {
1404            if (nodeFactory == null) {
1405                nodeFactory = new NodeFactory();
1406            }
1407            return nodeFactory;
1408        }
1409    
1410        public void setNodeFactory(NodeFactory nodeFactory) {
1411            this.nodeFactory = nodeFactory;
1412        }
1413    
1414        /**
1415         * Returns a label to describe this node such as the expression if some kind of expression node
1416         */
1417        public String getLabel() {
1418            return "";
1419        }
1420    
1421        // Implementation methods
1422        // -------------------------------------------------------------------------
1423    
1424        /**
1425         * Creates the processor and wraps it in any necessary interceptors and
1426         * error handlers
1427         */
1428        protected Processor makeProcessor(RouteContext routeContext) throws Exception {
1429            Processor processor = createProcessor(routeContext);
1430            return wrapProcessor(routeContext, processor);
1431        }
1432    
1433        /**
1434         * A strategy method which allows derived classes to wrap the child
1435         * processor in some kind of interceptor
1436         *
1437         * @param routeContext
1438         * @param target       the processor which can be wrapped
1439         * @return the original processor or a new wrapped interceptor
1440         */
1441        protected Processor wrapProcessorInInterceptors(RouteContext routeContext, Processor target) throws Exception {
1442            // The target is required.
1443            if (target == null) {
1444                throw new RuntimeCamelException("target provided.");
1445            }
1446    
1447            // Interceptors are optional
1448            DelegateProcessor first = null;
1449            DelegateProcessor last = null;
1450    /*
1451    
1452            List<InterceptorType> interceptors = new ArrayList<InterceptorType>(routeContext.getRoute()
1453                    .getInterceptors());
1454            List<InterceptorType> list = getInterceptors();
1455            for (InterceptorType interceptorType : list) {
1456                if (!interceptors.contains(interceptorType)) {
1457                    interceptors.add(interceptorType);
1458                }
1459            }
1460            for (InterceptorType interceptorRef : interceptors) {
1461                DelegateProcessor p = interceptorRef.createInterceptor(routeContext);
1462                if (first == null) {
1463                    first = p;
1464                }
1465                if (last != null) {
1466                    last.setProcessor(p);
1467                }
1468                last = p;
1469            }
1470    
1471            if (last != null) {
1472                last.setProcessor(target);
1473            }
1474    */
1475            return first == null ? target : first;
1476        }
1477    
1478        /**
1479         * A strategy method to allow newly created processors to be wrapped in an
1480         * error handler.
1481         */
1482        protected Processor wrapInErrorHandler(Processor processor) throws Exception {
1483            return getErrorHandlerBuilder().createErrorHandler(processor);
1484        }
1485    
1486        protected ErrorHandlerBuilder createErrorHandlerBuilder() {
1487            if (isInheritErrorHandler()) {
1488                return new DeadLetterChannelBuilder();
1489            } else {
1490                return new NoErrorHandlerBuilder();
1491            }
1492        }
1493    
1494        protected void configureChild(ProcessorType output) {
1495            output.setNodeFactory(getNodeFactory());
1496        }
1497    
1498        public void addOutput(ProcessorType processorType) {
1499            processorType.setParent(this);
1500            configureChild(processorType);
1501            if (blocks.isEmpty()) {
1502                getOutputs().add(processorType);
1503            } else {
1504                Block block = blocks.getLast();
1505                block.addOutput(processorType);
1506            }
1507        }
1508    
1509        /**
1510         * Creates a new instance of some kind of composite processor which defaults
1511         * to using a {@link Pipeline} but derived classes could change the
1512         * behaviour
1513         */
1514        protected Processor createCompositeProcessor(List<Processor> list) {
1515            // return new MulticastProcessor(list);
1516            return new Pipeline(list);
1517        }
1518    
1519        protected Processor createOutputsProcessor(RouteContext routeContext, Collection<ProcessorType<?>> outputs)
1520            throws Exception {
1521            List<Processor> list = new ArrayList<Processor>();
1522            for (ProcessorType output : outputs) {
1523                Processor processor = output.createProcessor(routeContext);
1524                list.add(processor);
1525            }
1526            Processor processor = null;
1527            if (!list.isEmpty()) {
1528                if (list.size() == 1) {
1529                    processor = list.get(0);
1530                } else {
1531                    processor = createCompositeProcessor(list);
1532                }
1533            }
1534            return processor;
1535        }
1536    
1537        public void clearOutput() {
1538            getOutputs().clear();
1539            blocks.clear();
1540        }
1541    }