/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.bugs;

import java.net.URI;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.sql.DataSource;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.Message;
import org.apache.activemq.network.ConditionalNetworkBridgeFilterFactory;
import org.apache.activemq.network.NetworkBridgeFilterFactory;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.Wait;
import org.apache.derby.jdbc.EmbeddedDataSource;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(value=Parameterized.class)
public class AMQ4952Test
extends TestCase {
    private static final Logger LOG = LoggerFactory.getLogger(AMQ4952Test.class);
    protected static final int MESSAGE_COUNT = 1;
    protected BrokerService consumerBroker;
    protected BrokerService producerBroker;
    protected ActiveMQQueue QUEUE_NAME = new ActiveMQQueue("duptest.store");
    private final CountDownLatch stopConsumerBroker = new CountDownLatch(1);
    private final CountDownLatch consumerBrokerRestarted = new CountDownLatch(1);
    private final CountDownLatch consumerRestartedAndMessageForwarded = new CountDownLatch(1);
    private EmbeddedDataSource localDataSource;
    @Parameterized.Parameter(value=0)
    public boolean enableCursorAudit;

    @Parameterized.Parameters(name="enableAudit={0}")
    public static Iterable<Object[]> getTestParameters() {
        return Arrays.asList({Boolean.TRUE}, {Boolean.FALSE});
    }

    @Test
    public void testConsumerBrokerRestart() throws Exception {
        Callable consumeMessageTask = new Callable(){

            public Object call() throws Exception {
                int receivedMessageCount = 0;
                ActiveMQConnectionFactory consumerFactory = new ActiveMQConnectionFactory("failover:(tcp://localhost:2006)?randomize=false&backup=false");
                Connection consumerConnection = consumerFactory.createConnection();
                try {
                    consumerConnection.setClientID("consumer");
                    consumerConnection.start();
                    Session consumerSession = consumerConnection.createSession(false, 2);
                    MessageConsumer messageConsumer = consumerSession.createConsumer((Destination)AMQ4952Test.this.QUEUE_NAME);
                    while (true) {
                        TextMessage textMsg;
                        if ((textMsg = (TextMessage)messageConsumer.receive(5000L)) == null) {
                            Integer n = receivedMessageCount;
                            return n;
                        }
                        LOG.info("*** receivedMessageCount {} message has MessageID {} ", (Object)(++receivedMessageCount), (Object)textMsg.getJMSMessageID());
                        if (textMsg.getJMSMessageID().endsWith("1") && receivedMessageCount == 1) {
                            LOG.info("Waiting for restart...");
                            AMQ4952Test.this.consumerRestartedAndMessageForwarded.await(90L, TimeUnit.SECONDS);
                        }
                        textMsg.acknowledge();
                    }
                }
                finally {
                    consumerConnection.close();
                }
            }
        };
        Runnable consumerBrokerResetTask = new Runnable(){

            @Override
            public void run() {
                try {
                    AMQ4952Test.this.stopConsumerBroker.await();
                    LOG.info("********* STOPPING CONSUMER BROKER");
                    AMQ4952Test.this.consumerBroker.stop();
                    AMQ4952Test.this.consumerBroker.waitUntilStopped();
                    LOG.info("***** STARTING CONSUMER BROKER");
                    AMQ4952Test.this.consumerBroker = AMQ4952Test.this.createConsumerBroker(false);
                    LOG.info("***** CONSUMER BROKER STARTED!!");
                    AMQ4952Test.this.consumerBrokerRestarted.countDown();
                    TestCase.assertTrue((String)"message forwarded on time", (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

                        public boolean isSatisified() throws Exception {
                            LOG.info("ProducerBroker totalMessageCount: " + AMQ4952Test.this.producerBroker.getAdminView().getTotalMessageCount());
                            return AMQ4952Test.this.producerBroker.getAdminView().getTotalMessageCount() == 0L;
                        }
                    }));
                    AMQ4952Test.this.consumerRestartedAndMessageForwarded.countDown();
                }
                catch (Exception e) {
                    LOG.error("Exception when stopping/starting the consumerBroker ", (Throwable)e);
                }
            }
        };
        ExecutorService executor = Executors.newFixedThreadPool(2);
        executor.execute(consumerBrokerResetTask);
        Future numberOfConsumedMessage = executor.submit(consumeMessageTask);
        this.produceMessages();
        int totalMessagesConsumed = (Integer)numberOfConsumedMessage.get();
        StringBuffer contents = new StringBuffer();
        boolean messageInStore = this.isMessageInJDBCStore((DataSource)this.localDataSource, contents);
        LOG.debug("****number of messages received " + totalMessagesConsumed);
        AMQ4952Test.assertEquals((String)"number of messages received", (int)2, (int)totalMessagesConsumed);
        AMQ4952Test.assertEquals((String)"messages left in store", (boolean)true, (boolean)messageInStore);
        AMQ4952Test.assertTrue((String)("message is in dlq: " + contents.toString()), (boolean)contents.toString().contains("DLQ"));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void produceMessages() throws JMSException {
        ActiveMQConnectionFactory producerFactory = new ActiveMQConnectionFactory("failover:(tcp://localhost:2003)?randomize=false&backup=false");
        Connection producerConnection = producerFactory.createConnection();
        try {
            producerConnection.setClientID("producer");
            producerConnection.start();
            Session producerSession = producerConnection.createSession(false, 1);
            MessageProducer remoteProducer = producerSession.createProducer((Destination)this.QUEUE_NAME);
            for (int i = 0; 1 > i; ++i) {
                String payload = "test msg " + i;
                TextMessage msg = producerSession.createTextMessage(payload);
                remoteProducer.send((javax.jms.Message)msg);
            }
        }
        finally {
            producerConnection.close();
        }
    }

    @Before
    public void setUp() throws Exception {
        super.setUp();
        this.doSetUp();
    }

    @After
    public void tearDown() throws Exception {
        this.doTearDown();
        super.tearDown();
    }

    protected void doTearDown() throws Exception {
        try {
            this.producerBroker.stop();
        }
        catch (Exception ex) {
            // empty catch block
        }
        try {
            this.consumerBroker.stop();
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    protected void doSetUp() throws Exception {
        this.producerBroker = this.createProducerBroker();
        this.consumerBroker = this.createConsumerBroker(true);
    }

    protected BrokerService createProducerBroker() throws Exception {
        String[] networkToPorts = new String[]{"2006"};
        HashMap<String, String> networkProps = new HashMap<String, String>();
        networkProps.put("networkTTL", "10");
        networkProps.put("conduitSubscriptions", "true");
        networkProps.put("decreaseNetworkConsumerPriority", "true");
        networkProps.put("dynamicOnly", "true");
        BrokerService broker = new BrokerService();
        broker.getManagementContext().setCreateConnector(false);
        broker.setDeleteAllMessagesOnStartup(true);
        broker.setBrokerName("BP");
        broker.setAdvisorySupport(false);
        TransportConnector transportConnector = new TransportConnector();
        transportConnector.setUri(new URI("tcp://localhost:2003"));
        ArrayList<TransportConnector> transportConnectors = new ArrayList<TransportConnector>();
        transportConnectors.add(transportConnector);
        broker.setTransportConnectors(transportConnectors);
        if (networkToPorts != null && networkToPorts.length > 0) {
            StringBuilder builder = new StringBuilder("static:(failover:(tcp://localhost:2006)?maxReconnectAttempts=0)?useExponentialBackOff=false");
            NetworkConnector nc = broker.addNetworkConnector(builder.toString());
            if (networkProps != null) {
                IntrospectionSupport.setProperties((Object)nc, networkProps);
            }
            nc.setStaticallyIncludedDestinations(Arrays.asList(this.QUEUE_NAME));
        }
        JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter();
        EmbeddedDataSource remoteDataSource = new EmbeddedDataSource();
        remoteDataSource.setDatabaseName("derbyDBRemoteBroker");
        remoteDataSource.setCreateDatabase("create");
        jdbc.setDataSource((DataSource)remoteDataSource);
        broker.setPersistenceAdapter((PersistenceAdapter)jdbc);
        PolicyEntry policy = new PolicyEntry();
        policy.setQueue(">");
        policy.setEnableAudit(false);
        policy.setUseCache(false);
        policy.setExpireMessagesPeriod(0L);
        ConditionalNetworkBridgeFilterFactory conditionalNetworkBridgeFilterFactory = new ConditionalNetworkBridgeFilterFactory();
        conditionalNetworkBridgeFilterFactory.setReplayWhenNoConsumers(true);
        policy.setNetworkBridgeFilterFactory((NetworkBridgeFilterFactory)conditionalNetworkBridgeFilterFactory);
        PolicyMap pMap = new PolicyMap();
        pMap.setDefaultEntry(policy);
        broker.setDestinationPolicy(pMap);
        broker.start();
        broker.waitUntilStarted();
        return broker;
    }

    protected BrokerService createConsumerBroker(boolean deleteMessages) throws Exception {
        String scheme = "tcp";
        String listenPort = "2006";
        BrokerService broker = new BrokerService();
        broker.getManagementContext().setCreateConnector(false);
        broker.setDeleteAllMessagesOnStartup(deleteMessages);
        broker.setBrokerName("BC");
        TransportConnector transportConnector = new TransportConnector();
        transportConnector.setUri(new URI(scheme + "://localhost:" + listenPort));
        ArrayList<TransportConnector> transportConnectors = new ArrayList<TransportConnector>();
        transportConnectors.add(transportConnector);
        broker.setTransportConnectors(transportConnectors);
        PolicyEntry policy = new PolicyEntry();
        policy.setQueue(">");
        policy.setEnableAudit(this.enableCursorAudit);
        policy.setExpireMessagesPeriod(0L);
        ConditionalNetworkBridgeFilterFactory conditionalNetworkBridgeFilterFactory = new ConditionalNetworkBridgeFilterFactory();
        conditionalNetworkBridgeFilterFactory.setReplayWhenNoConsumers(true);
        policy.setNetworkBridgeFilterFactory((NetworkBridgeFilterFactory)conditionalNetworkBridgeFilterFactory);
        PolicyMap pMap = new PolicyMap();
        pMap.setDefaultEntry(policy);
        broker.setDestinationPolicy(pMap);
        JDBCPersistenceAdapter localJDBCPersistentAdapter = new JDBCPersistenceAdapter();
        EmbeddedDataSource localDataSource = new EmbeddedDataSource();
        localDataSource.setDatabaseName("derbyDBLocalBroker");
        localDataSource.setCreateDatabase("create");
        localJDBCPersistentAdapter.setDataSource((DataSource)localDataSource);
        broker.setPersistenceAdapter((PersistenceAdapter)localJDBCPersistentAdapter);
        if (deleteMessages) {
            broker.setPlugins(new BrokerPlugin[]{new MyTestPlugin()});
        }
        this.localDataSource = localDataSource;
        broker.start();
        broker.waitUntilStarted();
        return broker;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean isMessageInJDBCStore(DataSource dataSource, StringBuffer stringBuffer) throws SQLException {
        boolean tableHasData = false;
        String query = "select * from ACTIVEMQ_MSGS";
        java.sql.Connection conn = dataSource.getConnection();
        PreparedStatement s = conn.prepareStatement(query);
        ResultSet set = null;
        try {
            int i;
            StringBuffer headers = new StringBuffer();
            set = s.executeQuery();
            ResultSetMetaData metaData = set.getMetaData();
            for (i = 1; i <= metaData.getColumnCount(); ++i) {
                if (i == 1) {
                    headers.append("||");
                }
                headers.append(metaData.getColumnName(i) + "||");
            }
            LOG.error(headers.toString());
            while (set.next()) {
                tableHasData = true;
                for (i = 1; i <= metaData.getColumnCount(); ++i) {
                    if (i == 1) {
                        stringBuffer.append("|");
                    }
                    stringBuffer.append(set.getString(i) + "|");
                }
                LOG.error(stringBuffer.toString());
            }
        }
        finally {
            try {
                set.close();
            }
            catch (Throwable ignore) {}
            try {
                s.close();
            }
            catch (Throwable ignore) {}
            conn.close();
        }
        return tableHasData;
    }

    class MyTestBroker
    extends BrokerFilter {
        public MyTestBroker(Broker next) {
            super(next);
        }

        public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
            super.send(producerExchange, messageSend);
            LOG.error("Stopping broker on send:  " + messageSend.getMessageId().getProducerSequenceId());
            AMQ4952Test.this.stopConsumerBroker.countDown();
            producerExchange.getConnectionContext().setDontSendReponse(true);
        }
    }

    class MyTestPlugin
    implements BrokerPlugin {
        MyTestPlugin() {
        }

        public Broker installPlugin(Broker broker) throws Exception {
            return new MyTestBroker(broker);
        }
    }
}

