001 /**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one or more
004 * contributor license agreements. See the NOTICE file distributed with
005 * this work for additional information regarding copyright ownership.
006 * The ASF licenses this file to You under the Apache License, Version 2.0
007 * (the "License"); you may not use this file except in compliance with
008 * the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.camel.processor.idempotent;
019
020 import org.apache.camel.Exchange;
021 import org.apache.camel.Expression;
022 import org.apache.camel.Processor;
023 import org.apache.camel.impl.ServiceSupport;
024 import org.apache.camel.util.ExpressionHelper;
025 import org.apache.camel.util.ServiceHelper;
026 import org.apache.commons.logging.Log;
027 import org.apache.commons.logging.LogFactory;
028
029 /**
030 * An implementation of the
031 * <a href="http://activemq.apache.org/camel/idempotent-consumer.html">Idempotent Consumer</a> pattern.
032 *
033 * @version $Revision: 1.1 $
034 */
035 public class IdempotentConsumer extends ServiceSupport implements Processor {
036 private static final transient Log log = LogFactory.getLog(IdempotentConsumer.class);
037 private Expression<Exchange> messageIdExpression;
038 private Processor nextProcessor;
039 private MessageIdRepository messageIdRepository;
040
041 public IdempotentConsumer(Expression<Exchange> messageIdExpression, MessageIdRepository messageIdRepository, Processor nextProcessor) {
042 this.messageIdExpression = messageIdExpression;
043 this.messageIdRepository = messageIdRepository;
044 this.nextProcessor = nextProcessor;
045 }
046
047 @Override
048 public String toString() {
049 return "IdempotentConsumer[expression=" + messageIdExpression + ", repository=" + messageIdRepository + ", processor=" + nextProcessor + "]";
050 }
051
052 public void process(Exchange exchange) throws Exception {
053 String messageId = ExpressionHelper.evaluateAsString(messageIdExpression, exchange);
054 if (messageId == null) {
055 throw new NoMessageIdException(exchange, messageIdExpression);
056 }
057 if (!messageIdRepository.contains(messageId)) {
058 nextProcessor.process(exchange);
059 }
060 else {
061 onDuplicateMessage(exchange, messageId);
062 }
063 }
064
065 // Properties
066 //-------------------------------------------------------------------------
067 public Expression<Exchange> getMessageIdExpression() {
068 return messageIdExpression;
069 }
070
071 public MessageIdRepository getMessageIdRepository() {
072 return messageIdRepository;
073 }
074
075 public Processor getNextProcessor() {
076 return nextProcessor;
077 }
078
079
080 // Implementation methods
081 //-------------------------------------------------------------------------
082
083 protected void doStart() throws Exception {
084 ServiceHelper.startServices(nextProcessor);
085 }
086
087 protected void doStop() throws Exception {
088 ServiceHelper.stopServices(nextProcessor);
089 }
090
091 /**
092 * A strategy method to allow derived classes to overload the behaviour of processing a duplicate message
093 *
094 * @param exchange the exchange
095 * @param messageId the message ID of this exchange
096 */
097 protected void onDuplicateMessage(Exchange exchange, String messageId) {
098 if (log.isDebugEnabled()) {
099 log.debug("Ignoring duplicate message with id: " + messageId + " for exchange: " + exchange);
100 }
101 }
102 }