View Javadoc

1   /*** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.codehaus.activemq.store.bdbn;
19  
20  import com.sleepycat.db.Db;
21  import com.sleepycat.db.DbException;
22  import com.sleepycat.db.DbTxn;
23  import com.sleepycat.db.Dbt;
24  import org.codehaus.activemq.message.ActiveMQMessage;
25  import org.codehaus.activemq.message.MessageAck;
26  import org.codehaus.activemq.message.WireFormat;
27  import org.codehaus.activemq.service.MessageContainer;
28  import org.codehaus.activemq.service.MessageIdentity;
29  import org.codehaus.activemq.service.QueueMessageContainer;
30  import org.codehaus.activemq.store.MessageStore;
31  import org.codehaus.activemq.util.JMSExceptionHelper;
32  
33  import javax.jms.JMSException;
34  import java.io.IOException;
35  
36  /***
37   * An implementation of {@link MessageStore} using
38   * <a href="http://www.sleepycat.com/">native C Berkeley DB</a>
39   *
40   * @version $Revision: 1.2 $
41   */
42  public class BDbMessageStore implements MessageStore {
43      private static final int SUCCESS = 0;
44  
45      private Db database;
46      private WireFormat wireFormat;
47      private MessageContainer container;
48  
49      public void setMessageContainer(MessageContainer container) {
50          this.container = container;
51      }
52  
53      public MessageIdentity addMessage(ActiveMQMessage message) throws JMSException {
54          String messageID = message.getJMSMessageID();
55          try {
56              Dbt key = createKey(messageID);
57              Dbt value = new Dbt(asBytes(message));
58              database.put(BDbHelper.getTransaction(), key, value, 0);
59              return new MessageIdentity(messageID);
60          }
61          catch (DbException e) {
62              throw JMSExceptionHelper.newJMSException("Failed to broker message: " + messageID + " in container: " + e, e);
63          }
64          catch (IOException e) {
65              throw JMSExceptionHelper.newJMSException("Failed to broker message: " + messageID + " in container: " + e, e);
66          }
67      }
68  
69      public ActiveMQMessage getMessage(MessageIdentity identity) throws JMSException {
70          String messageID = identity.getMessageID();
71          ActiveMQMessage answer = null;
72          try {
73              Dbt key = createKey(messageID);
74              Dbt value = new Dbt();
75              if (database.get(null, key, value, 0) == SUCCESS) {
76                  answer = extractMessage(value);
77              }
78              return answer;
79          }
80          catch (DbException e) {
81              throw JMSExceptionHelper.newJMSException("Failed to peek next message after: " + messageID + " from container: " + e, e);
82          }
83          catch (IOException e) {
84              throw JMSExceptionHelper.newJMSException("Failed to peek next message after: " + messageID + " from container: " + e, e);
85          }
86      }
87  
88  
89      public void removeMessage(MessageIdentity identity, MessageAck ack) throws JMSException {
90          DbTxn transaction = null;
91          String messageID = identity.getMessageID();
92          try {
93              database.delete(BDbHelper.getTransaction(), createKey(messageID), 0);
94          }
95          catch (DbException e) {
96              throw JMSExceptionHelper.newJMSException("Failed to delete message: " + messageID + " from container: " + e, e);
97          }
98      }
99  
100     public void recover(QueueMessageContainer container) throws JMSException {
101         /*** TODO */
102     }
103 
104 
105     public void start() throws JMSException {
106     }
107 
108     public void stop() throws JMSException {
109         try {
110             database.close(0);
111         }
112         catch (DbException e) {
113             throw JMSExceptionHelper.newJMSException("Failed to close MessageStore. Reason: " + e, e);
114         }
115     }
116 
117 
118     // Implementation methods
119     //-------------------------------------------------------------------------
120     protected Dbt createKey(String messageID) {
121         Dbt key = new Dbt(asBytes(messageID));
122         return key;
123     }
124 
125     protected ActiveMQMessage extractMessage(Dbt value) throws IOException {
126         // we must synchronize access to WireFormat
127         synchronized (wireFormat) {
128             return (ActiveMQMessage) wireFormat.fromBytes(value.getData());
129         }
130     }
131 
132 
133     protected byte[] asBytes(ActiveMQMessage message) throws IOException, JMSException {
134         // we must synchronize access to WireFormat
135         synchronized (wireFormat) {
136             return wireFormat.toBytes(message);
137         }
138     }
139 
140     protected byte[] asBytes(String messageID) {
141         return messageID.getBytes();
142     }
143 }