View Javadoc

1   /***
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.codehaus.activemq.store.jdbm;
19  
20  import jdbm.btree.BTree;
21  import jdbm.helper.Tuple;
22  import jdbm.helper.TupleBrowser;
23  import org.apache.commons.logging.Log;
24  import org.apache.commons.logging.LogFactory;
25  import org.codehaus.activemq.AlreadyClosedException;
26  import org.codehaus.activemq.message.ActiveMQMessage;
27  import org.codehaus.activemq.message.MessageAck;
28  import org.codehaus.activemq.service.MessageContainer;
29  import org.codehaus.activemq.service.MessageIdentity;
30  import org.codehaus.activemq.service.QueueMessageContainer;
31  import org.codehaus.activemq.service.impl.MessageEntry;
32  import org.codehaus.activemq.store.MessageStore;
33  import org.codehaus.activemq.util.JMSExceptionHelper;
34  
35  import javax.jms.JMSException;
36  import java.io.IOException;
37  
38  /***
39   * @version $Revision: 1.4 $
40   */
41  public class JdbmMessageStore implements MessageStore {
42      private static final Log log = LogFactory.getLog(JdbmMessageStore.class);
43  
44      private MessageContainer container;
45      private BTree messageTable;
46      private BTree orderedIndex;
47      private long lastSequenceNumber = 0;
48  
49      public JdbmMessageStore(BTree messageTable, BTree orderedIndex) {
50          this.messageTable = messageTable;
51          this.orderedIndex = orderedIndex;
52      }
53  
54      public void setMessageContainer(MessageContainer container) {
55          this.container = container;
56      }
57  
58      public synchronized MessageIdentity addMessage(ActiveMQMessage message) throws JMSException {
59          if (log.isDebugEnabled()) {
60              log.debug("Adding message to container: " + message);
61          }
62          MessageEntry entry = new MessageEntry(message);
63          Object sequenceNumber = null;
64          synchronized (this) {
65              sequenceNumber = new Long(++lastSequenceNumber);
66          }
67          try {
68              String messageID = message.getJMSMessageID();
69              getMessageTable().insert(messageID, entry, true);
70              getOrderedIndex().insert(sequenceNumber, messageID, true);
71  
72              MessageIdentity answer = message.getJMSMessageIdentity();
73              answer.setSequenceNumber(sequenceNumber);
74              return answer;
75          }
76          catch (IOException e) {
77              throw JMSExceptionHelper.newJMSException("Failed to add message: " + message + " in container: " + e, e);
78          }
79      }
80  
81      public synchronized ActiveMQMessage getMessage(MessageIdentity identity) throws JMSException {
82          String messageID = identity.getMessageID();
83          ActiveMQMessage message = null;
84          try {
85              MessageEntry entry = (MessageEntry) getMessageTable().find(messageID);
86              if (entry != null) {
87                  message = entry.getMessage();
88                  message.getJMSMessageIdentity().setSequenceNumber(identity.getSequenceNumber());
89              }
90          }
91          catch (IOException e) {
92              throw JMSExceptionHelper.newJMSException("Failed to get message for messageID: " + messageID + " " + e, e);
93          }
94          return message;
95      }
96  
97      public synchronized void removeMessage(MessageIdentity identity, MessageAck ack) throws JMSException {
98          String messageID = identity.getMessageID();
99          Object sequenceNumber = null;
100         if (messageID == null) {
101             throw new JMSException("Cannot remove message with null messageID for sequence number: " + identity.getSequenceNumber());
102         }
103         try {
104             sequenceNumber = identity.getSequenceNumber();
105             if (sequenceNumber == null) {
106                 sequenceNumber = findSequenceNumber(messageID);
107                 identity.setSequenceNumber(sequenceNumber);
108             }
109             getMessageTable().remove(messageID);
110             getOrderedIndex().remove(sequenceNumber);
111         }
112         catch (IOException e) {
113             throw JMSExceptionHelper.newJMSException("Failed to delete message for messageID: " + messageID + " " + e, e);
114         }
115         catch (IllegalArgumentException e) {
116             log.warn("Could not find sequence number: " + sequenceNumber + " in queue. " + e);
117         }
118     }
119 
120     public synchronized void recover(QueueMessageContainer container) throws JMSException {
121         try {
122             Tuple tuple = new Tuple();
123             TupleBrowser iter = getOrderedIndex().browse();
124             while (iter.getNext(tuple)) {
125                 Long key = (Long) tuple.getKey();
126                 MessageIdentity messageIdentity = null;
127                 if (key != null) {
128                     String messageID = (String) tuple.getValue();
129                     if (messageID != null) {
130                         messageIdentity = new MessageIdentity(messageID, key);
131                     }
132                 }
133                 if (messageIdentity != null) {
134                     container.recoverMessageToBeDelivered(messageIdentity);
135                 }
136                 else {
137                     log.warn("Could not find message for sequenceNumber: " + key);
138                 }
139             }
140         }
141         catch (IOException e) {
142             throw JMSExceptionHelper.newJMSException("Failed to recover the durable queue store. Reason: " + e, e);
143         }
144     }
145 
146     public synchronized void start() throws JMSException {
147         try {
148             // lets iterate through all IDs from the
149             Tuple tuple = new Tuple();
150             Long lastSequenceNumber = null;
151             TupleBrowser iter = getOrderedIndex().browse();
152             while (iter.getNext(tuple)) {
153                 lastSequenceNumber = (Long) tuple.getKey();
154             }
155             if (lastSequenceNumber != null) {
156                 this.lastSequenceNumber = lastSequenceNumber.longValue();
157                 if (log.isDebugEnabled()) {
158                     log.debug("Last sequence number is: " + lastSequenceNumber + " for: " + this);
159                 }
160             }
161             else {
162                 if (log.isDebugEnabled()) {
163                     log.debug("Started empty database for: " + this);
164                 }
165             }
166         }
167         catch (IOException e) {
168             throw JMSExceptionHelper.newJMSException("Failed to find the last sequence number. Reason: " + e, e);
169         }
170     }
171 
172     public synchronized void stop() throws JMSException {
173         JMSException firstException = closeTable(orderedIndex, null);
174         firstException = closeTable(messageTable, firstException);
175         orderedIndex = null;
176         messageTable = null;
177         if (firstException != null) {
178             throw firstException;
179         }
180     }
181 
182 
183     // Implementation methods
184     //-------------------------------------------------------------------------
185 
186     protected MessageContainer getContainer() {
187         return container;
188     }
189 
190     protected long getLastSequenceNumber() {
191         return lastSequenceNumber;
192     }
193 
194     protected BTree getMessageTable() throws AlreadyClosedException {
195         if (messageTable == null) {
196             throw new AlreadyClosedException("JDBM MessageStore");
197         }
198         return messageTable;
199     }
200 
201     protected BTree getOrderedIndex() throws AlreadyClosedException {
202         if (orderedIndex == null) {
203             throw new AlreadyClosedException("JDBM MessageStore");
204         }
205         return orderedIndex;
206     }
207 
208 
209     /***
210      * Looks up the message using the given sequence number
211      */
212     protected ActiveMQMessage getMessageBySequenceNumber(Long sequenceNumber) throws IOException, JMSException {
213         ActiveMQMessage message = null;
214         String messageID = (String) getOrderedIndex().find(sequenceNumber);
215         if (messageID != null) {
216             message = getMessage(new MessageIdentity(messageID, sequenceNumber));
217         }
218         return message;
219     }
220 
221     /***
222      * Finds the sequence number for the given messageID
223      *
224      * @param messageID
225      * @return
226      */
227     protected Object findSequenceNumber(String messageID) throws IOException, AlreadyClosedException {
228         log.warn("Having to table scan to find the sequence number for messageID: " + messageID);
229 
230         Tuple tuple = new Tuple();
231         TupleBrowser iter = getOrderedIndex().browse();
232         while (iter.getNext(tuple)) {
233             Object value = tuple.getValue();
234             if (messageID.equals(value)) {
235                 return tuple.getKey();
236             }
237         }
238         return null;
239     }
240 
241     protected JMSException closeTable(BTree table, JMSException firstException) {
242         table = null;
243         return null;
244     }
245 }