View Javadoc

1   /***
2    *
3    * Copyright 2004 Hiram Chirino
4    *
5    * Licensed under the Apache License, Version 2.0 (the "License");
6    * you may not use this file except in compliance with the License.
7    * You may obtain a copy of the License at
8    *
9    * http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   *
17   **/
18  package org.codehaus.activemq.journal.impl;
19  
20  import java.io.DataInput;
21  import java.io.DataOutput;
22  import java.io.File;
23  import java.io.IOException;
24  import java.io.RandomAccessFile;
25  import java.nio.ByteBuffer;
26  import java.nio.channels.FileChannel;
27  
28  /***
29   * Allows read/append access to a segment of a LogFile. 
30   *  
31   * @version $Revision: 1.2 $
32   */
33  public class Segment {
34      static final public int SEGMENT_HEADER_SIZE=0x0100;
35      
36      /*** The File that holds the log data. */
37      private File file;     				    
38      /*** Prefered size.  The size that the segment file is set to when initilaized. */
39      private final int initialSize;
40      /*** The index of this Segment */
41      private final byte index;
42      /*** The id of the log file. */
43      private int id;   				    
44      /*** The sequenceId of the first record */
45      private long firstSequenceId;		
46      /*** The sequenceId of the last record */
47      private long lastSequenceId;  		
48      /*** Does it have live records in it? */
49      private boolean active=false;       
50      /*** The location of the mark */
51      private final Mark lastMark = new Mark();
52      /*** The next free offset in the file. */
53      private int appendOffset=0;   		
54      /*** Used to access the file */
55      private RandomAccessFile randomAccessFile;
56      /*** Used to know where we seeked to last */
57      //private int seekOffset;
58      /*** Is the segment in readonly mode */
59      private boolean readOnly;
60  	private FileChannel fileChannel;
61  
62  
63      public Segment(File file, int initialSize, byte index) throws IOException {
64          this.file = file;
65          this.initialSize = initialSize;
66          this.index = index;
67          boolean initializationNeeeded = !file.exists();
68          randomAccessFile = new RandomAccessFile(file, "rw");
69          randomAccessFile.seek(0);
70          fileChannel = randomAccessFile.getChannel();
71          if( initializationNeeeded ) {
72              reinitialize();
73          } 
74          loadState();
75      }
76      
77      public void seek( int newPos ) throws IOException {
78      	randomAccessFile.seek(newPos);
79      }
80  
81      private void loadState() throws IOException {
82          seek(0);
83          try {
84              // It's quicker to load that state from the header..
85              // but it might not be consistent.  Detect for failure.
86              read(randomAccessFile);
87              int firstCode = hashCode();
88              read(randomAccessFile);
89                          
90              if( firstCode==hashCode() && ( readOnly || !active ) ) {
91                  // Since the header is consistent,
92                  // and this is inactive or read only segement,
93                  // there is no need to restore state from the log.
94                  return;
95              }                
96          } catch (IOException e) {
97          }
98          
99          // We need to build the segment state by reading the log.
100         firstSequenceId=-1;
101         lastSequenceId=-1;
102         readOnly=false;
103         active=true; 
104         
105         int offset=SEGMENT_HEADER_SIZE;
106         long fileSize = randomAccessFile.length();
107         RecordHeader header = new RecordHeader();
108         RecordFooter footer = new RecordFooter();
109         while( (offset+Record.RECORD_BASE_SIZE) < fileSize) {
110             
111         	// Read the next header
112         	seek(offset);
113             try { 
114                 header.readRecordHeader(randomAccessFile);
115             } catch (IOException e) {
116                 break;
117             }
118             
119             // Validate the header..
120             if( !header.isValid() )
121                 break;
122             if( firstSequenceId==-1 ) {
123                 firstSequenceId = header.sequenceId;
124             } else {
125                 // Make sure the sequence is incrementing.
126                 if( lastSequenceId >= header.sequenceId ) {
127                     break;
128                 }
129             }            
130             // Validate the whole record is in the segment.
131             if( offset+header.length+Record.RECORD_BASE_SIZE > fileSize) {
132             	break;
133             }
134             
135             // Should we load the record's data?
136             byte data[]=null;
137             if( RecordFooter.isChecksumingEnabled() || header.recordType==LogFile.MARK_RECORD_TYPE ) {
138             	data = new byte[header.length];
139             	randomAccessFile.readFully(data);
140             }
141             
142             // Load the footer.
143             seek(offset+header.length+RecordHeader.RECORD_HEADER_SIZE);
144             try { 
145                 footer.readRecordFooter(randomAccessFile);
146             } catch (IOException e) {
147                 break;
148             }
149             
150             // Validate the footer..
151             if( !footer.matches(header) )
152                 break;            
153             if( RecordFooter.isChecksumingEnabled() ) {
154                 if( !footer.matches(data) )
155                     break;          	
156             }
157 
158             // read the mark data in if it was a mark record.
159             if(header.recordType==LogFile.MARK_RECORD_TYPE) {
160                 lastMark.readExternal(data);
161             }
162                 
163             lastSequenceId = header.sequenceId;	  
164             offset += header.length+Record.RECORD_BASE_SIZE;
165             appendOffset = offset;
166         }
167         
168         // Now that we have the log state, store it so that next time we come up 
169         // we don't have to read the log to get a consistent state.        
170         storeState();            
171     }
172 
173     private void resize() throws IOException {
174         randomAccessFile.setLength(initialSize);
175     }
176     
177     private void storeState() throws IOException {
178         int restorePos=getSeekPos();
179         // We store the data 2 times just in case we die in the
180         // middle of one of the writes.  The hashCode will
181         // help us figure out if the written data is valid.
182         seek(0);               
183         writeHeader(randomAccessFile);
184         writeHeader(randomAccessFile);
185         force();
186         seek(restorePos);
187     }
188     
189     private int getSeekPos() throws IOException {
190 		return (int) randomAccessFile.getFilePointer();
191 	}
192 
193 	public void force() throws IOException {
194         fileChannel.force(false);
195     }
196     
197     public void reinitialize() throws IOException {
198         this.readOnly=true;
199         this.active=false;
200         this.firstSequenceId=-1;
201         this.appendOffset=-1;
202         this.id=-1;
203         this.lastSequenceId=-1;
204         this.lastMark.offsetId=-1;
205         this.lastMark.sequenceId=-1;
206         this.appendOffset=SEGMENT_HEADER_SIZE;
207         resize();
208         storeState();
209     }
210 
211     public void setReadOnly(boolean enable) throws IOException {
212         if( !active )
213             throw new IllegalStateException("Not Active.");
214         
215         this.readOnly=enable;
216         storeState();
217     }
218 
219     public void activate(int id) throws IOException {
220         if( active )
221             throw new IllegalStateException("Allready Active.");
222         
223         this.id=id;
224         this.readOnly=false;
225         this.active=true;
226         this.appendOffset=SEGMENT_HEADER_SIZE;
227         lastSequenceId=firstSequenceId=-1;
228         storeState();
229         seek(appendOffset);
230     }
231 
232     public int hashCode() {
233         int rc = id;
234         rc ^= appendOffset;
235 		rc ^= (int) (0xFFFFFFFF & firstSequenceId);
236 		rc ^= (int) (0xFFFFFFFF & (firstSequenceId>>4) );
237 		rc ^= (int) (0xFFFFFFFF & lastSequenceId);
238 		rc ^= (int) (0xFFFFFFFF & (lastSequenceId>>4) );
239 		if( active )
240 		    rc ^= 0xABD01212;
241 		if( readOnly )
242 		    rc ^= 0x1F8ADC21;
243 		return rc;
244     }
245 
246     private void writeHeader(DataOutput f) throws IOException {
247         f.writeInt(id);
248         f.writeLong(firstSequenceId);
249         f.writeLong(lastSequenceId);
250         f.writeBoolean(active);
251         f.writeBoolean(readOnly);
252         f.writeLong(lastMark.sequenceId);
253         f.writeInt(lastMark.offsetId);
254         f.writeInt(appendOffset);
255         f.writeInt(hashCode());
256     }
257     
258     private void read(DataInput f) throws IOException {
259         id = f.readInt();
260         firstSequenceId = f.readLong();
261         lastSequenceId = f.readLong();
262         active = f.readBoolean();
263         readOnly = f.readBoolean();
264         lastMark.sequenceId = f.readLong();
265         lastMark.offsetId = f.readInt();
266         appendOffset = f.readInt();
267         
268         // Validate the state was consistent with the hashcode 
269         if( f.readInt()!=hashCode())
270             throw new IOException();            
271     }
272 
273     public int getId() {
274         return id;
275     }
276     public boolean isActive() {
277         return active;
278     }
279     public long getFirstSequenceId() {
280         return firstSequenceId;
281     }
282     public Mark getLastMark() {
283         return lastMark;
284     }
285     public long getLastSequenceId() {
286         return lastSequenceId;
287     }
288     public int getAppendOffset() {
289         return appendOffset;
290     }
291     
292     public boolean isAtAppendOffset() throws IOException {
293         return appendOffset==getSeekPos();
294     }
295     
296 	/***
297 	 * @param write
298 	 * @throws IOException
299 	 */
300 	public void write(BatchedWrite write) throws IOException {
301 		
302 		ByteBuffer byteBuffer = write.getByteBuffer();
303 		byteBuffer.flip();
304 		
305 		write(byteBuffer);
306 		
307 		if( firstSequenceId==-1 )
308             firstSequenceId=write.getFirstSequenceId();
309         lastSequenceId=write.getLastSequenceId();
310         
311 	}
312 	
313     public void write(ByteBuffer byteBuffer) throws IOException {
314         if( !active )
315             throw new IllegalStateException("Segment is not active.  Writes are not allowed");
316         if( readOnly )
317             throw new IllegalStateException("Segment has been marked Read Only.  Writes are not allowed");
318         if( !isAtAppendOffset() )
319             throw new IllegalStateException("Segment not at append location.");
320         
321         int writeLength = byteBuffer.remaining();
322         while( byteBuffer.remaining() > 0 ) {
323         	fileChannel.write(byteBuffer);
324         }
325         
326         // Keep track of where we are at.
327         appendOffset+=writeLength;        
328 	}
329 
330     public void readRecordHeader(RecordHeader seekedRecordHeader) throws IOException {
331         seekedRecordHeader.readRecordHeader(randomAccessFile);
332     }
333     
334     public void close() throws IOException {
335         this.randomAccessFile.close();
336     }
337 
338     public void read(byte[] answer) throws IOException {
339         randomAccessFile.readFully(answer);        
340     }
341 
342     /***
343      * @return
344      */
345     public boolean isReadOnly() {
346         return readOnly;
347     }
348 
349     public byte getIndex() {
350         return index;
351     }
352     
353 	public String toString() {
354 		return "Segment:"+index+":"+id;
355 	}
356 
357 	/***
358 	 * @return
359 	 */
360 	public RecordLocationImpl getFirstRecordLocation(byte fm) {
361 		return new RecordLocationImpl(fm, index, SEGMENT_HEADER_SIZE, firstSequenceId);
362 	}
363 
364 }