001 /**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one or more
004 * contributor license agreements. See the NOTICE file distributed with
005 * this work for additional information regarding copyright ownership.
006 * The ASF licenses this file to You under the Apache License, Version 2.0
007 * (the "License"); you may not use this file except in compliance with
008 * the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.camel.component.jms;
019
020 import org.apache.camel.CamelContext;
021 import org.apache.camel.Endpoint;
022 import org.apache.camel.impl.DefaultComponent;
023 import org.apache.camel.util.IntrospectionSupport;
024 import static org.apache.camel.util.ObjectHelper.removeStartingCharacters;
025 import org.springframework.core.task.TaskExecutor;
026 import org.springframework.jms.listener.serversession.ServerSessionFactory;
027 import org.springframework.jms.support.converter.MessageConverter;
028 import org.springframework.transaction.PlatformTransactionManager;
029
030 import javax.jms.ConnectionFactory;
031 import javax.jms.ExceptionListener;
032 import javax.jms.Session;
033 import java.util.Map;
034
035 /**
036 * A <a href="http://activemq.apache.org/jms.html">JMS Component</a>
037 *
038 * @version $Revision:520964 $
039 */
040 public class JmsComponent extends DefaultComponent<JmsExchange> {
041 public static final String QUEUE_PREFIX = "queue:";
042 public static final String TOPIC_PREFIX = "topic:";
043
044 private JmsConfiguration configuration;
045
046 /**
047 * Static builder method
048 */
049 public static JmsComponent jmsComponent() {
050 return new JmsComponent();
051 }
052
053 /**
054 * Static builder method
055 */
056 public static JmsComponent jmsComponent(JmsConfiguration configuration) {
057 return new JmsComponent(configuration);
058 }
059
060 /**
061 * Static builder method
062 */
063 public static JmsComponent jmsComponent(ConnectionFactory connectionFactory) {
064 return jmsComponent(new JmsConfiguration(connectionFactory));
065 }
066
067 /**
068 * Static builder method
069 */
070 public static JmsComponent jmsComponentClientAcknowledge(ConnectionFactory connectionFactory) {
071 JmsConfiguration template = new JmsConfiguration(connectionFactory);
072 template.setAcknowledgementMode(Session.CLIENT_ACKNOWLEDGE);
073 return jmsComponent(template);
074 }
075
076 /**
077 * Static builder method
078 */
079 public static JmsComponent jmsComponentAutoAcknowledge(ConnectionFactory connectionFactory) {
080 JmsConfiguration template = new JmsConfiguration(connectionFactory);
081 template.setAcknowledgementMode(Session.AUTO_ACKNOWLEDGE);
082 return jmsComponent(template);
083 }
084
085 public static JmsComponent jmsComponentTransacted(ConnectionFactory connectionFactory) {
086 JmsConfiguration template = new JmsConfiguration(connectionFactory);
087 template.setTransacted(true);
088 return jmsComponent(template);
089 }
090
091 public static JmsComponent jmsComponentTransacted(ConnectionFactory connectionFactory, PlatformTransactionManager transactionManager) {
092 JmsConfiguration template = new JmsConfiguration(connectionFactory);
093 template.setTransactionManager(transactionManager);
094 template.setTransacted(true);
095 return jmsComponent(template);
096 }
097
098 public JmsComponent() {
099 }
100
101 public JmsComponent(JmsConfiguration configuration) {
102 this.configuration = configuration;
103 }
104
105 public JmsComponent(CamelContext context) {
106 super(context);
107 }
108
109 @Override
110 protected Endpoint<JmsExchange> createEndpoint(String uri, String remaining, Map parameters) throws Exception {
111
112 boolean pubSubDomain = false;
113 if (remaining.startsWith(QUEUE_PREFIX)) {
114 pubSubDomain = false;
115 remaining = removeStartingCharacters(remaining.substring(QUEUE_PREFIX.length()), '/');
116 }
117 else if (remaining.startsWith(TOPIC_PREFIX)) {
118 pubSubDomain = true;
119 remaining = removeStartingCharacters(remaining.substring(TOPIC_PREFIX.length()), '/');
120 }
121
122 final String subject = convertPathToActualDestination(remaining);
123
124 // lets make sure we copy the configuration as each endpoint can customize its own version
125 JmsEndpoint endpoint = new JmsEndpoint(uri, this, subject, pubSubDomain, getConfiguration().copy());
126
127 String selector = (String) parameters.remove("selector");
128 if (selector != null) {
129 endpoint.setSelector(selector);
130 }
131 IntrospectionSupport.setProperties(endpoint.getConfiguration(), parameters);
132 return endpoint;
133 }
134
135 public JmsConfiguration getConfiguration() {
136 if (configuration == null) {
137 configuration = createConfiguration();
138 }
139 return configuration;
140 }
141
142 /**
143 * Sets the JMS configuration
144 *
145 * @param configuration the configuration to use by default for endpoints
146 */
147 public void setConfiguration(JmsConfiguration configuration) {
148 this.configuration = configuration;
149 }
150
151
152 public void setAcceptMessagesWhileStopping(boolean acceptMessagesWhileStopping) {
153 getConfiguration().setAcceptMessagesWhileStopping(acceptMessagesWhileStopping);
154 }
155
156 public void setAcknowledgementMode(int consumerAcknowledgementMode) {
157 getConfiguration().setAcknowledgementMode(consumerAcknowledgementMode);
158 }
159
160 public void setAcknowledgementModeName(String consumerAcknowledgementMode) {
161 getConfiguration().setAcknowledgementModeName(consumerAcknowledgementMode);
162 }
163
164 public void setAutoStartup(boolean autoStartup) {
165 getConfiguration().setAutoStartup(autoStartup);
166 }
167
168 public void setCacheLevel(int cacheLevel) {
169 getConfiguration().setCacheLevel(cacheLevel);
170 }
171
172 public void setCacheLevelName(String cacheName) {
173 getConfiguration().setCacheLevelName(cacheName);
174 }
175
176 public void setClientId(String consumerClientId) {
177 getConfiguration().setClientId(consumerClientId);
178 }
179
180 public void setConcurrentConsumers(int concurrentConsumers) {
181 getConfiguration().setConcurrentConsumers(concurrentConsumers);
182 }
183
184 public void setConnectionFactory(ConnectionFactory connectionFactory) {
185 getConfiguration().setConnectionFactory(connectionFactory);
186 }
187
188 public void setConsumerType(ConsumerType consumerType) {
189 getConfiguration().setConsumerType(consumerType);
190 }
191
192 public void setDeliveryPersistent(boolean deliveryPersistent) {
193 getConfiguration().setDeliveryPersistent(deliveryPersistent);
194 }
195
196 public void setDurableSubscriptionName(String durableSubscriptionName) {
197 getConfiguration().setDurableSubscriptionName(durableSubscriptionName);
198 }
199
200 public void setExceptionListener(ExceptionListener exceptionListener) {
201 getConfiguration().setExceptionListener(exceptionListener);
202 }
203
204 public void setExplicitQosEnabled(boolean explicitQosEnabled) {
205 getConfiguration().setExplicitQosEnabled(explicitQosEnabled);
206 }
207
208 public void setExposeListenerSession(boolean exposeListenerSession) {
209 getConfiguration().setExposeListenerSession(exposeListenerSession);
210 }
211
212 public void setIdleTaskExecutionLimit(int idleTaskExecutionLimit) {
213 getConfiguration().setIdleTaskExecutionLimit(idleTaskExecutionLimit);
214 }
215
216 public void setMaxConcurrentConsumers(int maxConcurrentConsumers) {
217 getConfiguration().setMaxConcurrentConsumers(maxConcurrentConsumers);
218 }
219
220 public void setMaxMessagesPerTask(int maxMessagesPerTask) {
221 getConfiguration().setMaxMessagesPerTask(maxMessagesPerTask);
222 }
223
224 public void setMessageConverter(MessageConverter messageConverter) {
225 getConfiguration().setMessageConverter(messageConverter);
226 }
227
228 public void setMessageIdEnabled(boolean messageIdEnabled) {
229 getConfiguration().setMessageIdEnabled(messageIdEnabled);
230 }
231
232 public void setMessageTimestampEnabled(boolean messageTimestampEnabled) {
233 getConfiguration().setMessageTimestampEnabled(messageTimestampEnabled);
234 }
235
236 public void setPriority(int priority) {
237 getConfiguration().setPriority(priority);
238 }
239
240 public void setPubSubNoLocal(boolean pubSubNoLocal) {
241 getConfiguration().setPubSubNoLocal(pubSubNoLocal);
242 }
243
244 public void setReceiveTimeout(long receiveTimeout) {
245 getConfiguration().setReceiveTimeout(receiveTimeout);
246 }
247
248 public void setRecoveryInterval(long recoveryInterval) {
249 getConfiguration().setRecoveryInterval(recoveryInterval);
250 }
251
252 public void setServerSessionFactory(ServerSessionFactory serverSessionFactory) {
253 getConfiguration().setServerSessionFactory(serverSessionFactory);
254 }
255
256 public void setSubscriptionDurable(boolean subscriptionDurable) {
257 getConfiguration().setSubscriptionDurable(subscriptionDurable);
258 }
259
260 public void setTaskExecutor(TaskExecutor taskExecutor) {
261 getConfiguration().setTaskExecutor(taskExecutor);
262 }
263
264 public void setTimeToLive(long timeToLive) {
265 getConfiguration().setTimeToLive(timeToLive);
266 }
267
268 public void setTransacted(boolean consumerTransacted) {
269 getConfiguration().setTransacted(consumerTransacted);
270 }
271
272 public void setTransactionManager(PlatformTransactionManager transactionManager) {
273 getConfiguration().setTransactionManager(transactionManager);
274 }
275
276 public void setTransactionName(String transactionName) {
277 getConfiguration().setTransactionName(transactionName);
278 }
279
280 public void setTransactionTimeout(int transactionTimeout) {
281 getConfiguration().setTransactionTimeout(transactionTimeout);
282 }
283
284 public void setUseVersion102(boolean useVersion102) {
285 getConfiguration().setUseVersion102(useVersion102);
286 }
287
288 /**
289 * A strategy method allowing the URI destination to be translated into the actual JMS destination name
290 * (say by looking up in JNDI or something)
291 */
292 protected String convertPathToActualDestination(String path) {
293 return path;
294 }
295
296 /**
297 * Factory method to create the default configuration instance
298 *
299 * @return a newly created configuration object which can then be further customized
300 */
301 protected JmsConfiguration createConfiguration() {
302 return new JmsConfiguration();
303 }
304
305 }