package org.apache.activemq.artemis.tests.integration.addressing;

import java.lang.invoke.MethodHandles;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.TimeUtils;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/addressing/MulticastTest.class */
public class MulticastTest extends ActiveMQTestBase {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    private SimpleString baseAddress = new SimpleString("multicast.address");
    private AddressInfo addressInfo;
    private ActiveMQServer server;
    private ClientSessionFactory sessionFactory;

    @Before
    public void setup() throws Exception {
        this.server = createServer(true);
        this.server.start();
        this.server.waitForActivation(10L, TimeUnit.SECONDS);
        this.sessionFactory = ActiveMQClient.createServerLocatorWithoutHA(new TransportConfiguration[]{new TransportConfiguration(INVM_CONNECTOR_FACTORY)}).createSessionFactory();
        addSessionFactory(this.sessionFactory);
        this.addressInfo = new AddressInfo(this.baseAddress);
        this.addressInfo.addRoutingType(RoutingType.MULTICAST);
        this.server.addOrUpdateAddressInfo(this.addressInfo);
    }

    @Test
    public void testTxCommitReceive() throws Exception {
        Queue createQueue = this.server.createQueue(new QueueConfiguration(this.baseAddress.concat(".1")).setAddress(this.baseAddress).setMaxConsumers(-1));
        Queue createQueue2 = this.server.createQueue(new QueueConfiguration(this.baseAddress.concat(".2")).setAddress(this.baseAddress).setMaxConsumers(-1));
        ClientSession createSession = this.sessionFactory.createSession(false, false);
        createSession.start();
        ClientConsumer createConsumer = createSession.createConsumer(createQueue.getName());
        ClientConsumer createConsumer2 = createSession.createConsumer(createQueue2.getName());
        ClientProducer createProducer = createSession.createProducer(this.baseAddress);
        for (int i = 0; i < 10; i++) {
            ClientMessage createMessage = createSession.createMessage((byte) 3, true);
            createMessage.getBodyBuffer().writeString("AnyCast" + i);
            createProducer.send(createMessage);
        }
        assertNull(createConsumer.receiveImmediate());
        assertNull(createConsumer2.receiveImmediate());
        createSession.commit();
        Objects.requireNonNull(createQueue);
        Wait.assertEquals(10L, createQueue::getMessageCount, 5000L);
        Objects.requireNonNull(createQueue2);
        Wait.assertEquals(10L, createQueue2::getMessageCount, 5000L);
        ClientConsumer[] clientConsumerArr = {createConsumer, createConsumer2};
        for (int i2 = 0; i2 < clientConsumerArr.length; i2++) {
            for (int i3 = 0; i3 < 10; i3++) {
                ClientMessage receive = clientConsumerArr[i2].receive(2000L);
                assertNotNull(receive);
                logger.debug("consumer{} received: {}", Integer.valueOf(i2), receive.getBodyBuffer().readString());
            }
            assertNull(clientConsumerArr[i2].receiveImmediate());
            createSession.commit();
            assertNull(clientConsumerArr[i2].receiveImmediate());
        }
        createQueue.deleteQueue();
        createQueue2.deleteQueue();
    }

    @Test
    public void testTxRollbackReceive() throws Exception {
        Queue createQueue = this.server.createQueue(new QueueConfiguration(this.baseAddress.concat(".1")).setAddress(this.baseAddress).setMaxConsumers(-1));
        Queue createQueue2 = this.server.createQueue(new QueueConfiguration(this.baseAddress.concat(".2")).setAddress(this.baseAddress).setMaxConsumers(-1));
        ClientSession createSession = this.sessionFactory.createSession(false, false);
        createSession.start();
        ClientConsumer createConsumer = createSession.createConsumer(createQueue.getName());
        ClientConsumer createConsumer2 = createSession.createConsumer(createQueue2.getName());
        ClientProducer createProducer = createSession.createProducer(this.baseAddress);
        for (int i = 0; i < 10; i++) {
            ClientMessage createMessage = createSession.createMessage((byte) 3, true);
            createMessage.getBodyBuffer().writeString("AnyCast" + i);
            createProducer.send(createMessage);
        }
        assertNull(createConsumer.receive(200L));
        assertNull(createConsumer2.receive(200L));
        createSession.commit();
        createSession.close();
        assertTrue(TimeUtils.waitOnBoolean(true, 2000L, () -> {
            return 10 == createQueue.getMessageCount();
        }));
        assertTrue(TimeUtils.waitOnBoolean(true, 2000L, () -> {
            return 10 == createQueue2.getMessageCount();
        }));
        ClientSession createSession2 = this.sessionFactory.createSession(false, false);
        ClientSession createSession3 = this.sessionFactory.createSession(false, false);
        createSession2.start();
        createSession3.start();
        ClientConsumer createConsumer3 = createSession2.createConsumer(createQueue.getName());
        ClientConsumer createConsumer4 = createSession3.createConsumer(createQueue2.getName());
        ClientConsumer[] clientConsumerArr = new ClientConsumer[2];
        clientConsumerArr[0] = createConsumer3;
        clientConsumerArr[1] = createConsumer4;
        ClientSession[] clientSessionArr = new ClientSession[2];
        clientSessionArr[0] = createSession2;
        clientSessionArr[1] = createSession3;
        Queue[] queueArr = {createQueue, createQueue2};
        for (int i2 = 0; i2 < clientConsumerArr.length; i2++) {
            for (int i3 = 0; i3 < 10; i3++) {
                ClientMessage receive = clientConsumerArr[i2].receive(2000L);
                assertNotNull(receive);
                logger.debug("consumer{} received: {}", Integer.valueOf(i2), receive.getBodyBuffer().readString());
            }
            assertNull(clientConsumerArr[i2].receive(200L));
            clientSessionArr[i2].rollback();
            clientSessionArr[i2].close();
            clientSessionArr[i2] = this.sessionFactory.createSession(false, false);
            clientSessionArr[i2].start();
            clientConsumerArr[i2] = clientSessionArr[i2].createConsumer(queueArr[i2].getName());
            for (int i4 = 0; i4 < 10; i4++) {
                ClientMessage receive2 = clientConsumerArr[i2].receive(2000L);
                assertNotNull(receive2);
                logger.debug("consumer{} received: {}", Integer.valueOf(i2), receive2.getBodyBuffer().readString());
            }
            assertNull(clientConsumerArr[i2].receive(200L));
            clientSessionArr[i2].commit();
            assertNull(clientConsumerArr[i2].receive(200L));
            clientSessionArr[i2].close();
        }
        createQueue.deleteQueue();
        createQueue2.deleteQueue();
    }
}
