1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.codehaus.activemq.store.bdb;
19
20 import com.sleepycat.je.Database;
21 import com.sleepycat.je.DatabaseEntry;
22 import com.sleepycat.je.DatabaseException;
23 import com.sleepycat.je.LockMode;
24 import com.sleepycat.je.OperationStatus;
25 import com.sleepycat.je.SecondaryConfig;
26 import com.sleepycat.je.SecondaryCursor;
27 import com.sleepycat.je.SecondaryDatabase;
28 import com.sleepycat.je.Transaction;
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31 import org.codehaus.activemq.message.ActiveMQMessage;
32 import org.codehaus.activemq.message.ConsumerInfo;
33 import org.codehaus.activemq.message.MessageAck;
34 import org.codehaus.activemq.message.WireFormat;
35 import org.codehaus.activemq.service.MessageIdentity;
36 import org.codehaus.activemq.service.SubscriberEntry;
37 import org.codehaus.activemq.service.Subscription;
38 import org.codehaus.activemq.store.TopicMessageStore;
39 import org.codehaus.activemq.util.JMSExceptionHelper;
40
41 import javax.jms.JMSException;
42 import java.io.IOException;
43
44 /***
45 * @version $Revision: 1.2 $
46 */
47 public class BDbTopicMessageStore extends BDbMessageStore implements TopicMessageStore {
48 private static final Log log = LogFactory.getLog(BDbTopicMessageStore.class);
49
50 private Database subscriptionDatabase;
51
52 public BDbTopicMessageStore(Database database, SecondaryDatabase secondaryDatabase, SecondaryConfig secondaryConfig, SequenceNumberCreator sequenceNumberCreator, WireFormat wireFormat, Database subscriptionDatabase) {
53 super(database, secondaryDatabase, secondaryConfig, sequenceNumberCreator, wireFormat);
54 this.subscriptionDatabase = subscriptionDatabase;
55 }
56
57 public void incrementMessageCount(MessageIdentity messageId) {
58 /*** TODO */
59 }
60
61 public void decrementMessageCountAndMaybeDelete(MessageIdentity messageIdentity, MessageAck ack) {
62 /*** TODO */
63 }
64
65 public void setLastAcknowledgedMessageIdentity(Subscription subscription, MessageIdentity messageIdentity) throws JMSException {
66 checkClosed();
67 try {
68 doSetLastAcknowledgedMessageIdentity(subscription, messageIdentity);
69 }
70 catch (DatabaseException e) {
71 throw JMSExceptionHelper.newJMSException("Failed to update last acknowledge messageID for : "
72 + messageIdentity + ". Reason: " + e, e);
73 }
74 }
75
76
77 public void recoverSubscription(Subscription subscription, MessageIdentity lastDispatchedMessage) throws JMSException {
78 checkClosed();
79 SecondaryCursor cursor = null;
80 try {
81 DatabaseEntry lastAckKey = getLastAcknowledgedMessageID(subscription, lastDispatchedMessage);
82 if (lastAckKey != null) {
83 cursor = getSecondaryDatabase().openSecondaryCursor(BDbHelper.getTransaction(), getCursorConfig());
84 DatabaseEntry valueEntry = new DatabaseEntry();
85 OperationStatus status = cursor.getSearchKey(lastAckKey, valueEntry, LockMode.DEFAULT);
86 if (status != OperationStatus.SUCCESS) {
87 log.error("Could not find the last acknowledged record for: " + subscription + ". Status: " + status);
88 }
89 else {
90 while (true) {
91
92 status = cursor.getNext(lastAckKey, valueEntry, LockMode.DEFAULT);
93 if (status != OperationStatus.SUCCESS) {
94 if (status != OperationStatus.NOTFOUND) {
95 log.warn("Strange result when iterating to end of collection: " + status);
96 }
97 break;
98 }
99
100 ActiveMQMessage message = extractMessage(valueEntry);
101 subscription.addMessage(getContainer(), message);
102 }
103 }
104 }
105 }
106 catch (DatabaseException e) {
107 throw JMSExceptionHelper.newJMSException("Unable to recover topic subscription for: "
108 + subscription + ". Reason: " + e, e);
109 }
110 catch (IOException e) {
111 throw JMSExceptionHelper.newJMSException("Unable to recover topic subscription for: "
112 + subscription + ". Reason: " + e, e);
113 }
114 finally {
115 if (cursor != null) {
116 try {
117 cursor.close();
118 }
119 catch (DatabaseException e) {
120 log.warn("Caught exception closing cursor: " + e, e);
121 }
122 }
123 }
124 }
125
126 public MessageIdentity getLastestMessageIdentity() throws JMSException {
127 checkClosed();
128 SecondaryCursor cursor = null;
129 try {
130 cursor = getSecondaryDatabase().openSecondaryCursor(BDbHelper.getTransaction(), getCursorConfig());
131 DatabaseEntry keyEntry = new DatabaseEntry();
132 DatabaseEntry valueEntry = new DatabaseEntry();
133 OperationStatus status = cursor.getLast(keyEntry, valueEntry, LockMode.DEFAULT);
134 if (status == OperationStatus.SUCCESS) {
135 if (log.isDebugEnabled()) {
136 log.debug("Loaded last sequence number of: " + BDbHelper.longFromBytes(keyEntry.getData()));
137 }
138 return new MessageIdentity(null, keyEntry);
139 }
140 else if (status != OperationStatus.NOTFOUND) {
141 log.error("Could not find the last sequence number. Status: " + status);
142 }
143 return null;
144 }
145 catch (DatabaseException e) {
146 throw JMSExceptionHelper.newJMSException("Unable to load the last sequence number. Reason: " + e, e);
147 }
148 finally {
149 if (cursor != null) {
150 try {
151 cursor.close();
152 }
153 catch (DatabaseException e) {
154 log.warn("Caught exception closing cursor: " + e, e);
155 }
156 }
157 }
158 }
159
160 public SubscriberEntry getSubscriberEntry(ConsumerInfo info) throws JMSException {
161 return null; /*** TODO */
162 }
163
164 public void setSubscriberEntry(ConsumerInfo info, SubscriberEntry subscriberEntry) throws JMSException {
165 /*** TODO */
166 }
167
168 public synchronized void stop() throws JMSException {
169 JMSException firstException = BDbPersistenceAdapter.closeDatabase(subscriptionDatabase, null);
170 subscriptionDatabase = null;
171 super.stop();
172 if (firstException != null) {
173 throw JMSExceptionHelper.newJMSException("Unable to close the subscription database: " + firstException, firstException);
174 }
175 }
176
177
178
179
180 protected DatabaseEntry getLastAcknowledgedMessageID(Subscription subscription, MessageIdentity lastDispatchedMessage) throws DatabaseException {
181 DatabaseEntry key = createKey(subscription.getPersistentKey());
182 DatabaseEntry value = new DatabaseEntry();
183 OperationStatus status = subscriptionDatabase.get(null, key, value, null);
184 if (status == OperationStatus.SUCCESS) {
185 return value;
186 }
187 else if (status == OperationStatus.NOTFOUND) {
188
189 if (lastDispatchedMessage != null) {
190 return doSetLastAcknowledgedMessageIdentity(subscription, lastDispatchedMessage);
191 }
192 }
193 else {
194 log.warn("Unexpected status return from querying lastAcknowledgeSequenceNumber for: " + subscription + " status: " + status);
195 }
196 return null;
197 }
198
199 protected DatabaseEntry doSetLastAcknowledgedMessageIdentity(Subscription subscription, MessageIdentity messageIdentity) throws DatabaseException {
200 Transaction transaction = BDbHelper.getTransaction();
201 DatabaseEntry key = createKey(subscription.getPersistentKey());
202 DatabaseEntry value = getSequenceNumberKey(messageIdentity);
203 subscriptionDatabase.put(transaction, key, value);
204 return value;
205 }
206 }