1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.codehaus.activemq.transport.jabber;
19
20 import org.apache.commons.logging.Log;
21 import org.apache.commons.logging.LogFactory;
22 import org.codehaus.activemq.message.ActiveMQBytesMessage;
23 import org.codehaus.activemq.message.ActiveMQMessage;
24 import org.codehaus.activemq.message.ActiveMQObjectMessage;
25 import org.codehaus.activemq.message.ActiveMQTextMessage;
26 import org.codehaus.activemq.message.Packet;
27 import org.codehaus.activemq.message.WireFormat;
28
29 import javax.jms.JMSException;
30 import java.io.DataInput;
31 import java.io.DataOutput;
32 import java.io.IOException;
33 import java.io.Serializable;
34 import java.util.Hashtable;
35 import java.util.Iterator;
36 import java.util.Map;
37
38 /***
39 * A wire format which uses XMPP format of messages
40 *
41 * @version $Revision: 1.3 $
42 */
43 public class JabberWireFormat extends WireFormat {
44 private static final Log log = LogFactory.getLog(JabberWireFormat.class);
45
46 public WireFormat copy() {
47 return new JabberWireFormat();
48 }
49
50 public Packet readPacket(DataInput in) throws IOException {
51 return null; /*** TODO */
52 }
53
54 public Packet readPacket(int firstByte, DataInput in) throws IOException {
55 return null; /*** TODO */
56 }
57
58 public void writePacket(Packet packet, DataOutput out) throws IOException, JMSException {
59 switch (packet.getPacketType()) {
60 case Packet.ACTIVEMQ_MESSAGE:
61 writeMessage((ActiveMQMessage) packet, "", out);
62 break;
63
64 case Packet.ACTIVEMQ_TEXT_MESSAGE:
65 writeTextMessage((ActiveMQTextMessage) packet, out);
66 break;
67
68 case Packet.ACTIVEMQ_BYTES_MESSAGE:
69 writeBytesMessage((ActiveMQBytesMessage) packet, out);
70 break;
71
72 case Packet.ACTIVEMQ_OBJECT_MESSAGE:
73 writeObjectMessage((ActiveMQObjectMessage) packet, out);
74 break;
75
76 case Packet.ACTIVEMQ_MAP_MESSAGE:
77 case Packet.ACTIVEMQ_STREAM_MESSAGE:
78
79
80 case Packet.ACTIVEMQ_BROKER_INFO:
81 case Packet.ACTIVEMQ_CONNECTION_INFO:
82 case Packet.ACTIVEMQ_MSG_ACK:
83 case Packet.CONSUMER_INFO:
84 case Packet.DURABLE_UNSUBSCRIBE:
85 case Packet.INT_RESPONSE_RECEIPT_INFO:
86 case Packet.PRODUCER_INFO:
87 case Packet.RECEIPT_INFO:
88 case Packet.RESPONSE_RECEIPT_INFO:
89 case Packet.SESSION_INFO:
90 case Packet.TRANSACTION_INFO:
91 case Packet.XA_TRANSACTION_INFO:
92 default:
93 log.warn("Ignoring message type: " + packet.getPacketType() + " packet: " + packet);
94 }
95 }
96
97 /***
98 * Can this wireformat process packets of this version
99 * @param version the version number to test
100 * @return true if can accept the version
101 */
102 public boolean canProcessWireFormatVersion(int version){
103 return true;
104 }
105
106 /***
107 * @return the current version of this wire format
108 */
109 public int getCurrentWireFormatVersion(){
110 return 1;
111 }
112
113
114
115 protected void writeObjectMessage(ActiveMQObjectMessage message, DataOutput out) throws JMSException, IOException {
116 Serializable object = message.getObject();
117 String text = (object != null) ? object.toString() : "";
118 writeMessage(message, text, out);
119 }
120
121 protected void writeTextMessage(ActiveMQTextMessage message, DataOutput out) throws JMSException, IOException {
122 writeMessage(message, message.getText(), out);
123 }
124
125 protected void writeBytesMessage(ActiveMQBytesMessage message, DataOutput out) throws IOException {
126 byte[] data = message.getBodyAsBytes();
127 String text = encodeBinary(data);
128 writeMessage(message, text, out);
129 }
130
131 protected void writeMessage(ActiveMQMessage message, String body, DataOutput out) throws IOException {
132 String type = getXmppType(message);
133
134 StringBuffer buffer = new StringBuffer("<");
135 buffer.append(type);
136 buffer.append(" to='");
137 buffer.append(message.getJMSDestination().toString());
138 buffer.append("' from='");
139 buffer.append(message.getJMSReplyTo().toString());
140 String messageID = message.getJMSMessageID();
141 if (messageID != null) {
142 buffer.append("' id='");
143 buffer.append(messageID);
144 }
145
146 Hashtable properties = message.getProperties();
147 if (properties != null) {
148 for (Iterator iter = properties.entrySet().iterator(); iter.hasNext();) {
149 Map.Entry entry = (Map.Entry) iter.next();
150 Object key = entry.getKey();
151 Object value = entry.getValue();
152 if (value != null) {
153 buffer.append("' ");
154 buffer.append(key.toString());
155 buffer.append("='");
156 buffer.append(value.toString());
157 }
158 }
159 }
160
161 buffer.append("'>");
162
163 String id = message.getJMSCorrelationID();
164 if (id != null) {
165 buffer.append("<thread>");
166 buffer.append(id);
167 buffer.append("</thread>");
168 }
169 buffer.append(body);
170 buffer.append("</");
171 buffer.append(type);
172 buffer.append(">");
173
174 out.write(buffer.toString().getBytes());
175 }
176
177 protected String encodeBinary(byte[] data) {
178
179 throw new RuntimeException("Not implemented yet!");
180 }
181
182 protected String getXmppType(ActiveMQMessage message) {
183 String type = message.getJMSType();
184 if (type == null) {
185 type = "message";
186 }
187 return type;
188 }
189 }