001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.component.dataset; 018 019 import java.util.concurrent.atomic.AtomicInteger; 020 021 import org.apache.camel.Component; 022 import org.apache.camel.Consumer; 023 import org.apache.camel.Exchange; 024 import org.apache.camel.Message; 025 import org.apache.camel.PollingConsumer; 026 import org.apache.camel.Processor; 027 import org.apache.camel.Service; 028 import org.apache.camel.component.mock.MockEndpoint; 029 import org.apache.camel.impl.EventDrivenPollingConsumer; 030 import org.apache.camel.util.ExchangeHelper; 031 import org.apache.camel.util.ObjectHelper; 032 import org.apache.commons.logging.Log; 033 import org.apache.commons.logging.LogFactory; 034 035 /** 036 * @version $Revision: 41278 $ 037 */ 038 public class DataSetEndpoint extends MockEndpoint implements Service { 039 private static final transient Log LOG = LogFactory.getLog(DataSetEndpoint.class); 040 private DataSet dataSet; 041 private AtomicInteger receivedCounter = new AtomicInteger(); 042 private long produceDelay = -1; 043 private long consumeDelay = -1; 044 private long startTime; 045 private long preloadSize; 046 047 public DataSetEndpoint(String endpointUri, Component component, DataSet dataSet) { 048 super(endpointUri, component); 049 this.dataSet = dataSet; 050 } 051 052 public DataSetEndpoint(String endpointUri, DataSet dataSet) { 053 super(endpointUri); 054 this.dataSet = dataSet; 055 } 056 057 public static void assertEquals(String description, Object expected, Object actual, Exchange exchange) { 058 if (!ObjectHelper.equal(expected, actual)) { 059 throw new AssertionError(description + " does not match. Expected: " + expected + " but was: " + actual + " on " + exchange + " with headers: " + exchange.getIn().getHeaders()); 060 } 061 } 062 063 @Override 064 public PollingConsumer<Exchange> createPollingConsumer() throws Exception { 065 return new EventDrivenPollingConsumer<Exchange>(this); 066 } 067 068 @Override 069 public Consumer<Exchange> createConsumer(Processor processor) throws Exception { 070 return new DataSetConsumer(this, processor); 071 } 072 073 @Override 074 public void reset() { 075 super.reset(); 076 receivedCounter.set(0); 077 } 078 079 @Override 080 public int getReceivedCounter() { 081 return receivedCounter.get(); 082 } 083 084 /** 085 * Creates a message exchange for the given index in the {@link DataSet} 086 */ 087 public Exchange createExchange(long messageIndex) throws Exception { 088 Exchange exchange = createExchange(); 089 getDataSet().populateMessage(exchange, messageIndex); 090 091 Message in = exchange.getIn(); 092 in.setHeader(DataSet.INDEX_HEADER, messageIndex); 093 094 return exchange; 095 } 096 097 @Override 098 protected void waitForCompleteLatch() throws InterruptedException { 099 // TODO lets do a much better version of this! 100 long size = getDataSet().getSize(); 101 size *= 4000; 102 setResultWaitTime(size); 103 super.waitForCompleteLatch(); 104 } 105 106 // Properties 107 //------------------------------------------------------------------------- 108 109 public DataSet getDataSet() { 110 return dataSet; 111 } 112 113 public void setDataSet(DataSet dataSet) { 114 this.dataSet = dataSet; 115 } 116 117 public long getPreloadSize() { 118 return preloadSize; 119 } 120 121 /** 122 * Sets how many messages should be preloaded (sent) before the route completes its initialisation 123 */ 124 public void setPreloadSize(long preloadSize) { 125 this.preloadSize = preloadSize; 126 } 127 128 public long getConsumeDelay() { 129 return consumeDelay; 130 } 131 132 /** 133 * Allows a delay to be specified which causes consumers to pause - to simulate slow consumers 134 */ 135 public void setConsumeDelay(long consumeDelay) { 136 this.consumeDelay = consumeDelay; 137 } 138 139 public long getProduceDelay() { 140 return produceDelay; 141 } 142 143 /** 144 * Allows a delay to be specified which causes producers to pause - to simpulate slow producers 145 */ 146 public void setProduceDelay(long produceDelay) { 147 this.produceDelay = produceDelay; 148 } 149 150 // Implementation methods 151 //------------------------------------------------------------------------- 152 153 @Override 154 protected void performAssertions(Exchange actual) throws Exception { 155 if (startTime == 0) { 156 startTime = System.currentTimeMillis(); 157 } 158 int receivedCount = receivedCounter.incrementAndGet(); 159 long index = receivedCount - 1; 160 Exchange expected = createExchange(index); 161 162 // now lets assert that they are the same 163 if (LOG.isDebugEnabled()) { 164 LOG.debug("Received message: " + index + " = " + actual); 165 } 166 167 assertMessageExpected(index, expected, actual); 168 169 if (consumeDelay > 0) { 170 Thread.sleep(consumeDelay); 171 } 172 173 long group = getDataSet().getReportCount(); 174 if (receivedCount % group == 0) { 175 reportProgress(actual, receivedCount); 176 } 177 } 178 179 protected void reportProgress(Exchange actual, int receivedCount) { 180 long time = System.currentTimeMillis(); 181 long elapsed = time - startTime; 182 startTime = time; 183 184 LOG.info("Received: " + receivedCount + " messages so far. Last group took: " + elapsed + " millis"); 185 } 186 187 protected void assertMessageExpected(long index, Exchange expected, Exchange actual) throws Exception { 188 long actualCounter = ExchangeHelper.getMandatoryHeader(actual, DataSet.INDEX_HEADER, Long.class); 189 assertEquals("Header: " + DataSet.INDEX_HEADER, index, actualCounter, actual); 190 191 getDataSet().assertMessageExpected(this, expected, actual, index); 192 } 193 194 public void start() throws Exception { 195 long size = getDataSet().getSize(); 196 expectedMessageCount((int) size); 197 } 198 199 public void stop() throws Exception { 200 } 201 }