package org.apache.activemq.bugs;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy;
import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.perf.NetworkedSyncTest;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/activemq/bugs/ActiveMQSlowConsumerManualTest.class */
public class ActiveMQSlowConsumerManualTest {
    private static final int PORT = 12345;
    private static final ActiveMQTopic TOPIC = new ActiveMQTopic("TOPIC");
    private static final String URL = "nio://localhost:12345?socket.tcpNoDelay=true";

    @Test(timeout = 60000)
    public void testDefaultSettings() throws Exception {
        runTest("testDefaultSettings", 30, -1, -1, false, false, false, false);
    }

    @Test(timeout = 60000)
    public void testDefaultSettingsWithOptimiseAcknowledge() throws Exception {
        runTest("testDefaultSettingsWithOptimiseAcknowledge", 30, -1, -1, false, false, true, false);
    }

    @Test(timeout = 60000)
    public void testBounded() throws Exception {
        runTest("testBounded", 30, 5, 25, false, false, false, false);
    }

    @Test(timeout = 60000)
    public void testBoundedWithOptimiseAcknowledge() throws Exception {
        runTest("testBoundedWithOptimiseAcknowledge", 30, 5, 25, false, false, true, false);
    }

    public void runTest(String str, int i, int i2, int i3, boolean z, boolean z2, boolean z3, boolean z4) throws Exception {
        BrokerService createBroker = createBroker(z4);
        createBroker.setDestinationPolicy(buildPolicy(TOPIC, i2, i3, z, z2));
        createBroker.start();
        Session buildSession = buildSession("SlowConsumer", URL, z3);
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        final AtomicInteger atomicInteger = new AtomicInteger();
        final ArrayList arrayList = i <= 1000 ? new ArrayList() : null;
        MessageConsumer createSubscriber = createSubscriber(buildSession, new MessageListener() { // from class: org.apache.activemq.bugs.ActiveMQSlowConsumerManualTest.1
            public void onMessage(Message message) {
                try {
                    atomicInteger.incrementAndGet();
                    int parseInt = Integer.parseInt(((TextMessage) message).getText());
                    if (arrayList != null) {
                        arrayList.add(Integer.valueOf(parseInt));
                    }
                    if (parseInt % NetworkedSyncTest.MESSAGE_COUNT == 0) {
                        System.out.println("SlowConsumer: Receive " + parseInt);
                    }
                    countDownLatch.await();
                } catch (Exception e) {
                }
            }
        });
        Session buildSession2 = buildSession("FastConsumer", URL, z3);
        final AtomicInteger atomicInteger2 = new AtomicInteger();
        final ArrayList arrayList2 = i <= 1000 ? new ArrayList() : null;
        MessageConsumer createSubscriber2 = createSubscriber(buildSession2, new MessageListener() { // from class: org.apache.activemq.bugs.ActiveMQSlowConsumerManualTest.2
            public void onMessage(Message message) {
                try {
                    atomicInteger2.incrementAndGet();
                    TimeUnit.MILLISECONDS.sleep(5L);
                    int parseInt = Integer.parseInt(((TextMessage) message).getText());
                    if (arrayList2 != null) {
                        arrayList2.add(Integer.valueOf(parseInt));
                    }
                    if (parseInt % NetworkedSyncTest.MESSAGE_COUNT == 0) {
                        System.out.println("FastConsumer: Receive " + parseInt);
                    }
                } catch (Exception e) {
                }
            }
        });
        Thread.sleep(500L);
        AtomicInteger atomicInteger3 = new AtomicInteger();
        ArrayList arrayList3 = i <= 1000 ? new ArrayList() : null;
        Session buildSession3 = buildSession("Publisher", URL, z3);
        MessageProducer createPublisher = createPublisher(buildSession3);
        for (int i4 = 0; i4 < i; i4++) {
            atomicInteger3.incrementAndGet();
            if (arrayList3 != null) {
                arrayList3.add(Integer.valueOf(i4));
            }
            if (i4 % NetworkedSyncTest.MESSAGE_COUNT == 0) {
                System.out.println("Publisher: Send " + i4);
            }
            createPublisher.send(buildSession3.createTextMessage(Integer.toString(i4)));
        }
        Thread.sleep(500L);
        System.out.println(str + ": Publisher Sent: " + atomicInteger3 + " " + arrayList3);
        System.out.println(str + ": Whilst slow consumer blocked:");
        System.out.println("\t\t- SlowConsumer Received: " + atomicInteger + " " + arrayList);
        System.out.println("\t\t- FastConsumer Received: " + atomicInteger2 + " " + arrayList2);
        countDownLatch.countDown();
        Thread.sleep(500L);
        System.out.println(str + ": After slow consumer unblocked:");
        System.out.println("\t\t- SlowConsumer Received: " + atomicInteger + " " + arrayList);
        System.out.println("\t\t- FastConsumer Received: " + atomicInteger2 + " " + arrayList2);
        System.out.println();
        createPublisher.close();
        buildSession3.close();
        createSubscriber.close();
        buildSession.close();
        createSubscriber2.close();
        buildSession2.close();
        createBroker.stop();
        Assert.assertEquals("Fast consumer missed messages whilst slow consumer was blocking", arrayList3, arrayList2);
    }

    private static BrokerService createBroker(boolean z) throws Exception {
        BrokerService brokerService = new BrokerService();
        brokerService.setBrokerName("TestBroker");
        brokerService.setPersistent(z);
        brokerService.addConnector(URL);
        return brokerService;
    }

    private static MessageConsumer createSubscriber(Session session, MessageListener messageListener) throws JMSException {
        MessageConsumer createConsumer = session.createConsumer(TOPIC);
        createConsumer.setMessageListener(messageListener);
        return createConsumer;
    }

    private static MessageProducer createPublisher(Session session) throws JMSException {
        MessageProducer createProducer = session.createProducer(TOPIC);
        createProducer.setDeliveryMode(1);
        return createProducer;
    }

    private static Session buildSession(String str, String str2, boolean z) throws JMSException {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(str2);
        activeMQConnectionFactory.setCopyMessageOnSend(false);
        activeMQConnectionFactory.setDisableTimeStampsByDefault(true);
        activeMQConnectionFactory.setOptimizeAcknowledge(z);
        if (z) {
            activeMQConnectionFactory.setOptimizeAcknowledgeTimeOut(1L);
        }
        Connection createConnection = activeMQConnectionFactory.createConnection();
        createConnection.setClientID(str);
        Session createSession = createConnection.createSession(false, 1);
        createConnection.start();
        return createSession;
    }

    private static PolicyMap buildPolicy(ActiveMQTopic activeMQTopic, int i, int i2, boolean z, boolean z2) {
        PolicyMap policyMap = new PolicyMap();
        PolicyEntry policyEntry = new PolicyEntry();
        if (z) {
            policyEntry.setMessageEvictionStrategy(new OldestMessageEvictionStrategy());
        }
        if (z2) {
            policyEntry.setProducerFlowControl(false);
        }
        if (i > 0) {
            policyEntry.setTopicPrefetch(i);
        }
        if (i2 > 0) {
            ConstantPendingMessageLimitStrategy constantPendingMessageLimitStrategy = new ConstantPendingMessageLimitStrategy();
            constantPendingMessageLimitStrategy.setLimit(i2);
            policyEntry.setPendingMessageLimitStrategy(constantPendingMessageLimitStrategy);
        }
        policyMap.put(activeMQTopic, policyEntry);
        return policyMap;
    }
}
