001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor;
018    
019    import java.util.Collection;
020    import java.util.Iterator;
021    import java.util.LinkedList;
022    import java.util.Queue;
023    import java.util.concurrent.TimeUnit;
024    import java.util.concurrent.locks.Condition;
025    import java.util.concurrent.locks.Lock;
026    import java.util.concurrent.locks.ReentrantLock;
027    
028    import org.apache.camel.Exchange;
029    import org.apache.camel.Processor;
030    import org.apache.camel.impl.LoggingExceptionHandler;
031    import org.apache.camel.impl.ServiceSupport;
032    import org.apache.camel.spi.ExceptionHandler;
033    import org.apache.camel.util.ServiceHelper;
034    
035    /**
036     * A base class for any kind of {@link Processor} which implements some kind of batch processing.
037     * 
038     * @version $Revision: 13212 $
039     */
040    public class BatchProcessor extends ServiceSupport implements Processor {
041    
042        public static final long DEFAULT_BATCH_TIMEOUT = 1000L;
043        public static final int DEFAULT_BATCH_SIZE = 100;
044    
045        private long batchTimeout = DEFAULT_BATCH_TIMEOUT;
046        private int batchSize = DEFAULT_BATCH_SIZE;
047        private int outBatchSize;
048    
049        private Processor processor;
050        private Collection<Exchange> collection;
051        private ExceptionHandler exceptionHandler;
052    
053        private BatchSender sender;
054        
055        public BatchProcessor(Processor processor, Collection<Exchange> collection) {
056            this.processor = processor;
057            this.collection = collection;
058            this.sender = new BatchSender();
059        }
060    
061        @Override
062        public String toString() {
063            return "BatchProcessor[to: " + processor + "]";
064        }
065    
066        // Properties
067        // -------------------------------------------------------------------------
068        public ExceptionHandler getExceptionHandler() {
069            if (exceptionHandler == null) {
070                exceptionHandler = new LoggingExceptionHandler(getClass());
071            }
072            return exceptionHandler;
073        }
074    
075        public void setExceptionHandler(ExceptionHandler exceptionHandler) {
076            this.exceptionHandler = exceptionHandler;
077        }
078    
079        public int getBatchSize() {
080            return batchSize;
081        }
082    
083        /**
084         * Sets the <b>in</b> batch size. This is the number of incoming exchanges that this batch processor will
085         * process before its completed. The default value is {@link #DEFAULT_BATCH_SIZE}.
086         * 
087         * @param batchSize the size
088         */
089        public void setBatchSize(int batchSize) {
090            // setting batch size to 0 or negative is like disabling it, so we set it as the max value
091            // as the code logic is dependt on a batch size having 1..n value
092            if (batchSize <= 0) {
093                this.batchSize = Integer.MAX_VALUE;
094            } else {
095                this.batchSize = batchSize;
096            }
097        }
098    
099        public int getOutBatchSize() {
100            return outBatchSize;
101        }
102    
103        /**
104         * Sets the <b>out</b> batch size. If the batch processor holds more exchanges than this out size then the
105         * completion is triggered. Can for instance be used to ensure that this batch is completed when a certain
106         * number of exchanges has been collected. By default this feature is <b>not</b> enabled.
107         * 
108         * @param outBatchSize the size
109         */
110        public void setOutBatchSize(int outBatchSize) {
111            this.outBatchSize = outBatchSize;
112        }
113    
114        public long getBatchTimeout() {
115            return batchTimeout;
116        }
117    
118        public void setBatchTimeout(long batchTimeout) {
119            this.batchTimeout = batchTimeout;
120        }
121    
122        public Processor getProcessor() {
123            return processor;
124        }
125    
126        /**
127         * A strategy method to decide if the "in" batch is completed. That is, whether the resulting exchanges in
128         * the in queue should be drained to the "out" collection.
129         */
130        private boolean isInBatchCompleted(int num) {
131            return num >= batchSize;
132        }
133    
134        /**
135         * A strategy method to decide if the "out" batch is completed. That is, whether the resulting exchange in
136         * the out collection should be sent.
137         */
138        private boolean isOutBatchCompleted() {
139            if (outBatchSize == 0) {
140                // out batch is disabled, so go ahead and send.
141                return true;
142            }
143            return collection.size() > 0 && collection.size() >= outBatchSize;
144        }
145    
146        /**
147         * Strategy Method to process an exchange in the batch. This method allows derived classes to perform
148         * custom processing before or after an individual exchange is processed
149         */
150        protected void processExchange(Exchange exchange) throws Exception {
151            processor.process(exchange);
152        }
153    
154        protected void doStart() throws Exception {
155            ServiceHelper.startServices(processor);
156            sender.start();
157        }
158    
159        protected void doStop() throws Exception {
160            sender.cancel();
161            ServiceHelper.stopServices(processor);
162            collection.clear();
163        }
164    
165        /**
166         * Enqueues an exchange for later batch processing.
167         */
168        public void process(Exchange exchange) throws Exception {
169            sender.enqueueExchange(exchange);
170        }
171    
172        /**
173         * Sender thread for queued-up exchanges.
174         */
175        private class BatchSender extends Thread {
176    
177            private Queue<Exchange> queue;
178            private Lock queueLock = new ReentrantLock();
179            private boolean exchangeEnqueued;
180            private Condition exchangeEnqueuedCondition = queueLock.newCondition();
181    
182            public BatchSender() {
183                super("Batch Sender");
184                this.queue = new LinkedList<Exchange>();
185            }
186    
187            @Override
188            public void run() {
189                // Wait until one of either:
190                // * an exchange being queued;
191                // * the batch timeout expiring; or
192                // * the thread being cancelled.
193                //
194                // If an exchange is queued then we need to determine whether the
195                // batch is complete. If it is complete then we send out the batched
196                // exchanges. Otherwise we move back into our wait state.
197                //
198                // If the batch times out then we send out the batched exchanges
199                // collected so far.
200                //
201                // If we receive an interrupt then all blocking operations are
202                // interrupted and our thread terminates.
203                //
204                // The goal of the following algorithm in terms of synchronisation
205                // is to provide fine grained locking i.e. retaining the lock only
206                // when required. Special consideration is given to releasing the
207                // lock when calling an overloaded method i.e. sendExchanges. 
208                // Unlocking is important as the process of sending out the exchanges
209                // would otherwise block new exchanges from being queued.
210    
211                queueLock.lock();
212                try {
213                    do {
214                        try {
215                            if (!exchangeEnqueued) {
216                                exchangeEnqueuedCondition.await(batchTimeout, TimeUnit.MILLISECONDS);
217                            }
218    
219                            if (!exchangeEnqueued) {
220                                drainQueueTo(collection, batchSize);
221                            } else {             
222                                exchangeEnqueued = false;
223                                while (isInBatchCompleted(queue.size())) {   
224                                    drainQueueTo(collection, batchSize);
225                                }
226                                
227                                if (!isOutBatchCompleted()) {
228                                    continue;
229                                }
230                            }
231    
232                            queueLock.unlock();
233                            try {
234                                try {
235                                    sendExchanges();
236                                } catch (Exception e) {
237                                    getExceptionHandler().handleException(e);
238                                }
239                            } finally {
240                                queueLock.lock();
241                            }
242    
243                        } catch (InterruptedException e) {
244                            break;
245                        }
246    
247                    } while (true);
248    
249                } finally {
250                    queueLock.unlock();
251                }
252            }
253    
254            /**
255             * This method should be called with queueLock held
256             */
257            private void drainQueueTo(Collection<Exchange> collection, int batchSize) {
258                for (int i = 0; i < batchSize; ++i) {
259                    Exchange e = queue.poll();
260                    if (e != null) {
261                        collection.add(e);
262                    } else {
263                        break;
264                    }
265                }
266            }
267    
268            public void cancel() {
269                interrupt();
270            }
271    
272            public void enqueueExchange(Exchange exchange) {
273                queueLock.lock();
274                try {
275                    queue.add(exchange);
276                    exchangeEnqueued = true;
277                    exchangeEnqueuedCondition.signal();
278                } finally {
279                    queueLock.unlock();
280                }
281            }
282    
283            private void sendExchanges() throws Exception {
284                Iterator<Exchange> iter = collection.iterator();
285                while (iter.hasNext()) {
286                    Exchange exchange = iter.next();
287                    iter.remove();
288                    processExchange(exchange);
289                }
290            }
291        }
292    
293    }