1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.codehaus.activemq.store.bdbn;
19
20 import com.sleepycat.db.Db;
21 import com.sleepycat.db.DbException;
22 import com.sleepycat.db.DbTxn;
23 import com.sleepycat.db.Dbt;
24 import org.codehaus.activemq.message.ActiveMQMessage;
25 import org.codehaus.activemq.message.MessageAck;
26 import org.codehaus.activemq.message.WireFormat;
27 import org.codehaus.activemq.service.MessageContainer;
28 import org.codehaus.activemq.service.MessageIdentity;
29 import org.codehaus.activemq.service.QueueMessageContainer;
30 import org.codehaus.activemq.store.MessageStore;
31 import org.codehaus.activemq.util.JMSExceptionHelper;
32
33 import javax.jms.JMSException;
34 import java.io.IOException;
35
36 /***
37 * An implementation of {@link MessageStore} using
38 * <a href="http://www.sleepycat.com/">native C Berkeley DB</a>
39 *
40 * @version $Revision: 1.2 $
41 */
42 public class BDbMessageStore implements MessageStore {
43 private static final int SUCCESS = 0;
44
45 private Db database;
46 private WireFormat wireFormat;
47 private MessageContainer container;
48
49 public void setMessageContainer(MessageContainer container) {
50 this.container = container;
51 }
52
53 public MessageIdentity addMessage(ActiveMQMessage message) throws JMSException {
54 String messageID = message.getJMSMessageID();
55 try {
56 Dbt key = createKey(messageID);
57 Dbt value = new Dbt(asBytes(message));
58 database.put(BDbHelper.getTransaction(), key, value, 0);
59 return new MessageIdentity(messageID);
60 }
61 catch (DbException e) {
62 throw JMSExceptionHelper.newJMSException("Failed to broker message: " + messageID + " in container: " + e, e);
63 }
64 catch (IOException e) {
65 throw JMSExceptionHelper.newJMSException("Failed to broker message: " + messageID + " in container: " + e, e);
66 }
67 }
68
69 public ActiveMQMessage getMessage(MessageIdentity identity) throws JMSException {
70 String messageID = identity.getMessageID();
71 ActiveMQMessage answer = null;
72 try {
73 Dbt key = createKey(messageID);
74 Dbt value = new Dbt();
75 if (database.get(null, key, value, 0) == SUCCESS) {
76 answer = extractMessage(value);
77 }
78 return answer;
79 }
80 catch (DbException e) {
81 throw JMSExceptionHelper.newJMSException("Failed to peek next message after: " + messageID + " from container: " + e, e);
82 }
83 catch (IOException e) {
84 throw JMSExceptionHelper.newJMSException("Failed to peek next message after: " + messageID + " from container: " + e, e);
85 }
86 }
87
88
89 public void removeMessage(MessageIdentity identity, MessageAck ack) throws JMSException {
90 DbTxn transaction = null;
91 String messageID = identity.getMessageID();
92 try {
93 database.delete(BDbHelper.getTransaction(), createKey(messageID), 0);
94 }
95 catch (DbException e) {
96 throw JMSExceptionHelper.newJMSException("Failed to delete message: " + messageID + " from container: " + e, e);
97 }
98 }
99
100 public void recover(QueueMessageContainer container) throws JMSException {
101 /*** TODO */
102 }
103
104
105 public void start() throws JMSException {
106 }
107
108 public void stop() throws JMSException {
109 try {
110 database.close(0);
111 }
112 catch (DbException e) {
113 throw JMSExceptionHelper.newJMSException("Failed to close MessageStore. Reason: " + e, e);
114 }
115 }
116
117
118
119
120 protected Dbt createKey(String messageID) {
121 Dbt key = new Dbt(asBytes(messageID));
122 return key;
123 }
124
125 protected ActiveMQMessage extractMessage(Dbt value) throws IOException {
126
127 synchronized (wireFormat) {
128 return (ActiveMQMessage) wireFormat.fromBytes(value.getData());
129 }
130 }
131
132
133 protected byte[] asBytes(ActiveMQMessage message) throws IOException, JMSException {
134
135 synchronized (wireFormat) {
136 return wireFormat.toBytes(message);
137 }
138 }
139
140 protected byte[] asBytes(String messageID) {
141 return messageID.getBytes();
142 }
143 }