001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.model; 018 019 import java.util.ArrayList; 020 import java.util.Collection; 021 import java.util.Collections; 022 import java.util.LinkedList; 023 import java.util.List; 024 import java.util.concurrent.ThreadPoolExecutor; 025 026 import javax.xml.bind.annotation.XmlAttribute; 027 import javax.xml.bind.annotation.XmlTransient; 028 029 import org.apache.camel.CamelException; 030 import org.apache.camel.Endpoint; 031 import org.apache.camel.Exchange; 032 import org.apache.camel.Expression; 033 import org.apache.camel.Predicate; 034 import org.apache.camel.Processor; 035 import org.apache.camel.Route; 036 import org.apache.camel.RuntimeCamelException; 037 import org.apache.camel.builder.DataFormatClause; 038 import org.apache.camel.builder.DeadLetterChannelBuilder; 039 import org.apache.camel.builder.ErrorHandlerBuilder; 040 import org.apache.camel.builder.ExpressionClause; 041 import org.apache.camel.builder.NoErrorHandlerBuilder; 042 import org.apache.camel.builder.ProcessorBuilder; 043 import org.apache.camel.converter.ObjectConverter; 044 import org.apache.camel.impl.RouteContext; 045 import org.apache.camel.model.dataformat.DataFormatType; 046 import org.apache.camel.model.language.ExpressionType; 047 import org.apache.camel.model.language.LanguageExpression; 048 import org.apache.camel.processor.ConvertBodyProcessor; 049 import org.apache.camel.processor.DelegateProcessor; 050 import org.apache.camel.processor.MulticastProcessor; 051 import org.apache.camel.processor.Pipeline; 052 import org.apache.camel.processor.RecipientList; 053 import org.apache.camel.processor.aggregate.AggregationCollection; 054 import org.apache.camel.processor.aggregate.AggregationStrategy; 055 import org.apache.camel.processor.idempotent.IdempotentConsumer; 056 import org.apache.camel.processor.idempotent.MessageIdRepository; 057 import org.apache.camel.spi.DataFormat; 058 import org.apache.camel.spi.Policy; 059 import org.apache.camel.spi.Registry; 060 import org.apache.commons.logging.Log; 061 import org.apache.commons.logging.LogFactory; 062 063 /** 064 * @version $Revision: 41004 $ 065 */ 066 public abstract class ProcessorType<Type extends ProcessorType> implements Block { 067 public static final String DEFAULT_TRACE_CATEGORY = "org.apache.camel.TRACE"; 068 private ErrorHandlerBuilder errorHandlerBuilder; 069 private Boolean inheritErrorHandlerFlag = Boolean.TRUE; // TODO not sure how 070 private DelegateProcessor lastInterceptor; 071 private NodeFactory nodeFactory; 072 private LinkedList<Block> blocks = new LinkedList<Block>(); 073 private ProcessorType<? extends ProcessorType> parent; 074 075 // else to use an optional attribute in JAXB2 076 public abstract List<ProcessorType<?>> getOutputs(); 077 078 079 public Processor createProcessor(RouteContext routeContext) throws Exception { 080 throw new UnsupportedOperationException("Not implemented yet for class: " + getClass().getName()); 081 } 082 083 public Processor createOutputsProcessor(RouteContext routeContext) throws Exception { 084 Collection<ProcessorType<?>> outputs = getOutputs(); 085 return createOutputsProcessor(routeContext, outputs); 086 } 087 088 public void addRoutes(RouteContext routeContext, Collection<Route> routes) throws Exception { 089 Processor processor = makeProcessor(routeContext); 090 routeContext.addEventDrivenProcessor(processor); 091 } 092 093 /** 094 * Wraps the child processor in whatever necessary interceptors and error 095 * handlers 096 */ 097 public Processor wrapProcessor(RouteContext routeContext, Processor processor) throws Exception { 098 processor = wrapProcessorInInterceptors(routeContext, processor); 099 return wrapInErrorHandler(processor); 100 } 101 102 // Fluent API 103 // ------------------------------------------------------------------------- 104 105 /** 106 * Sends the exchange to the given endpoint URI 107 */ 108 public Type to(String uri) { 109 addOutput(new ToType(uri)); 110 return (Type) this; 111 } 112 113 /** 114 * Sends the exchange to the given endpoint 115 */ 116 public Type to(Endpoint endpoint) { 117 addOutput(new ToType(endpoint)); 118 return (Type) this; 119 } 120 121 /** 122 * Sends the exchange to a list of endpoints using the 123 * {@link MulticastProcessor} pattern 124 */ 125 public Type to(String... uris) { 126 for (String uri : uris) { 127 addOutput(new ToType(uri)); 128 } 129 return (Type) this; 130 } 131 132 /** 133 * Sends the exchange to a list of endpoints using the 134 * {@link MulticastProcessor} pattern 135 */ 136 public Type to(Endpoint... endpoints) { 137 for (Endpoint endpoint : endpoints) { 138 addOutput(new ToType(endpoint)); 139 } 140 return (Type) this; 141 } 142 143 /** 144 * Sends the exchange to a list of endpoint using the 145 * {@link MulticastProcessor} pattern 146 */ 147 public Type to(Collection<Endpoint> endpoints) { 148 for (Endpoint endpoint : endpoints) { 149 addOutput(new ToType(endpoint)); 150 } 151 return (Type) this; 152 } 153 154 /** 155 * Multicasts messages to all its child outputs; so that each processor and 156 * destination gets a copy of the original message to avoid the processors 157 * interfering with each other. 158 */ 159 public MulticastType multicast() { 160 MulticastType answer = new MulticastType(); 161 addOutput(answer); 162 return answer; 163 } 164 165 /** 166 * Multicasts messages to all its child outputs; so that each processor and 167 * destination gets a copy of the original message to avoid the processors 168 * interfering with each other. 169 * @param aggregationStrategy the strategy used to aggregate responses for 170 * every part 171 * @param parallelProcessing if is <tt>true</tt> camel will fork thread to call the endpoint producer 172 * @return the multicast type 173 */ 174 public MulticastType multicast(AggregationStrategy aggregationStrategy, boolean parallelProcessing) { 175 MulticastType answer = new MulticastType(); 176 addOutput(answer); 177 answer.setAggregationStrategy(aggregationStrategy); 178 answer.setParallelProcessing(parallelProcessing); 179 return answer; 180 } 181 182 /** 183 * Multicasts messages to all its child outputs; so that each processor and 184 * destination gets a copy of the original message to avoid the processors 185 * interfering with each other. 186 * @param aggregationStrategy the strategy used to aggregate responses for 187 * every part 188 * @return the multicast type 189 */ 190 public MulticastType multicast(AggregationStrategy aggregationStrategy) { 191 MulticastType answer = new MulticastType(); 192 addOutput(answer); 193 answer.setAggregationStrategy(aggregationStrategy); 194 return answer; 195 } 196 197 /** 198 * Creates a {@link Pipeline} of the list of endpoints so that the message 199 * will get processed by each endpoint in turn and for request/response the 200 * output of one endpoint will be the input of the next endpoint 201 */ 202 public Type pipeline(String... uris) { 203 // TODO pipeline v mulicast 204 return to(uris); 205 } 206 207 /** 208 * Creates a {@link Pipeline} of the list of endpoints so that the message 209 * will get processed by each endpoint in turn and for request/response the 210 * output of one endpoint will be the input of the next endpoint 211 */ 212 public Type pipeline(Endpoint... endpoints) { 213 // TODO pipeline v mulicast 214 return to(endpoints); 215 } 216 217 /** 218 * Creates a {@link Pipeline} of the list of endpoints so that the message 219 * will get processed by each endpoint in turn and for request/response the 220 * output of one endpoint will be the input of the next endpoint 221 */ 222 public Type pipeline(Collection<Endpoint> endpoints) { 223 // TODO pipeline v mulicast 224 return to(endpoints); 225 } 226 227 /** 228 * Ends the current block 229 */ 230 public ProcessorType<? extends ProcessorType> end() { 231 if (blocks.isEmpty()) { 232 if (parent == null) { 233 throw new IllegalArgumentException("Root node with no active block"); 234 } 235 return parent; 236 } 237 popBlock(); 238 return this; 239 } 240 241 /** 242 * Causes subsequent processors to be called asynchronously 243 * 244 * @param coreSize the number of threads that will be used to process 245 * messages in subsequent processors. 246 * @return a ThreadType builder that can be used to further configure the 247 * the thread pool. 248 */ 249 public ThreadType thread(int coreSize) { 250 ThreadType answer = new ThreadType(coreSize); 251 addOutput(answer); 252 return answer; 253 } 254 255 /** 256 * Causes subsequent processors to be called asynchronously 257 * 258 * @param executor the executor that will be used to process 259 * messages in subsequent processors. 260 * @return a ThreadType builder that can be used to further configure the 261 * the thread pool. 262 */ 263 public ProcessorType<Type> thread(ThreadPoolExecutor executor) { 264 ThreadType answer = new ThreadType(executor); 265 addOutput(answer); 266 return this; 267 } 268 269 /** 270 * Creates an {@link IdempotentConsumer} to avoid duplicate messages 271 */ 272 public IdempotentConsumerType idempotentConsumer(Expression messageIdExpression, 273 MessageIdRepository messageIdRepository) { 274 IdempotentConsumerType answer = new IdempotentConsumerType(messageIdExpression, messageIdRepository); 275 addOutput(answer); 276 return answer; 277 } 278 279 /** 280 * Creates an {@link IdempotentConsumer} to avoid duplicate messages 281 * 282 * @return the builder used to create the expression 283 */ 284 public ExpressionClause<IdempotentConsumerType> idempotentConsumer(MessageIdRepository messageIdRepository) { 285 IdempotentConsumerType answer = new IdempotentConsumerType(); 286 answer.setMessageIdRepository(messageIdRepository); 287 addOutput(answer); 288 return ExpressionClause.createAndSetExpression(answer); 289 } 290 291 /** 292 * Creates a predicate expression which only if it is true then the 293 * exchange is forwarded to the destination 294 * 295 * @return the clause used to create the filter expression 296 */ 297 public ExpressionClause<FilterType> filter() { 298 FilterType filter = new FilterType(); 299 addOutput(filter); 300 return ExpressionClause.createAndSetExpression(filter); 301 } 302 303 /** 304 * Creates a predicate which is applied and only if it is true then the 305 * exchange is forwarded to the destination 306 * 307 * @return the builder for a predicate 308 */ 309 public FilterType filter(Predicate predicate) { 310 FilterType filter = new FilterType(predicate); 311 addOutput(filter); 312 return filter; 313 } 314 315 public FilterType filter(ExpressionType expression) { 316 FilterType filter = getNodeFactory().createFilter(); 317 filter.setExpression(expression); 318 addOutput(filter); 319 return filter; 320 } 321 322 public FilterType filter(String language, String expression) { 323 return filter(new LanguageExpression(language, expression)); 324 } 325 326 public LoadBalanceType loadBalance() { 327 LoadBalanceType answer = new LoadBalanceType(); 328 addOutput(answer); 329 return answer; 330 } 331 332 333 /** 334 * Creates a choice of one or more predicates with an otherwise clause 335 * 336 * @return the builder for a choice expression 337 */ 338 public ChoiceType choice() { 339 ChoiceType answer = new ChoiceType(); 340 addOutput(answer); 341 return answer; 342 } 343 344 /** 345 * Creates a try/catch block 346 * 347 * @return the builder for a tryBlock expression 348 */ 349 public TryType tryBlock() { 350 TryType answer = new TryType(); 351 addOutput(answer); 352 return answer; 353 } 354 355 /** 356 * Creates a dynamic <a 357 * href="http://activemq.apache.org/camel/recipient-list.html">Recipient 358 * List</a> pattern. 359 * 360 * @param receipients is the builder of the expression used in the 361 * {@link RecipientList} to decide the destinations 362 */ 363 public Type recipientList(Expression receipients) { 364 RecipientListType answer = new RecipientListType(receipients); 365 addOutput(answer); 366 return (Type) this; 367 } 368 369 /** 370 * Creates a dynamic <a 371 * href="http://activemq.apache.org/camel/recipient-list.html">Recipient 372 * List</a> pattern. 373 * 374 * @return the expression clause for the expression used in the 375 * {@link RecipientList} to decide the destinations 376 */ 377 public ExpressionClause<ProcessorType<Type>> recipientList() { 378 RecipientListType answer = new RecipientListType(); 379 addOutput(answer); 380 ExpressionClause<ProcessorType<Type>> clause = new ExpressionClause<ProcessorType<Type>>((Type) this); 381 answer.setExpression(clause); 382 return clause; 383 } 384 385 /** 386 * Creates a <a 387 * href="http://activemq.apache.org/camel/routing-slip.html">Routing 388 * Slip</a> pattern. 389 * 390 * @param header is the header that the {@link org.apache.camel.processor.RoutingSlip RoutingSlip} 391 * class will look in for the list of URIs to route the message to. 392 * @param uriDelimiter is the delimiter that will be used to split up 393 * the list of URIs in the routing slip. 394 */ 395 public Type routingSlip(String header, String uriDelimiter) { 396 RoutingSlipType answer = new RoutingSlipType(header, uriDelimiter); 397 addOutput(answer); 398 return (Type) this; 399 } 400 401 /** 402 * Creates a <a 403 * href="http://activemq.apache.org/camel/routing-slip.html">Routing 404 * Slip</a> pattern. 405 * 406 * @param header is the header that the {@link org.apache.camel.processor.RoutingSlip RoutingSlip} 407 * class will look in for the list of URIs to route the message to. The list of URIs 408 * will be split based on the default delimiter 409 * {@link RoutingSlipType#DEFAULT_DELIMITER}. 410 */ 411 public Type routingSlip(String header) { 412 RoutingSlipType answer = new RoutingSlipType(header); 413 addOutput(answer); 414 return (Type) this; 415 } 416 417 /** 418 * Creates a <a 419 * href="http://activemq.apache.org/camel/routing-slip.html">Routing 420 * Slip</a> pattern with the default header {@link RoutingSlipType#ROUTING_SLIP_HEADER}. 421 * The list of URIs in the header will be split based on the default delimiter 422 * {@link RoutingSlipType#DEFAULT_DELIMITER}. 423 */ 424 public Type routingSlip() { 425 RoutingSlipType answer = new RoutingSlipType(); 426 addOutput(answer); 427 return (Type) this; 428 } 429 430 /** 431 * Creates the <a 432 * href="http://activemq.apache.org/camel/splitter.html">Splitter</a> 433 * pattern where an expression is evaluated to iterate through each of the 434 * parts of a message and then each part is then send to some endpoint. 435 * This splitter responds with the latest message returned from destination 436 * endpoint. 437 * 438 * @param receipients the expression on which to split 439 * @return the builder 440 */ 441 public SplitterType splitter(Expression receipients) { 442 SplitterType answer = new SplitterType(receipients); 443 addOutput(answer); 444 return answer; 445 } 446 447 /** 448 * Creates the <a 449 * href="http://activemq.apache.org/camel/splitter.html">Splitter</a> 450 * pattern where an expression is evaluated to iterate through each of the 451 * parts of a message and then each part is then send to some endpoint. 452 * This splitter responds with the latest message returned from destination 453 * endpoint. 454 * 455 * @return the expression clause for the expression on which to split 456 */ 457 public ExpressionClause<SplitterType> splitter() { 458 SplitterType answer = new SplitterType(); 459 addOutput(answer); 460 return ExpressionClause.createAndSetExpression(answer); 461 } 462 463 /** 464 * Creates the <a 465 * href="http://activemq.apache.org/camel/splitter.html">Splitter</a> 466 * pattern where an expression is evaluated to iterate through each of the 467 * parts of a message and then each part is then send to some endpoint. 468 * Answer from the splitter is produced using given {@link AggregationStrategy} 469 * @param partsExpression the expression on which to split 470 * @param aggregationStrategy the strategy used to aggregate responses for 471 * every part 472 * @return the builder 473 */ 474 public SplitterType splitter(Expression partsExpression, AggregationStrategy aggregationStrategy) { 475 SplitterType answer = new SplitterType(partsExpression); 476 addOutput(answer); 477 answer.setAggregationStrategy(aggregationStrategy); 478 return answer; 479 } 480 481 /** 482 * Creates the <a 483 * href="http://activemq.apache.org/camel/splitter.html">Splitter</a> 484 * pattern where an expression is evaluated to iterate through each of the 485 * parts of a message and then each part is then send to some endpoint. 486 * Answer from the splitter is produced using given {@link AggregationStrategy} 487 * @param aggregationStrategy the strategy used to aggregate responses for 488 * every part 489 * @return the expression clause for the expression on which to split 490 */ 491 public ExpressionClause<SplitterType> splitter(AggregationStrategy aggregationStrategy) { 492 SplitterType answer = new SplitterType(); 493 addOutput(answer); 494 answer.setAggregationStrategy(aggregationStrategy); 495 return ExpressionClause.createAndSetExpression(answer); 496 } 497 498 /** 499 * Creates the <a 500 * href="http://activemq.apache.org/camel/splitter.html">Splitter</a> 501 * pattern where an expression is evaluated to iterate through each of the 502 * parts of a message and then each part is then send to some endpoint. 503 * This splitter responds with the latest message returned from destination 504 * endpoint. 505 * 506 * @param receipients the expression on which to split 507 * @param parallelProcessing if is <tt>true</tt> camel will fork thread to call the endpoint producer 508 * @return the builder 509 */ 510 public SplitterType splitter(Expression receipients, boolean parallelProcessing) { 511 SplitterType answer = new SplitterType(receipients); 512 addOutput(answer); 513 answer.setParallelProcessing(parallelProcessing); 514 return answer; 515 } 516 517 /** 518 * Creates the <a 519 * href="http://activemq.apache.org/camel/splitter.html">Splitter</a> 520 * pattern where an expression is evaluated to iterate through each of the 521 * parts of a message and then each part is then send to some endpoint. 522 * This splitter responds with the latest message returned from destination 523 * endpoint. 524 * 525 * @param parallelProcessing if is <tt>true</tt> camel will fork thread to call the endpoint producer 526 * @return the expression clause for the expression on which to split 527 */ 528 public ExpressionClause<SplitterType> splitter(boolean parallelProcessing) { 529 SplitterType answer = new SplitterType(); 530 addOutput(answer); 531 answer.setParallelProcessing(parallelProcessing); 532 return ExpressionClause.createAndSetExpression(answer); 533 } 534 535 /** 536 * Creates the <a 537 * href="http://activemq.apache.org/camel/splitter.html">Splitter</a> 538 * pattern where an expression is evaluated to iterate through each of the 539 * parts of a message and then each part is then send to some endpoint. 540 * Answer from the splitter is produced using given {@link AggregationStrategy} 541 * @param partsExpression the expression on which to split 542 * @param aggregationStrategy the strategy used to aggregate responses for 543 * every part 544 * @param parallelProcessing if is <tt>true</tt> camel will fork thread to call the endpoint producer 545 * @return the builder 546 */ 547 public SplitterType splitter(Expression partsExpression, 548 AggregationStrategy aggregationStrategy, boolean parallelProcessing) { 549 SplitterType answer = new SplitterType(partsExpression); 550 addOutput(answer); 551 answer.setAggregationStrategy(aggregationStrategy); 552 answer.setParallelProcessing(parallelProcessing); 553 return answer; 554 } 555 556 /** 557 * Creates the <a 558 * href="http://activemq.apache.org/camel/splitter.html">Splitter</a> 559 * pattern where an expression is evaluated to iterate through each of the 560 * parts of a message and then each part is then send to some endpoint. 561 * Answer from the splitter is produced using given {@link AggregationStrategy} 562 * @param aggregationStrategy the strategy used to aggregate responses for 563 * every part 564 * @param parallelProcessing if is <tt>true</tt> camel will fork thread to call the endpoint producer 565 * @return the expression clause for the expression on which to split 566 */ 567 public ExpressionClause<SplitterType> splitter(AggregationStrategy aggregationStrategy, boolean parallelProcessing) { 568 SplitterType answer = new SplitterType(); 569 addOutput(answer); 570 answer.setAggregationStrategy(aggregationStrategy); 571 answer.setParallelProcessing(parallelProcessing); 572 return ExpressionClause.createAndSetExpression(answer); 573 } 574 575 576 /** 577 * Creates the <a 578 * href="http://activemq.apache.org/camel/resequencer.html">Resequencer</a> 579 * pattern where a list of expressions are evaluated to be able to compare 580 * the message exchanges to reorder them. e.g. you may wish to sort by some 581 * headers 582 * 583 * @return the expression clause for the expressions on which to compare messages in order 584 */ 585 public ExpressionClause<ResequencerType> resequencer() { 586 ResequencerType answer = new ResequencerType(); 587 addOutput(answer); 588 ExpressionClause<ResequencerType> clause = new ExpressionClause<ResequencerType>(answer); 589 answer.expression(clause); 590 return clause; 591 } 592 593 /** 594 * Creates the <a 595 * href="http://activemq.apache.org/camel/resequencer.html">Resequencer</a> 596 * pattern where an expression is evaluated to be able to compare the 597 * message exchanges to reorder them. e.g. you may wish to sort by some 598 * header 599 * 600 * @param expression the expression on which to compare messages in order 601 * @return the builder 602 */ 603 public ResequencerType resequencer(Expression<Exchange> expression) { 604 return resequencer(Collections.<Expression>singletonList(expression)); 605 } 606 607 /** 608 * Creates the <a 609 * href="http://activemq.apache.org/camel/resequencer.html">Resequencer</a> 610 * pattern where a list of expressions are evaluated to be able to compare 611 * the message exchanges to reorder them. e.g. you may wish to sort by some 612 * headers 613 * 614 * @param expressions the expressions on which to compare messages in order 615 * @return the builder 616 */ 617 public ResequencerType resequencer(List<Expression> expressions) { 618 ResequencerType answer = new ResequencerType(expressions); 619 addOutput(answer); 620 return answer; 621 } 622 623 /** 624 * Creates the <a 625 * href="http://activemq.apache.org/camel/resequencer.html">Resequencer</a> 626 * pattern where a list of expressions are evaluated to be able to compare 627 * the message exchanges to reorder them. e.g. you may wish to sort by some 628 * headers 629 * 630 * @param expressions the expressions on which to compare messages in order 631 * @return the builder 632 */ 633 public ResequencerType resequencer(Expression... expressions) { 634 List<Expression> list = new ArrayList<Expression>(); 635 for (Expression expression : expressions) { 636 list.add(expression); 637 } 638 return resequencer(list); 639 } 640 641 /** 642 * Creates an <a 643 * href="http://activemq.apache.org/camel/aggregator.html">Aggregator</a> 644 * pattern where a batch of messages are processed (up to a maximum amount 645 * or until some timeout is reached) and messages for the same correlation 646 * key are combined together using some kind of {@link AggregationStrategy} 647 * (by default the latest message is used) to compress many message exchanges 648 * into a smaller number of exchanges. 649 * <p/> 650 * A good example of this is stock market data; you may be receiving 30,000 651 * messages/second and you may want to throttle it right down so that multiple 652 * messages for the same stock are combined (or just the latest message is used 653 * and older prices are discarded). Another idea is to combine line item messages 654 * together into a single invoice message. 655 */ 656 public ExpressionClause<AggregatorType> aggregator() { 657 AggregatorType answer = new AggregatorType(); 658 addOutput(answer); 659 return ExpressionClause.createAndSetExpression(answer); 660 } 661 662 /** 663 * Creates an <a 664 * href="http://activemq.apache.org/camel/aggregator.html">Aggregator</a> 665 * pattern where a batch of messages are processed (up to a maximum amount 666 * or until some timeout is reached) and messages for the same correlation 667 * key are combined together using some kind of {@link AggregationStrategy} 668 * (by default the latest message is used) to compress many message exchanges 669 * into a smaller number of exchanges. 670 * <p/> 671 * A good example of this is stock market data; you may be receiving 30,000 672 * messages/second and you may want to throttle it right down so that multiple 673 * messages for the same stock are combined (or just the latest message is used 674 * and older prices are discarded). Another idea is to combine line item messages 675 * together into a single invoice message. 676 * 677 * @param aggregationStrategy the strategy used for the aggregation 678 */ 679 public ExpressionClause<AggregatorType> aggregator(AggregationStrategy aggregationStrategy) { 680 AggregatorType answer = new AggregatorType(); 681 answer.setAggregationStrategy(aggregationStrategy); 682 addOutput(answer); 683 return ExpressionClause.createAndSetExpression(answer); 684 } 685 686 /** 687 * Creates an <a 688 * href="http://activemq.apache.org/camel/aggregator.html">Aggregator</a> 689 * pattern using a custom aggregation collection implementation. 690 * 691 * @param aggregationCollection the collection used to perform the aggregation 692 */ 693 public ExpressionClause<AggregatorType> aggregator(AggregationCollection aggregationCollection) { 694 AggregatorType answer = new AggregatorType(); 695 answer.setAggregationCollection(aggregationCollection); 696 addOutput(answer); 697 return ExpressionClause.createAndSetExpression(answer); 698 } 699 700 /** 701 * Creates an <a 702 * href="http://activemq.apache.org/camel/aggregator.html">Aggregator</a> 703 * pattern where a batch of messages are processed (up to a maximum amount 704 * or until some timeout is reached) and messages for the same correlation 705 * key are combined together using some kind of {@link AggregationStrategy} 706 * (by default the latest message is used) to compress many message exchanges 707 * into a smaller number of exchanges. 708 * <p/> 709 * A good example of this is stock market data; you may be receiving 30,000 710 * messages/second and you may want to throttle it right down so that multiple 711 * messages for the same stock are combined (or just the latest message is used 712 * and older prices are discarded). Another idea is to combine line item messages 713 * together into a single invoice message. 714 * 715 * @param correlationExpression the expression used to calculate the 716 * correlation key. For a JMS message this could be the 717 * expression <code>header("JMSDestination")</code> or 718 * <code>header("JMSCorrelationID")</code> 719 */ 720 public AggregatorType aggregator(Expression correlationExpression) { 721 AggregatorType answer = new AggregatorType(correlationExpression); 722 addOutput(answer); 723 return answer; 724 } 725 726 /** 727 * Creates an <a 728 * href="http://activemq.apache.org/camel/aggregator.html">Aggregator</a> 729 * pattern where a batch of messages are processed (up to a maximum amount 730 * or until some timeout is reached) and messages for the same correlation 731 * key are combined together using some kind of {@link AggregationStrategy} 732 * (by default the latest message is used) to compress many message exchanges 733 * into a smaller number of exchanges. 734 * <p/> 735 * A good example of this is stock market data; you may be receiving 30,000 736 * messages/second and you may want to throttle it right down so that multiple 737 * messages for the same stock are combined (or just the latest message is used 738 * and older prices are discarded). Another idea is to combine line item messages 739 * together into a single invoice message. 740 * 741 * @param correlationExpression the expression used to calculate the 742 * correlation key. For a JMS message this could be the 743 * expression <code>header("JMSDestination")</code> or 744 * <code>header("JMSCorrelationID")</code> 745 */ 746 public AggregatorType aggregator(Expression correlationExpression, AggregationStrategy aggregationStrategy) { 747 AggregatorType answer = new AggregatorType(correlationExpression, aggregationStrategy); 748 addOutput(answer); 749 return answer; 750 } 751 752 /** 753 * Creates the <a 754 * href="http://activemq.apache.org/camel/delayer.html">Delayer</a> pattern 755 * where an expression is used to calculate the time which the message will 756 * be dispatched on 757 * 758 * @param processAtExpression an expression to calculate the time at which 759 * the messages should be processed 760 * @return the builder 761 */ 762 public DelayerType delayer(Expression<Exchange> processAtExpression) { 763 return delayer(processAtExpression, 0L); 764 } 765 766 /** 767 * Creates the <a 768 * href="http://activemq.apache.org/camel/delayer.html">Delayer</a> pattern 769 * where an expression is used to calculate the time which the message will 770 * be dispatched on 771 * 772 * @param processAtExpression an expression to calculate the time at which 773 * the messages should be processed 774 * @param delay the delay in milliseconds which is added to the 775 * processAtExpression to determine the time the message 776 * should be processed 777 * @return the builder 778 */ 779 public DelayerType delayer(Expression<Exchange> processAtExpression, long delay) { 780 DelayerType answer = new DelayerType(processAtExpression, delay); 781 addOutput(answer); 782 return answer; 783 } 784 785 /** 786 * Creates the <a 787 * href="http://activemq.apache.org/camel/delayer.html">Delayer</a> pattern 788 * where an expression is used to calculate the time which the message will 789 * be dispatched on 790 * @return the expression clause to create the expression 791 */ 792 public ExpressionClause<DelayerType> delayer() { 793 DelayerType answer = new DelayerType(); 794 addOutput(answer); 795 return ExpressionClause.createAndSetExpression(answer); 796 } 797 798 /** 799 * Creates the <a 800 * href="http://activemq.apache.org/camel/delayer.html">Delayer</a> pattern 801 * where a fixed amount of milliseconds are used to delay processing of a 802 * message exchange 803 * 804 * @param delay the default delay in milliseconds 805 * @return the builder 806 */ 807 public DelayerType delayer(long delay) { 808 return delayer(null, delay); 809 } 810 811 /** 812 * Creates the <a 813 * href="http://activemq.apache.org/camel/delayer.html">Delayer</a> pattern 814 * where an expression is used to calculate the time which the message will 815 * be dispatched on 816 * 817 * @return the builder 818 */ 819 public ThrottlerType throttler(long maximumRequestCount) { 820 ThrottlerType answer = new ThrottlerType(maximumRequestCount); 821 addOutput(answer); 822 return answer; 823 } 824 825 826 public Type throwFault(Throwable fault) { 827 ThrowFaultType answer = new ThrowFaultType(); 828 answer.setFault(fault); 829 addOutput(answer); 830 return (Type) this; 831 } 832 833 public Type throwFault(String message) { 834 return throwFault(new CamelException(message)); 835 } 836 837 public Type interceptor(String ref) { 838 InterceptorRef interceptor = new InterceptorRef(ref); 839 addInterceptor(interceptor); 840 return (Type) this; 841 } 842 843 844 public Type intercept(DelegateProcessor interceptor) { 845 addInterceptor(new InterceptorRef(interceptor)); 846 lastInterceptor = interceptor; 847 return (Type) this; 848 } 849 850 public InterceptType intercept() { 851 InterceptType answer = new InterceptType(); 852 addOutput(answer); 853 return answer; 854 } 855 856 public void addInterceptor(InterceptorType interceptor) { 857 addOutput(interceptor); 858 pushBlock(interceptor); 859 } 860 861 protected void pushBlock(Block block) { 862 blocks.add(block); 863 } 864 865 protected Block popBlock() { 866 return blocks.isEmpty() ? null : blocks.removeLast(); 867 } 868 869 public Type proceed() { 870 ProceedType proceed = null; 871 if (this instanceof InterceptType) { 872 proceed = ((InterceptType) this).getProceed(); 873 } 874 if (proceed == null) { 875 for (ProcessorType node = parent; node != null; node = node.getParent()) { 876 if (node instanceof InterceptType) { 877 InterceptType intercept = (InterceptType)node; 878 proceed = intercept.getProceed(); 879 break; 880 } 881 } 882 } 883 884 if (this instanceof InterceptType) { 885 proceed = ((InterceptType)this).getProceed(); 886 } 887 888 if (proceed == null) { 889 throw new IllegalArgumentException( 890 "Cannot use proceed() without being within an intercept() block"); 891 } 892 893 addOutput(proceed); 894 return (Type) this; 895 } 896 897 public ExceptionType exception(Class exceptionType) { 898 ExceptionType answer = new ExceptionType(exceptionType); 899 addOutput(answer); 900 return answer; 901 } 902 903 /** 904 * Apply an interceptor route if the predicate is true 905 */ 906 public ChoiceType intercept(Predicate predicate) { 907 InterceptType answer = new InterceptType(); 908 addOutput(answer); 909 return answer.when(predicate); 910 } 911 912 public Type interceptors(String... refs) { 913 for (String ref : refs) { 914 interceptor(ref); 915 } 916 return (Type) this; 917 } 918 919 /** 920 * Trace logs the exchange before it goes to the next processing step using 921 * the {@link #DEFAULT_TRACE_CATEGORY} logging category. 922 */ 923 public Type trace() { 924 return trace(DEFAULT_TRACE_CATEGORY); 925 } 926 927 /** 928 * Trace logs the exchange before it goes to the next processing step using 929 * the specified logging category. 930 * 931 * @param category the logging category trace messages will sent to. 932 */ 933 public Type trace(String category) { 934 final Log log = LogFactory.getLog(category); 935 return intercept(new DelegateProcessor() { 936 @Override 937 public void process(Exchange exchange) throws Exception { 938 log.trace(exchange); 939 processNext(exchange); 940 } 941 }); 942 } 943 944 public PolicyRef policies() { 945 PolicyRef answer = new PolicyRef(); 946 addOutput(answer); 947 return answer; 948 } 949 950 public PolicyRef policy(Policy policy) { 951 PolicyRef answer = new PolicyRef(policy); 952 addOutput(answer); 953 return answer; 954 } 955 956 /** 957 * Forces handling of faults as exceptions 958 * 959 * @return the current builder with the fault handler configured 960 */ 961 public Type handleFault() { 962 HandleFaultType interceptor = new HandleFaultType(); 963 addInterceptor(interceptor); 964 return (Type) this; 965 } 966 967 /** 968 * Installs the given error handler builder 969 * 970 * @param errorHandlerBuilder the error handler to be used by default for 971 * all child routes 972 * @return the current builder with the error handler configured 973 */ 974 public Type errorHandler(ErrorHandlerBuilder errorHandlerBuilder) { 975 setErrorHandlerBuilder(errorHandlerBuilder); 976 return (Type) this; 977 } 978 979 /** 980 * Configures whether or not the error handler is inherited by every 981 * processing node (or just the top most one) 982 * 983 * @param condition the flag as to whether error handlers should be 984 * inherited or not 985 * @return the current builder 986 */ 987 public Type inheritErrorHandler(boolean condition) { 988 setInheritErrorHandlerFlag(condition); 989 return (Type) this; 990 } 991 992 // Transformers 993 // ------------------------------------------------------------------------- 994 995 /** 996 * Adds the custom processor to this destination which could be a final 997 * destination, or could be a transformation in a pipeline 998 */ 999 public Type process(Processor processor) { 1000 ProcessorRef answer = new ProcessorRef(processor); 1001 addOutput(answer); 1002 return (Type) this; 1003 } 1004 1005 /** 1006 * Adds the custom processor reference to this destination which could be a final 1007 * destination, or could be a transformation in a pipeline 1008 */ 1009 public Type processRef(String ref) { 1010 ProcessorRef answer = new ProcessorRef(); 1011 answer.setRef(ref); 1012 addOutput(answer); 1013 return (Type) this; 1014 } 1015 1016 /** 1017 * Adds a bean which is invoked which could be a final destination, or could 1018 * be a transformation in a pipeline 1019 */ 1020 public Type bean(Object bean) { 1021 BeanRef answer = new BeanRef(); 1022 answer.setBean(bean); 1023 addOutput(answer); 1024 return (Type) this; 1025 } 1026 1027 /** 1028 * Adds a bean and method which is invoked which could be a final 1029 * destination, or could be a transformation in a pipeline 1030 */ 1031 public Type bean(Object bean, String method) { 1032 BeanRef answer = new BeanRef(); 1033 answer.setBean(bean); 1034 answer.setMethod(method); 1035 addOutput(answer); 1036 return (Type) this; 1037 } 1038 1039 /** 1040 * Adds a bean by type which is invoked which could be a final destination, or could 1041 * be a transformation in a pipeline 1042 */ 1043 public Type bean(Class beanType) { 1044 BeanRef answer = new BeanRef(); 1045 answer.setBeanType(beanType); 1046 addOutput(answer); 1047 return (Type) this; 1048 } 1049 1050 /** 1051 * Adds a bean type and method which is invoked which could be a final 1052 * destination, or could be a transformation in a pipeline 1053 */ 1054 public Type bean(Class beanType, String method) { 1055 BeanRef answer = new BeanRef(); 1056 answer.setBeanType(beanType); 1057 answer.setMethod(method); 1058 addOutput(answer); 1059 return (Type) this; 1060 } 1061 1062 /** 1063 * Adds a bean which is invoked which could be a final destination, or could 1064 * be a transformation in a pipeline 1065 */ 1066 public Type beanRef(String ref) { 1067 BeanRef answer = new BeanRef(ref); 1068 addOutput(answer); 1069 return (Type) this; 1070 } 1071 1072 /** 1073 * Adds a bean and method which is invoked which could be a final 1074 * destination, or could be a transformation in a pipeline 1075 */ 1076 public Type beanRef(String ref, String method) { 1077 BeanRef answer = new BeanRef(ref, method); 1078 addOutput(answer); 1079 return (Type) this; 1080 } 1081 1082 /** 1083 * Adds a processor which sets the body on the IN message 1084 */ 1085 public ExpressionClause<ProcessorType<Type>> setBody() { 1086 ExpressionClause<ProcessorType<Type>> clause = new ExpressionClause<ProcessorType<Type>>((Type) this); 1087 process(ProcessorBuilder.setBody(clause)); 1088 return clause; 1089 } 1090 1091 /** 1092 * Adds a processor which sets the body on the IN message 1093 */ 1094 public Type setBody(Expression expression) { 1095 return process(ProcessorBuilder.setBody(expression)); 1096 } 1097 1098 /** 1099 * Adds a processor which sets the body on the OUT message 1100 * 1101 * @deprecated Please use {@link #transform(Expression)} instead 1102 */ 1103 public Type setOutBody(Expression expression) { 1104 return transform(expression); 1105 } 1106 1107 /** 1108 * Adds a processor which sets the body on the OUT message 1109 * 1110 * @deprecated Please use {@link #transform()} instead 1111 */ 1112 public ExpressionClause<ProcessorType<Type>> setOutBody() { 1113 return transform(); 1114 } 1115 1116 /** 1117 * Adds a processor which sets the body on the OUT message 1118 */ 1119 public Type transform(Expression expression) { 1120 TransformType answer = new TransformType(expression); 1121 addOutput(answer); 1122 return (Type) this; 1123 } 1124 1125 /** 1126 * Adds a processor which sets the body on the OUT message 1127 */ 1128 public ExpressionClause<ProcessorType<Type>> transform() { 1129 ExpressionClause<ProcessorType<Type>> clause = new ExpressionClause<ProcessorType<Type>>( 1130 (Type) this); 1131 TransformType answer = new TransformType(clause); 1132 addOutput(answer); 1133 return clause; 1134 } 1135 1136 /** 1137 * Adds a processor which sets the body on the FAULT message 1138 */ 1139 public Type setFaultBody(Expression expression) { 1140 return process(ProcessorBuilder.setFaultBody(expression)); 1141 } 1142 1143 /** 1144 * Adds a processor which sets the header on the IN message 1145 */ 1146 public ExpressionClause<ProcessorType<Type>> setHeader(String name) { 1147 ExpressionClause<ProcessorType<Type>> clause = new ExpressionClause<ProcessorType<Type>>((Type) this); 1148 SetHeaderType answer = new SetHeaderType(name, clause); 1149 addOutput(answer); 1150 return clause; 1151 } 1152 1153 /** 1154 * Adds a processor which sets the header on the IN message 1155 */ 1156 public Type setHeader(String name, Expression expression) { 1157 SetHeaderType answer = new SetHeaderType(name, expression); 1158 addOutput(answer); 1159 return (Type) this; 1160 } 1161 1162 /** 1163 * Adds a processor which sets the header on the IN message to the given value 1164 */ 1165 public Type setHeader(String name, String value) { 1166 SetHeaderType answer = new SetHeaderType(name, value); 1167 addOutput(answer); 1168 return (Type) this; 1169 } 1170 1171 /** 1172 * Adds a processor which sets the header on the OUT message 1173 */ 1174 public ExpressionClause<ProcessorType<Type>> setOutHeader(String name) { 1175 ExpressionClause<ProcessorType<Type>> clause = new ExpressionClause<ProcessorType<Type>>((Type) this); 1176 process(ProcessorBuilder.setOutHeader(name, clause)); 1177 return clause; 1178 } 1179 1180 /** 1181 * Adds a processor which sets the header on the OUT message 1182 */ 1183 public Type setOutHeader(String name, Expression expression) { 1184 return process(ProcessorBuilder.setOutHeader(name, expression)); 1185 } 1186 1187 /** 1188 * Adds a processor which sets the header on the OUT message 1189 */ 1190 public Type setOutHeader(String name, String value) { 1191 return (Type) setOutHeader(name).constant(value); 1192 } 1193 1194 /** 1195 * Adds a processor which sets the header on the FAULT message 1196 */ 1197 public Type setFaultHeader(String name, Expression expression) { 1198 return process(ProcessorBuilder.setFaultHeader(name, expression)); 1199 } 1200 1201 /** 1202 * Adds a processor which sets the exchange property 1203 */ 1204 public Type setProperty(String name, Expression expression) { 1205 return process(ProcessorBuilder.setProperty(name, expression)); 1206 } 1207 1208 1209 /** 1210 * Adds a processor which sets the exchange property 1211 */ 1212 public ExpressionClause<ProcessorType<Type>> setProperty(String name) { 1213 ExpressionClause<ProcessorType<Type>> clause = new ExpressionClause<ProcessorType<Type>>((Type) this); 1214 process(ProcessorBuilder.setProperty(name, clause)); 1215 return clause; 1216 } 1217 1218 /** 1219 * Adds a processor which removes the header on the IN message 1220 */ 1221 public Type removeHeader(String name) { 1222 return process(ProcessorBuilder.removeHeader(name)); 1223 } 1224 1225 /** 1226 * Adds a processor which removes the header on the OUT message 1227 */ 1228 public Type removeOutHeader(String name) { 1229 return process(ProcessorBuilder.removeOutHeader(name)); 1230 } 1231 1232 /** 1233 * Adds a processor which removes the header on the FAULT message 1234 */ 1235 public Type removeFaultHeader(String name) { 1236 return process(ProcessorBuilder.removeFaultHeader(name)); 1237 } 1238 1239 /** 1240 * Adds a processor which removes the exchange property 1241 */ 1242 public Type removeProperty(String name) { 1243 return process(ProcessorBuilder.removeProperty(name)); 1244 } 1245 1246 /** 1247 * Converts the IN message body to the specified type 1248 */ 1249 public Type convertBodyTo(Class type) { 1250 addOutput(new ConvertBodyType(type)); 1251 return (Type) this; 1252 } 1253 1254 /** 1255 * Converts the OUT message body to the specified type 1256 * 1257 * @deprecated Please use {@link #convertBodyTo(Class)} instead 1258 */ 1259 public Type convertOutBodyTo(Class type) { 1260 // TODO deprecate method? 1261 //return process(ProcessorBuilder.setOutBody(Builder.outBody().convertTo(type))); 1262 return process(new ConvertBodyProcessor(type)); 1263 } 1264 1265 /** 1266 * Converts the FAULT message body to the specified type 1267 */ 1268 public Type convertFaultBodyTo(Class type) { 1269 // TODO deprecate method? 1270 //return process(ProcessorBuilder.setFaultBody(Builder.faultBody().convertTo(type))); 1271 return process(new ConvertBodyProcessor(type)); 1272 } 1273 1274 // DataFormat support 1275 // ------------------------------------------------------------------------- 1276 1277 /** 1278 * Unmarshals the in body using a {@link DataFormat} expression to define 1279 * the format of the input message and the output will be set on the out message body. 1280 * 1281 * @return the expression to create the {@link DataFormat} 1282 */ 1283 public DataFormatClause<ProcessorType<Type>> unmarshal() { 1284 return new DataFormatClause<ProcessorType<Type>>(this, DataFormatClause.Operation.Unmarshal); 1285 } 1286 1287 /** 1288 * Unmarshals the in body using the specified {@link DataFormat} 1289 * and sets the output on the out message body. 1290 * 1291 * @return this object 1292 */ 1293 public Type unmarshal(DataFormatType dataFormatType) { 1294 addOutput(new UnmarshalType(dataFormatType)); 1295 return (Type) this; 1296 } 1297 1298 /** 1299 * Unmarshals the in body using the specified {@link DataFormat} 1300 * and sets the output on the out message body. 1301 * 1302 * @return this object 1303 */ 1304 public Type unmarshal(DataFormat dataFormat) { 1305 return unmarshal(new DataFormatType(dataFormat)); 1306 } 1307 1308 /** 1309 * Unmarshals the in body using the specified {@link DataFormat} 1310 * reference in the {@link Registry} and sets the output on the out message body. 1311 * 1312 * @return this object 1313 */ 1314 public Type unmarshal(String dataTypeRef) { 1315 addOutput(new UnmarshalType(dataTypeRef)); 1316 return (Type) this; 1317 } 1318 1319 /** 1320 * Marshals the in body using a {@link DataFormat} expression to define 1321 * the format of the output which will be added to the out body. 1322 * 1323 * @return the expression to create the {@link DataFormat} 1324 */ 1325 public DataFormatClause<ProcessorType<Type>> marshal() { 1326 return new DataFormatClause<ProcessorType<Type>>(this, DataFormatClause.Operation.Marshal); 1327 } 1328 1329 /** 1330 * Marshals the in body using the specified {@link DataFormat} 1331 * and sets the output on the out message body. 1332 * 1333 * @return this object 1334 */ 1335 public Type marshal(DataFormatType dataFormatType) { 1336 addOutput(new MarshalType(dataFormatType)); 1337 return (Type) this; 1338 } 1339 1340 /** 1341 * Marshals the in body using the specified {@link DataFormat} 1342 * and sets the output on the out message body. 1343 * 1344 * @return this object 1345 */ 1346 public Type marshal(DataFormat dataFormat) { 1347 return marshal(new DataFormatType(dataFormat)); 1348 } 1349 1350 /** 1351 * Marshals the in body the specified {@link DataFormat} 1352 * reference in the {@link Registry} and sets the output on the out message body. 1353 * 1354 * @return this object 1355 */ 1356 public Type marshal(String dataTypeRef) { 1357 addOutput(new MarshalType(dataTypeRef)); 1358 return (Type) this; 1359 } 1360 1361 // Properties 1362 // ------------------------------------------------------------------------- 1363 @XmlTransient 1364 public ProcessorType<? extends ProcessorType> getParent() { 1365 return parent; 1366 } 1367 1368 public void setParent(ProcessorType<? extends ProcessorType> parent) { 1369 this.parent = parent; 1370 } 1371 1372 @XmlTransient 1373 public ErrorHandlerBuilder getErrorHandlerBuilder() { 1374 if (errorHandlerBuilder == null) { 1375 errorHandlerBuilder = createErrorHandlerBuilder(); 1376 } 1377 return errorHandlerBuilder; 1378 } 1379 1380 /** 1381 * Sets the error handler to use with processors created by this builder 1382 */ 1383 public void setErrorHandlerBuilder(ErrorHandlerBuilder errorHandlerBuilder) { 1384 this.errorHandlerBuilder = errorHandlerBuilder; 1385 } 1386 1387 @XmlTransient 1388 public boolean isInheritErrorHandler() { 1389 return ObjectConverter.toBoolean(getInheritErrorHandlerFlag()); 1390 } 1391 1392 @XmlAttribute(name = "inheritErrorHandler", required = false) 1393 public Boolean getInheritErrorHandlerFlag() { 1394 return inheritErrorHandlerFlag; 1395 } 1396 1397 public void setInheritErrorHandlerFlag(Boolean inheritErrorHandlerFlag) { 1398 this.inheritErrorHandlerFlag = inheritErrorHandlerFlag; 1399 } 1400 1401 @XmlTransient 1402 public NodeFactory getNodeFactory() { 1403 if (nodeFactory == null) { 1404 nodeFactory = new NodeFactory(); 1405 } 1406 return nodeFactory; 1407 } 1408 1409 public void setNodeFactory(NodeFactory nodeFactory) { 1410 this.nodeFactory = nodeFactory; 1411 } 1412 1413 /** 1414 * Returns a label to describe this node such as the expression if some kind of expression node 1415 */ 1416 public String getLabel() { 1417 return ""; 1418 } 1419 1420 // Implementation methods 1421 // ------------------------------------------------------------------------- 1422 1423 /** 1424 * Creates the processor and wraps it in any necessary interceptors and 1425 * error handlers 1426 */ 1427 protected Processor makeProcessor(RouteContext routeContext) throws Exception { 1428 Processor processor = createProcessor(routeContext); 1429 return wrapProcessor(routeContext, processor); 1430 } 1431 1432 /** 1433 * A strategy method which allows derived classes to wrap the child 1434 * processor in some kind of interceptor 1435 * 1436 * @param routeContext 1437 * @param target the processor which can be wrapped 1438 * @return the original processor or a new wrapped interceptor 1439 */ 1440 protected Processor wrapProcessorInInterceptors(RouteContext routeContext, Processor target) throws Exception { 1441 // The target is required. 1442 if (target == null) { 1443 throw new RuntimeCamelException("target provided."); 1444 } 1445 1446 // Interceptors are optional 1447 DelegateProcessor first = null; 1448 DelegateProcessor last = null; 1449 /* 1450 1451 List<InterceptorType> interceptors = new ArrayList<InterceptorType>(routeContext.getRoute() 1452 .getInterceptors()); 1453 List<InterceptorType> list = getInterceptors(); 1454 for (InterceptorType interceptorType : list) { 1455 if (!interceptors.contains(interceptorType)) { 1456 interceptors.add(interceptorType); 1457 } 1458 } 1459 for (InterceptorType interceptorRef : interceptors) { 1460 DelegateProcessor p = interceptorRef.createInterceptor(routeContext); 1461 if (first == null) { 1462 first = p; 1463 } 1464 if (last != null) { 1465 last.setProcessor(p); 1466 } 1467 last = p; 1468 } 1469 1470 if (last != null) { 1471 last.setProcessor(target); 1472 } 1473 */ 1474 return first == null ? target : first; 1475 } 1476 1477 /** 1478 * A strategy method to allow newly created processors to be wrapped in an 1479 * error handler. 1480 */ 1481 protected Processor wrapInErrorHandler(Processor processor) throws Exception { 1482 return getErrorHandlerBuilder().createErrorHandler(processor); 1483 } 1484 1485 protected ErrorHandlerBuilder createErrorHandlerBuilder() { 1486 if (isInheritErrorHandler()) { 1487 return new DeadLetterChannelBuilder(); 1488 } else { 1489 return new NoErrorHandlerBuilder(); 1490 } 1491 } 1492 1493 protected void configureChild(ProcessorType output) { 1494 output.setNodeFactory(getNodeFactory()); 1495 } 1496 1497 public void addOutput(ProcessorType processorType) { 1498 processorType.setParent(this); 1499 configureChild(processorType); 1500 if (blocks.isEmpty()) { 1501 getOutputs().add(processorType); 1502 } else { 1503 Block block = blocks.getLast(); 1504 block.addOutput(processorType); 1505 } 1506 } 1507 1508 /** 1509 * Creates a new instance of some kind of composite processor which defaults 1510 * to using a {@link Pipeline} but derived classes could change the 1511 * behaviour 1512 */ 1513 protected Processor createCompositeProcessor(List<Processor> list) { 1514 // return new MulticastProcessor(list); 1515 return new Pipeline(list); 1516 } 1517 1518 protected Processor createOutputsProcessor(RouteContext routeContext, Collection<ProcessorType<?>> outputs) 1519 throws Exception { 1520 List<Processor> list = new ArrayList<Processor>(); 1521 for (ProcessorType output : outputs) { 1522 Processor processor = output.createProcessor(routeContext); 1523 list.add(processor); 1524 } 1525 Processor processor = null; 1526 if (!list.isEmpty()) { 1527 if (list.size() == 1) { 1528 processor = list.get(0); 1529 } else { 1530 processor = createCompositeProcessor(list); 1531 } 1532 } 1533 return processor; 1534 } 1535 1536 public void clearOutput() { 1537 getOutputs().clear(); 1538 blocks.clear(); 1539 } 1540 }