001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.processor; 018 019 import java.util.Collection; 020 import java.util.Iterator; 021 import java.util.LinkedList; 022 import java.util.Queue; 023 import java.util.concurrent.TimeUnit; 024 import java.util.concurrent.locks.Condition; 025 import java.util.concurrent.locks.Lock; 026 import java.util.concurrent.locks.ReentrantLock; 027 028 import org.apache.camel.Exchange; 029 import org.apache.camel.Processor; 030 import org.apache.camel.impl.LoggingExceptionHandler; 031 import org.apache.camel.impl.ServiceSupport; 032 import org.apache.camel.spi.ExceptionHandler; 033 import org.apache.camel.util.ServiceHelper; 034 035 /** 036 * A base class for any kind of {@link Processor} which implements some kind of batch processing. 037 * 038 * @version $Revision: 13212 $ 039 */ 040 public class BatchProcessor extends ServiceSupport implements Processor { 041 042 public static final long DEFAULT_BATCH_TIMEOUT = 1000L; 043 public static final int DEFAULT_BATCH_SIZE = 100; 044 045 private long batchTimeout = DEFAULT_BATCH_TIMEOUT; 046 private int batchSize = DEFAULT_BATCH_SIZE; 047 private int outBatchSize; 048 049 private Processor processor; 050 private Collection<Exchange> collection; 051 private ExceptionHandler exceptionHandler; 052 053 private BatchSender sender; 054 055 public BatchProcessor(Processor processor, Collection<Exchange> collection) { 056 this.processor = processor; 057 this.collection = collection; 058 this.sender = new BatchSender(); 059 } 060 061 @Override 062 public String toString() { 063 return "BatchProcessor[to: " + processor + "]"; 064 } 065 066 // Properties 067 // ------------------------------------------------------------------------- 068 public ExceptionHandler getExceptionHandler() { 069 if (exceptionHandler == null) { 070 exceptionHandler = new LoggingExceptionHandler(getClass()); 071 } 072 return exceptionHandler; 073 } 074 075 public void setExceptionHandler(ExceptionHandler exceptionHandler) { 076 this.exceptionHandler = exceptionHandler; 077 } 078 079 public int getBatchSize() { 080 return batchSize; 081 } 082 083 /** 084 * Sets the <b>in</b> batch size. This is the number of incoming exchanges that this batch processor will 085 * process before its completed. The default value is {@link #DEFAULT_BATCH_SIZE}. 086 * 087 * @param batchSize the size 088 */ 089 public void setBatchSize(int batchSize) { 090 // setting batch size to 0 or negative is like disabling it, so we set it as the max value 091 // as the code logic is dependt on a batch size having 1..n value 092 if (batchSize <= 0) { 093 this.batchSize = Integer.MAX_VALUE; 094 } else { 095 this.batchSize = batchSize; 096 } 097 } 098 099 public int getOutBatchSize() { 100 return outBatchSize; 101 } 102 103 /** 104 * Sets the <b>out</b> batch size. If the batch processor holds more exchanges than this out size then the 105 * completion is triggered. Can for instance be used to ensure that this batch is completed when a certain 106 * number of exchanges has been collected. By default this feature is <b>not</b> enabled. 107 * 108 * @param outBatchSize the size 109 */ 110 public void setOutBatchSize(int outBatchSize) { 111 this.outBatchSize = outBatchSize; 112 } 113 114 public long getBatchTimeout() { 115 return batchTimeout; 116 } 117 118 public void setBatchTimeout(long batchTimeout) { 119 this.batchTimeout = batchTimeout; 120 } 121 122 public Processor getProcessor() { 123 return processor; 124 } 125 126 /** 127 * A strategy method to decide if the "in" batch is completed. That is, whether the resulting exchanges in 128 * the in queue should be drained to the "out" collection. 129 */ 130 private boolean isInBatchCompleted(int num) { 131 return num >= batchSize; 132 } 133 134 /** 135 * A strategy method to decide if the "out" batch is completed. That is, whether the resulting exchange in 136 * the out collection should be sent. 137 */ 138 private boolean isOutBatchCompleted() { 139 if (outBatchSize == 0) { 140 // out batch is disabled, so go ahead and send. 141 return true; 142 } 143 return collection.size() > 0 && collection.size() >= outBatchSize; 144 } 145 146 /** 147 * Strategy Method to process an exchange in the batch. This method allows derived classes to perform 148 * custom processing before or after an individual exchange is processed 149 */ 150 protected void processExchange(Exchange exchange) throws Exception { 151 processor.process(exchange); 152 } 153 154 protected void doStart() throws Exception { 155 ServiceHelper.startServices(processor); 156 sender.start(); 157 } 158 159 protected void doStop() throws Exception { 160 sender.cancel(); 161 ServiceHelper.stopServices(processor); 162 collection.clear(); 163 } 164 165 /** 166 * Enqueues an exchange for later batch processing. 167 */ 168 public void process(Exchange exchange) throws Exception { 169 sender.enqueueExchange(exchange); 170 } 171 172 /** 173 * Sender thread for queued-up exchanges. 174 */ 175 private class BatchSender extends Thread { 176 177 private Queue<Exchange> queue; 178 private Lock queueLock = new ReentrantLock(); 179 private boolean exchangeEnqueued; 180 private Condition exchangeEnqueuedCondition = queueLock.newCondition(); 181 182 public BatchSender() { 183 super("Batch Sender"); 184 this.queue = new LinkedList<Exchange>(); 185 } 186 187 @Override 188 public void run() { 189 // Wait until one of either: 190 // * an exchange being queued; 191 // * the batch timeout expiring; or 192 // * the thread being cancelled. 193 // 194 // If an exchange is queued then we need to determine whether the 195 // batch is complete. If it is complete then we send out the batched 196 // exchanges. Otherwise we move back into our wait state. 197 // 198 // If the batch times out then we send out the batched exchanges 199 // collected so far. 200 // 201 // If we receive an interrupt then all blocking operations are 202 // interrupted and our thread terminates. 203 // 204 // The goal of the following algorithm in terms of synchronisation 205 // is to provide fine grained locking i.e. retaining the lock only 206 // when required. Special consideration is given to releasing the 207 // lock when calling an overloaded method i.e. sendExchanges. 208 // Unlocking is important as the process of sending out the exchanges 209 // would otherwise block new exchanges from being queued. 210 211 queueLock.lock(); 212 try { 213 do { 214 try { 215 if (!exchangeEnqueued) { 216 exchangeEnqueuedCondition.await(batchTimeout, TimeUnit.MILLISECONDS); 217 } 218 219 if (!exchangeEnqueued) { 220 drainQueueTo(collection, batchSize); 221 } else { 222 exchangeEnqueued = false; 223 while (isInBatchCompleted(queue.size())) { 224 drainQueueTo(collection, batchSize); 225 } 226 227 if (!isOutBatchCompleted()) { 228 continue; 229 } 230 } 231 232 queueLock.unlock(); 233 try { 234 try { 235 sendExchanges(); 236 } catch (Exception e) { 237 getExceptionHandler().handleException(e); 238 } 239 } finally { 240 queueLock.lock(); 241 } 242 243 } catch (InterruptedException e) { 244 break; 245 } 246 247 } while (true); 248 249 } finally { 250 queueLock.unlock(); 251 } 252 } 253 254 /** 255 * This method should be called with queueLock held 256 */ 257 private void drainQueueTo(Collection<Exchange> collection, int batchSize) { 258 for (int i = 0; i < batchSize; ++i) { 259 Exchange e = queue.poll(); 260 if (e != null) { 261 collection.add(e); 262 } else { 263 break; 264 } 265 } 266 } 267 268 public void cancel() { 269 interrupt(); 270 } 271 272 public void enqueueExchange(Exchange exchange) { 273 queueLock.lock(); 274 try { 275 queue.add(exchange); 276 exchangeEnqueued = true; 277 exchangeEnqueuedCondition.signal(); 278 } finally { 279 queueLock.unlock(); 280 } 281 } 282 283 private void sendExchanges() throws Exception { 284 Iterator<Exchange> iter = collection.iterator(); 285 while (iter.hasNext()) { 286 Exchange exchange = iter.next(); 287 iter.remove(); 288 processExchange(exchange); 289 } 290 } 291 } 292 293 }