View Javadoc

1   /*** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.codehaus.activemq.transport.jabber;
19  
20  import org.apache.commons.logging.Log;
21  import org.apache.commons.logging.LogFactory;
22  import org.codehaus.activemq.message.ActiveMQBytesMessage;
23  import org.codehaus.activemq.message.ActiveMQMessage;
24  import org.codehaus.activemq.message.ActiveMQObjectMessage;
25  import org.codehaus.activemq.message.ActiveMQTextMessage;
26  import org.codehaus.activemq.message.Packet;
27  import org.codehaus.activemq.message.WireFormat;
28  
29  import javax.jms.JMSException;
30  import java.io.DataInput;
31  import java.io.DataOutput;
32  import java.io.IOException;
33  import java.io.Serializable;
34  import java.util.Hashtable;
35  import java.util.Iterator;
36  import java.util.Map;
37  
38  /***
39   * A wire format which uses XMPP format of messages
40   *
41   * @version $Revision: 1.3 $
42   */
43  public class JabberWireFormat extends WireFormat {
44      private static final Log log = LogFactory.getLog(JabberWireFormat.class);
45  
46      public WireFormat copy() {
47          return new JabberWireFormat();
48      }
49  
50      public Packet readPacket(DataInput in) throws IOException {
51          return null;  /*** TODO */
52      }
53  
54      public Packet readPacket(int firstByte, DataInput in) throws IOException {
55          return null;  /*** TODO */
56      }
57  
58      public void writePacket(Packet packet, DataOutput out) throws IOException, JMSException {
59          switch (packet.getPacketType()) {
60              case Packet.ACTIVEMQ_MESSAGE:
61                  writeMessage((ActiveMQMessage) packet, "", out);
62                  break;
63  
64              case Packet.ACTIVEMQ_TEXT_MESSAGE:
65                  writeTextMessage((ActiveMQTextMessage) packet, out);
66                  break;
67  
68              case Packet.ACTIVEMQ_BYTES_MESSAGE:
69                  writeBytesMessage((ActiveMQBytesMessage) packet, out);
70                  break;
71  
72              case Packet.ACTIVEMQ_OBJECT_MESSAGE:
73                  writeObjectMessage((ActiveMQObjectMessage) packet, out);
74                  break;
75  
76              case Packet.ACTIVEMQ_MAP_MESSAGE:
77              case Packet.ACTIVEMQ_STREAM_MESSAGE:
78  
79  
80              case Packet.ACTIVEMQ_BROKER_INFO:
81              case Packet.ACTIVEMQ_CONNECTION_INFO:
82              case Packet.ACTIVEMQ_MSG_ACK:
83              case Packet.CONSUMER_INFO:
84              case Packet.DURABLE_UNSUBSCRIBE:
85              case Packet.INT_RESPONSE_RECEIPT_INFO:
86              case Packet.PRODUCER_INFO:
87              case Packet.RECEIPT_INFO:
88              case Packet.RESPONSE_RECEIPT_INFO:
89              case Packet.SESSION_INFO:
90              case Packet.TRANSACTION_INFO:
91              case Packet.XA_TRANSACTION_INFO:
92              default:
93                  log.warn("Ignoring message type: " + packet.getPacketType() + " packet: " + packet);
94          }
95      }
96      
97      /***
98       * Can this wireformat process packets of this version
99       * @param version the version number to test
100      * @return true if can accept the version
101      */
102     public boolean canProcessWireFormatVersion(int version){
103         return true;
104     }
105     
106     /***
107      * @return the current version of this wire format
108      */
109     public int getCurrentWireFormatVersion(){
110         return 1;
111     }
112 
113     // Implementation methods
114     //-------------------------------------------------------------------------
115     protected void writeObjectMessage(ActiveMQObjectMessage message, DataOutput out) throws JMSException, IOException {
116         Serializable object = message.getObject();
117         String text = (object != null) ? object.toString() : "";
118         writeMessage(message, text, out);
119     }
120 
121     protected void writeTextMessage(ActiveMQTextMessage message, DataOutput out) throws JMSException, IOException {
122         writeMessage(message, message.getText(), out);
123     }
124 
125     protected void writeBytesMessage(ActiveMQBytesMessage message, DataOutput out) throws IOException {
126         byte[] data = message.getBodyAsBytes();
127         String text = encodeBinary(data);
128         writeMessage(message, text, out);
129     }
130 
131     protected void writeMessage(ActiveMQMessage message, String body, DataOutput out) throws IOException {
132         String type = getXmppType(message);
133 
134         StringBuffer buffer = new StringBuffer("<");
135         buffer.append(type);
136         buffer.append(" to='");
137         buffer.append(message.getJMSDestination().toString());
138         buffer.append("' from='");
139         buffer.append(message.getJMSReplyTo().toString());
140         String messageID = message.getJMSMessageID();
141         if (messageID != null) {
142             buffer.append("' id='");
143             buffer.append(messageID);
144         }
145 
146         Hashtable properties = message.getProperties();
147         if (properties != null) {
148             for (Iterator iter = properties.entrySet().iterator(); iter.hasNext();) {
149                 Map.Entry entry = (Map.Entry) iter.next();
150                 Object key = entry.getKey();
151                 Object value = entry.getValue();
152                 if (value != null) {
153                     buffer.append("' ");
154                     buffer.append(key.toString());
155                     buffer.append("='");
156                     buffer.append(value.toString());
157                 }
158             }
159         }
160 
161         buffer.append("'>");
162 
163         String id = message.getJMSCorrelationID();
164         if (id != null) {
165             buffer.append("<thread>");
166             buffer.append(id);
167             buffer.append("</thread>");
168         }
169         buffer.append(body);
170         buffer.append("</");
171         buffer.append(type);
172         buffer.append(">");
173 
174         out.write(buffer.toString().getBytes());
175     }
176 
177     protected String encodeBinary(byte[] data) {
178         // TODO
179         throw new RuntimeException("Not implemented yet!");
180     }
181 
182     protected String getXmppType(ActiveMQMessage message) {
183         String type = message.getJMSType();
184         if (type == null) {
185             type = "message";
186         }
187         return type;
188     }
189 }