001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.processor; 018 019 import java.util.ArrayList; 020 import java.util.Collection; 021 import java.util.LinkedList; 022 import java.util.List; 023 import java.util.concurrent.ArrayBlockingQueue; 024 import java.util.concurrent.RejectedExecutionException; 025 import java.util.concurrent.RejectedExecutionHandler; 026 import java.util.concurrent.ThreadPoolExecutor; 027 import java.util.concurrent.TimeUnit; 028 import java.util.concurrent.atomic.AtomicBoolean; 029 030 import org.apache.camel.AsyncCallback; 031 import org.apache.camel.Endpoint; 032 import org.apache.camel.Exchange; 033 import org.apache.camel.Processor; 034 import org.apache.camel.impl.ServiceSupport; 035 import org.apache.camel.processor.aggregate.AggregationStrategy; 036 import org.apache.camel.util.ExchangeHelper; 037 import org.apache.camel.util.ServiceHelper; 038 import org.apache.camel.util.concurrent.AtomicExchange; 039 import org.apache.camel.util.concurrent.CountingLatch; 040 041 import static org.apache.camel.util.ObjectHelper.notNull; 042 043 /** 044 * Implements the Multicast pattern to send a message exchange to a number of 045 * endpoints, each endpoint receiving a copy of the message exchange. 046 * 047 * @see Pipeline 048 * @version $Revision: 63283 $ 049 */ 050 public class MulticastProcessor extends ServiceSupport implements Processor { 051 static class ProcessorExchangePair { 052 private final Processor processor; 053 private final Exchange exchange; 054 055 public ProcessorExchangePair(Processor processor, Exchange exchange) { 056 this.processor = processor; 057 this.exchange = exchange; 058 } 059 060 public Processor getProcessor() { 061 return processor; 062 } 063 064 public Exchange getExchange() { 065 return exchange; 066 } 067 } 068 069 private Collection<Processor> processors; 070 private AggregationStrategy aggregationStrategy; 071 private boolean isParallelProcessing; 072 private ThreadPoolExecutor executor; 073 private final boolean streaming; 074 private final AtomicBoolean shutdown = new AtomicBoolean(true); 075 076 public MulticastProcessor(Collection<Processor> processors) { 077 this(processors, null); 078 } 079 080 public MulticastProcessor(Collection<Processor> processors, AggregationStrategy aggregationStrategy) { 081 this(processors, aggregationStrategy, false, null); 082 } 083 084 public MulticastProcessor(Collection<Processor> processors, AggregationStrategy aggregationStrategy, boolean parallelProcessing, ThreadPoolExecutor executor) { 085 this(processors, aggregationStrategy, parallelProcessing, executor, false); 086 } 087 088 public MulticastProcessor(Collection<Processor> processors, AggregationStrategy aggregationStrategy, boolean parallelProcessing, ThreadPoolExecutor executor, boolean streaming) { 089 notNull(processors, "processors"); 090 this.processors = processors; 091 this.aggregationStrategy = aggregationStrategy; 092 this.isParallelProcessing = parallelProcessing; 093 if (isParallelProcessing) { 094 if (executor != null) { 095 this.executor = executor; 096 } else { 097 // setup default Executor 098 this.executor = new ThreadPoolExecutor(processors.size(), processors.size(), 0, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(processors.size())); 099 } 100 } 101 this.streaming = streaming; 102 } 103 104 /** 105 * A helper method to convert a list of endpoints into a list of processors 106 */ 107 public static <E extends Exchange> Collection<Processor> toProducers(Collection<Endpoint> endpoints) 108 throws Exception { 109 Collection<Processor> answer = new ArrayList<Processor>(); 110 for (Endpoint endpoint : endpoints) { 111 answer.add(endpoint.createProducer()); 112 } 113 return answer; 114 } 115 116 @Override 117 public String toString() { 118 return "Multicast" + getProcessors(); 119 } 120 121 class ProcessCall implements Runnable { 122 private final Exchange exchange; 123 private final AsyncCallback callback; 124 private final Processor processor; 125 126 public ProcessCall(Exchange exchange, Processor processor, AsyncCallback callback) { 127 this.exchange = exchange; 128 this.callback = callback; 129 this.processor = processor; 130 } 131 132 public void run() { 133 if (shutdown.get()) { 134 exchange.setException(new RejectedExecutionException()); 135 callback.done(false); 136 } else { 137 try { 138 processor.process(exchange); 139 } catch (Exception ex) { 140 exchange.setException(ex); 141 } 142 callback.done(false); 143 } 144 } 145 } 146 147 public void process(Exchange exchange) throws Exception { 148 final AtomicExchange result = new AtomicExchange(); 149 150 Iterable<ProcessorExchangePair> pairs = createProcessorExchangePairs(exchange); 151 152 // Parallel Processing the producer 153 if (isParallelProcessing) { 154 List<Exchange> exchanges = new LinkedList<Exchange>(); 155 final CountingLatch completedExchanges = new CountingLatch(); 156 int i = 0; 157 for (ProcessorExchangePair pair : pairs) { 158 Processor producer = pair.getProcessor(); 159 final Exchange subExchange = pair.getExchange(); 160 updateNewExchange(subExchange, i, pairs); 161 exchanges.add(subExchange); 162 completedExchanges.increment(); 163 ProcessCall call = new ProcessCall(subExchange, producer, new AsyncCallback() { 164 public void done(boolean doneSynchronously) { 165 if (streaming && aggregationStrategy != null) { 166 doAggregate(result, subExchange); 167 } 168 completedExchanges.decrement(); 169 } 170 171 }); 172 executor.execute(call); 173 i++; 174 } 175 completedExchanges.await(); 176 if (!streaming && aggregationStrategy != null) { 177 for (Exchange resultExchange : exchanges) { 178 doAggregate(result, resultExchange); 179 } 180 } 181 182 } else { 183 // we call the producer one by one sequentially 184 int i = 0; 185 for (ProcessorExchangePair pair : pairs) { 186 Processor producer = pair.getProcessor(); 187 Exchange subExchange = pair.getExchange(); 188 updateNewExchange(subExchange, i, pairs); 189 190 producer.process(subExchange); 191 doAggregate(result, subExchange); 192 i++; 193 } 194 } 195 if (result.get() != null) { 196 ExchangeHelper.copyResults(exchange, result.get()); 197 } 198 } 199 200 /** 201 * Aggregate the {@link Exchange} with the current result 202 * 203 * @param result the current result 204 * @param exchange the exchange to be added to the result 205 */ 206 protected synchronized void doAggregate(AtomicExchange result, Exchange exchange) { 207 if (aggregationStrategy != null) { 208 if (result.get() == null) { 209 result.set(exchange); 210 } else { 211 result.set(aggregationStrategy.aggregate(result.get(), exchange)); 212 } 213 } 214 } 215 216 protected void updateNewExchange(Exchange exchange, int i, Iterable<ProcessorExchangePair> allPairs) { 217 // No updates needed 218 } 219 220 protected Iterable<ProcessorExchangePair> createProcessorExchangePairs(Exchange exchange) { 221 List<ProcessorExchangePair> result = new ArrayList<ProcessorExchangePair>(processors.size()); 222 Processor[] processorsArray = processors.toArray(new Processor[processors.size()]); 223 for (int i = 0; i < processorsArray.length; i++) { 224 result.add(new ProcessorExchangePair(processorsArray[i], exchange.copy())); 225 } 226 return result; 227 } 228 229 protected void doStop() throws Exception { 230 shutdown.set(true); 231 if (executor != null) { 232 executor.shutdown(); 233 executor.awaitTermination(0, TimeUnit.SECONDS); 234 } 235 ServiceHelper.stopServices(processors); 236 } 237 238 protected void doStart() throws Exception { 239 shutdown.set(false); 240 if (executor != null) { 241 executor.setRejectedExecutionHandler(new RejectedExecutionHandler() { 242 public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) { 243 ProcessCall call = (ProcessCall)runnable; 244 call.exchange.setException(new RejectedExecutionException()); 245 call.callback.done(false); 246 } 247 }); 248 } 249 ServiceHelper.startServices(processors); 250 } 251 252 /** 253 * Is the multicast processor working in streaming mode? 254 * 255 * In streaming mode: 256 * <ul> 257 * <li>we use {@link Iterable} to ensure we can send messages as soon as the data becomes available</li> 258 * <li>for parallel processing, we start aggregating responses as they get send back to the processor; 259 * this means the {@link org.apache.camel.processor.aggregate.AggregationStrategy} has to take care of handling out-of-order arrival of exchanges</li> 260 * </ul> 261 */ 262 protected boolean isStreaming() { 263 return streaming; 264 } 265 266 /** 267 * Returns the producers to multicast to 268 */ 269 public Collection<Processor> getProcessors() { 270 return processors; 271 } 272 273 public AggregationStrategy getAggregationStrategy() { 274 return aggregationStrategy; 275 } 276 }