View Javadoc

1   /*** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  
19  package org.codehaus.activemq.message;
20  
21  import org.apache.commons.logging.Log;
22  import org.apache.commons.logging.LogFactory;
23  
24  import javax.jms.JMSException;
25  import java.io.ByteArrayInputStream;
26  import java.io.ByteArrayOutputStream;
27  import java.io.DataInput;
28  import java.io.DataInputStream;
29  import java.io.DataOutput;
30  import java.io.DataOutputStream;
31  import java.io.IOException;
32  import java.net.DatagramPacket;
33  
34  /***
35   * Represents a strategy of encoding packets on the wire or on disk
36   * using some kind of serialization or wire format.
37   * <p/>
38   * We use a default efficient format
39   * for Java to Java communication but other formats to other systems
40   * can be used, such as using simple text
41   * strings when talking to JavaScript or coming up with other formats for
42   * talking to C / C# languages or proprietary messaging systems
43   * we wish to interface with at the wire level etc.
44   *
45   * @version $Revision: 1.11 $
46   */
47  public abstract class WireFormat {
48  
49      private static final Log log = LogFactory.getLog(WireFormat.class);
50  
51      /***
52       * Reads a packet from the given input stream
53       *
54       * @param in
55       * @return
56       * @throws IOException
57       */
58      public abstract Packet readPacket(DataInput in) throws IOException;
59  
60      /***
61       * A helper method for working with sockets where the first byte is read
62       * first, then the rest of the message is read.
63       * <p/>
64       * Its common when dealing with sockets to have different timeout semantics
65       * until the first non-zero byte is read of a message, after which
66       * time a zero timeout is used.
67       *
68       * @param firstByte the first byte of the packet
69       * @param in        the rest of the packet
70       * @return
71       * @throws IOException
72       */
73      public abstract Packet readPacket(int firstByte, DataInput in) throws IOException;
74  
75  
76      /***
77       * Read a packet from a Datagram packet from the given channelID. If the
78       * packet is from the same channel ID as it was sent then we have a
79       * loop-back so discard the packet
80       *
81       * @param channelID is the unique channel ID
82       * @param dpacket
83       * @return the packet read from the datagram or null if it should be
84       *         discarded
85       * @throws IOException
86       */
87      public Packet readPacket(String channelID, DatagramPacket dpacket) throws IOException {
88          DataInput in = new DataInputStream(new ByteArrayInputStream(dpacket.getData(), dpacket.getOffset(), dpacket.getLength()));
89          String id = in.readUTF();
90  
91          if (channelID == null) {
92              log.trace("We do not have a channelID which is probably caused by a synchronization issue, we're receiving messages before we're fully initialised");
93          }
94          else if (channelID.equals(id)) {
95              if (log.isTraceEnabled()) {
96                  log.trace("Discarding packet from id: " + id);
97              }
98              return null;
99          }
100         int type = in.readByte();
101         Packet packet = readPacket(type, in);
102 
103 //        if (packet instanceof ActiveMQMessage) {
104 //            System.out.println("#####  read packet from channel: " + id + " in channel: " + channelID + " message: " + packet);
105 //        }
106 //
107         return packet;
108     }
109 
110     /***
111      * Writes the packet to the given output stream
112      *
113      * @param packet
114      * @param out
115      * @throws IOException
116      * @throws JMSException
117      */
118     public abstract void writePacket(Packet packet, DataOutput out) throws IOException, JMSException;
119 
120     /***
121      * Writes the given package to a new datagram
122      *
123      * @param channelID is the unique channel ID
124      * @param packet    is the packet to write
125      * @return
126      * @throws IOException
127      * @throws JMSException
128      */
129     public DatagramPacket writePacket(String channelID, Packet packet) throws IOException, JMSException {
130         ByteArrayOutputStream out = new ByteArrayOutputStream();
131         DataOutputStream dataOut = new DataOutputStream(out);
132         channelID = channelID != null ? channelID : "";
133         dataOut.writeUTF(channelID);
134 
135 //        if (packet instanceof ActiveMQMessage) {
136 //            System.out.println("##### write packet from channel: " + channelID + " message: " + packet);
137 //        }
138 
139         writePacket(packet, dataOut);
140         dataOut.close();
141         byte[] data = out.toByteArray();
142         return new DatagramPacket(data, data.length);
143     }
144 
145     /***
146      * Reads the packet from the given byte[]
147      * @param bytes
148      * @param offset
149      * @param length
150      * @return
151      * @throws IOException
152      */
153     public Packet fromBytes(byte[] bytes, int offset, int length) throws IOException {
154         DataInput in = new DataInputStream(new ByteArrayInputStream(bytes, offset, length));
155         return readPacket(in);
156     }
157 
158     /***
159      * Reads the packet from the given byte[]
160      * @param bytes
161      * @return
162      * @throws IOException
163      */
164     public Packet fromBytes(byte[] bytes) throws IOException {
165         DataInput in = new DataInputStream(new ByteArrayInputStream(bytes));
166         return readPacket(in);
167     }
168 
169     /***
170      * A helper method which converts a packet into a byte array
171      *
172      * @param packet
173      * @return a byte array representing the packet using some wire protocol
174      * @throws IOException
175      * @throws JMSException
176      */
177     public byte[] toBytes(Packet packet) throws IOException, JMSException {
178         ByteArrayOutputStream out = new ByteArrayOutputStream();
179         DataOutputStream dataOut = new DataOutputStream(out);
180         writePacket(packet, dataOut);
181         dataOut.close();
182         return out.toByteArray();
183     }
184 
185     /***
186      * Creates a new copy of this wire format so it can be used in another thread/context
187      *
188      * @return
189      */
190     public abstract WireFormat copy();
191     
192     /***
193      * Can this wireformat process packets of this version
194      * @param version the version number to test
195      * @return true if can accept the version
196      */
197     public abstract boolean canProcessWireFormatVersion(int version);
198     
199     /***
200      * @return the current version of this wire format
201      */
202     public abstract int getCurrentWireFormatVersion();
203 }