/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.store.jdbc;

import java.io.PrintWriter;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.jms.Connection;
import javax.sql.DataSource;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.Locker;
import org.apache.activemq.broker.ft.SyncCreateDataSource;
import org.apache.activemq.bugs.embedded.ThreadExplorer;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.jdbc.DataSourceServiceSupport;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.activemq.store.jdbc.LeaseDatabaseLocker;
import org.apache.activemq.util.DefaultTestAppender;
import org.apache.activemq.util.IOExceptionHandler;
import org.apache.activemq.util.LeaseLockerIOExceptionHandler;
import org.apache.activemq.util.Wait;
import org.apache.derby.jdbc.EmbeddedDataSource;
import org.apache.log4j.Appender;
import org.apache.log4j.Level;
import org.apache.log4j.spi.LoggingEvent;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JDBCIOExceptionHandlerTest {
    private static final Logger LOG = LoggerFactory.getLogger(JDBCIOExceptionHandlerTest.class);
    private static final String TRANSPORT_URL = "tcp://0.0.0.0:0";
    private ActiveMQConnectionFactory factory;
    private ReconnectingEmbeddedDataSource dataSource;
    private BrokerService broker;

    @After
    public void stopDB() {
        if (this.dataSource != null) {
            this.dataSource.stopDB();
        }
    }

    protected BrokerService createBroker(boolean withJMX) throws Exception {
        return this.createBroker("localhost", withJMX, true, true);
    }

    protected BrokerService createBroker(String name, boolean withJMX, boolean leaseLocker, boolean startStopConnectors) throws Exception {
        BrokerService broker = new BrokerService();
        broker.setBrokerName(name);
        broker.setUseJmx(withJMX);
        JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter();
        EmbeddedDataSource embeddedDataSource = (EmbeddedDataSource)jdbc.getDataSource();
        this.dataSource = new ReconnectingEmbeddedDataSource(new SyncCreateDataSource(embeddedDataSource));
        jdbc.setDataSource((DataSource)this.dataSource);
        jdbc.setLockKeepAlivePeriod(1000L);
        if (leaseLocker) {
            LeaseDatabaseLocker leaseDatabaseLocker = new LeaseDatabaseLocker();
            leaseDatabaseLocker.setHandleStartException(true);
            leaseDatabaseLocker.setLockAcquireSleepInterval(2000L);
            jdbc.setLocker((Locker)leaseDatabaseLocker);
        }
        broker.setPersistenceAdapter((PersistenceAdapter)jdbc);
        LeaseLockerIOExceptionHandler ioExceptionHandler = new LeaseLockerIOExceptionHandler();
        ioExceptionHandler.setResumeCheckSleepPeriod(1000L);
        ioExceptionHandler.setStopStartConnectors(startStopConnectors);
        broker.setIoExceptionHandler((IOExceptionHandler)ioExceptionHandler);
        String connectionUri = broker.addConnector(TRANSPORT_URL).getPublishableConnectString();
        this.factory = new ActiveMQConnectionFactory(connectionUri);
        return broker;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testStartWithDatabaseDown() throws Exception {
        final AtomicBoolean connectorStarted = new AtomicBoolean(false);
        final AtomicBoolean connectorStopped = new AtomicBoolean(false);
        DefaultTestAppender appender = new DefaultTestAppender(){

            public void doAppend(LoggingEvent event) {
                if (event.getMessage().toString().startsWith("JMX consoles can connect to")) {
                    connectorStarted.set(true);
                }
                if (event.getMessage().toString().equals("Stopping jmx connector")) {
                    connectorStopped.set(true);
                }
            }
        };
        org.apache.log4j.Logger rootLogger = org.apache.log4j.Logger.getRootLogger();
        Level previousLevel = rootLogger.getLevel();
        rootLogger.setLevel(Level.DEBUG);
        rootLogger.addAppender((Appender)appender);
        BrokerService broker = new BrokerService();
        broker.getManagementContext().setCreateConnector(true);
        broker.getManagementContext().setCreateMBeanServer(true);
        JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter();
        EmbeddedDataSource embeddedDataSource = (EmbeddedDataSource)jdbc.getDataSource();
        this.dataSource = new ReconnectingEmbeddedDataSource(new SyncCreateDataSource(embeddedDataSource));
        this.dataSource.stopDB();
        jdbc.setDataSource((DataSource)this.dataSource);
        jdbc.setLockKeepAlivePeriod(1000L);
        LeaseDatabaseLocker leaseDatabaseLocker = new LeaseDatabaseLocker();
        leaseDatabaseLocker.setHandleStartException(true);
        leaseDatabaseLocker.setLockAcquireSleepInterval(2000L);
        jdbc.setLocker((Locker)leaseDatabaseLocker);
        broker.setPersistenceAdapter((PersistenceAdapter)jdbc);
        LeaseLockerIOExceptionHandler ioExceptionHandler = new LeaseLockerIOExceptionHandler();
        ioExceptionHandler.setResumeCheckSleepPeriod(1000L);
        ioExceptionHandler.setStopStartConnectors(true);
        broker.setIoExceptionHandler((IOExceptionHandler)ioExceptionHandler);
        try {
            broker.start();
            Assert.fail((String)"Broker should have been stopped!");
        }
        catch (Exception e) {
            Thread.sleep(5000L);
            Assert.assertTrue((String)"Broker should have been stopped!", (boolean)broker.isStopped());
            Thread[] threads = ThreadExplorer.listThreads();
            for (int i = 0; i < threads.length; ++i) {
                if (!threads[i].getName().startsWith("IOExceptionHandler")) continue;
                Assert.fail((String)"IOExceptionHanlder still active");
            }
            if (connectorStarted.get() && !connectorStopped.get()) {
                Assert.fail((String)"JMX Server Connector should have been stopped!");
            }
        }
        finally {
            this.dataSource = null;
            broker = null;
            rootLogger.removeAppender((Appender)appender);
            rootLogger.setLevel(previousLevel);
        }
    }

    @Test
    public void testRecoverWithOutJMX() throws Exception {
        this.recoverFromDisconnectDB(false);
    }

    @Test
    public void testRecoverWithJMX() throws Exception {
        this.recoverFromDisconnectDB(true);
    }

    @Test
    public void testSlaveStoppedLease() throws Exception {
        this.testSlaveStopped(true);
    }

    @Test
    public void testSlaveStoppedDefault() throws Exception {
        this.testSlaveStopped(false);
    }

    public void testSlaveStopped(final boolean lease) throws Exception {
        final BrokerService master = this.createBroker("master", true, lease, false);
        master.start();
        master.waitUntilStarted();
        final AtomicReference slave = new AtomicReference();
        Thread slaveThread = new Thread(){

            @Override
            public void run() {
                try {
                    BrokerService broker = new BrokerService();
                    broker.setBrokerName("slave");
                    JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter();
                    jdbc.setDataSource((DataSource)JDBCIOExceptionHandlerTest.this.dataSource);
                    jdbc.setLockKeepAlivePeriod(1000L);
                    if (lease) {
                        LeaseDatabaseLocker leaseDatabaseLocker = new LeaseDatabaseLocker();
                        leaseDatabaseLocker.setHandleStartException(true);
                        leaseDatabaseLocker.setLockAcquireSleepInterval(2000L);
                        jdbc.setLocker((Locker)leaseDatabaseLocker);
                    }
                    broker.setPersistenceAdapter((PersistenceAdapter)jdbc);
                    LeaseLockerIOExceptionHandler ioExceptionHandler = new LeaseLockerIOExceptionHandler();
                    ioExceptionHandler.setResumeCheckSleepPeriod(1000L);
                    ioExceptionHandler.setStopStartConnectors(false);
                    broker.setIoExceptionHandler((IOExceptionHandler)ioExceptionHandler);
                    slave.set(broker);
                    broker.start();
                }
                catch (Exception e) {
                    e.printStackTrace();
                }
            }
        };
        slaveThread.start();
        Thread.sleep(5000L);
        this.dataSource.stopDB();
        Assert.assertTrue((String)"Master hasn't been stopped", (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                return master.isStopped();
            }
        }));
        Assert.assertTrue((String)"Slave hasn't been stopped", (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                return ((BrokerService)slave.get()).isStopped();
            }
        }));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void recoverFromDisconnectDB(boolean withJMX) throws Exception {
        try {
            this.broker = this.createBroker(withJMX);
            this.broker.start();
            this.broker.waitUntilStarted();
            this.dataSource.stopDB();
            TimeUnit.SECONDS.sleep(3L);
            this.checkTransportConnectorStopped();
            this.dataSource.restartDB();
            Wait.waitFor((Wait.Condition)new Wait.Condition(){

                public boolean isSatisified() throws Exception {
                    LOG.debug("*** checking connector to start...");
                    try {
                        JDBCIOExceptionHandlerTest.this.checkTransportConnectorStarted();
                        return true;
                    }
                    catch (Throwable t) {
                        LOG.debug(t.toString());
                        return false;
                    }
                }
            });
        }
        finally {
            LOG.debug("*** broker is stopping...");
            this.broker.stop();
            this.broker.waitUntilStopped();
        }
    }

    private void checkTransportConnectorStopped() {
        try {
            this.factory.createConnection();
            Assert.fail((String)"Transport connector should be stopped");
        }
        catch (Exception ex) {
            LOG.debug(" checkTransportConnectorStopped() threw", (Throwable)ex);
        }
    }

    private void checkTransportConnectorStarted() {
        try {
            Connection conn = this.factory.createConnection();
            conn.close();
        }
        catch (Exception ex) {
            LOG.debug("checkTransportConnectorStarted() threw", (Throwable)ex);
            Assert.fail((String)"Transport connector should have been started");
        }
    }

    public class ReconnectingEmbeddedDataSource
    implements DataSource {
        private SyncCreateDataSource realDatasource;

        public ReconnectingEmbeddedDataSource(SyncCreateDataSource datasource) {
            this.realDatasource = datasource;
        }

        @Override
        public PrintWriter getLogWriter() throws SQLException {
            return this.realDatasource.getLogWriter();
        }

        @Override
        public void setLogWriter(PrintWriter out) throws SQLException {
            this.realDatasource.setLogWriter(out);
        }

        @Override
        public void setLoginTimeout(int seconds) throws SQLException {
            this.realDatasource.setLoginTimeout(seconds);
        }

        @Override
        public int getLoginTimeout() throws SQLException {
            return this.realDatasource.getLoginTimeout();
        }

        @Override
        public <T> T unwrap(Class<T> iface) throws SQLException {
            return this.unwrap(iface);
        }

        @Override
        public boolean isWrapperFor(Class<?> iface) throws SQLException {
            return this.isWrapperFor(iface);
        }

        @Override
        public java.sql.Connection getConnection() throws SQLException {
            return this.realDatasource.getConnection();
        }

        @Override
        public java.sql.Connection getConnection(String username, String password) throws SQLException {
            return this.getConnection(username, password);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void restartDB() throws Exception {
            SyncCreateDataSource existingDataSource;
            EmbeddedDataSource newDatasource = (EmbeddedDataSource)DataSourceServiceSupport.createDataSource((String)JDBCIOExceptionHandlerTest.this.broker.getDataDirectoryFile().getCanonicalPath());
            newDatasource.getConnection();
            LOG.info("*** DB restarted now...");
            SyncCreateDataSource syncCreateDataSource = existingDataSource = this.realDatasource;
            synchronized (syncCreateDataSource) {
                this.realDatasource = new SyncCreateDataSource(newDatasource);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void stopDB() {
            LOG.info("***DB is being shutdown...");
            SyncCreateDataSource syncCreateDataSource = this.realDatasource;
            synchronized (syncCreateDataSource) {
                DataSourceServiceSupport.shutdownDefaultDataSource((DataSource)this.realDatasource.getDelegate());
            }
        }

        @Override
        public java.util.logging.Logger getParentLogger() throws SQLFeatureNotSupportedException {
            return null;
        }
    }
}

