1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18
19 package org.codehaus.activemq;
20
21 import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
22 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
23 import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.codehaus.activemq.capacity.CapacityMonitorEvent;
27 import org.codehaus.activemq.capacity.CapacityMonitorEventListener;
28 import org.codehaus.activemq.management.JMSConnectionStatsImpl;
29 import org.codehaus.activemq.management.JMSStatsImpl;
30 import org.codehaus.activemq.management.StatsCapable;
31 import org.codehaus.activemq.message.ActiveMQMessage;
32 import org.codehaus.activemq.message.CapacityInfo;
33 import org.codehaus.activemq.message.ConnectionInfo;
34 import org.codehaus.activemq.message.ConsumerInfo;
35 import org.codehaus.activemq.message.Packet;
36 import org.codehaus.activemq.message.PacketListener;
37 import org.codehaus.activemq.message.ProducerInfo;
38 import org.codehaus.activemq.message.Receipt;
39 import org.codehaus.activemq.message.SessionInfo;
40 import org.codehaus.activemq.message.util.MemoryBoundedQueue;
41 import org.codehaus.activemq.message.util.MemoryBoundedQueueManager;
42 import org.codehaus.activemq.transport.TransportChannel;
43 import org.codehaus.activemq.transport.TransportStatusEvent;
44 import org.codehaus.activemq.transport.TransportStatusEventListener;
45 import org.codehaus.activemq.util.IdGenerator;
46 import org.codehaus.activemq.util.JMSExceptionHelper;
47
48 import javax.jms.*;
49 import javax.jms.IllegalStateException;
50 import javax.management.j2ee.statistics.Stats;
51 import java.util.Iterator;
52
53 /***
54 * A <CODE>Connection</CODE> object is a client's active connection to its JMS provider. It typically allocates
55 * provider resources outside the Java virtual machine (JVM).
56 * <P>
57 * Connections support concurrent use.
58 * <P>
59 * A connection serves several purposes:
60 * <UL>
61 * <LI>It encapsulates an open connection with a JMS provider. It typically represents an open TCP/IP socket between a
62 * client and the service provider software.
63 * <LI>Its creation is where client authentication takes place.
64 * <LI>It can specify a unique client identifier.
65 * <LI>It provides a <CODE>ConnectionMetaData</CODE> object.
66 * <LI>It supports an optional <CODE>ExceptionListener</CODE> object.
67 * </UL>
68 * <P>
69 * Because the creation of a connection involves setting up authentication and communication, a connection is a
70 * relatively heavyweight object. Most clients will do all their messaging with a single connection. Other more advanced
71 * applications may use several connections. The JMS API does not architect a reason for using multiple connections;
72 * however, there may be operational reasons for doing so.
73 * <P>
74 * A JMS client typically creates a connection, one or more sessions, and a number of message producers and consumers.
75 * When a connection is created, it is in stopped mode. That means that no messages are being delivered.
76 * <P>
77 * It is typical to leave the connection in stopped mode until setup is complete (that is, until all message consumers
78 * have been created). At that point, the client calls the connection's <CODE>start</CODE> method, and messages begin
79 * arriving at the connection's consumers. This setup convention minimizes any client confusion that may result from
80 * asynchronous message delivery while the client is still in the process of setting itself up.
81 * <P>
82 * A connection can be started immediately, and the setup can be done afterwards. Clients that do this must be prepared
83 * to handle asynchronous message delivery while they are still in the process of setting up.
84 * <P>
85 * A message producer can send messages while a connection is stopped. <p/>This class is also a <CODE>TopicConnection
86 * </CODE>. A <CODE>TopicConnection</CODE> object is an active connection to a publish/subscribe JMS provider. A
87 * client uses a <CODE>TopicConnection</CODE> object to create one or more <CODE>TopicSession</CODE> objects for
88 * producing and consuming messages.
89 * <P>
90 * A <CODE>TopicConnection</CODE> can be used to create a <CODE>TopicSession</CODE>, from which specialized
91 * topic-related objects can be created. A more general, and recommended approach is to use the <CODE>Connection
92 * </CODE> object.
93 * <P>
94 * <p/><p/>This class is also a <CODE>QueueConnection</CODE>. A <CODE>QueueConnection</CODE> object is an active
95 * connection to a point-to-point JMS provider. A client uses a <CODE>QueueConnection</CODE> object to create one or
96 * more <CODE>QueueSession</CODE> objects for producing and consuming messages.
97 * <P>
98 * A <CODE>QueueConnection</CODE> can be used to create a <CODE>QueueSession</CODE>, from which specialized
99 * queue-related objects can be created. A more general, and recommended, approach is to use the <CODE>Connection
100 * </CODE> object.
101 * <P>
102 * A <CODE>QueueConnection</CODE> cannot be used to create objects specific to the publish/subscribe domain. The
103 * <CODE>createDurableConnectionConsumer</CODE> method inherits from <CODE>Connection</CODE>, but must throw an
104 * <CODE>IllegalStateException</CODE> if used from <CODE>QueueConnection</CODE>.
105 *
106 * @version $Revision: 1.72 $
107 * @see javax.jms.Connection
108 * @see javax.jms.ConnectionFactory
109 * @see javax.jms.QueueConnection
110 * @see javax.jms.TopicConnection
111 * @see javax.jms.TopicConnectionFactory
112 * @see javax.jms.QueueConnection
113 * @see javax.jms.QueueConnectionFactory
114 */
115 public class ActiveMQConnection
116 implements
117 Connection,
118 PacketListener,
119 ExceptionListener,
120 TopicConnection,
121 QueueConnection,
122 StatsCapable,
123 CapacityMonitorEventListener,
124 TransportStatusEventListener {
125
126 /***
127 * Default UserName for the Connection
128 */
129 public static final String DEFAULT_USER = "defaultUser";
130 /***
131 * Default URL for the ActiveMQ Broker
132 */
133 public static final String DEFAULT_URL = "tcp://localhost:61616";
134 /***
135 * Default Password for the Connection
136 */
137 public static final String DEFAULT_PASSWORD = "defaultPassword";
138
139 private static final Log log = LogFactory.getLog(ActiveMQConnection.class);
140 private static final int DEFAULT_CONNECTION_MEMORY_LIMIT = 10 * 1024 * 1024;
141
142
143 private ActiveMQConnectionFactory factory;
144 private String userName;
145 private String password;
146 protected String clientID;
147 private int sendCloseTimeout = 2000;
148 private TransportChannel transportChannel;
149 private ExceptionListener exceptionListener;
150 private ActiveMQPrefetchPolicy prefetchPolicy;
151 private JMSStatsImpl factoryStats;
152 private MemoryBoundedQueueManager boundedQueueManager;
153 protected IdGenerator consumerIdGenerator;
154 private IdGenerator clientIdGenerator;
155 protected IdGenerator packetIdGenerator;
156 private IdGenerator sessionIdGenerator;
157 private JMSConnectionStatsImpl stats;
158
159
160 private CopyOnWriteArrayList sessions;
161 private CopyOnWriteArrayList messageDispatchers;
162 private CopyOnWriteArrayList connectionConsumers;
163 private SynchronizedInt consumerNumberGenerator;
164 private ActiveMQConnectionMetaData connectionMetaData;
165 private SynchronizedBoolean closed;
166 private SynchronizedBoolean started;
167 private boolean clientIDSet;
168 private boolean isConnectionInfoSentToBroker;
169 private boolean isTransportOK;
170 private boolean startedTransport;
171 private long startTime;
172 private long flowControlSleepTime = 0;
173
174 private boolean userSpecifiedClientID;
175 /***
176 * Should we use an async send for persistent non transacted messages ?
177 */
178 protected boolean useAsyncSend = true;
179 private int sendConnectionInfoTimeout = 30000;
180
181 /***
182 * A static helper method to create a new connection
183 *
184 * @return an ActiveMQConnection
185 * @throws JMSException
186 */
187 public static ActiveMQConnection makeConnection() throws JMSException {
188 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
189 return (ActiveMQConnection) factory.createConnection();
190 }
191
192 /***
193 * A static helper method to create a new connection
194 *
195 * @param uri
196 * @return an ActiveMQConnection
197 * @throws JMSException
198 */
199 public static ActiveMQConnection makeConnection(String uri) throws JMSException {
200 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri);
201 return (ActiveMQConnection) factory.createConnection();
202 }
203
204 /***
205 * A static helper method to create a new connection
206 *
207 * @param user
208 * @param password
209 * @param uri
210 * @return an ActiveMQConnection
211 * @throws JMSException
212 */
213 public static ActiveMQConnection makeConnection(String user, String password, String uri) throws JMSException {
214 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(user, password, uri);
215 return (ActiveMQConnection) factory.createConnection();
216 }
217
218 /***
219 * Constructs a connection from an existing TransportChannel and user/password.
220 *
221 * @param factory
222 * @param theUserName the users name
223 * @param thePassword the password
224 * @param transportChannel the transport channel to communicate with the server
225 * @throws JMSException
226 */
public ActiveMQConnection(ActiveMQConnectionFactory factory, String theUserName, String thePassword,
        TransportChannel transportChannel) throws JMSException {
    this(factory, theUserName, thePassword);
    this.transportChannel = transportChannel;
    // Register this connection with the channel for inbound packets,
    // asynchronous exceptions and transport status events.
    transportChannel.setPacketListener(this);
    transportChannel.setExceptionListener(this);
    transportChannel.addTransportStatusEventListener(this);
    isTransportOK = true;
}
236
protected ActiveMQConnection(ActiveMQConnectionFactory factory, String theUserName, String thePassword) {
    this.factory = factory;
    userName = theUserName;
    password = thePassword;

    // One generator per kind of identifier, plus a counter for consumer numbers.
    clientIdGenerator = new IdGenerator();
    packetIdGenerator = new IdGenerator();
    consumerIdGenerator = new IdGenerator();
    sessionIdGenerator = new IdGenerator();
    consumerNumberGenerator = new SynchronizedInt(0);

    // Collections of the objects this connection owns.
    sessions = new CopyOnWriteArrayList();
    messageDispatchers = new CopyOnWriteArrayList();
    connectionConsumers = new CopyOnWriteArrayList();

    connectionMetaData = new ActiveMQConnectionMetaData();

    // A new connection is neither closed nor started.
    closed = new SynchronizedBoolean(false);
    started = new SynchronizedBoolean(false);
    startTime = System.currentTimeMillis();

    prefetchPolicy = new ActiveMQPrefetchPolicy();

    // NOTE(review): clientID has not been assigned at this point, so the manager
    // is constructed with a null first argument -- confirm that is acceptable.
    boundedQueueManager = new MemoryBoundedQueueManager(clientID, DEFAULT_CONNECTION_MEMORY_LIMIT);
    boundedQueueManager.addCapacityEventListener(this);

    // Register with the factory's statistics, then create this connection's own
    // statistics; they are flagged transactional when this is an XAConnection.
    factoryStats = factory.getFactoryStats();
    factoryStats.addConnection(this);
    stats = new JMSConnectionStatsImpl(sessions, this instanceof XAConnection);

    factory.onConnectionCreate(this);
}
262
263 /***
264 * @return statistics for this Connection
265 */
266 public Stats getStats() {
267 return stats;
268 }
269
270 /***
271 * @return the connection statistics for this Connection
272 */
273 public JMSConnectionStatsImpl getConnectionStats() {
274 return stats;
275 }
276
277 /***
278 * Creates a <CODE>Session</CODE> object.
279 *
280 * @param transacted indicates whether the session is transacted
281 * @param acknowledgeMode indicates whether the consumer or the client will acknowledge any messages it receives;
282 * ignored if the session is transacted. Legal values are <code>Session.AUTO_ACKNOWLEDGE</code>,
283 * <code>Session.CLIENT_ACKNOWLEDGE</code>, and <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
284 * @return a newly created session
285 * @throws JMSException if the <CODE>Connection</CODE> object fails to create a session due to some internal error
286 * or lack of support for the specific transaction and acknowledgement mode.
287 * @see Session#AUTO_ACKNOWLEDGE
288 * @see Session#CLIENT_ACKNOWLEDGE
289 * @see Session#DUPS_OK_ACKNOWLEDGE
290 * @since 1.1
291 */
292 public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
293 checkClosed();
294 ensureClientIDInitialised();
295 return new ActiveMQSession(this, (transacted ? Session.SESSION_TRANSACTED : acknowledgeMode));
296 }
297
298 /***
299 * Gets the client identifier for this connection.
300 * <P>
301 * This value is specific to the JMS provider. It is either preconfigured by an administrator in a <CODE>
302 * ConnectionFactory</CODE> object or assigned dynamically by the application by calling the
303 * <code>setClientID</code> method.
304 *
305 * @return the unique client identifier
306 * @throws JMSException if the JMS provider fails to return the client ID for this connection due to some internal
307 * error.
308 */
309 public String getClientID() throws JMSException {
310 checkClosed();
311 return this.clientID;
312 }
313
314
315 /***
316 * Sets the client identifier for this connection.
317 * <P>
318 * The preferred way to assign a JMS client's client identifier is for it to be configured in a client-specific
319 * <CODE>ConnectionFactory</CODE> object and transparently assigned to the <CODE>Connection</CODE> object it
320 * creates.
321 * <P>
322 * Alternatively, a client can set a connection's client identifier using a provider-specific value. The facility to
323 * set a connection's client identifier explicitly is not a mechanism for overriding the identifier that has been
324 * administratively configured. It is provided for the case where no administratively specified identifier exists.
325 * If one does exist, an attempt to change it by setting it must throw an <CODE>IllegalStateException</CODE>. If
326 * a client sets the client identifier explicitly, it must do so immediately after it creates the connection and
327 * before any other action on the connection is taken. After this point, setting the client identifier is a
328 * programming error that should throw an <CODE>IllegalStateException</CODE>.
329 * <P>
330 * The purpose of the client identifier is to associate a connection and its objects with a state maintained on
331 * behalf of the client by a provider. The only such state identified by the JMS API is that required to support
332 * durable subscriptions.
333 * <P>
334 * If another connection with the same <code>clientID</code> is already running when this method is called, the
335 * JMS provider should detect the duplicate ID and throw an <CODE>InvalidClientIDException</CODE>.
336 *
337 * @param newClientID the unique client identifier
338 * @throws JMSException if the JMS provider fails to set the client ID for this connection due to some internal
339 * error.
340 * @throws javax.jms.InvalidClientIDException
341 * if the JMS client specifies an invalid or duplicate client ID.
342 * @throws javax.jms.IllegalStateException
343 * if the JMS client attempts to set a connection's client ID at the wrong
344 * time or when it has been administratively configured.
345 */
346 public void setClientID(String newClientID) throws JMSException {
347 if (this.clientIDSet) {
348 throw new IllegalStateException("The clientID has already been set");
349 }
350 if (this.isConnectionInfoSentToBroker) {
351 throw new IllegalStateException("Setting clientID on a used Connection is not allowed");
352 }
353 checkClosed();
354 this.clientID = newClientID;
355 this.userSpecifiedClientID = true;
356 }
357
358 /***
359 * Gets the metadata for this connection.
360 *
361 * @return the connection metadata
362 * @throws JMSException if the JMS provider fails to get the connection metadata for this connection.
363 * @see javax.jms.ConnectionMetaData
364 */
365 public ConnectionMetaData getMetaData() throws JMSException {
366 checkClosed();
367 return this.connectionMetaData;
368 }
369
370 /***
371 * Gets the <CODE>ExceptionListener</CODE> object for this connection. Not every <CODE>Connection</CODE> has an
372 * <CODE>ExceptionListener</CODE> associated with it.
373 *
374 * @return the <CODE>ExceptionListener</CODE> for this connection, or null if no <CODE>ExceptionListener</CODE>
375 * is associated with this connection.
376 * @throws JMSException if the JMS provider fails to get the <CODE>ExceptionListener</CODE> for this connection.
377 * @see javax.jms.Connection#setExceptionListener(ExceptionListener)
378 */
379 public ExceptionListener getExceptionListener() throws JMSException {
380 checkClosed();
381 return this.exceptionListener;
382 }
383
384 /***
385 * Sets an exception listener for this connection.
386 * <P>
387 * If a JMS provider detects a serious problem with a connection, it informs the connection's <CODE>
388 * ExceptionListener</CODE>, if one has been registered. It does this by calling the listener's <CODE>onException
389 * </CODE> method, passing it a <CODE>JMSException</CODE> object describing the problem.
390 * <P>
391 * An exception listener allows a client to be notified of a problem asynchronously. Some connections only consume
392 * messages, so they would have no other way to learn their connection has failed.
393 * <P>
394 * A connection serializes execution of its <CODE>ExceptionListener</CODE>.
395 * <P>
396 * A JMS provider should attempt to resolve connection problems itself before it notifies the client of them.
397 *
398 * @param listener the exception listener
399 * @throws JMSException if the JMS provider fails to set the exception listener for this connection.
400 */
401 public void setExceptionListener(ExceptionListener listener) throws JMSException {
402 checkClosed();
403 this.exceptionListener = listener;
404 this.transportChannel.setExceptionListener(listener);
405 }
406
407 /***
408 * Starts (or restarts) a connection's delivery of incoming messages. A call to <CODE>start</CODE> on a connection
409 * that has already been started is ignored.
410 *
411 * @throws JMSException if the JMS provider fails to start message delivery due to some internal error.
412 * @see javax.jms.Connection#stop()
413 */
414 public void start() throws JMSException {
415 checkClosed();
416 if (started.commit(false, true)) {
417 sendConnectionInfoToBroker();
418 for (Iterator i = sessions.iterator(); i.hasNext();) {
419 ActiveMQSession s = (ActiveMQSession) i.next();
420 s.start();
421 }
422 }
423 }
424
425 /***
426 * @return true if this Connection is started
427 */
428 protected boolean isStarted() {
429 return started.get();
430 }
431
432 /***
433 * Temporarily stops a connection's delivery of incoming messages. Delivery can be restarted using the connection's
434 * <CODE>start</CODE> method. When the connection is stopped, delivery to all the connection's message consumers
435 * is inhibited: synchronous receives block, and messages are not delivered to message listeners.
436 * <P>
437 * This call blocks until receives and/or message listeners in progress have completed.
438 * <P>
439 * Stopping a connection has no effect on its ability to send messages. A call to <CODE>stop</CODE> on a
440 * connection that has already been stopped is ignored.
441 * <P>
442 * A call to <CODE>stop</CODE> must not return until delivery of messages has paused. This means that a client can
443 * rely on the fact that none of its message listeners will be called and that all threads of control waiting for
444 * <CODE>receive</CODE> calls to return will not return with a message until the connection is restarted. The
445 * receive timers for a stopped connection continue to advance, so receives may time out while the connection is
446 * stopped.
447 * <P>
448 * If message listeners are running when <CODE>stop</CODE> is invoked, the <CODE>stop</CODE> call must wait
449 * until all of them have returned before it may return. While these message listeners are completing, they must
450 * have the full services of the connection available to them.
451 *
452 * @throws JMSException if the JMS provider fails to stop message delivery due to some internal error.
453 * @see javax.jms.Connection#start()
454 */
455 public void stop() throws JMSException {
456 checkClosed();
457 if (started.commit(true, false)) {
458 for (Iterator i = sessions.iterator(); i.hasNext();) {
459 ActiveMQSession s = (ActiveMQSession) i.next();
460 s.stop();
461 }
462 sendConnectionInfoToBroker(2000, closed.get());
463 transportChannel.stop();
464 }
465 }
466
467 /***
468 * Closes the connection.
469 * <P>
470 * Since a provider typically allocates significant resources outside the JVM on behalf of a connection, clients
471 * should close these resources when they are not needed. Relying on garbage collection to eventually reclaim these
472 * resources may not be timely enough.
473 * <P>
474 * There is no need to close the sessions, producers, and consumers of a closed connection.
475 * <P>
476 * Closing a connection causes all temporary destinations to be deleted.
477 * <P>
478 * When this method is invoked, it should not return until message processing has been shut down in an orderly
479 * fashion. This means that all message listeners that may have been running have returned, and that all pending
480 * receives have returned. A close terminates all pending message receives on the connection's sessions' consumers.
481 * The receives may return with a message or with null, depending on whether there was a message available at the
482 * time of the close. If one or more of the connection's sessions' message listeners is processing a message at the
483 * time when connection <CODE>close</CODE> is invoked, all the facilities of the connection and its sessions must
484 * remain available to those listeners until they return control to the JMS provider.
485 * <P>
486 * Closing a connection causes any of its sessions' transactions in progress to be rolled back. In the case where a
487 * session's work is coordinated by an external transaction manager, a session's <CODE>commit</CODE> and <CODE>
488 * rollback</CODE> methods are not used and the result of a closed session's work is determined later by the
489 * transaction manager. Closing a connection does NOT force an acknowledgment of client-acknowledged sessions.
490 * <P>
491 * Invoking the <CODE>acknowledge</CODE> method of a received message from a closed connection's session must
492 * throw an <CODE>IllegalStateException</CODE>. Closing a closed connection must NOT throw an exception.
493 *
494 * @throws JMSException if the JMS provider fails to close the connection due to some internal error. For example, a
495 * failure to release resources or to close a socket connection can cause this exception to be thrown.
496 */
497 public synchronized void close() throws JMSException {
498 this.transportChannel.setPendingStop(true);
499 if (!closed.get()) {
500 boundedQueueManager.removeCapacityEventListener(this);
501 try {
502 for (Iterator i = this.sessions.iterator(); i.hasNext();) {
503 ActiveMQSession s = (ActiveMQSession) i.next();
504 s.close();
505 }
506 for (Iterator i = this.connectionConsumers.iterator(); i.hasNext();) {
507 ActiveMQConnectionConsumer c = (ActiveMQConnectionConsumer) i.next();
508 c.close();
509 }
510 try {
511 sendConnectionInfoToBroker(sendCloseTimeout, true);
512 }
513 catch (TimeoutExpiredException e) {
514 log.warn("Failed to send close to broker, timeout expired of: " + sendCloseTimeout + " millis");
515 }
516 this.connectionConsumers.clear();
517 this.messageDispatchers.clear();
518 this.transportChannel.stop();
519 }
520 finally {
521 this.sessions.clear();
522 started.set(false);
523 factory.onConnectionClose(this);
524 }
525 closed.set(true);
526 }
527 }
528
529
530 /***
531 * throws an exception if the Connection is already closed; as a side effect, the transport channel is started on first use
532 *
533 * @throws JMSException
534 */
535 protected synchronized void checkClosed() throws JMSException {
536 if (!startedTransport) {
537 startedTransport = true;
538 this.transportChannel.start();
539 }
540 if (this.closed.get()) {
541 throw new IllegalStateException("The Connection is closed");
542 }
543 }
544
545 /***
546 * Creates a connection consumer for this connection (optional operation). This is an expert facility not used by
547 * regular JMS clients.
548 *
549 * @param destination the destination to access
550 * @param messageSelector only messages with properties matching the message selector expression are delivered. A
551 * value of null or an empty string indicates that there is no message selector for the message consumer.
552 * @param sessionPool the server session pool to associate with this connection consumer
553 * @param maxMessages the maximum number of messages that can be assigned to a server session at one time
554 * @return the connection consumer
555 * @throws JMSException if the <CODE>Connection</CODE> object fails to create a connection consumer due to some
556 * internal error or invalid arguments for <CODE>sessionPool</CODE> and <CODE>messageSelector</CODE>.
557 * @throws javax.jms.InvalidDestinationException
558 * if an invalid destination is specified.
559 * @throws javax.jms.InvalidSelectorException
560 * if the message selector is invalid.
561 * @see javax.jms.ConnectionConsumer
562 * @since 1.1
563 */
564 public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector,
565 ServerSessionPool sessionPool, int maxMessages) throws JMSException {
566 checkClosed();
567 ConsumerInfo info = new ConsumerInfo();
568 info.setId(this.packetIdGenerator.generateId());
569 info.setConsumerId(consumerIdGenerator.generateId());
570 info.setDestination(ActiveMQMessageTransformation.transformDestination(destination));
571 info.setSelector(messageSelector);
572 return new ActiveMQConnectionConsumer(this, sessionPool, info, maxMessages);
573 }
574
575 /***
576 * Create a durable connection consumer for this connection (optional operation). This is an expert facility not
577 * used by regular JMS clients.
578 *
579 * @param topic topic to access
580 * @param subscriptionName durable subscription name
581 * @param messageSelector only messages with properties matching the message selector expression are delivered. A
582 * value of null or an empty string indicates that there is no message selector for the message consumer.
583 * @param sessionPool the server session pool to associate with this durable connection consumer
584 * @param maxMessages the maximum number of messages that can be assigned to a server session at one time
585 * @return the durable connection consumer
586 * @throws JMSException if the <CODE>Connection</CODE> object fails to create a connection consumer due to some
587 * internal error or invalid arguments for <CODE>sessionPool</CODE> and <CODE>messageSelector</CODE>.
588 * @throws javax.jms.InvalidDestinationException
589 * if an invalid destination is specified.
590 * @throws javax.jms.InvalidSelectorException
591 * if the message selector is invalid.
592 * @see javax.jms.ConnectionConsumer
593 * @since 1.1
594 */
595 public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName,
596 String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
597 checkClosed();
598 ConsumerInfo info = new ConsumerInfo();
599 info.setId(this.packetIdGenerator.generateId());
600 info.setConsumerId(this.consumerIdGenerator.generateId());
601 info.setDestination(ActiveMQMessageTransformation.transformDestination(topic));
602 info.setSelector(messageSelector);
603 info.setConsumerName(subscriptionName);
604 return new ActiveMQConnectionConsumer(this, sessionPool, info, maxMessages);
605 }
606
607 /***
608 * Implementation of the PacketListener interface - consume a packet
609 *
610 * @param packet - the Packet to consume
611 * @see org.codehaus.activemq.message.PacketListener#consume(org.codehaus.activemq.message.Packet)
612 */
613 public void consume(Packet packet) {
614 if (!closed.get() && packet != null) {
615 if (packet.isJMSMessage()) {
616 ActiveMQMessage message = (ActiveMQMessage) packet;
617 message.setReadOnly(true);
618 message.setProducerID(clientID);
619
620 try {
621 int count = 0;
622 for (Iterator i = this.messageDispatchers.iterator(); i.hasNext();) {
623 ActiveMQMessageDispatcher dispatcher = (ActiveMQMessageDispatcher) i.next();
624 if (dispatcher.isTarget(message)) {
625 if (count > 0) {
626
627 message = message.deepCopy();
628 }
629 dispatcher.dispatch(message);
630 count++;
631 }
632 }
633 }
634 catch (JMSException jmsEx) {
635 handleAsyncException(jmsEx);
636 }
637 }
638 else if (packet.getPacketType() == Packet.CAPACITY_INFO) {
639 CapacityInfo info = (CapacityInfo) packet;
640 flowControlSleepTime = info.getFlowControlTimeout();
641
642 }
643 }
644 }
645
646 /***
647 * @see javax.jms.ExceptionListener#onException(javax.jms.JMSException)
648 */
649 public void onException(JMSException jmsEx) {
650
651 handleAsyncException(jmsEx);
652 isTransportOK = false;
653 try {
654 close();
655 }
656 catch (JMSException ex) {
657 log.warn("Got an exception closing the connection", ex);
658 }
659 }
660
661 /***
662 * Creates a <CODE>TopicSession</CODE> object.
663 *
664 * @param transacted indicates whether the session is transacted
665 * @param acknowledgeMode indicates whether the consumer or the client will acknowledge any messages it receives;
666 * ignored if the session is transacted. Legal values are <code>Session.AUTO_ACKNOWLEDGE</code>,
667 * <code>Session.CLIENT_ACKNOWLEDGE</code>, and <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
668 * @return a newly created topic session
669 * @throws JMSException if the <CODE>TopicConnection</CODE> object fails to create a session due to some internal
670 * error or lack of support for the specific transaction and acknowledgement mode.
671 * @see Session#AUTO_ACKNOWLEDGE
672 * @see Session#CLIENT_ACKNOWLEDGE
673 * @see Session#DUPS_OK_ACKNOWLEDGE
674 */
675 public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException {
676 checkClosed();
677 return new ActiveMQSession(this, (transacted ? Session.SESSION_TRANSACTED : acknowledgeMode));
678 }
679
680 /***
681 * Creates a connection consumer for this connection (optional operation). This is an expert facility not used by
682 * regular JMS clients.
683 *
684 * @param topic the topic to access
685 * @param messageSelector only messages with properties matching the message selector expression are delivered. A
686 * value of null or an empty string indicates that there is no message selector for the message consumer.
687 * @param sessionPool the server session pool to associate with this connection consumer
688 * @param maxMessages the maximum number of messages that can be assigned to a server session at one time
689 * @return the connection consumer
690 * @throws JMSException if the <CODE>TopicConnection</CODE> object fails to create a connection consumer due to
691 * some internal error or invalid arguments for <CODE>sessionPool</CODE> and <CODE>messageSelector</CODE>.
692 * @throws InvalidDestinationException if an invalid topic is specified.
693 * @throws InvalidSelectorException if the message selector is invalid.
694 * @see javax.jms.ConnectionConsumer
695 */
696 public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector,
697 ServerSessionPool sessionPool, int maxMessages) throws JMSException {
698 checkClosed();
699 ConsumerInfo info = new ConsumerInfo();
700 info.setId(this.packetIdGenerator.generateId());
701 info.setConsumerId(this.consumerIdGenerator.generateId());
702 info.setDestination(ActiveMQMessageTransformation.transformDestination(topic));
703 info.setSelector(messageSelector);
704 return new ActiveMQConnectionConsumer(this, sessionPool, info, maxMessages);
705 }
706
707 /***
708 * Creates a <CODE>QueueSession</CODE> object.
709 *
710 * @param transacted indicates whether the session is transacted
711 * @param acknowledgeMode indicates whether the consumer or the client will acknowledge any messages it receives;
712 * ignored if the session is transacted. Legal values are <code>Session.AUTO_ACKNOWLEDGE</code>,
713 * <code>Session.CLIENT_ACKNOWLEDGE</code>, and <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
714 * @return a newly created queue session
715 * @throws JMSException if the <CODE>QueueConnection</CODE> object fails to create a session due to some internal
716 * error or lack of support for the specific transaction and acknowledgement mode.
717 * @see Session#AUTO_ACKNOWLEDGE
718 * @see Session#CLIENT_ACKNOWLEDGE
719 * @see Session#DUPS_OK_ACKNOWLEDGE
720 */
721 public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException {
722 checkClosed();
723 return new ActiveMQSession(this, (transacted ? Session.SESSION_TRANSACTED : acknowledgeMode));
724 }
725
726 /***
727 * Creates a connection consumer for this connection (optional operation). This is an expert facility not used by
728 * regular JMS clients.
729 *
730 * @param queue the queue to access
731 * @param messageSelector only messages with properties matching the message selector expression are delivered. A
732 * value of null or an empty string indicates that there is no message selector for the message consumer.
733 * @param sessionPool the server session pool to associate with this connection consumer
734 * @param maxMessages the maximum number of messages that can be assigned to a server session at one time
735 * @return the connection consumer
736 * @throws JMSException if the <CODE>QueueConnection</CODE> object fails to create a connection consumer due to
737 * some internal error or invalid arguments for <CODE>sessionPool</CODE> and <CODE>messageSelector</CODE>.
738 * @throws InvalidDestinationException if an invalid queue is specified.
739 * @throws InvalidSelectorException if the message selector is invalid.
740 * @see javax.jms.ConnectionConsumer
741 */
742 public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector,
743 ServerSessionPool sessionPool, int maxMessages) throws JMSException {
744 checkClosed();
745 ConsumerInfo info = new ConsumerInfo();
746 info.setId(this.packetIdGenerator.generateId());
747 info.setConsumerId(this.consumerIdGenerator.generateId());
748 info.setDestination(ActiveMQMessageTransformation.transformDestination(queue));
749 info.setSelector(messageSelector);
750 return new ActiveMQConnectionConsumer(this, sessionPool, info, maxMessages);
751 }
752
753 /***
754 * Ensures that the clientID was manually specified and not auto-generated. If the clientID was not specified this
755 * method will throw an exception. This method is used to ensure that the clientID + durableSubscriber name are used
756 * correctly.
757 *
758 * @throws JMSException
759 */
760 public void checkClientIDWasManuallySpecified() throws JMSException {
761 if (!userSpecifiedClientID) {
762 throw new JMSException("You cannot create a durable subscriber without specifying a unique clientID on a Connection");
763 }
764 }
765
766 /***
767 * handle disconnect/reconnect events
768 *
769 * @param event
770 */
771 public void statusChanged(TransportStatusEvent event) {
772 log.info("channel status changed: " + event);
773 if (event.getChannelStatus() == TransportStatusEvent.RECONNECTED) {
774 doReconnect();
775 }
776 else if (event.getChannelStatus() == TransportStatusEvent.DISCONNECTED) {
777 clearMessagesInProgress();
778 }
779 }
780
781
782 /***
783 * send a Packet through the Connection - for internal use only
784 *
785 * @param packet
786 * @throws JMSException
787 */
788 public void asyncSendPacket(Packet packet) throws JMSException {
789 asyncSendPacket(packet, true);
790 }
791
792 /***
793 * send a Packet through the Connection - for internal use only
794 *
795 * @param packet
796 * @param doSendWhileReconnecting
797 * @throws JMSException
798 */
799 public void asyncSendPacket(Packet packet, boolean doSendWhileReconnecting) throws JMSException {
800 if (isTransportOK && !closed.get() && (doSendWhileReconnecting || transportChannel.isTransportConnected())) {
801 packet.setReceiptRequired(false);
802 if (packet.isJMSMessage() && flowControlSleepTime > 0) {
803 try {
804 Thread.sleep(flowControlSleepTime);
805 }
806 catch (InterruptedException e) {
807 }
808 }
809 this.transportChannel.asyncSend(packet);
810 }
811 }
812
813 /***
814 * send a Packet through a Connection - for internal use only
815 *
816 * @param packet
817 * @throws JMSException
818 */
819 public void syncSendPacket(Packet packet) throws JMSException {
820 syncSendPacket(packet, 0);
821 }
822
823 /***
824 * Send a packet through a Connection - for internal use only
825 *
826 * @param packet
827 * @param timeout
828 * @throws JMSException
829 */
830 public void syncSendPacket(Packet packet, int timeout) throws JMSException {
831 if (isTransportOK && !closed.get()) {
832 Receipt receipt;
833 packet.setReceiptRequired(true);
834 receipt = this.transportChannel.send(packet, timeout);
835 if (receipt != null) {
836 if (receipt.isFailed()) {
837 Throwable e = receipt.getException();
838 if (e != null) {
839 throw JMSExceptionHelper.newJMSException(e);
840 }
841 throw new JMSException("syncSendPacket failed with unknown exception");
842 }
843 }
844 }
845 else {
846 throw new JMSException("syncSendTimedOut");
847 }
848 }
849
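/**
 * Send a Packet and get a Receipt for it.
 * NOTE(review): this comment is not attached to any declaration; it appears to describe
 * syncSendRequest(Packet), which is declared further down in this class - confirm.
 *
 * @param packet the packet to send
 * @return the receipt for the packet
 * @throws JMSException
 */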
856
857
858
859
860
861 /***
862 * @return Returns the prefetchPolicy.
863 */
864 public ActiveMQPrefetchPolicy getPrefetchPolicy() {
865 return prefetchPolicy;
866 }
867
868 /***
869 * @param prefetchPolicy The prefetchPolicy to set.
870 */
871 public void setPrefetchPolicy(ActiveMQPrefetchPolicy prefetchPolicy) {
872 this.prefetchPolicy = prefetchPolicy;
873 }
874
875 public int getSendCloseTimeout() {
876 return sendCloseTimeout;
877 }
878
879 public void setSendCloseTimeout(int sendCloseTimeout) {
880 this.sendCloseTimeout = sendCloseTimeout;
881 }
882
883 public int getSendConnectionInfoTimeout() {
884 return sendConnectionInfoTimeout;
885 }
886
887 public void setSendConnectionInfoTimeout(int sendConnectionInfoTimeout) {
888 this.sendConnectionInfoTimeout = sendConnectionInfoTimeout;
889 }
890
891 public Receipt syncSendRequest(Packet packet) throws JMSException {
892 checkClosed();
893 if (isTransportOK && !closed.get()) {
894 Receipt receipt;
895 packet.setReceiptRequired(true);
896 if (packet.getId() == null || packet.getId().length() == 0) {
897 packet.setId(this.packetIdGenerator.generateId());
898 }
899 receipt = this.transportChannel.send(packet);
900 if (receipt.isFailed()) {
901 Throwable e = receipt.getException();
902 if (e != null) {
903 throw (JMSException) new JMSException(e.getMessage()).initCause(e);
904 }
905 throw new JMSException("syncSendPacket failed with unknown exception");
906 }
907 return receipt;
908 }
909 else {
910 throw new JMSException("Connection closed.");
911 }
912 }
913
914 public TransportChannel getTransportChannel() {
915 return transportChannel;
916 }
917
918
919 /***
920 * Returns the clientID of the connection, forcing one to be generated if one has not yet been configured
921 */
922 public String getInitializedClientID() throws JMSException {
923 ensureClientIDInitialised();
924 return this.clientID;
925 }
926
927
928
929
930
931 /***
932 * Used internally for adding Sessions to the Connection
933 *
934 * @param session
935 * @throws JMSException
936 */
937 protected void addSession(ActiveMQSession session) throws JMSException {
938 this.sessions.add(session);
939 addMessageDispatcher(session);
940 if (started.get()) {
941 session.start();
942 }
943 SessionInfo info = createSessionInfo(session);
944 info.setStarted(true);
945 asyncSendPacket(info);
946 }
947
948 /***
949 * Used interanlly for removing Sessions from a Connection
950 *
951 * @param session
952 * @throws JMSException
953 */
954 protected void removeSession(ActiveMQSession session) throws JMSException {
955 this.sessions.remove(session);
956 removeMessageDispatcher(session);
957 SessionInfo info = createSessionInfo(session);
958 info.setStarted(false);
959 asyncSendPacket(info, false);
960 }
961
962 private SessionInfo createSessionInfo(ActiveMQSession session) {
963 SessionInfo info = new SessionInfo();
964 info.setId(packetIdGenerator.generateId());
965 info.setClientId(clientID);
966 info.setSessionId(session.getSessionId());
967 info.setStartTime(session.getStartTime());
968 return info;
969 }
970
971 /***
972 * Add a ConnectionConsumer
973 *
974 * @param connectionConsumer
975 * @throws JMSException
976 */
977 protected void addConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer) throws JMSException {
978 this.connectionConsumers.add(connectionConsumer);
979 addMessageDispatcher(connectionConsumer);
980 }
981
982 /***
983 * Remove a ConnectionConsumer
984 *
985 * @param connectionConsumer
986 */
987 protected void removeConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer) {
988 this.connectionConsumers.add(connectionConsumer);
989 removeMessageDispatcher(connectionConsumer);
990 }
991
992 /***
993 * Add a Message dispatcher to receive messages from the Broker
994 *
995 * @param messageDispatch
996 * @throws JMSException if an internal error
997 */
998 protected void addMessageDispatcher(ActiveMQMessageDispatcher messageDispatch) throws JMSException {
999 this.messageDispatchers.add(messageDispatch);
1000 }
1001
1002 /***
1003 * Remove a Message dispatcher
1004 *
1005 * @param messageDispatcher
1006 */
1007 protected void removeMessageDispatcher(ActiveMQMessageDispatcher messageDispatcher) {
1008 this.messageDispatchers.remove(messageDispatcher);
1009 }
1010
1011 /***
1012 * Used for handling async exceptions
1013 *
1014 * @param jmsEx
1015 */
1016 protected void handleAsyncException(JMSException jmsEx) {
1017 if (this.exceptionListener != null) {
1018 this.exceptionListener.onException(jmsEx);
1019 }
1020 else {
1021 log.warn("async exception with no exception listener", jmsEx);
1022 }
1023 }
1024
1025 protected void sendConnectionInfoToBroker() throws JMSException {
1026 sendConnectionInfoToBroker(sendConnectionInfoTimeout, closed.get());
1027 }
1028
1029 /***
1030 * Send the ConnectionInfo to the Broker
1031 *
1032 * @param timeout
1033 * @param isClosed
1034 * @throws JMSException
1035 */
1036 protected void sendConnectionInfoToBroker(int timeout, boolean isClosed) throws JMSException {
1037 if (!isConnectionInfoSentToBroker) {
1038 this.isConnectionInfoSentToBroker = true;
1039 }
1040 else {
1041 if (!isClosed) {
1042
1043
1044 return;
1045 }
1046 }
1047 ensureClientIDInitialised();
1048 ConnectionInfo info = new ConnectionInfo();
1049 info.setClientId(this.clientID);
1050 info.setHostName(IdGenerator.getHostName());
1051 info.setUserName(userName);
1052 info.setPassword(password);
1053 info.setId(packetIdGenerator.generateId());
1054 info.setStartTime(startTime);
1055 info.setStarted(started.get());
1056 info.setClosed(isClosed);
1057 info.setClientVersion(connectionMetaData.getProviderVersion());
1058 info.setWireFormatVersion(transportChannel.getCurrentWireFormatVersion());
1059 syncSendPacket(info, timeout);
1060 }
1061
1062 /***
1063 * Set the maximum amount of memory this Connection should use for buffered inbound messages
1064 *
1065 * @param newMemoryLimit the new memory limit in bytes
1066 */
1067 public void setConnectionMemoryLimit(int newMemoryLimit) {
1068 boundedQueueManager.setValueLimit(newMemoryLimit);
1069 }
1070
1071 /***
1072 * Get the current value for the maximum amount of memory this Connection should use for buffered inbound messages
1073 *
1074 * @return the current limit in bytes
1075 */
1076 public int getConnectionMemoryLimit() {
1077 return (int) boundedQueueManager.getValueLimit();
1078 }
1079
1080 /***
1081 * CapacityMonitorEventListener implementation called when the capacity of a CapacityService changes
1082 *
1083 * @param event
1084 */
1085 public void capacityChanged(CapacityMonitorEvent event) {
1086
1087 CapacityInfo info = new CapacityInfo();
1088 info.setId(packetIdGenerator.generateId());
1089 info.setResourceName(event.getMonitorName());
1090 info.setCapacity(event.getCapacity());
1091
1092 try {
1093 asyncSendPacket(info, false);
1094 }
1095 catch (JMSException e) {
1096 JMSException jmsEx = new JMSException("failed to send change in capacity");
1097 jmsEx.setLinkedException(e);
1098 handleAsyncException(jmsEx);
1099 }
1100 }
1101
1102 /***
1103 * @return a number unique for this connection
1104 */
1105 protected int getNextConsumerNumber() {
1106 return this.consumerNumberGenerator.increment();
1107 }
1108
1109 protected String generateSessionId() {
1110 return this.sessionIdGenerator.generateId();
1111 }
1112
1113 protected void ensureClientIDInitialised() {
1114 if (this.clientID == null) {
1115 this.clientID = this.clientIdGenerator.generateId();
1116 }
1117 transportChannel.setClientID(clientID);
1118 this.clientIDSet = true;
1119 }
1120
1121 protected MemoryBoundedQueue getMemoryBoundedQueue(String name) {
1122 return boundedQueueManager.getMemoryBoundedQueue(name);
1123 }
1124
1125 protected void doReconnect() {
1126 try {
1127
1128 this.isConnectionInfoSentToBroker = false;
1129 sendConnectionInfoToBroker();
1130 for (Iterator iter = sessions.iterator(); iter.hasNext();) {
1131 ActiveMQSession session = (ActiveMQSession) iter.next();
1132 SessionInfo sessionInfo = createSessionInfo(session);
1133 sessionInfo.setStarted(true);
1134 asyncSendPacket(sessionInfo, false);
1135
1136 for (Iterator consumersIterator = session.consumers.iterator(); consumersIterator.hasNext();) {
1137 ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) consumersIterator.next();
1138 ConsumerInfo consumerInfo = session.createConsumerInfo(consumer);
1139 consumerInfo.setStarted(true);
1140 asyncSendPacket(consumerInfo, false);
1141 }
1142
1143 for (Iterator producersIterator = session.producers.iterator(); producersIterator.hasNext();) {
1144 ActiveMQMessageProducer producer = (ActiveMQMessageProducer) producersIterator.next();
1145 ProducerInfo producerInfo = session.createProducerInfo(producer);
1146 producerInfo.setStarted(true);
1147 asyncSendPacket(producerInfo, false);
1148 }
1149 }
1150 }
1151 catch (JMSException jmsEx) {
1152 log.error("Failed to do reconnection");
1153 handleAsyncException(jmsEx);
1154 isTransportOK = false;
1155 }
1156 }
1157
1158 /***
1159 * @return Returns the useAsyncSend.
1160 */
1161 public boolean isUseAsyncSend() {
1162 return useAsyncSend;
1163 }
1164
1165 /***
1166 * @param useAsyncSend The useAsyncSend to set.
1167 */
1168 public void setUseAsyncSend(boolean useAsyncSend) {
1169 this.useAsyncSend = useAsyncSend;
1170 }
1171
1172 protected void clearMessagesInProgress() {
1173 for (Iterator i = sessions.iterator(); i.hasNext();) {
1174 ActiveMQSession session = (ActiveMQSession) i.next();
1175 session.clearMessagesInProgress();
1176 }
1177 }
1178 }