1 /***
2 *
3 * Copyright 2004 Hiram Chirino
4 * Copyright 2004 Protique Ltd
5 *
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 *
18 **/
19 package org.codehaus.activemq.store.cache;
20
21 import javax.jms.JMSException;
22
23 import org.apache.commons.logging.Log;
24 import org.apache.commons.logging.LogFactory;
25 import org.codehaus.activemq.message.ActiveMQMessage;
26 import org.codehaus.activemq.message.MessageAck;
27 import org.codehaus.activemq.service.MessageIdentity;
28 import org.codehaus.activemq.service.QueueMessageContainer;
29 import org.codehaus.activemq.store.MessageStore;
30
31 /***
32 * A MessageStore that uses an in memory cache to speed up getMessage() method calls.
33 *
34 * @version $Revision: 1.2 $
35 */
36 public class CacheMessageStore implements MessageStore, CacheMessageStoreAware {
37
38 private static final Log log = LogFactory.getLog(CacheMessageStore.class);
39
40 private final CachePersistenceAdapter peristenceAdapter;
41 private final MessageStore longTermStore;
42 private final MessageCache cache;
43
44 public CacheMessageStore(CachePersistenceAdapter adapter, MessageStore longTermStore, MessageCache cache) {
45 this.peristenceAdapter = adapter;
46 this.longTermStore = longTermStore;
47 this.cache = cache;
48
49
50 setCacheMessageStore(this);
51 }
52
53 /***
54 * Add the meessage to the long term store and cache it.
55 */
56 public MessageIdentity addMessage(ActiveMQMessage message) throws JMSException {
57 MessageIdentity messageIdentity = longTermStore.addMessage(message);
58 cache.put(messageIdentity.getMessageID(), message);
59 return messageIdentity;
60 }
61
62 /***
63 * Remove the meessage to the long term store and remove it from the cache.
64 */
65 public void removeMessage(MessageIdentity identity, MessageAck ack) throws JMSException {
66 longTermStore.removeMessage( identity, ack);
67 cache.remove(identity.getMessageID());
68 }
69
70 /***
71 * Return the message from the cache or go to the longTermStore if it is not
72 * in there.
73 */
74 public ActiveMQMessage getMessage(MessageIdentity identity) throws JMSException {
75 ActiveMQMessage answer=null;
76 answer = cache.get(identity.getMessageID());
77 if( answer!=null )
78 return answer;
79
80 answer = longTermStore.getMessage(identity);
81 cache.put(identity.getMessageID(), answer);
82 return answer;
83 }
84
85 /***
86 * Replays the checkpointStore first as those messages are the oldest ones,
87 * then messages are replayed from the transaction log and then the cache is
88 * updated.
89 *
90 * @param container
91 * @throws JMSException
92 */
93 public synchronized void recover(final QueueMessageContainer container)
94 throws JMSException {
95 longTermStore.recover(container);
96 }
97
98 public void start() throws JMSException {
99 longTermStore.start();
100 }
101
102 public void stop() throws JMSException {
103 longTermStore.stop();
104 }
105
106 /***
107 * @return Returns the longTermStore.
108 */
109 public MessageStore getLongTermStore() {
110 return longTermStore;
111 }
112
113 /***
114 * @see org.codehaus.activemq.store.cache.CacheMessageStoreAware#setCacheMessageStore(org.codehaus.activemq.store.cache.CacheMessageStore)
115 */
116 public void setCacheMessageStore(CacheMessageStore store) {
117
118
119 if( longTermStore instanceof CacheMessageStoreAware ) {
120 ((CacheMessageStoreAware)longTermStore).setCacheMessageStore(store);
121 }
122 }
123
124 }