1 /***
2 *
3 * Copyright 2004 Hiram Chirino
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.codehaus.activemq.store.jdbc;
19
20 import org.apache.commons.logging.Log;
21 import org.apache.commons.logging.LogFactory;
22 import org.codehaus.activemq.message.DefaultWireFormat;
23 import org.codehaus.activemq.message.WireFormat;
24 import org.codehaus.activemq.service.impl.PersistenceAdapterSupport;
25 import org.codehaus.activemq.store.MessageStore;
26 import org.codehaus.activemq.store.PersistenceAdapter;
27 import org.codehaus.activemq.store.PreparedTransactionStore;
28 import org.codehaus.activemq.store.TopicMessageStore;
29 import org.codehaus.activemq.store.jdbc.adapter.DefaultJDBCAdapter;
30 import org.codehaus.activemq.util.FactoryFinder;
31 import org.codehaus.activemq.util.JMSExceptionHelper;
32
33 import javax.jms.JMSException;
34 import javax.sql.DataSource;
35 import java.sql.Connection;
36 import java.sql.SQLException;
37 import java.util.Map;
38
39 /***
40 * A {@link PersistenceAdapter} implementation using JDBC for
41 * persistence storage.
42 *
43 * @version $Revision: 1.8 $
44 */
45 public class JDBCPersistenceAdapter extends PersistenceAdapterSupport {
46
47 private static final Log log = LogFactory.getLog(JDBCPersistenceAdapter.class);
48 private static FactoryFinder factoryFinder = new FactoryFinder("META-INF/services/org/codehaus/activemq/store/jdbc/");
49
50 private WireFormat wireFormat = new DefaultWireFormat();
51 private DataSource dataSource;
52 private JDBCAdapter adapter;
53
54
55 public JDBCPersistenceAdapter() {
56 }
57
58 public JDBCPersistenceAdapter(DataSource ds, WireFormat wireFormat) {
59 this.dataSource = ds;
60 this.wireFormat = wireFormat;
61 }
62
63 public Map getInitialDestinations() {
64 return null; /*** TODO */
65 }
66
67 public MessageStore createQueueMessageStore(String destinationName) throws JMSException {
68 if (adapter == null) {
69 throw new IllegalStateException("Not started");
70 }
71 return new JDBCMessageStore(this, adapter, wireFormat, destinationName);
72 }
73
74 public TopicMessageStore createTopicMessageStore(String destinationName) throws JMSException {
75 if (adapter == null) {
76 throw new IllegalStateException("Not started");
77 }
78 return new JDBCTopicMessageStore(this, adapter, wireFormat, destinationName);
79 }
80
81 public PreparedTransactionStore createPreparedTransactionStore() throws JMSException {
82 if (adapter == null) {
83 throw new IllegalStateException("Not started");
84 }
85 return new JDBCPreparedTransactionStore(this, adapter, wireFormat);
86 }
87
88 public void beginTransaction() throws JMSException {
89 try {
90 Connection c = dataSource.getConnection();
91 c.setAutoCommit(false);
92 TransactionContext.pushConnection(c);
93 }
94 catch (SQLException e) {
95 throw JMSExceptionHelper.newJMSException("Failed to create transaction: " + e, e);
96 }
97 }
98
99 public void commitTransaction() throws JMSException {
100 Connection c = TransactionContext.popConnection();
101 if (c == null) {
102 log.warn("Commit while no transaction in progress");
103 }
104 else {
105 try {
106 c.commit();
107 }
108 catch (SQLException e) {
109 throw JMSExceptionHelper.newJMSException("Failed to commit transaction: " + c + ": " + e, e);
110 }
111 finally {
112 try {
113 c.close();
114 }
115 catch (Throwable e) {
116 }
117 }
118 }
119 }
120
121 public void rollbackTransaction() {
122 Connection c = TransactionContext.popConnection();
123 try {
124 c.rollback();
125 }
126 catch (SQLException e) {
127 log.warn("Cannot rollback transaction due to: " + e, e);
128 }
129 finally {
130 try {
131 c.close();
132 }
133 catch (Throwable e) {
134 }
135 }
136 }
137
138
139 public void start() throws JMSException {
140 beginTransaction();
141 try {
142 Connection c=null;
143 try {
144 c = getConnection();
145
146
147
148 adapter = null;
149 String database = null;
150
151 database = c.getMetaData().getDriverName();
152 database = database.replaceAll(" ", "_");
153
154 log.debug("Database type: [" + database + "]");
155 try {
156 adapter = (DefaultJDBCAdapter) factoryFinder.newInstance(database);
157 }
158 catch (Throwable e) {
159 log.warn("Unrecognized database type (" + database + "). Will use default JDBC implementation.");
160 log.debug("Reason: "+e,e);
161 }
162 }
163 catch (SQLException e1) {
164 returnConnection(c);
165 }
166
167
168
169 if (adapter == null) {
170 adapter = new DefaultJDBCAdapter();
171 }
172
173 try {
174 adapter.doCreateTables(c);
175 }
176 catch (SQLException e) {
177 log.warn("Cannot create tables due to: " + e, e);
178 }
179 adapter.initSequenceGenerator(c);
180
181 }
182 finally {
183 commitTransaction();
184 }
185 }
186
187
188 public synchronized void stop() throws JMSException {
189 }
190
191 public DataSource getDataSource() {
192 return dataSource;
193 }
194 public void setDataSource(DataSource dataSource) {
195 this.dataSource = dataSource;
196 }
197 public WireFormat getWireFormat() {
198 return wireFormat;
199 }
200 public void setWireFormat(WireFormat wireFormat) {
201 this.wireFormat = wireFormat;
202 }
203
204 public Connection getConnection() throws SQLException {
205 Connection answer = TransactionContext.peekConnection();
206 if(answer==null) {
207 answer = dataSource.getConnection();
208 answer.setAutoCommit(true);
209 }
210 return answer;
211 }
212
213 public void returnConnection(Connection connection) {
214 if( connection==null )
215 return;
216 Connection peek = TransactionContext.peekConnection();
217 if(peek!=connection) {
218 try { connection.close(); } catch (SQLException e) {}
219 }
220 }
221 }