package org.apache.activemq.artemis.tests.integration.client;

import java.lang.invoke.MethodHandles;
import java.util.function.BooleanSupplier;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl;
import org.apache.activemq.artemis.core.protocol.core.impl.RemotingConnectionImpl;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Checks that the server releases the sessions and connections of a client whose Netty channel
 * has stopped reading.
 *
 * <p>Both tests create a session factory with a connection TTL of 1000 ms and a client failure
 * check period of 100 ms, call {@code stopPingingAfterOne()} on it, and run a consumer thread that
 * switches off auto-read on the underlying Netty channel after its first {@code receive} call.
 * The tests then poll the server, for up to {@value #WAIT_TIMEOUT_MILLIS} ms per condition, until
 * the expected number of sessions and connections is reached.
 */
public class ConsumerStuckTest extends ActiveMQTestBase {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    /** Number of messages sent to the queue by each test. */
    private static final int MESSAGE_COUNT = 10000;

    /** Upper bound, in milliseconds, for each polling wait on the server state. */
    private static final long WAIT_TIMEOUT_MILLIS = 20000L;

    /** Pause, in milliseconds, between two polls of the server state. */
    private static final long POLL_INTERVAL_MILLIS = 10L;

    private static final String CLEANUP_STUCK_MESSAGE = "The cleanup wasn't able to finish cleaning the session. It's probably stuck, look at the thread dump generated by the test for more information";

    private ActiveMQServer server;
    private final SimpleString QUEUE = SimpleString.of("ConsumerTestQueue");

    /**
     * Selects the transport used when the server is created in {@link #setUp()}.
     *
     * @return always {@code true}
     */
    protected boolean isNetty() {
        return true;
    }

    /** Creates and starts the server used by every test in this class. */
    @Override // org.apache.activemq.artemis.tests.util.ActiveMQTestBase
    @BeforeEach
    public void setUp() throws Exception {
        super.setUp();
        this.server = createServer(false, isNetty());
        this.server.start();
    }

    /**
     * Fills the queue first, then starts the stalling consumer and expects the server to end up
     * with no sessions and no connections.
     */
    @Test
    public void testClientStuckTest() throws Exception {
        ClientSessionFactoryImpl factory = createShortTtlSessionFactory();
        RemotingConnectionImpl connection = (RemotingConnectionImpl) factory.getConnection();
        ClientSession session = factory.createSession(false, true, true, true);
        session.createQueue(QueueConfiguration.of(this.QUEUE).setDurable(false));

        ClientProducer producer = session.createProducer(this.QUEUE);
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            producer.send(createTextMessage(session, "m" + i));
        }

        ClientConsumer consumer = session.createConsumer(this.QUEUE);
        session.start();
        NettyConnection transportConnection = (NettyConnection) connection.getTransportConnection();
        Thread consumerThread = startStallingConsumer(consumer, transportConnection);
        try {
            Assertions.assertEquals(1, this.server.getSessions().size());
            logger.debug("sessions = {}", this.server.getSessions().size());
            Assertions.assertEquals(1, this.server.getConnectionCount());

            long waitStart = System.currentTimeMillis();
            awaitCondition(() -> this.server.getSessions().size() == 0 && this.server.getConnectionCount() == 0);
            logger.debug("Time = {} time diff = {}, connections Size = {} sessions = {}", System.currentTimeMillis(), System.currentTimeMillis() - waitStart, this.server.getConnectionCount(), this.server.getSessions().size());

            failIfSessionCountIsNot(0);
            logger.debug("Size = {}", this.server.getConnectionCount());
            logger.debug("sessions = {}", this.server.getSessions().size());
            Assertions.assertEquals(0, this.server.getConnectionCount());
        } finally {
            releaseConsumer(transportConnection, consumerThread);
        }
    }

    /**
     * Starts the stalling consumer on an empty queue, then sends the messages from a second
     * connection on another thread, and expects the server to end up with no connections.
     */
    @Test
    public void testClientStuckTestWithDirectDelivery() throws Exception {
        ClientSessionFactoryImpl factory = createShortTtlSessionFactory();
        RemotingConnectionImpl connection = (RemotingConnectionImpl) factory.getConnection();
        ClientSession session = factory.createSession(false, true, true, true);
        session.createQueue(QueueConfiguration.of(this.QUEUE).setDurable(false));

        ClientConsumer consumer = session.createConsumer(this.QUEUE);
        session.start();
        NettyConnection transportConnection = (NettyConnection) connection.getTransportConnection();
        Thread consumerThread = startStallingConsumer(consumer, transportConnection);

        Thread producerThread = new Thread(this::sendMessagesOnOwnConnection);
        producerThread.start();
        try {
            awaitCondition(() -> this.server.getSessions().size() == 2);
            Assertions.assertEquals(2, this.server.getSessions().size());
            logger.debug("sessions = {}", this.server.getSessions().size());
            Assertions.assertEquals(2, this.server.getConnectionCount());

            awaitCondition(() -> this.server.getSessions().size() == 1);
            logger.debug("Size = {}", this.server.getConnectionCount());
            logger.debug("sessions = {}", this.server.getSessions().size());
            failIfSessionCountIsNot(1);

            producerThread.join();
            awaitCondition(() -> this.server.getConnectionCount() == 0);
            Assertions.assertEquals(0, this.server.getConnectionCount());
        } finally {
            releaseConsumer(transportConnection, consumerThread);
        }
    }

    /**
     * Creates the session factory shared by both tests: connection TTL 1000 ms, client failure
     * check period 100 ms, consumer window 10 MiB, call timeout 1000 ms.
     *
     * @return the factory, after {@code stopPingingAfterOne()} has been called on it
     * @throws Exception if the factory cannot be created
     */
    private ClientSessionFactoryImpl createShortTtlSessionFactory() throws Exception {
        ServerLocator locator = createNettyNonHALocator()
            .setConnectionTTL(1000L)
            .setClientFailureCheckPeriod(100L)
            .setConsumerWindowSize(10485760)
            .setCallTimeout(1000L);
        // createSessionFactory() is declared to return the interface; the test needs the
        // implementation class to reach stopPingingAfterOne().
        ClientSessionFactoryImpl factory = (ClientSessionFactoryImpl) locator.createSessionFactory();
        // NOTE(review): presumably makes the client stop sending pings after the first one so the
        // server-side TTL can expire; confirm against ClientSessionFactoryImpl.
        factory.stopPingingAfterOne();
        return factory;
    }

    /**
     * Starts a thread that keeps receiving and acknowledging messages and, after its first
     * {@code receive} call returns, disables auto-read on the Netty channel.
     *
     * <p>The thread exits when it is interrupted or when any {@link Throwable} is raised.
     *
     * @param consumer the consumer to receive from
     * @param transportConnection the connection whose channel stops reading
     * @return the started thread
     */
    private Thread startStallingConsumer(ClientConsumer consumer, NettyConnection transportConnection) {
        Thread consumerThread = new Thread(() -> {
            boolean autoReadStillOn = true;
            while (!Thread.interrupted()) {
                try {
                    ClientMessage message = consumer.receive(500L);
                    logger.debug("Received {}", message);
                    if (autoReadStillOn) {
                        autoReadStillOn = false;
                        transportConnection.getNettyChannel().config().setAutoRead(false);
                    }
                    if (message != null) {
                        message.acknowledge();
                    }
                } catch (Throwable failure) {
                    Thread.currentThread().interrupt();
                    logger.warn("Stalling consumer stopped after a failure", failure);
                    return;
                }
            }
        });
        consumerThread.start();
        return consumerThread;
    }

    /**
     * Sends {@value #MESSAGE_COUNT} text messages to the queue over a dedicated locator, session
     * factory and session, all of which are closed again whether or not sending succeeds.
     * Failures are logged rather than propagated because this runs as a thread body.
     */
    private void sendMessagesOnOwnConnection() {
        try (ServerLocator locator = createNettyNonHALocator();
             ClientSessionFactory factory = locator.createSessionFactory();
             ClientSession session = factory.createSession(false, true, true, true);
             ClientProducer producer = session.createProducer(this.QUEUE)) {
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                producer.send(createTextMessage(session, "m" + i));
            }
        } catch (Exception e) {
            logger.warn("Producer thread failed", e);
        }
    }

    /**
     * Polls {@code condition} every {@value #POLL_INTERVAL_MILLIS} ms until it holds or
     * {@value #WAIT_TIMEOUT_MILLIS} ms have elapsed. Returns silently on timeout; callers assert
     * the resulting state themselves.
     *
     * @param condition the state to wait for
     * @throws InterruptedException if the calling thread is interrupted while sleeping
     */
    private static void awaitCondition(BooleanSupplier condition) throws InterruptedException {
        long deadline = System.currentTimeMillis() + WAIT_TIMEOUT_MILLIS;
        while (System.currentTimeMillis() < deadline && !condition.getAsBoolean()) {
            Thread.sleep(POLL_INTERVAL_MILLIS);
        }
    }

    /**
     * Prints a thread dump to standard output and fails the test when the server does not hold
     * exactly {@code expected} sessions.
     *
     * @param expected the required number of server sessions
     */
    private void failIfSessionCountIsNot(int expected) {
        if (this.server.getSessions().size() != expected) {
            System.out.println(threadDump("Thread dump"));
            Assertions.fail(CLEANUP_STUCK_MESSAGE);
        }
    }

    /**
     * Re-enables auto-read on the channel, then interrupts the consumer thread and waits for it
     * to finish.
     *
     * @param transportConnection the connection whose channel was stalled
     * @param consumerThread the thread started by {@link #startStallingConsumer}
     * @throws InterruptedException if the calling thread is interrupted while joining
     */
    private static void releaseConsumer(NettyConnection transportConnection, Thread consumerThread) throws InterruptedException {
        transportConnection.getNettyChannel().config().setAutoRead(true);
        consumerThread.interrupt();
        consumerThread.join();
    }
}
