001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.component.mock;
018    
019    import java.beans.PropertyChangeListener;
020    import java.beans.PropertyChangeSupport;
021    import java.util.ArrayList;
022    import java.util.Arrays;
023    import java.util.Collection;
024    import java.util.HashMap;
025    import java.util.List;
026    import java.util.Map;
027    import java.util.concurrent.CopyOnWriteArrayList;
028    import java.util.concurrent.CountDownLatch;
029    import java.util.concurrent.TimeUnit;
030    
031    import org.apache.camel.CamelContext;
032    import org.apache.camel.Component;
033    import org.apache.camel.Consumer;
034    import org.apache.camel.Endpoint;
035    import org.apache.camel.Exchange;
036    import org.apache.camel.Expression;
037    import org.apache.camel.Message;
038    import org.apache.camel.Processor;
039    import org.apache.camel.Producer;
040    import org.apache.camel.impl.DefaultEndpoint;
041    import org.apache.camel.impl.DefaultProducer;
042    import org.apache.camel.spi.BrowsableEndpoint;
043    import org.apache.camel.util.ExpressionComparator;
044    import org.apache.camel.util.ObjectHelper;
045    import org.apache.commons.logging.Log;
046    import org.apache.commons.logging.LogFactory;
047    
048    /**
049     * A Mock endpoint which provides a literate, fluent API for testing routes
050     * using a <a href="http://jmock.org/">JMock style</a> API.
051     *
052     * @version $Revision: 41895 $
053     */
054    public class MockEndpoint extends DefaultEndpoint<Exchange> implements BrowsableEndpoint<Exchange> {
055        private static final transient Log LOG = LogFactory.getLog(MockEndpoint.class);
056        private int expectedCount;
057        private int counter;
058        private Processor defaultProcessor;
059        private Map<Integer, Processor> processors;
060        private List<Exchange> receivedExchanges;
061        private List<Throwable> failures;
062        private List<Runnable> tests;
063        private CountDownLatch latch;
064        private long sleepForEmptyTest;
065        private long resultWaitTime;
066        private int expectedMinimumCount;
067        private List expectedBodyValues;
068        private List actualBodyValues;
069        private PropertyChangeSupport propertyChangeSupport = new PropertyChangeSupport(this);
070        private String headerName;
071        private String headerValue;
072        private Object actualHeader;
073        private Processor reporter;
074    
075        public MockEndpoint(String endpointUri, Component component) {
076            super(endpointUri, component);
077            init();
078        }
079    
080        public MockEndpoint(String endpointUri) {
081            super(endpointUri);
082            init();
083        }
084    
085        public static void assertWait(long timeout, TimeUnit unit, MockEndpoint... endpoints) throws InterruptedException {
086            long start = System.currentTimeMillis();
087            long left = unit.toMillis(timeout);
088            long end = start + left;
089            for (MockEndpoint endpoint : endpoints) {
090                if (!endpoint.await(left, TimeUnit.MILLISECONDS)) {
091                    throw new AssertionError("Timeout waiting for endpoints to receive enough messages. " + endpoint.getEndpointUri() + " timed out.");
092                }
093                left = end - System.currentTimeMillis();
094                if (left <= 0) {
095                    left = 0;
096                }
097            }
098        }
099    
100        public static void assertIsSatisfied(long timeout, TimeUnit unit, MockEndpoint... endpoints) throws InterruptedException {
101            assertWait(timeout, unit, endpoints);
102            for (MockEndpoint endpoint : endpoints) {
103                endpoint.assertIsSatisfied();
104            }
105        }
106    
107        public static void assertIsSatisfied(MockEndpoint... endpoints) throws InterruptedException {
108            for (MockEndpoint endpoint : endpoints) {
109                endpoint.assertIsSatisfied();
110            }
111        }
112    
113    
114        /**
115         * Asserts that all the expectations on any {@link MockEndpoint} instances registered
116         * in the given context are valid
117         *
118         * @param context the camel context used to find all the available endpoints to be asserted
119         */
120        public static void assertIsSatisfied(CamelContext context) throws InterruptedException {
121            ObjectHelper.notNull(context, "camelContext");
122            Collection<Endpoint> endpoints = context.getSingletonEndpoints();
123            for (Endpoint endpoint : endpoints) {
124                if (endpoint instanceof MockEndpoint) {
125                    MockEndpoint mockEndpoint = (MockEndpoint) endpoint;
126                    mockEndpoint.assertIsSatisfied();
127                }
128            }
129        }
130    
131    
132        public static void expectsMessageCount(int count, MockEndpoint... endpoints) throws InterruptedException {
133            for (MockEndpoint endpoint : endpoints) {
134                MockEndpoint.expectsMessageCount(count);
135            }
136        }
137    
138        public List<Exchange> getExchanges() {
139            return getReceivedExchanges();
140        }
141    
142        public void addPropertyChangeListener(PropertyChangeListener listener) {
143            propertyChangeSupport.addPropertyChangeListener(listener);
144        }
145    
146        public void removePropertyChangeListener(PropertyChangeListener listener) {
147            propertyChangeSupport.removePropertyChangeListener(listener);
148        }
149    
150        public Consumer<Exchange> createConsumer(Processor processor) throws Exception {
151            throw new UnsupportedOperationException("You cannot consume from this endpoint");
152        }
153    
154        public Producer<Exchange> createProducer() throws Exception {
155            return new DefaultProducer<Exchange>(this) {
156                public void process(Exchange exchange) {
157                    onExchange(exchange);
158                }
159            };
160        }
161    
162        public void reset() {
163            init();
164        }
165    
166    
167        // Testing API
168        // -------------------------------------------------------------------------
169    
170        /**
171         * Set the processor that will be invoked when the index
172         * message is received.
173         *
174         * @param index
175         * @param processor
176         */
177        public void whenExchangeReceived(int index, Processor processor) {
178            this.processors.put(index, processor);
179        }
180    
181        /**
182         * Set the processor that will be invoked when the some message
183         * is received.
184         *
185         * This processor could be overwritten by
186         * {@link #whenExchangeReceived(int, Processor)} method.
187         *
188         * @param processor
189         */
190        public void whenAnyExchangeReceived(Processor processor) {
191            this.defaultProcessor = processor;
192        }
193    
194        /**
195         * Validates that all the available expectations on this endpoint are
196         * satisfied; or throw an exception
197         */
198        public void assertIsSatisfied() throws InterruptedException {
199            assertIsSatisfied(sleepForEmptyTest);
200        }
201    
202        /**
203         * Validates that all the available expectations on this endpoint are
204         * satisfied; or throw an exception
205         *
206         * @param timeoutForEmptyEndpoints the timeout in milliseconds that we
207         *                should wait for the test to be true
208         */
209        public void assertIsSatisfied(long timeoutForEmptyEndpoints) throws InterruptedException {
210            LOG.info("Asserting: " + this + " is satisfied");
211            if (expectedCount >= 0) {
212                if (expectedCount != getReceivedCounter()) {
213                    if (expectedCount == 0) {
214                        // lets wait a little bit just in case
215                        if (timeoutForEmptyEndpoints > 0) {
216                            LOG.debug("Sleeping for: " + timeoutForEmptyEndpoints + " millis to check there really are no messages received");
217                            Thread.sleep(timeoutForEmptyEndpoints);
218                        }
219                    } else {
220                        waitForCompleteLatch();
221                    }
222                }
223                assertEquals("Received message count", expectedCount, getReceivedCounter());
224            } else if (expectedMinimumCount > 0 && getReceivedCounter() < expectedMinimumCount) {
225                waitForCompleteLatch();
226            }
227    
228            if (expectedMinimumCount >= 0) {
229                int receivedCounter = getReceivedCounter();
230                assertTrue("Received message count " + receivedCounter + ", expected at least " + expectedCount, expectedCount <= receivedCounter);
231            }
232    
233            for (Runnable test : tests) {
234                test.run();
235            }
236    
237            for (Throwable failure : failures) {
238                if (failure != null) {
239                    LOG.error("Caught on " + getEndpointUri() + " Exception: " + failure, failure);
240                    fail("Failed due to caught exception: " + failure);
241                }
242            }
243        }
244    
245        /**
246         * Validates that the assertions fail on this endpoint
247         */
248        public void assertIsNotSatisfied() throws InterruptedException {
249            try {
250                assertIsSatisfied();
251                fail("Expected assertion failure!");
252            } catch (AssertionError e) {
253                LOG.info("Caught expected failure: " + e);
254            }
255        }
256    
257        /**
258         * Specifies the expected number of message exchanges that should be
259         * received by this endpoint
260         *
261         * @param expectedCount the number of message exchanges that should be
262         *                expected by this endpoint
263         */
264        public void expectedMessageCount(int expectedCount) {
265            setExpectedMessageCount(expectedCount);
266        }
267    
268        /**
269         * Specifies the minimum number of expected message exchanges that should be
270         * received by this endpoint
271         *
272         * @param expectedCount the number of message exchanges that should be
273         *                expected by this endpoint
274         */
275        public void expectedMinimumMessageCount(int expectedCount) {
276            setMinimumExpectedMessageCount(expectedCount);
277        }
278    
279        /**
280         * Adds an expectation that the given header name & value are received by this
281         * endpoint
282         */
283        public void expectedHeaderReceived(String name, String value) {
284            this.headerName = name;
285            this.headerValue = value;
286    
287            expects(new Runnable() {
288                public void run() {
289                    assertTrue("No header with name " + headerName + " found.", actualHeader != null);
290    
291                    assertEquals("Header of message", headerValue, actualHeader);
292                }
293            });
294        }
295    
296        /**
297         * Adds an expectation that the given body values are received by this
298         * endpoint
299         */
300        public void expectedBodiesReceived(final List bodies) {
301            expectedMessageCount(bodies.size());
302            this.expectedBodyValues = bodies;
303            this.actualBodyValues = new ArrayList();
304    
305            expects(new Runnable() {
306                public void run() {
307                    for (int i = 0; i < expectedBodyValues.size(); i++) {
308                        Exchange exchange = getReceivedExchanges().get(i);
309                        assertTrue("No exchange received for counter: " + i, exchange != null);
310    
311                        Object expectedBody = expectedBodyValues.get(i);
312                        Object actualBody = actualBodyValues.get(i);
313    
314                        assertEquals("Body of message: " + i, expectedBody, actualBody);
315                    }
316                }
317            });
318        }
319    
320        /**
321         * Adds an expectation that the given body values are received by this
322         * endpoint
323         */
324        public void expectedBodiesReceived(Object... bodies) {
325            List bodyList = new ArrayList();
326            bodyList.addAll(Arrays.asList(bodies));
327            expectedBodiesReceived(bodyList);
328        }
329    
330        /**
331         * Adds an expectation that messages received should have ascending values
332         * of the given expression such as a user generated counter value
333         *
334         * @param expression
335         */
336        public void expectsAscending(final Expression<Exchange> expression) {
337            expects(new Runnable() {
338                public void run() {
339                    assertMessagesAscending(expression);
340                }
341            });
342        }
343    
344        /**
345         * Adds an expectation that messages received should have descending values
346         * of the given expression such as a user generated counter value
347         *
348         * @param expression
349         */
350        public void expectsDescending(final Expression<Exchange> expression) {
351            expects(new Runnable() {
352                public void run() {
353                    assertMessagesDescending(expression);
354                }
355            });
356        }
357    
358        /**
359         * Adds an expectation that no duplicate messages should be received using
360         * the expression to determine the message ID
361         *
362         * @param expression the expression used to create a unique message ID for
363         *                message comparison (which could just be the message
364         *                payload if the payload can be tested for uniqueness using
365         *                {@link Object#equals(Object)} and
366         *                {@link Object#hashCode()}
367         */
368        public void expectsNoDuplicates(final Expression<Exchange> expression) {
369            expects(new Runnable() {
370                public void run() {
371                    assertNoDuplicates(expression);
372                }
373            });
374        }
375    
376        /**
377         * Asserts that the messages have ascending values of the given expression
378         */
379        public void assertMessagesAscending(Expression<Exchange> expression) {
380            assertMessagesSorted(expression, true);
381        }
382    
383        /**
384         * Asserts that the messages have descending values of the given expression
385         */
386        public void assertMessagesDescending(Expression<Exchange> expression) {
387            assertMessagesSorted(expression, false);
388        }
389    
390        protected void assertMessagesSorted(Expression<Exchange> expression, boolean ascending) {
391            String type = ascending ? "ascending" : "descending";
392            ExpressionComparator comparator = new ExpressionComparator(expression);
393            List<Exchange> list = getReceivedExchanges();
394            for (int i = 1; i < list.size(); i++) {
395                int j = i - 1;
396                Exchange e1 = list.get(j);
397                Exchange e2 = list.get(i);
398                int result = comparator.compare(e1, e2);
399                if (result == 0) {
400                    fail("Messages not " + type + ". Messages" + j + " and " + i + " are equal with value: " + expression.evaluate(e1) + " for expression: " + expression + ". Exchanges: " + e1 + " and "
401                         + e2);
402                } else {
403                    if (!ascending) {
404                        result = result * -1;
405                    }
406                    if (result > 0) {
407                        fail("Messages not " + type + ". Message " + j + " has value: " + expression.evaluate(e1) + " and message " + i + " has value: " + expression.evaluate(e2) + " for expression: "
408                             + expression + ". Exchanges: " + e1 + " and " + e2);
409                    }
410                }
411            }
412        }
413    
414        public void assertNoDuplicates(Expression<Exchange> expression) {
415            Map<Object, Exchange> map = new HashMap<Object, Exchange>();
416            List<Exchange> list = getReceivedExchanges();
417            for (int i = 0; i < list.size(); i++) {
418                Exchange e2 = list.get(i);
419                Object key = expression.evaluate(e2);
420                Exchange e1 = map.get(key);
421                if (e1 != null) {
422                    fail("Duplicate message found on message " + i + " has value: " + key + " for expression: " + expression + ". Exchanges: " + e1 + " and " + e2);
423                } else {
424                    map.put(key, e2);
425                }
426            }
427        }
428    
429        /**
430         * Adds the expection which will be invoked when enough messages are
431         * received
432         */
433        public void expects(Runnable runnable) {
434            tests.add(runnable);
435        }
436    
437        /**
438         * Adds an assertion to the given message index
439         *
440         * @param messageIndex the number of the message
441         * @return the assertion clause
442         */
443        public AssertionClause message(final int messageIndex) {
444            AssertionClause clause = new AssertionClause() {
445                public void run() {
446                    applyAssertionOn(MockEndpoint.this, messageIndex, assertExchangeReceived(messageIndex));
447                }
448            };
449            expects(clause);
450            return clause;
451        }
452    
453        /**
454         * Adds an assertion to all the received messages
455         *
456         * @return the assertion clause
457         */
458        public AssertionClause allMessages() {
459            AssertionClause clause = new AssertionClause() {
460                public void run() {
461                    List<Exchange> list = getReceivedExchanges();
462                    int index = 0;
463                    for (Exchange exchange : list) {
464                        applyAssertionOn(MockEndpoint.this, index++, exchange);
465                    }
466                }
467            };
468            expects(clause);
469            return clause;
470        }
471    
472        /**
473         * Asserts that the given index of message is received (starting at zero)
474         */
475        public Exchange assertExchangeReceived(int index) {
476            int count = getReceivedCounter();
477            assertTrue("Not enough messages received. Was: " + count, count > index);
478            return getReceivedExchanges().get(index);
479        }
480    
481        // Properties
482        // -------------------------------------------------------------------------
483        public List<Throwable> getFailures() {
484            return failures;
485        }
486    
487        public int getReceivedCounter() {
488            return getReceivedExchanges().size();
489        }
490    
491        public List<Exchange> getReceivedExchanges() {
492            return receivedExchanges;
493        }
494    
495        public int getExpectedCount() {
496            return expectedCount;
497        }
498    
499        public long getSleepForEmptyTest() {
500            return sleepForEmptyTest;
501        }
502    
503        /**
504         * Allows a sleep to be specified to wait to check that this endpoint really
505         * is empty when {@link #expectedMessageCount(int)} is called with zero
506         *
507         * @param sleepForEmptyTest the milliseconds to sleep for to determine that
508         *                this endpoint really is empty
509         */
510        public void setSleepForEmptyTest(long sleepForEmptyTest) {
511            this.sleepForEmptyTest = sleepForEmptyTest;
512        }
513    
514        public long getResultWaitTime() {
515            return resultWaitTime;
516        }
517    
518        /**
519         * Sets the maximum amount of time (in millis) the {@link #assertIsSatisfied()} will
520         * wait on a latch until it is satisfied
521         */
522        public void setResultWaitTime(long resultWaitTime) {
523            this.resultWaitTime = resultWaitTime;
524        }
525    
526        /**
527         * Specifies the expected number of message exchanges that should be
528         * received by this endpoint
529         *
530         * @param expectedCount the number of message exchanges that should be
531         *                expected by this endpoint
532         */
533        public void setExpectedMessageCount(int expectedCount) {
534            this.expectedCount = expectedCount;
535            if (expectedCount <= 0) {
536                latch = null;
537            } else {
538                latch = new CountDownLatch(expectedCount);
539            }
540        }
541    
542        /**
543         * Specifies the minimum number of expected message exchanges that should be
544         * received by this endpoint
545         *
546         * @param expectedCount the number of message exchanges that should be
547         *                expected by this endpoint
548         */
549        public void setMinimumExpectedMessageCount(int expectedCount) {
550            this.expectedMinimumCount = expectedCount;
551            if (expectedCount <= 0) {
552                latch = null;
553            } else {
554                latch = new CountDownLatch(expectedMinimumCount);
555            }
556        }
557    
558        public Processor getReporter() {
559            return reporter;
560        }
561    
562        /**
563         * Allows a processor to added to the endpoint to report on progress of the test
564         */
565        public void setReporter(Processor reporter) {
566            this.reporter = reporter;
567        }
568    
569        // Implementation methods
570        // -------------------------------------------------------------------------
571        private void init() {
572            expectedCount = -1;
573            counter = 0;
574            processors = new HashMap<Integer, Processor>();
575            receivedExchanges = new CopyOnWriteArrayList<Exchange>();
576            failures = new CopyOnWriteArrayList<Throwable>();
577            tests = new CopyOnWriteArrayList<Runnable>();
578            latch = null;
579            sleepForEmptyTest = 1000L;
580            resultWaitTime = 20000L;
581            expectedMinimumCount = -1;
582            expectedBodyValues = null;
583            actualBodyValues = new ArrayList();
584        }
585    
586        protected synchronized void onExchange(Exchange exchange) {
587            try {
588                if (reporter != null) {
589                    reporter.process(exchange);
590                }
591    
592                performAssertions(exchange);
593            } catch (Throwable e) {
594                failures.add(e);
595            }
596            if (latch != null) {
597                latch.countDown();
598            }
599        }
600    
601        protected void performAssertions(Exchange exchange) throws Exception {
602            Message in = exchange.getIn();
603            Object actualBody = in.getBody();
604    
605            if (headerName != null) {
606                actualHeader = in.getHeader(headerName);
607            }
608    
609            if (expectedBodyValues != null) {
610                int index = actualBodyValues.size();
611                if (expectedBodyValues.size() > index) {
612                    Object expectedBody = expectedBodyValues.get(index);
613                    if (expectedBody != null) {
614                        actualBody = in.getBody(expectedBody.getClass());
615                    }
616                    actualBodyValues.add(actualBody);
617                }
618            }
619    
620            LOG.debug(getEndpointUri() + " >>>> " + (++counter) + " : " + exchange + " with body: " + actualBody);
621    
622            receivedExchanges.add(exchange);
623    
624            Processor processor = processors.get(getReceivedCounter()) != null
625                    ? processors.get(getReceivedCounter()) : defaultProcessor;
626    
627            if (processor != null) {
628                processor.process(exchange);
629            }
630        }
631    
632        protected void waitForCompleteLatch() throws InterruptedException {
633            if (latch == null) {
634                fail("Should have a latch!");
635            }
636    
637            // now lets wait for the results
638            LOG.debug("Waiting on the latch for: " + resultWaitTime + " millis");
639            latch.await(resultWaitTime, TimeUnit.MILLISECONDS);
640        }
641    
642        protected void assertEquals(String message, Object expectedValue, Object actualValue) {
643            if (!ObjectHelper.equal(expectedValue, actualValue)) {
644                fail(message + ". Expected: <" + expectedValue + "> but was: <" + actualValue + ">");
645            }
646        }
647    
648        protected void assertTrue(String message, boolean predicate) {
649            if (!predicate) {
650                fail(message);
651            }
652        }
653    
654        protected void fail(Object message) {
655            if (LOG.isDebugEnabled()) {
656                List<Exchange> list = getReceivedExchanges();
657                int index = 0;
658                for (Exchange exchange : list) {
659                    LOG.debug("Received[" + (++index) + "]: " + exchange);
660                }
661            }
662            throw new AssertionError(getEndpointUri() + " " + message);
663        }
664    
665        public int getExpectedMinimumCount() {
666            return expectedMinimumCount;
667        }
668    
669        public void await() throws InterruptedException {
670            if (latch != null) {
671                latch.await();
672            }
673        }
674    
675        public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
676            if (latch != null) {
677                return latch.await(timeout, unit);
678            }
679            return true;
680        }
681    
682        public boolean isSingleton() {
683            return true;
684        }
685    }