package org.apache.activemq.bugs;

import java.util.HashSet;
import java.util.Vector;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerPluginSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.TransactionBroker;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.Message;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.usecases.DurableSubProcessWithRestartTest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/bugs/AMQ4485Test.class */
public class AMQ4485Test extends TestCase {
    private static final Logger LOG = LoggerFactory.getLogger(AMQ4485Test.class);
    BrokerService broker;
    ActiveMQConnectionFactory factory;
    final int messageCount = 20;
    int memoryLimit = 40960;
    final ActiveMQQueue destination = new ActiveMQQueue("QUEUE." + getClass().getName());
    final Vector<Throwable> exceptions = new Vector<>();
    final CountDownLatch slowSendResume = new CountDownLatch(1);

    protected void configureBroker(long j) throws Exception {
        this.broker.setDeleteAllMessagesOnStartup(true);
        this.broker.setAdvisorySupport(false);
        PolicyEntry policyEntry = new PolicyEntry();
        policyEntry.setExpireMessagesPeriod(0L);
        policyEntry.setMemoryLimit(j);
        policyEntry.setProducerFlowControl(false);
        PolicyMap policyMap = new PolicyMap();
        policyMap.setDefaultEntry(policyEntry);
        this.broker.setDestinationPolicy(policyMap);
        this.broker.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport() { // from class: org.apache.activemq.bugs.AMQ4485Test.1
            public void send(ProducerBrokerExchange producerBrokerExchange, final Message message) throws Exception {
                if (message.isInTransaction() && message.getProperty("NUM") != null) {
                    final Integer num = (Integer) message.getProperty("NUM");
                    AMQ4485Test.this.broker.getBroker().getAdaptor(TransactionBroker.class).getTransaction(producerBrokerExchange.getConnectionContext(), message.getTransactionId(), false).addSynchronization(new Synchronization() { // from class: org.apache.activemq.bugs.AMQ4485Test.1.1
                        public void afterCommit() throws Exception {
                            AMQ4485Test.LOG.error("AfterCommit, NUM:" + num + ", " + message.getMessageId() + ", tx: " + message.getTransactionId());
                            if (num.intValue() == 5) {
                                AMQ4485Test.LOG.error("Pausing on latch in afterCommit for: " + num + ", " + message.getMessageId());
                                AMQ4485Test.this.slowSendResume.await(20L, TimeUnit.SECONDS);
                                AMQ4485Test.LOG.error("resuming on latch afterCommit for: " + num + ", " + message.getMessageId());
                            } else if (21 == num.intValue()) {
                                AMQ4485Test.LOG.error("releasing latch. " + num + ", " + message.getMessageId());
                                AMQ4485Test.this.slowSendResume.countDown();
                                TimeUnit.SECONDS.sleep(5L);
                                AMQ4485Test.LOG.error("resuming afterCommit for: " + num + ", " + message.getMessageId());
                            }
                        }
                    });
                }
                super.send(producerBrokerExchange, message);
            }
        }});
    }

    public void testOutOfOrderTransactionCompletionOnMemoryLimit() throws Exception {
        HashSet hashSet = new HashSet();
        final Vector vector = new Vector();
        ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
        for (int i = 1; i <= 20; i++) {
            vector.add(send(i, 1, true));
            hashSet.add(Integer.valueOf(i));
        }
        for (int i2 = 0; i2 < 20; i2++) {
            final int i3 = i2;
            newCachedThreadPool.submit(new Runnable() { // from class: org.apache.activemq.bugs.AMQ4485Test.2
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        ((Session) vector.get(i3)).commit();
                    } catch (Exception e) {
                        AMQ4485Test.this.exceptions.add(e);
                    }
                }
            });
        }
        TimeUnit.SECONDS.sleep(3L);
        LOG.info("Big send to blow available destination usage before slow send resumes");
        send(21, 35840, true).commit();
        Connection createConnection = this.factory.createConnection();
        createConnection.start();
        MessageConsumer createConsumer = createConnection.createSession(false, 1).createConsumer(this.destination);
        for (int i4 = 1; i4 <= 21; i4++) {
            ActiveMQBytesMessage activeMQBytesMessage = (BytesMessage) createConsumer.receive(DurableSubProcessWithRestartTest.BROKER_RESTART);
            assertNotNull("Got message: " + i4 + ", " + hashSet, activeMQBytesMessage);
            LOG.info("got: " + hashSet + ", " + activeMQBytesMessage.getMessageId() + ", NUM=" + activeMQBytesMessage.getProperty("NUM"));
            hashSet.remove(activeMQBytesMessage.getProperty("NUM"));
        }
    }

    private Session send(int i, int i2, boolean z) throws Exception {
        Connection createConnection = this.factory.createConnection();
        createConnection.start();
        Session createSession = createConnection.createSession(z, z ? 0 : 1);
        MessageProducer createProducer = createSession.createProducer(this.destination);
        ActiveMQBytesMessage createBytesMessage = createSession.createBytesMessage();
        createBytesMessage.writeBytes(new byte[i2]);
        createBytesMessage.setIntProperty("NUM", i);
        createProducer.send(createBytesMessage);
        LOG.info("Sent:" + createBytesMessage.getJMSMessageID() + " session tx: " + createBytesMessage.getTransactionId());
        return createSession;
    }

    protected void setUp() throws Exception {
        super.setUp();
        this.broker = new BrokerService();
        this.broker.setBrokerName("thisOne");
        configureBroker(this.memoryLimit);
        this.broker.start();
        this.factory = new ActiveMQConnectionFactory("vm://thisOne?jms.alwaysSyncSend=true");
        this.factory.setWatchTopicAdvisories(false);
    }

    protected void tearDown() throws Exception {
        super.tearDown();
        if (this.broker != null) {
            this.broker.stop();
            this.broker = null;
        }
    }
}
