View Javadoc

1   /*** 
2    * 
3    * Copyright 2004 Hiram Chirino
4    * Copyright 2004 Protique Ltd
5    * 
6    * Licensed under the Apache License, Version 2.0 (the "License"); 
7    * you may not use this file except in compliance with the License. 
8    * You may obtain a copy of the License at 
9    * 
10   * http://www.apache.org/licenses/LICENSE-2.0
11   * 
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS, 
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
15   * See the License for the specific language governing permissions and 
16   * limitations under the License. 
17   * 
18   **/
19  package org.codehaus.activemq.store.journal;
20  
21  import EDU.oswego.cs.dl.util.concurrent.Channel;
22  import EDU.oswego.cs.dl.util.concurrent.ClockDaemon;
23  import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
24  import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
25  import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
26  import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.codehaus.activemq.journal.InvalidRecordLocationException;
30  import org.codehaus.activemq.journal.Journal;
31  import org.codehaus.activemq.journal.JournalEventListener;
32  import org.codehaus.activemq.journal.RecordLocation;
33  import org.codehaus.activemq.journal.impl.JournalImpl;
34  import org.codehaus.activemq.message.ActiveMQMessage;
35  import org.codehaus.activemq.message.DefaultWireFormat;
36  import org.codehaus.activemq.message.MessageAck;
37  import org.codehaus.activemq.message.Packet;
38  import org.codehaus.activemq.message.WireFormat;
39  import org.codehaus.activemq.service.impl.PersistenceAdapterSupport;
40  import org.codehaus.activemq.store.MessageStore;
41  import org.codehaus.activemq.store.PersistenceAdapter;
42  import org.codehaus.activemq.store.PreparedTransactionStore;
43  import org.codehaus.activemq.store.TopicMessageStore;
44  import org.codehaus.activemq.store.jdbm.JdbmPersistenceAdapter;
45  import org.codehaus.activemq.util.JMSExceptionHelper;
46  import org.codehaus.activemq.util.TransactionTemplate;
47  
48  import javax.jms.JMSException;
49  import java.io.ByteArrayInputStream;
50  import java.io.ByteArrayOutputStream;
51  import java.io.DataInputStream;
52  import java.io.DataOutputStream;
53  import java.io.File;
54  import java.io.IOException;
55  import java.util.Iterator;
56  import java.util.Map;
57  
/**
 * An implementation of {@link PersistenceAdapter} that writes to a
 * {@link Journal} first and then asynchronously checkpoints the journaled
 * data, on a timer, into some other long term persistent storage.
 *
 * @version $Revision: 1.9 $
 */
65  public class JournalPersistenceAdapter extends PersistenceAdapterSupport implements JournalEventListener {
66  
67      private static final Log log = LogFactory.getLog(JournalPersistenceAdapter.class);
68      private Journal journal;
69      private PersistenceAdapter longTermPersistence;
70      private File directory = new File("logs");
71      private WireFormat wireFormat = new DefaultWireFormat();
72      private TransactionTemplate transactionTemplate;
73      private boolean sync = true;
74      private final ConcurrentHashMap messageStores = new ConcurrentHashMap();
75      private boolean performingRecovery;
76      private static final int PACKET_RECORD_TYPE = 0;
77      private static final int COMMAND_RECORD_TYPE = 1;
78  
79      private Channel checkpointRequests = new LinkedQueue();
80      private QueuedExecutor checkpointExecutor;
81      ClockDaemon clockDaemon;
82      private Object clockTicket;
83  
84  
85      /***
86       * Factory method to create an instance using the defaults
87       *
88       * @param directory the directory in which to store the persistent files
89       * @return
90       * @throws JMSException
91       * @throws IOException
92       */
93      public static JournalPersistenceAdapter newInstance(File directory) throws IOException, JMSException {
94          return new JournalPersistenceAdapter(directory, JdbmPersistenceAdapter.newInstance(directory), new DefaultWireFormat());
95      }
96  
97      public JournalPersistenceAdapter() {
98          checkpointExecutor = new QueuedExecutor(new LinkedQueue());
99          checkpointExecutor.setThreadFactory(new ThreadFactory() {
100             public Thread newThread(Runnable runnable) {
101                 Thread answer = new Thread(runnable, "Checkpoint Worker");
102                 answer.setDaemon(true);
103                 answer.setPriority(Thread.MAX_PRIORITY);
104                 return answer;
105             }
106         });
107     }
108 
109     public JournalPersistenceAdapter(File directory, PersistenceAdapter longTermPersistence, DefaultWireFormat wireFormat) throws IOException {
110         this();
111         this.directory = directory;
112         this.longTermPersistence = longTermPersistence;
113         this.wireFormat = wireFormat;
114     }
115 
116     public Map getInitialDestinations() {
117         return longTermPersistence.getInitialDestinations();
118     }
119 
120     public MessageStore createQueueMessageStore(String destinationName) throws JMSException {
121         MessageStore checkpointStore = longTermPersistence.createQueueMessageStore(destinationName);
122         JournalMessageStore store = new JournalMessageStore(this, checkpointStore, destinationName, sync);
123         messageStores.put(destinationName, store);
124         return store;
125     }
126 
127     public TopicMessageStore createTopicMessageStore(String destinationName) throws JMSException {
128         return longTermPersistence.createTopicMessageStore(destinationName);
129     }
130 
131     public PreparedTransactionStore createPreparedTransactionStore() throws JMSException {
132         return longTermPersistence.createPreparedTransactionStore();
133     }
134 
135     public void beginTransaction() throws JMSException {
136         longTermPersistence.beginTransaction();
137     }
138 
139     public void commitTransaction() throws JMSException {
140         longTermPersistence.commitTransaction();
141     }
142 
143     public void rollbackTransaction() {
144         longTermPersistence.rollbackTransaction();
145     }
146 
147     public synchronized void start() throws JMSException {
148         longTermPersistence.start();
149         if (journal == null) {
150             try {
151                 log.info("Opening journal.");
152                 journal = createJournal();
153                 log.info("Opened journal: " + journal);
154                 journal.setJournalEventListener(this);
155             }
156             catch (Exception e) {
157                 throw JMSExceptionHelper.newJMSException("Failed to open transaction journal: " + e, e);
158             }
159             try {
160                 recover();
161             }
162             catch (Exception e) {
163                 throw JMSExceptionHelper.newJMSException("Failed to recover transactions from journal: " + e, e);
164             }
165         }
166 
167         // Do a checkpoint periodically.
168         clockTicket = getClockDaemon().executePeriodically(1000 * 60, new Runnable() {
169             public void run() {
170                 checkpoint();
171             }
172         }, false);
173 
174     }
175 
176     public synchronized void stop() throws JMSException {
177 
178         if (clockTicket != null) {
179             // Stop the periodical checkpoint.
180             ClockDaemon.cancel(clockTicket);
181         }
182 
183         // Take one final checkpoint and stop checkpoint processing.
184         checkpoint();
185         checkpointExecutor.shutdownAfterProcessingCurrentlyQueuedTasks();
186 
187         JMSException firstException = null;
188         if (journal != null) {
189             try {
190                 journal.close();
191                 journal = null;
192             }
193             catch (Exception e) {
194                 firstException = JMSExceptionHelper.newJMSException("Failed to close Howl transaction log due to: " + e, e);
195             }
196         }
197         longTermPersistence.stop();
198 
199         if (firstException != null) {
200             throw firstException;
201         }
202     }
203 
204     // Properties
205     //-------------------------------------------------------------------------
206     public PersistenceAdapter getLongTermPersistence() {
207         return longTermPersistence;
208     }
209 
210     public void setLongTermPersistence(PersistenceAdapter longTermPersistence) {
211         this.longTermPersistence = longTermPersistence;
212     }
213 
214     /***
215      * @return Returns the directory.
216      */
217     public File getDirectory() {
218         return directory;
219     }
220 
221     /***
222      * @param directory The directory to set.
223      */
224     public void setDirectory(File directory) {
225         this.directory = directory;
226     }
227 
228     /***
229      * @return Returns the sync.
230      */
231     public boolean isSync() {
232         return sync;
233     }
234 
235     /***
236      * @param sync The sync to set.
237      */
238     public void setSync(boolean sync) {
239         this.sync = sync;
240     }
241 
242     /***
243      * @return Returns the wireFormat.
244      */
245     public WireFormat getWireFormat() {
246         return wireFormat;
247     }
248 
249     /***
250      * @param wireFormat The wireFormat to set.
251      */
252     public void setWireFormat(WireFormat wireFormat) {
253         this.wireFormat = wireFormat;
254     }
255 
256 
257     // Implementation methods
258 //-------------------------------------------------------------------------
259     private Journal createJournal() throws IOException {
260         return new JournalImpl(directory);
261     }
262 
263     /***
264      * The Journal give us a call back so that we can move old data out of the journal.
265      * Taking a checkpoint does this for us.
266      *
267      * @see org.codehaus.activemq.journal.JournalEventListener#overflowNotification(org.codehaus.activemq.journal.RecordLocation)
268      */
269     public void overflowNotification(RecordLocation safeLocation) {
270         checkpoint();
271     }
272 
273     /***
274      * When we checkpoint we move all the journaled data to long term storage.
275      */
276     private void checkpoint() {
277         try {
278             // Do the checkpoint asynchronously.
279             checkpointRequests.put(Boolean.TRUE);
280             checkpointExecutor.execute(new Runnable() {
281                 public void run() {
282 
283                     // Avoid running a checkpoint too many times in a row.
284                     // Consume any queued up checkpoint requests.
285                     try {
286                         boolean requested = false;
287                         while (checkpointRequests.poll(0) != null) {
288                             requested = true;
289                         }
290                         if (!requested) {
291                             return;
292                         }
293                     }
294                     catch (InterruptedException e1) {
295                         return;
296                     }
297 
298                     log.info("Checkpoint started.");
299                     Iterator iterator = messageStores.values().iterator();
300                     RecordLocation newMark = null;
301                     while (iterator.hasNext()) {
302                         try {
303                             JournalMessageStore ms = (JournalMessageStore) iterator.next();
304                             RecordLocation mark = ms.checkpoint();
305                             if (mark != null && (newMark == null || newMark.compareTo(mark) < 0)) {
306                                 newMark = mark;
307                             }
308                         }
309                         catch (Exception e) {
310                             log.error("Failed to checkpoint a message store: " + e, e);
311                         }
312                     }
313                     try {
314                         if (newMark != null) {
315                             journal.setMark(newMark, true);
316                         }
317                     }
318                     catch (Exception e) {
319                         log.error("Failed to mark the Journal: " + e, e);
320                     }
321                     log.info("Checkpoint done.");
322                 }
323             });
324         }
325         catch (InterruptedException e) {
326             log.warn("Request to start checkpoint failed: " + e, e);
327         }
328     }
329 
330     /***
331      * @param destinationName
332      * @param message
333      * @param sync
334      * @throws JMSException
335      */
336     public RecordLocation writePacket(String destination, Packet packet, boolean sync) throws JMSException {
337         try {
338 
339             ByteArrayOutputStream baos = new ByteArrayOutputStream();
340             DataOutputStream os = new DataOutputStream(baos);
341             os.writeByte(PACKET_RECORD_TYPE);
342             os.writeUTF(destination);
343             wireFormat.writePacket(packet, os);
344             os.close();
345             byte[] data = baos.toByteArray();
346             return journal.write(data, sync);
347 
348         }
349         catch (IOException e) {
350             throw createWriteException(packet, e);
351         }
352     }
353 
354     /***
355      * @param destinationName
356      * @param message
357      * @param sync
358      * @throws JMSException
359      */
360     public RecordLocation writeCommand(String command, boolean sync) throws JMSException {
361         try {
362 
363             ByteArrayOutputStream baos = new ByteArrayOutputStream();
364             DataOutputStream os = new DataOutputStream(baos);
365             os.writeByte(COMMAND_RECORD_TYPE);
366             os.writeUTF(command);
367             os.close();
368             byte[] data = baos.toByteArray();
369             return journal.write(data, sync);
370 
371         }
372         catch (IOException e) {
373             throw createWriteException(command, e);
374         }
375     }
376 
377     /***
378      * @param location
379      * @return
380      * @throws JMSException
381      */
382     public Packet readPacket(RecordLocation location) throws JMSException {
383         try {
384             byte[] data = journal.read(location);
385 
386             DataInputStream is = new DataInputStream(new ByteArrayInputStream(data));
387             byte type = is.readByte();
388             if (type != PACKET_RECORD_TYPE) {
389                 throw new IOException("Record is not a packet type.");
390             }
391             String destination = is.readUTF();
392             Packet packet = wireFormat.readPacket(is);
393             is.close();
394             return packet;
395 
396         }
397         catch (InvalidRecordLocationException e) {
398             throw createReadException(location, e);
399         }
400         catch (IOException e) {
401             throw createReadException(location, e);
402         }
403     }
404 
405 
406     /***
407      * Move all the messages that were in the journal into long term storeage.  We just replay and do a checkpoint.
408      *
409      * @throws JMSException
410      * @throws IOException
411      * @throws InvalidRecordLocationException
412      * @throws IllegalStateException
413      */
414     private void recover() throws IllegalStateException, InvalidRecordLocationException, IOException, JMSException {
415 
416         RecordLocation pos = null;
417         int transactionCounter = 0;
418 
419         log.info("Journal Recovery Started.");
420 
421         // While we have records in the journal.
422         while ((pos = journal.getNextRecordLocation(pos)) != null) {
423             byte[] data = journal.read(pos);
424 
425             // Read the destination and packate from the record.
426             String destination = null;
427             Packet packet = null;
428             DataInputStream is = new DataInputStream(new ByteArrayInputStream(data));
429             try {
430                 byte type = is.readByte();
431                 switch (type) {
432                     case PACKET_RECORD_TYPE:
433 
434                         // Is the current packet part of the destination?
435                         destination = is.readUTF();
436                         packet = wireFormat.readPacket(is);
437 
438                         // Try to replay the packet.
439                         JournalMessageStore store = (JournalMessageStore) createQueueMessageStore(destination);
440                         if (packet instanceof ActiveMQMessage) {
441                             ActiveMQMessage msg = (ActiveMQMessage) packet;
442                             try {
443                                 store.getLongTermStore().addMessage(msg);
444                                 transactionCounter++;
445                             }
446                             catch (Throwable e) {
447                                 log.error("Recovery Failure: Could not add message: " + msg.getJMSMessageIdentity().getMessageID() + ", reason: " + e, e);
448                             }
449                         }
450                         else if (packet instanceof MessageAck) {
451                             MessageAck ack = (MessageAck) packet;
452                             try {
453                                 store.getLongTermStore().removeMessage(ack.getMessageIdentity(), ack);
454                                 transactionCounter++;
455                             }
456                             catch (Throwable e) {
457                                 log.error("Recovery Failure: Could not remove message: " + ack.getMessageIdentity().getMessageID() + ", reason: " + e, e);
458                             }
459                         }
460                         else {
461                             log.error("Unknown type of packet in transaction log which will be discarded: " + packet);
462                         }
463 
464                         break;
465                     case COMMAND_RECORD_TYPE:
466 
467                         break;
468                     default:
469                         log.error("Unknown type of record in transaction log which will be discarded: " + type);
470                         break;
471                 }
472             }
473             finally {
474                 is.close();
475             }
476         }
477 
478         RecordLocation location = writeCommand("RECOVERED", true);
479         journal.setMark(location, true);
480 
481         log.info("Journal Recovered: " + transactionCounter + " message(s) in transactions recovered.");
482     }
483 
484     private JMSException createReadException(RecordLocation location, Exception e) {
485         return JMSExceptionHelper.newJMSException("Failed to read to journal for: " + location + ". Reason: " + e, e);
486     }
487 
488     protected JMSException createWriteException(Packet packet, Exception e) {
489         return JMSExceptionHelper.newJMSException("Failed to write to journal for: " + packet + ". Reason: " + e, e);
490     }
491 
492     protected JMSException createWriteException(String command, Exception e) {
493         return JMSExceptionHelper.newJMSException("Failed to write to journal for command: " + command + ". Reason: " + e, e);
494     }
495 
496     protected JMSException createRecoveryFailedException(Exception e) {
497         return JMSExceptionHelper.newJMSException("Failed to recover from journal. Reason: " + e, e);
498     }
499 
500     public ClockDaemon getClockDaemon() {
501         if (clockDaemon == null) {
502             clockDaemon = new ClockDaemon();
503             clockDaemon.setThreadFactory(new ThreadFactory() {
504                 public Thread newThread(Runnable runnable) {
505                     Thread thread = new Thread(runnable, "Checkpoint Timmer");
506                     thread.setDaemon(true);
507                     return thread;
508                 }
509             });
510         }
511         return clockDaemon;
512     }
513 
514     public void setClockDaemon(ClockDaemon clockDaemon) {
515         this.clockDaemon = clockDaemon;
516     }
517 }