001 /**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one or more
004 * contributor license agreements. See the NOTICE file distributed with
005 * this work for additional information regarding copyright ownership.
006 * The ASF licenses this file to You under the Apache License, Version 2.0
007 * (the "License"); you may not use this file except in compliance with
008 * the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.camel;
019
020 import org.apache.camel.impl.ServiceSupport;
021 import org.apache.camel.util.ProducerCache;
022
023 /**
024 * A Client object for working with Camel and invoking {@link Endpoint} instances with {@link Exchange} instances
025 *
026 * @version $Revision: 535136 $
027 */
028 public class CamelClient<E extends Exchange> extends ServiceSupport {
029 private CamelContext context;
030 private ProducerCache<E> producerCache = new ProducerCache<E>();
031
032 public CamelClient(CamelContext context) {
033 this.context = context;
034 }
035
036 /**
037 * Sends the exchange to the given endpoint
038 *
039 * @param endpointUri the endpoint URI to send the exchange to
040 * @param exchange the exchange to send
041 */
042 public E send(String endpointUri, E exchange) {
043 Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
044 send(endpoint, exchange);
045 return exchange;
046 }
047
048 /**
049 * Sends an exchange to an endpoint using a supplied @{link Processor} to populate the exchange
050 *
051 * @param endpointUri the endpoint URI to send the exchange to
052 * @param processor the transformer used to populate the new exchange
053 */
054 public E send(String endpointUri, Processor processor) {
055 Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
056 return send(endpoint, processor);
057 }
058
059 /**
060 * Sends the exchange to the given endpoint
061 *
062 * @param endpoint the endpoint to send the exchange to
063 * @param exchange the exchange to send
064 */
065 public E send(Endpoint<E> endpoint, E exchange) {
066 E convertedExchange = endpoint.toExchangeType(exchange);
067 producerCache.send(endpoint, convertedExchange);
068 return exchange;
069 }
070
071 /**
072 * Sends an exchange to an endpoint using a supplied @{link Processor} to populate the exchange
073 *
074 * @param endpoint the endpoint to send the exchange to
075 * @param processor the transformer used to populate the new exchange
076 */
077 public E send(Endpoint<E> endpoint, Processor processor) {
078 return producerCache.send(endpoint, processor);
079 }
080
081 /**
082 * Send the body to an endpoint
083 *
084 * @param endpointUri
085 * @param body = the payload
086 * @return the result
087 */
088 public Object sendBody(String endpointUri, final Object body) {
089 E result = send(endpointUri, new Processor() {
090 public void process(Exchange exchange) {
091 Message in = exchange.getIn();
092 in.setBody(body);
093 }
094 });
095 return extractResultBody(result);
096 }
097
098 /**
099 * Sends the body to an endpoint with a specified header and header value
100 *
101 * @param endpointUri the endpoint URI to send to
102 * @param body the payload send
103 * @param header the header name
104 * @param headerValue the header value
105 * @return the result
106 */
107 public Object sendBody(String endpointUri, final Object body, final String header, final Object headerValue) {
108 E result = send(endpointUri, new Processor() {
109 public void process(Exchange exchange) {
110 Message in = exchange.getIn();
111 in.setHeader(header, headerValue);
112 in.setBody(body);
113 }
114 });
115 return extractResultBody(result);
116 }
117
118 public Producer<E> getProducer(Endpoint<E> endpoint) {
119 return producerCache.getProducer(endpoint);
120 }
121
122 public CamelContext getContext() {
123 return context;
124 }
125
126 protected Endpoint resolveMandatoryEndpoint(String endpointUri) {
127 Endpoint endpoint = context.getEndpoint(endpointUri);
128 if (endpoint == null) {
129 throw new NoSuchEndpointException(endpointUri);
130 }
131 return endpoint;
132 }
133
134 protected void doStart() throws Exception {
135 producerCache.start();
136 }
137
138 protected void doStop() throws Exception {
139 producerCache.stop();
140 }
141
142 protected Object extractResultBody(E result) {
143 return result != null ? result.getOut().getBody() : null;
144 }
145 }