1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.codehaus.activemq.store.bdb;
19
20 import com.sleepycat.je.Database;
21 import com.sleepycat.je.DatabaseConfig;
22 import com.sleepycat.je.DatabaseException;
23 import com.sleepycat.je.Environment;
24 import com.sleepycat.je.SecondaryConfig;
25 import com.sleepycat.je.SecondaryDatabase;
26 import com.sleepycat.je.SecondaryKeyCreator;
27 import com.sleepycat.je.Transaction;
28 import com.sleepycat.je.TransactionConfig;
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31 import org.codehaus.activemq.message.DefaultWireFormat;
32 import org.codehaus.activemq.message.WireFormat;
33 import org.codehaus.activemq.service.impl.PersistenceAdapterSupport;
34 import org.codehaus.activemq.store.MessageStore;
35 import org.codehaus.activemq.store.PersistenceAdapter;
36 import org.codehaus.activemq.store.PreparedTransactionStore;
37 import org.codehaus.activemq.store.TopicMessageStore;
38 import org.codehaus.activemq.util.JMSExceptionHelper;
39
40 import javax.jms.JMSException;
41 import java.io.File;
42 import java.util.Map;
43
44 /***
45 * A {@link PersistenceAdapter} implementation using
46 * <a href="http://www.sleepycat.com">Berkeley DB Java Edition</a>
47 *
48 * @version $Revision: 1.6 $
49 */
50 public class BDbPersistenceAdapter extends PersistenceAdapterSupport {
51 private static final Log log = LogFactory.getLog(BDbPersistenceAdapter.class);
52
53 private Environment environment;
54 private WireFormat wireFormat;
55 private DatabaseConfig config;
56 private TransactionConfig transactionConfig;
57 private File directory = new File("ActiveMQ");
58
59
60 /***
61 * Factory method to create an instance using the defaults
62 *
63 * @param directory the directory in which to store the persistent files
64 * @return
65 * @throws JMSException
66 */
67 public static BDbPersistenceAdapter newInstance(File directory) throws JMSException {
68 return new BDbPersistenceAdapter(directory);
69 }
70
71
72 public BDbPersistenceAdapter() {
73 this(null, new DefaultWireFormat());
74 }
75
76 public BDbPersistenceAdapter(File directory) {
77 this();
78 this.directory = directory;
79 }
80
81 public BDbPersistenceAdapter(Environment environment, WireFormat wireFormat) {
82 this(environment, wireFormat, BDbHelper.createDatabaseConfig(), new TransactionConfig());
83 }
84
85 public BDbPersistenceAdapter(Environment environment, WireFormat wireFormat, DatabaseConfig config, TransactionConfig transactionConfig) {
86 this.environment = environment;
87 this.wireFormat = wireFormat;
88 this.config = config;
89 this.transactionConfig = transactionConfig;
90 }
91
92 public Map getInitialDestinations() {
93 return null; /*** TODO */
94 }
95
96 public MessageStore createQueueMessageStore(String destinationName) throws JMSException {
97 try {
98 Database database = createDatabase("Queue_" + destinationName);
99 SequenceNumberCreator sequenceNumberCreator = new SequenceNumberCreator();
100 SecondaryConfig secondaryConfig = createSecondaryConfig(sequenceNumberCreator);
101 SecondaryDatabase secondaryDatabase = createSecondaryDatabase("Queue_Index_" + destinationName, database, secondaryConfig);
102 sequenceNumberCreator.initialise(secondaryDatabase);
103 return new BDbMessageStore(database, secondaryDatabase, secondaryConfig, sequenceNumberCreator, wireFormat.copy());
104 }
105 catch (DatabaseException e) {
106 throw JMSExceptionHelper.newJMSException("Could not create Queue MessageContainer for destination: "
107 + destinationName + ". Reason: " + e, e);
108 }
109 }
110
111 public TopicMessageStore createTopicMessageStore(String destinationName) throws JMSException {
112 try {
113 Database database = createDatabase("Topic_" + destinationName);
114 SequenceNumberCreator sequenceNumberCreator = new SequenceNumberCreator();
115 SecondaryConfig secondaryConfig = createSecondaryConfig(sequenceNumberCreator);
116 SecondaryDatabase secondaryDatabase = createSecondaryDatabase("Topic_Index_" + destinationName, database, secondaryConfig);
117 sequenceNumberCreator.initialise(secondaryDatabase);
118 Database subscriptionDatabase = createDatabase("ConsumeAck_" + destinationName);
119 return new BDbTopicMessageStore(database, secondaryDatabase, secondaryConfig, sequenceNumberCreator, wireFormat.copy(), subscriptionDatabase);
120 }
121 catch (DatabaseException e) {
122 throw JMSExceptionHelper.newJMSException("Could not create Topic MessageContainer for destination: "
123 + destinationName + ". Reason: " + e, e);
124 }
125 }
126
127 public PreparedTransactionStore createPreparedTransactionStore() throws JMSException {
128 try {
129 return new BDbPreparedTransactionStore(createDatabase("XaPrepareTxnDb"));
130 }
131 catch (DatabaseException e) {
132 throw JMSExceptionHelper.newJMSException("Could not create XA Prepare Transaction Database. Reason: " + e, e);
133 }
134 }
135
136 public void beginTransaction() throws JMSException {
137 try {
138
139 if (BDbHelper.getTransactionCount() == 0) {
140 Transaction transaction = environment.beginTransaction(BDbHelper.getTransaction(), transactionConfig);
141 BDbHelper.pushTransaction(transaction);
142 }
143 else {
144 Transaction transaction = BDbHelper.getTransaction();
145 BDbHelper.pushTransaction(transaction);
146 }
147 }
148 catch (DatabaseException e) {
149 throw JMSExceptionHelper.newJMSException("Failed to begin transaction: " + e, e);
150 }
151 }
152
153 public void commitTransaction() throws JMSException {
154
155 if (BDbHelper.getTransactionCount() == 1) {
156 Transaction transaction = BDbHelper.getTransaction();
157 if (transaction == null) {
158 log.warn("Attempt to commit transaction when non in progress");
159 }
160 else {
161 try {
162 transaction.commit();
163 }
164 catch (DatabaseException e) {
165 throw JMSExceptionHelper.newJMSException("Failed to commit transaction: " + transaction + ": " + e, e);
166 }
167 finally {
168 BDbHelper.popTransaction();
169 }
170 }
171 }
172 else {
173 BDbHelper.popTransaction();
174 }
175 }
176
177 public void rollbackTransaction() {
178 Transaction transaction = BDbHelper.getTransaction();
179 if (transaction != null) {
180 if (BDbHelper.getTransactionCount() == 1) {
181 try {
182 transaction.abort();
183 }
184 catch (DatabaseException e) {
185 log.warn("Cannot rollback transaction due to: " + e, e);
186 }
187 finally {
188 BDbHelper.popTransaction();
189 }
190 }
191 else {
192 BDbHelper.popTransaction();
193 }
194 }
195 }
196
197
198 public void start() throws JMSException {
199 if (environment == null) {
200 directory.mkdirs();
201
202 log.info("Creating Berkeley DB based message store in directory: " + directory.getAbsolutePath());
203
204 try {
205 environment = BDbHelper.createEnvironment(directory);
206 }
207 catch (DatabaseException e) {
208 throw JMSExceptionHelper.newJMSException("Failed to open Berkeley DB persistent store at directory: "
209 + directory + ". Reason: " + e, e);
210 }
211 }
212 }
213
214 public synchronized void stop() throws JMSException {
215 if (environment != null) {
216 try {
217 environment.close();
218 }
219 catch (DatabaseException e) {
220 throw JMSExceptionHelper.newJMSException("Failed to close environment. Reason: " + e, e);
221 }
222 finally {
223 environment = null;
224 }
225 }
226 }
227
228
229
230 public File getDirectory() {
231 return directory;
232 }
233
234 public void setDirectory(File directory) {
235 this.directory = directory;
236 }
237
238 public WireFormat getWireFormat() {
239 return wireFormat;
240 }
241
242 public void setWireFormat(WireFormat wireFormat) {
243 this.wireFormat = wireFormat;
244 }
245
246 public TransactionConfig getTransactionConfig() {
247 return transactionConfig;
248 }
249
250 public void setTransactionConfig(TransactionConfig transactionConfig) {
251 this.transactionConfig = transactionConfig;
252 }
253
254 public Environment getEnvironment() {
255 return environment;
256 }
257
258 public void setEnvironment(Environment environment) {
259 this.environment = environment;
260 }
261
262 public DatabaseConfig getConfig() {
263 return config;
264 }
265
266 public void setConfig(DatabaseConfig config) {
267 this.config = config;
268 }
269
270
271
272 protected Database createDatabase(String name) throws DatabaseException {
273
274
275 if (log.isTraceEnabled()) {
276 log.trace("Opening database: " + name);
277 }
278 return environment.openDatabase(null, name, config);
279 }
280
281 protected SecondaryDatabase createSecondaryDatabase(String name, Database database, SecondaryConfig secondaryConfig) throws DatabaseException {
282
283
284 if (log.isTraceEnabled()) {
285 log.trace("Opening secondary database: " + name);
286 }
287 return environment.openSecondaryDatabase(null, name, database, secondaryConfig);
288 }
289
290 public static JMSException closeDatabase(Database db, JMSException firstException) {
291 if (db != null) {
292
293 if (log.isTraceEnabled()) {
294 try {
295 log.trace("Closing database: " + db.getDatabaseName());
296 }
297 catch (DatabaseException e) {
298 log.trace("Closing database: " + db + " but could not get the name: " + e);
299 }
300 }
301 try {
302
303 db.close();
304 }
305 catch (DatabaseException e) {
306 if (firstException == null) {
307 firstException = JMSExceptionHelper.newJMSException("Failed to close database. Reason: " + e, e);
308 }
309 }
310 }
311 return firstException;
312 }
313
314 protected SecondaryConfig createSecondaryConfig(SecondaryKeyCreator keyGenerator) {
315 SecondaryConfig answer = new SecondaryConfig();
316 answer.setKeyCreator(keyGenerator);
317 answer.setAllowCreate(true);
318 answer.setAllowPopulate(true);
319 answer.setTransactional(true);
320 return answer;
321 }
322 }