1   /*** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.codehaus.activemq;
19  
20  import javax.jms.Connection;
21  import javax.jms.ConnectionFactory;
22  import javax.jms.Destination;
23  import javax.jms.Message;
24  import javax.jms.MessageConsumer;
25  import javax.jms.MessageProducer;
26  import javax.jms.Session;
27  import java.util.ArrayList;
28  
29  /***
30   * @version $Revision: 1.7 $
31   */
32  abstract public class JmsTransactionTestSupport extends TestSupport {
33  
34      protected ConnectionFactory connectionFactory;
35      protected Connection connection;
36      protected Session session;
37      protected MessageConsumer consumer;
38      protected MessageProducer producer;
39  
40      public JmsTransactionTestSupport() {
41          super();
42      }
43  
44      public JmsTransactionTestSupport(String name) {
45          super(name);
46      }
47  
48  
49      public void testSendRollback() throws Exception {
50  
51          Message[] outbound = new Message[]{
52              session.createTextMessage("First Message"),
53              session.createTextMessage("Second Message")
54          };
55  
56          producer.send(outbound[0]);
57          session.commit();
58          producer.send(session.createTextMessage("I'm going to get rolled back."));
59          session.rollback();
60          producer.send(outbound[1]);
61          session.commit();
62  
63          ArrayList messages = new ArrayList();
64          System.out.println("About to consume message 1");
65          Message message = consumer.receive(1000);
66          messages.add(message);
67          System.out.println("Received: " + message);
68  
69          System.out.println("About to consume message 2");
70          message = consumer.receive(4000);
71          messages.add(message);
72          System.out.println("Received: " + message);
73  
74          session.commit();
75  
76          Message inbound[] = new Message[messages.size()];
77          messages.toArray(inbound);
78  
79          assertTextMessagesEqual("Rollback did not work.", outbound, inbound);
80      }
81  
82      public void testReceiveRollback() throws Exception {
83  
84          Message[] outbound = new Message[]{
85              session.createTextMessage("First Message"),
86              session.createTextMessage("Second Message")
87          };
88  
89          // lets consume any outstanding messages from previous test runs
90          while (consumer.receive(1000) != null) {
91          }
92          session.commit();
93  
94          producer.send(outbound[0]);
95          producer.send(outbound[1]);
96          session.commit();
97  
98          System.out.println("Sent 0: " + outbound[0]);
99          System.out.println("Sent 1: " + outbound[1]);
100 
101         ArrayList messages = new ArrayList();
102         Message message = consumer.receive(1000);
103         messages.add(message);
104         assertEquals(outbound[0], message);
105         session.commit();
106 
107         // rollback so we can get that last message again.
108         message = consumer.receive(1000);
109         assertNotNull(message);
110         assertEquals(outbound[1], message);
111         session.rollback();
112 
113         // Consume again.. the previous message should
114         // get redelivered.
115         message = consumer.receive(5000);
116         assertNotNull("Should have re-received the message again!", message);
117         messages.add(message);
118         session.commit();
119 
120         Message inbound[] = new Message[messages.size()];
121         messages.toArray(inbound);
122 
123         assertTextMessagesEqual("Rollback did not work", outbound, inbound);
124     }
125 
126     public void testReceiveTwoThenRollback() throws Exception {
127 
128         Message[] outbound = new Message[]{
129             session.createTextMessage("First Message"),
130             session.createTextMessage("Second Message")
131         };
132 
133         // lets consume any outstanding messages from previous test runs
134         while (consumer.receive(1000) != null) {
135         }
136         session.commit();
137 
138         producer.send(outbound[0]);
139         producer.send(outbound[1]);
140         session.commit();
141 
142         System.out.println("Sent 0: " + outbound[0]);
143         System.out.println("Sent 1: " + outbound[1]);
144 
145         ArrayList messages = new ArrayList();
146         Message message = consumer.receive(1000);
147         assertEquals(outbound[0], message);
148 
149         message = consumer.receive(1000);
150         assertNotNull(message);
151         assertEquals(outbound[1], message);
152         session.rollback();
153 
154         // Consume again.. the previous message should
155         // get redelivered.
156         message = consumer.receive(5000);
157         assertNotNull("Should have re-received the first message again!", message);
158         messages.add(message);
159         assertEquals(outbound[0], message);
160 
161         message = consumer.receive(5000);
162         assertNotNull("Should have re-received the second message again!", message);
163         messages.add(message);
164         assertEquals(outbound[1], message);
165         session.commit();
166 
167         Message inbound[] = new Message[messages.size()];
168         messages.toArray(inbound);
169 
170         assertTextMessagesEqual("Rollback did not work", outbound, inbound);
171     }
172 
173     public void testSendRollbackWithPrefetchOfOne() throws Exception {
174         setPrefetchToOne();
175         testSendRollback();
176     }
177 
178     public void testReceiveRollbackWithPrefetchOfOne() throws Exception {
179         setPrefetchToOne();
180         testReceiveRollback();
181     }
182 
183     protected abstract JmsResourceProvider getJmsResourceProvider();
184 
185     protected void setUp() throws Exception {
186         super.setUp();
187 
188         JmsResourceProvider p = getJmsResourceProvider();
189         // We will be using transacted sessions.
190         p.setTransacted(true);
191 
192         connectionFactory = p.createConnectionFactory();
193         connection = p.createConnection(connectionFactory);
194         System.out.println("Created connection: " + connection);
195         session = p.createSession(connection);
196         System.out.println("Created session: " + session);
197         Destination destination = p.createDestination(session, getSubject() + "." + getName());
198         System.out.println("Created destination: " + destination + " of type: " + destination.getClass());
199         producer = p.createProducer(session, destination);
200         System.out.println("Created producer: " + producer);
201         consumer = p.createConsumer(session, destination);
202         System.out.println("Created consumer: " + consumer);
203         connection.start();
204     }
205 
206     protected void tearDown() throws Exception {
207         //System.out.println("Test Done.  Stats");
208         //((ActiveMQConnectionFactory) connectionFactory).getFactoryStats().dump(new IndentPrinter());
209         System.out.println("Closing down connection");
210 
211         session.close();
212         connection.close();
213         System.out.println("Connection closed.");
214     }
215 
216     protected void setPrefetchToOne() {
217         ActiveMQPrefetchPolicy prefetchPolicy = ((ActiveMQConnection) connection).getPrefetchPolicy();
218         prefetchPolicy.setQueuePrefetch(1);
219         prefetchPolicy.setTopicPrefetch(1);
220         prefetchPolicy.setDurableTopicPrefetch(1);
221     }
222 }