package org.apache.activemq.bugs;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/bugs/AMQ4368Test.class */
public class AMQ4368Test {
    private static final Logger LOG = LoggerFactory.getLogger(AMQ4368Test.class);
    private BrokerService broker;
    private ActiveMQConnectionFactory connectionFactory;
    private final Destination destination = new ActiveMQQueue("large_message_queue");
    private String connectionUri;

    /* loaded from: input_file:org/apache/activemq/bugs/AMQ4368Test$Client.class */
    abstract class Client implements Runnable {
        private final String name;
        CountDownLatch startedLatch;
        Connection connection;
        Session session;
        final AtomicBoolean done = new AtomicBoolean();
        CountDownLatch doneLatch = new CountDownLatch(1);
        final AtomicLong size = new AtomicLong();

        Client(String str, CountDownLatch countDownLatch) {
            this.name = str;
            this.startedLatch = countDownLatch;
        }

        public void start() {
            AMQ4368Test.LOG.info("Starting: " + this.name);
            new Thread(this, this.name).start();
        }

        public void stopAsync() {
            this.done.set(true);
        }

        public void stop() throws InterruptedException {
            stopAsync();
            if (this.doneLatch.await(20L, TimeUnit.MILLISECONDS)) {
                return;
            }
            try {
                this.connection.close();
                this.doneLatch.await();
            } catch (Exception e) {
            }
        }

        /* JADX WARN: Finally extract failed */
        @Override // java.lang.Runnable
        public void run() {
            try {
                try {
                    this.connection = createConnection();
                    this.connection.start();
                    try {
                        this.session = createSession();
                        work();
                        try {
                            this.connection.close();
                        } catch (JMSException e) {
                        }
                        AMQ4368Test.LOG.info("Stopped: " + this.name);
                        this.doneLatch.countDown();
                    } catch (Throwable th) {
                        try {
                            this.connection.close();
                        } catch (JMSException e2) {
                        }
                        AMQ4368Test.LOG.info("Stopped: " + this.name);
                        throw th;
                    }
                } catch (Throwable th2) {
                    this.doneLatch.countDown();
                    throw th2;
                }
            } catch (Exception e3) {
                e3.printStackTrace();
                this.done.set(true);
                this.doneLatch.countDown();
            }
        }

        protected Session createSession() throws JMSException {
            return this.connection.createSession(false, 1);
        }

        protected Connection createConnection() throws JMSException {
            return AMQ4368Test.this.connectionFactory.createConnection();
        }

        protected abstract void work() throws Exception;
    }

    /* loaded from: input_file:org/apache/activemq/bugs/AMQ4368Test$ConsumingClient.class */
    class ConsumingClient extends Client {
        public ConsumingClient(String str, CountDownLatch countDownLatch) {
            super(str, countDownLatch);
        }

        @Override // org.apache.activemq.bugs.AMQ4368Test.Client
        protected void work() throws Exception {
            MessageConsumer createConsumer = this.session.createConsumer(AMQ4368Test.this.destination);
            this.startedLatch.countDown();
            while (!this.done.get()) {
                if (createConsumer.receive(100L) != null) {
                    this.size.incrementAndGet();
                }
            }
        }
    }

    /* loaded from: input_file:org/apache/activemq/bugs/AMQ4368Test$ProducingClient.class */
    class ProducingClient extends Client {
        ProducingClient(String str, CountDownLatch countDownLatch) {
            super(str, countDownLatch);
        }

        private String createMessage() {
            StringBuffer stringBuffer = new StringBuffer();
            long j = 0;
            while (true) {
                long j2 = j;
                if (j2 >= 1000000) {
                    return stringBuffer.toString();
                }
                stringBuffer.append("1234567890");
                j = j2 + 1;
            }
        }

        @Override // org.apache.activemq.bugs.AMQ4368Test.Client
        protected void work() throws Exception {
            String createMessage = createMessage();
            MessageProducer createProducer = this.session.createProducer(AMQ4368Test.this.destination);
            this.startedLatch.countDown();
            while (!this.done.get()) {
                createProducer.send(this.session.createTextMessage(createMessage));
                long incrementAndGet = this.size.incrementAndGet();
                if (incrementAndGet % 1000 == 0) {
                    AMQ4368Test.LOG.info("produced " + incrementAndGet + ".");
                }
            }
        }
    }

    @Before
    public void setUp() throws Exception {
        this.broker = createBroker();
        this.connectionUri = this.broker.addConnector(JmsMultipleBrokersTestSupport.AUTO_ASSIGN_TRANSPORT).getPublishableConnectString();
        this.broker.start();
        this.connectionFactory = new ActiveMQConnectionFactory(this.connectionUri);
    }

    @After
    public void tearDown() throws Exception {
        this.broker.stop();
        this.broker.waitUntilStopped();
    }

    protected BrokerService createBroker() throws Exception {
        BrokerService brokerService = new BrokerService();
        PolicyEntry policyEntry = new PolicyEntry();
        policyEntry.setUseCache(false);
        brokerService.setDestinationPolicy(new PolicyMap());
        brokerService.getDestinationPolicy().setDefaultEntry(policyEntry);
        KahaDBPersistenceAdapter kahaDBPersistenceAdapter = new KahaDBPersistenceAdapter();
        kahaDBPersistenceAdapter.setChecksumJournalFiles(true);
        kahaDBPersistenceAdapter.setCheckForCorruptJournalFiles(true);
        kahaDBPersistenceAdapter.setCleanupInterval(1000L);
        kahaDBPersistenceAdapter.deleteAllMessages();
        brokerService.setPersistenceAdapter(kahaDBPersistenceAdapter);
        brokerService.getSystemUsage().getMemoryUsage().setLimit(104857600L);
        return brokerService;
    }

    @Test
    public void testENTMQ220() throws InterruptedException, JMSException {
        LOG.info("Start test.");
        CountDownLatch countDownLatch = new CountDownLatch(1);
        CountDownLatch countDownLatch2 = new CountDownLatch(1);
        CountDownLatch countDownLatch3 = new CountDownLatch(1);
        ProducingClient producingClient = new ProducingClient("1", countDownLatch);
        ProducingClient producingClient2 = new ProducingClient("2", countDownLatch2);
        ConsumingClient consumingClient = new ConsumingClient("subscriber-1", countDownLatch3);
        try {
            producingClient.start();
            producingClient2.start();
            consumingClient.start();
            countDownLatch.await(15L, TimeUnit.SECONDS);
            countDownLatch2.await(15L, TimeUnit.SECONDS);
            countDownLatch3.await(15L, TimeUnit.SECONDS);
            long j = consumingClient.size.get();
            for (int i = 0; i < 10; i++) {
                Thread.sleep(2000L);
                long j2 = consumingClient.size.get();
                LOG.info("Listener 1: consumed: " + (j2 - j));
                Assert.assertTrue("No messages received on iteration: " + i, j2 > j);
                j = j2;
            }
            LOG.info("Stopping clients");
            producingClient.stop();
            producingClient2.stop();
            consumingClient.stop();
        } catch (Throwable th) {
            LOG.info("Stopping clients");
            producingClient.stop();
            producingClient2.stop();
            consumingClient.stop();
            throw th;
        }
    }
}
