/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.processor;

import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.RejectedExecutionException;

import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Predicate;
import org.apache.camel.Processor;
import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
import org.apache.camel.model.ExceptionType;
import org.apache.camel.model.LoggingLevel;
import org.apache.camel.processor.exceptionpolicy.ExceptionPolicyStrategy;
import org.apache.camel.util.AsyncProcessorHelper;
import org.apache.camel.util.ServiceHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * Implements a <a
 * href="http://activemq.apache.org/camel/dead-letter-channel.html">Dead Letter
 * Channel</a> after attempting to redeliver the message using the
 * {@link RedeliveryPolicy}
 * <p/>
 * The exchange is sent to the <tt>output</tt> processor. When that leaves an
 * exception on the exchange, the delivery is retried for as long as the
 * current {@link RedeliveryPolicy} allows it; afterwards the exchange is handed
 * to the failure processor (the dead letter processor, unless a matching
 * exception policy supplies its own error handler).
 *
 * @version $Revision: 65782 $
 */
public class DeadLetterChannel extends ErrorHandlerSupport implements AsyncProcessor {
    public static final String REDELIVERY_COUNTER = "org.apache.camel.RedeliveryCounter";
    public static final String REDELIVERED = "org.apache.camel.Redelivered";
    public static final String EXCEPTION_CAUSE_PROPERTY = "CamelCauseException";
    public static final String CAUGHT_EXCEPTION_HEADER = "org.apache.camel.CamelCaughtException";

    private static final transient Log LOG = LogFactory.getLog(DeadLetterChannel.class);
    private static final String FAILURE_HANDLED_PROPERTY = DeadLetterChannel.class.getName() + ".FAILURE_HANDLED";

    // Shared by all instances; used to schedule delayed redeliveries in the asynchronous path.
    // NOTE(review): created with the default constructor, i.e. a non-daemon timer thread - confirm this is intended.
    private static Timer timer = new Timer();
    private Processor output;
    private Processor deadLetter;
    private AsyncProcessor outputAsync;
    private RedeliveryPolicy redeliveryPolicy;
    private Logger logger;
    private Processor redeliveryProcessor;

    /**
     * Mutable state tracked for a single exchange while it travels through this channel.
     */
    private class RedeliveryData {
        int redeliveryCounter;
        long redeliveryDelay;
        // flipped to false by the completion callbacks once the output completed asynchronously
        boolean sync = true;
        Predicate handledPredicate;

        // default behavior which can be overloaded on a per exception basis
        RedeliveryPolicy currentRedeliveryPolicy = redeliveryPolicy;
        Processor failureProcessor = deadLetter;
    }

    /**
     * Timer task that sends the exchange to the output processor again once the
     * redelivery delay has elapsed (scheduled from
     * {@link DeadLetterChannel#asyncProcess(Exchange, AsyncCallback, RedeliveryData)}).
     */
    private class RedeliverTimerTask extends TimerTask {
        private final Exchange exchange;
        private final AsyncCallback callback;
        private final RedeliveryData data;

        public RedeliverTimerTask(Exchange exchange, AsyncCallback callback, RedeliveryData data) {
            this.exchange = exchange;
            this.callback = callback;
            this.data = data;
        }

        @Override
        public void run() {
            // only handle the real AsyncProcess the exchange
            // NOTE(review): the return value is ignored and a synchronous completion is skipped in the
            // callback below, so if the output completes synchronously here neither a further redelivery
            // nor the caller's callback is triggered - confirm the output always completes asynchronously.
            outputAsync.process(exchange, new AsyncCallback() {
                public void done(boolean sync) {
                    // Only handle the async case...
                    if (sync) {
                        return;
                    }
                    data.sync = false;
                    // only process if the exchange hasn't failed
                    // and it has not been handled by the error processor
                    if (exchange.getException() != null && !isFailureHandled(exchange)) {
                        // still failing and not handled: evaluate the next redelivery attempt
                        asyncProcess(exchange, callback, data);
                    } else {
                        callback.done(sync);
                    }
                }
            });
        }
    }

    /**
     * Creates the dead letter channel.
     *
     * @param output                  the processor the exchange is delivered (and redelivered) to
     * @param deadLetter              the default failure processor used when redelivery is given up
     * @param redeliveryProcessor     optional processor invoked before each redelivery attempt, may be <tt>null</tt>
     * @param redeliveryPolicy        the default redelivery policy
     * @param logger                  the logger used for reporting failed deliveries
     * @param exceptionPolicyStrategy the strategy passed on to {@link #setExceptionPolicy}
     */
    public DeadLetterChannel(Processor output, Processor deadLetter, Processor redeliveryProcessor, RedeliveryPolicy redeliveryPolicy, Logger logger, ExceptionPolicyStrategy exceptionPolicyStrategy) {
        this.output = output;
        this.deadLetter = deadLetter;
        this.redeliveryProcessor = redeliveryProcessor;
        this.outputAsync = AsyncProcessorTypeConverter.convert(output);
        this.redeliveryPolicy = redeliveryPolicy;
        this.logger = logger;
        setExceptionPolicy(exceptionPolicyStrategy);
    }

    /**
     * Creates the default logger, which logs to this class' {@link Log} at {@link LoggingLevel#ERROR}.
     */
    public static <E extends Exchange> Logger createDefaultLogger() {
        return new Logger(LOG, LoggingLevel.ERROR);
    }

    @Override
    public String toString() {
        return "DeadLetterChannel[" + output + ", " + deadLetter + "]";
    }

    /**
     * Processes the exchange by delegating to the asynchronous variant through
     * {@link AsyncProcessorHelper#process(AsyncProcessor, Exchange)}.
     */
    public void process(Exchange exchange) throws Exception {
        AsyncProcessorHelper.process(this, exchange);
    }

    /**
     * Processes the exchange with fresh redelivery state.
     *
     * @return the value returned by {@link #process(Exchange, AsyncCallback, RedeliveryData)}
     */
    public boolean process(Exchange exchange, final AsyncCallback callback) {
        return process(exchange, callback, new RedeliveryData());
    }

    /**
     * Processes the exchange decorated with this dead letter channel.
     * <p/>
     * Loops for as long as the output completes synchronously with an unhandled exception and the
     * current redelivery policy allows another attempt. Once the output goes asynchronous, the loop is
     * left and further attempts are driven by {@link #asyncProcess(Exchange, AsyncCallback, RedeliveryData)}.
     *
     * @param exchange the exchange to process
     * @param callback the callback to notify when processing is done
     * @param data     the redelivery state of this exchange
     * @return <tt>false</tt> if the output continues asynchronously; otherwise <tt>true</tt>,
     *         <tt>data.sync</tt> or the result of the failure processor, depending on where the loop exits
     */
    protected boolean process(final Exchange exchange, final AsyncCallback callback, final RedeliveryData data) {

        while (true) {
            // we can't keep retrying if the route is being shutdown.
            if (!isRunAllowed()) {
                if (exchange.getException() == null) {
                    exchange.setException(new RejectedExecutionException());
                }
                callback.done(data.sync);
                return data.sync;
            }

            // if the exchange is transacted then let the underlying system handle the redelivery etc.
            // this DeadLetterChannel is only for non transacted exchanges
            // NOTE(review): this exit does not invoke the callback - confirm callers do not rely on it.
            if (exchange.isTransacted() && exchange.getException() != null) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("This is a transacted exchange, bypassing this DeadLetterChannel: " + this + " for exchange: " + exchange);
                }
                return data.sync;
            }

            // did previous processing caused an exception?
            if (exchange.getException() != null) {
                handleException(exchange, data);
            }

            // compute if we should redeliver or not
            boolean shouldRedeliver = shouldRedeliver(exchange, data);
            if (!shouldRedeliver) {
                return deliverToFaultProcessor(exchange, callback, data);
            }

            // if we are redelivering then sleep before trying again
            if (data.redeliveryCounter > 0) {
                // okay we will give it another go so clear the exception so we can try again
                if (exchange.getException() != null) {
                    exchange.setException(null);
                }

                // wait until we should redeliver
                data.redeliveryDelay = data.currentRedeliveryPolicy.sleep(data.redeliveryDelay);

                // letting onRedeliver be executed
                deliverToRedeliveryProcessor(exchange, callback, data);
            }

            // process the exchange
            boolean sync = outputAsync.process(exchange, new AsyncCallback() {
                public void done(boolean sync) {
                    // Only handle the async case...
                    if (sync) {
                        return;
                    }
                    data.sync = false;
                    // only process if the exchange hasn't failed
                    // and it has not been handled by the error processor
                    if (exchange.getException() != null && !isFailureHandled(exchange)) {
                        // continue the redelivery attempts without blocking, using the timer
                        asyncProcess(exchange, callback, data);
                    } else {
                        callback.done(sync);
                    }
                }
            });
            if (!sync) {
                // It is going to be processed async..
                return false;
            }
            if (exchange.getException() == null || isFailureHandled(exchange)) {
                // If everything went well.. then we exit here..
                callback.done(true);
                return true;
            }
            // error occurred so loop back around.....
        }

    }

    /**
     * Continues error handling after the output completed asynchronously: either gives up and hands
     * the exchange to the failure processor, or schedules another delivery attempt on the shared
     * timer instead of sleeping.
     *
     * @param exchange the exchange being processed
     * @param callback the callback to notify when processing is done
     * @param data     the redelivery state of this exchange
     */
    protected void asyncProcess(final Exchange exchange, final AsyncCallback callback, final RedeliveryData data) {
        // we can't keep retrying if the route is being shutdown.
        if (!isRunAllowed()) {
            if (exchange.getException() == null) {
                exchange.setException(new RejectedExecutionException());
            }
            callback.done(data.sync);
            return;
        }

        // if the exchange is transacted then let the underlying system handle the redelivery etc.
        // this DeadLetterChannel is only for non transacted exchanges
        // NOTE(review): this exit does not invoke the callback - confirm callers do not rely on it.
        if (exchange.isTransacted() && exchange.getException() != null) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("This is a transacted exchange, bypassing this DeadLetterChannel: " + this + " for exchange: " + exchange);
            }
            return;
        }

        // did previous processing caused an exception?
        if (exchange.getException() != null) {
            handleException(exchange, data);
        }

        // compute if we should redeliver or not
        boolean shouldRedeliver = shouldRedeliver(exchange, data);
        if (!shouldRedeliver) {
            deliverToFaultProcessor(exchange, callback, data);
            return;
        }

        // process the next try
        if (data.redeliveryCounter > 0) {
            // okay we will give it another go so clear the exception so we can try again
            if (exchange.getException() != null) {
                exchange.setException(null);
            }
            // compute the delay and let the timer trigger the redelivery rather than blocking this thread
            data.redeliveryDelay = data.currentRedeliveryPolicy.getRedeliveryDelay(data.redeliveryDelay);
            timer.schedule(new RedeliverTimerTask(exchange, callback, data), data.redeliveryDelay);

            // letting onRedeliver be executed
            // NOTE(review): this runs after the timer task has been scheduled, so with a very short delay
            // it may overlap with the redelivery itself - confirm the ordering is intended.
            deliverToRedeliveryProcessor(exchange, callback, data);
        }
    }

    /**
     * Records the exception found on the exchange, applies a matching exception policy (redelivery
     * policy, handled predicate and error handler) to the redelivery state, logs the failed attempt
     * and increments the redelivery counter.
     */
    private void handleException(Exchange exchange, RedeliveryData data) {
        Throwable e = exchange.getException();
        // set the original caused exception
        exchange.setProperty(EXCEPTION_CAUSE_PROPERTY, e);

        // find the error handler to use (if any)
        ExceptionType exceptionPolicy = getExceptionPolicy(exchange, e);
        if (exceptionPolicy != null) {
            data.currentRedeliveryPolicy = exceptionPolicy.createRedeliveryPolicy(exchange.getContext(), data.currentRedeliveryPolicy);
            data.handledPredicate = exceptionPolicy.getHandledPolicy();
            Processor processor = exceptionPolicy.getErrorHandler();
            if (processor != null) {
                data.failureProcessor = processor;
            }
        }

        String msg = "Failed delivery for exchangeId: " + exchange.getExchangeId()
            + ". On delivery attempt: " + data.redeliveryCounter + " caught: " + e;
        logFailedDelivery(true, exchange, msg, data, e);

        data.redeliveryCounter = incrementRedeliveryCounter(exchange, e);
    }

    /**
     * Gives an optional configure redelivery processor a chance to process before the Exchange
     * will be redelivered. This can be used to alter the Exchange.
     * <p/>
     * The caller's callback is deliberately <b>not</b> notified from here: the redelivery processor
     * finishing only means the exchange is ready for the next attempt, not that the exchange is done.
     *
     * @return <tt>true</tt> if no redelivery processor is configured, otherwise the value returned by
     *         the redelivery processor
     */
    private boolean deliverToRedeliveryProcessor(final Exchange exchange, final AsyncCallback callback,
                                                 final RedeliveryData data) {
        if (redeliveryProcessor == null) {
            return true;
        }

        if (LOG.isTraceEnabled()) {
            LOG.trace("RedeliveryProcessor " + redeliveryProcessor + " is processing Exchange before its redelivered");
        }
        AsyncProcessor afp = AsyncProcessorTypeConverter.convert(redeliveryProcessor);
        boolean sync = afp.process(exchange, new AsyncCallback() {
            public void done(boolean sync) {
                // do NOT call done on the callback here: only the redelivery processor is done, and
                // notifying the caller would mark the entire exchange as done before it is redelivered
                // (and the caller would be notified a second time when the exchange really completes)
            }
        });

        return sync;
    }

    /**
     * Gives up redelivery and sends the exchange to the failure processor. When the failure
     * processor is done, the exception is restored on the exchange unless the handled predicate
     * matches, and the caller's callback is notified.
     *
     * @return the value returned by the failure processor
     */
    private boolean deliverToFaultProcessor(final Exchange exchange, final AsyncCallback callback,
                                            final RedeliveryData data) {
        // we did not success with the redelivery so now we let the failure processor handle it
        setFailureHandled(exchange);
        // must decrement the redelivery counter as we didn't process the redelivery but is
        // handling by the failure handler. So we must -1 to not let the counter be out-of-sync
        decrementRedeliveryCounter(exchange);

        AsyncProcessor afp = AsyncProcessorTypeConverter.convert(data.failureProcessor);
        boolean sync = afp.process(exchange, new AsyncCallback() {
            public void done(boolean sync) {
                restoreExceptionOnExchange(exchange, data.handledPredicate);
                callback.done(data.sync);
            }
        });

        String msg = "Failed delivery for exchangeId: " + exchange.getExchangeId()
            + ". Handled by the failure processor: " + data.failureProcessor;
        logFailedDelivery(false, exchange, msg, data, null);

        return sync;
    }

    // Properties
    // -------------------------------------------------------------------------

    /**
     * Returns whether the failure-handled property or the caught-exception header is set on the exchange.
     */
    public static boolean isFailureHandled(Exchange exchange) {
        return exchange.getProperty(FAILURE_HANDLED_PROPERTY) != null
            || exchange.getIn().getHeader(CAUGHT_EXCEPTION_HEADER) != null;
    }

    /**
     * Marks the failure as handled: stores the current exception as exchange property and as
     * IN header, and then clears the exception on the exchange.
     */
    public static void setFailureHandled(Exchange exchange) {
        exchange.setProperty(FAILURE_HANDLED_PROPERTY, exchange.getException());
        exchange.getIn().setHeader(CAUGHT_EXCEPTION_HEADER, exchange.getException());
        exchange.setException(null);
    }

    /**
     * Returns the output processor
     */
    public Processor getOutput() {
        return output;
    }

    /**
     * Returns the dead letter that message exchanges will be sent to if the
     * redelivery attempts fail
     */
    public Processor getDeadLetter() {
        return deadLetter;
    }

    /**
     * Returns the default redelivery policy
     */
    public RedeliveryPolicy getRedeliveryPolicy() {
        return redeliveryPolicy;
    }

    /**
     * Sets the redelivery policy
     */
    public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
        this.redeliveryPolicy = redeliveryPolicy;
    }

    /**
     * Returns the logger used for reporting failed deliveries
     */
    public Logger getLogger() {
        return logger;
    }

    /**
     * Sets the logger strategy; which {@link Log} to use and which
     * {@link LoggingLevel} to use
     */
    public void setLogger(Logger logger) {
        this.logger = logger;
    }

    // Implementation methods
    // -------------------------------------------------------------------------

    /**
     * Puts the exception stored by {@link #setFailureHandled(Exchange)} back on the exchange unless
     * the given predicate matches; if it matches, the exchange is flagged with
     * {@link Exchange#EXCEPTION_HANDLED_PROPERTY} instead.
     *
     * @param exchange         the exchange
     * @param handledPredicate the handled predicate, may be <tt>null</tt> (treated as not handled)
     */
    protected static void restoreExceptionOnExchange(Exchange exchange, Predicate handledPredicate) {
        if (handledPredicate == null || !handledPredicate.matches(exchange)) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("This exchange is not handled so its marked as failed: " + exchange);
            }
            // exception not handled, put exception back in the exchange
            exchange.setException(exchange.getProperty(FAILURE_HANDLED_PROPERTY, Throwable.class));
        } else {
            if (LOG.isDebugEnabled()) {
                LOG.debug("This exchange is handled so its marked as not failed: " + exchange);
            }
            exchange.setProperty(Exchange.EXCEPTION_HANDLED_PROPERTY, Boolean.TRUE);
        }
    }

    /**
     * Logs the message at the level the current redelivery policy defines for a retry attempt
     * (<tt>shouldRedeliver</tt> is <tt>true</tt>) or for exhausted retries; the exception is
     * included when given.
     */
    private void logFailedDelivery(boolean shouldRedeliver, Exchange exchange, String message, RedeliveryData data, Throwable e) {
        LoggingLevel newLogLevel;
        if (shouldRedeliver) {
            newLogLevel = data.currentRedeliveryPolicy.getRetryAttemptedLogLevel();
        } else {
            newLogLevel = data.currentRedeliveryPolicy.getRetriesExhaustedLogLevel();
        }
        if (e != null) {
            logger.log(message, e, newLogLevel);
        } else {
            logger.log(message, newLogLevel);
        }
    }

    /**
     * Asks the current redelivery policy whether another attempt is allowed for the current counter.
     */
    private boolean shouldRedeliver(Exchange exchange, RedeliveryData data) {
        return data.currentRedeliveryPolicy.shouldRedeliver(data.redeliveryCounter);
    }

    /**
     * Increments the redelivery counter and adds the redelivered flag if the
     * message has been redelivered
     *
     * @return the new counter value (<tt>1</tt> if no counter header was present)
     */
    protected int incrementRedeliveryCounter(Exchange exchange, Throwable e) {
        Message in = exchange.getIn();
        Integer counter = in.getHeader(REDELIVERY_COUNTER, Integer.class);
        int next = 1;
        if (counter != null) {
            next = counter + 1;
        }
        in.setHeader(REDELIVERY_COUNTER, next);
        in.setHeader(REDELIVERED, Boolean.TRUE);
        return next;
    }

    /**
     * Prepares the redelivery counter and boolean flag for the failure handle processor
     */
    private void decrementRedeliveryCounter(Exchange exchange) {
        Message in = exchange.getIn();
        Integer counter = in.getHeader(REDELIVERY_COUNTER, Integer.class);
        if (counter != null) {
            int prev = counter - 1;
            in.setHeader(REDELIVERY_COUNTER, prev);
            // set boolean flag according to counter
            in.setHeader(REDELIVERED, prev > 0 ? Boolean.TRUE : Boolean.FALSE);
        } else {
            // not redelivered
            in.setHeader(REDELIVERY_COUNTER, 0);
            in.setHeader(REDELIVERED, Boolean.FALSE);
        }
    }

    @Override
    protected void doStart() throws Exception {
        ServiceHelper.startServices(output, deadLetter);
    }

    @Override
    protected void doStop() throws Exception {
        ServiceHelper.stopServices(deadLetter, output);
    }

}