View Javadoc

1   /*** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.codehaus.activemq;
19  
20  import org.apache.commons.logging.Log;
21  import org.apache.commons.logging.LogFactory;
22  import org.codehaus.activemq.management.JMSConsumerStatsImpl;
23  import org.codehaus.activemq.management.StatsCapable;
24  import org.codehaus.activemq.message.ActiveMQDestination;
25  import org.codehaus.activemq.message.ActiveMQMessage;
26  import org.codehaus.activemq.message.util.MemoryBoundedQueue;
27  import org.codehaus.activemq.selector.SelectorParser;
28  
29  import javax.jms.IllegalStateException;
30  import javax.jms.InvalidDestinationException;
31  import javax.jms.JMSException;
32  import javax.jms.Message;
33  import javax.jms.MessageConsumer;
34  import javax.jms.MessageListener;
35  import javax.management.j2ee.statistics.Stats;
36  
37  /***
38   * A client uses a <CODE>MessageConsumer</CODE> object to receive messages from a destination. A <CODE>
39   * MessageConsumer</CODE> object is created by passing a <CODE>Destination</CODE> object to a message-consumer
40   * creation method supplied by a session.
41   * <P>
42   * <CODE>MessageConsumer</CODE> is the parent interface for all message consumers.
43   * <P>
44   * A message consumer can be created with a message selector. A message selector allows the client to restrict the
45   * messages delivered to the message consumer to those that match the selector.
46   * <P>
47   * A client may either synchronously receive a message consumer's messages or have the consumer asynchronously deliver
48   * them as they arrive.
49   * <P>
50   * For synchronous receipt, a client can request the next message from a message consumer using one of its <CODE>
51   * receive</CODE> methods. There are several variations of <CODE>receive</CODE> that allow a client to poll or wait
52   * for the next message.
53   * <P>
54   * For asynchronous delivery, a client can register a <CODE>MessageListener</CODE> object with a message consumer. As
55   * messages arrive at the message consumer, it delivers them by calling the <CODE>MessageListener</CODE>'s<CODE>
56   * onMessage</CODE> method.
57   * <P>
58   * It is a client programming error for a <CODE>MessageListener</CODE> to throw an exception.
59   *
60   * @version $Revision: 1.35 $
61   * @see javax.jms.MessageConsumer
62   * @see javax.jms.QueueReceiver
63   * @see javax.jms.TopicSubscriber
64   * @see javax.jms.Session
65   */
66  public class ActiveMQMessageConsumer implements MessageConsumer, StatsCapable {
67      private static final Log log = LogFactory.getLog(ActiveMQMessageConsumer.class);
68      protected ActiveMQSession session;
69      protected String consumerId;
70      protected MemoryBoundedQueue messageQueue;
71      protected String messageSelector;
72      private MessageListener messageListener;
73      protected String consumerName;
74      protected ActiveMQDestination destination;
75      private boolean closed;
76      protected int consumerNumber;
77      protected int prefetchNumber;
78      protected long startTime;
79      protected boolean noLocal;
80      protected boolean browser;
81      private Thread accessThread;
82      private Object messageListenerGuard;
83      private JMSConsumerStatsImpl stats;
84  
85      /***
86       * Create a MessageConsumer
87       *
88       * @param theSession
89       * @param dest
90       * @param name
91       * @param selector
92       * @param cnum
93       * @param prefetch
94       * @param noLocalValue
95       * @param browserValue
96       * @throws JMSException
97       */
98      protected ActiveMQMessageConsumer(ActiveMQSession theSession, ActiveMQDestination dest, String name,
99                                        String selector, int cnum, int prefetch, boolean noLocalValue, boolean browserValue) throws JMSException {
100         if (dest == null) {
101             throw new InvalidDestinationException("Do not understand a null destination");
102         }
103         if (dest.isTemporary()) {
104             //validate that the destination comes from this Connection
105             String physicalName = dest.getPhysicalName();
106             if (physicalName == null) {
107                 throw new IllegalArgumentException("Physical name of Destination should be valid: " + dest);
108             }
109             String clientID = theSession.connection.getInitializedClientID();
110             if (physicalName.indexOf(clientID) < 0) {
111                 throw new InvalidDestinationException("Cannot use a Temporary destination from another Connection");
112             }
113         }
114         if (selector != null) {
115             selector = selector.trim();
116             if (selector.length() > 0) {
117                 // Validate that the selector
118                 new SelectorParser().parse(selector);
119             }
120         }
121         this.session = theSession;
122         this.destination = dest;
123         this.consumerName = name;
124         this.messageSelector = selector;
125 
126         this.consumerNumber = cnum;
127         this.prefetchNumber = prefetch;
128         this.noLocal = noLocalValue;
129         this.browser = browserValue;
130         this.startTime = System.currentTimeMillis();
131         this.messageListenerGuard = new Object();
132         String queueName = theSession.connection.clientID + ":" + name;
133         queueName += ":" + cnum;
134         this.messageQueue = theSession.connection.getMemoryBoundedQueue(queueName);
135         this.stats = new JMSConsumerStatsImpl(theSession.getSessionStats(), dest);
136         this.session.addConsumer(this);
137     }
138 
139     /***
140      * @return Stats for this MessageConsumer
141      */
142     public Stats getStats() {
143         return stats;
144     }
145 
146     /***
147      * @return Stats for this MessageConsumer
148      */
149     public JMSConsumerStatsImpl getConsumerStats() {
150         return stats;
151     }
152 
153     /***
154      * @return pretty print of this consumer
155      */
156     public String toString() {
157         return "MessageConsumer: " + consumerId;
158     }
159 
160     /***
161      * @return Returns the prefetchNumber.
162      */
163     public int getPrefetchNumber() {
164         return prefetchNumber;
165     }
166 
167     /***
168      * @param prefetchNumber The prefetchNumber to set.
169      */
170     public void setPrefetchNumber(int prefetchNumber) {
171         this.prefetchNumber = prefetchNumber;
172     }
173 
174     /***
175      * Gets this message consumer's message selector expression.
176      *
177      * @return this message consumer's message selector, or null if no message selector exists for the message consumer
178      *         (that is, if the message selector was not set or was set to null or the empty string)
179      * @throws JMSException if the JMS provider fails to receive the next message due to some internal error.
180      */
181     public String getMessageSelector() throws JMSException {
182         checkClosed();
183         return this.messageSelector;
184     }
185 
186     /***
187      * Gets the message consumer's <CODE>MessageListener</CODE>.
188      *
189      * @return the listener for the message consumer, or null if no listener is set
190      * @throws JMSException if the JMS provider fails to get the message listener due to some internal error.
191      * @see javax.jms.MessageConsumer#setMessageListener(javax.jms.MessageListener)
192      */
193     public MessageListener getMessageListener() throws JMSException {
194         checkClosed();
195         return this.messageListener;
196     }
197 
198     /***
199      * Sets the message consumer's <CODE>MessageListener</CODE>.
200      * <P>
201      * Setting the message listener to null is the equivalent of unsetting the message listener for the message
202      * consumer.
203      * <P>
204      * The effect of calling <CODE>MessageConsumer.setMessageListener</CODE> while messages are being consumed by an
205      * existing listener or the consumer is being used to consume messages synchronously is undefined.
206      *
207      * @param listener the listener to which the messages are to be delivered
208      * @throws JMSException if the JMS provider fails to receive the next message due to some internal error.
209      * @see javax.jms.MessageConsumer#getMessageListener()
210      */
211     public void setMessageListener(MessageListener listener) throws JMSException {
212         checkClosed();
213         this.messageListener = listener;
214         if (listener != null) {
215             session.setSessionConsumerDispatchState(ActiveMQSession.CONSUMER_DISPATCH_ASYNC);
216         }
217     }
218 
219     /***
220      * Receives the next message produced for this message consumer.
221      * <P>
222      * This call blocks indefinitely until a message is produced or until this message consumer is closed.
223      * <P>
224      * If this <CODE>receive</CODE> is done within a transaction, the consumer retains the message until the
225      * transaction commits.
226      *
227      * @return the next message produced for this message consumer, or null if this message consumer is concurrently
228      *         closed
229      * @throws JMSException
230      */
231     public Message receive() throws JMSException {
232         checkClosed();
233         session.setSessionConsumerDispatchState(ActiveMQSession.CONSUMER_DISPATCH_SYNC);
234         try {
235             this.accessThread = Thread.currentThread();
236             ActiveMQMessage message = (ActiveMQMessage) messageQueue.dequeue();
237             this.accessThread = null;
238             if (message != null) {
239                 messageDelivered(message, true);
240                 message = message.shallowCopy();
241             }
242             return message;
243         }
244         catch (InterruptedException ioe) {
245             return null;
246         }
247     }
248 
249     /***
250      * Receives the next message that arrives within the specified timeout interval.
251      * <P>
252      * This call blocks until a message arrives, the timeout expires, or this message consumer is closed. A <CODE>
253      * timeout</CODE> of zero never expires, and the call blocks indefinitely.
254      *
255      * @param timeout the timeout value (in milliseconds)
256      * @return the next message produced for this message consumer, or null if the timeout expires or this message
257      *         consumer is concurrently closed
258      * @throws JMSException
259      */
260     public Message receive(long timeout) throws JMSException {
261         checkClosed();
262         session.setSessionConsumerDispatchState(ActiveMQSession.CONSUMER_DISPATCH_SYNC);
263         try {
264             if (timeout == 0) {
265                 return this.receive();
266             }
267             this.accessThread = Thread.currentThread();
268             ActiveMQMessage message = (ActiveMQMessage) messageQueue.dequeue(timeout);
269             this.accessThread = null;
270             if (message != null) {
271                 messageDelivered(message, true);
272                 message = message.shallowCopy();
273             }
274             return message;
275         }
276         catch (InterruptedException ioe) {
277             return null;
278         }
279     }
280 
281     /***
282      * Receives the next message if one is immediately available.
283      *
284      * @return the next message produced for this message consumer, or null if one is not available
285      * @throws JMSException if the JMS provider fails to receive the next message due to some internal error.
286      */
287     public Message receiveNoWait() throws JMSException {
288         checkClosed();
289         session.setSessionConsumerDispatchState(ActiveMQSession.CONSUMER_DISPATCH_SYNC);
290         try {
291             ActiveMQMessage message = (ActiveMQMessage) messageQueue.dequeueNoWait();
292             if (message != null) {
293                 messageDelivered(message, true);
294                 return message.shallowCopy();
295             }
296         }
297         catch (InterruptedException ioe) {
298             throw new JMSException("Queue is interrupted: " + ioe.getMessage());
299         }
300         return null;
301     }
302 
303     /***
304      * Closes the message consumer.
305      * <P>
306      * Since a provider may allocate some resources on behalf of a <CODE>MessageConsumer</CODE> outside the Java
307      * virtual machine, clients should close them when they are not needed. Relying on garbage collection to eventually
308      * reclaim these resources may not be timely enough.
309      * <P>
310      * This call blocks until a <CODE>receive</CODE> or message listener in progress has completed. A blocked message
311      * consumer <CODE>receive</CODE> call returns null when this message consumer is closed.
312      *
313      * @throws JMSException if the JMS provider fails to close the consumer due to some internal error.
314      */
315     public void close() throws JMSException {
316         try {
317             this.accessThread.interrupt();
318         }
319         catch (NullPointerException npe) {
320         }
321         catch (SecurityException se) {
322         }
323         this.session.removeConsumer(this);
324         messageQueue.close();
325         closed = true;
326     }
327 
328     /***
329      * @return true if this is a durable topic subscriber
330      */
331     public boolean isDurableSubscriber() {
332         return this instanceof ActiveMQTopicSubscriber && consumerName != null && consumerName.length() > 0;
333     }
334 
335     /***
336      * @throws IllegalStateException
337      */
338     protected void checkClosed() throws IllegalStateException {
339         if (closed) {
340             throw new IllegalStateException("The Consumer is closed");
341         }
342     }
343 
344     /***
345      * Process a Message - passing either to the queue or message listener
346      *
347      * @param message
348      */
349     protected void processMessage(ActiveMQMessage message) {
350         message.setConsumerId(this.consumerId);
351         MessageListener listener = null;
352         synchronized (messageListenerGuard) {
353             listener = this.messageListener;
354         }
355         try {
356             if (!closed) {
357                 if (listener != null) {
358                     listener.onMessage(message.shallowCopy());
359                     messageDelivered(message, true);
360                 }
361                 else {
362                     this.messageQueue.enqueue(message);
363                 }
364             }
365             else {
366                 messageDelivered(message, false);
367             }
368         }
369         catch (Exception e) {
370             log.warn("could not process message: " + message, e);
371 
372             messageDelivered(message, false);
373 
374             // TODO should we use a dead letter queue?
375         }
376     }
377 
378     /***
379      * @return Returns the consumerId.
380      */
381     protected String getConsumerId() {
382         return consumerId;
383     }
384 
385     /***
386      * @param consumerId The consumerId to set.
387      */
388     protected void setConsumerId(String consumerId) {
389         this.consumerId = consumerId;
390     }
391 
392     /***
393      * @return the consumer name - used for durable consumers
394      */
395     protected String getConsumerName() {
396         return this.consumerName;
397     }
398 
399     /***
400      * Set the name of the Consumer - used for durable subscribers
401      *
402      * @param value
403      */
404     protected void setConsumerName(String value) {
405         this.consumerName = value;
406     }
407 
408     /***
409      * @return the locally unique Consumer Number
410      */
411     protected int getConsumerNumber() {
412         return this.consumerNumber;
413     }
414 
415     /***
416      * Set the locally unique consumer number
417      *
418      * @param value
419      */
420     protected void setConsumerNumber(int value) {
421         this.consumerNumber = value;
422     }
423 
424     /***
425      * @return true if this consumer does not accept locally produced messages
426      */
427     protected boolean isNoLocal() {
428         return this.noLocal;
429     }
430 
431     /***
432      * Retrive is a browser
433      *
434      * @return true if a browser
435      */
436     protected boolean isBrowser() {
437         return this.browser;
438     }
439 
440     /***
441      * Set true if only a Browser
442      *
443      * @param value
444      * @see ActiveMQQueueBrowser
445      */
446     protected void setBrowser(boolean value) {
447         this.browser = value;
448     }
449 
450     /***
451      * @return ActiveMQDestination
452      */
453     protected ActiveMQDestination getDestination() {
454         return this.destination;
455     }
456 
457     /***
458      * @return the startTime
459      */
460     protected long getStartTime() {
461         return startTime;
462     }
463 
464     protected void clearMessagesInProgress() {
465         messageQueue.clear();
466     }
467 
468     private void messageDelivered(ActiveMQMessage message, boolean messageRead) {
469         boolean read = browser ? false : messageRead;
470         if (message != null) {
471             message.setTransientConsumed(!isDurableSubscriber() && message.getJMSActiveMQDestination().isTopic());
472             this.session.messageDelivered((isDurableSubscriber() || destination.isQueue()), message, read);
473             if (messageRead) {
474                 stats.onMessage(message);
475             }
476         }
477     }
478 }