001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor;
018    
019    import java.util.concurrent.RejectedExecutionException;
020    
021    import org.apache.camel.AsyncCallback;
022    import org.apache.camel.AsyncProcessor;
023    import org.apache.camel.Exchange;
024    import org.apache.camel.Message;
025    import org.apache.camel.Processor;
026    import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
027    import org.apache.camel.model.ExceptionType;
028    import org.apache.camel.processor.exceptionpolicy.ExceptionPolicyStrategy;
029    import org.apache.camel.util.AsyncProcessorHelper;
030    import org.apache.camel.util.ServiceHelper;
031    import org.apache.commons.logging.Log;
032    import org.apache.commons.logging.LogFactory;
033    
034    /**
035     * Implements a <a
036     * href="http://activemq.apache.org/camel/dead-letter-channel.html">Dead Letter
037     * Channel</a> after attempting to redeliver the message using the
038     * {@link RedeliveryPolicy}
039     *
040     * @version $Revision: 37863 $
041     */
042    public class DeadLetterChannel extends ErrorHandlerSupport implements AsyncProcessor {
043        public static final String REDELIVERY_COUNTER = "org.apache.camel.RedeliveryCounter";
044        public static final String REDELIVERED = "org.apache.camel.Redelivered";
045        public static final String EXCEPTION_CAUSE_PROPERTY = "CamelCauseException";
046    
047        private class RedeliveryData {
048            int redeliveryCounter;
049            long redeliveryDelay;
050            boolean sync = true;
051    
052            // default behaviour which can be overloaded on a per exception basis
053            RedeliveryPolicy currentRedeliveryPolicy = redeliveryPolicy;
054            Processor failureProcessor = deadLetter;
055        }
056    
057        private static final transient Log LOG = LogFactory.getLog(DeadLetterChannel.class);
058        private static final String FAILURE_HANDLED_PROPERTY = DeadLetterChannel.class.getName() + ".FAILURE_HANDLED";
059        private Processor output;
060        private Processor deadLetter;
061        private AsyncProcessor outputAsync;
062        private RedeliveryPolicy redeliveryPolicy;
063        private Logger logger;
064    
065        public DeadLetterChannel(Processor output, Processor deadLetter) {
066            this(output, deadLetter, new RedeliveryPolicy(), DeadLetterChannel.createDefaultLogger(),
067                ErrorHandlerSupport.createDefaultExceptionPolicyStrategy());
068        }
069    
070        public DeadLetterChannel(Processor output, Processor deadLetter, RedeliveryPolicy redeliveryPolicy, Logger logger, ExceptionPolicyStrategy exceptionPolicyStrategy) {
071            this.deadLetter = deadLetter;
072            this.output = output;
073            this.outputAsync = AsyncProcessorTypeConverter.convert(output);
074    
075            this.redeliveryPolicy = redeliveryPolicy;
076            this.logger = logger;
077            setExceptionPolicy(exceptionPolicyStrategy);
078        }
079    
080        public static <E extends Exchange> Logger createDefaultLogger() {
081            return new Logger(LOG, LoggingLevel.ERROR);
082        }
083    
084        @Override
085        public String toString() {
086            return "DeadLetterChannel[" + output + ", " + deadLetter + ", " + redeliveryPolicy + "]";
087        }
088    
089        public boolean process(Exchange exchange, final AsyncCallback callback) {
090            return process(exchange, callback, new RedeliveryData());
091        }
092    
093        public boolean process(final Exchange exchange, final AsyncCallback callback, final RedeliveryData data) {
094    
095            while (true) {
096    
097                // We can't keep retrying if the route is being shutdown.
098                if (!isRunAllowed()) {
099                    if (exchange.getException() == null) {
100                        exchange.setException(new RejectedExecutionException());
101                    }
102                    callback.done(data.sync);
103                    return data.sync;
104                }
105    
106                if (exchange.getException() != null) {
107                    Throwable e = exchange.getException();
108                    exchange.setException(null); // Reset it since we are handling it.
109    
110                    logger.log("Failed delivery for exchangeId: " + exchange.getExchangeId() + ". On delivery attempt: " + data.redeliveryCounter + " caught: " + e, e);
111                    data.redeliveryCounter = incrementRedeliveryCounter(exchange, e);
112    
113                    ExceptionType exceptionPolicy = getExceptionPolicy(exchange, e);
114                    if (exceptionPolicy != null) {
115                        data.currentRedeliveryPolicy = exceptionPolicy.createRedeliveryPolicy(data.currentRedeliveryPolicy);
116                        Processor processor = exceptionPolicy.getErrorHandler();
117                        if (processor != null) {
118                            data.failureProcessor = processor;
119                        }
120                    }
121                }
122    
123                if (!data.currentRedeliveryPolicy.shouldRedeliver(data.redeliveryCounter)) {
124                    setFailureHandled(exchange, true);
125                    AsyncProcessor afp = AsyncProcessorTypeConverter.convert(data.failureProcessor);
126                    boolean sync = afp.process(exchange, new AsyncCallback() {
127                        public void done(boolean sync) {
128                            restoreExceptionOnExchange(exchange);
129                            callback.done(data.sync);
130                        }
131                    });
132    
133                    restoreExceptionOnExchange(exchange);
134                    logger.log("Failed delivery for exchangeId: " + exchange.getExchangeId() + ". Handled by the failure processor: " + data.failureProcessor);
135                    return sync;
136                }
137    
138                if (data.redeliveryCounter > 0) {
139                    // Figure out how long we should wait to resend this message.
140                    data.redeliveryDelay = data.currentRedeliveryPolicy.getRedeliveryDelay(data.redeliveryDelay);
141                    sleep(data.redeliveryDelay);
142                }
143    
144                exchange.setProperty(EXCEPTION_CAUSE_PROPERTY, exchange.getException());
145                exchange.setException(null);
146                boolean sync = outputAsync.process(exchange, new AsyncCallback() {
147                    public void done(boolean sync) {
148                        // Only handle the async case...
149                        if (sync) {
150                            return;
151                        }
152                        data.sync = false;
153                        if (exchange.getException() != null) {
154                            process(exchange, callback, data);
155                        } else {
156                            callback.done(sync);
157                        }
158                    }
159                });
160                if (!sync) {
161                    // It is going to be processed async..
162                    return false;
163                }
164                if (exchange.getException() == null || isFailureHandled(exchange)) {
165                    // If everything went well.. then we exit here..
166                    callback.done(true);
167                    return true;
168                }
169                // error occurred so loop back around.....
170            }
171    
172        }
173    
174        public static boolean isFailureHandled(Exchange exchange) {
175            return exchange.getProperty(FAILURE_HANDLED_PROPERTY) != null;
176        }
177    
178        public static void setFailureHandled(Exchange exchange, boolean isHandled) {
179            if (isHandled) {
180                exchange.setProperty(FAILURE_HANDLED_PROPERTY, exchange.getException());
181                exchange.setException(null);
182            } else {
183                exchange.setException(exchange.getProperty(FAILURE_HANDLED_PROPERTY, Throwable.class));
184                exchange.removeProperty(FAILURE_HANDLED_PROPERTY);
185            }
186    
187        }
188    
189        public static void restoreExceptionOnExchange(Exchange exchange) {
190            exchange.setException(exchange.getProperty(FAILURE_HANDLED_PROPERTY, Throwable.class));
191        }
192    
193        public void process(Exchange exchange) throws Exception {
194            AsyncProcessorHelper.process(this, exchange);
195        }
196    
197        // Properties
198        // -------------------------------------------------------------------------
199    
200        /**
201         * Returns the output processor
202         */
203        public Processor getOutput() {
204            return output;
205        }
206    
207        /**
208         * Returns the dead letter that message exchanges will be sent to if the
209         * redelivery attempts fail
210         */
211        public Processor getDeadLetter() {
212            return deadLetter;
213        }
214    
215        public RedeliveryPolicy getRedeliveryPolicy() {
216            return redeliveryPolicy;
217        }
218    
219        /**
220         * Sets the redelivery policy
221         */
222        public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
223            this.redeliveryPolicy = redeliveryPolicy;
224        }
225    
226        public Logger getLogger() {
227            return logger;
228        }
229    
230        /**
231         * Sets the logger strategy; which {@link Log} to use and which
232         * {@link LoggingLevel} to use
233         */
234        public void setLogger(Logger logger) {
235            this.logger = logger;
236        }
237    
238        // Implementation methods
239        // -------------------------------------------------------------------------
240    
241        /**
242         * Increments the redelivery counter and adds the redelivered flag if the
243         * message has been redelivered
244         */
245        protected int incrementRedeliveryCounter(Exchange exchange, Throwable e) {
246            Message in = exchange.getIn();
247            Integer counter = in.getHeader(REDELIVERY_COUNTER, Integer.class);
248            int next = 1;
249            if (counter != null) {
250                next = counter + 1;
251            }
252            in.setHeader(REDELIVERY_COUNTER, next);
253            in.setHeader(REDELIVERED, true);
254            exchange.setException(e);
255            return next;
256        }
257    
258        protected void sleep(long redeliveryDelay) {
259            if (redeliveryDelay > 0) {
260                if (LOG.isDebugEnabled()) {
261                    LOG.debug("Sleeping for: " + redeliveryDelay + " millis until attempting redelivery");
262                }
263                try {
264                    Thread.sleep(redeliveryDelay);
265                } catch (InterruptedException e) {
266                    if (LOG.isDebugEnabled()) {
267                        LOG.debug("Thread interrupted: " + e, e);
268                    }
269                }
270            }
271        }
272    
273        protected void doStart() throws Exception {
274            ServiceHelper.startServices(output, deadLetter);
275        }
276    
277        protected void doStop() throws Exception {
278            ServiceHelper.stopServices(deadLetter, output);
279        }
280    
281    }