1 /*** 2 * 3 * Copyright 2004 Protique Ltd 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 **/ 18 19 package org.codehaus.activemq; 20 21 import javax.jms.ConnectionConsumer; 22 import javax.jms.IllegalStateException; 23 import javax.jms.JMSException; 24 import javax.jms.ServerSession; 25 import javax.jms.ServerSessionPool; 26 27 import org.codehaus.activemq.message.ActiveMQMessage; 28 import org.codehaus.activemq.message.ConsumerInfo; 29 import org.codehaus.activemq.message.util.MemoryBoundedQueue; 30 31 /*** 32 * For application servers, <CODE>Connection</CODE> objects provide a special 33 * facility for creating a <CODE>ConnectionConsumer</CODE> (optional). The 34 * messages it is to consume are specified by a <CODE>Destination</CODE> and 35 * a message selector. In addition, a <CODE>ConnectionConsumer</CODE> must be 36 * given a <CODE>ServerSessionPool</CODE> to use for processing its messages. 37 * <p/> 38 * <P> 39 * Normally, when traffic is light, a <CODE>ConnectionConsumer</CODE> gets a 40 * <CODE>ServerSession</CODE> from its pool, loads it with a single message, 41 * and starts it. As traffic picks up, messages can back up. If this happens, a 42 * <CODE>ConnectionConsumer</CODE> can load each <CODE>ServerSession</CODE> 43 * with more than one message. This reduces the thread context switches and 44 * minimizes resource use at the expense of some serialization of message 45 * processing. 46 * 47 * @see javax.jms.Connection#createConnectionConsumer 48 * @see javax.jms.Connection#createDurableConnectionConsumer 49 * @see javax.jms.QueueConnection#createConnectionConsumer 50 * @see javax.jms.TopicConnection#createConnectionConsumer 51 * @see javax.jms.TopicConnection#createDurableConnectionConsumer 52 */ 53 54 public class ActiveMQConnectionConsumer implements ConnectionConsumer, 55 ActiveMQMessageDispatcher { 56 57 private ActiveMQConnection connection; 58 59 private ServerSessionPool sessionPool; 60 61 private ConsumerInfo consumerInfo; 62 63 private boolean closed; 64 65 private int maximumMessages; 66 67 protected MemoryBoundedQueue messageQueue; 68 69 70 /*** 71 * Create a ConnectionConsumer 72 * 73 * @param theConnection 74 * @param theSessionPool 75 * @param theConsumerInfo 76 * @param theMaximumMessages 77 * @throws JMSException 78 */ 79 protected ActiveMQConnectionConsumer(ActiveMQConnection theConnection, 80 ServerSessionPool theSessionPool, ConsumerInfo theConsumerInfo, 81 int theMaximumMessages) throws JMSException { 82 this.connection = theConnection; 83 this.sessionPool = theSessionPool; 84 this.consumerInfo = theConsumerInfo; 85 this.maximumMessages = theMaximumMessages; 86 this.connection.addConnectionConsumer(this); 87 this.consumerInfo.setStarted(true); 88 this.connection.syncSendPacket(this.consumerInfo); 89 90 String queueName = connection.clientID + ":" + theConsumerInfo.getConsumerName()+":"+ 91 theConsumerInfo.getConsumerNo(); 92 this.messageQueue = connection.getMemoryBoundedQueue(queueName); 93 } 94 95 /*** 96 * Tests to see if the Message Dispatcher is a target for this message 97 * 98 * @param message the message to test 99 * @return true if the Message Dispatcher can dispatch the message 100 */ 101 public boolean isTarget(ActiveMQMessage message) { 102 return message.isConsumerTarget(this.consumerInfo.getConsumerNo()); 103 } 104 105 /*** 106 * Dispatch an ActiveMQMessage 107 * 108 * @param message 109 */ 110 public void dispatch(ActiveMQMessage message) { 111 if (message.isConsumerTarget(this.consumerInfo.getConsumerNo())) { 112 message.setConsumerId(this.consumerInfo.getConsumerId()); 113 try { 114 if( sessionPool != null ) 115 dispatchToSession(message); 116 else 117 dispatchToQueue(message); 118 } 119 catch (JMSException jmsEx) { 120 this.connection.handleAsyncException(jmsEx); 121 } 122 } 123 } 124 125 /*** 126 * @param message 127 * @throws JMSException 128 */ 129 private void dispatchToQueue(ActiveMQMessage message) throws JMSException { 130 messageQueue.enqueue(message); 131 } 132 133 /*** 134 * Receives the next message that arrives within the specified timeout interval. 135 * @throws JMSException 136 */ 137 public ActiveMQMessage receive(long timeout) throws JMSException { 138 try { 139 ActiveMQMessage message = (ActiveMQMessage) messageQueue.dequeue(timeout); 140 return message; 141 } 142 catch (InterruptedException ioe) { 143 return null; 144 } 145 } 146 147 148 /*** 149 * @param message 150 * @throws JMSException 151 */ 152 private void dispatchToSession(ActiveMQMessage message) throws JMSException { 153 ServerSession serverSession = sessionPool.getServerSession(); 154 ActiveMQSession session = (ActiveMQSession) serverSession 155 .getSession(); 156 session.dispatch(message); 157 serverSession.start(); 158 } 159 160 /*** 161 * Gets the server session pool associated with this connection consumer. 162 * 163 * @return the server session pool used by this connection consumer 164 * @throws JMSException if the JMS provider fails to get the server session pool 165 * associated with this consumer due to some internal error. 166 */ 167 168 public ServerSessionPool getServerSessionPool() throws JMSException { 169 if (closed) { 170 throw new IllegalStateException("The Connection Consumer is closed"); 171 } 172 return this.sessionPool; 173 } 174 175 /*** 176 * Closes the connection consumer. 177 * <p/> 178 * <P> 179 * Since a provider may allocate some resources on behalf of a connection 180 * consumer outside the Java virtual machine, clients should close these 181 * resources when they are not needed. Relying on garbage collection to 182 * eventually reclaim these resources may not be timely enough. 183 * 184 * @throws JMSException 185 */ 186 187 public void close() throws JMSException { 188 if (!closed) { 189 closed = true; 190 this.consumerInfo.setStarted(false); 191 this.connection.asyncSendPacket(this.consumerInfo); 192 this.connection.removeConnectionConsumer(this); 193 } 194 195 } 196 }