View Javadoc

1   /***
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  
19  package org.codehaus.activemq.transport.tcp;
20  import EDU.oswego.cs.dl.util.concurrent.BoundedBuffer;
21  import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
22  import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
23  import org.apache.commons.logging.Log;
24  import org.apache.commons.logging.LogFactory;
25  import org.codehaus.activemq.message.WireFormat;
26  import org.codehaus.activemq.transport.TransportServerChannelSupport;
27  import org.codehaus.activemq.util.JMSExceptionHelper;
28  import javax.jms.JMSException;
29  import java.io.IOException;
30  import java.net.InetAddress;
31  import java.net.ServerSocket;
32  import java.net.Socket;
33  import java.net.SocketTimeoutException;
34  import java.net.URI;
35  import java.net.URISyntaxException;
36  import java.net.UnknownHostException;
37  
38  /***
39   * Binds to a well known port and listens for Sockets ...
40   * 
41   * @version $Revision: 1.23 $
42   */
43  public class TcpTransportServerChannel extends TransportServerChannelSupport implements Runnable {
44      private static final Log log = LogFactory.getLog(TcpTransportServerChannel.class);
45      protected static final int DEFAULT_BACKLOG = 500;
46      private WireFormat wireFormat;
47      private Thread serverSocketThread;
48      private ServerSocket serverSocket;
49      private SynchronizedBoolean closed;
50      private SynchronizedBoolean started;
51      private boolean useAsyncSend = false;
52      private int maxOutstandingMessages = 10;
53      private int backlog = DEFAULT_BACKLOG;
54  
55      /***
56       * Default Constructor
57       * 
58       * @param bindAddr
59       * @throws JMSException
60       */
61      public TcpTransportServerChannel(WireFormat wireFormat, URI bindAddr) throws JMSException {
62          super(bindAddr);
63          this.wireFormat = wireFormat;
64          closed = new SynchronizedBoolean(false);
65          started = new SynchronizedBoolean(false);
66          try {
67              serverSocket = createServerSocket(bindAddr);
68              serverSocket.setSoTimeout(2000);
69              updatePhysicalUri(bindAddr);
70          }
71          catch (Exception se) {
72              System.out.println(se);
73              se.printStackTrace();
74              throw JMSExceptionHelper.newJMSException("Bind to " + bindAddr + " failed: " + se.getMessage(), se);
75          }
76      }
77  
78      public TcpTransportServerChannel(WireFormat wireFormat, ServerSocket serverSocket) throws JMSException {
79          super(serverSocket.getInetAddress().toString());
80          this.wireFormat = wireFormat;
81          this.serverSocket = serverSocket;
82          closed = new SynchronizedBoolean(false);
83          started = new SynchronizedBoolean(false);
84          InetAddress address = serverSocket.getInetAddress();
85          try {
86              updatePhysicalUri(new URI("tcp", "", address.getHostName(), 0, "", "", ""));
87          }
88          catch (URISyntaxException e) {
89              throw JMSExceptionHelper.newJMSException("Failed to extract URI: : " + e.getMessage(), e);
90          }
91      }
92  
93      public void start() throws JMSException {
94          super.start();
95          if (started.commit(false, true)) {
96              log.info("Listening for connections at: " + getUrl());
97              serverSocketThread = new Thread(this, toString());
98              serverSocketThread.setDaemon(true);
99              serverSocketThread.start();
100         }
101     }
102 
103     public void stop() throws JMSException {
104         if (closed.commit(false, true)) {
105             super.stop();
106             try {
107                 if (serverSocket != null) {
108                     serverSocket.close();
109                     serverSocketThread.join();
110                     serverSocketThread = null;
111                 }
112             }
113             catch (Throwable e) {
114                 throw JMSExceptionHelper.newJMSException("Failed to stop: " + e, e);
115             }
116         }
117     }
118 
119     /***
120      * @return pretty print of this
121      */
122     public String toString() {
123         return "TcpTransportServerChannel@" + getUrl();
124     }
125 
126     /***
127      * pull Sockets from the ServerSocket
128      */
129     public void run() {
130         while (!closed.get()) {
131             Socket socket = null;
132             try {
133                 socket = serverSocket.accept();
134                 if (socket != null) {
135                     if (closed.get()) {
136                         socket.close();
137                     }
138                     else {
139                         // have thread per channel for sending messages and a thread for receiving them
140                         PooledExecutor executor = null;
141                         if (useAsyncSend) {
142                             executor = new PooledExecutor(new BoundedBuffer(maxOutstandingMessages), 1);
143                         }
144                         TcpTransportChannel channel = new TcpTransportChannel(wireFormat, socket, executor);
145                         addClient(channel);
146                     }
147                 }
148             }
149             catch (SocketTimeoutException ste) {
150                 //expect this to happen
151             }
152             catch (Throwable e) {
153                 if (!closed.get()) {
154                     log.warn("run()", e);
155                 }
156             }
157         }
158     }
159 
160     // Properties
161     //-------------------------------------------------------------------------
162     public boolean isUseAsyncSend() {
163         return useAsyncSend;
164     }
165 
166     public void setUseAsyncSend(boolean useAsyncSend) {
167         this.useAsyncSend = useAsyncSend;
168     }
169 
170     public int getMaxOutstandingMessages() {
171         return maxOutstandingMessages;
172     }
173 
174     public void setMaxOutstandingMessages(int maxOutstandingMessages) {
175         this.maxOutstandingMessages = maxOutstandingMessages;
176     }
177 
178     public int getBacklog() {
179         return backlog;
180     }
181 
182     public void setBacklog(int backlog) {
183         this.backlog = backlog;
184     }
185 
186     // Implementation methods
187     //-------------------------------------------------------------------------
188     /***
189      * In cases where we construct ourselves with a zero port we need to regenerate the URI with the real physical port
190      * so that people can connect to us via discovery
191      */
192     protected void updatePhysicalUri(URI bindAddr) throws URISyntaxException {
193         URI newURI = new URI(bindAddr.getScheme(), bindAddr.getUserInfo(), bindAddr.getHost(), serverSocket
194                 .getLocalPort(), bindAddr.getPath(), bindAddr.getQuery(), bindAddr.getFragment());
195         setUrl(newURI.toString());
196     }
197 
198     /***
199      * Factory method to create a new ServerSocket
200      * 
201      * @throws UnknownHostException
202      * @throws IOException
203      */
204     protected ServerSocket createServerSocket(URI bind) throws UnknownHostException, IOException {
205         ServerSocket answer = null;
206         String host = bind.getHost();
207         host = (host == null || host.length() == 0) ? "localhost" : host;
208         InetAddress addr = InetAddress.getByName(host);
209         if (addr.equals(InetAddress.getLocalHost())) {
210             answer = new ServerSocket(bind.getPort(), backlog);
211         }
212         else {
213             answer = new ServerSocket(bind.getPort(), backlog, addr);
214         }
215         return answer;
216     }
217 }