001 /**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one or more
004 * contributor license agreements. See the NOTICE file distributed with
005 * this work for additional information regarding copyright ownership.
006 * The ASF licenses this file to You under the Apache License, Version 2.0
007 * (the "License"); you may not use this file except in compliance with
008 * the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.camel.processor;
019
020 import org.apache.camel.Exchange;
021 import org.apache.camel.Processor;
022 import org.apache.camel.Message;
023 import org.apache.camel.impl.ServiceSupport;
024 import org.apache.camel.util.ServiceHelper;
025 import org.apache.commons.logging.Log;
026 import org.apache.commons.logging.LogFactory;
027
028 /**
029 * Implements a
030 * <a href="http://activemq.apache.org/camel/dead-letter-channel.html">Dead Letter Channel</a>
031 * after attempting to redeliver the message using the {@link RedeliveryPolicy}
032 *
033 * @version $Revision: 534145 $
034 */
035 public class DeadLetterChannel extends ServiceSupport implements ErrorHandler {
036 public static final String REDELIVERY_COUNTER = "org.apache.camel.RedeliveryCounter";
037 public static final String REDELIVERED = "org.apache.camel.Redelivered";
038
039 private static final transient Log log = LogFactory.getLog(DeadLetterChannel.class);
040 private Processor output;
041 private Processor deadLetter;
042 private RedeliveryPolicy redeliveryPolicy;
043 private Logger logger;
044
045 public static <E extends Exchange> Logger createDefaultLogger() {
046 return new Logger(log, LoggingLevel.ERROR);
047 }
048
049 public DeadLetterChannel(Processor output, Processor deadLetter) {
050 this(output, deadLetter, new RedeliveryPolicy(), DeadLetterChannel.createDefaultLogger());
051 }
052
053 public DeadLetterChannel(Processor output, Processor deadLetter, RedeliveryPolicy redeliveryPolicy, Logger logger) {
054 this.deadLetter = deadLetter;
055 this.output = output;
056 this.redeliveryPolicy = redeliveryPolicy;
057 this.logger = logger;
058 }
059
060 @Override
061 public String toString() {
062 return "DeadLetterChannel[" + output + ", " + deadLetter + ", " + redeliveryPolicy + "]";
063 }
064
065 public void process(Exchange exchange) throws Exception {
066 int redeliveryCounter = 0;
067 long redeliveryDelay = 0;
068
069 do {
070 if (redeliveryCounter > 0) {
071 // Figure out how long we should wait to resend this message.
072 redeliveryDelay = redeliveryPolicy.getRedeliveryDelay(redeliveryDelay);
073 sleep(redeliveryDelay);
074 }
075
076 try {
077 output.process(exchange);
078 return;
079 }
080 catch (RuntimeException e) {
081 logger.log("On delivery attempt: " + redeliveryCounter + " caught: " + e, e);
082 }
083 redeliveryCounter = incrementRedeliveryCounter(exchange);
084 }
085 while (redeliveryPolicy.shouldRedeliver(redeliveryCounter));
086
087 // now lets send to the dead letter queue
088 deadLetter.process(exchange);
089 }
090
091 // Properties
092 //-------------------------------------------------------------------------
093
094 /**
095 * Returns the output processor
096 */
097 public Processor getOutput() {
098 return output;
099 }
100
101 /**
102 * Returns the dead letter that message exchanges will be sent to if the redelivery attempts fail
103 */
104 public Processor getDeadLetter() {
105 return deadLetter;
106 }
107
108 public RedeliveryPolicy getRedeliveryPolicy() {
109 return redeliveryPolicy;
110 }
111
112 /**
113 * Sets the redelivery policy
114 */
115 public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
116 this.redeliveryPolicy = redeliveryPolicy;
117 }
118
119 public Logger getLogger() {
120 return logger;
121 }
122
123 /**
124 * Sets the logger strategy; which {@link Log} to use and which {@link LoggingLevel} to use
125 */
126 public void setLogger(Logger logger) {
127 this.logger = logger;
128 }
129
130 // Implementation methods
131 //-------------------------------------------------------------------------
132
133 /**
134 * Increments the redelivery counter and adds the redelivered flag if the message has been redelivered
135 */
136 protected int incrementRedeliveryCounter(Exchange exchange) {
137 Message in = exchange.getIn();
138 Integer counter = in.getHeader(REDELIVERY_COUNTER, Integer.class);
139 int next = 1;
140 if (counter != null) {
141 next = counter + 1;
142 }
143 in.setHeader(REDELIVERY_COUNTER, next);
144 in.setHeader(REDELIVERED, true);
145 return next;
146 }
147
148 protected void sleep(long redeliveryDelay) {
149 if (redeliveryDelay > 0) {
150 if (log.isDebugEnabled()) {
151 log.debug("Sleeping for: " + redeliveryDelay + " until attempting redelivery");
152 }
153 try {
154 Thread.sleep(redeliveryDelay);
155 }
156 catch (InterruptedException e) {
157 if (log.isDebugEnabled()) {
158 log.debug("Thread interupted: " + e, e);
159 }
160 }
161 }
162 }
163
164 protected void doStart() throws Exception {
165 ServiceHelper.startServices(output, deadLetter);
166 }
167
168 protected void doStop() throws Exception {
169 ServiceHelper.stopServices(deadLetter, output);
170 }
171 }