View Javadoc

1   package org.codehaus.activemq.ra;
2   
3   import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
4   
5   /***
6    */
7   public class CircularQueue {
8   
9       private final int size;
10  
11      private final SynchronizedBoolean stopping;
12  
13  
14      // For pooling objects
15      private final Object[] contents;
16      final private Object mutex = new Object();
17      //where the next worker to be supplied currently is.
18      private int start=0;
19      //where the next worker to be inserted will go
20      private int end=0;
21  
22      public CircularQueue(int size, SynchronizedBoolean stopping) {
23          this.size = size;
24          contents = new Object[size];
25          this.stopping = stopping;
26      }
27  
28      public Object get() {
29      	synchronized(mutex) {
30      		while( true ) {
31  	    			Object ew = contents[start];
32                  if (ew != null) {
33                      start++;
34  	    			if(start == contents.length) {
35  	    				start=0;
36  	    			}
37  	    			return ew;
38  	    		} else {
39  	    			try {
40  						mutex.wait();
41  						if(stopping.get()) {
42  							return null;
43  						}
44  					} catch (InterruptedException e) {
45  						return null;
46  					}
47  	    		}
48      		}
49      	}
50      }
51  
52      public void returnObject(Object worker) {
53      	synchronized(mutex) {
54      		contents[end++] = worker;
55              if( end == contents.length) {
56                  end=0;
57              }
58      		mutex.notify();
59      	}
60      }
61  
62      public int size() {
63          return contents.length;
64      }
65  
66      public void drain() {
67          int i = 0;
68          while (i < size) {
69              if (get() != null) {
70                  i++;
71              }
72          }
73      }
74  
75  
76      public void notifyWaiting() {
77          synchronized(mutex) {
78      		mutex.notifyAll();
79      	}
80      }
81  
82  }