View Javadoc

1   /***
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  
19  package org.codehaus.activemq.web;
20  
21  import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
22  import org.apache.commons.logging.Log;
23  import org.apache.commons.logging.LogFactory;
24  import org.codehaus.activemq.ActiveMQConnection;
25  import org.codehaus.activemq.ActiveMQConnectionFactory;
26  import org.codehaus.activemq.ActiveMQSession;
27  
28  import javax.jms.ConnectionFactory;
29  import javax.jms.Destination;
30  import javax.jms.JMSException;
31  import javax.jms.Message;
32  import javax.jms.MessageConsumer;
33  import javax.jms.MessageProducer;
34  import javax.jms.Session;
35  import javax.jms.Topic;
36  import javax.servlet.ServletContext;
37  import javax.servlet.http.HttpSession;
38  import javax.servlet.http.HttpSessionActivationListener;
39  import javax.servlet.http.HttpSessionEvent;
40  import java.io.Externalizable;
41  import java.io.IOException;
42  import java.io.ObjectInput;
43  import java.io.ObjectOutput;
44  import java.util.HashMap;
45  import java.util.Map;
46  
47  /***
48   * Represents a messaging client used from inside a web container
49   * typically stored inside a HttpSession
50   *
51   * @version $Revision: 1.12 $
52   */
53  public class WebClient implements HttpSessionActivationListener, Externalizable {
54      public static final String webClientAttribute = "org.codehaus.activemq.webclient";
55      public static final String connectionFactoryAttribute = "org.codehaus.activemq.connectionFactory";
56      public static final String queueConsumersAttribute = "org.codehaus.activemq.queueConsumers";
57      public static final String brokerUrlInitParam = "org.codehaus.activemq.brokerURL";
58      public static final String embeddedBrokerInitParam = "org.codehaus.activemq.embeddedBroker";
59  
60      private static final Log log = LogFactory.getLog(WebClient.class);
61  
62      private static transient ConnectionFactory factory;
63      private static transient Map queueConsumers;
64  
65      private transient ServletContext context;
66      private transient ActiveMQConnection connection;
67      private transient ActiveMQSession session;
68      private transient MessageProducer producer;
69      private transient Map topicConsumers = new ConcurrentHashMap();
70  
71  
72      /***
73       * @return the web client for the current HTTP session or null if there is not a web client created yet
74       */
75      public static WebClient getWebClient(HttpSession session) {
76          return (WebClient) session.getAttribute(webClientAttribute);
77      }
78  
79  
80      public static void initContext(ServletContext context) {
81          factory = initConnectionFactory(context);
82          if (factory == null) {
83              log.warn("No ConnectionFactory available in the ServletContext for: " + connectionFactoryAttribute);
84              factory = new ActiveMQConnectionFactory("vm://localhost");
85              context.setAttribute(connectionFactoryAttribute, factory);
86          }
87          queueConsumers = initQueueConsumers(context);
88      }
89  
90      /***
91       * Only called by serialization
92       */
93      public WebClient() {
94      }
95  
96      public WebClient(ServletContext context) {
97          this.context = context;
98          initContext(context);
99      }
100 
101     public void start() throws JMSException {
102     }
103 
104     public void stop() throws JMSException {
105         try {
106             connection.close();
107         }
108         finally {
109             producer = null;
110             session = null;
111             connection = null;
112             topicConsumers.clear();
113         }
114     }
115 
116     public void writeExternal(ObjectOutput out) throws IOException {
117     }
118 
119     public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
120         topicConsumers = new HashMap();
121     }
122 
123     public void send(Destination destination, Message message) throws JMSException {
124         if (producer == null) {
125             producer = getSession().createProducer(null);
126         }
127         log.info("Sending to destination: " + destination);
128         producer.send(destination, message);
129         log.info("Sent! message: " + message);
130     }
131 
132     public Session getSession() throws JMSException {
133         if (session == null) {
134             session = createSession();
135         }
136         return session;
137     }
138 
139     public ActiveMQConnection getConnection() throws JMSException {
140         if (connection == null) {
141             connection = (ActiveMQConnection) factory.createConnection();
142             connection.start();
143         }
144         return connection;
145     }
146 
147     public void sessionWillPassivate(HttpSessionEvent event) {
148         try {
149             stop();
150         }
151         catch (JMSException e) {
152             log.warn("Could not close connection: " + e, e);
153         }
154     }
155 
156     public void sessionDidActivate(HttpSessionEvent event) {
157         // lets update the connection factory from the servlet context
158         context = event.getSession().getServletContext();
159         initContext(context);
160     }
161 
162     public static Map initQueueConsumers(ServletContext context) {
163         Map answer = (Map) context.getAttribute(queueConsumersAttribute);
164         if (answer == null) {
165             answer = new HashMap();
166             context.setAttribute(queueConsumersAttribute, answer);
167         }
168         return answer;
169     }
170 
171 
172     public static ConnectionFactory initConnectionFactory(ServletContext servletContext) {
173         ConnectionFactory connectionFactory = (ConnectionFactory) servletContext.getAttribute(connectionFactoryAttribute);
174         if (connectionFactory == null) {
175             String brokerURL = (String) servletContext.getInitParameter(brokerUrlInitParam);
176 
177             servletContext.log("Value of: " + brokerUrlInitParam + " is: " + brokerURL);
178 
179             if (brokerURL == null) {
180                 brokerURL = "vm://localhost";
181             }
182 
183             boolean embeddedBroker = MessageServletSupport.asBoolean(servletContext.getInitParameter(embeddedBrokerInitParam));
184             servletContext.log("Use embedded broker: " + embeddedBroker);
185 
186             ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerURL);
187             factory.setUseEmbeddedBroker(embeddedBroker);
188 
189             connectionFactory = factory;
190             servletContext.setAttribute(connectionFactoryAttribute, connectionFactory);
191         }
192         return connectionFactory;
193     }
194 
195     public synchronized MessageConsumer getConsumer(Destination destination) throws JMSException {
196         if (destination instanceof Topic) {
197             MessageConsumer consumer = (MessageConsumer) topicConsumers.get(destination);
198             if (consumer == null) {
199                 consumer = session.createConsumer(destination);
200                 topicConsumers.put(destination, consumer);
201             }
202             return consumer;
203         }
204         else {
205             synchronized (queueConsumers) {
206                 SessionConsumerPair pair = (SessionConsumerPair) queueConsumers.get(destination);
207                 if (pair == null) {
208                     pair = createSessionConsumerPair(destination);
209                     queueConsumers.put(destination, pair);
210                 }
211                 return pair.consumer;
212             }
213         }
214     }
215 
216     protected ActiveMQSession createSession() throws JMSException {
217         return (ActiveMQSession) getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE);
218     }
219 
220     protected SessionConsumerPair createSessionConsumerPair(Destination destination) throws JMSException {
221         SessionConsumerPair answer = new SessionConsumerPair();
222         answer.session = createSession();
223         answer.consumer = answer.session.createConsumer(destination);
224         return answer;
225     }
226 
227     protected static class SessionConsumerPair {
228         public Session session;
229         public MessageConsumer consumer;
230     }
231 }