View Javadoc

1   /*** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  
19  package org.codehaus.activemq.service.boundedvm;
20  import java.util.List;
21  import javax.jms.DeliveryMode;
22  import javax.jms.JMSException;
23  import org.codehaus.activemq.broker.BrokerClient;
24  import org.codehaus.activemq.broker.BrokerConnector;
25  import org.codehaus.activemq.filter.Filter;
26  import org.codehaus.activemq.message.ActiveMQMessage;
27  import org.codehaus.activemq.message.BrokerInfo;
28  import org.codehaus.activemq.message.ConsumerInfo;
29  import org.codehaus.activemq.message.Packet;
30  import org.codehaus.activemq.message.util.MemoryBoundedQueue;
31  
32  /***
33   * A holder for Transient Queue consumer info and message routing
34   * 
35   * @version $Revision: 1.6 $
36   */
37  public class TransientQueueSubscription extends TransientSubscription {
38      private BrokerClient client;
39      private String brokerName;
40      private String clusterName;
41      private MemoryBoundedQueue dispatchedQueue;
42  
43      /***
44       * Construct the TransientQueueSubscription
45       * 
46       * @param client
47       * @param dispatchedQueue
48       * @param filter
49       * @param info
50       */
51      public TransientQueueSubscription(BrokerClient client, MemoryBoundedQueue dispatchedQueue, Filter filter,
52              ConsumerInfo info) {
53          super(filter, info);
54          this.client = client;
55          this.dispatchedQueue = dispatchedQueue;
56          if (client != null){
57              BrokerConnector connector = client.getBrokerConnector();
58              if (connector != null){
59                  BrokerInfo bi = connector.getBrokerInfo();
60                  if (bi != null){
61                      this.brokerName = bi.getBrokerName();
62                      this.clusterName = bi.getClusterName();
63                  }
64              }
65          }
66      }
67  
68      /***
69       * determines if the Subscription is interested in the message
70       * 
71       * @param message
72       * @return true if this Subscription will accept the message
73       * @throws JMSException
74       */
75      public boolean isTarget(ActiveMQMessage message) throws JMSException {
76          boolean result = false;
77          if (message != null) {
78              //make sure we don't loop messages around the cluster
79              if (!client.isClusteredConnection() || !message.isEntryCluster(clusterName)
80                      || message.isEntryBroker(brokerName)) {
81                  result = filter.matches(message)
82                          && (message.getJMSDeliveryMode() == DeliveryMode.NON_PERSISTENT || consumerInfo
83                                  .getDestination().isTemporary());
84              }
85          }
86          return result;
87      }
88  
89      /***
90       * @return true if the consumer has capacity for more messages
91       */
92      public boolean canAcceptMessages() {
93          return dispatchedQueue.size() <= consumerInfo.getPrefetchNumber();
94      }
95  
96      /***
97       * Dispatch a message to the Consumer
98       * 
99       * @param message
100      * @throws JMSException
101      */
102     public void doDispatch(ActiveMQMessage message) throws JMSException {
103         addDispatchedMessage(message);
104         message = message.shallowCopy();
105         message.setConsumerNos(new int[]{consumerInfo.getConsumerNo()});
106         client.dispatch(message);
107     }
108 
109     /***
110      * Add a dispatched message
111      * 
112      * @param message
113      */
114     private void addDispatchedMessage(ActiveMQMessage message) {
115         dispatchedQueue.enqueue(message);
116     }
117 
118     /***
119      * Acknowledge the receipt of a message by a consumer
120      * 
121      * @param id
122      * @return the removed ActiveMQMessage with the associated id
123      */
124     public ActiveMQMessage acknowledgeMessage(String id) {
125         ActiveMQMessage msg = (ActiveMQMessage) dispatchedQueue.remove(id);
126         return msg;
127     }
128 
129     /***
130      * @return all the unacknowledge messages
131      */
132     public List getUndeliveredMessages() {
133         return dispatchedQueue.getContents();
134     }
135 
136     /***
137      * close the subscription
138      */
139     public void close() {
140         super.close();
141         dispatchedQueue.close();
142     }
143 }