001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.processor; 018 019 020 import org.apache.camel.Endpoint; 021 import org.apache.camel.Exchange; 022 import org.apache.camel.ExchangePattern; 023 import org.apache.camel.Message; 024 import org.apache.camel.Processor; 025 import org.apache.camel.Producer; 026 import org.apache.camel.impl.ServiceSupport; 027 import org.apache.camel.model.RoutingSlipType; 028 import org.apache.camel.util.CollectionStringBuffer; 029 import org.apache.camel.util.ExchangeHelper; 030 import org.apache.camel.util.ProducerCache; 031 import org.apache.commons.logging.Log; 032 import org.apache.commons.logging.LogFactory; 033 034 import static org.apache.camel.util.ObjectHelper.notNull; 035 036 /** 037 * Implements a <a 038 * href="http://activemq.apache.org/camel/routing-slip.html">Routing Slip</a> 039 * pattern where the list of actual endpoints to send a message exchange to are 040 * dependent on the value of a message header. 041 */ 042 public class RoutingSlip extends ServiceSupport implements Processor { 043 private static final transient Log LOG = LogFactory.getLog(RoutingSlip.class); 044 private final String header; 045 private final String uriDelimiter; 046 047 private ProducerCache<Exchange> producerCache = new ProducerCache<Exchange>(); 048 049 public RoutingSlip(String header) { 050 this(header, RoutingSlipType.DEFAULT_DELIMITER); 051 } 052 053 public RoutingSlip(String header, String uriDelimiter) { 054 notNull(header, "header"); 055 notNull(uriDelimiter, "uriDelimiter"); 056 057 this.header = header; 058 this.uriDelimiter = uriDelimiter; 059 } 060 061 @Override 062 public String toString() { 063 return "RoutingSlip[header=" + header + " uriDelimiter=" + uriDelimiter + "]"; 064 } 065 066 public void process(Exchange exchange) throws Exception { 067 Message message = exchange.getIn(); 068 String[] recipients = recipients(message); 069 Exchange current = exchange; 070 071 for (String nextRecipient : recipients) { 072 Endpoint<Exchange> endpoint = resolveEndpoint(exchange, nextRecipient); 073 Producer<Exchange> producer = producerCache.getProducer(endpoint); 074 Exchange ex = endpoint.createExchange(ExchangePattern.InOut); 075 076 updateRoutingSlip(current); 077 copyOutToIn(ex, current); 078 079 producer.process(ex); 080 081 current = ex; 082 } 083 ExchangeHelper.copyResults(exchange, current); 084 } 085 086 protected Endpoint<Exchange> resolveEndpoint(Exchange exchange, Object recipient) { 087 return ExchangeHelper.resolveEndpoint(exchange, recipient); 088 } 089 090 protected void doStop() throws Exception { 091 producerCache.stop(); 092 } 093 094 protected void doStart() throws Exception { 095 } 096 097 private void updateRoutingSlip(Exchange current) { 098 Message message = getResultMessage(current); 099 message.setHeader(header, removeFirstElement(recipients(message))); 100 } 101 102 /** 103 * Returns the outbound message if available. Otherwise return the inbound 104 * message. 105 */ 106 private Message getResultMessage(Exchange exchange) { 107 Message message = exchange.getOut(false); 108 // if this endpoint had no out (like a mock endpoint) 109 // just take the in 110 if (message == null) { 111 message = exchange.getIn(); 112 } 113 return message; 114 } 115 116 /** 117 * Return the list of recipients defined in the routing slip in the 118 * specified message. 119 */ 120 private String[] recipients(Message message) { 121 Object headerValue = message.getHeader(header); 122 if (headerValue != null && !headerValue.equals("")) { 123 return headerValue.toString().split(uriDelimiter); 124 } 125 return new String[] {}; 126 } 127 128 /** 129 * Return a string representation of the element list with the first element 130 * removed. 131 */ 132 private String removeFirstElement(String[] elements) { 133 CollectionStringBuffer updatedElements = new CollectionStringBuffer(uriDelimiter); 134 for (int i = 1; i < elements.length; i++) { 135 updatedElements.append(elements[i]); 136 } 137 return updatedElements.toString(); 138 } 139 140 /** 141 * Copy the outbound data in 'source' to the inbound data in 'result'. 142 */ 143 private void copyOutToIn(Exchange result, Exchange source) { 144 result.setException(source.getException()); 145 146 Message fault = source.getFault(false); 147 if (fault != null) { 148 result.getFault(true).copyFrom(fault); 149 } 150 151 result.setIn(getResultMessage(source)); 152 153 result.getProperties().clear(); 154 result.getProperties().putAll(source.getProperties()); 155 } 156 }