View Javadoc

1   /***
2    * 
3    * Copyright 2004 Hiram Chirino
4    * Copyright 2004 Protique Ltd
5    * 
6    * Licensed under the Apache License, Version 2.0 (the "License"); 
7    * you may not use this file except in compliance with the License. 
8    * You may obtain a copy of the License at 
9    * 
10   * http://www.apache.org/licenses/LICENSE-2.0
11   * 
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS, 
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
15   * See the License for the specific language governing permissions and 
16   * limitations under the License. 
17   * 
18   **/
19  package org.codehaus.activemq.store.jdbc;
20  
21  import org.apache.commons.logging.Log;
22  import org.apache.commons.logging.LogFactory;
23  import org.codehaus.activemq.message.ActiveMQMessage;
24  import org.codehaus.activemq.message.ConsumerInfo;
25  import org.codehaus.activemq.message.MessageAck;
26  import org.codehaus.activemq.message.WireFormat;
27  import org.codehaus.activemq.service.MessageContainer;
28  import org.codehaus.activemq.service.MessageIdentity;
29  import org.codehaus.activemq.service.SubscriberEntry;
30  import org.codehaus.activemq.service.Subscription;
31  import org.codehaus.activemq.store.TopicMessageStore;
32  import org.codehaus.activemq.store.jdbc.JDBCAdapter.MessageListResultHandler;
33  import org.codehaus.activemq.util.JMSExceptionHelper;
34  
35  import javax.jms.JMSException;
36  import java.sql.Connection;
37  import java.sql.SQLException;
38  
39  /***
40   * @version $Revision: 1.5 $
41   */
42  public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMessageStore {
43  
44      private static final Log log = LogFactory.getLog(JDBCTopicMessageStore.class);
45      private MessageContainer container;
46  
47      public JDBCTopicMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, String destinationName) {
48          super(persistenceAdapter, adapter, wireFormat, destinationName);
49      }
50  
51      public void setLastAcknowledgedMessageIdentity(Subscription subscription, MessageIdentity messageIdentity) throws JMSException {
52          long seq = ((Long) messageIdentity.getSequenceNumber()).longValue();
53          // Get a connection and insert the message into the DB.
54          Connection c = null;
55          try {
56              c = persistenceAdapter.getConnection();
57              adapter.doSetLastAck(c, destinationName, subscription.getPersistentKey(),  seq);
58          }
59          catch (SQLException e) {
60              throw JMSExceptionHelper.newJMSException("Failed to store ack for: " + subscription + " on message " + messageIdentity + " in container: " + e, e);
61          }
62          finally {
63              persistenceAdapter.returnConnection(c);
64          }
65      }
66  
67      /***
68       * @see org.codehaus.activemq.store.TopicMessageStore#getLastestMessageIdentity()
69       */
70      public MessageIdentity getLastestMessageIdentity() throws JMSException {
71          return new MessageIdentity(null, new Long(sequenceGenerator.getLastSequenceId()));
72      }
73  
74      /***
75       * @see org.codehaus.activemq.store.TopicMessageStore#recoverSubscription(org.codehaus.activemq.service.Subscription, org.codehaus.activemq.service.MessageIdentity)
76       */
77      public void recoverSubscription(final Subscription subscription, MessageIdentity lastDispatchedMessage) throws JMSException {
78  
79          Connection c = null;
80          try {
81              c = persistenceAdapter.getConnection();
82              adapter.doRecoverSubscription(c, subscription.getPersistentKey(), destinationName, new MessageListResultHandler() {
83                  public void onMessage(long seq, String messageID) throws JMSException {
84                      MessageIdentity messageIdentity = new MessageIdentity(messageID, new Long(seq));
85                      ActiveMQMessage message = getMessage(messageIdentity);
86                      subscription.addMessage(container, message);
87                  }
88              });
89          }
90          catch (SQLException e) {
91              throw JMSExceptionHelper.newJMSException("Failed to recover subscription: " + subscription + ". Reason: " + e, e);
92          }
93          finally {
94              persistenceAdapter.returnConnection(c);
95          }
96      }
97  
98      /***
99       * @see org.codehaus.activemq.store.TopicMessageStore#setSubscriberEntry(org.codehaus.activemq.message.ConsumerInfo, org.codehaus.activemq.service.SubscriberEntry)
100      */
101     public void setSubscriberEntry(ConsumerInfo info, SubscriberEntry subscriberEntry) throws JMSException {
102         String key = info.getConsumerKey();
103         Connection c = null;
104         try {
105             c = persistenceAdapter.getConnection();
106             adapter.doSetSubscriberEntry(c, destinationName, key, subscriberEntry);
107         }
108         catch (SQLException e) {
109             throw JMSExceptionHelper.newJMSException("Failed to lookup subscription for info: " + info + ". Reason: " + e, e);
110         }
111         finally {
112             persistenceAdapter.returnConnection(c);
113         }
114     }
115 
116     /***
117      * @see org.codehaus.activemq.store.TopicMessageStore#getSubscriberEntry(org.codehaus.activemq.message.ConsumerInfo)
118      */
119     public SubscriberEntry getSubscriberEntry(ConsumerInfo info) throws JMSException {
120         String key = info.getConsumerKey();
121         Connection c = null;
122         try {
123             c = persistenceAdapter.getConnection();
124             return adapter.doGetSubscriberEntry(c, destinationName, key);
125         }
126         catch (SQLException e) {
127             throw JMSExceptionHelper.newJMSException("Failed to lookup subscription for info: " + info + ". Reason: " + e, e);
128         }
129         finally {
130             persistenceAdapter.returnConnection(c);
131         }
132     }
133 
134 
135     /***
136      * @see org.codehaus.activemq.store.TopicMessageStore#setMessageContainer(org.codehaus.activemq.service.MessageContainer)
137      */
138     public void setMessageContainer(MessageContainer container) {
139         this.container = container;
140     }
141 
142     public void incrementMessageCount(MessageIdentity messageId) throws JMSException {
143     }
144 
145     public void decrementMessageCountAndMaybeDelete(MessageIdentity messageIdentity, MessageAck ack) throws JMSException {
146     }
147 }