1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18
19 package org.codehaus.activemq.service.boundedvm;
20 import java.util.List;
21 import javax.jms.DeliveryMode;
22 import javax.jms.JMSException;
23 import org.codehaus.activemq.broker.BrokerClient;
24 import org.codehaus.activemq.broker.BrokerConnector;
25 import org.codehaus.activemq.filter.Filter;
26 import org.codehaus.activemq.message.ActiveMQMessage;
27 import org.codehaus.activemq.message.BrokerInfo;
28 import org.codehaus.activemq.message.ConsumerInfo;
29 import org.codehaus.activemq.message.Packet;
30 import org.codehaus.activemq.message.util.MemoryBoundedQueue;
31
32 /***
33 * A holder for Transient Queue consumer info and message routing
34 *
35 * @version $Revision: 1.6 $
36 */
37 public class TransientQueueSubscription extends TransientSubscription {
38 private BrokerClient client;
39 private String brokerName;
40 private String clusterName;
41 private MemoryBoundedQueue dispatchedQueue;
42
43 /***
44 * Construct the TransientQueueSubscription
45 *
46 * @param client
47 * @param dispatchedQueue
48 * @param filter
49 * @param info
50 */
51 public TransientQueueSubscription(BrokerClient client, MemoryBoundedQueue dispatchedQueue, Filter filter,
52 ConsumerInfo info) {
53 super(filter, info);
54 this.client = client;
55 this.dispatchedQueue = dispatchedQueue;
56 if (client != null){
57 BrokerConnector connector = client.getBrokerConnector();
58 if (connector != null){
59 BrokerInfo bi = connector.getBrokerInfo();
60 if (bi != null){
61 this.brokerName = bi.getBrokerName();
62 this.clusterName = bi.getClusterName();
63 }
64 }
65 }
66 }
67
68 /***
69 * determines if the Subscription is interested in the message
70 *
71 * @param message
72 * @return true if this Subscription will accept the message
73 * @throws JMSException
74 */
75 public boolean isTarget(ActiveMQMessage message) throws JMSException {
76 boolean result = false;
77 if (message != null) {
78
79 if (!client.isClusteredConnection() || !message.isEntryCluster(clusterName)
80 || message.isEntryBroker(brokerName)) {
81 result = filter.matches(message)
82 && (message.getJMSDeliveryMode() == DeliveryMode.NON_PERSISTENT || consumerInfo
83 .getDestination().isTemporary());
84 }
85 }
86 return result;
87 }
88
89 /***
90 * @return true if the consumer has capacity for more messages
91 */
92 public boolean canAcceptMessages() {
93 return dispatchedQueue.size() <= consumerInfo.getPrefetchNumber();
94 }
95
96 /***
97 * Dispatch a message to the Consumer
98 *
99 * @param message
100 * @throws JMSException
101 */
102 public void doDispatch(ActiveMQMessage message) throws JMSException {
103 addDispatchedMessage(message);
104 message = message.shallowCopy();
105 message.setConsumerNos(new int[]{consumerInfo.getConsumerNo()});
106 client.dispatch(message);
107 }
108
109 /***
110 * Add a dispatched message
111 *
112 * @param message
113 */
114 private void addDispatchedMessage(ActiveMQMessage message) {
115 dispatchedQueue.enqueue(message);
116 }
117
118 /***
119 * Acknowledge the receipt of a message by a consumer
120 *
121 * @param id
122 * @return the removed ActiveMQMessage with the associated id
123 */
124 public ActiveMQMessage acknowledgeMessage(String id) {
125 ActiveMQMessage msg = (ActiveMQMessage) dispatchedQueue.remove(id);
126 return msg;
127 }
128
129 /***
130 * @return all the unacknowledge messages
131 */
132 public List getUndeliveredMessages() {
133 return dispatchedQueue.getContents();
134 }
135
136 /***
137 * close the subscription
138 */
139 public void close() {
140 super.close();
141 dispatchedQueue.close();
142 }
143 }