001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.model; 018 019 import java.rmi.registry.Registry; 020 import java.util.ArrayList; 021 import java.util.Arrays; 022 import java.util.Collection; 023 import java.util.Collections; 024 import java.util.HashSet; 025 import java.util.LinkedList; 026 import java.util.List; 027 import java.util.Set; 028 import java.util.concurrent.ThreadPoolExecutor; 029 030 import javax.xml.bind.annotation.XmlAccessType; 031 import javax.xml.bind.annotation.XmlAccessorType; 032 import javax.xml.bind.annotation.XmlAttribute; 033 import javax.xml.bind.annotation.XmlTransient; 034 035 import org.apache.camel.CamelContext; 036 import org.apache.camel.CamelException; 037 import org.apache.camel.Endpoint; 038 import org.apache.camel.Exchange; 039 import org.apache.camel.Expression; 040 import org.apache.camel.Predicate; 041 import org.apache.camel.Processor; 042 import org.apache.camel.Route; 043 import org.apache.camel.RuntimeCamelException; 044 import org.apache.camel.builder.DataFormatClause; 045 import org.apache.camel.builder.DeadLetterChannelBuilder; 046 import org.apache.camel.builder.ErrorHandlerBuilder; 047 import 
org.apache.camel.builder.ExpressionClause; 048 import org.apache.camel.builder.NoErrorHandlerBuilder; 049 import org.apache.camel.builder.ProcessorBuilder; 050 import org.apache.camel.impl.DefaultCamelContext; 051 import org.apache.camel.model.dataformat.DataFormatType; 052 import org.apache.camel.model.language.ExpressionType; 053 import org.apache.camel.model.language.LanguageExpression; 054 import org.apache.camel.processor.ConvertBodyProcessor; 055 import org.apache.camel.processor.DelegateProcessor; 056 import org.apache.camel.processor.MulticastProcessor; 057 import org.apache.camel.processor.Pipeline; 058 import org.apache.camel.processor.RecipientList; 059 import org.apache.camel.processor.aggregate.AggregationCollection; 060 import org.apache.camel.processor.aggregate.AggregationStrategy; 061 import org.apache.camel.processor.idempotent.IdempotentConsumer; 062 import org.apache.camel.processor.idempotent.MessageIdRepository; 063 import org.apache.camel.spi.DataFormat; 064 import org.apache.camel.spi.InterceptStrategy; 065 import org.apache.camel.spi.Policy; 066 import org.apache.camel.spi.RouteContext; 067 import org.apache.commons.logging.Log; 068 import org.apache.commons.logging.LogFactory; 069 070 /** 071 * Base class for processor types that most XML types extend. 072 * 073 * @version $Revision: 42473 $ 074 */ 075 @XmlAccessorType(XmlAccessType.PROPERTY) 076 public abstract class ProcessorType<Type extends ProcessorType> extends OptionalIdentifiedType<Type> implements Block { 077 public static final String DEFAULT_TRACE_CATEGORY = "org.apache.camel.TRACE"; 078 private ErrorHandlerBuilder errorHandlerBuilder; 079 private Boolean inheritErrorHandlerFlag; 080 private NodeFactory nodeFactory; 081 private LinkedList<Block> blocks = new LinkedList<Block>(); 082 private ProcessorType<? 
extends ProcessorType> parent; 083 private List<InterceptorType> interceptors = new ArrayList<InterceptorType>(); 084 085 // else to use an optional attribute in JAXB2 086 public abstract List<ProcessorType<?>> getOutputs(); 087 088 089 public Processor createProcessor(RouteContext routeContext) throws Exception { 090 throw new UnsupportedOperationException("Not implemented yet for class: " + getClass().getName()); 091 } 092 093 public Processor createOutputsProcessor(RouteContext routeContext) throws Exception { 094 Collection<ProcessorType<?>> outputs = getOutputs(); 095 return createOutputsProcessor(routeContext, outputs); 096 } 097 098 public void addRoutes(RouteContext routeContext, Collection<Route> routes) throws Exception { 099 Processor processor = makeProcessor(routeContext); 100 routeContext.addEventDrivenProcessor(processor); 101 } 102 103 /** 104 * Wraps the child processor in whatever necessary interceptors and error 105 * handlers 106 */ 107 public Processor wrapProcessor(RouteContext routeContext, Processor processor) throws Exception { 108 processor = wrapProcessorInInterceptors(routeContext, processor); 109 return wrapInErrorHandler(processor); 110 } 111 112 // Fluent API 113 // ------------------------------------------------------------------------- 114 115 /** 116 * Sends the exchange to the given endpoint URI 117 */ 118 public Type to(String uri) { 119 addOutput(new ToType(uri)); 120 return (Type) this; 121 } 122 123 /** 124 * Sends the exchange to the given endpoint 125 */ 126 public Type to(Endpoint endpoint) { 127 addOutput(new ToType(endpoint)); 128 return (Type) this; 129 } 130 131 /** 132 * Sends the exchange to a list of endpoints using the 133 * {@link MulticastProcessor} pattern 134 */ 135 public Type to(String... 
uris) { 136 for (String uri : uris) { 137 addOutput(new ToType(uri)); 138 } 139 return (Type) this; 140 } 141 142 /** 143 * Sends the exchange to a list of endpoints using the 144 * {@link MulticastProcessor} pattern 145 */ 146 public Type to(Endpoint... endpoints) { 147 for (Endpoint endpoint : endpoints) { 148 addOutput(new ToType(endpoint)); 149 } 150 return (Type) this; 151 } 152 153 /** 154 * Sends the exchange to a list of endpoint using the 155 * {@link MulticastProcessor} pattern 156 */ 157 public Type to(Collection<Endpoint> endpoints) { 158 for (Endpoint endpoint : endpoints) { 159 addOutput(new ToType(endpoint)); 160 } 161 return (Type) this; 162 } 163 164 /** 165 * Multicasts messages to all its child outputs; so that each processor and 166 * destination gets a copy of the original message to avoid the processors 167 * interfering with each other. 168 */ 169 public MulticastType multicast() { 170 MulticastType answer = new MulticastType(); 171 addOutput(answer); 172 return answer; 173 } 174 175 /** 176 * Multicasts messages to all its child outputs; so that each processor and 177 * destination gets a copy of the original message to avoid the processors 178 * interfering with each other. 179 * @param aggregationStrategy the strategy used to aggregate responses for 180 * every part 181 * @param parallelProcessing if is <tt>true</tt> camel will fork thread to call the endpoint producer 182 * @return the multicast type 183 */ 184 public MulticastType multicast(AggregationStrategy aggregationStrategy, boolean parallelProcessing) { 185 MulticastType answer = new MulticastType(); 186 addOutput(answer); 187 answer.setAggregationStrategy(aggregationStrategy); 188 answer.setParallelProcessing(parallelProcessing); 189 return answer; 190 } 191 192 /** 193 * Multicasts messages to all its child outputs; so that each processor and 194 * destination gets a copy of the original message to avoid the processors 195 * interfering with each other. 
196 * @param aggregationStrategy the strategy used to aggregate responses for 197 * every part 198 * @return the multicast type 199 */ 200 public MulticastType multicast(AggregationStrategy aggregationStrategy) { 201 MulticastType answer = new MulticastType(); 202 addOutput(answer); 203 answer.setAggregationStrategy(aggregationStrategy); 204 return answer; 205 } 206 207 /** 208 * Creates a {@link Pipeline} of the list of endpoints so that the message 209 * will get processed by each endpoint in turn and for request/response the 210 * output of one endpoint will be the input of the next endpoint 211 */ 212 public Type pipeline(String... uris) { 213 // TODO pipeline v mulicast 214 return to(uris); 215 } 216 217 /** 218 * Creates a {@link Pipeline} of the list of endpoints so that the message 219 * will get processed by each endpoint in turn and for request/response the 220 * output of one endpoint will be the input of the next endpoint 221 */ 222 public Type pipeline(Endpoint... endpoints) { 223 // TODO pipeline v mulicast 224 return to(endpoints); 225 } 226 227 /** 228 * Creates a {@link Pipeline} of the list of endpoints so that the message 229 * will get processed by each endpoint in turn and for request/response the 230 * output of one endpoint will be the input of the next endpoint 231 */ 232 public Type pipeline(Collection<Endpoint> endpoints) { 233 // TODO pipeline v mulicast 234 return to(endpoints); 235 } 236 237 /** 238 * Ends the current block 239 */ 240 public ProcessorType<? extends ProcessorType> end() { 241 if (blocks.isEmpty()) { 242 if (parent == null) { 243 throw new IllegalArgumentException("Root node with no active block"); 244 } 245 return parent; 246 } 247 popBlock(); 248 return this; 249 } 250 251 /** 252 * Causes subsequent processors to be called asynchronously 253 * 254 * @param coreSize the number of threads that will be used to process 255 * messages in subsequent processors. 
256 * @return a ThreadType builder that can be used to further configure the 257 * the thread pool. 258 */ 259 public ThreadType thread(int coreSize) { 260 ThreadType answer = new ThreadType(coreSize); 261 addOutput(answer); 262 return answer; 263 } 264 265 /** 266 * Causes subsequent processors to be called asynchronously 267 * 268 * @param executor the executor that will be used to process 269 * messages in subsequent processors. 270 * @return a ThreadType builder that can be used to further configure the 271 * the thread pool. 272 */ 273 public ProcessorType<Type> thread(ThreadPoolExecutor executor) { 274 ThreadType answer = new ThreadType(executor); 275 addOutput(answer); 276 return this; 277 } 278 279 /** 280 * Creates an {@link IdempotentConsumer} to avoid duplicate messages 281 */ 282 public IdempotentConsumerType idempotentConsumer(Expression messageIdExpression, 283 MessageIdRepository messageIdRepository) { 284 IdempotentConsumerType answer = new IdempotentConsumerType(messageIdExpression, messageIdRepository); 285 addOutput(answer); 286 return answer; 287 } 288 289 /** 290 * Creates an {@link IdempotentConsumer} to avoid duplicate messages 291 * 292 * @return the builder used to create the expression 293 */ 294 public ExpressionClause<IdempotentConsumerType> idempotentConsumer(MessageIdRepository messageIdRepository) { 295 IdempotentConsumerType answer = new IdempotentConsumerType(); 296 answer.setMessageIdRepository(messageIdRepository); 297 addOutput(answer); 298 return ExpressionClause.createAndSetExpression(answer); 299 } 300 301 /** 302 * Creates a predicate expression which only if it is true then the 303 * exchange is forwarded to the destination 304 * 305 * @return the clause used to create the filter expression 306 */ 307 public ExpressionClause<FilterType> filter() { 308 FilterType filter = new FilterType(); 309 addOutput(filter); 310 return ExpressionClause.createAndSetExpression(filter); 311 } 312 313 /** 314 * Creates a predicate which is 
applied and only if it is true then the 315 * exchange is forwarded to the destination 316 * 317 * @return the builder for a predicate 318 */ 319 public FilterType filter(Predicate predicate) { 320 FilterType filter = new FilterType(predicate); 321 addOutput(filter); 322 return filter; 323 } 324 325 public FilterType filter(ExpressionType expression) { 326 FilterType filter = getNodeFactory().createFilter(); 327 filter.setExpression(expression); 328 addOutput(filter); 329 return filter; 330 } 331 332 public FilterType filter(String language, String expression) { 333 return filter(new LanguageExpression(language, expression)); 334 } 335 336 public LoadBalanceType loadBalance() { 337 LoadBalanceType answer = new LoadBalanceType(); 338 addOutput(answer); 339 return answer; 340 } 341 342 343 /** 344 * Creates a choice of one or more predicates with an otherwise clause 345 * 346 * @return the builder for a choice expression 347 */ 348 public ChoiceType choice() { 349 ChoiceType answer = new ChoiceType(); 350 addOutput(answer); 351 return answer; 352 } 353 354 /** 355 * Creates a try/catch block 356 * 357 * @return the builder for a tryBlock expression 358 */ 359 public TryType tryBlock() { 360 TryType answer = new TryType(); 361 addOutput(answer); 362 return answer; 363 } 364 365 /** 366 * Creates a dynamic <a 367 * href="http://activemq.apache.org/camel/recipient-list.html">Recipient 368 * List</a> pattern. 369 * 370 * @param receipients is the builder of the expression used in the 371 * {@link RecipientList} to decide the destinations 372 */ 373 public Type recipientList(Expression receipients) { 374 RecipientListType answer = new RecipientListType(receipients); 375 addOutput(answer); 376 return (Type) this; 377 } 378 379 /** 380 * Creates a dynamic <a 381 * href="http://activemq.apache.org/camel/recipient-list.html">Recipient 382 * List</a> pattern. 
383 * 384 * @return the expression clause for the expression used in the 385 * {@link RecipientList} to decide the destinations 386 */ 387 public ExpressionClause<ProcessorType<Type>> recipientList() { 388 RecipientListType answer = new RecipientListType(); 389 addOutput(answer); 390 ExpressionClause<ProcessorType<Type>> clause = new ExpressionClause<ProcessorType<Type>>((Type) this); 391 answer.setExpression(clause); 392 return clause; 393 } 394 395 /** 396 * Creates a <a 397 * href="http://activemq.apache.org/camel/routing-slip.html">Routing 398 * Slip</a> pattern. 399 * 400 * @param header is the header that the {@link org.apache.camel.processor.RoutingSlip RoutingSlip} 401 * class will look in for the list of URIs to route the message to. 402 * @param uriDelimiter is the delimiter that will be used to split up 403 * the list of URIs in the routing slip. 404 */ 405 public Type routingSlip(String header, String uriDelimiter) { 406 RoutingSlipType answer = new RoutingSlipType(header, uriDelimiter); 407 addOutput(answer); 408 return (Type) this; 409 } 410 411 /** 412 * Creates a <a 413 * href="http://activemq.apache.org/camel/routing-slip.html">Routing 414 * Slip</a> pattern. 415 * 416 * @param header is the header that the {@link org.apache.camel.processor.RoutingSlip RoutingSlip} 417 * class will look in for the list of URIs to route the message to. The list of URIs 418 * will be split based on the default delimiter 419 * {@link RoutingSlipType#DEFAULT_DELIMITER}. 420 */ 421 public Type routingSlip(String header) { 422 RoutingSlipType answer = new RoutingSlipType(header); 423 addOutput(answer); 424 return (Type) this; 425 } 426 427 /** 428 * Creates a <a 429 * href="http://activemq.apache.org/camel/routing-slip.html">Routing 430 * Slip</a> pattern with the default header {@link RoutingSlipType#ROUTING_SLIP_HEADER}. 431 * The list of URIs in the header will be split based on the default delimiter 432 * {@link RoutingSlipType#DEFAULT_DELIMITER}. 
433 */ 434 public Type routingSlip() { 435 RoutingSlipType answer = new RoutingSlipType(); 436 addOutput(answer); 437 return (Type) this; 438 } 439 440 /** 441 * Creates the <a 442 * href="http://activemq.apache.org/camel/splitter.html">Splitter</a> 443 * pattern where an expression is evaluated to iterate through each of the 444 * parts of a message and then each part is then send to some endpoint. 445 * This splitter responds with the latest message returned from destination 446 * endpoint. 447 * 448 * @param receipients the expression on which to split 449 * @return the builder 450 */ 451 public SplitterType splitter(Expression receipients) { 452 SplitterType answer = new SplitterType(receipients); 453 addOutput(answer); 454 return answer; 455 } 456 457 /** 458 * Creates the <a 459 * href="http://activemq.apache.org/camel/splitter.html">Splitter</a> 460 * pattern where an expression is evaluated to iterate through each of the 461 * parts of a message and then each part is then send to some endpoint. 462 * This splitter responds with the latest message returned from destination 463 * endpoint. 464 * 465 * @return the expression clause for the expression on which to split 466 */ 467 public ExpressionClause<SplitterType> splitter() { 468 SplitterType answer = new SplitterType(); 469 addOutput(answer); 470 return ExpressionClause.createAndSetExpression(answer); 471 } 472 473 /** 474 * Creates the <a 475 * href="http://activemq.apache.org/camel/splitter.html">Splitter</a> 476 * pattern where an expression is evaluated to iterate through each of the 477 * parts of a message and then each part is then send to some endpoint. 
478 * Answer from the splitter is produced using given {@link AggregationStrategy} 479 * @param partsExpression the expression on which to split 480 * @param aggregationStrategy the strategy used to aggregate responses for 481 * every part 482 * @return the builder 483 */ 484 public SplitterType splitter(Expression partsExpression, AggregationStrategy aggregationStrategy) { 485 SplitterType answer = new SplitterType(partsExpression); 486 addOutput(answer); 487 answer.setAggregationStrategy(aggregationStrategy); 488 return answer; 489 } 490 491 /** 492 * Creates the <a 493 * href="http://activemq.apache.org/camel/splitter.html">Splitter</a> 494 * pattern where an expression is evaluated to iterate through each of the 495 * parts of a message and then each part is then send to some endpoint. 496 * Answer from the splitter is produced using given {@link AggregationStrategy} 497 * @param aggregationStrategy the strategy used to aggregate responses for 498 * every part 499 * @return the expression clause for the expression on which to split 500 */ 501 public ExpressionClause<SplitterType> splitter(AggregationStrategy aggregationStrategy) { 502 SplitterType answer = new SplitterType(); 503 addOutput(answer); 504 answer.setAggregationStrategy(aggregationStrategy); 505 return ExpressionClause.createAndSetExpression(answer); 506 } 507 508 /** 509 * Creates the <a 510 * href="http://activemq.apache.org/camel/splitter.html">Splitter</a> 511 * pattern where an expression is evaluated to iterate through each of the 512 * parts of a message and then each part is then send to some endpoint. 513 * This splitter responds with the latest message returned from destination 514 * endpoint. 
515 * 516 * @param receipients the expression on which to split 517 * @param parallelProcessing if is <tt>true</tt> camel will fork thread to call the endpoint producer 518 * @return the builder 519 */ 520 public SplitterType splitter(Expression receipients, boolean parallelProcessing) { 521 SplitterType answer = new SplitterType(receipients); 522 addOutput(answer); 523 answer.setParallelProcessing(parallelProcessing); 524 return answer; 525 } 526 527 /** 528 * Creates the <a 529 * href="http://activemq.apache.org/camel/splitter.html">Splitter</a> 530 * pattern where an expression is evaluated to iterate through each of the 531 * parts of a message and then each part is then send to some endpoint. 532 * This splitter responds with the latest message returned from destination 533 * endpoint. 534 * 535 * @param parallelProcessing if is <tt>true</tt> camel will fork thread to call the endpoint producer 536 * @return the expression clause for the expression on which to split 537 */ 538 public ExpressionClause<SplitterType> splitter(boolean parallelProcessing) { 539 SplitterType answer = new SplitterType(); 540 addOutput(answer); 541 answer.setParallelProcessing(parallelProcessing); 542 return ExpressionClause.createAndSetExpression(answer); 543 } 544 545 /** 546 * Creates the <a 547 * href="http://activemq.apache.org/camel/splitter.html">Splitter</a> 548 * pattern where an expression is evaluated to iterate through each of the 549 * parts of a message and then each part is then send to some endpoint. 
550 * Answer from the splitter is produced using given {@link AggregationStrategy} 551 * @param partsExpression the expression on which to split 552 * @param aggregationStrategy the strategy used to aggregate responses for 553 * every part 554 * @param parallelProcessing if is <tt>true</tt> camel will fork thread to call the endpoint producer 555 * @return the builder 556 */ 557 public SplitterType splitter(Expression partsExpression, 558 AggregationStrategy aggregationStrategy, boolean parallelProcessing) { 559 SplitterType answer = new SplitterType(partsExpression); 560 addOutput(answer); 561 answer.setAggregationStrategy(aggregationStrategy); 562 answer.setParallelProcessing(parallelProcessing); 563 return answer; 564 } 565 566 /** 567 * Creates the <a 568 * href="http://activemq.apache.org/camel/splitter.html">Splitter</a> 569 * pattern where an expression is evaluated to iterate through each of the 570 * parts of a message and then each part is then send to some endpoint. 571 * Answer from the splitter is produced using given {@link AggregationStrategy} 572 * @param aggregationStrategy the strategy used to aggregate responses for 573 * every part 574 * @param parallelProcessing if is <tt>true</tt> camel will fork thread to call the endpoint producer 575 * @return the expression clause for the expression on which to split 576 */ 577 public ExpressionClause<SplitterType> splitter(AggregationStrategy aggregationStrategy, boolean parallelProcessing) { 578 SplitterType answer = new SplitterType(); 579 addOutput(answer); 580 answer.setAggregationStrategy(aggregationStrategy); 581 answer.setParallelProcessing(parallelProcessing); 582 return ExpressionClause.createAndSetExpression(answer); 583 } 584 585 586 /** 587 * Creates the <a 588 * href="http://activemq.apache.org/camel/resequencer.html">Resequencer</a> 589 * pattern where a list of expressions are evaluated to be able to compare 590 * the message exchanges to reorder them. e.g. 
you may wish to sort by some 591 * headers 592 * 593 * @return the expression clause for the expressions on which to compare messages in order 594 */ 595 public ExpressionClause<ResequencerType> resequencer() { 596 ResequencerType answer = new ResequencerType(); 597 addOutput(answer); 598 ExpressionClause<ResequencerType> clause = new ExpressionClause<ResequencerType>(answer); 599 answer.expression(clause); 600 return clause; 601 } 602 603 /** 604 * Creates the <a 605 * href="http://activemq.apache.org/camel/resequencer.html">Resequencer</a> 606 * pattern where an expression is evaluated to be able to compare the 607 * message exchanges to reorder them. e.g. you may wish to sort by some 608 * header 609 * 610 * @param expression the expression on which to compare messages in order 611 * @return the builder 612 */ 613 public ResequencerType resequencer(Expression<Exchange> expression) { 614 return resequencer(Collections.<Expression>singletonList(expression)); 615 } 616 617 /** 618 * Creates the <a 619 * href="http://activemq.apache.org/camel/resequencer.html">Resequencer</a> 620 * pattern where a list of expressions are evaluated to be able to compare 621 * the message exchanges to reorder them. e.g. you may wish to sort by some 622 * headers 623 * 624 * @param expressions the expressions on which to compare messages in order 625 * @return the builder 626 */ 627 public ResequencerType resequencer(List<Expression> expressions) { 628 ResequencerType answer = new ResequencerType(expressions); 629 addOutput(answer); 630 return answer; 631 } 632 633 /** 634 * Creates the <a 635 * href="http://activemq.apache.org/camel/resequencer.html">Resequencer</a> 636 * pattern where a list of expressions are evaluated to be able to compare 637 * the message exchanges to reorder them. e.g. 
you may wish to sort by some 638 * headers 639 * 640 * @param expressions the expressions on which to compare messages in order 641 * @return the builder 642 */ 643 public ResequencerType resequencer(Expression... expressions) { 644 List<Expression> list = new ArrayList<Expression>(); 645 list.addAll(Arrays.asList(expressions)); 646 return resequencer(list); 647 } 648 649 /** 650 * Creates an <a 651 * href="http://activemq.apache.org/camel/aggregator.html">Aggregator</a> 652 * pattern where a batch of messages are processed (up to a maximum amount 653 * or until some timeout is reached) and messages for the same correlation 654 * key are combined together using some kind of {@link AggregationStrategy} 655 * (by default the latest message is used) to compress many message exchanges 656 * into a smaller number of exchanges. 657 * <p/> 658 * A good example of this is stock market data; you may be receiving 30,000 659 * messages/second and you may want to throttle it right down so that multiple 660 * messages for the same stock are combined (or just the latest message is used 661 * and older prices are discarded). Another idea is to combine line item messages 662 * together into a single invoice message. 663 */ 664 public ExpressionClause<AggregatorType> aggregator() { 665 AggregatorType answer = new AggregatorType(); 666 addOutput(answer); 667 return ExpressionClause.createAndSetExpression(answer); 668 } 669 670 /** 671 * Creates an <a 672 * href="http://activemq.apache.org/camel/aggregator.html">Aggregator</a> 673 * pattern where a batch of messages are processed (up to a maximum amount 674 * or until some timeout is reached) and messages for the same correlation 675 * key are combined together using some kind of {@link AggregationStrategy} 676 * (by default the latest message is used) to compress many message exchanges 677 * into a smaller number of exchanges. 
678 * <p/> 679 * A good example of this is stock market data; you may be receiving 30,000 680 * messages/second and you may want to throttle it right down so that multiple 681 * messages for the same stock are combined (or just the latest message is used 682 * and older prices are discarded). Another idea is to combine line item messages 683 * together into a single invoice message. 684 * 685 * @param aggregationStrategy the strategy used for the aggregation 686 */ 687 public ExpressionClause<AggregatorType> aggregator(AggregationStrategy aggregationStrategy) { 688 AggregatorType answer = new AggregatorType(); 689 answer.setAggregationStrategy(aggregationStrategy); 690 addOutput(answer); 691 return ExpressionClause.createAndSetExpression(answer); 692 } 693 694 /** 695 * Creates an <a 696 * href="http://activemq.apache.org/camel/aggregator.html">Aggregator</a> 697 * pattern using a custom aggregation collection implementation. 698 * 699 * @param aggregationCollection the collection used to perform the aggregation 700 */ 701 public ExpressionClause<AggregatorType> aggregator(AggregationCollection aggregationCollection) { 702 AggregatorType answer = new AggregatorType(); 703 answer.setAggregationCollection(aggregationCollection); 704 addOutput(answer); 705 return ExpressionClause.createAndSetExpression(answer); 706 } 707 708 /** 709 * Creates an <a 710 * href="http://activemq.apache.org/camel/aggregator.html">Aggregator</a> 711 * pattern where a batch of messages are processed (up to a maximum amount 712 * or until some timeout is reached) and messages for the same correlation 713 * key are combined together using some kind of {@link AggregationStrategy} 714 * (by default the latest message is used) to compress many message exchanges 715 * into a smaller number of exchanges. 
716 * <p/> 717 * A good example of this is stock market data; you may be receiving 30,000 718 * messages/second and you may want to throttle it right down so that multiple 719 * messages for the same stock are combined (or just the latest message is used 720 * and older prices are discarded). Another idea is to combine line item messages 721 * together into a single invoice message. 722 * 723 * @param correlationExpression the expression used to calculate the 724 * correlation key. For a JMS message this could be the 725 * expression <code>header("JMSDestination")</code> or 726 * <code>header("JMSCorrelationID")</code> 727 */ 728 public AggregatorType aggregator(Expression correlationExpression) { 729 AggregatorType answer = new AggregatorType(correlationExpression); 730 addOutput(answer); 731 return answer; 732 } 733 734 /** 735 * Creates an <a 736 * href="http://activemq.apache.org/camel/aggregator.html">Aggregator</a> 737 * pattern where a batch of messages are processed (up to a maximum amount 738 * or until some timeout is reached) and messages for the same correlation 739 * key are combined together using some kind of {@link AggregationStrategy} 740 * (by default the latest message is used) to compress many message exchanges 741 * into a smaller number of exchanges. 742 * <p/> 743 * A good example of this is stock market data; you may be receiving 30,000 744 * messages/second and you may want to throttle it right down so that multiple 745 * messages for the same stock are combined (or just the latest message is used 746 * and older prices are discarded). Another idea is to combine line item messages 747 * together into a single invoice message. 748 * 749 * @param correlationExpression the expression used to calculate the 750 * correlation key. 
For a JMS message this could be the 751 * expression <code>header("JMSDestination")</code> or 752 * <code>header("JMSCorrelationID")</code> 753 */ 754 public AggregatorType aggregator(Expression correlationExpression, AggregationStrategy aggregationStrategy) { 755 AggregatorType answer = new AggregatorType(correlationExpression, aggregationStrategy); 756 addOutput(answer); 757 return answer; 758 } 759 760 /** 761 * Creates the <a 762 * href="http://activemq.apache.org/camel/delayer.html">Delayer</a> pattern 763 * where an expression is used to calculate the time which the message will 764 * be dispatched on 765 * 766 * @param processAtExpression an expression to calculate the time at which 767 * the messages should be processed 768 * @return the builder 769 */ 770 public DelayerType delayer(Expression<Exchange> processAtExpression) { 771 return delayer(processAtExpression, 0L); 772 } 773 774 /** 775 * Creates the <a 776 * href="http://activemq.apache.org/camel/delayer.html">Delayer</a> pattern 777 * where an expression is used to calculate the time which the message will 778 * be dispatched on 779 * 780 * @param processAtExpression an expression to calculate the time at which 781 * the messages should be processed 782 * @param delay the delay in milliseconds which is added to the 783 * processAtExpression to determine the time the message 784 * should be processed 785 * @return the builder 786 */ 787 public DelayerType delayer(Expression<Exchange> processAtExpression, long delay) { 788 DelayerType answer = new DelayerType(processAtExpression, delay); 789 addOutput(answer); 790 return answer; 791 } 792 793 /** 794 * Creates the <a 795 * href="http://activemq.apache.org/camel/delayer.html">Delayer</a> pattern 796 * where an expression is used to calculate the time which the message will 797 * be dispatched on 798 * @return the expression clause to create the expression 799 */ 800 public ExpressionClause<DelayerType> delayer() { 801 DelayerType answer = new 
DelayerType(); 802 addOutput(answer); 803 return ExpressionClause.createAndSetExpression(answer); 804 } 805 806 /** 807 * Creates the <a 808 * href="http://activemq.apache.org/camel/delayer.html">Delayer</a> pattern 809 * where a fixed amount of milliseconds are used to delay processing of a 810 * message exchange 811 * 812 * @param delay the default delay in milliseconds 813 * @return the builder 814 */ 815 public DelayerType delayer(long delay) { 816 return delayer(null, delay); 817 } 818 819 /** 820 * Creates the <a 821 * href="http://activemq.apache.org/camel/delayer.html">Delayer</a> pattern 822 * where an expression is used to calculate the time which the message will 823 * be dispatched on 824 * 825 * @return the builder 826 */ 827 public ThrottlerType throttler(long maximumRequestCount) { 828 ThrottlerType answer = new ThrottlerType(maximumRequestCount); 829 addOutput(answer); 830 return answer; 831 } 832 833 834 public Type throwFault(Throwable fault) { 835 ThrowFaultType answer = new ThrowFaultType(); 836 answer.setFault(fault); 837 addOutput(answer); 838 return (Type) this; 839 } 840 841 public Type throwFault(String message) { 842 return throwFault(new CamelException(message)); 843 } 844 845 /** 846 * Intercepts outputs added to this node in the future (i.e. intercepts outputs added after this statement) 847 */ 848 public Type interceptor(String ref) { 849 InterceptorRef interceptor = new InterceptorRef(ref); 850 intercept(interceptor); 851 return (Type) this; 852 } 853 854 /** 855 * Intercepts outputs added to this node in the future (i.e. intercepts outputs added after this statement) 856 */ 857 public Type intercept(DelegateProcessor interceptor) { 858 intercept(new InterceptorRef(interceptor)); 859 //lastInterceptor = interceptor; 860 return (Type) this; 861 } 862 863 /** 864 * Intercepts outputs added to this node in the future (i.e. 
intercepts outputs added after this statement) 865 */ 866 public InterceptType intercept() { 867 InterceptType answer = new InterceptType(); 868 addOutput(answer); 869 return answer; 870 } 871 872 /** 873 * Intercepts outputs added to this node in the future (i.e. intercepts outputs added after this statement) 874 */ 875 public void intercept(InterceptorType interceptor) { 876 addOutput(interceptor); 877 pushBlock(interceptor); 878 } 879 880 /** 881 * Adds an interceptor around the whole of this nodes processing 882 * 883 * @param interceptor 884 */ 885 public void addInterceptor(InterceptorType interceptor) { 886 interceptors.add(interceptor); 887 } 888 889 /** 890 * Adds an interceptor around the whole of this nodes processing 891 * 892 * @param interceptor 893 */ 894 public void addInterceptor(DelegateProcessor interceptor) { 895 addInterceptor(new InterceptorRef(interceptor)); 896 } 897 898 protected void pushBlock(Block block) { 899 blocks.add(block); 900 } 901 902 protected Block popBlock() { 903 return blocks.isEmpty() ? 
null : blocks.removeLast(); 904 } 905 906 public Type proceed() { 907 ProceedType proceed = null; 908 ProcessorType currentProcessor = this; 909 910 if (currentProcessor instanceof InterceptType) { 911 proceed = ((InterceptType) currentProcessor).getProceed(); 912 } 913 if (proceed == null) { 914 for (ProcessorType node = parent; node != null; node = node.getParent()) { 915 if (node instanceof InterceptType) { 916 InterceptType intercept = (InterceptType)node; 917 proceed = intercept.getProceed(); 918 break; 919 } 920 } 921 } 922 923 if (currentProcessor instanceof InterceptType) { 924 proceed = ((InterceptType) currentProcessor).getProceed(); 925 } 926 927 if (proceed == null) { 928 throw new IllegalArgumentException("Cannot use proceed() without being within an intercept() block"); 929 } 930 931 addOutput(proceed); 932 return (Type) this; 933 } 934 935 public ExceptionType exception(Class exceptionType) { 936 ExceptionType answer = new ExceptionType(exceptionType); 937 addOutput(answer); 938 return answer; 939 } 940 941 /** 942 * Apply an interceptor route if the predicate is true 943 */ 944 public ChoiceType intercept(Predicate predicate) { 945 InterceptType answer = new InterceptType(); 946 addOutput(answer); 947 return answer.when(predicate); 948 } 949 950 public Type interceptors(String... refs) { 951 for (String ref : refs) { 952 interceptor(ref); 953 } 954 return (Type) this; 955 } 956 957 /** 958 * Trace logs the exchange before it goes to the next processing step using 959 * the {@link #DEFAULT_TRACE_CATEGORY} logging category. 960 */ 961 public Type trace() { 962 return trace(DEFAULT_TRACE_CATEGORY); 963 } 964 965 /** 966 * Trace logs the exchange before it goes to the next processing step using 967 * the specified logging category. 968 * 969 * @param category the logging category trace messages will sent to. 
970 */ 971 public Type trace(String category) { 972 final Log log = LogFactory.getLog(category); 973 return intercept(new DelegateProcessor() { 974 @Override 975 public void process(Exchange exchange) throws Exception { 976 log.trace(exchange); 977 processNext(exchange); 978 } 979 }); 980 } 981 982 public PolicyRef policies() { 983 PolicyRef answer = new PolicyRef(); 984 addOutput(answer); 985 return answer; 986 } 987 988 public PolicyRef policy(Policy policy) { 989 PolicyRef answer = new PolicyRef(policy); 990 addOutput(answer); 991 return answer; 992 } 993 994 /** 995 * Forces handling of faults as exceptions 996 * 997 * @return the current builder with the fault handler configured 998 */ 999 public Type handleFault() { 1000 intercept(new HandleFaultType()); 1001 return (Type) this; 1002 } 1003 1004 /** 1005 * Installs the given error handler builder 1006 * 1007 * @param errorHandlerBuilder the error handler to be used by default for 1008 * all child routes 1009 * @return the current builder with the error handler configured 1010 */ 1011 public Type errorHandler(ErrorHandlerBuilder errorHandlerBuilder) { 1012 setErrorHandlerBuilder(errorHandlerBuilder); 1013 return (Type) this; 1014 } 1015 1016 /** 1017 * Configures whether or not the error handler is inherited by every 1018 * processing node (or just the top most one) 1019 * 1020 * @param condition the flag as to whether error handlers should be 1021 * inherited or not 1022 * @return the current builder 1023 */ 1024 public Type inheritErrorHandler(boolean condition) { 1025 setInheritErrorHandlerFlag(condition); 1026 return (Type) this; 1027 } 1028 1029 // Transformers 1030 // ------------------------------------------------------------------------- 1031 1032 /** 1033 * Adds the custom processor to this destination which could be a final 1034 * destination, or could be a transformation in a pipeline 1035 */ 1036 public Type process(Processor processor) { 1037 ProcessorRef answer = new ProcessorRef(processor); 
1038 addOutput(answer); 1039 return (Type) this; 1040 } 1041 1042 /** 1043 * Adds the custom processor reference to this destination which could be a final 1044 * destination, or could be a transformation in a pipeline 1045 */ 1046 public Type processRef(String ref) { 1047 ProcessorRef answer = new ProcessorRef(); 1048 answer.setRef(ref); 1049 addOutput(answer); 1050 return (Type) this; 1051 } 1052 1053 /** 1054 * Adds a bean which is invoked which could be a final destination, or could 1055 * be a transformation in a pipeline 1056 */ 1057 public Type bean(Object bean) { 1058 BeanRef answer = new BeanRef(); 1059 answer.setBean(bean); 1060 addOutput(answer); 1061 return (Type) this; 1062 } 1063 1064 /** 1065 * Adds a bean and method which is invoked which could be a final 1066 * destination, or could be a transformation in a pipeline 1067 */ 1068 public Type bean(Object bean, String method) { 1069 BeanRef answer = new BeanRef(); 1070 answer.setBean(bean); 1071 answer.setMethod(method); 1072 addOutput(answer); 1073 return (Type) this; 1074 } 1075 1076 /** 1077 * Adds a bean by type which is invoked which could be a final destination, or could 1078 * be a transformation in a pipeline 1079 */ 1080 public Type bean(Class beanType) { 1081 BeanRef answer = new BeanRef(); 1082 answer.setBeanType(beanType); 1083 addOutput(answer); 1084 return (Type) this; 1085 } 1086 1087 /** 1088 * Adds a bean type and method which is invoked which could be a final 1089 * destination, or could be a transformation in a pipeline 1090 */ 1091 public Type bean(Class beanType, String method) { 1092 BeanRef answer = new BeanRef(); 1093 answer.setBeanType(beanType); 1094 answer.setMethod(method); 1095 addOutput(answer); 1096 return (Type) this; 1097 } 1098 1099 /** 1100 * Adds a bean which is invoked which could be a final destination, or could 1101 * be a transformation in a pipeline 1102 */ 1103 public Type beanRef(String ref) { 1104 BeanRef answer = new BeanRef(ref); 1105 addOutput(answer); 
1106 return (Type) this; 1107 } 1108 1109 /** 1110 * Adds a bean and method which is invoked which could be a final 1111 * destination, or could be a transformation in a pipeline 1112 */ 1113 public Type beanRef(String ref, String method) { 1114 BeanRef answer = new BeanRef(ref, method); 1115 addOutput(answer); 1116 return (Type) this; 1117 } 1118 1119 /** 1120 * Adds a processor which sets the body on the IN message 1121 */ 1122 public ExpressionClause<ProcessorType<Type>> setBody() { 1123 ExpressionClause<ProcessorType<Type>> clause = new ExpressionClause<ProcessorType<Type>>((Type) this); 1124 SetBodyType answer = new SetBodyType(clause); 1125 addOutput(answer); 1126 return clause; 1127 } 1128 1129 /** 1130 * Adds a processor which sets the body on the IN message 1131 */ 1132 public Type setBody(Expression expression) { 1133 SetBodyType answer = new SetBodyType(expression); 1134 addOutput(answer); 1135 return (Type) this; 1136 } 1137 1138 /** 1139 * Adds a processor which sets the body on the OUT message 1140 * 1141 * @deprecated Please use {@link #transform(Expression)} instead. Will be removed in Camel 2.0. 1142 */ 1143 @Deprecated 1144 public Type setOutBody(Expression expression) { 1145 return transform(expression); 1146 } 1147 1148 /** 1149 * Adds a processor which sets the body on the OUT message 1150 * 1151 * @deprecated Please use {@link #transform()} instead. Will be removed in Camel 2.0. 
1152 */ 1153 @Deprecated 1154 public ExpressionClause<ProcessorType<Type>> setOutBody() { 1155 return transform(); 1156 } 1157 1158 /** 1159 * Adds a processor which sets the body on the OUT message 1160 */ 1161 public Type transform(Expression expression) { 1162 TransformType answer = new TransformType(expression); 1163 addOutput(answer); 1164 return (Type) this; 1165 } 1166 1167 /** 1168 * Adds a processor which sets the body on the OUT message 1169 */ 1170 public ExpressionClause<ProcessorType<Type>> transform() { 1171 ExpressionClause<ProcessorType<Type>> clause = new ExpressionClause<ProcessorType<Type>>((Type) this); 1172 TransformType answer = new TransformType(clause); 1173 addOutput(answer); 1174 return clause; 1175 } 1176 1177 /** 1178 * Adds a processor which sets the body on the FAULT message 1179 */ 1180 public Type setFaultBody(Expression expression) { 1181 return process(ProcessorBuilder.setFaultBody(expression)); 1182 } 1183 1184 /** 1185 * Adds a processor which sets the header on the IN message 1186 */ 1187 public ExpressionClause<ProcessorType<Type>> setHeader(String name) { 1188 ExpressionClause<ProcessorType<Type>> clause = new ExpressionClause<ProcessorType<Type>>((Type) this); 1189 SetHeaderType answer = new SetHeaderType(name, clause); 1190 addOutput(answer); 1191 return clause; 1192 } 1193 1194 /** 1195 * Adds a processor which sets the header on the IN message 1196 */ 1197 public Type setHeader(String name, Expression expression) { 1198 SetHeaderType answer = new SetHeaderType(name, expression); 1199 addOutput(answer); 1200 return (Type) this; 1201 } 1202 1203 /** 1204 * Adds a processor which sets the header on the IN message to the given value 1205 */ 1206 public Type setHeader(String name, String value) { 1207 SetHeaderType answer = new SetHeaderType(name, value); 1208 addOutput(answer); 1209 return (Type) this; 1210 } 1211 1212 /** 1213 * Adds a processor which sets the header on the OUT message 1214 */ 1215 public 
ExpressionClause<ProcessorType<Type>> setOutHeader(String name) { 1216 ExpressionClause<ProcessorType<Type>> clause = new ExpressionClause<ProcessorType<Type>>((Type) this); 1217 process(ProcessorBuilder.setOutHeader(name, clause)); 1218 return clause; 1219 } 1220 1221 /** 1222 * Adds a processor which sets the header on the OUT message 1223 */ 1224 public Type setOutHeader(String name, Expression expression) { 1225 return process(ProcessorBuilder.setOutHeader(name, expression)); 1226 } 1227 1228 /** 1229 * Adds a processor which sets the header on the OUT message 1230 */ 1231 public Type setOutHeader(String name, String value) { 1232 return (Type) setOutHeader(name).constant(value); 1233 } 1234 1235 /** 1236 * Adds a processor which sets the header on the FAULT message 1237 */ 1238 public Type setFaultHeader(String name, Expression expression) { 1239 return process(ProcessorBuilder.setFaultHeader(name, expression)); 1240 } 1241 1242 /** 1243 * Adds a processor which sets the exchange property 1244 */ 1245 public Type setProperty(String name, Expression expression) { 1246 return process(ProcessorBuilder.setProperty(name, expression)); 1247 } 1248 1249 1250 /** 1251 * Adds a processor which sets the exchange property 1252 */ 1253 public ExpressionClause<ProcessorType<Type>> setProperty(String name) { 1254 ExpressionClause<ProcessorType<Type>> clause = new ExpressionClause<ProcessorType<Type>>((Type) this); 1255 process(ProcessorBuilder.setProperty(name, clause)); 1256 return clause; 1257 } 1258 1259 /** 1260 * Adds a processor which removes the header on the IN message 1261 */ 1262 public Type removeHeader(String name) { 1263 return process(ProcessorBuilder.removeHeader(name)); 1264 } 1265 1266 /** 1267 * Adds a processor which removes the header on the OUT message 1268 */ 1269 public Type removeOutHeader(String name) { 1270 return process(ProcessorBuilder.removeOutHeader(name)); 1271 } 1272 1273 /** 1274 * Adds a processor which removes the header on the FAULT 
message 1275 */ 1276 public Type removeFaultHeader(String name) { 1277 return process(ProcessorBuilder.removeFaultHeader(name)); 1278 } 1279 1280 /** 1281 * Adds a processor which removes the exchange property 1282 */ 1283 public Type removeProperty(String name) { 1284 return process(ProcessorBuilder.removeProperty(name)); 1285 } 1286 1287 /** 1288 * Converts the IN message body to the specified type 1289 */ 1290 public Type convertBodyTo(Class type) { 1291 addOutput(new ConvertBodyType(type)); 1292 return (Type) this; 1293 } 1294 1295 /** 1296 * Converts the OUT message body to the specified type 1297 * 1298 * @deprecated Please use {@link #convertBodyTo(Class)} instead. Will be removed in Camel 2.0. 1299 */ 1300 @Deprecated 1301 public Type convertOutBodyTo(Class type) { 1302 return process(new ConvertBodyProcessor(type)); 1303 } 1304 1305 /** 1306 * Converts the FAULT message body to the specified type 1307 * 1308 * @deprecated Please use {@link #convertBodyTo(Class)} instead. Will be removed in Camel 2.0. 1309 */ 1310 @Deprecated 1311 public Type convertFaultBodyTo(Class type) { 1312 return process(new ConvertBodyProcessor(type)); 1313 } 1314 1315 // DataFormat support 1316 // ------------------------------------------------------------------------- 1317 1318 /** 1319 * Unmarshals the in body using a {@link DataFormat} expression to define 1320 * the format of the input message and the output will be set on the out message body. 1321 * 1322 * @return the expression to create the {@link DataFormat} 1323 */ 1324 public DataFormatClause<ProcessorType<Type>> unmarshal() { 1325 return new DataFormatClause<ProcessorType<Type>>(this, DataFormatClause.Operation.Unmarshal); 1326 } 1327 1328 /** 1329 * Unmarshals the in body using the specified {@link DataFormat} 1330 * and sets the output on the out message body. 
1331 * 1332 * @return this object 1333 */ 1334 public Type unmarshal(DataFormatType dataFormatType) { 1335 addOutput(new UnmarshalType(dataFormatType)); 1336 return (Type) this; 1337 } 1338 1339 /** 1340 * Unmarshals the in body using the specified {@link DataFormat} 1341 * and sets the output on the out message body. 1342 * 1343 * @return this object 1344 */ 1345 public Type unmarshal(DataFormat dataFormat) { 1346 return unmarshal(new DataFormatType(dataFormat)); 1347 } 1348 1349 /** 1350 * Unmarshals the in body using the specified {@link DataFormat} 1351 * reference in the {@link Registry} and sets the output on the out message body. 1352 * 1353 * @return this object 1354 */ 1355 public Type unmarshal(String dataTypeRef) { 1356 addOutput(new UnmarshalType(dataTypeRef)); 1357 return (Type) this; 1358 } 1359 1360 /** 1361 * Marshals the in body using a {@link DataFormat} expression to define 1362 * the format of the output which will be added to the out body. 1363 * 1364 * @return the expression to create the {@link DataFormat} 1365 */ 1366 public DataFormatClause<ProcessorType<Type>> marshal() { 1367 return new DataFormatClause<ProcessorType<Type>>(this, DataFormatClause.Operation.Marshal); 1368 } 1369 1370 /** 1371 * Marshals the in body using the specified {@link DataFormat} 1372 * and sets the output on the out message body. 1373 * 1374 * @return this object 1375 */ 1376 public Type marshal(DataFormatType dataFormatType) { 1377 addOutput(new MarshalType(dataFormatType)); 1378 return (Type) this; 1379 } 1380 1381 /** 1382 * Marshals the in body using the specified {@link DataFormat} 1383 * and sets the output on the out message body. 1384 * 1385 * @return this object 1386 */ 1387 public Type marshal(DataFormat dataFormat) { 1388 return marshal(new DataFormatType(dataFormat)); 1389 } 1390 1391 /** 1392 * Marshals the in body the specified {@link DataFormat} 1393 * reference in the {@link Registry} and sets the output on the out message body. 
1394 * 1395 * @return this object 1396 */ 1397 public Type marshal(String dataTypeRef) { 1398 addOutput(new MarshalType(dataTypeRef)); 1399 return (Type) this; 1400 } 1401 1402 // Properties 1403 // ------------------------------------------------------------------------- 1404 @XmlTransient 1405 public ProcessorType<? extends ProcessorType> getParent() { 1406 return parent; 1407 } 1408 1409 public void setParent(ProcessorType<? extends ProcessorType> parent) { 1410 this.parent = parent; 1411 } 1412 1413 @XmlTransient 1414 public ErrorHandlerBuilder getErrorHandlerBuilder() { 1415 if (errorHandlerBuilder == null) { 1416 errorHandlerBuilder = createErrorHandlerBuilder(); 1417 } 1418 return errorHandlerBuilder; 1419 } 1420 1421 /** 1422 * Sets the error handler to use with processors created by this builder 1423 */ 1424 public void setErrorHandlerBuilder(ErrorHandlerBuilder errorHandlerBuilder) { 1425 this.errorHandlerBuilder = errorHandlerBuilder; 1426 } 1427 1428 @XmlTransient 1429 public boolean isInheritErrorHandler() { 1430 return isInheritErrorHandler(getInheritErrorHandlerFlag()); 1431 } 1432 1433 /** 1434 * Lets default the inherit value to be true if there is none specified 1435 */ 1436 public static boolean isInheritErrorHandler(Boolean value) { 1437 return value == null || value.booleanValue(); 1438 } 1439 1440 @XmlAttribute(name = "inheritErrorHandler", required = false) 1441 public Boolean getInheritErrorHandlerFlag() { 1442 return inheritErrorHandlerFlag; 1443 } 1444 1445 public void setInheritErrorHandlerFlag(Boolean inheritErrorHandlerFlag) { 1446 this.inheritErrorHandlerFlag = inheritErrorHandlerFlag; 1447 } 1448 1449 @XmlTransient 1450 public NodeFactory getNodeFactory() { 1451 if (nodeFactory == null) { 1452 nodeFactory = new NodeFactory(); 1453 } 1454 return nodeFactory; 1455 } 1456 1457 public void setNodeFactory(NodeFactory nodeFactory) { 1458 this.nodeFactory = nodeFactory; 1459 } 1460 1461 /** 1462 * Returns a label to describe this node such 
as the expression if some kind of expression node
     */
    public String getLabel() {
        return "";
    }

    // Implementation methods
    // -------------------------------------------------------------------------

    /**
     * Creates the processor and wraps it in any necessary interceptors and
     * error handlers
     *
     * @param routeContext the route context handed to the create and wrap steps
     * @return the wrapped processor
     */
    protected Processor makeProcessor(RouteContext routeContext) throws Exception {
        Processor processor = createProcessor(routeContext);
        return wrapProcessor(routeContext, processor);
    }

    /**
     * A strategy method which allows derived classes to wrap the child
     * processor in some kind of interceptor.
     * <p/>
     * The target is first passed through every non-null
     * {@link InterceptStrategy} (those of a {@link DefaultCamelContext}, then
     * those of the route context). It is then wrapped by the route's
     * interceptors followed by this node's interceptors, applied in reverse
     * order.
     *
     * @param routeContext the route context supplying strategies and route interceptors
     * @param target the processor which can be wrapped
     * @return the original processor or a new wrapped interceptor
     * @throws RuntimeCamelException if <code>target</code> is <code>null</code>
     */
    protected Processor wrapProcessorInInterceptors(RouteContext routeContext, Processor target) throws Exception {
        // The target is required.
        if (target == null) {
            throw new RuntimeCamelException("target not provided.");
        }

        List<InterceptStrategy> strategies = new ArrayList<InterceptStrategy>();
        CamelContext camelContext = routeContext.getCamelContext();
        if (camelContext instanceof DefaultCamelContext) {
            DefaultCamelContext defaultCamelContext = (DefaultCamelContext) camelContext;
            strategies.addAll(defaultCamelContext.getInterceptStrategies());
        }
        strategies.addAll(routeContext.getInterceptStrategies());
        for (InterceptStrategy strategy : strategies) {
            if (strategy != null) {
                target = strategy.wrapProcessorInInterceptors(this, target);
            }
        }

        // Work on a private copy: appending to and reversing the list returned
        // by the route would alter the route's own interceptor list on every
        // call (growing it and flipping its order).
        List<InterceptorType> list = new ArrayList<InterceptorType>(routeContext.getRoute().getInterceptors());
        if (interceptors != null) {
            list.addAll(interceptors);
        }
        // lets reverse the list so we apply the inner interceptors first
        Collections.reverse(list);
        // processors already in the chain; guards against wrapping the same one twice
        Set<Processor> applied = new HashSet<Processor>();
        applied.add(target);
        for (InterceptorType interceptorType : list) {
            DelegateProcessor interceptor = interceptorType.createInterceptor(routeContext);
            if (!applied.contains(interceptor)) {
                applied.add(interceptor);
                interceptor.setProcessor(target);
                target = interceptor;
            }
        }
        return target;
    }

    /**
     * A strategy method to allow newly created processors to be wrapped in an
     * error handler.
     *
     * @param processor the processor to wrap
     * @return the error handler created by {@link #getErrorHandlerBuilder()}
     */
    protected Processor wrapInErrorHandler(Processor processor) throws Exception {
        return getErrorHandlerBuilder().createErrorHandler(processor);
    }

    /**
     * Creates the default error handler builder for this node.
     *
     * @return a {@link DeadLetterChannelBuilder} when error handlers are
     *         inherited, otherwise a {@link NoErrorHandlerBuilder}
     */
    protected ErrorHandlerBuilder createErrorHandlerBuilder() {
        if (isInheritErrorHandler()) {
            return new DeadLetterChannelBuilder();
        } else {
            return new NoErrorHandlerBuilder();
        }
    }

    /**
     * Passes this node's {@link NodeFactory} on to the given child.
     *
     * @param output the child node to configure
     */
    protected void configureChild(ProcessorType output) {
        output.setNodeFactory(getNodeFactory());
    }

    /**
     * Makes the given node a child of this node. The child is added to this
     * node's outputs when no block is pushed, otherwise to the most recently
     * pushed block.
     *
     * @param processorType the node to add
     */
    public void addOutput(ProcessorType processorType) {
        processorType.setParent(this);
        configureChild(processorType);
        if (blocks.isEmpty()) {
            getOutputs().add(processorType);
        } else {
            Block block = blocks.getLast();
            block.addOutput(processorType);
        }
    }

    /**
     * Creates a new instance of some kind of composite processor which defaults
     * to using a {@link Pipeline} but derived classes could change the
     * behaviour
     *
     * @param list the processors to combine
     * @return the composite processor
     */
    protected Processor createCompositeProcessor(List<Processor> list) {
        return new Pipeline(list);
    }

    /**
     * Creates a processor for each of the given outputs, wraps each in its
     * interceptors, and combines the results.
     *
     * @param routeContext the route context used to create and wrap the processors
     * @param outputs the output nodes to turn into processors
     * @return <code>null</code> when there are no outputs, the single
     *         processor when there is exactly one, otherwise the result of
     *         {@link #createCompositeProcessor(List)}
     */
    protected Processor createOutputsProcessor(RouteContext routeContext, Collection<ProcessorType<?>> outputs)
        throws Exception {
        List<Processor> list = new ArrayList<Processor>();
        for (ProcessorType output : outputs) {
            Processor processor = output.createProcessor(routeContext);
            processor = output.wrapProcessorInInterceptors(routeContext, processor);
            list.add(processor);
        }
        Processor processor = null;
        if (!list.isEmpty()) {
            if (list.size() == 1) {
                processor = list.get(0);
            } else {
                processor = createCompositeProcessor(list);
            }
        }
        return processor;
    }

    /**
     * Removes all outputs of this node and all pushed blocks.
     */
    public void clearOutput() {
        getOutputs().clear();
        blocks.clear();
    }
}