package org.apache.activemq.artemis.tests.integration.client;

import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.MessageHandler;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/client/MessageConcurrencyTest.class */
/**
 * Exercises sending the same {@link ClientMessage} instance from many producers at once.
 *
 * <p>Each test starts {@link #NUM_SENDERS} {@link Sender} threads, every one with its own
 * session and producer, hands the same message instances to all of them, and then checks
 * that no sender reported a failure.
 */
public class MessageConcurrencyTest extends ActiveMQTestBase {
    private static final IntegrationTestLogger log = IntegrationTestLogger.LOGGER;

    /** Number of sender threads started per test, each with its own session and producer. */
    private static final int NUM_SENDERS = 100;

    /**
     * Number of messages each sender takes from its queue and sends. The tests produce exactly
     * this many messages per sender; producing fewer would leave the senders blocked forever.
     */
    private static final int NUM_MESSAGES = 1000;

    /** Size, in bytes, of the random body written into every message. */
    private static final int BODY_SIZE = 1000;

    private static final SimpleString ADDRESS = new SimpleString("MessageConcurrencyTestAddress");
    private static final SimpleString QUEUE_NAME = new SimpleString("MessageConcurrencyTestQueue");

    private ActiveMQServer server;
    private ServerLocator locator;

    /**
     * Thread that takes messages from its own queue and sends each one through its producer.
     * It stops after {@code numMessages} sends, or on the first failure (recorded in
     * {@link #failed}).
     */
    private static class Sender extends Thread {
        private final BlockingQueue<ClientMessage> queue = new LinkedBlockingQueue<ClientMessage>();
        private final ClientProducer producer;
        private final int numMessages;

        /** Set when taking or sending a message threw; volatile so the test thread sees it. */
        volatile boolean failed;

        /**
         * @param numMessages number of messages to take from the queue and send before exiting
         * @param producer producer used for every send
         */
        Sender(int numMessages, ClientProducer producer) {
            this.numMessages = numMessages;
            this.producer = producer;
        }

        /** Hands a message to this sender; it is sent once the sender thread reaches it. */
        void enqueue(ClientMessage message) {
            queue.add(message);
        }

        @Override
        public void run() {
            for (int sent = 0; sent < numMessages; sent++) {
                try {
                    producer.send(queue.take());
                } catch (Exception e) {
                    if (e instanceof InterruptedException) {
                        // Keep the interrupt visible to whoever inspects this thread afterwards.
                        Thread.currentThread().interrupt();
                    }
                    log.error("Failed to send message", e);
                    failed = true;
                    return;
                }
            }
        }
    }

    /** Starts a server and creates the in-VM locator used by the tests. */
    @Before
    public void setUp() throws Exception {
        super.setUp();
        this.server = createServer(false);
        this.server.start();
        this.locator = createInVMNonHALocator();
    }

    /**
     * Creates {@link #NUM_MESSAGES} messages locally and queues each instance to every sender,
     * so the same message object is sent by all producers.
     */
    @Test
    public void testMessageConcurrency() throws Exception {
        ClientSessionFactory sessionFactory = createSessionFactory(this.locator);
        ClientSession mainSession = sessionFactory.createSession();
        Set<ClientSession> senderSessions = new HashSet<ClientSession>();
        Set<Sender> senders = startSenders(sessionFactory, senderSessions);

        for (int i = 0; i < NUM_MESSAGES; i++) {
            ClientMessage message = createRandomMessage(mainSession);
            for (Sender sender : senders) {
                sender.enqueue(message);
            }
        }

        awaitSenders(senders);
        closeSessions(senderSessions);
        mainSession.close();
        sessionFactory.close();
    }

    /**
     * Sends {@link #NUM_MESSAGES} messages to {@link #ADDRESS}, receives them through a
     * {@link MessageHandler} on {@link #QUEUE_NAME}, and queues every received message
     * instance to all senders so that it is sent again by each of them.
     */
    @Test
    public void testMessageConcurrencyAfterConsumption() throws Exception {
        ClientSessionFactory sessionFactory = createSessionFactory(this.locator);
        ClientSession mainSession = sessionFactory.createSession();
        ClientProducer mainProducer = mainSession.createProducer(ADDRESS);
        mainSession.createQueue(ADDRESS, QUEUE_NAME);
        ClientConsumer consumer = mainSession.createConsumer(QUEUE_NAME);
        mainSession.start();

        Set<ClientSession> senderSessions = new HashSet<ClientSession>();
        // The set is fully populated before the handler is installed and never modified after.
        final Set<Sender> senders = startSenders(sessionFactory, senderSessions);

        consumer.setMessageHandler(new MessageHandler() {
            public void onMessage(ClientMessage received) {
                for (Sender sender : senders) {
                    sender.enqueue(received);
                }
            }
        });

        for (int i = 0; i < NUM_MESSAGES; i++) {
            mainProducer.send(createRandomMessage(mainSession));
        }

        awaitSenders(senders);
        closeSessions(senderSessions);
        consumer.close();
        mainSession.deleteQueue(QUEUE_NAME);
        mainSession.close();
        sessionFactory.close();
    }

    /**
     * Creates {@link #NUM_SENDERS} sessions, each with a producer on {@link #ADDRESS}, and
     * starts one {@link Sender} per producer.
     *
     * @param sessionFactory factory used to create one session per sender
     * @param sessions receives every created session so the caller can close them later
     * @return the started senders
     */
    private Set<Sender> startSenders(ClientSessionFactory sessionFactory, Set<ClientSession> sessions) throws Exception {
        Set<Sender> senders = new HashSet<Sender>();
        for (int i = 0; i < NUM_SENDERS; i++) {
            ClientSession senderSession = sessionFactory.createSession();
            sessions.add(senderSession);
            Sender sender = new Sender(NUM_MESSAGES, senderSession.createProducer(ADDRESS));
            senders.add(sender);
            sender.start();
        }
        return senders;
    }

    /** Creates a message on the given session with {@link #BODY_SIZE} random bytes as its body. */
    private ClientMessage createRandomMessage(ClientSession session) {
        byte[] body = RandomUtil.randomBytes(BODY_SIZE);
        ClientMessage message = session.createMessage(false);
        message.getBodyBuffer().writeBytes(body);
        return message;
    }

    /**
     * Waits for each sender in turn and asserts it did not fail. There is no timeout: a sender
     * that never receives all of its messages blocks this call.
     */
    private void awaitSenders(Set<Sender> senders) throws Exception {
        for (Sender sender : senders) {
            sender.join();
            assertFalse(sender.failed);
        }
    }

    /** Closes every session in the given set. */
    private void closeSessions(Set<ClientSession> sessions) throws Exception {
        for (ClientSession session : sessions) {
            session.close();
        }
    }
}
