1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.codehaus.activemq.store.bdb;
19
20 import com.sleepycat.je.CursorConfig;
21 import com.sleepycat.je.DatabaseEntry;
22 import com.sleepycat.je.DatabaseException;
23 import com.sleepycat.je.LockMode;
24 import com.sleepycat.je.OperationStatus;
25 import com.sleepycat.je.SecondaryCursor;
26 import com.sleepycat.je.SecondaryDatabase;
27 import com.sleepycat.je.SecondaryKeyCreator;
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30
31 import javax.jms.JMSException;
32
33 /***
34 * @version $Revision: 1.1 $
35 */
36 public class SequenceNumberCreator implements SecondaryKeyCreator {
37 private static final Log log = LogFactory.getLog(SequenceNumberCreator.class);
38
39 private long counter = 1;
40 private ThreadLocal lastKeyStore = new ThreadLocal();
41 private ThreadLocal deleteKeyStore = new ThreadLocal();
42
43 public synchronized void initialise(SecondaryDatabase database) throws JMSException, DatabaseException {
44
45 counter = queryLatestKeyInDatabase(database);
46 }
47
48 public boolean createSecondaryKey(SecondaryDatabase secondaryDatabase, DatabaseEntry keyEntry, DatabaseEntry valueEntry, DatabaseEntry resultEntry) throws DatabaseException {
49 DatabaseEntry nextKey = (DatabaseEntry) deleteKeyStore.get();
50 if (nextKey != null) {
51 resultEntry.setData(nextKey.getData());
52 deleteKeyStore.set(null);
53 }
54 else {
55 long value = 1;
56 synchronized (this) {
57 value = ++counter;
58 }
59
60 resultEntry.setData(BDbHelper.asBytes(value));
61 }
62 lastKeyStore.set(resultEntry);
63 return true;
64 }
65
66 /***
67 * @return the last primary key we created
68 */
69 public DatabaseEntry getLastKey() {
70 return (DatabaseEntry) lastKeyStore.get();
71 }
72
73 /***
74 * Sets the next primary key to return, such as doing a delete
75 *
76 * @param nextKey
77 */
78 public void setDeleteKey(DatabaseEntry nextKey) {
79 this.deleteKeyStore.set(nextKey);
80 }
81
82 protected long queryLatestKeyInDatabase(SecondaryDatabase database) throws JMSException, DatabaseException {
83 CursorConfig cursorConfig = null;
84 SecondaryCursor cursor = null;
85 try {
86 cursor = database.openSecondaryCursor(BDbHelper.getTransaction(), cursorConfig);
87 DatabaseEntry sequenceNumberEntry = new DatabaseEntry();
88 DatabaseEntry keyEntry = new DatabaseEntry();
89 DatabaseEntry valueEntry = new DatabaseEntry();
90 OperationStatus status = cursor.getLast(sequenceNumberEntry, keyEntry, valueEntry, LockMode.DEFAULT);
91 long answer = 1;
92 if (status != OperationStatus.NOTFOUND) {
93 if (status == OperationStatus.SUCCESS) {
94 answer = extractLong(sequenceNumberEntry);
95 }
96 else {
97 throw new JMSException("Invalid status code: " + status + " cannot read last sequence number");
98 }
99 }
100 return answer;
101 }
102 finally {
103 if (cursor != null) {
104 try {
105 cursor.close();
106 }
107 catch (DatabaseException e) {
108 log.warn("Error closing cursor: " + e, e);
109 }
110 }
111 }
112 }
113
114 protected long extractLong(DatabaseEntry entry) {
115 return BDbHelper.longFromBytes(entry.getData());
116 }
117 }