View Javadoc

1   /*** 
2    * 
3    * Copyright 2004 Hiram Chirino
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.codehaus.activemq.store.jdbc;
19  
20  import org.apache.commons.logging.Log;
21  import org.apache.commons.logging.LogFactory;
22  import org.codehaus.activemq.message.DefaultWireFormat;
23  import org.codehaus.activemq.message.WireFormat;
24  import org.codehaus.activemq.service.impl.PersistenceAdapterSupport;
25  import org.codehaus.activemq.store.MessageStore;
26  import org.codehaus.activemq.store.PersistenceAdapter;
27  import org.codehaus.activemq.store.PreparedTransactionStore;
28  import org.codehaus.activemq.store.TopicMessageStore;
29  import org.codehaus.activemq.store.jdbc.adapter.DefaultJDBCAdapter;
30  import org.codehaus.activemq.util.FactoryFinder;
31  import org.codehaus.activemq.util.JMSExceptionHelper;
32  
33  import javax.jms.JMSException;
34  import javax.sql.DataSource;
35  import java.sql.Connection;
36  import java.sql.SQLException;
37  import java.util.Map;
38  
39  /***
40   * A {@link PersistenceAdapter} implementation using JDBC for
41   * persistence storage.
42   *
43   * @version $Revision: 1.8 $
44   */
45  public class JDBCPersistenceAdapter extends PersistenceAdapterSupport {
46  
47      private static final Log log = LogFactory.getLog(JDBCPersistenceAdapter.class);
48      private static FactoryFinder factoryFinder = new FactoryFinder("META-INF/services/org/codehaus/activemq/store/jdbc/");
49  
50      private WireFormat wireFormat = new DefaultWireFormat();
51      private DataSource dataSource;
52      private JDBCAdapter adapter;
53  
54  
55      public JDBCPersistenceAdapter() {
56      }
57      
58      public JDBCPersistenceAdapter(DataSource ds, WireFormat wireFormat) {
59          this.dataSource = ds;
60          this.wireFormat = wireFormat;
61      }
62  
63      public Map getInitialDestinations() {
64          return null;  /*** TODO */
65      }
66  
67      public MessageStore createQueueMessageStore(String destinationName) throws JMSException {
68          if (adapter == null) {
69              throw new IllegalStateException("Not started");
70          }
71          return new JDBCMessageStore(this, adapter, wireFormat, destinationName);
72      }
73  
74      public TopicMessageStore createTopicMessageStore(String destinationName) throws JMSException {
75          if (adapter == null) {
76              throw new IllegalStateException("Not started");
77          }
78          return new JDBCTopicMessageStore(this, adapter, wireFormat, destinationName);
79      }
80  
81      public PreparedTransactionStore createPreparedTransactionStore() throws JMSException {
82          if (adapter == null) {
83              throw new IllegalStateException("Not started");
84          }
85          return new JDBCPreparedTransactionStore(this, adapter, wireFormat);
86      }
87  
88      public void beginTransaction() throws JMSException {
89          try {
90              Connection c = dataSource.getConnection();           
91              c.setAutoCommit(false);
92              TransactionContext.pushConnection(c);
93          }
94          catch (SQLException e) {
95              throw JMSExceptionHelper.newJMSException("Failed to create transaction: " + e, e);
96          }
97      }
98  
99      public void commitTransaction() throws JMSException {
100         Connection c = TransactionContext.popConnection();
101         if (c == null) {
102             log.warn("Commit while no transaction in progress");
103         }
104         else {
105             try {
106                 c.commit();
107             }
108             catch (SQLException e) {
109                 throw JMSExceptionHelper.newJMSException("Failed to commit transaction: " + c + ": " + e, e);
110             }
111             finally {
112                 try {
113                     c.close();
114                 }
115                 catch (Throwable e) {
116                 }
117             }
118         }
119     }
120 
121     public void rollbackTransaction() {
122         Connection c = TransactionContext.popConnection();
123         try {
124             c.rollback();
125         }
126         catch (SQLException e) {
127             log.warn("Cannot rollback transaction due to: " + e, e);
128         }
129         finally {
130             try {
131                 c.close();
132             }
133             catch (Throwable e) {
134             }
135         }
136     }
137 
138 
139     public void start() throws JMSException {
140         beginTransaction();
141         try {
142         	Connection c=null;
143             try {
144 	            c = getConnection();
145 	            
146 	            // Choose the right adapter depending on the
147 	            // databse connection.
148 	            adapter = null;
149 	            String database = null;
150 	            
151                 database = c.getMetaData().getDriverName();
152                 database = database.replaceAll(" ", "_");
153 
154                 log.debug("Database type: [" + database + "]");
155                 try {
156                     adapter = (DefaultJDBCAdapter) factoryFinder.newInstance(database);
157                 }
158                 catch (Throwable e) {
159                     log.warn("Unrecognized database type (" + database + ").  Will use default JDBC implementation.");
160                     log.debug("Reason: "+e,e);
161                 }
162             }
163             catch (SQLException e1) {
164             	returnConnection(c);
165             }
166             
167             // Use the default JDBC adapter if the 
168             // Database type is not recognized.
169             if (adapter == null) {
170                 adapter = new DefaultJDBCAdapter();
171             }
172 
173             try {
174                 adapter.doCreateTables(c);
175             }
176             catch (SQLException e) {
177                 log.warn("Cannot create tables due to: " + e, e);
178             }
179             adapter.initSequenceGenerator(c);
180 
181         }
182         finally {
183             commitTransaction();
184         }
185     }
186 
187 
188 	public synchronized void stop() throws JMSException {
189     }
190 
191 	public DataSource getDataSource() {
192 		return dataSource;
193 	}
194 	public void setDataSource(DataSource dataSource) {
195 		this.dataSource = dataSource;
196 	}
197 	public WireFormat getWireFormat() {
198 		return wireFormat;
199 	}
200 	public void setWireFormat(WireFormat wireFormat) {
201 		this.wireFormat = wireFormat;
202 	}
203 		
204 	public Connection getConnection() throws SQLException {
205 		Connection answer = TransactionContext.peekConnection();
206 		if(answer==null) {
207 			answer = dataSource.getConnection();
208 			answer.setAutoCommit(true);
209 		}
210 		return answer;
211 	}
212 
213 	public void returnConnection(Connection connection) {
214 		if( connection==null )
215 			return;
216 		Connection peek = TransactionContext.peekConnection();
217 		if(peek!=connection) {
218 			try { connection.close(); } catch (SQLException e) {}
219 		}
220 	}
221 }