001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.camel;
018
019 import java.util.HashMap;
020 import java.util.Map;
021
022 import org.apache.camel.impl.ServiceSupport;
023 import org.apache.camel.util.ObjectHelper;
024 import org.apache.camel.util.ProducerCache;
025
026 /**
027 * A client helper object (named like Spring's TransactionTemplate & JmsTemplate
028 * et al) for working with Camel and sending {@link Message} instances in an
029 * {@link Exchange} to an {@link Endpoint}.
030 *
031 * @version $Revision: 563607 $
032 */
033 public class CamelTemplate<E extends Exchange> extends ServiceSupport implements ProducerTemplate<E> {
034 private CamelContext context;
035 private ProducerCache<E> producerCache = new ProducerCache<E>();
036 private boolean useEndpointCache = true;
037 private Map<String, Endpoint<E>> endpointCache = new HashMap<String, Endpoint<E>>();
038 private Endpoint<E> defaultEndpoint;
039
040 public CamelTemplate(CamelContext context) {
041 this.context = context;
042 }
043
044 public CamelTemplate(CamelContext context, Endpoint defaultEndpoint) {
045 this(context);
046 this.defaultEndpoint = defaultEndpoint;
047 }
048
049 /**
050 * Sends the exchange to the given endpoint
051 *
052 * @param endpointUri the endpoint URI to send the exchange to
053 * @param exchange the exchange to send
054 */
055 public E send(String endpointUri, E exchange) {
056 Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
057 send(endpoint, exchange);
058 return exchange;
059 }
060
061 /**
062 * Sends an exchange to an endpoint using a supplied
063 *
064 * @{link Processor} to populate the exchange
065 *
066 * @param endpointUri the endpoint URI to send the exchange to
067 * @param processor the transformer used to populate the new exchange
068 */
069 public E send(String endpointUri, Processor processor) {
070 Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
071 return send(endpoint, processor);
072 }
073
074 /**
075 * Sends the exchange to the given endpoint
076 *
077 * @param endpoint the endpoint to send the exchange to
078 * @param exchange the exchange to send
079 */
080 public E send(Endpoint<E> endpoint, E exchange) {
081 E convertedExchange = endpoint.toExchangeType(exchange);
082 producerCache.send(endpoint, convertedExchange);
083 return exchange;
084 }
085
086 /**
087 * Sends an exchange to an endpoint using a supplied
088 *
089 * @{link Processor} to populate the exchange
090 *
091 * @param endpoint the endpoint to send the exchange to
092 * @param processor the transformer used to populate the new exchange
093 */
094 public E send(Endpoint<E> endpoint, Processor processor) {
095 return producerCache.send(endpoint, processor);
096 }
097
098 /**
099 * Send the body to an endpoint
100 *
101 * @param endpoint
102 * @param body = the payload
103 * @return the result
104 */
105 public Object sendBody(Endpoint<E> endpoint, final Object body) {
106 E result = send(endpoint, new Processor() {
107 public void process(Exchange exchange) {
108 Message in = exchange.getIn();
109 in.setBody(body);
110 }
111 });
112 return extractResultBody(result);
113 }
114
115 /**
116 * Send the body to an endpoint
117 *
118 * @param endpointUri
119 * @param body = the payload
120 * @return the result
121 */
122 public Object sendBody(String endpointUri, final Object body) {
123 E result = send(endpointUri, new Processor() {
124 public void process(Exchange exchange) {
125 Message in = exchange.getIn();
126 in.setBody(body);
127 }
128 });
129 return extractResultBody(result);
130 }
131
132 /**
133 * Sends the body to an endpoint with a specified header and header value
134 *
135 * @param endpointUri the endpoint URI to send to
136 * @param body the payload send
137 * @param header the header name
138 * @param headerValue the header value
139 * @return the result
140 */
141 public Object sendBodyAndHeader(String endpointUri, final Object body, final String header,
142 final Object headerValue) {
143 return sendBodyAndHeader(resolveMandatoryEndpoint(endpointUri), body, header, headerValue);
144 }
145
146 /**
147 * Sends the body to an endpoint with a specified header and header value
148 *
149 * @param endpoint the Endpoint to send to
150 * @param body the payload send
151 * @param header the header name
152 * @param headerValue the header value
153 * @return the result
154 */
155 public Object sendBodyAndHeader(Endpoint endpoint, final Object body, final String header,
156 final Object headerValue) {
157 E result = send(endpoint, new Processor() {
158 public void process(Exchange exchange) {
159 Message in = exchange.getIn();
160 in.setHeader(header, headerValue);
161 in.setBody(body);
162 }
163 });
164 return extractResultBody(result);
165 }
166
167 /**
168 * Sends the body to an endpoint with the specified headers and header
169 * values
170 *
171 * @param endpointUri the endpoint URI to send to
172 * @param body the payload send
173 * @return the result
174 */
175 public Object sendBodyAndHeaders(String endpointUri, final Object body, final Map<String, Object> headers) {
176 return sendBodyAndHeaders(resolveMandatoryEndpoint(endpointUri), body, headers);
177 }
178
179 /**
180 * Sends the body to an endpoint with the specified headers and header
181 * values
182 *
183 * @param endpoint the endpoint URI to send to
184 * @param body the payload send
185 * @return the result
186 */
187 public Object sendBodyAndHeaders(Endpoint endpoint, final Object body, final Map<String, Object> headers) {
188 E result = send(endpoint, new Processor() {
189 public void process(Exchange exchange) {
190 Message in = exchange.getIn();
191 for (Map.Entry<String, Object> header : headers.entrySet()) {
192 in.setHeader(header.getKey(), header.getValue());
193 }
194 in.setBody(body);
195 }
196 });
197 return extractResultBody(result);
198 }
199
200 // Methods using the default endpoint
201 // -----------------------------------------------------------------------
202
203 /**
204 * Sends the body to the default endpoint and returns the result content
205 *
206 * @param body the body to send
207 * @return the returned message body
208 */
209 public Object sendBody(Object body) {
210 return sendBody(getMandatoryDefaultEndpoint(), body);
211 }
212
213 /**
214 * Sends the exchange to the default endpoint
215 *
216 * @param exchange the exchange to send
217 */
218 public E send(E exchange) {
219 return send(getMandatoryDefaultEndpoint(), exchange);
220 }
221
222 /**
223 * Sends an exchange to the default endpoint using a supplied
224 *
225 * @{link Processor} to populate the exchange
226 *
227 * @param processor the transformer used to populate the new exchange
228 */
229 public E send(Processor processor) {
230 return send(getMandatoryDefaultEndpoint(), processor);
231 }
232
233 public Object sendBodyAndHeader(Object body, String header, Object headerValue) {
234 return sendBodyAndHeader(getMandatoryDefaultEndpoint(), body, header, headerValue);
235 }
236
237 public Object sendBodyAndHeaders(Object body, Map<String, Object> headers) {
238 return sendBodyAndHeaders(getMandatoryDefaultEndpoint(), body, headers);
239 }
240
241 // Properties
242 // -----------------------------------------------------------------------
243 public Producer<E> getProducer(Endpoint<E> endpoint) {
244 return producerCache.getProducer(endpoint);
245 }
246
247 public CamelContext getContext() {
248 return context;
249 }
250
251 public Endpoint<E> getDefaultEndpoint() {
252 return defaultEndpoint;
253 }
254
255 public void setDefaultEndpoint(Endpoint<E> defaultEndpoint) {
256 this.defaultEndpoint = defaultEndpoint;
257 }
258
259 /**
260 * Sets the default endpoint to use if none is specified
261 */
262 public void setDefaultEndpointUri(String endpointUri) {
263 setDefaultEndpoint(getContext().getEndpoint(endpointUri));
264 }
265
266 public boolean isUseEndpointCache() {
267 return useEndpointCache;
268 }
269
270 public void setUseEndpointCache(boolean useEndpointCache) {
271 this.useEndpointCache = useEndpointCache;
272 }
273
274 // Implementation methods
275 // -----------------------------------------------------------------------
276
277 protected Endpoint resolveMandatoryEndpoint(String endpointUri) {
278 Endpoint endpoint = null;
279
280 if (isUseEndpointCache()) {
281 synchronized (endpointCache) {
282 endpoint = endpointCache.get(endpointUri);
283 if (endpoint == null) {
284 endpoint = context.getEndpoint(endpointUri);
285 if (endpoint != null) {
286 endpointCache.put(endpointUri, endpoint);
287 }
288 }
289 }
290 } else {
291 endpoint = context.getEndpoint(endpointUri);
292 }
293 if (endpoint == null) {
294 throw new NoSuchEndpointException(endpointUri);
295 }
296 return endpoint;
297 }
298
299 protected Endpoint<E> getMandatoryDefaultEndpoint() {
300 Endpoint<E> answer = getDefaultEndpoint();
301 ObjectHelper.notNull(answer, "defaultEndpoint");
302 return answer;
303 }
304
305 protected void doStart() throws Exception {
306 producerCache.start();
307 }
308
309 protected void doStop() throws Exception {
310 producerCache.stop();
311 }
312
313 protected Object extractResultBody(E result) {
314 Object answer = null;
315 if (result != null) {
316 answer = result.getOut().getBody();
317 if (answer == null) {
318 answer = result.getIn().getBody();
319 }
320 }
321 return answer;
322 }
323 }