/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.broker.ft;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.sql.DataSource;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.JmsTopicSendReceiveWithTwoConnectionsTest;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.jdbc.DataSourceServiceSupport;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.activemq.util.DefaultIOExceptionHandler;
import org.apache.activemq.util.IOExceptionHandler;
import org.apache.activemq.util.IOHelper;
import org.apache.derby.jdbc.EmbeddedDataSource;
import org.junit.After;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DbRestartJDBCQueueTest
extends JmsTopicSendReceiveWithTwoConnectionsTest
implements ExceptionListener {
    private static final transient Logger LOG = LoggerFactory.getLogger(DbRestartJDBCQueueTest.class);
    public boolean transactedSends = false;
    public int failureCount = 25;
    int inflightMessageCount = 0;
    EmbeddedDataSource sharedDs;
    BrokerService broker;
    final CountDownLatch restartDBLatch = new CountDownLatch(1);

    @Override
    protected void setUp() throws Exception {
        this.setAutoFail(true);
        this.topic = false;
        this.verbose = true;
        this.sharedDs = (EmbeddedDataSource)DataSourceServiceSupport.createDataSource((String)IOHelper.getDefaultDataDirectory());
        this.broker = new BrokerService();
        DefaultIOExceptionHandler handler = new DefaultIOExceptionHandler();
        handler.setIgnoreSQLExceptions(false);
        handler.setStopStartConnectors(true);
        this.broker.setIoExceptionHandler((IOExceptionHandler)handler);
        this.broker.addConnector("tcp://localhost:0");
        this.broker.setUseJmx(false);
        this.broker.setPersistent(true);
        this.broker.setDeleteAllMessagesOnStartup(true);
        JDBCPersistenceAdapter persistenceAdapter = new JDBCPersistenceAdapter();
        persistenceAdapter.setDataSource((DataSource)this.sharedDs);
        persistenceAdapter.setUseLock(false);
        persistenceAdapter.setLockKeepAlivePeriod(500L);
        persistenceAdapter.getLocker().setLockAcquireSleepInterval(500L);
        this.broker.setPersistenceAdapter((PersistenceAdapter)persistenceAdapter);
        this.broker.start();
        super.setUp();
    }

    @Override
    protected void tearDown() throws Exception {
        super.tearDown();
        this.broker.stop();
    }

    @After
    public void shutDownDerby() {
        DataSourceServiceSupport.shutdownDefaultDataSource((DataSource)this.sharedDs);
    }

    @Override
    protected Session createSendSession(Connection sendConnection) throws Exception {
        if (this.transactedSends) {
            return sendConnection.createSession(true, 0);
        }
        return sendConnection.createSession(false, 1);
    }

    @Override
    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
        ActiveMQConnectionFactory f = new ActiveMQConnectionFactory("failover://" + ((TransportConnector)this.broker.getTransportConnectors().get(0)).getPublishableConnectString());
        f.setExceptionListener((ExceptionListener)this);
        return f;
    }

    @Override
    protected void messageSent() throws Exception {
        if (++this.inflightMessageCount == this.failureCount) {
            LOG.info("STOPPING DB!@!!!!");
            final EmbeddedDataSource ds = this.sharedDs;
            ds.setShutdownDatabase("shutdown");
            try {
                ds.getConnection();
            }
            catch (Exception exception) {
                // empty catch block
            }
            LOG.info("DB STOPPED!@!!!!");
            Thread dbRestartThread = new Thread("db-re-start-thread"){

                @Override
                public void run() {
                    LOG.info("Sleeping for 10 seconds before allowing db restart");
                    try {
                        DbRestartJDBCQueueTest.this.restartDBLatch.await(10L, TimeUnit.SECONDS);
                    }
                    catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    ds.setShutdownDatabase("false");
                    LOG.info("DB RESTARTED!@!!!!");
                }
            };
            dbRestartThread.start();
        }
    }

    @Override
    protected void sendToProducer(MessageProducer producer, Destination producerDestination, Message message) throws JMSException {
        boolean sent = false;
        do {
            try {
                producer.send(producerDestination, message);
                if (this.transactedSends && ((this.inflightMessageCount + 1) % 10 == 0 || this.inflightMessageCount + 1 >= this.messageCount)) {
                    LOG.info("committing on send: " + this.inflightMessageCount + " message: " + message);
                    this.session.commit();
                }
                sent = true;
            }
            catch (JMSException e) {
                LOG.info("Exception on producer send:", (Throwable)e);
                try {
                    Thread.sleep(2000L);
                }
                catch (InterruptedException interruptedException) {
                    // empty catch block
                }
            }
        } while (!sent);
    }

    public void onException(JMSException exception) {
        LOG.error("exception on connection: ", (Throwable)exception);
    }
}

