package org.apache.activemq.artemis.tests.integration.xa;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.StoreConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.transaction.ResourceManager;
import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
import org.apache.activemq.artemis.tests.extensions.parameterized.ParameterizedTestExtension;
import org.apache.activemq.artemis.tests.extensions.parameterized.Parameters;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.Wait;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith({ParameterizedTestExtension.class})
/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/xa/SessionFailureXATest.class */
public class SessionFailureXATest extends ActiveMQTestBase {
    private ActiveMQServer messagingService;
    private ClientSession clientSession;
    private ClientSessionFactory sessionFactory;
    private Configuration configuration;
    private ServerLocator locator;
    private StoreConfiguration.StoreType storeType;
    private final Map<String, AddressSettings> addressSettings = new HashMap();
    private final SimpleString atestq = SimpleString.of("BasicXaTestq");

    public SessionFailureXATest(StoreConfiguration.StoreType storeType) {
        this.storeType = storeType;
    }

    @Parameters(name = "storeType={0}")
    public static Collection<Object[]> data() {
        return Arrays.asList(new Object[]{StoreConfiguration.StoreType.FILE});
    }

    @Override // org.apache.activemq.artemis.tests.util.ActiveMQTestBase
    @BeforeEach
    public void setUp() throws Exception {
        super.setUp();
        this.addressSettings.clear();
        if (this.storeType == StoreConfiguration.StoreType.DATABASE) {
            this.configuration = createDefaultJDBCConfig(true);
        } else {
            this.configuration = createDefaultNettyConfig();
        }
        this.messagingService = createServer(true, this.configuration, -1, -1L, this.addressSettings);
        this.messagingService.start();
        this.locator = createInVMNonHALocator();
        this.locator.setAckBatchSize(0);
        this.sessionFactory = createSessionFactory(this.locator);
        this.clientSession = addClientSession(this.sessionFactory.createSession(true, false, false));
        this.clientSession.createQueue(QueueConfiguration.of(this.atestq));
    }

    @TestTemplate
    public void testFailureWithXAEnd() throws Exception {
        testFailure(true, false);
    }

    @TestTemplate
    public void testFailureWithoutXAEnd() throws Exception {
        testFailure(false, false);
    }

    @TestTemplate
    public void testFailureWithXAPrepare() throws Exception {
        testFailure(true, true);
    }

    public void testFailure(boolean z, boolean z2) throws Exception {
        ClientSession createSession = this.sessionFactory.createSession(false, true, true);
        try {
            ClientProducer createProducer = createSession.createProducer(this.atestq);
            ClientMessage createTextMessage = createTextMessage(createSession, "m1");
            ClientMessage createTextMessage2 = createTextMessage(createSession, "m2");
            ClientMessage createTextMessage3 = createTextMessage(createSession, "m3");
            ClientMessage createTextMessage4 = createTextMessage(createSession, "m4");
            createProducer.send(createTextMessage);
            createProducer.send(createTextMessage2);
            createProducer.send(createTextMessage3);
            createProducer.send(createTextMessage4);
            createSession.close();
            XidImpl newXID = newXID();
            this.clientSession.start(newXID, 0);
            this.clientSession.setTransactionTimeout((int) TimeUnit.MINUTES.toMillis(10L));
            this.clientSession.start();
            ClientConsumer createConsumer = this.clientSession.createConsumer(this.atestq);
            ClientMessage receive = createConsumer.receive(1000L);
            Assertions.assertNotNull(receive);
            receive.acknowledge();
            Assertions.assertEquals(receive.getBodyBuffer().readString(), "m1");
            ClientMessage receive2 = createConsumer.receive(1000L);
            Assertions.assertNotNull(receive2);
            receive2.acknowledge();
            Assertions.assertEquals(receive2.getBodyBuffer().readString(), "m2");
            ClientMessage receive3 = createConsumer.receive(1000L);
            Assertions.assertNotNull(receive3);
            receive3.acknowledge();
            Assertions.assertEquals(receive3.getBodyBuffer().readString(), "m3");
            ClientMessage receive4 = createConsumer.receive(1000L);
            Assertions.assertNotNull(receive4);
            receive4.acknowledge();
            Assertions.assertEquals(receive4.getBodyBuffer().readString(), "m4");
            if (z) {
                this.clientSession.end(newXID, 67108864);
                if (z2) {
                    this.clientSession.prepare(newXID);
                }
            }
            Wait.assertEquals(1, () -> {
                return this.messagingService.getSessions().size();
            });
            for (ServerSession serverSession : this.messagingService.getSessions()) {
                serverSession.getRemotingConnection().fail(new ActiveMQException("fail this"));
                serverSession.getRemotingConnection().disconnect(false);
            }
            Wait.assertEquals(0, () -> {
                return this.messagingService.getSessions().size();
            });
            if (z2) {
                ResourceManager resourceManager = this.messagingService.getResourceManager();
                Objects.requireNonNull(resourceManager);
                Wait.assertEquals(1, resourceManager::size);
            } else {
                ResourceManager resourceManager2 = this.messagingService.getResourceManager();
                Objects.requireNonNull(resourceManager2);
                Wait.assertEquals(0, resourceManager2::size);
            }
            this.locator = createInVMNonHALocator();
            this.sessionFactory = createSessionFactory(this.locator);
            this.clientSession = addClientSession(this.sessionFactory.createSession(true, false, false));
            Wait.assertEquals(1, () -> {
                return this.messagingService.getSessions().size();
            });
            this.clientSession.start(newXID(), 0);
            this.clientSession.setTransactionTimeout((int) TimeUnit.MINUTES.toMillis(10L));
            this.clientSession.start();
            ClientConsumer createConsumer2 = this.clientSession.createConsumer(this.atestq);
            HashSet<String> hashSet = new HashSet<>();
            ClientMessage receive5 = createConsumer2.receive(1000L);
            if (z2) {
                Assertions.assertNull(receive5);
                return;
            }
            Assertions.assertNotNull(receive5);
            receive5.acknowledge();
            assertOrTrack(z, receive5, hashSet, "m1");
            ClientMessage receive6 = createConsumer2.receive(1000L);
            Assertions.assertNotNull(receive6);
            receive6.acknowledge();
            assertOrTrack(z, receive6, hashSet, "m2");
            ClientMessage receive7 = createConsumer2.receive(1000L);
            Assertions.assertNotNull(receive7);
            receive7.acknowledge();
            assertOrTrack(z, receive7, hashSet, "m3");
            ClientMessage receive8 = createConsumer2.receive(1000L);
            Assertions.assertNotNull(receive8);
            receive8.acknowledge();
            assertOrTrack(z, receive8, hashSet, "m4");
            if (z) {
                return;
            }
            Assertions.assertEquals(4, hashSet.size(), "got all bodies");
        } catch (Throwable th) {
            createSession.close();
            throw th;
        }
    }

    private void assertOrTrack(boolean z, ClientMessage clientMessage, HashSet<String> hashSet, String str) {
        String readString = clientMessage.getBodyBuffer().readString();
        if (z) {
            Assertions.assertEquals(str, readString);
        } else {
            hashSet.add(readString);
        }
    }
}
