package org.apache.activemq.bugs;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.network.DiscoveryNetworkConnector;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/bugs/AMQ3274Test.class */
public class AMQ3274Test {
    private static final transient Logger LOG = LoggerFactory.getLogger(AMQ3274Test.class);
    protected static int Next_broker_num = 0;
    protected int nextEchoId = 0;
    protected boolean testError = false;
    protected int echoResponseFill = 0;
    protected EmbeddedTcpBroker broker1 = new EmbeddedTcpBroker();
    protected EmbeddedTcpBroker broker2 = new EmbeddedTcpBroker();

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/activemq/bugs/AMQ3274Test$EchoService.class */
    public class EchoService extends Thread {
        protected String destName;
        protected Connection jmsConn;
        protected Session sess;
        protected MessageConsumer msg_cons;
        protected boolean Shutdown_ind;
        protected Destination req_dest;
        protected Destination resp_dest;
        protected MessageProducer msg_prod;
        protected CountDownLatch waitShutdown;

        public EchoService(String str, Connection connection) throws Exception {
            this.destName = str;
            this.jmsConn = connection;
            this.Shutdown_ind = false;
            this.sess = this.jmsConn.createSession(false, 1);
            this.req_dest = this.sess.createQueue(this.destName);
            this.msg_cons = this.sess.createConsumer(this.req_dest);
            this.jmsConn.start();
            this.waitShutdown = new CountDownLatch(1);
        }

        public EchoService(AMQ3274Test aMQ3274Test, String str, String str2) throws Exception {
            this(str, (Connection) ActiveMQConnection.makeConnection(str2));
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            try {
                try {
                    AMQ3274Test.LOG.info("STARTING ECHO SERVICE");
                    while (!this.Shutdown_ind) {
                        Message receive = this.msg_cons.receive(100L);
                        if (receive != null) {
                            if (AMQ3274Test.LOG.isDebugEnabled()) {
                                AMQ3274Test.LOG.debug("ECHO request message " + receive.toString());
                            }
                            this.resp_dest = receive.getJMSReplyTo();
                            if (this.resp_dest != null) {
                                this.msg_prod = this.sess.createProducer(this.resp_dest);
                                this.msg_prod.send(receive);
                                this.msg_prod.close();
                                this.msg_prod = null;
                            } else {
                                AMQ3274Test.LOG.warn("invalid request: no reply-to destination given");
                            }
                        }
                    }
                    AMQ3274Test.LOG.info("shutting down test echo service");
                    try {
                        this.jmsConn.stop();
                    } catch (JMSException e) {
                        AMQ3274Test.LOG.warn("error on shutting down JMS connection", e);
                    }
                    synchronized (this) {
                        this.waitShutdown.countDown();
                    }
                } catch (Exception e2) {
                    AMQ3274Test.LOG.error((String) null, e2);
                    AMQ3274Test.LOG.info("shutting down test echo service");
                    try {
                        this.jmsConn.stop();
                    } catch (JMSException e3) {
                        AMQ3274Test.LOG.warn("error on shutting down JMS connection", e3);
                    }
                    synchronized (this) {
                        this.waitShutdown.countDown();
                    }
                }
            } catch (Throwable th) {
                AMQ3274Test.LOG.info("shutting down test echo service");
                try {
                    this.jmsConn.stop();
                } catch (JMSException e4) {
                    AMQ3274Test.LOG.warn("error on shutting down JMS connection", e4);
                }
                synchronized (this) {
                    this.waitShutdown.countDown();
                    throw th;
                }
            }
        }

        public void shutdown() {
            CountDownLatch countDownLatch;
            synchronized (this) {
                countDownLatch = this.waitShutdown;
            }
            this.Shutdown_ind = true;
            try {
                if (countDownLatch == null) {
                    AMQ3274Test.LOG.info("echo service shutdown: service does not appear to be active");
                } else if (countDownLatch.await(3000L, TimeUnit.MILLISECONDS)) {
                    AMQ3274Test.LOG.info("echo service shutdown complete");
                } else {
                    AMQ3274Test.LOG.warn("timeout waiting for echo service shutdown");
                }
            } catch (InterruptedException e) {
                AMQ3274Test.LOG.warn("interrupted while waiting for echo service shutdown");
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/activemq/bugs/AMQ3274Test$EmbeddedTcpBroker.class */
    public class EmbeddedTcpBroker {
        protected BrokerService brokerSvc = new BrokerService();
        protected int brokerNum;
        protected String brokerName;
        protected String brokerId;
        protected int port;
        protected String tcpUrl;

        public EmbeddedTcpBroker() throws Exception {
            synchronized (getClass()) {
                this.brokerNum = AMQ3274Test.Next_broker_num;
                AMQ3274Test.Next_broker_num++;
            }
            this.brokerName = "broker" + this.brokerNum;
            this.brokerId = "b" + this.brokerNum;
            this.brokerSvc.setBrokerName(this.brokerName);
            this.brokerSvc.setBrokerId(this.brokerId);
            this.brokerSvc.setPersistent(false);
            this.brokerSvc.setUseJmx(false);
            this.tcpUrl = this.brokerSvc.addConnector(JmsMultipleBrokersTestSupport.AUTO_ASSIGN_TRANSPORT).getPublishableConnectString();
        }

        public Connection createConnection() throws URISyntaxException, JMSException {
            return ActiveMQConnection.makeConnection(this.tcpUrl);
        }

        public String getConnectionUrl() {
            return this.tcpUrl;
        }

        public void coreConnectTo(EmbeddedTcpBroker embeddedTcpBroker, boolean z) throws Exception {
            makeConnectionTo(embeddedTcpBroker, z, true);
            makeConnectionTo(embeddedTcpBroker, z, false);
        }

        public void start() throws Exception {
            this.brokerSvc.start();
        }

        public void stop() throws Exception {
            this.brokerSvc.stop();
        }

        protected void makeConnectionTo(EmbeddedTcpBroker embeddedTcpBroker, boolean z, boolean z2) throws Exception {
            String str;
            ActiveMQDestination createDestination;
            DiscoveryNetworkConnector discoveryNetworkConnector = new DiscoveryNetworkConnector(new URI("static:(" + embeddedTcpBroker.tcpUrl + ")"));
            discoveryNetworkConnector.setDuplex(z);
            if (z2) {
                discoveryNetworkConnector.setConduitSubscriptions(false);
            } else {
                discoveryNetworkConnector.setConduitSubscriptions(true);
            }
            discoveryNetworkConnector.setNetworkTTL(5);
            discoveryNetworkConnector.setSuppressDuplicateQueueSubscriptions(true);
            discoveryNetworkConnector.setDecreaseNetworkConsumerPriority(true);
            discoveryNetworkConnector.setBridgeTempDestinations(true);
            if (z2) {
                str = "queue";
                createDestination = ActiveMQDestination.createDestination(">", (byte) 1);
            } else {
                str = "topic";
                createDestination = ActiveMQDestination.createDestination(">", (byte) 2);
            }
            ArrayList arrayList = new ArrayList();
            arrayList.add(createDestination);
            discoveryNetworkConnector.setExcludedDestinations(arrayList);
            if (z) {
                discoveryNetworkConnector.setName(this.brokerId + "<-" + str + "->" + embeddedTcpBroker.brokerId);
            } else {
                discoveryNetworkConnector.setName(this.brokerId + "-" + str + "->" + embeddedTcpBroker.brokerId);
            }
            this.brokerSvc.addNetworkConnector(discoveryNetworkConnector);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/activemq/bugs/AMQ3274Test$MessageClient.class */
    public class MessageClient extends Thread {
        protected MessageConsumer msgCons;
        protected boolean shutdownInd;
        protected int expectedCount;
        protected boolean haveFirstSeq;
        protected int lastSeq = 0;
        protected int msgCount = 0;
        protected CountDownLatch shutdownLatch = new CountDownLatch(1);

        public MessageClient(MessageConsumer messageConsumer, int i) {
            this.msgCons = messageConsumer;
            this.expectedCount = i * (AMQ3274Test.this.echoResponseFill + 1);
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            CountDownLatch countDownLatch;
            try {
                synchronized (this) {
                    countDownLatch = this.shutdownLatch;
                }
                this.shutdownInd = false;
                processMessages();
                countDownLatch.countDown();
            } catch (Exception e) {
                AMQ3274Test.LOG.error("message client error", e);
            }
        }

        public void waitShutdown(long j) {
            CountDownLatch countDownLatch;
            try {
                synchronized (this) {
                    countDownLatch = this.shutdownLatch;
                }
                if (countDownLatch != null) {
                    countDownLatch.await(j, TimeUnit.MILLISECONDS);
                } else {
                    AMQ3274Test.LOG.info("echo client shutdown: client does not appear to be active");
                }
            } catch (InterruptedException e) {
                AMQ3274Test.LOG.warn("wait for message client shutdown interrupted", e);
            }
        }

        public boolean shutdown() {
            boolean z;
            if (!this.shutdownInd) {
                this.shutdownInd = true;
            }
            waitShutdown(200L);
            synchronized (this) {
                z = this.shutdownLatch == null || this.shutdownLatch.getCount() == 0;
            }
            return z;
        }

        public int getNumMsgReceived() {
            return this.msgCount;
        }

        protected void processMessages() throws Exception {
            this.haveFirstSeq = false;
            while (!this.shutdownInd && !AMQ3274Test.this.testError) {
                Message receive = this.msgCons.receive(100L);
                if (receive != null) {
                    this.msgCount++;
                    checkMessage(receive);
                }
            }
        }

        protected void checkMessage(Message message) throws Exception {
            AMQ3274Test.LOG.debug("received message " + AMQ3274Test.fmtMsgInfo(message));
            if (message.propertyExists("SEQ")) {
                int intProperty = message.getIntProperty("SEQ");
                if (this.haveFirstSeq && intProperty != this.lastSeq + 1) {
                    AMQ3274Test.LOG.error("***ERROR*** incorrect sequence number; expected " + Integer.toString(this.lastSeq + 1) + " but have " + Integer.toString(intProperty));
                    AMQ3274Test.this.testError = true;
                }
                this.lastSeq = intProperty;
                if (this.msgCount > this.expectedCount) {
                    AMQ3274Test.LOG.warn("*** have more messages than expected; have " + this.msgCount + "; expect " + this.expectedCount);
                    AMQ3274Test.this.testError = true;
                }
            }
            if (message.propertyExists("end-of-response")) {
                AMQ3274Test.LOG.trace("received end-of-response message");
                this.shutdownInd = true;
            }
        }
    }

    public AMQ3274Test() throws Exception {
        this.broker1.coreConnectTo(this.broker2, true);
        this.broker2.coreConnectTo(this.broker1, true);
    }

    public void logMessage(String str) {
        System.out.println(str);
        System.out.flush();
    }

    public void testMessages(Session session, MessageProducer messageProducer, Destination destination, int i) throws Exception {
        MessageConsumer createConsumer = session.createConsumer(destination);
        MessageClient messageClient = new MessageClient(createConsumer, i);
        messageClient.start();
        for (int i2 = 0; i2 < i && !this.testError; i2++) {
            TextMessage createTextMessage = session.createTextMessage("MSG AAAA " + i2);
            createTextMessage.setIntProperty("SEQ", 100 + i2);
            createTextMessage.setStringProperty("TEST", "TOPO");
            createTextMessage.setJMSReplyTo(destination);
            if (i2 == i - 1) {
                createTextMessage.setBooleanProperty("end-of-response", true);
            }
            messageProducer.send(createTextMessage);
        }
        messageClient.waitShutdown(5000L);
        if (messageClient.shutdown()) {
            LOG.debug("Consumer client shutdown complete");
        } else {
            LOG.debug("Consumer client shutdown incomplete!!!");
        }
        int i3 = i * (this.echoResponseFill + 1);
        if (messageClient.getNumMsgReceived() == i3) {
            LOG.info("Have " + i3 + " messages, as-expected");
        } else {
            LOG.error("Have " + messageClient.getNumMsgReceived() + " messages; expected " + i3);
            this.testError = true;
        }
        createConsumer.close();
    }

    public void testOneDest(Connection connection, Session session, Destination destination, String str, String str2, int i) throws Exception {
        int i2;
        synchronized (this) {
            i2 = this.nextEchoId;
            this.nextEchoId++;
        }
        String str3 = "echo.queue." + i2;
        LOG.trace("destroying the echo queue in case an old one exists");
        removeQueue(connection, str3);
        EchoService echoService = new EchoService(this, str3, str);
        echoService.start();
        LOG.trace("Creating echo queue and producer");
        MessageProducer createProducer = session.createProducer(session.createQueue(str3));
        testMessages(session, createProducer, destination, i);
        echoService.shutdown();
        createProducer.close();
    }

    public void testTempTopic(String str, String str2) throws Exception {
        LOG.info("TESTING TEMP TOPICS " + str + " -> " + str2 + " (5 messages)");
        Connection createConnection = createConnection(str2);
        createConnection.start();
        Session createSession = createConnection.createSession(false, 1);
        LOG.trace("Creating destination");
        testOneDest(createConnection, createSession, createSession.createTemporaryTopic(), str, str2, 5);
        createSession.close();
        createConnection.close();
    }

    public void testTopic(String str, String str2) throws Exception {
        LOG.info("TESTING TOPICS " + str + " -> " + str2 + " (5 messages)");
        Connection createConnection = createConnection(str2);
        createConnection.start();
        Session createSession = createConnection.createSession(false, 1);
        LOG.trace("Removing existing Topic");
        removeTopic(createConnection, "topotest2.perm.topic");
        LOG.trace("Creating Topic, topotest2.perm.topic");
        testOneDest(createConnection, createSession, createSession.createTopic("topotest2.perm.topic"), str, str2, 5);
        removeTopic(createConnection, "topotest2.perm.topic");
        createSession.close();
        createConnection.close();
    }

    public void testTempQueue(String str, String str2) throws Exception {
        LOG.info("TESTING TEMP QUEUES " + str + " -> " + str2 + " (5 messages)");
        Connection createConnection = createConnection(str2);
        createConnection.start();
        Session createSession = createConnection.createSession(false, 1);
        LOG.trace("Creating destination");
        testOneDest(createConnection, createSession, createSession.createTemporaryQueue(), str, str2, 5);
        createSession.close();
        createConnection.close();
    }

    public void testQueue(String str, String str2) throws Exception {
        LOG.info("TESTING QUEUES " + str + " -> " + str2 + " (5 messages)");
        Connection createConnection = createConnection(str2);
        createConnection.start();
        Session createSession = createConnection.createSession(false, 1);
        LOG.trace("Removing existing Queue");
        removeQueue(createConnection, "topotest2.perm.queue");
        LOG.trace("Creating Queue, topotest2.perm.queue");
        testOneDest(createConnection, createSession, createSession.createQueue("topotest2.perm.queue"), str, str2, 5);
        removeQueue(createConnection, "topotest2.perm.queue");
        createSession.close();
        createConnection.close();
    }

    @Test
    public void run() throws Exception {
        this.testError = false;
        Thread thread = new Thread() { // from class: org.apache.activemq.bugs.AMQ3274Test.1
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                try {
                    AMQ3274Test.this.broker1.start();
                } catch (Exception e) {
                    AMQ3274Test.LOG.error((String) null, e);
                }
            }
        };
        Thread thread2 = new Thread() { // from class: org.apache.activemq.bugs.AMQ3274Test.2
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                try {
                    AMQ3274Test.this.broker2.start();
                } catch (Exception e) {
                    AMQ3274Test.LOG.error((String) null, e);
                }
            }
        };
        thread.start();
        thread2.start();
        thread.join();
        thread2.join();
        if (!this.testError) {
            testTempTopic(this.broker1.getConnectionUrl(), this.broker2.getConnectionUrl());
        }
        if (!this.testError) {
            testTempQueue(this.broker1.getConnectionUrl(), this.broker2.getConnectionUrl());
        }
        if (!this.testError) {
            testTopic(this.broker1.getConnectionUrl(), this.broker2.getConnectionUrl());
        }
        if (!this.testError) {
            testQueue(this.broker1.getConnectionUrl(), this.broker2.getConnectionUrl());
        }
        Thread.sleep(100L);
        shutdown();
        Assert.assertTrue(!this.testError);
    }

    public void shutdown() throws Exception {
        this.broker1.stop();
        this.broker2.stop();
    }

    public static void main(String[] strArr) {
        try {
            new AMQ3274Test().run();
        } catch (Exception e) {
            e.printStackTrace();
            LOG.error((String) null, e);
            System.exit(0);
        }
    }

    protected Connection createConnection(String str) throws Exception {
        return ActiveMQConnection.makeConnection(str);
    }

    protected static void removeQueue(Connection connection, String str) throws Exception {
        if (connection instanceof ActiveMQConnection) {
            ((ActiveMQConnection) connection).destroyDestination(ActiveMQDestination.createDestination(str, (byte) 1));
        }
    }

    protected static void removeTopic(Connection connection, String str) throws Exception {
        if (connection instanceof ActiveMQConnection) {
            ((ActiveMQConnection) connection).destroyDestination(ActiveMQDestination.createDestination(str, (byte) 2));
        }
    }

    public static String fmtMsgInfo(Message message) throws Exception {
        new StringBuilder();
        StringBuilder sb = new StringBuilder();
        if (message instanceof TextMessage) {
            sb.append(((TextMessage) message).getText());
        } else {
            sb.append("[");
            sb.append(message.getClass().getName());
            sb.append("]");
        }
        Enumeration propertyNames = message.getPropertyNames();
        while (propertyNames.hasMoreElements()) {
            String str = (String) propertyNames.nextElement();
            sb.append("; ");
            sb.append(str);
            sb.append("=");
            sb.append(message.getStringProperty(str));
        }
        return sb.toString();
    }
}
