001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.model;
018    
019    import java.rmi.registry.Registry;
020    import java.util.ArrayList;
021    import java.util.Arrays;
022    import java.util.Collection;
023    import java.util.Collections;
024    import java.util.HashSet;
025    import java.util.LinkedList;
026    import java.util.List;
027    import java.util.Set;
028    import java.util.concurrent.ThreadPoolExecutor;
029    
030    import javax.xml.bind.annotation.XmlAccessType;
031    import javax.xml.bind.annotation.XmlAccessorType;
032    import javax.xml.bind.annotation.XmlAttribute;
033    import javax.xml.bind.annotation.XmlTransient;
034    
035    import org.apache.camel.CamelContext;
036    import org.apache.camel.CamelException;
037    import org.apache.camel.Endpoint;
038    import org.apache.camel.Exchange;
039    import org.apache.camel.Expression;
040    import org.apache.camel.Predicate;
041    import org.apache.camel.Processor;
042    import org.apache.camel.Route;
043    import org.apache.camel.RuntimeCamelException;
044    import org.apache.camel.builder.DataFormatClause;
045    import org.apache.camel.builder.DeadLetterChannelBuilder;
046    import org.apache.camel.builder.ErrorHandlerBuilder;
047    import org.apache.camel.builder.ExpressionClause;
048    import org.apache.camel.builder.NoErrorHandlerBuilder;
049    import org.apache.camel.builder.ProcessorBuilder;
050    import org.apache.camel.impl.DefaultCamelContext;
051    import org.apache.camel.model.dataformat.DataFormatType;
052    import org.apache.camel.model.language.ExpressionType;
053    import org.apache.camel.model.language.LanguageExpression;
054    import org.apache.camel.processor.ConvertBodyProcessor;
055    import org.apache.camel.processor.DelegateProcessor;
056    import org.apache.camel.processor.MulticastProcessor;
057    import org.apache.camel.processor.Pipeline;
058    import org.apache.camel.processor.RecipientList;
059    import org.apache.camel.processor.aggregate.AggregationCollection;
060    import org.apache.camel.processor.aggregate.AggregationStrategy;
061    import org.apache.camel.processor.idempotent.IdempotentConsumer;
062    import org.apache.camel.processor.idempotent.MessageIdRepository;
063    import org.apache.camel.spi.DataFormat;
064    import org.apache.camel.spi.InterceptStrategy;
065    import org.apache.camel.spi.Policy;
066    import org.apache.camel.spi.RouteContext;
067    import org.apache.commons.logging.Log;
068    import org.apache.commons.logging.LogFactory;
069    
070    /**
071     * Base class for processor types that most XML types extend.
072     *
073     * @version $Revision: 42473 $
074     */
075    @XmlAccessorType(XmlAccessType.PROPERTY)
076    public abstract class ProcessorType<Type extends ProcessorType> extends OptionalIdentifiedType<Type> implements Block {
077        public static final String DEFAULT_TRACE_CATEGORY = "org.apache.camel.TRACE";
078        private ErrorHandlerBuilder errorHandlerBuilder;
079        private Boolean inheritErrorHandlerFlag;
080        private NodeFactory nodeFactory;
081        private LinkedList<Block> blocks = new LinkedList<Block>();
082        private ProcessorType<? extends ProcessorType> parent;
083        private List<InterceptorType> interceptors = new ArrayList<InterceptorType>();
084    
085        // else to use an optional attribute in JAXB2
           /**
            * Returns the outputs (child processor types) attached to this node.
            * <p/>
            * Declared abstract here; the concrete list is supplied by subclasses.
            * Within this class it is read by
            * {@link #createOutputsProcessor(RouteContext)}.
            *
            * @return the list of outputs of this node
            */
086        public abstract List<ProcessorType<?>> getOutputs();
087    
088    
089        public Processor createProcessor(RouteContext routeContext) throws Exception {
090            throw new UnsupportedOperationException("Not implemented yet for class: " + getClass().getName());
091        }
092    
093        public Processor createOutputsProcessor(RouteContext routeContext) throws Exception {
094            Collection<ProcessorType<?>> outputs = getOutputs();
095            return createOutputsProcessor(routeContext, outputs);
096        }
097    
098        public void addRoutes(RouteContext routeContext, Collection<Route> routes) throws Exception {
099            Processor processor = makeProcessor(routeContext);
100            routeContext.addEventDrivenProcessor(processor);
101        }
102    
103        /**
104         * Wraps the child processor in whatever necessary interceptors and error
105         * handlers
106         */
107        public Processor wrapProcessor(RouteContext routeContext, Processor processor) throws Exception {
108            processor = wrapProcessorInInterceptors(routeContext, processor);
109            return wrapInErrorHandler(processor);
110        }
111    
112        // Fluent API
113        // -------------------------------------------------------------------------
114    
115        /**
116         * Sends the exchange to the given endpoint URI
117         */
118        public Type to(String uri) {
119            addOutput(new ToType(uri));
120            return (Type) this;
121        }
122    
123        /**
124         * Sends the exchange to the given endpoint
125         */
126        public Type to(Endpoint endpoint) {
127            addOutput(new ToType(endpoint));
128            return (Type) this;
129        }
130    
131        /**
132         * Sends the exchange to a list of endpoints using the
133         * {@link MulticastProcessor} pattern
134         */
135        public Type to(String... uris) {
136            for (String uri : uris) {
137                addOutput(new ToType(uri));
138            }
139            return (Type) this;
140        }
141    
142        /**
143         * Sends the exchange to a list of endpoints using the
144         * {@link MulticastProcessor} pattern
145         */
146        public Type to(Endpoint... endpoints) {
147            for (Endpoint endpoint : endpoints) {
148                addOutput(new ToType(endpoint));
149            }
150            return (Type) this;
151        }
152    
153        /**
154         * Sends the exchange to a list of endpoint using the
155         * {@link MulticastProcessor} pattern
156         */
157        public Type to(Collection<Endpoint> endpoints) {
158            for (Endpoint endpoint : endpoints) {
159                addOutput(new ToType(endpoint));
160            }
161            return (Type) this;
162        }
163    
164        /**
165         * Multicasts messages to all its child outputs; so that each processor and
166         * destination gets a copy of the original message to avoid the processors
167         * interfering with each other.
168         */
169        public MulticastType multicast() {
170            MulticastType answer = new MulticastType();
171            addOutput(answer);
172            return answer;
173        }
174    
175        /**
176         * Multicasts messages to all its child outputs; so that each processor and
177         * destination gets a copy of the original message to avoid the processors
178         * interfering with each other.
179         * @param aggregationStrategy the strategy used to aggregate responses for
180         *          every part
181         * @param parallelProcessing if is <tt>true</tt> camel will fork thread to call the endpoint producer
182         * @return the multicast type
183         */
184        public MulticastType multicast(AggregationStrategy aggregationStrategy, boolean parallelProcessing) {
185            MulticastType answer = new MulticastType();
186            addOutput(answer);
187            answer.setAggregationStrategy(aggregationStrategy);
188            answer.setParallelProcessing(parallelProcessing);
189            return answer;
190        }
191    
192        /**
193         * Multicasts messages to all its child outputs; so that each processor and
194         * destination gets a copy of the original message to avoid the processors
195         * interfering with each other.
196         * @param aggregationStrategy the strategy used to aggregate responses for
197         *          every part
198         * @return the multicast type
199         */
200        public MulticastType multicast(AggregationStrategy aggregationStrategy) {
201            MulticastType answer = new MulticastType();
202            addOutput(answer);
203            answer.setAggregationStrategy(aggregationStrategy);
204            return answer;
205        }
206    
207        /**
208         * Creates a {@link Pipeline} of the list of endpoints so that the message
209         * will get processed by each endpoint in turn and for request/response the
210         * output of one endpoint will be the input of the next endpoint
211         */
212        public Type pipeline(String... uris) {
213            // TODO pipeline v mulicast
214            return to(uris);
215        }
216    
217        /**
218         * Creates a {@link Pipeline} of the list of endpoints so that the message
219         * will get processed by each endpoint in turn and for request/response the
220         * output of one endpoint will be the input of the next endpoint
221         */
222        public Type pipeline(Endpoint... endpoints) {
223            // TODO pipeline v mulicast
224            return to(endpoints);
225        }
226    
227        /**
228         * Creates a {@link Pipeline} of the list of endpoints so that the message
229         * will get processed by each endpoint in turn and for request/response the
230         * output of one endpoint will be the input of the next endpoint
231         */
232        public Type pipeline(Collection<Endpoint> endpoints) {
233            // TODO pipeline v mulicast
234            return to(endpoints);
235        }
236    
237        /**
238         * Ends the current block
239         */
240        public ProcessorType<? extends ProcessorType> end() {
241            if (blocks.isEmpty()) {
242                if (parent == null) {
243                    throw new IllegalArgumentException("Root node with no active block");
244                }
245                return parent;
246            }
247            popBlock();
248            return this;
249        }
250    
251        /**
252         * Causes subsequent processors to be called asynchronously
253         *
254         * @param coreSize the number of threads that will be used to process
255         *                 messages in subsequent processors.
256         * @return a ThreadType builder that can be used to further configure the
257         *         the thread pool.
258         */
259        public ThreadType thread(int coreSize) {
260            ThreadType answer = new ThreadType(coreSize);
261            addOutput(answer);
262            return answer;
263        }
264    
265        /**
266         * Causes subsequent processors to be called asynchronously
267         *
268         * @param executor the executor that will be used to process
269         *                 messages in subsequent processors.
270         * @return a ThreadType builder that can be used to further configure the
271         *         the thread pool.
272         */
273        public ProcessorType<Type> thread(ThreadPoolExecutor executor) {
274            ThreadType answer = new ThreadType(executor);
275            addOutput(answer);
276            return this;
277        }
278    
279        /**
280         * Creates an {@link IdempotentConsumer} to avoid duplicate messages
281         */
282        public IdempotentConsumerType idempotentConsumer(Expression messageIdExpression,
283                MessageIdRepository messageIdRepository) {
284            IdempotentConsumerType answer = new IdempotentConsumerType(messageIdExpression, messageIdRepository);
285            addOutput(answer);
286            return answer;
287        }
288    
289        /**
290         * Creates an {@link IdempotentConsumer} to avoid duplicate messages
291         *
292         * @return the builder used to create the expression
293         */
294        public ExpressionClause<IdempotentConsumerType> idempotentConsumer(MessageIdRepository messageIdRepository) {
295            IdempotentConsumerType answer = new IdempotentConsumerType();
296            answer.setMessageIdRepository(messageIdRepository);
297            addOutput(answer);
298            return ExpressionClause.createAndSetExpression(answer);
299        }
300    
301        /**
302         * Creates a predicate expression which only if it is true then the
303         * exchange is forwarded to the destination
304         *
305         * @return the clause used to create the filter expression
306         */
307        public ExpressionClause<FilterType> filter() {
308            FilterType filter = new FilterType();
309            addOutput(filter);
310            return ExpressionClause.createAndSetExpression(filter);
311        }
312    
313        /**
314         * Creates a predicate which is applied and only if it is true then the
315         * exchange is forwarded to the destination
316         *
317         * @return the builder for a predicate
318         */
319        public FilterType filter(Predicate predicate) {
320            FilterType filter = new FilterType(predicate);
321            addOutput(filter);
322            return filter;
323        }
324    
325        public FilterType filter(ExpressionType expression) {
326            FilterType filter = getNodeFactory().createFilter();
327            filter.setExpression(expression);
328            addOutput(filter);
329            return filter;
330        }
331    
332        public FilterType filter(String language, String expression) {
333            return filter(new LanguageExpression(language, expression));
334        }
335    
336        public LoadBalanceType loadBalance() {
337            LoadBalanceType answer = new LoadBalanceType();
338            addOutput(answer);
339            return answer;
340        }
341    
342    
343        /**
344         * Creates a choice of one or more predicates with an otherwise clause
345         *
346         * @return the builder for a choice expression
347         */
348        public ChoiceType choice() {
349            ChoiceType answer = new ChoiceType();
350            addOutput(answer);
351            return answer;
352        }
353    
354        /**
355         * Creates a try/catch block
356         *
357         * @return the builder for a tryBlock expression
358         */
359        public TryType tryBlock() {
360            TryType answer = new TryType();
361            addOutput(answer);
362            return answer;
363        }
364    
365        /**
366         * Creates a dynamic <a
367         * href="http://activemq.apache.org/camel/recipient-list.html">Recipient
368         * List</a> pattern.
369         *
370         * @param receipients is the builder of the expression used in the
371         *                    {@link RecipientList} to decide the destinations
372         */
373        public Type recipientList(Expression receipients) {
374            RecipientListType answer = new RecipientListType(receipients);
375            addOutput(answer);
376            return (Type) this;
377        }
378    
379        /**
380         * Creates a dynamic <a
381         * href="http://activemq.apache.org/camel/recipient-list.html">Recipient
382         * List</a> pattern.
383         *
384         * @return the expression clause for the expression used in the
385         *                    {@link RecipientList} to decide the destinations
386         */
387        public ExpressionClause<ProcessorType<Type>> recipientList() {
388            RecipientListType answer = new RecipientListType();
389            addOutput(answer);
390            ExpressionClause<ProcessorType<Type>> clause = new ExpressionClause<ProcessorType<Type>>((Type) this);
391            answer.setExpression(clause);
392            return clause;
393        }
394    
395        /**
396         * Creates a <a
397         * href="http://activemq.apache.org/camel/routing-slip.html">Routing
398         * Slip</a> pattern.
399         *
400         * @param header is the header that the {@link org.apache.camel.processor.RoutingSlip RoutingSlip}
401         * class will look in for the list of URIs to route the message to.
402         * @param uriDelimiter is the delimiter that will be used to split up
403         * the list of URIs in the routing slip.
404         */
405        public Type routingSlip(String header, String uriDelimiter) {
406            RoutingSlipType answer = new RoutingSlipType(header, uriDelimiter);
407            addOutput(answer);
408            return (Type) this;
409        }
410    
411        /**
412         * Creates a <a
413         * href="http://activemq.apache.org/camel/routing-slip.html">Routing
414         * Slip</a> pattern.
415         *
416         * @param header is the header that the {@link org.apache.camel.processor.RoutingSlip RoutingSlip}
417         * class will look in for the list of URIs to route the message to. The list of URIs
418         * will be split based on the default delimiter
419         * {@link RoutingSlipType#DEFAULT_DELIMITER}.
420         */
421        public Type routingSlip(String header) {
422            RoutingSlipType answer = new RoutingSlipType(header);
423            addOutput(answer);
424            return (Type) this;
425        }
426    
427        /**
428         * Creates a <a
429         * href="http://activemq.apache.org/camel/routing-slip.html">Routing
430         * Slip</a> pattern with the default header {@link RoutingSlipType#ROUTING_SLIP_HEADER}.
431         * The list of URIs in the header will be split based on the default delimiter
432         * {@link RoutingSlipType#DEFAULT_DELIMITER}.
433         */
434        public Type routingSlip() {
435            RoutingSlipType answer = new RoutingSlipType();
436            addOutput(answer);
437            return (Type) this;
438        }
439    
440        /**
441         * Creates the <a
442         * href="http://activemq.apache.org/camel/splitter.html">Splitter</a>
443         * pattern where an expression is evaluated to iterate through each of the
444         * parts of a message and then each part is then send to some endpoint.
445         * This splitter responds with the latest message returned from destination
446         * endpoint.
447         *
448         * @param receipients the expression on which to split
449         * @return the builder
450         */
451        public SplitterType splitter(Expression receipients) {
452            SplitterType answer = new SplitterType(receipients);
453            addOutput(answer);
454            return answer;
455        }
456    
457        /**
458         * Creates the <a
459         * href="http://activemq.apache.org/camel/splitter.html">Splitter</a>
460         * pattern where an expression is evaluated to iterate through each of the
461         * parts of a message and then each part is then send to some endpoint.
462         * This splitter responds with the latest message returned from destination
463         * endpoint.
464         *
465         * @return the expression clause for the expression on which to split
466         */
467        public ExpressionClause<SplitterType> splitter() {
468            SplitterType answer = new SplitterType();
469            addOutput(answer);
470            return ExpressionClause.createAndSetExpression(answer);
471        }
472    
473        /**
474         * Creates the <a
475         * href="http://activemq.apache.org/camel/splitter.html">Splitter</a>
476         * pattern where an expression is evaluated to iterate through each of the
477         * parts of a message and then each part is then send to some endpoint.
478         * Answer from the splitter is produced using given {@link AggregationStrategy}
479         * @param partsExpression the expression on which to split
480         * @param aggregationStrategy the strategy used to aggregate responses for
481         *          every part
482         * @return the builder
483         */
484        public SplitterType splitter(Expression partsExpression, AggregationStrategy aggregationStrategy) {
485            SplitterType answer = new SplitterType(partsExpression);
486            addOutput(answer);
487            answer.setAggregationStrategy(aggregationStrategy);
488            return answer;
489        }
490    
491        /**
492         * Creates the <a
493         * href="http://activemq.apache.org/camel/splitter.html">Splitter</a>
494         * pattern where an expression is evaluated to iterate through each of the
495         * parts of a message and then each part is then send to some endpoint.
496         * Answer from the splitter is produced using given {@link AggregationStrategy}
497         * @param aggregationStrategy the strategy used to aggregate responses for
498         *          every part
499         * @return the expression clause for the expression on which to split
500         */
501        public ExpressionClause<SplitterType> splitter(AggregationStrategy aggregationStrategy) {
502            SplitterType answer = new SplitterType();
503            addOutput(answer);
504            answer.setAggregationStrategy(aggregationStrategy);
505            return ExpressionClause.createAndSetExpression(answer);
506        }
507    
508        /**
509         * Creates the <a
510         * href="http://activemq.apache.org/camel/splitter.html">Splitter</a>
511         * pattern where an expression is evaluated to iterate through each of the
512         * parts of a message and then each part is then send to some endpoint.
513         * This splitter responds with the latest message returned from destination
514         * endpoint.
515         *
516         * @param receipients the expression on which to split
517         * @param parallelProcessing if is <tt>true</tt> camel will fork thread to call the endpoint producer
518         * @return the builder
519         */
520        public SplitterType splitter(Expression receipients, boolean parallelProcessing) {
521            SplitterType answer = new SplitterType(receipients);
522            addOutput(answer);
523            answer.setParallelProcessing(parallelProcessing);
524            return answer;
525        }
526    
527        /**
528         * Creates the <a
529         * href="http://activemq.apache.org/camel/splitter.html">Splitter</a>
530         * pattern where an expression is evaluated to iterate through each of the
531         * parts of a message and then each part is then send to some endpoint.
532         * This splitter responds with the latest message returned from destination
533         * endpoint.
534         *
535         * @param parallelProcessing if is <tt>true</tt> camel will fork thread to call the endpoint producer
536         * @return the expression clause for the expression on which to split
537         */
538        public ExpressionClause<SplitterType> splitter(boolean parallelProcessing) {
539            SplitterType answer = new SplitterType();
540            addOutput(answer);
541            answer.setParallelProcessing(parallelProcessing);
542            return ExpressionClause.createAndSetExpression(answer);
543        }
544    
545        /**
546         * Creates the <a
547         * href="http://activemq.apache.org/camel/splitter.html">Splitter</a>
548         * pattern where an expression is evaluated to iterate through each of the
549         * parts of a message and then each part is then send to some endpoint.
550         * Answer from the splitter is produced using given {@link AggregationStrategy}
551         * @param partsExpression the expression on which to split
552         * @param aggregationStrategy the strategy used to aggregate responses for
553         *          every part
554         * @param parallelProcessing if is <tt>true</tt> camel will fork thread to call the endpoint producer
555         * @return the builder
556         */
557        public SplitterType splitter(Expression partsExpression,
558                AggregationStrategy aggregationStrategy, boolean parallelProcessing) {
559            SplitterType answer = new SplitterType(partsExpression);
560            addOutput(answer);
561            answer.setAggregationStrategy(aggregationStrategy);
562            answer.setParallelProcessing(parallelProcessing);
563            return answer;
564        }
565    
566        /**
567         * Creates the <a
568         * href="http://activemq.apache.org/camel/splitter.html">Splitter</a>
569         * pattern where an expression is evaluated to iterate through each of the
570         * parts of a message and then each part is then send to some endpoint.
571         * Answer from the splitter is produced using given {@link AggregationStrategy}
572         * @param aggregationStrategy the strategy used to aggregate responses for
573         *          every part
574         * @param parallelProcessing if is <tt>true</tt> camel will fork thread to call the endpoint producer
575         * @return the expression clause for the expression on which to split
576         */
577        public ExpressionClause<SplitterType> splitter(AggregationStrategy aggregationStrategy, boolean parallelProcessing) {
578            SplitterType answer = new SplitterType();
579            addOutput(answer);
580            answer.setAggregationStrategy(aggregationStrategy);
581            answer.setParallelProcessing(parallelProcessing);
582            return ExpressionClause.createAndSetExpression(answer);
583        }
584    
585    
586        /**
587         * Creates the <a
588         * href="http://activemq.apache.org/camel/resequencer.html">Resequencer</a>
589         * pattern where a list of expressions are evaluated to be able to compare
590         * the message exchanges to reorder them. e.g. you may wish to sort by some
591         * headers
592         *
593         * @return the expression clause for the expressions on which to compare messages in order
594         */
595        public ExpressionClause<ResequencerType> resequencer() {
596            ResequencerType answer = new ResequencerType();
597            addOutput(answer);
598            ExpressionClause<ResequencerType> clause = new ExpressionClause<ResequencerType>(answer);
599            answer.expression(clause);
600            return clause;
601        }
602    
603        /**
604         * Creates the <a
605         * href="http://activemq.apache.org/camel/resequencer.html">Resequencer</a>
606         * pattern where an expression is evaluated to be able to compare the
607         * message exchanges to reorder them. e.g. you may wish to sort by some
608         * header
609         *
610         * @param expression the expression on which to compare messages in order
611         * @return the builder
612         */
613        public ResequencerType resequencer(Expression<Exchange> expression) {
614            return resequencer(Collections.<Expression>singletonList(expression));
615        }
616    
617        /**
618         * Creates the <a
619         * href="http://activemq.apache.org/camel/resequencer.html">Resequencer</a>
620         * pattern where a list of expressions are evaluated to be able to compare
621         * the message exchanges to reorder them. e.g. you may wish to sort by some
622         * headers
623         *
624         * @param expressions the expressions on which to compare messages in order
625         * @return the builder
626         */
627        public ResequencerType resequencer(List<Expression> expressions) {
628            ResequencerType answer = new ResequencerType(expressions);
629            addOutput(answer);
630            return answer;
631        }
632    
633        /**
634         * Creates the <a
635         * href="http://activemq.apache.org/camel/resequencer.html">Resequencer</a>
636         * pattern where a list of expressions are evaluated to be able to compare
637         * the message exchanges to reorder them. e.g. you may wish to sort by some
638         * headers
639         *
640         * @param expressions the expressions on which to compare messages in order
641         * @return the builder
642         */
643        public ResequencerType resequencer(Expression... expressions) {
644            List<Expression> list = new ArrayList<Expression>();
645            list.addAll(Arrays.asList(expressions));
646            return resequencer(list);
647        }
648    
649        /**
650         * Creates an <a
651         * href="http://activemq.apache.org/camel/aggregator.html">Aggregator</a>
652         * pattern where a batch of messages are processed (up to a maximum amount
653         * or until some timeout is reached) and messages for the same correlation
654         * key are combined together using some kind of {@link AggregationStrategy}
655         * (by default the latest message is used) to compress many message exchanges
656         * into a smaller number of exchanges.
657         * <p/>
658         * A good example of this is stock market data; you may be receiving 30,000
659         * messages/second and you may want to throttle it right down so that multiple
660         * messages for the same stock are combined (or just the latest message is used
661         * and older prices are discarded). Another idea is to combine line item messages
662         * together into a single invoice message.
663         */
664        public ExpressionClause<AggregatorType> aggregator() {
665            AggregatorType answer = new AggregatorType();
666            addOutput(answer);
667            return ExpressionClause.createAndSetExpression(answer);
668        }
669    
670        /**
671         * Creates an <a
672         * href="http://activemq.apache.org/camel/aggregator.html">Aggregator</a>
673         * pattern where a batch of messages are processed (up to a maximum amount
674         * or until some timeout is reached) and messages for the same correlation
675         * key are combined together using some kind of {@link AggregationStrategy}
676         * (by default the latest message is used) to compress many message exchanges
677         * into a smaller number of exchanges.
678         * <p/>
679         * A good example of this is stock market data; you may be receiving 30,000
680         * messages/second and you may want to throttle it right down so that multiple
681         * messages for the same stock are combined (or just the latest message is used
682         * and older prices are discarded). Another idea is to combine line item messages
683         * together into a single invoice message.
684         *
685         * @param aggregationStrategy the strategy used for the aggregation
686         */
687        public ExpressionClause<AggregatorType> aggregator(AggregationStrategy aggregationStrategy) {
688            AggregatorType answer = new AggregatorType();
689            answer.setAggregationStrategy(aggregationStrategy);
690            addOutput(answer);
691            return ExpressionClause.createAndSetExpression(answer);
692        }
693    
694        /**
695         * Creates an <a
696         * href="http://activemq.apache.org/camel/aggregator.html">Aggregator</a>
697         * pattern using a custom aggregation collection implementation.
698         *
699         * @param aggregationCollection the collection used to perform the aggregation
700         */
701        public ExpressionClause<AggregatorType> aggregator(AggregationCollection aggregationCollection) {
702            AggregatorType answer = new AggregatorType();
703            answer.setAggregationCollection(aggregationCollection);
704            addOutput(answer);
705            return ExpressionClause.createAndSetExpression(answer);
706        }
707    
708        /**
709         * Creates an <a
710         * href="http://activemq.apache.org/camel/aggregator.html">Aggregator</a>
711         * pattern where a batch of messages are processed (up to a maximum amount
712         * or until some timeout is reached) and messages for the same correlation
713         * key are combined together using some kind of {@link AggregationStrategy}
714         * (by default the latest message is used) to compress many message exchanges
715         * into a smaller number of exchanges.
716         * <p/>
717         * A good example of this is stock market data; you may be receiving 30,000
718         * messages/second and you may want to throttle it right down so that multiple
719         * messages for the same stock are combined (or just the latest message is used
720         * and older prices are discarded). Another idea is to combine line item messages
721         * together into a single invoice message.
722         *
723         * @param correlationExpression the expression used to calculate the
724         *                              correlation key. For a JMS message this could be the
725         *                              expression <code>header("JMSDestination")</code> or
726         *                              <code>header("JMSCorrelationID")</code>
727         */
728        public AggregatorType aggregator(Expression correlationExpression) {
729            AggregatorType answer = new AggregatorType(correlationExpression);
730            addOutput(answer);
731            return answer;
732        }
733    
734        /**
735         * Creates an <a
736         * href="http://activemq.apache.org/camel/aggregator.html">Aggregator</a>
737         * pattern where a batch of messages are processed (up to a maximum amount
738         * or until some timeout is reached) and messages for the same correlation
739         * key are combined together using some kind of {@link AggregationStrategy}
740         * (by default the latest message is used) to compress many message exchanges
741         * into a smaller number of exchanges.
742         * <p/>
743         * A good example of this is stock market data; you may be receiving 30,000
744         * messages/second and you may want to throttle it right down so that multiple
745         * messages for the same stock are combined (or just the latest message is used
746         * and older prices are discarded). Another idea is to combine line item messages
747         * together into a single invoice message.
748         *
749         * @param correlationExpression the expression used to calculate the
750         *                              correlation key. For a JMS message this could be the
751         *                              expression <code>header("JMSDestination")</code> or
752         *                              <code>header("JMSCorrelationID")</code>
753         */
754        public AggregatorType aggregator(Expression correlationExpression, AggregationStrategy aggregationStrategy) {
755            AggregatorType answer = new AggregatorType(correlationExpression, aggregationStrategy);
756            addOutput(answer);
757            return answer;
758        }
759    
760        /**
761         * Creates the <a
762         * href="http://activemq.apache.org/camel/delayer.html">Delayer</a> pattern
763         * where an expression is used to calculate the time which the message will
764         * be dispatched on
765         *
766         * @param processAtExpression an expression to calculate the time at which
767         *                            the messages should be processed
768         * @return the builder
769         */
770        public DelayerType delayer(Expression<Exchange> processAtExpression) {
771            return delayer(processAtExpression, 0L);
772        }
773    
774        /**
775         * Creates the <a
776         * href="http://activemq.apache.org/camel/delayer.html">Delayer</a> pattern
777         * where an expression is used to calculate the time which the message will
778         * be dispatched on
779         *
780         * @param processAtExpression an expression to calculate the time at which
781         *                            the messages should be processed
782         * @param delay               the delay in milliseconds which is added to the
783         *                            processAtExpression to determine the time the message
784         *                            should be processed
785         * @return the builder
786         */
787        public DelayerType delayer(Expression<Exchange> processAtExpression, long delay) {
788            DelayerType answer = new DelayerType(processAtExpression, delay);
789            addOutput(answer);
790            return answer;
791        }
792    
793        /**
794         * Creates the <a
795         * href="http://activemq.apache.org/camel/delayer.html">Delayer</a> pattern
796         * where an expression is used to calculate the time which the message will
797         * be dispatched on
798         * @return the expression clause to create the expression
799         */
800        public ExpressionClause<DelayerType> delayer() {
801            DelayerType answer = new DelayerType();
802            addOutput(answer);
803            return ExpressionClause.createAndSetExpression(answer);
804        }
805    
806        /**
807         * Creates the <a
808         * href="http://activemq.apache.org/camel/delayer.html">Delayer</a> pattern
809         * where a fixed amount of milliseconds are used to delay processing of a
810         * message exchange
811         *
812         * @param delay the default delay in milliseconds
813         * @return the builder
814         */
815        public DelayerType delayer(long delay) {
816            return delayer(null, delay);
817        }
818    
819        /**
820         * Creates the <a
821         * href="http://activemq.apache.org/camel/delayer.html">Delayer</a> pattern
822         * where an expression is used to calculate the time which the message will
823         * be dispatched on
824         *
825         * @return the builder
826         */
827        public ThrottlerType throttler(long maximumRequestCount) {
828            ThrottlerType answer = new ThrottlerType(maximumRequestCount);
829            addOutput(answer);
830            return answer;
831        }
832    
833    
834        public Type throwFault(Throwable fault) {
835            ThrowFaultType answer = new ThrowFaultType();
836            answer.setFault(fault);
837            addOutput(answer);
838            return (Type) this;
839        }
840    
841        public Type throwFault(String message) {
842            return throwFault(new CamelException(message));
843        }
844    
845        /**
846         * Intercepts outputs added to this node in the future (i.e. intercepts outputs added after this statement)
847         */
848        public Type interceptor(String ref) {
849            InterceptorRef interceptor = new InterceptorRef(ref);
850            intercept(interceptor);
851            return (Type) this;
852        }
853    
854        /**
855         * Intercepts outputs added to this node in the future (i.e. intercepts outputs added after this statement)
856         */
857        public Type intercept(DelegateProcessor interceptor) {
858            intercept(new InterceptorRef(interceptor));
859            //lastInterceptor = interceptor;
860            return (Type) this;
861        }
862    
863        /**
864         * Intercepts outputs added to this node in the future (i.e. intercepts outputs added after this statement)
865         */
866        public InterceptType intercept() {
867            InterceptType answer = new InterceptType();
868            addOutput(answer);
869            return answer;
870        }
871    
872        /**
873         * Intercepts outputs added to this node in the future (i.e. intercepts outputs added after this statement)
874         */
875        public void intercept(InterceptorType interceptor) {
876            addOutput(interceptor);
877            pushBlock(interceptor);
878        }
879    
    /**
     * Adds an interceptor around the whole of this nodes processing
     *
     * @param interceptor the interceptor appended to this node's
     *                    <code>interceptors</code> collection
     */
    public void addInterceptor(InterceptorType interceptor) {
        interceptors.add(interceptor);
    }
888    
889        /**
890         * Adds an interceptor around the whole of this nodes processing
891         *
892         * @param interceptor
893         */
894        public void addInterceptor(DelegateProcessor interceptor) {
895            addInterceptor(new InterceptorRef(interceptor));
896        }
897    
    /**
     * Appends the given block to the end of this node's <code>blocks</code>
     * collection.
     *
     * @param block the block to add
     */
    protected void pushBlock(Block block) {
        blocks.add(block);
    }
901    
902        protected Block popBlock() {
903            return blocks.isEmpty() ? null : blocks.removeLast();
904        }
905    
906        public Type proceed() {
907            ProceedType proceed = null;
908            ProcessorType currentProcessor = this;
909    
910            if (currentProcessor instanceof InterceptType) {
911                proceed = ((InterceptType) currentProcessor).getProceed();
912            }
913            if (proceed == null) {
914                for (ProcessorType node = parent; node != null; node = node.getParent()) {
915                    if (node instanceof InterceptType) {
916                        InterceptType intercept = (InterceptType)node;
917                        proceed = intercept.getProceed();
918                        break;
919                    }
920                }
921            }
922    
923            if (currentProcessor instanceof InterceptType) {
924                proceed = ((InterceptType) currentProcessor).getProceed();
925            }
926    
927            if (proceed == null) {
928                throw new IllegalArgumentException("Cannot use proceed() without being within an intercept() block");
929            }
930    
931            addOutput(proceed);
932            return (Type) this;
933        }
934    
935        public ExceptionType exception(Class exceptionType) {
936            ExceptionType answer = new ExceptionType(exceptionType);
937            addOutput(answer);
938            return answer;
939        }
940    
941        /**
942         * Apply an interceptor route if the predicate is true
943         */
944        public ChoiceType intercept(Predicate predicate) {
945            InterceptType answer = new InterceptType();
946            addOutput(answer);
947            return answer.when(predicate);
948        }
949    
950        public Type interceptors(String... refs) {
951            for (String ref : refs) {
952                interceptor(ref);
953            }
954            return (Type) this;
955        }
956    
957        /**
958         * Trace logs the exchange before it goes to the next processing step using
959         * the {@link #DEFAULT_TRACE_CATEGORY} logging category.
960         */
961        public Type trace() {
962            return trace(DEFAULT_TRACE_CATEGORY);
963        }
964    
965        /**
966         * Trace logs the exchange before it goes to the next processing step using
967         * the specified logging category.
968         *
969         * @param category the logging category trace messages will sent to.
970         */
971        public Type trace(String category) {
972            final Log log = LogFactory.getLog(category);
973            return intercept(new DelegateProcessor() {
974                @Override
975                public void process(Exchange exchange) throws Exception {
976                    log.trace(exchange);
977                    processNext(exchange);
978                }
979            });
980        }
981    
982        public PolicyRef policies() {
983            PolicyRef answer = new PolicyRef();
984            addOutput(answer);
985            return answer;
986        }
987    
988        public PolicyRef policy(Policy policy) {
989            PolicyRef answer = new PolicyRef(policy);
990            addOutput(answer);
991            return answer;
992        }
993    
994        /**
995         * Forces handling of faults as exceptions
996         *
997         * @return the current builder with the fault handler configured
998         */
999        public Type handleFault() {
1000            intercept(new HandleFaultType());
1001            return (Type) this;
1002        }
1003    
1004        /**
1005         * Installs the given error handler builder
1006         *
1007         * @param errorHandlerBuilder the error handler to be used by default for
1008         *                            all child routes
1009         * @return the current builder with the error handler configured
1010         */
1011        public Type errorHandler(ErrorHandlerBuilder errorHandlerBuilder) {
1012            setErrorHandlerBuilder(errorHandlerBuilder);
1013            return (Type) this;
1014        }
1015    
1016        /**
1017         * Configures whether or not the error handler is inherited by every
1018         * processing node (or just the top most one)
1019         *
1020         * @param condition the flag as to whether error handlers should be
1021         *                  inherited or not
1022         * @return the current builder
1023         */
1024        public Type inheritErrorHandler(boolean condition) {
1025            setInheritErrorHandlerFlag(condition);
1026            return (Type) this;
1027        }
1028    
1029        // Transformers
1030        // -------------------------------------------------------------------------
1031    
1032        /**
1033         * Adds the custom processor to this destination which could be a final
1034         * destination, or could be a transformation in a pipeline
1035         */
1036        public Type process(Processor processor) {
1037            ProcessorRef answer = new ProcessorRef(processor);
1038            addOutput(answer);
1039            return (Type) this;
1040        }
1041    
1042        /**
1043         * Adds the custom processor reference to this destination which could be a final
1044         * destination, or could be a transformation in a pipeline
1045         */
1046        public Type processRef(String ref) {
1047            ProcessorRef answer = new ProcessorRef();
1048            answer.setRef(ref);
1049            addOutput(answer);
1050            return (Type) this;
1051        }
1052    
1053        /**
1054         * Adds a bean which is invoked which could be a final destination, or could
1055         * be a transformation in a pipeline
1056         */
1057        public Type bean(Object bean) {
1058            BeanRef answer = new BeanRef();
1059            answer.setBean(bean);
1060            addOutput(answer);
1061            return (Type) this;
1062        }
1063    
1064        /**
1065         * Adds a bean and method which is invoked which could be a final
1066         * destination, or could be a transformation in a pipeline
1067         */
1068        public Type bean(Object bean, String method) {
1069            BeanRef answer = new BeanRef();
1070            answer.setBean(bean);
1071            answer.setMethod(method);
1072            addOutput(answer);
1073            return (Type) this;
1074        }
1075    
1076        /**
1077         * Adds a bean by type which is invoked which could be a final destination, or could
1078         * be a transformation in a pipeline
1079         */
1080        public Type bean(Class beanType) {
1081            BeanRef answer = new BeanRef();
1082            answer.setBeanType(beanType);
1083            addOutput(answer);
1084            return (Type) this;
1085        }
1086    
1087        /**
1088         * Adds a bean type and method which is invoked which could be a final
1089         * destination, or could be a transformation in a pipeline
1090         */
1091        public Type bean(Class beanType, String method) {
1092            BeanRef answer = new BeanRef();
1093            answer.setBeanType(beanType);
1094            answer.setMethod(method);
1095            addOutput(answer);
1096            return (Type) this;
1097        }
1098    
1099        /**
1100         * Adds a bean which is invoked which could be a final destination, or could
1101         * be a transformation in a pipeline
1102         */
1103        public Type beanRef(String ref) {
1104            BeanRef answer = new BeanRef(ref);
1105            addOutput(answer);
1106            return (Type) this;
1107        }
1108    
1109        /**
1110         * Adds a bean and method which is invoked which could be a final
1111         * destination, or could be a transformation in a pipeline
1112         */
1113        public Type beanRef(String ref, String method) {
1114            BeanRef answer = new BeanRef(ref, method);
1115            addOutput(answer);
1116            return (Type) this;
1117        }
1118    
1119        /**
1120         * Adds a processor which sets the body on the IN message
1121         */
1122        public ExpressionClause<ProcessorType<Type>> setBody() {
1123            ExpressionClause<ProcessorType<Type>> clause = new ExpressionClause<ProcessorType<Type>>((Type) this);
1124            SetBodyType answer = new SetBodyType(clause);
1125            addOutput(answer);
1126            return clause;
1127        }
1128    
1129        /**
1130         * Adds a processor which sets the body on the IN message
1131         */
1132        public Type setBody(Expression expression) {
1133            SetBodyType answer = new SetBodyType(expression);
1134            addOutput(answer);
1135            return (Type) this;
1136        }
1137    
1138        /**
1139         * Adds a processor which sets the body on the OUT message
1140         *
1141         * @deprecated Please use {@link #transform(Expression)} instead. Will be removed in Camel 2.0.
1142         */
1143        @Deprecated
1144        public Type setOutBody(Expression expression) {
1145            return transform(expression);
1146        }
1147    
1148        /**
1149         * Adds a processor which sets the body on the OUT message
1150         *
1151         * @deprecated Please use {@link #transform()} instead. Will be removed in Camel 2.0.
1152         */
1153        @Deprecated
1154        public ExpressionClause<ProcessorType<Type>> setOutBody() {
1155            return transform();
1156        }
1157    
1158        /**
1159         * Adds a processor which sets the body on the OUT message
1160         */
1161        public Type transform(Expression expression) {
1162            TransformType answer = new TransformType(expression);
1163            addOutput(answer);
1164            return (Type) this;
1165        }
1166    
1167        /**
1168         * Adds a processor which sets the body on the OUT message
1169         */
1170        public ExpressionClause<ProcessorType<Type>> transform() {
1171            ExpressionClause<ProcessorType<Type>> clause = new ExpressionClause<ProcessorType<Type>>((Type) this);
1172            TransformType answer = new TransformType(clause);
1173            addOutput(answer);
1174            return clause;
1175        }
1176    
1177        /**
1178         * Adds a processor which sets the body on the FAULT message
1179         */
1180        public Type setFaultBody(Expression expression) {
1181            return process(ProcessorBuilder.setFaultBody(expression));
1182        }
1183    
1184        /**
1185         * Adds a processor which sets the header on the IN message
1186         */
1187        public ExpressionClause<ProcessorType<Type>> setHeader(String name) {
1188            ExpressionClause<ProcessorType<Type>> clause = new ExpressionClause<ProcessorType<Type>>((Type) this);
1189            SetHeaderType answer = new SetHeaderType(name, clause);
1190            addOutput(answer);
1191            return clause;
1192        }
1193    
1194        /**
1195         * Adds a processor which sets the header on the IN message
1196         */
1197        public Type setHeader(String name, Expression expression) {
1198            SetHeaderType answer = new SetHeaderType(name, expression);
1199            addOutput(answer);
1200            return (Type) this;
1201        }
1202    
1203        /**
1204         * Adds a processor which sets the header on the IN message to the given value
1205         */
1206        public Type setHeader(String name, String value) {
1207            SetHeaderType answer = new SetHeaderType(name, value);
1208            addOutput(answer);
1209            return (Type) this;
1210        }
1211    
1212        /**
1213         * Adds a processor which sets the header on the OUT message
1214         */
1215        public ExpressionClause<ProcessorType<Type>> setOutHeader(String name) {
1216            ExpressionClause<ProcessorType<Type>> clause = new ExpressionClause<ProcessorType<Type>>((Type) this);
1217            process(ProcessorBuilder.setOutHeader(name, clause));
1218            return clause;
1219        }
1220    
1221        /**
1222         * Adds a processor which sets the header on the OUT message
1223         */
1224        public Type setOutHeader(String name, Expression expression) {
1225            return process(ProcessorBuilder.setOutHeader(name, expression));
1226        }
1227    
1228        /**
1229         * Adds a processor which sets the header on the OUT message
1230         */
1231        public Type setOutHeader(String name, String value) {
1232            return (Type) setOutHeader(name).constant(value);
1233        }
1234    
1235        /**
1236         * Adds a processor which sets the header on the FAULT message
1237         */
1238        public Type setFaultHeader(String name, Expression expression) {
1239            return process(ProcessorBuilder.setFaultHeader(name, expression));
1240        }
1241    
1242        /**
1243         * Adds a processor which sets the exchange property
1244         */
1245        public Type setProperty(String name, Expression expression) {
1246            return process(ProcessorBuilder.setProperty(name, expression));
1247        }
1248    
1249    
1250        /**
1251         * Adds a processor which sets the exchange property
1252         */
1253        public ExpressionClause<ProcessorType<Type>> setProperty(String name) {
1254            ExpressionClause<ProcessorType<Type>> clause = new ExpressionClause<ProcessorType<Type>>((Type) this);
1255            process(ProcessorBuilder.setProperty(name, clause));
1256            return clause;
1257        }
1258    
1259        /**
1260         * Adds a processor which removes the header on the IN message
1261         */
1262        public Type removeHeader(String name) {
1263            return process(ProcessorBuilder.removeHeader(name));
1264        }
1265    
1266        /**
1267         * Adds a processor which removes the header on the OUT message
1268         */
1269        public Type removeOutHeader(String name) {
1270            return process(ProcessorBuilder.removeOutHeader(name));
1271        }
1272    
1273        /**
1274         * Adds a processor which removes the header on the FAULT message
1275         */
1276        public Type removeFaultHeader(String name) {
1277            return process(ProcessorBuilder.removeFaultHeader(name));
1278        }
1279    
1280        /**
1281         * Adds a processor which removes the exchange property
1282         */
1283        public Type removeProperty(String name) {
1284            return process(ProcessorBuilder.removeProperty(name));
1285        }
1286    
1287        /**
1288         * Converts the IN message body to the specified type
1289         */
1290        public Type convertBodyTo(Class type) {
1291            addOutput(new ConvertBodyType(type));
1292            return (Type) this;
1293        }
1294    
1295        /**
1296         * Converts the OUT message body to the specified type
1297         *
1298         * @deprecated Please use {@link #convertBodyTo(Class)} instead. Will be removed in Camel 2.0.
1299         */
1300        @Deprecated
1301        public Type convertOutBodyTo(Class type) {
1302            return process(new ConvertBodyProcessor(type));
1303        }
1304    
1305        /**
1306         * Converts the FAULT message body to the specified type
1307         *
1308         * @deprecated Please use {@link #convertBodyTo(Class)} instead. Will be removed in Camel 2.0.
1309         */
1310        @Deprecated
1311        public Type convertFaultBodyTo(Class type) {
1312            return process(new ConvertBodyProcessor(type));
1313        }
1314    
1315        // DataFormat support
1316        // -------------------------------------------------------------------------
1317    
1318        /**
1319         * Unmarshals the in body using a {@link DataFormat} expression to define
1320         * the format of the input message and the output will be set on the out message body.
1321         *
1322         * @return the expression to create the {@link DataFormat}
1323         */
1324        public DataFormatClause<ProcessorType<Type>> unmarshal() {
1325            return new DataFormatClause<ProcessorType<Type>>(this, DataFormatClause.Operation.Unmarshal);
1326        }
1327    
1328        /**
1329         * Unmarshals the in body using the specified {@link DataFormat}
1330         * and sets the output on the out message body.
1331         *
1332         * @return this object
1333         */
1334        public Type unmarshal(DataFormatType dataFormatType) {
1335            addOutput(new UnmarshalType(dataFormatType));
1336            return (Type) this;
1337        }
1338    
1339        /**
1340         * Unmarshals the in body using the specified {@link DataFormat}
1341         * and sets the output on the out message body.
1342         *
1343         * @return this object
1344         */
1345        public Type unmarshal(DataFormat dataFormat) {
1346            return unmarshal(new DataFormatType(dataFormat));
1347        }
1348    
1349        /**
1350         * Unmarshals the in body using the specified {@link DataFormat}
1351         * reference in the {@link Registry} and sets the output on the out message body.
1352         *
1353         * @return this object
1354         */
1355        public Type unmarshal(String dataTypeRef) {
1356            addOutput(new UnmarshalType(dataTypeRef));
1357            return (Type) this;
1358        }
1359    
1360        /**
1361         * Marshals the in body using a {@link DataFormat} expression to define
1362         * the format of the output which will be added to the out body.
1363         *
1364         * @return the expression to create the {@link DataFormat}
1365         */
1366        public DataFormatClause<ProcessorType<Type>> marshal() {
1367            return new DataFormatClause<ProcessorType<Type>>(this, DataFormatClause.Operation.Marshal);
1368        }
1369    
1370        /**
1371         * Marshals the in body using the specified {@link DataFormat}
1372         * and sets the output on the out message body.
1373         *
1374         * @return this object
1375         */
1376        public Type marshal(DataFormatType dataFormatType) {
1377            addOutput(new MarshalType(dataFormatType));
1378            return (Type) this;
1379        }
1380    
1381        /**
1382         * Marshals the in body using the specified {@link DataFormat}
1383         * and sets the output on the out message body.
1384         *
1385         * @return this object
1386         */
1387        public Type marshal(DataFormat dataFormat) {
1388            return marshal(new DataFormatType(dataFormat));
1389        }
1390    
1391        /**
1392         * Marshals the in body the specified {@link DataFormat}
1393         * reference in the {@link Registry} and sets the output on the out message body.
1394         *
1395         * @return this object
1396         */
1397        public Type marshal(String dataTypeRef) {
1398            addOutput(new MarshalType(dataTypeRef));
1399            return (Type) this;
1400        }
1401    
1402        // Properties
1403        // -------------------------------------------------------------------------
    /**
     * Returns the parent of this node, as last assigned through
     * {@link #setParent(ProcessorType)}.
     *
     * @return the current value of the <code>parent</code> field
     */
    @XmlTransient
    public ProcessorType<? extends ProcessorType> getParent() {
        return parent;
    }
1408    
    /**
     * Sets the parent of this node.
     *
     * @param parent the new parent
     */
    public void setParent(ProcessorType<? extends ProcessorType> parent) {
        this.parent = parent;
    }
1412    
1413        @XmlTransient
1414        public ErrorHandlerBuilder getErrorHandlerBuilder() {
1415            if (errorHandlerBuilder == null) {
1416                errorHandlerBuilder = createErrorHandlerBuilder();
1417            }
1418            return errorHandlerBuilder;
1419        }
1420    
    /**
     * Sets the error handler to use with processors created by this builder
     *
     * @param errorHandlerBuilder the error handler builder to store on this node
     */
    public void setErrorHandlerBuilder(ErrorHandlerBuilder errorHandlerBuilder) {
        this.errorHandlerBuilder = errorHandlerBuilder;
    }
1427    
1428        @XmlTransient
1429        public boolean isInheritErrorHandler() {
1430            return isInheritErrorHandler(getInheritErrorHandlerFlag());
1431        }
1432    
1433        /**
1434         * Lets default the inherit value to be true if there is none specified
1435         */
1436        public static boolean isInheritErrorHandler(Boolean value) {
1437            return value == null || value.booleanValue();
1438        }
1439    
    /**
     * Returns the raw inherit-error-handler flag, exposed as the optional
     * <code>inheritErrorHandler</code> XML attribute.
     *
     * @return the current value of the <code>inheritErrorHandlerFlag</code> field
     */
    @XmlAttribute(name = "inheritErrorHandler", required = false)
    public Boolean getInheritErrorHandlerFlag() {
        return inheritErrorHandlerFlag;
    }
1444    
    /**
     * Sets the raw inherit-error-handler flag.
     *
     * @param inheritErrorHandlerFlag the new flag value
     */
    public void setInheritErrorHandlerFlag(Boolean inheritErrorHandlerFlag) {
        this.inheritErrorHandlerFlag = inheritErrorHandlerFlag;
    }
1448    
1449        @XmlTransient
1450        public NodeFactory getNodeFactory() {
1451            if (nodeFactory == null) {
1452                nodeFactory = new NodeFactory();
1453            }
1454            return nodeFactory;
1455        }
1456    
    /**
     * Sets the node factory of this node.
     *
     * @param nodeFactory the node factory to store
     */
    public void setNodeFactory(NodeFactory nodeFactory) {
        this.nodeFactory = nodeFactory;
    }
1460    
1461        /**
1462         * Returns a label to describe this node such as the expression if some kind of expression node
1463         */
1464        public String getLabel() {
1465            return "";
1466        }
1467    
1468        // Implementation methods
1469        // -------------------------------------------------------------------------
1470    
1471        /**
1472         * Creates the processor and wraps it in any necessary interceptors and
1473         * error handlers
1474         */
1475        protected Processor makeProcessor(RouteContext routeContext) throws Exception {
1476            Processor processor = createProcessor(routeContext);
1477            return wrapProcessor(routeContext, processor);
1478        }
1479    
1480        /**
1481         * A strategy method which allows derived classes to wrap the child
1482         * processor in some kind of interceptor
1483         *
1484         * @param routeContext
1485         * @param target       the processor which can be wrapped
1486         * @return the original processor or a new wrapped interceptor
1487         */
1488        protected Processor wrapProcessorInInterceptors(RouteContext routeContext, Processor target) throws Exception {
1489            // The target is required.
1490            if (target == null) {
1491                throw new RuntimeCamelException("target provided.");
1492            }
1493    
1494    
1495            List<InterceptStrategy> strategies = new ArrayList<InterceptStrategy>();
1496            CamelContext camelContext = routeContext.getCamelContext();
1497            if (camelContext instanceof DefaultCamelContext) {
1498                DefaultCamelContext defaultCamelContext = (DefaultCamelContext) camelContext;
1499                strategies.addAll(defaultCamelContext.getInterceptStrategies());
1500            }
1501            strategies.addAll(routeContext.getInterceptStrategies());
1502            for (InterceptStrategy strategy : strategies) {
1503                if (strategy != null) {
1504                    target = strategy.wrapProcessorInInterceptors(this, target);
1505                }
1506            }
1507    
1508            List<InterceptorType> list = routeContext.getRoute().getInterceptors();
1509            if (interceptors != null) {
1510                list.addAll(interceptors);
1511            }
1512            // lets reverse the list so we apply the inner interceptors first
1513            Collections.reverse(list);
1514            Set<Processor> interceptors = new HashSet<Processor>();
1515            interceptors.add(target);
1516            for (InterceptorType interceptorType : list) {
1517                DelegateProcessor interceptor = interceptorType.createInterceptor(routeContext);
1518                if (!interceptors.contains(interceptor)) {
1519                    interceptors.add(interceptor);
1520                    interceptor.setProcessor(target);
1521                    target = interceptor;
1522                }
1523            }
1524            return target;
1525        }
1526    
1527        /**
1528         * A strategy method to allow newly created processors to be wrapped in an
1529         * error handler.
1530         */
1531        protected Processor wrapInErrorHandler(Processor processor) throws Exception {
1532            return getErrorHandlerBuilder().createErrorHandler(processor);
1533        }
1534    
1535        protected ErrorHandlerBuilder createErrorHandlerBuilder() {
1536            if (isInheritErrorHandler()) {
1537                return new DeadLetterChannelBuilder();
1538            } else {
1539                return new NoErrorHandlerBuilder();
1540            }
1541        }
1542    
1543        protected void configureChild(ProcessorType output) {
1544            output.setNodeFactory(getNodeFactory());
1545        }
1546    
1547        public void addOutput(ProcessorType processorType) {
1548            processorType.setParent(this);
1549            configureChild(processorType);
1550            if (blocks.isEmpty()) {
1551                getOutputs().add(processorType);
1552            } else {
1553                Block block = blocks.getLast();
1554                block.addOutput(processorType);
1555            }
1556        }
1557    
1558        /**
1559         * Creates a new instance of some kind of composite processor which defaults
1560         * to using a {@link Pipeline} but derived classes could change the
1561         * behaviour
1562         */
1563        protected Processor createCompositeProcessor(List<Processor> list) {
1564            // return new MulticastProcessor(list);
1565            return new Pipeline(list);
1566        }
1567    
1568        protected Processor createOutputsProcessor(RouteContext routeContext, Collection<ProcessorType<?>> outputs)
1569            throws Exception {
1570            List<Processor> list = new ArrayList<Processor>();
1571            for (ProcessorType output : outputs) {
1572                Processor processor = output.createProcessor(routeContext);
1573                processor = output.wrapProcessorInInterceptors(routeContext, processor);
1574                list.add(processor);
1575            }
1576            Processor processor = null;
1577            if (!list.isEmpty()) {
1578                if (list.size() == 1) {
1579                    processor = list.get(0);
1580                } else {
1581                    processor = createCompositeProcessor(list);
1582                }
1583            }
1584            return processor;
1585        }
1586    
1587        public void clearOutput() {
1588            getOutputs().clear();
1589            blocks.clear();
1590        }
1591    }