001 /**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one or more
004 * contributor license agreements. See the NOTICE file distributed with
005 * this work for additional information regarding copyright ownership.
006 * The ASF licenses this file to You under the Apache License, Version 2.0
007 * (the "License"); you may not use this file except in compliance with
008 * the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.camel;
019
020 import org.apache.camel.impl.ServiceSupport;
021 import org.apache.camel.util.ObjectHelper;
022 import org.apache.camel.util.ProducerCache;
023
024 import java.util.HashMap;
025 import java.util.Map;
026
027 /**
028 * A client helper object (named like Spring's TransactionTemplate & JmsTemplate et al)
029 * for working with Camel and sending {@link Message} instances in an {@link Exchange}
030 * to an {@link Endpoint}.
031 *
032 * @version $Revision: 537937 $
033 */
034 public class CamelTemplate<E extends Exchange> extends ServiceSupport {
035 private CamelContext context;
036 private ProducerCache<E> producerCache = new ProducerCache<E>();
037 private boolean useEndpointCache = true;
038 private Map<String, Endpoint<E>> endpointCache = new HashMap<String, Endpoint<E>>();
039 private Endpoint<E> defaultEndpoint;
040
041
042 public CamelTemplate(CamelContext context) {
043 this.context = context;
044 }
045
046 public CamelTemplate(CamelContext context, Endpoint defaultEndpoint) {
047 this(context);
048 this.defaultEndpoint = defaultEndpoint;
049 }
050
051 /**
052 * Sends the exchange to the given endpoint
053 *
054 * @param endpointUri the endpoint URI to send the exchange to
055 * @param exchange the exchange to send
056 */
057 public E send(String endpointUri, E exchange) {
058 Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
059 send(endpoint, exchange);
060 return exchange;
061 }
062
063 /**
064 * Sends an exchange to an endpoint using a supplied @{link Processor} to populate the exchange
065 *
066 * @param endpointUri the endpoint URI to send the exchange to
067 * @param processor the transformer used to populate the new exchange
068 */
069 public E send(String endpointUri, Processor processor) {
070 Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
071 return send(endpoint, processor);
072 }
073
074 /**
075 * Sends the exchange to the given endpoint
076 *
077 * @param endpoint the endpoint to send the exchange to
078 * @param exchange the exchange to send
079 */
080 public E send(Endpoint<E> endpoint, E exchange) {
081 E convertedExchange = endpoint.toExchangeType(exchange);
082 producerCache.send(endpoint, convertedExchange);
083 return exchange;
084 }
085
086 /**
087 * Sends an exchange to an endpoint using a supplied @{link Processor} to populate the exchange
088 *
089 * @param endpoint the endpoint to send the exchange to
090 * @param processor the transformer used to populate the new exchange
091 */
092 public E send(Endpoint<E> endpoint, Processor processor) {
093 return producerCache.send(endpoint, processor);
094 }
095
096 /**
097 * Send the body to an endpoint
098 *
099 * @param endpoint
100 * @param body = the payload
101 * @return the result
102 */
103 public Object sendBody(Endpoint<E> endpoint, final Object body) {
104 E result = send(endpoint, new Processor() {
105 public void process(Exchange exchange) {
106 Message in = exchange.getIn();
107 in.setBody(body);
108 }
109 });
110 return extractResultBody(result);
111 }
112
113 /**
114 * Send the body to an endpoint
115 *
116 * @param endpointUri
117 * @param body = the payload
118 * @return the result
119 */
120 public Object sendBody(String endpointUri, final Object body) {
121 E result = send(endpointUri, new Processor() {
122 public void process(Exchange exchange) {
123 Message in = exchange.getIn();
124 in.setBody(body);
125 }
126 });
127 return extractResultBody(result);
128 }
129
130 /**
131 * Sends the body to an endpoint with a specified header and header value
132 *
133 * @param endpointUri the endpoint URI to send to
134 * @param body the payload send
135 * @param header the header name
136 * @param headerValue the header value
137 * @return the result
138 */
139 public Object sendBody(String endpointUri, final Object body, final String header, final Object headerValue) {
140 E result = send(endpointUri, new Processor() {
141 public void process(Exchange exchange) {
142 Message in = exchange.getIn();
143 in.setHeader(header, headerValue);
144 in.setBody(body);
145 }
146 });
147 return extractResultBody(result);
148 }
149
150 /**
151 * Sends the body to an endpoint with the specified headers and header values
152 *
153 * @param endpointUri the endpoint URI to send to
154 * @param body the payload send
155 * @return the result
156 */
157 public Object sendBody(String endpointUri, final Object body, final Map<String, Object> headers) {
158 E result = send(endpointUri, new Processor() {
159 public void process(Exchange exchange) {
160 Message in = exchange.getIn();
161 for (Map.Entry<String, Object> header : headers.entrySet()) {
162 in.setHeader(header.getKey(), header.getValue());
163 }
164 in.setBody(body);
165 }
166 });
167 return extractResultBody(result);
168 }
169
170 // Methods using the default endpoint
171 //-----------------------------------------------------------------------
172
173 /**
174 * Sends the body to the default endpoint and returns the result content
175 *
176 * @param body the body to send
177 * @return the returned message body
178 */
179 public Object sendBody(Object body) {
180 return sendBody(getMandatoryDefaultEndpoint(), body);
181 }
182
183 /**
184 * Sends the exchange to the default endpoint
185 *
186 * @param exchange the exchange to send
187 */
188 public E send(E exchange) {
189 return send(getMandatoryDefaultEndpoint(), exchange);
190 }
191
192 /**
193 * Sends an exchange to the default endpoint
194 * using a supplied @{link Processor} to populate the exchange
195 *
196 * @param processor the transformer used to populate the new exchange
197 */
198 public E send(Processor processor) {
199 return send(getMandatoryDefaultEndpoint(), processor);
200 }
201
202
203 // Properties
204 //-----------------------------------------------------------------------
205 public Producer<E> getProducer(Endpoint<E> endpoint) {
206 return producerCache.getProducer(endpoint);
207 }
208
209 public CamelContext getContext() {
210 return context;
211 }
212
213 public Endpoint<E> getDefaultEndpoint() {
214 return defaultEndpoint;
215 }
216
217 public void setDefaultEndpoint(Endpoint<E> defaultEndpoint) {
218 this.defaultEndpoint = defaultEndpoint;
219 }
220
221 /**
222 * Sets the default endpoint to use if none is specified
223 */
224 public void setDefaultEndpointUri(String endpointUri) {
225 setDefaultEndpoint(getContext().getEndpoint(endpointUri));
226 }
227
228 public boolean isUseEndpointCache() {
229 return useEndpointCache;
230 }
231
232 public void setUseEndpointCache(boolean useEndpointCache) {
233 this.useEndpointCache = useEndpointCache;
234 }
235
236 // Implementation methods
237 //-----------------------------------------------------------------------
238
239 protected Endpoint resolveMandatoryEndpoint(String endpointUri) {
240 Endpoint endpoint = null;
241
242 if (isUseEndpointCache()) {
243 synchronized (endpointCache) {
244 endpoint = endpointCache.get(endpointUri);
245 if (endpoint == null) {
246 endpoint = context.getEndpoint(endpointUri);
247 if (endpoint != null) {
248 endpointCache.put(endpointUri, endpoint);
249 }
250 }
251 }
252 }
253 else {
254 endpoint = context.getEndpoint(endpointUri);
255 }
256 if (endpoint == null) {
257 throw new NoSuchEndpointException(endpointUri);
258 }
259 return endpoint;
260 }
261
262 protected Endpoint<E> getMandatoryDefaultEndpoint() {
263 Endpoint<E> answer = getDefaultEndpoint();
264 ObjectHelper.notNull(answer, "defaultEndpoint");
265 return answer;
266 }
267
268 protected void doStart() throws Exception {
269 producerCache.start();
270 }
271
272 protected void doStop() throws Exception {
273 producerCache.stop();
274 }
275
276 protected Object extractResultBody(E result) {
277 return result != null ? result.getOut().getBody() : null;
278 }
279 }