1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18
19 package org.codehaus.activemq.message.util;
20 import java.io.File;
21 import java.io.IOException;
22 import java.util.List;
23 import javax.jms.JMSException;
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.codehaus.activemq.message.DefaultWireFormat;
27 import org.codehaus.activemq.message.Packet;
28 import org.codehaus.activemq.message.WireFormat;
29 import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
30
31 /***
32 * Implements a controlled thread safe queue, with Packets being spooled to disk for reading asynchronously.
33 */
34 public class SpooledBoundedPacketQueue implements BoundedPacketQueue {
35 private String name;
36 private DataContainer container;
37 private WireFormat wireFormat;
38 private long maxDataLength;
39 private boolean closed;
40 private boolean stopped;
41 private SynchronizedInt size = new SynchronizedInt(0);
42 private Object inLock = new Object();
43 private Object outLock = new Object();
44 private static int WAIT_TIMEOUT = 250;
45 private static final Log log = LogFactory.getLog(SpooledBoundedPacketQueue.class);
46
47 /***
48 * Constructor for SpooledBoundedPacketQueue
49 *
50 * @param dir
51 * @param name
52 * @param maxDataLength
53 * @param maxBlockSize
54 * @throws IOException
55 */
56 public SpooledBoundedPacketQueue(File dir, String name, long maxDataLength, int maxBlockSize) throws IOException {
57
58 char[] chars = name.toCharArray();
59 for (int i = 0;i < chars.length;i++) {
60 if (!Character.isLetterOrDigit(chars[i])) {
61 chars[i] = '_';
62 }
63 }
64 this.name = new String(chars);
65 this.maxDataLength = maxDataLength;
66 this.wireFormat = new DefaultWireFormat();
67 this.container = new DataContainer(dir, this.name, maxBlockSize);
68
69 this.container.deleteAll();
70 }
71
72 /***
73 * Constructor for SpooledBoundedPacketQueue
74 * @param dir
75 * @param name
76 * @throws IOException
77 */
78 public SpooledBoundedPacketQueue(File dir,String name) throws IOException{
79 this(dir,name,1024 * 1024 * 64,8192);
80 }
81
82 /***
83 * Place a Packet at the head of the Queue
84 *
85 * @param packet
86 * @throws JMSException
87 */
88 public void enqueue(Packet packet) throws JMSException {
89 if (!isFull()) {
90 enqueueNoBlock(packet);
91 }
92 else {
93 synchronized (inLock) {
94 try {
95 while (isFull()) {
96 inLock.wait(WAIT_TIMEOUT);
97 }
98 }
99 catch (InterruptedException ie) {
100 }
101 }
102 enqueueNoBlock(packet);
103 }
104 }
105
106 /***
107 * Enqueue a Packet without checking usage limits
108 *
109 * @param packet
110 * @throws JMSException
111 */
112 public void enqueueNoBlock(Packet packet) throws JMSException {
113 byte[] data;
114 try {
115 data = wireFormat.toBytes(packet);
116 size.increment();
117 container.write(data);
118 }
119 catch (IOException e) {
120 JMSException jmsEx = new JMSException("toBytes failed");
121 jmsEx.setLinkedException(e);
122 throw jmsEx;
123 }
124 synchronized (outLock) {
125 outLock.notify();
126 }
127 }
128
129 /***
130 * @return the first dequeued Packet or blocks until one is available
131 * @throws JMSException
132 * @throws InterruptedException
133 */
134 public Packet dequeue() throws JMSException, InterruptedException {
135 Packet result = null;
136 synchronized (outLock) {
137 while ((result = dequeueNoWait()) == null) {
138 outLock.wait(WAIT_TIMEOUT);
139 }
140 }
141 return result;
142 }
143
144 /***
145 * @return the Packet from the head of the Queue or null if the Queue is empty
146 * @param timeInMillis maximum time to wait to dequeue a Packet
147 * @throws JMSException
148 * @throws InterruptedException
149 */
150 public Packet dequeue(long timeInMillis) throws JMSException, InterruptedException {
151 Packet result = dequeueNoWait();
152 if (result == null) {
153 synchronized (outLock) {
154 outLock.wait(timeInMillis);
155 result = dequeueNoWait();
156 }
157 }
158 return result;
159 }
160
161 /***
162 * @return the Packet from the head of the Queue or null if the Queue is empty
163 * @throws JMSException
164 * @throws InterruptedException
165 */
166 public Packet dequeueNoWait() throws JMSException, InterruptedException {
167 Packet result = null;
168 if (stopped) {
169 synchronized (outLock) {
170 while (stopped && !closed) {
171 outLock.wait(WAIT_TIMEOUT);
172 }
173 }
174 }
175 byte[] data;
176 try {
177 data = container.read();
178 if (data != null) {
179 result = wireFormat.fromBytes(data);
180 size.decrement();
181 }
182 }
183 catch (IOException e) {
184 JMSException jmsEx = new JMSException("fromBytes failed");
185 jmsEx.setLinkedException(e);
186 throw jmsEx;
187 }
188 if (result != null && !isFull()) {
189 synchronized (inLock) {
190 inLock.notify();
191 }
192 }
193 return result;
194 }
195
196 /***
197 * @return true if this queue has reached it's data length limit
198 */
199 public boolean isFull() {
200 return container.length() >= maxDataLength;
201 }
202
203 /***
204 * close this queue
205 */
206 public void close() {
207 try {
208 closed = true;
209 container.close();
210 }
211 catch (IOException ioe) {
212 log.warn("Couldn't close queue", ioe);
213 }
214 }
215
216 /***
217 * @return the name of this BoundedPacketQueue
218 */
219 public String getName() {
220 return name;
221 }
222
223 /***
224 * @return number of Packets held by this queue
225 */
226 public int size() {
227 return size.get();
228 }
229
230 /***
231 * @return true if the queue is enabled for dequeing (default = true)
232 */
233 public boolean isStarted() {
234 return stopped == false;
235 }
236
237 /***
238 * disable dequeueing
239 */
240 public void stop() {
241 synchronized (outLock) {
242 stopped = true;
243 }
244 }
245
246 /***
247 * enable dequeueing
248 */
249 public void start() {
250 stopped = false;
251 synchronized (outLock) {
252 outLock.notifyAll();
253 }
254 synchronized (inLock) {
255 inLock.notifyAll();
256 }
257 }
258
259 /***
260 * @return true if this queue is empty
261 */
262 public boolean isEmpty() {
263 return size.get() == 0;
264 }
265
266 /***
267 * clear the queue
268 */
269
270 public void clear() {
271
272 }
273
274 /***
275 * @return a copy of the contents
276 */
277 public List getContents() {
278 return null;
279 }
280 }