001 /**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one or more
004 * contributor license agreements. See the NOTICE file distributed with
005 * this work for additional information regarding copyright ownership.
006 * The ASF licenses this file to You under the Apache License, Version 2.0
007 * (the "License"); you may not use this file except in compliance with
008 * the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.camel.builder;
019
020 import org.apache.camel.Exchange;
021 import org.apache.camel.Expression;
022 import org.apache.camel.Processor;
023 import org.apache.camel.Route;
024 import org.apache.camel.Service;
025 import org.apache.camel.processor.Aggregator;
026 import org.apache.camel.processor.aggregate.AggregationStrategy;
027 import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
028
029 import java.util.List;
030
031 /**
032 * A builder for the <a href="http://activemq.apache.org/camel/aggregator.html">Aggregator</a> pattern
033 * where a batch of messages are processed (up to a maximum amount or until some timeout is reached)
034 * and messages for the same correlation key are combined together using some kind of
035 * {@link AggregationStrategy ) (by default the latest message is used) to compress many message exchanges
036 * into a smaller number of exchanges.
037 * <p/>
038 * A good example of this is stock market data; you may be receiving 30,000 messages/second and you may want to
039 * throttle it right down so that multiple messages for the same stock are combined (or just the latest
040 * message is used and older prices are discarded). Another idea is to combine line item messages together
041 * into a single invoice message.
042 *
043 * @version $Revision: 1.1 $
044 */
045 public class AggregatorBuilder extends FromBuilder {
046 private final Expression correlationExpression;
047 private long batchTimeout = 1000L;
048 private int batchSize = 50000;
049 private AggregationStrategy aggregationStrategy = new UseLatestAggregationStrategy();
050
051 public AggregatorBuilder(FromBuilder builder, Expression correlationExpression) {
052 super(builder);
053 this.correlationExpression = correlationExpression;
054 }
055
056 @Override
057 public Route createRoute() throws Exception {
058 final Processor processor = super.createProcessor();
059 final Aggregator service = new Aggregator(getFrom(), processor, correlationExpression, aggregationStrategy);
060
061 return new Route<Exchange>(getFrom(), service) {
062
063 @Override
064 public String toString() {
065 return "AggregatorRoute[" + getEndpoint() + " -> " + processor + "]";
066 }
067 };
068 }
069
070 // Builder methods
071 //-------------------------------------------------------------------------
072 public AggregatorBuilder aggregationStrategy(AggregationStrategy aggregationStrategy) {
073 setAggregationStrategy(aggregationStrategy);
074 return this;
075 }
076
077 public AggregatorBuilder batchSize(int batchSize) {
078 setBatchSize(batchSize);
079 return this;
080 }
081
082 public AggregatorBuilder batchTimeout(int batchTimeout) {
083 setBatchTimeout(batchTimeout);
084 return this;
085 }
086
087 // Properties
088 //-------------------------------------------------------------------------
089 public AggregationStrategy getAggregationStrategy() {
090 return aggregationStrategy;
091 }
092
093 public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
094 this.aggregationStrategy = aggregationStrategy;
095 }
096
097 public int getBatchSize() {
098 return batchSize;
099 }
100
101 public void setBatchSize(int batchSize) {
102 this.batchSize = batchSize;
103 }
104
105 public long getBatchTimeout() {
106 return batchTimeout;
107 }
108
109 public void setBatchTimeout(long batchTimeout) {
110 this.batchTimeout = batchTimeout;
111 }
112
113 }