package org.apache.activemq.artemis.tests.integration.client;

import java.util.HashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/client/MultipleThreadFilterOneTest.class */
/**
 * Integration test that runs several producer threads against a single address while several
 * consumer threads each drain their own queue, bound to that address with a
 * {@code prodNR=<n>} filter.
 *
 * <p>Every producer stamps its messages with {@code prodNR = index % nThreads}, so the
 * {@link #nThreads} producers together deliver exactly {@link #numberOfMessages} messages to
 * each of the {@link #nThreads} active consumers. Additional "dead" consumers are bound with
 * filter values no producer ever emits; they are created and closed but never run.
 *
 * <p>The scenario is executed with the Netty and in-VM transports, with and without the
 * paging-specific server settings.
 */
public class MultipleThreadFilterOneTest extends ActiveMQTestBase {
    private static final int PAGE_MAX = 102400;
    private static final int PAGE_SIZE = 10240;
    final String ADDRESS = "ADDRESS";
    final int numberOfMessages = 2000;
    final int nThreads = 4;
    private boolean isNetty = false;

    /**
     * Consumer thread bound to queue {@code Q<nr>}, which only receives messages whose
     * {@code prodNR} property equals {@code nr}. Failures are counted in {@link #errors}
     * rather than propagated, so the test thread can assert on them after {@code join()}.
     */
    public class SomeConsumer extends Thread {
        final ClientSessionFactory factory;
        final ServerLocator locator;
        final ClientSession consumerSession;
        ClientConsumer consumer;
        final int nr;
        final AtomicInteger errors = new AtomicInteger(0);

        /**
         * Creates the filtered queue {@code Q<nr>} on {@link #ADDRESS}, attaches a consumer to
         * it and starts the session.
         *
         * @param nr the {@code prodNR} value this consumer filters on
         * @throws Exception if the locator, session, queue or consumer cannot be created
         */
        SomeConsumer(int nr) throws Exception {
            this.locator = createNonHALocator(isNetty);
            this.factory = this.locator.createSessionFactory();
            this.consumerSession = this.factory.createSession(false, false);
            this.consumerSession.createQueue(new QueueConfiguration("Q" + nr).setAddress(ADDRESS).setFilterString("prodNR=" + nr));
            this.consumer = this.consumerSession.createConsumer("Q" + nr);
            this.consumerSession.start();
            this.nr = nr;
        }

        /**
         * Receives {@link #numberOfMessages} messages, checking that each carries this
         * consumer's {@code prodNR}, committing every 500 messages, and finally verifying
         * that the queue holds nothing further. The session and locator are always closed.
         */
        @Override
        public void run() {
            try {
                this.consumerSession.start();
                for (int i = 0; i < numberOfMessages; i++) {
                    ClientMessage message = this.consumer.receive(15000L);
                    Assert.assertNotNull(message);
                    Assert.assertEquals(this.nr, message.getIntProperty("prodNR").intValue());
                    message.acknowledge();
                    if (i % 500 == 0) {
                        instanceLog.debug("Consumed " + i);
                        this.consumerSession.commit();
                    }
                }
                // Exactly numberOfMessages messages match this filter; anything more is a bug.
                Assert.assertNull(this.consumer.receiveImmediate());
                this.consumerSession.commit();
            } catch (Throwable th) {
                th.printStackTrace();
                this.errors.incrementAndGet();
            } finally {
                close();
            }
        }

        /**
         * Closes the session and then the locator on a best-effort basis. The locator is
         * closed even when closing the session fails, so it is never leaked.
         */
        public void close() {
            try {
                this.consumerSession.close();
            } catch (Throwable th) {
                th.printStackTrace();
            }
            try {
                this.locator.close();
            } catch (Throwable th) {
                th.printStackTrace();
            }
        }
    }

    /**
     * Producer thread that sends half of {@link #numberOfMessages} from its constructor (on
     * the creating thread) and the other half from {@link #run()}. Failures in {@code run()}
     * are counted in {@link #errors}.
     */
    public class SomeProducer extends Thread {
        final ClientSessionFactory factory;
        final ServerLocator locator;
        final ClientSession prodSession;
        public final AtomicInteger errors = new AtomicInteger(0);

        /**
         * Opens a transacted session and immediately sends the first half of the messages.
         *
         * @throws Exception if the session cannot be created or the initial send fails
         */
        SomeProducer() throws Exception {
            this.locator = createNonHALocator(isNetty);
            this.factory = this.locator.createSessionFactory();
            this.prodSession = this.factory.createSession(false, false);
            sendMessages(numberOfMessages / 2);
        }

        /**
         * Sends the second half of the messages, then closes the session and locator exactly
         * once regardless of the outcome.
         */
        @Override
        public void run() {
            try {
                sendMessages(numberOfMessages / 2);
            } catch (Throwable th) {
                th.printStackTrace();
                this.errors.incrementAndGet();
            } finally {
                close();
            }
        }

        /** Closes the session and then the locator, each on a best-effort basis. */
        private void close() {
            try {
                this.prodSession.close();
            } catch (Throwable th) {
                th.printStackTrace();
            }
            try {
                this.locator.close();
            } catch (Throwable th) {
                th.printStackTrace();
            }
        }

        /**
         * Sends {@code count} durable messages to {@link #ADDRESS}, cycling the
         * {@code prodNR} property through {@code 0..nThreads-1} and committing every 100
         * messages plus once at the end.
         *
         * @param count number of messages to send
         * @throws ActiveMQException if creating the producer, sending or committing fails
         */
        private void sendMessages(int count) throws ActiveMQException {
            ClientProducer producer = this.prodSession.createProducer(ADDRESS);
            for (int i = 0; i < count; i++) {
                ClientMessage message = this.prodSession.createMessage(true);
                message.putIntProperty("prodNR", i % nThreads);
                producer.send(message);
                if (i % 100 == 0) {
                    this.prodSession.commit();
                }
            }
            this.prodSession.commit();
            producer.close();
        }
    }

    @Test
    public void testSendingNetty() throws Exception {
        testSending(true, false);
    }

    @Test
    public void testSendingNettyPaging() throws Exception {
        testSending(true, true);
    }

    @Test
    public void testSendingInVM() throws Exception {
        testSending(false, false);
    }

    @Test
    public void testSendingInVMPaging() throws Exception {
        testSending(false, true);
    }

    /**
     * Runs the full scenario: starts a server, creates the consumers, producers and dead
     * consumers, runs the worker threads to completion and asserts that none of them
     * recorded an error. The server is stopped however the scenario ends.
     *
     * @param isNetty  {@code true} to use the Netty transport, {@code false} for in-VM
     * @param isPaging {@code true} to create the server with the {@link #PAGE_SIZE} /
     *                 {@link #PAGE_MAX} settings
     * @throws Exception if server setup, client setup or any assertion fails
     */
    private void testSending(boolean isNetty, boolean isPaging) throws Exception {
        final boolean useDeadConsumer = true;
        this.isNetty = isNetty;

        ActiveMQServer server;
        if (isPaging) {
            server = createServer(true, createDefaultConfig(isNetty), PAGE_SIZE, PAGE_MAX, new HashMap<>());
        } else {
            server = createServer(true, isNetty);
        }
        server.getConfiguration().setMessageExpiryScanPeriod(1000L);
        server.start();

        SomeConsumer[] consumers = new SomeConsumer[nThreads];
        SomeProducer[] producers = new SomeProducer[nThreads];
        SomeConsumer[] deadConsumers = null;

        try {
            for (int i = 0; i < nThreads; i++) {
                consumers[i] = new SomeConsumer(i);
            }
            for (int i = 0; i < nThreads; i++) {
                producers[i] = new SomeProducer();
            }
            if (useDeadConsumer) {
                // Their filter values start at nThreads, which no producer ever emits.
                deadConsumers = new SomeConsumer[20];
                for (int i = 0; i < 20; i++) {
                    deadConsumers[i] = new SomeConsumer(i + nThreads);
                }
            }

            for (int i = 0; i < nThreads; i++) {
                consumers[i].start();
                producers[i].start();
            }

            for (SomeProducer producer : producers) {
                producer.join();
                Assert.assertEquals(0, producer.errors.get());
            }
            for (SomeConsumer consumer : consumers) {
                consumer.join();
                Assert.assertEquals(0, consumer.errors.get());
            }

            if (useDeadConsumer) {
                for (SomeConsumer deadConsumer : deadConsumers) {
                    deadConsumer.close();
                }
            }

            waitForNotPaging(server.locateQueue(new SimpleString("Q1")));
        } finally {
            server.stop();
        }
    }
}
