View Javadoc

1   /***
2    * 
3    * Copyright 2004 Hiram Chirino
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.codehaus.activemq.store.jdbc.adapter;
19  
20  import org.apache.commons.logging.Log;
21  import org.apache.commons.logging.LogFactory;
22  import org.codehaus.activemq.message.ActiveMQXid;
23  import org.codehaus.activemq.service.SubscriberEntry;
24  import org.codehaus.activemq.service.Transaction;
25  import org.codehaus.activemq.service.TransactionManager;
26  import org.codehaus.activemq.service.impl.XATransactionCommand;
27  import org.codehaus.activemq.store.jdbc.JDBCAdapter;
28  import org.codehaus.activemq.store.jdbc.StatementProvider;
29  import org.codehaus.activemq.util.LongSequenceGenerator;
30  
31  import javax.jms.JMSException;
32  import javax.transaction.xa.XAException;
33  import java.sql.Connection;
34  import java.sql.PreparedStatement;
35  import java.sql.ResultSet;
36  import java.sql.SQLException;
37  import java.sql.Statement;
38  import java.util.List;
39  
40  /***
41   * Implements all the default JDBC operations that are used
42   * by the JDBCPersistenceAdapter.
43   * <p/>
44   * Subclassing is encouraged to override the default
45   * implementation of methods to account for differences
46   * in JDBC Driver implementations.
47   * <p/>
48   * The JDBCAdapter inserts and extracts BLOB data using the
49   * getBytes()/setBytes() operations.
50   * <p/>
51   * The databases/JDBC drivers that use this adapter are:
52   * <ul>
53   * <li></li>
54   * </ul>
55   *
56   * @version $Revision: 1.7 $
57   */
58  public class DefaultJDBCAdapter implements JDBCAdapter {
59  
60      private static final Log log = LogFactory.getLog(DefaultJDBCAdapter.class);
61  
62      final protected CachingStatementProvider statementProvider;
63      protected LongSequenceGenerator sequenceGenerator = new LongSequenceGenerator();
64  
65      protected void setBinaryData(PreparedStatement s, int index, byte data[]) throws SQLException {
66          s.setBytes(index, data);
67      }
68  
69      protected byte[] getBinaryData(ResultSet rs, int index) throws SQLException {
70          return rs.getBytes(index);
71      }
72  
73      /***
74       * @param provider
75       */
76      public DefaultJDBCAdapter(StatementProvider provider) {
77          this.statementProvider = new CachingStatementProvider(provider);
78      }
79  
80      public DefaultJDBCAdapter() {
81          this(new DefaultStatementProvider());
82      }
83  
84      public LongSequenceGenerator getSequenceGenerator() {
85          return sequenceGenerator;
86      }
87  
88      public void doCreateTables(Connection c) throws SQLException {
89          Statement s = null;
90          try {
91              s = c.createStatement();
92              String[] createStatments = statementProvider.getCreateSchemaStatments();
93              for (int i = 0; i < createStatments.length; i++) {
94                  // This will fail usually since the tables will be
95                  // created allready.
96                  try {
97                      boolean rc = s.execute(createStatments[i]);
98                  }
99                  catch (SQLException e) {
100                     log.debug("Statment failed: " + createStatments[i], e);
101                 }
102             }
103             c.commit();
104         }
105         finally {
106             try {
107                 s.close();
108             }
109             catch (Throwable e) {
110             }
111         }
112     }
113 
114     public void initSequenceGenerator(Connection c) {
115         PreparedStatement s = null;
116         ResultSet rs = null;
117         try {
118             s = c.prepareStatement(statementProvider.getFindLastSequenceId());
119             rs = s.executeQuery();
120             if (rs.next()) {
121                 sequenceGenerator.setLastSequenceId(rs.getLong(1));
122             }
123         }
124         catch (SQLException e) {
125             log.warn("Failed to find last sequence number: " + e, e);
126         }
127         finally {
128             try {
129                 rs.close();
130             }
131             catch (Throwable e) {
132             }
133             try {
134                 s.close();
135             }
136             catch (Throwable e) {
137             }
138         }
139     }
140 
141     public void doAddMessage(Connection c, long seq, String messageID, String destinationName, byte[] data) throws SQLException, JMSException {
142         PreparedStatement s = null;
143         try {
144             s = c.prepareStatement(statementProvider.getAddMessageStatment());
145             s.setLong(1, seq);
146             s.setString(2, destinationName);
147             s.setString(3, messageID);
148             setBinaryData(s, 4, data);
149             if (s.executeUpdate() != 1) {
150                 throw new JMSException("Failed to broker message: " + messageID + " in container.  ");
151             }
152         }
153         finally {
154             try {
155                 s.close();
156             }
157             catch (Throwable e) {
158             }
159         }
160     }
161 
162 	public Long getMessageSequenceId(Connection c, String messageID) throws SQLException, JMSException {
163         PreparedStatement s = null;
164         ResultSet rs = null;
165         try {
166 
167             s = c.prepareStatement(statementProvider.getFindMessageSequenceIdStatment());
168             s.setString(1, messageID);
169             rs = s.executeQuery();
170 
171             if (!rs.next()) {
172                 return null;
173             }
174             return new Long( rs.getLong(1) );
175 
176         }
177         finally {
178             try {
179                 rs.close();
180             }
181             catch (Throwable e) {
182             }
183             try {
184                 s.close();
185             }
186             catch (Throwable e) {
187             }
188         }
189 	}
190 
191     public byte[] doGetMessage(Connection c, long seq) throws SQLException {
192         PreparedStatement s = null;
193         ResultSet rs = null;
194         try {
195 
196             s = c.prepareStatement(statementProvider.getFindMessageStatment());
197             s.setLong(1, seq);
198             rs = s.executeQuery();
199 
200             if (!rs.next()) {
201                 return null;
202             }
203             return getBinaryData(rs, 1);
204 
205         }
206         finally {
207             try {
208                 rs.close();
209             }
210             catch (Throwable e) {
211             }
212             try {
213                 s.close();
214             }
215             catch (Throwable e) {
216             }
217         }
218     }
219 
220     public void doRemoveMessage(Connection c, long seq) throws SQLException {
221         PreparedStatement s = null;
222         try {
223             s = c.prepareStatement(statementProvider.getRemoveMessageStatment());
224             s.setLong(1, seq);
225             if (s.executeUpdate() != 1) {
226                 log.error("Could not delete sequenece number for: " + seq);
227             }
228         }
229         finally {
230             try {
231                 s.close();
232             }
233             catch (Throwable e) {
234             }
235         }
236     }
237 
238     public void doRecover(Connection c, String destinationName, MessageListResultHandler listener) throws SQLException, JMSException {
239         PreparedStatement s = null;
240         ResultSet rs = null;
241         try {
242 
243             s = c.prepareStatement(statementProvider.getFindAllMessagesStatment());
244             s.setString(1, destinationName);
245             rs = s.executeQuery();
246 
247             while (rs.next()) {
248                 long seq = rs.getLong(1);
249                 String msgid = rs.getString(2);
250                 listener.onMessage(seq, msgid);
251             }
252 
253         }
254         finally {
255             try {
256                 rs.close();
257             }
258             catch (Throwable e) {
259             }
260             try {
261                 s.close();
262             }
263             catch (Throwable e) {
264             }
265         }
266     }
267 
268     public void doGetXids(Connection c, List list) throws SQLException {
269         PreparedStatement s = null;
270         ResultSet rs = null;
271         try {
272             s = c.prepareStatement(statementProvider.getFindAllXidStatment());
273             rs = s.executeQuery();
274 
275             while (rs.next()) {
276                 String xid = rs.getString(1);
277                 try {
278                     list.add(new ActiveMQXid(xid));
279                 }
280                 catch (JMSException e) {
281                     log.error("Failed to recover prepared transaction due to invalid xid: " + xid, e);
282                 }
283             }
284 
285         }
286         finally {
287             try {
288                 rs.close();
289             }
290             catch (Throwable e) {
291             }
292             try {
293                 s.close();
294             }
295             catch (Throwable e) {
296             }
297         }
298     }
299 
300     public void doRemoveXid(Connection c, ActiveMQXid xid) throws SQLException, XAException {
301         PreparedStatement s = null;
302         try {
303             s = c.prepareStatement(statementProvider.getRemoveMessageStatment());
304             s.setString(1, xid.toLocalTransactionId());
305             if (s.executeUpdate() != 1) {
306                 throw new XAException("Failed to remove prepared transaction: " + xid + ".");
307             }
308         }
309         finally {
310             try {
311                 s.close();
312             }
313             catch (Throwable e) {
314             }
315         }
316     }
317 
318 
319     public void doAddXid(Connection c, ActiveMQXid xid, byte[] data) throws SQLException, XAException {
320         PreparedStatement s = null;
321         try {
322 
323             s = c.prepareStatement(statementProvider.getAddMessageStatment());
324             s.setString(1, xid.toLocalTransactionId());
325             setBinaryData(s, 2, data);
326             if (s.executeUpdate() != 1) {
327                 throw new XAException("Failed to store prepared transaction: " + xid);
328             }
329 
330         }
331         finally {
332             try {
333                 s.close();
334             }
335             catch (Throwable e) {
336             }
337         }
338     }
339 
340     public void doLoadPreparedTransactions(Connection c, TransactionManager transactionManager) throws SQLException {
341         PreparedStatement s = null;
342         ResultSet rs = null;
343         try {
344 
345             s = c.prepareStatement(statementProvider.getFindAllTxStatment());
346             rs = s.executeQuery();
347 
348             while (rs.next()) {
349                 String id = rs.getString(1);
350                 byte data[] = this.getBinaryData(rs, 2);
351                 try {
352                     ActiveMQXid xid = new ActiveMQXid(id);
353                     Transaction transaction = XATransactionCommand.fromBytes(data);
354                     transactionManager.loadTransaction(xid, transaction);
355                 }
356                 catch (Exception e) {
357                     log.error("Failed to recover prepared transaction due to invalid xid: " + id, e);
358                 }
359             }
360         }
361         finally {
362             try {
363                 rs.close();
364             }
365             catch (Throwable e) {
366             }
367             try {
368                 s.close();
369             }
370             catch (Throwable e) {
371             }
372         }
373     }
374 
375     /***
376      * @throws JMSException
377      * @see org.codehaus.activemq.store.jdbc.JDBCAdapter#doSetLastAck(java.sql.Connection, java.lang.String, java.lang.String, long)
378      */
379     public void doSetLastAck(Connection c, String destinationName, String subscriptionID, long seq) throws SQLException, JMSException {
380         PreparedStatement s = null;
381         try {
382             s = c.prepareStatement(statementProvider.getUpdateLastAckOfDurableSub());
383             s.setLong(1, seq);
384             s.setString(2, subscriptionID);
385             s.setString(3, destinationName);
386 
387             if (s.executeUpdate() != 1) {
388                 throw new JMSException("Failed to acknowlege message with sequence id: " + seq + " for client: " + subscriptionID);
389             }
390         }
391         finally {
392             try {
393                 s.close();
394             }
395             catch (Throwable e) {
396             }
397         }
398     }
399 
400     /***
401      * @throws JMSException
402      * @see org.codehaus.activemq.store.jdbc.JDBCAdapter#doRecoverSubscription(java.sql.Connection, java.lang.String, java.lang.String, org.codehaus.activemq.store.jdbc.JDBCAdapter.MessageListResultHandler)
403      */
404     public void doRecoverSubscription(Connection c, String destinationName, String subscriptionID, MessageListResultHandler listener) throws SQLException, JMSException {
405         PreparedStatement s = null;
406         ResultSet rs = null;
407         try {
408 
409             s = c.prepareStatement(statementProvider.getFindAllDurableSubMessagesStatment());
410             s.setString(1, destinationName);
411             s.setString(2, subscriptionID);
412             rs = s.executeQuery();
413 
414             while (rs.next()) {
415                 long seq = rs.getLong(1);
416                 String msgid = rs.getString(2);
417                 listener.onMessage(seq, msgid);
418             }
419 
420         }
421         finally {
422             try {
423                 rs.close();
424             }
425             catch (Throwable e) {
426             }
427             try {
428                 s.close();
429             }
430             catch (Throwable e) {
431             }
432         }
433     }
434 
435     /***
436      * @see org.codehaus.activemq.store.jdbc.JDBCAdapter#doSetSubscriberEntry(java.sql.Connection, java.lang.Object, org.codehaus.activemq.service.SubscriberEntry)
437      */
438     public void doSetSubscriberEntry(Connection c, String destinationName, String sub, SubscriberEntry subscriberEntry) throws SQLException {
439         PreparedStatement s = null;
440         try {
441             s = c.prepareStatement(statementProvider.getUpdateDurableSubStatment());
442             s.setInt(1, subscriberEntry.getSubscriberID());
443             s.setString(2, subscriberEntry.getClientID());
444             s.setString(3, subscriberEntry.getConsumerName());
445             s.setString(4, subscriberEntry.getSelector());
446             s.setString(5, sub);
447             s.setString(6, destinationName);
448             
449             // If the sub was not there then we need to create it.
450             if (s.executeUpdate() != 1) {
451                 s.close();
452                 s = c.prepareStatement(statementProvider.getCreateDurableSubStatment());
453                 s.setInt(1, subscriberEntry.getSubscriberID());
454                 s.setString(2, subscriberEntry.getClientID());
455                 s.setString(3, subscriberEntry.getConsumerName());
456                 s.setString(4, subscriberEntry.getSelector());
457                 s.setString(5, sub);
458                 s.setString(6, destinationName);
459                 s.setLong(7, -1);
460 
461                 if (s.executeUpdate() != 1) {
462                     log.error("Failed to store durable subscription for: " + sub);
463                 }
464             }
465         }
466         finally {
467             try {
468                 s.close();
469             }
470             catch (Throwable e) {
471             }
472         }
473     }
474 
475     /***
476      * @see org.codehaus.activemq.store.jdbc.JDBCAdapter#doGetSubscriberEntry(java.sql.Connection, java.lang.Object)
477      */
478     public SubscriberEntry doGetSubscriberEntry(Connection c, String destinationName, String sub) throws SQLException {
479         PreparedStatement s = null;
480         ResultSet rs = null;
481         try {
482 
483             s = c.prepareStatement(statementProvider.getFindDurableSubStatment());
484             s.setString(1, sub);
485             s.setString(2, destinationName);
486             rs = s.executeQuery();
487 
488             if (!rs.next()) {
489                 return null;
490             }
491 
492             SubscriberEntry answer = new SubscriberEntry();
493             answer.setSubscriberID(rs.getInt(1));
494             answer.setClientID(rs.getString(2));
495             answer.setConsumerName(rs.getString(3));
496             answer.setDestination(rs.getString(4));
497 
498             return answer;
499 
500         }
501         finally {
502             try {
503                 rs.close();
504             }
505             catch (Throwable e) {
506             }
507             try {
508                 s.close();
509             }
510             catch (Throwable e) {
511             }
512         }
513     }
514 
515 }