1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18
19 package org.codehaus.activemq.web;
20
21 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
22 import org.apache.commons.logging.Log;
23 import org.apache.commons.logging.LogFactory;
24 import org.codehaus.activemq.ActiveMQConnection;
25 import org.codehaus.activemq.ActiveMQConnectionFactory;
26 import org.codehaus.activemq.ActiveMQSession;
27
28 import javax.jms.ConnectionFactory;
29 import javax.jms.Destination;
30 import javax.jms.JMSException;
31 import javax.jms.Message;
32 import javax.jms.MessageConsumer;
33 import javax.jms.MessageProducer;
34 import javax.jms.Session;
35 import javax.jms.Topic;
36 import javax.servlet.ServletContext;
37 import javax.servlet.http.HttpSession;
38 import javax.servlet.http.HttpSessionActivationListener;
39 import javax.servlet.http.HttpSessionEvent;
40 import java.io.Externalizable;
41 import java.io.IOException;
42 import java.io.ObjectInput;
43 import java.io.ObjectOutput;
44 import java.util.HashMap;
45 import java.util.Map;
46
47 /***
48 * Represents a messaging client used from inside a web container
49 * typically stored inside a HttpSession
50 *
51 * @version $Revision: 1.12 $
52 */
53 public class WebClient implements HttpSessionActivationListener, Externalizable {
54 public static final String webClientAttribute = "org.codehaus.activemq.webclient";
55 public static final String connectionFactoryAttribute = "org.codehaus.activemq.connectionFactory";
56 public static final String queueConsumersAttribute = "org.codehaus.activemq.queueConsumers";
57 public static final String brokerUrlInitParam = "org.codehaus.activemq.brokerURL";
58 public static final String embeddedBrokerInitParam = "org.codehaus.activemq.embeddedBroker";
59
60 private static final Log log = LogFactory.getLog(WebClient.class);
61
62 private static transient ConnectionFactory factory;
63 private static transient Map queueConsumers;
64
65 private transient ServletContext context;
66 private transient ActiveMQConnection connection;
67 private transient ActiveMQSession session;
68 private transient MessageProducer producer;
69 private transient Map topicConsumers = new ConcurrentHashMap();
70
71
72 /***
73 * @return the web client for the current HTTP session or null if there is not a web client created yet
74 */
75 public static WebClient getWebClient(HttpSession session) {
76 return (WebClient) session.getAttribute(webClientAttribute);
77 }
78
79
80 public static void initContext(ServletContext context) {
81 factory = initConnectionFactory(context);
82 if (factory == null) {
83 log.warn("No ConnectionFactory available in the ServletContext for: " + connectionFactoryAttribute);
84 factory = new ActiveMQConnectionFactory("vm://localhost");
85 context.setAttribute(connectionFactoryAttribute, factory);
86 }
87 queueConsumers = initQueueConsumers(context);
88 }
89
90 /***
91 * Only called by serialization
92 */
93 public WebClient() {
94 }
95
96 public WebClient(ServletContext context) {
97 this.context = context;
98 initContext(context);
99 }
100
101 public void start() throws JMSException {
102 }
103
104 public void stop() throws JMSException {
105 try {
106 connection.close();
107 }
108 finally {
109 producer = null;
110 session = null;
111 connection = null;
112 topicConsumers.clear();
113 }
114 }
115
116 public void writeExternal(ObjectOutput out) throws IOException {
117 }
118
119 public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
120 topicConsumers = new HashMap();
121 }
122
123 public void send(Destination destination, Message message) throws JMSException {
124 if (producer == null) {
125 producer = getSession().createProducer(null);
126 }
127 log.info("Sending to destination: " + destination);
128 producer.send(destination, message);
129 log.info("Sent! message: " + message);
130 }
131
132 public Session getSession() throws JMSException {
133 if (session == null) {
134 session = createSession();
135 }
136 return session;
137 }
138
139 public ActiveMQConnection getConnection() throws JMSException {
140 if (connection == null) {
141 connection = (ActiveMQConnection) factory.createConnection();
142 connection.start();
143 }
144 return connection;
145 }
146
147 public void sessionWillPassivate(HttpSessionEvent event) {
148 try {
149 stop();
150 }
151 catch (JMSException e) {
152 log.warn("Could not close connection: " + e, e);
153 }
154 }
155
156 public void sessionDidActivate(HttpSessionEvent event) {
157
158 context = event.getSession().getServletContext();
159 initContext(context);
160 }
161
162 public static Map initQueueConsumers(ServletContext context) {
163 Map answer = (Map) context.getAttribute(queueConsumersAttribute);
164 if (answer == null) {
165 answer = new HashMap();
166 context.setAttribute(queueConsumersAttribute, answer);
167 }
168 return answer;
169 }
170
171
172 public static ConnectionFactory initConnectionFactory(ServletContext servletContext) {
173 ConnectionFactory connectionFactory = (ConnectionFactory) servletContext.getAttribute(connectionFactoryAttribute);
174 if (connectionFactory == null) {
175 String brokerURL = (String) servletContext.getInitParameter(brokerUrlInitParam);
176
177 servletContext.log("Value of: " + brokerUrlInitParam + " is: " + brokerURL);
178
179 if (brokerURL == null) {
180 brokerURL = "vm://localhost";
181 }
182
183 boolean embeddedBroker = MessageServletSupport.asBoolean(servletContext.getInitParameter(embeddedBrokerInitParam));
184 servletContext.log("Use embedded broker: " + embeddedBroker);
185
186 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerURL);
187 factory.setUseEmbeddedBroker(embeddedBroker);
188
189 connectionFactory = factory;
190 servletContext.setAttribute(connectionFactoryAttribute, connectionFactory);
191 }
192 return connectionFactory;
193 }
194
195 public synchronized MessageConsumer getConsumer(Destination destination) throws JMSException {
196 if (destination instanceof Topic) {
197 MessageConsumer consumer = (MessageConsumer) topicConsumers.get(destination);
198 if (consumer == null) {
199 consumer = session.createConsumer(destination);
200 topicConsumers.put(destination, consumer);
201 }
202 return consumer;
203 }
204 else {
205 synchronized (queueConsumers) {
206 SessionConsumerPair pair = (SessionConsumerPair) queueConsumers.get(destination);
207 if (pair == null) {
208 pair = createSessionConsumerPair(destination);
209 queueConsumers.put(destination, pair);
210 }
211 return pair.consumer;
212 }
213 }
214 }
215
216 protected ActiveMQSession createSession() throws JMSException {
217 return (ActiveMQSession) getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE);
218 }
219
220 protected SessionConsumerPair createSessionConsumerPair(Destination destination) throws JMSException {
221 SessionConsumerPair answer = new SessionConsumerPair();
222 answer.session = createSession();
223 answer.consumer = answer.session.createConsumer(destination);
224 return answer;
225 }
226
227 protected static class SessionConsumerPair {
228 public Session session;
229 public MessageConsumer consumer;
230 }
231 }