001 /*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.camel.jms;
018
019 import org.apache.activemq.ActiveMQSession;
020 import org.apache.activemq.util.JMSExceptionSupport;
021 import org.apache.camel.Consumer;
022 import org.apache.camel.Endpoint;
023 import org.apache.camel.Exchange;
024 import org.apache.camel.PollingConsumer;
025 import org.apache.camel.Processor;
026
027 import javax.jms.*;
028 import javax.jms.IllegalStateException;
029
030 /**
031 * A JMS {@link javax.jms.MessageConsumer} which consumes message exchanges from a
032 * Camel {@link Endpoint}
033 *
034 * @version $Revision: $
035 */
036 public class CamelMessageConsumer implements MessageConsumer {
037 private final CamelDestination destination;
038 private final Endpoint endpoint;
039 private final ActiveMQSession session;
040 private final String messageSelector;
041 private final boolean noLocal;
042 private MessageListener messageListener;
043 private Consumer consumer;
044 private PollingConsumer pollingConsumer;
045 private boolean closed;
046
047 public CamelMessageConsumer(CamelDestination destination, Endpoint endpoint, ActiveMQSession session, String messageSelector, boolean noLocal) {
048 this.destination = destination;
049 this.endpoint = endpoint;
050 this.session = session;
051 this.messageSelector = messageSelector;
052 this.noLocal = noLocal;
053 }
054
055 public void close() throws JMSException {
056 if (!closed) {
057 closed = true;
058 try {
059 if (consumer != null) {
060 consumer.stop();
061 }
062 if (pollingConsumer != null) {
063 pollingConsumer.stop();
064 }
065 }
066 catch (JMSException e) {
067 throw e;
068 }
069 catch (Exception e) {
070 throw JMSExceptionSupport.create(e);
071 }
072 }
073 }
074
075 public MessageListener getMessageListener() throws JMSException {
076 return messageListener;
077 }
078
079 public void setMessageListener(MessageListener messageListener) throws JMSException {
080 this.messageListener = messageListener;
081 if (messageListener != null && consumer == null) {
082 consumer = createConsumer();
083 }
084 }
085
086 public Message receive() throws JMSException {
087 Exchange exchange = getPollingConsumer().receive();
088 return createMessage(exchange);
089 }
090
091 public Message receive(long timeoutMillis) throws JMSException {
092 Exchange exchange = getPollingConsumer().receive(timeoutMillis);
093 return createMessage(exchange);
094 }
095
096 public Message receiveNoWait() throws JMSException {
097 Exchange exchange = getPollingConsumer().receiveNoWait();
098 return createMessage(exchange);
099 }
100
101 // Properties
102 //-----------------------------------------------------------------------
103
104 public CamelDestination getDestination() {
105 return destination;
106 }
107
108 public Endpoint getEndpoint() {
109 return endpoint;
110 }
111
112 public String getMessageSelector() {
113 return messageSelector;
114 }
115
116 public boolean isNoLocal() {
117 return noLocal;
118 }
119
120 public ActiveMQSession getSession() {
121 return session;
122 }
123
124 // Implementation methods
125 //-----------------------------------------------------------------------
126
127 protected PollingConsumer getPollingConsumer() throws JMSException {
128 try {
129 if (pollingConsumer == null) {
130 pollingConsumer = endpoint.createPollingConsumer();
131 pollingConsumer.start();
132 }
133 return pollingConsumer;
134 }
135 catch (JMSException e) {
136 throw e;
137 }
138 catch (Exception e) {
139 throw JMSExceptionSupport.create(e);
140 }
141 }
142
143 protected Message createMessage(Exchange exchange) throws JMSException {
144 if (exchange != null) {
145 Message message = destination.getBinding().makeJmsMessage(exchange, session);
146 return message;
147 }
148 else {
149 return null;
150 }
151 }
152
153 protected Consumer createConsumer() throws JMSException {
154 try {
155 Consumer answer = endpoint.createConsumer(new Processor() {
156 public void process(Exchange exchange) throws Exception {
157 Message message = createMessage(exchange);
158 getMessageListener().onMessage(message);
159 }
160 });
161 answer.start();
162 return answer;
163 }
164 catch (JMSException e) {
165 throw e;
166 }
167 catch (Exception e) {
168 throw JMSExceptionSupport.create(e);
169 }
170 }
171
172 protected void checkClosed() throws javax.jms.IllegalStateException {
173 if (closed) {
174 throw new IllegalStateException("The producer is closed");
175 }
176 }
177 }