package org.apache.activemq.usecases;

import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.jms.Destination;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.Wait;

/* loaded from: input_file:org/apache/activemq/usecases/QueueMemoryFullMultiBrokersTest.class */
/**
 * Multi-broker use case: messages are produced on "Broker1" and consumed on
 * "Broker2", where "Broker2" is configured with deliberately small memory
 * limits (see {@link #applyMemoryLimitPolicy(BrokerService)}).
 *
 * <p>The test passes when every message reaches the consumer and the source
 * queue on "Broker1" ends up with no pending and no in-flight messages.
 */
public class QueueMemoryFullMultiBrokersTest extends JmsMultipleBrokersTestSupport {
    /** Number of brokers created in {@link #setUp()} (named "Broker1".."BrokerN"). */
    public static final int BROKER_COUNT = 2;
    /** Total number of messages sent to the source broker during the test. */
    public static final int MESSAGE_COUNT = 2000;

    /** Messages sent before the remote consumer is attached. */
    private static final int INITIAL_MESSAGE_COUNT = 50;
    /** Upper bound, in seconds, on waiting for the consumer to receive everything. */
    private static final long RECEIVE_TIMEOUT_SECONDS = 45L;
    /** Pause, in milliseconds, that lets outstanding acks reach the source broker. */
    private static final long ACK_SETTLE_MILLIS = 2000L;
    /** Overall memory usage limit applied to the constrained broker (51200; presumably bytes). */
    private static final long BROKER_MEMORY_LIMIT = 50L * 1024L;
    /** Per-queue memory limit applied to the constrained broker (4096; presumably bytes). */
    private static final long QUEUE_MEMORY_LIMIT = 4L * 1024L;

    /**
     * Sends {@link #MESSAGE_COUNT} messages to "TEST.FOO" on "Broker1" (a first
     * batch before the consumer exists, the remainder after it is connected),
     * consumes them on "Broker2", and then checks that the source queue on
     * "Broker1" is fully drained.
     *
     * @throws Exception if broker setup, messaging, or waiting fails
     */
    public void testQueueNetworkWithConsumerFull() throws Exception {
        bridgeAllBrokers();
        startAllBrokers();

        ActiveMQDestination destination = createDestination("TEST.FOO", false);

        // Queue up a first batch while there is no consumer anywhere yet.
        sendMessages("Broker1", destination, INITIAL_MESSAGE_COUNT);

        CountDownLatch received = new CountDownLatch(MESSAGE_COUNT);
        createConsumer("Broker2", (Destination) destination, received);
        assertConsumersConnect("Broker1", destination, 1, Wait.MAX_WAIT_MILLIS);

        // Send the remainder so that exactly MESSAGE_COUNT messages are produced in total.
        sendMessages("Broker1", destination, MESSAGE_COUNT - INITIAL_MESSAGE_COUNT);

        assertTrue("Missing " + received.getCount() + " messages",
                received.await(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS));

        // NOTE(review): BrokerService.getRegionBroker() is declared to return the
        // general Broker type in ActiveMQ 5.x, so the explicit downcast is needed
        // for this assignment to compile.
        RegionBroker regionBroker = (RegionBroker) this.brokers.get("Broker1").broker.getRegionBroker();

        // Give the acks a chance to flow back before inspecting the source queue.
        Thread.sleep(ACK_SETTLE_MILLIS);

        Queue sourceQueue = (Queue) regionBroker
                .getDestinations(ActiveMQDestination.transform(destination))
                .iterator()
                .next();

        assertTrue("All messages are consumed and acked from source:" + sourceQueue,
                sourceQueue.getMessages().isEmpty());
        assertEquals("messages source:" + sourceQueue, 0L,
                sourceQueue.getDestinationStatistics().getMessages().getCount());
        assertEquals("inflight source:" + sourceQueue, 0L,
                sourceQueue.getDestinationStatistics().getInflight().getCount());
    }

    /**
     * Enables auto-fail, sets the message size to 1024, creates
     * {@link #BROKER_COUNT} non-persistent brokers without JMX, and applies the
     * restrictive memory policy to "Broker2".
     *
     * @throws Exception if the parent setup or broker creation fails
     */
    @Override // org.apache.activemq.JmsMultipleBrokersTestSupport, org.apache.activemq.AutoFailTestSupport
    public void setUp() throws Exception {
        super.setAutoFail(true);
        super.setUp();
        this.messageSize = 1024;
        for (int i = 1; i <= BROKER_COUNT; i++) {
            createBroker(new URI("broker:()/Broker" + i + "?persistent=false&useJmx=false"));
        }
        applyMemoryLimitPolicy(this.brokers.get("Broker2").broker);
    }

    /**
     * Constrains the given broker's memory: installs a {@link SystemUsage} with
     * a {@link #BROKER_MEMORY_LIMIT} memory limit and a destination policy that
     * matches every queue (">"), limiting each to {@link #QUEUE_MEMORY_LIMIT}
     * and using a VM pending-queue storage policy.
     *
     * @param brokerService the broker to configure
     */
    private void applyMemoryLimitPolicy(BrokerService brokerService) {
        final SystemUsage systemUsage = new SystemUsage();
        systemUsage.getMemoryUsage().setLimit(BROKER_MEMORY_LIMIT);
        brokerService.setSystemUsage(systemUsage);

        final PolicyEntry allQueues = new PolicyEntry();
        allQueues.setQueue(">");
        allQueues.setMemoryLimit(QUEUE_MEMORY_LIMIT);
        allQueues.setPendingQueuePolicy(new VMPendingQueueMessageStoragePolicy());

        final List<PolicyEntry> policyEntries = new ArrayList<PolicyEntry>();
        policyEntries.add(allQueues);

        final PolicyMap policyMap = new PolicyMap();
        policyMap.setPolicyEntries(policyEntries);
        brokerService.setDestinationPolicy(policyMap);
    }
}
