View Javadoc

1   /***
2    *
3    * Copyright 2004 Protique Ltd
4    *
5    * Licensed under the Apache License, Version 2.0 (the "License");
6    * you may not use this file except in compliance with the License.
7    * You may obtain a copy of the License at
8    *
9    * http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   *
17   **/
18  
19  package org.codehaus.activemq.message;
20  import org.apache.commons.logging.Log;
21  import org.apache.commons.logging.LogFactory;
22  import org.codehaus.activemq.message.util.WireByteArrayInputStream;
23  import org.codehaus.activemq.message.util.WireByteArrayOutputStream;
24  import java.io.DataInput;
25  import java.io.DataInputStream;
26  import java.io.DataOutput;
27  import java.io.DataOutputStream;
28  import java.io.IOException;
29  import java.io.ObjectStreamException;
30  import java.io.Serializable;
31  
32  /***
33   * Default implementation used for Java-Java protocols. When talking to non-Java nodes we may use a different wire
34   * format.
35   * 
36   * @version $Revision: 1.14 $
37   */
38  public class DefaultWireFormat extends WireFormat implements Serializable {
39      /***
40       * Current wire format version for this implementation
41       */
42      public static final int WIRE_FORMAT_VERSION = 1;
43      private static final Log log = LogFactory.getLog(DefaultWireFormat.class);
44      
45      private transient final PacketReader messageReader = new ActiveMQMessageReader();
46      private transient final PacketReader textMessageReader = new ActiveMQTextMessageReader();
47      private transient final PacketReader objectMessageReader = new ActiveMQObjectMessageReader();
48      private transient final PacketReader bytesMessageReader = new ActiveMQBytesMessageReader();
49      private transient final PacketReader streamMessageReader = new ActiveMQStreamMessageReader();
50      private transient final PacketReader mapMessageReader = new ActiveMQMapMessageReader();
51      private transient final PacketReader messageAckReader = new MessageAckReader();
52      private transient final PacketReader receiptReader = new ReceiptReader();
53      private transient final PacketReader consumerInfoReader = new ConsumerInfoReader();
54      private transient final PacketReader producerInfoReader = new ProducerInfoReader();
55      private transient final PacketReader transactionInfoReader = new TransactionInfoReader();
56      private transient final PacketReader xaTransactionInfoReader = new XATransactionInfoReader();
57      private transient final PacketReader brokerInfoReader = new BrokerInfoReader();
58      private transient final PacketReader connectionInfoReader = new ConnectionInfoReader();
59      private transient final PacketReader sessionInfoReader = new SessionInfoReader();
60      private transient final PacketReader durableUnsubscribeReader = new DurableUnsubscribeReader();
61      private transient final PacketReader reponseReceiptReader = new ResponseReceiptReader();
62      private transient final PacketReader intReponseReceiptReader = new IntResponseReceiptReader();
63      private transient final PacketReader capacityInfoReader = new CapacityInfoReader();
64      private transient final PacketReader capacityInfoRequestReader = new CapacityInfoRequestReader();
65      private transient final PacketReader wireFormatInfoReader = new WireFormatInfoReader();
66      private transient final PacketWriter messageWriter = new ActiveMQMessageWriter();
67      private transient final PacketWriter textMessageWriter = new ActiveMQTextMessageWriter();
68      private transient final PacketWriter objectMessageWriter = new ActiveMQObjectMessageWriter();
69      private transient final PacketWriter bytesMessageWriter = new ActiveMQBytesMessageWriter();
70      private transient final PacketWriter streamMessageWriter = new ActiveMQStreamMessageWriter();
71      private transient final PacketWriter mapMessageWriter = new ActiveMQMapMessageWriter();
72      private transient final PacketWriter messageAckWriter = new MessageAckWriter();
73      private transient final PacketWriter receiptWriter = new ReceiptWriter();
74      private transient final PacketWriter consumerInfoWriter = new ConsumerInfoWriter();
75      private transient final PacketWriter producerInfoWriter = new ProducerInfoWriter();
76      private transient final PacketWriter transactionInfoWriter = new TransactionInfoWriter();
77      private transient final PacketWriter xaTransactionInfoWriter = new XATransactionInfoWriter();
78      private transient final PacketWriter brokerInfoWriter = new BrokerInfoWriter();
79      private transient final PacketWriter connectionInfoWriter = new ConnectionInfoWriter();
80      private transient final PacketWriter sessionInfoWriter = new SessionInfoWriter();
81      private transient final PacketWriter durableUnsubscribeWriter = new DurableUnsubscribeWriter();
82      private transient final PacketWriter reponseReceiptWriter = new ResponseReceiptWriter();
83      private transient final PacketWriter intReponseReceiptWriter = new IntResponseReceiptWriter();
84      private transient final PacketWriter capacityInfoWriter = new CapacityInfoWriter();
85      private transient final PacketWriter capacityInfoRequestWriter = new CapacityInfoRequestWriter();
86      private transient final PacketWriter wireFormatInfoWriter = new WireFormatInfoWriter();
87      private transient WireByteArrayOutputStream internalBytesOut;
88      private transient DataOutputStream internalDataOut;
89      private transient WireByteArrayInputStream internalBytesIn;
90      private transient DataInputStream internalDataIn;
91  
92      /***
93       * Default Constructor
94       */
95      public DefaultWireFormat() {
96          internalBytesOut = new WireByteArrayOutputStream();
97          internalDataOut = new DataOutputStream(internalBytesOut);
98          internalBytesIn = new WireByteArrayInputStream();
99          internalDataIn = new DataInputStream(internalBytesIn);
100     }
101     
102 
103     /***
104      * @return new WireFormat
105      */
106     public WireFormat copy() {
107         return new DefaultWireFormat();
108     }
109 
110     
111     /***
112      * @param in
113      * @return
114      * @throws IOException
115      */
116     public Packet readPacket(DataInput in) throws IOException {
117         int type = in.readByte();
118         return readPacket(type, in);
119     }
120 
121     /***
122      * @param firstByte
123      * @param dataIn
124      * @return
125      * @throws IOException
126      * 
127      */
128     public Packet readPacket(int firstByte, DataInput dataIn) throws IOException {
129         switch (firstByte) {
130             case Packet.ACTIVEMQ_MESSAGE :
131                 return readPacket(dataIn, messageReader);
132             case Packet.ACTIVEMQ_TEXT_MESSAGE :
133                 return readPacket(dataIn, textMessageReader);
134             case Packet.ACTIVEMQ_OBJECT_MESSAGE :
135                 return readPacket(dataIn, objectMessageReader);
136             case Packet.ACTIVEMQ_BYTES_MESSAGE :
137                 return readPacket(dataIn, bytesMessageReader);
138             case Packet.ACTIVEMQ_STREAM_MESSAGE :
139                 return readPacket(dataIn, streamMessageReader);
140             case Packet.ACTIVEMQ_MAP_MESSAGE :
141                 return readPacket(dataIn, mapMessageReader);
142             case Packet.ACTIVEMQ_MSG_ACK :
143                 return readPacket(dataIn, messageAckReader);
144             case Packet.RECEIPT_INFO :
145                 return readPacket(dataIn, receiptReader);
146             case Packet.CONSUMER_INFO :
147                 return readPacket(dataIn, consumerInfoReader);
148             case Packet.PRODUCER_INFO :
149                 return readPacket(dataIn, producerInfoReader);
150             case Packet.TRANSACTION_INFO :
151                 return readPacket(dataIn, transactionInfoReader);
152             case Packet.XA_TRANSACTION_INFO :
153                 return readPacket(dataIn, xaTransactionInfoReader);
154             case Packet.ACTIVEMQ_BROKER_INFO :
155                 return readPacket(dataIn, brokerInfoReader);
156             case Packet.ACTIVEMQ_CONNECTION_INFO :
157                 return readPacket(dataIn, connectionInfoReader);
158             case Packet.SESSION_INFO :
159                 return readPacket(dataIn, sessionInfoReader);
160             case Packet.DURABLE_UNSUBSCRIBE :
161                 return readPacket(dataIn, durableUnsubscribeReader);
162             case Packet.RESPONSE_RECEIPT_INFO :
163                 return readPacket(dataIn, reponseReceiptReader);
164             case Packet.INT_RESPONSE_RECEIPT_INFO :
165                 return readPacket(dataIn, intReponseReceiptReader);
166             case Packet.CAPACITY_INFO :
167                 return readPacket(dataIn, capacityInfoReader);
168             case Packet.CAPACITY_INFO_REQUEST :
169                 return readPacket(dataIn, capacityInfoRequestReader);
170             case Packet.WIRE_FORMAT_INFO :
171                 return readPacket(dataIn, wireFormatInfoReader);
172             default :
173                 log.error("Could not find PacketReader for packet type: "
174                         + AbstractPacket.getPacketTypeAsString(firstByte));
175                 return null;
176         }
177     }
178 
179     /***
180      * Write a Packet to a DataOutput
181      * 
182      * @param packet
183      * @param dataOut
184      * @throws IOException
185      */
186     public void writePacket(Packet packet, DataOutput dataOut) throws IOException {
187         PacketWriter writer = getWriter(packet);
188         if (writer != null) {
189             writePacket(packet, dataOut, writer);
190         }
191     }
192 
193     /***
194      * A helper method which converts a packet into a byte array Overrides the WireFormat to make use of the internal
195      * BytesOutputStream
196      * 
197      * @param packet
198      * @return a byte array representing the packet using some wire protocol
199      * @throws IOException
200      */
201     public byte[] toBytes(Packet packet) throws IOException {
202         byte[] data = null;
203         PacketWriter writer = getWriter(packet);
204         if (writer != null) {
205             internalBytesOut.reset();
206             internalDataOut.writeByte(packet.getPacketType());
207             internalDataOut.writeInt(-1);//the length
208             writer.writePacket(packet, internalDataOut);
209             internalDataOut.flush();
210             data = internalBytesOut.toByteArray();
211             // lets subtract the header offset from the length
212             int length = data.length - 5;
213             packet.setMemoryUsage(length);
214             //write in the length to the data
215             data[1] = (byte) ((length >>> 24) & 0xFF);
216             data[2] = (byte) ((length >>> 16) & 0xFF);
217             data[3] = (byte) ((length >>> 8) & 0xFF);
218             data[4] = (byte) ((length >>> 0) & 0xFF);
219         }
220         return data;
221     }
222     
223     /***
224      * Can this wireformat process packets of this version
225      * @param version the version number to test
226      * @return true if can accept the version
227      */
228     public boolean canProcessWireFormatVersion(int version){
229         return version == WIRE_FORMAT_VERSION;
230     }
231     
232     /***
233      * @return the current version of this wire format
234      */
235     public int getCurrentWireFormatVersion(){
236         return WIRE_FORMAT_VERSION;
237     }
238 
239     protected synchronized final void writePacket(Packet packet, DataOutput dataOut, PacketWriter writer)
240             throws IOException {
241         dataOut.writeByte(packet.getPacketType());
242         internalBytesOut.reset();
243         writer.writePacket(packet, internalDataOut);
244         internalDataOut.flush();
245         //reuse the byte buffer in the ByteArrayOutputStream
246         byte[] data = internalBytesOut.getData();
247         int count = internalBytesOut.size();
248         dataOut.writeInt(count);
249         //byte[] data = internalBytesOut.toByteArray();
250         //int count = data.length;
251         //dataOut.writeInt(count);
252         packet.setMemoryUsage(count);
253         dataOut.write(data, 0, count);
254     }
255 
256     protected synchronized final Packet readPacket(DataInput dataIn, PacketReader reader) throws IOException {
257         Packet packet = reader.createPacket();
258         int length = dataIn.readInt();
259         packet.setMemoryUsage(length);
260         // read all the remaining data in one chunk ignoring the header
261         // TODO sometimes the length should exclude the header?
262         byte[] data = new byte[length];
263         dataIn.readFully(data);
264         //then splat into the internal datainput
265         internalBytesIn.restart(data);
266         reader.buildPacket(packet, internalDataIn);
267         return packet;
268     }
269 
270     private Object readResolve() throws ObjectStreamException {
271         return new DefaultWireFormat();
272     }
273 
274     private PacketWriter getWriter(Packet packet) throws IOException {
275         PacketWriter answer = null;
276         switch (packet.getPacketType()) {
277             case Packet.ACTIVEMQ_MESSAGE :
278                 answer = messageWriter;
279                 break;
280             case Packet.ACTIVEMQ_TEXT_MESSAGE :
281                 answer = textMessageWriter;
282                 break;
283             case Packet.ACTIVEMQ_OBJECT_MESSAGE :
284                 answer = objectMessageWriter;
285                 break;
286             case Packet.ACTIVEMQ_BYTES_MESSAGE :
287                 answer = bytesMessageWriter;
288                 break;
289             case Packet.ACTIVEMQ_STREAM_MESSAGE :
290                 answer = streamMessageWriter;
291                 break;
292             case Packet.ACTIVEMQ_MAP_MESSAGE :
293                 answer = mapMessageWriter;
294                 break;
295             case Packet.ACTIVEMQ_MSG_ACK :
296                 answer = messageAckWriter;
297                 break;
298             case Packet.RECEIPT_INFO :
299                 answer = receiptWriter;
300                 break;
301             case Packet.CONSUMER_INFO :
302                 answer = consumerInfoWriter;
303                 break;
304             case Packet.PRODUCER_INFO :
305                 answer = producerInfoWriter;
306                 break;
307             case Packet.TRANSACTION_INFO :
308                 answer = transactionInfoWriter;
309                 break;
310             case Packet.XA_TRANSACTION_INFO :
311                 answer = xaTransactionInfoWriter;
312                 break;
313             case Packet.ACTIVEMQ_BROKER_INFO :
314                 answer = brokerInfoWriter;
315                 break;
316             case Packet.ACTIVEMQ_CONNECTION_INFO :
317                 answer = connectionInfoWriter;
318                 break;
319             case Packet.SESSION_INFO :
320                 answer = sessionInfoWriter;
321                 break;
322             case Packet.DURABLE_UNSUBSCRIBE :
323                 answer = durableUnsubscribeWriter;
324                 break;
325             case Packet.RESPONSE_RECEIPT_INFO :
326                 answer = reponseReceiptWriter;
327                 break;
328             case Packet.INT_RESPONSE_RECEIPT_INFO :
329                 answer = intReponseReceiptWriter;
330                 break;
331             case Packet.CAPACITY_INFO :
332                 answer = capacityInfoWriter;
333                 break;
334             case Packet.CAPACITY_INFO_REQUEST :
335                 answer = capacityInfoRequestWriter;
336                 break;
337             case Packet.WIRE_FORMAT_INFO :
338                 answer = wireFormatInfoWriter;
339                 break;
340             default :
341                 log.error("no PacketWriter for packet: " + packet);
342         }
343         return answer;
344     }
345 }