001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.component.mock; 018 019 import java.beans.PropertyChangeListener; 020 import java.beans.PropertyChangeSupport; 021 import java.util.ArrayList; 022 import java.util.Collection; 023 import java.util.HashMap; 024 import java.util.List; 025 import java.util.Map; 026 import java.util.concurrent.CopyOnWriteArrayList; 027 import java.util.concurrent.CountDownLatch; 028 import java.util.concurrent.TimeUnit; 029 030 import org.apache.camel.CamelContext; 031 import org.apache.camel.Component; 032 import org.apache.camel.Consumer; 033 import org.apache.camel.Endpoint; 034 import org.apache.camel.Exchange; 035 import org.apache.camel.Expression; 036 import org.apache.camel.Message; 037 import org.apache.camel.Processor; 038 import org.apache.camel.Producer; 039 import org.apache.camel.impl.DefaultEndpoint; 040 import org.apache.camel.impl.DefaultProducer; 041 import org.apache.camel.spi.BrowsableEndpoint; 042 import org.apache.camel.util.ExpressionComparator; 043 import org.apache.camel.util.ObjectHelper; 044 import org.apache.commons.logging.Log; 045 import org.apache.commons.logging.LogFactory; 046 047 /** 048 * A Mock endpoint which provides a literate, fluent API for testing routes 049 * using a <a href="http://jmock.org/">JMock style</a> API. 050 * 051 * @version $Revision: 41278 $ 052 */ 053 public class MockEndpoint extends DefaultEndpoint<Exchange> implements BrowsableEndpoint<Exchange> { 054 private static final transient Log LOG = LogFactory.getLog(MockEndpoint.class); 055 private int expectedCount; 056 private int counter; 057 private Processor defaultProcessor; 058 private Map<Integer, Processor> processors; 059 private List<Exchange> receivedExchanges; 060 private List<Throwable> failures; 061 private List<Runnable> tests; 062 private CountDownLatch latch; 063 private long sleepForEmptyTest; 064 private long resultWaitTime; 065 private int expectedMinimumCount; 066 private List expectedBodyValues; 067 private List actualBodyValues; 068 private PropertyChangeSupport propertyChangeSupport = new PropertyChangeSupport(this); 069 private String headerName; 070 private String headerValue; 071 private Object actualHeader; 072 private Processor reporter; 073 074 public MockEndpoint(String endpointUri, Component component) { 075 super(endpointUri, component); 076 init(); 077 } 078 079 public MockEndpoint(String endpointUri) { 080 super(endpointUri); 081 init(); 082 } 083 084 public static void assertWait(long timeout, TimeUnit unit, MockEndpoint... endpoints) throws InterruptedException { 085 long start = System.currentTimeMillis(); 086 long left = unit.toMillis(timeout); 087 long end = start + left; 088 for (MockEndpoint endpoint : endpoints) { 089 if (!endpoint.await(left, TimeUnit.MILLISECONDS)) { 090 throw new AssertionError("Timeout waiting for endpoints to receive enough messages. " + endpoint.getEndpointUri() + " timed out."); 091 } 092 left = end - System.currentTimeMillis(); 093 if (left <= 0) { 094 left = 0; 095 } 096 } 097 } 098 099 public static void assertIsSatisfied(long timeout, TimeUnit unit, MockEndpoint... endpoints) throws InterruptedException { 100 assertWait(timeout, unit, endpoints); 101 for (MockEndpoint endpoint : endpoints) { 102 endpoint.assertIsSatisfied(); 103 } 104 } 105 106 public static void assertIsSatisfied(MockEndpoint... endpoints) throws InterruptedException { 107 for (MockEndpoint endpoint : endpoints) { 108 endpoint.assertIsSatisfied(); 109 } 110 } 111 112 113 /** 114 * Asserts that all the expectations on any {@link MockEndpoint} instances registered 115 * in the given context are valid 116 * 117 * @param context the camel context used to find all the available endpoints to be asserted 118 */ 119 public static void assertIsSatisfied(CamelContext context) throws InterruptedException { 120 ObjectHelper.notNull(context, "camelContext"); 121 Collection<Endpoint> endpoints = context.getSingletonEndpoints(); 122 for (Endpoint endpoint : endpoints) { 123 if (endpoint instanceof MockEndpoint) { 124 MockEndpoint mockEndpoint = (MockEndpoint) endpoint; 125 mockEndpoint.assertIsSatisfied(); 126 } 127 } 128 } 129 130 131 public static void expectsMessageCount(int count, MockEndpoint... endpoints) throws InterruptedException { 132 for (MockEndpoint endpoint : endpoints) { 133 endpoint.expectsMessageCount(count); 134 } 135 } 136 137 public List<Exchange> getExchanges() { 138 return getReceivedExchanges(); 139 } 140 141 public void addPropertyChangeListener(PropertyChangeListener listener) { 142 propertyChangeSupport.addPropertyChangeListener(listener); 143 } 144 145 public void removePropertyChangeListener(PropertyChangeListener listener) { 146 propertyChangeSupport.removePropertyChangeListener(listener); 147 } 148 149 public Consumer<Exchange> createConsumer(Processor processor) throws Exception { 150 throw new UnsupportedOperationException("You cannot consume from this endpoint"); 151 } 152 153 public Producer<Exchange> createProducer() throws Exception { 154 return new DefaultProducer<Exchange>(this) { 155 public void process(Exchange exchange) { 156 onExchange(exchange); 157 } 158 }; 159 } 160 161 public void reset() { 162 init(); 163 } 164 165 166 // Testing API 167 // ------------------------------------------------------------------------- 168 169 /** 170 * Set the processor that will be invoked when the index 171 * message is received. 172 * 173 * @param index 174 * @param processor 175 */ 176 public void whenExchangeReceived(int index, Processor processor) { 177 this.processors.put(index, processor); 178 } 179 180 /** 181 * Set the processor that will be invoked when the some message 182 * is received. 183 * 184 * This processor could be overwritten by 185 * {@link #whenExchangeReceived(int, Processor)} method. 186 * 187 * @param processor 188 */ 189 public void whenAnyExchangeReceived(Processor processor) { 190 this.defaultProcessor = processor; 191 } 192 193 /** 194 * Validates that all the available expectations on this endpoint are 195 * satisfied; or throw an exception 196 */ 197 public void assertIsSatisfied() throws InterruptedException { 198 assertIsSatisfied(sleepForEmptyTest); 199 } 200 201 /** 202 * Validates that all the available expectations on this endpoint are 203 * satisfied; or throw an exception 204 * 205 * @param timeoutForEmptyEndpoints the timeout in milliseconds that we 206 * should wait for the test to be true 207 */ 208 public void assertIsSatisfied(long timeoutForEmptyEndpoints) throws InterruptedException { 209 LOG.info("Asserting: " + this + " is satisfied"); 210 if (expectedCount >= 0) { 211 if (expectedCount != getReceivedCounter()) { 212 if (expectedCount == 0) { 213 // lets wait a little bit just in case 214 if (timeoutForEmptyEndpoints > 0) { 215 LOG.debug("Sleeping for: " + timeoutForEmptyEndpoints + " millis to check there really are no messages received"); 216 Thread.sleep(timeoutForEmptyEndpoints); 217 } 218 } else { 219 waitForCompleteLatch(); 220 } 221 } 222 assertEquals("Received message count", expectedCount, getReceivedCounter()); 223 } else if (expectedMinimumCount > 0 && getReceivedCounter() < expectedMinimumCount) { 224 waitForCompleteLatch(); 225 } 226 227 if (expectedMinimumCount >= 0) { 228 int receivedCounter = getReceivedCounter(); 229 assertTrue("Received message count " + receivedCounter + ", expected at least " + expectedCount, expectedCount <= receivedCounter); 230 } 231 232 for (Runnable test : tests) { 233 test.run(); 234 } 235 236 for (Throwable failure : failures) { 237 if (failure != null) { 238 LOG.error("Caught on " + getEndpointUri() + " Exception: " + failure, failure); 239 fail("Failed due to caught exception: " + failure); 240 } 241 } 242 } 243 244 /** 245 * Validates that the assertions fail on this endpoint 246 */ 247 public void assertIsNotSatisfied() throws InterruptedException { 248 try { 249 assertIsSatisfied(); 250 fail("Expected assertion failure!"); 251 } catch (AssertionError e) { 252 LOG.info("Caught expected failure: " + e); 253 } 254 } 255 256 /** 257 * Specifies the expected number of message exchanges that should be 258 * received by this endpoint 259 * 260 * @param expectedCount the number of message exchanges that should be 261 * expected by this endpoint 262 */ 263 public void expectedMessageCount(int expectedCount) { 264 setExpectedMessageCount(expectedCount); 265 } 266 267 /** 268 * Specifies the minimum number of expected message exchanges that should be 269 * received by this endpoint 270 * 271 * @param expectedCount the number of message exchanges that should be 272 * expected by this endpoint 273 */ 274 public void expectedMinimumMessageCount(int expectedCount) { 275 setMinimumExpectedMessageCount(expectedCount); 276 } 277 278 /** 279 * Adds an expectation that the given header name & value are received by this 280 * endpoint 281 */ 282 public void expectedHeaderReceived(String name, String value) { 283 this.headerName = name; 284 this.headerValue = value; 285 286 expects(new Runnable() { 287 public void run() { 288 assertTrue("No header with name " + headerName + " found.", actualHeader != null); 289 290 assertEquals("Header of message", headerValue, actualHeader); 291 } 292 }); 293 } 294 295 /** 296 * Adds an expectation that the given body values are received by this 297 * endpoint 298 */ 299 public void expectedBodiesReceived(final List bodies) { 300 expectedMessageCount(bodies.size()); 301 this.expectedBodyValues = bodies; 302 this.actualBodyValues = new ArrayList(); 303 304 expects(new Runnable() { 305 public void run() { 306 for (int i = 0; i < expectedBodyValues.size(); i++) { 307 Exchange exchange = getReceivedExchanges().get(i); 308 assertTrue("No exchange received for counter: " + i, exchange != null); 309 310 Object expectedBody = expectedBodyValues.get(i); 311 Object actualBody = actualBodyValues.get(i); 312 313 assertEquals("Body of message: " + i, expectedBody, actualBody); 314 } 315 } 316 }); 317 } 318 319 /** 320 * Adds an expectation that the given body values are received by this 321 * endpoint 322 */ 323 public void expectedBodiesReceived(Object... bodies) { 324 List bodyList = new ArrayList(); 325 for (Object body : bodies) { 326 bodyList.add(body); 327 } 328 expectedBodiesReceived(bodyList); 329 } 330 331 /** 332 * Adds an expectation that messages received should have ascending values 333 * of the given expression such as a user generated counter value 334 * 335 * @param expression 336 */ 337 public void expectsAscending(final Expression<Exchange> expression) { 338 expects(new Runnable() { 339 public void run() { 340 assertMessagesAscending(expression); 341 } 342 }); 343 } 344 345 /** 346 * Adds an expectation that messages received should have descending values 347 * of the given expression such as a user generated counter value 348 * 349 * @param expression 350 */ 351 public void expectsDescending(final Expression<Exchange> expression) { 352 expects(new Runnable() { 353 public void run() { 354 assertMessagesDescending(expression); 355 } 356 }); 357 } 358 359 /** 360 * Adds an expectation that no duplicate messages should be received using 361 * the expression to determine the message ID 362 * 363 * @param expression the expression used to create a unique message ID for 364 * message comparison (which could just be the message 365 * payload if the payload can be tested for uniqueness using 366 * {@link Object#equals(Object)} and 367 * {@link Object#hashCode()} 368 */ 369 public void expectsNoDuplicates(final Expression<Exchange> expression) { 370 expects(new Runnable() { 371 public void run() { 372 assertNoDuplicates(expression); 373 } 374 }); 375 } 376 377 /** 378 * Asserts that the messages have ascending values of the given expression 379 */ 380 public void assertMessagesAscending(Expression<Exchange> expression) { 381 assertMessagesSorted(expression, true); 382 } 383 384 /** 385 * Asserts that the messages have descending values of the given expression 386 */ 387 public void assertMessagesDescending(Expression<Exchange> expression) { 388 assertMessagesSorted(expression, false); 389 } 390 391 protected void assertMessagesSorted(Expression<Exchange> expression, boolean ascending) { 392 String type = ascending ? "ascending" : "descending"; 393 ExpressionComparator comparator = new ExpressionComparator(expression); 394 List<Exchange> list = getReceivedExchanges(); 395 for (int i = 1; i < list.size(); i++) { 396 int j = i - 1; 397 Exchange e1 = list.get(j); 398 Exchange e2 = list.get(i); 399 int result = comparator.compare(e1, e2); 400 if (result == 0) { 401 fail("Messages not " + type + ". Messages" + j + " and " + i + " are equal with value: " + expression.evaluate(e1) + " for expression: " + expression + ". Exchanges: " + e1 + " and " 402 + e2); 403 } else { 404 if (!ascending) { 405 result = result * -1; 406 } 407 if (result > 0) { 408 fail("Messages not " + type + ". Message " + j + " has value: " + expression.evaluate(e1) + " and message " + i + " has value: " + expression.evaluate(e2) + " for expression: " 409 + expression + ". Exchanges: " + e1 + " and " + e2); 410 } 411 } 412 } 413 } 414 415 public void assertNoDuplicates(Expression<Exchange> expression) { 416 Map<Object, Exchange> map = new HashMap<Object, Exchange>(); 417 List<Exchange> list = getReceivedExchanges(); 418 for (int i = 0; i < list.size(); i++) { 419 Exchange e2 = list.get(i); 420 Object key = expression.evaluate(e2); 421 Exchange e1 = map.get(key); 422 if (e1 != null) { 423 fail("Duplicate message found on message " + i + " has value: " + key + " for expression: " + expression + ". Exchanges: " + e1 + " and " + e2); 424 } else { 425 map.put(key, e2); 426 } 427 } 428 } 429 430 /** 431 * Adds the expection which will be invoked when enough messages are 432 * received 433 */ 434 public void expects(Runnable runnable) { 435 tests.add(runnable); 436 } 437 438 /** 439 * Adds an assertion to the given message index 440 * 441 * @param messageIndex the number of the message 442 * @return the assertion clause 443 */ 444 public AssertionClause message(final int messageIndex) { 445 AssertionClause clause = new AssertionClause() { 446 public void run() { 447 applyAssertionOn(MockEndpoint.this, messageIndex, assertExchangeReceived(messageIndex)); 448 } 449 }; 450 expects(clause); 451 return clause; 452 } 453 454 /** 455 * Adds an assertion to all the received messages 456 * 457 * @return the assertion clause 458 */ 459 public AssertionClause allMessages() { 460 AssertionClause clause = new AssertionClause() { 461 public void run() { 462 List<Exchange> list = getReceivedExchanges(); 463 int index = 0; 464 for (Exchange exchange : list) { 465 applyAssertionOn(MockEndpoint.this, index++, exchange); 466 } 467 } 468 }; 469 expects(clause); 470 return clause; 471 } 472 473 /** 474 * Asserts that the given index of message is received (starting at zero) 475 */ 476 public Exchange assertExchangeReceived(int index) { 477 int count = getReceivedCounter(); 478 assertTrue("Not enough messages received. Was: " + count, count > index); 479 return getReceivedExchanges().get(index); 480 } 481 482 // Properties 483 // ------------------------------------------------------------------------- 484 public List<Throwable> getFailures() { 485 return failures; 486 } 487 488 public int getReceivedCounter() { 489 return getReceivedExchanges().size(); 490 } 491 492 public List<Exchange> getReceivedExchanges() { 493 return receivedExchanges; 494 } 495 496 public int getExpectedCount() { 497 return expectedCount; 498 } 499 500 public long getSleepForEmptyTest() { 501 return sleepForEmptyTest; 502 } 503 504 /** 505 * Allows a sleep to be specified to wait to check that this endpoint really 506 * is empty when {@link #expectedMessageCount(int)} is called with zero 507 * 508 * @param sleepForEmptyTest the milliseconds to sleep for to determine that 509 * this endpoint really is empty 510 */ 511 public void setSleepForEmptyTest(long sleepForEmptyTest) { 512 this.sleepForEmptyTest = sleepForEmptyTest; 513 } 514 515 public long getResultWaitTime() { 516 return resultWaitTime; 517 } 518 519 /** 520 * Sets the maximum amount of time (in millis) the {@link #assertIsSatisfied()} will 521 * wait on a latch until it is satisfied 522 */ 523 public void setResultWaitTime(long resultWaitTime) { 524 this.resultWaitTime = resultWaitTime; 525 } 526 527 /** 528 * Specifies the expected number of message exchanges that should be 529 * received by this endpoint 530 * 531 * @param expectedCount the number of message exchanges that should be 532 * expected by this endpoint 533 */ 534 public void setExpectedMessageCount(int expectedCount) { 535 this.expectedCount = expectedCount; 536 if (expectedCount <= 0) { 537 latch = null; 538 } else { 539 latch = new CountDownLatch(expectedCount); 540 } 541 } 542 543 /** 544 * Specifies the minimum number of expected message exchanges that should be 545 * received by this endpoint 546 * 547 * @param expectedCount the number of message exchanges that should be 548 * expected by this endpoint 549 */ 550 public void setMinimumExpectedMessageCount(int expectedCount) { 551 this.expectedMinimumCount = expectedCount; 552 if (expectedCount <= 0) { 553 latch = null; 554 } else { 555 latch = new CountDownLatch(expectedMinimumCount); 556 } 557 } 558 559 public Processor getReporter() { 560 return reporter; 561 } 562 563 /** 564 * Allows a processor to added to the endpoint to report on progress of the test 565 */ 566 public void setReporter(Processor reporter) { 567 this.reporter = reporter; 568 } 569 570 // Implementation methods 571 // ------------------------------------------------------------------------- 572 private void init() { 573 expectedCount = -1; 574 counter = 0; 575 processors = new HashMap<Integer, Processor>(); 576 receivedExchanges = new CopyOnWriteArrayList<Exchange>(); 577 failures = new CopyOnWriteArrayList<Throwable>(); 578 tests = new CopyOnWriteArrayList<Runnable>(); 579 latch = null; 580 sleepForEmptyTest = 1000L; 581 resultWaitTime = 20000L; 582 expectedMinimumCount = -1; 583 expectedBodyValues = null; 584 actualBodyValues = new ArrayList(); 585 } 586 587 protected synchronized void onExchange(Exchange exchange) { 588 try { 589 if (reporter != null) { 590 reporter.process(exchange); 591 } 592 593 performAssertions(exchange); 594 } catch (Throwable e) { 595 failures.add(e); 596 } 597 if (latch != null) { 598 latch.countDown(); 599 } 600 } 601 602 protected void performAssertions(Exchange exchange) throws Exception { 603 Message in = exchange.getIn(); 604 Object actualBody = in.getBody(); 605 606 if (headerName != null) { 607 actualHeader = in.getHeader(headerName); 608 } 609 610 if (expectedBodyValues != null) { 611 int index = actualBodyValues.size(); 612 if (expectedBodyValues.size() > index) { 613 Object expectedBody = expectedBodyValues.get(index); 614 if (expectedBody != null) { 615 actualBody = in.getBody(expectedBody.getClass()); 616 } 617 actualBodyValues.add(actualBody); 618 } 619 } 620 621 LOG.debug(getEndpointUri() + " >>>> " + (++counter) + " : " + exchange + " with body: " + actualBody); 622 623 receivedExchanges.add(exchange); 624 625 Processor processor = processors.get(getReceivedCounter()) != null 626 ? processors.get(getReceivedCounter()) : defaultProcessor; 627 628 if (processor != null) { 629 processor.process(exchange); 630 } 631 } 632 633 protected void waitForCompleteLatch() throws InterruptedException { 634 if (latch == null) { 635 fail("Should have a latch!"); 636 } 637 638 // now lets wait for the results 639 LOG.debug("Waiting on the latch for: " + resultWaitTime + " millis"); 640 latch.await(resultWaitTime, TimeUnit.MILLISECONDS); 641 } 642 643 protected void assertEquals(String message, Object expectedValue, Object actualValue) { 644 if (!ObjectHelper.equal(expectedValue, actualValue)) { 645 fail(message + ". Expected: <" + expectedValue + "> but was: <" + actualValue + ">"); 646 } 647 } 648 649 protected void assertTrue(String message, boolean predicate) { 650 if (!predicate) { 651 fail(message); 652 } 653 } 654 655 protected void fail(Object message) { 656 if (LOG.isDebugEnabled()) { 657 List<Exchange> list = getReceivedExchanges(); 658 int index = 0; 659 for (Exchange exchange : list) { 660 LOG.debug("Received[" + (++index) + "]: " + exchange); 661 } 662 } 663 throw new AssertionError(getEndpointUri() + " " + message); 664 } 665 666 public int getExpectedMinimumCount() { 667 return expectedMinimumCount; 668 } 669 670 public void await() throws InterruptedException { 671 if (latch != null) { 672 latch.await(); 673 } 674 } 675 676 public boolean await(long timeout, TimeUnit unit) throws InterruptedException { 677 if (latch != null) { 678 return latch.await(timeout, unit); 679 } 680 return true; 681 } 682 683 public boolean isSingleton() { 684 return true; 685 } 686 }