1 /***
2 *
3 * Copyright 2004 Hiram Chirino
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.codehaus.activemq.store.jdbc.adapter;
19
20 import org.apache.commons.logging.Log;
21 import org.apache.commons.logging.LogFactory;
22 import org.codehaus.activemq.message.ActiveMQXid;
23 import org.codehaus.activemq.service.SubscriberEntry;
24 import org.codehaus.activemq.service.Transaction;
25 import org.codehaus.activemq.service.TransactionManager;
26 import org.codehaus.activemq.service.impl.XATransactionCommand;
27 import org.codehaus.activemq.store.jdbc.JDBCAdapter;
28 import org.codehaus.activemq.store.jdbc.StatementProvider;
29 import org.codehaus.activemq.util.LongSequenceGenerator;
30
31 import javax.jms.JMSException;
32 import javax.transaction.xa.XAException;
33 import java.sql.Connection;
34 import java.sql.PreparedStatement;
35 import java.sql.ResultSet;
36 import java.sql.SQLException;
37 import java.sql.Statement;
38 import java.util.List;
39
40 /***
41 * Implements all the default JDBC operations that are used
42 * by the JDBCPersistenceAdapter.
43 * <p/>
44 * Subclassing is encouraged to override the default
45 * implementation of methods to account for differences
46 * in JDBC Driver implementations.
47 * <p/>
48 * The JDBCAdapter inserts and extracts BLOB data using the
49 * getBytes()/setBytes() operations.
50 * <p/>
51 * The databases/JDBC drivers that use this adapter are:
52 * <ul>
53 * <li></li>
54 * </ul>
55 *
56 * @version $Revision: 1.7 $
57 */
58 public class DefaultJDBCAdapter implements JDBCAdapter {
59
60 private static final Log log = LogFactory.getLog(DefaultJDBCAdapter.class);
61
62 final protected CachingStatementProvider statementProvider;
63 protected LongSequenceGenerator sequenceGenerator = new LongSequenceGenerator();
64
65 protected void setBinaryData(PreparedStatement s, int index, byte data[]) throws SQLException {
66 s.setBytes(index, data);
67 }
68
69 protected byte[] getBinaryData(ResultSet rs, int index) throws SQLException {
70 return rs.getBytes(index);
71 }
72
73 /***
74 * @param provider
75 */
76 public DefaultJDBCAdapter(StatementProvider provider) {
77 this.statementProvider = new CachingStatementProvider(provider);
78 }
79
80 public DefaultJDBCAdapter() {
81 this(new DefaultStatementProvider());
82 }
83
84 public LongSequenceGenerator getSequenceGenerator() {
85 return sequenceGenerator;
86 }
87
88 public void doCreateTables(Connection c) throws SQLException {
89 Statement s = null;
90 try {
91 s = c.createStatement();
92 String[] createStatments = statementProvider.getCreateSchemaStatments();
93 for (int i = 0; i < createStatments.length; i++) {
94
95
96 try {
97 boolean rc = s.execute(createStatments[i]);
98 }
99 catch (SQLException e) {
100 log.debug("Statment failed: " + createStatments[i], e);
101 }
102 }
103 c.commit();
104 }
105 finally {
106 try {
107 s.close();
108 }
109 catch (Throwable e) {
110 }
111 }
112 }
113
114 public void initSequenceGenerator(Connection c) {
115 PreparedStatement s = null;
116 ResultSet rs = null;
117 try {
118 s = c.prepareStatement(statementProvider.getFindLastSequenceId());
119 rs = s.executeQuery();
120 if (rs.next()) {
121 sequenceGenerator.setLastSequenceId(rs.getLong(1));
122 }
123 }
124 catch (SQLException e) {
125 log.warn("Failed to find last sequence number: " + e, e);
126 }
127 finally {
128 try {
129 rs.close();
130 }
131 catch (Throwable e) {
132 }
133 try {
134 s.close();
135 }
136 catch (Throwable e) {
137 }
138 }
139 }
140
141 public void doAddMessage(Connection c, long seq, String messageID, String destinationName, byte[] data) throws SQLException, JMSException {
142 PreparedStatement s = null;
143 try {
144 s = c.prepareStatement(statementProvider.getAddMessageStatment());
145 s.setLong(1, seq);
146 s.setString(2, destinationName);
147 s.setString(3, messageID);
148 setBinaryData(s, 4, data);
149 if (s.executeUpdate() != 1) {
150 throw new JMSException("Failed to broker message: " + messageID + " in container. ");
151 }
152 }
153 finally {
154 try {
155 s.close();
156 }
157 catch (Throwable e) {
158 }
159 }
160 }
161
162 public Long getMessageSequenceId(Connection c, String messageID) throws SQLException, JMSException {
163 PreparedStatement s = null;
164 ResultSet rs = null;
165 try {
166
167 s = c.prepareStatement(statementProvider.getFindMessageSequenceIdStatment());
168 s.setString(1, messageID);
169 rs = s.executeQuery();
170
171 if (!rs.next()) {
172 return null;
173 }
174 return new Long( rs.getLong(1) );
175
176 }
177 finally {
178 try {
179 rs.close();
180 }
181 catch (Throwable e) {
182 }
183 try {
184 s.close();
185 }
186 catch (Throwable e) {
187 }
188 }
189 }
190
191 public byte[] doGetMessage(Connection c, long seq) throws SQLException {
192 PreparedStatement s = null;
193 ResultSet rs = null;
194 try {
195
196 s = c.prepareStatement(statementProvider.getFindMessageStatment());
197 s.setLong(1, seq);
198 rs = s.executeQuery();
199
200 if (!rs.next()) {
201 return null;
202 }
203 return getBinaryData(rs, 1);
204
205 }
206 finally {
207 try {
208 rs.close();
209 }
210 catch (Throwable e) {
211 }
212 try {
213 s.close();
214 }
215 catch (Throwable e) {
216 }
217 }
218 }
219
220 public void doRemoveMessage(Connection c, long seq) throws SQLException {
221 PreparedStatement s = null;
222 try {
223 s = c.prepareStatement(statementProvider.getRemoveMessageStatment());
224 s.setLong(1, seq);
225 if (s.executeUpdate() != 1) {
226 log.error("Could not delete sequenece number for: " + seq);
227 }
228 }
229 finally {
230 try {
231 s.close();
232 }
233 catch (Throwable e) {
234 }
235 }
236 }
237
238 public void doRecover(Connection c, String destinationName, MessageListResultHandler listener) throws SQLException, JMSException {
239 PreparedStatement s = null;
240 ResultSet rs = null;
241 try {
242
243 s = c.prepareStatement(statementProvider.getFindAllMessagesStatment());
244 s.setString(1, destinationName);
245 rs = s.executeQuery();
246
247 while (rs.next()) {
248 long seq = rs.getLong(1);
249 String msgid = rs.getString(2);
250 listener.onMessage(seq, msgid);
251 }
252
253 }
254 finally {
255 try {
256 rs.close();
257 }
258 catch (Throwable e) {
259 }
260 try {
261 s.close();
262 }
263 catch (Throwable e) {
264 }
265 }
266 }
267
268 public void doGetXids(Connection c, List list) throws SQLException {
269 PreparedStatement s = null;
270 ResultSet rs = null;
271 try {
272 s = c.prepareStatement(statementProvider.getFindAllXidStatment());
273 rs = s.executeQuery();
274
275 while (rs.next()) {
276 String xid = rs.getString(1);
277 try {
278 list.add(new ActiveMQXid(xid));
279 }
280 catch (JMSException e) {
281 log.error("Failed to recover prepared transaction due to invalid xid: " + xid, e);
282 }
283 }
284
285 }
286 finally {
287 try {
288 rs.close();
289 }
290 catch (Throwable e) {
291 }
292 try {
293 s.close();
294 }
295 catch (Throwable e) {
296 }
297 }
298 }
299
300 public void doRemoveXid(Connection c, ActiveMQXid xid) throws SQLException, XAException {
301 PreparedStatement s = null;
302 try {
303 s = c.prepareStatement(statementProvider.getRemoveMessageStatment());
304 s.setString(1, xid.toLocalTransactionId());
305 if (s.executeUpdate() != 1) {
306 throw new XAException("Failed to remove prepared transaction: " + xid + ".");
307 }
308 }
309 finally {
310 try {
311 s.close();
312 }
313 catch (Throwable e) {
314 }
315 }
316 }
317
318
319 public void doAddXid(Connection c, ActiveMQXid xid, byte[] data) throws SQLException, XAException {
320 PreparedStatement s = null;
321 try {
322
323 s = c.prepareStatement(statementProvider.getAddMessageStatment());
324 s.setString(1, xid.toLocalTransactionId());
325 setBinaryData(s, 2, data);
326 if (s.executeUpdate() != 1) {
327 throw new XAException("Failed to store prepared transaction: " + xid);
328 }
329
330 }
331 finally {
332 try {
333 s.close();
334 }
335 catch (Throwable e) {
336 }
337 }
338 }
339
340 public void doLoadPreparedTransactions(Connection c, TransactionManager transactionManager) throws SQLException {
341 PreparedStatement s = null;
342 ResultSet rs = null;
343 try {
344
345 s = c.prepareStatement(statementProvider.getFindAllTxStatment());
346 rs = s.executeQuery();
347
348 while (rs.next()) {
349 String id = rs.getString(1);
350 byte data[] = this.getBinaryData(rs, 2);
351 try {
352 ActiveMQXid xid = new ActiveMQXid(id);
353 Transaction transaction = XATransactionCommand.fromBytes(data);
354 transactionManager.loadTransaction(xid, transaction);
355 }
356 catch (Exception e) {
357 log.error("Failed to recover prepared transaction due to invalid xid: " + id, e);
358 }
359 }
360 }
361 finally {
362 try {
363 rs.close();
364 }
365 catch (Throwable e) {
366 }
367 try {
368 s.close();
369 }
370 catch (Throwable e) {
371 }
372 }
373 }
374
375 /***
376 * @throws JMSException
377 * @see org.codehaus.activemq.store.jdbc.JDBCAdapter#doSetLastAck(java.sql.Connection, java.lang.String, java.lang.String, long)
378 */
379 public void doSetLastAck(Connection c, String destinationName, String subscriptionID, long seq) throws SQLException, JMSException {
380 PreparedStatement s = null;
381 try {
382 s = c.prepareStatement(statementProvider.getUpdateLastAckOfDurableSub());
383 s.setLong(1, seq);
384 s.setString(2, subscriptionID);
385 s.setString(3, destinationName);
386
387 if (s.executeUpdate() != 1) {
388 throw new JMSException("Failed to acknowlege message with sequence id: " + seq + " for client: " + subscriptionID);
389 }
390 }
391 finally {
392 try {
393 s.close();
394 }
395 catch (Throwable e) {
396 }
397 }
398 }
399
400 /***
401 * @throws JMSException
402 * @see org.codehaus.activemq.store.jdbc.JDBCAdapter#doRecoverSubscription(java.sql.Connection, java.lang.String, java.lang.String, org.codehaus.activemq.store.jdbc.JDBCAdapter.MessageListResultHandler)
403 */
404 public void doRecoverSubscription(Connection c, String destinationName, String subscriptionID, MessageListResultHandler listener) throws SQLException, JMSException {
405 PreparedStatement s = null;
406 ResultSet rs = null;
407 try {
408
409 s = c.prepareStatement(statementProvider.getFindAllDurableSubMessagesStatment());
410 s.setString(1, destinationName);
411 s.setString(2, subscriptionID);
412 rs = s.executeQuery();
413
414 while (rs.next()) {
415 long seq = rs.getLong(1);
416 String msgid = rs.getString(2);
417 listener.onMessage(seq, msgid);
418 }
419
420 }
421 finally {
422 try {
423 rs.close();
424 }
425 catch (Throwable e) {
426 }
427 try {
428 s.close();
429 }
430 catch (Throwable e) {
431 }
432 }
433 }
434
435 /***
436 * @see org.codehaus.activemq.store.jdbc.JDBCAdapter#doSetSubscriberEntry(java.sql.Connection, java.lang.Object, org.codehaus.activemq.service.SubscriberEntry)
437 */
438 public void doSetSubscriberEntry(Connection c, String destinationName, String sub, SubscriberEntry subscriberEntry) throws SQLException {
439 PreparedStatement s = null;
440 try {
441 s = c.prepareStatement(statementProvider.getUpdateDurableSubStatment());
442 s.setInt(1, subscriberEntry.getSubscriberID());
443 s.setString(2, subscriberEntry.getClientID());
444 s.setString(3, subscriberEntry.getConsumerName());
445 s.setString(4, subscriberEntry.getSelector());
446 s.setString(5, sub);
447 s.setString(6, destinationName);
448
449
450 if (s.executeUpdate() != 1) {
451 s.close();
452 s = c.prepareStatement(statementProvider.getCreateDurableSubStatment());
453 s.setInt(1, subscriberEntry.getSubscriberID());
454 s.setString(2, subscriberEntry.getClientID());
455 s.setString(3, subscriberEntry.getConsumerName());
456 s.setString(4, subscriberEntry.getSelector());
457 s.setString(5, sub);
458 s.setString(6, destinationName);
459 s.setLong(7, -1);
460
461 if (s.executeUpdate() != 1) {
462 log.error("Failed to store durable subscription for: " + sub);
463 }
464 }
465 }
466 finally {
467 try {
468 s.close();
469 }
470 catch (Throwable e) {
471 }
472 }
473 }
474
475 /***
476 * @see org.codehaus.activemq.store.jdbc.JDBCAdapter#doGetSubscriberEntry(java.sql.Connection, java.lang.Object)
477 */
478 public SubscriberEntry doGetSubscriberEntry(Connection c, String destinationName, String sub) throws SQLException {
479 PreparedStatement s = null;
480 ResultSet rs = null;
481 try {
482
483 s = c.prepareStatement(statementProvider.getFindDurableSubStatment());
484 s.setString(1, sub);
485 s.setString(2, destinationName);
486 rs = s.executeQuery();
487
488 if (!rs.next()) {
489 return null;
490 }
491
492 SubscriberEntry answer = new SubscriberEntry();
493 answer.setSubscriberID(rs.getInt(1));
494 answer.setClientID(rs.getString(2));
495 answer.setConsumerName(rs.getString(3));
496 answer.setDestination(rs.getString(4));
497
498 return answer;
499
500 }
501 finally {
502 try {
503 rs.close();
504 }
505 catch (Throwable e) {
506 }
507 try {
508 s.close();
509 }
510 catch (Throwable e) {
511 }
512 }
513 }
514
515 }