View Javadoc

1   /*** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  
19  package org.codehaus.activemq.transport;
20  import javax.jms.Destination;
21  import javax.jms.JMSException;
22  import javax.jms.Message;
23  import javax.jms.MessageConsumer;
24  import javax.jms.MessageListener;
25  import javax.jms.MessageProducer;
26  import javax.jms.Session;
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.codehaus.activemq.ActiveMQMessageProducer;
30  import org.codehaus.activemq.message.ActiveMQDestination;
31  import org.codehaus.activemq.message.ActiveMQMessage;
32  import org.codehaus.activemq.service.Service;
33  import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
34  
35  /***
36   * A NetworkMessageBridge consumes messages from a remote broker and daisy chains them to the local message producer,
37   * which will pass them into the local broker for consumption
38   * 
39   * @version $Revision: 1.8 $
40   */
41  class NetworkMessageBridge implements Service, MessageListener {
42      private String localBrokerName;
43      private ActiveMQMessageProducer localProducer;
44      private MessageConsumer remoteConsumer;
45      private Session localSession;
46      private Session remoteSession;
47      private boolean stopped;
48      private boolean durableTopic;
49      private ActiveMQDestination destination;
50      private SynchronizedInt referenceCount;
51      private static final Log log = LogFactory.getLog(NetworkMessageBridge.class);
52  
53      /***
54       * Construct the NetworkMessageConsumer
55       */
56      public NetworkMessageBridge() {
57          this.referenceCount = new SynchronizedInt(0);
58      }
59  
60      /***
61       * sett the durable value of the consumer
62       * 
63       * @param durableTopic
64       */
65      public void setDurableTopic(boolean durableTopic) {
66          this.durableTopic = durableTopic;
67      }
68  
69      /***
70       * @return true if a durable consumer
71       */
72      public boolean isDurableTopic() {
73          return this.durableTopic;
74      }
75  
76      /***
77       * @return Returns the destination.
78       */
79      public ActiveMQDestination getDestination() {
80          return destination;
81      }
82  
83      /***
84       * @param destination The destination to set.
85       */
86      public void setDestination(ActiveMQDestination destination) {
87          this.destination = destination;
88      }
89  
90      /***
91       * @return Returns the localBrokerName.
92       */
93      public String getLocalBrokerName() {
94          return localBrokerName;
95      }
96  
97      /***
98       * @param localBrokerName The localBrokerName to set.
99       */
100     public void setLocalBrokerName(String localBrokerName) {
101         this.localBrokerName = localBrokerName;
102     }
103 
104     /***
105      * @return Returns the localSession.
106      */
107     public Session getLocalSession() {
108         return localSession;
109     }
110 
111     /***
112      * @param localSession The localSession to set.
113      */
114     public void setLocalSession(Session localSession) {
115         this.localSession = localSession;
116     }
117 
118     /***
119      * @return Returns the remoteSession.
120      */
121     public Session getRemoteSession() {
122         return remoteSession;
123     }
124 
125     /***
126      * @param remoteSession The remoteSession to set.
127      */
128     public void setRemoteSession(Session remoteSession) {
129         this.remoteSession = remoteSession;
130     }
131 
132     /***
133      * increment number of references to this consumer
134      * 
135      * @return the number of references
136      */
137     public int incrementReferenceCount() {
138         return referenceCount.increment();
139     }
140 
141     /***
142      * decrement number of references to this consumer
143      * 
144      * @return the number of references
145      */
146     public int decrementReferenceCount() {
147         return referenceCount.decrement();
148     }
149 
150     /***
151      * start the bridge
152      * 
153      * @throws JMSException
154      */
155     public void start() throws JMSException {
156         localProducer = (ActiveMQMessageProducer)localSession.createProducer(destination);
157         localProducer.setReuseMessageId(true);
158         if (isDurableTopic()) {
159             String subsName = destination.toString() + "@" + localBrokerName;
160             remoteConsumer = remoteSession.createDurableSubscriber((javax.jms.Topic) destination, subsName);
161         }
162         else {
163             remoteConsumer = remoteSession.createConsumer(destination);
164         }
165         remoteConsumer.setMessageListener(this);
166     }
167 
168     /***
169      * stop the bridge
170      */
171     public void stop() {
172         if (!stopped) {
173             stopped = true;
174             referenceCount.set(0);
175             try {
176                 localSession.close();
177                 remoteSession.close();
178             }
179             catch (JMSException jmsEx) {
180                 log.warn("failure in stopping the message bridge", jmsEx);
181             }
182         }
183     }
184 
185     /***
186      * @param msg consumed message from remote broker
187      */
188     public void onMessage(Message msg) {
189         try {
190             if (!stopped) {
191                 ActiveMQMessage message = (ActiveMQMessage) msg;
192                 if (message != null) {
193                     message = message.shallowCopy();
194                     message.addBrokerVisited(localBrokerName);
195                     Destination destination = message.getJMSDestination();
196                     int deliveryMode = message.getJMSDeliveryMode();
197                     int priority = message.getJMSPriority();
198                     long timeToLive = message.getJMSExpiration() - msg.getJMSTimestamp();
199                     localProducer.send(destination,message,deliveryMode,priority,timeToLive);
200                     //acknowledge the original
201                     msg.acknowledge();
202                 }
203             }
204         }
205         catch (JMSException jmsEx) {
206             log.error("NetworkMessageConsumer failed", jmsEx);
207             stop();
208         }
209     }
210 
211     /***
212      * @return hash code for this object
213      */
214     public int hashCode() {
215         return destination.hashCode();
216     }
217 
218     /***
219      * @param obj
220      * @return true if the obj and this are equal
221      */
222     public boolean equals(Object obj) {
223         boolean result = false;
224         if (obj != null && obj instanceof NetworkMessageBridge) {
225             NetworkMessageBridge other = (NetworkMessageBridge) obj;
226             result = this.destination.equals(other.destination) && this.isDurableTopic() == other.isDurableTopic();
227         }
228         return result;
229     }
230 }