1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.codehaus.activemq;
19
20 import org.apache.commons.logging.Log;
21 import org.apache.commons.logging.LogFactory;
22 import org.codehaus.activemq.broker.Broker;
23 import org.codehaus.activemq.broker.BrokerConnector;
24 import org.codehaus.activemq.broker.BrokerContainer;
25 import org.codehaus.activemq.broker.BrokerContainerFactory;
26 import org.codehaus.activemq.broker.BrokerContext;
27 import org.codehaus.activemq.broker.impl.BrokerClientImpl;
28 import org.codehaus.activemq.broker.impl.BrokerConnectorImpl;
29 import org.codehaus.activemq.broker.impl.BrokerContainerFactoryImpl;
30 import org.codehaus.activemq.jndi.JNDIBaseStorable;
31 import org.codehaus.activemq.management.JMSStatsImpl;
32 import org.codehaus.activemq.management.StatsCapable;
33 import org.codehaus.activemq.message.ActiveMQQueue;
34 import org.codehaus.activemq.message.ActiveMQTopic;
35 import org.codehaus.activemq.message.ConnectionInfo;
36 import org.codehaus.activemq.message.ConsumerInfo;
37 import org.codehaus.activemq.message.DefaultWireFormat;
38 import org.codehaus.activemq.message.WireFormat;
39 import org.codehaus.activemq.service.Service;
40 import org.codehaus.activemq.transport.TransportChannel;
41 import org.codehaus.activemq.transport.TransportChannelFactory;
42 import org.codehaus.activemq.transport.TransportChannelListener;
43 import org.codehaus.activemq.transport.TransportChannelProvider;
44 import org.codehaus.activemq.transport.vm.VmTransportChannel;
45 import org.codehaus.activemq.util.IdGenerator;
46
47 import javax.jms.Connection;
48 import javax.jms.ConnectionFactory;
49 import javax.jms.JMSException;
50 import javax.jms.QueueConnection;
51 import javax.jms.QueueConnectionFactory;
52 import javax.jms.TopicConnection;
53 import javax.jms.TopicConnectionFactory;
54 import javax.management.j2ee.statistics.Stats;
55 import java.net.URI;
56 import java.net.URISyntaxException;
57 import java.util.ArrayList;
58 import java.util.Iterator;
59 import java.util.List;
60 import java.util.Properties;
61
62 /***
63 * A ConnectionFactory is an an Administed object, and is used for creating
64 * Connections.
65 * <p/>
66 * This class also implements QueueConnectionFactory and TopicConnectionFactory and is an Administered object.
67 * You can use this connection to create both QueueConnections and TopicConnections.
68 *
69 * @version $Revision: 1.37 $
70 * @see javax.jms.ConnectionFactory
71 */
72 public class ActiveMQConnectionFactory extends JNDIBaseStorable implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory, Service, StatsCapable {
73
74 private static final Log log = LogFactory.getLog(ActiveMQConnectionFactory.class);
75
76 private BrokerContext brokerContext = BrokerContext.getInstance();
77 private BrokerContainerFactory brokerContainerFactory;
78 protected BrokerContainer brokerContainer;
79
80 protected String userName;
81 protected String password;
82 protected String brokerURL;
83 protected String clientID;
84 protected String brokerName;
85 private boolean useEmbeddedBroker;
86 /***
87 * Should we use an async send for persistent non transacted messages ?
88 */
89 protected boolean useAsyncSend = true;
90
91 private List startedEmbeddedBrokers = new ArrayList();
92
93 private JMSStatsImpl stats = new JMSStatsImpl();
94 private WireFormat wireFormat = new DefaultWireFormat();
95 private IdGenerator idGenerator = new IdGenerator();
96 private int connectionCount;
97 private String brokerXmlConfig;
98
99 /***
100 * Default Constructor for ActiveMQConnectionFactory
101 */
102 public ActiveMQConnectionFactory() {
103 this.userName = ActiveMQConnection.DEFAULT_USER;
104 this.password = ActiveMQConnection.DEFAULT_PASSWORD;
105 this.brokerURL = ActiveMQConnection.DEFAULT_URL;
106 }
107
108
109 public ActiveMQConnectionFactory(String brokerURL) {
110 this();
111 this.brokerURL = brokerURL;
112 }
113
114 public ActiveMQConnectionFactory(String userName, String password, String brokerURL) {
115 this.userName = userName;
116 this.password = password;
117 this.brokerURL = brokerURL;
118 }
119
120 /***
121 * Constructs a {@link ConnectionFactory} with an already configured and <b>started</b> {@link BrokerContainer}
122 * ready for use in embedded mode.
123 *
124 * @param container
125 */
126 public ActiveMQConnectionFactory(BrokerContainer container) {
127 this(container, "vm://" + container.getBroker().getName());
128 }
129
130 /***
131 * Constructs a {@link ConnectionFactory} with an already configured and <b>started</b> {@link BrokerContainer}
132 * ready for use in embedded mode and the brokerURL connection.
133 *
134 * @param container
135 */
136 public ActiveMQConnectionFactory(BrokerContainer container, String brokerURL) {
137 this();
138 this.brokerContainer = container;
139 this.useEmbeddedBroker = true;
140 this.brokerURL = brokerURL;
141 }
142
143
144 public Stats getStats() {
145 return stats;
146 }
147
148 public JMSStatsImpl getFactoryStats() {
149 return stats;
150 }
151
152 /***
153 * @return Returns the brokerURL.
154 */
155 public String getBrokerURL() {
156 return brokerURL;
157 }
158
159 /***
160 * @param brokerURL The brokerURL to set.
161 */
162 public void setBrokerURL(String brokerURL) {
163 this.brokerURL = brokerURL;
164 }
165
166 /***
167 * @return Returns the clientID.
168 */
169 public String getClientID() {
170 return clientID;
171 }
172
173 /***
174 * @param clientID The clientID to set.
175 */
176 public void setClientID(String clientID) {
177 this.clientID = clientID;
178 }
179
180 /***
181 * @return Returns the password.
182 */
183 public String getPassword() {
184 return password;
185 }
186
187 /***
188 * @param password The password to set.
189 */
190 public void setPassword(String password) {
191 this.password = password;
192 }
193
194 /***
195 * @return Returns the userName.
196 */
197 public String getUserName() {
198 return userName;
199 }
200
201 /***
202 * @param userName The userName to set.
203 */
204 public void setUserName(String userName) {
205 this.userName = userName;
206 }
207
208 /***
209 * Is an embedded broker used by this connection factory
210 *
211 * @return true if an embedded broker will be used by this connection factory
212 */
213 public boolean isUseEmbeddedBroker() {
214 return useEmbeddedBroker;
215 }
216
217 /***
218 * Allows embedded brokers to be associated with a connection factory
219 *
220 * @param useEmbeddedBroker
221 */
222 public void setUseEmbeddedBroker(boolean useEmbeddedBroker) {
223 this.useEmbeddedBroker = useEmbeddedBroker;
224 }
225
226 /***
227 * The name of the broker to use if creating an embedded broker
228 *
229 * @return
230 */
231 public String getBrokerName() {
232 if (brokerName == null) {
233
234 brokerName = idGenerator.generateId();
235 }
236 return brokerName;
237 }
238
239 public void setBrokerName(String brokerName) {
240 this.brokerName = brokerName;
241 }
242
243 /***
244 * @return Returns the useAsyncSend.
245 */
246 public boolean isUseAsyncSend() {
247 return useAsyncSend;
248 }
249
250 /***
251 * @param useAsyncSend The useAsyncSend to set.
252 */
253 public void setUseAsyncSend(boolean useAsyncSend) {
254 this.useAsyncSend = useAsyncSend;
255 }
256
257 public WireFormat getWireFormat() {
258 return wireFormat;
259 }
260
261 /***
262 * Allows a custom wire format to be used; otherwise the default Java wire format is used
263 * which is designed for minimum size and maximum speed on the Java platform
264 *
265 * @param wireFormat
266 */
267 public void setWireFormat(WireFormat wireFormat) {
268 this.wireFormat = wireFormat;
269 }
270
271 public String getBrokerXmlConfig() {
272 return brokerXmlConfig;
273 }
274
275 public BrokerContainer getBrokerContainer() {
276 return brokerContainer;
277 }
278
279 /***
280 * Sets the <a href="http://activemq.codehaus.org/Xml+Configuration">XML configuration file</a>
281 * used to configure the ActiveMQ broker via Spring if using embedded mode.
282 *
283 * @param brokerXmlConfig is the filename which is assumed to be on the classpath unless a URL
284 * is specified. So a value of <code>foo/bar.xml</code> would be assumed to be on the classpath
285 * whereas <code>file:dir/file.xml</code> would use the file system.
286 * Any valid URL string is supported.
287 * @see #setUseEmbeddedBroker(boolean)
288 */
289 public void setBrokerXmlConfig(String brokerXmlConfig) {
290 this.brokerXmlConfig = brokerXmlConfig;
291 }
292
293 public BrokerContainerFactory getBrokerContainerFactory() throws JMSException {
294 if (brokerContainerFactory == null) {
295 brokerContainerFactory = createBrokerContainerFactory();
296 }
297 return brokerContainerFactory;
298 }
299
300 public void setBrokerContainerFactory(BrokerContainerFactory brokerContainerFactory) {
301 this.brokerContainerFactory = brokerContainerFactory;
302 }
303
304 /***
305 * Returns the context used to store broker containers and connectors which defaults
306 * to using the singleton
307 */
308 public BrokerContext getBrokerContext() {
309 return brokerContext;
310 }
311
312 public void setBrokerContext(BrokerContext brokerContext) {
313 this.brokerContext = brokerContext;
314 }
315
316 /***
317 * Create a JMS Connection
318 *
319 * @return the JMS Connection
320 * @throws JMSException if an error occurs creating the Connection
321 */
322 public Connection createConnection() throws JMSException {
323 return this.createConnection(this.userName, this.password);
324 }
325
326 /***
327 * @param userName
328 * @param password
329 * @return the Connection
330 * @throws JMSException if an error occurs creating the Connection
331 */
332 public Connection createConnection(String userName, String password) throws JMSException {
333 ActiveMQConnection connection = new ActiveMQConnection(this, userName, password, createTransportChannel(this.brokerURL));
334 connection.setUseAsyncSend(isUseAsyncSend());
335 if (this.clientID != null && this.clientID.length() > 0) {
336 connection.setClientID(this.clientID);
337 }
338 return connection;
339 }
340
341 /***
342 * Create a JMS QueueConnection
343 *
344 * @return the JMS QueueConnection
345 * @throws JMSException if an error occurs creating the Connection
346 */
347 public QueueConnection createQueueConnection() throws JMSException {
348 return this.createQueueConnection(this.userName, this.password);
349 }
350
351 /***
352 * @param userName
353 * @param password
354 * @return the QueueConnection
355 * @throws JMSException if an error occurs creating the Connection
356 */
357 public QueueConnection createQueueConnection(String userName, String password) throws JMSException {
358 return (QueueConnection) createConnection(userName, password);
359 }
360
361 /***
362 * Create a JMS TopicConnection
363 *
364 * @return the JMS TopicConnection
365 * @throws JMSException if an error occurs creating the Connection
366 */
367 public TopicConnection createTopicConnection() throws JMSException {
368 return this.createTopicConnection(this.userName, this.password);
369 }
370
371 /***
372 * @param userName
373 * @param password
374 * @return the TopicConnection
375 * @throws JMSException if an error occurs creating the Connection
376 */
377 public TopicConnection createTopicConnection(String userName, String password) throws JMSException {
378 return (TopicConnection) createConnection(userName, password);
379 }
380
381
382 public void start() throws JMSException {
383 }
384
385 /***
386 * A hook to allow any embedded JMS Broker's to be closed down
387 *
388 * @throws JMSException
389 */
390 public synchronized void stop() throws JMSException {
391
392 for (Iterator iter = startedEmbeddedBrokers.iterator(); iter.hasNext();) {
393 String uri = (String) iter.next();
394 brokerContext.deregisterConnector(uri);
395 }
396 if (brokerContainer != null) {
397 brokerContainer.stop();
398 brokerContainer = null;
399 }
400 }
401
402
403 public Broker getEmbeddedBroker() throws JMSException {
404 if (isUseEmbeddedBroker()) {
405 return getContainer(getBrokerName()).getBroker();
406 }
407 return null;
408 }
409
410 public static synchronized void registerBroker(String theURLString, BrokerConnector brokerConnector) {
411 BrokerContext.getInstance().registerConnector(theURLString, brokerConnector);
412 }
413
414 public static synchronized void unregisterBroker(String theURLString) {
415 BrokerContext.getInstance().deregisterConnector(theURLString);
416 }
417
418
419
420
421
422
423 /***
424 * Set the properties that will represent the instance in JNDI
425 *
426 * @param props
427 */
428 protected void buildFromProperties(Properties props) {
429 this.userName = props.getProperty("userName", this.userName);
430 this.password = props.getProperty("password", this.password);
431 this.brokerURL = props.getProperty("brokerURL", this.brokerURL);
432 this.brokerName = props.getProperty("brokerName", this.brokerName);
433 this.clientID = props.getProperty("clientID");
434 this.useAsyncSend = getBoolean(props, "useAsyncSend", true);
435 this.useEmbeddedBroker = getBoolean(props, "useEmbeddedBroker");
436 this.brokerXmlConfig = props.getProperty("brokerXmlConfig", this.brokerXmlConfig);
437 }
438
439 /***
440 * Initialize the instance from properties stored in JNDI
441 *
442 * @param props
443 */
444 protected void populateProperties(Properties props) {
445 props.put("userName", this.userName);
446 props.put("password", this.password);
447 props.put("brokerURL", this.brokerURL);
448 props.put("brokerName", this.brokerName);
449 if (this.clientID != null) {
450 props.put("clientID", this.clientID);
451 }
452 props.put("useAsyncSend", (useAsyncSend) ? "true" : "false");
453 props.put("useEmbeddedBroker", (useEmbeddedBroker) ? "true" : "false");
454 if (this.brokerXmlConfig != null) {
455 props.put("brokerXmlConfig", this.brokerXmlConfig);
456 }
457 }
458
459 /***
460 * Helper method to return the property value as a boolean flag
461 *
462 * @param props
463 * @param key
464 * @return
465 */
466 protected boolean getBoolean(Properties props, String key) {
467 return getBoolean(props, key, false);
468 }
469
470 /***
471 * Helper method to return the property value as a boolean flag
472 *
473 * @param props
474 * @param key
475 * @param defaultValue
476 * @return
477 */
478 protected boolean getBoolean(Properties props, String key, boolean defaultValue) {
479 String value = props.getProperty(key);
480 return value != null ? value.equalsIgnoreCase("true") : defaultValue;
481 }
482
483 protected BrokerContainerFactory createBrokerContainerFactory() throws JMSException {
484 if (brokerXmlConfig != null) {
485 return XmlConfigHelper.createBrokerContainerFactory(brokerXmlConfig);
486 }
487 return new BrokerContainerFactoryImpl();
488 }
489
490 /***
491 * Factory method to create a TransportChannel from a URL
492 */
493 protected TransportChannel createTransportChannel(String theURLString) throws JMSException {
494 URI uri = createURI(theURLString);
495
496 TransportChannelFactory factory =
497 TransportChannelProvider.getFactory(uri);
498
499 BrokerConnector brokerConnector = null;
500 boolean created = false;
501 boolean embedServer = isUseEmbeddedBroker() || factory.requiresEmbeddedBroker();
502 if (embedServer) {
503 synchronized (this) {
504 brokerConnector = brokerContext.getConnectorByURL(theURLString);
505 if (brokerConnector == null) {
506 brokerConnector = createBrokerConnector(theURLString);
507 brokerContext.registerConnector(theURLString, brokerConnector);
508 startedEmbeddedBrokers.add(theURLString);
509 created = true;
510 }
511 }
512 }
513 TransportChannel transportChannel = factory.create(getWireFormat(), uri);
514 if (embedServer) {
515 return ensureServerIsAvailable(uri, transportChannel, brokerConnector, created);
516 }
517 return transportChannel;
518 }
519
520 protected synchronized BrokerContainer getContainer(String brokerName) throws JMSException {
521 if (brokerContainer == null) {
522 brokerContainer = brokerContext.getBrokerContainerByName(brokerName, getBrokerContainerFactory());
523 }
524 return brokerContainer;
525 }
526
527 protected BrokerConnector createBrokerConnector(String url) throws JMSException {
528 BrokerConnector brokerConnector;
529 brokerConnector = new BrokerConnectorImpl(getContainer(getBrokerName()), url, getWireFormat());
530 brokerConnector.start();
531
532
533 log.info("Embedded JMS Broker has started");
534 try {
535 Thread.sleep(1000);
536 }
537 catch (InterruptedException e) {
538 System.out.println("Caught: " + e);
539 e.printStackTrace();
540 }
541 return brokerConnector;
542 }
543
544
545 protected TransportChannel ensureServerIsAvailable(URI remoteLocation, TransportChannel channel, BrokerConnector brokerConnector, boolean created) throws JMSException {
546 ensureVmServerIsAvailable(channel, brokerConnector);
547 if (channel.isMulticast()) {
548 return ensureMulticastChannelIsAvailable(remoteLocation, channel, brokerConnector, created);
549 }
550 return channel;
551 }
552
553 private void ensureVmServerIsAvailable(TransportChannel channel, BrokerConnector brokerConnector) throws JMSException {
554 if (channel instanceof VmTransportChannel && brokerConnector instanceof TransportChannelListener) {
555 VmTransportChannel answer = (VmTransportChannel) channel;
556 answer.connect(brokerConnector);
557 }
558 }
559
560 protected TransportChannel ensureMulticastChannelIsAvailable(URI remoteLocation, TransportChannel channel, BrokerConnector brokerConnector, boolean created) throws JMSException {
561 if (created) {
562 BrokerConnectorImpl brokerImpl = (BrokerConnectorImpl) brokerConnector;
563
564 BrokerClientImpl client = new BrokerClientImpl();
565 client.initialize(brokerImpl, channel);
566 channel.start();
567 String brokerClientID = createMulticastClientID();
568 channel.setClientID(brokerClientID);
569
570
571
572 ConnectionInfo info = new ConnectionInfo();
573 info.setHostName(IdGenerator.getHostName());
574 info.setClientId(brokerClientID);
575 info.setStarted(true);
576 client.consumeConnectionInfo(info);
577
578 ConsumerInfo consumerInfo = new ConsumerInfo();
579 consumerInfo.setDestination(new ActiveMQTopic(">"));
580 consumerInfo.setNoLocal(true);
581 consumerInfo.setClientId(brokerClientID);
582 consumerInfo.setConsumerId(idGenerator.generateId());
583 consumerInfo.setId(consumerInfo.getConsumerId());
584 consumerInfo.setStarted(true);
585 client.consumeConsumerInfo(consumerInfo);
586
587 consumerInfo = new ConsumerInfo();
588 consumerInfo.setDestination(new ActiveMQQueue(">"));
589 consumerInfo.setNoLocal(true);
590 consumerInfo.setClientId(brokerClientID);
591 consumerInfo.setConsumerId(idGenerator.generateId());
592 consumerInfo.setId(consumerInfo.getConsumerId());
593 consumerInfo.setStarted(true);
594 client.consumeConsumerInfo(consumerInfo);
595 }
596
597
598
599 URI localURI = createURI("vm", remoteLocation);
600 TransportChannel localChannel = TransportChannelProvider.create(getWireFormat(), localURI);
601 ensureVmServerIsAvailable(localChannel, brokerConnector);
602 return localChannel;
603 }
604
605 /***
606 * Creates the clientID for the multicast client (used to dispatch local
607 * messages over a multicast bus)
608 */
609 protected String createMulticastClientID() {
610 return idGenerator.generateId();
611 }
612
613 protected URI createURI(String protocol, URI uri) throws JMSException {
614 try {
615 return new URI(protocol, uri.getRawSchemeSpecificPart(), uri.getFragment());
616 }
617 catch (URISyntaxException e) {
618 JMSException jmsEx = new JMSException("the URL string is badly formated:", e.getMessage());
619 jmsEx.setLinkedException(e);
620 throw jmsEx;
621
622 }
623 }
624
625 protected URI createURI(String uri) throws JMSException {
626 try {
627 if (uri == null) {
628 throw new JMSException("The connection URI must be specified!");
629 }
630 return new URI(uri);
631 }
632 catch (URISyntaxException e) {
633 JMSException jmsEx = new JMSException("the URL string is badly formated:", e.getMessage());
634 jmsEx.setLinkedException(e);
635 throw jmsEx;
636
637 }
638 }
639
640 /***
641 * Called when a connection is closed so that we can shut down any embedded brokers cleanly
642 *
643 * @param connection
644 */
645 synchronized void onConnectionClose(ActiveMQConnection connection) throws JMSException {
646 if (--connectionCount <= 0) {
647
648 stop();
649 }
650
651 }
652
653 synchronized void onConnectionCreate(ActiveMQConnection connection) {
654 ++connectionCount;
655 }
656 }