1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18
19 package org.codehaus.activemq.transport;
20 import javax.jms.Destination;
21 import javax.jms.JMSException;
22 import javax.jms.Message;
23 import javax.jms.MessageConsumer;
24 import javax.jms.MessageListener;
25 import javax.jms.MessageProducer;
26 import javax.jms.Session;
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.codehaus.activemq.ActiveMQMessageProducer;
30 import org.codehaus.activemq.message.ActiveMQDestination;
31 import org.codehaus.activemq.message.ActiveMQMessage;
32 import org.codehaus.activemq.service.Service;
33 import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
34
35 /***
36 * A NetworkMessageBridge consumes messages from a remote broker and daisy chains them to the local message producer,
37 * which will pass them into the local broker for consumption
38 *
39 * @version $Revision: 1.8 $
40 */
41 class NetworkMessageBridge implements Service, MessageListener {
42 private String localBrokerName;
43 private ActiveMQMessageProducer localProducer;
44 private MessageConsumer remoteConsumer;
45 private Session localSession;
46 private Session remoteSession;
47 private boolean stopped;
48 private boolean durableTopic;
49 private ActiveMQDestination destination;
50 private SynchronizedInt referenceCount;
51 private static final Log log = LogFactory.getLog(NetworkMessageBridge.class);
52
53 /***
54 * Construct the NetworkMessageConsumer
55 */
56 public NetworkMessageBridge() {
57 this.referenceCount = new SynchronizedInt(0);
58 }
59
60 /***
61 * sett the durable value of the consumer
62 *
63 * @param durableTopic
64 */
65 public void setDurableTopic(boolean durableTopic) {
66 this.durableTopic = durableTopic;
67 }
68
69 /***
70 * @return true if a durable consumer
71 */
72 public boolean isDurableTopic() {
73 return this.durableTopic;
74 }
75
76 /***
77 * @return Returns the destination.
78 */
79 public ActiveMQDestination getDestination() {
80 return destination;
81 }
82
83 /***
84 * @param destination The destination to set.
85 */
86 public void setDestination(ActiveMQDestination destination) {
87 this.destination = destination;
88 }
89
90 /***
91 * @return Returns the localBrokerName.
92 */
93 public String getLocalBrokerName() {
94 return localBrokerName;
95 }
96
97 /***
98 * @param localBrokerName The localBrokerName to set.
99 */
100 public void setLocalBrokerName(String localBrokerName) {
101 this.localBrokerName = localBrokerName;
102 }
103
104 /***
105 * @return Returns the localSession.
106 */
107 public Session getLocalSession() {
108 return localSession;
109 }
110
111 /***
112 * @param localSession The localSession to set.
113 */
114 public void setLocalSession(Session localSession) {
115 this.localSession = localSession;
116 }
117
118 /***
119 * @return Returns the remoteSession.
120 */
121 public Session getRemoteSession() {
122 return remoteSession;
123 }
124
125 /***
126 * @param remoteSession The remoteSession to set.
127 */
128 public void setRemoteSession(Session remoteSession) {
129 this.remoteSession = remoteSession;
130 }
131
132 /***
133 * increment number of references to this consumer
134 *
135 * @return the number of references
136 */
137 public int incrementReferenceCount() {
138 return referenceCount.increment();
139 }
140
141 /***
142 * decrement number of references to this consumer
143 *
144 * @return the number of references
145 */
146 public int decrementReferenceCount() {
147 return referenceCount.decrement();
148 }
149
150 /***
151 * start the bridge
152 *
153 * @throws JMSException
154 */
155 public void start() throws JMSException {
156 localProducer = (ActiveMQMessageProducer)localSession.createProducer(destination);
157 localProducer.setReuseMessageId(true);
158 if (isDurableTopic()) {
159 String subsName = destination.toString() + "@" + localBrokerName;
160 remoteConsumer = remoteSession.createDurableSubscriber((javax.jms.Topic) destination, subsName);
161 }
162 else {
163 remoteConsumer = remoteSession.createConsumer(destination);
164 }
165 remoteConsumer.setMessageListener(this);
166 }
167
168 /***
169 * stop the bridge
170 */
171 public void stop() {
172 if (!stopped) {
173 stopped = true;
174 referenceCount.set(0);
175 try {
176 localSession.close();
177 remoteSession.close();
178 }
179 catch (JMSException jmsEx) {
180 log.warn("failure in stopping the message bridge", jmsEx);
181 }
182 }
183 }
184
185 /***
186 * @param msg consumed message from remote broker
187 */
188 public void onMessage(Message msg) {
189 try {
190 if (!stopped) {
191 ActiveMQMessage message = (ActiveMQMessage) msg;
192 if (message != null) {
193 message = message.shallowCopy();
194 message.addBrokerVisited(localBrokerName);
195 Destination destination = message.getJMSDestination();
196 int deliveryMode = message.getJMSDeliveryMode();
197 int priority = message.getJMSPriority();
198 long timeToLive = message.getJMSExpiration() - msg.getJMSTimestamp();
199 localProducer.send(destination,message,deliveryMode,priority,timeToLive);
200
201 msg.acknowledge();
202 }
203 }
204 }
205 catch (JMSException jmsEx) {
206 log.error("NetworkMessageConsumer failed", jmsEx);
207 stop();
208 }
209 }
210
211 /***
212 * @return hash code for this object
213 */
214 public int hashCode() {
215 return destination.hashCode();
216 }
217
218 /***
219 * @param obj
220 * @return true if the obj and this are equal
221 */
222 public boolean equals(Object obj) {
223 boolean result = false;
224 if (obj != null && obj instanceof NetworkMessageBridge) {
225 NetworkMessageBridge other = (NetworkMessageBridge) obj;
226 result = this.destination.equals(other.destination) && this.isDurableTopic() == other.isDurableTopic();
227 }
228 return result;
229 }
230 }