View Javadoc

1   /***
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.codehaus.activemq.store.bdb;
19  
20  import com.sleepycat.je.CursorConfig;
21  import com.sleepycat.je.DatabaseEntry;
22  import com.sleepycat.je.DatabaseException;
23  import com.sleepycat.je.LockMode;
24  import com.sleepycat.je.OperationStatus;
25  import com.sleepycat.je.SecondaryCursor;
26  import com.sleepycat.je.SecondaryDatabase;
27  import com.sleepycat.je.SecondaryKeyCreator;
28  import org.apache.commons.logging.Log;
29  import org.apache.commons.logging.LogFactory;
30  
31  import javax.jms.JMSException;
32  
33  /***
34   * @version $Revision: 1.1 $
35   */
36  public class SequenceNumberCreator implements SecondaryKeyCreator {
37      private static final Log log = LogFactory.getLog(SequenceNumberCreator.class);
38  
39      private long counter = 1;
40      private ThreadLocal lastKeyStore = new ThreadLocal();
41      private ThreadLocal deleteKeyStore = new ThreadLocal();
42  
43      public synchronized void initialise(SecondaryDatabase database) throws JMSException, DatabaseException {
44          // lets calculate the size
45          counter = queryLatestKeyInDatabase(database);
46      }
47  
48      public boolean createSecondaryKey(SecondaryDatabase secondaryDatabase, DatabaseEntry keyEntry, DatabaseEntry valueEntry, DatabaseEntry resultEntry) throws DatabaseException {
49          DatabaseEntry nextKey = (DatabaseEntry) deleteKeyStore.get();
50          if (nextKey != null) {
51              resultEntry.setData(nextKey.getData());
52              deleteKeyStore.set(null);
53          }
54          else {
55              long value = 1;
56              synchronized (this) {
57                  value = ++counter;
58              }
59              //System.out.println("Creating new counter key of value: " + value);
60              resultEntry.setData(BDbHelper.asBytes(value));
61          }
62          lastKeyStore.set(resultEntry);
63          return true;
64      }
65  
66      /***
67       * @return the last primary key we created
68       */
69      public DatabaseEntry getLastKey() {
70          return (DatabaseEntry) lastKeyStore.get();
71      }
72  
73      /***
74       * Sets the next primary key to return, such as doing a delete
75       *
76       * @param nextKey
77       */
78      public void setDeleteKey(DatabaseEntry nextKey) {
79          this.deleteKeyStore.set(nextKey);
80      }
81  
82      protected long queryLatestKeyInDatabase(SecondaryDatabase database) throws JMSException, DatabaseException {
83          CursorConfig cursorConfig = null;
84          SecondaryCursor cursor = null;
85          try {
86              cursor = database.openSecondaryCursor(BDbHelper.getTransaction(), cursorConfig);
87              DatabaseEntry sequenceNumberEntry = new DatabaseEntry();
88              DatabaseEntry keyEntry = new DatabaseEntry();
89              DatabaseEntry valueEntry = new DatabaseEntry();
90              OperationStatus status = cursor.getLast(sequenceNumberEntry, keyEntry, valueEntry, LockMode.DEFAULT);
91              long answer = 1;
92              if (status != OperationStatus.NOTFOUND) {
93                  if (status == OperationStatus.SUCCESS) {
94                      answer = extractLong(sequenceNumberEntry);
95                  }
96                  else {
97                      throw new JMSException("Invalid status code: " + status + " cannot read last sequence number");
98                  }
99              }
100             return answer;
101         }
102         finally {
103             if (cursor != null) {
104                 try {
105                     cursor.close();
106                 }
107                 catch (DatabaseException e) {
108                     log.warn("Error closing cursor: " + e, e);
109                 }
110             }
111         }
112     }
113 
114     protected long extractLong(DatabaseEntry entry) {
115         return BDbHelper.longFromBytes(entry.getData());
116     }
117 }