package org.apache.activemq.artemis.tests.integration.cluster.failover;

import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.ActiveMQConnectionTimedOutException;
import org.apache.activemq.artemis.api.core.ActiveMQDuplicateIdException;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException;
import org.apache.activemq.artemis.api.core.ActiveMQTransactionOutcomeUnknownException;
import org.apache.activemq.artemis.api.core.ActiveMQTransactionRolledBackException;
import org.apache.activemq.artemis.api.core.ActiveMQUnBlockedException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
import org.apache.activemq.artemis.tests.util.CountDownSessionFailureListener;
import org.apache.activemq.artemis.tests.util.TransportConfigurationUtils;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/cluster/failover/AsynchronousFailoverTest.class */
public class AsynchronousFailoverTest extends FailoverTestBase {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    private volatile CountDownSessionFailureListener listener;
    private volatile ClientSessionFactoryInternal sf;
    private final Object lockFail = new Object();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/activemq/artemis/tests/integration/cluster/failover/AsynchronousFailoverTest$TestRunner.class */
    public abstract class TestRunner implements Runnable {
        volatile boolean failed;
        ArrayList<Throwable> errors = new ArrayList<>();

        TestRunner() {
        }

        boolean isFailed() {
            return this.failed;
        }

        void setFailed() {
            this.failed = true;
        }

        void reset() {
            this.failed = false;
        }

        synchronized void addException(Throwable th) {
            this.errors.add(th);
        }

        void checkForExceptions() throws Throwable {
            if (this.errors.size() > 0) {
                AsynchronousFailoverTest.logger.warn("Exceptions on test:");
                Iterator<Throwable> it = this.errors.iterator();
                while (it.hasNext()) {
                    Throwable next = it.next();
                    AsynchronousFailoverTest.logger.warn(next.getMessage(), next);
                }
                throw this.errors.get(0);
            }
        }
    }

    @Test
    public void testNonTransactional() throws Throwable {
        runTest(new TestRunner() { // from class: org.apache.activemq.artemis.tests.integration.cluster.failover.AsynchronousFailoverTest.1
            @Override // java.lang.Runnable
            public void run() {
                try {
                    AsynchronousFailoverTest.this.doTestNonTransactional(this);
                } catch (Throwable th) {
                    AsynchronousFailoverTest.logger.error("Test failed", th);
                    addException(th);
                }
            }
        });
    }

    @Test
    public void testTransactional() throws Throwable {
        runTest(new TestRunner() { // from class: org.apache.activemq.artemis.tests.integration.cluster.failover.AsynchronousFailoverTest.2
            volatile boolean running = false;

            @Override // java.lang.Runnable
            public void run() {
                try {
                    Assert.assertFalse(this.running);
                    this.running = true;
                    try {
                        AsynchronousFailoverTest.this.doTestTransactional(this);
                        this.running = false;
                    } catch (Throwable th) {
                        this.running = false;
                        throw th;
                    }
                } catch (Throwable th2) {
                    AsynchronousFailoverTest.logger.error("Test failed", th2);
                    addException(th2);
                }
            }
        });
    }

    private void runTest(TestRunner testRunner) throws Throwable {
        for (int i = 0; i < 1; i++) {
            logger.debug("Iteration {}", Integer.valueOf(i));
            ServerLocator callFailoverTimeout = getServerLocator().setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setReconnectAttempts(30).setRetryInterval(100L).setConfirmationWindowSize(10485760).setCallTimeout(10000L).setCallFailoverTimeout(10000L);
            this.sf = createSessionFactoryAndWaitForTopology(callFailoverTimeout, 2);
            try {
                ClientSessionInternal createSession = this.sf.createSession(true, true);
                createSession.createQueue(new QueueConfiguration(FailoverTestBase.ADDRESS).setAddress(FailoverTestBase.ADDRESS));
                createSession.getConnection();
                Thread thread = new Thread(testRunner);
                thread.setName("MainTEST");
                thread.start();
                long random = (long) (2000.0d * Math.random());
                logger.debug("Sleeping {}", Long.valueOf(random));
                Thread.sleep(random);
                logger.debug("Failing asynchronously");
                synchronized (this.lockFail) {
                    logger.debug("#test crashing test");
                    crash(createSession);
                }
                testRunner.setFailed();
                logger.debug("Fail complete");
                thread.join(TimeUnit.SECONDS.toMillis(120L));
                if (thread.isAlive()) {
                    System.out.println(threadDump("Thread still running from the test"));
                    thread.interrupt();
                    fail("Test didn't complete successful, thread still running");
                }
                testRunner.checkForExceptions();
                createSession.close();
                Assert.assertEquals(0L, this.sf.numSessions());
                callFailoverTimeout.close();
                callFailoverTimeout.close();
                Assert.assertEquals(0L, this.sf.numConnections());
                if (i != 0) {
                    tearDown();
                    testRunner.checkForExceptions();
                    testRunner.reset();
                    setUp();
                }
            } catch (Throwable th) {
                callFailoverTimeout.close();
                Assert.assertEquals(0L, this.sf.numConnections());
                throw th;
            }
        }
    }

    protected void addPayload(ClientMessage clientMessage) {
    }

    private void doTestNonTransactional(TestRunner testRunner) throws Exception {
        while (!testRunner.isFailed()) {
            logger.debug("looping");
            ClientSession createSession = this.sf.createSession(true, true, 0);
            this.listener = new CountDownSessionFailureListener(createSession);
            createSession.addFailureListener(this.listener);
            ClientProducer createProducer = createSession.createProducer(FailoverTestBase.ADDRESS);
            for (int i = 0; i < 1000; i++) {
                boolean z = false;
                do {
                    try {
                        ClientMessage createMessage = createSession.createMessage(true);
                        createMessage.getBodyBuffer().writeString("message" + i);
                        createMessage.putIntProperty("counter", i);
                        addPayload(createMessage);
                        createProducer.send(createMessage);
                        z = false;
                    } catch (ActiveMQUnBlockedException e) {
                        logger.debug("exception when sending message with counter {}", Integer.valueOf(i));
                        e.printStackTrace();
                        z = true;
                    } catch (ActiveMQException e2) {
                        fail("Invalid Exception type:" + e2.getType());
                    }
                } while (z);
            }
            ClientConsumer clientConsumer = null;
            boolean z2 = false;
            do {
                try {
                    clientConsumer = createSession.createConsumer(FailoverTestBase.ADDRESS);
                    z2 = false;
                } catch (ActiveMQUnBlockedException e3) {
                    logger.debug("exception when creating consumer");
                    z2 = true;
                } catch (ActiveMQException e4) {
                    fail("Invalid Exception type:" + e4.getType());
                }
            } while (z2);
            createSession.start();
            ArrayList arrayList = new ArrayList(1000);
            int i2 = -1;
            boolean z3 = false;
            while (true) {
                ClientMessage receive = clientConsumer.receive(500L);
                if (receive == null) {
                    break;
                }
                int intValue = receive.getIntProperty("counter").intValue();
                arrayList.add(Integer.valueOf(intValue));
                if (intValue != i2 + 1) {
                    if (z3) {
                        Assert.fail("got another counter gap at " + intValue + ": " + arrayList);
                    } else if (i2 != -1) {
                        logger.debug("got first counter gap at {}", Integer.valueOf(intValue));
                        z3 = true;
                    }
                }
                i2 = intValue;
                receive.acknowledge();
            }
            createSession.close();
            this.listener = null;
        }
    }

    private void doTestTransactional(TestRunner testRunner) throws Throwable {
        boolean z;
        int i = 0;
        while (!testRunner.isFailed()) {
            ClientSession clientSession = null;
            i++;
            logger.debug("#test doTestTransactional starting now. Execution {}", Integer.valueOf(i));
            int i2 = 4;
            while (clientSession == null) {
                try {
                    try {
                        clientSession = this.sf.createSession(true, false);
                    } catch (ActiveMQException e) {
                        if (i2 == 0) {
                            throw e;
                        }
                        i2--;
                        Thread.sleep(2000L);
                    }
                } finally {
                    if (clientSession != null) {
                        clientSession.close();
                    }
                }
            }
            this.listener = new CountDownSessionFailureListener(clientSession);
            clientSession.addFailureListener(this.listener);
            while (!testRunner.isFailed()) {
                try {
                    try {
                        try {
                            ClientProducer createProducer = clientSession.createProducer(FailoverTestBase.ADDRESS);
                            for (int i3 = 0; i3 < 1000; i3++) {
                                ClientMessage createMessage = clientSession.createMessage(true);
                                createMessage.getBodyBuffer().writeString("message" + i3);
                                createMessage.putIntProperty("counter", i3);
                                createMessage.putStringProperty(Message.HDR_DUPLICATE_DETECTION_ID, new SimpleString("id:" + i3 + ",exec:" + i));
                                addPayload(createMessage);
                                logger.debug("Sending message {}", createMessage);
                                createProducer.send(createMessage);
                            }
                            logger.debug("Sending commit");
                            clientSession.commit();
                            z = false;
                        } catch (ActiveMQUnBlockedException e2) {
                            logger.debug("#test transaction rollback retrying on sending");
                            z = true;
                        } catch (ActiveMQConnectionTimedOutException e3) {
                            Thread.sleep(2000L);
                            z = true;
                        }
                    } catch (ActiveMQDuplicateIdException e4) {
                        logAndSystemOut("#test duplicate id rejected on sending");
                    } catch (ActiveMQObjectClosedException e5) {
                        logger.debug("#test producer closed, retrying on sending...");
                        Thread.sleep(2000L);
                        z = true;
                    }
                } catch (ActiveMQTransactionRolledBackException e6) {
                    logger.debug("#test transaction rollback retrying on sending");
                    z = true;
                } catch (ActiveMQTransactionOutcomeUnknownException e7) {
                    logger.debug("#test transaction rollback retrying on sending");
                    z = true;
                } catch (ActiveMQException e8) {
                    logger.debug("#test Exception {}", e8.getMessage(), e8);
                    throw e8;
                }
                if (!z) {
                    logAndSystemOut("#test Finished sending, starting consumption now");
                    boolean z2 = false;
                    boolean z3 = false;
                    ClientConsumer clientConsumer = null;
                    while (!testRunner.isFailed()) {
                        ArrayList arrayList = new ArrayList();
                        int i4 = 4;
                        while (clientConsumer == null) {
                            try {
                                try {
                                    try {
                                        clientConsumer = clientSession.createConsumer(FailoverTestBase.ADDRESS);
                                    } catch (ActiveMQObjectClosedException e9) {
                                        if (i4 == 0) {
                                            throw e9;
                                        }
                                        Thread.sleep(2000L);
                                        i4--;
                                    }
                                } catch (ActiveMQException e10) {
                                    logAndSystemOut(e10.getMessage(), e10);
                                    throw e10;
                                } catch (ActiveMQTransactionRolledBackException e11) {
                                    logAndSystemOut("Transaction rolled back with " + arrayList.size(), e11);
                                    z2 = true;
                                    z3 = true;
                                }
                            } catch (ActiveMQTransactionOutcomeUnknownException e12) {
                                logAndSystemOut("Transaction rolled back with " + arrayList.size(), e12);
                                z2 = true;
                                z3 = true;
                            } catch (ActiveMQUnBlockedException e13) {
                                logAndSystemOut("Unblocked with " + arrayList.size(), e13);
                                z2 = true;
                                z3 = true;
                            }
                        }
                        clientSession.start();
                        for (int i5 = 0; i5 < 1000; i5++) {
                            logger.debug("Consumer receiving message {}", Integer.valueOf(i5));
                            ClientMessage receive = clientConsumer.receive(60000L);
                            if (receive == null) {
                                break;
                            }
                            logger.debug("Received message {}", receive);
                            int intValue = receive.getIntProperty("counter").intValue();
                            if (intValue != i5) {
                                logger.warn("count was received out of order, {}!={}", Integer.valueOf(intValue), Integer.valueOf(i5));
                            }
                            arrayList.add(Integer.valueOf(intValue));
                            receive.acknowledge();
                        }
                        logger.debug("#test commit");
                        try {
                            clientSession.commit();
                            if (z2) {
                                try {
                                    assertTrue("msgs.size is expected to be 0 or 1000 but it was " + arrayList.size(), arrayList.size() == 0 || arrayList.size() == 1000);
                                } catch (Throwable th) {
                                    if (logger.isDebugEnabled()) {
                                        logger.debug(threadDump("Thread dump, messagesReceived = " + arrayList.size()));
                                    }
                                    logAndSystemOut(th.getMessage() + " messages received");
                                    Iterator it = arrayList.iterator();
                                    while (it.hasNext()) {
                                        logAndSystemOut(((Integer) it.next()).toString());
                                    }
                                    throw th;
                                }
                            } else {
                                assertTrue("msgs.size is expected to be 1000 but it was " + arrayList.size(), arrayList.size() == 1000);
                            }
                            int i6 = 0;
                            Iterator it2 = arrayList.iterator();
                            while (it2.hasNext()) {
                                Integer num = (Integer) it2.next();
                                int i7 = i6;
                                i6++;
                                assertEquals(i7, num.intValue());
                            }
                            z3 = false;
                            z2 = false;
                        } catch (ActiveMQTransactionRolledBackException e14) {
                            z3 = true;
                        } catch (ActiveMQException e15) {
                            logger.warn("exception during commit, continue {}", e15.getMessage(), e15);
                        }
                        if (!z3) {
                            this.listener = null;
                        }
                    }
                    if (clientSession != null) {
                        clientSession.close();
                        return;
                    }
                    return;
                }
            }
            if (clientSession != null) {
                clientSession.close();
                return;
            }
            return;
        }
    }

    @Override // org.apache.activemq.artemis.tests.integration.cluster.failover.FailoverTestBase
    protected TransportConfiguration getAcceptorTransportConfiguration(boolean z) {
        return TransportConfigurationUtils.getInVMAcceptor(z);
    }

    @Override // org.apache.activemq.artemis.tests.integration.cluster.failover.FailoverTestBase
    protected TransportConfiguration getConnectorTransportConfiguration(boolean z) {
        return TransportConfigurationUtils.getInVMConnector(z);
    }
}
