001 /**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one or more
004 * contributor license agreements. See the NOTICE file distributed with
005 * this work for additional information regarding copyright ownership.
006 * The ASF licenses this file to You under the Apache License, Version 2.0
007 * (the "License"); you may not use this file except in compliance with
008 * the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.camel.processor;
019
020 import org.apache.camel.Endpoint;
021 import org.apache.camel.Exchange;
022 import org.apache.camel.Processor;
023 import org.apache.camel.Producer;
024
025 import java.util.Collection;
026
027 /**
028 * Creates a Pipeline pattern where the output of the previous step is sent as input to the next step when working
029 * with request/response message exchanges.
030 *
031 * @version $Revision: 534145 $
032 */
033 public class Pipeline extends MulticastProcessor implements Processor {
034 public Pipeline(Collection<Endpoint> endpoints) throws Exception {
035 super(endpoints);
036 }
037
038 public void process(Exchange exchange) throws Exception {
039 Exchange nextExchange = exchange;
040 boolean first = true;
041 for (Producer producer : getProducers()) {
042 if (first) {
043 first = false;
044 }
045 else {
046 nextExchange = createNextExchange(producer, nextExchange);
047 }
048 producer.process(nextExchange);
049 }
050 }
051
052 /**
053 * Strategy method to create the next exchange from the
054 *
055 * @param producer the producer used to send to the endpoint
056 * @param previousExchange the previous exchange
057 * @return a new exchange
058 */
059 protected Exchange createNextExchange(Producer producer, Exchange previousExchange) {
060 Exchange answer = producer.createExchange(previousExchange);
061
062 // now lets set the input of the next exchange to the output of the previous message if it is not null
063 Object output = previousExchange.getOut().getBody();
064 if (output != null) {
065 answer.getIn().setBody(output);
066 }
067 return answer;
068 }
069
070 /**
071 * Strategy method to copy the exchange before sending to another endpoint. Derived classes such as the
072 * {@link Pipeline} will not clone the exchange
073 *
074 * @param exchange
075 * @return the current exchange if no copying is required such as for a pipeline otherwise a new copy of the exchange is returned.
076 */
077 protected Exchange copyExchangeStrategy(Exchange exchange) {
078 return exchange.copy();
079 }
080
081 @Override
082 public String toString() {
083 return "Pipeline" + getEndpoints();
084 }
085 }