1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.codehaus.activemq.store.howl;
19
20 import org.apache.commons.logging.Log;
21 import org.apache.commons.logging.LogFactory;
22 import org.codehaus.activemq.message.ActiveMQMessage;
23 import org.codehaus.activemq.message.MessageAck;
24 import org.codehaus.activemq.message.Packet;
25 import org.codehaus.activemq.message.WireFormat;
26 import org.codehaus.activemq.service.MessageIdentity;
27 import org.codehaus.activemq.service.QueueMessageContainer;
28 import org.codehaus.activemq.store.MessageStore;
29 import org.codehaus.activemq.util.Callback;
30 import org.codehaus.activemq.util.JMSExceptionHelper;
31 import org.codehaus.activemq.util.TransactionTemplate;
32 import org.objectweb.howl.log.LogConfigurationException;
33 import org.objectweb.howl.log.LogException;
34 import org.objectweb.howl.log.LogRecord;
35 import org.objectweb.howl.log.Logger;
36 import org.objectweb.howl.log.ReplayListener;
37
38 import javax.jms.JMSException;
39 import java.io.IOException;
40 import java.util.LinkedHashMap;
41 import java.util.Map;
42
43 /***
44 * An implementation of {@link MessageStore} designed for
45 * optimal use with <a href="http://howl.objectweb.org/">Howl</a>
46 * as the transaction log and then checkpointing asynchronously
47 * on a timeout with some other persistent storage.
48 *
49 * @version $Revision: 1.3 $
50 */
51 public class HowlMessageStore implements MessageStore {
52
53 private static final int DEFAULT_RECORD_SIZE = 64 * 1024;
54 private static final Log log = LogFactory.getLog(HowlMessageStore.class);
55
56 private HowlPersistenceAdapter longTermPersistence;
57 private MessageStore longTermStore;
58 private Logger transactionLog;
59 private WireFormat wireFormat;
60 private TransactionTemplate transactionTemplate;
61 private int maximumCacheSize = 100;
62 private Map map = new LinkedHashMap();
63 private boolean sync = true;
64 private long lastLogMark;
65 private Exception firstException;
66
67 public HowlMessageStore(HowlPersistenceAdapter adapter, MessageStore checkpointStore, Logger transactionLog, WireFormat wireFormat) {
68 this.longTermPersistence = adapter;
69 this.longTermStore = checkpointStore;
70 this.transactionLog = transactionLog;
71 this.wireFormat = wireFormat;
72 this.transactionTemplate = new TransactionTemplate(adapter);
73 }
74
75
76 /***
77 * This method is synchronized to ensure that only 1 thread can write to the log and cache
78 * and possibly checkpoint at once, to preserve order across the transaction log,
79 * cache and checkpointStore.
80 */
81 public synchronized MessageIdentity addMessage(ActiveMQMessage message) throws JMSException {
82
83 writePacket(message);
84
85
86 if (!addMessageToCache(message)) {
87 log.warn("Not enough RAM to store the active transaction log and so we're having to force" +
88 "a checkpoint so that we can ensure that reads are efficient and do not have to " +
89 "replay the transaction log");
90 checkpoint(message);
91
92
93 longTermStore.addMessage(message);
94 }
95 return message.getJMSMessageIdentity();
96 }
97
98 /***
99 * Lets ensure that readers don't block writers so there only synchronization on
100 * the cache and checkpointStore.
101 */
102 public ActiveMQMessage getMessage(MessageIdentity identity) throws JMSException {
103 ActiveMQMessage answer = null;
104 synchronized (map) {
105 answer = (ActiveMQMessage) map.get(identity.getMessageID());
106 }
107 if (answer == null) {
108 answer = longTermStore.getMessage(identity);
109 }
110 return answer;
111 }
112
113 /***
114 * Removes can be done in any order so we only synchronize on the cache and
115 * checkpointStore
116 */
117 public void removeMessage(MessageIdentity identity, MessageAck ack) throws JMSException {
118
119 writePacket(ack);
120
121 synchronized (map) {
122 map.remove(identity.getMessageID());
123 }
124 longTermPersistence.onMessageRemove(this);
125 }
126
127 /***
128 * Replays the checkpointStore first as those messages are the oldest ones,
129 * then messages are replayed from the transaction log and then
130 * the cache is updated.
131 *
132 * @param container
133 * @throws JMSException
134 */
135 public synchronized void recover(final QueueMessageContainer container) throws JMSException {
136 longTermStore.recover(container);
137
138
139
140 firstException = null;
141 try {
142 transactionLog.replay(new ReplayListener() {
143 LogRecord record = new LogRecord(DEFAULT_RECORD_SIZE);
144
145 public void onRecord(LogRecord logRecord) {
146 readPacket(logRecord, container);
147 }
148
149 public void onError(LogException e) {
150 log.error("Error while recovering Howl transaction log: " + e, e);
151 }
152
153 public LogRecord getLogRecord() {
154 return record;
155 }
156 });
157 }
158 catch (LogConfigurationException e) {
159 throw createRecoveryFailedException(e);
160 }
161 if (firstException != null) {
162 if (firstException instanceof JMSException) {
163 throw (JMSException) firstException;
164 }
165 else {
166 throw createRecoveryFailedException(firstException);
167 }
168 }
169 }
170
171 public synchronized void start() throws JMSException {
172 longTermStore.start();
173 }
174
175 public synchronized void stop() throws JMSException {
176 longTermStore.stop();
177 }
178
179 /***
180 * Writes the current RAM cache to the long term, checkpoint store so that the
181 * transaction log can be truncated.
182 */
183 public synchronized void checkpoint() throws JMSException {
184 checkpoint(null);
185 }
186
187
188
189
190 public int getMaximumCacheSize() {
191 return maximumCacheSize;
192 }
193
194 public void setMaximumCacheSize(int maximumCacheSize) {
195 this.maximumCacheSize = maximumCacheSize;
196 }
197
198
199
200
201 /***
202 * Writes the current RAM image of the transaction log to stable, checkpoint store
203 *
204 * @param message is an optional message. This is null for timer based
205 * checkpoints or is the message which cannot fit into the cache if cache-exhaustion
206 * based checkpoints
207 * @throws JMSException
208 */
209 protected void checkpoint(final ActiveMQMessage message) throws JMSException {
210
211 ActiveMQMessage[] temp = null;
212 synchronized (map) {
213 temp = new ActiveMQMessage[map.size()];
214 map.values().toArray(temp);
215
216
217
218 map.clear();
219 }
220
221 final ActiveMQMessage[] data = temp;
222 transactionTemplate.run(new Callback() {
223 public void execute() throws Throwable {
224 for (int i = 0, size = data.length; i < size; i++) {
225 longTermStore.addMessage(data[i]);
226 }
227 if (message != null) {
228 longTermStore.addMessage(message);
229 }
230 }
231 });
232
233 try {
234 transactionLog.mark(lastLogMark);
235 }
236 catch (Exception e) {
237 throw JMSExceptionHelper.newJMSException("Failed to checkpoint the Howl transaction log: " + e, e);
238 }
239 }
240
241 /***
242 * Adds the given message to the cache if there is spare capacity
243 *
244 * @param message
245 * @return true if the message was added to the cache or false
246 */
247 protected boolean addMessageToCache(ActiveMQMessage message) {
248 synchronized (map) {
249 if (map.size() < maximumCacheSize && longTermPersistence.hasCacheCapacity(this)) {
250 map.put(message.getJMSMessageID(), message);
251 return true;
252 }
253 }
254 return false;
255 }
256
257 protected void readPacket(LogRecord logRecord, QueueMessageContainer container) {
258 if (!logRecord.isCTRL() && !logRecord.isEOB() && logRecord.length > 0) {
259 try {
260
261 Packet packet = wireFormat.fromBytes(logRecord.data, 2, logRecord.length - 2);
262 if (packet instanceof ActiveMQMessage) {
263 container.addMessage((ActiveMQMessage) packet);
264 }
265 else if (packet instanceof MessageAck) {
266 MessageAck ack = (MessageAck) packet;
267 container.delete(ack.getMessageIdentity(), ack);
268 }
269 else {
270 log.error("Unknown type of packet in transaction log which will be discarded: " + packet);
271 }
272 }
273 catch (Exception e) {
274 if (firstException == null) {
275 firstException = e;
276 }
277 }
278 }
279 }
280
281 /***
282 * Writes a message to the transaction log using the current sync mode
283 */
284 protected synchronized void writePacket(Packet packet) throws JMSException {
285 try {
286 byte[] data = wireFormat.toBytes(packet);
287 lastLogMark = transactionLog.put(data, sync);
288 }
289 catch (IOException e) {
290 throw createWriteException(packet, e);
291 }
292 catch (LogException e) {
293 throw createWriteException(packet, e);
294 }
295 catch (InterruptedException e) {
296 throw createWriteException(packet, e);
297 }
298 }
299
300
301 protected JMSException createRecoveryFailedException(Exception e) {
302 return JMSExceptionHelper.newJMSException("Failed to recover from Howl transaction log. Reason: " + e, e);
303 }
304
305 protected JMSException createWriteException(Packet packet, Exception e) {
306 return JMSExceptionHelper.newJMSException("Failed to write to Howl transaction log for: " + packet + ". Reason: " + e, e);
307 }
308 }