001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.camel.processor;
018
019 import java.util.Timer;
020 import java.util.TimerTask;
021 import java.util.concurrent.RejectedExecutionException;
022
023 import org.apache.camel.AsyncCallback;
024 import org.apache.camel.AsyncProcessor;
025 import org.apache.camel.Exchange;
026 import org.apache.camel.Message;
027 import org.apache.camel.Predicate;
028 import org.apache.camel.Processor;
029 import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
030 import org.apache.camel.model.ExceptionType;
031 import org.apache.camel.model.LoggingLevel;
032 import org.apache.camel.processor.exceptionpolicy.ExceptionPolicyStrategy;
033 import org.apache.camel.util.AsyncProcessorHelper;
034 import org.apache.camel.util.MessageHelper;
035 import org.apache.camel.util.ServiceHelper;
036 import org.apache.commons.logging.Log;
037 import org.apache.commons.logging.LogFactory;
038
039 /**
040 * Implements a <a
041 * href="http://activemq.apache.org/camel/dead-letter-channel.html">Dead Letter
042 * Channel</a> after attempting to redeliver the message using the
043 * {@link RedeliveryPolicy}
044 *
045 * @version $Revision: 740263 $
046 */
047 public class DeadLetterChannel extends ErrorHandlerSupport implements AsyncProcessor {
048 public static final String REDELIVERY_COUNTER = "org.apache.camel.RedeliveryCounter";
049 public static final String REDELIVERED = "org.apache.camel.Redelivered";
050 public static final String EXCEPTION_CAUSE_PROPERTY = "CamelCauseException";
051 public static final String CAUGHT_EXCEPTION_HEADER = "org.apache.camel.CamelCaughtException";
052
053 private static final transient Log LOG = LogFactory.getLog(DeadLetterChannel.class);
054 private static final String FAILURE_HANDLED_PROPERTY = DeadLetterChannel.class.getName() + ".FAILURE_HANDLED";
055
056 private static Timer timer = new Timer();
057 private Processor output;
058 private Processor deadLetter;
059 private AsyncProcessor outputAsync;
060 private RedeliveryPolicy redeliveryPolicy;
061 private Logger logger;
062 private Processor redeliveryProcessor;
063
064 private class RedeliveryData {
065 int redeliveryCounter;
066 long redeliveryDelay;
067 boolean sync = true;
068 Predicate handledPredicate;
069
070 // default behavior which can be overloaded on a per exception basis
071 RedeliveryPolicy currentRedeliveryPolicy = redeliveryPolicy;
072 Processor failureProcessor = deadLetter;
073 }
074
075 private class RedeliverTimerTask extends TimerTask {
076 private final Exchange exchange;
077 private final AsyncCallback callback;
078 private final RedeliveryData data;
079
080 public RedeliverTimerTask(Exchange exchange, AsyncCallback callback, RedeliveryData data) {
081 this.exchange = exchange;
082 this.callback = callback;
083 this.data = data;
084 }
085
086 @Override
087 public void run() {
088 //only handle the real AsyncProcess the exchange
089 outputAsync.process(exchange, new AsyncCallback() {
090 public void done(boolean sync) {
091 // Only handle the async case...
092 if (sync) {
093 return;
094 }
095 data.sync = false;
096 // only process if the exchange hasn't failed
097 // and it has not been handled by the error processor
098 if (exchange.getException() != null && !isFailureHandled(exchange)) {
099 // if we are redelivering then sleep before trying again
100 asyncProcess(exchange, callback, data);
101 } else {
102 callback.done(sync);
103 }
104 }
105 });
106 }
107 }
108
109 public DeadLetterChannel(Processor output, Processor deadLetter, Processor redeliveryProcessor, RedeliveryPolicy redeliveryPolicy, Logger logger, ExceptionPolicyStrategy exceptionPolicyStrategy) {
110 this.output = output;
111 this.deadLetter = deadLetter;
112 this.redeliveryProcessor = redeliveryProcessor;
113 this.outputAsync = AsyncProcessorTypeConverter.convert(output);
114 this.redeliveryPolicy = redeliveryPolicy;
115 this.logger = logger;
116 setExceptionPolicy(exceptionPolicyStrategy);
117 }
118
119 public static <E extends Exchange> Logger createDefaultLogger() {
120 return new Logger(LOG, LoggingLevel.ERROR);
121 }
122
123 @Override
124 public String toString() {
125 return "DeadLetterChannel[" + output + ", " + deadLetter + "]";
126 }
127
128 public void process(Exchange exchange) throws Exception {
129 AsyncProcessorHelper.process(this, exchange);
130 }
131
132 public boolean process(Exchange exchange, final AsyncCallback callback) {
133 return process(exchange, callback, new RedeliveryData());
134 }
135
136 /**
137 * Processes the exchange using decorated with this dead letter channel.
138 */
139 protected boolean process(final Exchange exchange, final AsyncCallback callback, final RedeliveryData data) {
140
141 while (true) {
142 // we can't keep retrying if the route is being shutdown.
143 if (!isRunAllowed()) {
144 if (exchange.getException() == null) {
145 exchange.setException(new RejectedExecutionException());
146 }
147 callback.done(data.sync);
148 return data.sync;
149 }
150
151 // if the exchange is transacted then let the underlying system handle the redelivery etc.
152 // this DeadLetterChannel is only for non transacted exchanges
153 if (exchange.isTransacted() && exchange.getException() != null) {
154 if (LOG.isDebugEnabled()) {
155 LOG.debug("This is a transacted exchange, bypassing this DeadLetterChannel: " + this + " for exchange: " + exchange);
156 }
157 return data.sync;
158 }
159
160 // did previous processing caused an exception?
161 if (exchange.getException() != null) {
162 handleException(exchange, data);
163 }
164
165 // compute if we should redeliver or not
166 boolean shouldRedeliver = shouldRedeliver(exchange, data);
167 if (!shouldRedeliver) {
168 return deliverToFaultProcessor(exchange, callback, data);
169 }
170
171 // if we are redelivering then sleep before trying again
172 if (data.redeliveryCounter > 0) {
173 // okay we will give it another go so clear the exception so we can try again
174 if (exchange.getException() != null) {
175 exchange.setException(null);
176 }
177
178 // reset cached streams so they can be read again
179 MessageHelper.resetStreamCache(exchange.getIn());
180
181 // wait until we should redeliver
182 data.redeliveryDelay = data.currentRedeliveryPolicy.sleep(data.redeliveryDelay);
183
184 // letting onRedeliver be executed
185 deliverToRedeliveryProcessor(exchange, callback, data);
186 }
187
188 // process the exchange
189 boolean sync = outputAsync.process(exchange, new AsyncCallback() {
190 public void done(boolean sync) {
191 // Only handle the async case...
192 if (sync) {
193 return;
194 }
195 data.sync = false;
196 // only process if the exchange hasn't failed
197 // and it has not been handled by the error processor
198 if (exchange.getException() != null && !isFailureHandled(exchange)) {
199 //TODO Call the Timer for the asyncProcessor
200 asyncProcess(exchange, callback, data);
201 } else {
202 callback.done(sync);
203 }
204 }
205 });
206 if (!sync) {
207 // It is going to be processed async..
208 return false;
209 }
210 if (exchange.getException() == null || isFailureHandled(exchange)) {
211 // If everything went well.. then we exit here..
212 callback.done(true);
213 return true;
214 }
215 // error occurred so loop back around.....
216 }
217
218 }
219
220 protected void asyncProcess(final Exchange exchange, final AsyncCallback callback, final RedeliveryData data) {
221 // set the timer here
222 if (!isRunAllowed()) {
223 if (exchange.getException() == null) {
224 exchange.setException(new RejectedExecutionException());
225 }
226 callback.done(data.sync);
227 return;
228 }
229
230 // if the exchange is transacted then let the underlying system handle the redelivery etc.
231 // this DeadLetterChannel is only for non transacted exchanges
232 if (exchange.isTransacted() && exchange.getException() != null) {
233 if (LOG.isDebugEnabled()) {
234 LOG.debug("This is a transacted exchange, bypassing this DeadLetterChannel: " + this + " for exchange: " + exchange);
235 }
236 return;
237 }
238
239 // did previous processing caused an exception?
240 if (exchange.getException() != null) {
241 handleException(exchange, data);
242 }
243
244 // compute if we should redeliver or not
245 boolean shouldRedeliver = shouldRedeliver(exchange, data);
246 if (!shouldRedeliver) {
247 deliverToFaultProcessor(exchange, callback, data);
248 return;
249 }
250
251 // process the next try
252 // if we are redelivering then sleep before trying again
253 if (data.redeliveryCounter > 0) {
254 // okay we will give it another go so clear the exception so we can try again
255 if (exchange.getException() != null) {
256 exchange.setException(null);
257 }
258 // wait until we should redeliver
259 data.redeliveryDelay = data.currentRedeliveryPolicy.getRedeliveryDelay(data.redeliveryDelay);
260 timer.schedule(new RedeliverTimerTask(exchange, callback, data), data.redeliveryDelay);
261
262 // letting onRedeliver be executed
263 deliverToRedeliveryProcessor(exchange, callback, data);
264 }
265 }
266
267 private void handleException(Exchange exchange, RedeliveryData data) {
268 Throwable e = exchange.getException();
269 // set the original caused exception
270 exchange.setProperty(EXCEPTION_CAUSE_PROPERTY, e);
271
272 // find the error handler to use (if any)
273 ExceptionType exceptionPolicy = getExceptionPolicy(exchange, e);
274 if (exceptionPolicy != null) {
275 data.currentRedeliveryPolicy = exceptionPolicy.createRedeliveryPolicy(exchange.getContext(), data.currentRedeliveryPolicy);
276 data.handledPredicate = exceptionPolicy.getHandledPolicy();
277 Processor processor = exceptionPolicy.getErrorHandler();
278 if (processor != null) {
279 data.failureProcessor = processor;
280 }
281 }
282
283 String msg = "Failed delivery for exchangeId: " + exchange.getExchangeId()
284 + ". On delivery attempt: " + data.redeliveryCounter + " caught: " + e;
285 logFailedDelivery(true, exchange, msg, data, e);
286
287 data.redeliveryCounter = incrementRedeliveryCounter(exchange, e);
288 }
289
290 /**
291 * Gives an optional configure redelivery processor a chance to process before the Exchange
292 * will be redelivered. This can be used to alter the Exchange.
293 */
294 private boolean deliverToRedeliveryProcessor(final Exchange exchange, final AsyncCallback callback,
295 final RedeliveryData data) {
296 if (redeliveryProcessor == null) {
297 return true;
298 }
299
300 if (LOG.isTraceEnabled()) {
301 LOG.trace("RedeliveryProcessor " + redeliveryProcessor + " is processing Exchange before its redelivered");
302 }
303 AsyncProcessor afp = AsyncProcessorTypeConverter.convert(redeliveryProcessor);
304 boolean sync = afp.process(exchange, new AsyncCallback() {
305 public void done(boolean sync) {
306 callback.done(data.sync);
307 }
308 });
309
310 return sync;
311 }
312
313 private boolean deliverToFaultProcessor(final Exchange exchange, final AsyncCallback callback,
314 final RedeliveryData data) {
315 // we did not success with the redelivery so now we let the failure processor handle it
316 setFailureHandled(exchange);
317 // must decrement the redelivery counter as we didn't process the redelivery but is
318 // handling by the failure handler. So we must -1 to not let the counter be out-of-sync
319 decrementRedeliveryCounter(exchange);
320
321 AsyncProcessor afp = AsyncProcessorTypeConverter.convert(data.failureProcessor);
322 boolean sync = afp.process(exchange, new AsyncCallback() {
323 public void done(boolean sync) {
324 restoreExceptionOnExchange(exchange, data.handledPredicate);
325 callback.done(data.sync);
326 }
327 });
328
329 String msg = "Failed delivery for exchangeId: " + exchange.getExchangeId()
330 + ". Handled by the failure processor: " + data.failureProcessor;
331 logFailedDelivery(false, exchange, msg, data, null);
332
333 return sync;
334 }
335
336 // Properties
337 // -------------------------------------------------------------------------
338
339 public static boolean isFailureHandled(Exchange exchange) {
340 return exchange.getProperty(FAILURE_HANDLED_PROPERTY) != null
341 || exchange.getIn().getHeader(CAUGHT_EXCEPTION_HEADER) != null;
342 }
343
344 public static void setFailureHandled(Exchange exchange) {
345 exchange.setProperty(FAILURE_HANDLED_PROPERTY, exchange.getException());
346 exchange.getIn().setHeader(CAUGHT_EXCEPTION_HEADER, exchange.getException());
347 exchange.setException(null);
348 }
349
350 /**
351 * Returns the output processor
352 */
353 public Processor getOutput() {
354 return output;
355 }
356
357 /**
358 * Returns the dead letter that message exchanges will be sent to if the
359 * redelivery attempts fail
360 */
361 public Processor getDeadLetter() {
362 return deadLetter;
363 }
364
365 public RedeliveryPolicy getRedeliveryPolicy() {
366 return redeliveryPolicy;
367 }
368
369 /**
370 * Sets the redelivery policy
371 */
372 public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
373 this.redeliveryPolicy = redeliveryPolicy;
374 }
375
376 public Logger getLogger() {
377 return logger;
378 }
379
380 /**
381 * Sets the logger strategy; which {@link Log} to use and which
382 * {@link LoggingLevel} to use
383 */
384 public void setLogger(Logger logger) {
385 this.logger = logger;
386 }
387
388 // Implementation methods
389 // -------------------------------------------------------------------------
390
391 protected static void restoreExceptionOnExchange(Exchange exchange, Predicate handledPredicate) {
392 if (handledPredicate == null || !handledPredicate.matches(exchange)) {
393 if (LOG.isDebugEnabled()) {
394 LOG.debug("This exchange is not handled so its marked as failed: " + exchange);
395 }
396 // exception not handled, put exception back in the exchange
397 exchange.setException(exchange.getProperty(FAILURE_HANDLED_PROPERTY, Throwable.class));
398 } else {
399 if (LOG.isDebugEnabled()) {
400 LOG.debug("This exchange is handled so its marked as not failed: " + exchange);
401 }
402 exchange.setProperty(Exchange.EXCEPTION_HANDLED_PROPERTY, Boolean.TRUE);
403 }
404 }
405
406 private void logFailedDelivery(boolean shouldRedeliver, Exchange exchange, String message, RedeliveryData data, Throwable e) {
407 LoggingLevel newLogLevel;
408 if (shouldRedeliver) {
409 newLogLevel = data.currentRedeliveryPolicy.getRetryAttemptedLogLevel();
410 } else {
411 newLogLevel = data.currentRedeliveryPolicy.getRetriesExhaustedLogLevel();
412 }
413 if (e != null) {
414 logger.log(message, e, newLogLevel);
415 } else {
416 logger.log(message, newLogLevel);
417 }
418 }
419
420 private boolean shouldRedeliver(Exchange exchange, RedeliveryData data) {
421 return data.currentRedeliveryPolicy.shouldRedeliver(data.redeliveryCounter);
422 }
423
424 /**
425 * Increments the redelivery counter and adds the redelivered flag if the
426 * message has been redelivered
427 */
428 protected int incrementRedeliveryCounter(Exchange exchange, Throwable e) {
429 Message in = exchange.getIn();
430 Integer counter = in.getHeader(REDELIVERY_COUNTER, Integer.class);
431 int next = 1;
432 if (counter != null) {
433 next = counter + 1;
434 }
435 in.setHeader(REDELIVERY_COUNTER, next);
436 in.setHeader(REDELIVERED, Boolean.TRUE);
437 return next;
438 }
439
440 /**
441 * Prepares the redelivery counter and boolean flag for the failure handle processor
442 */
443 private void decrementRedeliveryCounter(Exchange exchange) {
444 Message in = exchange.getIn();
445 Integer counter = in.getHeader(REDELIVERY_COUNTER, Integer.class);
446 if (counter != null) {
447 int prev = counter - 1;
448 in.setHeader(REDELIVERY_COUNTER, prev);
449 // set boolean flag according to counter
450 in.setHeader(REDELIVERED, prev > 0 ? Boolean.TRUE : Boolean.FALSE);
451 } else {
452 // not redelivered
453 in.setHeader(REDELIVERY_COUNTER, 0);
454 in.setHeader(REDELIVERED, Boolean.FALSE);
455 }
456 }
457
458 @Override
459 protected void doStart() throws Exception {
460 ServiceHelper.startServices(output, deadLetter);
461 }
462
463 @Override
464 protected void doStop() throws Exception {
465 ServiceHelper.stopServices(deadLetter, output);
466 }
467
468 }