View Javadoc

1   /***
2    *
3    * Copyright 2004 Hiram Chirino
4    *
5    * Licensed under the Apache License, Version 2.0 (the "License");
6    * you may not use this file except in compliance with the License.
7    * You may obtain a copy of the License at
8    *
9    * http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   *
17   **/
18  package org.codehaus.activemq.ra;
19  
20  import java.io.PrintWriter;
21  import java.util.ArrayList;
22  import java.util.Iterator;
23  
24  import javax.jms.Connection;
25  import javax.jms.JMSException;
26  import javax.jms.Session;
27  import javax.jms.XASession;
28  import javax.resource.ResourceException;
29  import javax.resource.spi.ConnectionEvent;
30  import javax.resource.spi.ConnectionEventListener;
31  import javax.resource.spi.ConnectionRequestInfo;
32  import javax.resource.spi.LocalTransaction;
33  import javax.resource.spi.ManagedConnection;
34  import javax.resource.spi.ManagedConnectionMetaData;
35  import javax.security.auth.Subject;
36  import javax.transaction.xa.XAException;
37  import javax.transaction.xa.XAResource;
38  import javax.transaction.xa.Xid;
39  
40  import org.apache.commons.logging.Log;
41  import org.apache.commons.logging.LogFactory;
42  import org.codehaus.activemq.ActiveMQSession;
43  
44  /***
45   * ActiveMQManagedConnection maps to real physical connection to the
46   * server.  Since a ManagedConnection has to provide a transaction
47   * managment interface to the physical connection, and sessions
48   * are the objects implement transaction managment interfaces in
49   * the JMS API, this object also maps to a singe physical JMS session.
50   * <p/>
51   * The side-effect is that JMS connection the application gets
52   * will allways create the same session object.  This is good if
53   * running in an app server since the sessions are elisted in the
54   * context transaction.  This is bad if used outside of an app
55   * server since the user may be trying to create 2 different
56   * sessions to coordinate 2 different uow.
57   *
58   * @version $Revision: 1.14 $
59   */
60  public class ActiveMQManagedConnection implements ManagedConnection {
61  
62      private static final Log log = LogFactory.getLog(ActiveMQManagedConnection.class);
63  
64      private PrintWriter logWriter;
65  
66      private Subject subject;
67      private ActiveMQConnectionRequestInfo info;
68      private ArrayList listeners = new ArrayList();
69      private Connection physicalConnection;
70      private Session physicalSession;
71      private ArrayList proxyConnections = new ArrayList();
72  	private XAResource xaresource=null;
73  
74      public Connection getPhysicalConnection() {
75          return physicalConnection;
76      }
77  
78      public Session getPhysicalSession() {
79          return physicalSession;
80      }
81  
82  
83      public ActiveMQManagedConnection(Subject subject, ActiveMQResourceAdapter adapter, ActiveMQConnectionRequestInfo info) throws ResourceException {
84          this.subject = subject;
85  		this.info = info;
86          physicalConnection = adapter.getPhysicalConnection();
87          createSession();
88      }
89  
90      private void createSession() throws ResourceException {
91          try {
92              physicalSession = physicalConnection
93                      .createSession(true, Session.SESSION_TRANSACTED);
94              if (physicalSession instanceof ActiveMQSession) {
95                  ActiveMQSession session = (ActiveMQSession) physicalSession;
96                  LocalTransactionEventListener l = createLocalTransactionEventListener();
97                  session.setLocalTransactionEventListener(l);
98              }
99              else {
100                 log.trace("Cannot register LocalTransactionEventLister on non-ActiveMQ session");
101             }
102             
103             if (physicalSession instanceof XASession) {
104                 xaresource = ((XASession)physicalSession).getXAResource();
105             } else {
106             	xaresource=null;
107             }
108             
109         }
110         catch (JMSException e) {
111             throw new ResourceException("Could not create a new session.", e);
112         }    	
113     }
114     
115     /***
116      * @return
117      */
118     private LocalTransactionEventListener createLocalTransactionEventListener() {
119         return new LocalTransactionEventListener() {
120 
121             public void beginEvent() {
122                 ConnectionEvent event = new ConnectionEvent(ActiveMQManagedConnection.this, ConnectionEvent.LOCAL_TRANSACTION_STARTED);
123                 Iterator iterator = listeners.iterator();
124                 while (iterator.hasNext()) {
125                     ConnectionEventListener l = (ConnectionEventListener) iterator
126                             .next();
127                     l.localTransactionStarted(event);
128                 }
129             }
130 
131             public void commitEvent() {
132                 ConnectionEvent event = new ConnectionEvent(ActiveMQManagedConnection.this, ConnectionEvent.LOCAL_TRANSACTION_COMMITTED);
133                 Iterator iterator = listeners.iterator();
134                 while (iterator.hasNext()) {
135                     ConnectionEventListener l = (ConnectionEventListener) iterator
136                             .next();
137                     l.localTransactionCommitted(event);
138                 }
139             }
140 
141             public void rollbackEvent() {
142                 ConnectionEvent event = new ConnectionEvent(ActiveMQManagedConnection.this, ConnectionEvent.LOCAL_TRANSACTION_ROLLEDBACK);
143                 Iterator iterator = listeners.iterator();
144                 while (iterator.hasNext()) {
145                     ConnectionEventListener l = (ConnectionEventListener) iterator
146                             .next();
147                     l.localTransactionRolledback(event);
148                 }
149             }
150         };
151     }
152 
153     /***
154      * @see javax.resource.spi.ManagedConnection#getConnection(javax.security.auth.Subject,
155             *      javax.resource.spi.ConnectionRequestInfo)
156      */
157     public Object getConnection(Subject subject, ConnectionRequestInfo info)
158             throws ResourceException {
159         JMSConnectionProxy proxy = new JMSConnectionProxy(this);
160         proxyConnections.add(proxy);
161         return proxy;
162     }
163 
164     private boolean isDestroyed() {
165     	return physicalConnection == null;
166     }
167     
168     /***
169      * Close down the physical connection to the server.
170      *
171      * @see javax.resource.spi.ManagedConnection#destroy()
172      */
173     public void destroy() throws ResourceException {
174 
175         // Have we allready been destroyed??
176         if (isDestroyed()) {
177             return;
178         }
179 
180         cleanup();
181 
182         try {
183             physicalSession.close();
184             physicalConnection = null;
185         }
186         catch (JMSException e) {
187             log.info("Error occured during close of a JMS connection.", e);
188         }
189     }
190 
191     /***
192      * Cleans up all proxy handles attached to this physical connection so that
193      * they cannot be used anymore.
194      *
195      * @see javax.resource.spi.ManagedConnection#cleanup()
196      */
197     public void cleanup() throws ResourceException {
198     	
199         // Have we allready been destroyed??
200         if (isDestroyed()) {
201             return;
202         }
203     	
204         Iterator iterator = proxyConnections.iterator();
205         while (iterator.hasNext()) {
206             JMSConnectionProxy proxy = (JMSConnectionProxy) iterator.next();
207             proxy.cleanup();
208             iterator.remove();
209         }
210         
211         // closing and creating the session is sure way to clean up all state  
212         // the client may have left around when using the session.
213         try {        	
214         	physicalSession.close();
215         	physicalSession=null;
216         } catch (JMSException e) {
217             throw new ResourceException("Could close the JMS session.", e);
218         }
219         
220         // Create the new session.
221         createSession();
222     }
223 
224     /***
225      * @see javax.resource.spi.ManagedConnection#associateConnection(java.lang.Object)
226      */
227     public void associateConnection(Object connection) throws ResourceException {
228         throw new ResourceException("Not supported.");
229     }
230 
231     /***
232      * @see javax.resource.spi.ManagedConnection#addConnectionEventListener(javax.resource.spi.ConnectionEventListener)
233      */
234     public void addConnectionEventListener(ConnectionEventListener listener) {
235         listeners.add(listener);
236     }
237 
238     /***
239      * @see javax.resource.spi.ManagedConnection#removeConnectionEventListener(javax.resource.spi.ConnectionEventListener)
240      */
241     public void removeConnectionEventListener(ConnectionEventListener listener) {
242         listeners.remove(listener);
243     }
244 
245     /***
246      * @see javax.resource.spi.ManagedConnection#getXAResource()
247      */
248     public XAResource getXAResource() throws ResourceException {
249     	if( xaresource == null )
250     		throw new ResourceException("This is not an XA connection.");
251     	
252     	// We proxy the XAResource because the XAResource object chanages
253     	// every time the managed connection is cleaned up.
254     	return new XAResource() {
255     		public void commit(Xid arg0, boolean arg1) throws XAException {
256     			xaresource.commit(arg0, arg1);
257     		}
258     		public void end(Xid arg0, int arg1) throws XAException {
259     			xaresource.end(arg0, arg1);
260     		}
261     		public void forget(Xid arg0) throws XAException {
262     			xaresource.forget(arg0);
263     		}
264     		public int getTransactionTimeout() throws XAException {
265     			return xaresource.getTransactionTimeout();
266     		}
267     		public boolean isSameRM(XAResource arg0) throws XAException {
268     			return xaresource.isSameRM(arg0);
269     		}
270     		public int prepare(Xid arg0) throws XAException {
271     			return xaresource.prepare(arg0);
272     		}
273     		public Xid[] recover(int arg0) throws XAException {
274     			return xaresource.recover(arg0);
275     		}
276     		public void rollback(Xid arg0) throws XAException {
277     			xaresource.rollback(arg0);
278     		}
279     		public boolean setTransactionTimeout(int arg0) throws XAException {
280     			return xaresource.setTransactionTimeout(arg0);
281     		}
282     		public void start(Xid arg0, int arg1) throws XAException {
283     			xaresource.start(arg0, arg1);
284     		}
285     	};
286     }
287 
288     /***
289      * @see javax.resource.spi.ManagedConnection#getLocalTransaction()
290      */
291     public LocalTransaction getLocalTransaction() throws ResourceException {
292         return new LocalTransaction() {
293 
294             public void begin() {
295                 // TODO: jms api does not have a begin...
296                 // add a method to ActiveMQSession to allow for this.
297             }
298 
299             public void commit() throws ResourceException {
300                 try {
301                     physicalSession.commit();
302                 }
303                 catch (JMSException e) {
304                     throw new ResourceException("commit failed.", e);
305                 }
306             }
307 
308             public void rollback() throws ResourceException {
309                 try {
310                     physicalSession.rollback();
311                 }
312                 catch (JMSException e) {
313                     throw new ResourceException("rollback failed.", e);
314                 }
315             }
316         };
317     }
318 
319     /***
320      * @see javax.resource.spi.ManagedConnection#getMetaData()
321      */
322     public ManagedConnectionMetaData getMetaData() throws ResourceException {
323         return new ManagedConnectionMetaData() {
324 
325             public String getEISProductName() throws ResourceException {
326                 if (physicalConnection == null) {
327                     throw new ResourceException("Not connected.");
328                 }
329                 try {
330                     return physicalConnection.getMetaData()
331                             .getJMSProviderName();
332                 }
333                 catch (JMSException e) {
334                     throw new ResourceException("Error accessing provider.", e);
335                 }
336             }
337 
338             public String getEISProductVersion() throws ResourceException {
339                 if (physicalConnection == null) {
340                     throw new ResourceException("Not connected.");
341                 }
342                 try {
343                     return physicalConnection.getMetaData()
344                             .getProviderVersion();
345                 }
346                 catch (JMSException e) {
347                     throw new ResourceException("Error accessing provider.", e);
348                 }
349             }
350 
351             public int getMaxConnections() throws ResourceException {
352                 if (physicalConnection == null) {
353                     throw new ResourceException("Not connected.");
354                 }
355                 return Integer.MAX_VALUE;
356             }
357 
358             public String getUserName() throws ResourceException {
359                 if (physicalConnection == null) {
360                     throw new ResourceException("Not connected.");
361                 }
362                 try {
363                     return physicalConnection.getClientID();
364                 }
365                 catch (JMSException e) {
366                     throw new ResourceException("Error accessing provider.", e);
367                 }
368             }
369         };
370     }
371 
372     /***
373      * @see javax.resource.spi.ManagedConnection#setLogWriter(java.io.PrintWriter)
374      */
375     public void setLogWriter(PrintWriter logWriter) throws ResourceException {
376         this.logWriter = logWriter;
377     }
378 
379     /***
380      * @see javax.resource.spi.ManagedConnection#getLogWriter()
381      */
382     public PrintWriter getLogWriter() throws ResourceException {
383         return logWriter;
384     }
385 
386     /***
387      * @param subject
388      * @param info
389      * @return
390      */
391     public boolean matches(Subject subject, ConnectionRequestInfo info) {
392 
393         // Check to see if it is our info class
394         if (info == null) {
395             return false;
396         }
397         if (info.getClass() != ActiveMQConnectionRequestInfo.class) {
398             return false;
399         }
400 
401         // Do the subjects match?
402         if (subject == null ^ this.subject == null) {
403             return false;
404         }
405         if (subject != null && !subject.equals(this.subject)) {
406             return false;
407         }
408 
409         // Does the info match?
410         return info.equals(this.info);
411     }
412 
413     /***
414      * When a proxy is closed this cleans up the proxy and notifys the
415      * ConnectionEventListeners that a connection closed.
416      *
417      * @param proxy
418      */
419     public void proxyClosedEvent(JMSConnectionProxy proxy) {
420         proxyConnections.remove(proxy);
421         proxy.cleanup();
422 
423         ConnectionEvent event = new ConnectionEvent(this,
424                 ConnectionEvent.CONNECTION_CLOSED);
425         event.setConnectionHandle(proxy);
426         Iterator iterator = listeners.iterator();
427         while (iterator.hasNext()) {
428             ConnectionEventListener l = (ConnectionEventListener) iterator
429                     .next();
430             l.connectionClosed(event);
431         }
432     }
433 
434 }