View Javadoc

1   /*** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.codehaus.activemq.transport.http;
19  
20  import EDU.oswego.cs.dl.util.concurrent.BoundedLinkedQueue;
21  import org.apache.commons.logging.Log;
22  import org.apache.commons.logging.LogFactory;
23  import org.codehaus.activemq.message.Packet;
24  import org.codehaus.activemq.message.TextWireFormat;
25  import org.codehaus.activemq.transport.TransportChannelListener;
26  import org.codehaus.activemq.util.JMSExceptionHelper;
27  
28  import javax.jms.JMSException;
29  import javax.servlet.ServletException;
30  import javax.servlet.http.HttpServlet;
31  import javax.servlet.http.HttpServletRequest;
32  import javax.servlet.http.HttpServletResponse;
33  import java.io.BufferedReader;
34  import java.io.DataOutputStream;
35  import java.io.IOException;
36  import java.util.HashMap;
37  import java.util.Map;
38  
39  /***
40   * @version $Revision: 1.2 $
41   */
42  public class HttpTunnelServlet extends HttpServlet {
43  
44      private static final Log log = LogFactory.getLog(HttpTunnelServlet.class);
45  
46      private TransportChannelListener listener;
47      private TextWireFormat wireFormat;
48      private Map clients = new HashMap();
49      private long requestTimeout = 30000L;
50  
51      public void init() throws ServletException {
52          super.init();
53          listener = (TransportChannelListener) getServletContext().getAttribute("transportChannelListener");
54          if (listener == null) {
55              throw new ServletException("No such attribute 'transportChannelListener' available in the ServletContext");
56          }
57          wireFormat = (TextWireFormat) getServletContext().getAttribute("wireFormat");
58          if (wireFormat == null) {
59              throw new ServletException("No such attribute 'wireFormat' available in the ServletContext");
60          }
61      }
62  
63      protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
64          // lets return the next response
65          Packet packet = null;
66          try {
67              HttpServerTransportChannel transportChannel = getTransportChannel(request);
68              if (transportChannel == null) {
69                  return;
70              }
71              packet = (Packet) transportChannel.getChannel().poll(requestTimeout);
72          }
73          catch (InterruptedException e) {
74              // ignore
75          }
76          if (packet == null) {
77              response.setStatus(HttpServletResponse.SC_REQUEST_TIMEOUT);
78          }
79          else {
80              try {
81                  wireFormat.writePacket(packet, new DataOutputStream(response.getOutputStream()));
82              }
83              catch (JMSException e) {
84                  throw JMSExceptionHelper.newIOException(e);
85              }
86          }
87      }
88  
89      protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
90          HttpServerTransportChannel transportChannel = getTransportChannel(request);
91          if (transportChannel == null) {
92              response.setStatus(HttpServletResponse.SC_NOT_FOUND);
93          }
94          else {
95              try {
96                  Packet packet = wireFormat.fromString(readRequestBody(request));
97                  transportChannel.getPacketListener().consume(packet);
98              }
99              catch (IOException e) {
100                 log.error("Caught: " + e, e);
101             }
102             catch (JMSException e) {
103                 throw JMSExceptionHelper.newIOException(e);
104             }
105         }
106     }
107 
108     protected String readRequestBody(HttpServletRequest request) throws IOException {
109         StringBuffer buffer = new StringBuffer();
110         BufferedReader reader = request.getReader();
111         while (true) {
112             String line = reader.readLine();
113             if (line == null) {
114                 break;
115             }
116             else {
117                 buffer.append(line);
118                 buffer.append("\n");
119             }
120         }
121         return buffer.toString();
122     }
123 
124     protected HttpServerTransportChannel getTransportChannel(HttpServletRequest request) {
125         String clientID = request.getHeader("clientID");
126         if (clientID == null) {
127             clientID = request.getParameter("clientID");
128         }
129         if (clientID == null) {
130             log.warn("No clientID header so ignoring request");
131             return null;
132         }
133         synchronized (this) {
134             HttpServerTransportChannel answer = (HttpServerTransportChannel) clients.get(clientID);
135             if (answer == null) {
136                 answer = createTransportChannel();
137                 clients.put(clientID, answer);
138                 listener.addClient(answer);
139             }
140             else {
141                 // this lookup should keep the client alive, otherwise we need to discard it
142                 keepAlivePing(answer);
143             }
144             return answer;
145         }
146     }
147 
148     /***
149      * Disable this channel from being auto-disconnected after a timeout period
150      */
151     protected void keepAlivePing(HttpServerTransportChannel channel) {
152         /*** TODO */
153     }
154 
155     protected HttpServerTransportChannel createTransportChannel() {
156         return new HttpServerTransportChannel(new BoundedLinkedQueue(10));
157     }
158 }