1 /***
2 *
3 * Copyright 2004 Hiram Chirino
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.codehaus.activemq.ra;
19
20 import java.util.HashMap;
21
22 import javax.jms.Connection;
23 import javax.jms.ConnectionFactory;
24 import javax.jms.JMSException;
25 import javax.jms.XAConnection;
26 import javax.jms.XASession;
27 import javax.resource.NotSupportedException;
28 import javax.resource.ResourceException;
29 import javax.resource.spi.ActivationSpec;
30 import javax.resource.spi.BootstrapContext;
31 import javax.resource.spi.ResourceAdapter;
32 import javax.resource.spi.ResourceAdapterInternalException;
33 import javax.resource.spi.endpoint.MessageEndpointFactory;
34 import javax.transaction.xa.XAResource;
35
36 import org.apache.commons.logging.Log;
37 import org.apache.commons.logging.LogFactory;
38 import org.codehaus.activemq.ActiveMQXAConnectionFactory;
39
40
41 /***
42 * Knows how to connect to one ActiveMQ server. It can then activate endpoints and deliver
43 * messages to those enpoints using the connection configure in the resource adapter.
44 *
45 * @version $Revision: 1.11 $
46 */
47 public class ActiveMQResourceAdapter implements ResourceAdapter {
48 private static final Log log = LogFactory.getLog(ActiveMQResourceAdapter.class);
49
50 private static final String ASF_ENDPOINT_WORKER_TYPE="asf";
51 private static final String POLLING_ENDPOINT_WORKER_TYPE="polling";
52
53 private BootstrapContext bootstrapContext;
54 private HashMap endpointWorkers = new HashMap();
55 final private ActiveMQConnectionRequestInfo info = new ActiveMQConnectionRequestInfo();
56 private String endpointWorkerType=POLLING_ENDPOINT_WORKER_TYPE;
57 private Connection physicalConnection;
58 private ConnectionFactory connectionFactory;
59
60 public ActiveMQResourceAdapter() {
61 this(null);
62 }
63
64 public ActiveMQResourceAdapter(ConnectionFactory connectionFactory) {
65 this.connectionFactory = connectionFactory;
66 }
67
68 /***
69 * @see javax.resource.spi.ResourceAdapter#start(javax.resource.spi.BootstrapContext)
70 */
71 public void start(BootstrapContext bootstrapContext)
72 throws ResourceAdapterInternalException {
73 this.bootstrapContext = bootstrapContext;
74
75 try {
76 physicalConnection = makeConnection(connectionFactory, info);
77 physicalConnection.start();
78 }
79 catch (JMSException e) {
80 throw new ResourceAdapterInternalException("Could not establish a connection to the server.", e);
81 }
82
83 }
84
85 /***
86 * A helper method to create a new JMS connection from the connection request info.
87 * If no specific connection factory instance is passed in then the default ActiveMQ
88 * implementation is used
89 *
90 * @param connectionFactory an optional connection factory to use or null to use the default
91 * @param info the connection request info
92 * @return a newly created connection
93 * @throws JMSException
94 */
95 public static Connection makeConnection(ConnectionFactory connectionFactory, ActiveMQConnectionRequestInfo info) throws JMSException {
96 if (connectionFactory == null) {
97 if (info.isXa()) {
98 connectionFactory = new ActiveMQXAConnectionFactory(info.getServerUrl());
99 }
100 else {
101 connectionFactory = new org.codehaus.activemq.ActiveMQConnectionFactory(info.getServerUrl());
102 }
103 }
104 Connection physicalConnection = connectionFactory.createConnection(info.getUserName(), info.getPassword());
105 if (info.getClientid() != null) {
106 physicalConnection.setClientID(info.getClientid());
107 }
108 return physicalConnection;
109 }
110
111 /***
112 * @return Returns the physicalConnection.
113 */
114 public Connection getPhysicalConnection() {
115 return physicalConnection;
116 }
117
118 /***
119 * @see javax.resource.spi.ResourceAdapter#stop()
120 */
121 public void stop() {
122 this.bootstrapContext = null;
123 if (physicalConnection != null) {
124 try {
125 physicalConnection.close();
126 physicalConnection = null;
127 }
128 catch (JMSException e) {
129 log.debug("Error occured during ResourceAdapter stop: ", e);
130 }
131 }
132 }
133
134 /***
135 * @return
136 */
137 public BootstrapContext getBootstrapContext() {
138 return bootstrapContext;
139 }
140
141 /***
142 * @see javax.resource.spi.ResourceAdapter#endpointActivation(javax.resource.spi.endpoint.MessageEndpointFactory,
143 * javax.resource.spi.ActivationSpec)
144 */
145 public void endpointActivation(MessageEndpointFactory endpointFactory,
146 ActivationSpec activationSpec) throws ResourceException {
147
148
149 if (activationSpec.getResourceAdapter() != this) {
150 throw new ResourceException("Activation spec not initialized with this ResourceAdapter instance");
151 }
152
153 if (activationSpec.getClass().equals(ActiveMQActivationSpec.class)) {
154
155 ActiveMQEndpointActivationKey key = new ActiveMQEndpointActivationKey(endpointFactory, (ActiveMQActivationSpec) activationSpec);
156
157 if (endpointWorkers.containsKey(key)) {
158 throw new IllegalStateException("Endpoint previously activated");
159 }
160
161 ActiveMQBaseEndpointWorker worker;
162 if( POLLING_ENDPOINT_WORKER_TYPE.equals(getEndpointWorkerType())) {
163 worker = new ActiveMQPollingEndpointWorker(this, key);
164 } else if( ASF_ENDPOINT_WORKER_TYPE.equals(getEndpointWorkerType())) {
165 worker = new ActiveMQAsfEndpointWorker(this, key);
166 } else {
167 throw new NotSupportedException("That type of EndpointWorkerType is not supported: " + getEndpointWorkerType());
168 }
169
170 endpointWorkers.put(key, worker);
171 worker.start();
172
173 }
174 else {
175 throw new NotSupportedException("That type of ActicationSpec not supported: " + activationSpec.getClass());
176 }
177
178 }
179
180 /***
181 * @see javax.resource.spi.ResourceAdapter#endpointDeactivation(javax.resource.spi.endpoint.MessageEndpointFactory,
182 * javax.resource.spi.ActivationSpec)
183 */
184 public void endpointDeactivation(MessageEndpointFactory endpointFactory,
185 ActivationSpec activationSpec) {
186
187 if (activationSpec.getClass().equals(ActiveMQActivationSpec.class)) {
188 ActiveMQEndpointActivationKey key = new ActiveMQEndpointActivationKey(endpointFactory, (ActiveMQActivationSpec) activationSpec);
189 ActiveMQBaseEndpointWorker worker = (ActiveMQBaseEndpointWorker) endpointWorkers.get(key);
190 if (worker == null) {
191
192
193 return;
194 }
195 try {
196 worker.stop();
197 }
198 catch (InterruptedException e) {
199
200
201 Thread.currentThread().interrupt();
202 }
203
204 }
205
206 }
207
208 /***
209 * We only connect to one resource manager per ResourceAdapter instance, so any ActivationSpec
210 * will return the same XAResource.
211 * @see javax.resource.spi.ResourceAdapter#getXAResources(javax.resource.spi.ActivationSpec[])
212 */
213 public XAResource[] getXAResources(ActivationSpec[] activationSpecs) throws ResourceException {
214 try {
215 Connection connection = getPhysicalConnection();
216 if (connection instanceof XAConnection) {
217 XASession session = ((XAConnection)connection).createXASession();
218 XAResource xaResource = session.getXAResource();
219 return new XAResource[] {xaResource};
220 } else {
221 return new XAResource[] {};
222 }
223 } catch (JMSException e) {
224 throw new ResourceException(e);
225 }
226 }
227
228
229
230
231
232
233
234 /***
235 * @return
236 */
237 public String getClientid() {
238 return info.getClientid();
239 }
240
241 /***
242 * @return
243 */
244 public String getPassword() {
245 return info.getPassword();
246 }
247
248 /***
249 * @return
250 */
251 public String getServerUrl() {
252 return info.getServerUrl();
253 }
254
255 /***
256 * @return
257 */
258 public String getUserName() {
259 return info.getUserName();
260 }
261
262 /***
263 * @param clientid
264 */
265 public void setClientid(String clientid) {
266 info.setClientid(clientid);
267 }
268
269 /***
270 * @param password
271 */
272 public void setPassword(String password) {
273 info.setPassword(password);
274 }
275
276 /***
277 * @param url
278 */
279 public void setServerUrl(String url) {
280 info.setServerUrl(url);
281 }
282
283 /***
284 * @param userid
285 */
286 public void setUserName(String userid) {
287 info.setUserName(userid);
288 }
289
290 public Boolean isXA() {
291 return Boolean.valueOf(info.isXa());
292 }
293
294 public void setXA(Boolean xa) {
295 info.setXa(xa.booleanValue());
296 }
297
298 /***
299 * @return Returns the endpointWorkerType.
300 */
301 public String getEndpointWorkerType() {
302 return endpointWorkerType;
303 }
304 /***
305 * @param endpointWorkerType The endpointWorkerType to set.
306 */
307 public void setEndpointWorkerType(String endpointWorkerType) {
308 this.endpointWorkerType = endpointWorkerType;
309 }
310
311 /***
312 * @return Returns the info.
313 */
314 public ActiveMQConnectionRequestInfo getInfo() {
315 return info;
316 }
317 }