1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.codehaus.activemq.service.impl;
19
20 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
21 import org.apache.commons.logging.Log;
22 import org.apache.commons.logging.LogFactory;
23 import org.codehaus.activemq.broker.Broker;
24 import org.codehaus.activemq.broker.BrokerClient;
25 import org.codehaus.activemq.message.ActiveMQXid;
26 import org.codehaus.activemq.service.Transaction;
27 import org.codehaus.activemq.service.TransactionManager;
28 import org.codehaus.activemq.store.PreparedTransactionStore;
29 import org.codehaus.activemq.util.JMSExceptionHelper;
30
31 import javax.jms.JMSException;
32 import javax.transaction.xa.XAException;
33 import java.util.ArrayList;
34 import java.util.List;
35 import java.util.Map;
36
37 /***
38 * @version $Revision: 1.5 $
39 */
40 public class TransactionManagerImpl implements TransactionManager {
41 private static final Log log = LogFactory.getLog(TransactionManagerImpl.class);
42
43
44 private Broker broker;
45
46 private PreparedTransactionStore preparedTransactions;
47
48 private Map activeClients = new ConcurrentHashMap();
49
50 private Map localTxs = new ConcurrentHashMap();
51
52 private Map xaTxs = new ConcurrentHashMap();
53
54 private final ThreadLocal contextTx = new ThreadLocal();
55
56 public TransactionManagerImpl(Broker broker, PreparedTransactionStore preparedTransactions) {
57 this.preparedTransactions = preparedTransactions;
58 this.broker = broker;
59 }
60
61 /***
62 * @see org.codehaus.activemq.service.TransactionManager#createLocalTransaction(org.codehaus.activemq.broker.BrokerClient, String)
63 */
64 public Transaction createLocalTransaction(final BrokerClient client, final String txid) throws JMSException {
65 AbstractTransaction t = new LocalTransactionCommand(broker, localTxs, txid);
66 localTxs.put(txid, t);
67 return t;
68 }
69
70 /***
71 * @see org.codehaus.activemq.service.TransactionManager#createXATransaction(org.codehaus.activemq.broker.BrokerClient, org.codehaus.activemq.message.ActiveMQXid)
72 */
73 public Transaction createXATransaction(final BrokerClient client, final ActiveMQXid xid) throws XAException {
74 AbstractTransaction t = new XATransactionCommand(broker, xid, xaTxs, preparedTransactions);
75 xaTxs.put(xid, t);
76 return t;
77 }
78
79 /***
80 * @see org.codehaus.activemq.service.TransactionManager#getLocalTransaction(String)
81 */
82 public Transaction getLocalTransaction(String txid) throws JMSException {
83 Transaction tx = (Transaction) localTxs.get(txid);
84 if (tx == null) {
85 throw new JMSException("Transaction '" + txid
86 + "' has not been started.");
87 }
88 return tx;
89 }
90
91 /***
92 * @see org.codehaus.activemq.service.TransactionManager#getXATransaction(org.codehaus.activemq.message.ActiveMQXid)
93 */
94 public Transaction getXATransaction(ActiveMQXid xid) throws XAException {
95 Transaction tx = (Transaction) xaTxs.get(xid);
96 if (tx == null) {
97 XAException e = new XAException("Transaction '" + xid + "' has not been started.");
98 e.errorCode = XAException.XAER_NOTA;
99 throw e;
100 }
101 return tx;
102 }
103
104 /***
105 * @see org.codehaus.activemq.service.TransactionManager#getPreparedXATransactions()
106 */
107 public ActiveMQXid[] getPreparedXATransactions() throws XAException {
108 return preparedTransactions.getXids();
109 }
110
111 /***
112 * @see org.codehaus.activemq.service.TransactionManager#setContexTransaction(org.codehaus.activemq.service.Transaction)
113 */
114 public void setContexTransaction(Transaction tx) {
115 contextTx.set(tx);
116 }
117
118 /***
119 * @see org.codehaus.activemq.service.TransactionManager#getContexTransaction()
120 */
121 public Transaction getContexTransaction() {
122 return (Transaction) contextTx.get();
123 }
124
125 /***
126 * @see org.codehaus.activemq.service.TransactionManager#cleanUpClient(org.codehaus.activemq.broker.BrokerClient)
127 */
128 public void cleanUpClient(BrokerClient client) throws JMSException {
129
130
131
132 List list = (List) activeClients.remove(client);
133 if (list != null) {
134 for (int i = 0; i < list.size(); i++) {
135 try {
136 Object o = list.get(i);
137 if (o instanceof String) {
138 Transaction t = this.getLocalTransaction((String) o);
139 t.rollback();
140 }
141 else {
142 Transaction t = this.getXATransaction((ActiveMQXid) o);
143 t.rollback();
144 }
145 }
146 catch (Exception e) {
147 log.warn("ERROR Rolling back disconnected client's transactions: ", e);
148 }
149 }
150 list.clear();
151 }
152 }
153
154 public void loadTransaction(ActiveMQXid xid, Transaction transaction) throws XAException {
155
156
157 if (transaction instanceof XATransactionCommand) {
158 XATransactionCommand xaTransaction = (XATransactionCommand) transaction;
159 xaTransaction.initialise(xaTxs, preparedTransactions);
160 }
161 transaction.setBroker(broker);
162
163 xaTxs.put(xid, transaction);
164 }
165
166 public void start() throws JMSException {
167 preparedTransactions.start();
168 try {
169 preparedTransactions.loadPreparedTransactions(this);
170 }
171 catch (XAException e) {
172 throw JMSExceptionHelper.newJMSException("Failed to recover: " + e, e);
173 }
174 }
175
176 public void stop() throws JMSException {
177 preparedTransactions.stop();
178 }
179
180
181
182
183
184
185 private void addActiveTransaction(BrokerClient client, Object transactionId) {
186 List list = (List) activeClients.get(client);
187 if (list == null) {
188 list = new ArrayList();
189 activeClients.put(client, list);
190 }
191 list.add(transactionId);
192 }
193
194 private void removeActiveTransaction(BrokerClient client, Object transactionId) {
195 List list = (List) activeClients.get(client);
196 if (list != null) {
197 list.remove(transactionId);
198 }
199 }
200
201
202 }