001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor;
018    
019    import java.util.Collection;
020    import java.util.Iterator;
021    import java.util.concurrent.LinkedBlockingQueue;
022    
023    import org.apache.camel.Exchange;
024    import org.apache.camel.Processor;
025    import org.apache.camel.impl.LoggingExceptionHandler;
026    import org.apache.camel.impl.ServiceSupport;
027    import org.apache.camel.spi.ExceptionHandler;
028    import org.apache.camel.util.ServiceHelper;
029    
030    /**
031     * A base class for any kind of {@link Processor} which implements some kind of
032     * batch processing.
033     * 
034     * @version $Revision: 64696 $
035     */
036    public class BatchProcessor extends ServiceSupport implements Processor {
037        
038        public static final long DEFAULT_BATCH_TIMEOUT = 1000L;
039        public static final int DEFAULT_BATCH_SIZE = 100;
040    
041        private long batchTimeout = DEFAULT_BATCH_TIMEOUT;
042        private int batchSize = DEFAULT_BATCH_SIZE;
043        private int outBatchSize;
044    
045        private Processor processor;
046        private Collection<Exchange> collection;
047        private ExceptionHandler exceptionHandler;
048    
049        private BatchSender sender;
050        
051        public BatchProcessor(Processor processor, Collection<Exchange> collection) {
052            this.processor = processor;
053            this.collection = collection;
054            this.sender = new BatchSender();
055        }
056    
057        @Override
058        public String toString() {
059            return "BatchProcessor[to: " + processor + "]";
060        }
061    
062        // Properties
063        // -------------------------------------------------------------------------
064        public ExceptionHandler getExceptionHandler() {
065            if (exceptionHandler == null) {
066                exceptionHandler = new LoggingExceptionHandler(getClass());
067            }
068            return exceptionHandler;
069        }
070    
071        public void setExceptionHandler(ExceptionHandler exceptionHandler) {
072            this.exceptionHandler = exceptionHandler;
073        }
074    
075        public int getBatchSize() {
076            return batchSize;
077        }
078    
079        /**
080         * Sets the <b>in</b> batch size. This is the number of incoming exchanges that this batch processor
081         * will process before its completed. The default value is {@link #DEFAULT_BATCH_SIZE}.
082         *
083         * @param batchSize the size
084         */
085        public void setBatchSize(int batchSize) {
086            this.batchSize = batchSize;
087        }
088    
089        public int getOutBatchSize() {
090            return outBatchSize;
091        }
092    
093        /**
094         * Sets the <b>out</b> batch size. If the batch processor holds more exchanges than this out size then
095         * the completion is triggered. Can for instance be used to ensure that this batch is completed when
096         * a certain number of exchanges has been collected. By default this feature is <b>not</b> enabled.
097         *
098         * @param outBatchSize the size
099         */
100        public void setOutBatchSize(int outBatchSize) {
101            this.outBatchSize = outBatchSize;
102        }
103    
104        public long getBatchTimeout() {
105            return batchTimeout;
106        }
107    
108        public void setBatchTimeout(long batchTimeout) {
109            this.batchTimeout = batchTimeout;
110        }
111    
112        public Processor getProcessor() {
113            return processor;
114        }
115    
116        /**
117         * A strategy method to decide if the "in" batch is completed.  That is, whether the resulting 
118         * exchanges in the in queue should be drained to the "out" collection.
119         */
120        protected boolean isInBatchCompleted(int num) {
121            return num >= batchSize;
122        }
123        
124        /**
125         * A strategy method to decide if the "out" batch is completed. That is, whether the resulting 
126         * exchange in the out collection should be sent.
127         */
128        protected boolean isOutBatchCompleted() {
129            if (outBatchSize == 0) {
130                // out batch is disabled, so go ahead and send.
131                return true;
132            }
133            return collection.size() > 0 && collection.size() >= outBatchSize;
134        }
135    
136        /**
137         * Strategy Method to process an exchange in the batch. This method allows
138         * derived classes to perform custom processing before or after an
139         * individual exchange is processed
140         */
141        protected void processExchange(Exchange exchange) throws Exception {
142            processor.process(exchange);
143        }
144    
145        protected void doStart() throws Exception {
146            ServiceHelper.startServices(processor);
147            sender.start();
148        }
149    
150        protected void doStop() throws Exception {
151            sender.cancel();
152            ServiceHelper.stopServices(processor);
153            collection.clear();
154        }
155    
156        protected Collection<Exchange> getCollection() {
157            return collection;
158        }
159    
160        /**
161         * Enqueues an exchange for later batch processing.
162         */
163        public void process(Exchange exchange) throws Exception {
164            sender.enqueueExchange(exchange);
165        }
166    
167        /**
168         * Sender thread for queued-up exchanges.
169         */
170        private class BatchSender extends Thread {
171            
172            private volatile boolean cancelRequested;
173    
174            private LinkedBlockingQueue<Exchange> queue;
175            
176            public BatchSender() {
177                super("Batch Sender");
178                this.queue = new LinkedBlockingQueue<Exchange>();
179            }
180    
181            @Override
182            public void run() {
183                while (true) {
184                    try {
185                        Thread.sleep(batchTimeout);
186                        queue.drainTo(collection, batchSize);  
187                    } catch (InterruptedException e) {
188                        if (cancelRequested) {
189                            return;
190                        }
191                        
192                        while (isInBatchCompleted(queue.size())) {
193                            queue.drainTo(collection, batchSize);  
194                        }
195                        
196                        if (!isOutBatchCompleted()) {
197                            continue;
198                        }
199                    }
200                    try {
201                        sendExchanges();
202                    } catch (Exception e) {
203                        getExceptionHandler().handleException(e);
204                    }
205                }
206            }
207            
208            public void cancel() {
209                cancelRequested = true;
210                interrupt();
211            }
212         
213            public void enqueueExchange(Exchange exchange) {
214                queue.add(exchange);
215                interrupt();
216            }
217            
218            private void sendExchanges() throws Exception {
219                Iterator<Exchange> iter = collection.iterator();
220                while (iter.hasNext()) {
221                    Exchange exchange = iter.next();
222                    iter.remove();
223                    processExchange(exchange);
224                }
225            }
226        }
227        
228    }