001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.camel.component.jms;
018
019 import javax.jms.ConnectionFactory;
020 import javax.jms.ExceptionListener;
021
022 import org.apache.camel.RuntimeCamelException;
023 import org.apache.camel.util.ObjectHelper;
024
025 import org.springframework.core.task.TaskExecutor;
026 import org.springframework.jms.core.JmsOperations;
027 import org.springframework.jms.core.JmsTemplate;
028 import org.springframework.jms.core.JmsTemplate102;
029 import org.springframework.jms.listener.AbstractMessageListenerContainer;
030 import org.springframework.jms.listener.DefaultMessageListenerContainer;
031 import org.springframework.jms.listener.DefaultMessageListenerContainer102;
032 import org.springframework.jms.listener.SimpleMessageListenerContainer;
033 import org.springframework.jms.listener.SimpleMessageListenerContainer102;
034 import org.springframework.jms.listener.serversession.ServerSessionFactory;
035 import org.springframework.jms.listener.serversession.ServerSessionMessageListenerContainer;
036 import org.springframework.jms.listener.serversession.ServerSessionMessageListenerContainer102;
037 import org.springframework.jms.support.converter.MessageConverter;
038 import org.springframework.transaction.PlatformTransactionManager;
039
040 /**
041 * @version $Revision: 563665 $
042 */
043 public class JmsConfiguration implements Cloneable {
044 protected static final String TRANSACTED = "TRANSACTED";
045 protected static final String CLIENT_ACKNOWLEDGE = "CLIENT_ACKNOWLEDGE";
046 protected static final String AUTO_ACKNOWLEDGE = "AUTO_ACKNOWLEDGE";
047 protected static final String DUPS_OK_ACKNOWLEDGE = "DUPS_OK_ACKNOWLEDGE";
048
049 private ConnectionFactory connectionFactory;
050 private ConnectionFactory templateConnectionFactory;
051 private ConnectionFactory listenerConnectionFactory;
052 private int acknowledgementMode = -1;
053 private String acknowledgementModeName = AUTO_ACKNOWLEDGE;
054 // Used to configure the spring Container
055 private ExceptionListener exceptionListener;
056 private ConsumerType consumerType = ConsumerType.Default;
057 private boolean autoStartup = true;
058 private boolean acceptMessagesWhileStopping;
059 private String clientId;
060 private String durableSubscriptionName;
061 private boolean subscriptionDurable;
062 private boolean exposeListenerSession = true;
063 private TaskExecutor taskExecutor;
064 private boolean pubSubNoLocal;
065 private int concurrentConsumers = 1;
066 private int maxMessagesPerTask = 1;
067 private ServerSessionFactory serverSessionFactory;
068 private int cacheLevel = -1;
069 private String cacheLevelName = "CACHE_CONSUMER";
070 private long recoveryInterval = -1;
071 private long receiveTimeout = -1;
072 private int idleTaskExecutionLimit = 1;
073 private int maxConcurrentConsumers = 1;
074 // JmsTemplate only
075 private boolean useVersion102;
076 private boolean explicitQosEnabled;
077 private boolean deliveryPersistent = true;
078 private long timeToLive = -1;
079 private MessageConverter messageConverter;
080 private boolean messageIdEnabled = true;
081 private boolean messageTimestampEnabled = true;
082 private int priority = -1;
083 // Transaction related configuration
084 private boolean transacted;
085 private PlatformTransactionManager transactionManager;
086 private String transactionName;
087 private int transactionTimeout = -1;
088
089 public JmsConfiguration() {
090 }
091
092 public JmsConfiguration(ConnectionFactory connectionFactory) {
093 this.connectionFactory = connectionFactory;
094 }
095
096 /**
097 * Returns a copy of this configuration
098 */
099 public JmsConfiguration copy() {
100 try {
101 return (JmsConfiguration)clone();
102 } catch (CloneNotSupportedException e) {
103 throw new RuntimeCamelException(e);
104 }
105 }
106
107 public JmsOperations createJmsOperations(boolean pubSubDomain, String destination) {
108 ConnectionFactory factory = getTemplateConnectionFactory();
109 JmsTemplate template = useVersion102 ? new JmsTemplate102(factory, pubSubDomain) : new JmsTemplate(factory);
110 template.setPubSubDomain(pubSubDomain);
111 template.setDefaultDestinationName(destination);
112
113 template.setExplicitQosEnabled(explicitQosEnabled);
114 template.setDeliveryPersistent(deliveryPersistent);
115 if (messageConverter != null) {
116 template.setMessageConverter(messageConverter);
117 }
118 template.setMessageIdEnabled(messageIdEnabled);
119 template.setMessageTimestampEnabled(messageTimestampEnabled);
120 if (priority >= 0) {
121 template.setPriority(priority);
122 }
123 template.setPubSubNoLocal(pubSubNoLocal);
124 if (receiveTimeout >= 0) {
125 template.setReceiveTimeout(receiveTimeout);
126 }
127 if (timeToLive >= 0) {
128 template.setTimeToLive(timeToLive);
129 }
130
131 template.setSessionTransacted(transacted);
132
133 // This is here for completeness, but the template should not get used
134 // for receiving messages.
135 if (acknowledgementMode >= 0) {
136 template.setSessionAcknowledgeMode(acknowledgementMode);
137 } else if (acknowledgementModeName != null) {
138 template.setSessionAcknowledgeModeName(acknowledgementModeName);
139 }
140 return template;
141 }
142
143 public AbstractMessageListenerContainer createMessageListenerContainer() {
144 AbstractMessageListenerContainer container = chooseMessageListenerContainerImplementation();
145 configureMessageListenerContainer(container);
146 return container;
147 }
148
149 protected void configureMessageListenerContainer(AbstractMessageListenerContainer container) {
150 container.setConnectionFactory(getListenerConnectionFactory());
151 if (autoStartup) {
152 container.setAutoStartup(true);
153 }
154 if (clientId != null) {
155 container.setClientId(clientId);
156 }
157 container.setSubscriptionDurable(subscriptionDurable);
158 if (durableSubscriptionName != null) {
159 container.setDurableSubscriptionName(durableSubscriptionName);
160 }
161
162 // lets default to durable subscription if the subscriber name and
163 // client ID are specified (as there's
164 // no reason to specify them if not! :)
165 if (durableSubscriptionName != null && clientId != null) {
166 container.setSubscriptionDurable(true);
167 }
168
169 if (exceptionListener != null) {
170 container.setExceptionListener(exceptionListener);
171 }
172
173 container.setAcceptMessagesWhileStopping(acceptMessagesWhileStopping);
174 container.setExposeListenerSession(exposeListenerSession);
175 container.setSessionTransacted(transacted);
176
177 if (acknowledgementMode >= 0) {
178 container.setSessionAcknowledgeMode(acknowledgementMode);
179 } else if (acknowledgementModeName != null) {
180 container.setSessionAcknowledgeModeName(acknowledgementModeName);
181 }
182
183 if (container instanceof DefaultMessageListenerContainer) {
184 // this includes DefaultMessageListenerContainer102
185 DefaultMessageListenerContainer listenerContainer = (DefaultMessageListenerContainer)container;
186 if (concurrentConsumers >= 0) {
187 listenerContainer.setConcurrentConsumers(concurrentConsumers);
188 }
189
190 if (cacheLevel >= 0) {
191 listenerContainer.setCacheLevel(cacheLevel);
192 } else if (cacheLevelName != null) {
193 listenerContainer.setCacheLevelName(cacheLevelName);
194 } else {
195 // Default to CACHE_CONSUMER unless specified. This works best
196 // with most JMS providers.
197 listenerContainer.setCacheLevel(DefaultMessageListenerContainer.CACHE_CONSUMER);
198 }
199
200 if (idleTaskExecutionLimit >= 0) {
201 listenerContainer.setIdleTaskExecutionLimit(idleTaskExecutionLimit);
202 }
203 if (maxConcurrentConsumers >= 0) {
204 listenerContainer.setMaxConcurrentConsumers(maxConcurrentConsumers);
205 }
206 if (maxMessagesPerTask >= 0) {
207 listenerContainer.setMaxMessagesPerTask(maxMessagesPerTask);
208 }
209 listenerContainer.setPubSubNoLocal(pubSubNoLocal);
210 if (receiveTimeout >= 0) {
211 listenerContainer.setReceiveTimeout(receiveTimeout);
212 }
213 if (recoveryInterval >= 0) {
214 listenerContainer.setRecoveryInterval(recoveryInterval);
215 }
216 if (taskExecutor != null) {
217 listenerContainer.setTaskExecutor(taskExecutor);
218 }
219 if (transactionManager != null) {
220 listenerContainer.setTransactionManager(transactionManager);
221 }
222 if (transactionName != null) {
223 listenerContainer.setTransactionName(transactionName);
224 }
225 if (transactionTimeout >= 0) {
226 listenerContainer.setTransactionTimeout(transactionTimeout);
227 }
228 } else if (container instanceof ServerSessionMessageListenerContainer) {
229 // this includes ServerSessionMessageListenerContainer102
230 ServerSessionMessageListenerContainer listenerContainer = (ServerSessionMessageListenerContainer)container;
231 if (maxMessagesPerTask >= 0) {
232 listenerContainer.setMaxMessagesPerTask(maxMessagesPerTask);
233 }
234 if (serverSessionFactory != null) {
235 listenerContainer.setServerSessionFactory(serverSessionFactory);
236 }
237 } else if (container instanceof SimpleMessageListenerContainer) {
238 // this includes SimpleMessageListenerContainer102
239 SimpleMessageListenerContainer listenerContainer = (SimpleMessageListenerContainer)container;
240 if (concurrentConsumers >= 0) {
241 listenerContainer.setConcurrentConsumers(concurrentConsumers);
242 }
243 listenerContainer.setPubSubNoLocal(pubSubNoLocal);
244 if (taskExecutor != null) {
245 listenerContainer.setTaskExecutor(taskExecutor);
246 }
247 }
248 }
249
250 // Properties
251 // -------------------------------------------------------------------------
252 public ConnectionFactory getConnectionFactory() {
253 if (connectionFactory == null) {
254 connectionFactory = createConnectionFactory();
255 }
256 return connectionFactory;
257 }
258
259 /**
260 * Sets the default connection factory to be used if a connection factory is
261 * not specified for either
262 * {@link #setTemplateConnectionFactory(ConnectionFactory)} or
263 * {@link #setListenerConnectionFactory(ConnectionFactory)}
264 *
265 * @param connectionFactory the default connection factory to use
266 */
267 public void setConnectionFactory(ConnectionFactory connectionFactory) {
268 this.connectionFactory = connectionFactory;
269 }
270
271 public ConnectionFactory getListenerConnectionFactory() {
272 if (listenerConnectionFactory == null) {
273 listenerConnectionFactory = createListenerConnectionFactory();
274 }
275 return listenerConnectionFactory;
276 }
277
278 /**
279 * Sets the connection factory to be used for consuming messages via the
280 * {@link #createMessageListenerContainer()}
281 *
282 * @param listenerConnectionFactory the connection factory to use for
283 * consuming messages
284 */
285 public void setListenerConnectionFactory(ConnectionFactory listenerConnectionFactory) {
286 this.listenerConnectionFactory = listenerConnectionFactory;
287 }
288
289 public ConnectionFactory getTemplateConnectionFactory() {
290 if (templateConnectionFactory == null) {
291 templateConnectionFactory = createTemplateConnectionFactory();
292 }
293 return templateConnectionFactory;
294 }
295
296 /**
297 * Sets the connection factory to be used for sending messages via the
298 * {@link JmsTemplate} via {@link #createJmsOperations(boolean, String)}
299 *
300 * @param templateConnectionFactory the connection factory for sending
301 * messages
302 */
303 public void setTemplateConnectionFactory(ConnectionFactory templateConnectionFactory) {
304 this.templateConnectionFactory = templateConnectionFactory;
305 }
306
307 public boolean isUseVersion102() {
308 return useVersion102;
309 }
310
311 public void setUseVersion102(boolean useVersion102) {
312 this.useVersion102 = useVersion102;
313 }
314
315 public boolean isAutoStartup() {
316 return autoStartup;
317 }
318
319 public void setAutoStartup(boolean autoStartup) {
320 this.autoStartup = autoStartup;
321 }
322
323 public boolean isAcceptMessagesWhileStopping() {
324 return acceptMessagesWhileStopping;
325 }
326
327 public void setAcceptMessagesWhileStopping(boolean acceptMessagesWhileStopping) {
328 this.acceptMessagesWhileStopping = acceptMessagesWhileStopping;
329 }
330
331 public String getClientId() {
332 return clientId;
333 }
334
335 public void setClientId(String consumerClientId) {
336 this.clientId = consumerClientId;
337 }
338
339 public String getDurableSubscriptionName() {
340 return durableSubscriptionName;
341 }
342
343 public void setDurableSubscriptionName(String durableSubscriptionName) {
344 this.durableSubscriptionName = durableSubscriptionName;
345 }
346
347 public ExceptionListener getExceptionListener() {
348 return exceptionListener;
349 }
350
351 public void setExceptionListener(ExceptionListener exceptionListener) {
352 this.exceptionListener = exceptionListener;
353 }
354
355 public boolean isSubscriptionDurable() {
356 return subscriptionDurable;
357 }
358
359 public void setSubscriptionDurable(boolean subscriptionDurable) {
360 this.subscriptionDurable = subscriptionDurable;
361 }
362
363 public String getAcknowledgementModeName() {
364 return acknowledgementModeName;
365 }
366
367 public void setAcknowledgementModeName(String consumerAcknowledgementMode) {
368 this.acknowledgementModeName = consumerAcknowledgementMode;
369 this.acknowledgementMode = -1;
370 }
371
372 public boolean isExposeListenerSession() {
373 return exposeListenerSession;
374 }
375
376 public void setExposeListenerSession(boolean exposeListenerSession) {
377 this.exposeListenerSession = exposeListenerSession;
378 }
379
380 public TaskExecutor getTaskExecutor() {
381 return taskExecutor;
382 }
383
384 public void setTaskExecutor(TaskExecutor taskExecutor) {
385 this.taskExecutor = taskExecutor;
386 }
387
388 public boolean isPubSubNoLocal() {
389 return pubSubNoLocal;
390 }
391
392 public void setPubSubNoLocal(boolean pubSubNoLocal) {
393 this.pubSubNoLocal = pubSubNoLocal;
394 }
395
396 public int getConcurrentConsumers() {
397 return concurrentConsumers;
398 }
399
400 public void setConcurrentConsumers(int concurrentConsumers) {
401 this.concurrentConsumers = concurrentConsumers;
402 }
403
404 public int getMaxMessagesPerTask() {
405 return maxMessagesPerTask;
406 }
407
408 public void setMaxMessagesPerTask(int maxMessagesPerTask) {
409 this.maxMessagesPerTask = maxMessagesPerTask;
410 }
411
412 public ServerSessionFactory getServerSessionFactory() {
413 return serverSessionFactory;
414 }
415
416 public void setServerSessionFactory(ServerSessionFactory serverSessionFactory) {
417 this.serverSessionFactory = serverSessionFactory;
418 }
419
420 public int getCacheLevel() {
421 return cacheLevel;
422 }
423
424 public void setCacheLevel(int cacheLevel) {
425 this.cacheLevel = cacheLevel;
426 }
427
428 public String getCacheLevelName() {
429 return cacheLevelName;
430 }
431
432 public void setCacheLevelName(String cacheName) {
433 this.cacheLevelName = cacheName;
434 }
435
436 public long getRecoveryInterval() {
437 return recoveryInterval;
438 }
439
440 public void setRecoveryInterval(long recoveryInterval) {
441 this.recoveryInterval = recoveryInterval;
442 }
443
444 public long getReceiveTimeout() {
445 return receiveTimeout;
446 }
447
448 public void setReceiveTimeout(long receiveTimeout) {
449 this.receiveTimeout = receiveTimeout;
450 }
451
452 public PlatformTransactionManager getTransactionManager() {
453 return transactionManager;
454 }
455
456 public void setTransactionManager(PlatformTransactionManager transactionManager) {
457 this.transactionManager = transactionManager;
458 }
459
460 public String getTransactionName() {
461 return transactionName;
462 }
463
464 public void setTransactionName(String transactionName) {
465 this.transactionName = transactionName;
466 }
467
468 public int getTransactionTimeout() {
469 return transactionTimeout;
470 }
471
472 public void setTransactionTimeout(int transactionTimeout) {
473 this.transactionTimeout = transactionTimeout;
474 }
475
476 public int getIdleTaskExecutionLimit() {
477 return idleTaskExecutionLimit;
478 }
479
480 public void setIdleTaskExecutionLimit(int idleTaskExecutionLimit) {
481 this.idleTaskExecutionLimit = idleTaskExecutionLimit;
482 }
483
484 public int getMaxConcurrentConsumers() {
485 return maxConcurrentConsumers;
486 }
487
488 public void setMaxConcurrentConsumers(int maxConcurrentConsumers) {
489 this.maxConcurrentConsumers = maxConcurrentConsumers;
490 }
491
492 public boolean isExplicitQosEnabled() {
493 return explicitQosEnabled;
494 }
495
496 public void setExplicitQosEnabled(boolean explicitQosEnabled) {
497 this.explicitQosEnabled = explicitQosEnabled;
498 }
499
500 public boolean isDeliveryPersistent() {
501 return deliveryPersistent;
502 }
503
504 public void setDeliveryPersistent(boolean deliveryPersistent) {
505 this.deliveryPersistent = deliveryPersistent;
506 }
507
508 public long getTimeToLive() {
509 return timeToLive;
510 }
511
512 public void setTimeToLive(long timeToLive) {
513 this.timeToLive = timeToLive;
514 }
515
516 public MessageConverter getMessageConverter() {
517 return messageConverter;
518 }
519
520 public void setMessageConverter(MessageConverter messageConverter) {
521 this.messageConverter = messageConverter;
522 }
523
524 public boolean isMessageIdEnabled() {
525 return messageIdEnabled;
526 }
527
528 public void setMessageIdEnabled(boolean messageIdEnabled) {
529 this.messageIdEnabled = messageIdEnabled;
530 }
531
532 public boolean isMessageTimestampEnabled() {
533 return messageTimestampEnabled;
534 }
535
536 public void setMessageTimestampEnabled(boolean messageTimestampEnabled) {
537 this.messageTimestampEnabled = messageTimestampEnabled;
538 }
539
540 public int getPriority() {
541 return priority;
542 }
543
544 public void setPriority(int priority) {
545 this.priority = priority;
546 }
547
548 public ConsumerType getConsumerType() {
549 return consumerType;
550 }
551
552 public void setConsumerType(ConsumerType consumerType) {
553 this.consumerType = consumerType;
554 }
555
556 public int getAcknowledgementMode() {
557 return acknowledgementMode;
558 }
559
560 public void setAcknowledgementMode(int consumerAcknowledgementMode) {
561 this.acknowledgementMode = consumerAcknowledgementMode;
562 this.acknowledgementModeName = null;
563 }
564
565 public boolean isTransacted() {
566 return transacted;
567 }
568
569 public void setTransacted(boolean consumerTransacted) {
570 this.transacted = consumerTransacted;
571 }
572
573 // Implementation methods
574 // -------------------------------------------------------------------------
575 protected AbstractMessageListenerContainer chooseMessageListenerContainerImplementation() {
576 // TODO we could allow a spring container to auto-inject these objects?
577 switch (consumerType) {
578 case Simple:
579 return isUseVersion102() ? new SimpleMessageListenerContainer102() : new SimpleMessageListenerContainer();
580 case ServerSessionPool:
581 return isUseVersion102() ? new ServerSessionMessageListenerContainer102() : new ServerSessionMessageListenerContainer();
582 case Default:
583 return isUseVersion102() ? new DefaultMessageListenerContainer102() : new DefaultMessageListenerContainer();
584 default:
585 throw new IllegalArgumentException("Unknown consumer type: " + consumerType);
586 }
587 }
588
589 /**
590 * Factory method which allows derived classes to customize the lazy
591 * creation
592 */
593 protected ConnectionFactory createConnectionFactory() {
594 ObjectHelper.notNull(connectionFactory, "connectionFactory");
595 return null;
596 }
597
598 /**
599 * Factory method which allows derived classes to customize the lazy
600 * creation
601 */
602 protected ConnectionFactory createListenerConnectionFactory() {
603 return getConnectionFactory();
604 }
605
606 /**
607 * Factory method which allows derived classes to customize the lazy
608 * creation
609 */
610 protected ConnectionFactory createTemplateConnectionFactory() {
611 return getConnectionFactory();
612 }
613 }