package org.apache.activemq.store.jdbc.adapter;

import java.io.IOException;
import java.io.InputStream;
import java.sql.Blob;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.store.jdbc.Statements;
import org.apache.activemq.store.jdbc.TransactionContext;
import org.apache.activemq.util.ByteArrayOutputStream;

/* loaded from: input_file:activemq-core-5.6.1.fuse-71-SNAPSHOT.jar:org/apache/activemq/store/jdbc/adapter/BlobJDBCAdapter.class */
/**
 * JDBC adapter that writes and reads message payloads through the
 * {@link Blob} API instead of binding them directly as statement parameters.
 *
 * <p>Adding a message is a multi-step operation:
 * <ol>
 *   <li>insert the row with {@code empty_blob()} placeholders in the
 *       {@code MSG} and {@code XID} columns;</li>
 *   <li>re-select the blob column of that row through an updatable result set;</li>
 *   <li>write the bytes into the blob and update the row.</li>
 * </ol>
 */
public class BlobJDBCAdapter extends DefaultJDBCAdapter {

    /** Chunk size used when copying a blob's stream into memory. */
    private static final int COPY_BUFFER_SIZE = 4096;

    /**
     * Overrides the add-message and find-message-by-id SQL so that rows are
     * inserted with empty blobs and later selected {@code FOR UPDATE}, then
     * delegates to the superclass.
     *
     * @param statements the statement holder to customise
     */
    @Override
    public void setStatements(Statements statements) {
        String messageTable = statements.getFullMessageTableName();
        statements.setAddMessageStatement("INSERT INTO " + messageTable + "(ID, MSGID_PROD, MSGID_SEQ, CONTAINER, EXPIRATION, PRIORITY, MSG, XID) VALUES (?, ?, ?, ?, ?, ?, empty_blob(), empty_blob())");
        statements.setFindMessageByIdStatement("SELECT MSG FROM " + messageTable + " WHERE ID=? FOR UPDATE");
        super.setStatements(statements);
    }

    /**
     * Inserts a message row and then fills in its blob column(s).
     *
     * @param transactionContext supplies the JDBC connection to use
     * @param sequence           value bound to the {@code ID} column
     * @param messageId          supplies the producer id and producer sequence id
     * @param destination        its qualified name is stored in {@code CONTAINER}
     * @param data               bytes written to the {@code MSG} blob
     * @param expiration         value bound to the {@code EXPIRATION} column
     * @param priority           value bound to the {@code PRIORITY} column
     * @param xid                optional XA transaction id; when non-null its encoded
     *                           bytes are written to the {@code XID} blob
     * @throws SQLException on any JDBC failure
     * @throws IOException  if the insert did not affect exactly one row, or the
     *                      inserted row cannot be re-selected for the blob update
     */
    @Override
    public void doAddMessage(TransactionContext transactionContext, long sequence, MessageId messageId, ActiveMQDestination destination, byte[] data, long expiration, byte priority, XATransactionId xid) throws SQLException, IOException {
        PreparedStatement insert = null;
        this.cleanupExclusiveLock.readLock().lock();
        try {
            // Step 1: insert the row with empty blob placeholders.
            insert = transactionContext.getConnection().prepareStatement(this.statements.getAddMessageStatement());
            insert.setLong(1, sequence);
            insert.setString(2, messageId.getProducerId().toString());
            insert.setLong(3, messageId.getProducerSequenceId());
            insert.setString(4, destination.getQualifiedName());
            insert.setLong(5, expiration);
            insert.setLong(6, priority);
            if (insert.executeUpdate() != 1) {
                throw new IOException("Failed to add broker message: " + messageId + " in container.");
            }
            // Release the insert statement before opening the updatable cursor.
            insert.close();
            insert = null;

            // Steps 2 and 3: select the MSG blob and fill it with the payload.
            updateBlob(transactionContext.getConnection(), this.statements.getFindMessageByIdStatement(), sequence, data);

            if (xid != null) {
                byte[] encodedXid = xid.getEncodedXidBytes();
                // NOTE(review): the first byte is overwritten with '+' (43) before storing;
                // presumably a marker read back elsewhere - confirm against the XID readers.
                encodedXid[0] = '+';
                updateBlob(transactionContext.getConnection(), this.statements.getFindXidByIdStatement(), sequence, encodedXid);
            }
        } finally {
            this.cleanupExclusiveLock.readLock().unlock();
            close(insert);
        }
    }

    /**
     * Selects a single blob column for the given row through an updatable
     * result set and replaces the blob's content with {@code data}.
     *
     * @param connection     connection on which the row was inserted
     * @param findBlobByIdSql query returning the blob column as column 1, taking
     *                       the row id as its only parameter
     * @param id             row id bound to the query
     * @param data           new blob content
     * @throws SQLException on any JDBC failure
     * @throws IOException  if no row matches {@code id}
     */
    private void updateBlob(Connection connection, String findBlobByIdSql, long id, byte[] data) throws SQLException, IOException {
        PreparedStatement select = null;
        ResultSet rows = null;
        try {
            // Use the caller-supplied query so that the XID update targets the XID
            // column rather than overwriting the message body.
            select = connection.prepareStatement(findBlobByIdSql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_UPDATABLE);
            select.setLong(1, id);
            rows = select.executeQuery();
            if (!rows.next()) {
                throw new IOException("Failed select blob for message: " + id + " in container.");
            }
            Blob blob = rows.getBlob(1);
            blob.truncate(0L);
            blob.setBytes(1L, data);
            rows.updateBlob(1, blob);
            // Push the modified blob back to the row.
            rows.updateRow();
        } finally {
            close(rows);
            close(select);
        }
    }

    /**
     * Loads the payload of the message identified by its producer id and
     * producer sequence id.
     *
     * @param transactionContext supplies the JDBC connection to use
     * @param messageId          identifies the message to load
     * @return the blob content as a byte array, or {@code null} if no row matches
     * @throws SQLException on any JDBC failure
     * @throws IOException  if reading the blob stream fails
     */
    @Override
    public byte[] doGetMessage(TransactionContext transactionContext, MessageId messageId) throws SQLException, IOException {
        PreparedStatement select = null;
        ResultSet rows = null;
        this.cleanupExclusiveLock.readLock().lock();
        try {
            select = transactionContext.getConnection().prepareStatement(this.statements.getFindMessageStatement());
            select.setString(1, messageId.getProducerId().toString());
            select.setLong(2, messageId.getProducerSequenceId());
            rows = select.executeQuery();
            if (!rows.next()) {
                return null;
            }
            Blob blob = rows.getBlob(1);
            InputStream in = blob.getBinaryStream();
            try {
                ByteArrayOutputStream out = new ByteArrayOutputStream((int) blob.length());
                byte[] buffer = new byte[COPY_BUFFER_SIZE];
                int count;
                while ((count = in.read(buffer)) != -1) {
                    out.write(buffer, 0, count);
                }
                out.close();
                return out.toByteArray();
            } finally {
                // Always release the blob stream, even when the copy fails.
                in.close();
            }
        } finally {
            this.cleanupExclusiveLock.readLock().unlock();
            close(rows);
            close(select);
        }
    }
}
