package org.apache.activemq.bugs;

import java.io.File;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Vector;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import junit.framework.Test;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.CombinationTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/bugs/AMQ2413Test.class */
public class AMQ2413Test extends CombinationTestSupport implements MessageListener {
    /** Logger used by the test and by the nested producer helper. */
    private static final Logger LOG = LoggerFactory.getLogger(AMQ2413Test.class);
    /** Broker under test; created and started in setUp(), stopped in tearDown(). */
    BrokerService broker;
    /** Factory for client connections to the broker; created in setUp(). */
    private ActiveMQConnectionFactory factory;
    /** Seconds waitForMessageReceipt() waits for a single message before timing out. */
    private static final int HANG_THRESHOLD = 60;
    /** Total number of messages the test waits to receive. */
    private static final int SEND_COUNT = 1000;
    /** Presumably the per-message pause of the receiver (onMessage sleeps 1 ms) - TODO confirm unit. */
    private static final int RECEIVER_THINK_TIME = 1;
    /** Number of TestConsumer instances started by testReceipt(). */
    private static final int CONSUMER_COUNT = 1;
    /** Number of TestProducer instances started by testReceipt(). */
    private static final int PRODUCER_COUNT = 50;
    /** Number of messages each TestProducer sends. */
    private static final int TO_SEND = 20;
    /** Released once per received message in onMessage(); acquired with a timeout while waiting. */
    Semaphore receivedMessages;
    /** Delivery mode handed to every producer; one of the values registered in initCombos(). */
    public int deliveryMode = 1;
    /** Acknowledge mode of the consumer session; one of the values registered in initCombos(). */
    public int ackMode = 3;
    /** When true, setUp() installs a VMPendingQueueMessageStoragePolicy; varied by initCombos(). */
    public boolean useVMCursor = false;
    /** When true, TestConsumer enables optimizeAcknowledge on its connection. */
    public boolean useOptimizeAcks = false;
    /** Every consumer and producer started by the test; each is closed in tearDown(). */
    private ArrayList<Service> services = new ArrayList<>(51);
    /** Number of messages delivered to onMessage() so far; reset in setUp(). */
    AtomicInteger count = new AtomicInteger(0);
    /** Set while the test is active; cleared in tearDown() and polled by TestConsumer.run(). */
    AtomicBoolean running = new AtomicBoolean(false);
    /** Per-producer "received" flags, indexed by producer sequence id; maintained by track(). */
    HashMap<ProducerId, boolean[]> tracker = new HashMap<>();

    /* loaded from: input_file:org/apache/activemq/bugs/AMQ2413Test$Service.class */
    /**
     * Minimal lifecycle shared by the test's producers and consumers so that
     * they can be kept in one list and closed uniformly in tearDown().
     */
    private interface Service {
        /**
         * Starts the participant.
         *
         * @throws Exception if the participant cannot be started
         */
        void start() throws Exception;

        /** Releases the participant's resources; declares no checked exception. */
        void close();
    }

    /* loaded from: input_file:org/apache/activemq/bugs/AMQ2413Test$TestConsumer.class */
    private class TestConsumer implements Runnable, Service {
        ActiveMQConnection connection;
        Session session;
        MessageConsumer consumer;
        Thread thread;

        TestConsumer() throws Exception {
            AMQ2413Test.this.factory.setOptimizeAcknowledge(false);
            this.connection = AMQ2413Test.this.factory.createConnection();
            if (AMQ2413Test.this.useOptimizeAcks) {
                this.connection.setOptimizeAcknowledge(true);
            }
            this.session = this.connection.createSession(false, AMQ2413Test.this.ackMode);
            this.consumer = this.session.createConsumer(this.session.createQueue("AMQ2401Test"));
            this.consumer.setMessageListener(AMQ2413Test.this);
        }

        @Override // org.apache.activemq.bugs.AMQ2413Test.Service
        public void start() throws Exception {
            this.connection.start();
        }

        @Override // org.apache.activemq.bugs.AMQ2413Test.Service
        public void close() {
            try {
                this.connection.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }

        @Override // java.lang.Runnable
        public void run() {
            while (AMQ2413Test.this.running.get()) {
                try {
                    AMQ2413Test.this.onMessage(this.consumer.receive());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /* loaded from: input_file:org/apache/activemq/bugs/AMQ2413Test$TestProducer.class */
    private class TestProducer implements Runnable, Service {
        Thread thread;
        BytesMessage message;
        int id;
        Connection connection;
        Session session;
        MessageProducer producer;

        TestProducer(int i) throws Exception {
            this.id = i;
            this.thread = new Thread(this, "TestProducer-" + i);
            this.connection = AMQ2413Test.this.factory.createConnection();
            this.connection.start();
            this.session = this.connection.createSession(false, 3);
            this.producer = this.session.createProducer(this.session.createQueue("AMQ2401Test"));
        }

        @Override // org.apache.activemq.bugs.AMQ2413Test.Service
        public void start() {
            this.thread.start();
        }

        @Override // java.lang.Runnable
        public void run() {
            int i = 1;
            while (i <= 20) {
                try {
                    if (i % 100 == 0) {
                        Logger logger = AMQ2413Test.LOG;
                        StringBuilder sb = new StringBuilder();
                        Thread thread = this.thread;
                        logger.info(sb.append(Thread.currentThread().getName()).append(" Sending message ").append(i).toString());
                    }
                    this.message = this.session.createBytesMessage();
                    this.message.writeBytes(new byte[1024]);
                    this.producer.setDeliveryMode(AMQ2413Test.this.deliveryMode);
                    this.producer.send(this.message);
                    i++;
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            Logger logger2 = AMQ2413Test.LOG;
            StringBuilder sb2 = new StringBuilder();
            Thread thread2 = this.thread;
            logger2.info(sb2.append(Thread.currentThread().getName()).append(" Sent: ").append(i - 1).toString());
        }

        @Override // org.apache.activemq.bugs.AMQ2413Test.Service
        public void close() {
            try {
                this.connection.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Registers the value sets for the {@code deliveryMode}, {@code ackMode}
     * and {@code useVMCursor} fields via {@code addCombinationValues}.
     */
    public void initCombos() {
        // Values are later passed to MessageProducer.setDeliveryMode(int).
        final Object[] deliveryModes = {2, 1};
        // Values are later passed to Connection.createSession(boolean, int).
        final Object[] ackModes = {3, 1};
        final Object[] cursorChoices = {true, false};

        addCombinationValues("deliveryMode", deliveryModes);
        addCombinationValues("ackMode", ackModes);
        addCombinationValues("useVMCursor", cursorChoices);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Starts a fresh broker on {@code tcp://0.0.0.0:2401} with producer flow
     * control and a 1 MiB per-destination memory limit, then resets the
     * receive counter and semaphore and creates the client connection factory.
     *
     * @throws Exception if the broker cannot be configured or started
     */
    @Override // org.apache.activemq.AutoFailTestSupport
    public void setUp() throws Exception {
        final String brokerUri = "tcp://0.0.0.0:2401";

        // Publish the broker to the field immediately so tearDown() can stop it
        // even if a later configuration step throws.
        BrokerService service = new BrokerService();
        this.broker = service;
        service.setDataDirectory("target" + File.separator + "test-data" + File.separator + "AMQ2401Test");
        service.setDeleteAllMessagesOnStartup(true);
        // NOTE(review): the setter is invoked directly on whatever getPersistenceAdapter()
        // returns; confirm that its declared type exposes this method.
        service.getPersistenceAdapter().setConcurrentStoreAndDispatchQueues(false);
        service.addConnector(brokerUri);

        PolicyMap policies = new PolicyMap();
        PolicyEntry queuePolicy = new PolicyEntry();
        queuePolicy.setMemoryLimit(1024L * 1024L);
        queuePolicy.setProducerFlowControl(true);
        if (this.useVMCursor) {
            queuePolicy.setPendingQueuePolicy(new VMPendingQueueMessageStoragePolicy());
        }
        queuePolicy.setQueue(">");
        policies.setDefaultEntry(queuePolicy);
        service.setDestinationPolicy(policies);

        service.start();
        service.waitUntilStarted();

        this.count.set(0);
        this.receivedMessages = new Semaphore(0);
        this.factory = new ActiveMQConnectionFactory(brokerUri);
        setAutoFail(true);
        super.setUp();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Stops the test: clears the running flag, closes every registered
     * service, stops the broker and delegates to the superclass.
     *
     * Each step is isolated so that a failure in one does not skip the
     * others; in particular the broker (and the TCP connector it opened in
     * setUp()) is still shut down if closing a service fails.
     *
     * @throws Exception if stopping the broker or the superclass tear-down fails
     */
    @Override // org.apache.activemq.AutoFailTestSupport
    public void tearDown() throws Exception {
        this.running.set(false);
        try {
            for (Service service : this.services) {
                try {
                    service.close();
                } catch (RuntimeException e) {
                    // Keep going so the remaining services are still closed.
                    LOG.warn("Failed to close " + service, e);
                }
            }
        } finally {
            try {
                if (this.broker != null) {
                    this.broker.stop();
                    this.broker.waitUntilStopped();
                }
            } finally {
                super.tearDown();
            }
        }
    }

    /**
     * Starts {@code CONSUMER_COUNT} consumers and {@code PRODUCER_COUNT}
     * producers against the broker, then blocks until all messages have been
     * received or the wait times out.
     *
     * Every service is added to {@code services} before it is started, so
     * tearDown() closes it even when {@code start()} throws.
     *
     * @throws Exception if a service cannot be created or started, or if the
     *         wait for message receipt fails
     */
    public void testReceipt() throws Exception {
        this.running.set(true);
        for (int i = 0; i < CONSUMER_COUNT; i++) {
            TestConsumer consumer = new TestConsumer();
            this.services.add(consumer);
            consumer.start();
        }
        for (int i = 0; i < PRODUCER_COUNT; i++) {
            TestProducer producer = new TestProducer(i);
            this.services.add(producer);
            producer.start();
        }
        waitForMessageReceipt();
    }

    /**
     * Listener callback: signals the waiting test thread, counts the message,
     * records it in the tracker and then pauses briefly.
     *
     * @param message the delivered message
     */
    public void onMessage(Message message) {
        this.receivedMessages.release();

        final int received = this.count.incrementAndGet();
        final boolean logProgress = received % 100 == 0;
        if (logProgress) {
            LOG.info("Received message " + this.count);
        }

        track(message);

        try {
            Thread.sleep(RECEIVER_THINK_TIME);
        } catch (InterruptedException interrupted) {
            // Restore the interrupt status for the calling thread.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Marks the message's producer sequence id as received for its producer
     * and asserts that the same id has not been seen before.
     *
     * Runs under the instance monitor because it mutates {@code tracker}.
     * Exceptions are logged with their stack trace and not rethrown.
     *
     * @param message the received message whose JMS message id is recorded
     */
    private synchronized void track(Message message) {
        try {
            MessageId messageId = new MessageId(message.getJMSMessageID());
            ProducerId producerId = messageId.getProducerId();
            int sequence = (int) messageId.getProducerSequenceId();

            boolean[] seen = this.tracker.get(producerId);
            if (seen == null) {
                // Slot 0 stays unused; verifyTracking() checks ids starting at 1.
                seen = new boolean[TO_SEND + 1];
                this.tracker.put(producerId, seen);
            }
            assertTrue("not already received: " + messageId, !seen[sequence]);
            seen[sequence] = true;
        } catch (Exception e) {
            LOG.error("Failed to track received message", e);
        }
    }

    /**
     * Blocks until {@code SEND_COUNT} messages have been counted.
     *
     * Each loop iteration waits up to {@code HANG_THRESHOLD} seconds for one
     * permit from {@code receivedMessages}. If no permit arrives in time and
     * the count is still short, the tracker is verified and a
     * {@link TimeoutException} is thrown. The {@code running} flag is cleared
     * on every exit path.
     *
     * @throws InterruptedException if the thread is interrupted while waiting
     * @throws TimeoutException if no message arrives within the threshold
     */
    private void waitForMessageReceipt() throws InterruptedException, TimeoutException {
        try {
            while (this.count.get() < SEND_COUNT) {
                if (this.receivedMessages.tryAcquire(HANG_THRESHOLD, TimeUnit.SECONDS)) {
                    continue;
                }
                // Timed out: the final message may have been counted in the meantime.
                if (this.count.get() == SEND_COUNT) {
                    break;
                }
                verifyTracking();
                throw new TimeoutException("@count=" + this.count.get()
                        + " Message not received for more than " + HANG_THRESHOLD + " seconds");
            }
        } finally {
            this.running.set(false);
        }
    }

    /**
     * Asserts that every producer present in the tracker has had all of its
     * sequence ids {@code 1..TO_SEND} received; the assertion message lists
     * the missing message ids.
     *
     * Synchronized on the same monitor as track(), which mutates the map this
     * method iterates.
     */
    private synchronized void verifyTracking() {
        List<MessageId> missing = new ArrayList<>();
        for (Map.Entry<ProducerId, boolean[]> entry : this.tracker.entrySet()) {
            boolean[] seen = entry.getValue();
            for (int sequence = 1; sequence <= TO_SEND; sequence++) {
                if (!seen[sequence]) {
                    missing.add(new MessageId(entry.getKey(), sequence));
                }
            }
        }
        assertTrue("No missing messages: " + missing, missing.isEmpty());
    }

    public static Test suite() {
        return suite(AMQ2413Test.class);
    }
}
