001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.processor; 018 019 import java.util.Timer; 020 import java.util.TimerTask; 021 import java.util.concurrent.RejectedExecutionException; 022 023 import org.apache.camel.AsyncCallback; 024 import org.apache.camel.AsyncProcessor; 025 import org.apache.camel.Exchange; 026 import org.apache.camel.Message; 027 import org.apache.camel.Predicate; 028 import org.apache.camel.Processor; 029 import org.apache.camel.impl.converter.AsyncProcessorTypeConverter; 030 import org.apache.camel.model.ExceptionType; 031 import org.apache.camel.model.LoggingLevel; 032 import org.apache.camel.processor.exceptionpolicy.ExceptionPolicyStrategy; 033 import org.apache.camel.util.AsyncProcessorHelper; 034 import org.apache.camel.util.MessageHelper; 035 import org.apache.camel.util.ServiceHelper; 036 import org.apache.commons.logging.Log; 037 import org.apache.commons.logging.LogFactory; 038 039 /** 040 * Implements a <a 041 * href="http://activemq.apache.org/camel/dead-letter-channel.html">Dead Letter 042 * Channel</a> after attempting to redeliver the message using the 043 * {@link RedeliveryPolicy} 044 * 045 * @version $Revision: 14178 $ 046 */ 047 public class DeadLetterChannel extends ErrorHandlerSupport implements AsyncProcessor { 048 public static final String REDELIVERY_COUNTER = "org.apache.camel.RedeliveryCounter"; 049 public static final String REDELIVERED = "org.apache.camel.Redelivered"; 050 public static final String EXCEPTION_CAUSE_PROPERTY = "CamelCauseException"; 051 public static final String CAUGHT_EXCEPTION_HEADER = "org.apache.camel.CamelCaughtException"; 052 053 private static final transient Log LOG = LogFactory.getLog(DeadLetterChannel.class); 054 private static final String FAILURE_HANDLED_PROPERTY = DeadLetterChannel.class.getName() + ".FAILURE_HANDLED"; 055 056 private static Timer timer = new Timer("Camel DeadLetterChannel Redeliver Timer", true); 057 private Processor output; 058 private Processor deadLetter; 059 private AsyncProcessor outputAsync; 060 private RedeliveryPolicy redeliveryPolicy; 061 private Logger logger; 062 private Processor redeliveryProcessor; 063 064 private class RedeliveryData { 065 int redeliveryCounter; 066 long redeliveryDelay; 067 boolean sync = true; 068 Predicate handledPredicate; 069 070 // default behavior which can be overloaded on a per exception basis 071 RedeliveryPolicy currentRedeliveryPolicy = redeliveryPolicy; 072 Processor failureProcessor = deadLetter; 073 } 074 075 private class RedeliverTimerTask extends TimerTask { 076 private final Exchange exchange; 077 private final AsyncCallback callback; 078 private final RedeliveryData data; 079 080 public RedeliverTimerTask(Exchange exchange, AsyncCallback callback, RedeliveryData data) { 081 this.exchange = exchange; 082 this.callback = callback; 083 this.data = data; 084 } 085 086 @Override 087 public void run() { 088 //only handle the real AsyncProcess the exchange 089 outputAsync.process(exchange, new AsyncCallback() { 090 public void done(boolean sync) { 091 // Only handle the async case... 092 if (sync) { 093 return; 094 } 095 data.sync = false; 096 // only process if the exchange hasn't failed 097 // and it has not been handled by the error processor 098 if (exchange.getException() != null && !isFailureHandled(exchange)) { 099 // if we are redelivering then sleep before trying again 100 asyncProcess(exchange, callback, data); 101 } else { 102 callback.done(sync); 103 } 104 } 105 }); 106 } 107 } 108 109 public DeadLetterChannel(Processor output, Processor deadLetter, Processor redeliveryProcessor, RedeliveryPolicy redeliveryPolicy, Logger logger, ExceptionPolicyStrategy exceptionPolicyStrategy) { 110 this.output = output; 111 this.deadLetter = deadLetter; 112 this.redeliveryProcessor = redeliveryProcessor; 113 this.outputAsync = AsyncProcessorTypeConverter.convert(output); 114 this.redeliveryPolicy = redeliveryPolicy; 115 this.logger = logger; 116 setExceptionPolicy(exceptionPolicyStrategy); 117 } 118 119 public static <E extends Exchange> Logger createDefaultLogger() { 120 return new Logger(LOG, LoggingLevel.ERROR); 121 } 122 123 @Override 124 public String toString() { 125 return "DeadLetterChannel[" + output + ", " + deadLetter + "]"; 126 } 127 128 public void process(Exchange exchange) throws Exception { 129 AsyncProcessorHelper.process(this, exchange); 130 } 131 132 public boolean process(Exchange exchange, final AsyncCallback callback) { 133 return process(exchange, callback, new RedeliveryData()); 134 } 135 136 /** 137 * Processes the exchange using decorated with this dead letter channel. 138 */ 139 protected boolean process(final Exchange exchange, final AsyncCallback callback, final RedeliveryData data) { 140 141 while (true) { 142 // we can't keep retrying if the route is being shutdown. 143 if (!isRunAllowed()) { 144 if (exchange.getException() == null) { 145 exchange.setException(new RejectedExecutionException()); 146 } 147 callback.done(data.sync); 148 return data.sync; 149 } 150 151 // if the exchange is transacted then let the underlying system handle the redelivery etc. 152 // this DeadLetterChannel is only for non transacted exchanges 153 if (exchange.isTransacted() && exchange.getException() != null) { 154 if (LOG.isDebugEnabled()) { 155 LOG.debug("This is a transacted exchange, bypassing this DeadLetterChannel: " + this + " for exchange: " + exchange); 156 } 157 return data.sync; 158 } 159 160 // did previous processing caused an exception? 161 if (exchange.getException() != null) { 162 handleException(exchange, data); 163 } 164 165 // compute if we should redeliver or not 166 boolean shouldRedeliver = shouldRedeliver(exchange, data); 167 if (!shouldRedeliver) { 168 return deliverToFaultProcessor(exchange, callback, data); 169 } 170 171 // if we are redelivering then sleep before trying again 172 if (data.redeliveryCounter > 0) { 173 // okay we will give it another go so clear the exception so we can try again 174 if (exchange.getException() != null) { 175 exchange.setException(null); 176 } 177 178 // reset cached streams so they can be read again 179 MessageHelper.resetStreamCache(exchange.getIn()); 180 181 // wait until we should redeliver 182 data.redeliveryDelay = data.currentRedeliveryPolicy.sleep(data.redeliveryDelay); 183 184 // letting onRedeliver be executed 185 deliverToRedeliveryProcessor(exchange, callback, data); 186 } 187 188 // process the exchange 189 boolean sync = outputAsync.process(exchange, new AsyncCallback() { 190 public void done(boolean sync) { 191 // Only handle the async case... 192 if (sync) { 193 return; 194 } 195 data.sync = false; 196 // only process if the exchange hasn't failed 197 // and it has not been handled by the error processor 198 if (exchange.getException() != null && !isFailureHandled(exchange)) { 199 //TODO Call the Timer for the asyncProcessor 200 asyncProcess(exchange, callback, data); 201 } else { 202 callback.done(sync); 203 } 204 } 205 }); 206 if (!sync) { 207 // It is going to be processed async.. 208 return false; 209 } 210 if (exchange.getException() == null || isFailureHandled(exchange)) { 211 // If everything went well.. then we exit here.. 212 callback.done(true); 213 return true; 214 } 215 // error occurred so loop back around..... 216 } 217 218 } 219 220 protected void asyncProcess(final Exchange exchange, final AsyncCallback callback, final RedeliveryData data) { 221 // set the timer here 222 if (!isRunAllowed()) { 223 if (exchange.getException() == null) { 224 exchange.setException(new RejectedExecutionException()); 225 } 226 callback.done(data.sync); 227 return; 228 } 229 230 // if the exchange is transacted then let the underlying system handle the redelivery etc. 231 // this DeadLetterChannel is only for non transacted exchanges 232 if (exchange.isTransacted() && exchange.getException() != null) { 233 if (LOG.isDebugEnabled()) { 234 LOG.debug("This is a transacted exchange, bypassing this DeadLetterChannel: " + this + " for exchange: " + exchange); 235 } 236 return; 237 } 238 239 // did previous processing caused an exception? 240 if (exchange.getException() != null) { 241 handleException(exchange, data); 242 } 243 244 // compute if we should redeliver or not 245 boolean shouldRedeliver = shouldRedeliver(exchange, data); 246 if (!shouldRedeliver) { 247 deliverToFaultProcessor(exchange, callback, data); 248 return; 249 } 250 251 // process the next try 252 // if we are redelivering then sleep before trying again 253 if (data.redeliveryCounter > 0) { 254 // okay we will give it another go so clear the exception so we can try again 255 if (exchange.getException() != null) { 256 exchange.setException(null); 257 } 258 // wait until we should redeliver 259 data.redeliveryDelay = data.currentRedeliveryPolicy.getRedeliveryDelay(data.redeliveryDelay); 260 timer.schedule(new RedeliverTimerTask(exchange, callback, data), data.redeliveryDelay); 261 262 // letting onRedeliver be executed 263 deliverToRedeliveryProcessor(exchange, callback, data); 264 } 265 } 266 267 private void handleException(Exchange exchange, RedeliveryData data) { 268 Throwable e = exchange.getException(); 269 // set the original caused exception 270 exchange.setProperty(EXCEPTION_CAUSE_PROPERTY, e); 271 272 // find the error handler to use (if any) 273 ExceptionType exceptionPolicy = getExceptionPolicy(exchange, e); 274 if (exceptionPolicy != null) { 275 data.currentRedeliveryPolicy = exceptionPolicy.createRedeliveryPolicy(exchange.getContext(), data.currentRedeliveryPolicy); 276 data.handledPredicate = exceptionPolicy.getHandledPolicy(); 277 Processor processor = exceptionPolicy.getErrorHandler(); 278 if (processor != null) { 279 data.failureProcessor = processor; 280 } 281 } 282 283 String msg = "Failed delivery for exchangeId: " + exchange.getExchangeId() 284 + ". On delivery attempt: " + data.redeliveryCounter + " caught: " + e; 285 logFailedDelivery(true, exchange, msg, data, e); 286 287 data.redeliveryCounter = incrementRedeliveryCounter(exchange, e); 288 } 289 290 /** 291 * Gives an optional configure redelivery processor a chance to process before the Exchange 292 * will be redelivered. This can be used to alter the Exchange. 293 */ 294 private void deliverToRedeliveryProcessor(final Exchange exchange, final AsyncCallback callback, 295 final RedeliveryData data) { 296 if (redeliveryProcessor == null) { 297 return; 298 } 299 300 if (LOG.isTraceEnabled()) { 301 LOG.trace("RedeliveryProcessor " + redeliveryProcessor + " is processing Exchange: " + exchange + " before its redelivered"); 302 } 303 304 AsyncProcessor afp = AsyncProcessorTypeConverter.convert(redeliveryProcessor); 305 afp.process(exchange, new AsyncCallback() { 306 public void done(boolean sync) { 307 LOG.trace("Redelivery processor done"); 308 // do NOT call done on callback as this is the redelivery processor that 309 // is done. we should not mark the entire exchange as done. 310 } 311 }); 312 } 313 314 private boolean deliverToFaultProcessor(final Exchange exchange, final AsyncCallback callback, 315 final RedeliveryData data) { 316 // we did not success with the redelivery so now we let the failure processor handle it 317 setFailureHandled(exchange); 318 // must decrement the redelivery counter as we didn't process the redelivery but is 319 // handling by the failure handler. So we must -1 to not let the counter be out-of-sync 320 decrementRedeliveryCounter(exchange); 321 322 // reset cached streams so they can be read again 323 MessageHelper.resetStreamCache(exchange.getIn()); 324 325 AsyncProcessor afp = AsyncProcessorTypeConverter.convert(data.failureProcessor); 326 boolean sync = afp.process(exchange, new AsyncCallback() { 327 public void done(boolean sync) { 328 restoreExceptionOnExchange(exchange, data.handledPredicate); 329 callback.done(data.sync); 330 } 331 }); 332 333 String msg = "Failed delivery for exchangeId: " + exchange.getExchangeId() 334 + ". Handled by the failure processor: " + data.failureProcessor; 335 logFailedDelivery(false, exchange, msg, data, null); 336 337 return sync; 338 } 339 340 // Properties 341 // ------------------------------------------------------------------------- 342 343 public static boolean isFailureHandled(Exchange exchange) { 344 return exchange.getProperty(FAILURE_HANDLED_PROPERTY) != null 345 || exchange.getIn().getHeader(CAUGHT_EXCEPTION_HEADER) != null; 346 } 347 348 public static void setFailureHandled(Exchange exchange) { 349 exchange.setProperty(FAILURE_HANDLED_PROPERTY, exchange.getException()); 350 exchange.getIn().setHeader(CAUGHT_EXCEPTION_HEADER, exchange.getException()); 351 exchange.setException(null); 352 } 353 354 /** 355 * Returns the output processor 356 */ 357 public Processor getOutput() { 358 return output; 359 } 360 361 /** 362 * Returns the dead letter that message exchanges will be sent to if the 363 * redelivery attempts fail 364 */ 365 public Processor getDeadLetter() { 366 return deadLetter; 367 } 368 369 public RedeliveryPolicy getRedeliveryPolicy() { 370 return redeliveryPolicy; 371 } 372 373 /** 374 * Sets the redelivery policy 375 */ 376 public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) { 377 this.redeliveryPolicy = redeliveryPolicy; 378 } 379 380 public Logger getLogger() { 381 return logger; 382 } 383 384 /** 385 * Sets the logger strategy; which {@link Log} to use and which 386 * {@link LoggingLevel} to use 387 */ 388 public void setLogger(Logger logger) { 389 this.logger = logger; 390 } 391 392 // Implementation methods 393 // ------------------------------------------------------------------------- 394 395 protected static void restoreExceptionOnExchange(Exchange exchange, Predicate handledPredicate) { 396 if (handledPredicate == null || !handledPredicate.matches(exchange)) { 397 if (LOG.isDebugEnabled()) { 398 LOG.debug("This exchange is not handled so its marked as failed: " + exchange); 399 } 400 // exception not handled, put exception back in the exchange 401 exchange.setException(exchange.getProperty(FAILURE_HANDLED_PROPERTY, Throwable.class)); 402 } else { 403 if (LOG.isDebugEnabled()) { 404 LOG.debug("This exchange is handled so its marked as not failed: " + exchange); 405 } 406 exchange.setProperty(Exchange.EXCEPTION_HANDLED_PROPERTY, Boolean.TRUE); 407 } 408 } 409 410 private void logFailedDelivery(boolean shouldRedeliver, Exchange exchange, String message, RedeliveryData data, Throwable e) { 411 LoggingLevel newLogLevel; 412 if (shouldRedeliver) { 413 newLogLevel = data.currentRedeliveryPolicy.getRetryAttemptedLogLevel(); 414 } else { 415 newLogLevel = data.currentRedeliveryPolicy.getRetriesExhaustedLogLevel(); 416 } 417 if (e != null) { 418 logger.log(message, e, newLogLevel); 419 } else { 420 logger.log(message, newLogLevel); 421 } 422 } 423 424 private boolean shouldRedeliver(Exchange exchange, RedeliveryData data) { 425 return data.currentRedeliveryPolicy.shouldRedeliver(data.redeliveryCounter); 426 } 427 428 /** 429 * Increments the redelivery counter and adds the redelivered flag if the 430 * message has been redelivered 431 */ 432 protected int incrementRedeliveryCounter(Exchange exchange, Throwable e) { 433 Message in = exchange.getIn(); 434 Integer counter = in.getHeader(REDELIVERY_COUNTER, Integer.class); 435 int next = 1; 436 if (counter != null) { 437 next = counter + 1; 438 } 439 in.setHeader(REDELIVERY_COUNTER, next); 440 in.setHeader(REDELIVERED, Boolean.TRUE); 441 return next; 442 } 443 444 /** 445 * Prepares the redelivery counter and boolean flag for the failure handle processor 446 */ 447 private void decrementRedeliveryCounter(Exchange exchange) { 448 Message in = exchange.getIn(); 449 Integer counter = in.getHeader(REDELIVERY_COUNTER, Integer.class); 450 if (counter != null) { 451 int prev = counter - 1; 452 in.setHeader(REDELIVERY_COUNTER, prev); 453 // set boolean flag according to counter 454 in.setHeader(REDELIVERED, prev > 0 ? Boolean.TRUE : Boolean.FALSE); 455 } else { 456 // not redelivered 457 in.setHeader(REDELIVERY_COUNTER, 0); 458 in.setHeader(REDELIVERED, Boolean.FALSE); 459 } 460 } 461 462 @Override 463 protected void doStart() throws Exception { 464 ServiceHelper.startServices(output, deadLetter); 465 } 466 467 @Override 468 protected void doStop() throws Exception { 469 ServiceHelper.stopServices(deadLetter, output); 470 } 471 472 }