001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.processor; 018 019 import org.apache.camel.Exchange; 020 import org.apache.camel.Processor; 021 import org.apache.camel.impl.LoggingExceptionHandler; 022 import org.apache.camel.impl.ServiceSupport; 023 import org.apache.camel.processor.resequencer.ResequencerEngine; 024 import org.apache.camel.processor.resequencer.SequenceElementComparator; 025 import org.apache.camel.processor.resequencer.SequenceSender; 026 import org.apache.camel.spi.ExceptionHandler; 027 import org.apache.camel.util.ServiceHelper; 028 029 /** 030 * A resequencer that re-orders a (continuous) stream of {@link Exchange}s. The 031 * algorithm implemented by {@link ResequencerEngine} is based on the detection 032 * of gaps in a message stream rather than on a fixed batch size. Gap detection 033 * in combination with timeouts removes the constraint of having to know the 034 * number of messages of a sequence (i.e. the batch size) in advance. 035 * <p> 036 * Messages must contain a unique sequence number for which a predecessor and a 037 * successor is known. For example a message with the sequence number 3 has a 038 * predecessor message with the sequence number 2 and a successor message with 039 * the sequence number 4. The message sequence 2,3,5 has a gap because the 040 * sucessor of 3 is missing. The resequencer therefore has to retain message 5 041 * until message 4 arrives (or a timeout occurs). 042 * <p> 043 * Instances of this class poll for {@link Exchange}s from a given 044 * <code>endpoint</code>. Resequencing work and the delivery of messages to 045 * the next <code>processor</code> is done within the single polling thread. 046 * 047 * @author Martin Krasser 048 * 049 * @version $Revision: 64127 $ 050 * 051 * @see ResequencerEngine 052 */ 053 public class StreamResequencer extends ServiceSupport implements SequenceSender<Exchange>, Processor { 054 055 private static final long DELIVERY_ATTEMPT_INTERVAL = 1000L; 056 057 private ExceptionHandler exceptionHandler; 058 private ResequencerEngine<Exchange> engine; 059 private Processor processor; 060 private Delivery delivery; 061 private int capacity; 062 063 /** 064 * Creates a new {@link StreamResequencer} instance. 065 * 066 * @param endpoint 067 * endpoint to poll exchanges from. 068 * @param processor 069 * next processor that processes re-ordered exchanges. 070 * @param comparator 071 * a sequence element comparator for exchanges. 072 */ 073 public StreamResequencer(Processor processor, SequenceElementComparator<Exchange> comparator) { 074 this.exceptionHandler = new LoggingExceptionHandler(getClass()); 075 this.engine = new ResequencerEngine<Exchange>(comparator); 076 this.engine.setSequenceSender(this); 077 this.processor = processor; 078 } 079 080 /** 081 * Returns this resequencer's exception handler. 082 * 083 * @return this resequencer's exception handler. 084 */ 085 public ExceptionHandler getExceptionHandler() { 086 return exceptionHandler; 087 } 088 089 /** 090 * Returns the next processor. 091 * 092 * @return the next processor. 093 */ 094 public Processor getProcessor() { 095 return processor; 096 } 097 098 /** 099 * Returns this resequencer's capacity. The capacity is the maximum number 100 * of exchanges that can be managed by this resequencer at a given point in 101 * time. If the capacity if reached, polling from the endpoint will be 102 * skipped for <code>timeout</code> milliseconds giving exchanges the 103 * possibility to time out and to be delivered after the waiting period. 104 * 105 * @return this resequencer's capacity. 106 */ 107 public int getCapacity() { 108 return capacity; 109 } 110 111 /** 112 * Returns this resequencer's timeout. This sets the resequencer engine's 113 * timeout via {@link ResequencerEngine#setTimeout(long)}. This value is 114 * also used to define the polling timeout from the endpoint. 115 * 116 * @return this resequencer's timeout. 117 * (Processor) 118 * @see ResequencerEngine#setTimeout(long) 119 */ 120 public long getTimeout() { 121 return engine.getTimeout(); 122 } 123 124 public void setCapacity(int capacity) { 125 this.capacity = capacity; 126 } 127 128 public void setTimeout(long timeout) { 129 engine.setTimeout(timeout); 130 } 131 132 @Override 133 public String toString() { 134 return "StreamResequencer[to: " + processor + "]"; 135 } 136 137 @Override 138 protected void doStart() throws Exception { 139 ServiceHelper.startServices(processor); 140 delivery = new Delivery(); 141 engine.start(); 142 delivery.start(); 143 } 144 145 @Override 146 protected void doStop() throws Exception { 147 // let's stop everything in the reverse order 148 // no need to stop the worker thread -- it will stop automatically when this service is stopped 149 engine.stop(); 150 ServiceHelper.stopServices(processor); 151 } 152 153 /** 154 * Sends the <code>exchange</code> to the next <code>processor</code>. 155 * 156 * @param o 157 * exchange to send. 158 */ 159 public void sendElement(Exchange o) throws Exception { 160 processor.process(o); 161 } 162 163 public void process(Exchange exchange) throws Exception { 164 while (engine.size() >= capacity) { 165 Thread.sleep(getTimeout()); 166 } 167 engine.insert(exchange); 168 delivery.request(); 169 } 170 171 private class Delivery extends Thread { 172 173 private volatile boolean cancelRequested; 174 175 public Delivery() { 176 super("Delivery Thread"); 177 } 178 179 @Override 180 public void run() { 181 while (true) { 182 try { 183 Thread.sleep(DELIVERY_ATTEMPT_INTERVAL); 184 } catch (InterruptedException e) { 185 if (cancelRequested) { 186 return; 187 } 188 } 189 try { 190 engine.deliver(); 191 } catch (Exception e) { 192 exceptionHandler.handleException(e); 193 } 194 } 195 } 196 197 public void cancel() { 198 cancelRequested = true; 199 interrupt(); 200 } 201 202 public void request() { 203 interrupt(); 204 } 205 206 } 207 208 }