View Javadoc

1   /***
2    *
3    * Copyright 2004 Hiram Chirino
4    *
5    * Licensed under the Apache License, Version 2.0 (the "License");
6    * you may not use this file except in compliance with the License.
7    * You may obtain a copy of the License at
8    *
9    * http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   *
17   **/
18  package org.codehaus.activemq.journal.impl;
19  
20  import org.codehaus.activemq.journal.InvalidRecordLocationException;
21  import org.codehaus.activemq.journal.Journal;
22  import org.codehaus.activemq.journal.JournalEventListener;
23  import org.codehaus.activemq.journal.RecordLocation;
24  import org.codehaus.activemq.util.LongSequenceGenerator;
25  
26  import java.io.File;
27  import java.io.IOException;
28  import java.io.InterruptedIOException;
29  
30  /***
31   * A high speed Journal implementation.  Inspired by the ideas of the
32   * <a href="http://howl.objectweb.org/">Howl</a> project but tailored to the needs
33   * of ActiveMQ.
34   * <p/>
35   * This Journal provides the following features:
36   * <ul>
37   * <li> Conncurrent writes are batched into a single write/force done by a background thread.</li>
38   * <li> Uses preallocated logs to avoid disk fragmentation and performance degregation. </li>
39   * <li> The number and size of the preallocated logs are configurable. </li>
40   * <li> Uses direct ByteBuffers to write data to log files. </li>
41   * <li> Allows logs to grow in case of an overflow condition so that overflow exceptions are not
42   * not thrown.  Grown logs that are inactivated (due to a new mark) are resized to
43   * thier original size.</li>
44   * <li> No limit on the size of the record written to the journal</li>
45   * <li> Should be possible to extend so that multiple physical disk are used concurrently to
46   * increase throughput and decrease latency.</li>
47   * </ul>
48   * <p/>
49   * This class is a thing wrapper around a LogFileManager.  It may be possible to extends this class so
50   * that it manages multiple LogFileManager objects to allow logging to multiple physical disks.
51   *
52   * @version $Revision: 1.3 $
53   */
54  public class JournalImpl implements Journal {
55  
56      LogFileManager manager;
57      LongSequenceGenerator sequenceGenerator = new LongSequenceGenerator();
58  
59      public JournalImpl() throws IOException {
60          this(new File("logs"));
61      }
62  
63      public JournalImpl(File logDirectory) throws IOException {
64          manager = new LogFileManager((byte) 0, sequenceGenerator, new LogFile(logDirectory));
65          initSequenceId();
66      }
67  
68      public JournalImpl(File logDirectory, int segments, int segmentSize) throws IOException {
69          manager = new LogFileManager((byte) 0, sequenceGenerator, new LogFile(logDirectory, segments, segmentSize));
70          initSequenceId();
71      }
72  
73  
74      private void initSequenceId() {
75          long id = manager.getLastSequenceId();
76          if (id == -1) {
77              id = 0;
78          }
79          sequenceGenerator.setLastSequenceId(id);
80      }
81  
82      /***
83       * @throws IOException
84       * @see org.codehaus.activemq.journal.Journal#write(byte[], boolean)
85       */
86      public RecordLocation write(byte[] data, boolean sync) throws IOException {
87          return manager.write(data, sync);
88      }
89  
90      /***
91       * @see org.codehaus.activemq.journal.Journal#setMark(org.codehaus.activemq.journal.RecordLocation,
92              *      boolean)
93       */
94      public void setMark(RecordLocation recordLocator, boolean force)
95              throws InvalidRecordLocationException, IOException {
96          RecordLocationImpl rl = (RecordLocationImpl) recordLocator;
97          try {
98              manager.setMark(rl, force);
99          }
100         catch (InterruptedException e) {
101             throw new InterruptedIOException();
102         }
103     }
104 
105     /***
106      * @see org.codehaus.activemq.journal.Journal#getMark()
107      */
108     public RecordLocation getMark() {
109         return manager.getMark();
110     }
111 
112     /***
113      * @see org.codehaus.activemq.journal.Journal#close()
114      */
115     public void close() throws IOException {
116         manager.close();
117     }
118 
119     /***
120      * @see org.codehaus.activemq.journal.Journal#setJournalEventListener(org.codehaus.activemq.journal.JournalEventListener)
121      */
122     public void setJournalEventListener(JournalEventListener eventListener) {
123         manager.setJournalEventListener(eventListener);
124     }
125 
126     /***
127      * @throws InvalidRecordLocationException
128      * @throws IOException
129      * @see org.codehaus.activemq.journal.Journal#getNextRecordLocation(org.codehaus.activemq.journal.RecordLocation)
130      */
131     public RecordLocation getNextRecordLocation(RecordLocation lastLocation)
132             throws IOException, InvalidRecordLocationException {
133 
134         RecordLocationImpl rl = (RecordLocationImpl) lastLocation;
135         return manager.getNextRecordLocation(rl);
136     }
137 
138     /***
139      * @see org.codehaus.activemq.journal.Journal#getNextRecordLocation(org.codehaus.activemq.journal.RecordLocation)
140      */
141     public byte[] read(RecordLocation location)
142             throws InvalidRecordLocationException, IOException {
143 
144         RecordLocationImpl rl = (RecordLocationImpl) location;
145         return manager.read(rl);
146     }
147 
148     /***
149      * @see java.lang.Object#toString()
150      */
151     public String toString() {
152         return "JournalImpl at '" + manager.getLogDirectory() + "' using " + manager.getTotalSegements() + " x " + ((float) manager.getInitialSegmentSize() / ((float) 1024 * 1024)) + " Meg log files.";
153     }
154 }