View Javadoc

1   /*** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  
19  package org.codehaus.activemq.message;
20  
21  import org.codehaus.activemq.util.BitArray;
22  
23  import java.io.DataOutput;
24  import java.io.IOException;
25  
26  /***
27   * Writes a ProducerInfo object to a Stream
28   */
29  public class ActiveMQMessageWriter extends AbstractPacketWriter {
30      /***
31       * Return the type of Packet
32       * 
33       * @return integer representation of the type of Packet
34       */
35      public int getPacketType() {
36          return Packet.ACTIVEMQ_MESSAGE;
37      }
38  
39      /***
40       * Write a Packet instance to data output stream
41       *
42       * @param packet  the instance to be seralized
43       * @param dataOut the output stream
44       * @throws IOException thrown if an error occurs
45       */
46      public void writePacket(Packet packet, DataOutput dataOut) throws IOException {
47          ActiveMQMessage msg = (ActiveMQMessage) packet;
48          byte[] payload = msg.getBodyAsBytes();
49          
50          BitArray ba = msg.getBitArray();
51          ba.reset();
52          
53          ba.set(ActiveMQMessage.CORRELATION_INDEX, msg.getJMSCorrelationID() != null);
54          ba.set(ActiveMQMessage.TYPE_INDEX, msg.getJMSType() != null);
55          ba.set(ActiveMQMessage.BROKER_NAME_INDEX, msg.getEntryBrokerName() != null);
56          ba.set(ActiveMQMessage.CLUSTER_NAME_INDEX, msg.getEntryClusterName() != null);
57          ba.set(ActiveMQMessage.TRANSACTION_ID_INDEX, msg.getTransactionId() != null);
58          ba.set(ActiveMQMessage.REPLY_TO_INDEX, msg.getJMSReplyTo() != null);
59          ba.set(ActiveMQMessage.TIMESTAMP_INDEX, msg.getJMSTimestamp() > 0);
60          ba.set(ActiveMQMessage.EXPIRATION_INDEX, msg.getJMSExpiration() > 0);
61          ba.set(ActiveMQMessage.REDELIVERED_INDEX, msg.getJMSRedelivered());
62          ba.set(ActiveMQMessage.XA_TRANS_INDEX, msg.isXaTransacted());
63          ba.set(ActiveMQMessage.CID_INDEX, msg.getConsumerNos() != null);
64          ba.set(ActiveMQMessage.PROPERTIES_INDEX, msg.getProperties() != null && msg.getProperties().size() > 0);
65          ba.set(ActiveMQMessage.PAYLOAD_INDEX, payload != null);
66          
67          super.writePacket(msg, dataOut);
68          
69          super.writeUTF(msg.getJMSClientID(), dataOut);
70          super.writeUTF(msg.getProducerID(), dataOut);
71          ActiveMQDestination.writeToStream((ActiveMQDestination) msg.getJMSDestination(), dataOut);
72          dataOut.write(msg.getJMSDeliveryMode());
73          dataOut.write(msg.getJMSPriority());
74  
75          
76  
77          if (ba.get(ActiveMQMessage.CORRELATION_INDEX)) {
78              super.writeUTF(msg.getJMSCorrelationID(), dataOut);
79          }
80          if (ba.get(ActiveMQMessage.TYPE_INDEX)) {
81              super.writeUTF(msg.getJMSType(), dataOut);
82          }
83          if (ba.get(ActiveMQMessage.BROKER_NAME_INDEX)) {
84              super.writeUTF(msg.getEntryBrokerName(), dataOut);
85          }
86          if (ba.get(ActiveMQMessage.CLUSTER_NAME_INDEX)) {
87              super.writeUTF(msg.getEntryClusterName(), dataOut);
88          }
89          if (ba.get(ActiveMQMessage.TRANSACTION_ID_INDEX)) {
90              super.writeUTF(msg.getTransactionId(), dataOut);
91          }
92          if (ba.get(ActiveMQMessage.REPLY_TO_INDEX)) {
93              ActiveMQDestination.writeToStream((ActiveMQDestination) msg.getJMSReplyTo(), dataOut);
94          }
95          if (ba.get(ActiveMQMessage.TIMESTAMP_INDEX)) {
96              dataOut.writeLong(msg.getJMSTimestamp());
97          }
98          if (ba.get(ActiveMQMessage.EXPIRATION_INDEX)) {
99              dataOut.writeLong(msg.getJMSExpiration());
100         }
101         if (ba.get(ActiveMQMessage.CID_INDEX)) {
102             //write out consumer numbers ...
103             int[] cids = msg.getConsumerNos();
104             dataOut.writeShort(cids.length);
105             for (int i = 0; i < cids.length; i++) {
106                 dataOut.writeShort(cids[i]);
107             }
108         }
109         if (ba.get(ActiveMQMessage.PROPERTIES_INDEX)) {
110             msg.writeMapProperties(msg.getProperties(), dataOut);
111         }
112         if (ba.get(ActiveMQMessage.PAYLOAD_INDEX)) {
113             dataOut.writeInt(payload.length);
114             dataOut.write(payload);
115         }
116     }
117 }