View Javadoc

1   /***
2    *
3    * Copyright 2004 Hiram Chirino
4    *
5    * Licensed under the Apache License, Version 2.0 (the "License");
6    * you may not use this file except in compliance with the License.
7    * You may obtain a copy of the License at
8    *
9    * http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   *
17   **/
18  package org.codehaus.activemq.journal.impl;
19  
20  import java.io.File;
21  import java.io.IOException;
22  import java.io.InterruptedIOException;
23  import java.lang.reflect.InvocationTargetException;
24  
25  import org.apache.commons.logging.Log;
26  import org.apache.commons.logging.LogFactory;
27  import org.codehaus.activemq.journal.InvalidRecordLocationException;
28  import org.codehaus.activemq.journal.JournalEventListener;
29  import org.codehaus.activemq.util.LongSequenceGenerator;
30  
31  import EDU.oswego.cs.dl.util.concurrent.FutureResult;
32  import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
33  import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;
34  
35  /***
36   * The LogFileManager manages concurent access to a LogFile.
37   * 
38   * @version $Revision: 1.7 $
39   */
40  public class LogFileManager {
41      
42      static final private Log log = LogFactory.getLog(LogFileManager.class);
43      
44      static final private int LOG_HEADER_SIZE=512;
45      static final private int OVERFLOW_RENOTIFICATION_DELAY=500;
46  
47  	private final LongSequenceGenerator sequenceGenerator;
48  	private final byte fileManagerId;
49  	private boolean closed=false;
50  	
51  	// The id of the file that has the mark.
52  	private byte markedSegmentIndex=0;
53  	// The id of the current log file that is being filled.
54  	private byte appendSegmentIndex=0;	
55  	// The offset in the current log file that is being filled.
56  	private int appendSegmentOffset=0;	
57  		
58  	// Used to batch writes together.
59  	private BatchedWrite pendingBatchWrite;
60  
61  	private RecordLocationImpl lastMarkedLocation;
62      private LogFile file;
63      private QueuedExecutor executor;
64  
65      private int rolloverFence;
66      private JournalEventListener eventListener;
67  	private ByteBufferPool byteBufferPool;
68  	
69  	private long overflowNotificationTime=System.currentTimeMillis();
70  
71  	/***
72  	 */
73  	public LogFileManager(byte fileManagerId, LongSequenceGenerator sequenceGenerator, File logDirectory) 
74  		throws IOException {
75  	    this( fileManagerId, sequenceGenerator, new LogFile(logDirectory) );
76  	}
77  	
78  	/***
79  	 */
80  	public LogFileManager(byte fileManagerId, LongSequenceGenerator sequenceGenerator, LogFile logFile) {
81  		this.fileManagerId = fileManagerId;
82  		this.sequenceGenerator = sequenceGenerator;
83  	    this.file = logFile;		
84  	    this.byteBufferPool = new ByteBufferPool();
85  		this.executor = new QueuedExecutor();	
86  		this.executor.setThreadFactory(new ThreadFactory(){
87  			public Thread newThread(Runnable runnable) {
88  				Thread answer = new Thread(runnable, "Journal Writter");
89  				answer.setPriority(Thread.MAX_PRIORITY);
90  				answer.setDaemon(true);
91  				return answer;
92  			}
93  		});
94  		
95  		
96  		lastMarkedLocation = file.getLastMarkedRecordLocation(fileManagerId);
97  		appendSegmentIndex = file.getAppendSegmentIndex();
98  		appendSegmentOffset = file.getAppendSegmentOffset();
99  		rolloverFence = (file.getInitialSegmentSize()/10)*9;
100 	}
101 
102 	public RecordLocationImpl write(byte[] data, boolean sync) throws IOException {
103 	    return write(LogFile.DATA_RECORD_TYPE, data, sync, null);
104 	}
105 		
106 	private RecordLocationImpl write(byte recordType, byte[] data, boolean sync, Mark mark) throws IOException {
107 		try {
108 			RecordLocationImpl location;
109 			BatchedWrite writeCommand;
110 			synchronized( this ) {			
111 				if(closed) {
112 					throw new IOException("Journal has been closed.");
113 				}
114 				
115 				// Create our record
116 				long sequenceId = sequenceGenerator.getNextSequenceId();			
117 				location = new RecordLocationImpl(this.fileManagerId, appendSegmentIndex, appendSegmentOffset, sequenceId);			
118 				Record record = new Record(sequenceId, recordType, data, mark );			
119 				// Piggy back the packet on the pending write batch.
120 				writeCommand = addToPendingWriteBatch(record);
121 				
122 				// Update where the next record will land.
123 				appendSegmentOffset += data.length+Record.RECORD_BASE_SIZE;			
124 				rolloverCheck();			
125 			}
126 			
127 			if( sync ) {
128 				writeCommand.waitForForce();
129 			}
130 			
131 			return location;
132 		} catch (IOException e) {
133 			throw e;
134 		} catch (InterruptedException e) {
135 			throw (IOException)new InterruptedIOException().initCause(e);
136 		} catch (Throwable e) {
137 			throw (IOException)new IOException("Write failed: "+e).initCause(e);
138 		}
139 	}
140 	
141     /***
142 	 * @param record
143 	 * @return
144 	 * @throws InterruptedException
145 	 */
146 	private BatchedWrite addToPendingWriteBatch(Record record) throws InterruptedException {
147 		
148 		// Load the write batch up with data from our record.
149 		// it may take more than one write batch if the record is large.
150 	    BatchedWrite answer=null;
151 		while( record.remaining()>0 ) {
152 			// Do we need another BatchWrite?
153 			if( pendingBatchWrite==null ) {
154 				final BatchedWrite write = new BatchedWrite(byteBufferPool.getByteBuffer());
155 				pendingBatchWrite = write;
156 				executor.execute(new Runnable() {
157                     public void run() {
158                         try {
159                             queuedWrite(write);
160                         } catch (InterruptedException e) {
161                         }
162                     } 
163 				});
164 			}
165 			answer = pendingBatchWrite;
166 			// Can we continue to use the pendingBatchWrite?  
167 			if( !pendingBatchWrite.append(record) ) {
168 			    pendingBatchWrite = null;
169 			}
170 		}	
171 		return answer;
172 		
173 	}
174 
175 	/***
176 	 * This is a blocking call 
177 	 * @param write
178 	 * @throws InterruptedException
179 	 */
180 	private void queuedWrite(BatchedWrite write) throws InterruptedException {
181 		
182 	    // Stop other threads from appending more pendingBatchWrite.
183 		write.disableAppend();
184 		
185 		// Do the write.
186 		try {			
187 			file.appendAndForce(write);
188 			write.forced();
189 		} catch (Throwable e) {
190 		    write.writeFailed(e);
191 		} finally {
192 			byteBufferPool.returnByteBuffer(write.getByteBuffer());
193 		}
194 	}
195 
196 	/***
197      * 
198      */
199     private void rolloverCheck() throws IOException {
200 
201 		// See if we need to issue an overflow notification.
202         if( eventListener!=null 
203         		&& !file.canActivateNextSegment() 
204 				&& overflowNotificationTime+OVERFLOW_RENOTIFICATION_DELAY < System.currentTimeMillis() ) { 
205 
206     		// We need to send an overflow notification to at free up 
207             // some segments.                
208         	RecordLocationImpl safeSpot = file.getFirstRecordLocationOfSecondActiveSegment(fileManagerId);					
209             eventListener.overflowNotification(safeSpot);
210 			overflowNotificationTime = System.currentTimeMillis();                    
211         }
212     	
213     	// Is it time to rollover and can we rollover? 
214         if( appendSegmentOffset > rolloverFence && file.canActivateNextSegment() ) {
215         	
216         	// don't delay the next overflow notification.
217         	overflowNotificationTime -= OVERFLOW_RENOTIFICATION_DELAY;
218         		
219     	    final FutureResult result = new FutureResult();
220     	    try {
221     	        executor.execute(new Runnable() {
222     	            public void run() {
223     	                try {
224     	                    result.set( queuedActivateNextSegment() );                    
225     	                } catch (Throwable e) {
226     	                    result.setException(e);
227     	                }
228     	            }            
229     		    });
230                 appendSegmentIndex = ((Byte)result.get()).byteValue(); 
231                 appendSegmentOffset = Segment.SEGMENT_HEADER_SIZE;
232                 
233             } catch (InterruptedException e) {
234                 throw (IOException)new IOException("Interrupted.").initCause(e);
235             } catch (InvocationTargetException e) {
236                 if( e.getTargetException() instanceof IOException)
237                     throw (IOException)new IOException(e.getTargetException().getMessage()).initCause(e.getTargetException());
238                 throw (IOException)new IOException("Unexpected Exception: ").initCause(e.getTargetException());
239             }	    	
240         }
241     }
242 
243     /***
244 	 * This is a blocking call 
245 	 */
246 	private Byte queuedActivateNextSegment() throws IOException {
247 		file.activateNextSegment();
248 	    return new Byte(file.getAppendSegmentIndex());
249 	}
250 
251 
252 	/***
253 	 * @param recordLocator
254 	 * @param force
255 	 * @return
256 	 * @throws InvalidRecordLocationException
257 	 * @throws IOException
258 	 * @throws InterruptedException
259 	 */
260 	synchronized public void setMark(final RecordLocationImpl recordLocator, boolean force) throws InvalidRecordLocationException, InterruptedException, IOException {
261 		if( recordLocator==null ) 
262 	        throw new InvalidRecordLocationException("The location cannot be null.");	    	    
263 	    if( lastMarkedLocation!=null && recordLocator.compareTo(lastMarkedLocation)<0 ) 
264 	        throw new InvalidRecordLocationException("The location is less than the last mark.");	    	    
265 	    lastMarkedLocation = recordLocator;
266 	    Mark mark = new Mark(recordLocator);
267 	    byte data[] = mark.writeExternal(); 
268 	    write(LogFile.MARK_RECORD_TYPE,data,force,mark);
269 	}
270 
271 	/***
272 	 * @return
273 	 */
274 	public RecordLocationImpl getMark() {
275 	    return lastMarkedLocation;
276 	}
277 
278 	/***
279 	 * @param lastLocation
280 	 * @return
281 	 * @throws IOException
282 	 * @throws InvalidRecordLocationException
283 	 */
284 	public RecordLocationImpl getNextRecordLocation(final RecordLocationImpl lastLocation) throws IOException, InvalidRecordLocationException {
285 		if( lastLocation==null ) {
286 			if( lastMarkedLocation!=null) {
287 				return lastMarkedLocation;
288 			} else {
289                 byte safeSeg = file.getFirstActiveSegmentIndex();
290 				try {
291 					return file.readRecordLocation( new RecordLocationImpl(fileManagerId, safeSeg, Segment.SEGMENT_HEADER_SIZE));
292 				} catch (InvalidRecordLocationException e1) {
293 					return null;
294 				}
295 			}
296 		}
297 		
298 	    // Run this in the queued executor thread.
299 	    final FutureResult result = new FutureResult();
300 	    try {
301 	        executor.execute(new Runnable() {
302 	            public void run() {
303 	                try {
304 	                    result.set( queuedGetNextRecordLocation(lastLocation) );                    
305 	                } catch (Throwable e) {
306 	                    result.setException(e);
307 	                }
308 	            }            
309 		    });	        
310             return (RecordLocationImpl)result.get();
311         } catch (InterruptedException e) {
312             throw (IOException)new IOException("Interrupted.").initCause(e);
313         } catch (InvocationTargetException e) {
314             if( e.getTargetException() instanceof InvalidRecordLocationException)
315                 throw new InvalidRecordLocationException(e.getTargetException().getMessage(),e.getTargetException());
316             if( e.getTargetException() instanceof IOException)
317                 throw (IOException)new IOException(e.getTargetException().getMessage()).initCause(e.getTargetException());
318             throw (IOException)new IOException("Unexpected Exception: ").initCause(e.getTargetException());
319         }	    	
320 	}	
321 
322 	private RecordLocationImpl queuedGetNextRecordLocation(RecordLocationImpl location) throws IOException, InvalidRecordLocationException {
323 		return file.getNextDataRecordLocation(location);
324 	}
325 
326 	/***
327 	 * @param location
328 	 * @return
329 	 * @throws InvalidRecordLocationException
330 	 * @throws IOException
331 	 */
332 	public byte[] read(final RecordLocationImpl location) throws IOException, InvalidRecordLocationException {	    
333 	    // Run this in the queued executor thread.
334 	    final FutureResult result = new FutureResult();	    
335 	    try {
336 	        executor.execute(new Runnable() {
337 	            public void run() {
338 	                try {
339 	                    result.set( queuedRead(location) );                    
340 	                } catch (Throwable e) {
341 	                    result.setException(e);
342 	                }
343 	            }
344 		    });
345             return (byte[])result.get();
346         } catch (InterruptedException e) {
347             throw (IOException)new IOException("Interrupted.").initCause(e);
348         } catch (InvocationTargetException e) {
349             if( e.getTargetException() instanceof InvalidRecordLocationException)
350                 throw new InvalidRecordLocationException(e.getTargetException().getMessage(),e.getTargetException());
351             if( e.getTargetException() instanceof IOException)
352                 throw (IOException)new IOException(e.getTargetException().getMessage()).initCause(e.getTargetException());
353             throw (IOException)new IOException("Unexpected Exception: ").initCause(e.getTargetException());
354         }	    	
355 	}
356 	
357     private byte[] queuedRead(RecordLocationImpl newLocation) throws IOException, InvalidRecordLocationException {
358 		
359 	    int segmentIndex;
360 	    int segmentOffset;
361 	    if( newLocation==null ) {
362 	        segmentIndex=markedSegmentIndex;
363 	    	segmentOffset=Segment.SEGMENT_HEADER_SIZE;
364 		} else {
365 	        segmentIndex=newLocation.getSegmentIndex();
366 	    	segmentOffset=newLocation.getSegmentOffset();
367 		}
368 	    
369 		return file.readData(segmentIndex,segmentOffset);
370 	}
371 
372     /***
373      * @param eventListener
374      */
375     public void setJournalEventListener(JournalEventListener eventListener) {
376         this.eventListener = eventListener;
377     }
378 	
379 	/***
380 	 * @throws InterruptedException
381 	 * 
382 	 */
383 	public void close() {
384 		if(closed) 
385 			return;			
386 		executor.shutdownAfterProcessingCurrentlyQueuedTasks();
387 	    try { file.close(); } catch ( Throwable e ) {}
388 		closed = true;
389 	}
390 
391 	public long getLastSequenceId() {
392 		return file.getLastSequenceId();
393 	}
394 
395 	/***
396 	 * @return
397 	 */
398 	public File getLogDirectory() {
399 		return file.getLogDirectory();
400 	}
401 
402 	public int getTotalSegements() {
403 		return file.getTotalSegements();
404 	}
405 
406 	public int getInitialSegmentSize() {
407 		return file.getInitialSegmentSize();
408 	}
409 }