package org.apache.activemq.bugs;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.management.ObjectName;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.DestinationView;
import org.apache.activemq.broker.jmx.QueueView;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/bugs/PfcTimeoutTest.class */
/**
 * Tests transacted sends against a broker with producer flow control enabled and a
 * small per-queue memory limit, using a connection-level send timeout.
 *
 * <p>Each test starts its own embedded {@link BrokerService}, produces a few large
 * text messages inside a transacted session and expects a send to fail (signalled
 * through a {@link CountDownLatch}) so that the current transaction is rolled back.
 */
public class PfcTimeoutTest {
    private static final Logger LOG = LoggerFactory.getLogger(PfcTimeoutTest.class);
    private static final String TRANSPORT_URL = "tcp://0.0.0.0:0";
    private static final String DESTINATION = "testQ1";

    /** Send timeout, in milliseconds, configured on the producing connection factory. */
    private static final int SEND_TIMEOUT_MS = 5000;
    /** Number of sends attempted by each test. */
    private static final int MESSAGES_TO_SEND = 3;
    /** Number of characters in the body of every produced text message. */
    private static final int MESSAGE_BODY_LENGTH = 5000;

    /**
     * Creates and starts a broker whose queues all use producer flow control with a
     * 10240-byte memory limit.
     *
     * @return the started broker; the caller is responsible for stopping it
     * @throws Exception if the broker cannot be configured or started
     */
    protected BrokerService createBroker() throws Exception {
        BrokerService broker = new BrokerService();
        broker.setDeleteAllMessagesOnStartup(true);
        broker.setAdvisorySupport(false);

        PolicyEntry policy = new PolicyEntry();
        policy.setProducerFlowControl(true);
        policy.setMemoryLimit(10240L);
        policy.setCursorMemoryHighWaterMark(140);
        policy.setExpireMessagesPeriod(0L);
        // ">" is the wildcard that applies this policy to every queue.
        policy.setQueue(">");

        ArrayList<PolicyEntry> entries = new ArrayList<>();
        entries.add(policy);

        PolicyMap policyMap = new PolicyMap();
        policyMap.setPolicyEntries(entries);
        broker.setDestinationPolicy(policyMap);

        broker.addConnector(TRANSPORT_URL);
        broker.start();
        return broker;
    }

    /**
     * Verifies that producing into the memory-limited queue raises an exception
     * within twice the configured send timeout.
     */
    @Test
    public void testTransactedSendWithTimeout() throws Exception {
        BrokerService broker = createBroker();
        broker.waitUntilStarted();
        CountDownLatch gotSendException = new CountDownLatch(1);
        try {
            sendMessages(broker, gotSendException, SEND_TIMEOUT_MS, MESSAGES_TO_SEND);
            // The latch is only released from the producer's exception handler.
            Assert.assertTrue(gotSendException.await(SEND_TIMEOUT_MS * 2, TimeUnit.MILLISECONDS));
        } finally {
            broker.stop();
            broker.waitUntilStopped();
        }
    }

    /**
     * Verifies that after a failed, rolled-back send and after consuming every
     * committed message, the queue reports a size of zero and zero cursor memory usage.
     */
    @Test
    public void testTransactedSendWithTimeoutRollbackUsage() throws Exception {
        BrokerService broker = createBroker();
        broker.waitUntilStarted();
        CountDownLatch gotSendException = new CountDownLatch(1);
        try {
            int produced = sendMessages(broker, gotSendException, SEND_TIMEOUT_MS, MESSAGES_TO_SEND);
            Assert.assertTrue(gotSendException.await(SEND_TIMEOUT_MS * 2, TimeUnit.MILLISECONDS));

            consumeMessages(broker, produced);

            QueueView queueView = getQueueView(broker, DESTINATION);
            // Fail with a readable message instead of a NullPointerException below.
            Assert.assertNotNull("queue view for " + DESTINATION, queueView);

            long queueSize = queueView.getQueueSize();
            long cursorMemoryUsage = queueView.getCursorMemoryUsage();
            LOG.info("queueSize after test = {}", queueSize);
            LOG.info("memoryUsage after test = {}", cursorMemoryUsage);
            Assert.assertEquals("queue size after test ", 0L, queueSize);
            Assert.assertEquals("memory size after test ", 0L, cursorMemoryUsage);
        } finally {
            broker.stop();
            broker.waitUntilStopped();
        }
    }

    /**
     * Sends up to {@code i2} copies of a large text message, committing after each send.
     *
     * <p>If any JMS call inside the produce loop throws, the exception is logged,
     * {@code countDownLatch} is counted down, the current transaction is rolled back
     * and the number of messages committed so far is returned.
     *
     * @param brokerService  broker to connect to
     * @param countDownLatch latch released when producing fails with an exception
     * @param i              send timeout in milliseconds applied to the connection factory
     * @param i2             number of messages to attempt to send
     * @return the number of messages sent and committed
     * @throws Exception if connecting, creating the session or rolling back fails
     */
    private int sendMessages(BrokerService brokerService, CountDownLatch countDownLatch, int i, int i2) throws Exception {
        ActiveMQConnectionFactory factory = newConnectionFactory(brokerService);
        factory.setSendTimeout(i);

        int produced = 0;
        ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection();
        try {
            // Started inside the try so the connection is closed even if start() fails.
            connection.start();
            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
            try {
                MessageProducer producer = session.createProducer(session.createQueue(DESTINATION));
                TextMessage message = session.createTextMessage(createTextMessage(MESSAGE_BODY_LENGTH));
                for (int sent = 0; sent < i2; sent++) {
                    producer.send(message);
                    session.commit();
                    produced++;
                }
                LOG.info(" Finished after producing : {}", produced);
            } catch (Exception e) {
                LOG.info("Exception received producing ", e);
                LOG.info("finishing after exception :{}", produced);
                LOG.info("rolling back current transaction ");
                countDownLatch.countDown();
                session.rollback();
            }
            return produced;
        } finally {
            connection.close();
        }
    }

    /**
     * Builds a string made of {@code i} repetitions of the character {@code '9'}.
     *
     * @param i desired length; values of zero or less yield an empty string
     * @return the generated message body
     */
    private String createTextMessage(int i) {
        StringBuilder body = new StringBuilder(Math.max(i, 0));
        for (int n = 0; n < i; n++) {
            body.append('9');
        }
        return body.toString();
    }

    /**
     * Creates a connection factory pointing at the broker's TCP connector, with
     * topic advisory watching disabled.
     *
     * @param brokerService broker whose "tcp" transport connector is used
     * @return a new connection factory using the "admin"/"admin" credentials
     * @throws Exception if the connector's connect string cannot be obtained
     */
    private ActiveMQConnectionFactory newConnectionFactory(BrokerService brokerService) throws Exception {
        String brokerUrl = brokerService.getTransportConnectorByScheme("tcp").getPublishableConnectString();
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("admin", "admin", brokerUrl);
        factory.setWatchTopicAdvisories(false);
        return factory;
    }

    /**
     * Attempts to receive {@code i} messages from the test queue, waiting up to one
     * second for each.
     *
     * <p>Exceptions raised while consuming are logged and end the loop early; they
     * are not propagated.
     *
     * @param brokerService broker to connect to
     * @param i             number of receive attempts to make
     * @return the number of messages actually received (timed-out receives are not counted)
     * @throws Exception if the connection cannot be created, started or closed
     */
    private int consumeMessages(BrokerService brokerService, int i) throws Exception {
        int consumed = 0;
        ActiveMQConnection connection = (ActiveMQConnection) newConnectionFactory(brokerService).createConnection();
        try {
            // Started inside the try so the connection is closed even if start() fails.
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            try {
                MessageConsumer consumer = session.createConsumer(session.createQueue(DESTINATION));
                for (int attempt = 0; attempt < i; attempt++) {
                    // receive() returns null on timeout; only count real messages.
                    if (consumer.receive(1000L) != null) {
                        consumed++;
                    }
                }
                LOG.info(" Finished after consuming  : {}", consumed);
            } catch (Exception e) {
                LOG.info("Exception received consuming ", e);
                LOG.info("finishing after exception :{}", consumed);
            }
            return consumed;
        } finally {
            connection.close();
        }
    }

    /**
     * Looks up the management view of the queue with the given name.
     *
     * @param brokerService broker whose registered queue views are searched
     * @param str           queue name to match against {@code QueueView.getName()}
     * @return the matching queue view, or {@code null} if no queue has that name
     * @throws Exception if the broker's admin view cannot be obtained
     */
    private QueueView getQueueView(BrokerService brokerService, String str) throws Exception {
        Map<ObjectName, DestinationView> queueViews = brokerService.getAdminView().getBroker().getQueueViews();
        for (DestinationView view : queueViews.values()) {
            if (view instanceof QueueView) {
                QueueView queueView = (QueueView) view;
                if (queueView.getName().equals(str)) {
                    return queueView;
                }
            }
        }
        return null;
    }
}
