package org.apache.activemq.artemis.tests.integration.paging;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptorFactory;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.server.config.impl.ConnectionFactoryConfigurationImpl;
import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl;
import org.apache.activemq.artemis.jms.server.config.impl.JMSQueueConfigurationImpl;
import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/paging/MultipleProducersPagingTest.class */
/**
 * Integration test that sends messages from several concurrent producers into a single
 * JMS queue whose address is configured with the {@code PAGE} address-full policy, while
 * one consumer drains the queue at the same time.
 *
 * <p>The test passes when every runner finishes within one minute and both the number of
 * messages sent and the number of messages received equal {@link #TOTAL_MSG}.
 */
public class MultipleProducersPagingTest extends ActiveMQTestBase {
    /** How long the consumer waits for a message before concluding the queue is drained. */
    private static final int CONSUMER_WAIT_TIME_MS = 250;
    /** Number of concurrent producer tasks. */
    private static final int PRODUCERS = 5;
    /** Number of messages each producer sends. */
    private static final long MESSAGES_PER_PRODUCER = 2000;
    /** Total number of messages expected to be sent and received. */
    private static final long TOTAL_MSG = PRODUCERS * MESSAGES_PER_PRODUCER;
    /** All tasks taking part in the test: every producer plus the single consumer. */
    private static final int RUNNERS = PRODUCERS + 1;

    private ExecutorService executor;
    /** Counted down once by each runner when it finishes, successfully or not. */
    private CountDownLatch runnersLatch;
    /** Holds every runner back until all of them have created their JMS resources. */
    private CyclicBarrier barrierLatch;
    private AtomicLong msgReceived;
    private AtomicLong msgSent;
    /** Connections opened through {@link #createSession()}; guarded by {@code this}. */
    private final Set<Connection> connections = new HashSet<>();
    private EmbeddedJMS jmsServer;
    private ConnectionFactory cf;
    private Queue queue;

    /**
     * Consumes from the test queue, counting each received message, until a receive call
     * times out after {@link #CONSUMER_WAIT_TIME_MS} milliseconds.
     */
    final class ConsumerRun implements Runnable {
        ConsumerRun() {
        }

        @Override
        public void run() {
            try {
                MessageConsumer consumer = createSession().createConsumer(queue);
                // Do not start consuming before all producers are ready to send.
                barrierLatch.await();
                // A null return means the wait timed out; the consumer then stops for good.
                while (consumer.receive(CONSUMER_WAIT_TIME_MS) != null) {
                    msgReceived.incrementAndGet();
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            } finally {
                // Always signal completion so the test thread is not left waiting on a failed runner.
                runnersLatch.countDown();
            }
        }
    }

    /**
     * Sends {@link #MESSAGES_PER_PRODUCER} text messages to the test queue, counting each
     * one after it has been sent.
     */
    final class ProducerRun implements Runnable {
        ProducerRun() {
        }

        @Override
        public void run() {
            try {
                Session session = createSession();
                MessageProducer producer = session.createProducer(queue);
                // Start sending only once every other runner is ready as well.
                barrierLatch.await();
                for (int i = 0; i < MESSAGES_PER_PRODUCER; i++) {
                    producer.send(session.createTextMessage(hashCode() + " counter " + i));
                    msgSent.incrementAndGet();
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            } finally {
                // Always signal completion so the test thread is not left waiting on a failed runner.
                runnersLatch.countDown();
            }
        }
    }

    /**
     * Starts an embedded, non-persistent JMS server with paging enabled for all addresses,
     * looks up the connection factory and queue, and resets the synchronization aids and
     * counters used by the runners.
     *
     * @throws Exception if the base set-up or the server start-up fails
     */
    @Before
    public void setUp() throws Exception {
        super.setUp();
        this.executor = Executors.newCachedThreadPool();

        AddressSettings pagingSettings = new AddressSettings()
            .setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE)
            .setPageSizeBytes(50000L)
            .setMaxSizeBytes(404850L);

        ConfigurationImpl serverConfig = createBasicConfig()
            .setPersistenceEnabled(false)
            .setAddressesSettings(Collections.singletonMap("#", pagingSettings))
            .setAcceptorConfigurations(Collections.singleton(new TransportConfiguration(NettyAcceptorFactory.class.getName())))
            .setConnectorConfigurations(Collections.singletonMap("netty", new TransportConfiguration(NettyConnectorFactory.class.getName())));

        JMSConfigurationImpl jmsConfig = new JMSConfigurationImpl();
        jmsConfig.getConnectionFactoryConfigurations().add(new ConnectionFactoryConfigurationImpl().setName("cf").setConnectorNames(Arrays.asList("netty")).setBindings(new String[]{"/cf"}));
        jmsConfig.getQueueConfigurations().add(new JMSQueueConfigurationImpl().setName("simple").setSelector("").setDurable(false).setBindings(new String[]{"/queue/simple"}));

        this.jmsServer = new EmbeddedJMS();
        this.jmsServer.setConfiguration(serverConfig);
        this.jmsServer.setJmsConfiguration(jmsConfig);
        this.jmsServer.start();

        this.cf = (ConnectionFactory) this.jmsServer.lookup("/cf");
        this.queue = (Queue) this.jmsServer.lookup("/queue/simple");

        this.barrierLatch = new CyclicBarrier(RUNNERS);
        this.runnersLatch = new CountDownLatch(RUNNERS);
        this.msgReceived = new AtomicLong(0L);
        this.msgSent = new AtomicLong(0L);
    }

    /**
     * Runs one consumer and {@link #PRODUCERS} producers concurrently and verifies that
     * all messages were both sent and received.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting for the runners
     */
    @Test
    public void testQueue() throws InterruptedException {
        this.executor.execute(new ConsumerRun());
        for (int i = 0; i < PRODUCERS; i++) {
            this.executor.execute(new ProducerRun());
        }
        Assert.assertTrue("must take less than a minute to run", this.runnersLatch.await(1L, TimeUnit.MINUTES));
        Assert.assertEquals("number sent", TOTAL_MSG, this.msgSent.longValue());
        Assert.assertEquals("number received", TOTAL_MSG, this.msgReceived.longValue());
    }

    /**
     * Opens and starts a new connection, remembers it for clean-up in {@link #tearDown()},
     * and returns a non-transacted, auto-acknowledge session created on it.
     *
     * @return a new session on its own dedicated connection
     * @throws JMSException if the connection or session cannot be created
     */
    public synchronized Session createSession() throws JMSException {
        Connection connection = this.cf.createConnection();
        this.connections.add(connection);
        connection.start();
        return connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    }

    /**
     * Stops the runners, closes every connection opened by the test, stops the embedded
     * server and finally delegates to the base class. Each step runs even if an earlier
     * one fails, so a failed close cannot leave the server running.
     *
     * @throws Exception if closing a connection, stopping the server or the base tear-down fails
     */
    @After
    public void tearDown() throws Exception {
        try {
            if (this.executor != null) {
                // Interrupt runners that may still be parked on the barrier after a sibling failed.
                this.executor.shutdownNow();
            }
            closeConnections();
        } finally {
            try {
                if (this.jmsServer != null) {
                    this.jmsServer.stop();
                }
            } finally {
                super.tearDown();
            }
        }
    }

    /**
     * Closes all tracked connections, attempting every one even if some fail, then
     * rethrows the first failure with any later failures attached as suppressed.
     */
    private synchronized void closeConnections() throws JMSException {
        JMSException firstFailure = null;
        for (Connection connection : this.connections) {
            try {
                connection.close();
            } catch (JMSException e) {
                if (firstFailure == null) {
                    firstFailure = e;
                } else {
                    firstFailure.addSuppressed(e);
                }
            }
        }
        this.connections.clear();
        if (firstFailure != null) {
            throw firstFailure;
        }
    }
}
