001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.camel.processor;
018
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.impl.LoggingExceptionHandler;
import org.apache.camel.impl.ServiceSupport;
import org.apache.camel.spi.ExceptionHandler;
import org.apache.camel.util.ServiceHelper;
029
030 /**
031 * A base class for any kind of {@link Processor} which implements some kind of
032 * batch processing.
033 *
034 * @version $Revision: 726941 $
035 */
036 public class BatchProcessor extends ServiceSupport implements Processor {
037
038 public static final long DEFAULT_BATCH_TIMEOUT = 1000L;
039 public static final int DEFAULT_BATCH_SIZE = 100;
040
041 private long batchTimeout = DEFAULT_BATCH_TIMEOUT;
042 private int batchSize = DEFAULT_BATCH_SIZE;
043 private int outBatchSize;
044
045 private Processor processor;
046 private Collection<Exchange> collection;
047 private ExceptionHandler exceptionHandler;
048
049 private BatchSender sender;
050
051 public BatchProcessor(Processor processor, Collection<Exchange> collection) {
052 this.processor = processor;
053 this.collection = collection;
054 this.sender = new BatchSender();
055 }
056
057 @Override
058 public String toString() {
059 return "BatchProcessor[to: " + processor + "]";
060 }
061
062 // Properties
063 // -------------------------------------------------------------------------
064 public ExceptionHandler getExceptionHandler() {
065 if (exceptionHandler == null) {
066 exceptionHandler = new LoggingExceptionHandler(getClass());
067 }
068 return exceptionHandler;
069 }
070
071 public void setExceptionHandler(ExceptionHandler exceptionHandler) {
072 this.exceptionHandler = exceptionHandler;
073 }
074
075 public int getBatchSize() {
076 return batchSize;
077 }
078
079 /**
080 * Sets the <b>in</b> batch size. This is the number of incoming exchanges that this batch processor
081 * will process before its completed. The default value is {@link #DEFAULT_BATCH_SIZE}.
082 *
083 * @param batchSize the size
084 */
085 public void setBatchSize(int batchSize) {
086 this.batchSize = batchSize;
087 }
088
089 public int getOutBatchSize() {
090 return outBatchSize;
091 }
092
093 /**
094 * Sets the <b>out</b> batch size. If the batch processor holds more exchanges than this out size then
095 * the completion is triggered. Can for instance be used to ensure that this batch is completed when
096 * a certain number of exchanges has been collected. By default this feature is <b>not</b> enabled.
097 *
098 * @param outBatchSize the size
099 */
100 public void setOutBatchSize(int outBatchSize) {
101 this.outBatchSize = outBatchSize;
102 }
103
104 public long getBatchTimeout() {
105 return batchTimeout;
106 }
107
108 public void setBatchTimeout(long batchTimeout) {
109 this.batchTimeout = batchTimeout;
110 }
111
112 public Processor getProcessor() {
113 return processor;
114 }
115
116 /**
117 * A strategy method to decide if the "in" batch is completed. That is, whether the resulting
118 * exchanges in the in queue should be drained to the "out" collection.
119 */
120 protected boolean isInBatchCompleted(int num) {
121 return num >= batchSize;
122 }
123
124 /**
125 * A strategy method to decide if the "out" batch is completed. That is, whether the resulting
126 * exchange in the out collection should be sent.
127 */
128 protected boolean isOutBatchCompleted() {
129 if (outBatchSize == 0) {
130 // out batch is disabled, so go ahead and send.
131 return true;
132 }
133 return collection.size() > 0 && collection.size() >= outBatchSize;
134 }
135
136 /**
137 * Strategy Method to process an exchange in the batch. This method allows
138 * derived classes to perform custom processing before or after an
139 * individual exchange is processed
140 */
141 protected void processExchange(Exchange exchange) throws Exception {
142 processor.process(exchange);
143 }
144
145 protected void doStart() throws Exception {
146 ServiceHelper.startServices(processor);
147 sender.start();
148 }
149
150 protected void doStop() throws Exception {
151 sender.cancel();
152 ServiceHelper.stopServices(processor);
153 collection.clear();
154 }
155
156 protected Collection<Exchange> getCollection() {
157 return collection;
158 }
159
160 /**
161 * Enqueues an exchange for later batch processing.
162 */
163 public void process(Exchange exchange) throws Exception {
164 sender.enqueueExchange(exchange);
165 }
166
167 /**
168 * Sender thread for queued-up exchanges.
169 */
170 private class BatchSender extends Thread {
171
172 private volatile boolean cancelRequested;
173
174 private LinkedBlockingQueue<Exchange> queue;
175
176 public BatchSender() {
177 super("Batch Sender");
178 this.queue = new LinkedBlockingQueue<Exchange>();
179 }
180
181 @Override
182 public void run() {
183 while (true) {
184 try {
185 Thread.sleep(batchTimeout);
186 queue.drainTo(collection, batchSize);
187 } catch (InterruptedException e) {
188 if (cancelRequested) {
189 return;
190 }
191
192 while (isInBatchCompleted(queue.size())) {
193 queue.drainTo(collection, batchSize);
194 }
195
196 if (!isOutBatchCompleted()) {
197 continue;
198 }
199 }
200 try {
201 sendExchanges();
202 } catch (Exception e) {
203 getExceptionHandler().handleException(e);
204 }
205 }
206 }
207
208 public void cancel() {
209 cancelRequested = true;
210 interrupt();
211 }
212
213 public void enqueueExchange(Exchange exchange) {
214 queue.add(exchange);
215 interrupt();
216 }
217
218 private void sendExchanges() throws Exception {
219 Iterator<Exchange> iter = collection.iterator();
220 while (iter.hasNext()) {
221 Exchange exchange = iter.next();
222 iter.remove();
223 processExchange(exchange);
224 }
225 }
226 }
227
228 }