package org.apache.qpid.server.store;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.InputStream;
import java.sql.Blob;
import java.sql.Connection;
import java.sql.Driver;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.log4j.Logger;
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.MessageHandleFactory;
import org.apache.qpid.server.queue.MessageMetaData;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.virtualhost.VirtualHost;

/* loaded from: input_file:org/apache/qpid/server/store/DerbyMessageStore.class */
public class DerbyMessageStore implements MessageStore {
    private static final Logger _logger = Logger.getLogger(DerbyMessageStore.class);
    public static final String ENVIRONMENT_PATH_PROPERTY = "environment-path";
    private static final String SQL_DRIVER_NAME = "org.apache.derby.jdbc.EmbeddedDriver";
    private static final String DB_VERSION_TABLE_NAME = "QPID_DB_VERSION";
    private static final String EXCHANGE_TABLE_NAME = "QPID_EXCHANGE";
    private static final String QUEUE_TABLE_NAME = "QPID_QUEUE";
    private static final String BINDINGS_TABLE_NAME = "QPID_BINDINGS";
    private static final String QUEUE_ENTRY_TABLE_NAME = "QPID_QUEUE_ENTRY";
    private static final String MESSAGE_META_DATA_TABLE_NAME = "QPID_MESSAGE_META_DATA";
    private static final String MESSAGE_CONTENT_TABLE_NAME = "QPID_MESSAGE_CONTENT";
    private static final int DB_VERSION = 1;
    private VirtualHost _virtualHost;
    private static Class<Driver> DRIVER_CLASS;
    private String _connectionURL;
    private static final String CREATE_DB_VERSION_TABLE = "CREATE TABLE QPID_DB_VERSION ( version int not null )";
    private static final String INSERT_INTO_DB_VERSION = "INSERT INTO QPID_DB_VERSION ( version ) VALUES ( ? )";
    private static final String CREATE_EXCHANGE_TABLE = "CREATE TABLE QPID_EXCHANGE ( name varchar(255) not null, type varchar(255) not null, autodelete SMALLINT not null, PRIMARY KEY ( name ) )";
    private static final String CREATE_QUEUE_TABLE = "CREATE TABLE QPID_QUEUE ( name varchar(255) not null, owner varchar(255), PRIMARY KEY ( name ) )";
    private static final String CREATE_BINDINGS_TABLE = "CREATE TABLE QPID_BINDINGS ( exchange_name varchar(255) not null, queue_name varchar(255) not null, binding_key varchar(255) not null, arguments blob , PRIMARY KEY ( exchange_name, queue_name, binding_key ) )";
    private static final String CREATE_QUEUE_ENTRY_TABLE = "CREATE TABLE QPID_QUEUE_ENTRY ( queue_name varchar(255) not null, message_id bigint not null, PRIMARY KEY (queue_name, message_id) )";
    private static final String CREATE_MESSAGE_META_DATA_TABLE = "CREATE TABLE QPID_MESSAGE_META_DATA ( message_id bigint not null, exchange_name varchar(255) not null, routing_key varchar(255), flag_mandatory smallint not null, flag_immediate smallint not null, content_header blob, chunk_count int not null, PRIMARY KEY ( message_id ) )";
    private static final String CREATE_MESSAGE_CONTENT_TABLE = "CREATE TABLE QPID_MESSAGE_CONTENT ( message_id bigint not null, chunk_id int not null, content_chunk blob , PRIMARY KEY (message_id, chunk_id) )";
    private static final String SELECT_FROM_QUEUE = "SELECT name, owner FROM QPID_QUEUE";
    private static final String FIND_QUEUE = "SELECT name, owner FROM QPID_QUEUE WHERE name = ?";
    private static final String SELECT_FROM_EXCHANGE = "SELECT name, type, autodelete FROM QPID_EXCHANGE";
    private static final String SELECT_FROM_BINDINGS = "SELECT queue_name, binding_key, arguments FROM QPID_BINDINGS WHERE exchange_name = ?";
    private static final String FIND_BINDING = "SELECT * FROM QPID_BINDINGS WHERE exchange_name = ? AND queue_name = ? AND binding_key = ? ";
    private static final String DELETE_FROM_MESSAGE_META_DATA = "DELETE FROM QPID_MESSAGE_META_DATA WHERE message_id = ?";
    private static final String DELETE_FROM_MESSAGE_CONTENT = "DELETE FROM QPID_MESSAGE_CONTENT WHERE message_id = ?";
    private static final String INSERT_INTO_EXCHANGE = "INSERT INTO QPID_EXCHANGE ( name, type, autodelete ) VALUES ( ?, ?, ? )";
    private static final String DELETE_FROM_EXCHANGE = "DELETE FROM QPID_EXCHANGE WHERE name = ?";
    private static final String INSERT_INTO_BINDINGS = "INSERT INTO QPID_BINDINGS ( exchange_name, queue_name, binding_key, arguments ) values ( ?, ?, ?, ? )";
    private static final String DELETE_FROM_BINDINGS = "DELETE FROM QPID_BINDINGS WHERE exchange_name = ? AND queue_name = ? AND binding_key = ?";
    private static final String INSERT_INTO_QUEUE = "INSERT INTO QPID_QUEUE (name, owner) VALUES (?, ?)";
    private static final String DELETE_FROM_QUEUE = "DELETE FROM QPID_QUEUE WHERE name = ?";
    private static final String INSERT_INTO_QUEUE_ENTRY = "INSERT INTO QPID_QUEUE_ENTRY (queue_name, message_id) values (?,?)";
    private static final String DELETE_FROM_QUEUE_ENTRY = "DELETE FROM QPID_QUEUE_ENTRY WHERE queue_name = ? AND message_id =?";
    private static final String INSERT_INTO_MESSAGE_CONTENT = "INSERT INTO QPID_MESSAGE_CONTENT( message_id, chunk_id, content_chunk ) values (?, ?, ?)";
    private static final String INSERT_INTO_MESSAGE_META_DATA = "INSERT INTO QPID_MESSAGE_META_DATA( message_id , exchange_name , routing_key , flag_mandatory , flag_immediate , content_header , chunk_count ) values (?, ?, ?, ?, ?, ?, ?)";
    private static final String SELECT_FROM_MESSAGE_META_DATA = "SELECT exchange_name , routing_key , flag_mandatory , flag_immediate , content_header , chunk_count FROM QPID_MESSAGE_META_DATA WHERE message_id = ?";
    private static final String SELECT_FROM_MESSAGE_CONTENT = "SELECT content_chunk FROM QPID_MESSAGE_CONTENT WHERE message_id = ? and chunk_id = ?";
    private static final String SELECT_FROM_QUEUE_ENTRY = "SELECT queue_name, message_id FROM QPID_QUEUE_ENTRY";
    private static final String TABLE_EXISTANCE_QUERY = "SELECT 1 FROM SYS.SYSTABLES WHERE TABLENAME = ?";
    private final AtomicLong _messageId = new AtomicLong(1);
    private AtomicBoolean _closed = new AtomicBoolean(false);
    private State _state = State.INITIAL;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/qpid/server/store/DerbyMessageStore$ConnectionWrapper.class */
    public static final class ConnectionWrapper {
        private final Connection _connection;
        private boolean _requiresCommit;

        public ConnectionWrapper(Connection connection) {
            this._connection = connection;
        }

        public void setRequiresCommit() {
            this._requiresCommit = true;
        }

        public boolean requiresCommit() {
            return this._requiresCommit;
        }

        public Connection getConnection() {
            return this._connection;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/qpid/server/store/DerbyMessageStore$ProcessAction.class */
    public static final class ProcessAction {
        private final AMQQueue _queue;
        private final StoreContext _context;
        private final AMQMessage _message;

        public ProcessAction(AMQQueue aMQQueue, StoreContext storeContext, AMQMessage aMQMessage) {
            this._queue = aMQQueue;
            this._context = storeContext;
            this._message = aMQMessage;
        }

        public void process() throws AMQException {
            this._queue.enqueue(this._context, this._message);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/qpid/server/store/DerbyMessageStore$State.class */
    public enum State {
        INITIAL,
        CONFIGURING,
        RECOVERING,
        STARTED,
        CLOSING,
        CLOSED
    }

    @Override // org.apache.qpid.server.store.MessageStore
    public void configure(VirtualHost virtualHost, String str, VirtualHostConfiguration virtualHostConfiguration) throws Exception {
        stateTransition(State.INITIAL, State.CONFIGURING);
        initialiseDriver();
        this._virtualHost = virtualHost;
        _logger.info("Configuring Derby message store for virtual host " + virtualHost.getName());
        virtualHost.getQueueRegistry();
        String string = virtualHostConfiguration.getStoreConfiguration().getString(ENVIRONMENT_PATH_PROPERTY, "derbyDB");
        File file = new File(string);
        if (!file.exists() && !file.mkdirs()) {
            throw new IllegalArgumentException("Environment path " + file + " could not be read or created. Ensure the path is correct and that the permissions are correct.");
        }
        createOrOpenDatabase(string);
        recover();
        stateTransition(State.RECOVERING, State.STARTED);
    }

    private static synchronized void initialiseDriver() throws ClassNotFoundException {
        if (DRIVER_CLASS == null) {
            DRIVER_CLASS = Class.forName(SQL_DRIVER_NAME);
        }
    }

    private void createOrOpenDatabase(String str) throws SQLException {
        this._connectionURL = "jdbc:derby:" + str + "/" + this._virtualHost.getName() + ";create=true";
        Connection newConnection = newConnection();
        createVersionTable(newConnection);
        createExchangeTable(newConnection);
        createQueueTable(newConnection);
        createBindingsTable(newConnection);
        createQueueEntryTable(newConnection);
        createMessageMetaDataTable(newConnection);
        createMessageContentTable(newConnection);
        newConnection.close();
    }

    private void createVersionTable(Connection connection) throws SQLException {
        if (tableExists(DB_VERSION_TABLE_NAME, connection)) {
            return;
        }
        Statement createStatement = connection.createStatement();
        createStatement.execute(CREATE_DB_VERSION_TABLE);
        createStatement.close();
        PreparedStatement prepareStatement = connection.prepareStatement(INSERT_INTO_DB_VERSION);
        prepareStatement.setInt(1, 1);
        prepareStatement.execute();
        prepareStatement.close();
    }

    private void createExchangeTable(Connection connection) throws SQLException {
        if (tableExists(EXCHANGE_TABLE_NAME, connection)) {
            return;
        }
        Statement createStatement = connection.createStatement();
        createStatement.execute(CREATE_EXCHANGE_TABLE);
        createStatement.close();
    }

    private void createQueueTable(Connection connection) throws SQLException {
        if (tableExists(QUEUE_TABLE_NAME, connection)) {
            return;
        }
        Statement createStatement = connection.createStatement();
        createStatement.execute(CREATE_QUEUE_TABLE);
        createStatement.close();
    }

    private void createBindingsTable(Connection connection) throws SQLException {
        if (tableExists(BINDINGS_TABLE_NAME, connection)) {
            return;
        }
        Statement createStatement = connection.createStatement();
        createStatement.execute(CREATE_BINDINGS_TABLE);
        createStatement.close();
    }

    private void createQueueEntryTable(Connection connection) throws SQLException {
        if (tableExists(QUEUE_ENTRY_TABLE_NAME, connection)) {
            return;
        }
        Statement createStatement = connection.createStatement();
        createStatement.execute(CREATE_QUEUE_ENTRY_TABLE);
        createStatement.close();
    }

    private void createMessageMetaDataTable(Connection connection) throws SQLException {
        if (tableExists(MESSAGE_META_DATA_TABLE_NAME, connection)) {
            return;
        }
        Statement createStatement = connection.createStatement();
        createStatement.execute(CREATE_MESSAGE_META_DATA_TABLE);
        createStatement.close();
    }

    private void createMessageContentTable(Connection connection) throws SQLException {
        if (tableExists(MESSAGE_CONTENT_TABLE_NAME, connection)) {
            return;
        }
        Statement createStatement = connection.createStatement();
        createStatement.execute(CREATE_MESSAGE_CONTENT_TABLE);
        createStatement.close();
    }

    private boolean tableExists(String str, Connection connection) throws SQLException {
        PreparedStatement prepareStatement = connection.prepareStatement(TABLE_EXISTANCE_QUERY);
        prepareStatement.setString(1, str);
        ResultSet executeQuery = prepareStatement.executeQuery();
        boolean next = executeQuery.next();
        executeQuery.close();
        prepareStatement.close();
        return next;
    }

    public void recover() throws AMQException {
        stateTransition(State.CONFIGURING, State.RECOVERING);
        _logger.info("Recovering persistent state...");
        StoreContext storeContext = new StoreContext();
        try {
            Map<AMQShortString, AMQQueue> loadQueues = loadQueues();
            recoverExchanges();
            try {
                beginTran(storeContext);
                deliverMessages(storeContext, loadQueues);
                _logger.info("Persistent state recovered successfully");
                commitTran(storeContext);
                if (inTran(storeContext)) {
                    abortTran(storeContext);
                }
            } catch (Throwable th) {
                if (inTran(storeContext)) {
                    abortTran(storeContext);
                }
                throw th;
            }
        } catch (SQLException e) {
            throw new AMQException("Error recovering persistent state: " + e, e);
        }
    }

    private Map<AMQShortString, AMQQueue> loadQueues() throws SQLException, AMQException {
        ResultSet executeQuery = newConnection().createStatement().executeQuery(SELECT_FROM_QUEUE);
        HashMap hashMap = new HashMap();
        while (executeQuery.next()) {
            String string = executeQuery.getString(1);
            String string2 = executeQuery.getString(2);
            AMQShortString aMQShortString = new AMQShortString(string);
            AMQQueue queue = this._virtualHost.getQueueRegistry().getQueue(aMQShortString);
            if (queue == null) {
                queue = AMQQueueFactory.createAMQQueueImpl(aMQShortString, true, string2 == null ? null : new AMQShortString(string2), false, this._virtualHost, null);
                this._virtualHost.getQueueRegistry().registerQueue(queue);
            }
            hashMap.put(aMQShortString, queue);
        }
        return hashMap;
    }

    private void recoverExchanges() throws AMQException, SQLException {
        Iterator<Exchange> it = loadExchanges().iterator();
        while (it.hasNext()) {
            recoverExchange(it.next());
        }
    }

    private List<Exchange> loadExchanges() throws AMQException, SQLException {
        ArrayList arrayList = new ArrayList();
        Connection connection = null;
        try {
            connection = newConnection();
            ResultSet executeQuery = connection.createStatement().executeQuery(SELECT_FROM_EXCHANGE);
            while (executeQuery.next()) {
                String string = executeQuery.getString(1);
                String string2 = executeQuery.getString(2);
                boolean z = executeQuery.getShort(3) != 0;
                AMQShortString aMQShortString = new AMQShortString(string);
                Exchange exchange = this._virtualHost.getExchangeRegistry().getExchange(aMQShortString);
                if (exchange == null) {
                    exchange = this._virtualHost.getExchangeFactory().createExchange(aMQShortString, new AMQShortString(string2), true, z, 0);
                    this._virtualHost.getExchangeRegistry().registerExchange(exchange);
                }
                arrayList.add(exchange);
            }
            if (connection != null) {
                connection.close();
            }
            return arrayList;
        } catch (Throwable th) {
            if (connection != null) {
                connection.close();
            }
            throw th;
        }
    }

    private void recoverExchange(Exchange exchange) throws AMQException, SQLException {
        _logger.info("Recovering durable exchange " + exchange.getName() + " of type " + exchange.getType() + "...");
        QueueRegistry queueRegistry = this._virtualHost.getQueueRegistry();
        Connection connection = null;
        try {
            connection = newConnection();
            PreparedStatement prepareStatement = connection.prepareStatement(SELECT_FROM_BINDINGS);
            prepareStatement.setString(1, exchange.getName().toString());
            ResultSet executeQuery = prepareStatement.executeQuery();
            while (executeQuery.next()) {
                String string = executeQuery.getString(1);
                String string2 = executeQuery.getString(2);
                Blob blob = executeQuery.getBlob(3);
                AMQQueue queue = queueRegistry.getQueue(new AMQShortString(string));
                if (queue == null) {
                    _logger.error("Unkown queue: " + string + " cannot be bound to exchange: " + exchange.getName());
                } else {
                    _logger.info("Restoring binding: (Exchange: " + exchange.getName() + ", Queue: " + string + ", Routing Key: " + string2 + ", Arguments: " + blob + ")");
                    FieldTable fieldTable = null;
                    if (blob != null) {
                        fieldTable = new FieldTable(ByteBuffer.wrap(blob.getBytes(0L, (int) blob.length())), blob.length());
                    }
                    queue.bind(exchange, string2 == null ? null : new AMQShortString(string2), fieldTable);
                }
            }
            if (connection != null) {
                connection.close();
            }
        } catch (Throwable th) {
            if (connection != null) {
                connection.close();
            }
            throw th;
        }
    }

    @Override // org.apache.qpid.server.store.MessageStore
    public void close() throws Exception {
        this._closed.getAndSet(true);
    }

    @Override // org.apache.qpid.server.store.MessageStore
    public void removeMessage(StoreContext storeContext, Long l) throws AMQException {
        boolean orCreateTransaction = getOrCreateTransaction(storeContext);
        Connection connection = getConnection(storeContext);
        ConnectionWrapper connectionWrapper = (ConnectionWrapper) storeContext.getPayload();
        if (_logger.isDebugEnabled()) {
            _logger.debug("Message Id: " + l + " Removing");
        }
        MessageMetaData messageMetaData = getMessageMetaData(storeContext, l);
        try {
            PreparedStatement prepareStatement = connection.prepareStatement(DELETE_FROM_MESSAGE_META_DATA);
            prepareStatement.setLong(1, l.longValue());
            connectionWrapper.setRequiresCommit();
            if (prepareStatement.executeUpdate() == 0) {
                if (orCreateTransaction) {
                    abortTran(storeContext);
                }
                throw new AMQException("Message metadata not found for message id " + l);
            }
            prepareStatement.close();
            if (_logger.isDebugEnabled()) {
                _logger.debug("Deleted metadata for message " + l);
            }
            PreparedStatement prepareStatement2 = connection.prepareStatement(DELETE_FROM_MESSAGE_CONTENT);
            prepareStatement2.setLong(1, l.longValue());
            int executeUpdate = prepareStatement2.executeUpdate();
            if (executeUpdate != messageMetaData.getContentChunkCount()) {
                if (orCreateTransaction) {
                    abortTran(storeContext);
                }
                throw new AMQException("Unexpected number of content chunks when deleting message.  Expected " + messageMetaData.getContentChunkCount() + " but found " + executeUpdate);
            }
            if (orCreateTransaction) {
                commitTran(storeContext);
            }
        } catch (SQLException e) {
            if (connection != null && orCreateTransaction) {
                abortTran(storeContext);
            }
            throw new AMQException("Error writing AMQMessage with id " + l + " to database: " + e, e);
        }
    }

    @Override // org.apache.qpid.server.store.MessageStore
    public void createExchange(Exchange exchange) throws AMQException {
        if (this._state != State.RECOVERING) {
            Connection connection = null;
            try {
                try {
                    connection = newConnection();
                    PreparedStatement prepareStatement = connection.prepareStatement(INSERT_INTO_EXCHANGE);
                    prepareStatement.setString(1, exchange.getName().toString());
                    prepareStatement.setString(2, exchange.getType().toString());
                    prepareStatement.setShort(3, exchange.isAutoDelete() ? (short) 1 : (short) 0);
                    prepareStatement.execute();
                    prepareStatement.close();
                    connection.commit();
                    if (connection != null) {
                        connection.close();
                    }
                } catch (Throwable th) {
                    if (connection != null) {
                        connection.close();
                    }
                    throw th;
                }
            } catch (SQLException e) {
                throw new AMQException("Error writing Exchange with name " + exchange.getName() + " to database: " + e, e);
            }
        }
    }

    @Override // org.apache.qpid.server.store.MessageStore
    public void removeExchange(Exchange exchange) throws AMQException {
        Connection connection = null;
        try {
            try {
                Connection newConnection = newConnection();
                PreparedStatement prepareStatement = newConnection.prepareStatement(DELETE_FROM_EXCHANGE);
                prepareStatement.setString(1, exchange.getName().toString());
                if (prepareStatement.executeUpdate() == 0) {
                    throw new AMQException("Exchange " + exchange.getName() + " not found");
                }
                newConnection.commit();
                prepareStatement.close();
                if (newConnection != null) {
                    try {
                        newConnection.close();
                    } catch (SQLException e) {
                        _logger.error(e);
                    }
                }
            } catch (Throwable th) {
                if (0 != 0) {
                    try {
                        connection.close();
                    } catch (SQLException e2) {
                        _logger.error(e2);
                    }
                }
                throw th;
            }
        } catch (SQLException e3) {
            throw new AMQException("Error writing deleting with name " + exchange.getName() + " from database: " + e3, e3);
        }
    }

    @Override // org.apache.qpid.server.store.MessageStore
    public void bindQueue(Exchange exchange, AMQShortString aMQShortString, AMQQueue aMQQueue, FieldTable fieldTable) throws AMQException {
        if (this._state != State.RECOVERING) {
            Connection connection = null;
            try {
                try {
                    connection = newConnection();
                    PreparedStatement prepareStatement = connection.prepareStatement(FIND_BINDING);
                    prepareStatement.setString(1, exchange.getName().toString());
                    prepareStatement.setString(2, aMQQueue.getName().toString());
                    prepareStatement.setString(3, aMQShortString == null ? null : aMQShortString.toString());
                    if (!prepareStatement.executeQuery().next()) {
                        PreparedStatement prepareStatement2 = connection.prepareStatement(INSERT_INTO_BINDINGS);
                        prepareStatement2.setString(1, exchange.getName().toString());
                        prepareStatement2.setString(2, aMQQueue.getName().toString());
                        prepareStatement2.setString(3, aMQShortString == null ? null : aMQShortString.toString());
                        if (fieldTable != null) {
                            byte[] dataAsBytes = fieldTable.getDataAsBytes();
                            prepareStatement2.setBinaryStream(4, (InputStream) new ByteArrayInputStream(dataAsBytes), dataAsBytes.length);
                        } else {
                            prepareStatement2.setNull(4, 2004);
                        }
                        prepareStatement2.executeUpdate();
                        connection.commit();
                        prepareStatement2.close();
                    }
                    if (connection != null) {
                        try {
                            connection.close();
                        } catch (SQLException e) {
                            _logger.error(e);
                        }
                    }
                } catch (SQLException e2) {
                    throw new AMQException("Error writing binding for AMQQueue with name " + aMQQueue.getName() + " to exchange " + exchange.getName() + " to database: " + e2, e2);
                }
            } catch (Throwable th) {
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (SQLException e3) {
                        _logger.error(e3);
                    }
                }
                throw th;
            }
        }
    }

    @Override // org.apache.qpid.server.store.MessageStore
    public void unbindQueue(Exchange exchange, AMQShortString aMQShortString, AMQQueue aMQQueue, FieldTable fieldTable) throws AMQException {
        Connection connection = null;
        try {
            try {
                Connection newConnection = newConnection();
                PreparedStatement prepareStatement = newConnection.prepareStatement(DELETE_FROM_BINDINGS);
                prepareStatement.setString(1, exchange.getName().toString());
                prepareStatement.setString(2, aMQQueue.getName().toString());
                prepareStatement.setString(3, aMQShortString == null ? null : aMQShortString.toString());
                if (prepareStatement.executeUpdate() != 1) {
                    throw new AMQException("Queue binding for queue with name " + aMQQueue.getName() + " to exchange " + exchange.getName() + "  not found");
                }
                newConnection.commit();
                prepareStatement.close();
                if (newConnection != null) {
                    try {
                        newConnection.close();
                    } catch (SQLException e) {
                        _logger.error(e);
                    }
                }
            } catch (SQLException e2) {
                throw new AMQException("Error removing binding for AMQQueue with name " + aMQQueue.getName() + " to exchange " + exchange.getName() + " in database: " + e2, e2);
            }
        } catch (Throwable th) {
            if (0 != 0) {
                try {
                    connection.close();
                } catch (SQLException e3) {
                    _logger.error(e3);
                }
            }
            throw th;
        }
    }

    @Override // org.apache.qpid.server.store.MessageStore
    public void createQueue(AMQQueue aMQQueue) throws AMQException {
        createQueue(aMQQueue, null);
    }

    @Override // org.apache.qpid.server.store.MessageStore
    public void createQueue(AMQQueue aMQQueue, FieldTable fieldTable) throws AMQException {
        _logger.debug("public void createQueue(AMQQueue queue = " + aMQQueue + "): called");
        if (this._state != State.RECOVERING) {
            try {
                Connection newConnection = newConnection();
                PreparedStatement prepareStatement = newConnection.prepareStatement(FIND_QUEUE);
                prepareStatement.setString(1, aMQQueue.getName().toString());
                if (!prepareStatement.executeQuery().next()) {
                    PreparedStatement prepareStatement2 = newConnection.prepareStatement(INSERT_INTO_QUEUE);
                    prepareStatement2.setString(1, aMQQueue.getName().toString());
                    prepareStatement2.setString(2, aMQQueue.getOwner() == null ? null : aMQQueue.getOwner().toString());
                    prepareStatement2.execute();
                    prepareStatement2.close();
                    newConnection.commit();
                    newConnection.close();
                }
            } catch (SQLException e) {
                throw new AMQException("Error writing AMQQueue with name " + aMQQueue.getName() + " to database: " + e, e);
            }
        }
    }

    private Connection newConnection() throws SQLException {
        return DriverManager.getConnection(this._connectionURL);
    }

    @Override // org.apache.qpid.server.store.MessageStore
    public void removeQueue(AMQQueue aMQQueue) throws AMQException {
        AMQShortString name = aMQQueue.getName();
        _logger.debug("public void removeQueue(AMQShortString name = " + name + "): called");
        Connection connection = null;
        try {
            try {
                Connection newConnection = newConnection();
                PreparedStatement prepareStatement = newConnection.prepareStatement(DELETE_FROM_QUEUE);
                prepareStatement.setString(1, name.toString());
                if (prepareStatement.executeUpdate() == 0) {
                    throw new AMQException("Queue " + name + " not found");
                }
                newConnection.commit();
                prepareStatement.close();
                if (newConnection != null) {
                    try {
                        newConnection.close();
                    } catch (SQLException e) {
                        _logger.error(e);
                    }
                }
            } catch (Throwable th) {
                if (0 != 0) {
                    try {
                        connection.close();
                    } catch (SQLException e2) {
                        _logger.error(e2);
                    }
                }
                throw th;
            }
        } catch (SQLException e3) {
            throw new AMQException("Error writing deleting with name " + name + " from database: " + e3, e3);
        }
    }

    @Override // org.apache.qpid.server.store.MessageStore
    public void enqueueMessage(StoreContext storeContext, AMQQueue aMQQueue, Long l) throws AMQException {
        AMQShortString name = aMQQueue.getName();
        boolean orCreateTransaction = getOrCreateTransaction(storeContext);
        Connection connection = getConnection(storeContext);
        ConnectionWrapper connectionWrapper = (ConnectionWrapper) storeContext.getPayload();
        try {
            PreparedStatement prepareStatement = connection.prepareStatement(INSERT_INTO_QUEUE_ENTRY);
            prepareStatement.setString(1, name.toString());
            prepareStatement.setLong(2, l.longValue());
            prepareStatement.executeUpdate();
            connectionWrapper.requiresCommit();
            if (orCreateTransaction) {
                commitTran(storeContext);
            }
            if (_logger.isDebugEnabled()) {
                _logger.debug("Enqueuing message " + l + " on queue " + name + "[Connection" + connection + "]");
            }
        } catch (SQLException e) {
            if (orCreateTransaction) {
                abortTran(storeContext);
            }
            _logger.error("Failed to enqueue: " + e, e);
            throw new AMQException("Error writing enqueued message with id " + l + " for queue " + name + " to database", e);
        }
    }

    @Override // org.apache.qpid.server.store.MessageStore
    public void dequeueMessage(StoreContext storeContext, AMQQueue aMQQueue, Long l) throws AMQException {
        AMQShortString name = aMQQueue.getName();
        boolean orCreateTransaction = getOrCreateTransaction(storeContext);
        Connection connection = getConnection(storeContext);
        ConnectionWrapper connectionWrapper = (ConnectionWrapper) storeContext.getPayload();
        try {
            PreparedStatement prepareStatement = connection.prepareStatement(DELETE_FROM_QUEUE_ENTRY);
            prepareStatement.setString(1, name.toString());
            prepareStatement.setLong(2, l.longValue());
            int executeUpdate = prepareStatement.executeUpdate();
            connectionWrapper.requiresCommit();
            if (executeUpdate != 1) {
                throw new AMQException("Unable to find message with id " + l + " on queue " + name);
            }
            if (orCreateTransaction) {
                commitTran(storeContext);
            }
            if (_logger.isDebugEnabled()) {
                _logger.debug("Dequeuing message " + l + " on queue " + name);
            }
        } catch (SQLException e) {
            if (orCreateTransaction) {
                abortTran(storeContext);
            }
            _logger.error("Failed to dequeue: " + e, e);
            throw new AMQException("Error deleting enqueued message with id " + l + " for queue " + name + " from database", e);
        }
    }

    @Override // org.apache.qpid.server.store.MessageStore
    public void beginTran(StoreContext storeContext) throws AMQException {
        if (storeContext.getPayload() != null) {
            throw new AMQException("Fatal internal error: transactional context is not empty at beginTran: " + storeContext.getPayload());
        }
        try {
            storeContext.setPayload(new ConnectionWrapper(newConnection()));
        } catch (SQLException e) {
            throw new AMQException("Error starting transaction: " + e, e);
        }
    }

    @Override // org.apache.qpid.server.store.MessageStore
    public void commitTran(StoreContext storeContext) throws AMQException {
        ConnectionWrapper connectionWrapper = (ConnectionWrapper) storeContext.getPayload();
        try {
            if (connectionWrapper == null) {
                throw new AMQException("Fatal internal error: transactional context is empty at commitTran");
            }
            try {
                Connection connection = connectionWrapper.getConnection();
                if (connectionWrapper.requiresCommit()) {
                    connection.commit();
                    if (_logger.isDebugEnabled()) {
                        _logger.debug("commit tran completed");
                    }
                }
                connection.close();
                storeContext.setPayload(null);
            } catch (SQLException e) {
                throw new AMQException("Error commit tx: " + e, e);
            }
        } catch (Throwable th) {
            storeContext.setPayload(null);
            throw th;
        }
    }

    @Override // org.apache.qpid.server.store.MessageStore
    public void abortTran(StoreContext storeContext) throws AMQException {
        ConnectionWrapper connectionWrapper = (ConnectionWrapper) storeContext.getPayload();
        if (connectionWrapper == null) {
            throw new AMQException("Fatal internal error: transactional context is empty at abortTran");
        }
        if (_logger.isDebugEnabled()) {
            _logger.debug("abort tran called: " + connectionWrapper.getConnection());
        }
        try {
            try {
                Connection connection = connectionWrapper.getConnection();
                if (connectionWrapper.requiresCommit()) {
                    connection.rollback();
                }
                connection.close();
                storeContext.setPayload(null);
            } catch (SQLException e) {
                throw new AMQException("Error aborting transaction: " + e, e);
            }
        } catch (Throwable th) {
            storeContext.setPayload(null);
            throw th;
        }
    }

    @Override // org.apache.qpid.server.store.MessageStore
    public boolean inTran(StoreContext storeContext) {
        return storeContext.getPayload() != null;
    }

    @Override // org.apache.qpid.server.store.MessageStore
    public Long getNewMessageId() {
        return Long.valueOf(this._messageId.getAndIncrement());
    }

    @Override // org.apache.qpid.server.store.MessageStore
    public void storeContentBodyChunk(StoreContext storeContext, Long l, int i, ContentChunk contentChunk, boolean z) throws AMQException {
        boolean orCreateTransaction = getOrCreateTransaction(storeContext);
        Connection connection = getConnection(storeContext);
        ConnectionWrapper connectionWrapper = (ConnectionWrapper) storeContext.getPayload();
        try {
            PreparedStatement prepareStatement = connection.prepareStatement(INSERT_INTO_MESSAGE_CONTENT);
            prepareStatement.setLong(1, l.longValue());
            prepareStatement.setInt(2, i);
            byte[] bArr = new byte[contentChunk.getSize()];
            contentChunk.getData().duplicate().get(bArr);
            prepareStatement.setBinaryStream(3, (InputStream) new ByteArrayInputStream(bArr), bArr.length);
            prepareStatement.executeUpdate();
            connectionWrapper.requiresCommit();
            if (orCreateTransaction) {
                commitTran(storeContext);
            }
        } catch (SQLException e) {
            if (orCreateTransaction) {
                abortTran(storeContext);
            }
            throw new AMQException("Error writing AMQMessage with id " + l + " to database: " + e, e);
        }
    }

    @Override // org.apache.qpid.server.store.MessageStore
    public void storeMessageMetaData(StoreContext storeContext, Long l, MessageMetaData messageMetaData) throws AMQException {
        boolean orCreateTransaction = getOrCreateTransaction(storeContext);
        Connection connection = getConnection(storeContext);
        ConnectionWrapper connectionWrapper = (ConnectionWrapper) storeContext.getPayload();
        try {
            PreparedStatement prepareStatement = connection.prepareStatement(INSERT_INTO_MESSAGE_META_DATA);
            prepareStatement.setLong(1, l.longValue());
            prepareStatement.setString(2, messageMetaData.getMessagePublishInfo().getExchange().toString());
            prepareStatement.setString(3, messageMetaData.getMessagePublishInfo().getRoutingKey().toString());
            prepareStatement.setShort(4, messageMetaData.getMessagePublishInfo().isMandatory() ? (short) 1 : (short) 0);
            prepareStatement.setShort(5, messageMetaData.getMessagePublishInfo().isImmediate() ? (short) 1 : (short) 0);
            ContentHeaderBody contentHeaderBody = messageMetaData.getContentHeaderBody();
            byte[] bArr = new byte[contentHeaderBody.getSize()];
            contentHeaderBody.writePayload(ByteBuffer.wrap(bArr));
            prepareStatement.setBinaryStream(6, (InputStream) new ByteArrayInputStream(bArr), bArr.length);
            prepareStatement.setInt(7, messageMetaData.getContentChunkCount());
            prepareStatement.executeUpdate();
            connectionWrapper.requiresCommit();
            if (orCreateTransaction) {
                commitTran(storeContext);
            }
        } catch (SQLException e) {
            if (orCreateTransaction) {
                abortTran(storeContext);
            }
            throw new AMQException("Error writing AMQMessage with id " + l + " to database: " + e, e);
        }
    }

    @Override // org.apache.qpid.server.store.MessageStore
    public MessageMetaData getMessageMetaData(StoreContext storeContext, Long l) throws AMQException {
        boolean orCreateTransaction = getOrCreateTransaction(storeContext);
        try {
            PreparedStatement prepareStatement = getConnection(storeContext).prepareStatement(SELECT_FROM_MESSAGE_META_DATA);
            prepareStatement.setLong(1, l.longValue());
            ResultSet executeQuery = prepareStatement.executeQuery();
            if (!executeQuery.next()) {
                if (orCreateTransaction) {
                    abortTran(storeContext);
                }
                throw new AMQException("Metadata not found for message with id " + l);
            }
            final AMQShortString aMQShortString = new AMQShortString(executeQuery.getString(1));
            final AMQShortString aMQShortString2 = executeQuery.getString(2) == null ? null : new AMQShortString(executeQuery.getString(2));
            final boolean z = executeQuery.getShort(3) != 0;
            final boolean z2 = executeQuery.getShort(4) != 0;
            MessagePublishInfo messagePublishInfo = new MessagePublishInfo() { // from class: org.apache.qpid.server.store.DerbyMessageStore.1
                public AMQShortString getExchange() {
                    return aMQShortString;
                }

                public void setExchange(AMQShortString aMQShortString3) {
                }

                public boolean isImmediate() {
                    return z2;
                }

                public boolean isMandatory() {
                    return z;
                }

                public AMQShortString getRoutingKey() {
                    return aMQShortString2;
                }
            };
            Blob blob = executeQuery.getBlob(5);
            ContentHeaderBody createFromBuffer = ContentHeaderBody.createFromBuffer(ByteBuffer.wrap(blob.getBytes(1L, (int) blob.length())), r0.length);
            if (orCreateTransaction) {
                commitTran(storeContext);
            }
            return new MessageMetaData(messagePublishInfo, createFromBuffer, executeQuery.getInt(6));
        } catch (SQLException e) {
            if (orCreateTransaction) {
                abortTran(storeContext);
            }
            throw new AMQException("Error reading AMQMessage with id " + l + " from database: " + e, e);
        }
    }

    @Override // org.apache.qpid.server.store.MessageStore
    public ContentChunk getContentBodyChunk(StoreContext storeContext, Long l, int i) throws AMQException {
        boolean orCreateTransaction = getOrCreateTransaction(storeContext);
        try {
            PreparedStatement prepareStatement = getConnection(storeContext).prepareStatement(SELECT_FROM_MESSAGE_CONTENT);
            prepareStatement.setLong(1, l.longValue());
            prepareStatement.setInt(2, i);
            ResultSet executeQuery = prepareStatement.executeQuery();
            if (!executeQuery.next()) {
                if (orCreateTransaction) {
                    abortTran(storeContext);
                }
                throw new AMQException("Message not found for message with id " + l);
            }
            Blob blob = executeQuery.getBlob(1);
            final int length = (int) blob.length();
            final ByteBuffer wrap = ByteBuffer.wrap(blob.getBytes(1L, length));
            ContentChunk contentChunk = new ContentChunk() { // from class: org.apache.qpid.server.store.DerbyMessageStore.2
                public int getSize() {
                    return length;
                }

                public ByteBuffer getData() {
                    return wrap;
                }

                public void reduceToFit() {
                }
            };
            if (orCreateTransaction) {
                commitTran(storeContext);
            }
            return contentChunk;
        } catch (SQLException e) {
            if (orCreateTransaction) {
                abortTran(storeContext);
            }
            throw new AMQException("Error reading AMQMessage with id " + l + " from database: " + e, e);
        }
    }

    @Override // org.apache.qpid.server.store.MessageStore
    public boolean isPersistent() {
        return true;
    }

    private void checkNotClosed() throws MessageStoreClosedException {
        if (this._closed.get()) {
            throw new MessageStoreClosedException();
        }
    }

    private void deliverMessages(StoreContext storeContext, Map<AMQShortString, AMQQueue> map) throws SQLException, AMQException {
        HashMap hashMap = new HashMap();
        ArrayList arrayList = new ArrayList();
        TreeMap treeMap = new TreeMap();
        boolean inTran = inTran(storeContext);
        Connection connection = null;
        try {
            try {
                connection = inTran ? getConnection(storeContext) : newConnection();
                MessageHandleFactory messageHandleFactory = new MessageHandleFactory();
                long j = 1;
                NonTransactionalContext nonTransactionalContext = new NonTransactionalContext(this, new StoreContext(), null, null);
                ResultSet executeQuery = connection.createStatement().executeQuery(SELECT_FROM_QUEUE_ENTRY);
                while (executeQuery.next()) {
                    AMQShortString aMQShortString = new AMQShortString(executeQuery.getString(1));
                    AMQQueue aMQQueue = map.get(aMQShortString);
                    if (aMQQueue == null) {
                        aMQQueue = AMQQueueFactory.createAMQQueueImpl(aMQShortString, false, null, false, this._virtualHost, null);
                        this._virtualHost.getQueueRegistry().registerQueue(aMQQueue);
                        map.put(aMQShortString, aMQQueue);
                    }
                    long j2 = executeQuery.getLong(2);
                    j = Math.max(j, j2);
                    AMQMessage aMQMessage = (AMQMessage) hashMap.get(Long.valueOf(j2));
                    if (aMQMessage != null) {
                        aMQMessage.incrementReference();
                    } else {
                        aMQMessage = new AMQMessage(Long.valueOf(j2), this, messageHandleFactory, nonTransactionalContext);
                        hashMap.put(Long.valueOf(j2), aMQMessage);
                    }
                    if (_logger.isDebugEnabled()) {
                        _logger.debug("On recovery, delivering " + aMQMessage.getMessageId() + " to " + aMQQueue.getName());
                    }
                    if (_logger.isInfoEnabled()) {
                        Integer num = (Integer) treeMap.get(aMQShortString);
                        if (num == null) {
                            num = 0;
                        }
                        treeMap.put(aMQShortString, Integer.valueOf(num.intValue() + 1));
                    }
                    arrayList.add(new ProcessAction(aMQQueue, storeContext, aMQMessage));
                }
                Iterator it = arrayList.iterator();
                while (it.hasNext()) {
                    ((ProcessAction) it.next()).process();
                }
                this._messageId.set(j + 1);
                if (inTran && connection != null) {
                    connection.close();
                }
                if (_logger.isInfoEnabled()) {
                    _logger.info("Recovered message counts: " + treeMap);
                }
            } catch (SQLException e) {
                _logger.error("Error: " + e, e);
                throw e;
            }
        } catch (Throwable th) {
            if (inTran && connection != null) {
                connection.close();
            }
            throw th;
        }
    }

    private Connection getConnection(StoreContext storeContext) {
        return ((ConnectionWrapper) storeContext.getPayload()).getConnection();
    }

    private boolean getOrCreateTransaction(StoreContext storeContext) throws AMQException {
        if (((ConnectionWrapper) storeContext.getPayload()) != null) {
            return false;
        }
        beginTran(storeContext);
        return true;
    }

    private synchronized void stateTransition(State state, State state2) throws AMQException {
        if (this._state != state) {
            throw new AMQException("Cannot transition to the state: " + state2 + "; need to be in state: " + state + "; currently in state: " + this._state);
        }
        this._state = state2;
    }
}
