package org.apache.activemq.artemis.tests.integration.remoting;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException;
import org.apache.activemq.artemis.api.core.Interceptor;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.FailoverEventType;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.Wait;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/remoting/ReconnectTest.class */
public class ReconnectTest extends ActiveMQTestBase {
    @Test
    public void testReconnectNetty() throws Exception {
        internalTestReconnect(true);
    }

    @Test
    public void testReconnectInVM() throws Exception {
        internalTestReconnect(false);
    }

    public void internalTestReconnect(boolean z) throws Exception {
        ActiveMQServer createServer = createServer(false, z);
        createServer.start();
        ClientSessionInternal clientSessionInternal = null;
        try {
            ServerLocator confirmationWindowSize = createFactory(z).setClientFailureCheckPeriod(1000L).setRetryInterval(500L).setRetryIntervalMultiplier(1.0d).setReconnectAttempts(-1).setConfirmationWindowSize(1048576);
            clientSessionInternal = (ClientSessionInternal) createSessionFactory(confirmationWindowSize).createSession();
            final AtomicInteger atomicInteger = new AtomicInteger(0);
            final CountDownLatch countDownLatch = new CountDownLatch(1);
            clientSessionInternal.addFailureListener(new SessionFailureListener() { // from class: org.apache.activemq.artemis.tests.integration.remoting.ReconnectTest.1
                public void connectionFailed(ActiveMQException activeMQException, boolean z2) {
                    atomicInteger.incrementAndGet();
                    countDownLatch.countDown();
                }

                public void connectionFailed(ActiveMQException activeMQException, boolean z2, String str) {
                    connectionFailed(activeMQException, z2);
                }

                public void beforeReconnect(ActiveMQException activeMQException) {
                }
            });
            createServer.stop();
            Thread.sleep(2000L);
            createServer.start();
            Assertions.assertTrue(countDownLatch.await(5L, TimeUnit.SECONDS));
            Thread.sleep(500L);
            Assertions.assertEquals(1, atomicInteger.get());
            confirmationWindowSize.close();
            try {
                clientSessionInternal.close();
            } catch (Throwable th) {
            }
            createServer.stop();
        } catch (Throwable th2) {
            try {
                clientSessionInternal.close();
            } catch (Throwable th3) {
            }
            createServer.stop();
            throw th2;
        }
    }

    @Test
    public void testMetadataAfterReconnectionNetty() throws Exception {
        internalMetadataAfterRetry(true);
    }

    @Test
    public void testMetadataAfterReconnectionInVM() throws Exception {
        internalMetadataAfterRetry(false);
    }

    public void internalMetadataAfterRetry(boolean z) throws Exception {
        ActiveMQServer createServer = createServer(false, z);
        createServer.start();
        ClientSessionInternal clientSessionInternal = null;
        for (int i = 0; i < 100; i++) {
            try {
                ServerLocator createFactory = createFactory(z);
                createFactory.setClientFailureCheckPeriod(1000L);
                createFactory.setRetryInterval(1L);
                createFactory.setRetryIntervalMultiplier(1.0d);
                createFactory.setReconnectAttempts(-1);
                createFactory.setConfirmationWindowSize(-1);
                clientSessionInternal = (ClientSessionInternal) createSessionFactory(createFactory).createSession();
                clientSessionInternal.addMetaData("meta1", "meta1");
                ServerSession[] countMetadata = countMetadata(createServer, "meta1", 1);
                Assertions.assertEquals(1, countMetadata.length);
                new AtomicInteger(0);
                CountDownLatch countDownLatch = new CountDownLatch(1);
                clientSessionInternal.addFailoverListener(failoverEventType -> {
                    if (failoverEventType == FailoverEventType.FAILOVER_COMPLETED) {
                        countDownLatch.countDown();
                    }
                });
                countMetadata[0].getRemotingConnection().fail(new ActiveMQException("failure!"));
                Assertions.assertTrue(countDownLatch.await(5L, TimeUnit.SECONDS));
                Assertions.assertEquals(1, countMetadata(createServer, "meta1", 1).length);
                createFactory.close();
            } finally {
                try {
                    clientSessionInternal.close();
                } catch (Throwable th) {
                }
                createServer.stop();
            }
        }
    }

    private ServerSession[] countMetadata(ActiveMQServer activeMQServer, String str, int i) throws Exception {
        LinkedList linkedList = new LinkedList();
        for (int i2 = 0; i2 < 10 && linkedList.size() != i; i2++) {
            linkedList.clear();
            for (ServerSession serverSession : activeMQServer.getSessions()) {
                if (serverSession.getMetaData(str) != null) {
                    linkedList.add(serverSession);
                }
            }
            if (linkedList.size() != i) {
                Thread.sleep(100L);
            }
        }
        return (ServerSession[]) linkedList.toArray(new ServerSession[linkedList.size()]);
    }

    @Test
    public void testInterruptReconnectNetty() throws Exception {
        internalTestInterruptReconnect(true, false);
    }

    @Test
    public void testInterruptReconnectInVM() throws Exception {
        internalTestInterruptReconnect(false, false);
    }

    @Test
    public void testInterruptReconnectNettyInterruptMainThread() throws Exception {
        internalTestInterruptReconnect(true, true);
    }

    @Test
    public void testInterruptReconnectInVMInterruptMainThread() throws Exception {
        internalTestInterruptReconnect(false, true);
    }

    public void internalTestInterruptReconnect(boolean z, boolean z2) throws Exception {
        ActiveMQServer createServer = createServer(false, z);
        createServer.start();
        ServerLocator confirmationWindowSize = createFactory(z).setClientFailureCheckPeriod(1000L).setRetryInterval(500L).setRetryIntervalMultiplier(1.0d).setReconnectAttempts(-1).setConfirmationWindowSize(1048576);
        ClientSessionFactoryInternal createSessionFactory = confirmationWindowSize.createSessionFactory();
        final CountDownLatch countDownLatch = new CountDownLatch(2);
        final ArrayList arrayList = new ArrayList();
        createSessionFactory.addFailureListener(new SessionFailureListener() { // from class: org.apache.activemq.artemis.tests.integration.remoting.ReconnectTest.2
            public void connectionFailed(ActiveMQException activeMQException, boolean z3) {
            }

            public void connectionFailed(ActiveMQException activeMQException, boolean z3, String str) {
                connectionFailed(activeMQException, z3);
            }

            public void beforeReconnect(ActiveMQException activeMQException) {
                arrayList.add(Thread.currentThread());
                countDownLatch.countDown();
            }
        });
        ClientSessionInternal createSession = createSessionFactory.createSession();
        final AtomicInteger atomicInteger = new AtomicInteger(0);
        final CountDownLatch countDownLatch2 = new CountDownLatch(1);
        createSession.addFailureListener(new SessionFailureListener() { // from class: org.apache.activemq.artemis.tests.integration.remoting.ReconnectTest.3
            public void connectionFailed(ActiveMQException activeMQException, boolean z3) {
                atomicInteger.incrementAndGet();
                countDownLatch2.countDown();
            }

            public void connectionFailed(ActiveMQException activeMQException, boolean z3, String str) {
                connectionFailed(activeMQException, z3);
            }

            public void beforeReconnect(ActiveMQException activeMQException) {
            }
        });
        createServer.stop();
        Thread thread = new Thread(() -> {
            countDownLatch.countDown();
            try {
                createSession.commit();
            } catch (ActiveMQException e) {
                e.printStackTrace();
            }
        });
        thread.start();
        Assertions.assertTrue(countDownLatch.await(10L, TimeUnit.SECONDS));
        Assertions.assertEquals(1, arrayList.size());
        if (z2) {
            thread.interrupt();
        } else {
            Iterator it = arrayList.iterator();
            while (it.hasNext()) {
                ((Thread) it.next()).interrupt();
            }
        }
        thread.join(5000L);
        Assertions.assertFalse(thread.isAlive());
        confirmationWindowSize.close();
    }

    @Test
    public void testReattachTimeout() throws Exception {
        ActiveMQServer createServer = createServer(true, true);
        createServer.start();
        createServer.getRemotingService().addIncomingInterceptor(new Interceptor() { // from class: org.apache.activemq.artemis.tests.integration.remoting.ReconnectTest.4
            boolean reattached;

            public boolean intercept(Packet packet, RemotingConnection remotingConnection) throws ActiveMQException {
                if (this.reattached || packet.getType() != 32) {
                    return true;
                }
                this.reattached = true;
                return false;
            }
        });
        ClientSessionFactoryInternal createSessionFactory = createSessionFactory(createFactory(true).setCallTimeout(2000L).setRetryInterval(50L).setRetryIntervalMultiplier(1.0d).setReconnectAttempts(1).setConfirmationWindowSize(-1));
        CountDownLatch countDownLatch = new CountDownLatch(1);
        createSessionFactory.addFailoverListener(failoverEventType -> {
            if (failoverEventType == FailoverEventType.FAILOVER_FAILED) {
                countDownLatch.countDown();
            }
        });
        ClientSessionInternal createSession = createSessionFactory.createSession(false, true, true);
        createSession.getConnection().fail(new ActiveMQNotConnectedException());
        Assertions.assertTrue(countDownLatch.await(1000L, TimeUnit.MILLISECONDS));
        Assertions.assertTrue(createSession.isClosed());
        createSession.close();
        createSessionFactory.close();
        createServer.stop();
    }

    @Test
    public void testClosingConsumerTimeout() throws Exception {
        ActiveMQServer createServer = createServer(true, true);
        createServer.start();
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        createServer.getRemotingService().addIncomingInterceptor((packet, remotingConnection) -> {
            if (atomicBoolean.get() || packet.getType() != 74) {
                return true;
            }
            atomicBoolean.set(true);
            return false;
        });
        ClientSessionFactoryInternal createSessionFactory = createSessionFactory(createFactory(true).setCallTimeout(200L).setRetryInterval(500L).setRetryIntervalMultiplier(1.0d).setReconnectAttempts(10).setConfirmationWindowSize(-1));
        ClientSessionInternal createSession = createSessionFactory.createSession(false, true, true);
        SimpleString of = SimpleString.of("my_queue_one");
        SimpleString of2 = SimpleString.of("my_address_one");
        createServer.addAddressInfo(new AddressInfo(of2, RoutingType.ANYCAST));
        createServer.createQueue(QueueConfiguration.of(of).setAddress(of2).setRoutingType(RoutingType.ANYCAST));
        ClientConsumer createConsumer = createSession.createConsumer(of);
        ClientConsumer createConsumer2 = createSession.createConsumer(of);
        createConsumer.close();
        Objects.requireNonNull(atomicBoolean);
        Wait.assertTrue(atomicBoolean::get);
        Wait.assertEquals(1, () -> {
            return getConsumerCount(createServer, createSession);
        });
        Assertions.assertEquals(createConsumer2.getConsumerContext().getId(), ((ServerConsumer) createServer.getSessionByID(createSession.getName()).getServerConsumers().iterator().next()).getID());
        createSession.close();
        createSessionFactory.close();
        createServer.stop();
    }

    private int getConsumerCount(ActiveMQServer activeMQServer, ClientSessionInternal clientSessionInternal) {
        ServerSession sessionByID = activeMQServer.getSessionByID(clientSessionInternal.getName());
        if (sessionByID == null) {
            return 0;
        }
        return sessionByID.getServerConsumers().size();
    }
}
