/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.transport.failover;

import java.io.IOException;
import java.net.ServerSocket;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.Destination;
import javax.jms.JMSSecurityException;
import javax.jms.Queue;
import javax.jms.Session;
import javax.net.ServerSocketFactory;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerPluginSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.LinkStealingTest;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.transport.TransportListener;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FailoverStateTrackingTest {
    private static final Logger LOG = LoggerFactory.getLogger(LinkStealingTest.class);
    private BrokerService brokerService;
    private int serverPort;
    private ActiveMQConnectionFactory cf;
    private String connectionURI;
    private ActiveMQConnection connection;
    private final AtomicLong consumerCounter = new AtomicLong();
    private final AtomicLong producerCounter = new AtomicLong();

    @Before
    public void setUp() throws Exception {
        this.serverPort = this.getProxyPort();
        this.createAuthenticatingBroker();
        this.connectionURI = "failover:(tcp://0.0.0.0:" + this.serverPort + ")?jms.watchTopicAdvisories=false";
        this.cf = new ActiveMQConnectionFactory(this.connectionURI);
        this.brokerService.start();
    }

    @After
    public void tearDown() throws Exception {
        if (this.connection != null) {
            try {
                this.connection.close();
            }
            catch (Exception exception) {
                // empty catch block
            }
            this.connection = null;
        }
        if (this.brokerService != null) {
            this.brokerService.stop();
            this.brokerService = null;
        }
    }

    @Test
    public void testUnauthorizedConsumerIsNotRecreated() throws Exception {
        final CountDownLatch connectionDropped = new CountDownLatch(1);
        final CountDownLatch connectionRestored = new CountDownLatch(1);
        this.connection = (ActiveMQConnection)this.cf.createConnection();
        this.connection.addTransportListener(new TransportListener(){

            public void transportResumed() {
                if (connectionDropped.getCount() == 0L) {
                    connectionRestored.countDown();
                }
            }

            public void transportInterupted() {
                connectionDropped.countDown();
            }

            public void onException(IOException error) {
            }

            public void onCommand(Object command) {
            }
        });
        Session session = this.connection.createSession(false, 1);
        Queue queue = session.createQueue("testQueue");
        try {
            session.createConsumer((Destination)queue);
            Assert.fail((String)"Should have failed to create this consumer");
        }
        catch (JMSSecurityException jMSSecurityException) {
            // empty catch block
        }
        this.brokerService.stop();
        this.brokerService.waitUntilStopped();
        Assert.assertTrue((String)"Connection should be interrupted", (boolean)connectionDropped.await(10L, TimeUnit.SECONDS));
        this.createTrackingBroker();
        this.brokerService.start();
        Assert.assertTrue((String)"Connection should be reconnected", (boolean)connectionRestored.await(10L, TimeUnit.SECONDS));
        try {
            session.createConsumer((Destination)queue);
        }
        catch (JMSSecurityException ex) {
            Assert.fail((String)"Should have been able to create this consumer");
        }
        Assert.assertEquals((long)1L, (long)this.consumerCounter.get());
    }

    @Test
    public void testUnauthorizedProducerIsNotRecreated() throws Exception {
        final CountDownLatch connectionDropped = new CountDownLatch(1);
        final CountDownLatch connectionRestored = new CountDownLatch(1);
        this.connection = (ActiveMQConnection)this.cf.createConnection();
        this.connection.addTransportListener(new TransportListener(){

            public void transportResumed() {
                LOG.debug("Connection restored");
                if (connectionDropped.getCount() == 0L) {
                    connectionRestored.countDown();
                }
            }

            public void transportInterupted() {
                LOG.debug("Connection interrupted");
                connectionDropped.countDown();
            }

            public void onException(IOException error) {
            }

            public void onCommand(Object command) {
            }
        });
        Session session = this.connection.createSession(false, 1);
        Queue queue = session.createQueue("testQueue");
        try {
            session.createProducer((Destination)queue);
            Assert.fail((String)"Should have failed to create this producer");
        }
        catch (JMSSecurityException jMSSecurityException) {
            // empty catch block
        }
        this.brokerService.stop();
        this.brokerService.waitUntilStopped();
        Assert.assertTrue((String)"Connection should be interrupted", (boolean)connectionDropped.await(10L, TimeUnit.SECONDS));
        this.createTrackingBroker();
        this.brokerService.start();
        Assert.assertTrue((String)"Connection should be reconnected", (boolean)connectionRestored.await(10L, TimeUnit.SECONDS));
        try {
            session.createProducer((Destination)queue);
        }
        catch (JMSSecurityException ex) {
            Assert.fail((String)"Should have been able to create this producer");
        }
        Assert.assertEquals((long)1L, (long)this.producerCounter.get());
    }

    private void createAuthenticatingBroker() throws Exception {
        this.brokerService = new BrokerService();
        this.brokerService.setPersistent(false);
        this.brokerService.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport(){

            public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
                throw new SecurityException();
            }

            public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
                throw new SecurityException();
            }
        }});
        this.brokerService.addConnector("tcp://0.0.0.0:" + this.serverPort);
    }

    private void createTrackingBroker() throws Exception {
        this.consumerCounter.set(0L);
        this.producerCounter.set(0L);
        this.brokerService = new BrokerService();
        this.brokerService.setPersistent(false);
        this.brokerService.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport(){

            public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
                FailoverStateTrackingTest.this.consumerCounter.incrementAndGet();
                return this.getNext().addConsumer(context, info);
            }

            public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
                FailoverStateTrackingTest.this.producerCounter.incrementAndGet();
                this.getNext().addProducer(context, info);
            }
        }});
        this.brokerService.addConnector("tcp://0.0.0.0:" + this.serverPort);
    }

    protected int getProxyPort() {
        int proxyPort = 61616;
        try (ServerSocket ss = ServerSocketFactory.getDefault().createServerSocket(0);){
            proxyPort = ss.getLocalPort();
        }
        catch (IOException iOException) {
            // empty catch block
        }
        return proxyPort;
    }
}

