package org.apache.activemq.network;

import java.io.File;
import java.net.MalformedURLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.QueryExp;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.util.Wait;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/activemq/network/NetworkBrokerDetachTest.class */
/**
 * Exercises two brokers joined by non-duplex network connectors in both
 * directions and checks, through the brokers' management MBeans, that
 * consumer and durable-subscription state is propagated and cleaned up when a
 * remote consumer goes away or the remote broker is restarted.
 *
 * <p>The brokers listen on fixed local ports (61617 and 62617), so the tests
 * cannot run concurrently with anything else bound to those ports.
 */
public class NetworkBrokerDetachTest {
    private static final String BROKER_NAME = "broker";
    private static final String REM_BROKER_NAME = "networkedBroker";
    private static final String DESTINATION_NAME = "testQ";
    private static final int NUM_CONSUMERS = 1;
    private static final String JMX_SERVICE_URL = "service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi";
    protected static final Log LOG = LogFactory.getLog(NetworkBrokerDetachTest.class);
    protected final int numRestarts = 3;
    protected final int networkTTL = 2;
    protected final boolean dynamicOnly = false;
    protected BrokerService broker;
    protected BrokerService networkedBroker;

    /** Client connections that must outlive the helper that opened them; closed in {@link #cleanup()}. */
    private final List<Connection> trackedConnections = new ArrayList<Connection>();

    /** Lazily opened JMX connector, reused across polls and closed in {@link #cleanup()}. */
    private JMXConnector jmxConnector;

    /**
     * Builds (but does not start) the local broker, listening on port 61617
     * and bridging to the networked broker on port 62617.
     *
     * @return the configured, not yet started broker
     * @throws Exception if a connector cannot be added
     */
    protected BrokerService createBroker() throws Exception {
        BrokerService service = new BrokerService();
        service.setBrokerName(BROKER_NAME);
        configureBroker(service);
        service.addConnector("tcp://localhost:61617");
        NetworkConnector bridge = service.addNetworkConnector(
                "static:(tcp://localhost:62617?wireFormat.maxInactivityDuration=500)?useExponentialBackOff=false");
        configureNetworkConnector(bridge);
        return service;
    }

    /**
     * Builds (but does not start) the remote broker, listening on port 62617
     * and bridging back to the local broker on port 61617. Its management
     * context is told not to create its own JMX connector.
     *
     * @return the configured, not yet started broker
     * @throws Exception if a connector cannot be added
     */
    protected BrokerService createNetworkedBroker() throws Exception {
        BrokerService service = new BrokerService();
        service.setBrokerName(REM_BROKER_NAME);
        configureBroker(service);
        service.getManagementContext().setCreateConnector(false);
        service.addConnector("tcp://localhost:62617");
        NetworkConnector bridge = service.addNetworkConnector(
                "static:(tcp://localhost:61617?wireFormat.maxInactivityDuration=500)?useExponentialBackOff=false");
        configureNetworkConnector(bridge);
        return service;
    }

    /** Applies the bridge settings shared by both brokers. */
    private void configureNetworkConnector(NetworkConnector networkConnector) {
        networkConnector.setDuplex(false);
        networkConnector.setNetworkTTL(networkTTL);
        networkConnector.setDynamicOnly(dynamicOnly);
    }

    /**
     * Gives the broker a KahaDB store in a directory derived from its name,
     * so the two brokers never share a store.
     *
     * @param brokerService broker to configure; its name must already be set
     * @throws Exception if the persistence adapter cannot be installed
     */
    protected void configureBroker(BrokerService brokerService) throws Exception {
        KahaDBPersistenceAdapter store = new KahaDBPersistenceAdapter();
        store.setDirectory(new File("target/activemq-data/kahadb/" + brokerService.getBrokerName() + "NetworBrokerDetatchTest"));
        brokerService.setPersistenceAdapter(store);
    }

    /** Starts both brokers with empty message stores. */
    @Before
    public void init() throws Exception {
        this.broker = createBroker();
        this.broker.setDeleteAllMessagesOnStartup(true);
        this.broker.start();
        this.networkedBroker = createNetworkedBroker();
        this.networkedBroker.setDeleteAllMessagesOnStartup(true);
        this.networkedBroker.start();
    }

    /**
     * Releases client-side resources and then stops both brokers. Each broker
     * is stopped even if the other failed to stop or was never created
     * because {@link #init()} failed part-way.
     */
    @After
    public void cleanup() throws Exception {
        closeJmxConnector();
        closeTrackedConnections();
        try {
            stopBroker(this.networkedBroker);
        } finally {
            stopBroker(this.broker);
        }
    }

    /** Stops the broker and waits for it to finish stopping; a {@code null} broker is ignored. */
    private static void stopBroker(BrokerService service) throws Exception {
        if (service == null) {
            return;
        }
        service.stop();
        service.waitUntilStopped();
    }

    /** Closes every tracked client connection, logging rather than propagating failures. */
    private void closeTrackedConnections() {
        for (Connection connection : this.trackedConnections) {
            try {
                connection.close();
            } catch (JMSException e) {
                // Best effort: the broker behind this connection may already be gone.
                LOG.debug("Ignoring failure while closing connection: " + e);
            }
        }
        this.trackedConnections.clear();
    }

    /** Closes the cached JMX connector, if any, logging rather than propagating failures. */
    private void closeJmxConnector() {
        JMXConnector connector = this.jmxConnector;
        this.jmxConnector = null;
        if (connector == null) {
            return;
        }
        try {
            connector.close();
        } catch (Exception e) {
            LOG.debug("Ignoring failure while closing JMX connector: " + e);
        }
    }

    /**
     * Opens a connection to the given broker and registers it so that
     * {@link #cleanup()} closes it even if the test fails early.
     */
    private Connection openTrackedConnection(BrokerService brokerService) throws Exception {
        Connection connection = createConnectionFactory(brokerService).createConnection();
        this.trackedConnections.add(connection);
        return connection;
    }

    /**
     * A queue consumer created on the networked broker must show up in the
     * local broker's consumer count, and disappear again once the consumer's
     * connection is closed.
     */
    @Test
    public void testNetworkedBrokerDetach() throws Exception {
        LOG.info("Creating Consumer on the networked broker ...");
        Connection connection = openTrackedConnection(this.networkedBroker);
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        ActiveMQDestination destination = (ActiveMQDestination) session.createQueue(DESTINATION_NAME);
        for (int i = 0; i < NUM_CONSUMERS; i++) {
            session.createConsumer(destination);
        }
        Assert.assertTrue("got expected consumer count from mbean within time limit",
                verifyConsumerCount(NUM_CONSUMERS, destination, this.broker));
        LOG.info("Stopping Consumer on the networked broker ...");
        // Closing here is the behaviour under test; cleanup() closes it again as a safety net.
        connection.close();
        Assert.assertTrue("got expected 0 count from mbean within time limit",
                verifyConsumerCount(0L, destination, this.broker));
    }

    /**
     * With a durable subscriber on each broker, restarting the networked
     * broker and re-registering its subscriber must leave both brokers with
     * two consumers, no inactive durable subscriptions, and exactly one
     * delivery per subscriber for each message sent.
     */
    @Test
    public void testNetworkedBrokerDurableSubAfterRestart() throws Exception {
        final AtomicInteger received = new AtomicInteger(0);
        MessageListener countingListener = new MessageListener() {
            @Override
            public void onMessage(Message message) {
                received.incrementAndGet();
            }
        };
        LOG.info("Creating durable consumer on each broker ...");
        ActiveMQTopic topic = registerDurableConsumer(this.networkedBroker, countingListener);
        registerDurableConsumer(this.broker, countingListener);
        Assert.assertTrue("got expected consumer count from local broker mbean within time limit",
                verifyConsumerCount(2L, topic, this.broker));
        Assert.assertTrue("got expected consumer count from network broker mbean within time limit",
                verifyConsumerCount(2L, topic, this.networkedBroker));
        sendMessageTo(topic, this.broker);
        Assert.assertTrue("Got one message on each", verifyMessageCount(2, received));
        LOG.info("Stopping brokerTwo...");
        this.networkedBroker.stop();
        this.networkedBroker.waitUntilStopped();
        LOG.info("restarting  broker Two...");
        this.networkedBroker = createNetworkedBroker();
        this.networkedBroker.start();
        LOG.info("Recreating durable Consumer on the broker after restart...");
        registerDurableConsumer(this.networkedBroker, countingListener);
        // Give the bridges time to re-establish before publishing again.
        TimeUnit.SECONDS.sleep(5L);
        sendMessageTo(topic, this.broker);
        Assert.assertTrue("got expected consumer count from local broker mbean within time limit",
                verifyConsumerCount(2L, topic, this.broker));
        Assert.assertTrue("got expected consumer count from network broker mbean within time limit",
                verifyConsumerCount(2L, topic, this.networkedBroker));
        Assert.assertTrue("got no inactive subs on broker", verifyDurableConsumerCount(0L, this.broker));
        Assert.assertTrue("got no inactive subs on other broker", verifyDurableConsumerCount(0L, this.networkedBroker));
        Assert.assertTrue("Got two more messages after restart", verifyMessageCount(4, received));
        // Wait a little longer to make sure no duplicate deliveries trickle in.
        TimeUnit.SECONDS.sleep(1L);
        Assert.assertTrue("still Got just two more messages", verifyMessageCount(4, received));
    }

    /**
     * Polls until the counter equals {@code i}.
     *
     * @return the result of {@link Wait#waitFor(Wait.Condition)} for that condition
     */
    private boolean verifyMessageCount(final int i, final AtomicInteger atomicInteger) throws Exception {
        return Wait.waitFor(new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                return atomicInteger.get() == i;
            }
        });
    }

    /**
     * Creates a started connection with client id {@code "DurableOne"} to the
     * given broker and attaches the listener to a durable subscription named
     * {@code "SubOne" + brokerName} on the test topic. The connection is left
     * open so the subscription stays active and is closed in {@link #cleanup()}.
     *
     * @return the topic the subscription was created on
     */
    private ActiveMQTopic registerDurableConsumer(BrokerService brokerService, MessageListener messageListener) throws Exception {
        Connection connection = openTrackedConnection(brokerService);
        connection.setClientID("DurableOne");
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // Session.createTopic is declared to return javax.jms.Topic, so the cast is required.
        ActiveMQTopic topic = (ActiveMQTopic) session.createTopic(DESTINATION_NAME);
        session.createDurableSubscriber(topic, "SubOne" + brokerService.getBrokerName()).setMessageListener(messageListener);
        return topic;
    }

    /** Sends a single text message ("Hi") to the topic through the given broker. */
    private void sendMessageTo(ActiveMQTopic activeMQTopic, BrokerService brokerService) throws Exception {
        Connection connection = createConnectionFactory(brokerService).createConnection();
        try {
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            session.createProducer(activeMQTopic).send(session.createTextMessage("Hi"));
        } finally {
            connection.close();
        }
    }

    /**
     * Creates a connection factory pointing at the broker's first transport
     * connector, configured for synchronous sends and synchronous dispatch.
     *
     * @param brokerService a started broker with at least one transport connector
     * @return a factory for client connections to that broker
     * @throws Exception if the connector's connect URI cannot be obtained
     */
    protected ConnectionFactory createConnectionFactory(BrokerService brokerService) throws Exception {
        TransportConnector connector = (TransportConnector) brokerService.getTransportConnectors().get(0);
        String brokerUri = connector.getServer().getConnectURI().toString();
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUri);
        factory.setOptimizedMessageDispatch(true);
        factory.setCopyMessageOnSend(false);
        factory.setUseCompression(false);
        factory.setDispatchAsync(false);
        factory.setUseAsyncSend(false);
        factory.setOptimizeAcknowledge(false);
        factory.setWatchTopicAdvisories(true);
        ActiveMQPrefetchPolicy prefetch = new ActiveMQPrefetchPolicy();
        prefetch.setQueuePrefetch(100);
        prefetch.setTopicPrefetch(1000);
        factory.setPrefetchPolicy(prefetch);
        factory.setAlwaysSyncSend(true);
        return factory;
    }

    /**
     * Polls the destination MBean on the given broker until its
     * {@code ConsumerCount} attribute equals {@code j}. A poll that fails
     * (for example because the MBean is not registered yet) counts as
     * "not yet satisfied" rather than as an error.
     *
     * @return the result of {@link Wait#waitFor(Wait.Condition)} for that condition
     */
    private boolean verifyConsumerCount(final long j, final ActiveMQDestination activeMQDestination, final BrokerService brokerService) throws Exception {
        return Wait.waitFor(new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                try {
                    String type = activeMQDestination.isQueue() ? "Queue" : "Topic";
                    ObjectName destinationName = getObjectName(brokerService.getBrokerName(), type,
                            "Destination=" + activeMQDestination.getPhysicalName());
                    Object attribute = brokerService.getManagementContext().getAttribute(destinationName, "ConsumerCount");
                    if (attribute == null) {
                        return false;
                    }
                    LOG.info("Consumers for " + activeMQDestination.getPhysicalName() + " on " + brokerService + " : " + attribute);
                    return ((Long) attribute).longValue() == j;
                } catch (Exception e) {
                    // Keep polling, but leave a trace of why this attempt failed.
                    LOG.debug("ConsumerCount for " + activeMQDestination.getPhysicalName() + " not available yet: " + e);
                    return false;
                }
            }
        });
    }

    /**
     * Polls the given broker's management context until the number of
     * subscription MBeans flagged {@code active=false} equals {@code j}.
     * The poll is gated on {@link #getMBeanServerConnection()}, which aborts
     * the test as an unmet assumption when no JMX connection can be made.
     *
     * @return the result of {@link Wait#waitFor(Wait.Condition)} for that condition
     */
    private boolean verifyDurableConsumerCount(final long j, final BrokerService brokerService) throws Exception {
        return Wait.waitFor(new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                if (getMBeanServerConnection() == null) {
                    return false;
                }
                ObjectName pattern = getObjectName(brokerService.getBrokerName(), "Subscription", "active=false,*");
                Set<?> inactiveSubs = brokerService.getManagementContext().queryNames(pattern, (QueryExp) null);
                if (inactiveSubs == null) {
                    return false;
                }
                LOG.info("inactive durable subs on " + brokerService + " : " + inactiveSubs);
                return inactiveSubs.size() == j;
            }
        });
    }

    /**
     * Returns an MBean server connection to the JMX service on
     * {@code localhost:1099}. The underlying connector is opened on first use
     * and reused afterwards, so repeated polling does not open a new
     * connector each time; it is closed in {@link #cleanup()}.
     *
     * <p>If no connection can be obtained the failure is logged and the
     * running test is aborted via {@link Assume#assumeNotNull(Object...)}.
     *
     * @return a non-null MBean server connection
     * @throws MalformedURLException declared for compatibility with existing callers
     */
    public MBeanServerConnection getMBeanServerConnection() throws MalformedURLException {
        MBeanServerConnection connection = null;
        try {
            if (this.jmxConnector == null) {
                this.jmxConnector = JMXConnectorFactory.connect(new JMXServiceURL(JMX_SERVICE_URL), null);
            }
            connection = this.jmxConnector.getMBeanServerConnection();
        } catch (Exception e) {
            LOG.warn("getMBeanServer ex: " + e);
            // Drop a connector that can no longer hand out connections so a later call can retry.
            closeJmxConnector();
        }
        Assume.assumeNotNull(connection);
        return connection;
    }

    /**
     * Builds an ActiveMQ MBean name of the form
     * {@code org.apache.activemq:BrokerName=<str>,Type=<str2>,<str3>}.
     *
     * @param str broker name
     * @param str2 value of the {@code Type} key
     * @param str3 remaining key properties, already formatted as {@code key=value[,...]}
     * @return the corresponding object name
     * @throws Exception if the resulting string is not a valid object name
     */
    public ObjectName getObjectName(String str, String str2, String str3) throws Exception {
        return new ObjectName("org.apache.activemq:BrokerName=" + str + ",Type=" + str2 + "," + str3);
    }
}
