View Javadoc

1   /*** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.codehaus.activemq.service.impl;
19  
20  import org.apache.commons.logging.Log;
21  import org.apache.commons.logging.LogFactory;
22  import org.codehaus.activemq.broker.Broker;
23  import org.codehaus.activemq.message.ActiveMQXid;
24  import org.codehaus.activemq.service.Transaction;
25  import org.codehaus.activemq.store.PreparedTransactionStore;
26  import org.codehaus.activemq.util.SerializationHelper;
27  
28  import javax.transaction.xa.XAException;
29  import javax.transaction.xa.XAResource;
30  import java.io.IOException;
31  import java.io.ObjectInput;
32  import java.io.ObjectOutput;
33  import java.util.Map;
34  
35  /***
36   * @version $Revision: 1.3 $
37   */
38  public class XATransactionCommand extends AbstractTransaction {
39      private static final Log log = LogFactory.getLog(TransactionManagerImpl.class);
40  
41      private ActiveMQXid xid;
42      private transient Map xaTxs;
43      private transient PreparedTransactionStore preparedTransactions;
44  
45      /***
46       * This constructor is only used when deserializing from disk
47       */
48      public XATransactionCommand() {
49          super(null);
50      }
51  
52      public XATransactionCommand(Broker broker, ActiveMQXid xid, Map xaTxs, PreparedTransactionStore preparedTransactions) {
53          super(broker);
54          this.xid = xid;
55          this.xaTxs = xaTxs;
56          this.preparedTransactions = preparedTransactions;
57      }
58  
59  
60      /***
61       * Called after the transaction command has been recovered from disk
62       *
63       * @param xaTxs
64       * @param preparedTransactions
65       */
66      public void initialise(Map xaTxs, PreparedTransactionStore preparedTransactions) {
67          this.xaTxs = xaTxs;
68          this.preparedTransactions = preparedTransactions;
69      }
70  
71      public void commit(boolean onePhase) throws XAException {
72          switch (getState()) {
73              case START_STATE:
74                  // 1 phase commit, no work done.
75                  if (!onePhase) {
76                      XAException xae = new XAException("Cannot do 2 phase commit if the transaction has not been prepared.");
77                      xae.errorCode = XAException.XAER_PROTO;
78                      throw xae;
79                  }
80                  setState(AbstractTransaction.FINISHED_STATE);
81                  xaTxs.remove(xid);
82                  break;
83              case IN_USE_STATE:
84                  if (!onePhase) {
85                      XAException xae = new XAException("Cannot do 2 phase commit if the transaction has not been prepared.");
86                      xae.errorCode = XAException.XAER_PROTO;
87                      throw xae;
88                  }
89                  // 1 phase commit, work done.
90                  try {
91                      prePrepare();
92                  }
93                  catch (XAException e) {
94                      throw e;
95                  }
96                  catch (Throwable e) {
97                      log.warn("COMMIT FAILED: ", e);
98                      rollback();
99  
100                     XAException xae = new XAException("ONE PHASE COMMIT FAILED: Transaction rolled back.");
101                     xae.errorCode = XAException.XA_RBOTHER;
102                     xae.initCause(e);
103                     throw xae;
104                 }
105 
106                 setState(AbstractTransaction.FINISHED_STATE);
107                 xaTxs.remove(xid);
108 
109                 try {
110                     postCommit();
111                 }
112                 catch (Throwable e) {
113                     // I guess this could happen.  Post commit task failed
114                     // to execute properly.
115                     log.warn("POST COMMIT FAILED: ", e);
116                     XAException xae = new XAException("POST COMMIT FAILED");
117                     xae.errorCode = XAException.XAER_RMERR;
118                     xae.initCause(e);
119                     throw xae;
120                 }
121                 break;
122 
123             case PREPARED_STATE:
124                 // 2 phase commit, work done.
125                 // We would record commit here.
126                 setState(AbstractTransaction.FINISHED_STATE);
127                 xaTxs.remove(xid);
128 
129                 try {
130                     postCommit();
131                 }
132                 catch (Throwable e) {
133                     // I guess this could happen.  Post commit task failed
134                     // to execute properly.
135                     log.warn("POST COMMIT FAILED: ", e);
136                     XAException xae = new XAException("POST COMMIT FAILED");
137                     xae.errorCode = XAException.XAER_RMERR;
138                     xae.initCause(e);
139                     throw xae;
140                 }
141                 break;
142             default:
143                 XAException xae = new XAException("Cannot call commit now.");
144                 xae.errorCode = XAException.XAER_PROTO;
145                 throw xae;
146         }
147     }
148 
149     public void rollback() throws XAException {
150 
151         switch (getState()) {
152             case START_STATE:
153                 // 1 phase rollback no work done.
154                 xaTxs.remove(xid);
155                 setState(AbstractTransaction.FINISHED_STATE);
156 
157                 break;
158             case IN_USE_STATE:
159                 // 1 phase rollback work done.
160                 xaTxs.remove(xid);
161                 setState(AbstractTransaction.FINISHED_STATE);
162 
163                 try {
164                     postRollback();
165                 }
166                 catch (Throwable e) {
167                     // I guess this could happen.  Post commit task failed
168                     // to execute properly.
169                     log.warn("POST ROLLBACK FAILED: ", e);
170                     XAException xae = new XAException("POST ROLLBACK FAILED");
171                     xae.errorCode = XAException.XAER_RMERR;
172                     xae.initCause(e);
173                     throw xae;
174                 }
175 
176                 break;
177             case PREPARED_STATE:
178                 // 2 phase rollback work done.
179                 preparedTransactions.remove(xid);
180                 xaTxs.remove(xid);
181                 setState(AbstractTransaction.FINISHED_STATE);
182 
183                 try {
184                     postRollback();
185                 }
186                 catch (Throwable e) {
187                     // I guess this could happen.  Post commit task failed
188                     // to execute properly.
189                     log.warn("POST ROLLBACK FAILED: ", e);
190                     XAException xae = new XAException("POST ROLLBACK FAILED");
191                     xae.errorCode = XAException.XAER_RMERR;
192                     xae.initCause(e);
193                     throw xae;
194                 }
195                 break;
196         }
197 
198     }
199 
200     public int prepare() throws XAException {
201         switch (getState()) {
202             case START_STATE:
203                 // No work done.. no commit/rollback needed.
204                 xaTxs.remove(xid);
205                 setState(AbstractTransaction.FINISHED_STATE);
206                 return XAResource.XA_RDONLY;
207 
208             case IN_USE_STATE:
209                 try {
210                     prePrepare();
211                 }
212                 catch (XAException e) {
213                     throw e;
214                 }
215                 catch (Throwable e) {
216                     log.warn("PREPARE FAILED: ", e);
217                     rollback();
218 
219                     // Let them know we rolled back.
220                     XAException xae = new XAException("PREPARE FAILED: Transaction rollback.");
221                     xae.errorCode = XAException.XA_RBOTHER;
222                     xae.initCause(e);
223                     throw xae;
224                 }
225 
226                 // We would record prepare here.
227                 setState(AbstractTransaction.PREPARED_STATE);
228                 preparedTransactions.put(xid, this);
229 
230                 return XAResource.XA_OK;
231             default :
232                 XAException xae = new XAException("Cannot call prepare now.");
233                 xae.errorCode = XAException.XAER_PROTO;
234                 throw xae;
235         }
236     }
237 
238     public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
239         super.readExternal(in);
240         xid = ActiveMQXid.read(in);
241     }
242 
243     public void writeExternal(ObjectOutput out) throws IOException {
244         super.writeExternal(out);
245         xid.writeExternal(out);
246     }
247 
248     public static Transaction fromBytes(byte[] data) throws IOException, ClassNotFoundException {
249         return (Transaction) SerializationHelper.deserialize(data);
250     }
251 
252     public byte[] toBytes() throws IOException {
253         return SerializationHelper.serialize(this);
254     }
255 }