View Javadoc

1   /***
2    *
3    * Copyright 2004 Hiram Chirino
4    *
5    * Licensed under the Apache License, Version 2.0 (the "License");
6    * you may not use this file except in compliance with the License.
7    * You may obtain a copy of the License at
8    *
9    * http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   *
17   **/
18  package org.codehaus.activemq.journal.impl;
19  
20  import java.nio.ByteBuffer;
21  
22  import EDU.oswego.cs.dl.util.concurrent.Latch;
23  
24  /***
25   * This contains all the data needed to write and force a list of records to a LogFile.
26   * The more records that can be cramed into a single BatchedWrite, the higher throughput
27   * that can be achived by a write and force operation.
28   * 
29   * @version $Revision: 1.2 $
30   */
31  public class BatchedWrite {
32  	
33  	private final Latch writeDoneLatch = new Latch();	
34  	private Record data=null;
35      public Throwable error;
36  	private final ByteBuffer byteBuffer;
37  	private long firstSequenceId=-1;
38  	private long lastSequenceId;
39  	private Mark mark;
40  	
41      private boolean appendDisabled=false;
42      private boolean appendInProgress=false;
43  	
44  	/***
45  	 * @param byteBuffer
46  	 */
47  	public BatchedWrite(ByteBuffer byteBuffer) {
48  		this.byteBuffer = byteBuffer;
49  	}
50  
51      /***
52       * @throws InterruptedException
53       * 
54       */
55      synchronized public void disableAppend() throws InterruptedException {
56          appendDisabled=true;
57          while( appendInProgress ) {
58              wait();
59          }
60      }
61  	
62  	/***
63  	 * @param record
64  	 */
65  	public boolean append(Record record) {
66  	    
67  	    synchronized(this) {
68  	        if( appendDisabled )
69  	            return false;
70  	        appendInProgress=true;
71  	    }
72  	    
73  		record.fill( byteBuffer );
74  		// if we fit the record in this batch
75  		if( record.remaining()==0 ) {
76  			if( firstSequenceId== -1 )
77  				firstSequenceId = record.getHeader().sequenceId;
78  			lastSequenceId = record.getHeader().sequenceId;;
79  			if( record.getMark()!=null )
80  				mark = record.getMark();
81  		}
82  
83  		synchronized(this) {		    
84  	        appendInProgress=false;
85  	        this.notify();
86  	        
87  	        if( appendDisabled )
88  	            return false;
89  	        else
90  	            return byteBuffer.remaining()>0;
91  	    }			
92  	}
93  	
94  	public void waitForForce() throws Throwable {
95  		writeDoneLatch.acquire();
96  		synchronized(this) {
97  		    if( error!=null )
98  		        throw error; 
99  		}
100 	}
101 
102 	public void forced() {
103 		writeDoneLatch.release();
104 	}
105 
106 	public void writeFailed(Throwable error) {
107 		synchronized(this) {
108 		    this.error=error; 
109 		}
110 		writeDoneLatch.release();
111 	}
112 	
113 	/***
114 	 * @return Returns the data.
115 	 */
116 	public Record getData() {
117 		return data;
118 	}
119 	
120 	public ByteBuffer getByteBuffer() {
121 		return byteBuffer;
122 	}
123 
124 	/***
125 	 * @return
126 	 */
127 	public Mark getMark() {
128 		return mark;
129 	}
130 
131 	/***
132 	 * @return
133 	 */
134 	public long getLastSequenceId() {
135 		return lastSequenceId;
136 	}
137 
138 	/***
139 	 * @return
140 	 */
141 	public long getFirstSequenceId() {
142 		return firstSequenceId;
143 	}
144 
145 
146 }