001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.camel.component.mock;
018
019 import java.util.ArrayList;
020 import java.util.HashMap;
021 import java.util.List;
022 import java.util.Map;
023 import java.util.concurrent.CopyOnWriteArrayList;
024 import java.util.concurrent.CountDownLatch;
025 import java.util.concurrent.TimeUnit;
026
027 import org.apache.camel.Component;
028 import org.apache.camel.Consumer;
029 import org.apache.camel.Exchange;
030 import org.apache.camel.Expression;
031 import org.apache.camel.Message;
032 import org.apache.camel.Processor;
033 import org.apache.camel.Producer;
034 import org.apache.camel.impl.DefaultEndpoint;
035 import org.apache.camel.impl.DefaultExchange;
036 import org.apache.camel.impl.DefaultProducer;
037 import org.apache.camel.util.ExpressionComparator;
038 import org.apache.camel.util.ObjectHelper;
039 import org.apache.commons.logging.Log;
040 import org.apache.commons.logging.LogFactory;
041
042 /**
043 * A Mock endpoint which provides a literate, fluent API for testing routes
044 * using a <a href="http://jmock.org/">JMock style</a> API.
045 *
046 * @version $Revision: 1.1 $
047 */
048 public class MockEndpoint extends DefaultEndpoint<Exchange> {
049 private static final transient Log LOG = LogFactory.getLog(MockEndpoint.class);
050 private int expectedCount = -1;
051 private int counter;
052 private Map<Integer, Processor> processors = new HashMap<Integer, Processor>();
053 private List<Exchange> receivedExchanges = new CopyOnWriteArrayList<Exchange>();
054 private List<Throwable> failures = new CopyOnWriteArrayList<Throwable>();
055 private List<Runnable> tests = new CopyOnWriteArrayList<Runnable>();
056 private CountDownLatch latch;
057 private long sleepForEmptyTest = 1000L;
058 private long defaulResultWaitMillis = 20000L;
059 private int expectedMinimumCount = -1;
060 private List expectedBodyValues;
061 private List actualBodyValues = new ArrayList();
062
063 public MockEndpoint(String endpointUri, Component component) {
064 super(endpointUri, component);
065 }
066
067 public static void assertWait(long timeout, TimeUnit unit, MockEndpoint... endpoints) throws InterruptedException {
068 long start = System.currentTimeMillis();
069 long left = unit.toMillis(timeout);
070 long end = start + left;
071 for (MockEndpoint endpoint : endpoints) {
072 if (!endpoint.await(left, TimeUnit.MILLISECONDS)) {
073 throw new AssertionError("Timeout waiting for endpoints to receive enough messages. " + endpoint.getEndpointUri() + " timed out.");
074 }
075 left = end - System.currentTimeMillis();
076 if (left <= 0) {
077 left = 0;
078 }
079 }
080 }
081
082 public static void assertIsSatisfied(long timeout, TimeUnit unit, MockEndpoint... endpoints) throws InterruptedException {
083 assertWait(timeout, unit, endpoints);
084 for (MockEndpoint endpoint : endpoints) {
085 endpoint.assertIsSatisfied();
086 }
087 }
088
089 public static void assertIsSatisfied(MockEndpoint... endpoints) throws InterruptedException {
090 for (MockEndpoint endpoint : endpoints) {
091 endpoint.assertIsSatisfied();
092 }
093 }
094
095 public static void expectsMessageCount(int count, MockEndpoint... endpoints) throws InterruptedException {
096 for (MockEndpoint endpoint : endpoints) {
097 endpoint.expectsMessageCount(count);
098 }
099 }
100
101 public Exchange createExchange() {
102 return new DefaultExchange(getContext());
103 }
104
105 public Consumer<Exchange> createConsumer(Processor processor) throws Exception {
106 throw new UnsupportedOperationException("You cannot consume from this endpoint");
107 }
108
109 public Producer<Exchange> createProducer() throws Exception {
110 return new DefaultProducer<Exchange>(this) {
111 public void process(Exchange exchange) {
112 onExchange(exchange);
113 }
114 };
115 }
116
117 // Testing API
118 // -------------------------------------------------------------------------
119
120 /**
121 * Validates that all the available expectations on this endpoint are
122 * satisfied; or throw an exception
123 */
124 public void assertIsSatisfied() throws InterruptedException {
125 assertIsSatisfied(sleepForEmptyTest);
126 }
127
128 /**
129 * Validates that all the available expectations on this endpoint are
130 * satisfied; or throw an exception
131 *
132 * @param timeoutForEmptyEndpoints the timeout in milliseconds that we
133 * should wait for the test to be true
134 */
135 public void assertIsSatisfied(long timeoutForEmptyEndpoints) throws InterruptedException {
136 if (expectedCount >= 0) {
137 if (expectedCount != getReceivedCounter()) {
138 if (expectedCount == 0) {
139 // lets wait a little bit just in case
140 if (timeoutForEmptyEndpoints > 0) {
141 LOG.debug("Sleeping for: " + timeoutForEmptyEndpoints + " millis to check there really are no messages received");
142 Thread.sleep(timeoutForEmptyEndpoints);
143 }
144 } else {
145 waitForCompleteLatch();
146 }
147 }
148 assertEquals("Received message count", expectedCount, getReceivedCounter());
149 } else if (expectedMinimumCount > 0 && getReceivedCounter() < expectedMinimumCount) {
150 waitForCompleteLatch();
151 }
152
153 if (expectedMinimumCount >= 0) {
154 int receivedCounter = getReceivedCounter();
155 assertTrue("Received message count " + receivedCounter + ", expected at least " + expectedCount, expectedCount <= receivedCounter);
156 }
157
158 for (Runnable test : tests) {
159 test.run();
160 }
161
162 for (Throwable failure : failures) {
163 if (failure != null) {
164 LOG.error("Caught on " + getEndpointUri() + " Exception: " + failure, failure);
165 fail("Failed due to caught exception: " + failure);
166 }
167 }
168 }
169
170 /**
171 * Validates that the assertions fail on this endpoint
172 */
173 public void assertIsNotSatisfied() throws InterruptedException {
174 try {
175 assertIsSatisfied();
176 fail("Expected assertion failure!");
177 } catch (AssertionError e) {
178 LOG.info("Caught expected failure: " + e);
179 }
180 }
181
182 /**
183 * Specifies the expected number of message exchanges that should be
184 * received by this endpoint
185 *
186 * @param expectedCount the number of message exchanges that should be
187 * expected by this endpoint
188 */
189 public void expectedMessageCount(int expectedCount) {
190 this.expectedCount = expectedCount;
191 if (expectedCount <= 0) {
192 latch = null;
193 } else {
194 latch = new CountDownLatch(expectedCount);
195 }
196 }
197
198 /**
199 * Specifies the minimum number of expected message exchanges that should be
200 * received by this endpoint
201 *
202 * @param expectedCount the number of message exchanges that should be
203 * expected by this endpoint
204 */
205 public void expectedMinimumMessageCount(int expectedCount) {
206 this.expectedMinimumCount = expectedCount;
207 if (expectedCount <= 0) {
208 latch = null;
209 } else {
210 latch = new CountDownLatch(expectedMinimumCount);
211 }
212 }
213
214 /**
215 * Adds an expectation that the given body values are received by this
216 * endpoint
217 */
218 public void expectedBodiesReceived(final List bodies) {
219 expectedMessageCount(bodies.size());
220 this.expectedBodyValues = bodies;
221 this.actualBodyValues = new ArrayList();
222
223 expects(new Runnable() {
224 public void run() {
225 for (int i = 0; i < expectedBodyValues.size(); i++) {
226 Exchange exchange = getReceivedExchanges().get(i);
227 assertTrue("No exchange received for counter: " + i, exchange != null);
228
229 Object expectedBody = expectedBodyValues.get(i);
230 Object actualBody = actualBodyValues.get(i);
231
232 assertEquals("Body of message: " + i, expectedBody, actualBody);
233 }
234 }
235 });
236 }
237
238 /**
239 * Adds an expectation that the given body values are received by this
240 * endpoint
241 */
242 public void expectedBodiesReceived(Object... bodies) {
243 List bodyList = new ArrayList();
244 for (Object body : bodies) {
245 bodyList.add(body);
246 }
247 expectedBodiesReceived(bodyList);
248 }
249
250 /**
251 * Adds an expectation that messages received should have ascending values
252 * of the given expression such as a user generated counter value
253 *
254 * @param expression
255 */
256 public void expectsAscending(final Expression<Exchange> expression) {
257 expects(new Runnable() {
258 public void run() {
259 assertMessagesAscending(expression);
260 }
261 });
262 }
263
264 /**
265 * Adds an expectation that messages received should have descending values
266 * of the given expression such as a user generated counter value
267 *
268 * @param expression
269 */
270 public void expectsDescending(final Expression<Exchange> expression) {
271 expects(new Runnable() {
272 public void run() {
273 assertMessagesDescending(expression);
274 }
275 });
276 }
277
278 /**
279 * Adds an expectation that no duplicate messages should be received using
280 * the expression to determine the message ID
281 *
282 * @param expression the expression used to create a unique message ID for
283 * message comparison (which could just be the message
284 * payload if the payload can be tested for uniqueness using
285 * {@link Object#equals(Object)} and
286 * {@link Object#hashCode()}
287 */
288 public void expectsNoDuplicates(final Expression<Exchange> expression) {
289 expects(new Runnable() {
290 public void run() {
291 assertNoDuplicates(expression);
292 }
293 });
294 }
295
296 /**
297 * Asserts that the messages have ascending values of the given expression
298 */
299 public void assertMessagesAscending(Expression<Exchange> expression) {
300 assertMessagesSorted(expression, true);
301 }
302
303 /**
304 * Asserts that the messages have descending values of the given expression
305 */
306 public void assertMessagesDescending(Expression<Exchange> expression) {
307 assertMessagesSorted(expression, false);
308 }
309
310 protected void assertMessagesSorted(Expression<Exchange> expression, boolean ascending) {
311 String type = ascending ? "ascending" : "descending";
312 ExpressionComparator comparator = new ExpressionComparator(expression);
313 List<Exchange> list = getReceivedExchanges();
314 for (int i = 1; i < list.size(); i++) {
315 int j = i - 1;
316 Exchange e1 = list.get(j);
317 Exchange e2 = list.get(i);
318 int result = comparator.compare(e1, e2);
319 if (result == 0) {
320 fail("Messages not " + type + ". Messages" + j + " and " + i + " are equal with value: " + expression.evaluate(e1) + " for expression: " + expression + ". Exchanges: " + e1 + " and "
321 + e2);
322 } else {
323 if (!ascending) {
324 result = result * -1;
325 }
326 if (result > 0) {
327 fail("Messages not " + type + ". Message " + j + " has value: " + expression.evaluate(e1) + " and message " + i + " has value: " + expression.evaluate(e2) + " for expression: "
328 + expression + ". Exchanges: " + e1 + " and " + e2);
329 }
330 }
331 }
332 }
333
334 public void assertNoDuplicates(Expression<Exchange> expression) {
335 Map<Object, Exchange> map = new HashMap<Object, Exchange>();
336 List<Exchange> list = getReceivedExchanges();
337 for (int i = 0; i < list.size(); i++) {
338 Exchange e2 = list.get(i);
339 Object key = expression.evaluate(e2);
340 Exchange e1 = map.get(key);
341 if (e1 != null) {
342 fail("Duplicate message found on message " + i + " has value: " + key + " for expression: " + expression + ". Exchanges: " + e1 + " and " + e2);
343 } else {
344 map.put(key, e2);
345 }
346 }
347 }
348
349 /**
350 * Adds the expection which will be invoked when enough messages are
351 * received
352 */
353 public void expects(Runnable runnable) {
354 tests.add(runnable);
355 }
356
357 /**
358 * Adds an assertion to the given message index
359 *
360 * @param messageIndex the number of the message
361 * @return the assertion clause
362 */
363 public AssertionClause message(final int messageIndex) {
364 AssertionClause clause = new AssertionClause() {
365 public void run() {
366 applyAssertionOn(MockEndpoint.this, messageIndex, assertExchangeReceived(messageIndex));
367 }
368 };
369 expects(clause);
370 return clause;
371 }
372
373 /**
374 * Adds an assertion to all the received messages
375 *
376 * @return the assertion clause
377 */
378 public AssertionClause allMessages() {
379 AssertionClause clause = new AssertionClause() {
380 public void run() {
381 List<Exchange> list = getReceivedExchanges();
382 int index = 0;
383 for (Exchange exchange : list) {
384 applyAssertionOn(MockEndpoint.this, index++, exchange);
385 }
386 }
387 };
388 expects(clause);
389 return clause;
390 }
391
392 /**
393 * Asserts that the given index of message is received (starting at zero)
394 */
395 public Exchange assertExchangeReceived(int index) {
396 int count = getReceivedCounter();
397 assertTrue("Not enough messages received. Was: " + count, count > index);
398 return getReceivedExchanges().get(index);
399 }
400
401 // Properties
402 // -------------------------------------------------------------------------
403 public List<Throwable> getFailures() {
404 return failures;
405 }
406
407 public int getReceivedCounter() {
408 return getReceivedExchanges().size();
409 }
410
411 public List<Exchange> getReceivedExchanges() {
412 return receivedExchanges;
413 }
414
415 public int getExpectedCount() {
416 return expectedCount;
417 }
418
419 public long getSleepForEmptyTest() {
420 return sleepForEmptyTest;
421 }
422
423 /**
424 * Allows a sleep to be specified to wait to check that this endpoint really
425 * is empty when {@link #expectedMessageCount(int)} is called with zero
426 *
427 * @param sleepForEmptyTest the milliseconds to sleep for to determine that
428 * this endpoint really is empty
429 */
430 public void setSleepForEmptyTest(long sleepForEmptyTest) {
431 this.sleepForEmptyTest = sleepForEmptyTest;
432 }
433
434 public long getDefaulResultWaitMillis() {
435 return defaulResultWaitMillis;
436 }
437
438 /**
439 * Sets the maximum amount of time the {@link #assertIsSatisfied()} will
440 * wait on a latch until it is satisfied
441 */
442 public void setDefaulResultWaitMillis(long defaulResultWaitMillis) {
443 this.defaulResultWaitMillis = defaulResultWaitMillis;
444 }
445
446 // Implementation methods
447 // -------------------------------------------------------------------------
448 protected synchronized void onExchange(Exchange exchange) {
449 try {
450 Message in = exchange.getIn();
451 Object actualBody = in.getBody();
452
453 if (expectedBodyValues != null) {
454 int index = actualBodyValues.size();
455 if (expectedBodyValues.size() > index) {
456 Object expectedBody = expectedBodyValues.get(index);
457 if (expectedBody != null) {
458 actualBody = in.getBody(expectedBody.getClass());
459 }
460 actualBodyValues.add(actualBody);
461 }
462 }
463
464 LOG.debug(getEndpointUri() + " >>>> " + (++counter) + " : " + exchange + " with body: " + actualBody);
465
466 receivedExchanges.add(exchange);
467
468 Processor processor = processors.get(getReceivedCounter());
469 if (processor != null) {
470 processor.process(exchange);
471 }
472
473 if (latch != null) {
474 latch.countDown();
475 }
476 } catch (Exception e) {
477 failures.add(e);
478 }
479 }
480
481 protected void waitForCompleteLatch() throws InterruptedException {
482 if (latch == null) {
483 fail("Should have a latch!");
484 }
485
486 // now lets wait for the results
487 LOG.debug("Waiting on the latch for: " + defaulResultWaitMillis + " millis");
488 latch.await(defaulResultWaitMillis, TimeUnit.MILLISECONDS);
489 }
490
491 protected void assertEquals(String message, Object expectedValue, Object actualValue) {
492 if (!ObjectHelper.equals(expectedValue, actualValue)) {
493 fail(message + ". Expected: <" + expectedValue + "> but was: <" + actualValue + ">");
494 }
495 }
496
497 protected void assertTrue(String message, boolean predicate) {
498 if (!predicate) {
499 fail(message);
500 }
501 }
502
503 protected void fail(Object message) {
504 if (LOG.isDebugEnabled()) {
505 List<Exchange> list = getReceivedExchanges();
506 int index = 0;
507 for (Exchange exchange : list) {
508 LOG.debug("Received[" + (++index) + "]: " + exchange);
509 }
510 }
511 throw new AssertionError(getEndpointUri() + " " + message);
512 }
513
514 public int getExpectedMinimumCount() {
515 return expectedMinimumCount;
516 }
517
518 public void await() throws InterruptedException {
519 if (latch != null) {
520 latch.await();
521 }
522 }
523
524 public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
525 if (latch != null) {
526 return latch.await(timeout, unit);
527 }
528 return true;
529 }
530
531 public boolean isSingleton() {
532 return true;
533 }
534
535 }