001 /*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.camel.component.jms;
018
019 import javax.jms.Message;
020
021 import org.apache.camel.Processor;
022 import org.apache.camel.PollingConsumer;
023 import org.apache.camel.impl.DefaultEndpoint;
024 import org.springframework.jms.core.JmsOperations;
025 import org.springframework.jms.core.JmsTemplate;
026 import org.springframework.jms.listener.AbstractMessageListenerContainer;
027
028 /**
029 * A <a href="http://activemq.apache.org/jms.html">JMS Endpoint</a>
030 *
031 * @version $Revision:520964 $
032 */
033 public class JmsEndpoint extends DefaultEndpoint<JmsExchange> {
034 private JmsBinding binding;
035 private String destination;
036 private final boolean pubSubDomain;
037 private String selector;
038 private JmsConfiguration configuration;
039
040 public JmsEndpoint(String uri, JmsComponent component, String destination, boolean pubSubDomain, JmsConfiguration configuration) {
041 super(uri, component);
042 this.configuration = configuration;
043 this.destination = destination;
044 this.pubSubDomain = pubSubDomain;
045 }
046
047 public JmsProducer createProducer() throws Exception {
048 JmsOperations template = createJmsOperations();
049 return createProducer(template);
050 }
051
052 /**
053 * Creates a producer using the given template
054 */
055 public JmsProducer createProducer(JmsOperations template) throws Exception {
056 if (template instanceof JmsTemplate) {
057 JmsTemplate jmsTemplate = (JmsTemplate) template;
058 jmsTemplate.setPubSubDomain(pubSubDomain);
059 jmsTemplate.setDefaultDestinationName(destination);
060 }
061 return new JmsProducer(this, template);
062 }
063
064 public JmsConsumer createConsumer(Processor processor) throws Exception {
065 AbstractMessageListenerContainer listenerContainer = configuration.createMessageListenerContainer();
066 return createConsumer(processor, listenerContainer);
067 }
068
069 /**
070 * Creates a consumer using the given processor and listener container
071 *
072 * @param processor the processor to use to process the messages
073 * @param listenerContainer the listener container
074 * @return a newly created consumer
075 * @throws Exception if the consumer cannot be created
076 */
077 public JmsConsumer createConsumer(Processor processor, AbstractMessageListenerContainer listenerContainer) throws Exception {
078 listenerContainer.setDestinationName(destination);
079 listenerContainer.setPubSubDomain(pubSubDomain);
080 if (selector != null) {
081 listenerContainer.setMessageSelector(selector);
082 }
083 return new JmsConsumer(this, processor, listenerContainer);
084 }
085
086 @Override
087 public PollingConsumer<JmsExchange> createPollingConsumer() throws Exception {
088 JmsOperations template = createJmsOperations();
089 return new JmsPollingConsumer(this, template);
090 }
091
092 public JmsExchange createExchange() {
093 return new JmsExchange(getContext(), getBinding());
094 }
095
096 public JmsExchange createExchange(Message message) {
097 return new JmsExchange(getContext(), getBinding(), message);
098 }
099
100 // Properties
101 //-------------------------------------------------------------------------
102 public JmsBinding getBinding() {
103 if (binding == null) {
104 binding = new JmsBinding();
105 }
106 return binding;
107 }
108
109 /**
110 * Sets the binding used to convert from a Camel message to and from a JMS message
111 *
112 * @param binding the binding to use
113 */
114 public void setBinding(JmsBinding binding) {
115 this.binding = binding;
116 }
117
118 public String getDestination() {
119 return destination;
120 }
121
122 public JmsConfiguration getConfiguration() {
123 return configuration;
124 }
125
126 public String getSelector() {
127 return selector;
128 }
129
130 /**
131 * Sets the JMS selector to use
132 */
133 public void setSelector(String selector) {
134 this.selector = selector;
135 }
136
137 public boolean isSingleton() {
138 return false;
139 }
140
141
142 protected JmsOperations createJmsOperations() {
143 return configuration.createJmsOperations(pubSubDomain, destination);
144 }
145
146 }