1 /***
2 *
3 * Copyright 2004 Hiram Chirino
4 * Copyright 2004 Protique Ltd
5 *
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 *
18 **/
19 package org.codehaus.activemq.store.jdbc;
20
21 import org.apache.commons.logging.Log;
22 import org.apache.commons.logging.LogFactory;
23 import org.codehaus.activemq.message.ActiveMQMessage;
24 import org.codehaus.activemq.message.ConsumerInfo;
25 import org.codehaus.activemq.message.MessageAck;
26 import org.codehaus.activemq.message.WireFormat;
27 import org.codehaus.activemq.service.MessageContainer;
28 import org.codehaus.activemq.service.MessageIdentity;
29 import org.codehaus.activemq.service.SubscriberEntry;
30 import org.codehaus.activemq.service.Subscription;
31 import org.codehaus.activemq.store.TopicMessageStore;
32 import org.codehaus.activemq.store.jdbc.JDBCAdapter.MessageListResultHandler;
33 import org.codehaus.activemq.util.JMSExceptionHelper;
34
35 import javax.jms.JMSException;
36 import java.sql.Connection;
37 import java.sql.SQLException;
38
39 /***
40 * @version $Revision: 1.5 $
41 */
42 public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMessageStore {
43
44 private static final Log log = LogFactory.getLog(JDBCTopicMessageStore.class);
45 private MessageContainer container;
46
47 public JDBCTopicMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, String destinationName) {
48 super(persistenceAdapter, adapter, wireFormat, destinationName);
49 }
50
51 public void setLastAcknowledgedMessageIdentity(Subscription subscription, MessageIdentity messageIdentity) throws JMSException {
52 long seq = ((Long) messageIdentity.getSequenceNumber()).longValue();
53
54 Connection c = null;
55 try {
56 c = persistenceAdapter.getConnection();
57 adapter.doSetLastAck(c, destinationName, subscription.getPersistentKey(), seq);
58 }
59 catch (SQLException e) {
60 throw JMSExceptionHelper.newJMSException("Failed to store ack for: " + subscription + " on message " + messageIdentity + " in container: " + e, e);
61 }
62 finally {
63 persistenceAdapter.returnConnection(c);
64 }
65 }
66
67 /***
68 * @see org.codehaus.activemq.store.TopicMessageStore#getLastestMessageIdentity()
69 */
70 public MessageIdentity getLastestMessageIdentity() throws JMSException {
71 return new MessageIdentity(null, new Long(sequenceGenerator.getLastSequenceId()));
72 }
73
74 /***
75 * @see org.codehaus.activemq.store.TopicMessageStore#recoverSubscription(org.codehaus.activemq.service.Subscription, org.codehaus.activemq.service.MessageIdentity)
76 */
77 public void recoverSubscription(final Subscription subscription, MessageIdentity lastDispatchedMessage) throws JMSException {
78
79 Connection c = null;
80 try {
81 c = persistenceAdapter.getConnection();
82 adapter.doRecoverSubscription(c, subscription.getPersistentKey(), destinationName, new MessageListResultHandler() {
83 public void onMessage(long seq, String messageID) throws JMSException {
84 MessageIdentity messageIdentity = new MessageIdentity(messageID, new Long(seq));
85 ActiveMQMessage message = getMessage(messageIdentity);
86 subscription.addMessage(container, message);
87 }
88 });
89 }
90 catch (SQLException e) {
91 throw JMSExceptionHelper.newJMSException("Failed to recover subscription: " + subscription + ". Reason: " + e, e);
92 }
93 finally {
94 persistenceAdapter.returnConnection(c);
95 }
96 }
97
98 /***
99 * @see org.codehaus.activemq.store.TopicMessageStore#setSubscriberEntry(org.codehaus.activemq.message.ConsumerInfo, org.codehaus.activemq.service.SubscriberEntry)
100 */
101 public void setSubscriberEntry(ConsumerInfo info, SubscriberEntry subscriberEntry) throws JMSException {
102 String key = info.getConsumerKey();
103 Connection c = null;
104 try {
105 c = persistenceAdapter.getConnection();
106 adapter.doSetSubscriberEntry(c, destinationName, key, subscriberEntry);
107 }
108 catch (SQLException e) {
109 throw JMSExceptionHelper.newJMSException("Failed to lookup subscription for info: " + info + ". Reason: " + e, e);
110 }
111 finally {
112 persistenceAdapter.returnConnection(c);
113 }
114 }
115
116 /***
117 * @see org.codehaus.activemq.store.TopicMessageStore#getSubscriberEntry(org.codehaus.activemq.message.ConsumerInfo)
118 */
119 public SubscriberEntry getSubscriberEntry(ConsumerInfo info) throws JMSException {
120 String key = info.getConsumerKey();
121 Connection c = null;
122 try {
123 c = persistenceAdapter.getConnection();
124 return adapter.doGetSubscriberEntry(c, destinationName, key);
125 }
126 catch (SQLException e) {
127 throw JMSExceptionHelper.newJMSException("Failed to lookup subscription for info: " + info + ". Reason: " + e, e);
128 }
129 finally {
130 persistenceAdapter.returnConnection(c);
131 }
132 }
133
134
135 /***
136 * @see org.codehaus.activemq.store.TopicMessageStore#setMessageContainer(org.codehaus.activemq.service.MessageContainer)
137 */
138 public void setMessageContainer(MessageContainer container) {
139 this.container = container;
140 }
141
142 public void incrementMessageCount(MessageIdentity messageId) throws JMSException {
143 }
144
145 public void decrementMessageCountAndMaybeDelete(MessageIdentity messageIdentity, MessageAck ack) throws JMSException {
146 }
147 }