1 /***
2 *
3 * Copyright 2004 Hiram Chirino
4 * Copyright 2004 Protique Ltd
5 *
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 *
18 **/
19 package org.codehaus.activemq.store.journal;
20
21 import EDU.oswego.cs.dl.util.concurrent.Channel;
22 import EDU.oswego.cs.dl.util.concurrent.ClockDaemon;
23 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
24 import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
25 import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
26 import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.codehaus.activemq.journal.InvalidRecordLocationException;
30 import org.codehaus.activemq.journal.Journal;
31 import org.codehaus.activemq.journal.JournalEventListener;
32 import org.codehaus.activemq.journal.RecordLocation;
33 import org.codehaus.activemq.journal.impl.JournalImpl;
34 import org.codehaus.activemq.message.ActiveMQMessage;
35 import org.codehaus.activemq.message.DefaultWireFormat;
36 import org.codehaus.activemq.message.MessageAck;
37 import org.codehaus.activemq.message.Packet;
38 import org.codehaus.activemq.message.WireFormat;
39 import org.codehaus.activemq.service.impl.PersistenceAdapterSupport;
40 import org.codehaus.activemq.store.MessageStore;
41 import org.codehaus.activemq.store.PersistenceAdapter;
42 import org.codehaus.activemq.store.PreparedTransactionStore;
43 import org.codehaus.activemq.store.TopicMessageStore;
44 import org.codehaus.activemq.store.jdbm.JdbmPersistenceAdapter;
45 import org.codehaus.activemq.util.JMSExceptionHelper;
46 import org.codehaus.activemq.util.TransactionTemplate;
47
48 import javax.jms.JMSException;
49 import java.io.ByteArrayInputStream;
50 import java.io.ByteArrayOutputStream;
51 import java.io.DataInputStream;
52 import java.io.DataOutputStream;
53 import java.io.File;
54 import java.io.IOException;
55 import java.util.Iterator;
56 import java.util.Map;
57
58 /***
59 * An implementation of {@link PersistenceAdapter} designed for
60 * use with a {@link Journal} and then checkpointing asynchronously
61 * on a timeout with some other long term persistent storage.
62 *
63 * @version $Revision: 1.9 $
64 */
65 public class JournalPersistenceAdapter extends PersistenceAdapterSupport implements JournalEventListener {
66
67 private static final Log log = LogFactory.getLog(JournalPersistenceAdapter.class);
68 private Journal journal;
69 private PersistenceAdapter longTermPersistence;
70 private File directory = new File("logs");
71 private WireFormat wireFormat = new DefaultWireFormat();
72 private TransactionTemplate transactionTemplate;
73 private boolean sync = true;
74 private final ConcurrentHashMap messageStores = new ConcurrentHashMap();
75 private boolean performingRecovery;
76 private static final int PACKET_RECORD_TYPE = 0;
77 private static final int COMMAND_RECORD_TYPE = 1;
78
79 private Channel checkpointRequests = new LinkedQueue();
80 private QueuedExecutor checkpointExecutor;
81 ClockDaemon clockDaemon;
82 private Object clockTicket;
83
84
85 /***
86 * Factory method to create an instance using the defaults
87 *
88 * @param directory the directory in which to store the persistent files
89 * @return
90 * @throws JMSException
91 * @throws IOException
92 */
93 public static JournalPersistenceAdapter newInstance(File directory) throws IOException, JMSException {
94 return new JournalPersistenceAdapter(directory, JdbmPersistenceAdapter.newInstance(directory), new DefaultWireFormat());
95 }
96
97 public JournalPersistenceAdapter() {
98 checkpointExecutor = new QueuedExecutor(new LinkedQueue());
99 checkpointExecutor.setThreadFactory(new ThreadFactory() {
100 public Thread newThread(Runnable runnable) {
101 Thread answer = new Thread(runnable, "Checkpoint Worker");
102 answer.setDaemon(true);
103 answer.setPriority(Thread.MAX_PRIORITY);
104 return answer;
105 }
106 });
107 }
108
109 public JournalPersistenceAdapter(File directory, PersistenceAdapter longTermPersistence, DefaultWireFormat wireFormat) throws IOException {
110 this();
111 this.directory = directory;
112 this.longTermPersistence = longTermPersistence;
113 this.wireFormat = wireFormat;
114 }
115
116 public Map getInitialDestinations() {
117 return longTermPersistence.getInitialDestinations();
118 }
119
120 public MessageStore createQueueMessageStore(String destinationName) throws JMSException {
121 MessageStore checkpointStore = longTermPersistence.createQueueMessageStore(destinationName);
122 JournalMessageStore store = new JournalMessageStore(this, checkpointStore, destinationName, sync);
123 messageStores.put(destinationName, store);
124 return store;
125 }
126
127 public TopicMessageStore createTopicMessageStore(String destinationName) throws JMSException {
128 return longTermPersistence.createTopicMessageStore(destinationName);
129 }
130
131 public PreparedTransactionStore createPreparedTransactionStore() throws JMSException {
132 return longTermPersistence.createPreparedTransactionStore();
133 }
134
135 public void beginTransaction() throws JMSException {
136 longTermPersistence.beginTransaction();
137 }
138
139 public void commitTransaction() throws JMSException {
140 longTermPersistence.commitTransaction();
141 }
142
143 public void rollbackTransaction() {
144 longTermPersistence.rollbackTransaction();
145 }
146
147 public synchronized void start() throws JMSException {
148 longTermPersistence.start();
149 if (journal == null) {
150 try {
151 log.info("Opening journal.");
152 journal = createJournal();
153 log.info("Opened journal: " + journal);
154 journal.setJournalEventListener(this);
155 }
156 catch (Exception e) {
157 throw JMSExceptionHelper.newJMSException("Failed to open transaction journal: " + e, e);
158 }
159 try {
160 recover();
161 }
162 catch (Exception e) {
163 throw JMSExceptionHelper.newJMSException("Failed to recover transactions from journal: " + e, e);
164 }
165 }
166
167
168 clockTicket = getClockDaemon().executePeriodically(1000 * 60, new Runnable() {
169 public void run() {
170 checkpoint();
171 }
172 }, false);
173
174 }
175
176 public synchronized void stop() throws JMSException {
177
178 if (clockTicket != null) {
179
180 ClockDaemon.cancel(clockTicket);
181 }
182
183
184 checkpoint();
185 checkpointExecutor.shutdownAfterProcessingCurrentlyQueuedTasks();
186
187 JMSException firstException = null;
188 if (journal != null) {
189 try {
190 journal.close();
191 journal = null;
192 }
193 catch (Exception e) {
194 firstException = JMSExceptionHelper.newJMSException("Failed to close Howl transaction log due to: " + e, e);
195 }
196 }
197 longTermPersistence.stop();
198
199 if (firstException != null) {
200 throw firstException;
201 }
202 }
203
204
205
206 public PersistenceAdapter getLongTermPersistence() {
207 return longTermPersistence;
208 }
209
210 public void setLongTermPersistence(PersistenceAdapter longTermPersistence) {
211 this.longTermPersistence = longTermPersistence;
212 }
213
214 /***
215 * @return Returns the directory.
216 */
217 public File getDirectory() {
218 return directory;
219 }
220
221 /***
222 * @param directory The directory to set.
223 */
224 public void setDirectory(File directory) {
225 this.directory = directory;
226 }
227
228 /***
229 * @return Returns the sync.
230 */
231 public boolean isSync() {
232 return sync;
233 }
234
235 /***
236 * @param sync The sync to set.
237 */
238 public void setSync(boolean sync) {
239 this.sync = sync;
240 }
241
242 /***
243 * @return Returns the wireFormat.
244 */
245 public WireFormat getWireFormat() {
246 return wireFormat;
247 }
248
249 /***
250 * @param wireFormat The wireFormat to set.
251 */
252 public void setWireFormat(WireFormat wireFormat) {
253 this.wireFormat = wireFormat;
254 }
255
256
257
258
259 private Journal createJournal() throws IOException {
260 return new JournalImpl(directory);
261 }
262
263 /***
264 * The Journal give us a call back so that we can move old data out of the journal.
265 * Taking a checkpoint does this for us.
266 *
267 * @see org.codehaus.activemq.journal.JournalEventListener#overflowNotification(org.codehaus.activemq.journal.RecordLocation)
268 */
269 public void overflowNotification(RecordLocation safeLocation) {
270 checkpoint();
271 }
272
273 /***
274 * When we checkpoint we move all the journaled data to long term storage.
275 */
276 private void checkpoint() {
277 try {
278
279 checkpointRequests.put(Boolean.TRUE);
280 checkpointExecutor.execute(new Runnable() {
281 public void run() {
282
283
284
285 try {
286 boolean requested = false;
287 while (checkpointRequests.poll(0) != null) {
288 requested = true;
289 }
290 if (!requested) {
291 return;
292 }
293 }
294 catch (InterruptedException e1) {
295 return;
296 }
297
298 log.info("Checkpoint started.");
299 Iterator iterator = messageStores.values().iterator();
300 RecordLocation newMark = null;
301 while (iterator.hasNext()) {
302 try {
303 JournalMessageStore ms = (JournalMessageStore) iterator.next();
304 RecordLocation mark = ms.checkpoint();
305 if (mark != null && (newMark == null || newMark.compareTo(mark) < 0)) {
306 newMark = mark;
307 }
308 }
309 catch (Exception e) {
310 log.error("Failed to checkpoint a message store: " + e, e);
311 }
312 }
313 try {
314 if (newMark != null) {
315 journal.setMark(newMark, true);
316 }
317 }
318 catch (Exception e) {
319 log.error("Failed to mark the Journal: " + e, e);
320 }
321 log.info("Checkpoint done.");
322 }
323 });
324 }
325 catch (InterruptedException e) {
326 log.warn("Request to start checkpoint failed: " + e, e);
327 }
328 }
329
330 /***
331 * @param destinationName
332 * @param message
333 * @param sync
334 * @throws JMSException
335 */
336 public RecordLocation writePacket(String destination, Packet packet, boolean sync) throws JMSException {
337 try {
338
339 ByteArrayOutputStream baos = new ByteArrayOutputStream();
340 DataOutputStream os = new DataOutputStream(baos);
341 os.writeByte(PACKET_RECORD_TYPE);
342 os.writeUTF(destination);
343 wireFormat.writePacket(packet, os);
344 os.close();
345 byte[] data = baos.toByteArray();
346 return journal.write(data, sync);
347
348 }
349 catch (IOException e) {
350 throw createWriteException(packet, e);
351 }
352 }
353
354 /***
355 * @param destinationName
356 * @param message
357 * @param sync
358 * @throws JMSException
359 */
360 public RecordLocation writeCommand(String command, boolean sync) throws JMSException {
361 try {
362
363 ByteArrayOutputStream baos = new ByteArrayOutputStream();
364 DataOutputStream os = new DataOutputStream(baos);
365 os.writeByte(COMMAND_RECORD_TYPE);
366 os.writeUTF(command);
367 os.close();
368 byte[] data = baos.toByteArray();
369 return journal.write(data, sync);
370
371 }
372 catch (IOException e) {
373 throw createWriteException(command, e);
374 }
375 }
376
377 /***
378 * @param location
379 * @return
380 * @throws JMSException
381 */
382 public Packet readPacket(RecordLocation location) throws JMSException {
383 try {
384 byte[] data = journal.read(location);
385
386 DataInputStream is = new DataInputStream(new ByteArrayInputStream(data));
387 byte type = is.readByte();
388 if (type != PACKET_RECORD_TYPE) {
389 throw new IOException("Record is not a packet type.");
390 }
391 String destination = is.readUTF();
392 Packet packet = wireFormat.readPacket(is);
393 is.close();
394 return packet;
395
396 }
397 catch (InvalidRecordLocationException e) {
398 throw createReadException(location, e);
399 }
400 catch (IOException e) {
401 throw createReadException(location, e);
402 }
403 }
404
405
406 /***
407 * Move all the messages that were in the journal into long term storeage. We just replay and do a checkpoint.
408 *
409 * @throws JMSException
410 * @throws IOException
411 * @throws InvalidRecordLocationException
412 * @throws IllegalStateException
413 */
414 private void recover() throws IllegalStateException, InvalidRecordLocationException, IOException, JMSException {
415
416 RecordLocation pos = null;
417 int transactionCounter = 0;
418
419 log.info("Journal Recovery Started.");
420
421
422 while ((pos = journal.getNextRecordLocation(pos)) != null) {
423 byte[] data = journal.read(pos);
424
425
426 String destination = null;
427 Packet packet = null;
428 DataInputStream is = new DataInputStream(new ByteArrayInputStream(data));
429 try {
430 byte type = is.readByte();
431 switch (type) {
432 case PACKET_RECORD_TYPE:
433
434
435 destination = is.readUTF();
436 packet = wireFormat.readPacket(is);
437
438
439 JournalMessageStore store = (JournalMessageStore) createQueueMessageStore(destination);
440 if (packet instanceof ActiveMQMessage) {
441 ActiveMQMessage msg = (ActiveMQMessage) packet;
442 try {
443 store.getLongTermStore().addMessage(msg);
444 transactionCounter++;
445 }
446 catch (Throwable e) {
447 log.error("Recovery Failure: Could not add message: " + msg.getJMSMessageIdentity().getMessageID() + ", reason: " + e, e);
448 }
449 }
450 else if (packet instanceof MessageAck) {
451 MessageAck ack = (MessageAck) packet;
452 try {
453 store.getLongTermStore().removeMessage(ack.getMessageIdentity(), ack);
454 transactionCounter++;
455 }
456 catch (Throwable e) {
457 log.error("Recovery Failure: Could not remove message: " + ack.getMessageIdentity().getMessageID() + ", reason: " + e, e);
458 }
459 }
460 else {
461 log.error("Unknown type of packet in transaction log which will be discarded: " + packet);
462 }
463
464 break;
465 case COMMAND_RECORD_TYPE:
466
467 break;
468 default:
469 log.error("Unknown type of record in transaction log which will be discarded: " + type);
470 break;
471 }
472 }
473 finally {
474 is.close();
475 }
476 }
477
478 RecordLocation location = writeCommand("RECOVERED", true);
479 journal.setMark(location, true);
480
481 log.info("Journal Recovered: " + transactionCounter + " message(s) in transactions recovered.");
482 }
483
484 private JMSException createReadException(RecordLocation location, Exception e) {
485 return JMSExceptionHelper.newJMSException("Failed to read to journal for: " + location + ". Reason: " + e, e);
486 }
487
488 protected JMSException createWriteException(Packet packet, Exception e) {
489 return JMSExceptionHelper.newJMSException("Failed to write to journal for: " + packet + ". Reason: " + e, e);
490 }
491
492 protected JMSException createWriteException(String command, Exception e) {
493 return JMSExceptionHelper.newJMSException("Failed to write to journal for command: " + command + ". Reason: " + e, e);
494 }
495
496 protected JMSException createRecoveryFailedException(Exception e) {
497 return JMSExceptionHelper.newJMSException("Failed to recover from journal. Reason: " + e, e);
498 }
499
500 public ClockDaemon getClockDaemon() {
501 if (clockDaemon == null) {
502 clockDaemon = new ClockDaemon();
503 clockDaemon.setThreadFactory(new ThreadFactory() {
504 public Thread newThread(Runnable runnable) {
505 Thread thread = new Thread(runnable, "Checkpoint Timmer");
506 thread.setDaemon(true);
507 return thread;
508 }
509 });
510 }
511 return clockDaemon;
512 }
513
514 public void setClockDaemon(ClockDaemon clockDaemon) {
515 this.clockDaemon = clockDaemon;
516 }
517 }