View Javadoc

1   /***
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.codehaus.activemq.service.impl;
19  
20  import org.apache.commons.logging.Log;
21  import org.apache.commons.logging.LogFactory;
22  import org.codehaus.activemq.message.ActiveMQMessage;
23  import org.codehaus.activemq.message.MessageAck;
24  import org.codehaus.activemq.service.MessageIdentity;
25  import org.codehaus.activemq.service.QueueList;
26  import org.codehaus.activemq.service.QueueListEntry;
27  import org.codehaus.activemq.service.QueueMessageContainer;
28  import org.codehaus.activemq.store.MessageStore;
29  import org.codehaus.activemq.store.PersistenceAdapter;
30  import org.codehaus.activemq.util.Callback;
31  import org.codehaus.activemq.util.TransactionTemplate;
32  
33  import javax.jms.JMSException;
34  
35  /***
36   * A default implemenation of a Durable Queue based
37   * {@link org.codehaus.activemq.service.MessageContainer}
38   * which acts as an adapter between the {@link org.codehaus.activemq.service.MessageContainerManager}
39   * requirements and those of the persistent {@link MessageStore} implementations.
40   *
41   * @version $Revision: 1.19 $
42   */
43  public class DurableQueueMessageContainer implements QueueMessageContainer {
44      private static final Log log = LogFactory.getLog(DurableQueueMessageContainer.class);
45  
46      private MessageStore messageStore;
47      private String destinationName;
48  
49      /***
50       * messages to be delivered
51       */
52      private QueueList messagesToBeDelivered;
53      /***
54       * messages that have been delivered but not acknowledged
55       */
56      private QueueList deliveredMessages;
57      private PersistenceAdapter persistenceAdapter;
58      private TransactionTemplate transactionTemplate;
59  
60      public DurableQueueMessageContainer(PersistenceAdapter persistenceAdapter, MessageStore messageStore, String destinationName) {
61          this(persistenceAdapter, messageStore, destinationName, new DefaultQueueList(), new DefaultQueueList());
62      }
63  
64      public DurableQueueMessageContainer(PersistenceAdapter persistenceAdapter, MessageStore messageStore, String destinationName, QueueList messagesToBeDelivered, QueueList deliveredMessages) {
65          this.persistenceAdapter = persistenceAdapter;
66          this.messageStore = messageStore;
67          this.destinationName = destinationName;
68          this.messagesToBeDelivered = messagesToBeDelivered;
69          this.deliveredMessages = deliveredMessages;
70          this.transactionTemplate = new TransactionTemplate(persistenceAdapter);
71      }
72  
73      public String getDestinationName() {
74          return destinationName;
75      }
76  
77      public MessageIdentity addMessage(ActiveMQMessage message) throws JMSException {
78          MessageIdentity answer = messageStore.addMessage(message);
79          synchronized( this ) {
80          	messagesToBeDelivered.add(answer);
81          }
82          return answer;
83  
84      }
85  
86      public synchronized void delete(MessageIdentity messageID, MessageAck ack) throws JMSException {
87          
88          // lets find the cached identity as it has the sequence number
89          // attached to it
90      	MessageIdentity storedIdentity=null;
91  
92      	synchronized( this ) {
93  	        QueueListEntry entry = deliveredMessages.getFirstEntry();
94  	        while (entry != null) {
95  	            MessageIdentity identity = (MessageIdentity) entry.getElement();
96  	            if (messageID.equals(identity)) {
97  	                deliveredMessages.remove(entry);
98  	                storedIdentity=identity;
99  	                break;
100 	            }
101 	            entry = deliveredMessages.getNextEntry(entry);
102 	        }
103 	        
104 	        if (storedIdentity==null) {
105 	            // maybe the messages have not been delivered yet
106 	            // as we are recovering from a previous transaction log
107 	            entry = messagesToBeDelivered.getFirstEntry();
108 	            while (entry != null) {
109 	                MessageIdentity identity = (MessageIdentity) entry.getElement();
110 	                if (messageID.equals(identity)) {
111 	                    messagesToBeDelivered.remove(entry);
112 	                    storedIdentity=identity;
113 	                    break;
114 	                }
115 	                entry = messagesToBeDelivered.getNextEntry(entry);
116 	            }
117 	        }
118     	}
119     	
120         if (storedIdentity==null) {
121             log.error("Attempt to acknowledge unknown messageID: " + messageID);
122         } else {
123             messageStore.removeMessage(storedIdentity, ack);        	
124         }
125         
126     }
127 
128     public ActiveMQMessage getMessage(MessageIdentity messageID) throws JMSException {
129         return messageStore.getMessage(messageID);
130     }
131 
132 
133     public boolean containsMessage(MessageIdentity messageIdentity) throws JMSException {
134         /*** TODO: make more optimal implementation */
135         return getMessage(messageIdentity) != null;
136     }
137 
138     /***
139      * Does nothing since when we receive an acknowledgement on a queue
140      * we can delete the message
141      *
142      * @param messageIdentity
143      */
144     public void registerMessageInterest(MessageIdentity messageIdentity) {
145     }
146 
147     /***
148      * Does nothing since when we receive an acknowledgement on a queue
149      * we can delete the message
150      *
151      * @param messageIdentity
152      * @param ack
153      */
154     public void unregisterMessageInterest(MessageIdentity messageIdentity, MessageAck ack) {
155     }
156 
157     public ActiveMQMessage poll() throws JMSException {
158         ActiveMQMessage message = null;
159         MessageIdentity messageIdentity=null;
160     	synchronized( this ) {
161 	        messageIdentity = (MessageIdentity) messagesToBeDelivered.removeFirst();
162 	        if (messageIdentity != null) {
163 	            deliveredMessages.add(messageIdentity);
164 	        }
165     	}
166         if (messageIdentity != null) {
167             message = messageStore.getMessage(messageIdentity);
168         }
169         return message;
170     }
171 
172     public ActiveMQMessage peekNext(MessageIdentity messageID) throws JMSException {
173     	ActiveMQMessage answer = null;
174     	MessageIdentity identity = null;
175     	synchronized( this ) {
176             if (messageID == null) {
177             	identity = (MessageIdentity) messagesToBeDelivered.getFirst();
178             }
179             else {
180                 int index = messagesToBeDelivered.indexOf(messageID);
181                 if (index >= 0 && (index + 1) < messagesToBeDelivered.size()) {
182                 	identity = (MessageIdentity) messagesToBeDelivered.get(index + 1);
183                 }
184             }
185             
186     	}
187         if (identity != null) {
188             answer = messageStore.getMessage(identity);
189         }
190         return answer;
191     }
192 
193 
194     public synchronized void returnMessage(MessageIdentity messageIdentity) throws JMSException {
195         boolean result = deliveredMessages.remove(messageIdentity);
196         messagesToBeDelivered.addFirst(messageIdentity);
197     }
198 
199     /***
200      * called to reset dispatch pointers if a new Message Consumer joins
201      *
202      * @throws javax.jms.JMSException
203      */
204     public synchronized void reset() throws JMSException {
205         //new Message Consumer - move all filtered/undispatched messages to front of queue
206         int count = 0;
207         MessageIdentity messageIdentity = (MessageIdentity) deliveredMessages.removeFirst();
208         while (messageIdentity != null) {
209             messagesToBeDelivered.add(count++, messageIdentity);
210             messageIdentity = (MessageIdentity) deliveredMessages.removeFirst();
211         }
212     }
213 
214     public synchronized void start() throws JMSException {
215         final QueueMessageContainer container = this;
216         transactionTemplate.run(new Callback() {
217             public void execute() throws Throwable {
218                 messageStore.start();
219                 messageStore.recover(container);
220             }
221         });
222 
223     }
224 
225     public synchronized void recoverMessageToBeDelivered(MessageIdentity messageIdentity) throws JMSException {
226         messagesToBeDelivered.add(messageIdentity);
227     }
228 
229     public void stop() throws JMSException {
230         messageStore.stop();
231     }
232 }