001 /*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.camel.jms;
018
019 import org.apache.activemq.ActiveMQConnection;
020 import org.apache.activemq.ActiveMQSession;
021 import org.apache.activemq.CustomDestination;
022 import org.apache.camel.CamelContext;
023 import org.apache.camel.CamelContextAware;
024 import org.apache.camel.Endpoint;
025 import org.apache.camel.component.jms.JmsBinding;
026
027 import javax.jms.JMSException;
028 import javax.jms.MessageConsumer;
029 import javax.jms.MessageProducer;
030 import javax.jms.QueueReceiver;
031 import javax.jms.QueueSender;
032 import javax.jms.TopicPublisher;
033 import javax.jms.TopicSubscriber;
034
035 /**
036 * @version $Revision: $
037 */
038 public class CamelDestination implements CustomDestination, CamelContextAware {
039 private String uri;
040 private Endpoint endpoint;
041 private CamelContext camelContext;
042 private JmsBinding binding = new JmsBinding();
043
044 public CamelDestination() {
045 }
046
047 public CamelDestination(String uri) {
048 this.uri = uri;
049 }
050
051 public String toString() {
052 return uri.toString();
053 }
054
055 // CustomDestination interface
056 //-----------------------------------------------------------------------
057 public MessageConsumer createConsumer(ActiveMQSession session, String messageSelector) {
058 return createConsumer(session, messageSelector, false);
059 }
060
061 public MessageConsumer createConsumer(ActiveMQSession session, String messageSelector, boolean noLocal) {
062 return new CamelMessageConsumer(this, resolveEndpoint(session), session, messageSelector, noLocal);
063 }
064
065 public TopicSubscriber createSubscriber(ActiveMQSession session, String messageSelector, boolean noLocal) {
066 return createDurableSubscriber(session, null, messageSelector, noLocal);
067 }
068
069 public TopicSubscriber createDurableSubscriber(ActiveMQSession session, String name, String messageSelector, boolean noLocal) {
070 throw new UnsupportedOperationException("This destination is not a Topic: " + this);
071 }
072
073 public QueueReceiver createReceiver(ActiveMQSession session, String messageSelector) {
074 throw new UnsupportedOperationException("This destination is not a Queue: " + this);
075 }
076
077 // Producers
078 //-----------------------------------------------------------------------
079 public MessageProducer createProducer(ActiveMQSession session) throws JMSException {
080 return new CamelMessageProducer(this, resolveEndpoint(session), session);
081 }
082
083 public TopicPublisher createPublisher(ActiveMQSession session) throws JMSException {
084 throw new UnsupportedOperationException("This destination is not a Topic: " + this);
085 }
086
087 public QueueSender createSender(ActiveMQSession session) throws JMSException {
088 throw new UnsupportedOperationException("This destination is not a Queue: " + this);
089 }
090
091 // Properties
092 //-----------------------------------------------------------------------
093
094 public String getUri() {
095 return uri;
096 }
097
098 public void setUri(String uri) {
099 this.uri = uri;
100 }
101
102 public Endpoint getEndpoint() {
103 return endpoint;
104 }
105
106 public void setEndpoint(Endpoint endpoint) {
107 this.endpoint = endpoint;
108 }
109
110 public CamelContext getCamelContext() {
111 return camelContext;
112 }
113
114 public void setCamelContext(CamelContext camelContext) {
115 this.camelContext = camelContext;
116 }
117
118 public JmsBinding getBinding() {
119 return binding;
120 }
121
122 public void setBinding(JmsBinding binding) {
123 this.binding = binding;
124 }
125
126 // Implementation methods
127 //-----------------------------------------------------------------------
128
129 /**
130 * Resolves the Camel Endpoint for this destination
131 *
132 * @return
133 */
134 protected Endpoint resolveEndpoint(ActiveMQSession session) {
135 Endpoint answer = getEndpoint();
136 if (answer == null) {
137 answer = resolveCamelContext(session).getEndpoint(getUri());
138 if (answer == null) {
139 throw new IllegalArgumentException("No endpoint could be found for URI: " + getUri());
140 }
141 }
142 return answer;
143 }
144
145 protected CamelContext resolveCamelContext(ActiveMQSession session) {
146 CamelContext answer = getCamelContext();
147 if (answer == null) {
148 ActiveMQConnection connection = session.getConnection();
149 if (connection instanceof CamelConnection) {
150 CamelConnection camelConnection = (CamelConnection) connection;
151 answer = camelConnection.getCamelContext();
152 }
153 }
154 if (answer == null) {
155 throw new IllegalArgumentException("No CamelContext has been configured");
156 }
157 return answer;
158 }
159 }