View Javadoc

1   /*** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  
19  package org.codehaus.activemq.message;
20  
21  import org.codehaus.activemq.util.BitArray;
22  
23  import java.io.DataInput;
24  import java.io.IOException;
25  
26  /***
27   * Writes a ProducerInfo object to a Stream
28   */
29  public class ActiveMQMessageReader extends AbstractPacketReader {
30      /***
31       * Return the type of Packet
32       * 
33       * @return integer representation of the type of Packet
34       */
35      public int getPacketType() {
36          return Packet.ACTIVEMQ_MESSAGE;
37      }
38  
39      /***
40       * @return a new Packet instance
41       */
42      public Packet createPacket() {
43          return new ActiveMQMessage();
44      }
45  
46      /***
47       * build a Packet instance from the data input stream
48       *
49       * @param packet A Packet object
50       * @param dataIn the data input stream to build the packet from
51       * @throws IOException
52       */
53      public void buildPacket(Packet packet, DataInput dataIn) throws IOException {
54          super.buildPacket(packet, dataIn);
55          ActiveMQMessage msg = (ActiveMQMessage) packet;
56          msg.setJMSClientID(super.readUTF(dataIn));
57          msg.setProducerID(super.readUTF(dataIn));
58          msg.setJMSDestination(ActiveMQDestination.readFromStream(dataIn));
59          msg.setJMSDeliveryMode(dataIn.readByte());
60          msg.setJMSPriority(dataIn.readByte());
61          BitArray ba = msg.getBitArray();
62       
63          msg.setJMSRedelivered(ba.get(ActiveMQMessage.REDELIVERED_INDEX));
64          msg.setXaTransacted(ba.get(ActiveMQMessage.XA_TRANS_INDEX));
65  
66          if (ba.get(ActiveMQMessage.CORRELATION_INDEX)) {
67              msg.setJMSCorrelationID(super.readUTF(dataIn));
68          }
69          if (ba.get(ActiveMQMessage.TYPE_INDEX)) {
70              msg.setJMSType(super.readUTF(dataIn));
71          }
72          if (ba.get(ActiveMQMessage.BROKER_NAME_INDEX)) {
73              msg.setEntryBrokerName(super.readUTF(dataIn));
74          }
75          if (ba.get(ActiveMQMessage.CLUSTER_NAME_INDEX)) {
76              msg.setEntryClusterName(super.readUTF(dataIn));
77          }
78          if (ba.get(ActiveMQMessage.TRANSACTION_ID_INDEX)) {
79              msg.setTransactionId(super.readUTF(dataIn));
80          }
81          if (ba.get(ActiveMQMessage.REPLY_TO_INDEX)) {
82              msg.setJMSReplyTo(ActiveMQDestination.readFromStream(dataIn));
83          }
84          if (ba.get(ActiveMQMessage.TIMESTAMP_INDEX)) {
85              msg.setJMSTimestamp(dataIn.readLong());
86          }
87          if (ba.get(ActiveMQMessage.EXPIRATION_INDEX)) {
88              msg.setJMSExpiration(dataIn.readLong());
89          }
90          if (ba.get(ActiveMQMessage.CID_INDEX)) {
91              int cidlength = dataIn.readShort();
92              if (cidlength > 0) {
93                  int[] cids = new int[cidlength];
94                  for (int i = 0; i < cids.length; i++) {
95                      cids[i] = dataIn.readShort();
96                  }
97                  msg.setConsumerNos(cids);
98              }
99          }
100         if (ba.get(ActiveMQMessage.PROPERTIES_INDEX)) {
101             msg.setProperties(msg.readMapProperties(dataIn));
102         }
103         if (ba.get(ActiveMQMessage.PAYLOAD_INDEX)) {
104             int payloadLength = dataIn.readInt();
105             if (payloadLength >= 0) {
106                 byte[] payload = new byte[payloadLength];
107                 dataIn.readFully(payload);
108                 msg.setBodyAsBytes(payload);
109             }
110         }
111     }
112 }