package org.apache.activemq.broker.ft;

import java.io.File;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.JmsTopicSendReceiveWithTwoConnectionsTest;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.usecases.DurableSubProcessConcurrentCommitActivateNoDuplicateTest;
import org.apache.activemq.xbean.BrokerFactoryBean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.ClassPathResource;

/* loaded from: input_file:org/apache/activemq/broker/ft/QueueMasterSlaveTestSupport.class */
/**
 * Base class for send/receive tests that run against a master broker and a
 * slave broker, both loaded from Spring XML definitions on the classpath.
 *
 * <p>{@link #setUp()} starts both brokers before the inherited JMS fixtures are
 * created. While messages are being sent, {@link #messageSent()} stops the
 * master once {@link #failureCount} sends have been observed, so the rest of
 * the test runs against whatever the failover URI in {@link #uriString}
 * reconnects to.
 *
 * <p>Subclasses can swap the broker definitions via {@link #getMasterXml()} and
 * {@link #getSlaveXml()}, or change how the brokers are started by overriding
 * {@link #createMaster()} and {@link #createSlave()}.
 */
public abstract class QueueMasterSlaveTestSupport extends JmsTopicSendReceiveWithTwoConnectionsTest {
    private static final Logger LOG = LoggerFactory.getLogger(QueueMasterSlaveTestSupport.class);

    /** Master broker; assigned by {@link #createMaster()}, {@code null} until then. */
    protected BrokerService master;

    /** Number of {@link #messageSent()} callbacks seen so far. */
    protected int inflightMessageCount;

    /** Slave broker; published by {@link #createSlave()} only after it has been started. */
    protected AtomicReference<BrokerService> slave = new AtomicReference<>();

    /** Released by {@link #createSlave()} once the slave has started and been published. */
    protected CountDownLatch slaveStarted = new CountDownLatch(1);

    /** Send count at which the master is stopped; recomputed in {@link #setUp()}. */
    protected int failureCount = 50;

    /**
     * Client connection URI: a failover transport over two local TCP endpoints,
     * tried in the listed order (randomize=false).
     * NOTE(review): presumably these ports match the connectors declared in the
     * master/slave XML files - confirm against those resources.
     */
    protected String uriString = "failover://(tcp://localhost:62001,tcp://localhost:62002)?randomize=false&useExponentialBackOff=false";

    /**
     * Configures the test limits, starts the master and slave brokers and then
     * runs the inherited set-up.
     *
     * @throws Exception if a broker fails to start or the inherited set-up fails
     */
    @Override
    public void setUp() throws Exception {
        setMaxTestTime(TimeUnit.MINUTES.toMillis(10L));
        setAutoFail(true);
        // Default "basedir" to the working directory when it has not been supplied.
        if (System.getProperty("basedir") == null) {
            System.setProperty("basedir", new File(".").getAbsolutePath());
        }
        // NOTE(review): borrowing a constant from an unrelated test class looks like a
        // decompiler constant-folding artifact; confirm the intended message count and
        // replace this reference with a local literal/constant.
        this.messageCount = DurableSubProcessConcurrentCommitActivateNoDuplicateTest.SERVER_SLEEP;
        // Stop the master after half of the messages have been sent.
        this.failureCount = this.messageCount / 2;
        this.topic = isTopic();
        createMaster();
        createSlave();
        // Fixed pause between starting the brokers and creating the JMS fixtures.
        Thread.sleep(1000L);
        super.setUp();
    }

    /**
     * @return classpath location of the Spring XML file defining the slave broker
     */
    protected String getSlaveXml() {
        return "org/apache/activemq/broker/ft/slave.xml";
    }

    /**
     * @return classpath location of the Spring XML file defining the master broker
     */
    protected String getMasterXml() {
        return "org/apache/activemq/broker/ft/master.xml";
    }

    /**
     * Runs the inherited tear-down and then stops both brokers.
     *
     * <p>The broker shutdown is placed in {@code finally} blocks so that a failure
     * in the inherited tear-down, or while stopping the master, cannot leave the
     * other broker running for subsequent tests.
     *
     * @throws Exception if the inherited tear-down or a broker shutdown fails
     */
    @Override
    protected void tearDown() throws Exception {
        try {
            super.tearDown();
        } finally {
            try {
                // master is null when createMaster() failed before assigning it.
                if (this.master != null) {
                    this.master.stop();
                    this.master.waitUntilStopped();
                }
            } finally {
                // Give a slave that is still starting a bounded chance to publish
                // itself; the result is ignored because the null check below covers
                // the case where it never did.
                this.slaveStarted.await(5L, TimeUnit.SECONDS);
                BrokerService slaveBroker = this.slave.get();
                if (slaveBroker != null) {
                    slaveBroker.stop();
                }
            }
        }
    }

    /**
     * @return a connection factory bound to the failover URI in {@link #uriString}
     * @throws Exception declared for overriding subclasses
     */
    @Override
    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
        return new ActiveMQConnectionFactory(this.uriString);
    }

    /**
     * Counts a sent message and stops the master when the count reaches
     * {@link #failureCount}.
     *
     * @throws Exception if the sleep is interrupted or the master fails to stop
     */
    @Override
    protected void messageSent() throws Exception {
        this.inflightMessageCount++;
        if (this.inflightMessageCount == this.failureCount) {
            Thread.sleep(1000L);
            LOG.error("MASTER STOPPED!@!!!!");
            this.master.stop();
        }
    }

    /**
     * @return {@code false}; the value is stored in the inherited {@code topic}
     *         flag during {@link #setUp()}
     */
    protected boolean isTopic() {
        return false;
    }

    /**
     * Builds the master broker from {@link #getMasterXml()} and starts it.
     *
     * @throws Exception if the definition cannot be loaded or the broker fails to start
     */
    protected void createMaster() throws Exception {
        BrokerFactoryBean factory = new BrokerFactoryBean(new ClassPathResource(getMasterXml()));
        factory.afterPropertiesSet();
        this.master = factory.getBroker();
        this.master.start();
    }

    /**
     * Builds the slave broker from {@link #getSlaveXml()}, starts it, publishes it
     * in {@link #slave} and releases {@link #slaveStarted}.
     *
     * @throws Exception if the definition cannot be loaded or the broker fails to start
     */
    public void createSlave() throws Exception {
        BrokerFactoryBean factory = new BrokerFactoryBean(new ClassPathResource(getSlaveXml()));
        factory.afterPropertiesSet();
        BrokerService slaveBroker = factory.getBroker();
        slaveBroker.start();
        // Publish only after start() has returned, then release any waiters.
        this.slave.set(slaveBroker);
        this.slaveStarted.countDown();
    }

    /**
     * Stops the master and checks that a message sent to a virtual topic
     * afterwards is delivered to the matching consumer queue.
     *
     * @throws Exception on any JMS or broker failure
     */
    public void testVirtualTopicFailover() throws Exception {
        MessageConsumer probe = this.session.createConsumer(new ActiveMQQueue("Consumer.A.VirtualTopic.TA1"));
        assertNull("No message there yet", probe.receive(1000L));
        probe.close();

        assertFalse(this.master.isSlave());
        this.master.stop();
        assertTrue("slave started", this.slaveStarted.await(15L, TimeUnit.SECONDS));
        assertFalse(this.slave.get().isSlave());

        final String text = "ForUWhenSlaveKicksIn";
        this.producer.send(new ActiveMQTopic("VirtualTopic.TA1"), this.session.createTextMessage(text));

        MessageConsumer afterFailover = this.session.createConsumer(new ActiveMQQueue("Consumer.A.VirtualTopic.TA1"));
        // receive(long) is declared to return Message, so the cast is required.
        TextMessage received = (TextMessage) afterFailover.receive(4000L);
        assertNotNull("Get message after failover", received);
        assertEquals("correct message", text, received.getText());
    }

    /**
     * Stops the master and checks that a message arrives on the master-broker
     * advisory topic once the slave has started.
     *
     * @throws Exception on any JMS or broker failure
     */
    public void testAdvisory() throws Exception {
        MessageConsumer advisoryConsumer = this.session.createConsumer(AdvisorySupport.getMasterBrokerAdvisoryTopic());
        this.master.stop();
        assertTrue("slave started", this.slaveStarted.await(15L, TimeUnit.SECONDS));
        LOG.info("slave started");
        Message advisory = advisoryConsumer.receive(5000L);
        LOG.info("received {}", advisory);
        assertNotNull("Didn't received advisory", advisory);
    }
}
