/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.component.mock;

import java.beans.PropertyChangeListener;
import java.beans.PropertyChangeSupport;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.camel.CamelContext;
import org.apache.camel.Component;
import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.impl.DefaultEndpoint;
import org.apache.camel.impl.DefaultProducer;
import org.apache.camel.spi.BrowsableEndpoint;
import org.apache.camel.util.ExpressionComparator;
import org.apache.camel.util.ObjectHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * A Mock endpoint which provides a literate, fluent API for testing routes
 * using a <a href="http://jmock.org/">JMock style</a> API.
 * <p>
 * Expectations (message counts, bodies, headers, ordering, custom
 * {@link Runnable} checks) are registered up front; exchanges sent to the
 * endpoint are recorded, and {@link #assertIsSatisfied()} then verifies the
 * recorded exchanges against the expectations.
 *
 * @version $Revision: 41895 $
 */
public class MockEndpoint extends DefaultEndpoint<Exchange> implements BrowsableEndpoint<Exchange> {
    private static final transient Log LOG = LogFactory.getLog(MockEndpoint.class);
    // exact number of expected messages, or -1 when no exact count is expected
    private int expectedCount;
    // running number used only for the debug log line written per exchange
    private int counter;
    private Processor defaultProcessor;
    private Map<Integer, Processor> processors;
    private List<Exchange> receivedExchanges;
    private List<Throwable> failures;
    private List<Runnable> tests;
    // counted down once per received exchange; null when no positive count is expected
    private CountDownLatch latch;
    private long sleepForEmptyTest;
    private long resultWaitTime;
    // minimum number of expected messages, or -1 when no minimum is expected
    private int expectedMinimumCount;
    private List expectedBodyValues;
    private List actualBodyValues;
    private PropertyChangeSupport propertyChangeSupport = new PropertyChangeSupport(this);
    private String headerName;
    private String headerValue;
    private Object actualHeader;
    private Processor reporter;

    public MockEndpoint(String endpointUri, Component component) {
        super(endpointUri, component);
        init();
    }

    public MockEndpoint(String endpointUri) {
        super(endpointUri);
        init();
    }

    /**
     * Waits until every given endpoint's latch has been released, sharing a
     * single overall timeout between all of them.
     *
     * @param timeout   the total time to wait across all endpoints
     * @param unit      the unit of <tt>timeout</tt>
     * @param endpoints the endpoints to wait for
     * @throws AssertionError       if an endpoint's latch is not released in the remaining time
     * @throws InterruptedException if the waiting thread is interrupted
     */
    public static void assertWait(long timeout, TimeUnit unit, MockEndpoint... endpoints) throws InterruptedException {
        long start = System.currentTimeMillis();
        long left = unit.toMillis(timeout);
        long end = start + left;
        for (MockEndpoint endpoint : endpoints) {
            if (!endpoint.await(left, TimeUnit.MILLISECONDS)) {
                throw new AssertionError("Timeout waiting for endpoints to receive enough messages. " + endpoint.getEndpointUri() + " timed out.");
            }
            // the remaining budget shrinks as each endpoint is waited on, but never goes negative
            left = end - System.currentTimeMillis();
            if (left <= 0) {
                left = 0;
            }
        }
    }

    /**
     * Waits for the given endpoints (see {@link #assertWait(long, TimeUnit, MockEndpoint...)})
     * and then asserts that each of them is satisfied.
     */
    public static void assertIsSatisfied(long timeout, TimeUnit unit, MockEndpoint... endpoints) throws InterruptedException {
        assertWait(timeout, unit, endpoints);
        for (MockEndpoint endpoint : endpoints) {
            endpoint.assertIsSatisfied();
        }
    }

    /**
     * Asserts that each of the given endpoints is satisfied.
     */
    public static void assertIsSatisfied(MockEndpoint... endpoints) throws InterruptedException {
        for (MockEndpoint endpoint : endpoints) {
            endpoint.assertIsSatisfied();
        }
    }


    /**
     * Asserts that all the expectations on any {@link MockEndpoint} instances registered
     * in the given context are valid
     *
     * @param context the camel context used to find all the available endpoints to be asserted
     */
    public static void assertIsSatisfied(CamelContext context) throws InterruptedException {
        ObjectHelper.notNull(context, "camelContext");
        Collection<Endpoint> endpoints = context.getSingletonEndpoints();
        for (Endpoint endpoint : endpoints) {
            if (endpoint instanceof MockEndpoint) {
                MockEndpoint mockEndpoint = (MockEndpoint) endpoint;
                mockEndpoint.assertIsSatisfied();
            }
        }
    }


    /**
     * Sets the same expected message count on each of the given endpoints.
     *
     * @param count     the number of message exchanges each endpoint should expect
     * @param endpoints the endpoints to configure
     */
    public static void expectsMessageCount(int count, MockEndpoint... endpoints) throws InterruptedException {
        for (MockEndpoint endpoint : endpoints) {
            // Must be invoked on the endpoint instance: calling the static method
            // again with only the count would loop over zero endpoints and do nothing.
            endpoint.setExpectedMessageCount(count);
        }
    }

    public List<Exchange> getExchanges() {
        return getReceivedExchanges();
    }

    public void addPropertyChangeListener(PropertyChangeListener listener) {
        propertyChangeSupport.addPropertyChangeListener(listener);
    }

    public void removePropertyChangeListener(PropertyChangeListener listener) {
        propertyChangeSupport.removePropertyChangeListener(listener);
    }

    /**
     * Always fails: a mock endpoint can only be produced to.
     *
     * @throws UnsupportedOperationException always
     */
    public Consumer<Exchange> createConsumer(Processor processor) throws Exception {
        throw new UnsupportedOperationException("You cannot consume from this endpoint");
    }

    /**
     * Creates a producer which hands every exchange to {@link #onExchange(Exchange)}.
     */
    public Producer<Exchange> createProducer() throws Exception {
        return new DefaultProducer<Exchange>(this) {
            public void process(Exchange exchange) {
                onExchange(exchange);
            }
        };
    }

    /**
     * Resets the expectations, received exchanges, failures and timeouts to
     * their initial values.
     */
    public void reset() {
        init();
    }


    // Testing API
    // -------------------------------------------------------------------------

    /**
     * Set the processor that will be invoked when the index
     * message is received.
     * <p>
     * The lookup is made with the number of exchanges received so far
     * <em>including</em> the current one, so the first message corresponds
     * to index <tt>1</tt>.
     *
     * @param index     the received-message count at which the processor is invoked
     * @param processor the processor to invoke
     */
    public void whenExchangeReceived(int index, Processor processor) {
        this.processors.put(index, processor);
    }

    /**
     * Set the processor that will be invoked when any message
     * is received.
     *
     * This processor could be overridden for a specific message by the
     * {@link #whenExchangeReceived(int, Processor)} method.
     *
     * @param processor the processor to invoke
     */
    public void whenAnyExchangeReceived(Processor processor) {
        this.defaultProcessor = processor;
    }

    /**
     * Validates that all the available expectations on this endpoint are
     * satisfied; or throw an exception
     */
    public void assertIsSatisfied() throws InterruptedException {
        assertIsSatisfied(sleepForEmptyTest);
    }

    /**
     * Validates that all the available expectations on this endpoint are
     * satisfied; or throw an exception
     *
     * @param timeoutForEmptyEndpoints the timeout in milliseconds that we
     *                should wait for the test to be true
     */
    public void assertIsSatisfied(long timeoutForEmptyEndpoints) throws InterruptedException {
        LOG.info("Asserting: " + this + " is satisfied");
        if (expectedCount >= 0) {
            if (expectedCount != getReceivedCounter()) {
                if (expectedCount == 0) {
                    // lets wait a little bit just in case
                    if (timeoutForEmptyEndpoints > 0) {
                        LOG.debug("Sleeping for: " + timeoutForEmptyEndpoints + " millis to check there really are no messages received");
                        Thread.sleep(timeoutForEmptyEndpoints);
                    }
                } else {
                    waitForCompleteLatch();
                }
            }
            assertEquals("Received message count", expectedCount, getReceivedCounter());
        } else if (expectedMinimumCount > 0 && getReceivedCounter() < expectedMinimumCount) {
            waitForCompleteLatch();
        }

        if (expectedMinimumCount >= 0) {
            int receivedCounter = getReceivedCounter();
            // compare against the minimum, not expectedCount (which is -1 when only a minimum is set)
            assertTrue("Received message count " + receivedCounter + ", expected at least " + expectedMinimumCount, expectedMinimumCount <= receivedCounter);
        }

        for (Runnable test : tests) {
            test.run();
        }

        for (Throwable failure : failures) {
            if (failure != null) {
                LOG.error("Caught on " + getEndpointUri() + " Exception: " + failure, failure);
                fail("Failed due to caught exception: " + failure);
            }
        }
    }

    /**
     * Validates that the assertions fail on this endpoint
     */
    public void assertIsNotSatisfied() throws InterruptedException {
        try {
            assertIsSatisfied();
            // NOTE: fail() itself throws an AssertionError, which the catch below also handles
            fail("Expected assertion failure!");
        } catch (AssertionError e) {
            LOG.info("Caught expected failure: " + e);
        }
    }

    /**
     * Specifies the expected number of message exchanges that should be
     * received by this endpoint
     *
     * @param expectedCount the number of message exchanges that should be
     *                expected by this endpoint
     */
    public void expectedMessageCount(int expectedCount) {
        setExpectedMessageCount(expectedCount);
    }

    /**
     * Specifies the minimum number of expected message exchanges that should be
     * received by this endpoint
     *
     * @param expectedCount the number of message exchanges that should be
     *                expected by this endpoint
     */
    public void expectedMinimumMessageCount(int expectedCount) {
        setMinimumExpectedMessageCount(expectedCount);
    }

    /**
     * Adds an expectation that the given header name &amp; value are received by this
     * endpoint
     */
    public void expectedHeaderReceived(String name, String value) {
        this.headerName = name;
        this.headerValue = value;

        expects(new Runnable() {
            public void run() {
                assertTrue("No header with name " + headerName + " found.", actualHeader != null);

                assertEquals("Header of message", headerValue, actualHeader);
            }
        });
    }

    /**
     * Adds an expectation that the given body values are received by this
     * endpoint
     */
    public void expectedBodiesReceived(final List bodies) {
        expectedMessageCount(bodies.size());
        this.expectedBodyValues = bodies;
        this.actualBodyValues = new ArrayList();

        expects(new Runnable() {
            public void run() {
                for (int i = 0; i < expectedBodyValues.size(); i++) {
                    Exchange exchange = getReceivedExchanges().get(i);
                    assertTrue("No exchange received for counter: " + i, exchange != null);

                    Object expectedBody = expectedBodyValues.get(i);
                    Object actualBody = actualBodyValues.get(i);

                    assertEquals("Body of message: " + i, expectedBody, actualBody);
                }
            }
        });
    }

    /**
     * Adds an expectation that the given body values are received by this
     * endpoint
     */
    public void expectedBodiesReceived(Object... bodies) {
        List bodyList = new ArrayList();
        bodyList.addAll(Arrays.asList(bodies));
        expectedBodiesReceived(bodyList);
    }

    /**
     * Adds an expectation that messages received should have ascending values
     * of the given expression such as a user generated counter value
     *
     * @param expression the expression evaluated on each exchange to obtain the compared value
     */
    public void expectsAscending(final Expression<Exchange> expression) {
        expects(new Runnable() {
            public void run() {
                assertMessagesAscending(expression);
            }
        });
    }

    /**
     * Adds an expectation that messages received should have descending values
     * of the given expression such as a user generated counter value
     *
     * @param expression the expression evaluated on each exchange to obtain the compared value
     */
    public void expectsDescending(final Expression<Exchange> expression) {
        expects(new Runnable() {
            public void run() {
                assertMessagesDescending(expression);
            }
        });
    }

    /**
     * Adds an expectation that no duplicate messages should be received using
     * the expression to determine the message ID
     *
     * @param expression the expression used to create a unique message ID for
     *                message comparison (which could just be the message
     *                payload if the payload can be tested for uniqueness using
     *                {@link Object#equals(Object)} and
     *                {@link Object#hashCode()}
     */
    public void expectsNoDuplicates(final Expression<Exchange> expression) {
        expects(new Runnable() {
            public void run() {
                assertNoDuplicates(expression);
            }
        });
    }

    /**
     * Asserts that the messages have ascending values of the given expression
     */
    public void assertMessagesAscending(Expression<Exchange> expression) {
        assertMessagesSorted(expression, true);
    }

    /**
     * Asserts that the messages have descending values of the given expression
     */
    public void assertMessagesDescending(Expression<Exchange> expression) {
        assertMessagesSorted(expression, false);
    }

    /**
     * Asserts that each pair of consecutive received exchanges is strictly
     * ordered by the given expression; equal neighbouring values are a failure.
     *
     * @param expression the expression evaluated on each exchange to obtain the compared value
     * @param ascending  <tt>true</tt> to require ascending order, <tt>false</tt> for descending
     */
    protected void assertMessagesSorted(Expression<Exchange> expression, boolean ascending) {
        String type = ascending ? "ascending" : "descending";
        ExpressionComparator comparator = new ExpressionComparator(expression);
        List<Exchange> list = getReceivedExchanges();
        for (int i = 1; i < list.size(); i++) {
            int j = i - 1;
            Exchange e1 = list.get(j);
            Exchange e2 = list.get(i);
            int result = comparator.compare(e1, e2);
            if (result == 0) {
                fail("Messages not " + type + ". Message " + j + " and " + i + " are equal with value: " + expression.evaluate(e1) + " for expression: " + expression + ". Exchanges: " + e1 + " and "
                     + e2);
            } else {
                // flip the sign so that "result > 0" means "out of order" in both directions
                if (!ascending) {
                    result = result * -1;
                }
                if (result > 0) {
                    fail("Messages not " + type + ". Message " + j + " has value: " + expression.evaluate(e1) + " and message " + i + " has value: " + expression.evaluate(e2) + " for expression: "
                         + expression + ". Exchanges: " + e1 + " and " + e2);
                }
            }
        }
    }

    /**
     * Asserts that no two received exchanges evaluate to the same value of
     * the given expression.
     *
     * @param expression the expression used to compute the key of each exchange
     */
    public void assertNoDuplicates(Expression<Exchange> expression) {
        Map<Object, Exchange> map = new HashMap<Object, Exchange>();
        List<Exchange> list = getReceivedExchanges();
        for (int i = 0; i < list.size(); i++) {
            Exchange e2 = list.get(i);
            Object key = expression.evaluate(e2);
            Exchange e1 = map.get(key);
            if (e1 != null) {
                fail("Duplicate message found on message " + i + " has value: " + key + " for expression: " + expression + ". Exchanges: " + e1 + " and " + e2);
            } else {
                map.put(key, e2);
            }
        }
    }

    /**
     * Adds the expectation which will be run by
     * {@link #assertIsSatisfied(long)} after the message count checks
     */
    public void expects(Runnable runnable) {
        tests.add(runnable);
    }

    /**
     * Adds an assertion to the given message index
     *
     * @param messageIndex the number of the message (starting at zero)
     * @return the assertion clause
     */
    public AssertionClause message(final int messageIndex) {
        AssertionClause clause = new AssertionClause() {
            public void run() {
                applyAssertionOn(MockEndpoint.this, messageIndex, assertExchangeReceived(messageIndex));
            }
        };
        expects(clause);
        return clause;
    }

    /**
     * Adds an assertion to all the received messages
     *
     * @return the assertion clause
     */
    public AssertionClause allMessages() {
        AssertionClause clause = new AssertionClause() {
            public void run() {
                List<Exchange> list = getReceivedExchanges();
                int index = 0;
                for (Exchange exchange : list) {
                    applyAssertionOn(MockEndpoint.this, index++, exchange);
                }
            }
        };
        expects(clause);
        return clause;
    }

    /**
     * Asserts that the given index of message is received (starting at zero)
     *
     * @return the exchange received at the given index
     */
    public Exchange assertExchangeReceived(int index) {
        int count = getReceivedCounter();
        assertTrue("Not enough messages received. Was: " + count, count > index);
        return getReceivedExchanges().get(index);
    }

    // Properties
    // -------------------------------------------------------------------------
    public List<Throwable> getFailures() {
        return failures;
    }

    public int getReceivedCounter() {
        return getReceivedExchanges().size();
    }

    public List<Exchange> getReceivedExchanges() {
        return receivedExchanges;
    }

    public int getExpectedCount() {
        return expectedCount;
    }

    public long getSleepForEmptyTest() {
        return sleepForEmptyTest;
    }

    /**
     * Allows a sleep to be specified to wait to check that this endpoint really
     * is empty when {@link #expectedMessageCount(int)} is called with zero
     *
     * @param sleepForEmptyTest the milliseconds to sleep for to determine that
     *                this endpoint really is empty
     */
    public void setSleepForEmptyTest(long sleepForEmptyTest) {
        this.sleepForEmptyTest = sleepForEmptyTest;
    }

    public long getResultWaitTime() {
        return resultWaitTime;
    }

    /**
     * Sets the maximum amount of time (in millis) the {@link #assertIsSatisfied()} will
     * wait on a latch until it is satisfied
     */
    public void setResultWaitTime(long resultWaitTime) {
        this.resultWaitTime = resultWaitTime;
    }

    /**
     * Specifies the expected number of message exchanges that should be
     * received by this endpoint
     *
     * @param expectedCount the number of message exchanges that should be
     *                expected by this endpoint
     */
    public void setExpectedMessageCount(int expectedCount) {
        this.expectedCount = expectedCount;
        if (expectedCount <= 0) {
            latch = null;
        } else {
            latch = new CountDownLatch(expectedCount);
        }
    }

    /**
     * Specifies the minimum number of expected message exchanges that should be
     * received by this endpoint
     *
     * @param expectedCount the number of message exchanges that should be
     *                expected by this endpoint
     */
    public void setMinimumExpectedMessageCount(int expectedCount) {
        this.expectedMinimumCount = expectedCount;
        if (expectedCount <= 0) {
            latch = null;
        } else {
            latch = new CountDownLatch(expectedMinimumCount);
        }
    }

    public Processor getReporter() {
        return reporter;
    }

    /**
     * Allows a processor to be added to the endpoint to report on progress of the test
     */
    public void setReporter(Processor reporter) {
        this.reporter = reporter;
    }

    // Implementation methods
    // -------------------------------------------------------------------------
    private void init() {
        expectedCount = -1;
        counter = 0;
        processors = new HashMap<Integer, Processor>();
        receivedExchanges = new CopyOnWriteArrayList<Exchange>();
        failures = new CopyOnWriteArrayList<Throwable>();
        tests = new CopyOnWriteArrayList<Runnable>();
        latch = null;
        sleepForEmptyTest = 1000L;
        resultWaitTime = 20000L;
        expectedMinimumCount = -1;
        expectedBodyValues = null;
        actualBodyValues = new ArrayList();
    }

    /**
     * Handles an exchange sent to this endpoint: notifies the reporter (if
     * any), records the exchange, and counts down the latch (if any).
     * Anything thrown while doing so is stored in the failures list and
     * reported later by {@link #assertIsSatisfied(long)}.
     */
    protected synchronized void onExchange(Exchange exchange) {
        try {
            if (reporter != null) {
                reporter.process(exchange);
            }

            performAssertions(exchange);
        } catch (Throwable e) {
            // deliberately collected rather than rethrown; surfaced by assertIsSatisfied
            failures.add(e);
        }
        if (latch != null) {
            latch.countDown();
        }
    }

    /**
     * Records the exchange, captures the header and body values needed by the
     * registered expectations, and invokes the processor configured for this
     * message (if any).
     */
    protected void performAssertions(Exchange exchange) throws Exception {
        Message in = exchange.getIn();
        Object actualBody = in.getBody();

        if (headerName != null) {
            actualHeader = in.getHeader(headerName);
        }

        if (expectedBodyValues != null) {
            int index = actualBodyValues.size();
            if (expectedBodyValues.size() > index) {
                Object expectedBody = expectedBodyValues.get(index);
                if (expectedBody != null) {
                    // ask for the body as the expected body's type before recording it
                    actualBody = in.getBody(expectedBody.getClass());
                }
                actualBodyValues.add(actualBody);
            }
        }

        LOG.debug(getEndpointUri() + " >>>> " + (++counter) + " : " + exchange + " with body: " + actualBody);

        receivedExchanges.add(exchange);

        // the exchange has already been added, so the first message is looked up under key 1
        Processor processor = processors.get(getReceivedCounter());
        if (processor == null) {
            processor = defaultProcessor;
        }

        if (processor != null) {
            processor.process(exchange);
        }
    }

    /**
     * Waits on the latch for at most {@link #getResultWaitTime()} milliseconds.
     * Fails if no latch has been created.
     */
    protected void waitForCompleteLatch() throws InterruptedException {
        if (latch == null) {
            fail("Should have a latch!");
        }

        // now lets wait for the results
        LOG.debug("Waiting on the latch for: " + resultWaitTime + " millis");
        latch.await(resultWaitTime, TimeUnit.MILLISECONDS);
    }

    protected void assertEquals(String message, Object expectedValue, Object actualValue) {
        if (!ObjectHelper.equal(expectedValue, actualValue)) {
            fail(message + ". Expected: <" + expectedValue + "> but was: <" + actualValue + ">");
        }
    }

    protected void assertTrue(String message, boolean predicate) {
        if (!predicate) {
            fail(message);
        }
    }

    /**
     * Throws an {@link AssertionError} whose message is prefixed with this
     * endpoint's URI; when debug logging is enabled the received exchanges
     * are logged first.
     */
    protected void fail(Object message) {
        if (LOG.isDebugEnabled()) {
            List<Exchange> list = getReceivedExchanges();
            int index = 0;
            for (Exchange exchange : list) {
                LOG.debug("Received[" + (++index) + "]: " + exchange);
            }
        }
        throw new AssertionError(getEndpointUri() + " " + message);
    }

    public int getExpectedMinimumCount() {
        return expectedMinimumCount;
    }

    /**
     * Blocks until the latch (if any) is released; returns immediately when
     * there is no latch.
     */
    public void await() throws InterruptedException {
        if (latch != null) {
            latch.await();
        }
    }

    /**
     * Waits for the latch (if any) to be released within the given time.
     *
     * @return <tt>true</tt> if there is no latch or it was released in time,
     *         <tt>false</tt> if the timeout elapsed first
     */
    public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        if (latch != null) {
            return latch.await(timeout, unit);
        }
        return true;
    }

    public boolean isSingleton() {
        return true;
    }
}