View Javadoc

1   /*** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.codehaus.activemq.store.howl;
19  
20  import org.apache.commons.logging.Log;
21  import org.apache.commons.logging.LogFactory;
22  import org.codehaus.activemq.message.DefaultWireFormat;
23  import org.codehaus.activemq.service.impl.PersistenceAdapterSupport;
24  import org.codehaus.activemq.store.MessageStore;
25  import org.codehaus.activemq.store.PersistenceAdapter;
26  import org.codehaus.activemq.store.PreparedTransactionStore;
27  import org.codehaus.activemq.store.TopicMessageStore;
28  import org.codehaus.activemq.store.jdbm.JdbmPersistenceAdapter;
29  import org.codehaus.activemq.util.JMSExceptionHelper;
30  import org.objectweb.howl.log.Configuration;
31  import org.objectweb.howl.log.LogConfigurationException;
32  import org.objectweb.howl.log.Logger;
33  
34  import javax.jms.JMSException;
35  import java.io.File;
36  import java.io.IOException;
37  import java.io.InputStream;
38  import java.util.Map;
39  import java.util.Properties;
40  
41  /***
42   * An implementation of {@link PersistenceAdapter} designed for
43   * optimal use with <a href="http://howl.objectweb.org/">Howl</a>
44   * as the transaction log and then checkpointing asynchronously
45   * on a timeout with some other persistent storage.
46   *
47   * @version $Revision: 1.7 $
48   */
49  public class HowlPersistenceAdapter extends PersistenceAdapterSupport {
50  
51      private static final Log log = LogFactory.getLog(HowlPersistenceAdapter.class);
52  
53      private PersistenceAdapter longTermPersistence;
54      private Configuration configuration;
55      private int maximumTotalCachedMessages = 10000;
56      private int maximumCachedMessagesPerStore = 100;
57      private int cachedMessageCount;
58      private File directory;
59      private Logger transactionLog;
60  
61      /***
62       * Factory method to create an instance using the defaults
63       *
64       * @param directory the directory in which to store the persistent files
65       * @return
66       * @throws JMSException
67       */
68      public static HowlPersistenceAdapter newInstance(File directory) throws JMSException {
69          return new HowlPersistenceAdapter(directory, JdbmPersistenceAdapter.newInstance(directory));
70      }
71  
72      public HowlPersistenceAdapter() {
73      }
74  
75      public HowlPersistenceAdapter(File directory, PersistenceAdapter longTermPersistence) {
76          this.directory = directory;
77          this.longTermPersistence = longTermPersistence;
78      }
79  
80      public Map getInitialDestinations() {
81          return longTermPersistence.getInitialDestinations();
82      }
83  
84      public MessageStore createQueueMessageStore(String destinationName) throws JMSException {
85          MessageStore checkpointStore = longTermPersistence.createQueueMessageStore(destinationName);
86          return new HowlMessageStore(this, checkpointStore, transactionLog, new DefaultWireFormat());
87      }
88  
89      public TopicMessageStore createTopicMessageStore(String destinationName) throws JMSException {
90          /*** TODO not yet implemented for topics */
91          return longTermPersistence.createTopicMessageStore(destinationName);
92      }
93  
94      public PreparedTransactionStore createPreparedTransactionStore() throws JMSException {
95          // delegate to long term store
96          return longTermPersistence.createPreparedTransactionStore();
97      }
98  
99      public void beginTransaction() throws JMSException {
100     }
101 
102     public void commitTransaction() throws JMSException {
103     }
104 
105     public void rollbackTransaction() {
106     }
107 
108     public void start() throws JMSException {
109         if (transactionLog == null) {
110             if (directory != null) {
111                 directory.mkdirs();
112             }
113             try {
114                 transactionLog = createTransactionLog();
115             }
116             catch (Exception e) {
117                 throw JMSExceptionHelper.newJMSException("Failed to create Howl based message store due to: " + e, e);
118             }
119         }
120 
121         try {
122             log.info("Using Howl transaction log in directory: " + getLogFileDir());
123 
124             transactionLog.open();
125         }
126         catch (Exception e) {
127             throw JMSExceptionHelper.newJMSException("Failed to open Howl transaction log: " + e, e);
128         }
129         longTermPersistence.start();
130     }
131 
132     public void stop() throws JMSException {
133         try {
134             transactionLog.close();
135         }
136         catch (Exception e) {
137             throw JMSExceptionHelper.newJMSException("Failed to close Howl transaction log due to: " + e, e);
138         }
139     }
140 
141     /***
142      * Return true if a store is allowed to cache a message.
143      * Called by a store when its about to store a message in its cache.
144      *
145      * @param messageStore
146      * @return true if the cache is allowed to cache the mesage
147      */
148     public synchronized boolean hasCacheCapacity(HowlMessageStore messageStore) {
149         if (cachedMessageCount < maximumTotalCachedMessages) {
150             cachedMessageCount++;
151             return true;
152         }
153         return false;
154     }
155 
156     public synchronized void onMessageRemove(HowlMessageStore messageStore) {
157         cachedMessageCount--;
158     }
159 
160     // Properties
161     //-------------------------------------------------------------------------
162     public PersistenceAdapter getLongTermPersistence() {
163         return longTermPersistence;
164     }
165 
166     public void setLongTermPersistence(PersistenceAdapter longTermPersistence) {
167         this.longTermPersistence = longTermPersistence;
168     }
169 
170     public int getMaximumCachedMessagesPerStore() {
171         return maximumCachedMessagesPerStore;
172     }
173 
174     public void setMaximumCachedMessagesPerStore(int maximumCachedMessagesPerStore) {
175         this.maximumCachedMessagesPerStore = maximumCachedMessagesPerStore;
176     }
177 
178     public int getMaximumTotalCachedMessages() {
179         return maximumTotalCachedMessages;
180     }
181 
182     public void setMaximumTotalCachedMessages(int maximumTotalCachedMessages) {
183         this.maximumTotalCachedMessages = maximumTotalCachedMessages;
184     }
185 
186     public File getDirectory() {
187         return directory;
188     }
189 
190     public void setDirectory(File directory) {
191         this.directory = directory;
192     }
193 
194     public Configuration getConfiguration() throws LogConfigurationException, IOException {
195         if (configuration == null) {
196             configuration = createConfiguration();
197         }
198         return configuration;
199     }
200 
201     public void setConfiguration(Configuration configuration) {
202         this.configuration = configuration;
203     }
204 
205     public Logger getTransactionLog() {
206         return transactionLog;
207     }
208 
209     public void setTransactionLog(Logger transactionLog) {
210         this.transactionLog = transactionLog;
211     }
212 
213 
214     // Delegate Howl configuration properties
215     //-------------------------------------------------------------------------
216     public String getBufferClassName() throws LogConfigurationException, IOException {
217         return getConfiguration().getBufferClassName();
218     }
219 
220     public int getBufferSize() throws LogConfigurationException, IOException {
221         return getConfiguration().getBufferSize();
222     }
223 
224     public int getFlushSleepTime() throws LogConfigurationException, IOException {
225         return getConfiguration().getFlushSleepTime();
226     }
227 
228     public String getLogFileDir() throws LogConfigurationException, IOException {
229         return getConfiguration().getLogFileDir();
230     }
231 
232     public String getLogFileExt() throws LogConfigurationException, IOException {
233         return getConfiguration().getLogFileExt();
234     }
235 
236     public String getLogFileName() throws LogConfigurationException, IOException {
237         return getConfiguration().getLogFileName();
238     }
239 
240     public int getMaxBlocksPerFile() throws LogConfigurationException, IOException {
241         return getConfiguration().getMaxBlocksPerFile();
242     }
243 
244     public int getMaxBuffers() throws LogConfigurationException, IOException {
245         return getConfiguration().getMaxBuffers();
246     }
247 
248     public int getMaxLogFiles() throws LogConfigurationException, IOException {
249         return getConfiguration().getMaxLogFiles();
250     }
251 
252     public int getMinBuffers() throws LogConfigurationException, IOException {
253         return getConfiguration().getMinBuffers();
254     }
255 
256     public int getThreadsWaitingForceThreshold() throws LogConfigurationException, IOException {
257         return getConfiguration().getThreadsWaitingForceThreshold();
258     }
259 
260     public boolean isChecksumEnabled() throws LogConfigurationException, IOException {
261         return getConfiguration().isChecksumEnabled();
262     }
263 
264     public void setBufferClassName(String s) throws LogConfigurationException, IOException {
265         getConfiguration().setBufferClassName(s);
266     }
267 
268     public void setBufferSize(int i) throws LogConfigurationException, IOException {
269         getConfiguration().setBufferSize(i);
270     }
271 
272     public void setChecksumEnabled(boolean b) throws LogConfigurationException, IOException {
273         getConfiguration().setChecksumEnabled(b);
274     }
275 
276     public void setFlushSleepTime(int i) throws LogConfigurationException, IOException {
277         getConfiguration().setFlushSleepTime(i);
278     }
279 
280     public void setLogFileDir(String s) throws LogConfigurationException, IOException {
281         getConfiguration().setLogFileDir(s);
282     }
283 
284     public void setLogFileExt(String s) throws LogConfigurationException, IOException {
285         getConfiguration().setLogFileExt(s);
286     }
287 
288     public void setLogFileName(String s) throws LogConfigurationException, IOException {
289         getConfiguration().setLogFileName(s);
290     }
291 
292     public void setMaxBlocksPerFile(int i) throws LogConfigurationException, IOException {
293         getConfiguration().setMaxBlocksPerFile(i);
294     }
295 
296     public void setMaxBuffers(int i) throws LogConfigurationException, IOException {
297         getConfiguration().setMaxBuffers(i);
298     }
299 
300     public void setMaxLogFiles(int i) throws LogConfigurationException, IOException {
301         getConfiguration().setMaxLogFiles(i);
302     }
303 
304     public void setMinBuffers(int i) throws LogConfigurationException, IOException {
305         getConfiguration().setMinBuffers(i);
306     }
307 
308     public void setThreadsWaitingForceThreshold(int i) throws LogConfigurationException, IOException {
309         getConfiguration().setThreadsWaitingForceThreshold(i);
310     }
311 
312 
313     // Implementation methods
314     //-------------------------------------------------------------------------
315 
316     protected Logger createTransactionLog() throws IOException, LogConfigurationException {
317         return new Logger(getConfiguration());
318     }
319 
320     protected Configuration createConfiguration() throws IOException, LogConfigurationException {
321         String[] names = {"org/codehaus/activemq/howl.properties", "org/codehaus/activemq/defaultHowl.properties"};
322 
323         Configuration answer = null;
324         for (int i = 0; i < names.length; i++) {
325             InputStream in = Thread.currentThread().getContextClassLoader().getResourceAsStream(names[i]);
326             if (in == null) {
327                 in = getClass().getClassLoader().getResourceAsStream(names[i]);
328             }
329             if (in != null) {
330                 Properties properties = new Properties();
331                 properties.load(in);
332                 answer = new Configuration(properties);
333             }
334         }
335         if (answer == null) {
336             log.warn("Could not find file: " + names[0] + " or: " + names[1] + " on the classpath to initialise Howl");
337             answer = new Configuration();
338         }
339         if (directory != null) {
340             answer.setLogFileDir(directory.getAbsolutePath());
341         }
342         return answer;
343     }
344 }