View Javadoc

1   /***
2    *
3    * Copyright 2004 Protique Ltd
4    *
5    * Licensed under the Apache License, Version 2.0 (the "License");
6    * you may not use this file except in compliance with the License.
7    * You may obtain a copy of the License at
8    *
9    * http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   *
17   **/
18  
19  package org.codehaus.activemq;
20  
21  import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
22  import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
23  import org.apache.commons.logging.Log;
24  import org.apache.commons.logging.LogFactory;
25  import org.codehaus.activemq.management.JMSSessionStatsImpl;
26  import org.codehaus.activemq.management.StatsCapable;
27  import org.codehaus.activemq.message.*;
28  import org.codehaus.activemq.ra.LocalTransactionEventListener;
29  import org.codehaus.activemq.service.impl.DefaultQueueList;
30  import org.codehaus.activemq.util.IdGenerator;
31  
32  import javax.jms.*;
33  import javax.jms.IllegalStateException;
34  import javax.management.j2ee.statistics.Stats;
35  import java.io.Serializable;
36  import java.util.Iterator;
37  import java.util.LinkedList;
38  import java.util.ListIterator;
39  
40  /***
41   * <P>
42   * A <CODE>Session</CODE> object is a single-threaded context for producing and consuming messages. Although it may
43   * allocate provider resources outside the Java virtual machine (JVM), it is considered a lightweight JMS object.
44   * <P>
45   * A session serves several purposes:
46   * <UL>
47   * <LI>It is a factory for its message producers and consumers.
48   * <LI>It supplies provider-optimized message factories.
49   * <LI>It is a factory for <CODE>TemporaryTopics</CODE> and <CODE>TemporaryQueues</CODE>.
50   * <LI>It provides a way to create <CODE>Queue</CODE> or <CODE>Topic</CODE> objects for those clients that need to
51   * dynamically manipulate provider-specific destination names.
52   * <LI>It supports a single series of transactions that combine work spanning its producers and consumers into atomic
53   * units.
54   * <LI>It defines a serial order for the messages it consumes and the messages it produces.
55   * <LI>It retains messages it consumes until they have been acknowledged.
56   * <LI>It serializes execution of message listeners registered with its message consumers.
57   * <LI>It is a factory for <CODE>QueueBrowsers</CODE>.
58   * </UL>
59   * <P>
60   * A session can create and service multiple message producers and consumers.
61   * <P>
62   * One typical use is to have a thread block on a synchronous <CODE>MessageConsumer</CODE> until a message arrives.
63   * The thread may then use one or more of the <CODE>Session</CODE>'s <CODE>MessageProducer</CODE>s.
64   * <P>
65   * If a client desires to have one thread produce messages while others consume them, the client should use a separate
66   * session for its producing thread.
67   * <P>
68   * Once a connection has been started, any session with one or more registered message listeners is dedicated to the
69   * thread of control that delivers messages to it. It is erroneous for client code to use this session or any of its
70   * constituent objects from another thread of control. The only exception to this rule is the use of the session or
71   * connection <CODE>close</CODE> method.
72   * <P>
73   * It should be easy for most clients to partition their work naturally into sessions. This model allows clients to
74   * start simply and incrementally add message processing complexity as their need for concurrency grows.
75   * <P>
76   * The <CODE>close</CODE> method is the only session method that can be called while some other session method is
77   * being executed in another thread.
78   * <P>
79   * A session may be specified as transacted. Each transacted session supports a single series of transactions. Each
80   * transaction groups a set of message sends and a set of message receives into an atomic unit of work. In effect,
81   * transactions organize a session's input message stream and output message stream into series of atomic units. When a
82   * transaction commits, its atomic unit of input is acknowledged and its associated atomic unit of output is sent. If a
83   * transaction rollback is done, the transaction's sent messages are destroyed and the session's input is automatically
84   * recovered.
85   * <P>
86   * The content of a transaction's input and output units is simply those messages that have been produced and consumed
87   * within the session's current transaction.
88   * <P>
89   * A transaction is completed using either its session's <CODE>commit</CODE> method or its session's <CODE>rollback
90   * </CODE> method. The completion of a session's current transaction automatically begins the next. The result is that a
91   * transacted session always has a current transaction within which its work is done.
92   * <P>
93   * The Java Transaction Service (JTS) or some other transaction monitor may be used to combine a session's transaction
94   * with transactions on other resources (databases, other JMS sessions, etc.). Since Java distributed transactions are
95   * controlled via the Java Transaction API (JTA), use of the session's <CODE>commit</CODE> and <CODE>rollback</CODE>
96   * methods in this context is prohibited.
97   * <P>
98   * The JMS API does not require support for JTA; however, it does define how a provider supplies this support.
99   * <P>
100  * Although it is also possible for a JMS client to handle distributed transactions directly, it is unlikely that many
101  * JMS clients will do this. Support for JTA in the JMS API is targeted at systems vendors who will be integrating the
102  * JMS API into their application server products.
103  *
104  * @version $Revision: 1.54 $
105  * @see javax.jms.Session
106  * @see javax.jms.QueueSession
107  * @see javax.jms.TopicSession
108  * @see javax.jms.XASession
109  */
110 public class ActiveMQSession
111         implements
112         Session,
113         QueueSession,
114         TopicSession,
115         ActiveMQMessageDispatcher,
116         MessageAcknowledge,
117         StatsCapable {
118     protected static final int CONSUMER_DISPATCH_UNSET = 1;
119     protected static final int CONSUMER_DISPATCH_ASYNC = 2;
120     protected static final int CONSUMER_DISPATCH_SYNC = 3;
121     private static final Log log = LogFactory.getLog(ActiveMQSession.class);
122     protected ActiveMQConnection connection;
123     private int acknowledgeMode;
124     protected CopyOnWriteArrayList consumers;
125     protected CopyOnWriteArrayList producers;
126     private IdGenerator transactionIdGenerator;
127     private IdGenerator temporaryDestinationGenerator;
128     protected IdGenerator packetIdGenerator;
129     private IdGenerator producerIdGenerator;
130     private IdGenerator consumerIdGenerator;
131     private MessageListener messageListener;
132     protected SynchronizedBoolean closed;
133     private SynchronizedBoolean startTransaction;
134     private String sessionId;
135     protected String currentTransactionId;
136     private long startTime;
137     private LocalTransactionEventListener localTransactionEventListener;
138     private DefaultQueueList deliveredMessages;
139     private ActiveMQSessionExecutor messageExecutor;
140     private JMSSessionStatsImpl stats;
141     private int consumerDispatchState;
142 
/**
 * Construct the Session and register it with the connection it was created on.
 *
 * @param theConnection      the connection this session belongs to; it supplies the session id and the
 *                           memory bounded queue handed to the session executor
 * @param theAcknowledgeMode n.b if transacted - the acknowledgeMode == Session.SESSION_TRANSACTED
 * @throws JMSException on internal error
 */
protected ActiveMQSession(ActiveMQConnection theConnection, int theAcknowledgeMode) throws JMSException {
    this.connection = theConnection;
    this.acknowledgeMode = theAcknowledgeMode;
    this.consumers = new CopyOnWriteArrayList();
    this.producers = new CopyOnWriteArrayList();
    this.producerIdGenerator = new IdGenerator();
    this.consumerIdGenerator = new IdGenerator();
    this.transactionIdGenerator = new IdGenerator();
    this.temporaryDestinationGenerator = new IdGenerator();
    this.packetIdGenerator = new IdGenerator();
    this.closed = new SynchronizedBoolean(false);
    this.startTransaction = new SynchronizedBoolean(false);
    this.sessionId = theConnection.generateSessionId();
    this.startTime = System.currentTimeMillis();
    this.deliveredMessages = new DefaultQueueList();
    this.messageExecutor = new ActiveMQSessionExecutor(this, theConnection.getMemoryBoundedQueue(sessionId));
    if (getTransacted()) {
        // a transacted session always has a current transaction id
        this.currentTransactionId = getNextTransactionId();
    }
    this.stats = new JMSSessionStatsImpl(producers, consumers);
    this.consumerDispatchState = CONSUMER_DISPATCH_UNSET;
    // Register with the connection only once every field is assigned, so the connection
    // (or any thread reaching the session through it) never observes a half-built session.
    theConnection.addSession(this);
}
173 
174     public Stats getStats() {
175         return stats;
176     }
177 
178     public JMSSessionStatsImpl getSessionStats() {
179         return stats;
180     }
181 
182     /***
183      * Creates a <CODE>BytesMessage</CODE> object. A <CODE>BytesMessage</CODE> object is used to send a message
184      * containing a stream of uninterpreted bytes.
185      *
186      * @return the an ActiveMQBytesMessage
187      * @throws JMSException if the JMS provider fails to create this message due to some internal error.
188      */
189     public BytesMessage createBytesMessage() throws JMSException {
190         checkClosed();
191         return new ActiveMQBytesMessage();
192     }
193 
194     /***
195      * Creates a <CODE>MapMessage</CODE> object. A <CODE>MapMessage</CODE> object is used to send a self-defining
196      * set of name-value pairs, where names are <CODE>String</CODE> objects and values are primitive values in the
197      * Java programming language.
198      *
199      * @return an ActiveMQMapMessage
200      * @throws JMSException if the JMS provider fails to create this message due to some internal error.
201      */
202     public MapMessage createMapMessage() throws JMSException {
203         checkClosed();
204         return new ActiveMQMapMessage();
205     }
206 
207     /***
208      * Creates a <CODE>Message</CODE> object. The <CODE>Message</CODE> interface is the root interface of all JMS
209      * messages. A <CODE>Message</CODE> object holds all the standard message header information. It can be sent when
210      * a message containing only header information is sufficient.
211      *
212      * @return an ActiveMQMessage
213      * @throws JMSException if the JMS provider fails to create this message due to some internal error.
214      */
215     public Message createMessage() throws JMSException {
216         checkClosed();
217         return new ActiveMQMessage();
218     }
219 
220     /***
221      * Creates an <CODE>ObjectMessage</CODE> object. An <CODE>ObjectMessage</CODE> object is used to send a message
222      * that contains a serializable Java object.
223      *
224      * @return an ActiveMQObjectMessage
225      * @throws JMSException if the JMS provider fails to create this message due to some internal error.
226      */
227     public ObjectMessage createObjectMessage() throws JMSException {
228         checkClosed();
229         return new ActiveMQObjectMessage();
230     }
231 
232     /***
233      * Creates an initialized <CODE>ObjectMessage</CODE> object. An <CODE>ObjectMessage</CODE> object is used to
234      * send a message that contains a serializable Java object.
235      *
236      * @param object the object to use to initialize this message
237      * @return an ActiveMQObjectMessage
238      * @throws JMSException if the JMS provider fails to create this message due to some internal error.
239      */
240     public ObjectMessage createObjectMessage(Serializable object) throws JMSException {
241         checkClosed();
242         ActiveMQObjectMessage msg = new ActiveMQObjectMessage();
243         msg.setObject(object);
244         return msg;
245     }
246 
247     /***
248      * Creates a <CODE>StreamMessage</CODE> object. A <CODE>StreamMessage</CODE> object is used to send a
249      * self-defining stream of primitive values in the Java programming language.
250      *
251      * @return an ActiveMQStreamMessage
252      * @throws JMSException if the JMS provider fails to create this message due to some internal error.
253      */
254     public StreamMessage createStreamMessage() throws JMSException {
255         checkClosed();
256         return new ActiveMQStreamMessage();
257     }
258 
259     /***
260      * Creates a <CODE>TextMessage</CODE> object. A <CODE>TextMessage</CODE> object is used to send a message
261      * containing a <CODE>String</CODE> object.
262      *
263      * @return an ActiveMQTextMessage
264      * @throws JMSException if the JMS provider fails to create this message due to some internal error.
265      */
266     public TextMessage createTextMessage() throws JMSException {
267         checkClosed();
268         return new ActiveMQTextMessage();
269     }
270 
271     /***
272      * Creates an initialized <CODE>TextMessage</CODE> object. A <CODE>TextMessage</CODE> object is used to send a
273      * message containing a <CODE>String</CODE>.
274      *
275      * @param text the string used to initialize this message
276      * @return an ActiveMQTextMessage
277      * @throws JMSException if the JMS provider fails to create this message due to some internal error.
278      */
279     public TextMessage createTextMessage(String text) throws JMSException {
280         checkClosed();
281         ActiveMQTextMessage msg = new ActiveMQTextMessage();
282         msg.setText(text);
283         return msg;
284     }
285 
286     /***
287      * Indicates whether the session is in transacted mode.
288      *
289      * @return true if the session is in transacted mode
290      * @throws JMSException if there is some internal error.
291      */
292     public boolean getTransacted() throws JMSException {
293         checkClosed();
294         return this.acknowledgeMode == Session.SESSION_TRANSACTED;
295     }
296 
297     /***
298      * Returns the acknowledgement mode of the session. The acknowledgement mode is set at the time that the session is
299      * created. If the session is transacted, the acknowledgement mode is ignored.
300      *
301      * @return If the session is not transacted, returns the current acknowledgement mode for the session. If the
302      *         session is transacted, returns SESSION_TRANSACTED.
303      * @throws JMSException
304      * @see javax.jms.Connection#createSession(boolean,int)
305      * @since 1.1 exception JMSException if there is some internal error.
306      */
307     public int getAcknowledgeMode() throws JMSException {
308         checkClosed();
309         return this.acknowledgeMode;
310     }
311 
312     /***
313      * Commits all messages done in this transaction and releases any locks currently held.
314      *
315      * @throws JMSException                   if the JMS provider fails to commit the transaction due to some internal error.
316      * @throws TransactionRolledBackException if the transaction is rolled back due to some internal error during
317      *                                        commit.
318      * @throws javax.jms.IllegalStateException
319      *                                        if the method is not called by a transacted session.
320      */
321     public void commit() throws JMSException {
322         checkClosed();
323         if (!getTransacted()) {
324             throw new javax.jms.IllegalStateException("Not a transacted session");
325         }
326         // Only send commit if the transaction was started.
327         if (this.startTransaction.commit(true, false)) {
328             TransactionInfo info = new TransactionInfo();
329             info.setId(this.packetIdGenerator.generateId());
330             info.setTransactionId(currentTransactionId);
331             info.setType(TransactionInfo.COMMIT);
332             //before we send, update the current transaction id
333             this.currentTransactionId = getNextTransactionId();
334             // Notify the listener that the tx was commited back
335             this.connection.syncSendPacket(info);
336             if (localTransactionEventListener != null) {
337                 localTransactionEventListener.commitEvent();
338             }
339         }
340         deliveredMessages.clear();
341     }
342 
343     /***
344      * Rolls back any messages done in this transaction and releases any locks currently held.
345      *
346      * @throws JMSException if the JMS provider fails to roll back the transaction due to some internal error.
347      * @throws javax.jms.IllegalStateException
348      *                      if the method is not called by a transacted session.
349      */
350     public void rollback() throws JMSException {
351         checkClosed();
352         if (!getTransacted()) {
353             throw new javax.jms.IllegalStateException("Not a transacted session");
354         }
355         // Only rollback commit if the transaction was started.
356         if (this.startTransaction.commit(true, false)) {
357             TransactionInfo info = new TransactionInfo();
358             info.setId(this.packetIdGenerator.generateId());
359             info.setTransactionId(currentTransactionId);
360             info.setType(TransactionInfo.ROLLBACK);
361             //before we send, update the current transaction id
362             this.currentTransactionId = getNextTransactionId();
363             this.connection.asyncSendPacket(info);
364             // Notify the listener that the tx was rolled back
365             if (localTransactionEventListener != null) {
366                 localTransactionEventListener.rollbackEvent();
367             }
368         }
369         redeliverUnacknowledgedMessages(true);
370         deliveredMessages.clear();
371     }
372 
373     /***
374      * Closes the session.
375      * <P>
376      * Since a provider may allocate some resources on behalf of a session outside the JVM, clients should close the
377      * resources when they are not needed. Relying on garbage collection to eventually reclaim these resources may not
378      * be timely enough.
379      * <P>
380      * There is no need to close the producers and consumers of a closed session.
381      * <P>
382      * This call will block until a <CODE>receive</CODE> call or message listener in progress has completed. A blocked
383      * message consumer <CODE>receive</CODE> call returns <CODE>null</CODE> when this session is closed.
384      * <P>
385      * Closing a transacted session must roll back the transaction in progress.
386      * <P>
387      * This method is the only <CODE>Session</CODE> method that can be called concurrently.
388      * <P>
389      * Invoking any other <CODE>Session</CODE> method on a closed session must throw a <CODE>
390      * JMSException.IllegalStateException</CODE>. Closing a closed session must <I>not </I> throw an exception.
391      *
392      * @throws JMSException if the JMS provider fails to close the session due to some internal error.
393      */
394     public void close() throws JMSException {
395         if (!this.closed.get()) {
396             if (getTransacted()) {
397                 rollback();
398             }
399             doClose();
400             closed.set(true);
401         }
402     }
403 
404     protected void doClose() throws JMSException {
405         doAcknowledge(true);
406         for (Iterator i = consumers.iterator(); i.hasNext();) {
407             ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) i.next();
408             consumer.close();
409         }
410         for (Iterator i = producers.iterator(); i.hasNext();) {
411             ActiveMQMessageProducer producer = (ActiveMQMessageProducer) i.next();
412             producer.close();
413         }
414         consumers.clear();
415         producers.clear();
416         this.connection.removeSession(this);
417         messageExecutor.close();
418         deliveredMessages.clear();
419     }
420 
421     /***
422      * @throws IllegalStateException if the Session is closed
423      */
424     protected void checkClosed() throws IllegalStateException {
425         if (this.closed.get()) {
426             throw new IllegalStateException("The Consumer is closed");
427         }
428     }
429 
430     /***
431      * Stops message delivery in this session, and restarts message delivery with the oldest unacknowledged message.
432      * <P>
433      * All consumers deliver messages in a serial order. Acknowledging a received message automatically acknowledges all
434      * messages that have been delivered to the client.
435      * <P>
436      * Restarting a session causes it to take the following actions:
437      * <UL>
438      * <LI>Stop message delivery
439      * <LI>Mark all messages that might have been delivered but not acknowledged as "redelivered"
440      * <LI>Restart the delivery sequence including all unacknowledged messages that had been previously delivered.
441      * Redelivered messages do not have to be delivered in exactly their original delivery order.
442      * </UL>
443      *
444      * @throws JMSException          if the JMS provider fails to stop and restart message delivery due to some internal error.
445      * @throws IllegalStateException if the method is called by a transacted session.
446      */
447     public void recover() throws JMSException {
448         checkClosed();
449         if (getTransacted()) {
450             throw new IllegalStateException("This session is transacted");
451         }
452         redeliverUnacknowledgedMessages();
453     }
454 
455     /***
456      * Returns the session's distinguished message listener (optional).
457      *
458      * @return the message listener associated with this session
459      * @throws JMSException if the JMS provider fails to get the message listener due to an internal error.
460      * @see javax.jms.Session#setMessageListener(javax.jms.MessageListener)
461      * @see javax.jms.ServerSessionPool
462      * @see javax.jms.ServerSession
463      */
464     public MessageListener getMessageListener() throws JMSException {
465         checkClosed();
466         return this.messageListener;
467     }
468 
469     /***
470      * Sets the session's distinguished message listener (optional).
471      * <P>
472      * When the distinguished message listener is set, no other form of message receipt in the session can be used;
473      * however, all forms of sending messages are still supported.
474      * <P>
475      * This is an expert facility not used by regular JMS clients.
476      *
477      * @param listener the message listener to associate with this session
478      * @throws JMSException if the JMS provider fails to set the message listener due to an internal error.
479      * @see javax.jms.Session#getMessageListener()
480      * @see javax.jms.ServerSessionPool
481      * @see javax.jms.ServerSession
482      */
483     public void setMessageListener(MessageListener listener) throws JMSException {
484         checkClosed();
485         this.messageListener = listener;
486         if (listener != null) {
487             messageExecutor.setDoDispatch(false);
488         }
489     }
490 
491     /***
492      * Optional operation, intended to be used only by Application Servers, not by ordinary JMS clients.
493      *
494      * @see javax.jms.ServerSession
495      */
496     public void run() {
497         MessageListener listener = this.messageListener;
498         boolean doRemove = this.acknowledgeMode != Session.CLIENT_ACKNOWLEDGE;
499         ActiveMQMessage message;
500         while ((message = messageExecutor.dequeueNoWait()) != null) {
501             if (listener != null) {
502                 try {
503                     listener.onMessage(message);
504                     this.messageDelivered(true, message, true);
505                 }
506                 catch (Throwable t) {
507                     log.info("Caught :" + t, t);
508                     this.messageDelivered(true, message, false);
509                 }
510             }
511             else {
512                 this.messageDelivered(true, message, false);
513             }
514         }
515     }
516 
517     /***
518      * Creates a <CODE>MessageProducer</CODE> to send messages to the specified destination.
519      * <P>
520      * A client uses a <CODE>MessageProducer</CODE> object to send messages to a destination. Since <CODE>Queue
521      * </CODE> and <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they can be used in the
522      * destination parameter to create a <CODE>MessageProducer</CODE> object.
523      *
524      * @param destination the <CODE>Destination</CODE> to send to, or null if this is a producer which does not have a
525      *                    specified destination.
526      * @return the MessageProducer
527      * @throws JMSException                if the session fails to create a MessageProducer due to some internal error.
528      * @throws InvalidDestinationException if an invalid destination is specified.
529      * @since 1.1
530      */
531     public MessageProducer createProducer(Destination destination) throws JMSException {
532         checkClosed();
533         return new ActiveMQMessageProducer(this, ActiveMQMessageTransformation.transformDestination(destination));
534     }
535 
536     /***
537      * Creates a <CODE>MessageConsumer</CODE> for the specified destination. Since <CODE>Queue</CODE> and <CODE>
538      * Topic</CODE> both inherit from <CODE>Destination</CODE>, they can be used in the destination parameter to
539      * create a <CODE>MessageConsumer</CODE>.
540      *
541      * @param destination the <CODE>Destination</CODE> to access.
542      * @return the MessageConsumer
543      * @throws JMSException                if the session fails to create a consumer due to some internal error.
544      * @throws InvalidDestinationException if an invalid destination is specified.
545      * @since 1.1
546      */
547     public MessageConsumer createConsumer(Destination destination) throws JMSException {
548         checkClosed();
549         int prefetch = destination instanceof Topic ? connection.getPrefetchPolicy().getTopicPrefetch() : connection
550                 .getPrefetchPolicy().getQueuePrefetch();
551         return new ActiveMQMessageConsumer(this, ActiveMQMessageTransformation.transformDestination(destination), "",
552                 "", this.connection.getNextConsumerNumber(), prefetch, false, false);
553     }
554 
555     /***
556      * Creates a <CODE>MessageConsumer</CODE> for the specified destination, using a message selector. Since <CODE>
557      * Queue</CODE> and <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they can be used in the
558      * destination parameter to create a <CODE>MessageConsumer</CODE>.
559      * <P>
560      * A client uses a <CODE>MessageConsumer</CODE> object to receive messages that have been sent to a destination.
561      *
562      * @param destination     the <CODE>Destination</CODE> to access
563      * @param messageSelector only messages with properties matching the message selector expression are delivered. A
564      *                        value of null or an empty string indicates that there is no message selector for the message consumer.
565      * @return the MessageConsumer
566      * @throws JMSException                if the session fails to create a MessageConsumer due to some internal error.
567      * @throws InvalidDestinationException if an invalid destination is specified.
568      * @throws InvalidSelectorException    if the message selector is invalid.
569      * @since 1.1
570      */
571     public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException {
572         checkClosed();
573         int prefetch = destination instanceof Topic ? connection.getPrefetchPolicy().getTopicPrefetch() : connection
574                 .getPrefetchPolicy().getQueuePrefetch();
575         return new ActiveMQMessageConsumer(this, ActiveMQMessageTransformation.transformDestination(destination), "",
576                 messageSelector, this.connection.getNextConsumerNumber(), prefetch, false, false);
577     }
578 
579     /***
580      * Creates <CODE>MessageConsumer</CODE> for the specified destination, using a message selector. This method can
581      * specify whether messages published by its own connection should be delivered to it, if the destination is a
582      * topic.
583      * <P>
584      * Since <CODE>Queue</CODE> and <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they can be
585      * used in the destination parameter to create a <CODE>MessageConsumer</CODE>.
586      * <P>
587      * A client uses a <CODE>MessageConsumer</CODE> object to receive messages that have been published to a
588      * destination.
589      * <P>
590      * In some cases, a connection may both publish and subscribe to a topic. The consumer <CODE>NoLocal</CODE>
591      * attribute allows a consumer to inhibit the delivery of messages published by its own connection. The default
592      * value for this attribute is False. The <CODE>noLocal</CODE> value must be supported by destinations that are
593      * topics.
594      *
595      * @param destination     the <CODE>Destination</CODE> to access
596      * @param messageSelector only messages with properties matching the message selector expression are delivered. A
597      *                        value of null or an empty string indicates that there is no message selector for the message consumer.
598      * @param NoLocal         - if true, and the destination is a topic, inhibits the delivery of messages published by its own
599      *                        connection. The behavior for <CODE>NoLocal</CODE> is not specified if the destination is a queue.
600      * @return the MessageConsumer
601      * @throws JMSException                if the session fails to create a MessageConsumer due to some internal error.
602      * @throws InvalidDestinationException if an invalid destination is specified.
603      * @throws InvalidSelectorException    if the message selector is invalid.
604      * @since 1.1
605      */
606     public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean NoLocal)
607             throws JMSException {
608         checkClosed();
609         int prefetch = connection.getPrefetchPolicy().getTopicPrefetch();
610         return new ActiveMQMessageConsumer(this, ActiveMQMessageTransformation.transformDestination(destination), "",
611                 messageSelector, this.connection.getNextConsumerNumber(), prefetch, NoLocal, false);
612     }
613 
614     /***
615      * Creates a queue identity given a <CODE>Queue</CODE> name.
616      * <P>
617      * This facility is provided for the rare cases where clients need to dynamically manipulate queue identity. It
618      * allows the creation of a queue identity with a provider-specific name. Clients that depend on this ability are
619      * not portable.
620      * <P>
621      * Note that this method is not for creating the physical queue. The physical creation of queues is an
622      * administrative task and is not to be initiated by the JMS API. The one exception is the creation of temporary
623      * queues, which is accomplished with the <CODE>createTemporaryQueue</CODE> method.
624      *
625      * @param queueName the name of this <CODE>Queue</CODE>
626      * @return a <CODE>Queue</CODE> with the given name
627      * @throws JMSException if the session fails to create a queue due to some internal error.
628      * @since 1.1
629      */
630     public Queue createQueue(String queueName) throws JMSException {
631         checkClosed();
632         return new ActiveMQQueue(queueName);
633     }
634 
635     /***
636      * Creates a topic identity given a <CODE>Topic</CODE> name.
637      * <P>
638      * This facility is provided for the rare cases where clients need to dynamically manipulate topic identity. This
639      * allows the creation of a topic identity with a provider-specific name. Clients that depend on this ability are
640      * not portable.
641      * <P>
642      * Note that this method is not for creating the physical topic. The physical creation of topics is an
643      * administrative task and is not to be initiated by the JMS API. The one exception is the creation of temporary
644      * topics, which is accomplished with the <CODE>createTemporaryTopic</CODE> method.
645      *
646      * @param topicName the name of this <CODE>Topic</CODE>
647      * @return a <CODE>Topic</CODE> with the given name
648      * @throws JMSException if the session fails to create a topic due to some internal error.
649      * @since 1.1
650      */
651     public Topic createTopic(String topicName) throws JMSException {
652         checkClosed();
653         return new ActiveMQTopic(topicName);
654     }
655 
656     /***
657      * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on the specified queue.
658      * 
659      * @param queue the <CODE>queue</CODE> to access
660      * @exception InvalidDestinationException if an invalid destination is specified
661      * @since 1.1
662      */
663     /***
664      * Creates a durable subscriber to the specified topic.
665      * <P>
666      * If a client needs to receive all the messages published on a topic, including the ones published while the
667      * subscriber is inactive, it uses a durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a record of
668      * this durable subscription and insures that all messages from the topic's publishers are retained until they are
669      * acknowledged by this durable subscriber or they have expired.
670      * <P>
671      * Sessions with durable subscribers must always provide the same client identifier. In addition, each client must
672      * specify a name that uniquely identifies (within client identifier) each durable subscription it creates. Only one
673      * session at a time can have a <CODE>TopicSubscriber</CODE> for a particular durable subscription.
674      * <P>
675      * A client can change an existing durable subscription by creating a durable <CODE>TopicSubscriber</CODE> with
676      * the same name and a new topic and/or message selector. Changing a durable subscriber is equivalent to
677      * unsubscribing (deleting) the old one and creating a new one.
678      * <P>
679      * In some cases, a connection may both publish and subscribe to a topic. The subscriber <CODE>NoLocal</CODE>
680      * attribute allows a subscriber to inhibit the delivery of messages published by its own connection. The default
681      * value for this attribute is false.
682      *
683      * @param topic the non-temporary <CODE>Topic</CODE> to subscribe to
684      * @param name  the name used to identify this subscription
685      * @return the TopicSubscriber
686      * @throws JMSException                if the session fails to create a subscriber due to some internal error.
687      * @throws InvalidDestinationException if an invalid topic is specified.
688      * @since 1.1
689      */
690     public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException {
691         checkClosed();
692         return new ActiveMQTopicSubscriber(this, ActiveMQMessageTransformation.transformDestination(topic), name, "",
693                 this.connection.getNextConsumerNumber(), this.connection.getPrefetchPolicy().getDurableTopicPrefetch(),
694                 false, false);
695     }
696 
697     /***
698      * Creates a durable subscriber to the specified topic, using a message selector and specifying whether messages
699      * published by its own connection should be delivered to it.
700      * <P>
701      * If a client needs to receive all the messages published on a topic, including the ones published while the
702      * subscriber is inactive, it uses a durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a record of
703      * this durable subscription and insures that all messages from the topic's publishers are retained until they are
704      * acknowledged by this durable subscriber or they have expired.
705      * <P>
706      * Sessions with durable subscribers must always provide the same client identifier. In addition, each client must
707      * specify a name which uniquely identifies (within client identifier) each durable subscription it creates. Only
708      * one session at a time can have a <CODE>TopicSubscriber</CODE> for a particular durable subscription. An
709      * inactive durable subscriber is one that exists but does not currently have a message consumer associated with it.
710      * <P>
711      * A client can change an existing durable subscription by creating a durable <CODE>TopicSubscriber</CODE> with
712      * the same name and a new topic and/or message selector. Changing a durable subscriber is equivalent to
713      * unsubscribing (deleting) the old one and creating a new one.
714      *
715      * @param topic           the non-temporary <CODE>Topic</CODE> to subscribe to
716      * @param name            the name used to identify this subscription
717      * @param messageSelector only messages with properties matching the message selector expression are delivered. A
718      *                        value of null or an empty string indicates that there is no message selector for the message consumer.
719      * @param noLocal         if set, inhibits the delivery of messages published by its own connection
720      * @return the Queue Browser
721      * @throws JMSException                if the session fails to create a subscriber due to some internal error.
722      * @throws InvalidDestinationException if an invalid topic is specified.
723      * @throws InvalidSelectorException    if the message selector is invalid.
724      * @since 1.1
725      */
726     public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal)
727             throws JMSException {
728         checkClosed();
729         return new ActiveMQTopicSubscriber(this, ActiveMQMessageTransformation.transformDestination(topic), name,
730                 messageSelector, this.connection.getNextConsumerNumber(), this.connection.getPrefetchPolicy()
731                 .getDurableTopicPrefetch(), noLocal, false);
732     }
733 
734     /***
735      * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on the specified queue.
736      *
737      * @param queue the <CODE>queue</CODE> to access
738      * @return the Queue Browser
739      * @throws JMSException                if the session fails to create a browser due to some internal error.
740      * @throws InvalidDestinationException if an invalid destination is specified
741      * @since 1.1
742      */
743     public QueueBrowser createBrowser(Queue queue) throws JMSException {
744         checkClosed();
745         return new ActiveMQQueueBrowser(this, ActiveMQMessageTransformation.transformDestination(queue), "",
746                 this.connection.getNextConsumerNumber());
747     }
748 
749     /***
750      * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on the specified queue using a message
751      * selector.
752      *
753      * @param queue           the <CODE>queue</CODE> to access
754      * @param messageSelector only messages with properties matching the message selector expression are delivered. A
755      *                        value of null or an empty string indicates that there is no message selector for the message consumer.
756      * @return the Queue Browser
757      * @throws JMSException                if the session fails to create a browser due to some internal error.
758      * @throws InvalidDestinationException if an invalid destination is specified
759      * @throws InvalidSelectorException    if the message selector is invalid.
760      * @since 1.1
761      */
762     public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException {
763         checkClosed();
764         return new ActiveMQQueueBrowser(this, ActiveMQMessageTransformation.transformDestination(queue),
765                 messageSelector, this.connection.getNextConsumerNumber());
766     }
767 
768     /***
769      * Creates a <CODE>TemporaryQueue</CODE> object. Its lifetime will be that of the <CODE>Connection</CODE> unless
770      * it is deleted earlier.
771      *
772      * @return a temporary queue identity
773      * @throws JMSException if the session fails to create a temporary queue due to some internal error.
774      * @since 1.1
775      */
776     public TemporaryQueue createTemporaryQueue() throws JMSException {
777         checkClosed();
778         String tempQueueName = "TemporaryQueue-"
779                 + ActiveMQDestination.createTemporaryName(this.connection.getInitializedClientID());
780         tempQueueName += this.temporaryDestinationGenerator.generateId();
781         return new ActiveMQTemporaryQueue(tempQueueName);
782     }
783 
784     /***
785      * Creates a <CODE>TemporaryTopic</CODE> object. Its lifetime will be that of the <CODE>Connection</CODE> unless
786      * it is deleted earlier.
787      *
788      * @return a temporary topic identity
789      * @throws JMSException if the session fails to create a temporary topic due to some internal error.
790      * @since 1.1
791      */
792     public TemporaryTopic createTemporaryTopic() throws JMSException {
793         checkClosed();
794         String tempTopicName = "TemporaryTopic-"
795                 + ActiveMQDestination.createTemporaryName(this.connection.getInitializedClientID());
796         tempTopicName += this.temporaryDestinationGenerator.generateId();
797         return new ActiveMQTemporaryTopic(tempTopicName);
798     }
799 
800     /***
801      * Creates a <CODE>QueueReceiver</CODE> object to receive messages from the specified queue.
802      *
803      * @param queue the <CODE>Queue</CODE> to access
804      * @return @throws JMSException if the session fails to create a receiver due to some internal error.
805      * @throws JMSException
806      * @throws InvalidDestinationException if an invalid queue is specified.
807      */
808     public QueueReceiver createReceiver(Queue queue) throws JMSException {
809         checkClosed();
810         return new ActiveMQQueueReceiver(this, ActiveMQDestination.transformDestination(queue), "", this.connection
811                 .getNextConsumerNumber(), this.connection.getPrefetchPolicy().getQueuePrefetch());
812     }
813 
814     /***
815      * Creates a <CODE>QueueReceiver</CODE> object to receive messages from the specified queue using a message
816      * selector.
817      *
818      * @param queue           the <CODE>Queue</CODE> to access
819      * @param messageSelector only messages with properties matching the message selector expression are delivered. A
820      *                        value of null or an empty string indicates that there is no message selector for the message consumer.
821      * @return QueueReceiver
822      * @throws JMSException                if the session fails to create a receiver due to some internal error.
823      * @throws InvalidDestinationException if an invalid queue is specified.
824      * @throws InvalidSelectorException    if the message selector is invalid.
825      */
826     public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException {
827         checkClosed();
828         return new ActiveMQQueueReceiver(this, ActiveMQMessageTransformation.transformDestination(queue),
829                 messageSelector, this.connection.getNextConsumerNumber(), this.connection.getPrefetchPolicy()
830                 .getQueuePrefetch());
831     }
832 
833     /***
834      * Creates a <CODE>QueueSender</CODE> object to send messages to the specified queue.
835      *
836      * @param queue the <CODE>Queue</CODE> to access, or null if this is an unidentified producer
837      * @return QueueSender
838      * @throws JMSException                if the session fails to create a sender due to some internal error.
839      * @throws InvalidDestinationException if an invalid queue is specified.
840      */
841     public QueueSender createSender(Queue queue) throws JMSException {
842         checkClosed();
843         return new ActiveMQQueueSender(this, ActiveMQMessageTransformation.transformDestination(queue));
844     }
845 
846     /***
847      * Creates a nondurable subscriber to the specified topic. <p/>
848      * <P>
849      * A client uses a <CODE>TopicSubscriber</CODE> object to receive messages that have been published to a topic.
850      * <p/>
851      * <P>
852      * Regular <CODE>TopicSubscriber</CODE> objects are not durable. They receive only messages that are published
853      * while they are active. <p/>
854      * <P>
855      * In some cases, a connection may both publish and subscribe to a topic. The subscriber <CODE>NoLocal</CODE>
856      * attribute allows a subscriber to inhibit the delivery of messages published by its own connection. The default
857      * value for this attribute is false.
858      *
859      * @param topic the <CODE>Topic</CODE> to subscribe to
860      * @return TopicSubscriber
861      * @throws JMSException                if the session fails to create a subscriber due to some internal error.
862      * @throws InvalidDestinationException if an invalid topic is specified.
863      */
864     public TopicSubscriber createSubscriber(Topic topic) throws JMSException {
865         checkClosed();
866         return new ActiveMQTopicSubscriber(this, ActiveMQMessageTransformation.transformDestination(topic), null, null,
867                 this.connection.getNextConsumerNumber(), this.connection.getPrefetchPolicy().getTopicPrefetch(), false,
868                 false);
869     }
870 
871     /***
872      * Creates a nondurable subscriber to the specified topic, using a message selector or specifying whether messages
873      * published by its own connection should be delivered to it. <p/>
874      * <P>
875      * A client uses a <CODE>TopicSubscriber</CODE> object to receive messages that have been published to a topic.
876      * <p/>
877      * <P>
878      * Regular <CODE>TopicSubscriber</CODE> objects are not durable. They receive only messages that are published
879      * while they are active. <p/>
880      * <P>
881      * Messages filtered out by a subscriber's message selector will never be delivered to the subscriber. From the
882      * subscriber's perspective, they do not exist. <p/>
883      * <P>
884      * In some cases, a connection may both publish and subscribe to a topic. The subscriber <CODE>NoLocal</CODE>
885      * attribute allows a subscriber to inhibit the delivery of messages published by its own connection. The default
886      * value for this attribute is false.
887      *
888      * @param topic           the <CODE>Topic</CODE> to subscribe to
889      * @param messageSelector only messages with properties matching the message selector expression are delivered. A
890      *                        value of null or an empty string indicates that there is no message selector for the message consumer.
891      * @param noLocal         if set, inhibits the delivery of messages published by its own connection
892      * @return TopicSubscriber
893      * @throws JMSException                if the session fails to create a subscriber due to some internal error.
894      * @throws InvalidDestinationException if an invalid topic is specified.
895      * @throws InvalidSelectorException    if the message selector is invalid.
896      */
897     public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException {
898         checkClosed();
899         return new ActiveMQTopicSubscriber(this, ActiveMQMessageTransformation.transformDestination(topic), null,
900                 messageSelector, this.connection.getNextConsumerNumber(), this.connection.getPrefetchPolicy()
901                 .getTopicPrefetch(), noLocal, false);
902     }
903 
904     /***
905      * Creates a publisher for the specified topic. <p/>
906      * <P>
907      * A client uses a <CODE>TopicPublisher</CODE> object to publish messages on a topic. Each time a client creates a
908      * <CODE>TopicPublisher</CODE> on a topic, it defines a new sequence of messages that have no ordering
909      * relationship with the messages it has previously sent.
910      *
911      * @param topic the <CODE>Topic</CODE> to publish to, or null if this is an unidentified producer
912      * @return TopicPublisher
913      * @throws JMSException                if the session fails to create a publisher due to some internal error.
914      * @throws InvalidDestinationException if an invalid topic is specified.
915      */
916     public TopicPublisher createPublisher(Topic topic) throws JMSException {
917         checkClosed();
918         return new ActiveMQTopicPublisher(this, ActiveMQMessageTransformation.transformDestination(topic));
919     }
920 
921     /***
922      * Unsubscribes a durable subscription that has been created by a client.
923      * <P>
924      * This method deletes the state being maintained on behalf of the subscriber by its provider.
925      * <P>
926      * It is erroneous for a client to delete a durable subscription while there is an active <CODE>MessageConsumer
927      * </CODE> or <CODE>TopicSubscriber</CODE> for the subscription, or while a consumed message is part of a pending
928      * transaction or has not been acknowledged in the session.
929      *
930      * @param name the name used to identify this subscription
931      * @throws JMSException                if the session fails to unsubscribe to the durable subscription due to some internal error.
932      * @throws InvalidDestinationException if an invalid subscription name is specified.
933      * @since 1.1
934      */
935     public void unsubscribe(String name) throws JMSException {
936         checkClosed();
937         DurableUnsubscribe ds = new DurableUnsubscribe();
938         ds.setId(this.packetIdGenerator.generateId());
939         ds.setClientId(this.connection.getClientID());
940         ds.setSubscriberName(name);
941         this.connection.syncSendPacket(ds);
942     }
943 
944     /***
945      * Tests to see if the Message Dispatcher is a target for this message
946      *
947      * @param message the message to test
948      * @return true if the Message Dispatcher can dispatch the message
949      */
950     public boolean isTarget(ActiveMQMessage message) {
951         for (Iterator i = this.consumers.iterator(); i.hasNext();) {
952             ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) i.next();
953             if (message.isConsumerTarget(consumer.getConsumerNumber())) {
954                 return true;
955             }
956         }
957         return false;
958     }
959 
960     /***
961      * Dispatch an ActiveMQMessage
962      *
963      * @param message
964      */
965     public void dispatch(ActiveMQMessage message) {
966         message.setMessageAcknowledge(this);
967         messageExecutor.execute(message);
968     }
969 
970     /***
971      * Acknowledges all consumed messages of the session of this consumed message.
972      * <P>
973      * All consumed JMS messages support the <CODE>acknowledge</CODE> method for use when a client has specified that
974      * its JMS session's consumed messages are to be explicitly acknowledged. By invoking <CODE>acknowledge</CODE> on
975      * a consumed message, a client acknowledges all messages consumed by the session that the message was delivered to.
976      * <P>
977      * Calls to <CODE>acknowledge</CODE> are ignored for both transacted sessions and sessions specified to use
978      * implicit acknowledgement modes.
979      * <P>
980      * A client may individually acknowledge each message as it is consumed, or it may choose to acknowledge messages as
981      * an application-defined group (which is done by calling acknowledge on the last received message of the group,
982      * thereby acknowledging all messages consumed by the session.)
983      * <P>
984      * Messages that have been received but not acknowledged may be redelivered.
985      *
986      * @throws JMSException if the JMS provider fails to acknowledge the messages due to some internal error.
987      * @throws javax.jms.IllegalStateException
988      *                      if this method is called on a closed session.
989      * @see javax.jms.Session#CLIENT_ACKNOWLEDGE
990      */
991     public void acknowledge() throws JMSException {
992         doAcknowledge(false);
993     }
994     
995     protected void doAcknowledge(boolean isClosing) throws JMSException {
996         checkClosed();
997         if (this.acknowledgeMode == Session.CLIENT_ACKNOWLEDGE) {
998             ActiveMQMessage msg = null;
999             while ((msg = (ActiveMQMessage) deliveredMessages.removeFirst()) != null) {
1000                 MessageAck ack = new MessageAck();
1001                 ack.setConsumerId(msg.getConsumerId());
1002                 ack.setMessageID(msg.getJMSMessageID());
1003                 if (!isClosing){
1004                     ack.setMessageRead(msg.isMessageConsumed());
1005                 }
1006                 ack.setId(packetIdGenerator.generateId());
1007                 ack.setDestination(msg.getJMSActiveMQDestination());
1008                 ack.setPersistent(msg.getJMSDeliveryMode() == DeliveryMode.PERSISTENT);
1009                 this.connection.asyncSendPacket(ack,false);
1010             }
1011             deliveredMessages.clear();
1012         }
1013     }
1014 
1015     protected void messageDelivered(boolean sendAcknowledge, ActiveMQMessage message, boolean messageConsumed) {
1016         if (message != null && !closed.get()) {
1017             if (isClientAcknowledge() || (isTransacted() && message.isTransientConsumed())) {
1018                 message.setMessageConsumed(messageConsumed);
1019                 deliveredMessages.add(message);
1020             }
1021             if (sendAcknowledge) {
1022                 try {
1023                     doStartTransaction();
1024                     MessageAck ack = new MessageAck();
1025                     ack.setConsumerId(message.getConsumerId());
1026                     ack.setTransactionId(this.currentTransactionId);
1027                     ack.setMessageID(message.getJMSMessageID());
1028                     ack.setMessageRead(messageConsumed);
1029                     ack.setId(packetIdGenerator.generateId());
1030                     ack.setXaTransacted(isXaTransacted());
1031                     ack.setDestination(message.getJMSActiveMQDestination());
1032                     ack.setPersistent(message.getJMSDeliveryMode() == DeliveryMode.PERSISTENT);
1033                     this.connection.asyncSendPacket(ack);
1034                 }
1035                 catch (JMSException e) {
1036                     log.warn("failed to notify Broker that message is delivered", e);
1037                 }
1038             }
1039         }
1040     }
1041 
1042     /***
1043      * @param consumer
1044      * @throws JMSException
1045      */
1046     protected void addConsumer(ActiveMQMessageConsumer consumer) throws JMSException {
1047         // lets add the stat
1048         if (consumer.isDurableSubscriber()) {
1049             stats.onCreateDurableSubscriber();
1050         }
1051         consumer.setConsumerId(consumerIdGenerator.generateId());
1052         ConsumerInfo info = createConsumerInfo(consumer);
1053         info.setStarted(true);
1054         this.connection.syncSendPacket(info);
1055         this.consumers.add(consumer);
1056     }
1057 
1058     /***
1059      * @param consumer
1060      * @throws JMSException
1061      */
1062     protected void removeConsumer(ActiveMQMessageConsumer consumer) throws JMSException {
1063         this.consumers.remove(consumer);
1064         // lets remove the stat
1065         if (consumer.isDurableSubscriber()) {
1066             stats.onRemoveDurableSubscriber();
1067         }
1068         if (!closed.get()) {
1069             ConsumerInfo info = createConsumerInfo(consumer);
1070             info.setStarted(false);
1071             this.connection.asyncSendPacket(info, false);
1072         }
1073     }
1074 
1075     protected ConsumerInfo createConsumerInfo(ActiveMQMessageConsumer consumer) throws JMSException {
1076         ConsumerInfo info = new ConsumerInfo();
1077         info.setConsumerId(consumer.consumerId);
1078         info.setClientId(connection.clientID);
1079         info.setSessionId(this.sessionId);
1080         info.setConsumerNo(consumer.consumerNumber);
1081         info.setPrefetchNumber(consumer.prefetchNumber);
1082         info.setDestination(consumer.destination);
1083         info.setId(this.packetIdGenerator.generateId());
1084         info.setNoLocal(consumer.noLocal);
1085         info.setBrowser(consumer.browser);
1086         info.setSelector(consumer.messageSelector);
1087         info.setStartTime(consumer.startTime);
1088         info.setConsumerName(consumer.consumerName);
1089         return info;
1090     }
1091 
1092     /***
1093      * @param producer
1094      * @throws JMSException
1095      */
1096     protected void addProducer(ActiveMQMessageProducer producer) throws JMSException {
1097         producer.setProducerId(producerIdGenerator.generateId());
1098         ProducerInfo info = createProducerInfo(producer);
1099         info.setStarted(true);
1100         this.connection.syncSendPacket(info);
1101         this.producers.add(producer);
1102     }
1103 
1104     /***
1105      * @param producer
1106      * @throws JMSException
1107      */
1108     protected void removeProducer(ActiveMQMessageProducer producer) throws JMSException {
1109         this.producers.remove(producer);
1110         if (!closed.get()) {
1111             ProducerInfo info = createProducerInfo(producer);
1112             info.setStarted(false);
1113             this.connection.asyncSendPacket(info, false);
1114         }
1115     }
1116 
1117     protected ProducerInfo createProducerInfo(ActiveMQMessageProducer producer) throws JMSException {
1118         ProducerInfo info = new ProducerInfo();
1119         info.setProducerId(producer.getProducerId());
1120         info.setClientId(connection.clientID);
1121         info.setSessionId(this.sessionId);
1122         info.setDestination(producer.defaultDestination);
1123         info.setId(this.packetIdGenerator.generateId());
1124         info.setStartTime(producer.getStartTime());
1125         return info;
1126     }
1127 
1128     /***
1129      * Start this Session
1130      */
1131     protected void start() {
1132         messageExecutor.start();
1133     }
1134 
1135     /***
1136      * Stop this Session
1137      */
1138     protected void stop() {
1139         messageExecutor.stop();
1140     }
1141 
1142     /***
1143      * @return Returns the sessionId.
1144      */
1145     protected String getSessionId() {
1146         return sessionId;
1147     }
1148 
1149     /***
1150      * @param sessionId The sessionId to set.
1151      */
1152     protected void setSessionId(String sessionId) {
1153         this.sessionId = sessionId;
1154     }
1155 
1156     /***
1157      * @return Returns the startTime.
1158      */
1159     protected long getStartTime() {
1160         return startTime;
1161     }
1162 
1163     /***
1164      * @param startTime The startTime to set.
1165      */
1166     protected void setStartTime(long startTime) {
1167         this.startTime = startTime;
1168     }
1169 
1170     /***
1171      * send the message for dispatch by the broker
1172      *
1173      * @param producer
1174      * @param destination
1175      * @param message
1176      * @param deliveryMode
1177      * @param priority
1178      * @param timeToLive
1179      * @throws JMSException
1180      */
1181     protected void send(ActiveMQMessageProducer producer, Destination destination, Message message, int deliveryMode,
1182                         int priority, long timeToLive, boolean reuseMessageId) throws JMSException {
1183         checkClosed();
1184         // ensure that the connection info is sent to the broker
1185         connection.sendConnectionInfoToBroker();
1186         // tell the Broker we are about to start a new transaction
1187         doStartTransaction();
1188         message.setJMSDestination(destination);
1189         message.setJMSDeliveryMode(deliveryMode);
1190         message.setJMSPriority(priority);
1191         long expiration = 0L;
1192         if (!producer.getDisableMessageTimestamp()) {
1193             long timeStamp = System.currentTimeMillis();
1194             message.setJMSTimestamp(timeStamp);
1195             if (timeToLive > 0) {
1196                 expiration = timeToLive + timeStamp;
1197             }
1198         }
1199         message.setJMSExpiration(expiration);
1200         String id = message.getJMSMessageID();
1201         if ((id == null || id.length() == 0) || !producer.getDisableMessageID() && !reuseMessageId) {
1202             message.setJMSMessageID(producer.getIdGenerator().generateId());
1203         }
1204         //transform to our own message format here
1205         ActiveMQMessage msg = ActiveMQMessageTransformation.transformMessage(message);
1206         msg.prepareMessageBody();
1207         msg.setProducerID(producer.getProducerId());
1208         msg.setTransactionId(currentTransactionId);
1209         msg.setXaTransacted(isXaTransacted());
1210         msg.setJMSClientID(this.connection.clientID);
1211         msg.setJMSRedelivered(false);//could be forwarding this message on
1212         if (log.isDebugEnabled()) {
1213             log.debug("Sending message: " + msg);
1214         }
1215         // Should we use an async send?
1216         if (this.connection.isUseAsyncSend() || this.acknowledgeMode == Session.SESSION_TRANSACTED
1217                 || deliveryMode == DeliveryMode.NON_PERSISTENT) {
1218             this.connection.asyncSendPacket(msg);
1219         }
1220         else {
1221             this.connection.syncSendPacket(msg);
1222         }
1223     }
1224 
1225     /***
1226      * Send TransactionInfo to indicate transaction has started
1227      *
1228      * @throws JMSException if some internal error occurs
1229      */
1230     protected void doStartTransaction() throws JMSException {
1231         if (getTransacted()) {
1232             if (startTransaction.commit(false, true)) {
1233                 TransactionInfo info = new TransactionInfo();
1234                 info.setId(this.packetIdGenerator.generateId());
1235                 info.setTransactionId(currentTransactionId);
1236                 info.setType(TransactionInfo.START);
1237                 this.connection.asyncSendPacket(info);
1238                 // Notify the listener that the tx was started.
1239                 if (localTransactionEventListener != null) {
1240                     localTransactionEventListener.beginEvent();
1241                 }
1242             }
1243         }
1244     }
1245 
1246     /***
1247      * @return Returns the localTransactionEventListener.
1248      */
1249     public LocalTransactionEventListener getLocalTransactionEventListener() {
1250         return localTransactionEventListener;
1251     }
1252 
1253     /***
1254      * Used by the resource adapter to listen to transaction events.
1255      *
1256      * @param localTransactionEventListener The localTransactionEventListener to set.
1257      */
1258     public void setLocalTransactionEventListener(LocalTransactionEventListener localTransactionEventListener) {
1259         this.localTransactionEventListener = localTransactionEventListener;
1260     }
1261 
1262     protected boolean isXaTransacted() {
1263         return false;
1264     }
1265 
1266     //this has a misleading name, since in subclass ActiveMQXASession it is overridden to
1267     //return "currentTransactionId"
1268     protected String getNextTransactionId() {
1269         return this.transactionIdGenerator.generateId();
1270     }
1271 
1272     protected void setSessionConsumerDispatchState(int value) throws JMSException {
1273         if (consumerDispatchState != ActiveMQSession.CONSUMER_DISPATCH_UNSET && value != consumerDispatchState) {
1274             String errorStr = "Cannot mix consumer dispatching on a session - already: ";
1275             if (value == ActiveMQSession.CONSUMER_DISPATCH_SYNC) {
1276                 errorStr += "synchronous";
1277             }
1278             else {
1279                 errorStr += "asynchronous";
1280             }
1281             throw new IllegalStateException(errorStr);
1282         }
1283         consumerDispatchState = value;
1284     }
1285 
1286     protected void redeliverUnacknowledgedMessages() {
1287         redeliverUnacknowledgedMessages(false);
1288     }
1289 
1290     protected void redeliverUnacknowledgedMessages(boolean onlyDeliverTransientConsumed) {
1291         messageExecutor.stop();
1292         LinkedList replay = new LinkedList();
1293         Object obj = null;
1294         while ((obj = deliveredMessages.removeFirst()) != null) {
1295             replay.add(obj);
1296         }
1297         deliveredMessages.clear();
1298         if (!replay.isEmpty()) {
1299             for (ListIterator i = replay.listIterator(replay.size()); i.hasPrevious();) {
1300                 ActiveMQMessage msg = (ActiveMQMessage) i.previous();
1301                 if (!onlyDeliverTransientConsumed || msg.isTransientConsumed()) {
1302                     msg.setJMSRedelivered(true);
1303                     messageExecutor.executeFirst(msg);
1304                 }
1305             }
1306         }
1307         replay.clear();
1308         messageExecutor.start();
1309     }
1310 
1311     protected void clearMessagesInProgress() {
1312         messageExecutor.clearMessagesInProgress();
1313         for (Iterator i = consumers.iterator(); i.hasNext();) {
1314             ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) i.next();
1315             consumer.clearMessagesInProgress();
1316         }
1317     }
1318 
1319     protected boolean isTransacted() {
1320         return this.acknowledgeMode == Session.SESSION_TRANSACTED;
1321     }
1322 
1323     protected boolean isClientAcknowledge() {
1324         return this.acknowledgeMode == Session.CLIENT_ACKNOWLEDGE;
1325     }
1326 }