1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.codehaus.activemq.broker.impl;
19
20 import org.apache.commons.logging.Log;
21 import org.apache.commons.logging.LogFactory;
22 import org.codehaus.activemq.broker.BrokerClient;
23 import org.codehaus.activemq.broker.BrokerConnector;
24 import org.codehaus.activemq.broker.BrokerContainer;
25 import org.codehaus.activemq.message.*;
26 import org.codehaus.activemq.transport.TransportChannel;
27 import org.codehaus.activemq.transport.TransportChannelListener;
28 import org.codehaus.activemq.transport.TransportServerChannel;
29 import org.codehaus.activemq.transport.TransportServerChannelProvider;
30
31 import javax.jms.JMSException;
32 import javax.jms.JMSSecurityException;
33 import javax.transaction.xa.XAException;
34 import java.net.URI;
35 import java.net.URISyntaxException;
36 import java.util.Collections;
37 import java.util.HashMap;
38 import java.util.Map;
39
40 /***
41 * An implementation of the broker (the JMS server)
42 *
43 * @version $Revision: 1.13 $
44 */
45 public class BrokerConnectorImpl implements BrokerConnector, TransportChannelListener {
46 private BrokerInfo brokerInfo;
47
48 private TransportServerChannel serverChannel;
49 private Log log;
50 private BrokerContainer container;
51 private Map clients = Collections.synchronizedMap(new HashMap());
52
53 /***
54 * Helper constructor for TCP protocol with the given bind address
55 *
56 * @param container
57 * @param bindAddress
58 * @throws JMSException
59 */
60 public BrokerConnectorImpl(BrokerContainer container, String bindAddress, WireFormat wireFormat) throws JMSException {
61 this(container, createTransportServerChannel(wireFormat, bindAddress));
62 }
63
64 /***
65 * @param container
66 * @param serverChannel
67 */
68 public BrokerConnectorImpl(BrokerContainer container, TransportServerChannel serverChannel) {
69 assert container != null;
70 this.brokerInfo = new BrokerInfo();
71 this.brokerInfo.setBrokerName(container.getBroker().getBrokerName());
72 this.brokerInfo.setClusterName(container.getBroker().getBrokerClusterName());
73 this.log = LogFactory.getLog(getClass().getName());
74 this.serverChannel = serverChannel;
75 this.container = container;
76 this.container.addConnector(this);
77 serverChannel.setTransportChannelListener(this);
78 }
79
80 /***
81 * @return infomation about the Broker
82 */
83 public BrokerInfo getBrokerInfo() {
84 return brokerInfo;
85 }
86
87 /***
88 * Get a hint about the broker capacity for more messages
89 *
90 * @return percentage value (0-100) about how much capacity the
91 * broker has
92 */
93 public int getBrokerCapacity() {
94 return container.getBroker().getRoundedCapacity();
95 }
96
97 /***
98 * @return Get the server channel
99 */
100 public TransportServerChannel getServerChannel() {
101 return serverChannel;
102 }
103
104 /***
105 * start the Broker
106 *
107 * @throws JMSException
108 */
109 public void start() throws JMSException {
110 this.serverChannel.start();
111 log.info("ActiveMQ connector started: " + serverChannel);
112 }
113
114 /***
115 * Stop the Broker
116 *
117 * @throws JMSException
118 */
119 public void stop() throws JMSException {
120 this.container.removeConnector(this);
121 this.serverChannel.stop();
122 log.info("ActiveMQ connector stopped: " + serverChannel);
123 }
124
125 /***
126 * Register a Broker Client
127 *
128 * @param client
129 * @param info contains infomation about the Connection this Client represents
130 * @throws JMSException
131 * @throws javax.jms.InvalidClientIDException
132 * if the JMS client specifies an invalid or duplicate client ID.
133 * @throws JMSSecurityException if client authentication fails due to an invalid user name or password.
134 */
135 public void registerClient(BrokerClient client, ConnectionInfo info) throws JMSException {
136 this.container.registerConnection(client, info);
137 }
138
139 /***
140 * Deregister a Broker Client
141 *
142 * @param client
143 * @param info
144 * @throws JMSException if some internal error occurs
145 */
146 public void deregisterClient(BrokerClient client, ConnectionInfo info) throws JMSException {
147 this.container.deregisterConnection(client, info);
148 }
149
150 /***
151 * Registers a MessageConsumer
152 *
153 * @param client
154 * @param info
155 * @throws JMSException
156 * @throws JMSSecurityException if client authentication fails for the Destination the Consumer applies for
157 */
158 public void registerMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
159 if (info.getDestination() == null) {
160 throw new JMSException("No Destination specified on consumerInfo for client: " + client + " info: " + info);
161 }
162 this.container.registerMessageConsumer(client, info);
163
164 }
165
166 /***
167 * De-register a MessageConsumer from the Broker
168 *
169 * @param client
170 * @param info
171 * @throws JMSException
172 */
173 public void deregisterMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
174 this.container.deregisterMessageConsumer(client, info);
175 }
176
177 /***
178 * Registers a MessageProducer
179 *
180 * @param client
181 * @param info
182 * @throws JMSException
183 * @throws JMSSecurityException if client authentication fails for the Destination the Consumer applies for
184 */
185 public void registerMessageProducer(BrokerClient client, ProducerInfo info) throws JMSException {
186 this.container.registerMessageProducer(client, info);
187 }
188
189 /***
190 * De-register a MessageProducer from the Broker
191 *
192 * @param client
193 * @param info
194 * @throws JMSException
195 */
196 public void deregisterMessageProducer(BrokerClient client, ProducerInfo info) throws JMSException {
197 this.container.deregisterMessageProducer(client, info);
198 }
199
200 /***
201 * Register a client-side Session (used for Monitoring)
202 *
203 * @param client
204 * @param info
205 * @throws JMSException
206 */
207 public void registerSession(BrokerClient client, SessionInfo info) throws JMSException {
208 this.container.registerSession(client, info);
209 }
210
211 /***
212 * De-register a client-side Session from the Broker (used for monitoring)
213 *
214 * @param client
215 * @param info
216 * @throws JMSException
217 */
218 public void deregisterSession(BrokerClient client, SessionInfo info) throws JMSException {
219 this.container.deregisterSession(client, info);
220 }
221
222 /***
223 * Start a transaction from the Client session
224 *
225 * @param client
226 * @param transactionId
227 * @throws JMSException
228 */
229 public void startTransaction(BrokerClient client, String transactionId) throws JMSException {
230 this.container.startTransaction(client, transactionId);
231 }
232
233 /***
234 * Rollback a transacton
235 *
236 * @param client
237 * @param transactionId
238 * @throws JMSException
239 */
240 public void rollbackTransaction(BrokerClient client, String transactionId) throws JMSException {
241 this.container.rollbackTransaction(client, transactionId);
242 }
243
244 /***
245 * Commit a transaction
246 *
247 * @param client
248 * @param transactionId
249 * @throws JMSException
250 */
251 public void commitTransaction(BrokerClient client, String transactionId) throws JMSException {
252 this.container.commitTransaction(client, transactionId);
253 }
254
255 /***
256 * send message with a transaction context
257 *
258 * @param client
259 * @param transactionId
260 * @param message
261 * @throws JMSException
262 */
263 public void sendTransactedMessage(BrokerClient client, String transactionId, ActiveMQMessage message)
264 throws JMSException {
265 this.container.sendTransactedMessage(client, transactionId, message);
266 }
267
268 /***
269 * Acknowledge receipt of a message within a transaction context
270 *
271 * @param client
272 * @param transactionId
273 * @param ack
274 * @throws JMSException
275 */
276 public void acknowledgeTransactedMessage(BrokerClient client, String transactionId, MessageAck ack)
277 throws JMSException {
278 this.container.acknowledgeTransactedMessage(client, transactionId, ack);
279 }
280
281 /***
282 * Send a non-transacted message to the Broker
283 *
284 * @param client
285 * @param message
286 * @throws JMSException
287 */
288 public void sendMessage(BrokerClient client, ActiveMQMessage message) throws JMSException {
289 this.container.sendMessage(client, message);
290 }
291
292 /***
293 * Acknowledge reciept of a message
294 *
295 * @param client
296 * @param ack
297 * @throws JMSException
298 */
299 public void acknowledgeMessage(BrokerClient client, MessageAck ack) throws JMSException {
300 this.container.acknowledgeMessage(client, ack);
301 }
302
303 /***
304 * Command to delete a durable topic subscription
305 *
306 * @param client
307 * @param ds
308 * @throws JMSException
309 */
310 public void durableUnsubscribe(BrokerClient client, DurableUnsubscribe ds) throws JMSException {
311 this.container.durableUnsubscribe(client, ds);
312 }
313
314
315 /***
316 * @param channel - client to add
317 */
318 public void addClient(TransportChannel channel) {
319 try {
320 BrokerClient client = new BrokerClientImpl();
321 client.initialize(this, channel);
322 if (log.isDebugEnabled()) {
323 log.debug("Starting new client: " + client);
324 }
325 channel.setServerSide(true);
326 channel.start();
327 clients.put(channel, client);
328 }
329 catch (JMSException e) {
330 log.error("Failed to add client due to: " + e, e);
331 }
332 }
333
334 /***
335 * @param channel - client to remove
336 */
337 public void removeClient(TransportChannel channel) {
338 BrokerClient client = (BrokerClient) clients.remove(channel);
339 if (client != null) {
340 if (log.isDebugEnabled()) {
341 log.debug("Client leaving client: " + client);
342 }
343
344
345 client.cleanUp();
346 }
347 else {
348
349 log.warn("No such client for channel: " + channel);
350 }
351 }
352
353 /***
354 * @return the BrokerContainer for this Connector
355 */
356 public BrokerContainer getBrokerContainer() {
357 return this.container;
358 }
359
360 /***
361 * Start an XA transaction.
362 *
363 * @see org.codehaus.activemq.broker.BrokerConnector#startTransaction(org.codehaus.activemq.broker.BrokerClient, org.codehaus.activemq.message.ActiveMQXid)
364 */
365 public void startTransaction(BrokerClient client, ActiveMQXid xid) throws XAException {
366 this.container.startTransaction(client, xid);
367 }
368
369 /***
370 * Gets the prepared XA transactions.
371 *
372 * @see org.codehaus.activemq.broker.BrokerConnector#getPreparedTransactions(org.codehaus.activemq.broker.BrokerClient)
373 */
374 public ActiveMQXid[] getPreparedTransactions(BrokerClient client) throws XAException {
375 return this.container.getPreparedTransactions(client);
376 }
377
378 /***
379 * Prepare an XA transaction.
380 *
381 * @see org.codehaus.activemq.broker.BrokerConnector#prepareTransaction(org.codehaus.activemq.broker.BrokerClient, org.codehaus.activemq.message.ActiveMQXid)
382 */
383 public int prepareTransaction(BrokerClient client, ActiveMQXid xid) throws XAException {
384 return this.container.prepareTransaction(client, xid);
385 }
386
387 /***
388 * Rollback an XA transaction.
389 *
390 * @see org.codehaus.activemq.broker.BrokerConnector#rollbackTransaction(org.codehaus.activemq.broker.BrokerClient, org.codehaus.activemq.message.ActiveMQXid)
391 */
392 public void rollbackTransaction(BrokerClient client, ActiveMQXid xid) throws XAException {
393 this.container.rollbackTransaction(client, xid);
394 }
395
396 /***
397 * Commit an XA transaction.
398 *
399 * @see org.codehaus.activemq.broker.BrokerConnector#commitTransaction(org.codehaus.activemq.broker.BrokerClient, org.codehaus.activemq.message.ActiveMQXid, boolean)
400 */
401 public void commitTransaction(BrokerClient client, ActiveMQXid xid, boolean onePhase) throws XAException {
402 this.container.commitTransaction(client, xid, onePhase);
403 }
404
405 /***
406 * @see org.codehaus.activemq.broker.BrokerConnector#getResourceManagerId(org.codehaus.activemq.broker.BrokerClient)
407 */
408 public String getResourceManagerId(BrokerClient client) {
409
410 return getBrokerInfo().getBrokerName();
411 }
412
413
414
415
416 /***
417 * Factory method ot create a transport channel
418 *
419 * @param bindAddress
420 * @return @throws JMSException
421 * @throws JMSException
422 */
423 protected static TransportServerChannel createTransportServerChannel(WireFormat wireFormat, String bindAddress) throws JMSException {
424 URI url;
425 try {
426 url = new URI(bindAddress);
427 }
428 catch (URISyntaxException e) {
429 JMSException jmsEx = new JMSException("Badly formated bindAddress: " + e.getMessage());
430 jmsEx.setLinkedException(e);
431 throw jmsEx;
432 }
433 return TransportServerChannelProvider.create(wireFormat, url);
434 }
435
436 }