package org.apache.activemq.artemis.tests.integration.server;

import jakarta.jms.Connection;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/server/RingQueueTest.class */
public class RingQueueTest extends ActiveMQTestBase {
    private ActiveMQServer server;
    private final SimpleString address = SimpleString.of("RingQueueTestAddress");
    private final SimpleString qName = SimpleString.of("RingQueueTestQ1");

    /* loaded from: input_file:org/apache/activemq/artemis/tests/integration/server/RingQueueTest$SomeProducer.class */
    /**
     * Producer thread used by the concurrency test. Each instance opens its own locator,
     * session factory and session, sends {@link #numberOfMessages} messages to
     * {@link #address}, and records any failure in {@link #errors}.
     */
    class SomeProducer extends Thread {
        final ClientSessionFactory factory;
        final ServerLocator locator;
        final ClientSession prodSession;
        /** Number of failed runs; stays at zero when every send succeeded. */
        public final AtomicInteger errors = new AtomicInteger(0);
        final long numberOfMessages;
        final int nThreads;
        final SimpleString address;

        /**
         * Opens the client resources this producer will use.
         *
         * @param numberOfMessages how many messages {@link #run()} sends
         * @param nThreads modulus applied to the message index for the "prodNR" property
         * @param address address the messages are sent to
         * @throws Exception if the session factory or session cannot be created
         */
        SomeProducer(long numberOfMessages, int nThreads, SimpleString address) throws Exception {
            this.locator = RingQueueTest.this.createNettyNonHALocator();
            this.factory = this.locator.createSessionFactory();
            this.prodSession = this.factory.createSession(true, false);
            this.numberOfMessages = numberOfMessages;
            this.nThreads = nThreads;
            this.address = address;
        }

        /**
         * Sends the configured number of messages. Any failure is printed and counted in
         * {@link #errors} instead of being propagated, so the owning test can inspect the
         * counter after {@code join()}.
         */
        @Override
        public void run() {
            try {
                ClientProducer producer = this.prodSession.createProducer(this.address);
                for (int i = 0; i < this.numberOfMessages; i++) {
                    ClientMessage message = this.prodSession.createMessage(true);
                    message.putIntProperty("prodNR", i % this.nThreads);
                    producer.send(message);
                }
            } catch (Throwable failure) {
                failure.printStackTrace();
                this.errors.incrementAndGet();
            } finally {
                // Resources are released exactly once, here, on both the success and the
                // failure path.
                releaseResources();
            }
        }

        /**
         * Closes the session and the locator independently, so that a failure while closing
         * the session does not prevent the locator from being closed.
         */
        private void releaseResources() {
            try {
                this.prodSession.close();
            } catch (Throwable closeFailure) {
                closeFailure.printStackTrace();
            }
            try {
                this.locator.close();
            } catch (Throwable closeFailure) {
                closeFailure.printStackTrace();
            }
        }
    }

    /**
     * Checks that a queue created with ring size 1 never holds more than one message: for
     * each pair of sends the first message is replaced by the second, the replaced counter
     * grows by one, and a consumer receives the body of the second message.
     */
    @Test
    public void testSimple() throws Exception {
        ClientSession session = addClientSession(createSessionFactory(createNettyNonHALocator().setBlockOnAcknowledge(true).setAckBatchSize(0)).createSession(false, true, true));
        session.createQueue(QueueConfiguration.of(this.qName).setAddress(this.address).setRingSize(1L));
        session.start();
        Queue queue = this.server.locateQueue(this.qName);
        Assertions.assertEquals(1L, queue.getRingSize());
        ClientProducer producer = session.createProducer(this.address);
        // i numbers the messages (two per round); round counts completed replacements.
        for (int i = 0, round = 0; i < 500; i += 2, round++) {
            producer.send(createTextMessage(session, "hello" + i));
            Wait.assertTrue(() -> queue.getMessageCount() == 1);

            String newestBody = "hello" + (i + 1);
            producer.send(createTextMessage(session, newestBody));
            long expectedReplaced = round + 1;
            Wait.assertTrue(() -> queue.getMessagesReplaced() == expectedReplaced);
            Wait.assertTrue(() -> queue.getMessageCount() == 1);

            ClientConsumer consumer = session.createConsumer(this.qName);
            ClientMessage received = consumer.receiveImmediate();
            // Fail with a readable assertion instead of a NullPointerException when nothing
            // is delivered.
            Assertions.assertNotNull(received, "no message received for " + newestBody);
            received.acknowledge();
            consumer.close();
            Assertions.assertEquals(newestBody, received.getBodyBuffer().readString());
        }
    }

    /**
     * Exercises a ring queue of size 1 when a rollback returns an in-delivery message.
     * While "hello0" is held by the consumer and "hello1" is sent, the test expects two
     * messages counted and nothing replaced; after the rollback it expects one replacement
     * and "hello1" to be the message that remains.
     */
    @Test
    public void testRollback() throws Exception {
        ServerLocator locator = createNettyNonHALocator().setBlockOnAcknowledge(true).setAckBatchSize(0);
        ClientSessionFactory sessionFactory = createSessionFactory(locator);
        ClientSession session = addClientSession(sessionFactory.createSession(false, true, false));
        session.createQueue(QueueConfiguration.of(this.qName).setAddress(this.address).setRingSize(1L));
        session.start();
        Queue queue = this.server.locateQueue(this.qName);
        Assertions.assertEquals(1L, queue.getRingSize());

        ClientProducer producer = session.createProducer(this.address);
        producer.send(createTextMessage(session, "hello0"));
        Wait.assertTrue(() -> queue.getMessageCount() == 1);

        // Take delivery of the first message but leave the acknowledgement uncommitted.
        ClientConsumer firstConsumer = session.createConsumer(this.qName);
        ClientMessage first = firstConsumer.receiveImmediate();
        Assertions.assertNotNull(first);
        Wait.assertTrue(() -> queue.getDeliveringCount() == 1);
        first.acknowledge();
        Assertions.assertEquals("hello0", first.getBodyBuffer().readString());

        // Expected state while both messages are in delivery: nothing replaced yet.
        producer.send(createTextMessage(session, "hello1"));
        Wait.assertTrue(() -> queue.getDeliveringCount() == 2);
        Wait.assertTrue(() -> queue.getMessagesReplaced() == 0);
        Wait.assertTrue(() -> queue.getMessageCount() == 2);

        // Expected state after rollback and consumer close: one message replaced, one left.
        session.rollback();
        firstConsumer.close();
        Wait.assertTrue(() -> queue.getDeliveringCount() == 0);
        Wait.assertTrue(() -> queue.getMessagesReplaced() == 1);
        Wait.assertTrue(() -> queue.getMessageCount() == 1);

        ClientConsumer secondConsumer = session.createConsumer(this.qName);
        ClientMessage survivor = secondConsumer.receiveImmediate();
        Assertions.assertNotNull(survivor);
        Wait.assertTrue(() -> queue.getDeliveringCount() == 1);
        survivor.acknowledge();
        session.commit();
        Wait.assertTrue(() -> queue.getMessagesAcknowledged() == 1);
        Assertions.assertEquals("hello1", survivor.getBodyBuffer().readString());
    }

    /**
     * Exercises a ring queue of size 1 with a consumer already attached when messages are
     * sent. Both messages are expected to be in delivery while the consumer is open; once
     * it closes, the test expects one message to be replaced and "hello1" to remain.
     */
    @Test
    public void testConsumerCloseWithDirectDeliver() throws Exception {
        ServerLocator locator = createNettyNonHALocator().setBlockOnAcknowledge(true).setAckBatchSize(0);
        ClientSession session = addClientSession(createSessionFactory(locator).createSession(false, true, false));
        session.createQueue(QueueConfiguration.of(this.qName).setAddress(this.address).setRingSize(1L));
        session.start();
        Queue queue = this.server.locateQueue(this.qName);
        Assertions.assertEquals(1L, queue.getRingSize());

        // The consumer exists before anything is sent.
        ClientConsumer attachedConsumer = session.createConsumer(this.qName);
        ClientProducer producer = session.createProducer(this.address);
        producer.send(createTextMessage(session, "hello0"));
        producer.send(createTextMessage(session, "hello1"));
        Wait.assertEquals(2L, queue::getMessageCount);
        Wait.assertEquals(2, queue::getDeliveringCount);

        // Expected after close: nothing in delivery, one message left, one replaced.
        attachedConsumer.close();
        Wait.assertEquals(1L, queue::getMessageCount);
        Wait.assertEquals(0, queue::getDeliveringCount);
        Wait.assertEquals(1L, queue::getMessagesReplaced);

        ClientConsumer verifyingConsumer = session.createConsumer(this.qName);
        ClientMessage remaining = verifyingConsumer.receiveImmediate();
        Assertions.assertNotNull(remaining);
        Wait.assertTrue(() -> queue.getDeliveringCount() == 1);
        remaining.acknowledge();
        session.commit();
        Wait.assertTrue(() -> queue.getMessagesAcknowledged() == 1);
        Assertions.assertEquals("hello1", remaining.getBodyBuffer().readString());

        verifyingConsumer.close();
        Wait.assertTrue(() -> queue.getMessageCount() == 0);
        Wait.assertTrue(() -> queue.getDeliveringCount() == 0);
        Wait.assertTrue(() -> queue.getMessagesReplaced() == 1);
    }

    /**
     * Exercises a ring queue of size 1 with scheduled messages. The same message is sent
     * twice, each time scheduled 500 ms ahead; the test expects both to be counted as
     * scheduled with nothing pending, and eventually one replacement and one pending
     * message.
     */
    @Test
    public void testScheduled() throws Exception {
        ServerLocator locator = createNettyNonHALocator().setBlockOnAcknowledge(true).setAckBatchSize(0);
        ClientSession session = addClientSession(createSessionFactory(locator).createSession(false, true, false));
        session.createQueue(QueueConfiguration.of(this.qName).setAddress(this.address).setRingSize(1L));
        session.start();
        Queue queue = this.server.locateQueue(this.qName);
        Assertions.assertEquals(1L, queue.getRingSize());

        ClientProducer producer = session.createProducer(this.address);
        ClientMessage scheduled = createTextMessage(session, "hello0");

        // First scheduled send.
        scheduled.putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, System.currentTimeMillis() + 500);
        producer.send(scheduled);
        Wait.assertTrue(() -> queue.getScheduledCount() == 1);
        Wait.assertTrue(() -> ((QueueImpl) queue).getPendingMessageCount() == 0);

        // Second send reuses the same message object with a refreshed delivery time.
        scheduled.putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, System.currentTimeMillis() + 500);
        producer.send(scheduled);
        Wait.assertTrue(() -> queue.getScheduledCount() == 2);
        Wait.assertTrue(() -> ((QueueImpl) queue).getPendingMessageCount() == 0);

        // Eventually one message is replaced and exactly one is pending.
        Wait.assertTrue(() -> queue.getMessagesReplaced() == 1);
        Wait.assertTrue(() -> ((QueueImpl) queue).getPendingMessageCount() == 1);
    }

    /**
     * Checks that the default ring size from matching address settings is applied to a
     * queue created by sending to the test address over JMS, and that a queue on an
     * unrelated random address gets the broker-wide default ring size instead.
     */
    @Test
    public void testDefaultAddressSetting() throws Exception {
        SimpleString unrelatedAddress = RandomUtil.randomSimpleString();
        this.server.getAddressSettingsRepository().addMatch(this.address.toString(), new AddressSettings().setDefaultRingSize(100L));

        // try-with-resources releases the JMS session, connection and factory even when an
        // assertion fails; previously none of them were ever closed.
        try (ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://0");
             Connection connection = connectionFactory.createConnection();
             Session jmsSession = connection.createSession()) {

            MessageProducer matchedProducer = jmsSession.createProducer(jmsSession.createQueue(this.address.toString()));
            matchedProducer.send(jmsSession.createMessage());
            Wait.assertTrue(() -> this.server.locateQueue(this.address) != null);
            Assertions.assertEquals(100L, this.server.locateQueue(this.address).getRingSize());
            matchedProducer.close();

            MessageProducer unmatchedProducer = jmsSession.createProducer(jmsSession.createQueue(unrelatedAddress.toString()));
            unmatchedProducer.send(jmsSession.createMessage());
            Wait.assertTrue(() -> this.server.locateQueue(unrelatedAddress) != null);
            Assertions.assertEquals(ActiveMQDefaultConfiguration.getDefaultRingSize(), this.server.locateQueue(unrelatedAddress).getRingSize());
        }
    }

    /**
     * Checks that changing the ring size of a live queue takes effect. The queue starts
     * with no ring size (-1) and 100 messages, is then switched between ring sizes 10, 5
     * and 10, and at each stage the message count and replaced counter are verified.
     */
    @Test
    public void testUpdate() throws Exception {
        ServerLocator locator = createNettyNonHALocator().setBlockOnAcknowledge(true).setAckBatchSize(0);
        ClientSession session = addClientSession(createSessionFactory(locator).createSession(false, true, true));
        session.createQueue(QueueConfiguration.of(this.qName).setAddress(this.address).setRoutingType(RoutingType.ANYCAST));
        session.start();
        Queue queue = this.server.locateQueue(this.qName);
        Assertions.assertEquals(-1L, queue.getRingSize());
        ClientProducer producer = session.createProducer(this.address);

        // Fill the queue with 100 messages before any ring size is set.
        for (int remaining = 100; remaining > 0; remaining--) {
            producer.send(session.createMessage(true));
        }
        Wait.assertTrue(() -> queue.getMessageCount() == 100);

        // Ring size 10: drain down to 5 messages, then refill to the limit.
        queue.setRingSize(10L);
        ClientConsumer drainingConsumer = session.createConsumer(this.qName);
        for (int remaining = 95; remaining > 0; remaining--) {
            drainingConsumer.receiveImmediate().acknowledge();
        }
        drainingConsumer.close();
        Wait.assertEquals(5L, queue::getMessageCount);

        for (int remaining = 5; remaining > 0; remaining--) {
            producer.send(session.createMessage(true));
        }
        Wait.assertEquals(10L, queue::getMessageCount);

        // Five more sends: the count must stay at 10 after each one.
        for (int remaining = 5; remaining > 0; remaining--) {
            producer.send(session.createMessage(true));
            Wait.assertEquals(10L, queue::getMessageCount);
        }
        Wait.assertEquals(5L, queue::getMessagesReplaced);

        ClientConsumer singleConsumer = session.createConsumer(this.qName);
        ClientMessage one = singleConsumer.receiveImmediate();
        Assertions.assertNotNull(one);
        one.acknowledge();
        singleConsumer.close();
        Wait.assertTrue(() -> queue.getMessageCount() == 9);

        // Ring size 5: consume down to the new limit, then one send replaces one message.
        queue.setRingSize(5L);
        ClientConsumer trimmingConsumer = session.createConsumer(this.qName);
        for (int remaining = 4; remaining > 0; remaining--) {
            trimmingConsumer.receiveImmediate().acknowledge();
        }
        trimmingConsumer.close();
        Wait.assertTrue(() -> queue.getMessageCount() == 5);
        producer.send(session.createMessage(true));
        Wait.assertTrue(() -> queue.getMessagesReplaced() == 6);

        // Back to ring size 10: grow to the limit, then one more send replaces one message.
        queue.setRingSize(10L);
        for (int remaining = 5; remaining > 0; remaining--) {
            producer.send(session.createMessage(true));
        }
        Wait.assertTrue(() -> queue.getMessageCount() == 10);
        producer.send(session.createMessage(true));
        Wait.assertTrue(() -> queue.getMessagesReplaced() == 7);
        Wait.assertTrue(() -> queue.getMessageCount() == 10);
    }

    /**
     * Exercises a non-destructive queue with ring size 1. After an initial message, each of
     * five further sends is expected to replace the previous message, keep the count at
     * one, and be the message a fresh consumer receives.
     */
    @Test
    public void testNonDestructive() throws Exception {
        ServerLocator locator = createNettyNonHALocator().setBlockOnAcknowledge(true).setAckBatchSize(0);
        ClientSession session = addClientSession(createSessionFactory(locator).createSession(false, true, true));
        session.createQueue(QueueConfiguration.of(this.qName).setAddress(this.address).setRingSize(1L).setNonDestructive(true));
        session.start();
        Queue queue = this.server.locateQueue(this.qName);
        Assertions.assertEquals(1L, queue.getRingSize());

        ClientProducer producer = session.createProducer(this.address);
        producer.send(createTextMessage(session, "hello0"));

        // sequence is both the suffix of the message sent in this round and the number of
        // replacements expected once it has arrived.
        for (int sequence = 1; sequence <= 5; sequence++) {
            Wait.assertTrue(() -> queue.getMessageCount() == 1);
            String body = "hello" + sequence;
            producer.send(createTextMessage(session, body));
            long expectedReplaced = sequence;
            Wait.assertTrue(() -> queue.getMessagesReplaced() == expectedReplaced);
            Wait.assertTrue(() -> queue.getMessageCount() == 1);

            ClientConsumer consumer = session.createConsumer(this.qName);
            ClientMessage received = consumer.receiveImmediate();
            Assertions.assertNotNull(received);
            received.acknowledge();
            consumer.close();
            Assertions.assertEquals(body, received.getBodyBuffer().readString());
        }
    }

    /**
     * Exercises a non-destructive queue with ring size 1 when a consumer is opened and then
     * closed without receiving: the message is expected to go into delivery while the
     * consumer is open and to still be on the queue after it closes.
     */
    @Test
    public void testNonDestructiveWithConsumerClose() throws Exception {
        ServerLocator locator = createNettyNonHALocator().setBlockOnAcknowledge(true).setAckBatchSize(0);
        ClientSession session = addClientSession(createSessionFactory(locator).createSession(false, true, true));
        session.createQueue(QueueConfiguration.of(this.qName).setAddress(this.address).setRingSize(1L).setNonDestructive(true));
        session.start();
        Queue queue = this.server.locateQueue(this.qName);
        Assertions.assertEquals(1L, queue.getRingSize());

        ClientProducer producer = session.createProducer(this.address);
        producer.send(createTextMessage(session, "hello0"));
        Wait.assertTrue(() -> queue.getMessageCount() == 1);

        // Opening a consumer is expected to put the message into delivery.
        ClientConsumer consumer = session.createConsumer(this.qName);
        Wait.assertTrue(() -> queue.getDeliveringCount() == 1);

        // Closing it is expected to leave the message on the queue.
        consumer.close();
        Wait.assertTrue(() -> queue.getDeliveringCount() == 0);
        Wait.assertTrue(() -> queue.getMessageCount() == 1);
    }

    /**
     * Checks that the ring size is honoured under concurrent load: 25 producer threads each
     * send 25 messages to a queue with ring size 25, and the queue is expected to end up
     * holding exactly 25 messages with no producer reporting an error.
     */
    @Test
    public void testMultipleConcurrentProducers() throws Exception {
        final int producerCount = 25;
        final long messagesPerProducer = 25L;
        final long ringSize = 25L;

        ClientSession session = addClientSession(createSessionFactory(createNettyNonHALocator().setBlockOnAcknowledge(true).setAckBatchSize(0)).createSession(false, true, true));
        session.createQueue(QueueConfiguration.of(this.qName).setAddress(this.address).setRingSize(ringSize));
        session.start();
        Queue queue = this.server.locateQueue(this.qName);
        Assertions.assertEquals(ringSize, queue.getRingSize());

        // Construction failures propagate as-is so the original exception and stack trace
        // are reported, rather than only the exception message.
        SomeProducer[] producers = new SomeProducer[producerCount];
        for (int i = 0; i < producerCount; i++) {
            producers[i] = new SomeProducer(messagesPerProducer, producerCount, this.address);
        }
        for (SomeProducer producer : producers) {
            producer.start();
        }
        // Join every thread before asserting, so a failed assertion cannot leave producers
        // running while the test is torn down.
        for (SomeProducer producer : producers) {
            producer.join();
        }
        for (SomeProducer producer : producers) {
            Assertions.assertEquals(0, producer.errors.get());
        }

        // The original built its failure message before waiting, so it showed a stale count.
        // NOTE(review): this assumes Wait.assertEquals reports the count read when the wait
        // gives up - confirm against the Wait helper.
        Wait.assertEquals(ringSize, queue::getMessageCount);
    }

    /**
     * Runs the base-class setup, then creates a server from the default Netty configuration,
     * registers it through {@code addServer} and starts it.
     */
    @Override
    @BeforeEach
    public void setUp() throws Exception {
        super.setUp();
        ActiveMQServer created = ActiveMQServers.newActiveMQServer(createDefaultNettyConfig(), true);
        this.server = addServer(created);
        this.server.start();
    }
}
