001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.component.mock; 018 019 import java.beans.PropertyChangeListener; 020 import java.beans.PropertyChangeSupport; 021 import java.util.ArrayList; 022 import java.util.Arrays; 023 import java.util.Collection; 024 import java.util.HashMap; 025 import java.util.HashSet; 026 import java.util.List; 027 import java.util.Map; 028 import java.util.Set; 029 import java.util.concurrent.CopyOnWriteArrayList; 030 import java.util.concurrent.CountDownLatch; 031 import java.util.concurrent.TimeUnit; 032 033 import org.apache.camel.CamelContext; 034 import org.apache.camel.Component; 035 import org.apache.camel.Consumer; 036 import org.apache.camel.Endpoint; 037 import org.apache.camel.Exchange; 038 import org.apache.camel.Expression; 039 import org.apache.camel.Message; 040 import org.apache.camel.Processor; 041 import org.apache.camel.Producer; 042 import org.apache.camel.impl.DefaultEndpoint; 043 import org.apache.camel.impl.DefaultProducer; 044 import org.apache.camel.spi.BrowsableEndpoint; 045 import org.apache.camel.util.CamelContextHelper; 046 import org.apache.camel.util.ExpressionComparator; 047 import org.apache.camel.util.ObjectHelper; 048 import org.apache.commons.logging.Log; 049 import org.apache.commons.logging.LogFactory; 050 051 /** 052 * A Mock endpoint which provides a literate, fluent API for testing routes 053 * using a <a href="http://jmock.org/">JMock style</a> API. 054 * 055 * @version $Revision: 64833 $ 056 */ 057 public class MockEndpoint extends DefaultEndpoint<Exchange> implements BrowsableEndpoint<Exchange> { 058 private static final transient Log LOG = LogFactory.getLog(MockEndpoint.class); 059 private int expectedCount; 060 private int counter; 061 private Processor defaultProcessor; 062 private Map<Integer, Processor> processors; 063 private List<Exchange> receivedExchanges; 064 private List<Throwable> failures; 065 private List<Runnable> tests; 066 private CountDownLatch latch; 067 private long sleepForEmptyTest; 068 private long resultWaitTime; 069 private long resultMinimumWaitTime; 070 private int expectedMinimumCount; 071 private List expectedBodyValues; 072 private List actualBodyValues; 073 private PropertyChangeSupport propertyChangeSupport = new PropertyChangeSupport(this); 074 private String headerName; 075 private String headerValue; 076 private Object actualHeader; 077 private Processor reporter; 078 079 public MockEndpoint(String endpointUri, Component component) { 080 super(endpointUri, component); 081 init(); 082 } 083 084 public MockEndpoint(String endpointUri) { 085 super(endpointUri); 086 init(); 087 } 088 089 090 /** 091 * A helper method to resolve the mock endpoint of the given URI on the given context 092 * 093 * @param context the camel context to try resolve the mock endpoint from 094 * @param uri the uri of the endpoint to resolve 095 * @return the endpoint 096 */ 097 public static MockEndpoint resolve(CamelContext context, String uri) { 098 return CamelContextHelper.getMandatoryEndpoint(context, uri, MockEndpoint.class); 099 } 100 101 public static void assertWait(long timeout, TimeUnit unit, MockEndpoint... endpoints) throws InterruptedException { 102 long start = System.currentTimeMillis(); 103 long left = unit.toMillis(timeout); 104 long end = start + left; 105 for (MockEndpoint endpoint : endpoints) { 106 if (!endpoint.await(left, TimeUnit.MILLISECONDS)) { 107 throw new AssertionError("Timeout waiting for endpoints to receive enough messages. " + endpoint.getEndpointUri() + " timed out."); 108 } 109 left = end - System.currentTimeMillis(); 110 if (left <= 0) { 111 left = 0; 112 } 113 } 114 } 115 116 public static void assertIsSatisfied(long timeout, TimeUnit unit, MockEndpoint... endpoints) throws InterruptedException { 117 assertWait(timeout, unit, endpoints); 118 for (MockEndpoint endpoint : endpoints) { 119 endpoint.assertIsSatisfied(); 120 } 121 } 122 123 public static void assertIsSatisfied(MockEndpoint... endpoints) throws InterruptedException { 124 for (MockEndpoint endpoint : endpoints) { 125 endpoint.assertIsSatisfied(); 126 } 127 } 128 129 130 /** 131 * Asserts that all the expectations on any {@link MockEndpoint} instances registered 132 * in the given context are valid 133 * 134 * @param context the camel context used to find all the available endpoints to be asserted 135 */ 136 public static void assertIsSatisfied(CamelContext context) throws InterruptedException { 137 ObjectHelper.notNull(context, "camelContext"); 138 Collection<Endpoint> endpoints = context.getSingletonEndpoints(); 139 for (Endpoint endpoint : endpoints) { 140 if (endpoint instanceof MockEndpoint) { 141 MockEndpoint mockEndpoint = (MockEndpoint) endpoint; 142 mockEndpoint.assertIsSatisfied(); 143 } 144 } 145 } 146 147 148 public static void expectsMessageCount(int count, MockEndpoint... endpoints) throws InterruptedException { 149 for (MockEndpoint endpoint : endpoints) { 150 MockEndpoint.expectsMessageCount(count); 151 } 152 } 153 154 public List<Exchange> getExchanges() { 155 return getReceivedExchanges(); 156 } 157 158 public void addPropertyChangeListener(PropertyChangeListener listener) { 159 propertyChangeSupport.addPropertyChangeListener(listener); 160 } 161 162 public void removePropertyChangeListener(PropertyChangeListener listener) { 163 propertyChangeSupport.removePropertyChangeListener(listener); 164 } 165 166 public Consumer<Exchange> createConsumer(Processor processor) throws Exception { 167 throw new UnsupportedOperationException("You cannot consume from this endpoint"); 168 } 169 170 public Producer<Exchange> createProducer() throws Exception { 171 return new DefaultProducer<Exchange>(this) { 172 public void process(Exchange exchange) { 173 onExchange(exchange); 174 } 175 }; 176 } 177 178 public void reset() { 179 init(); 180 } 181 182 183 // Testing API 184 // ------------------------------------------------------------------------- 185 186 /** 187 * Set the processor that will be invoked when the index 188 * message is received. 189 * 190 * @param index 191 * @param processor 192 */ 193 public void whenExchangeReceived(int index, Processor processor) { 194 this.processors.put(index, processor); 195 } 196 197 /** 198 * Set the processor that will be invoked when the some message 199 * is received. 200 * 201 * This processor could be overwritten by 202 * {@link #whenExchangeReceived(int, Processor)} method. 203 * 204 * @param processor 205 */ 206 public void whenAnyExchangeReceived(Processor processor) { 207 this.defaultProcessor = processor; 208 } 209 210 /** 211 * Validates that all the available expectations on this endpoint are 212 * satisfied; or throw an exception 213 */ 214 public void assertIsSatisfied() throws InterruptedException { 215 assertIsSatisfied(sleepForEmptyTest); 216 } 217 218 /** 219 * Validates that all the available expectations on this endpoint are 220 * satisfied; or throw an exception 221 * 222 * @param timeoutForEmptyEndpoints the timeout in milliseconds that we 223 * should wait for the test to be true 224 */ 225 public void assertIsSatisfied(long timeoutForEmptyEndpoints) throws InterruptedException { 226 LOG.info("Asserting: " + this + " is satisfied"); 227 if (expectedCount == 0) { 228 if (timeoutForEmptyEndpoints > 0) { 229 LOG.debug("Sleeping for: " + timeoutForEmptyEndpoints + " millis to check there really are no messages received"); 230 Thread.sleep(timeoutForEmptyEndpoints); 231 } 232 assertEquals("Received message count", expectedCount, getReceivedCounter()); 233 } else if (expectedCount > 0) { 234 if (expectedCount != getReceivedCounter()) { 235 waitForCompleteLatch(); 236 } 237 assertEquals("Received message count", expectedCount, getReceivedCounter()); 238 } else if (expectedMinimumCount > 0 && getReceivedCounter() < expectedMinimumCount) { 239 waitForCompleteLatch(); 240 } 241 242 if (expectedMinimumCount >= 0) { 243 int receivedCounter = getReceivedCounter(); 244 assertTrue("Received message count " + receivedCounter + ", expected at least " + expectedMinimumCount, expectedMinimumCount <= receivedCounter); 245 } 246 247 for (Runnable test : tests) { 248 test.run(); 249 } 250 251 for (Throwable failure : failures) { 252 if (failure != null) { 253 LOG.error("Caught on " + getEndpointUri() + " Exception: " + failure, failure); 254 fail("Failed due to caught exception: " + failure); 255 } 256 } 257 } 258 259 /** 260 * Validates that the assertions fail on this endpoint 261 */ 262 public void assertIsNotSatisfied() throws InterruptedException { 263 try { 264 assertIsSatisfied(); 265 fail("Expected assertion failure!"); 266 } catch (AssertionError e) { 267 LOG.info("Caught expected failure: " + e); 268 } 269 } 270 271 /** 272 * Specifies the expected number of message exchanges that should be 273 * received by this endpoint 274 * 275 * @param expectedCount the number of message exchanges that should be 276 * expected by this endpoint 277 */ 278 public void expectedMessageCount(int expectedCount) { 279 setExpectedMessageCount(expectedCount); 280 } 281 282 /** 283 * Specifies the minimum number of expected message exchanges that should be 284 * received by this endpoint 285 * 286 * @param expectedCount the number of message exchanges that should be 287 * expected by this endpoint 288 */ 289 public void expectedMinimumMessageCount(int expectedCount) { 290 setMinimumExpectedMessageCount(expectedCount); 291 } 292 293 /** 294 * Adds an expectation that the given header name & value are received by this 295 * endpoint 296 */ 297 public void expectedHeaderReceived(String name, String value) { 298 this.headerName = name; 299 this.headerValue = value; 300 301 expects(new Runnable() { 302 public void run() { 303 assertTrue("No header with name " + headerName + " found.", actualHeader != null); 304 305 assertEquals("Header of message", headerValue, actualHeader); 306 } 307 }); 308 } 309 310 /** 311 * Adds an expectation that the given body values are received by this 312 * endpoint in the specified order 313 */ 314 public void expectedBodiesReceived(final List bodies) { 315 expectedMessageCount(bodies.size()); 316 this.expectedBodyValues = bodies; 317 this.actualBodyValues = new ArrayList(); 318 319 expects(new Runnable() { 320 public void run() { 321 for (int i = 0; i < expectedBodyValues.size(); i++) { 322 Exchange exchange = getReceivedExchanges().get(i); 323 assertTrue("No exchange received for counter: " + i, exchange != null); 324 325 Object expectedBody = expectedBodyValues.get(i); 326 Object actualBody = actualBodyValues.get(i); 327 328 assertEquals("Body of message: " + i, expectedBody, actualBody); 329 } 330 } 331 }); 332 } 333 334 /** 335 * Adds an expectation that the given body values are received by this 336 * endpoint 337 */ 338 public void expectedBodiesReceived(Object... bodies) { 339 List bodyList = new ArrayList(); 340 bodyList.addAll(Arrays.asList(bodies)); 341 expectedBodiesReceived(bodyList); 342 } 343 344 /** 345 * Adds an expectation that the given body values are received by this 346 * endpoint in any order 347 */ 348 public void expectedBodiesReceivedInAnyOrder(final List bodies) { 349 expectedMessageCount(bodies.size()); 350 this.expectedBodyValues = bodies; 351 this.actualBodyValues = new ArrayList(); 352 353 expects(new Runnable() { 354 public void run() { 355 Set actualBodyValuesSet = new HashSet(actualBodyValues); 356 for (int i = 0; i < expectedBodyValues.size(); i++) { 357 Exchange exchange = getReceivedExchanges().get(i); 358 assertTrue("No exchange received for counter: " + i, exchange != null); 359 360 Object expectedBody = expectedBodyValues.get(i); 361 assertTrue("Message with body " + expectedBody 362 + " was expected but not found in " + actualBodyValuesSet, 363 actualBodyValuesSet.remove(expectedBody)); 364 } 365 } 366 }); 367 } 368 369 /** 370 * Adds an expectation that the given body values are received by this 371 * endpoint in any order 372 */ 373 public void expectedBodiesReceivedInAnyOrder(Object... bodies) { 374 List bodyList = new ArrayList(); 375 bodyList.addAll(Arrays.asList(bodies)); 376 expectedBodiesReceivedInAnyOrder(bodyList); 377 } 378 379 /** 380 * Adds an expectation that messages received should have ascending values 381 * of the given expression such as a user generated counter value 382 * 383 * @param expression 384 */ 385 public void expectsAscending(final Expression<Exchange> expression) { 386 expects(new Runnable() { 387 public void run() { 388 assertMessagesAscending(expression); 389 } 390 }); 391 } 392 393 /** 394 * Adds an expectation that messages received should have descending values 395 * of the given expression such as a user generated counter value 396 * 397 * @param expression 398 */ 399 public void expectsDescending(final Expression<Exchange> expression) { 400 expects(new Runnable() { 401 public void run() { 402 assertMessagesDescending(expression); 403 } 404 }); 405 } 406 407 /** 408 * Adds an expectation that no duplicate messages should be received using 409 * the expression to determine the message ID 410 * 411 * @param expression the expression used to create a unique message ID for 412 * message comparison (which could just be the message 413 * payload if the payload can be tested for uniqueness using 414 * {@link Object#equals(Object)} and 415 * {@link Object#hashCode()} 416 */ 417 public void expectsNoDuplicates(final Expression<Exchange> expression) { 418 expects(new Runnable() { 419 public void run() { 420 assertNoDuplicates(expression); 421 } 422 }); 423 } 424 425 /** 426 * Asserts that the messages have ascending values of the given expression 427 */ 428 public void assertMessagesAscending(Expression<Exchange> expression) { 429 assertMessagesSorted(expression, true); 430 } 431 432 /** 433 * Asserts that the messages have descending values of the given expression 434 */ 435 public void assertMessagesDescending(Expression<Exchange> expression) { 436 assertMessagesSorted(expression, false); 437 } 438 439 protected void assertMessagesSorted(Expression<Exchange> expression, boolean ascending) { 440 String type = ascending ? "ascending" : "descending"; 441 ExpressionComparator comparator = new ExpressionComparator(expression); 442 List<Exchange> list = getReceivedExchanges(); 443 for (int i = 1; i < list.size(); i++) { 444 int j = i - 1; 445 Exchange e1 = list.get(j); 446 Exchange e2 = list.get(i); 447 int result = comparator.compare(e1, e2); 448 if (result == 0) { 449 fail("Messages not " + type + ". Messages" + j + " and " + i + " are equal with value: " + expression.evaluate(e1) + " for expression: " + expression + ". Exchanges: " + e1 + " and " 450 + e2); 451 } else { 452 if (!ascending) { 453 result = result * -1; 454 } 455 if (result > 0) { 456 fail("Messages not " + type + ". Message " + j + " has value: " + expression.evaluate(e1) + " and message " + i + " has value: " + expression.evaluate(e2) + " for expression: " 457 + expression + ". Exchanges: " + e1 + " and " + e2); 458 } 459 } 460 } 461 } 462 463 public void assertNoDuplicates(Expression<Exchange> expression) { 464 Map<Object, Exchange> map = new HashMap<Object, Exchange>(); 465 List<Exchange> list = getReceivedExchanges(); 466 for (int i = 0; i < list.size(); i++) { 467 Exchange e2 = list.get(i); 468 Object key = expression.evaluate(e2); 469 Exchange e1 = map.get(key); 470 if (e1 != null) { 471 fail("Duplicate message found on message " + i + " has value: " + key + " for expression: " + expression + ". Exchanges: " + e1 + " and " + e2); 472 } else { 473 map.put(key, e2); 474 } 475 } 476 } 477 478 /** 479 * Adds the expectation which will be invoked when enough messages are 480 * received 481 */ 482 public void expects(Runnable runnable) { 483 tests.add(runnable); 484 } 485 486 /** 487 * Adds an assertion to the given message index 488 * 489 * @param messageIndex the number of the message 490 * @return the assertion clause 491 */ 492 public AssertionClause message(final int messageIndex) { 493 AssertionClause clause = new AssertionClause() { 494 public void run() { 495 applyAssertionOn(MockEndpoint.this, messageIndex, assertExchangeReceived(messageIndex)); 496 } 497 }; 498 expects(clause); 499 return clause; 500 } 501 502 /** 503 * Adds an assertion to all the received messages 504 * 505 * @return the assertion clause 506 */ 507 public AssertionClause allMessages() { 508 AssertionClause clause = new AssertionClause() { 509 public void run() { 510 List<Exchange> list = getReceivedExchanges(); 511 int index = 0; 512 for (Exchange exchange : list) { 513 applyAssertionOn(MockEndpoint.this, index++, exchange); 514 } 515 } 516 }; 517 expects(clause); 518 return clause; 519 } 520 521 /** 522 * Asserts that the given index of message is received (starting at zero) 523 */ 524 public Exchange assertExchangeReceived(int index) { 525 int count = getReceivedCounter(); 526 assertTrue("Not enough messages received. Was: " + count, count > index); 527 return getReceivedExchanges().get(index); 528 } 529 530 // Properties 531 // ------------------------------------------------------------------------- 532 public List<Throwable> getFailures() { 533 return failures; 534 } 535 536 public int getReceivedCounter() { 537 return getReceivedExchanges().size(); 538 } 539 540 public List<Exchange> getReceivedExchanges() { 541 return receivedExchanges; 542 } 543 544 public int getExpectedCount() { 545 return expectedCount; 546 } 547 548 public long getSleepForEmptyTest() { 549 return sleepForEmptyTest; 550 } 551 552 /** 553 * Allows a sleep to be specified to wait to check that this endpoint really 554 * is empty when {@link #expectedMessageCount(int)} is called with zero 555 * 556 * @param sleepForEmptyTest the milliseconds to sleep for to determine that 557 * this endpoint really is empty 558 */ 559 public void setSleepForEmptyTest(long sleepForEmptyTest) { 560 this.sleepForEmptyTest = sleepForEmptyTest; 561 } 562 563 public long getResultWaitTime() { 564 return resultWaitTime; 565 } 566 567 /** 568 * Sets the maximum amount of time (in millis) the {@link #assertIsSatisfied()} will 569 * wait on a latch until it is satisfied 570 */ 571 public void setResultWaitTime(long resultWaitTime) { 572 this.resultWaitTime = resultWaitTime; 573 } 574 575 /** 576 * Sets the minimum expected amount of time (in millis) the {@link #assertIsSatisfied()} will 577 * wait on a latch until it is satisfied 578 */ 579 public void setMinimumResultWaitTime(long resultMinimumWaitTime) { 580 this.resultMinimumWaitTime = resultMinimumWaitTime; 581 } 582 583 /** 584 * Specifies the expected number of message exchanges that should be 585 * received by this endpoint 586 * 587 * @param expectedCount the number of message exchanges that should be 588 * expected by this endpoint 589 */ 590 public void setExpectedMessageCount(int expectedCount) { 591 this.expectedCount = expectedCount; 592 if (expectedCount <= 0) { 593 latch = null; 594 } else { 595 latch = new CountDownLatch(expectedCount); 596 } 597 } 598 599 /** 600 * Specifies the minimum number of expected message exchanges that should be 601 * received by this endpoint 602 * 603 * @param expectedCount the number of message exchanges that should be 604 * expected by this endpoint 605 */ 606 public void setMinimumExpectedMessageCount(int expectedCount) { 607 this.expectedMinimumCount = expectedCount; 608 if (expectedCount <= 0) { 609 latch = null; 610 } else { 611 latch = new CountDownLatch(expectedMinimumCount); 612 } 613 } 614 615 public Processor getReporter() { 616 return reporter; 617 } 618 619 /** 620 * Allows a processor to added to the endpoint to report on progress of the test 621 */ 622 public void setReporter(Processor reporter) { 623 this.reporter = reporter; 624 } 625 626 // Implementation methods 627 // ------------------------------------------------------------------------- 628 private void init() { 629 expectedCount = -1; 630 counter = 0; 631 processors = new HashMap<Integer, Processor>(); 632 receivedExchanges = new CopyOnWriteArrayList<Exchange>(); 633 failures = new CopyOnWriteArrayList<Throwable>(); 634 tests = new CopyOnWriteArrayList<Runnable>(); 635 latch = null; 636 sleepForEmptyTest = 0; 637 resultWaitTime = 20000L; 638 resultMinimumWaitTime = 0L; 639 expectedMinimumCount = -1; 640 expectedBodyValues = null; 641 actualBodyValues = new ArrayList(); 642 } 643 644 protected synchronized void onExchange(Exchange exchange) { 645 try { 646 if (reporter != null) { 647 reporter.process(exchange); 648 } 649 650 performAssertions(exchange); 651 } catch (Throwable e) { 652 failures.add(e); 653 } 654 if (latch != null) { 655 latch.countDown(); 656 } 657 } 658 659 protected void performAssertions(Exchange exchange) throws Exception { 660 Message in = exchange.getIn(); 661 Object actualBody = in.getBody(); 662 663 if (headerName != null) { 664 actualHeader = in.getHeader(headerName); 665 } 666 667 if (expectedBodyValues != null) { 668 int index = actualBodyValues.size(); 669 if (expectedBodyValues.size() > index) { 670 Object expectedBody = expectedBodyValues.get(index); 671 if (expectedBody != null) { 672 actualBody = in.getBody(expectedBody.getClass()); 673 } 674 actualBodyValues.add(actualBody); 675 } 676 } 677 678 LOG.debug(getEndpointUri() + " >>>> " + (++counter) + " : " + exchange + " with body: " + actualBody); 679 680 receivedExchanges.add(exchange); 681 682 Processor processor = processors.get(getReceivedCounter()) != null 683 ? processors.get(getReceivedCounter()) : defaultProcessor; 684 685 if (processor != null) { 686 processor.process(exchange); 687 } 688 } 689 690 protected void waitForCompleteLatch() throws InterruptedException { 691 if (latch == null) { 692 fail("Should have a latch!"); 693 } 694 695 // now lets wait for the results 696 LOG.debug("Waiting on the latch for: " + resultWaitTime + " millis"); 697 long start = System.currentTimeMillis(); 698 latch.await(resultWaitTime, TimeUnit.MILLISECONDS); 699 long delta = System.currentTimeMillis() - start; 700 LOG.debug("Took " + delta + " millis to complete latch"); 701 702 if (resultMinimumWaitTime > 0 && delta < resultMinimumWaitTime) { 703 fail("Expected minimum " + resultWaitTime 704 + " millis waiting on the result, but was faster with " + delta + " millis."); 705 } 706 } 707 708 protected void assertEquals(String message, Object expectedValue, Object actualValue) { 709 if (!ObjectHelper.equal(expectedValue, actualValue)) { 710 fail(message + ". Expected: <" + expectedValue + "> but was: <" + actualValue + ">"); 711 } 712 } 713 714 protected void assertTrue(String message, boolean predicate) { 715 if (!predicate) { 716 fail(message); 717 } 718 } 719 720 protected void fail(Object message) { 721 if (LOG.isDebugEnabled()) { 722 List<Exchange> list = getReceivedExchanges(); 723 int index = 0; 724 for (Exchange exchange : list) { 725 LOG.debug("Received[" + (++index) + "]: " + exchange); 726 } 727 } 728 throw new AssertionError(getEndpointUri() + " " + message); 729 } 730 731 public int getExpectedMinimumCount() { 732 return expectedMinimumCount; 733 } 734 735 public void await() throws InterruptedException { 736 if (latch != null) { 737 latch.await(); 738 } 739 } 740 741 public boolean await(long timeout, TimeUnit unit) throws InterruptedException { 742 if (latch != null) { 743 return latch.await(timeout, unit); 744 } 745 return true; 746 } 747 748 public boolean isSingleton() { 749 return true; 750 } 751 }