1 /***
2 *
3 * Copyright 2004 Hiram Chirino
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.codehaus.activemq.journal.impl;
19
20 import java.io.File;
21 import java.io.IOException;
22 import java.io.InterruptedIOException;
23 import java.lang.reflect.InvocationTargetException;
24
25 import org.apache.commons.logging.Log;
26 import org.apache.commons.logging.LogFactory;
27 import org.codehaus.activemq.journal.InvalidRecordLocationException;
28 import org.codehaus.activemq.journal.JournalEventListener;
29 import org.codehaus.activemq.util.LongSequenceGenerator;
30
31 import EDU.oswego.cs.dl.util.concurrent.FutureResult;
32 import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
33 import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;
34
35 /***
36 * The LogFileManager manages concurent access to a LogFile.
37 *
38 * @version $Revision: 1.7 $
39 */
40 public class LogFileManager {
41
42 static final private Log log = LogFactory.getLog(LogFileManager.class);
43
44 static final private int LOG_HEADER_SIZE=512;
45 static final private int OVERFLOW_RENOTIFICATION_DELAY=500;
46
47 private final LongSequenceGenerator sequenceGenerator;
48 private final byte fileManagerId;
49 private boolean closed=false;
50
51
52 private byte markedSegmentIndex=0;
53
54 private byte appendSegmentIndex=0;
55
56 private int appendSegmentOffset=0;
57
58
59 private BatchedWrite pendingBatchWrite;
60
61 private RecordLocationImpl lastMarkedLocation;
62 private LogFile file;
63 private QueuedExecutor executor;
64
65 private int rolloverFence;
66 private JournalEventListener eventListener;
67 private ByteBufferPool byteBufferPool;
68
69 private long overflowNotificationTime=System.currentTimeMillis();
70
71 /***
72 */
73 public LogFileManager(byte fileManagerId, LongSequenceGenerator sequenceGenerator, File logDirectory)
74 throws IOException {
75 this( fileManagerId, sequenceGenerator, new LogFile(logDirectory) );
76 }
77
78 /***
79 */
80 public LogFileManager(byte fileManagerId, LongSequenceGenerator sequenceGenerator, LogFile logFile) {
81 this.fileManagerId = fileManagerId;
82 this.sequenceGenerator = sequenceGenerator;
83 this.file = logFile;
84 this.byteBufferPool = new ByteBufferPool();
85 this.executor = new QueuedExecutor();
86 this.executor.setThreadFactory(new ThreadFactory(){
87 public Thread newThread(Runnable runnable) {
88 Thread answer = new Thread(runnable, "Journal Writter");
89 answer.setPriority(Thread.MAX_PRIORITY);
90 answer.setDaemon(true);
91 return answer;
92 }
93 });
94
95
96 lastMarkedLocation = file.getLastMarkedRecordLocation(fileManagerId);
97 appendSegmentIndex = file.getAppendSegmentIndex();
98 appendSegmentOffset = file.getAppendSegmentOffset();
99 rolloverFence = (file.getInitialSegmentSize()/10)*9;
100 }
101
102 public RecordLocationImpl write(byte[] data, boolean sync) throws IOException {
103 return write(LogFile.DATA_RECORD_TYPE, data, sync, null);
104 }
105
106 private RecordLocationImpl write(byte recordType, byte[] data, boolean sync, Mark mark) throws IOException {
107 try {
108 RecordLocationImpl location;
109 BatchedWrite writeCommand;
110 synchronized( this ) {
111 if(closed) {
112 throw new IOException("Journal has been closed.");
113 }
114
115
116 long sequenceId = sequenceGenerator.getNextSequenceId();
117 location = new RecordLocationImpl(this.fileManagerId, appendSegmentIndex, appendSegmentOffset, sequenceId);
118 Record record = new Record(sequenceId, recordType, data, mark );
119
120 writeCommand = addToPendingWriteBatch(record);
121
122
123 appendSegmentOffset += data.length+Record.RECORD_BASE_SIZE;
124 rolloverCheck();
125 }
126
127 if( sync ) {
128 writeCommand.waitForForce();
129 }
130
131 return location;
132 } catch (IOException e) {
133 throw e;
134 } catch (InterruptedException e) {
135 throw (IOException)new InterruptedIOException().initCause(e);
136 } catch (Throwable e) {
137 throw (IOException)new IOException("Write failed: "+e).initCause(e);
138 }
139 }
140
141 /***
142 * @param record
143 * @return
144 * @throws InterruptedException
145 */
146 private BatchedWrite addToPendingWriteBatch(Record record) throws InterruptedException {
147
148
149
150 BatchedWrite answer=null;
151 while( record.remaining()>0 ) {
152
153 if( pendingBatchWrite==null ) {
154 final BatchedWrite write = new BatchedWrite(byteBufferPool.getByteBuffer());
155 pendingBatchWrite = write;
156 executor.execute(new Runnable() {
157 public void run() {
158 try {
159 queuedWrite(write);
160 } catch (InterruptedException e) {
161 }
162 }
163 });
164 }
165 answer = pendingBatchWrite;
166
167 if( !pendingBatchWrite.append(record) ) {
168 pendingBatchWrite = null;
169 }
170 }
171 return answer;
172
173 }
174
175 /***
176 * This is a blocking call
177 * @param write
178 * @throws InterruptedException
179 */
180 private void queuedWrite(BatchedWrite write) throws InterruptedException {
181
182
183 write.disableAppend();
184
185
186 try {
187 file.appendAndForce(write);
188 write.forced();
189 } catch (Throwable e) {
190 write.writeFailed(e);
191 } finally {
192 byteBufferPool.returnByteBuffer(write.getByteBuffer());
193 }
194 }
195
196 /***
197 *
198 */
199 private void rolloverCheck() throws IOException {
200
201
202 if( eventListener!=null
203 && !file.canActivateNextSegment()
204 && overflowNotificationTime+OVERFLOW_RENOTIFICATION_DELAY < System.currentTimeMillis() ) {
205
206
207
208 RecordLocationImpl safeSpot = file.getFirstRecordLocationOfSecondActiveSegment(fileManagerId);
209 eventListener.overflowNotification(safeSpot);
210 overflowNotificationTime = System.currentTimeMillis();
211 }
212
213
214 if( appendSegmentOffset > rolloverFence && file.canActivateNextSegment() ) {
215
216
217 overflowNotificationTime -= OVERFLOW_RENOTIFICATION_DELAY;
218
219 final FutureResult result = new FutureResult();
220 try {
221 executor.execute(new Runnable() {
222 public void run() {
223 try {
224 result.set( queuedActivateNextSegment() );
225 } catch (Throwable e) {
226 result.setException(e);
227 }
228 }
229 });
230 appendSegmentIndex = ((Byte)result.get()).byteValue();
231 appendSegmentOffset = Segment.SEGMENT_HEADER_SIZE;
232
233 } catch (InterruptedException e) {
234 throw (IOException)new IOException("Interrupted.").initCause(e);
235 } catch (InvocationTargetException e) {
236 if( e.getTargetException() instanceof IOException)
237 throw (IOException)new IOException(e.getTargetException().getMessage()).initCause(e.getTargetException());
238 throw (IOException)new IOException("Unexpected Exception: ").initCause(e.getTargetException());
239 }
240 }
241 }
242
243 /***
244 * This is a blocking call
245 */
246 private Byte queuedActivateNextSegment() throws IOException {
247 file.activateNextSegment();
248 return new Byte(file.getAppendSegmentIndex());
249 }
250
251
252 /***
253 * @param recordLocator
254 * @param force
255 * @return
256 * @throws InvalidRecordLocationException
257 * @throws IOException
258 * @throws InterruptedException
259 */
260 synchronized public void setMark(final RecordLocationImpl recordLocator, boolean force) throws InvalidRecordLocationException, InterruptedException, IOException {
261 if( recordLocator==null )
262 throw new InvalidRecordLocationException("The location cannot be null.");
263 if( lastMarkedLocation!=null && recordLocator.compareTo(lastMarkedLocation)<0 )
264 throw new InvalidRecordLocationException("The location is less than the last mark.");
265 lastMarkedLocation = recordLocator;
266 Mark mark = new Mark(recordLocator);
267 byte data[] = mark.writeExternal();
268 write(LogFile.MARK_RECORD_TYPE,data,force,mark);
269 }
270
271 /***
272 * @return
273 */
274 public RecordLocationImpl getMark() {
275 return lastMarkedLocation;
276 }
277
278 /***
279 * @param lastLocation
280 * @return
281 * @throws IOException
282 * @throws InvalidRecordLocationException
283 */
284 public RecordLocationImpl getNextRecordLocation(final RecordLocationImpl lastLocation) throws IOException, InvalidRecordLocationException {
285 if( lastLocation==null ) {
286 if( lastMarkedLocation!=null) {
287 return lastMarkedLocation;
288 } else {
289 byte safeSeg = file.getFirstActiveSegmentIndex();
290 try {
291 return file.readRecordLocation( new RecordLocationImpl(fileManagerId, safeSeg, Segment.SEGMENT_HEADER_SIZE));
292 } catch (InvalidRecordLocationException e1) {
293 return null;
294 }
295 }
296 }
297
298
299 final FutureResult result = new FutureResult();
300 try {
301 executor.execute(new Runnable() {
302 public void run() {
303 try {
304 result.set( queuedGetNextRecordLocation(lastLocation) );
305 } catch (Throwable e) {
306 result.setException(e);
307 }
308 }
309 });
310 return (RecordLocationImpl)result.get();
311 } catch (InterruptedException e) {
312 throw (IOException)new IOException("Interrupted.").initCause(e);
313 } catch (InvocationTargetException e) {
314 if( e.getTargetException() instanceof InvalidRecordLocationException)
315 throw new InvalidRecordLocationException(e.getTargetException().getMessage(),e.getTargetException());
316 if( e.getTargetException() instanceof IOException)
317 throw (IOException)new IOException(e.getTargetException().getMessage()).initCause(e.getTargetException());
318 throw (IOException)new IOException("Unexpected Exception: ").initCause(e.getTargetException());
319 }
320 }
321
322 private RecordLocationImpl queuedGetNextRecordLocation(RecordLocationImpl location) throws IOException, InvalidRecordLocationException {
323 return file.getNextDataRecordLocation(location);
324 }
325
326 /***
327 * @param location
328 * @return
329 * @throws InvalidRecordLocationException
330 * @throws IOException
331 */
332 public byte[] read(final RecordLocationImpl location) throws IOException, InvalidRecordLocationException {
333
334 final FutureResult result = new FutureResult();
335 try {
336 executor.execute(new Runnable() {
337 public void run() {
338 try {
339 result.set( queuedRead(location) );
340 } catch (Throwable e) {
341 result.setException(e);
342 }
343 }
344 });
345 return (byte[])result.get();
346 } catch (InterruptedException e) {
347 throw (IOException)new IOException("Interrupted.").initCause(e);
348 } catch (InvocationTargetException e) {
349 if( e.getTargetException() instanceof InvalidRecordLocationException)
350 throw new InvalidRecordLocationException(e.getTargetException().getMessage(),e.getTargetException());
351 if( e.getTargetException() instanceof IOException)
352 throw (IOException)new IOException(e.getTargetException().getMessage()).initCause(e.getTargetException());
353 throw (IOException)new IOException("Unexpected Exception: ").initCause(e.getTargetException());
354 }
355 }
356
357 private byte[] queuedRead(RecordLocationImpl newLocation) throws IOException, InvalidRecordLocationException {
358
359 int segmentIndex;
360 int segmentOffset;
361 if( newLocation==null ) {
362 segmentIndex=markedSegmentIndex;
363 segmentOffset=Segment.SEGMENT_HEADER_SIZE;
364 } else {
365 segmentIndex=newLocation.getSegmentIndex();
366 segmentOffset=newLocation.getSegmentOffset();
367 }
368
369 return file.readData(segmentIndex,segmentOffset);
370 }
371
372 /***
373 * @param eventListener
374 */
375 public void setJournalEventListener(JournalEventListener eventListener) {
376 this.eventListener = eventListener;
377 }
378
379 /***
380 * @throws InterruptedException
381 *
382 */
383 public void close() {
384 if(closed)
385 return;
386 executor.shutdownAfterProcessingCurrentlyQueuedTasks();
387 try { file.close(); } catch ( Throwable e ) {}
388 closed = true;
389 }
390
391 public long getLastSequenceId() {
392 return file.getLastSequenceId();
393 }
394
395 /***
396 * @return
397 */
398 public File getLogDirectory() {
399 return file.getLogDirectory();
400 }
401
402 public int getTotalSegements() {
403 return file.getTotalSegements();
404 }
405
406 public int getInitialSegmentSize() {
407 return file.getInitialSegmentSize();
408 }
409 }