package org.apache.activemq.usecases;

import java.net.URI;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import junit.framework.Test;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTempTopic;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.network.DemandForwardingBridgeSupport;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.util.DefaultTestAppender;
import org.apache.log4j.Appender;
import org.apache.log4j.Level;
import org.apache.log4j.spi.LoggingEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/usecases/RequestReplyTempDestRemovalAdvisoryRaceTest.class */
public class RequestReplyTempDestRemovalAdvisoryRaceTest extends JmsMultipleBrokersTestSupport {
    // Names of the three brokers created in setUp(); the tests bridge A<->B and B<->C,
    // attach the respondent to BrokerA and the sender to BrokerC.
    private static final String BROKER_A = "BrokerA";
    private static final String BROKER_B = "BrokerB";
    private static final String BROKER_C = "BrokerC";
    // Number of echo respondents / request senders; presumably the loop bounds used by the tests.
    private static final int NUM_RESPONDENTS = 1;
    private static final int NUM_SENDS = 1;
    // Per-client delay values in milliseconds; these match the values the EchoRespondent (0)
    // and MessageSender (1) constructors hand to MessageClient as timeToSleep.
    private static final int RANDOM_SLEEP_FOR_RESPONDENT_MS = 0;
    private static final int RANDOM_SLEEP_FOR_SENDER_MS = 1;
    // Request queue: senders produce to it, the respondent consumes from it.
    private static final String QUEUE_NAME = "foo.queue";
    // log4j appender installed by slowDownAdvisoryDispatch() and removed again in tearDown().
    Appender slowDownAppender;
    // Re-created in setUp(); counted down by the slow-down appender and awaited (max 5s)
    // by the respondent before it sends its reply.
    CountDownLatch consumerDemandExists;
    private static final Logger LOG = LoggerFactory.getLogger(RequestReplyTempDestRemovalAdvisoryRaceTest.class);
    // Values registered for the "QUEUE_NAME" combination in initCombos().
    private static String[] TEST_ITERATIONS = {"foo.queue0", "foo.queue1", "foo.queue2", "foo.queue3"};
    // Sequence number embedded in each request's text.
    final AtomicLong messageCount = new AtomicLong(0);
    // Incremented when the respondent fails to send a reply.
    final AtomicLong respondentSendError = new AtomicLong(0);
    // Incremented when a sender receives a text reply.
    final AtomicLong responseReceived = new AtomicLong(0);
    // Incremented by the no-consumer advisory listeners (see noConsumerAdvisory()).
    final AtomicLong sendsWithNoConsumers = new AtomicLong(0);
    // Incremented by the forward-failure advisory listeners (see forwardFailureAdvisory()).
    final AtomicLong forwardFailures = new AtomicLong(0);
    // Set to true by the tests to make the respondent's receive loop exit.
    protected final AtomicBoolean shutdown = new AtomicBoolean(false);
    // Connectors created through bridgeBrokers(String, String, boolean, int); cleared in setUp().
    HashSet<NetworkConnector> networkConnectors = new HashSet<>();
    // Connections that tearDown() closes; cleared in setUp().
    HashSet<Connection> advisoryConsumerConnections = new HashSet<>();
    // Applied to every connector created by bridgeBrokers(); set to true by testTempDestRaceDuplex().
    protected boolean useDuplex = false;

    /* loaded from: input_file:org/apache/activemq/usecases/RequestReplyTempDestRemovalAdvisoryRaceTest$EchoRespondent.class */
    class EchoRespondent extends MessageClient implements Runnable {
        public EchoRespondent(ActiveMQConnectionFactory activeMQConnectionFactory) throws Exception {
            super(activeMQConnectionFactory, 0);
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                RequestReplyTempDestRemovalAdvisoryRaceTest.LOG.info("RESPONDENT LISTENING");
                while (!RequestReplyTempDestRemovalAdvisoryRaceTest.this.shutdown.get()) {
                    ActiveMQTextMessage receive = this.consumer.receive(1000L);
                    if (receive instanceof TextMessage) {
                        ActiveMQTextMessage activeMQTextMessage = receive;
                        try {
                            RequestReplyTempDestRemovalAdvisoryRaceTest.LOG.info("RESPONDENT: Received a message: [" + activeMQTextMessage.getText() + "]" + Arrays.asList(activeMQTextMessage.getBrokerPath()));
                            TextMessage createTextMessage = this.session.createTextMessage("reply: " + activeMQTextMessage.getText());
                            Destination jMSReplyTo = receive.getJMSReplyTo();
                            TimeUnit.MILLISECONDS.sleep(this.timeToSleep);
                            RequestReplyTempDestRemovalAdvisoryRaceTest.this.consumerDemandExists.await(5L, TimeUnit.SECONDS);
                            try {
                                this.producer.send(jMSReplyTo, createTextMessage);
                                RequestReplyTempDestRemovalAdvisoryRaceTest.LOG.info("RESPONDENT: sent reply:" + createTextMessage.getJMSMessageID() + " back to: " + jMSReplyTo);
                            } catch (JMSException e) {
                                RequestReplyTempDestRemovalAdvisoryRaceTest.LOG.error("RESPONDENT: could not send reply message: " + e.getLocalizedMessage(), e);
                                RequestReplyTempDestRemovalAdvisoryRaceTest.this.respondentSendError.incrementAndGet();
                            }
                        } catch (JMSException e2) {
                            RequestReplyTempDestRemovalAdvisoryRaceTest.LOG.error("RESPONDENT: could not create the reply message: " + e2.getLocalizedMessage(), e2);
                        } catch (InterruptedException e3) {
                            RequestReplyTempDestRemovalAdvisoryRaceTest.LOG.info("RESPONDENT could not generate a random number");
                        }
                    }
                }
            } catch (JMSException e4) {
                RequestReplyTempDestRemovalAdvisoryRaceTest.LOG.info("RESPONDENT: Could not set the message listener on the respondent");
            }
        }

        @Override // org.apache.activemq.usecases.RequestReplyTempDestRemovalAdvisoryRaceTest.MessageClient
        protected void initProducer() throws JMSException {
            this.producer = this.session.createProducer((Destination) null);
            this.producer.setDeliveryMode(1);
        }

        @Override // org.apache.activemq.usecases.RequestReplyTempDestRemovalAdvisoryRaceTest.MessageClient
        protected void initConsumer() throws JMSException {
            this.consumer = this.session.createConsumer(new ActiveMQQueue(RequestReplyTempDestRemovalAdvisoryRaceTest.QUEUE_NAME));
        }
    }

    /* loaded from: input_file:org/apache/activemq/usecases/RequestReplyTempDestRemovalAdvisoryRaceTest$MessageClient.class */
    /**
     * Common base of the JMS clients used by this test. The constructor opens a connection and
     * a non-transacted, auto-acknowledge session, then invokes {@link #preInit()},
     * {@link #initProducer()} and {@link #initConsumer()} in that order before starting the
     * connection.
     */
    abstract class MessageClient {
        protected Connection connection;
        protected Session session;
        protected MessageConsumer consumer;
        protected MessageProducer producer;
        protected Random random = new Random(System.currentTimeMillis());
        /** Milliseconds; how each subclass uses the value is up to the subclass. */
        protected int timeToSleep;

        /**
         * @param activeMQConnectionFactory factory used to create this client's connection
         * @param i value stored in {@link #timeToSleep}
         * @throws Exception if the connection, session, producer or consumer cannot be set up;
         *         the connection is closed before the exception is rethrown
         */
        public MessageClient(ActiveMQConnectionFactory activeMQConnectionFactory, int i) throws Exception {
            this.connection = activeMQConnectionFactory.createConnection();
            this.timeToSleep = i;
            try {
                this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                preInit();
                initProducer();
                initConsumer();
                this.connection.start();
            } catch (Exception initFailure) {
                // Do not leak the half-initialised connection when setup fails.
                try {
                    this.connection.close();
                } catch (JMSException closeFailure) {
                    initFailure.addSuppressed(closeFailure);
                }
                throw initFailure;
            }
        }

        /** Hook that runs after the session exists and before the producer and consumer are created. */
        protected void preInit() throws JMSException {
        }

        /** Assigns {@link #producer}. */
        protected abstract void initProducer() throws JMSException;

        /** Assigns {@link #consumer}. */
        protected abstract void initConsumer() throws JMSException;
    }

    /* loaded from: input_file:org/apache/activemq/usecases/RequestReplyTempDestRemovalAdvisoryRaceTest$MessageSender.class */
    class MessageSender extends MessageClient implements Runnable {
        protected Destination tempDest;

        public MessageSender(ActiveMQConnectionFactory activeMQConnectionFactory) throws Exception {
            super(activeMQConnectionFactory, 1);
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                try {
                    TextMessage createTextMessage = this.session.createTextMessage("request: message #" + RequestReplyTempDestRemovalAdvisoryRaceTest.this.messageCount.getAndIncrement());
                    createTextMessage.setJMSReplyTo(this.tempDest);
                    this.producer.send(createTextMessage);
                    RequestReplyTempDestRemovalAdvisoryRaceTest.LOG.info("SENDER: Message [" + createTextMessage.getText() + "] has been sent.");
                    TextMessage receive = this.consumer.receive(this.timeToSleep);
                    if (receive instanceof TextMessage) {
                        try {
                            RequestReplyTempDestRemovalAdvisoryRaceTest.LOG.info("SENDER: Got a response from echo service!" + receive.getText());
                            RequestReplyTempDestRemovalAdvisoryRaceTest.this.responseReceived.incrementAndGet();
                        } catch (JMSException e) {
                            RequestReplyTempDestRemovalAdvisoryRaceTest.LOG.error("SENDER: might want to see why i'm getting non-text messages..." + receive, e);
                        }
                    } else {
                        RequestReplyTempDestRemovalAdvisoryRaceTest.LOG.info("SENDER: Did not get a response this time");
                    }
                } catch (JMSException e2) {
                    RequestReplyTempDestRemovalAdvisoryRaceTest.LOG.error("SENDER: Could not complete message sending properly: " + e2.getMessage());
                    try {
                        this.producer.close();
                        this.consumer.close();
                        this.session.close();
                        this.connection.close();
                    } catch (JMSException e3) {
                        e3.printStackTrace();
                    }
                }
            } finally {
                try {
                    this.producer.close();
                    this.consumer.close();
                    this.session.close();
                    this.connection.close();
                } catch (JMSException e4) {
                    e4.printStackTrace();
                }
            }
        }

        @Override // org.apache.activemq.usecases.RequestReplyTempDestRemovalAdvisoryRaceTest.MessageClient
        protected void preInit() throws JMSException {
            this.tempDest = this.session.createTemporaryTopic();
        }

        @Override // org.apache.activemq.usecases.RequestReplyTempDestRemovalAdvisoryRaceTest.MessageClient
        protected void initProducer() throws JMSException {
            this.producer = this.session.createProducer(new ActiveMQQueue(RequestReplyTempDestRemovalAdvisoryRaceTest.QUEUE_NAME));
        }

        @Override // org.apache.activemq.usecases.RequestReplyTempDestRemovalAdvisoryRaceTest.MessageClient
        protected void initConsumer() throws JMSException {
            this.consumer = this.session.createConsumer(this.tempDest);
            RequestReplyTempDestRemovalAdvisoryRaceTest.LOG.info("consumer for: " + this.tempDest + ", " + this.consumer);
        }
    }

    /**
     * Builds the JUnit suite for this class through the inherited {@code suite(Class)} factory.
     *
     * @return the suite returned by {@code suite(Class)} for this test class
     */
    public static Test suite() {
        final Class<RequestReplyTempDestRemovalAdvisoryRaceTest> testClass = RequestReplyTempDestRemovalAdvisoryRaceTest.class;
        return suite(testClass);
    }

    /**
     * Registers the values of {@code TEST_ITERATIONS} under the combination attribute
     * {@code "QUEUE_NAME"}.
     */
    public void initCombos() {
        final String[] queueNameValues = TEST_ITERATIONS;
        addCombinationValues("QUEUE_NAME", queueNameValues);
    }

    /**
     * Duplex variant: bridges A->B and B->C with duplex connectors, runs one request/reply
     * exchange from BrokerC to a respondent on BrokerA while advisory dispatch is slowed down,
     * and then checks that
     * <ul>
     *   <li>the set of active bridges is unchanged, and</li>
     *   <li>every request is accounted for exactly once, as a received reply, a respondent
     *       send error, a forward-failure advisory or a no-consumer advisory.</li>
     * </ul>
     */
    public void testTempDestRaceDuplex() throws Exception {
        this.useDuplex = true;
        bridgeBrokers(BROKER_A, BROKER_B, false, 3);
        bridgeBrokers(BROKER_B, BROKER_C, false, 3);
        startAllBrokers();
        waitForBridgeFormation(1);

        HashSet<Object> bridgesAtStart = new HashSet<>();
        for (NetworkConnector connector : this.networkConnectors) {
            bridgesAtStart.addAll(connector.activeBridges());
        }
        LOG.info("Bridges start:" + bridgesAtStart);

        slowDownAdvisoryDispatch();
        noConsumerAdvisory();
        forwardFailureAdvisory();

        ExecutorService respondentPool = Executors.newFixedThreadPool(50);
        try {
            ActiveMQConnectionFactory respondentFactory = new ActiveMQConnectionFactory(this.brokers.get(BROKER_A).broker.getTransportConnectorByScheme("tcp").getName() + "?jms.watchTopicAdvisories=false");
            respondentFactory.setAlwaysSyncSend(true);
            for (int i = 0; i < NUM_RESPONDENTS; i++) {
                respondentPool.execute(new EchoRespondent(respondentFactory));
            }

            ExecutorService senderPool = Executors.newCachedThreadPool();
            ActiveMQConnectionFactory senderFactory = new ActiveMQConnectionFactory(this.brokers.get(BROKER_C).broker.getTransportConnectorByScheme("tcp").getName() + "?jms.watchTopicAdvisories=false");
            for (int i = 0; i < NUM_SENDS; i++) {
                senderPool.execute(new MessageSender(senderFactory));
            }
            senderPool.shutdown();
            senderPool.awaitTermination(30L, TimeUnit.SECONDS);

            // Give the delayed advisories and the reply time to propagate.
            TimeUnit.SECONDS.sleep(15L);
            LOG.info("shutting down");
            this.shutdown.compareAndSet(false, true);

            HashSet<Object> bridgesAtEnd = new HashSet<>();
            for (NetworkConnector connector : this.networkConnectors) {
                bridgesAtEnd.addAll(connector.activeBridges());
            }
            LOG.info("Bridges end:" + bridgesAtEnd);
            assertEquals("no new bridges created", bridgesAtStart, bridgesAtEnd);

            LOG.info("received: " + this.responseReceived.get() + ", respondent error: " + this.respondentSendError.get() + ", noConsumerCount: " + this.sendsWithNoConsumers.get() + ", forwardFailures: " + this.forwardFailures.get());
            assertEquals("success or error", (long) NUM_SENDS, this.respondentSendError.get() + this.forwardFailures.get() + this.responseReceived.get() + this.sendsWithNoConsumers.get());
        } finally {
            // Also on failure: stop the respondent loop and let its worker thread terminate.
            this.shutdown.set(true);
            respondentPool.shutdown();
        }
    }

    /**
     * Opens one connection per broker and subscribes to the network-bridge forward-failure
     * advisory topic; every advisory received increments {@code forwardFailures}.
     *
     * <p>Each connection is recorded in {@code advisoryConsumerConnections} so that
     * {@code tearDown()} can close it.
     *
     * @throws JMSException if a connection, session or consumer cannot be created
     */
    private void forwardFailureAdvisory() throws JMSException {
        for (JmsMultipleBrokersTestSupport.BrokerItem item : this.brokers.values()) {
            String brokerUrl = item.broker.getTransportConnectorByScheme("tcp").getName() + "?jms.watchTopicAdvisories=false";
            Connection advisoryConnection = new ActiveMQConnectionFactory(brokerUrl).createConnection();
            // Register before use so the connection is closed in tearDown() even if setup fails below.
            this.advisoryConsumerConnections.add(advisoryConnection);
            advisoryConnection.start();
            MessageConsumer advisoryConsumer = advisoryConnection.createSession(false, Session.AUTO_ACKNOWLEDGE)
                    .createConsumer(AdvisorySupport.getNetworkBridgeForwardFailureAdvisoryTopic());
            advisoryConsumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    RequestReplyTempDestRemovalAdvisoryRaceTest.this.forwardFailures.incrementAndGet();
                }
            });
        }
    }

    /**
     * Opens one connection per broker and subscribes to the no-topic-consumers advisory for
     * all temporary topics ({@code ">"}); every advisory received increments
     * {@code sendsWithNoConsumers}.
     *
     * <p>Each connection is recorded in {@code advisoryConsumerConnections} so that
     * {@code tearDown()} can close it.
     *
     * @throws JMSException if a connection, session or consumer cannot be created
     */
    private void noConsumerAdvisory() throws JMSException {
        for (JmsMultipleBrokersTestSupport.BrokerItem item : this.brokers.values()) {
            String brokerUrl = item.broker.getTransportConnectorByScheme("tcp").getName() + "?jms.watchTopicAdvisories=false";
            Connection advisoryConnection = new ActiveMQConnectionFactory(brokerUrl).createConnection();
            // Register before use so the connection is closed in tearDown() even if setup fails below.
            this.advisoryConsumerConnections.add(advisoryConnection);
            advisoryConnection.start();
            MessageConsumer advisoryConsumer = advisoryConnection.createSession(false, Session.AUTO_ACKNOWLEDGE)
                    .createConsumer(AdvisorySupport.getNoTopicConsumersAdvisoryTopic(new ActiveMQTempTopic(">")));
            advisoryConsumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    RequestReplyTempDestRemovalAdvisoryRaceTest.this.sendsWithNoConsumers.incrementAndGet();
                }
            });
        }
    }

    /**
     * Non-duplex variant: bridges A&lt;-&gt;B and B&lt;-&gt;C with a connector in each direction, runs
     * one request/reply exchange from BrokerC to a respondent on BrokerA while advisory
     * dispatch is slowed down, and then checks that
     * <ul>
     *   <li>the set of active bridges is unchanged, and</li>
     *   <li>every request is accounted for exactly once, as a received reply, a respondent
     *       send error, a forward-failure advisory or a no-consumer advisory.</li>
     * </ul>
     */
    public void testTempDestRace() throws Exception {
        bridgeBrokers(BROKER_A, BROKER_B, false, 3);
        bridgeBrokers(BROKER_B, BROKER_A, false, 3);
        bridgeBrokers(BROKER_B, BROKER_C, false, 3);
        bridgeBrokers(BROKER_C, BROKER_B, false, 3);
        startAllBrokers();
        waitForBridgeFormation(1);

        HashSet<Object> bridgesAtStart = new HashSet<>();
        for (NetworkConnector connector : this.networkConnectors) {
            bridgesAtStart.addAll(connector.activeBridges());
        }

        slowDownAdvisoryDispatch();
        noConsumerAdvisory();
        forwardFailureAdvisory();

        ExecutorService respondentPool = Executors.newFixedThreadPool(50);
        try {
            ActiveMQConnectionFactory respondentFactory = new ActiveMQConnectionFactory(this.brokers.get(BROKER_A).broker.getTransportConnectorByScheme("tcp").getName() + "?jms.watchTopicAdvisories=false");
            respondentFactory.setAlwaysSyncSend(true);
            for (int i = 0; i < NUM_RESPONDENTS; i++) {
                respondentPool.execute(new EchoRespondent(respondentFactory));
            }

            ExecutorService senderPool = Executors.newCachedThreadPool();
            ActiveMQConnectionFactory senderFactory = new ActiveMQConnectionFactory(this.brokers.get(BROKER_C).broker.getTransportConnectorByScheme("tcp").getName() + "?jms.watchTopicAdvisories=false");
            for (int i = 0; i < NUM_SENDS; i++) {
                senderPool.execute(new MessageSender(senderFactory));
            }
            senderPool.shutdown();
            senderPool.awaitTermination(30L, TimeUnit.SECONDS);

            // Give the delayed advisories and the reply time to propagate.
            TimeUnit.SECONDS.sleep(10L);
            LOG.info("shutting down");
            this.shutdown.compareAndSet(false, true);

            HashSet<Object> bridgesAtEnd = new HashSet<>();
            for (NetworkConnector connector : this.networkConnectors) {
                bridgesAtEnd.addAll(connector.activeBridges());
            }
            assertEquals("no new bridges created", bridgesAtStart, bridgesAtEnd);

            LOG.info("received: " + this.responseReceived.get() + ", respondent error: " + this.respondentSendError.get() + ", noConsumerCount: " + this.sendsWithNoConsumers.get() + ", forwardFailures: " + this.forwardFailures.get());
            assertEquals("success or error", (long) NUM_SENDS, this.respondentSendError.get() + this.forwardFailures.get() + this.responseReceived.get() + this.sendsWithNoConsumers.get());
        } finally {
            // Also on failure: stop the respondent loop and let its worker thread terminate.
            this.shutdown.set(true);
            respondentPool.shutdown();
        }
    }

    /**
     * Raises the {@link DemandForwardingBridgeSupport} logger to DEBUG and installs a root
     * appender that reacts to DEBUG messages starting with {@code BROKER_B} and containing
     * "remove local subscription": it releases {@code consumerDemandExists} and then blocks the
     * logging thread for two seconds.
     *
     * <p>The appender is stored in {@code slowDownAppender} so that {@code tearDown()} can
     * remove it.
     */
    private void slowDownAdvisoryDispatch() throws Exception {
        org.apache.log4j.Logger.getLogger(DemandForwardingBridgeSupport.class).setLevel(Level.DEBUG);
        this.slowDownAppender = new DefaultTestAppender() {
            @Override
            public void doAppend(LoggingEvent loggingEvent) {
                if (!Level.DEBUG.equals(loggingEvent.getLevel())) {
                    return;
                }
                Object payload = loggingEvent.getMessage();
                if (payload == null) {
                    // Nothing to match against; never throw from inside an appender.
                    return;
                }
                String text = payload.toString();
                if (text.startsWith(RequestReplyTempDestRemovalAdvisoryRaceTest.BROKER_B) && text.contains("remove local subscription")) {
                    RequestReplyTempDestRemovalAdvisoryRaceTest.this.consumerDemandExists.countDown();
                    System.err.println("Sleeping on receipt of remove info debug message: " + text);
                    try {
                        TimeUnit.SECONDS.sleep(2L);
                    } catch (InterruptedException interrupted) {
                        // Keep the interrupt visible to the thread that was logging.
                        Thread.currentThread().interrupt();
                    }
                }
            }
        };
        org.apache.log4j.Logger.getRootLogger().addAppender(this.slowDownAppender);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Resets the per-test counters and collections, creates the three non-persistent brokers
     * and installs on each of them a destination policy for all temporary topics that sends an
     * advisory when there are no consumers and uses a shared dead-letter strategy which also
     * processes non-persistent messages.
     */
    @Override // org.apache.activemq.JmsMultipleBrokersTestSupport, org.apache.activemq.AutoFailTestSupport
    public void setUp() throws Exception {
        super.setUp();

        // Fresh bookkeeping for this test run.
        for (AtomicLong counter : new AtomicLong[] {this.responseReceived, this.respondentSendError, this.forwardFailures, this.sendsWithNoConsumers}) {
            counter.set(0L);
        }
        this.networkConnectors.clear();
        this.advisoryConsumerConnections.clear();
        this.consumerDemandExists = new CountDownLatch(1);

        // One TCP broker on an ephemeral port per name.
        for (String brokerName : new String[] {BROKER_A, BROKER_B, BROKER_C}) {
            URI brokerUri = new URI("broker:(tcp://localhost:0)/" + brokerName + "?persistent=false&useJmx=false");
            createBroker(brokerUri).setDedicatedTaskRunner(false);
        }

        SharedDeadLetterStrategy deadLetterStrategy = new SharedDeadLetterStrategy();
        deadLetterStrategy.setProcessNonPersistent(true);

        PolicyEntry tempTopicPolicy = new PolicyEntry();
        tempTopicPolicy.setSendAdvisoryIfNoConsumers(true);
        tempTopicPolicy.setDeadLetterStrategy(deadLetterStrategy);

        PolicyMap destinationPolicy = new PolicyMap();
        destinationPolicy.put(new ActiveMQTempTopic(">"), tempTopicPolicy);

        for (JmsMultipleBrokersTestSupport.BrokerItem item : this.brokers.values()) {
            item.broker.setDestinationPolicy(destinationPolicy);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Removes the slow-down appender (if one was installed), closes every connection held in
     * {@code advisoryConsumerConnections} and then delegates to the superclass.
     *
     * <p>A connection that fails to close is logged and skipped so the remaining connections
     * are still closed, and {@code super.tearDown()} runs regardless of what happens here.
     */
    @Override // org.apache.activemq.JmsMultipleBrokersTestSupport, org.apache.activemq.AutoFailTestSupport
    public void tearDown() throws Exception {
        try {
            if (this.slowDownAppender != null) {
                org.apache.log4j.Logger.getRootLogger().removeAppender(this.slowDownAppender);
            }
            for (Connection advisoryConnection : this.advisoryConsumerConnections) {
                try {
                    advisoryConnection.close();
                } catch (JMSException closeFailure) {
                    LOG.warn("Could not close advisory consumer connection: " + closeFailure.getMessage(), closeFailure);
                }
            }
            this.advisoryConsumerConnections.clear();
        } finally {
            super.tearDown();
        }
    }

    /**
     * Creates a network connector between two brokers via the five-argument superclass
     * overload (last argument {@code true}), then enables temp-destination bridging,
     * failed-forward advisories and always-sync send, applies the current {@code useDuplex}
     * setting and records the connector in {@code networkConnectors}.
     *
     * @param str first broker name passed to the superclass overload
     * @param str2 second broker name passed to the superclass overload
     * @param z boolean flag passed through to the superclass overload
     * @param i int value passed through to the superclass overload
     * @return the configured connector
     */
    protected NetworkConnector bridgeBrokers(String str, String str2, boolean z, int i) throws Exception {
        final NetworkConnector connector = super.bridgeBrokers(str, str2, z, i, true);

        connector.setDuplex(this.useDuplex);
        connector.setAlwaysSyncSend(true);
        connector.setAdvisoryForFailedForward(true);
        connector.setBridgeTempDestinations(true);

        this.networkConnectors.add(connector);
        return connector;
    }
}
