001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.processor; 018 019 import java.util.concurrent.TimeUnit; 020 import java.util.concurrent.locks.Condition; 021 import java.util.concurrent.locks.Lock; 022 import java.util.concurrent.locks.ReentrantLock; 023 024 import org.apache.camel.Exchange; 025 import org.apache.camel.Processor; 026 import org.apache.camel.impl.LoggingExceptionHandler; 027 import org.apache.camel.impl.ServiceSupport; 028 import org.apache.camel.processor.resequencer.ResequencerEngine; 029 import org.apache.camel.processor.resequencer.SequenceElementComparator; 030 import org.apache.camel.processor.resequencer.SequenceSender; 031 import org.apache.camel.spi.ExceptionHandler; 032 import org.apache.camel.util.ServiceHelper; 033 034 /** 035 * A resequencer that re-orders a (continuous) stream of {@link Exchange}s. The 036 * algorithm implemented by {@link ResequencerEngine} is based on the detection 037 * of gaps in a message stream rather than on a fixed batch size. Gap detection 038 * in combination with timeouts removes the constraint of having to know the 039 * number of messages of a sequence (i.e. the batch size) in advance. 040 * <p> 041 * Messages must contain a unique sequence number for which a predecessor and a 042 * successor is known. For example a message with the sequence number 3 has a 043 * predecessor message with the sequence number 2 and a successor message with 044 * the sequence number 4. The message sequence 2,3,5 has a gap because the 045 * sucessor of 3 is missing. The resequencer therefore has to retain message 5 046 * until message 4 arrives (or a timeout occurs). 047 * <p> 048 * Instances of this class poll for {@link Exchange}s from a given 049 * <code>endpoint</code>. Resequencing work and the delivery of messages to 050 * the next <code>processor</code> is done within the single polling thread. 051 * 052 * @version $Revision: 14059 $ 053 * 054 * @see ResequencerEngine 055 */ 056 public class StreamResequencer extends ServiceSupport implements SequenceSender<Exchange>, Processor { 057 058 private static final long DELIVERY_ATTEMPT_INTERVAL = 1000L; 059 060 private ExceptionHandler exceptionHandler; 061 private ResequencerEngine<Exchange> engine; 062 private Processor processor; 063 private Delivery delivery; 064 private int capacity; 065 066 /** 067 * Creates a new {@link StreamResequencer} instance. 068 * 069 * @param endpoint 070 * endpoint to poll exchanges from. 071 * @param processor 072 * next processor that processes re-ordered exchanges. 073 * @param comparator 074 * a sequence element comparator for exchanges. 075 */ 076 public StreamResequencer(Processor processor, SequenceElementComparator<Exchange> comparator) { 077 this.exceptionHandler = new LoggingExceptionHandler(getClass()); 078 this.engine = new ResequencerEngine<Exchange>(comparator); 079 this.engine.setSequenceSender(this); 080 this.processor = processor; 081 } 082 083 /** 084 * Returns this resequencer's exception handler. 085 * 086 * @return this resequencer's exception handler. 087 */ 088 public ExceptionHandler getExceptionHandler() { 089 return exceptionHandler; 090 } 091 092 /** 093 * Returns the next processor. 094 * 095 * @return the next processor. 096 */ 097 public Processor getProcessor() { 098 return processor; 099 } 100 101 /** 102 * Returns this resequencer's capacity. The capacity is the maximum number 103 * of exchanges that can be managed by this resequencer at a given point in 104 * time. If the capacity if reached, polling from the endpoint will be 105 * skipped for <code>timeout</code> milliseconds giving exchanges the 106 * possibility to time out and to be delivered after the waiting period. 107 * 108 * @return this resequencer's capacity. 109 */ 110 public int getCapacity() { 111 return capacity; 112 } 113 114 /** 115 * Returns this resequencer's timeout. This sets the resequencer engine's 116 * timeout via {@link ResequencerEngine#setTimeout(long)}. This value is 117 * also used to define the polling timeout from the endpoint. 118 * 119 * @return this resequencer's timeout. 120 * (Processor) 121 * @see ResequencerEngine#setTimeout(long) 122 */ 123 public long getTimeout() { 124 return engine.getTimeout(); 125 } 126 127 public void setCapacity(int capacity) { 128 this.capacity = capacity; 129 } 130 131 public void setTimeout(long timeout) { 132 engine.setTimeout(timeout); 133 } 134 135 @Override 136 public String toString() { 137 return "StreamResequencer[to: " + processor + "]"; 138 } 139 140 @Override 141 protected void doStart() throws Exception { 142 ServiceHelper.startServices(processor); 143 delivery = new Delivery(); 144 engine.start(); 145 delivery.start(); 146 } 147 148 @Override 149 protected void doStop() throws Exception { 150 // let's stop everything in the reverse order 151 // no need to stop the worker thread -- it will stop automatically when this service is stopped 152 engine.stop(); 153 ServiceHelper.stopServices(processor); 154 } 155 156 /** 157 * Sends the <code>exchange</code> to the next <code>processor</code>. 158 * 159 * @param o 160 * exchange to send. 161 */ 162 public void sendElement(Exchange o) throws Exception { 163 processor.process(o); 164 } 165 166 public void process(Exchange exchange) throws Exception { 167 while (engine.size() >= capacity) { 168 Thread.sleep(getTimeout()); 169 } 170 engine.insert(exchange); 171 delivery.request(); 172 } 173 174 private class Delivery extends Thread { 175 176 private Lock deliveryRequestLock = new ReentrantLock(); 177 private Condition deliveryRequestCondition = deliveryRequestLock.newCondition(); 178 179 public Delivery() { 180 super("Delivery Thread"); 181 } 182 183 @Override 184 public void run() { 185 while (true) { 186 try { 187 deliveryRequestLock.lock(); 188 try { 189 deliveryRequestCondition.await(DELIVERY_ATTEMPT_INTERVAL, TimeUnit.MILLISECONDS); 190 } finally { 191 deliveryRequestLock.unlock(); 192 } 193 } catch (InterruptedException e) { 194 break; 195 } 196 try { 197 engine.deliver(); 198 } catch (Exception e) { 199 exceptionHandler.handleException(e); 200 } 201 } 202 } 203 204 public void cancel() { 205 interrupt(); 206 } 207 208 public void request() { 209 deliveryRequestLock.lock(); 210 try { 211 deliveryRequestCondition.signal(); 212 } finally { 213 deliveryRequestLock.unlock(); 214 } 215 } 216 217 } 218 219 }