1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.codehaus.activemq;
19
20 import org.codehaus.activemq.message.XidStub;
21 import org.codehaus.activemq.util.IndentPrinter;
22
23 import javax.jms.Connection;
24 import javax.jms.ConnectionFactory;
25 import javax.jms.Destination;
26 import javax.jms.JMSException;
27 import javax.jms.Message;
28 import javax.jms.MessageConsumer;
29 import javax.jms.MessageProducer;
30 import javax.jms.Session;
31 import javax.jms.XASession;
32 import javax.transaction.xa.XAResource;
33 import java.util.ArrayList;
34
35 /***
36 * @version $Revision: 1.7 $
37 */
38 public abstract class JmsXATransactionTestSupport extends TestSupport {
39
40 protected ConnectionFactory connectionFactory;
41 protected Connection connection;
42 protected Session session;
43 protected MessageConsumer consumer;
44 protected MessageProducer producer;
45
46 public JmsXATransactionTestSupport() {
47 super();
48 }
49
50 public JmsXATransactionTestSupport(String name) {
51 super(name);
52 }
53
54 public void testSendOutsideXATransaction() throws Exception {
55 try {
56 producer.send(session.createTextMessage("First Message"));
57 fail("Using an XA session outside of a transaction should throw an exception.");
58 }
59 catch (JMSException e) {
60
61 }
62 }
63
64
65 public void testSendRollback() throws Exception {
66
67 XAResource resource = ((XASession) session).getXAResource();
68 Message[] outbound = new Message[]{
69 session.createTextMessage("First Message"),
70 session.createTextMessage("Second Message")
71 };
72
73
74 XidStub xid1 = new XidStub(new byte[]{1, 2, 3, 4, 5});
75 resource.start(xid1, XAResource.TMNOFLAGS);
76 producer.send(outbound[0]);
77 resource.end(xid1, XAResource.TMSUCCESS);
78 resource.commit(xid1, true);
79
80 XidStub xid2 = new XidStub(new byte[]{2, 2, 3, 4, 5});
81 resource.start(xid2, XAResource.TMNOFLAGS);
82 producer.send(session.createTextMessage("I'm going to get rolled back."));
83 resource.end(xid2, XAResource.TMSUCCESS);
84 resource.rollback(xid2);
85
86 XidStub xid3 = new XidStub(new byte[]{3, 2, 3, 4, 5});
87 resource.start(xid3, XAResource.TMNOFLAGS);
88 producer.send(outbound[1]);
89 resource.end(xid3, XAResource.TMSUCCESS);
90 if (resource.prepare(xid3) == XAResource.XA_OK) {
91 resource.commit(xid3, false);
92 }
93
94 ArrayList messages = new ArrayList();
95 XidStub xid4 = new XidStub(new byte[]{4, 2, 3, 4, 5});
96 resource.start(xid4, XAResource.TMNOFLAGS);
97 messages.add(consumer.receive(1000));
98 messages.add(consumer.receive(1000));
99 resource.end(xid4, XAResource.TMSUCCESS);
100 resource.commit(xid4, true);
101
102 Message inbound[] = new Message[messages.size()];
103 messages.toArray(inbound);
104
105 assertTextMessagesEqual("Rollback did not work.", outbound, inbound);
106 }
107
108 public void testSendPrepareRecoverRollback() throws Exception {
109
110 XAResource resource = ((XASession) session).getXAResource();
111 Message[] outbound = new Message[]{
112 session.createTextMessage("First Message"),
113 session.createTextMessage("Second Message")
114 };
115
116 XidStub xid2 = new XidStub(new byte[]{1, 2, 3, 4, 6});
117 resource.start(xid2, XAResource.TMNOFLAGS);
118 producer.send(session.createTextMessage("I'm going to get rolled back."));
119 resource.end(xid2, XAResource.TMSUCCESS);
120 assertEquals(XAResource.XA_OK, resource.prepare(xid2));
121
122
123 XidStub xid3 = new XidStub(new byte[]{2, 2, 3, 4, 6});
124 resource.start(xid3, XAResource.TMNOFLAGS);
125 producer.send(outbound[1]);
126 resource.end(xid3, XAResource.TMSUCCESS);
127
128 assertEquals(XAResource.XA_OK, resource.prepare(xid3));
129
130 restart();
131
132 resource = ((XASession) session).getXAResource();
133
134
135 resource.commit(xid3, false);
136
137
138 resource.rollback(xid2);
139
140 }
141
142
143 abstract protected JmsResourceProvider getJmsResourceProvider();
144
145 protected void setUp() throws Exception {
146 super.setUp();
147 openConnection();
148 }
149
150 protected void restart() throws JMSException {
151 closeConnection();
152 openConnection();
153 }
154
155 protected void openConnection() throws JMSException {
156 JmsResourceProvider p = getJmsResourceProvider();
157
158 connectionFactory = p.createConnectionFactory();
159 connection = p.createConnection(connectionFactory);
160 System.out.println("Created connection: " + connection);
161 session = p.createSession(connection);
162 System.out.println("Created session: " + session);
163 Destination destination = p.createDestination(session, "FOO");
164 System.out.println("Created destination: " + destination + " of type: " + destination.getClass());
165 producer = p.createProducer(session, destination);
166 System.out.println("Created producer: " + producer);
167 consumer = p.createConsumer(session, destination);
168 System.out.println("Created consumer: " + consumer);
169 connection.start();
170 }
171
172 protected void tearDown() throws Exception {
173 System.out.println("Test Done. Stats");
174 ((ActiveMQConnectionFactory) connectionFactory).getFactoryStats().dump(new IndentPrinter());
175 closeConnection();
176 }
177
178 private void closeConnection() throws JMSException {
179 System.out.println("Closing down connection");
180
181 connection.stop();
182 connection.close();
183 System.out.println("Connection closed.");
184 }
185
186 }