1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18
19 package org.codehaus.activemq.broker.impl;
20 import java.io.IOException;
21 import java.util.HashSet;
22 import java.util.Iterator;
23 import java.util.Set;
24 import javax.jms.ExceptionListener;
25 import javax.jms.JMSException;
26 import javax.transaction.xa.XAException;
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.codehaus.activemq.broker.BrokerClient;
30 import org.codehaus.activemq.broker.BrokerConnector;
31 import org.codehaus.activemq.message.ActiveMQMessage;
32 import org.codehaus.activemq.message.ActiveMQXid;
33 import org.codehaus.activemq.message.BrokerInfo;
34 import org.codehaus.activemq.message.CapacityInfo;
35 import org.codehaus.activemq.message.ConnectionInfo;
36 import org.codehaus.activemq.message.ConsumerInfo;
37 import org.codehaus.activemq.message.DurableUnsubscribe;
38 import org.codehaus.activemq.message.IntResponseReceipt;
39 import org.codehaus.activemq.message.MessageAck;
40 import org.codehaus.activemq.message.Packet;
41 import org.codehaus.activemq.message.PacketListener;
42 import org.codehaus.activemq.message.ProducerInfo;
43 import org.codehaus.activemq.message.Receipt;
44 import org.codehaus.activemq.message.ResponseReceipt;
45 import org.codehaus.activemq.message.SessionInfo;
46 import org.codehaus.activemq.message.TransactionInfo;
47 import org.codehaus.activemq.message.XATransactionInfo;
48 import org.codehaus.activemq.message.util.BoundedPacketQueue;
49 import org.codehaus.activemq.message.util.SpooledBoundedPacketQueue;
50 import org.codehaus.activemq.transport.TransportChannel;
51 import org.codehaus.activemq.util.IdGenerator;
52 import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
53 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
54 import EDU.oswego.cs.dl.util.concurrent.ThreadedExecutor;
55
56 /***
57 * A Broker client side proxy representing a JMS Connnection
58 *
59 * @version $Revision: 1.25 $
60 */
61 public class BrokerClientImpl implements BrokerClient, ExceptionListener, PacketListener {
62 private static final Log log = LogFactory.getLog(BrokerClientImpl.class);
63 private BrokerConnector brokerConnector;
64 private TransportChannel channel;
65 private ConnectionInfo connectionInfo;
66 private IdGenerator packetIdGenerator;
67 private SynchronizedBoolean closed;
68 private Set activeConsumers;
69 private CopyOnWriteArrayList consumers;
70 private CopyOnWriteArrayList producers;
71 private CopyOnWriteArrayList transactions;
72 private CopyOnWriteArrayList xatransactions;
73 private CopyOnWriteArrayList sessions;
74 private boolean started;
75 private boolean brokerConnection;
76 private boolean clusteredConnection;
77 private String remoteBrokerName;
78 private int capacity = 100;
79 private SpooledBoundedPacketQueue spoolQueue;
80 private boolean cleanedUp;
81
82 /***
83 * Default Constructor of BrokerClientImpl
84 */
85 public BrokerClientImpl() {
86 this.packetIdGenerator = new IdGenerator();
87 this.closed = new SynchronizedBoolean(false);
88 this.activeConsumers = new HashSet();
89 this.consumers = new CopyOnWriteArrayList();
90 this.producers = new CopyOnWriteArrayList();
91 this.transactions = new CopyOnWriteArrayList();
92 this.xatransactions = new CopyOnWriteArrayList();
93 this.sessions = new CopyOnWriteArrayList();
94 }
95
96 /***
97 * Initialize the BrokerClient
98 *
99 * @param brokerConnector
100 * @param channel
101 */
102 public void initialize(BrokerConnector brokerConnector, TransportChannel channel) {
103 this.brokerConnector = brokerConnector;
104 this.channel = channel;
105 this.channel.setPacketListener(this);
106 this.channel.setExceptionListener(this);
107 log.trace("brokerConnectorConnector client initialized");
108 }
109
110 /***
111 * @return the BrokerConnector this client is associated with
112 */
113 public BrokerConnector getBrokerConnector(){
114 return this.brokerConnector;
115 }
116
117 /***
118 * @see javax.jms.ExceptionListener#onException(javax.jms.JMSException)
119 */
120 public void onException(JMSException jmsEx) {
121 log.warn(this + " caught exception ", jmsEx);
122 close();
123 }
124
125 /***
126 * @return pretty print for this brokerConnector-client
127 */
128 public String toString() {
129 String str = "brokerConnector-client:("+hashCode()+") ";
130 str += connectionInfo == null ? "" : connectionInfo.getClientId();
131 str += ": " + channel;
132 return str;
133 }
134
135 /***
136 * Dispatch an ActiveMQMessage to the end client
137 *
138 * @param message
139 */
140 public void dispatch(ActiveMQMessage message) {
141 if (isSlowConsumer()) {
142 if (spoolQueue == null) {
143 log.warn("Connection: " + connectionInfo.getClientId() + " is a slow consumer");
144 String spoolName = brokerConnector.getBrokerInfo().getBrokerName() + "_" + connectionInfo.getClientId();
145 try {
146 spoolQueue = new SpooledBoundedPacketQueue(brokerConnector.getBrokerContainer().getBroker()
147 .getTempDir(), spoolName);
148 final BoundedPacketQueue bpq = spoolQueue;
149 ThreadedExecutor exec = new ThreadedExecutor();
150 exec.execute(new Runnable() {
151 public void run() {
152 while (!closed.get()) {
153 try {
154 Packet packet = bpq.dequeue();
155 }
156 catch (InterruptedException e) {
157 log.warn("async dispatch got an interupt", e);
158 }
159 catch (JMSException e) {
160 log.error("async dispatch got an problem", e);
161 }
162 }
163 }
164 });
165 }
166 catch (IOException e) {
167 log.error("Could not create SpooledBoundedQueue for this slow consumer", e);
168 close();
169 }
170 catch (InterruptedException e) {
171 log.error("Could not create SpooledBoundedQueue for this slow consumer", e);
172 close();
173 }
174 }
175 if (spoolQueue != null) {
176 try {
177 spoolQueue.enqueue(message);
178 }
179 catch (JMSException e) {
180 log.error(
181 "Could not enqueue message " + message + " to SpooledBoundedQueue for this slow consumer",
182 e);
183 close();
184 }
185 }
186 }
187 else {
188 send(message);
189 }
190 }
191
192 /***
193 * @return true if the peer for this Client is itself another Broker
194 */
195 public boolean isBrokerConnection() {
196 return brokerConnection;
197 }
198
199 /***
200 * @return true id this client is part of a cluster
201 */
202 public boolean isClusteredConnection(){
203 return clusteredConnection;
204 }
205
206
207 /***
208 * Get the Capacity for in-progress messages at the peer (probably a JMSConnection) Legimate values between 0-100. 0
209 * capacity representing that the peer cannot process any more messages at the current time
210 *
211 * @return
212 */
213 public int getCapacity() {
214 return capacity;
215 }
216
217 public String getClientID() {
218 if (connectionInfo != null) {
219 return connectionInfo.getClientId();
220 }
221 return null;
222 }
223
224 public TransportChannel getChannel() {
225 return channel;
226 }
227
228 /***
229 * Get an indication if the peer should be considered as a slow consumer
230 *
231 * @return true id the peer should be considered as a slow consumer
232 */
233 public boolean isSlowConsumer() {
234 return capacity <= 20;
235 }
236
237 /***
238 * Consume a Packet from the underlying TransportChannel for processing
239 *
240 * @param packet
241 */
242 public void consume(Packet packet) {
243 if (!closed.get() && packet != null) {
244 Throwable requestEx = null;
245 boolean failed = false;
246 String brokerName = brokerConnector.getBrokerInfo().getBrokerName();
247 String clusterName = brokerConnector.getBrokerInfo().getClusterName();
248 try {
249 if (brokerConnection) {
250 packet.addBrokerVisited(remoteBrokerName);
251 packet.addBrokerVisited(brokerName);
252 }
253
254 if (packet.isJMSMessage()) {
255 ActiveMQMessage message = (ActiveMQMessage) packet;
256
257
258 if (connectionInfo != null) {
259 message.setProducerID(connectionInfo.getClientId());
260 }
261 else {
262 log.warn("No connection info available! Maybe the client forgot to start() the Connection?");
263 }
264 if (!brokerConnection){
265 message.setEntryBrokerName(brokerName);
266 message.setEntryClusterName(clusterName);
267 }
268 consumeActiveMQMessage(message);
269 }
270 else {
271 switch (packet.getPacketType()) {
272 case Packet.ACTIVEMQ_MSG_ACK : {
273 MessageAck ack = (MessageAck) packet;
274 consumeMessageAck(ack);
275 break;
276 }
277 case Packet.XA_TRANSACTION_INFO : {
278 XATransactionInfo info = (XATransactionInfo) packet;
279 consumeXATransactionInfo(info);
280 break;
281 }
282 case Packet.TRANSACTION_INFO : {
283 TransactionInfo info = (TransactionInfo) packet;
284 consumeTransactionInfo(info);
285 break;
286 }
287 case Packet.CONSUMER_INFO : {
288 ConsumerInfo info = (ConsumerInfo) packet;
289 consumeConsumerInfo(info);
290 break;
291 }
292 case Packet.PRODUCER_INFO : {
293 ProducerInfo info = (ProducerInfo) packet;
294 consumeProducerInfo(info);
295 break;
296 }
297 case Packet.SESSION_INFO : {
298 SessionInfo info = (SessionInfo) packet;
299 consumeSessionInfo(info);
300 break;
301 }
302 case Packet.ACTIVEMQ_CONNECTION_INFO : {
303 ConnectionInfo info = (ConnectionInfo) packet;
304 consumeConnectionInfo(info);
305 break;
306 }
307 case Packet.DURABLE_UNSUBSCRIBE : {
308 DurableUnsubscribe ds = (DurableUnsubscribe) packet;
309 brokerConnector.durableUnsubscribe(this, ds);
310 break;
311 }
312 case Packet.CAPACITY_INFO : {
313 CapacityInfo info = (CapacityInfo) packet;
314 consumeCapacityInfo(info);
315 break;
316 }
317 case Packet.CAPACITY_INFO_REQUEST : {
318 updateCapacityInfo(packet.getId());
319 break;
320 }
321 case Packet.ACTIVEMQ_BROKER_INFO : {
322 consumeBrokerInfo((BrokerInfo) packet);
323 break;
324 }
325 default : {
326 log.warn("Unknown Packet received: " + packet);
327 break;
328 }
329 }
330 }
331 }
332 catch (Throwable e) {
333 requestEx = e;
334 log.warn("caught exception consuming packet: " + packet, e);
335 failed = true;
336 }
337 sendReceipt(packet, requestEx, failed);
338 }
339 }
340
341 /***
342 * Register/deregister MessageConsumer with the Broker
343 *
344 * @param info
345 * @throws JMSException
346 */
347 public void consumeConsumerInfo(ConsumerInfo info) throws JMSException {
348 if (info.isStarted()) {
349 consumers.add(info);
350 if ((connectionInfo != null && connectionInfo.isStarted())) {
351 if (this.activeConsumers.add(info)) {
352 this.brokerConnector.registerMessageConsumer(this, info);
353 }
354 }
355 }
356 else {
357 consumers.remove(info);
358 if (activeConsumers.remove(info)) {
359 this.brokerConnector.deregisterMessageConsumer(this, info);
360 }
361 }
362 }
363
364 /***
365 * Update the peer Connection about the Broker's capacity for messages
366 *
367 * @param capacity
368 */
369 public void updateBrokerCapacity(int capacity) {
370 CapacityInfo info = new CapacityInfo();
371 info.setResourceName(this.brokerConnector.getBrokerInfo().getBrokerName());
372 info.setCapacity(capacity);
373 info.setFlowControlTimeout(getFlowControlTimeout(capacity));
374 send(info);
375 }
376
377 /***
378 * register with the Broker
379 *
380 * @param info
381 * @throws JMSException
382 */
383 public void consumeConnectionInfo(ConnectionInfo info) throws JMSException {
384 this.connectionInfo = info;
385 if (info.isClosed()) {
386 cleanUp();
387 try {
388 sendReceipt(info);
389 info.setReceiptRequired(false);
390 try {
391 Thread.sleep(500);
392 }
393 catch (Throwable e) {
394 }
395 }
396 finally {
397 close();
398 }
399 }
400 else {
401 if (!started && info.isStarted()) {
402 started = true;
403 log.debug(this + " has started running client version " + info.getClientVersion() + " , wire format = " + info.getWireFormatVersion());
404
405 this.brokerConnector.registerClient(this, info);
406
407 for (Iterator i = consumers.iterator();i.hasNext();) {
408 ConsumerInfo ci = (ConsumerInfo) i.next();
409 ci.setClientId(info.getClientId());
410 }
411 for (Iterator i = producers.iterator();i.hasNext();) {
412 ProducerInfo pi = (ProducerInfo) i.next();
413 pi.setClientId(info.getClientId());
414 }
415 }
416 if (info.isStarted()) {
417
418 for (Iterator i = consumers.iterator();i.hasNext();) {
419 ConsumerInfo ci = (ConsumerInfo) i.next();
420 if (activeConsumers.add(ci)) {
421 this.brokerConnector.registerMessageConsumer(this, ci);
422 }
423 }
424 }
425 else {
426 log.debug(this + " has stopped");
427
428 for (Iterator i = consumers.iterator();i.hasNext();) {
429 ConsumerInfo ci = (ConsumerInfo) i.next();
430 if (activeConsumers.remove(ci)) {
431 this.brokerConnector.deregisterMessageConsumer(this, ci);
432 }
433 }
434 }
435 }
436 }
437
438 /***
439 * start consuming messages
440 *
441 * @throws JMSException
442 */
443 public void start() throws JMSException {
444 channel.start();
445 }
446
447 /***
448 * stop consuming messages
449 *
450 * @throws JMSException
451 */
452 public void stop() throws JMSException {
453 log.trace("Stopping channel: " + channel);
454 channel.stop();
455 }
456
457 public synchronized void cleanUp() {
458
459
460
461 if (!cleanedUp) {
462 cleanedUp = true;
463 try {
464 try {
465 for (Iterator i = consumers.iterator();i.hasNext();) {
466 ConsumerInfo info = (ConsumerInfo) i.next();
467 info.setStarted(false);
468 this.brokerConnector.deregisterMessageConsumer(this, info);
469 }
470 for (Iterator i = producers.iterator();i.hasNext();) {
471 ProducerInfo info = (ProducerInfo) i.next();
472 info.setStarted(false);
473 this.brokerConnector.deregisterMessageProducer(this, info);
474 }
475 for (Iterator i = sessions.iterator();i.hasNext();) {
476 SessionInfo info = (SessionInfo) i.next();
477 info.setStarted(false);
478 this.brokerConnector.deregisterSession(this, info);
479 }
480 for (Iterator i = transactions.iterator();i.hasNext();) {
481 this.brokerConnector.rollbackTransaction(this, i.next().toString());
482 }
483 for (Iterator i = xatransactions.iterator();i.hasNext();) {
484 try {
485 this.brokerConnector.rollbackTransaction(this, (ActiveMQXid) i.next());
486 }
487 catch (XAException e) {
488 log.warn("Transaction rollback failed:", e);
489 }
490 }
491 }
492 finally {
493
494 if (log.isDebugEnabled()) {
495 log.debug(this + " has stopped");
496 }
497 this.consumers.clear();
498 this.producers.clear();
499 this.transactions.clear();
500 this.xatransactions.clear();
501 this.sessions.clear();
502 this.brokerConnector.deregisterClient(this, connectionInfo);
503 }
504 }
505 catch (JMSException e) {
506 log.warn("failed to de-register Broker client: " + e, e);
507 }
508 }
509 else {
510 log.debug("We are ignoring a duplicate cleanup() method called for: " + this);
511 }
512 }
513
514
515
516 protected void send(Packet packet) {
517 if (!closed.get()) {
518 try {
519 if (brokerConnection) {
520 String brokerName = brokerConnector.getBrokerContainer().getBroker().getBrokerName();
521 packet.addBrokerVisited(brokerName);
522 if (packet.hasVisited(remoteBrokerName)) {
523 if (log.isDebugEnabled()) {
524 log.debug("Not Forwarding: " + remoteBrokerName + " has already been visited by packet: "
525 + packet);
526 }
527 return;
528 }
529 }
530 this.channel.asyncSend(packet);
531 }
532 catch (JMSException e) {
533 log.warn(this + " caught exception ", e);
534 close();
535 }
536 }
537 }
538
539 protected void close() {
540 if (closed.commit(false, true)) {
541 this.channel.stop();
542 log.debug(this + " has closed");
543 }
544 }
545
546 /***
547 * Send message to Broker
548 *
549 * @param message
550 * @throws JMSException
551 */
552 private void consumeActiveMQMessage(ActiveMQMessage message) throws JMSException {
553 message = message.shallowCopy();
554 if (message.isPartOfTransaction()) {
555 this.brokerConnector.sendTransactedMessage(this, message.getTransactionId(), message);
556 }
557 else {
558 this.brokerConnector.sendMessage(this, message);
559 }
560 }
561
562 /***
563 * Send Message acknowledge to the Broker
564 *
565 * @param ack
566 * @throws JMSException
567 */
568 private void consumeMessageAck(MessageAck ack) throws JMSException {
569 if (ack.isPartOfTransaction()) {
570 this.brokerConnector.acknowledgeTransactedMessage(this, ack.getTransactionId(), ack);
571 }
572 else {
573 this.brokerConnector.acknowledgeMessage(this, ack);
574 }
575 }
576
577 /***
578 * Handle transaction start/commit/rollback
579 *
580 * @param info
581 * @throws JMSException
582 */
583 private void consumeTransactionInfo(TransactionInfo info) throws JMSException {
584 if (info.getType() == TransactionInfo.START) {
585 transactions.add(info.getTransactionId());
586 this.brokerConnector.startTransaction(this, info.getTransactionId());
587 }
588 else {
589 if (info.getType() == TransactionInfo.ROLLBACK) {
590 this.brokerConnector.rollbackTransaction(this, info.getTransactionId());
591 }
592 else if (info.getType() == TransactionInfo.COMMIT) {
593 this.brokerConnector.commitTransaction(this, info.getTransactionId());
594 }
595 transactions.remove(info.getTransactionId());
596 }
597 }
598
599 /***
600 * Handle XA transaction start/prepare/commit/rollback
601 *
602 * @param info
603 * @throws JMSException
604 * @throws XAException
605 */
606 private void consumeXATransactionInfo(XATransactionInfo info) throws JMSException, XAException {
607 if (info.getType() == XATransactionInfo.START) {
608 transactions.add(info.getXid());
609 this.brokerConnector.startTransaction(this, info.getXid());
610 }
611 else if (info.getType() == XATransactionInfo.XA_RECOVER) {
612 ActiveMQXid rc[] = this.brokerConnector.getPreparedTransactions(this);
613
614 info.setReceiptRequired(false);
615
616 ResponseReceipt receipt = new ResponseReceipt();
617 receipt.setId(this.packetIdGenerator.generateId());
618 receipt.setCorrelationId(info.getId());
619 receipt.setResult(rc);
620 send(receipt);
621 }
622 else if (info.getType() == XATransactionInfo.GET_RM_ID) {
623 String rc = this.brokerConnector.getResourceManagerId(this);
624
625 info.setReceiptRequired(false);
626
627 ResponseReceipt receipt = new ResponseReceipt();
628 receipt.setId(this.packetIdGenerator.generateId());
629 receipt.setCorrelationId(info.getId());
630 receipt.setResult(rc);
631 send(receipt);
632 }
633 else if (info.getType() == XATransactionInfo.END) {
634
635 }
636 else {
637 if (info.getType() == XATransactionInfo.PRE_COMMIT) {
638 int rc = this.brokerConnector.prepareTransaction(this, info.getXid());
639
640 info.setReceiptRequired(false);
641
642 IntResponseReceipt receipt = new IntResponseReceipt();
643 receipt.setId(this.packetIdGenerator.generateId());
644 receipt.setCorrelationId(info.getId());
645 receipt.setResult(rc);
646 send(receipt);
647 }
648 else if (info.getType() == XATransactionInfo.ROLLBACK) {
649 this.brokerConnector.rollbackTransaction(this, info.getXid());
650 }
651 else if (info.getType() == XATransactionInfo.COMMIT_ONE_PHASE) {
652 this.brokerConnector.commitTransaction(this, info.getXid(), true);
653 }
654 else if (info.getType() == XATransactionInfo.COMMIT) {
655 this.brokerConnector.commitTransaction(this, info.getXid(), false);
656 }
657 else {
658 throw new JMSException("Packet type: " + info.getType() + " not recognized.");
659 }
660 transactions.remove(info.getXid());
661 }
662 }
663
664 /***
665 * register/deregister MessageProducer in the Broker
666 *
667 * @param info
668 * @throws JMSException
669 */
670 private void consumeProducerInfo(ProducerInfo info) throws JMSException {
671 if (info.isStarted()) {
672 producers.add(info);
673 this.brokerConnector.registerMessageProducer(this, info);
674 }
675 else {
676 producers.remove(info);
677 this.brokerConnector.deregisterMessageProducer(this, info);
678 }
679 }
680
681 /***
682 * register/deregister Session in a Broker
683 *
684 * @param info
685 * @throws JMSException
686 */
687 private void consumeSessionInfo(SessionInfo info) throws JMSException {
688 if (info.isStarted()) {
689 sessions.add(info);
690 this.brokerConnector.registerSession(this, info);
691 }
692 else {
693 sessions.remove(info);
694 this.brokerConnector.deregisterSession(this, info);
695 }
696 }
697
698 /***
699 * Update capacity for the peer
700 *
701 * @param info
702 */
703 private void consumeCapacityInfo(CapacityInfo info) {
704 this.capacity = info.getCapacity();
705 }
706
707 private void updateCapacityInfo(String correlationId) {
708 CapacityInfo info = new CapacityInfo();
709 info.setResourceName(this.brokerConnector.getBrokerInfo().getBrokerName());
710 info.setCorrelationId(correlationId);
711 info.setCapacity(this.brokerConnector.getBrokerCapacity());
712 info.setFlowControlTimeout(getFlowControlTimeout(info.getCapacity()));
713 send(info);
714 }
715
716 private long getFlowControlTimeout(int capacity) {
717 long result = -1;
718 if (capacity <= 0) {
719 result = 10000;
720 }
721 else if (capacity <= 10) {
722 result = 1000;
723 }
724 else if (capacity <= 20) {
725 result = 10;
726 }
727 return result;
728 }
729
730 private void consumeBrokerInfo(BrokerInfo info) {
731 brokerConnection = true;
732 remoteBrokerName = info.getBrokerName();
733 String clusterName = getBrokerConnector().getBrokerContainer().getBroker().getBrokerClusterName();
734 if (clusterName.equals(info.getClusterName())){
735 clusteredConnection = true;
736 }
737 }
738
739 private void sendReceipt(Packet packet) {
740 sendReceipt(packet, null, false);
741 }
742
743 private void sendReceipt(Packet packet, Throwable requestEx, boolean failed) {
744 if (packet != null && packet.isReceiptRequired()) {
745 Receipt receipt = new Receipt();
746 receipt.setId(this.packetIdGenerator.generateId());
747 receipt.setCorrelationId(packet.getId());
748 receipt.setBrokerName(brokerConnector.getBrokerInfo().getBrokerName());
749 receipt.setClusterName(brokerConnector.getBrokerInfo().getClusterName());
750 receipt.setException(requestEx);
751 receipt.setFailed(failed);
752 send(receipt);
753 }
754 }
755 }