001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.processor; 018 019 import java.util.concurrent.RejectedExecutionException; 020 021 import org.apache.camel.AsyncCallback; 022 import org.apache.camel.AsyncProcessor; 023 import org.apache.camel.Exchange; 024 import org.apache.camel.Message; 025 import org.apache.camel.Predicate; 026 import org.apache.camel.Processor; 027 import org.apache.camel.impl.converter.AsyncProcessorTypeConverter; 028 import org.apache.camel.model.ExceptionType; 029 import org.apache.camel.model.LoggingLevel; 030 import org.apache.camel.processor.exceptionpolicy.ExceptionPolicyStrategy; 031 import org.apache.camel.util.AsyncProcessorHelper; 032 import org.apache.camel.util.ServiceHelper; 033 import org.apache.commons.logging.Log; 034 import org.apache.commons.logging.LogFactory; 035 036 /** 037 * Implements a <a 038 * href="http://activemq.apache.org/camel/dead-letter-channel.html">Dead Letter 039 * Channel</a> after attempting to redeliver the message using the 040 * {@link RedeliveryPolicy} 041 * 042 * @version $Revision: 61064 $ 043 */ 044 public class DeadLetterChannel extends ErrorHandlerSupport implements AsyncProcessor { 045 public static final String REDELIVERY_COUNTER = "org.apache.camel.RedeliveryCounter"; 046 public static final String REDELIVERED = "org.apache.camel.Redelivered"; 047 public static final String EXCEPTION_CAUSE_PROPERTY = "CamelCauseException"; 048 049 private static final transient Log LOG = LogFactory.getLog(DeadLetterChannel.class); 050 private static final String FAILURE_HANDLED_PROPERTY = DeadLetterChannel.class.getName() + ".FAILURE_HANDLED"; 051 private Processor output; 052 private Processor deadLetter; 053 private AsyncProcessor outputAsync; 054 private RedeliveryPolicy redeliveryPolicy; 055 private Logger logger; 056 057 private class RedeliveryData { 058 int redeliveryCounter; 059 long redeliveryDelay; 060 boolean sync = true; 061 Predicate handledPredicate; 062 063 // default behavior which can be overloaded on a per exception basis 064 RedeliveryPolicy currentRedeliveryPolicy = redeliveryPolicy; 065 Processor failureProcessor = deadLetter; 066 067 } 068 069 public DeadLetterChannel(Processor output, Processor deadLetter) { 070 this(output, deadLetter, new RedeliveryPolicy(), DeadLetterChannel.createDefaultLogger(), 071 ErrorHandlerSupport.createDefaultExceptionPolicyStrategy()); 072 } 073 074 public DeadLetterChannel(Processor output, Processor deadLetter, RedeliveryPolicy redeliveryPolicy, Logger logger, ExceptionPolicyStrategy exceptionPolicyStrategy) { 075 this.deadLetter = deadLetter; 076 this.output = output; 077 this.outputAsync = AsyncProcessorTypeConverter.convert(output); 078 079 this.redeliveryPolicy = redeliveryPolicy; 080 this.logger = logger; 081 setExceptionPolicy(exceptionPolicyStrategy); 082 } 083 084 public static <E extends Exchange> Logger createDefaultLogger() { 085 return new Logger(LOG, LoggingLevel.ERROR); 086 } 087 088 @Override 089 public String toString() { 090 return "DeadLetterChannel[" + output + ", " + deadLetter + "]"; 091 } 092 093 public boolean process(Exchange exchange, final AsyncCallback callback) { 094 return process(exchange, callback, new RedeliveryData()); 095 } 096 097 public boolean process(final Exchange exchange, final AsyncCallback callback, final RedeliveryData data) { 098 099 while (true) { 100 // we can't keep retrying if the route is being shutdown. 101 if (!isRunAllowed()) { 102 if (exchange.getException() == null) { 103 exchange.setException(new RejectedExecutionException()); 104 } 105 callback.done(data.sync); 106 return data.sync; 107 } 108 109 // if the exchange is transacted then let the underlying system handle the redelivery etc. 110 // this DeadLetterChannel is only for non transacted exchanges 111 if (exchange.isTransacted() && exchange.getException() != null) { 112 if (LOG.isDebugEnabled()) { 113 LOG.debug("This is a transacted exchange, bypassing this DeadLetterChannel: " + this + " for exchange: " + exchange); 114 } 115 return data.sync; 116 } 117 118 // did previous processing caused an exception? 119 if (exchange.getException() != null) { 120 Throwable e = exchange.getException(); 121 // set the original caused exception 122 exchange.setProperty(EXCEPTION_CAUSE_PROPERTY, e); 123 124 // find the error handler to use (if any) 125 ExceptionType exceptionPolicy = getExceptionPolicy(exchange, e); 126 if (exceptionPolicy != null) { 127 data.currentRedeliveryPolicy = exceptionPolicy.createRedeliveryPolicy(data.currentRedeliveryPolicy); 128 data.handledPredicate = exceptionPolicy.getHandledPolicy(); 129 Processor processor = exceptionPolicy.getErrorHandler(); 130 if (processor != null) { 131 data.failureProcessor = processor; 132 } 133 } 134 135 logFailedDelivery("Failed delivery for exchangeId: " + exchange.getExchangeId() + ". On delivery attempt: " + data.redeliveryCounter + " caught: " + e, data, e); 136 data.redeliveryCounter = incrementRedeliveryCounter(exchange, e); 137 } 138 139 // should we redeliver or not? 140 if (!data.currentRedeliveryPolicy.shouldRedeliver(data.redeliveryCounter)) { 141 // we did not success with the redelivery so now we let the failure processor handle it 142 setFailureHandled(exchange); 143 // must decrement the redelivery counter as we didn't process the redelivery but is 144 // handling by the failure handler. So we must -1 to not let the counter be out-of-sync 145 decrementRedeliveryCounter(exchange); 146 147 AsyncProcessor afp = AsyncProcessorTypeConverter.convert(data.failureProcessor); 148 boolean sync = afp.process(exchange, new AsyncCallback() { 149 public void done(boolean sync) { 150 restoreExceptionOnExchange(exchange, data.handledPredicate); 151 callback.done(data.sync); 152 } 153 }); 154 155 // The line below shouldn't be needed, it is invoked by the AsyncCallback above 156 //restoreExceptionOnExchange(exchange, data.handledPredicate); 157 logFailedDelivery("Failed delivery for exchangeId: " + exchange.getExchangeId() + ". Handled by the failure processor: " + data.failureProcessor, data, null); 158 return sync; 159 } 160 161 // should we redeliver 162 if (data.redeliveryCounter > 0) { 163 // okay we will give it another go so clear the exception so we can try again 164 if (exchange.getException() != null) { 165 exchange.setException(null); 166 } 167 168 // wait until we should redeliver 169 data.redeliveryDelay = data.currentRedeliveryPolicy.sleep(data.redeliveryDelay); 170 } 171 172 // process the exchange 173 boolean sync = outputAsync.process(exchange, new AsyncCallback() { 174 public void done(boolean sync) { 175 // Only handle the async case... 176 if (sync) { 177 return; 178 } 179 data.sync = false; 180 // only process if the exchange hasn't failed 181 // and it has not been handled by the error processor 182 if (exchange.getException() != null && !isFailureHandled(exchange)) { 183 process(exchange, callback, data); 184 } else { 185 callback.done(sync); 186 } 187 } 188 }); 189 if (!sync) { 190 // It is going to be processed async.. 191 return false; 192 } 193 if (exchange.getException() == null || isFailureHandled(exchange)) { 194 // If everything went well.. then we exit here.. 195 callback.done(true); 196 return true; 197 } 198 // error occurred so loop back around..... 199 } 200 201 } 202 203 private void logFailedDelivery(String message, RedeliveryData data, Throwable e) { 204 LoggingLevel newLogLevel = null; 205 if (data.currentRedeliveryPolicy.shouldRedeliver(data.redeliveryCounter)) { 206 newLogLevel = data.currentRedeliveryPolicy.getRetryAttemptedLogLevel(); 207 } else { 208 newLogLevel = data.currentRedeliveryPolicy.getRetriesExhaustedLogLevel(); 209 } 210 if (e != null) { 211 logger.log(message, e, newLogLevel); 212 } else { 213 logger.log(message, newLogLevel); 214 } 215 } 216 217 public static boolean isFailureHandled(Exchange exchange) { 218 return exchange.getProperty(FAILURE_HANDLED_PROPERTY) != null; 219 } 220 221 public static void setFailureHandled(Exchange exchange) { 222 exchange.setProperty(FAILURE_HANDLED_PROPERTY, exchange.getException()); 223 exchange.setException(null); 224 } 225 226 protected static void restoreExceptionOnExchange(Exchange exchange, Predicate handledPredicate) { 227 if (handledPredicate == null || !handledPredicate.matches(exchange)) { 228 if (LOG.isDebugEnabled()) { 229 LOG.debug("This exchange is not handled so its marked as failed: " + exchange); 230 } 231 // exception not handled, put exception back in the exchange 232 exchange.setException(exchange.getProperty(FAILURE_HANDLED_PROPERTY, Throwable.class)); 233 } else { 234 if (LOG.isDebugEnabled()) { 235 LOG.debug("This exchange is handled so its marked as not failed: " + exchange); 236 } 237 exchange.setProperty(Exchange.EXCEPTION_HANDLED_PROPERTY, Boolean.TRUE); 238 } 239 } 240 241 public void process(Exchange exchange) throws Exception { 242 AsyncProcessorHelper.process(this, exchange); 243 } 244 245 // Properties 246 // ------------------------------------------------------------------------- 247 248 /** 249 * Returns the output processor 250 */ 251 public Processor getOutput() { 252 return output; 253 } 254 255 /** 256 * Returns the dead letter that message exchanges will be sent to if the 257 * redelivery attempts fail 258 */ 259 public Processor getDeadLetter() { 260 return deadLetter; 261 } 262 263 public RedeliveryPolicy getRedeliveryPolicy() { 264 return redeliveryPolicy; 265 } 266 267 /** 268 * Sets the redelivery policy 269 */ 270 public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) { 271 this.redeliveryPolicy = redeliveryPolicy; 272 } 273 274 public Logger getLogger() { 275 return logger; 276 } 277 278 /** 279 * Sets the logger strategy; which {@link Log} to use and which 280 * {@link LoggingLevel} to use 281 */ 282 public void setLogger(Logger logger) { 283 this.logger = logger; 284 } 285 286 // Implementation methods 287 // ------------------------------------------------------------------------- 288 289 /** 290 * Increments the redelivery counter and adds the redelivered flag if the 291 * message has been redelivered 292 */ 293 protected int incrementRedeliveryCounter(Exchange exchange, Throwable e) { 294 Message in = exchange.getIn(); 295 Integer counter = in.getHeader(REDELIVERY_COUNTER, Integer.class); 296 int next = 1; 297 if (counter != null) { 298 next = counter + 1; 299 } 300 in.setHeader(REDELIVERY_COUNTER, next); 301 in.setHeader(REDELIVERED, Boolean.TRUE); 302 return next; 303 } 304 305 /** 306 * Prepares the redelivery counter and boolean flag for the failure handle processor 307 */ 308 private void decrementRedeliveryCounter(Exchange exchange) { 309 Message in = exchange.getIn(); 310 Integer counter = in.getHeader(REDELIVERY_COUNTER, Integer.class); 311 if (counter != null) { 312 int prev = counter - 1; 313 in.setHeader(REDELIVERY_COUNTER, prev); 314 // set boolean flag according to counter 315 in.setHeader(REDELIVERED, prev > 0 ? Boolean.TRUE : Boolean.FALSE); 316 } else { 317 // not redelivered 318 in.setHeader(REDELIVERY_COUNTER, 0); 319 in.setHeader(REDELIVERED, Boolean.FALSE); 320 } 321 } 322 323 @Override 324 protected void doStart() throws Exception { 325 ServiceHelper.startServices(output, deadLetter); 326 } 327 328 @Override 329 protected void doStop() throws Exception { 330 ServiceHelper.stopServices(deadLetter, output); 331 } 332 }