View Javadoc

1   /*** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.codehaus.activemq;
19  
20  import org.codehaus.activemq.management.JMSProducerStatsImpl;
21  import org.codehaus.activemq.management.StatsCapable;
22  import org.codehaus.activemq.message.ActiveMQDestination;
23  import org.codehaus.activemq.util.IdGenerator;
24  
25  import javax.jms.DeliveryMode;
26  import javax.jms.Destination;
27  import javax.jms.IllegalStateException;
28  import javax.jms.InvalidDestinationException;
29  import javax.jms.JMSException;
30  import javax.jms.Message;
31  import javax.jms.MessageFormatException;
32  import javax.jms.MessageProducer;
33  import javax.management.j2ee.statistics.Stats;
34  
35  /***
36   * A client uses a <CODE>MessageProducer</CODE> object to send messages to a
37   * destination. A <CODE>MessageProducer</CODE> object is created by passing a
38   * <CODE>Destination</CODE> object to a message-producer creation method
39   * supplied by a session.
40   * <P>
41   * <CODE>MessageProducer</CODE> is the parent interface for all message
42   * producers.
43   * <P>
44   * A client also has the option of creating a message producer without
45   * supplying a destination. In this case, a destination must be provided with
46   * every send operation. A typical use for this kind of message producer is to
47   * send replies to requests using the request's <CODE>JMSReplyTo</CODE>
48   * destination.
49   * <P>
50   * A client can specify a default delivery mode, priority, and time to live for
51   * messages sent by a message producer. It can also specify the delivery mode,
52   * priority, and time to live for an individual message.
53   * <P>
54   * A client can specify a time-to-live value in milliseconds for each message
55   * it sends. This value defines a message expiration time that is the sum of
56   * the message's time-to-live and the GMT when it is sent (for transacted
57   * sends, this is the time the client sends the message, not the time the
58   * transaction is committed).
59   * <P>
60   * A JMS provider should do its best to expire messages accurately; however,
61   * the JMS API does not define the accuracy provided.
62   *
63   * @version $Revision: 1.19 $
64   * @see javax.jms.TopicPublisher
65   * @see javax.jms.QueueSender
66   * @see javax.jms.Session#createProducer
67   */
68  public class ActiveMQMessageProducer implements MessageProducer, StatsCapable {
69      protected ActiveMQSession session;
70      protected String producerId;
71      private IdGenerator idGenerator;
72      protected boolean closed;
73      private boolean disableMessageID;
74      private boolean disableMessageTimestamp;
75      private int defaultDeliveryMode;
76      private int defaultPriority;
77      private long defaultTimeToLive;
78      protected ActiveMQDestination defaultDestination;
79      private long startTime;
80      private JMSProducerStatsImpl stats;
81      private boolean reuseMessageId; //hint to reuse the same messageId - if already set
82  
83      protected ActiveMQMessageProducer(ActiveMQSession theSession, ActiveMQDestination destination) throws JMSException {
84          this.session = theSession;
85          this.defaultDestination = destination;
86          this.idGenerator = new IdGenerator();
87          this.disableMessageID = false;
88          this.disableMessageTimestamp = false;
89          this.defaultDeliveryMode = Message.DEFAULT_DELIVERY_MODE;
90          this.defaultPriority = Message.DEFAULT_PRIORITY;
91          this.defaultTimeToLive = Message.DEFAULT_TIME_TO_LIVE;
92          this.startTime = System.currentTimeMillis();
93          this.session.addProducer(this);
94          this.stats = new JMSProducerStatsImpl(theSession.getSessionStats(), destination);
95      }
96  
97      public Stats getStats() {
98          return stats;
99      }
100 
101     public JMSProducerStatsImpl getProducerStats() {
102         return stats;
103     }
104 
105     /***
106      * Sets whether message IDs are disabled.
107      * <P>
108      * Since message IDs take some effort to create and increase a message's
109      * size, some JMS providers may be able to optimize message overhead if
110      * they are given a hint that the message ID is not used by an application.
111      * By calling the <CODE>setDisableMessageID</CODE> method on this message
112      * producer, a JMS client enables this potential optimization for all
113      * messages sent by this message producer. If the JMS provider accepts this
114      * hint, these messages must have the message ID set to null; if the
115      * provider ignores the hint, the message ID must be set to its normal
116      * unique value.
117      * <P>
118      * Message IDs are enabled by default.
119      *
120      * @param value indicates if message IDs are disabled
121      * @throws JMSException if the JMS provider fails to close the producer due to
122      *                      some internal error.
123      */
124     public void setDisableMessageID(boolean value) throws JMSException {
125         checkClosed();
126         this.disableMessageID = value;
127     }
128 
129     /***
130      * Gets an indication of whether message IDs are disabled.
131      *
132      * @return an indication of whether message IDs are disabled
133      * @throws JMSException if the JMS provider fails to determine if message IDs are
134      *                      disabled due to some internal error.
135      */
136     public boolean getDisableMessageID() throws JMSException {
137         checkClosed();
138         return this.disableMessageID;
139     }
140 
141     /***
142      * Sets whether message timestamps are disabled.
143      * <P>
144      * Since timestamps take some effort to create and increase a message's
145      * size, some JMS providers may be able to optimize message overhead if
146      * they are given a hint that the timestamp is not used by an application.
147      * By calling the <CODE>setDisableMessageTimestamp</CODE> method on this
148      * message producer, a JMS client enables this potential optimization for
149      * all messages sent by this message producer. If the JMS provider accepts
150      * this hint, these messages must have the timestamp set to zero; if the
151      * provider ignores the hint, the timestamp must be set to its normal
152      * value.
153      * <P>
154      * Message timestamps are enabled by default.
155      *
156      * @param value indicates if message timestamps are disabled
157      * @throws JMSException if the JMS provider fails to close the producer due to
158      *                      some internal error.
159      */
160     public void setDisableMessageTimestamp(boolean value) throws JMSException {
161         checkClosed();
162         this.disableMessageTimestamp = value;
163     }
164 
165     /***
166      * Gets an indication of whether message timestamps are disabled.
167      *
168      * @return an indication of whether message timestamps are disabled
169      * @throws JMSException if the JMS provider fails to close the producer due to
170      *                      some internal error.
171      */
172     public boolean getDisableMessageTimestamp() throws JMSException {
173         checkClosed();
174         return this.disableMessageTimestamp;
175     }
176 
177     /***
178      * Sets the producer's default delivery mode.
179      * <P>
180      * Delivery mode is set to <CODE>PERSISTENT</CODE> by default.
181      *
182      * @param newDeliveryMode the message delivery mode for this message producer; legal
183      *                        values are <code>DeliveryMode.NON_PERSISTENT</code> and
184      *                        <code>DeliveryMode.PERSISTENT</code>
185      * @throws JMSException if the JMS provider fails to set the delivery mode due to
186      *                      some internal error.
187      * @see javax.jms.MessageProducer#getDeliveryMode
188      * @see javax.jms.DeliveryMode#NON_PERSISTENT
189      * @see javax.jms.DeliveryMode#PERSISTENT
190      * @see javax.jms.Message#DEFAULT_DELIVERY_MODE
191      */
192     public void setDeliveryMode(int newDeliveryMode) throws JMSException {
193         if (newDeliveryMode != DeliveryMode.PERSISTENT && newDeliveryMode != DeliveryMode.NON_PERSISTENT) {
194             throw new IllegalStateException("unkown delivery mode: " + newDeliveryMode);
195         }
196         checkClosed();
197         this.defaultDeliveryMode = newDeliveryMode;
198     }
199 
200     /***
201      * Gets the producer's default delivery mode.
202      *
203      * @return the message delivery mode for this message producer
204      * @throws JMSException if the JMS provider fails to close the producer due to
205      *                      some internal error.
206      */
207     public int getDeliveryMode() throws JMSException {
208         checkClosed();
209         return this.defaultDeliveryMode;
210     }
211 
212     /***
213      * Sets the producer's default priority.
214      * <P>
215      * The JMS API defines ten levels of priority value, with 0 as the lowest
216      * priority and 9 as the highest. Clients should consider priorities 0-4 as
217      * gradations of normal priority and priorities 5-9 as gradations of
218      * expedited priority. Priority is set to 4 by default.
219      *
220      * @param newDefaultPriority the message priority for this message producer; must be a
221      *                           value between 0 and 9
222      * @throws JMSException if the JMS provider fails to set the delivery mode due to
223      *                      some internal error.
224      * @see javax.jms.MessageProducer#getPriority
225      * @see javax.jms.Message#DEFAULT_PRIORITY
226      */
227     public void setPriority(int newDefaultPriority) throws JMSException {
228         if (newDefaultPriority < 0 || newDefaultPriority > 9) {
229             throw new IllegalStateException("default priority must be a value between 0 and 9");
230         }
231         checkClosed();
232         this.defaultPriority = newDefaultPriority;
233     }
234 
235     /***
236      * Gets the producer's default priority.
237      *
238      * @return the message priority for this message producer
239      * @throws JMSException if the JMS provider fails to close the producer due to
240      *                      some internal error.
241      * @see javax.jms.MessageProducer#setPriority
242      */
243     public int getPriority() throws JMSException {
244         checkClosed();
245         return this.defaultPriority;
246     }
247 
248     /***
249      * Sets the default length of time in milliseconds from its dispatch time
250      * that a produced message should be retained by the message system.
251      * <P>
252      * Time to live is set to zero by default.
253      *
254      * @param timeToLive the message time to live in milliseconds; zero is unlimited
255      * @throws JMSException if the JMS provider fails to set the time to live due to
256      *                      some internal error.
257      * @see javax.jms.MessageProducer#getTimeToLive
258      * @see javax.jms.Message#DEFAULT_TIME_TO_LIVE
259      */
260     public void setTimeToLive(long timeToLive) throws JMSException {
261         if (timeToLive < 0l) {
262             throw new IllegalStateException("cannot set a negative timeToLive");
263         }
264         checkClosed();
265         this.defaultTimeToLive = timeToLive;
266     }
267 
268     /***
269      * Gets the default length of time in milliseconds from its dispatch time
270      * that a produced message should be retained by the message system.
271      *
272      * @return the message time to live in milliseconds; zero is unlimited
273      * @throws JMSException if the JMS provider fails to get the time to live due to
274      *                      some internal error.
275      * @see javax.jms.MessageProducer#setTimeToLive
276      */
277     public long getTimeToLive() throws JMSException {
278         checkClosed();
279         return this.defaultTimeToLive;
280     }
281 
282     /***
283      * Gets the destination associated with this <CODE>MessageProducer</CODE>.
284      *
285      * @return this producer's <CODE>Destination/ <CODE>
286      * @throws JMSException if the JMS provider fails to close the producer due to
287      *                      some internal error.
288      * @since 1.1
289      */
290     public Destination getDestination() throws JMSException {
291         checkClosed();
292         return this.defaultDestination;
293     }
294 
295     /***
296      * Closes the message producer.
297      * <P>
298      * Since a provider may allocate some resources on behalf of a <CODE>
299      * MessageProducer</CODE> outside the Java virtual machine, clients should
300      * close them when they are not needed. Relying on garbage collection to
301      * eventually reclaim these resources may not be timely enough.
302      *
303      * @throws JMSException if the JMS provider fails to close the producer due to
304      *                      some internal error.
305      */
306     public void close() throws JMSException {
307         this.session.removeProducer(this);
308         closed = true;
309     }
310 
311     protected void checkClosed() throws IllegalStateException {
312         if (closed) {
313             throw new IllegalStateException("The producer is closed");
314         }
315     }
316 
317     /***
318      * Sends a message using the <CODE>MessageProducer</CODE>'s default
319      * delivery mode, priority, and time to live.
320      *
321      * @param message the message to send
322      * @throws JMSException                if the JMS provider fails to send the message due to some
323      *                                     internal error.
324      * @throws MessageFormatException      if an invalid message is specified.
325      * @throws InvalidDestinationException if a client uses this method with a <CODE>
326      *                                     MessageProducer</CODE> with an invalid destination.
327      * @throws java.lang.UnsupportedOperationException
328      *                                     if a client uses this method with a <CODE>
329      *                                     MessageProducer</CODE> that did not specify a
330      *                                     destination at creation time.
331      * @see javax.jms.Session#createProducer
332      * @see javax.jms.MessageProducer
333      * @since 1.1
334      */
335     public void send(Message message) throws JMSException {
336         this.send(this.defaultDestination, message, this.defaultDeliveryMode, this.defaultPriority,
337                 this.defaultTimeToLive);
338     }
339 
340     /***
341      * Sends a message to the destination, specifying delivery mode, priority,
342      * and time to live.
343      *
344      * @param message      the message to send
345      * @param deliveryMode the delivery mode to use
346      * @param priority     the priority for this message
347      * @param timeToLive   the message's lifetime (in milliseconds)
348      * @throws JMSException                if the JMS provider fails to send the message due to some
349      *                                     internal error.
350      * @throws MessageFormatException      if an invalid message is specified.
351      * @throws InvalidDestinationException if a client uses this method with a <CODE>
352      *                                     MessageProducer</CODE> with an invalid destination.
353      * @throws java.lang.UnsupportedOperationException
354      *                                     if a client uses this method with a <CODE>
355      *                                     MessageProducer</CODE> that did not specify a
356      *                                     destination at creation time.
357      * @see javax.jms.Session#createProducer
358      * @since 1.1
359      */
360     public void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
361         this.send(this.defaultDestination, message, deliveryMode, priority, timeToLive);
362     }
363 
364     /***
365      * Sends a message to a destination for an unidentified message producer.
366      * Uses the <CODE>MessageProducer</CODE>'s default delivery mode,
367      * priority, and time to live.
368      * <P>
369      * Typically, a message producer is assigned a destination at creation
370      * time; however, the JMS API also supports unidentified message producers,
371      * which require that the destination be supplied every time a message is
372      * sent.
373      *
374      * @param destination the destination to send this message to
375      * @param message     the message to send
376      * @throws JMSException                if the JMS provider fails to send the message due to some
377      *                                     internal error.
378      * @throws MessageFormatException      if an invalid message is specified.
379      * @throws InvalidDestinationException if a client uses this method with an invalid destination.
380      * @throws java.lang.UnsupportedOperationException
381      *                                     if a client uses this method with a <CODE>
382      *                                     MessageProducer</CODE> that specified a destination at
383      *                                     creation time.
384      * @see javax.jms.Session#createProducer
385      * @see javax.jms.MessageProducer
386      * @since 1.1
387      */
388     public void send(Destination destination, Message message) throws JMSException {
389         this.send(destination, message, this.defaultDeliveryMode, this.defaultPriority, this.defaultTimeToLive);
390     }
391 
392     /***
393      * Sends a message to a destination for an unidentified message producer,
394      * specifying delivery mode, priority and time to live.
395      * <P>
396      * Typically, a message producer is assigned a destination at creation
397      * time; however, the JMS API also supports unidentified message producers,
398      * which require that the destination be supplied every time a message is
399      * sent.
400      *
401      * @param destination  the destination to send this message to
402      * @param message      the message to send
403      * @param deliveryMode the delivery mode to use
404      * @param priority     the priority for this message
405      * @param timeToLive   the message's lifetime (in milliseconds)
406      * @throws JMSException                if the JMS provider fails to send the message due to some
407      *                                     internal error.
408      * @throws MessageFormatException      if an invalid message is specified.
409      * @throws InvalidDestinationException if a client uses this method with an invalid destination.
410      * @see javax.jms.Session#createProducer
411      * @since 1.1
412      */
413     public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive)
414             throws JMSException {
415         checkClosed();
416         if (destination == null) {
417             throw new InvalidDestinationException("Dont understand null destinations");
418         }
419         this.session.send(this, destination, message, deliveryMode, priority, timeToLive,reuseMessageId);
420         stats.onMessage(message);
421     }
422     
423     /***
424      * @return Returns the reuseMessageId.
425      */
426     public boolean isResuseMessageId() {
427         return reuseMessageId;
428     }
429     /***
430      * @param reuseMessageId The resuseMessageId to set.
431      */
432     public void setReuseMessageId(boolean reuseMessageId) {
433         this.reuseMessageId = reuseMessageId;
434     }
435 
436 
437     /***
438      * @return Returns the producerId.
439      */
440     protected String getProducerId() {
441         return producerId;
442     }
443 
444     /***
445      * @param producerId The producerId to set.
446      */
447     protected void setProducerId(String producerId) {
448         this.producerId = producerId;
449     }
450 
451     protected long getStartTime() {
452         return this.startTime;
453     }
454 
455     protected IdGenerator getIdGenerator() {
456         return this.idGenerator;
457     }
458 }