package org.apache.activemq.usecases;

import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.util.DefaultTestAppender;
import org.apache.activemq.util.Wait;
import org.apache.log4j.Level;
import org.apache.log4j.spi.LoggingEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/usecases/TopicProducerFlowControlTest.class */
public class TopicProducerFlowControlTest extends TestCase implements MessageListener {
    private static final String brokerName = "testBroker";
    private static final String brokerUrl = "vm://testBroker";
    protected static final int destinationMemLimit = 2097152;
    private static final int numMessagesToSend = 50000;
    private BrokerService broker;
    private static final Logger LOG = LoggerFactory.getLogger(TopicProducerFlowControlTest.class);
    private static final AtomicLong produced = new AtomicLong();
    private static final AtomicLong consumed = new AtomicLong();

    protected void setUp() throws Exception {
        this.broker = new BrokerService();
        this.broker.setBrokerName(brokerName);
        this.broker.setPersistent(false);
        this.broker.setSchedulerSupport(false);
        this.broker.setUseJmx(false);
        this.broker.setUseShutdownHook(false);
        this.broker.addConnector(brokerUrl);
        PolicyMap policyMap = new PolicyMap();
        PolicyEntry policyEntry = new PolicyEntry();
        policyEntry.setTopic(">");
        policyEntry.setMemoryLimit(2097152L);
        policyEntry.setProducerFlowControl(true);
        policyEntry.setAdvisoryWhenFull(true);
        policyEntry.setBlockedProducerWarningInterval(2000L);
        policyMap.setPolicyEntries(Arrays.asList(policyEntry));
        setDestinationPolicy(this.broker, policyMap);
        this.broker.start();
        this.broker.waitUntilStarted();
    }

    protected void setDestinationPolicy(BrokerService brokerService, PolicyMap policyMap) {
        brokerService.setDestinationPolicy(policyMap);
    }

    protected void tearDown() throws Exception {
        this.broker.stop();
        this.broker.waitUntilStopped();
    }

    public void testTopicProducerFlowControl() throws Exception {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerUrl);
        activeMQConnectionFactory.setAlwaysSyncSend(true);
        activeMQConnectionFactory.setProducerWindowSize(1024);
        ActiveMQPrefetchPolicy activeMQPrefetchPolicy = new ActiveMQPrefetchPolicy();
        activeMQPrefetchPolicy.setAll(5000);
        activeMQConnectionFactory.setPrefetchPolicy(activeMQPrefetchPolicy);
        Connection createConnection = activeMQConnectionFactory.createConnection();
        createConnection.start();
        Session createSession = createConnection.createSession(false, 1);
        Destination createDestination = createDestination(createSession);
        createSession.createConsumer(createDestination).setMessageListener(new TopicProducerFlowControlTest());
        final AtomicInteger atomicInteger = new AtomicInteger(0);
        createSession.createConsumer(new ActiveMQTopic("ActiveMQ.Advisory.FULL.>")).setMessageListener(new MessageListener() { // from class: org.apache.activemq.usecases.TopicProducerFlowControlTest.1
            public void onMessage(Message message) {
                try {
                    if (atomicInteger.get() % 100 == 0) {
                        TopicProducerFlowControlTest.LOG.info("Got full advisory, usageName: " + message.getStringProperty("usageName") + ", usageCount: " + message.getLongProperty("usageCount") + ", blockedCounter: " + atomicInteger.get());
                    }
                    atomicInteger.incrementAndGet();
                } catch (Exception e) {
                    e.printStackTrace();
                    TopicProducerFlowControlTest.LOG.error("missing advisory property", e);
                }
            }
        });
        final AtomicInteger atomicInteger2 = new AtomicInteger();
        DefaultTestAppender defaultTestAppender = new DefaultTestAppender() { // from class: org.apache.activemq.usecases.TopicProducerFlowControlTest.2
            public void doAppend(LoggingEvent loggingEvent) {
                if (loggingEvent.getLevel().equals(Level.INFO) && loggingEvent.getMessage().toString().contains("Usage Manager memory limit reached")) {
                    TopicProducerFlowControlTest.LOG.info("received  log message: " + loggingEvent.getMessage());
                    atomicInteger2.incrementAndGet();
                }
            }
        };
        org.apache.log4j.Logger logger = org.apache.log4j.Logger.getLogger(Topic.class);
        logger.addAppender(defaultTestAppender);
        try {
            final Session createSession2 = activeMQConnectionFactory.createConnection().createSession(false, 1);
            final MessageProducer createProducer = createSession2.createProducer(createDestination);
            new Thread("Producing Thread") { // from class: org.apache.activemq.usecases.TopicProducerFlowControlTest.3
                @Override // java.lang.Thread, java.lang.Runnable
                public void run() {
                    for (long j = 0; j < 50000; j++) {
                        try {
                            try {
                                createProducer.send(createSession2.createTextMessage("test"));
                                long incrementAndGet = TopicProducerFlowControlTest.produced.incrementAndGet();
                                if (incrementAndGet % DurableSubProcessWithRestartTest.BROKER_RESTART == 0) {
                                    TopicProducerFlowControlTest.LOG.info("Produced " + incrementAndGet + " messages");
                                }
                            } catch (Throwable th) {
                                th.printStackTrace();
                                try {
                                    createProducer.close();
                                    createSession2.close();
                                    return;
                                } catch (Exception e) {
                                    return;
                                }
                            }
                        } finally {
                            try {
                                createProducer.close();
                                createSession2.close();
                            } catch (Exception e2) {
                            }
                        }
                    }
                }
            }.start();
            Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.usecases.TopicProducerFlowControlTest.4
                public boolean isSatisified() throws Exception {
                    return TopicProducerFlowControlTest.consumed.get() == 50000;
                }
            }, 300000L);
            assertEquals("Didn't produce all messages", 50000L, produced.get());
            assertEquals("Didn't consume all messages", 50000L, consumed.get());
            assertTrue("Producer got blocked", Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.usecases.TopicProducerFlowControlTest.5
                public boolean isSatisified() throws Exception {
                    return atomicInteger.get() > 0;
                }
            }, 5000L));
            LOG.info("BlockedCount: " + atomicInteger.get() + ", Warnings:" + atomicInteger2.get());
            assertTrue("got a few warnings", atomicInteger2.get() > 1);
            assertTrue("warning limited", atomicInteger2.get() < atomicInteger.get());
            logger.removeAppender(defaultTestAppender);
        } catch (Throwable th) {
            logger.removeAppender(defaultTestAppender);
            throw th;
        }
    }

    protected Destination createDestination(Session session) throws Exception {
        return new ActiveMQTopic("test");
    }

    public void onMessage(Message message) {
        long incrementAndGet = consumed.incrementAndGet();
        if (incrementAndGet % 100 == 0) {
            try {
                Thread.sleep(100L);
            } catch (InterruptedException e) {
            }
        }
        if (incrementAndGet % DurableSubProcessWithRestartTest.BROKER_RESTART == 0) {
            LOG.info("\tConsumed " + incrementAndGet + " messages");
        }
    }
}
