package org.apache.activemq.transport.stomp;

import java.io.File;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import javax.management.ObjectName;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.broker.region.policy.FilePendingQueueMessageStoragePolicy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.usage.SystemUsage;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/transport/stomp/StompVirtualTopicTest.class */
/**
 * Integration test that publishes {@link #NUM_MSGS} messages over STOMP to
 * {@code /topic/VirtualTopic.FOO} and consumes them over STOMP from
 * {@code /queue/Consumer.A.VirtualTopic.FOO}.
 *
 * <p>The test fails when the consumer cannot receive all messages, receives a
 * duplicate body, finds an extra message after the expected count, or when the
 * consumer queue is not empty (as reported via JMX) once consumption finished.
 *
 * <p>The consumer runs on its own thread; failures detected there are recorded
 * through {@link #setFail(String)} and asserted by the test thread after
 * {@link Thread#join()}.
 */
public class StompVirtualTopicTest {
    private static final Logger LOG = LoggerFactory.getLogger(StompVirtualTopicTest.class);

    /** Total number of messages published by the producer and expected by the consumer. */
    private static final int NUM_MSGS = 100000;

    /** STOMP frame terminator: every frame must end with a single NUL octet. */
    private static final String FRAME_END = "\u0000";

    /** CONNECT frame used by both the producer and the consumer connection. */
    private static final String CONNECT_FRAME = "CONNECT\nlogin: system\npasscode: manager\n\n" + FRAME_END;

    /** STOMP destination the producer publishes to. */
    private static final String TOPIC_DESTINATION = "/topic/VirtualTopic.FOO";

    /** Name of the queue the consumer subscribes to (also used for the JMX lookup). */
    private static final String CONSUMER_QUEUE = "Consumer.A.VirtualTopic.FOO";

    private BrokerService broker = null;

    /**
     * First/last failure reported by the consumer thread, or {@code null} if none.
     * Written by the consumer thread and read by the test thread only after
     * the latch was released or the consumer thread was joined.
     */
    private String failMsg = null;

    private URI brokerUri;

    /**
     * STOMP consumer that subscribes to the consumer queue, receives
     * {@link StompVirtualTopicTest#NUM_MSGS} messages and reports problems to the
     * enclosing test through {@link StompVirtualTopicTest#setFail(String)}.
     */
    class StompConsumer implements Runnable {
        private final StompVirtualTopicTest parent;
        final Logger log = LoggerFactory.getLogger(StompConsumer.class);

        /** Released once the subscription is in place, or once start-up has failed. */
        private final CountDownLatch latch = new CountDownLatch(1);

        /** Bodies of all messages seen so far. */
        private final HashSet<String> received = new HashSet<>();

        /** Bodies that were delivered more than once. */
        private final HashSet<String> dups = new HashSet<>();

        /**
         * @param stompVirtualTopicTest the test instance that failures are reported to
         */
        public StompConsumer(StompVirtualTopicTest stompVirtualTopicTest) {
            this.parent = stompVirtualTopicTest;
        }

        /**
         * Blocks until the consumer has subscribed or has failed while starting up.
         * If the calling thread is interrupted, the interrupt flag is restored and
         * the method returns.
         */
        public void awaitStartCompleted() {
            try {
                this.latch.await();
            } catch (InterruptedException e) {
                // Do not swallow the interruption; let the caller observe it.
                Thread.currentThread().interrupt();
            }
        }

        /**
         * Connects, subscribes, consumes {@code NUM_MSGS} messages and verifies that
         * no further message arrives, that the queue is empty and that no duplicate
         * body was delivered. Every failure is reported via {@code parent.setFail}.
         */
        @Override // java.lang.Runnable
        public void run() {
            LOG.info("Running Stomp Consumer");
            StompConnection stompConnection = new StompConnection();
            int counter = 0;
            try {
                stompConnection.open("localhost", brokerUri.getPort());
                stompConnection.sendFrame(CONNECT_FRAME);
                Assert.assertTrue(stompConnection.receive().toString().startsWith("CONNECTED"));
                stompConnection.subscribe("/queue/" + CONSUMER_QUEUE, "auto");
                Thread.sleep(2000L);
                this.latch.countDown();

                while (counter < NUM_MSGS) {
                    StompFrame frame = stompConnection.receive(15000L);
                    String body = frame.getBody();
                    this.log.trace("Received msg with content: {}", body);
                    if (!this.received.add(body)) {
                        this.dups.add(body);
                    }
                    counter++;
                }

                try {
                    // An extra message makes this assertion fail with an AssertionError,
                    // which is handled by the outer catch below.
                    Assert.assertNull(stompConnection.receive(3000L));
                } catch (Exception e) {
                    LOG.info("Correctly received {} while trying to consume an additional msg. This is expected as the queue should be empty now.", e);
                }

                if (reportQueueStatistics() != 0) {
                    this.parent.setFail("QueueSize not 0 after test has finished.");
                }
                this.log.debug("Stomp Consumer Received {} of {} messages. Check QueueSize in JMX and try to browse the queue.", counter, NUM_MSGS);

                if (!this.dups.isEmpty()) {
                    for (String dup : this.dups) {
                        LOG.debug("Received duplicate message: {}", dup);
                    }
                    this.parent.setFail("Received " + NUM_MSGS + " messages but " + this.dups.size() + " were dups.");
                }
            } catch (AssertionError e) {
                // Assertion failures are Errors; without this handler they would kill the
                // consumer thread silently and the test would pass.
                this.log.error("Assertion failed after consuming {} msgs.", counter, e);
                this.parent.setFail("Stomp Consumer assertion failed after " + counter + " of " + NUM_MSGS + " messages: " + e);
            } catch (Exception e) {
                this.log.error("{} after consuming {} msgs.", e.getMessage(), counter, e);
                try {
                    reportQueueStatistics();
                } catch (Exception statsFailure) {
                    // Statistics are diagnostic only; the real failure is reported below.
                    this.log.debug("Could not report queue statistics", statsFailure);
                }
                this.parent.setFail("Stomp Consumer received " + counter + " of " + NUM_MSGS + " messages. Check QueueSize in JMX and try to browse the queue.");
            } finally {
                // Always release the producer, even if start-up failed; a second
                // countDown() on an already released latch is a no-op.
                this.latch.countDown();
                disconnectQuietly(stompConnection);
            }
            this.log.info("Test Finished.");
        }

        /** Disconnects and closes the connection, logging (not propagating) any failure. */
        private void disconnectQuietly(StompConnection stompConnection) {
            try {
                stompConnection.disconnect();
                Thread.sleep(2000L);
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                }
                this.log.error("unexpected exception on sleep", e);
            } finally {
                try {
                    stompConnection.close();
                } catch (Exception e) {
                    this.log.error("unexpected exception on close", e);
                }
            }
        }

        /**
         * Logs the in-flight, enqueue, dequeue and dispatch counters of the consumer
         * queue obtained through JMX.
         *
         * @return the current queue size as reported by the queue MBean
         * @throws Exception if the MBean cannot be looked up or queried
         */
        private long reportQueueStatistics() throws Exception {
            ObjectName queueName = new ObjectName("org.apache.activemq:Type=Queue,Destination=" + CONSUMER_QUEUE + ",BrokerName=localhost");
            QueueViewMBean queueView = (QueueViewMBean) broker.getManagementContext().newProxyInstance(queueName, QueueViewMBean.class, true);
            LOG.info(CONSUMER_QUEUE + " Inflight: " + queueView.getInFlightCount() + ", enqueueCount: " + queueView.getEnqueueCount() + ", dequeueCount: " + queueView.getDequeueCount() + ", dispatchCount: " + queueView.getDispatchCount());
            return queueView.getQueueSize();
        }
    }

    /**
     * Creates and starts the broker and remembers the URI of its first transport
     * connector so that clients can connect to the dynamically assigned port.
     */
    @Before
    public void setUp() throws Exception {
        LOG.info("Starting up");
        this.broker = createBroker();
        this.broker.start();
        this.broker.waitUntilStarted();
        TransportConnector connector = (TransportConnector) this.broker.getTransportConnectors().get(0);
        this.brokerUri = new URI(connector.getPublishableConnectString());
    }

    /**
     * Builds a JMX-enabled broker with a STOMP connector on an ephemeral port and a
     * KahaDB store under {@code target/activemq-data/StompVirtualTopicTest}.
     *
     * @return the configured, not yet started broker
     * @throws Exception if the broker or its connector cannot be created
     */
    protected BrokerService createBroker() throws Exception {
        BrokerService brokerService = BrokerFactory.createBroker(new URI("broker://()/localhost"));
        brokerService.setUseJmx(true);
        brokerService.setDeleteAllMessagesOnStartup(true);
        // Port 0: the actual port is read back from the connector in setUp().
        brokerService.addConnector("stomp://localhost:0?transport.closeAsync=false");

        File dataDirectory = new File("target/activemq-data/StompVirtualTopicTest");
        brokerService.setDataDirectoryFile(dataDirectory);
        KahaDBPersistenceAdapter persistenceAdapter = new KahaDBPersistenceAdapter();
        persistenceAdapter.setDirectory(new File(dataDirectory, "kahadb"));
        brokerService.setPersistenceAdapter(persistenceAdapter);

        applyMemoryLimitPolicy(brokerService);
        return brokerService;
    }

    /**
     * Installs system usage limits and a destination policy for all queues
     * ({@code ">"}) that disables producer flow control, caps per-queue memory and
     * uses a file based pending message cursor.
     */
    private void applyMemoryLimitPolicy(BrokerService brokerService) {
        SystemUsage systemUsage = new SystemUsage();
        systemUsage.getMemoryUsage().setLimit(5818230784L); // ~5.4 GiB
        systemUsage.getStoreUsage().setLimit(6442450944L);  // 6 GiB
        systemUsage.getTempUsage().setLimit(3221225472L);   // 3 GiB
        brokerService.setSystemUsage(systemUsage);

        PolicyEntry queuePolicy = new PolicyEntry();
        queuePolicy.setQueue(">");
        queuePolicy.setProducerFlowControl(false);
        queuePolicy.setMemoryLimit(10485760L); // 10 MiB
        queuePolicy.setPendingQueuePolicy(new FilePendingQueueMessageStoragePolicy());

        ArrayList<PolicyEntry> policyEntries = new ArrayList<>();
        policyEntries.add(queuePolicy);
        PolicyMap policyMap = new PolicyMap();
        policyMap.setPolicyEntries(policyEntries);
        brokerService.setDestinationPolicy(policyMap);
    }

    /** Stops the broker if it was created. */
    @After
    public void tearDown() throws Exception {
        if (this.broker != null) {
            this.broker.stop();
        }
    }

    /**
     * Starts the consumer, publishes {@link #NUM_MSGS} messages (the last one with a
     * receipt request), waits for the consumer thread to finish and fails if the
     * consumer reported any problem.
     */
    @Test
    public void testStompOnVirtualTopics() throws Exception {
        LOG.info("Running Stomp Producer");
        StompConsumer consumer = new StompConsumer(this);
        Thread consumerThread = new Thread(consumer);
        consumerThread.start();
        consumer.awaitStartCompleted();
        if (null != this.failMsg) {
            // The consumer failed before it could subscribe; publishing would be pointless.
            consumerThread.join();
            LOG.error(this.failMsg);
            Assert.fail(this.failMsg);
        }
        Thread.sleep(500L);

        StompConnection stompConnection = new StompConnection();
        try {
            stompConnection.open("localhost", this.brokerUri.getPort());
            stompConnection.sendFrame(CONNECT_FRAME);
            Assert.assertTrue(stompConnection.receive().toString().startsWith("CONNECTED"));

            // Bodies "Hello World {1}" .. "Hello World {NUM_MSGS - 1}".
            for (int msgNo = 1; msgNo < NUM_MSGS; msgNo++) {
                stompConnection.send(TOPIC_DESTINATION, "Hello World {" + msgNo + "}");
            }

            LOG.info("Sending last packet with receipt header");
            // NOTE(review): whatever appendHeaders returns is discarded here; the receipt
            // header actually sent is the literal "receipt: msg-1" in the frame below.
            // Confirm whether this call is still needed.
            HashMap hashMap = new HashMap();
            hashMap.put("receipt", "1234");
            stompConnection.appendHeaders(hashMap);
            // The header section ends with a blank line; the additional "\n" is part of
            // the body, so this body differs from the one sent last in the loop above.
            stompConnection.sendFrame("SEND\ndestination:" + TOPIC_DESTINATION + "\nreceipt: msg-1\n\n\nHello World {" + (NUM_MSGS - 1) + "}" + FRAME_END);
            Assert.assertTrue(stompConnection.receiveFrame().contains("RECEIPT"));

            try {
                Thread.sleep(6000L);
            } catch (InterruptedException e) {
                LOG.error(e.getMessage(), e);
                Thread.currentThread().interrupt();
            }
            stompConnection.disconnect();
            Thread.sleep(2000L);
        } finally {
            // Release the socket even when an assertion or send failed.
            stompConnection.close();
        }

        LOG.info("Stomp Producer finished. Waiting for consumer to join.");
        consumerThread.join();
        LOG.info("Test finished.");
        if (null != this.failMsg) {
            LOG.error(this.failMsg);
            Assert.fail(this.failMsg);
        }
    }

    /**
     * Records a failure detected outside the test thread; the test method asserts
     * on it after the consumer thread has finished.
     *
     * @param str human readable failure description
     */
    protected void setFail(String str) {
        this.failMsg = str;
    }
}
