/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.component.mock;

import java.beans.PropertyChangeListener;
import java.beans.PropertyChangeSupport;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.camel.CamelContext;
import org.apache.camel.Component;
import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.impl.DefaultEndpoint;
import org.apache.camel.impl.DefaultProducer;
import org.apache.camel.spi.BrowsableEndpoint;
import org.apache.camel.util.CamelContextHelper;
import org.apache.camel.util.ExpressionComparator;
import org.apache.camel.util.ObjectHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * A Mock endpoint which provides a literate, fluent API for testing routes
 * using a <a href="http://jmock.org/">JMock style</a> API.
 * <p>
 * Exchanges sent to the endpoint's producer are recorded, and expectations
 * registered through the {@code expected...}/{@code expects...} methods are
 * evaluated when one of the {@code assertIsSatisfied} methods is called.
 *
 * @version $Revision: 2278 $
 */
public class MockEndpoint extends DefaultEndpoint<Exchange> implements BrowsableEndpoint<Exchange> {
    private static final Log LOG = LogFactory.getLog(MockEndpoint.class);

    // Exact number of expected messages; -1 means "no exact expectation".
    private int expectedCount;
    // Running number used only for the debug log line in performAssertions().
    private int counter;
    private Processor defaultProcessor;
    private Map<Integer, Processor> processors;
    private List<Exchange> receivedExchanges;
    private List<Throwable> failures;
    private List<Runnable> tests;
    // Counted down once per received exchange; null when nothing is awaited.
    private CountDownLatch latch;
    private long sleepForEmptyTest;
    private long resultWaitTime;
    private long resultMinimumWaitTime;
    // Minimum number of expected messages; -1 means "no minimum expectation".
    private int expectedMinimumCount;
    private List expectedBodyValues;
    private List actualBodyValues;
    private final PropertyChangeSupport propertyChangeSupport = new PropertyChangeSupport(this);
    private String headerName;
    private String headerValue;
    private Object actualHeader;
    private Processor reporter;

    public MockEndpoint(String endpointUri, Component component) {
        super(endpointUri, component);
        init();
    }

    public MockEndpoint(String endpointUri) {
        super(endpointUri);
        init();
    }

    /**
     * A helper method to resolve the mock endpoint of the given URI on the given context
     *
     * @param context the camel context to try resolve the mock endpoint from
     * @param uri the uri of the endpoint to resolve
     * @return the endpoint
     */
    public static MockEndpoint resolve(CamelContext context, String uri) {
        return CamelContextHelper.getMandatoryEndpoint(context, uri, MockEndpoint.class);
    }

    /**
     * Waits for each of the given endpoints in turn, sharing one overall timeout
     * between them.
     *
     * @param timeout the total time to wait across all endpoints
     * @param unit the unit of {@code timeout}
     * @param endpoints the endpoints to wait on
     * @throws AssertionError if an endpoint's wait times out
     * @throws InterruptedException if the waiting thread is interrupted
     */
    public static void assertWait(long timeout, TimeUnit unit, MockEndpoint... endpoints) throws InterruptedException {
        long start = System.currentTimeMillis();
        long left = unit.toMillis(timeout);
        long end = start + left;
        for (MockEndpoint endpoint : endpoints) {
            if (!endpoint.await(left, TimeUnit.MILLISECONDS)) {
                throw new AssertionError("Timeout waiting for endpoints to receive enough messages. "
                        + endpoint.getEndpointUri() + " timed out.");
            }
            // Remaining budget for the next endpoint, never negative.
            left = Math.max(0, end - System.currentTimeMillis());
        }
    }

    /**
     * Waits up to the given timeout for all endpoints (see
     * {@link #assertWait(long, TimeUnit, MockEndpoint...)}) and then asserts that
     * each of them is satisfied.
     */
    public static void assertIsSatisfied(long timeout, TimeUnit unit, MockEndpoint... endpoints) throws InterruptedException {
        assertWait(timeout, unit, endpoints);
        for (MockEndpoint endpoint : endpoints) {
            endpoint.assertIsSatisfied();
        }
    }

    /**
     * Asserts that the expectations of every given endpoint are satisfied.
     */
    public static void assertIsSatisfied(MockEndpoint... endpoints) throws InterruptedException {
        for (MockEndpoint endpoint : endpoints) {
            endpoint.assertIsSatisfied();
        }
    }

    /**
     * Asserts that all the expectations on any {@link MockEndpoint} instances registered
     * in the given context are valid
     *
     * @param context the camel context used to find all the available endpoints to be asserted
     */
    public static void assertIsSatisfied(CamelContext context) throws InterruptedException {
        ObjectHelper.notNull(context, "camelContext");
        Collection<Endpoint> endpoints = context.getSingletonEndpoints();
        for (Endpoint endpoint : endpoints) {
            if (endpoint instanceof MockEndpoint) {
                MockEndpoint mockEndpoint = (MockEndpoint) endpoint;
                mockEndpoint.assertIsSatisfied();
            }
        }
    }

    /**
     * Sets the same expected message count on every given endpoint.
     *
     * @param count the number of message exchanges each endpoint should expect
     * @param endpoints the endpoints to configure
     * @throws InterruptedException never thrown by this implementation; retained
     *         in the signature for compatibility with existing callers
     */
    public static void expectsMessageCount(int count, MockEndpoint... endpoints) throws InterruptedException {
        for (MockEndpoint endpoint : endpoints) {
            // Must be invoked on the endpoint instance; calling the static method
            // with only the count would recurse with an empty varargs array and
            // configure nothing.
            endpoint.setExpectedMessageCount(count);
        }
    }

    public List<Exchange> getExchanges() {
        return getReceivedExchanges();
    }

    public void addPropertyChangeListener(PropertyChangeListener listener) {
        propertyChangeSupport.addPropertyChangeListener(listener);
    }

    public void removePropertyChangeListener(PropertyChangeListener listener) {
        propertyChangeSupport.removePropertyChangeListener(listener);
    }

    /**
     * Not supported: a mock endpoint can only be sent to.
     *
     * @throws UnsupportedOperationException always
     */
    public Consumer<Exchange> createConsumer(Processor processor) throws Exception {
        throw new UnsupportedOperationException("You cannot consume from this endpoint");
    }

    /**
     * Creates a producer which hands every exchange to {@link #onExchange(Exchange)}.
     */
    public Producer<Exchange> createProducer() throws Exception {
        return new DefaultProducer<Exchange>(this) {
            public void process(Exchange exchange) {
                onExchange(exchange);
            }
        };
    }

    /**
     * Discards all recorded exchanges, failures and expectations and restores
     * the default settings.
     */
    public void reset() {
        init();
    }

    // Testing API
    // -------------------------------------------------------------------------

    /**
     * Set the processor that will be invoked when the index
     * message is received.
     * <p>
     * The index is matched against the number of exchanges received so far,
     * counting the current one, so the first received message matches index 1.
     *
     * @param index the received-message count at which the processor is invoked
     * @param processor the processor to invoke
     */
    public void whenExchangeReceived(int index, Processor processor) {
        this.processors.put(index, processor);
    }

    /**
     * Set the processor that will be invoked when the some message
     * is received.
     *
     * This processor could be overwritten by
     * {@link #whenExchangeReceived(int, Processor)} method.
     *
     * @param processor the processor to invoke when no index-specific processor matches
     */
    public void whenAnyExchangeReceived(Processor processor) {
        this.defaultProcessor = processor;
    }

    /**
     * Validates that all the available expectations on this endpoint are
     * satisfied; or throw an exception
     */
    public void assertIsSatisfied() throws InterruptedException {
        assertIsSatisfied(sleepForEmptyTest);
    }

    /**
     * Validates that all the available expectations on this endpoint are
     * satisfied; or throw an exception
     *
     * @param timeoutForEmptyEndpoints the timeout in milliseconds that we
     *                should wait for the test to be true
     */
    public void assertIsSatisfied(long timeoutForEmptyEndpoints) throws InterruptedException {
        LOG.info("Asserting: " + this + " is satisfied");
        if (expectedCount == 0) {
            if (timeoutForEmptyEndpoints > 0) {
                LOG.debug("Sleeping for: " + timeoutForEmptyEndpoints + " millis to check there really are no messages received");
                Thread.sleep(timeoutForEmptyEndpoints);
            }
            assertEquals("Received message count", expectedCount, getReceivedCounter());
        } else if (expectedCount > 0) {
            if (expectedCount != getReceivedCounter()) {
                waitForCompleteLatch();
            }
            assertEquals("Received message count", expectedCount, getReceivedCounter());
        } else if (expectedMinimumCount > 0 && getReceivedCounter() < expectedMinimumCount) {
            waitForCompleteLatch();
        }

        if (expectedMinimumCount >= 0) {
            int receivedCounter = getReceivedCounter();
            assertTrue("Received message count " + receivedCounter + ", expected at least " + expectedMinimumCount,
                    expectedMinimumCount <= receivedCounter);
        }

        for (Runnable test : tests) {
            test.run();
        }

        for (Throwable failure : failures) {
            if (failure != null) {
                LOG.error("Caught on " + getEndpointUri() + " Exception: " + failure, failure);
                fail("Failed due to caught exception: " + failure);
            }
        }
    }

    /**
     * Validates that the assertions fail on this endpoint
     *
     * @throws AssertionError if all expectations were unexpectedly satisfied
     */
    public void assertIsNotSatisfied() throws InterruptedException {
        boolean satisfied = false;
        try {
            assertIsSatisfied();
            satisfied = true;
        } catch (AssertionError e) {
            LOG.info("Caught expected failure: " + e);
        }
        // Raised outside the try block: fail() throws an AssertionError, which
        // the catch clause above would otherwise swallow as an "expected failure".
        if (satisfied) {
            fail("Expected assertion failure!");
        }
    }

    /**
     * Specifies the expected number of message exchanges that should be
     * received by this endpoint
     *
     * @param expectedCount the number of message exchanges that should be
     *                expected by this endpoint
     */
    public void expectedMessageCount(int expectedCount) {
        setExpectedMessageCount(expectedCount);
    }

    /**
     * Specifies the minimum number of expected message exchanges that should be
     * received by this endpoint
     *
     * @param expectedCount the number of message exchanges that should be
     *                expected by this endpoint
     */
    public void expectedMinimumMessageCount(int expectedCount) {
        setMinimumExpectedMessageCount(expectedCount);
    }

    /**
     * Adds an expectation that the given header name and value are received by this
     * endpoint
     *
     * @param name the name of the expected header
     * @param value the expected value of the header
     */
    public void expectedHeaderReceived(String name, String value) {
        this.headerName = name;
        this.headerValue = value;

        expects(new Runnable() {
            public void run() {
                assertTrue("No header with name " + headerName + " found.", actualHeader != null);

                assertEquals("Header of message", headerValue, actualHeader);
            }
        });
    }

    /**
     * Adds an expectation that the given body values are received by this
     * endpoint in the specified order
     * <p>
     * Also sets the expected message count to the number of bodies.
     *
     * @param bodies the expected bodies, in the order they should arrive
     */
    public void expectedBodiesReceived(final List bodies) {
        expectedMessageCount(bodies.size());
        this.expectedBodyValues = bodies;
        this.actualBodyValues = new ArrayList();

        expects(new Runnable() {
            public void run() {
                for (int i = 0; i < expectedBodyValues.size(); i++) {
                    Exchange exchange = receivedExchangeOrNull(i);
                    assertTrue("No exchange received for counter: " + i, exchange != null);

                    Object expectedBody = expectedBodyValues.get(i);
                    Object actualBody = null;
                    if (i < actualBodyValues.size()) {
                        actualBody = actualBodyValues.get(i);
                    }

                    assertEquals("Body of message: " + i, expectedBody, actualBody);
                }
            }
        });
    }

    /**
     * Adds an expectation that the given body values are received by this
     * endpoint
     *
     * @param bodies the expected bodies, in the order they should arrive
     */
    public void expectedBodiesReceived(Object... bodies) {
        List<Object> bodyList = new ArrayList<Object>(Arrays.asList(bodies));
        expectedBodiesReceived(bodyList);
    }

    /**
     * Adds an expectation that the given body values are received by this
     * endpoint in any order
     * <p>
     * Also sets the expected message count to the number of bodies.
     *
     * @param bodies the expected bodies
     */
    public void expectedBodiesReceivedInAnyOrder(final List bodies) {
        expectedMessageCount(bodies.size());
        this.expectedBodyValues = bodies;
        this.actualBodyValues = new ArrayList();

        expects(new Runnable() {
            public void run() {
                Set actualBodyValuesSet = new HashSet(actualBodyValues);
                for (int i = 0; i < expectedBodyValues.size(); i++) {
                    Exchange exchange = receivedExchangeOrNull(i);
                    assertTrue("No exchange received for counter: " + i, exchange != null);

                    Object expectedBody = expectedBodyValues.get(i);
                    assertTrue("Message with body " + expectedBody
                            + " was expected but not found in " + actualBodyValuesSet,
                            actualBodyValuesSet.remove(expectedBody));
                }
            }
        });
    }

    /**
     * Adds an expectation that the given body values are received by this
     * endpoint in any order
     *
     * @param bodies the expected bodies
     */
    public void expectedBodiesReceivedInAnyOrder(Object... bodies) {
        List<Object> bodyList = new ArrayList<Object>(Arrays.asList(bodies));
        expectedBodiesReceivedInAnyOrder(bodyList);
    }

    /**
     * Adds an expectation that messages received should have ascending values
     * of the given expression such as a user generated counter value
     *
     * @param expression the expression whose values must be strictly ascending
     */
    public void expectsAscending(final Expression<Exchange> expression) {
        expects(new Runnable() {
            public void run() {
                assertMessagesAscending(expression);
            }
        });
    }

    /**
     * Adds an expectation that messages received should have descending values
     * of the given expression such as a user generated counter value
     *
     * @param expression the expression whose values must be strictly descending
     */
    public void expectsDescending(final Expression<Exchange> expression) {
        expects(new Runnable() {
            public void run() {
                assertMessagesDescending(expression);
            }
        });
    }

    /**
     * Adds an expectation that no duplicate messages should be received using
     * the expression to determine the message ID
     *
     * @param expression the expression used to create a unique message ID for
     *                message comparison (which could just be the message
     *                payload if the payload can be tested for uniqueness using
     *                {@link Object#equals(Object)} and
     *                {@link Object#hashCode()}
     */
    public void expectsNoDuplicates(final Expression<Exchange> expression) {
        expects(new Runnable() {
            public void run() {
                assertNoDuplicates(expression);
            }
        });
    }

    /**
     * Asserts that the messages have ascending values of the given expression
     */
    public void assertMessagesAscending(Expression<Exchange> expression) {
        assertMessagesSorted(expression, true);
    }

    /**
     * Asserts that the messages have descending values of the given expression
     */
    public void assertMessagesDescending(Expression<Exchange> expression) {
        assertMessagesSorted(expression, false);
    }

    /**
     * Asserts that each pair of consecutive received exchanges is strictly
     * ordered by the given expression; equal neighbouring values also fail.
     *
     * @param expression the expression evaluated on each exchange
     * @param ascending {@code true} to require ascending order, {@code false} for descending
     */
    protected void assertMessagesSorted(Expression<Exchange> expression, boolean ascending) {
        String type = ascending ? "ascending" : "descending";
        ExpressionComparator comparator = new ExpressionComparator(expression);
        List<Exchange> list = getReceivedExchanges();
        for (int i = 1; i < list.size(); i++) {
            int j = i - 1;
            Exchange e1 = list.get(j);
            Exchange e2 = list.get(i);
            int result = comparator.compare(e1, e2);
            if (result == 0) {
                fail("Messages not " + type + ". Messages " + j + " and " + i + " are equal with value: "
                        + expression.evaluate(e1) + " for expression: " + expression
                        + ". Exchanges: " + e1 + " and " + e2);
            } else {
                if (!ascending) {
                    result = result * -1;
                }
                if (result > 0) {
                    fail("Messages not " + type + ". Message " + j + " has value: " + expression.evaluate(e1)
                            + " and message " + i + " has value: " + expression.evaluate(e2)
                            + " for expression: " + expression + ". Exchanges: " + e1 + " and " + e2);
                }
            }
        }
    }

    /**
     * Asserts that no two received exchanges evaluate to the same value for
     * the given expression.
     */
    public void assertNoDuplicates(Expression<Exchange> expression) {
        Map<Object, Exchange> map = new HashMap<Object, Exchange>();
        List<Exchange> list = getReceivedExchanges();
        for (int i = 0; i < list.size(); i++) {
            Exchange e2 = list.get(i);
            Object key = expression.evaluate(e2);
            Exchange e1 = map.get(key);
            if (e1 != null) {
                fail("Duplicate message found on message " + i + " has value: " + key
                        + " for expression: " + expression + ". Exchanges: " + e1 + " and " + e2);
            } else {
                map.put(key, e2);
            }
        }
    }

    /**
     * Adds the expectation which will be invoked when enough messages are
     * received
     */
    public void expects(Runnable runnable) {
        tests.add(runnable);
    }

    /**
     * Adds an assertion to the given message index
     *
     * @param messageIndex the number of the message
     * @return the assertion clause
     */
    public AssertionClause message(final int messageIndex) {
        AssertionClause clause = new AssertionClause() {
            public void run() {
                applyAssertionOn(MockEndpoint.this, messageIndex, assertExchangeReceived(messageIndex));
            }
        };
        expects(clause);
        return clause;
    }

    /**
     * Adds an assertion to all the received messages
     *
     * @return the assertion clause
     */
    public AssertionClause allMessages() {
        AssertionClause clause = new AssertionClause() {
            public void run() {
                List<Exchange> list = getReceivedExchanges();
                int index = 0;
                for (Exchange exchange : list) {
                    applyAssertionOn(MockEndpoint.this, index++, exchange);
                }
            }
        };
        expects(clause);
        return clause;
    }

    /**
     * Asserts that the given index of message is received (starting at zero)
     *
     * @param index the zero-based index of the received exchange
     * @return the exchange received at that index
     */
    public Exchange assertExchangeReceived(int index) {
        int count = getReceivedCounter();
        assertTrue("Not enough messages received. Was: " + count, count > index);
        return getReceivedExchanges().get(index);
    }

    // Properties
    // -------------------------------------------------------------------------
    public List<Throwable> getFailures() {
        return failures;
    }

    public int getReceivedCounter() {
        return getReceivedExchanges().size();
    }

    public List<Exchange> getReceivedExchanges() {
        return receivedExchanges;
    }

    public int getExpectedCount() {
        return expectedCount;
    }

    public long getSleepForEmptyTest() {
        return sleepForEmptyTest;
    }

    /**
     * Allows a sleep to be specified to wait to check that this endpoint really
     * is empty when {@link #expectedMessageCount(int)} is called with zero
     *
     * @param sleepForEmptyTest the milliseconds to sleep for to determine that
     *                this endpoint really is empty
     */
    public void setSleepForEmptyTest(long sleepForEmptyTest) {
        this.sleepForEmptyTest = sleepForEmptyTest;
    }

    public long getResultWaitTime() {
        return resultWaitTime;
    }

    /**
     * Sets the maximum amount of time (in millis) the {@link #assertIsSatisfied()} will
     * wait on a latch until it is satisfied
     */
    public void setResultWaitTime(long resultWaitTime) {
        this.resultWaitTime = resultWaitTime;
    }

    /**
     * Sets the minimum expected amount of time (in millis) the {@link #assertIsSatisfied()} will
     * wait on a latch until it is satisfied
     */
    public void setMinimumResultWaitTime(long resultMinimumWaitTime) {
        this.resultMinimumWaitTime = resultMinimumWaitTime;
    }

    /**
     * Specifies the expected number of message exchanges that should be
     * received by this endpoint
     * <p>
     * A positive count installs a new latch of that size; zero or a negative
     * count removes any latch.
     *
     * @param expectedCount the number of message exchanges that should be
     *                expected by this endpoint
     */
    public void setExpectedMessageCount(int expectedCount) {
        this.expectedCount = expectedCount;
        if (expectedCount <= 0) {
            latch = null;
        } else {
            latch = new CountDownLatch(expectedCount);
        }
    }

    /**
     * Specifies the minimum number of expected message exchanges that should be
     * received by this endpoint
     * <p>
     * A positive count installs a new latch of that size; zero or a negative
     * count removes any latch.
     *
     * @param expectedCount the number of message exchanges that should be
     *                expected by this endpoint
     */
    public void setMinimumExpectedMessageCount(int expectedCount) {
        this.expectedMinimumCount = expectedCount;
        if (expectedCount <= 0) {
            latch = null;
        } else {
            latch = new CountDownLatch(expectedMinimumCount);
        }
    }

    public Processor getReporter() {
        return reporter;
    }

    /**
     * Allows a processor to added to the endpoint to report on progress of the test
     */
    public void setReporter(Processor reporter) {
        this.reporter = reporter;
    }

    // Implementation methods
    // -------------------------------------------------------------------------

    /**
     * Puts the endpoint into its initial state; shared by the constructors and
     * {@link #reset()}.
     */
    private void init() {
        expectedCount = -1;
        counter = 0;
        processors = new HashMap<Integer, Processor>();
        receivedExchanges = new CopyOnWriteArrayList<Exchange>();
        failures = new CopyOnWriteArrayList<Throwable>();
        tests = new CopyOnWriteArrayList<Runnable>();
        latch = null;
        sleepForEmptyTest = 0;
        resultWaitTime = 20000L;
        resultMinimumWaitTime = 0L;
        expectedMinimumCount = -1;
        expectedBodyValues = null;
        actualBodyValues = new ArrayList();
        // Clear header expectation state too, so a header captured before a
        // reset() cannot satisfy an expectation registered afterwards.
        headerName = null;
        headerValue = null;
        actualHeader = null;
    }

    /**
     * Handles an exchange sent to this endpoint: notifies the reporter (if any),
     * records the exchange, and counts down the latch (if any). Anything thrown
     * while doing so is stored in the failures list rather than propagated.
     */
    protected synchronized void onExchange(Exchange exchange) {
        try {
            if (reporter != null) {
                reporter.process(exchange);
            }

            performAssertions(exchange);
        } catch (Throwable e) {
            failures.add(e);
        }
        if (latch != null) {
            latch.countDown();
        }
    }

    /**
     * Records the exchange together with the header/body values needed by the
     * registered expectations, then invokes the processor registered for the
     * current received count, or the default processor if there is none.
     */
    protected void performAssertions(Exchange exchange) throws Exception {
        Message in = exchange.getIn();
        Object actualBody = in.getBody();

        if (headerName != null) {
            actualHeader = in.getHeader(headerName);
        }

        if (expectedBodyValues != null) {
            int index = actualBodyValues.size();
            if (expectedBodyValues.size() > index) {
                Object expectedBody = expectedBodyValues.get(index);
                if (expectedBody != null) {
                    // Ask for the body as the expected body's class so the two
                    // can be compared with equals().
                    actualBody = in.getBody(expectedBody.getClass());
                }
                actualBodyValues.add(actualBody);
            }
        }

        LOG.debug(getEndpointUri() + " >>>> " + (++counter) + " : " + exchange + " with body: " + actualBody);

        receivedExchanges.add(exchange);

        // Looked up after the exchange was added, so the key is the count of
        // exchanges received including this one.
        Processor processor = processors.get(getReceivedCounter());
        if (processor == null) {
            processor = defaultProcessor;
        }

        if (processor != null) {
            processor.process(exchange);
        }
    }

    /**
     * Waits on the latch for at most the configured result wait time, and fails
     * if a minimum wait time is configured and the wait finished sooner.
     */
    protected void waitForCompleteLatch() throws InterruptedException {
        if (latch == null) {
            fail("Should have a latch!");
        }

        // now lets wait for the results
        LOG.debug("Waiting on the latch for: " + resultWaitTime + " millis");
        long start = System.currentTimeMillis();
        latch.await(resultWaitTime, TimeUnit.MILLISECONDS);
        long delta = System.currentTimeMillis() - start;
        LOG.debug("Took " + delta + " millis to complete latch");

        if (resultMinimumWaitTime > 0 && delta < resultMinimumWaitTime) {
            fail("Expected minimum " + resultMinimumWaitTime
                    + " millis waiting on the result, but was faster with " + delta + " millis.");
        }
    }

    /**
     * Fails unless the two values are equal according to
     * {@link ObjectHelper#equal(Object, Object)}.
     */
    protected void assertEquals(String message, Object expectedValue, Object actualValue) {
        if (!ObjectHelper.equal(expectedValue, actualValue)) {
            fail(message + ". Expected: <" + expectedValue + "> but was: <" + actualValue + ">");
        }
    }

    /**
     * Fails with the given message unless the predicate is {@code true}.
     */
    protected void assertTrue(String message, boolean predicate) {
        if (!predicate) {
            fail(message);
        }
    }

    /**
     * Throws an {@link AssertionError} whose message is the endpoint URI followed
     * by the given message; when debug logging is enabled the received exchanges
     * are logged first.
     *
     * @param message the failure description
     * @throws AssertionError always
     */
    protected void fail(Object message) {
        if (LOG.isDebugEnabled()) {
            List<Exchange> list = getReceivedExchanges();
            int index = 0;
            for (Exchange exchange : list) {
                LOG.debug("Received[" + (++index) + "]: " + exchange);
            }
        }
        throw new AssertionError(getEndpointUri() + " " + message);
    }

    public int getExpectedMinimumCount() {
        return expectedMinimumCount;
    }

    /**
     * Blocks until the latch has been counted down; returns immediately when
     * there is no latch.
     */
    public void await() throws InterruptedException {
        if (latch != null) {
            latch.await();
        }
    }

    /**
     * Waits up to the given timeout for the latch to be counted down.
     *
     * @return {@code true} if the latch completed in time or there is no latch,
     *         {@code false} if the wait timed out
     */
    public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        if (latch != null) {
            return latch.await(timeout, unit);
        }
        return true;
    }

    public boolean isSingleton() {
        return true;
    }

    /**
     * Returns the received exchange at the given zero-based index, or
     * {@code null} when fewer exchanges have been received.
     */
    private Exchange receivedExchangeOrNull(int index) {
        List<Exchange> exchanges = getReceivedExchanges();
        return index >= 0 && index < exchanges.size() ? exchanges.get(index) : null;
    }
}