View Javadoc

1   /*** 
2    * 
3    * Copyright 2004 Hiram Chirino
4    * Copyright 2004 Protique Ltd
5    * 
6    * Licensed under the Apache License, Version 2.0 (the "License"); 
7    * you may not use this file except in compliance with the License. 
8    * You may obtain a copy of the License at 
9    * 
10   * http://www.apache.org/licenses/LICENSE-2.0
11   * 
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS, 
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
15   * See the License for the specific language governing permissions and 
16   * limitations under the License. 
17   * 
18   **/
19  package org.codehaus.activemq.store.journal;
20  
21  import java.util.ArrayList;
22  import java.util.HashMap;
23  import java.util.Iterator;
24  
25  import javax.jms.JMSException;
26  
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.codehaus.activemq.journal.RecordLocation;
30  import org.codehaus.activemq.message.ActiveMQMessage;
31  import org.codehaus.activemq.message.MessageAck;
32  import org.codehaus.activemq.service.MessageIdentity;
33  import org.codehaus.activemq.service.QueueMessageContainer;
34  import org.codehaus.activemq.store.MessageStore;
35  import org.codehaus.activemq.store.cache.CacheMessageStore;
36  import org.codehaus.activemq.store.cache.CacheMessageStoreAware;
37  import org.codehaus.activemq.util.Callback;
38  import org.codehaus.activemq.util.TransactionTemplate;
39  
40  /***
41   * A MessageStore that uses a Journal to store it's messages.
42   * 
43   * @version $Revision: 1.7 $
44   */
45  public class JournalMessageStore implements MessageStore, CacheMessageStoreAware {
46  
47  	private static final Log log = LogFactory.getLog(JournalMessageStore.class);
48  	
49  	private final static class AckData {
50  		private final RecordLocation location;
51  		private final MessageAck ack;
52  		AckData(MessageAck ack, RecordLocation location) {
53  			this.ack = ack;
54  			this.location = location;			
55  		}
56  	}
57  	
58  	private final JournalPersistenceAdapter peristenceAdapter;
59  	private final MessageStore longTermStore;
60  	private final String destinationName;
61  	private final TransactionTemplate transactionTemplate;
62  	
63  	private HashMap addedMessageLocations = new HashMap();
64  	private ArrayList removedMessageLocations = new ArrayList();
65  	
66  	/*** A MessageStore that we can use to retreive messages quickly. */
67  	private MessageStore cacheMessageStore = this;
68  	
69  	private boolean sync = true;
70  
71  	public JournalMessageStore(JournalPersistenceAdapter adapter, MessageStore checkpointStore, String destinationName, boolean sync) {
72  		this.peristenceAdapter = adapter;
73  		this.longTermStore = checkpointStore;
74  		this.destinationName = destinationName;
75  		this.sync=sync;
76  		this.transactionTemplate = new TransactionTemplate(adapter);
77  	}
78  
79  	/***
80  	 * Not synchronized since the Journal has better throughput if you increase
81  	 * the number of conncurrent writes that it is doing.
82  	 */
83  	public MessageIdentity addMessage(ActiveMQMessage message) throws JMSException {
84  		boolean sync = message.isReceiptRequired();
85  		RecordLocation location = peristenceAdapter.writePacket(destinationName, message, sync);
86  		synchronized(this) {
87  			addedMessageLocations.put(message.getJMSMessageIdentity(), location);
88  		}
89  
90  		// Update the messageIdentity sequence number so that we can reteive the message
91  		// from the journal at a later time.
92  		MessageIdentity messageIdentity = message.getJMSMessageIdentity();
93  		messageIdentity.setSequenceNumber(location);
94  		return messageIdentity;
95  	}	
96  	
97  	/***
98  	 */
99  	public void removeMessage(MessageIdentity identity, MessageAck ack)
100 			throws JMSException {
101 
102 		RecordLocation ackLocation = peristenceAdapter.writePacket(destinationName, ack, sync);
103 
104 		synchronized(this) {
105 			RecordLocation addLocation = (RecordLocation) addedMessageLocations.remove(identity);			
106 			if( addLocation==null ) {
107 				removedMessageLocations.add(new AckData(ack, ackLocation));
108 			}
109 		}		
110 	}
111 
112 	/***
113 	 * @return
114 	 * @throws JMSException
115 	 */
116 	public RecordLocation checkpoint() throws JMSException {
117 		final RecordLocation rc[] = new RecordLocation[]{null};
118 		
119 		// swap out the message hashmaps..
120 		final ArrayList addedMessageIdentitys;
121 		final ArrayList removedMessageLocations;
122 		synchronized(this) {
123 			addedMessageIdentitys = new ArrayList(this.addedMessageLocations.keySet());
124 			removedMessageLocations = this.removedMessageLocations;
125 			this.removedMessageLocations = new ArrayList();
126 		}
127 		
128 		transactionTemplate.run(new Callback() {
129 			public void execute() throws Throwable {
130 				
131 				// Checkpoint the added messages.
132 				Iterator iterator = addedMessageIdentitys.iterator();
133 				while (iterator.hasNext()) {					
134 					MessageIdentity identity = (MessageIdentity) iterator.next();
135 					
136 					ActiveMQMessage msg = getCacheMessage(identity);
137 					longTermStore.addMessage(msg);
138 					synchronized(this) {
139 						RecordLocation location = (RecordLocation)addedMessageLocations.remove(identity);
140 						if( rc[0]==null || rc[0].compareTo(location)<0 ) {
141 							rc[0] = location;
142 						}
143 					}
144 					
145 				}				
146 				
147 				// Checkpoint the removed messages.
148 				iterator = removedMessageLocations.iterator();
149 				while (iterator.hasNext()) {					
150 					AckData data = (AckData)iterator.next();
151 					longTermStore.removeMessage(data.ack.getMessageIdentity(),data.ack);
152 
153 					if( rc[0]==null || rc[0].compareTo(data.location)<0 ) {
154 						rc[0] = data.location;
155 					}
156 				}				
157 				
158 			}
159 
160 		});
161 		
162 		return rc[0];
163 	}
164 	
165 	private ActiveMQMessage getCacheMessage(MessageIdentity identity) throws JMSException {
166 		return cacheMessageStore.getMessage(identity);
167 	}
168 
169 	/***
170 	 * 
171 	 */
172 	public ActiveMQMessage getMessage(MessageIdentity identity)	throws JMSException {
173 		ActiveMQMessage answer=null;
174 		
175 		Object location = identity.getSequenceNumber();
176 		if( location==null ) {
177 			// The sequence number may not have been set but it may still be in the journal.
178 			synchronized(this) {
179 				location = addedMessageLocations.get(identity);
180 			}
181 		}
182 		// Do we have a Journal sequence number?
183 		if(location!=null && location instanceof RecordLocation) {
184 			answer = (ActiveMQMessage) peristenceAdapter.readPacket((RecordLocation)location);
185 			if( answer !=null )
186 				return answer;
187 		}
188 		
189 		// If all else fails try the long term message store.
190 		return longTermStore.getMessage(identity);
191 	}
192 
193 	/***
194 	 * Replays the checkpointStore first as those messages are the oldest ones,
195 	 * then messages are replayed from the transaction log and then the cache is
196 	 * updated.
197 	 * 
198 	 * @param container
199 	 * @throws JMSException
200 	 */
201 	public synchronized void recover(final QueueMessageContainer container)
202 			throws JMSException {
203 		longTermStore.recover(container);
204 	}
205 
206 	public void start() throws JMSException {
207 		longTermStore.start();
208 	}
209 
210 	public void stop() throws JMSException {
211 		longTermStore.stop();
212 	}
213 
214 	/***
215 	 * @return Returns the longTermStore.
216 	 */
217 	public MessageStore getLongTermStore() {
218 		return longTermStore;
219 	}
220 
221 	/***
222 	 * @see org.codehaus.activemq.store.cache.CacheMessageStoreAware#setCacheMessageStore(org.codehaus.activemq.store.cache.CacheMessageStore)
223 	 */
224 	public void setCacheMessageStore(CacheMessageStore store) {
225 		cacheMessageStore = store;
226 		// Propagate the setCacheMessageStore method call to the longTermStore if possible.
227 		if( longTermStore instanceof CacheMessageStoreAware ) {
228 			((CacheMessageStoreAware)longTermStore).setCacheMessageStore(store);
229 		}
230 	}
231 }