1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.codehaus.activemq.service;
19
20 import com.sleepycat.je.DatabaseEntry;
21 import junit.framework.TestCase;
22 import org.codehaus.activemq.broker.BrokerClient;
23 import org.codehaus.activemq.broker.impl.BrokerClientImpl;
24 import org.codehaus.activemq.filter.DestinationFilter;
25 import org.codehaus.activemq.filter.FilterFactoryImpl;
26 import org.codehaus.activemq.message.ActiveMQDestination;
27 import org.codehaus.activemq.message.ActiveMQMessage;
28 import org.codehaus.activemq.message.ActiveMQTextMessage;
29 import org.codehaus.activemq.message.ConsumerInfo;
30 import org.codehaus.activemq.service.impl.DispatcherImpl;
31 import org.codehaus.activemq.service.impl.DurableQueueMessageContainerManager;
32 import org.codehaus.activemq.service.impl.DurableTopicMessageContainerManager;
33 import org.codehaus.activemq.service.impl.DurableTopicSubscription;
34 import org.codehaus.activemq.service.impl.DurableTopicSubscriptionContainerImpl;
35 import org.codehaus.activemq.store.PersistenceAdapter;
36 import org.codehaus.activemq.util.Callback;
37 import org.codehaus.activemq.util.IdGenerator;
38 import org.codehaus.activemq.util.TransactionTemplate;
39
40 import javax.jms.JMSException;
41 import javax.jms.TextMessage;
42 import java.io.IOException;
43
44 /***
45 * @version $Revision: 1.13 $
46 */
47 public abstract class MessageStoreTestSupport extends TestCase {
48 protected PersistenceAdapter persistenceAapter;
49 protected MessageContainer container;
50 protected Subscription subscription;
51 protected int publishMessageCount = 10;
52 protected int ackCount = 5;
53 protected ActiveMQMessage[] messages;
54 protected ActiveMQDestination destination;
55 protected IdGenerator idGenerator = new IdGenerator();
56 protected MessageContainerManager messageContainerManager;
57 protected BrokerClient client = new BrokerClientImpl();
58 protected TransactionTemplate template;
59
60 public void testRecovery() throws Exception {
61 System.out.println("Publishing: " + publishMessageCount + " messages");
62
63 for (int i = 0; i < publishMessageCount; i++) {
64 doAddMessage(i);
65 }
66
67 dumpMessageIdentities("After add");
68
69 assertDeliveryList(0, publishMessageCount);
70
71
72 System.out.println("Acknowledging the first: " + ackCount + " messages");
73 for (int i = 0; i < ackCount; i++) {
74 doAcknowledgeMessage(i);
75 }
76
77
78 assertDeliveryList(0, 0);
79
80 dumpMessageIdentities("After ack of first part");
81
82
83 closeAndReopenContainer();
84
85 assertDeliveryList(ackCount, publishMessageCount);
86
87 dumpMessageIdentities("About to perform final ack");
88
89 for (int i = ackCount; i < publishMessageCount; i++) {
90 doAcknowledgeMessage(i);
91 }
92 }
93
94 public void testRecoveryOfNewConsumerWhichHasYetToAck() throws Exception {
95
96 assertDeliveryList(0, publishMessageCount);
97
98
99 assertDeliveryList(0, 0);
100
101
102 closeAndReopenContainer();
103
104 assertDeliveryList(0, publishMessageCount);
105 }
106
107 protected abstract void acknowledgeMessage(int i) throws JMSException;
108
109 protected abstract PersistenceAdapter createPersistenceAdapter() throws IOException, Exception;
110
111 protected abstract ActiveMQDestination createDestination();
112
113 protected abstract ActiveMQMessage[] getMessagesToDispatch() throws JMSException;
114
115
116 protected void doAcknowledgeMessage(final int i) throws JMSException {
117 template.run(new Callback() {
118 public void execute() throws Throwable {
119 acknowledgeMessage(i);
120 }
121 });
122 }
123
124 protected void doAddMessage(int i) throws JMSException {
125 final ActiveMQMessage message = getMessage(i);
126 template.run(new Callback() {
127 public void execute() throws Throwable {
128 container.addMessage(message);
129 }
130 });
131 }
132
133 protected void dumpMessageIdentities(String text) throws JMSException {
134 System.out.println("#### Dumping identities at: " + text);
135 for (int i = 0; i < publishMessageCount; i++) {
136 ActiveMQMessage message = getMessage(i);
137 MessageIdentity identity = message.getJMSMessageIdentity();
138 Object sequenceNo = identity.getSequenceNumber();
139 String sequenceText = null;
140 if (sequenceNo != null) {
141 sequenceText = sequenceNo.toString();
142 if (sequenceNo instanceof DatabaseEntry) {
143 DatabaseEntry entry = (DatabaseEntry) sequenceNo;
144 byte[] data = entry.getData();
145 sequenceText = asText(data);
146 }
147 }
148 System.out.println("item: " + i + " is: " + sequenceText);
149 }
150 System.out.println();
151 }
152
153 protected String asText(byte[] data) {
154 StringBuffer buffer = new StringBuffer("[ ");
155 for (int i = 0; i < data.length; i++) {
156 if (i > 0) {
157 buffer.append(", ");
158 }
159 buffer.append(Byte.toString(data[i]));
160 }
161 buffer.append(" ]");
162 return buffer.toString();
163 }
164
165
166 protected MessageContainer createTopicMessageContainer() throws JMSException {
167 if (destination.isTopic()) {
168 return persistenceAapter.createTopicMessageContainer(destination.toString());
169 }
170 else {
171 return persistenceAapter.createQueueMessageContainer(destination.toString());
172 }
173 }
174
175 protected Subscription createSubscription() throws JMSException {
176 DestinationFilter filter = DestinationFilter.parseFilter(destination);
177 ConsumerInfo consumerInfo = createConsumerInfo();
178
179
180 messageContainerManager.addMessageConsumer(client, consumerInfo);
181
182 return new DurableTopicSubscription(new DispatcherImpl(), client, consumerInfo, filter, new RedeliveryPolicy());
183 }
184
185 protected ConsumerInfo createConsumerInfo() {
186 ConsumerInfo answer = new ConsumerInfo();
187 answer.setClientId(getClientID());
188 answer.setConsumerId(idGenerator.generateId());
189 answer.setConsumerName(getConsumerName());
190 answer.setDestination(destination);
191 answer.setPrefetchNumber(100);
192 answer.setSessionId(idGenerator.generateId());
193 answer.setStarted(true);
194 return answer;
195 }
196
197 protected String getConsumerName() {
198 return getName();
199 }
200
201 protected String getClientID() {
202 return getClass().getName();
203 }
204
205 protected void setUp() throws Exception {
206 super.setUp();
207 this.messages = new ActiveMQMessage[publishMessageCount];
208 this.destination = createDestination();
209
210 this.persistenceAapter = createPersistenceAdapter();
211 persistenceAapter.start();
212
213 template = new TransactionTemplate(persistenceAapter);
214
215 this.messageContainerManager = createMessageContainerManager();
216
217 this.container = messageContainerManager.getContainer(this.destination.getPhysicalName());
218 assertTrue("Should have created a container", container != null);
219
220 this.subscription = createSubscription();
221
222 }
223
224 protected void tearDown() throws Exception {
225 messageContainerManager.stop();
226 persistenceAapter.stop();
227 super.tearDown();
228 }
229
230 protected MessageContainerManager createMessageContainerManager() {
231 if (destination.isTopic()) {
232 return new DurableTopicMessageContainerManager(persistenceAapter, new DurableTopicSubscriptionContainerImpl(new RedeliveryPolicy()), new FilterFactoryImpl(), new DispatcherImpl());
233 }
234 else {
235 return new DurableQueueMessageContainerManager(persistenceAapter, new DurableTopicSubscriptionContainerImpl(new RedeliveryPolicy()), new FilterFactoryImpl(), new DispatcherImpl());
236 }
237 }
238
239 protected void assertDeliveryList(final int startIndex, final int lastIndex) throws JMSException {
240 template.run(new Callback() {
241 public void execute() throws Throwable {
242 ActiveMQMessage[] messagesToDispatch = getMessagesToDispatch();
243 int count = lastIndex - startIndex;
244 assertTrue("Not enough messages available to dispatch. Expected: " + count
245 + " messages but was: " + messagesToDispatch.length, messagesToDispatch.length >= count);
246
247 for (int i = 0; i < count; i++) {
248 ActiveMQMessage expected = getMessage(i + startIndex);
249 ActiveMQMessage actual = messagesToDispatch[i];
250 assertMessagesEqual("Dispatched message at index: " + i, expected, actual);
251 }
252 }
253 });
254 }
255
256 protected void assertMessagesEqual(String description, ActiveMQMessage expected, ActiveMQMessage actual) throws JMSException {
257 assertEquals("MessageText compare. " + description, ((TextMessage) expected).getText(), ((TextMessage) actual).getText());
258 assertEquals("MessageID compare. " + description + " expected: " + expected + " actual: " + actual, expected.getJMSMessageID(), actual.getJMSMessageID());
259 assertEquals(description, expected, actual);
260 }
261
262 protected ActiveMQMessage getMessage(int i) throws JMSException {
263 if (messages[i] == null) {
264 messages[i] = createMessage(i);
265 }
266 return messages[i];
267 }
268
269 protected ActiveMQMessage createMessage(int i) throws JMSException {
270 ActiveMQTextMessage answer = new ActiveMQTextMessage();
271 answer.setJMSMessageID(idGenerator.generateId());
272 answer.setJMSClientID(getClientID());
273 answer.setJMSDestination(destination);
274 answer.setText("message index: " + i);
275 return answer;
276 }
277
278 protected void closeAndReopenContainer() throws Exception {
279 subscription.clear();
280
281 messageContainerManager.stop();
282 persistenceAapter.stop();
283
284 persistenceAapter = createPersistenceAdapter();
285 persistenceAapter.start();
286
287 template = new TransactionTemplate(persistenceAapter);
288
289 this.messageContainerManager = createMessageContainerManager();
290
291 container = messageContainerManager.getContainer(destination.getPhysicalName());
292
293 this.subscription = createSubscription();
294
295 template.run(new Callback() {
296 public void execute() throws Throwable {
297 recover();
298 }
299 });
300 }
301
302 protected void recover() throws JMSException {
303 }
304
305 protected String getSubject() {
306 return getClass().getName() + "." + getName();
307 }
308 }