1 /***
2 *
3 * Copyright 2004 Hiram Chirino
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.codehaus.activemq.ra;
19
20 import java.io.PrintWriter;
21 import java.util.ArrayList;
22 import java.util.Iterator;
23
24 import javax.jms.Connection;
25 import javax.jms.JMSException;
26 import javax.jms.Session;
27 import javax.jms.XASession;
28 import javax.resource.ResourceException;
29 import javax.resource.spi.ConnectionEvent;
30 import javax.resource.spi.ConnectionEventListener;
31 import javax.resource.spi.ConnectionRequestInfo;
32 import javax.resource.spi.LocalTransaction;
33 import javax.resource.spi.ManagedConnection;
34 import javax.resource.spi.ManagedConnectionMetaData;
35 import javax.security.auth.Subject;
36 import javax.transaction.xa.XAException;
37 import javax.transaction.xa.XAResource;
38 import javax.transaction.xa.Xid;
39
40 import org.apache.commons.logging.Log;
41 import org.apache.commons.logging.LogFactory;
42 import org.codehaus.activemq.ActiveMQSession;
43
44 /***
45 * ActiveMQManagedConnection maps to real physical connection to the
46 * server. Since a ManagedConnection has to provide a transaction
47 * managment interface to the physical connection, and sessions
48 * are the objects implement transaction managment interfaces in
49 * the JMS API, this object also maps to a singe physical JMS session.
50 * <p/>
51 * The side-effect is that JMS connection the application gets
52 * will allways create the same session object. This is good if
53 * running in an app server since the sessions are elisted in the
54 * context transaction. This is bad if used outside of an app
55 * server since the user may be trying to create 2 different
56 * sessions to coordinate 2 different uow.
57 *
58 * @version $Revision: 1.14 $
59 */
60 public class ActiveMQManagedConnection implements ManagedConnection {
61
62 private static final Log log = LogFactory.getLog(ActiveMQManagedConnection.class);
63
64 private PrintWriter logWriter;
65
66 private Subject subject;
67 private ActiveMQConnectionRequestInfo info;
68 private ArrayList listeners = new ArrayList();
69 private Connection physicalConnection;
70 private Session physicalSession;
71 private ArrayList proxyConnections = new ArrayList();
72 private XAResource xaresource=null;
73
74 public Connection getPhysicalConnection() {
75 return physicalConnection;
76 }
77
78 public Session getPhysicalSession() {
79 return physicalSession;
80 }
81
82
83 public ActiveMQManagedConnection(Subject subject, ActiveMQResourceAdapter adapter, ActiveMQConnectionRequestInfo info) throws ResourceException {
84 this.subject = subject;
85 this.info = info;
86 physicalConnection = adapter.getPhysicalConnection();
87 createSession();
88 }
89
90 private void createSession() throws ResourceException {
91 try {
92 physicalSession = physicalConnection
93 .createSession(true, Session.SESSION_TRANSACTED);
94 if (physicalSession instanceof ActiveMQSession) {
95 ActiveMQSession session = (ActiveMQSession) physicalSession;
96 LocalTransactionEventListener l = createLocalTransactionEventListener();
97 session.setLocalTransactionEventListener(l);
98 }
99 else {
100 log.trace("Cannot register LocalTransactionEventLister on non-ActiveMQ session");
101 }
102
103 if (physicalSession instanceof XASession) {
104 xaresource = ((XASession)physicalSession).getXAResource();
105 } else {
106 xaresource=null;
107 }
108
109 }
110 catch (JMSException e) {
111 throw new ResourceException("Could not create a new session.", e);
112 }
113 }
114
115 /***
116 * @return
117 */
118 private LocalTransactionEventListener createLocalTransactionEventListener() {
119 return new LocalTransactionEventListener() {
120
121 public void beginEvent() {
122 ConnectionEvent event = new ConnectionEvent(ActiveMQManagedConnection.this, ConnectionEvent.LOCAL_TRANSACTION_STARTED);
123 Iterator iterator = listeners.iterator();
124 while (iterator.hasNext()) {
125 ConnectionEventListener l = (ConnectionEventListener) iterator
126 .next();
127 l.localTransactionStarted(event);
128 }
129 }
130
131 public void commitEvent() {
132 ConnectionEvent event = new ConnectionEvent(ActiveMQManagedConnection.this, ConnectionEvent.LOCAL_TRANSACTION_COMMITTED);
133 Iterator iterator = listeners.iterator();
134 while (iterator.hasNext()) {
135 ConnectionEventListener l = (ConnectionEventListener) iterator
136 .next();
137 l.localTransactionCommitted(event);
138 }
139 }
140
141 public void rollbackEvent() {
142 ConnectionEvent event = new ConnectionEvent(ActiveMQManagedConnection.this, ConnectionEvent.LOCAL_TRANSACTION_ROLLEDBACK);
143 Iterator iterator = listeners.iterator();
144 while (iterator.hasNext()) {
145 ConnectionEventListener l = (ConnectionEventListener) iterator
146 .next();
147 l.localTransactionRolledback(event);
148 }
149 }
150 };
151 }
152
153 /***
154 * @see javax.resource.spi.ManagedConnection#getConnection(javax.security.auth.Subject,
155 * javax.resource.spi.ConnectionRequestInfo)
156 */
157 public Object getConnection(Subject subject, ConnectionRequestInfo info)
158 throws ResourceException {
159 JMSConnectionProxy proxy = new JMSConnectionProxy(this);
160 proxyConnections.add(proxy);
161 return proxy;
162 }
163
164 private boolean isDestroyed() {
165 return physicalConnection == null;
166 }
167
168 /***
169 * Close down the physical connection to the server.
170 *
171 * @see javax.resource.spi.ManagedConnection#destroy()
172 */
173 public void destroy() throws ResourceException {
174
175
176 if (isDestroyed()) {
177 return;
178 }
179
180 cleanup();
181
182 try {
183 physicalSession.close();
184 physicalConnection = null;
185 }
186 catch (JMSException e) {
187 log.info("Error occured during close of a JMS connection.", e);
188 }
189 }
190
191 /***
192 * Cleans up all proxy handles attached to this physical connection so that
193 * they cannot be used anymore.
194 *
195 * @see javax.resource.spi.ManagedConnection#cleanup()
196 */
197 public void cleanup() throws ResourceException {
198
199
200 if (isDestroyed()) {
201 return;
202 }
203
204 Iterator iterator = proxyConnections.iterator();
205 while (iterator.hasNext()) {
206 JMSConnectionProxy proxy = (JMSConnectionProxy) iterator.next();
207 proxy.cleanup();
208 iterator.remove();
209 }
210
211
212
213 try {
214 physicalSession.close();
215 physicalSession=null;
216 } catch (JMSException e) {
217 throw new ResourceException("Could close the JMS session.", e);
218 }
219
220
221 createSession();
222 }
223
224 /***
225 * @see javax.resource.spi.ManagedConnection#associateConnection(java.lang.Object)
226 */
227 public void associateConnection(Object connection) throws ResourceException {
228 throw new ResourceException("Not supported.");
229 }
230
231 /***
232 * @see javax.resource.spi.ManagedConnection#addConnectionEventListener(javax.resource.spi.ConnectionEventListener)
233 */
234 public void addConnectionEventListener(ConnectionEventListener listener) {
235 listeners.add(listener);
236 }
237
238 /***
239 * @see javax.resource.spi.ManagedConnection#removeConnectionEventListener(javax.resource.spi.ConnectionEventListener)
240 */
241 public void removeConnectionEventListener(ConnectionEventListener listener) {
242 listeners.remove(listener);
243 }
244
245 /***
246 * @see javax.resource.spi.ManagedConnection#getXAResource()
247 */
248 public XAResource getXAResource() throws ResourceException {
249 if( xaresource == null )
250 throw new ResourceException("This is not an XA connection.");
251
252
253
254 return new XAResource() {
255 public void commit(Xid arg0, boolean arg1) throws XAException {
256 xaresource.commit(arg0, arg1);
257 }
258 public void end(Xid arg0, int arg1) throws XAException {
259 xaresource.end(arg0, arg1);
260 }
261 public void forget(Xid arg0) throws XAException {
262 xaresource.forget(arg0);
263 }
264 public int getTransactionTimeout() throws XAException {
265 return xaresource.getTransactionTimeout();
266 }
267 public boolean isSameRM(XAResource arg0) throws XAException {
268 return xaresource.isSameRM(arg0);
269 }
270 public int prepare(Xid arg0) throws XAException {
271 return xaresource.prepare(arg0);
272 }
273 public Xid[] recover(int arg0) throws XAException {
274 return xaresource.recover(arg0);
275 }
276 public void rollback(Xid arg0) throws XAException {
277 xaresource.rollback(arg0);
278 }
279 public boolean setTransactionTimeout(int arg0) throws XAException {
280 return xaresource.setTransactionTimeout(arg0);
281 }
282 public void start(Xid arg0, int arg1) throws XAException {
283 xaresource.start(arg0, arg1);
284 }
285 };
286 }
287
288 /***
289 * @see javax.resource.spi.ManagedConnection#getLocalTransaction()
290 */
291 public LocalTransaction getLocalTransaction() throws ResourceException {
292 return new LocalTransaction() {
293
294 public void begin() {
295
296
297 }
298
299 public void commit() throws ResourceException {
300 try {
301 physicalSession.commit();
302 }
303 catch (JMSException e) {
304 throw new ResourceException("commit failed.", e);
305 }
306 }
307
308 public void rollback() throws ResourceException {
309 try {
310 physicalSession.rollback();
311 }
312 catch (JMSException e) {
313 throw new ResourceException("rollback failed.", e);
314 }
315 }
316 };
317 }
318
319 /***
320 * @see javax.resource.spi.ManagedConnection#getMetaData()
321 */
322 public ManagedConnectionMetaData getMetaData() throws ResourceException {
323 return new ManagedConnectionMetaData() {
324
325 public String getEISProductName() throws ResourceException {
326 if (physicalConnection == null) {
327 throw new ResourceException("Not connected.");
328 }
329 try {
330 return physicalConnection.getMetaData()
331 .getJMSProviderName();
332 }
333 catch (JMSException e) {
334 throw new ResourceException("Error accessing provider.", e);
335 }
336 }
337
338 public String getEISProductVersion() throws ResourceException {
339 if (physicalConnection == null) {
340 throw new ResourceException("Not connected.");
341 }
342 try {
343 return physicalConnection.getMetaData()
344 .getProviderVersion();
345 }
346 catch (JMSException e) {
347 throw new ResourceException("Error accessing provider.", e);
348 }
349 }
350
351 public int getMaxConnections() throws ResourceException {
352 if (physicalConnection == null) {
353 throw new ResourceException("Not connected.");
354 }
355 return Integer.MAX_VALUE;
356 }
357
358 public String getUserName() throws ResourceException {
359 if (physicalConnection == null) {
360 throw new ResourceException("Not connected.");
361 }
362 try {
363 return physicalConnection.getClientID();
364 }
365 catch (JMSException e) {
366 throw new ResourceException("Error accessing provider.", e);
367 }
368 }
369 };
370 }
371
372 /***
373 * @see javax.resource.spi.ManagedConnection#setLogWriter(java.io.PrintWriter)
374 */
375 public void setLogWriter(PrintWriter logWriter) throws ResourceException {
376 this.logWriter = logWriter;
377 }
378
379 /***
380 * @see javax.resource.spi.ManagedConnection#getLogWriter()
381 */
382 public PrintWriter getLogWriter() throws ResourceException {
383 return logWriter;
384 }
385
386 /***
387 * @param subject
388 * @param info
389 * @return
390 */
391 public boolean matches(Subject subject, ConnectionRequestInfo info) {
392
393
394 if (info == null) {
395 return false;
396 }
397 if (info.getClass() != ActiveMQConnectionRequestInfo.class) {
398 return false;
399 }
400
401
402 if (subject == null ^ this.subject == null) {
403 return false;
404 }
405 if (subject != null && !subject.equals(this.subject)) {
406 return false;
407 }
408
409
410 return info.equals(this.info);
411 }
412
413 /***
414 * When a proxy is closed this cleans up the proxy and notifys the
415 * ConnectionEventListeners that a connection closed.
416 *
417 * @param proxy
418 */
419 public void proxyClosedEvent(JMSConnectionProxy proxy) {
420 proxyConnections.remove(proxy);
421 proxy.cleanup();
422
423 ConnectionEvent event = new ConnectionEvent(this,
424 ConnectionEvent.CONNECTION_CLOSED);
425 event.setConnectionHandle(proxy);
426 Iterator iterator = listeners.iterator();
427 while (iterator.hasNext()) {
428 ConnectionEventListener l = (ConnectionEventListener) iterator
429 .next();
430 l.connectionClosed(event);
431 }
432 }
433
434 }