1 /***
2 *
3 * Copyright 2004 Hiram Chirino
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.codehaus.activemq.store.jdbc.adapter;
19
20 import java.io.ByteArrayOutputStream;
21 import java.io.IOException;
22 import java.io.InputStream;
23 import java.io.OutputStream;
24 import java.sql.Blob;
25 import java.sql.Connection;
26 import java.sql.PreparedStatement;
27 import java.sql.ResultSet;
28 import java.sql.SQLException;
29
30 import javax.jms.JMSException;
31
32 import org.codehaus.activemq.store.jdbc.StatementProvider;
33
34
35 /***
36 * This JDBCAdapter inserts and extracts BLOB data using the
37 * getBlob()/setBlob() operations. This is a little more involved
38 * since to insert a blob you have to:
39 *
40 * 1: insert empty blob.
41 * 2: select the blob
42 * 3: finally update the blob with data value.
43 *
44 * The databases/JDBC drivers that use this adapter are:
45 * <ul>
46 * <li></li>
47 * </ul>
48 *
49 * @version $Revision: 1.1 $
50 */
51 public class BlobJDBCAdapter extends DefaultJDBCAdapter {
52
53 public BlobJDBCAdapter() {
54 super();
55 }
56
57 public BlobJDBCAdapter(StatementProvider provider) {
58 super(provider);
59 }
60
61 public void doAddMessage(Connection c, long seq, String messageID, String destinationName, byte[] data) throws SQLException,
62 JMSException {
63 PreparedStatement s = null;
64 ResultSet rs = null;
65 try {
66
67
68 s = c.prepareStatement(statementProvider.getAddMessageStatment());
69 s.setLong(1, seq);
70 s.setString(2, destinationName);
71 s.setString(3, messageID);
72 s.setString(4, " ");
73
74 if (s.executeUpdate() != 1)
75 throw new JMSException("Failed to broker message: " + messageID
76 + " in container.");
77 s.close();
78
79
80 s = c.prepareStatement(statementProvider.getFindMessageStatment());
81 s.setLong(1, seq);
82 rs = s.executeQuery();
83 if (!rs.next())
84 throw new JMSException("Failed to broker message: " + messageID
85 + " in container.");
86
87
88 Blob blob = rs.getBlob(1);
89 OutputStream stream = blob.setBinaryStream(data.length);
90 stream.write(data);
91 stream.close();
92 s.close();
93
94
95 s = c.prepareStatement(statementProvider.getUpdateMessageStatment());
96 s.setBlob(1, blob);
97 s.setLong(2, seq);
98
99 } catch (IOException e) {
100 throw (SQLException) new SQLException("BLOB could not be updated: "
101 + e).initCause(e);
102 } finally {
103 try {
104 rs.close();
105 } catch (Throwable e) {
106 }
107 try {
108 s.close();
109 } catch (Throwable e) {
110 }
111 }
112 }
113
114 public byte[] doGetMessage(Connection c, long seq) throws SQLException {
115 PreparedStatement s=null; ResultSet rs=null;
116 try {
117
118 s = c.prepareStatement(statementProvider.getFindMessageStatment());
119 s.setLong(1, seq);
120 rs = s.executeQuery();
121
122 if( !rs.next() )
123 return null;
124 Blob blob = rs.getBlob(1);
125 InputStream is = blob.getBinaryStream();
126
127 ByteArrayOutputStream os = new ByteArrayOutputStream((int)blob.length());
128 int ch;
129 while( (ch=is.read())>= 0 ) {
130 os.write(ch);
131 }
132 is.close();
133 os.close();
134
135 return os.toByteArray();
136
137 } catch (IOException e) {
138 throw (SQLException) new SQLException("BLOB could not be updated: "
139 + e).initCause(e);
140 } finally {
141 try { rs.close(); } catch (Throwable e) {}
142 try { s.close(); } catch (Throwable e) {}
143 }
144 }
145
146 }