View Javadoc

1   /***
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.codehaus.activemq.store.jdbm;
19  
20  import jdbm.btree.BTree;
21  import jdbm.helper.Tuple;
22  import jdbm.helper.TupleBrowser;
23  import org.codehaus.activemq.AlreadyClosedException;
24  import org.codehaus.activemq.message.ActiveMQMessage;
25  import org.codehaus.activemq.message.ConsumerInfo;
26  import org.codehaus.activemq.message.MessageAck;
27  import org.codehaus.activemq.service.MessageIdentity;
28  import org.codehaus.activemq.service.SubscriberEntry;
29  import org.codehaus.activemq.service.Subscription;
30  import org.codehaus.activemq.store.TopicMessageStore;
31  import org.codehaus.activemq.util.JMSExceptionHelper;
32  
33  import javax.jms.JMSException;
34  import java.io.IOException;
35  
36  /***
37   * @version $Revision: 1.5 $
38   */
39  public class JdbmTopicMessageStore extends JdbmMessageStore implements TopicMessageStore {
40      private static final Integer ONE = new Integer(1);
41  
42      private BTree ackDatabase;
43      private BTree messageCounts;
44      private BTree subscriberDetails;
45  
46      public JdbmTopicMessageStore(BTree messageTable, BTree orderedIndex, BTree ackDatabase, BTree subscriberDetails, BTree messageCounts) {
47          super(messageTable, orderedIndex);
48          this.ackDatabase = ackDatabase;
49          this.subscriberDetails = subscriberDetails;
50          this.messageCounts = messageCounts;
51      }
52  
53      public synchronized void incrementMessageCount(MessageIdentity messageId) throws JMSException {
54          try {
55              Integer number = (Integer) getMessageCounts().find(messageId);
56              if (number == null) {
57                  number = ONE;
58              }
59              else {
60                  number = new Integer(number.intValue() + 1);
61              }
62              getMessageCounts().insert(messageId, number, true);
63          }
64          catch (IOException e) {
65              throw JMSExceptionHelper.newJMSException("Failed to increment messageCount for  messageID: " + messageId + ". Reason: " + e, e);
66          }
67      }
68  
69      public synchronized void decrementMessageCountAndMaybeDelete(MessageIdentity messageIdentity, MessageAck ack) throws JMSException {
70          try {
71              Integer number = (Integer) getMessageCounts().find(messageIdentity);
72              if (number == null || number.intValue() <= 1) {
73                  removeMessage(messageIdentity, ack);
74                  if (number != null) {
75                      getMessageCounts().remove(messageIdentity);
76                  }
77              }
78              else {
79                  getMessageCounts().insert(messageIdentity, new Integer(number.intValue() - 1), true);
80                  number = ONE;
81              }
82          }
83          catch (IOException e) {
84              throw JMSExceptionHelper.newJMSException("Failed to increment messageCount for  messageID: " + messageIdentity + ". Reason: " + e, e);
85          }
86      }
87  
88      public synchronized void setLastAcknowledgedMessageIdentity(Subscription subscription, MessageIdentity messageIdentity) throws JMSException {
89          String key = subscription.getPersistentKey();
90          try {
91              getAckDatabase().insert(key, messageIdentity, true);
92          }
93          catch (IOException e) {
94              throw JMSExceptionHelper.newJMSException("Failed to set ack messageID: " + messageIdentity + " for consumerId: " + key + ". Reason: " + e, e);
95          }
96      }
97  
98      public synchronized void recoverSubscription(Subscription subscription, MessageIdentity lastDispatchedMessage) throws JMSException {
99          try {
100             MessageIdentity lastAcked = getLastAcknowledgedMessageIdentity(subscription);
101             if (lastAcked == null) {
102                 // for a new durable subscription lets write the last ack messageID
103                 // as the previous one that the container delivered to ensure that
104                 // if we go down before acking anything, we will recover to the right point
105                 setLastAcknowledgedMessageIdentity(subscription, lastDispatchedMessage);
106                 return;
107             }
108             Object lastAckedSequenceNumber = lastAcked.getSequenceNumber();
109 
110             // lets iterate through all IDs from the
111 
112             //Tuple tuple = new Tuple();
113             Tuple tuple = getOrderedIndex().findGreaterOrEqual(lastAckedSequenceNumber);
114 
115             TupleBrowser iter = getOrderedIndex().browse();
116             while (iter.getNext(tuple)) {
117                 Long sequenceNumber = (Long) tuple.getKey();
118                 if (sequenceNumber.compareTo(lastAckedSequenceNumber) > 0) {
119                     ActiveMQMessage message = null;
120 
121                     // TODO we could probably tune this some more since we have tuple.getValue() already
122                     message = getMessageBySequenceNumber(sequenceNumber);
123                     if (message != null) {
124                         subscription.addMessage(getContainer(), message);
125                     }
126                 }
127             }
128         }
129         catch (IOException e) {
130             throw JMSExceptionHelper.newJMSException("Failed to recover subscription: " + subscription + ". Reason: " + e, e);
131         }
132     }
133 
134     public synchronized MessageIdentity getLastestMessageIdentity() throws JMSException {
135         return new MessageIdentity(null, new Long(getLastSequenceNumber()));
136     }
137 
138     public SubscriberEntry getSubscriberEntry(ConsumerInfo info) throws JMSException {
139         Object key = info.getConsumerKey();
140         try {
141             return (SubscriberEntry) subscriberDetails.find(key);
142         }
143         catch (IOException e) {
144             throw JMSExceptionHelper.newJMSException("Failed to lookup subscription for info: " + info + ". Reason: " + e, e);
145         }
146     }
147 
148     public void setSubscriberEntry(ConsumerInfo info, SubscriberEntry subscriberEntry) throws JMSException {
149         Object key = info.getConsumerKey();
150         try {
151             subscriberDetails.insert(key, subscriberEntry, true);
152         }
153         catch (IOException e) {
154             throw JMSExceptionHelper.newJMSException("Failed to lookup subscription for info: " + info + ". Reason: " + e, e);
155         }
156     }
157 
158     public synchronized void stop() throws JMSException {
159         JMSException firstException = closeTable(ackDatabase, null);
160         firstException = closeTable(messageCounts, firstException);
161         ackDatabase = null;
162         messageCounts = null;
163         super.stop();
164         if (firstException != null) {
165             throw firstException;
166         }
167     }
168 
169     // Implementation methods
170     //-------------------------------------------------------------------------
171     protected BTree getMessageCounts() throws AlreadyClosedException {
172         if (messageCounts == null) {
173             throw new AlreadyClosedException("JDBM TopicMessageStore");
174         }
175         return messageCounts;
176     }
177 
178     protected BTree getAckDatabase() throws AlreadyClosedException {
179         if (ackDatabase == null) {
180             throw new AlreadyClosedException("JDBM TopicMessageStore");
181         }
182         return ackDatabase;
183     }
184 
185     protected MessageIdentity getLastAcknowledgedMessageIdentity(Subscription subscription) throws IOException, AlreadyClosedException {
186         return (MessageIdentity) getAckDatabase().find(subscription.getPersistentKey());
187     }
188 
189 
190 }