/*
 * Decompiled with CFR 0.152.
 */
package org.apache.qpid.server.store;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.InputStream;
import java.sql.Blob;
import java.sql.Connection;
import java.sql.Driver;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.log4j.Logger;
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.MessageHandleFactory;
import org.apache.qpid.server.queue.MessageMetaData;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.MessageStoreClosedException;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.virtualhost.VirtualHost;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
public class DerbyMessageStore
implements MessageStore {
    // Log4j logger shared by all instances of this store.
    private static final Logger _logger = Logger.getLogger(DerbyMessageStore.class);
    // Store-configuration key read in configure() to locate the database directory.
    public static final String ENVIRONMENT_PATH_PROPERTY = "environment-path";
    // Class name of the embedded Derby JDBC driver, loaded by initialiseDriver().
    private static final String SQL_DRIVER_NAME = "org.apache.derby.jdbc.EmbeddedDriver";

    // ---- Table names (upper case, as looked up by tableExists()) ----
    private static final String DB_VERSION_TABLE_NAME = "QPID_DB_VERSION";
    private static final String EXCHANGE_TABLE_NAME = "QPID_EXCHANGE";
    private static final String QUEUE_TABLE_NAME = "QPID_QUEUE";
    private static final String BINDINGS_TABLE_NAME = "QPID_BINDINGS";
    private static final String QUEUE_ENTRY_TABLE_NAME = "QPID_QUEUE_ENTRY";
    private static final String MESSAGE_META_DATA_TABLE_NAME = "QPID_MESSAGE_META_DATA";
    private static final String MESSAGE_CONTENT_TABLE_NAME = "QPID_MESSAGE_CONTENT";
    // Schema version number; presumably the value stored in QPID_DB_VERSION - TODO confirm.
    private static final int DB_VERSION = 1;

    // ---- Instance / class state ----
    // Virtual host this store belongs to; assigned in configure().
    private VirtualHost _virtualHost;
    // Lazily loaded driver class; null until initialiseDriver() has run.
    private static Class<Driver> DRIVER_CLASS;
    // Source of message ids handed out by getNewMessageId(); starts at 1.
    private final AtomicLong _messageId = new AtomicLong(1L);
    // Set to true by close().
    private AtomicBoolean _closed = new AtomicBoolean(false);
    // JDBC URL built in createOrOpenDatabase() and used by newConnection().
    private String _connectionURL;

    // ---- DDL executed when a table is missing ----
    private static final String CREATE_DB_VERSION_TABLE = "CREATE TABLE QPID_DB_VERSION ( version int not null )";
    private static final String INSERT_INTO_DB_VERSION = "INSERT INTO QPID_DB_VERSION ( version ) VALUES ( ? )";
    private static final String CREATE_EXCHANGE_TABLE = "CREATE TABLE QPID_EXCHANGE ( name varchar(255) not null, type varchar(255) not null, autodelete SMALLINT not null, PRIMARY KEY ( name ) )";
    private static final String CREATE_QUEUE_TABLE = "CREATE TABLE QPID_QUEUE ( name varchar(255) not null, owner varchar(255), PRIMARY KEY ( name ) )";
    private static final String CREATE_BINDINGS_TABLE = "CREATE TABLE QPID_BINDINGS ( exchange_name varchar(255) not null, queue_name varchar(255) not null, binding_key varchar(255) not null, arguments blob , PRIMARY KEY ( exchange_name, queue_name, binding_key ) )";
    private static final String CREATE_QUEUE_ENTRY_TABLE = "CREATE TABLE QPID_QUEUE_ENTRY ( queue_name varchar(255) not null, message_id bigint not null, PRIMARY KEY (queue_name, message_id) )";
    private static final String CREATE_MESSAGE_META_DATA_TABLE = "CREATE TABLE QPID_MESSAGE_META_DATA ( message_id bigint not null, exchange_name varchar(255) not null, routing_key varchar(255), flag_mandatory smallint not null, flag_immediate smallint not null, content_header blob, chunk_count int not null, PRIMARY KEY ( message_id ) )";
    private static final String CREATE_MESSAGE_CONTENT_TABLE = "CREATE TABLE QPID_MESSAGE_CONTENT ( message_id bigint not null, chunk_id int not null, content_chunk blob , PRIMARY KEY (message_id, chunk_id) )";

    // ---- Queries and DML; every '?' is bound through a PreparedStatement ----
    private static final String SELECT_FROM_QUEUE = "SELECT name, owner FROM QPID_QUEUE";
    private static final String FIND_QUEUE = "SELECT name, owner FROM QPID_QUEUE WHERE name = ?";
    private static final String SELECT_FROM_EXCHANGE = "SELECT name, type, autodelete FROM QPID_EXCHANGE";
    private static final String SELECT_FROM_BINDINGS = "SELECT queue_name, binding_key, arguments FROM QPID_BINDINGS WHERE exchange_name = ?";
    private static final String FIND_BINDING = "SELECT * FROM QPID_BINDINGS WHERE exchange_name = ? AND queue_name = ? AND binding_key = ? ";
    private static final String DELETE_FROM_MESSAGE_META_DATA = "DELETE FROM QPID_MESSAGE_META_DATA WHERE message_id = ?";
    private static final String DELETE_FROM_MESSAGE_CONTENT = "DELETE FROM QPID_MESSAGE_CONTENT WHERE message_id = ?";
    private static final String INSERT_INTO_EXCHANGE = "INSERT INTO QPID_EXCHANGE ( name, type, autodelete ) VALUES ( ?, ?, ? )";
    private static final String DELETE_FROM_EXCHANGE = "DELETE FROM QPID_EXCHANGE WHERE name = ?";
    private static final String INSERT_INTO_BINDINGS = "INSERT INTO QPID_BINDINGS ( exchange_name, queue_name, binding_key, arguments ) values ( ?, ?, ?, ? )";
    private static final String DELETE_FROM_BINDINGS = "DELETE FROM QPID_BINDINGS WHERE exchange_name = ? AND queue_name = ? AND binding_key = ?";
    private static final String INSERT_INTO_QUEUE = "INSERT INTO QPID_QUEUE (name, owner) VALUES (?, ?)";
    private static final String DELETE_FROM_QUEUE = "DELETE FROM QPID_QUEUE WHERE name = ?";
    private static final String INSERT_INTO_QUEUE_ENTRY = "INSERT INTO QPID_QUEUE_ENTRY (queue_name, message_id) values (?,?)";
    private static final String DELETE_FROM_QUEUE_ENTRY = "DELETE FROM QPID_QUEUE_ENTRY WHERE queue_name = ? AND message_id =?";
    private static final String INSERT_INTO_MESSAGE_CONTENT = "INSERT INTO QPID_MESSAGE_CONTENT( message_id, chunk_id, content_chunk ) values (?, ?, ?)";
    private static final String INSERT_INTO_MESSAGE_META_DATA = "INSERT INTO QPID_MESSAGE_META_DATA( message_id , exchange_name , routing_key , flag_mandatory , flag_immediate , content_header , chunk_count ) values (?, ?, ?, ?, ?, ?, ?)";
    private static final String SELECT_FROM_MESSAGE_META_DATA = "SELECT exchange_name , routing_key , flag_mandatory , flag_immediate , content_header , chunk_count FROM QPID_MESSAGE_META_DATA WHERE message_id = ?";
    private static final String SELECT_FROM_MESSAGE_CONTENT = "SELECT content_chunk FROM QPID_MESSAGE_CONTENT WHERE message_id = ? and chunk_id = ?";
    private static final String SELECT_FROM_QUEUE_ENTRY = "SELECT queue_name, message_id FROM QPID_QUEUE_ENTRY";
    // Used by tableExists() to probe the Derby system catalogue for a table name.
    private static final String TABLE_EXISTANCE_QUERY = "SELECT 1 FROM SYS.SYSTABLES WHERE TABLENAME = ?";
    // Current lifecycle state; starts at INITIAL and is advanced through stateTransition().
    private State _state = State.INITIAL;

    /**
     * Configures this store for the given virtual host.
     *
     * <p>Moves the store from INITIAL to CONFIGURING, loads the Derby driver, makes sure the
     * database directory exists (creating it if necessary), creates or opens the database and
     * then runs {@link #recover()}. On success the store is moved from RECOVERING to STARTED.
     *
     * @param virtualHost the virtual host whose durable state is kept in this store
     * @param base        not used by this implementation
     * @param config      configuration supplying the {@code environment-path} setting
     *                    (defaults to {@code "derbyDB"})
     * @throws IllegalArgumentException if the database directory does not exist and cannot be created
     * @throws Exception                if driver loading, database creation or recovery fails
     */
    @Override
    public void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception {
        this.stateTransition(State.INITIAL, State.CONFIGURING);
        DerbyMessageStore.initialiseDriver();
        this._virtualHost = virtualHost;
        _logger.info("Configuring Derby message store for virtual host " + virtualHost.getName());
        String databasePath = config.getStoreConfiguration().getString(ENVIRONMENT_PATH_PROPERTY, "derbyDB");
        File environmentPath = new File(databasePath);
        if (!environmentPath.exists() && !environmentPath.mkdirs()) {
            throw new IllegalArgumentException("Environment path " + environmentPath + " could not be read or created. " + "Ensure the path is correct and that the permissions are correct.");
        }
        this.createOrOpenDatabase(databasePath);
        // recover() performs the CONFIGURING -> RECOVERING transition itself.
        this.recover();
        this.stateTransition(State.RECOVERING, State.STARTED);
    }

    /**
     * Loads the embedded Derby JDBC driver class once per JVM.
     *
     * <p>The method is {@code static synchronized}, so concurrent callers cannot load the
     * class twice.
     *
     * @throws ClassNotFoundException if the Derby driver is not on the classpath
     */
    @SuppressWarnings("unchecked")
    private static synchronized void initialiseDriver() throws ClassNotFoundException {
        if (DRIVER_CLASS == null) {
            // Class.forName returns Class<?>; the cast is unchecked but the named class is a JDBC Driver.
            DRIVER_CLASS = (Class<Driver>) Class.forName(SQL_DRIVER_NAME);
        }
    }

    /**
     * Builds the JDBC URL for this virtual host's database and makes sure every table exists.
     *
     * <p>The URL ends in {@code ;create=true}, so the database is created when it is missing.
     *
     * @param environmentPath directory under which the per-virtual-host database lives
     * @throws SQLException if the connection cannot be opened or a table cannot be created
     */
    private void createOrOpenDatabase(String environmentPath) throws SQLException {
        this._connectionURL = "jdbc:derby:" + environmentPath + "/" + this._virtualHost.getName() + ";create=true";
        Connection conn = this.newConnection();
        try {
            this.createVersionTable(conn);
            this.createExchangeTable(conn);
            this.createQueueTable(conn);
            this.createBindingsTable(conn);
            this.createQueueEntryTable(conn);
            this.createMessageMetaDataTable(conn);
            this.createMessageContentTable(conn);
        }
        finally {
            // Release the connection even when one of the CREATE TABLE statements fails.
            conn.close();
        }
    }

    /**
     * Creates the version table and records the schema version, unless the table already exists.
     *
     * @param conn open connection to the store database
     * @throws SQLException if the existence check, the DDL or the insert fails
     */
    private void createVersionTable(Connection conn) throws SQLException {
        if (this.tableExists(DB_VERSION_TABLE_NAME, conn)) {
            return;
        }
        Statement stmt = conn.createStatement();
        try {
            stmt.execute(CREATE_DB_VERSION_TABLE);
        }
        finally {
            stmt.close();
        }
        PreparedStatement pstmt = conn.prepareStatement(INSERT_INTO_DB_VERSION);
        try {
            // Use the named constant rather than a literal so the stored version tracks DB_VERSION.
            pstmt.setInt(1, DB_VERSION);
            pstmt.execute();
        }
        finally {
            pstmt.close();
        }
    }

    /**
     * Creates the exchange table unless it already exists.
     *
     * @param conn open connection to the store database
     * @throws SQLException if the existence check or the DDL fails
     */
    private void createExchangeTable(Connection conn) throws SQLException {
        if (this.tableExists(EXCHANGE_TABLE_NAME, conn)) {
            return;
        }
        Statement stmt = conn.createStatement();
        try {
            stmt.execute(CREATE_EXCHANGE_TABLE);
        }
        finally {
            // Close the statement even when the DDL fails.
            stmt.close();
        }
    }

    /**
     * Creates the queue table unless it already exists.
     *
     * @param conn open connection to the store database
     * @throws SQLException if the existence check or the DDL fails
     */
    private void createQueueTable(Connection conn) throws SQLException {
        if (this.tableExists(QUEUE_TABLE_NAME, conn)) {
            return;
        }
        Statement stmt = conn.createStatement();
        try {
            stmt.execute(CREATE_QUEUE_TABLE);
        }
        finally {
            // Close the statement even when the DDL fails.
            stmt.close();
        }
    }

    /**
     * Creates the bindings table unless it already exists.
     *
     * @param conn open connection to the store database
     * @throws SQLException if the existence check or the DDL fails
     */
    private void createBindingsTable(Connection conn) throws SQLException {
        if (this.tableExists(BINDINGS_TABLE_NAME, conn)) {
            return;
        }
        Statement stmt = conn.createStatement();
        try {
            stmt.execute(CREATE_BINDINGS_TABLE);
        }
        finally {
            // Close the statement even when the DDL fails.
            stmt.close();
        }
    }

    /**
     * Creates the queue-entry table unless it already exists.
     *
     * @param conn open connection to the store database
     * @throws SQLException if the existence check or the DDL fails
     */
    private void createQueueEntryTable(Connection conn) throws SQLException {
        if (this.tableExists(QUEUE_ENTRY_TABLE_NAME, conn)) {
            return;
        }
        Statement stmt = conn.createStatement();
        try {
            stmt.execute(CREATE_QUEUE_ENTRY_TABLE);
        }
        finally {
            // Close the statement even when the DDL fails.
            stmt.close();
        }
    }

    /**
     * Creates the message meta-data table unless it already exists.
     *
     * @param conn open connection to the store database
     * @throws SQLException if the existence check or the DDL fails
     */
    private void createMessageMetaDataTable(Connection conn) throws SQLException {
        if (this.tableExists(MESSAGE_META_DATA_TABLE_NAME, conn)) {
            return;
        }
        Statement stmt = conn.createStatement();
        try {
            stmt.execute(CREATE_MESSAGE_META_DATA_TABLE);
        }
        finally {
            // Close the statement even when the DDL fails.
            stmt.close();
        }
    }

    /**
     * Creates the message-content table unless it already exists.
     *
     * @param conn open connection to the store database
     * @throws SQLException if the existence check or the DDL fails
     */
    private void createMessageContentTable(Connection conn) throws SQLException {
        if (this.tableExists(MESSAGE_CONTENT_TABLE_NAME, conn)) {
            return;
        }
        Statement stmt = conn.createStatement();
        try {
            stmt.execute(CREATE_MESSAGE_CONTENT_TABLE);
        }
        finally {
            // Close the statement even when the DDL fails.
            stmt.close();
        }
    }

    /**
     * Reports whether a table with exactly the given name is listed in {@code SYS.SYSTABLES}.
     *
     * @param tableName name to look up (compared with {@code =} against the TABLENAME column)
     * @param conn      open connection to the store database
     * @return {@code true} if at least one matching row exists
     * @throws SQLException if the catalogue query fails
     */
    private boolean tableExists(String tableName, Connection conn) throws SQLException {
        PreparedStatement stmt = conn.prepareStatement(TABLE_EXISTANCE_QUERY);
        try {
            stmt.setString(1, tableName);
            ResultSet rs = stmt.executeQuery();
            try {
                return rs.next();
            }
            finally {
                rs.close();
            }
        }
        finally {
            // Runs on both the normal and the exceptional path, so the statement never leaks.
            stmt.close();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Recovers the persisted state of the virtual host.
     *
     * <p>Moves the store from CONFIGURING to RECOVERING, loads the stored queues, recovers the
     * stored exchanges and their bindings, and then calls {@code deliverMessages} inside a
     * store transaction.
     *
     * @throws AMQException if recovery fails; a {@link SQLException} is wrapped as the cause
     */
    public void recover() throws AMQException {
        this.stateTransition(State.CONFIGURING, State.RECOVERING);
        _logger.info("Recovering persistent state...");
        StoreContext context = new StoreContext();
        try {
            Map<AMQShortString, AMQQueue> queues = this.loadQueues();
            this.recoverExchanges();
            try {
                this.beginTran(context);
                this.deliverMessages(context, queues);
                this.commitTran(context);
                // Report success only once the commit has gone through.
                _logger.info("Persistent state recovered successfully");
            }
            finally {
                // commitTran clears the context, so this only fires when something failed part-way.
                if (this.inTran(context)) {
                    this.abortTran(context);
                }
            }
        }
        catch (SQLException e) {
            throw new AMQException("Error recovering persistent state: " + e, e);
        }
    }

    /**
     * Reads every row of the queue table and makes sure a matching queue is registered with the
     * virtual host's queue registry, creating and registering it when it is missing.
     *
     * @return the queues found in the store, keyed by queue name
     * @throws SQLException if the queue table cannot be read
     * @throws AMQException if a queue cannot be created or registered
     */
    private Map<AMQShortString, AMQQueue> loadQueues() throws SQLException, AMQException {
        Map<AMQShortString, AMQQueue> queueMap = new HashMap<AMQShortString, AMQQueue>();
        Connection conn = this.newConnection();
        try {
            Statement stmt = conn.createStatement();
            try {
                ResultSet rs = stmt.executeQuery(SELECT_FROM_QUEUE);
                try {
                    while (rs.next()) {
                        String queueName = rs.getString(1);
                        String owner = rs.getString(2);
                        AMQShortString queueNameShortString = new AMQShortString(queueName);
                        AMQQueue q = this._virtualHost.getQueueRegistry().getQueue(queueNameShortString);
                        if (q == null) {
                            // The owner column is nullable, so only wrap it when present.
                            AMQShortString ownerShortString = owner == null ? null : new AMQShortString(owner);
                            q = AMQQueueFactory.createAMQQueueImpl(queueNameShortString, true, ownerShortString, false, this._virtualHost, null);
                            this._virtualHost.getQueueRegistry().registerQueue(q);
                        }
                        queueMap.put(queueNameShortString, q);
                    }
                }
                finally {
                    rs.close();
                }
            }
            finally {
                stmt.close();
            }
        }
        finally {
            // Previously the connection, statement and result set were never closed.
            conn.close();
        }
        return queueMap;
    }

    /**
     * Loads every stored exchange and restores the bindings of each one in turn.
     *
     * @throws AMQException if an exchange or one of its bindings cannot be restored
     * @throws SQLException if the store cannot be read
     */
    private void recoverExchanges() throws AMQException, SQLException {
        List<Exchange> storedExchanges = this.loadExchanges();
        for (Exchange stored : storedExchanges) {
            this.recoverExchange(stored);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Reads every row of the exchange table and returns the corresponding exchanges.
     *
     * <p>An exchange that is not yet known to the virtual host's exchange registry is created
     * through the exchange factory and registered before being added to the result.
     *
     * @return the exchanges found in the store, in result-set order
     * @throws AMQException if an exchange cannot be created or registered
     * @throws SQLException if the exchange table cannot be read
     */
    private List<Exchange> loadExchanges() throws AMQException, SQLException {
        ArrayList<Exchange> loaded = new ArrayList<Exchange>();
        Connection connection = null;
        try {
            connection = this.newConnection();
            Statement query = connection.createStatement();
            ResultSet rows = query.executeQuery(SELECT_FROM_EXCHANGE);
            while (rows.next()) {
                String name = rows.getString(1);
                String typeName = rows.getString(2);
                short autoDeleteFlag = rows.getShort(3);
                AMQShortString shortName = new AMQShortString(name);
                Exchange found = this._virtualHost.getExchangeRegistry().getExchange(shortName);
                if (found == null) {
                    // Not registered yet: build it from the stored type and auto-delete flag.
                    found = this._virtualHost.getExchangeFactory().createExchange(shortName, new AMQShortString(typeName), true, autoDeleteFlag != 0, 0);
                    this._virtualHost.getExchangeRegistry().registerExchange(found);
                }
                loaded.add(found);
            }
        }
        finally {
            if (connection != null) {
                connection.close();
            }
        }
        return loaded;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Restores the stored bindings of one exchange.
     *
     * <p>Each row of the bindings table for this exchange is turned back into a
     * {@code queue.bind(...)} call. A binding that refers to a queue missing from the queue
     * registry is logged as an error and skipped.
     *
     * @param exchange the exchange whose bindings are restored
     * @throws AMQException if a binding cannot be re-established
     * @throws SQLException if the bindings table cannot be read
     */
    private void recoverExchange(Exchange exchange) throws AMQException, SQLException {
        _logger.info("Recovering durable exchange " + exchange.getName() + " of type " + exchange.getType() + "...");
        QueueRegistry queueRegistry = this._virtualHost.getQueueRegistry();
        Connection conn = null;
        try {
            conn = this.newConnection();
            PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_BINDINGS);
            stmt.setString(1, exchange.getName().toString());
            ResultSet rs = stmt.executeQuery();
            while (rs.next()) {
                String queueName = rs.getString(1);
                String bindingKey = rs.getString(2);
                Blob arguments = rs.getBlob(3);
                AMQQueue queue = queueRegistry.getQueue(new AMQShortString(queueName));
                if (queue == null) {
                    _logger.error("Unkown queue: " + queueName + " cannot be bound to exchange: " + exchange.getName());
                    continue;
                }
                _logger.info("Restoring binding: (Exchange: " + exchange.getName() + ", Queue: " + queueName + ", Routing Key: " + bindingKey + ", Arguments: " + arguments + ")");
                FieldTable argumentsFT = null;
                if (arguments != null) {
                    long argumentsLength = arguments.length();
                    // Blob positions are 1-based: the first byte is at position 1, not 0.
                    byte[] argumentBytes = arguments.getBytes(1L, (int)argumentsLength);
                    ByteBuffer buf = ByteBuffer.wrap(argumentBytes);
                    argumentsFT = new FieldTable(buf, argumentsLength);
                }
                queue.bind(exchange, bindingKey == null ? null : new AMQShortString(bindingKey), argumentsFT);
            }
        }
        finally {
            if (conn != null) {
                conn.close();
            }
        }
    }

    /**
     * Marks this store as closed by setting the closed flag.
     *
     * @throws Exception declared by the interface; this implementation does not throw
     */
    @Override
    public void close() throws Exception {
        // The previous value is of no interest, so a plain set is enough.
        this._closed.set(true);
    }

    /**
     * Deletes a message's meta-data row and all of its content chunks.
     *
     * <p>If the context has no transaction, a local one is started; it is committed on success
     * and aborted on failure. A transaction supplied by the caller is left for the caller to
     * complete.
     *
     * @param storeContext transactional context to work in
     * @param messageId    id of the message to delete
     * @throws AMQException if the meta-data row is missing, if the number of deleted chunks does
     *                      not match the stored chunk count, or if the database reports an error
     */
    @Override
    public void removeMessage(StoreContext storeContext, Long messageId) throws AMQException {
        boolean localTx = this.getOrCreateTransaction(storeContext);
        Connection conn = this.getConnection(storeContext);
        ConnectionWrapper wrapper = (ConnectionWrapper)storeContext.getPayload();
        if (_logger.isDebugEnabled()) {
            _logger.debug("Message Id: " + messageId + " Removing");
        }
        // Read the meta-data first: its chunk count is needed to validate the content delete.
        MessageMetaData mmd = this.getMessageMetaData(storeContext, messageId);
        try {
            int results;
            PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_MESSAGE_META_DATA);
            try {
                stmt.setLong(1, messageId);
                wrapper.setRequiresCommit();
                results = stmt.executeUpdate();
            }
            finally {
                // Close before any abort/commit so the statement never outlives the connection.
                stmt.close();
            }
            if (results == 0) {
                if (localTx) {
                    this.abortTran(storeContext);
                }
                throw new AMQException("Message metadata not found for message id " + messageId);
            }
            if (_logger.isDebugEnabled()) {
                _logger.debug("Deleted metadata for message " + messageId);
            }
            stmt = conn.prepareStatement(DELETE_FROM_MESSAGE_CONTENT);
            try {
                stmt.setLong(1, messageId);
                results = stmt.executeUpdate();
            }
            finally {
                stmt.close();
            }
            if (results != mmd.getContentChunkCount()) {
                if (localTx) {
                    this.abortTran(storeContext);
                }
                throw new AMQException("Unexpected number of content chunks when deleting message.  Expected " + mmd.getContentChunkCount() + " but found " + results);
            }
            if (localTx) {
                this.commitTran(storeContext);
            }
        }
        catch (SQLException e) {
            if (localTx) {
                this.abortTran(storeContext);
            }
            throw new AMQException("Error removing AMQMessage with id " + messageId + " from database: " + e, e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Writes an exchange (name, type, auto-delete flag) to the exchange table.
     *
     * <p>Does nothing while the store is in the RECOVERING state.
     *
     * @param exchange the exchange to persist
     * @throws AMQException if the database reports an error; the {@link SQLException} is the cause
     */
    @Override
    public void createExchange(Exchange exchange) throws AMQException {
        if (this._state == State.RECOVERING) {
            return;
        }
        try {
            Connection connection = null;
            try {
                connection = this.newConnection();
                PreparedStatement insert = connection.prepareStatement(INSERT_INTO_EXCHANGE);
                insert.setString(1, exchange.getName().toString());
                insert.setString(2, exchange.getType().toString());
                // The auto-delete flag is stored as a SMALLINT: 1 for true, 0 for false.
                short autoDeleteFlag = 0;
                if (exchange.isAutoDelete()) {
                    autoDeleteFlag = 1;
                }
                insert.setShort(3, autoDeleteFlag);
                insert.execute();
                insert.close();
                connection.commit();
            }
            finally {
                if (connection != null) {
                    connection.close();
                }
            }
        }
        catch (SQLException e) {
            throw new AMQException("Error writing Exchange with name " + exchange.getName() + " to database: " + e, e);
        }
    }

    /**
     * Deletes an exchange from the exchange table.
     *
     * @param exchange the exchange to delete
     * @throws AMQException if no row with the exchange's name exists, or if the database reports
     *                      an error
     */
    @Override
    public void removeExchange(Exchange exchange) throws AMQException {
        Connection conn = null;
        try {
            conn = this.newConnection();
            PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_EXCHANGE);
            try {
                stmt.setString(1, exchange.getName().toString());
                int results = stmt.executeUpdate();
                if (results == 0) {
                    throw new AMQException("Exchange " + exchange.getName() + " not found");
                }
            }
            finally {
                // Also covers the "not found" path, which used to skip the close.
                stmt.close();
            }
            conn.commit();
        }
        catch (SQLException e) {
            throw new AMQException("Error deleting Exchange with name " + exchange.getName() + " from database: " + e, e);
        }
        finally {
            if (conn != null) {
                try {
                    conn.close();
                }
                catch (SQLException e) {
                    // Best effort: a failed close must not hide the outcome of the delete.
                    _logger.error(e);
                }
            }
        }
    }

    /**
     * Persists a queue-to-exchange binding unless an identical one is already stored.
     *
     * <p>Does nothing while the store is in the RECOVERING state.
     *
     * <p>NOTE(review): the bindings table declares {@code binding_key} as NOT NULL, so a
     * {@code null} routing key is expected to be rejected by the insert - confirm with callers.
     *
     * @param exchange   exchange side of the binding
     * @param routingKey binding key; bound as SQL NULL when {@code null}
     * @param queue      queue side of the binding
     * @param args       binding arguments, stored as a blob; SQL NULL when {@code null}
     * @throws AMQException if the database reports an error
     */
    @Override
    public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException {
        if (this._state == State.RECOVERING) {
            return;
        }
        Connection conn = null;
        try {
            conn = this.newConnection();
            String exchangeName = exchange.getName().toString();
            String queueName = queue.getName().toString();
            String bindingKey = routingKey == null ? null : routingKey.toString();

            boolean alreadyStored;
            PreparedStatement findStmt = conn.prepareStatement(FIND_BINDING);
            try {
                findStmt.setString(1, exchangeName);
                findStmt.setString(2, queueName);
                findStmt.setString(3, bindingKey);
                ResultSet rs = findStmt.executeQuery();
                try {
                    alreadyStored = rs.next();
                }
                finally {
                    rs.close();
                }
            }
            finally {
                // The lookup statement used to be abandoned when the variable was reused.
                findStmt.close();
            }

            if (!alreadyStored) {
                PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_BINDINGS);
                try {
                    insertStmt.setString(1, exchangeName);
                    insertStmt.setString(2, queueName);
                    insertStmt.setString(3, bindingKey);
                    if (args != null) {
                        byte[] bytes = args.getDataAsBytes();
                        ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
                        insertStmt.setBinaryStream(4, (InputStream)bis, bytes.length);
                    } else {
                        // Same value as the former literal 2004, spelled with its JDBC name.
                        insertStmt.setNull(4, java.sql.Types.BLOB);
                    }
                    insertStmt.executeUpdate();
                }
                finally {
                    insertStmt.close();
                }
                conn.commit();
            }
        }
        catch (SQLException e) {
            throw new AMQException("Error writing binding for AMQQueue with name " + queue.getName() + " to exchange " + exchange.getName() + " to database: " + e, e);
        }
        finally {
            if (conn != null) {
                try {
                    conn.close();
                }
                catch (SQLException e) {
                    _logger.error(e);
                }
            }
        }
    }

    /**
     * Deletes one stored queue-to-exchange binding.
     *
     * @param exchange   exchange side of the binding
     * @param routingKey binding key; bound as SQL NULL when {@code null}
     * @param queue      queue side of the binding
     * @param args       not used by this implementation
     * @throws AMQException if the delete does not affect exactly one row, or if the database
     *                      reports an error
     */
    @Override
    public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException {
        Connection connection = null;
        try {
            connection = this.newConnection();
            PreparedStatement delete = connection.prepareStatement(DELETE_FROM_BINDINGS);
            delete.setString(1, exchange.getName().toString());
            delete.setString(2, queue.getName().toString());
            String key = null;
            if (routingKey != null) {
                key = routingKey.toString();
            }
            delete.setString(3, key);
            int removed = delete.executeUpdate();
            if (removed != 1) {
                throw new AMQException("Queue binding for queue with name " + queue.getName() + " to exchange " + exchange.getName() + "  not found");
            }
            connection.commit();
            delete.close();
        }
        catch (SQLException sqlFailure) {
            throw new AMQException("Error removing binding for AMQQueue with name " + queue.getName() + " to exchange " + exchange.getName() + " in database: " + sqlFailure, sqlFailure);
        }
        finally {
            if (connection != null) {
                try {
                    connection.close();
                }
                catch (SQLException closeFailure) {
                    // Close failures are only logged.
                    _logger.error(closeFailure);
                }
            }
        }
    }

    /**
     * Persists a queue without arguments; delegates to the two-argument overload.
     *
     * @param queue the queue to persist
     * @throws AMQException if the queue cannot be written
     */
    @Override
    public void createQueue(AMQQueue queue) throws AMQException {
        FieldTable noArguments = null;
        this.createQueue(queue, noArguments);
    }

    /**
     * Persists a queue (name and owner) unless a row with the same name already exists.
     *
     * <p>Does nothing beyond the debug log while the store is in the RECOVERING state.
     *
     * @param queue     the queue to persist
     * @param arguments not used by this implementation
     * @throws AMQException if the database reports an error
     */
    @Override
    public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQException {
        _logger.debug("public void createQueue(AMQQueue queue = " + queue + "): called");
        if (this._state == State.RECOVERING) {
            return;
        }
        Connection conn = null;
        try {
            conn = this.newConnection();
            String queueName = queue.getName().toString();

            boolean alreadyStored;
            PreparedStatement findStmt = conn.prepareStatement(FIND_QUEUE);
            try {
                findStmt.setString(1, queueName);
                ResultSet rs = findStmt.executeQuery();
                try {
                    alreadyStored = rs.next();
                }
                finally {
                    rs.close();
                }
            }
            finally {
                findStmt.close();
            }

            if (!alreadyStored) {
                PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_QUEUE);
                try {
                    insertStmt.setString(1, queueName);
                    insertStmt.setString(2, queue.getOwner() == null ? null : queue.getOwner().toString());
                    insertStmt.execute();
                }
                finally {
                    insertStmt.close();
                }
                conn.commit();
            }
        }
        catch (SQLException e) {
            throw new AMQException("Error writing AMQQueue with name " + queue.getName() + " to database: " + e, e);
        }
        finally {
            // The connection used to be closed only after an insert, leaking it whenever the
            // queue already existed or a statement failed.
            if (conn != null) {
                try {
                    conn.close();
                }
                catch (SQLException e) {
                    _logger.error(e);
                }
            }
        }
    }

    /**
     * Opens a new JDBC connection using the URL built in {@code createOrOpenDatabase}.
     *
     * @return a freshly opened connection; the caller is responsible for closing it
     * @throws SQLException if the connection cannot be opened
     */
    private Connection newConnection() throws SQLException {
        return DriverManager.getConnection(this._connectionURL);
    }

    /**
     * Deletes a queue from the queue table.
     *
     * @param queue the queue to delete
     * @throws AMQException if no row with the queue's name exists, or if the database reports
     *                      an error
     */
    @Override
    public void removeQueue(AMQQueue queue) throws AMQException {
        AMQShortString name = queue.getName();
        _logger.debug("public void removeQueue(AMQShortString name = " + name + "): called");
        Connection conn = null;
        try {
            conn = this.newConnection();
            PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_QUEUE);
            try {
                stmt.setString(1, name.toString());
                int results = stmt.executeUpdate();
                if (results == 0) {
                    throw new AMQException("Queue " + name + " not found");
                }
            }
            finally {
                // Also covers the "not found" path, which used to skip the close.
                stmt.close();
            }
            conn.commit();
        }
        catch (SQLException e) {
            throw new AMQException("Error deleting AMQQueue with name " + name + " from database: " + e, e);
        }
        finally {
            if (conn != null) {
                try {
                    conn.close();
                }
                catch (SQLException e) {
                    // Best effort: a failed close must not hide the outcome of the delete.
                    _logger.error(e);
                }
            }
        }
    }

    /**
     * Records that a message is enqueued on a queue by inserting a queue-entry row.
     *
     * <p>If the context has no transaction, a local one is started; it is committed on success
     * and aborted when the database reports an error.
     *
     * @param context   transactional context to work in
     * @param queue     queue the message is enqueued on
     * @param messageId id of the enqueued message
     * @throws AMQException if the row cannot be written
     */
    @Override
    public void enqueueMessage(StoreContext context, AMQQueue queue, Long messageId) throws AMQException {
        AMQShortString name = queue.getName();
        boolean localTx = this.getOrCreateTransaction(context);
        Connection conn = this.getConnection(context);
        ConnectionWrapper connWrapper = (ConnectionWrapper)context.getPayload();
        try {
            PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_QUEUE_ENTRY);
            try {
                stmt.setString(1, name.toString());
                stmt.setLong(2, messageId);
                stmt.executeUpdate();
            }
            finally {
                stmt.close();
            }
            // Flag the pending write. The old code called the getter requiresCommit() and
            // discarded its result, so commitTran/abortTran never saw this change.
            connWrapper.setRequiresCommit();
            if (localTx) {
                this.commitTran(context);
            }
            if (_logger.isDebugEnabled()) {
                _logger.debug("Enqueuing message " + messageId + " on queue " + name + "[Connection" + conn + "]");
            }
        }
        catch (SQLException e) {
            if (localTx) {
                this.abortTran(context);
            }
            _logger.error("Failed to enqueue: " + e, e);
            throw new AMQException("Error writing enqueued message with id " + messageId + " for queue " + name + " to database", e);
        }
    }

    /**
     * Records that a message has left a queue by deleting its queue-entry row.
     *
     * <p>If the context has no transaction, a local one is started; it is committed on success
     * and aborted on any failure, including the case where the row is missing.
     *
     * @param context   transactional context to work in
     * @param queue     queue the message is removed from
     * @param messageId id of the dequeued message
     * @throws AMQException if the delete does not affect exactly one row, or if the database
     *                      reports an error
     */
    @Override
    public void dequeueMessage(StoreContext context, AMQQueue queue, Long messageId) throws AMQException {
        AMQShortString name = queue.getName();
        boolean localTx = this.getOrCreateTransaction(context);
        Connection conn = this.getConnection(context);
        ConnectionWrapper connWrapper = (ConnectionWrapper)context.getPayload();
        try {
            int results;
            PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_QUEUE_ENTRY);
            try {
                stmt.setString(1, name.toString());
                stmt.setLong(2, messageId);
                results = stmt.executeUpdate();
            }
            finally {
                stmt.close();
            }
            // Flag the pending write. The old code called the getter requiresCommit() and
            // discarded its result, so commitTran/abortTran never saw this change.
            connWrapper.setRequiresCommit();
            if (results != 1) {
                // Do not leave a locally started transaction (and its connection) dangling.
                if (localTx) {
                    this.abortTran(context);
                }
                throw new AMQException("Unable to find message with id " + messageId + " on queue " + name);
            }
            if (localTx) {
                this.commitTran(context);
            }
            if (_logger.isDebugEnabled()) {
                _logger.debug("Dequeuing message " + messageId + " on queue " + name);
            }
        }
        catch (SQLException e) {
            if (localTx) {
                this.abortTran(context);
            }
            _logger.error("Failed to dequeue: " + e, e);
            throw new AMQException("Error deleting enqueued message with id " + messageId + " for queue " + name + " from database", e);
        }
    }

    /**
     * Starts a store transaction by opening a new connection and attaching it, wrapped in a
     * {@code ConnectionWrapper}, to the context as its payload.
     *
     * @param context transactional context; must not already carry a payload
     * @throws AMQException if the context already has a payload or the connection cannot be opened
     */
    @Override
    public void beginTran(StoreContext context) throws AMQException {
        Object existingPayload = context.getPayload();
        if (existingPayload != null) {
            throw new AMQException("Fatal internal error: transactional context is not empty at beginTran: " + existingPayload);
        }
        try {
            ConnectionWrapper wrapper = new ConnectionWrapper(this.newConnection());
            context.setPayload(wrapper);
        }
        catch (SQLException sqlFailure) {
            throw new AMQException("Error starting transaction: " + sqlFailure, sqlFailure);
        }
    }

    /**
     * Completes the transaction attached to the context.
     *
     * <p>Commits the connection when the wrapper reports pending changes, closes the
     * connection, and always clears the context's payload.
     *
     * @param context transactional context carrying the connection wrapper
     * @throws AMQException if the context has no payload, or if the commit or close fails
     */
    @Override
    public void commitTran(StoreContext context) throws AMQException {
        ConnectionWrapper connWrapper = (ConnectionWrapper)context.getPayload();
        if (connWrapper == null) {
            throw new AMQException("Fatal internal error: transactional context is empty at commitTran");
        }
        try {
            Connection conn = connWrapper.getConnection();
            try {
                if (connWrapper.requiresCommit()) {
                    conn.commit();
                    if (_logger.isDebugEnabled()) {
                        _logger.debug("commit tran completed");
                    }
                }
            }
            catch (SQLException commitFailure) {
                // The payload is cleared below, so this is the last chance to release the
                // connection. Close quietly so the commit failure is the one reported.
                try {
                    conn.close();
                }
                catch (SQLException closeFailure) {
                    _logger.error("Failed to close connection after commit failure: " + closeFailure, closeFailure);
                }
                throw commitFailure;
            }
            conn.close();
        }
        catch (SQLException e) {
            throw new AMQException("Error commit tx: " + e, e);
        }
        finally {
            context.setPayload(null);
        }
    }

    /**
     * Abandons the transaction attached to the context.
     *
     * <p>Rolls the connection back when the wrapper reports pending changes, closes the
     * connection, and always clears the context's payload.
     *
     * @param context transactional context carrying the connection wrapper
     * @throws AMQException if the context has no payload, or if the rollback or close fails
     */
    @Override
    public void abortTran(StoreContext context) throws AMQException {
        ConnectionWrapper connWrapper = (ConnectionWrapper)context.getPayload();
        if (connWrapper == null) {
            throw new AMQException("Fatal internal error: transactional context is empty at abortTran");
        }
        if (_logger.isDebugEnabled()) {
            _logger.debug("abort tran called: " + connWrapper.getConnection());
        }
        try {
            Connection conn = connWrapper.getConnection();
            try {
                if (connWrapper.requiresCommit()) {
                    conn.rollback();
                }
            }
            catch (SQLException rollbackFailure) {
                // The payload is cleared below, so this is the last chance to release the
                // connection. Close quietly so the rollback failure is the one reported.
                try {
                    conn.close();
                }
                catch (SQLException closeFailure) {
                    _logger.error("Failed to close connection after rollback failure: " + closeFailure, closeFailure);
                }
                throw rollbackFailure;
            }
            conn.close();
        }
        catch (SQLException e) {
            throw new AMQException("Error aborting transaction: " + e, e);
        }
        finally {
            context.setPayload(null);
        }
    }

    /**
     * Reports whether a transaction is attached to the given context.
     *
     * @param context the store context to inspect
     * @return {@code true} if the context payload is non-null
     */
    @Override
    public boolean inTran(StoreContext context) {
        final Object payload = context.getPayload();
        return payload != null;
    }

    /**
     * Hands out the next message id from the store's id counter.
     *
     * @return the counter value before it was incremented
     */
    @Override
    public Long getNewMessageId() {
        final long nextId = this._messageId.getAndIncrement();
        return Long.valueOf(nextId);
    }

    /**
     * Writes one content chunk of a message to the message-content table.
     *
     * If the context has no transaction attached, a local one is started and
     * committed (or aborted on SQL failure) within this call.
     *
     * @param context the store context, with or without a transaction attached
     * @param messageId id of the message the chunk belongs to
     * @param index position of the chunk within the message
     * @param contentBody the chunk to store
     * @param lastContentBody not used by this implementation
     * @throws AMQException if the chunk cannot be written
     */
    @Override
    public void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentChunk contentBody, boolean lastContentBody) throws AMQException {
        boolean localTx = this.getOrCreateTransaction(context);
        Connection conn = this.getConnection(context);
        ConnectionWrapper connWrapper = (ConnectionWrapper)context.getPayload();
        try {
            PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_MESSAGE_CONTENT);
            try {
                stmt.setLong(1, messageId);
                stmt.setInt(2, index);
                byte[] chunkData = new byte[contentBody.getSize()];
                // Read through a duplicate so the chunk's own buffer position is left untouched.
                contentBody.getData().duplicate().get(chunkData);
                ByteArrayInputStream bis = new ByteArrayInputStream(chunkData);
                stmt.setBinaryStream(3, (InputStream)bis, chunkData.length);
                stmt.executeUpdate();
            }
            finally {
                // Release the statement now instead of waiting for the connection to close.
                stmt.close();
            }
            // Flag the transaction as dirty so that commitTran actually commits the insert.
            connWrapper.setRequiresCommit();
            if (localTx) {
                this.commitTran(context);
            }
        }
        catch (SQLException e) {
            if (localTx) {
                this.abortTran(context);
            }
            throw new AMQException("Error writing AMQMessage with id " + messageId + " to database: " + e, (Throwable)e);
        }
    }

    /**
     * Writes the metadata of a message (publish info, content header and
     * content chunk count) to the message-metadata table.
     *
     * If the context has no transaction attached, a local one is started and
     * committed (or aborted on SQL failure) within this call.
     *
     * @param context the store context, with or without a transaction attached
     * @param messageId id of the message being described
     * @param mmd the metadata to store
     * @throws AMQException if the metadata cannot be written
     */
    @Override
    public void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData mmd) throws AMQException {
        boolean localTx = this.getOrCreateTransaction(context);
        Connection conn = this.getConnection(context);
        ConnectionWrapper connWrapper = (ConnectionWrapper)context.getPayload();
        try {
            PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_MESSAGE_META_DATA);
            try {
                stmt.setLong(1, messageId);
                MessagePublishInfo publishInfo = mmd.getMessagePublishInfo();
                stmt.setString(2, publishInfo.getExchange().toString());
                // getMessageMetaData maps a NULL routing-key column back to null, so store
                // null here rather than failing with a NullPointerException that would
                // bypass the abort handling below.
                AMQShortString routingKey = publishInfo.getRoutingKey();
                stmt.setString(3, routingKey == null ? null : routingKey.toString());
                stmt.setShort(4, (short)(publishInfo.isMandatory() ? 1 : 0));
                stmt.setShort(5, (short)(publishInfo.isImmediate() ? 1 : 0));
                // Serialise the content header into a byte array sized from the header itself.
                ContentHeaderBody headerBody = mmd.getContentHeaderBody();
                int bodySize = headerBody.getSize();
                byte[] underlying = new byte[bodySize];
                ByteBuffer buf = ByteBuffer.wrap((byte[])underlying);
                headerBody.writePayload(buf);
                ByteArrayInputStream bis = new ByteArrayInputStream(underlying);
                stmt.setBinaryStream(6, (InputStream)bis, underlying.length);
                stmt.setInt(7, mmd.getContentChunkCount());
                stmt.executeUpdate();
            }
            finally {
                // Release the statement now instead of waiting for the connection to close.
                stmt.close();
            }
            // Flag the transaction as dirty so that commitTran actually commits the insert.
            connWrapper.setRequiresCommit();
            if (localTx) {
                this.commitTran(context);
            }
        }
        catch (SQLException e) {
            if (localTx) {
                this.abortTran(context);
            }
            throw new AMQException("Error writing AMQMessage with id " + messageId + " to database: " + e, (Throwable)e);
        }
    }

    /**
     * Loads the metadata of a message from the message-metadata table.
     *
     * If the context has no transaction attached, a local one is started and
     * ended within this call. Every column is read, and the statement and
     * result set are closed, before the local transaction is ended, because
     * ending it closes the connection.
     *
     * @param context the store context, with or without a transaction attached
     * @param messageId id of the message to look up
     * @return the stored metadata
     * @throws AMQException if no row exists for the id, the row cannot be
     *         read, or the stored content header cannot be decoded
     */
    @Override
    public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException {
        boolean localTx = this.getOrCreateTransaction(context);
        Connection conn = this.getConnection(context);
        try {
            MessagePublishInfo info = null;
            byte[] headerBytes = null;
            int chunkCount = 0;
            PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_MESSAGE_META_DATA);
            try {
                stmt.setLong(1, messageId);
                ResultSet rs = stmt.executeQuery();
                try {
                    if (rs.next()) {
                        final AMQShortString exchange = new AMQShortString(rs.getString(1));
                        String routingKeyText = rs.getString(2);
                        final AMQShortString routingKey = routingKeyText == null ? null : new AMQShortString(routingKeyText);
                        final boolean mandatory = rs.getShort(3) != 0;
                        final boolean immediate = rs.getShort(4) != 0;
                        info = new MessagePublishInfo(){

                            public AMQShortString getExchange() {
                                return exchange;
                            }

                            public void setExchange(AMQShortString exchange2) {
                            }

                            public boolean isImmediate() {
                                return immediate;
                            }

                            public boolean isMandatory() {
                                return mandatory;
                            }

                            public AMQShortString getRoutingKey() {
                                return routingKey;
                            }
                        };
                        Blob dataAsBlob = rs.getBlob(5);
                        headerBytes = dataAsBlob.getBytes(1L, (int)dataAsBlob.length());
                        // Must be read while the result set is still open: ending a local
                        // transaction closes the connection underneath it.
                        chunkCount = rs.getInt(6);
                    }
                }
                finally {
                    rs.close();
                }
            }
            finally {
                stmt.close();
            }
            if (info == null) {
                if (localTx) {
                    this.abortTran(context);
                }
                throw new AMQException("Metadata not found for message with id " + messageId);
            }
            if (localTx) {
                this.commitTran(context);
            }
            // Decode only after the local transaction has ended, so that a decoding
            // failure cannot leave it attached to the context.
            ByteBuffer buf = ByteBuffer.wrap((byte[])headerBytes);
            ContentHeaderBody chb = ContentHeaderBody.createFromBuffer((ByteBuffer)buf, (long)headerBytes.length);
            return new MessageMetaData(info, chb, chunkCount);
        }
        catch (SQLException e) {
            if (localTx) {
                this.abortTran(context);
            }
            throw new AMQException("Error reading AMQMessage with id " + messageId + " from database: " + e, (Throwable)e);
        }
    }

    /**
     * Loads one content chunk of a message from the message-content table.
     *
     * If the context has no transaction attached, a local one is started and
     * ended within this call. The chunk bytes are copied out, and the statement
     * and result set are closed, before the local transaction is ended.
     *
     * @param context the store context, with or without a transaction attached
     * @param messageId id of the message the chunk belongs to
     * @param index position of the chunk within the message
     * @return a chunk backed by a buffer wrapping the stored bytes
     * @throws AMQException if no row exists for the id and index, or the row
     *         cannot be read
     */
    @Override
    public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException {
        boolean localTx = this.getOrCreateTransaction(context);
        Connection conn = this.getConnection(context);
        try {
            ContentChunk cb = null;
            PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_MESSAGE_CONTENT);
            try {
                stmt.setLong(1, messageId);
                stmt.setInt(2, index);
                ResultSet rs = stmt.executeQuery();
                try {
                    if (rs.next()) {
                        Blob dataAsBlob = rs.getBlob(1);
                        final int size = (int)dataAsBlob.length();
                        byte[] dataAsBytes = dataAsBlob.getBytes(1L, size);
                        final ByteBuffer buf = ByteBuffer.wrap((byte[])dataAsBytes);
                        cb = new ContentChunk(){

                            public int getSize() {
                                return size;
                            }

                            public ByteBuffer getData() {
                                return buf;
                            }

                            public void reduceToFit() {
                            }
                        };
                    }
                }
                finally {
                    rs.close();
                }
            }
            finally {
                // Release the statement now instead of waiting for the connection to close.
                stmt.close();
            }
            if (cb == null) {
                if (localTx) {
                    this.abortTran(context);
                }
                throw new AMQException("Message not found for message with id " + messageId);
            }
            if (localTx) {
                this.commitTran(context);
            }
            return cb;
        }
        catch (SQLException e) {
            if (localTx) {
                this.abortTran(context);
            }
            throw new AMQException("Error reading AMQMessage with id " + messageId + " from database: " + e, (Throwable)e);
        }
    }

    /**
     * Reports whether this store is persistent.
     *
     * @return always {@code true} for this implementation
     */
    @Override
    public boolean isPersistent() {
        return true;
    }

    /**
     * Guards operations that must not run once the store's closed flag is set.
     *
     * @throws MessageStoreClosedException if the closed flag is set
     */
    private void checkNotClosed() throws MessageStoreClosedException {
        final boolean closed = this._closed.get();
        if (!closed) {
            return;
        }
        throw new MessageStoreClosedException();
    }

    /**
     * Reads every queue entry from the database and enqueues the referenced
     * messages on their queues.
     *
     * Queues named in the table but missing from {@code queues} are created,
     * registered with the virtual host's queue registry and added to the map.
     * A message referenced by several entries is created once and has its
     * reference count incremented for each further entry. After all entries
     * have been read the collected enqueue actions are run, and the message-id
     * counter is set to one more than the highest id seen.
     *
     * If the context already carries a transaction its connection is used and
     * left open for the owner of that transaction to finish; otherwise a
     * connection is opened for the duration of this call and closed before
     * returning.
     *
     * @param context the store context passed on to each enqueue
     * @param queues known queues keyed by name; may gain entries
     * @throws SQLException if the queue entries cannot be read
     * @throws AMQException if creating a queue or enqueuing a message fails
     */
    private void deliverMessages(StoreContext context, Map<AMQShortString, AMQQueue> queues) throws SQLException, AMQException {
        Map<Long, AMQMessage> msgMap = new HashMap<Long, AMQMessage>();
        List<ProcessAction> actions = new ArrayList<ProcessAction>();
        Map<AMQShortString, Integer> queueRecoveries = new TreeMap<AMQShortString, Integer>();
        boolean inLocaltran = this.inTran(context);
        Connection conn = null;
        try {
            conn = inLocaltran ? this.getConnection(context) : this.newConnection();
            MessageHandleFactory messageHandleFactory = new MessageHandleFactory();
            long maxId = 1L;
            NonTransactionalContext txnContext = new NonTransactionalContext(this, new StoreContext(), null, null);
            Statement stmt = conn.createStatement();
            try {
                ResultSet rs = stmt.executeQuery(SELECT_FROM_QUEUE_ENTRY);
                try {
                    while (rs.next()) {
                        AMQShortString queueName = new AMQShortString(rs.getString(1));
                        AMQQueue queue = queues.get(queueName);
                        if (queue == null) {
                            queue = AMQQueueFactory.createAMQQueueImpl(queueName, false, null, false, this._virtualHost, null);
                            this._virtualHost.getQueueRegistry().registerQueue(queue);
                            queues.put(queueName, queue);
                        }
                        long messageId = rs.getLong(2);
                        maxId = Math.max(maxId, messageId);
                        AMQMessage message = msgMap.get(messageId);
                        if (message != null) {
                            // Same message on another queue: share the instance and count the extra reference.
                            message.incrementReference();
                        } else {
                            message = new AMQMessage(messageId, this, messageHandleFactory, txnContext);
                            msgMap.put(messageId, message);
                        }
                        if (_logger.isDebugEnabled()) {
                            _logger.debug((Object)("On recovery, delivering " + message.getMessageId() + " to " + queue.getName()));
                        }
                        if (_logger.isInfoEnabled()) {
                            Integer count = queueRecoveries.get(queueName);
                            if (count == null) {
                                count = 0;
                            }
                            count = count + 1;
                            queueRecoveries.put(queueName, count);
                        }
                        actions.add(new ProcessAction(queue, context, message));
                    }
                }
                finally {
                    rs.close();
                }
            }
            finally {
                stmt.close();
            }
            // Enqueue only after the whole table has been read.
            for (ProcessAction action : actions) {
                action.process();
            }
            this._messageId.set(maxId + 1L);
        }
        catch (SQLException e) {
            _logger.error((Object)("Error: " + e), (Throwable)e);
            throw e;
        }
        finally {
            // Close only a connection opened here; one borrowed from the context's
            // transaction is closed when that transaction is committed or aborted.
            if (!inLocaltran && conn != null) {
                conn.close();
            }
        }
        if (_logger.isInfoEnabled()) {
            _logger.info((Object)("Recovered message counts: " + queueRecoveries));
        }
    }

    /**
     * Returns the connection held by the wrapper attached to the given context.
     *
     * The payload is dereferenced without a null check, so the context must
     * already have a transaction attached.
     *
     * @param context the store context carrying the transaction
     * @return the connection inside the context's {@link ConnectionWrapper}
     */
    private Connection getConnection(StoreContext context) {
        final ConnectionWrapper wrapper = (ConnectionWrapper)context.getPayload();
        return wrapper.getConnection();
    }

    /**
     * Ensures the given context has a transaction attached, starting one if
     * it does not.
     *
     * @param context the store context to check
     * @return {@code true} if a transaction was started by this call,
     *         {@code false} if one was already attached
     * @throws AMQException if a new transaction cannot be started
     */
    private boolean getOrCreateTransaction(StoreContext context) throws AMQException {
        final ConnectionWrapper existing = (ConnectionWrapper)context.getPayload();
        final boolean startedHere = existing == null;
        if (startedHere) {
            this.beginTran(context);
        }
        return startedHere;
    }

    /**
     * Moves the store from one state to another, but only if it is currently
     * in the expected state.
     *
     * @param requiredState the state the store must currently be in
     * @param newState the state to move to
     * @throws AMQException if the store is not in {@code requiredState}
     */
    private synchronized void stateTransition(State requiredState, State newState) throws AMQException {
        if (this._state == requiredState) {
            this._state = newState;
            return;
        }
        throw new AMQException("Cannot transition to the state: " + newState + "; need to be in state: " + requiredState + "; currently in state: " + this._state);
    }

    /**
     * A deferred enqueue: remembers a queue, a store context and a message so
     * that the enqueue can be carried out later via {@link #process()}.
     */
    private static final class ProcessAction {
        private final AMQQueue _queue;
        private final StoreContext _context;
        private final AMQMessage _message;

        /**
         * @param queue the queue to enqueue on
         * @param context the store context handed to the enqueue call
         * @param message the message to enqueue
         */
        public ProcessAction(AMQQueue queue, StoreContext context, AMQMessage message) {
            _queue = queue;
            _context = context;
            _message = message;
        }

        /**
         * Enqueues the remembered message on the remembered queue.
         *
         * @throws AMQException if the enqueue fails
         */
        public void process() throws AMQException {
            _queue.enqueue(_context, _message);
        }
    }

    /**
     * Transaction payload: a JDBC connection together with a flag recording
     * whether the transaction needs an explicit commit or rollback when it
     * ends. The flag starts out unset and can only be switched on.
     */
    private static final class ConnectionWrapper {
        private final Connection _connection;
        private boolean _requiresCommit = false;

        /**
         * @param conn the connection backing the transaction
         */
        public ConnectionWrapper(Connection conn) {
            _connection = conn;
        }

        /** @return the wrapped connection */
        public Connection getConnection() {
            return _connection;
        }

        /** @return {@code true} once {@link #setRequiresCommit()} has been called */
        public boolean requiresCommit() {
            return _requiresCommit;
        }

        /** Marks the transaction as needing a commit or rollback when it ends. */
        public void setRequiresCommit() {
            _requiresCommit = true;
        }
    }

    /*
     * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
     */
    private static enum State {
        INITIAL,
        CONFIGURING,
        RECOVERING,
        STARTED,
        CLOSING,
        CLOSED;

    }
}

