001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.camel.util;
018
019 import java.util.HashMap;
020 import java.util.Map;
021
022 import org.apache.camel.Endpoint;
023 import org.apache.camel.Exchange;
024 import org.apache.camel.FailedToCreateProducerException;
025 import org.apache.camel.Processor;
026 import org.apache.camel.Producer;
027 import org.apache.camel.RuntimeCamelException;
028 import org.apache.camel.impl.ServiceSupport;
029 import org.apache.commons.logging.Log;
030 import org.apache.commons.logging.LogFactory;
031
032 /**
033 * @version $Revision: 563607 $
034 */
035 public class ProducerCache<E extends Exchange> extends ServiceSupport {
036 private static final Log LOG = LogFactory.getLog(ProducerCache.class);
037
038 private Map<String, Producer<E>> producers = new HashMap<String, Producer<E>>();
039
040 public synchronized Producer<E> getProducer(Endpoint<E> endpoint) {
041 String key = endpoint.getEndpointUri();
042 Producer<E> answer = producers.get(key);
043 if (answer == null) {
044 try {
045 answer = endpoint.createProducer();
046 answer.start();
047 } catch (Exception e) {
048 throw new FailedToCreateProducerException(endpoint, e);
049 }
050 producers.put(key, answer);
051 }
052 return answer;
053 }
054
055 /**
056 * Sends the exchange to the given endpoint
057 *
058 * @param endpoint the endpoint to send the exchange to
059 * @param exchange the exchange to send
060 */
061 public void send(Endpoint<E> endpoint, E exchange) {
062 try {
063 Producer<E> producer = getProducer(endpoint);
064 producer.process(exchange);
065 } catch (Exception e) {
066 throw new RuntimeCamelException(e);
067 }
068 }
069
070 /**
071 * Sends an exchange to an endpoint using a supplied
072 *
073 * @{link Processor} to populate the exchange
074 *
075 * @param endpoint the endpoint to send the exchange to
076 * @param processor the transformer used to populate the new exchange
077 */
078 public E send(Endpoint<E> endpoint, Processor processor) {
079 try {
080 Producer<E> producer = getProducer(endpoint);
081 E exchange = producer.createExchange();
082
083 // lets populate using the processor callback
084 processor.process(exchange);
085
086 // now lets dispatch
087 if (LOG.isDebugEnabled()) {
088 LOG.debug(">>>> " + endpoint + " " + exchange);
089 }
090 producer.process(exchange);
091 return exchange;
092 } catch (Exception e) {
093 throw new RuntimeCamelException(e);
094 }
095 }
096
097 protected void doStop() throws Exception {
098 ServiceHelper.stopServices(producers.values());
099 }
100
101 protected void doStart() throws Exception {
102 }
103 }