/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.transport.failover;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Destination;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerPluginSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.Message;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FailoverTxSlowAckTest {
    private static final Logger LOG = LoggerFactory.getLogger(FailoverTxSlowAckTest.class);
    private static final String QUEUE_IN = "IN";
    private static final String QUEUE_OUT = "OUT";
    private static final String MESSAGE_TEXT = "Test message ";
    private static final String TRANSPORT_URI = "tcp://localhost:0";
    private String url;
    final int prefetch = 1;
    BrokerService broker;

    @After
    public void stopBroker() throws Exception {
        if (this.broker != null) {
            this.broker.stop();
        }
    }

    public void startBroker(boolean deleteAllMessagesOnStartup) throws Exception {
        this.broker = this.createBroker(deleteAllMessagesOnStartup);
        this.broker.start();
    }

    public BrokerService createBroker(boolean deleteAllMessagesOnStartup) throws Exception {
        return this.createBroker(deleteAllMessagesOnStartup, TRANSPORT_URI);
    }

    public BrokerService createBroker(boolean deleteAllMessagesOnStartup, String bindAddress) throws Exception {
        this.broker = new BrokerService();
        this.broker.addConnector(bindAddress);
        this.broker.setDeleteAllMessagesOnStartup(deleteAllMessagesOnStartup);
        PolicyMap policyMap = new PolicyMap();
        PolicyEntry defaultEntry = new PolicyEntry();
        defaultEntry.setOptimizedDispatch(true);
        policyMap.setDefaultEntry(defaultEntry);
        this.broker.setDestinationPolicy(policyMap);
        return this.broker;
    }

    @Test
    public void testFailoverDuringAckRollsback() throws Exception {
        this.broker = this.createBroker(true);
        final ExecutorService executorService = Executors.newFixedThreadPool(2);
        this.broker.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport(){
            int sendCount = 0;

            public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
                super.send(producerExchange, messageSend);
                ++this.sendCount;
                if (this.sendCount > 1) {
                    executorService.execute(new Runnable(){

                        @Override
                        public void run() {
                            LOG.info("Stopping broker before commit...");
                            try {
                                FailoverTxSlowAckTest.this.broker.stop();
                            }
                            catch (Exception e) {
                                e.printStackTrace();
                            }
                        }
                    });
                }
            }
        }});
        this.broker.start();
        this.url = ((TransportConnector)this.broker.getTransportConnectors().get(0)).getConnectUri().toString();
        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + this.url + ")");
        cf.setWatchTopicAdvisories(false);
        cf.setDispatchAsync(false);
        ActiveMQConnection connection = (ActiveMQConnection)cf.createConnection();
        connection.start();
        final Session producerSession = connection.createSession(false, 1);
        final Queue in = producerSession.createQueue("IN?consumer.prefetchSize=1");
        final Session consumerSession = connection.createSession(true, 1);
        Queue out = consumerSession.createQueue(QUEUE_OUT);
        final MessageProducer consumerProducer = consumerSession.createProducer((Destination)out);
        final CountDownLatch commitDoneLatch = new CountDownLatch(1);
        final CountDownLatch messagesReceived = new CountDownLatch(1);
        final CountDownLatch brokerDisconnectedLatch = new CountDownLatch(1);
        final AtomicInteger receivedCount = new AtomicInteger();
        final AtomicBoolean gotDisconnect = new AtomicBoolean();
        final AtomicBoolean gotReconnected = new AtomicBoolean();
        MessageConsumer testConsumer = consumerSession.createConsumer((Destination)in);
        testConsumer.setMessageListener(new MessageListener(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            public void onMessage(javax.jms.Message message) {
                LOG.info("consume one and commit");
                Assert.assertNotNull((String)"got message", (Object)message);
                receivedCount.incrementAndGet();
                messagesReceived.countDown();
                try {
                    TimeUnit.SECONDS.sleep(1L);
                    consumerProducer.send(message);
                    ((ActiveMQSession)consumerSession).getTransactionContext().addSynchronization(new Synchronization(){

                        public void beforeEnd() throws Exception {
                            LOG.info("waiting for failover reconnect");
                            gotDisconnect.set(Wait.waitFor((Wait.Condition)new Wait.Condition(){

                                public boolean isSatisified() throws Exception {
                                    return !((ActiveMQSession)consumerSession).getConnection().getTransport().isConnected();
                                }
                            }));
                            brokerDisconnectedLatch.countDown();
                            LOG.info("got disconnect");
                            gotReconnected.set(Wait.waitFor((Wait.Condition)new Wait.Condition(){

                                public boolean isSatisified() throws Exception {
                                    return ((ActiveMQSession)consumerSession).getConnection().getTransport().isConnected();
                                }
                            }));
                            LOG.info("got failover reconnect");
                        }
                    });
                    consumerSession.commit();
                    LOG.info("done commit");
                }
                catch (Exception e) {
                    e.printStackTrace();
                }
                finally {
                    commitDoneLatch.countDown();
                }
            }
        });
        executorService.execute(new Runnable(){

            @Override
            public void run() {
                LOG.info("producer started");
                try {
                    FailoverTxSlowAckTest.this.produceMessage(producerSession, in, 1L);
                }
                catch (IllegalStateException illegalStateException) {
                }
                catch (JMSException e) {
                    e.printStackTrace();
                    Assert.fail((String)("unexpceted ex on producer: " + (Object)((Object)e)));
                }
                LOG.info("producer done");
            }
        });
        this.broker.waitUntilStopped();
        brokerDisconnectedLatch.await();
        this.broker = this.createBroker(false, this.url);
        this.broker.start();
        Assert.assertTrue((String)"message was recieved ", (boolean)messagesReceived.await(20L, TimeUnit.SECONDS));
        Assert.assertTrue((String)"tx complete through failover", (boolean)commitDoneLatch.await(40L, TimeUnit.SECONDS));
        Assert.assertEquals((String)"one delivery", (long)1L, (long)receivedCount.get());
        Assert.assertTrue((String)"got disconnect/reconnect", (boolean)gotDisconnect.get());
        Assert.assertTrue((String)"got reconnect", (boolean)gotReconnected.get());
        Assert.assertNull((String)"No message produced", (Object)this.receiveMessage(cf, out));
    }

    private javax.jms.Message receiveMessage(ActiveMQConnectionFactory cf, Queue destination) throws Exception {
        ActiveMQConnection connection = (ActiveMQConnection)cf.createConnection();
        connection.start();
        Session consumerSession = connection.createSession(true, 0);
        MessageConsumer consumer = consumerSession.createConsumer((Destination)destination);
        javax.jms.Message msg = consumer.receive(4000L);
        consumerSession.commit();
        connection.close();
        return msg;
    }

    private void produceMessage(Session producerSession, Queue destination, long count) throws JMSException {
        MessageProducer producer = producerSession.createProducer((Destination)destination);
        int i = 0;
        while ((long)i < count) {
            TextMessage message = producerSession.createTextMessage(MESSAGE_TEXT + i);
            producer.send((javax.jms.Message)message, 2, 4, 500L);
            ++i;
        }
        producer.close();
    }
}

