1 /***
2 *
3 * Copyright 2004 Hiram Chirino
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.codehaus.activemq.journal.impl;
19
20 import org.codehaus.activemq.journal.InvalidRecordLocationException;
21
22 import java.io.File;
23 import java.io.IOException;
24 import java.text.NumberFormat;
25 import java.util.Iterator;
26 import java.util.LinkedList;
27
28 /***
29 * Provides a logical view of many seperate files as one single long
30 * log file. The seperate files that compose the LogFile are Segements
31 * of the LogFile.
32 * <p/>
33 * This class is not thread safe.
34 *
35 * @version $Revision: 1.7 $
36 */
37 final public class LogFile {
38
39 static final public byte DATA_RECORD_TYPE = 1;
40 static final public byte MARK_RECORD_TYPE = 2;
41 static final private NumberFormat onlineLogNameFormat = NumberFormat.getNumberInstance();
42
43 static {
44 onlineLogNameFormat.setMinimumIntegerDigits(3);
45 onlineLogNameFormat.setMaximumIntegerDigits(3);
46 onlineLogNameFormat.setGroupingUsed(false);
47 onlineLogNameFormat.setParseIntegerOnly(true);
48 onlineLogNameFormat.setMaximumFractionDigits(0);
49 }
50
51
52 private final File logDirectory;
53 private final int initialSegmentSize;
54 private boolean closed;
55
56
57 private final Segment segments[];
58 private final LinkedList activeSegments = new LinkedList();
59 private final LinkedList inactiveSegments = new LinkedList();;
60
61 private byte markSegment = -1;
62 private final Mark lastMark = new Mark();
63 private byte appendSegment = -1;
64 private int lastSegmentId = -1;
65
66 public LogFile(File logDirectory) throws IOException {
67 this(logDirectory, 4, 1024 * 1024 * 5);
68 }
69
70 public LogFile(File logDirectory, int onlineSegmentCount, int initialSegmentSize) throws IOException {
71 this.logDirectory = logDirectory;
72 this.segments = new Segment[onlineSegmentCount];
73 this.initialSegmentSize = initialSegmentSize;
74 initialize();
75 }
76
77 /***
78 * Creates/Loads the log segments.
79 *
80 * @throws IOException
81 */
82 private void initialize() throws IOException {
83
84
85 if (!logDirectory.exists()) {
86 if (!logDirectory.mkdirs()) {
87 throw new IOException("Could not create directory: " + logDirectory);
88 }
89 }
90
91 byte lastIndex = (byte) (segments.length - 1);
92 int lastSegmentId = 0;
93 Mark mark = null;
94 for (byte i = 0; i < segments.length; i++) {
95
96 segments[i] = new Segment(new File(logDirectory, "log-" + onlineLogNameFormat.format(i) + ".dat"), initialSegmentSize, i);
97
98 if (segments[i].isActive()) {
99 if (segments[i].getLastMark().sequenceId >= 0) {
100 markSegment = i;
101 mark = segments[i].getLastMark();
102 }
103 if (segments[i].getId() > lastSegmentId) {
104 lastSegmentId = segments[i].getId();
105 lastIndex = i;
106 }
107 }
108 }
109
110
111
112 byte i = nextSegmentIndex(lastIndex);
113 while (inactiveSegments.size() + activeSegments.size() < segments.length) {
114 Segment segment = segments[i];
115 if (segment.isActive()) {
116 activeSegments.add(segment);
117 }
118 else {
119 inactiveSegments.add(segment);
120 }
121 i = nextSegmentIndex(i);
122 }
123 if (mark != null) {
124 setMark(mark);
125 }
126
127 if (activeSegments.size() == 0) {
128
129 activateNextSegment();
130 }
131 else {
132
133 Segment lastSegment = (Segment) activeSegments.getLast();
134 lastSegment.setReadOnly(false);
135 appendSegment = lastSegment.getIndex();
136 for (Iterator iter = activeSegments.iterator(); iter.hasNext();) {
137 Segment s = (Segment) iter.next();
138 if (s != lastSegment) {
139 s.setReadOnly(true);
140 }
141 }
142 }
143 if (mark == null) {
144
145 Segment segment = (Segment) activeSegments.getFirst();
146 mark = new Mark();
147 mark.sequenceId = segment.getIndex();
148 mark.offsetId = Segment.SEGMENT_HEADER_SIZE;
149 }
150 lastMark.copy(mark);
151 }
152
153
154 /***
155 * @return
156 */
157 private int getNextSegmentId() {
158 return ++lastSegmentId;
159 }
160
161
162 public void close() throws IOException {
163 if (closed) {
164 return;
165 }
166 this.closed = true;
167 for (int i = 0; i < segments.length; i++) {
168 segments[i].close();
169 }
170 }
171
172 private void setMark(Mark mark) throws IOException {
173 lastMark.copy(mark);
174
175
176 for (Iterator i = activeSegments.iterator(); i.hasNext();) {
177 Segment segment = (Segment) i.next();
178 if (segment.getLastSequenceId() < lastMark.sequenceId) {
179 segment.reinitialize();
180 i.remove();
181 inactiveSegments.add(segment);
182 } else {
183 markSegment = segment.getIndex();
184 break;
185 }
186 }
187 }
188
189 /***
190 * @param write
191 * @throws IOException
192 */
193 public void appendAndForce(BatchedWrite write) throws IOException {
194 Segment segment = segments[appendSegment];
195 segment.seek(segment.getAppendOffset());
196 segment.write(write);
197 if (write.getMark() != null) {
198 setMark(write.getMark());
199 }
200 segment.force();
201 }
202
203
204 public static class RecordInfo {
205 private final RecordLocationImpl location;
206 private final RecordHeader header;
207
208 public RecordInfo(RecordLocationImpl location, RecordHeader header) {
209 this.location = location;
210 this.header = header;
211 }
212
213 int getNextLocation() {
214 return location.getSegmentOffset()+header.length+Record.RECORD_BASE_SIZE;
215 }
216
217 }
218
219 private RecordInfo readRecordInfo(RecordLocationImpl location) throws IOException, InvalidRecordLocationException {
220 if (0 > location.getSegmentIndex() || location.getSegmentIndex() > segments.length) {
221 throw new InvalidRecordLocationException("Invalid segment id.");
222 }
223
224 Segment segment = segments[location.getSegmentIndex()];
225 segment.seek(location.getSegmentOffset());
226
227
228 if (segment.isAtAppendOffset()) {
229 throw new InvalidRecordLocationException("No record at end of log.");
230 }
231
232
233 try {
234 RecordHeader header = new RecordHeader();
235 segment.readRecordHeader(header);
236 return new RecordInfo(location, header);
237 }
238 catch (IOException e) {
239 throw new InvalidRecordLocationException("No record at found.");
240 }
241 }
242
243 /***
244 * @param location
245 * @return
246 * @throws InvalidRecordLocationException
247 * @throws IOException
248 */
249 public RecordLocationImpl readRecordLocation(RecordLocationImpl location) throws IOException, InvalidRecordLocationException {
250 RecordInfo info = readRecordInfo(location);
251 return info.location.setSequence(info.header.sequenceId);
252 }
253
254 /***
255 * @param lastLocation
256 * @return
257 */
258 public RecordLocationImpl getNextDataRecordLocation(RecordLocationImpl lastLocation) throws IOException, InvalidRecordLocationException {
259 RecordInfo ri = readRecordInfo(lastLocation);
260 while (true) {
261 byte segmentIndex = ri.location.getSegmentIndex();
262 int offset = ri.getNextLocation();
263
264
265 if (offset >= segments[segmentIndex].getAppendOffset()) {
266 segmentIndex = nextActiveSegmentIndex(segmentIndex);
267 if (segmentIndex < 0) {
268 return null;
269 }
270 offset = Segment.SEGMENT_HEADER_SIZE;
271 }
272
273 try {
274 ri = readRecordInfo(ri.location.setSegmentIndexAndOffset(segmentIndex, offset));
275 }
276 catch (InvalidRecordLocationException e) {
277 return null;
278 }
279
280
281 if (ri.header.recordType == DATA_RECORD_TYPE) {
282 return ri.location.setSequence(ri.header.sequenceId);
283 }
284
285 }
286 }
287
288 /***
289 * @param i
290 * @return -1 if no next segment is available.
291 */
292 private byte nextActiveSegmentIndex(byte i) {
293 byte rc = nextSegmentIndex(i);
294 return segments[rc].isActive() ? rc : -1;
295 }
296
297 private byte nextSegmentIndex(byte i) {
298 i++;
299 if (i < segments.length) {
300 return i;
301 }
302 return 0;
303 }
304
305 /***
306 * @param segmentIndex
307 * @param segmentOffset
308 * @return
309 * @throws IOException
310 */
311 public byte[] readData(int segmentIndex, int segmentOffset) throws IOException {
312 if (0 > segmentIndex || segmentIndex > segments.length) {
313 return null;
314 }
315
316 Segment segment = segments[segmentIndex];
317 segment.seek(segmentOffset);
318
319
320 if (segment.isAtAppendOffset()) {
321 return null;
322 }
323
324
325 RecordHeader header = new RecordHeader();
326 segment.readRecordHeader(header);
327 byte data[] = new byte[header.length];
328 segment.read(data);
329
330 return data;
331 }
332
333 public int getInitialSegmentSize() {
334 return initialSegmentSize;
335 }
336
337 public boolean isSegmentIndexActive(byte i) {
338 synchronized (segments[i]) {
339 return segments[i].isActive();
340 }
341 }
342
343 public long getFirstSequenceIdOfSegementIndex(byte i) {
344 synchronized (segments[i]) {
345 return segments[i].getFirstSequenceId();
346 }
347 }
348
349 synchronized public boolean canActivateNextSegment() {
350 return inactiveSegments.size() > 0;
351 }
352
353 public byte getFirstActiveSegmentIndex() {
354 return ((Segment) activeSegments.getFirst()).getIndex();
355 }
356
357 void activateNextSegment() throws IOException {
358
359 if (appendSegment >= 0) {
360 segments[appendSegment].setReadOnly(true);
361 }
362 Segment next = (Segment) inactiveSegments.removeFirst();
363 activeSegments.addLast(next);
364 next.activate(getNextSegmentId());
365 appendSegment = next.getIndex();
366 }
367
368 /***
369 * @return
370 */
371 public byte getAppendSegmentIndex() {
372 return appendSegment;
373 }
374
375 /***
376 * @return
377 */
378 public int getAppendSegmentOffset() {
379 return segments[appendSegment].getAppendOffset();
380 }
381
382 int getTotalSegements() {
383 return segments.length;
384 }
385
386 /***
387 *
388 */
389 public long getLastSequenceId() {
390 return segments[appendSegment].getLastSequenceId();
391 }
392
393 /***
394 * @return
395 */
396 synchronized public RecordLocationImpl getFirstRecordLocationOfSecondActiveSegment(byte fm) {
397 return ((Segment) activeSegments.get(1)).getFirstRecordLocation(fm);
398 }
399
400 /***
401 * @return Returns the logDirectory.
402 */
403 public File getLogDirectory() {
404 return logDirectory;
405 }
406 /***
407 * @return Returns the lastMark.
408 */
409 public RecordLocationImpl getLastMarkedRecordLocation(byte fm) {
410 if( markSegment==-1 )
411 return null;
412 return new RecordLocationImpl(fm, markSegment, lastMark.offsetId, lastMark.sequenceId);
413 }
414 }