001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor;
018    
019    import java.util.Collection;
020    import java.util.Iterator;
021    import java.util.concurrent.LinkedBlockingQueue;
022    
023    import org.apache.camel.Exchange;
024    import org.apache.camel.Processor;
025    import org.apache.camel.impl.LoggingExceptionHandler;
026    import org.apache.camel.impl.ServiceSupport;
027    import org.apache.camel.spi.ExceptionHandler;
028    import org.apache.camel.util.ServiceHelper;
029    
030    /**
031     * A base class for any kind of {@link Processor} which implements some kind of
032     * batch processing.
033     * 
034     * @version $Revision: 64127 $
035     */
036    public class BatchProcessor extends ServiceSupport implements Processor {
037        
038        public static final long DEFAULT_BATCH_TIMEOUT = 1000L;
039        public static final int DEFAULT_BATCH_SIZE = 100;
040    
041        private long batchTimeout = DEFAULT_BATCH_TIMEOUT;
042        private int batchSize = DEFAULT_BATCH_SIZE;
043        private int outBatchSize;
044    
045        private Processor processor;
046        private Collection<Exchange> collection;
047        private ExceptionHandler exceptionHandler;
048    
049        private BatchSender sender;
050        
051        public BatchProcessor(Processor processor, Collection<Exchange> collection) {
052            this.processor = processor;
053            this.collection = collection;
054            this.sender = new BatchSender();
055        }
056    
057        @Override
058        public String toString() {
059            return "BatchProcessor[to: " + processor + "]";
060        }
061    
062        // Properties
063        // -------------------------------------------------------------------------
064        public ExceptionHandler getExceptionHandler() {
065            if (exceptionHandler == null) {
066                exceptionHandler = new LoggingExceptionHandler(getClass());
067            }
068            return exceptionHandler;
069        }
070    
071        public void setExceptionHandler(ExceptionHandler exceptionHandler) {
072            this.exceptionHandler = exceptionHandler;
073        }
074    
075        public int getBatchSize() {
076            return batchSize;
077        }
078    
079        /**
080         * Sets the <b>in</b> batch size. This is the number of incoming exchanges that this batch processor
081         * will process before its completed. The default value is {@link #DEFAULT_BATCH_SIZE}.
082         *
083         * @param batchSize the size
084         */
085        public void setBatchSize(int batchSize) {
086            this.batchSize = batchSize;
087        }
088    
089        public int getOutBatchSize() {
090            return outBatchSize;
091        }
092    
093        /**
094         * Sets the <b>out</b> batch size. If the batch processor holds more exchanges than this out size then
095         * the completion is triggered. Can for instance be used to ensure that this batch is completed when
096         * a certain number of exchanges has been collected. By default this feature is <b>not</b> enabled.
097         *
098         * @param outBatchSize the size
099         */
100        public void setOutBatchSize(int outBatchSize) {
101            this.outBatchSize = outBatchSize;
102        }
103    
104        public long getBatchTimeout() {
105            return batchTimeout;
106        }
107    
108        public void setBatchTimeout(long batchTimeout) {
109            this.batchTimeout = batchTimeout;
110        }
111    
112        public Processor getProcessor() {
113            return processor;
114        }
115    
116        /**
117         * A strategy method to decide if the batch is completed the resulting exchanges should be sent
118         */
119        protected boolean isBatchCompleted(int num) {
120            // out batch size is optional and we should only check it if its enabled (= >0)
121            if (outBatchSize > 0 && collection.size() >= outBatchSize) {
122                return true;
123            }
124            // fallback to regular batch size check
125            return num >= batchSize;
126        }
127    
128        /**
129         * Strategy Method to process an exchange in the batch. This method allows
130         * derived classes to perform custom processing before or after an
131         * individual exchange is processed
132         */
133        protected void processExchange(Exchange exchange) throws Exception {
134            processor.process(exchange);
135        }
136    
137        protected void doStart() throws Exception {
138            ServiceHelper.startServices(processor);
139            sender.start();
140        }
141    
142        protected void doStop() throws Exception {
143            sender.cancel();
144            ServiceHelper.stopServices(processor);
145            collection.clear();
146        }
147    
148        protected Collection<Exchange> getCollection() {
149            return collection;
150        }
151    
152        /**
153         * Enqueues an exchange for later batch processing.
154         */
155        public void process(Exchange exchange) throws Exception {
156            sender.enqueueExchange(exchange);
157        }
158    
159        /**
160         * Sender thread for queued-up exchanges.
161         */
162        private class BatchSender extends Thread {
163            
164            private volatile boolean cancelRequested;
165    
166            private LinkedBlockingQueue<Exchange> queue;
167            
168            public BatchSender() {
169                super("Batch Sender");
170                this.queue = new LinkedBlockingQueue<Exchange>();
171            }
172    
173            @Override
174            public void run() {
175                while (true) {
176                    try {
177                        Thread.sleep(batchTimeout);
178                    } catch (InterruptedException e) {
179                        if (cancelRequested) {
180                            return;
181                        }
182                    }
183                    try {
184                        sendExchanges();
185                    } catch (Exception e) {
186                        getExceptionHandler().handleException(e);
187                    }
188                }
189            }
190            
191            public void cancel() {
192                cancelRequested = true;
193                interrupt();
194            }
195            
196            public void sendBatch() {
197                interrupt();
198            }
199         
200            public void enqueueExchange(Exchange exchange) {
201                queue.add(exchange);
202                if (isBatchCompleted(queue.size())) {
203                    sendBatch();
204                }
205            }
206            
207            private void sendExchanges() throws Exception {
208                queue.drainTo(collection, batchSize);
209                Iterator<Exchange> iter = collection.iterator();
210                while (iter.hasNext()) {
211                    Exchange exchange = iter.next();
212                    iter.remove();
213                    processExchange(exchange);
214                }
215            }
216            
217        }
218        
219    }