/**
 *
 * Copyright 2004 Protique Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 **/

package org.codehaus.activemq.message;

import java.util.Iterator;
import org.codehaus.activemq.util.BitArray;
import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArraySet;

/**
 * Abstract class for a transportable Packet.
 * <p>
 * Holds the state shared by all packets in this class: a unique id, the
 * receipt-required flag, a memory usage hint with an associated reference
 * count, the set of brokers the packet has passed through and a
 * {@link BitArray}.
 *
 * @version $Revision: 1.19 $
 */
public abstract class AbstractPacket implements Packet {

    /**
     * Message flag indexes (used for writing/reading to/from a Stream)
     */
    static final int RECEIPT_REQUIRED_INDEX = 0;
    static final int BROKERS_VISITED_INDEX = 1;

    private String id;
    protected BitArray bitArray;
    private boolean receiptRequired;
    /** Memory usage hint; starts at 2048 until {@link #setMemoryUsage(int)} is called. */
    private transient int memoryUsage = 2048;
    private transient int memoryUsageReferenceCount;

    /** Names of the brokers visited; created lazily, so it may be null. */
    private CopyOnWriteArraySet brokersVisited;

    /**
     * Creates a packet with a fresh, empty {@link BitArray}.
     */
    protected AbstractPacket() {
        this.bitArray = new BitArray();
    }

    /**
     * @return the unique id for this Packet
     */
    public String getId() {
        return this.id;
    }

    /**
     * Set the unique id for this Packet
     *
     * @param newId the new id
     */
    public void setId(String newId) {
        this.id = newId;
    }

    /**
     * @return true if a Receipt is required
     */
    public boolean isReceiptRequired() {
        return this.receiptRequired;
    }

    /**
     * @return false since most packets are not receipt packets
     */
    public boolean isReceipt() {
        return false;
    }

    /**
     * Set if a Receipt is required on receiving this Packet
     *
     * @param value true if a Receipt is required
     */
    public void setReceiptRequired(boolean value) {
        this.receiptRequired = value;
    }

    /**
     * Retrieve if a JMS Message type or not
     *
     * @return true if it is a JMS Message; this implementation always returns false
     */
    public boolean isJMSMessage() {
        return false;
    }

    /**
     * Tests equality with another instance. Two distinct packets are equal
     * when both are AbstractPackets and carry the same non-null id.
     * <p>
     * A packet whose id is null is only equal to itself. This keeps equals()
     * consistent with {@link #hashCode()}, which falls back to the identity
     * hash code when the id is null, and avoids a NullPointerException when
     * either packet has not been given an id yet.
     *
     * @param obj - the other instance to test equality with
     * @return Returns true if the objects are equivalent
     */
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        // instanceof is false for null, so no separate null check is needed
        if (!(obj instanceof AbstractPacket)) {
            return false;
        }
        String thisId = this.getId();
        String otherId = ((AbstractPacket) obj).getId();
        return thisId != null && thisId.equals(otherId);
    }

    /**
     * @return Returns hash code for this instance: the hash code of the id
     *         when one is set, otherwise the inherited hash code
     */
    public int hashCode() {
        return this.id != null ? this.id.hashCode() : super.hashCode();
    }

    /**
     * Get a hint about how much memory this Packet is consuming
     *
     * @return an approximation of the current memory used by this instance
     */
    public int getMemoryUsage() {
        return memoryUsage;
    }

    /**
     * Set a hint about how much memory this packet is consuming
     *
     * @param newMemoryUsage the new memory usage hint
     */
    public void setMemoryUsage(int newMemoryUsage) {
        this.memoryUsage = newMemoryUsage;
    }

    /**
     * Increment reference count for bounded memory collections
     *
     * @return the incremented reference value
     * @see org.codehaus.activemq.message.util.MemoryBoundedQueue
     */
    public synchronized int incrementMemoryReferenceCount() {
        return ++memoryUsageReferenceCount;
    }

    /**
     * Decrement reference count for bounded memory collections
     *
     * @return the decremented reference value
     * @see org.codehaus.activemq.message.util.MemoryBoundedQueue
     */
    public synchronized int decrementMemoryReferenceCount() {
        return --memoryUsageReferenceCount;
    }

    /**
     * @return the current reference count for bounded memory collections
     * @see org.codehaus.activemq.message.util.MemoryBoundedQueue
     */
    public synchronized int getMemoryUsageReferenceCount() {
        return memoryUsageReferenceCount;
    }

    /**
     * As the packet passes through the broker add the broker to the visited list
     *
     * @param brokerName the name of the broker
     */
    public void addBrokerVisited(String brokerName) {
        initializeBrokersVisited();
        brokersVisited.add(brokerName);
    }

    /**
     * test to see if the named broker has already seen this packet
     *
     * @param brokerName the name of the broker
     * @return true if the packet has visited the broker
     */
    public boolean hasVisited(String brokerName) {
        // read the field once so the null check and the lookup see the same set
        CopyOnWriteArraySet visited = this.brokersVisited;
        return visited != null && visited.contains(brokerName);
    }

    /**
     * @return the visited brokers as a single String in which every entry is
     *         followed by a comma (so a non-empty result ends with a comma),
     *         or an empty String if no broker has been visited
     */
    public String getBrokersVisitedAsString() {
        CopyOnWriteArraySet visited = this.brokersVisited;
        if (visited == null || visited.isEmpty()) {
            return "";
        }
        // a buffer avoids creating a new String for every visited broker
        StringBuffer buffer = new StringBuffer();
        for (Iterator i = visited.iterator(); i.hasNext();) {
            buffer.append(i.next()).append(',');
        }
        return buffer.toString();
    }

    /**
     * @return pretty print of this Packet
     */
    public String toString() {
        return getPacketTypeAsString(getPacketType()) + ": " + getId();
    }

    /**
     * Maps a packet type constant to its symbolic name.
     *
     * @param type the packet type constant
     * @return the name of the constant, or "UNKNOWN PACKET TYPE: " followed by
     *         the value when the type is not one of the cases handled here
     */
    protected static String getPacketTypeAsString(int type) {
        String packetTypeStr = "";
        switch (type) {
            case ACTIVEMQ_MESSAGE:
                packetTypeStr = "ACTIVEMQ_MESSAGE";
                break;
            case ACTIVEMQ_TEXT_MESSAGE:
                packetTypeStr = "ACTIVEMQ_TEXT_MESSAGE";
                break;
            case ACTIVEMQ_OBJECT_MESSAGE:
                packetTypeStr = "ACTIVEMQ_OBJECT_MESSAGE";
                break;
            case ACTIVEMQ_BYTES_MESSAGE:
                packetTypeStr = "ACTIVEMQ_BYTES_MESSAGE";
                break;
            case ACTIVEMQ_STREAM_MESSAGE:
                packetTypeStr = "ACTIVEMQ_STREAM_MESSAGE";
                break;
            case ACTIVEMQ_MAP_MESSAGE:
                packetTypeStr = "ACTIVEMQ_MAP_MESSAGE";
                break;
            case ACTIVEMQ_MSG_ACK:
                packetTypeStr = "ACTIVEMQ_MSG_ACK";
                break;
            case RECEIPT_INFO:
                packetTypeStr = "RECEIPT_INFO";
                break;
            case CONSUMER_INFO:
                packetTypeStr = "CONSUMER_INFO";
                break;
            case PRODUCER_INFO:
                packetTypeStr = "PRODUCER_INFO";
                break;
            case TRANSACTION_INFO:
                packetTypeStr = "TRANSACTION_INFO";
                break;
            case XA_TRANSACTION_INFO:
                packetTypeStr = "XA_TRANSACTION_INFO";
                break;
            case ACTIVEMQ_BROKER_INFO:
                packetTypeStr = "ACTIVEMQ_BROKER_INFO";
                break;
            case ACTIVEMQ_CONNECTION_INFO:
                packetTypeStr = "ACTIVEMQ_CONNECTION_INFO";
                break;
            case SESSION_INFO:
                packetTypeStr = "SESSION_INFO";
                break;
            default :
                packetTypeStr = "UNKNOWN PACKET TYPE: " + type;
        }
        return packetTypeStr;
    }

    /**
     * A helper method used when implementing equals() which returns true if the objects are identical or equal handling
     * nulls properly
     *
     * @param left the first object, may be null
     * @param right the second object, may be null
     * @return true if the objects are the same or equal or both null
     */
    protected boolean equals(Object left, Object right) {
        return left == right || (left != null && left.equals(right));
    }

    /**
     * Initializes another message with current values from this instance:
     * the id, the receipt-required flag and, when this packet has visited at
     * least one broker, a copy of the visited brokers. When this packet has
     * visited no broker, the other packet's visited brokers are left as they
     * are.
     * <p>
     * This instance is only read; its own visited set is not created as a
     * side effect of the copy.
     *
     * @param other the other ActiveMQMessage to initialize
     */
    protected void initializeOther(AbstractPacket other) {
        other.id = this.id;
        other.receiptRequired = this.receiptRequired;
        CopyOnWriteArraySet set = this.brokersVisited;
        if (set != null && !set.isEmpty()) {
            // give the other packet its own set so later additions stay independent
            other.brokersVisited = new CopyOnWriteArraySet(set);
        }
    }

    /**
     * Creates the set of visited brokers if it does not exist yet.
     */
    synchronized void initializeBrokersVisited() {
        if (this.brokersVisited == null) {
            this.brokersVisited = new CopyOnWriteArraySet();
        }
    }

    /**
     * @return Returns the brokersVisited as an array, or null if no broker
     *         has been visited.
     */
    Object[] getBrokersVisited() {
        CopyOnWriteArraySet visited = this.brokersVisited;
        if (visited == null || visited.isEmpty()) {
            return null;
        }
        return visited.toArray();
    }

    /**
     * @return Returns the bitArray.
     */
    public BitArray getBitArray() {
        return bitArray;
    }

    /**
     * @param bitArray The bitArray to set.
     */
    public void setBitArray(BitArray bitArray) {
        this.bitArray = bitArray;
    }
}