001 /*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.camel.builder;
018
019 import org.apache.camel.Endpoint;
020 import org.apache.camel.Exchange;
021 import org.apache.camel.Expression;
022 import org.apache.camel.Predicate;
023 import org.apache.camel.Processor;
024 import org.apache.camel.Route;
025 import org.apache.camel.impl.EventDrivenConsumerRoute;
026 import org.apache.camel.processor.CompositeProcessor;
027 import org.apache.camel.processor.DelegateProcessor;
028 import org.apache.camel.processor.MulticastProcessor;
029 import org.apache.camel.processor.Pipeline;
030 import org.apache.camel.processor.RecipientList;
031 import org.apache.camel.processor.aggregate.AggregationStrategy;
032 import org.apache.camel.processor.idempotent.IdempotentConsumer;
033 import org.apache.camel.processor.idempotent.MessageIdRepository;
034 import org.apache.camel.spi.Policy;
035 import org.apache.commons.logging.Log;
036 import org.apache.commons.logging.LogFactory;
037
038 import java.util.ArrayList;
039 import java.util.Collection;
040 import java.util.Collections;
041 import java.util.List;
042
043 import sun.net.smtp.SmtpClient;
044
045 /**
046 * @version $Revision: 559613 $
047 */
048 public class FromBuilder extends BuilderSupport implements ProcessorFactory {
049 public static final String DEFAULT_TRACE_CATEGORY = "org.apache.camel.TRACE";
050 private RouteBuilder builder;
051 private Endpoint from;
052 private List<Processor> processors = new ArrayList<Processor>();
053 private List<ProcessorFactory> processFactories = new ArrayList<ProcessorFactory>();
054 private FromBuilder routeBuilder;
055
056 public FromBuilder(RouteBuilder builder, Endpoint from) {
057 super(builder);
058 this.builder = builder;
059 this.from = from;
060 }
061
062 public FromBuilder(FromBuilder parent) {
063 super(parent);
064 this.builder = parent.getBuilder();
065 this.from = parent.getFrom();
066 }
067
068 /**
069 * Sends the exchange to the given endpoint URI
070 */
071 @Fluent
072 public ProcessorFactory to(@FluentArg("uri")String uri) {
073 return to(endpoint(uri));
074 }
075
076 /**
077 * Sends the exchange to the given endpoint
078 */
079 @Fluent
080 public ProcessorFactory to(@FluentArg("ref")Endpoint endpoint) {
081 ToBuilder answer = new ToBuilder(this, endpoint);
082 addProcessBuilder(answer);
083 return answer;
084 }
085
086 /**
087 * Sends the exchange to a list of endpoints using the {@link MulticastProcessor} pattern
088 */
089 @Fluent
090 public ProcessorFactory to(String... uris) {
091 return to(endpoints(uris));
092 }
093
094 /**
095 * Sends the exchange to a list of endpoints using the {@link MulticastProcessor} pattern
096 */
097 @Fluent
098 public ProcessorFactory to(
099 @FluentArg(value = "endpoint", attribute = false, element = true)
100 Endpoint... endpoints) {
101 return to(endpoints(endpoints));
102 }
103
104 /**
105 * Sends the exchange to a list of endpoint using the {@link MulticastProcessor} pattern
106 */
107 @Fluent
108 public ProcessorFactory to(@FluentArg(value = "endpoint", attribute = false, element = true)
109 Collection<Endpoint> endpoints) {
110 return addProcessBuilder(new MulticastBuilder(this, endpoints));
111 }
112
113 /**
114 * Creates a {@link Pipeline} of the list of endpoints so that the message will get processed by each endpoint in turn
115 * and for request/response the output of one endpoint will be the input of the next endpoint
116 */
117 @Fluent
118 public ProcessorFactory pipeline(@FluentArg("uris")String... uris) {
119 return pipeline(endpoints(uris));
120 }
121
122 /**
123 * Creates a {@link Pipeline} of the list of endpoints so that the message will get processed by each endpoint in turn
124 * and for request/response the output of one endpoint will be the input of the next endpoint
125 */
126 @Fluent
127 public ProcessorFactory pipeline(@FluentArg("endpoints")Endpoint... endpoints) {
128 return pipeline(endpoints(endpoints));
129 }
130
131 /**
132 * Creates a {@link Pipeline} of the list of endpoints so that the message will get processed by each endpoint in turn
133 * and for request/response the output of one endpoint will be the input of the next endpoint
134 */
135 @Fluent
136 public ProcessorFactory pipeline(@FluentArg("endpoints")Collection<Endpoint> endpoints) {
137 return addProcessBuilder(new PipelineBuilder(this, endpoints));
138 }
139
140 /**
141 * Creates an {@link IdempotentConsumer} to avoid duplicate messages
142 */
143 @Fluent
144 public IdempotentConsumerBuilder idempotentConsumer(
145 @FluentArg("messageIdExpression")Expression messageIdExpression,
146 @FluentArg("MessageIdRepository")MessageIdRepository messageIdRepository) {
147 return (IdempotentConsumerBuilder) addProcessBuilder(new IdempotentConsumerBuilder(this, messageIdExpression, messageIdRepository));
148 }
149
150 /**
151 * Creates a predicate which is applied and only if it is true then
152 * the exchange is forwarded to the destination
153 *
154 * @return the builder for a predicate
155 */
156 @Fluent
157 public FilterBuilder filter(
158 @FluentArg(value = "predicate", element = true)
159 Predicate predicate) {
160 FilterBuilder answer = new FilterBuilder(this, predicate);
161 addProcessBuilder(answer);
162 return answer;
163 }
164
165 /**
166 * Creates a choice of one or more predicates with an otherwise clause
167 *
168 * @return the builder for a choice expression
169 */
170 @Fluent(nestedActions = true)
171 public ChoiceBuilder choice() {
172 ChoiceBuilder answer = new ChoiceBuilder(this);
173 addProcessBuilder(answer);
174 return answer;
175 }
176
177 /**
178 * Creates a dynamic <a href="http://activemq.apache.org/camel/recipient-list.html">Recipient List</a> pattern.
179 *
180 * @param receipients is the builder of the expression used in the {@link RecipientList} to decide the destinations
181 */
182 @Fluent
183 public RecipientListBuilder recipientList(
184 @FluentArg(value = "recipients", element = true)
185 Expression receipients) {
186 RecipientListBuilder answer = new RecipientListBuilder(this, receipients);
187 addProcessBuilder(answer);
188 return answer;
189 }
190
191 /**
192 * A builder for the <a href="http://activemq.apache.org/camel/splitter.html">Splitter</a> pattern
193 * where an expression is evaluated to iterate through each of the parts of a message and then each part is then send to some endpoint.
194 *
195 * @param receipients the expression on which to split
196 * @return the builder
197 */
198 @Fluent
199 public SplitterBuilder splitter(@FluentArg(value = "recipients", element = true)Expression receipients) {
200 SplitterBuilder answer = new SplitterBuilder(this, receipients);
201 addProcessBuilder(answer);
202 return answer;
203 }
204
205 /**
206 * A builder for the <a href="http://activemq.apache.org/camel/resequencer.html">Resequencer</a> pattern
207 * where an expression is evaluated to be able to compare the message exchanges to reorder them. e.g. you
208 * may wish to sort by some header
209 *
210 * @param expression the expression on which to compare messages in order
211 * @return the builder
212 */
213 public ResequencerBuilder resequencer(Expression<Exchange> expression) {
214 return resequencer(Collections.<Expression<Exchange>>singletonList(expression));
215 }
216
217 /**
218 * A builder for the <a href="http://activemq.apache.org/camel/resequencer.html">Resequencer</a> pattern
219 * where a list of expressions are evaluated to be able to compare the message exchanges to reorder them. e.g. you
220 * may wish to sort by some headers
221 *
222 * @param expressions the expressions on which to compare messages in order
223 * @return the builder
224 */
225 @Fluent
226 public ResequencerBuilder resequencer(@FluentArg(value = "expressions")List<Expression<Exchange>> expressions) {
227 ResequencerBuilder answer = new ResequencerBuilder(this, expressions);
228 setRouteBuilder(answer);
229 return answer;
230 }
231
232 /**
233 * A builder for the <a href="http://activemq.apache.org/camel/resequencer.html">Resequencer</a> pattern
234 * where a list of expressions are evaluated to be able to compare the message exchanges to reorder them. e.g. you
235 * may wish to sort by some headers
236 *
237 * @param expressions the expressions on which to compare messages in order
238 * @return the builder
239 */
240 @Fluent
241 public ResequencerBuilder resequencer(Expression<Exchange>... expressions) {
242 List<Expression<Exchange>> list = new ArrayList<Expression<Exchange>>();
243 for (Expression<Exchange> expression : expressions) {
244 list.add(expression);
245 }
246 return resequencer(list);
247 }
248
249 /**
250 * A builder for the <a href="http://activemq.apache.org/camel/aggregator.html">Aggregator</a> pattern
251 * where a batch of messages are processed (up to a maximum amount or until some timeout is reached)
252 * and messages for the same correlation key are combined together using some kind of
253 * {@link AggregationStrategy ) (by default the latest message is used) to compress many message exchanges
254 * into a smaller number of exchanges.
255 * <p/>
256 * A good example of this is stock market data; you may be receiving 30,000 messages/second and you may want to
257 * throttle it right down so that multiple messages for the same stock are combined (or just the latest
258 * message is used and older prices are discarded). Another idea is to combine line item messages together
259 * into a single invoice message.
260 *
261 * @param correlationExpression the expression used to calculate the correlation key. For a JMS message this could
262 * be the expression <code>header("JMSDestination")</code> or <code>header("JMSCorrelationID")</code>
263 */
264 @Fluent
265 public AggregatorBuilder aggregator(Expression correlationExpression) {
266 AggregatorBuilder answer = new AggregatorBuilder(this, correlationExpression);
267 setRouteBuilder(answer);
268 return answer;
269 }
270
271 /**
272 * A builder for the <a href="http://activemq.apache.org/camel/aggregator.html">Aggregator</a> pattern
273 * where a batch of messages are processed (up to a maximum amount or until some timeout is reached)
274 * and messages for the same correlation key are combined together using some kind of
275 * {@link AggregationStrategy ) (by default the latest message is used) to compress many message exchanges
276 * into a smaller number of exchanges.
277 * <p/>
278 * A good example of this is stock market data; you may be receiving 30,000 messages/second and you may want to
279 * throttle it right down so that multiple messages for the same stock are combined (or just the latest
280 * message is used and older prices are discarded). Another idea is to combine line item messages together
281 * into a single invoice message.
282 *
283 * @param correlationExpression the expression used to calculate the correlation key. For a JMS message this could
284 * be the expression <code>header("JMSDestination")</code> or <code>header("JMSCorrelationID")</code>
285 */
286 @Fluent
287 public AggregatorBuilder aggregator(Expression correlationExpression, AggregationStrategy strategy) {
288 AggregatorBuilder answer = new AggregatorBuilder(this, correlationExpression);
289 answer.aggregationStrategy(strategy);
290 setRouteBuilder(answer);
291 return answer;
292 }
293
294 /**
295 * A builder for the <a href="http://activemq.apache.org/camel/delayer.html">Delayer</a> pattern
296 * where an expression is used to calculate the time which the message will be dispatched on
297 *
298 * @param processAtExpression an expression to calculate the time at which the messages should be processed
299 * @return the builder
300 */
301 @Fluent
302 public DelayerBuilder delayer(Expression<Exchange> processAtExpression) {
303 return delayer(processAtExpression, 0L);
304 }
305
306 /**
307 * A builder for the <a href="http://activemq.apache.org/camel/delayer.html">Delayer</a> pattern
308 * where an expression is used to calculate the time which the message will be dispatched on
309 *
310 * @param processAtExpression an expression to calculate the time at which the messages should be processed
311 * @param delay the delay in milliseconds which is added to the processAtExpression to determine the time the
312 * message should be processed
313 * @return the builder
314 */
315 @Fluent
316 public DelayerBuilder delayer(Expression<Exchange> processAtExpression, long delay) {
317 DelayerBuilder answer = new DelayerBuilder(this, processAtExpression, delay);
318 setRouteBuilder(answer);
319 return answer;
320 }
321
322 /**
323 * A builder for the <a href="http://activemq.apache.org/camel/delayer.html">Delayer</a> pattern
324 * where a fixed amount of milliseconds are used to delay processing of a message exchange
325 *
326 * @param delay the default delay in milliseconds
327 * @return the builder
328 */
329 @Fluent
330 public DelayerBuilder delayer(long delay) {
331 return delayer(null, delay);
332 }
333
334
335 /**
336 * A builder for the <a href="http://activemq.apache.org/camel/delayer.html">Delayer</a> pattern
337 * where an expression is used to calculate the time which the message will be dispatched on
338 *
339 * @param processAtExpression an expression to calculate the time at which the messages should be processed
340 * @param delay the delay in milliseconds which is added to the processAtExpression to determine the time the
341 * message should be processed
342 * @return the builder
343 */
344 @Fluent
345 public ThrottlerBuilder throttler(long maximumRequestCount) {
346 ThrottlerBuilder answer = new ThrottlerBuilder(this, maximumRequestCount);
347 setRouteBuilder(answer);
348 return answer;
349 }
350
351
352 /**
353 * Installs the given error handler builder
354 *
355 * @param errorHandlerBuilder the error handler to be used by default for all child routes
356 * @return the current builder with the error handler configured
357 */
358 @Fluent
359 public FromBuilder errorHandler(@FluentArg("handler")ErrorHandlerBuilder errorHandlerBuilder) {
360 setErrorHandlerBuilder(errorHandlerBuilder);
361 return this;
362 }
363
364 /**
365 * Configures whether or not the error handler is inherited by every processing node (or just the top most one)
366 *
367 * @param condition the falg as to whether error handlers should be inherited or not
368 * @return the current builder
369 */
370 @Fluent
371 public FromBuilder inheritErrorHandler(@FluentArg("condition")boolean condition) {
372 setInheritErrorHandler(condition);
373 return this;
374 }
375
376 @Fluent(nestedActions = true)
377 public InterceptorBuilder intercept() {
378 InterceptorBuilder answer = new InterceptorBuilder(this);
379 addProcessBuilder(answer);
380 return answer;
381 }
382
383 @Fluent
384 public FromBuilder intercept(@FluentArg("interceptor")DelegateProcessor interceptor) {
385 InterceptorBuilder answer = new InterceptorBuilder(this);
386 answer.add(interceptor);
387 addProcessBuilder(answer);
388 return answer.target();
389 }
390
391 /**
392 * Trace logs the exchange before it goes to the next processing step using the {@link #DEFAULT_TRACE_CATEGORY} logging
393 * category.
394 *
395 * @return
396 */
397 @Fluent
398 public FromBuilder trace() {
399 return trace(DEFAULT_TRACE_CATEGORY);
400 }
401
402 /**
403 * Trace logs the exchange before it goes to the next processing step using the specified logging
404 * category.
405 *
406 * @param category the logging category trace messages will sent to.
407 * @return
408 */
409 @Fluent
410 public FromBuilder trace(@FluentArg("category")String category) {
411 final Log log = LogFactory.getLog(category);
412 return intercept(new DelegateProcessor() {
413 @Override
414 public void process(Exchange exchange) throws Exception {
415 log.trace(exchange);
416 processNext(exchange);
417 }
418 });
419 }
420
421 @Fluent(nestedActions = true)
422 public PolicyBuilder policies() {
423 PolicyBuilder answer = new PolicyBuilder(this);
424 addProcessBuilder(answer);
425 return answer;
426 }
427
428 @Fluent
429 public FromBuilder policy(@FluentArg("policy")Policy policy) {
430 PolicyBuilder answer = new PolicyBuilder(this);
431 answer.add(policy);
432 addProcessBuilder(answer);
433 return answer.target();
434 }
435
436 // Transformers
437 //-------------------------------------------------------------------------
438
439 /**
440 * Adds the custom processor to this destination which could be a final destination, or could be a transformation in a pipeline
441 */
442 @Fluent
443 public FromBuilder process(@FluentArg("ref")Processor processor) {
444 addProcessorBuilder(processor);
445 return this;
446 }
447
448 /**
449 * Adds a processor which sets the body on the IN message
450 */
451 @Fluent
452 public FromBuilder setBody(Expression expression) {
453 addProcessorBuilder(ProcessorBuilder.setBody(expression));
454 return this;
455 }
456
457 /**
458 * Adds a processor which sets the body on the OUT message
459 */
460 @Fluent
461 public FromBuilder setOutBody(Expression expression) {
462 addProcessorBuilder(ProcessorBuilder.setOutBody(expression));
463 return this;
464 }
465
466 /**
467 * Adds a processor which sets the header on the IN message
468 */
469 @Fluent
470 public FromBuilder setHeader(String name, Expression expression) {
471 addProcessorBuilder(ProcessorBuilder.setHeader(name, expression));
472 return this;
473 }
474
475 /**
476 * Adds a processor which sets the header on the OUT message
477 */
478 @Fluent
479 public FromBuilder setOutHeader(String name, Expression expression) {
480 addProcessorBuilder(ProcessorBuilder.setOutHeader(name, expression));
481 return this;
482 }
483
484 /**
485 * Adds a processor which sets the exchange property
486 */
487 @Fluent
488 public FromBuilder setProperty(String name, Expression expression) {
489 addProcessorBuilder(ProcessorBuilder.setProperty(name, expression));
490 return this;
491 }
492
493 /**
494 * Converts the IN message body to the specified type
495 */
496 @Fluent
497 public FromBuilder convertBodyTo(Class type) {
498 addProcessorBuilder(ProcessorBuilder.setBody(Builder.body().convertTo(type)));
499 return this;
500 }
501
502 /**
503 * Converts the OUT message body to the specified type
504 */
505 @Fluent
506 public FromBuilder convertOutBodyTo(Class type) {
507 addProcessorBuilder(ProcessorBuilder.setOutBody(Builder.outBody().convertTo(type)));
508 return this;
509 }
510
511 // Properties
512 //-------------------------------------------------------------------------
513 public RouteBuilder getBuilder() {
514 return builder;
515 }
516
517 public Endpoint getFrom() {
518 return from;
519 }
520
521 public List<Processor> getProcessors() {
522 return processors;
523 }
524
525 public ProcessorFactory addProcessBuilder(ProcessorFactory processFactory) {
526 processFactories.add(processFactory);
527 return processFactory;
528 }
529
530 protected void addProcessorBuilder(Processor processor) {
531 addProcessBuilder(new ConstantProcessorBuilder(processor));
532 }
533
534 public void addProcessor(Processor processor) {
535 processors.add(processor);
536 }
537
538 public Route createRoute() throws Exception {
539 if (routeBuilder != null) {
540 return routeBuilder.createRoute();
541 }
542 Processor processor = createProcessor();
543 if (processor == null) {
544 throw new IllegalArgumentException("No processor created for: " + this);
545 }
546 return new EventDrivenConsumerRoute(getFrom(), processor);
547 }
548
549 public Processor createProcessor() throws Exception {
550 List<Processor> answer = new ArrayList<Processor>();
551
552 for (ProcessorFactory processFactory : processFactories) {
553 Processor processor = makeProcessor(processFactory);
554 if (processor == null) {
555 throw new IllegalArgumentException("No processor created for processBuilder: " + processFactory);
556 }
557 answer.add(processor);
558 }
559 if (answer.size() == 0) {
560 return null;
561 }
562 Processor processor = null;
563 if (answer.size() == 1) {
564 processor = answer.get(0);
565 }
566 else {
567 processor = new CompositeProcessor(answer);
568 }
569 return processor;
570 }
571
572 /**
573 * Creates the processor and wraps it in any necessary interceptors and error handlers
574 */
575 protected Processor makeProcessor(ProcessorFactory processFactory) throws Exception {
576 Processor processor = processFactory.createProcessor();
577 processor = wrapProcessor(processor);
578 return wrapInErrorHandler(processor);
579 }
580
581 /**
582 * A strategy method to allow newly created processors to be wrapped in an error handler. This feature
583 * could be disabled for child builders such as {@link IdempotentConsumerBuilder} which will rely on the
584 * {@link FromBuilder} to perform the error handling to avoid doubly-wrapped processors with 2 nested error handlers
585 */
586 protected Processor wrapInErrorHandler(Processor processor) throws Exception {
587 return getErrorHandlerBuilder().createErrorHandler(processor);
588 }
589
590 /**
591 * A strategy method which allows derived classes to wrap the child processor in some kind of interceptor such as
592 * a filter for the {@link IdempotentConsumerBuilder}.
593 *
594 * @param processor the processor which can be wrapped
595 * @return the original processor or a new wrapped interceptor
596 */
597 protected Processor wrapProcessor(Processor processor) {
598 return processor;
599 }
600
601 protected FromBuilder getRouteBuilder() {
602 return routeBuilder;
603 }
604
605 protected void setRouteBuilder(FromBuilder routeBuilder) {
606 this.routeBuilder = routeBuilder;
607 }
608
609 }