001 /**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one or more
004 * contributor license agreements. See the NOTICE file distributed with
005 * this work for additional information regarding copyright ownership.
006 * The ASF licenses this file to You under the Apache License, Version 2.0
007 * (the "License"); you may not use this file except in compliance with
008 * the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.camel.util;
019
020 import org.apache.camel.Exchange;
021 import org.apache.camel.CamelContext;
022 import org.apache.camel.Endpoint;
023 import org.apache.camel.NoSuchEndpointException;
024 import org.apache.camel.Processor;
025 import org.apache.camel.Producer;
026 import org.apache.camel.impl.ServiceSupport;
027
028 /**
029 * A helper client for working with Camel
030 *
031 * @version $Revision: 532790 $
032 */
033 public class CamelClient<E extends Exchange> extends ServiceSupport {
034 private CamelContext context;
035 private ProducerCache<E> producerCache = new ProducerCache<E>();
036
037 public CamelClient(CamelContext context) {
038 this.context = context;
039 }
040
041
042 /**
043 * Sends the exchange to the given endpoint
044 *
045 * @param endpointUri the endpoint URI to send the exchange to
046 * @param exchange the exchange to send
047 */
048 public E send(String endpointUri, E exchange) {
049 Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
050 send(endpoint, exchange);
051 return exchange;
052 }
053
054 /**
055 * Sends an exchange to an endpoint using a supplied @{link Processor} to populate the exchange
056 *
057 * @param endpointUri the endpoint URI to send the exchange to
058 * @param processor the transformer used to populate the new exchange
059 */
060 public E send(String endpointUri, Processor<E> processor) {
061 Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
062 return send(endpoint, processor);
063 }
064
065 /**
066 * Sends the exchange to the given endpoint
067 *
068 * @param endpoint the endpoint to send the exchange to
069 * @param exchange the exchange to send
070 */
071 public E send(Endpoint<E> endpoint, E exchange) {
072 producerCache.send(endpoint, exchange);
073 return exchange;
074 }
075
076 /**
077 * Sends an exchange to an endpoint using a supplied @{link Processor} to populate the exchange
078 *
079 * @param endpoint the endpoint to send the exchange to
080 * @param processor the transformer used to populate the new exchange
081 */
082 public E send(Endpoint<E> endpoint, Processor<E> processor) {
083 return producerCache.send(endpoint, processor);
084 }
085
086 public Producer<E> getProducer(Endpoint<E> endpoint) {
087 return producerCache.getProducer(endpoint);
088 }
089
090 public CamelContext getContext() {
091 return context;
092 }
093
094 protected Endpoint resolveMandatoryEndpoint(String endpointUri) {
095 Endpoint endpoint = context.getEndpoint(endpointUri);
096 if (endpoint == null) {
097 throw new NoSuchEndpointException(endpointUri);
098 }
099 return endpoint;
100 }
101
102 protected void doStart() throws Exception {
103 producerCache.start();
104 }
105
106 protected void doStop() throws Exception {
107 producerCache.stop();
108 }
109 }