package org.apache.activemq.artemis.tests.integration.cluster.reattach;

import java.util.HashSet;
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.MessageHandler;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.protocol.core.impl.RemotingConnectionImpl;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/cluster/reattach/OrderReattachTest.class */
public class OrderReattachTest extends ActiveMQTestBase {
    final SimpleString ADDRESS = new SimpleString("address");
    private final IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
    private ActiveMQServer server;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.activemq.artemis.tests.integration.cluster.reattach.OrderReattachTest$1MyHandler, reason: invalid class name */
    /* loaded from: input_file:org/apache/activemq/artemis/tests/integration/cluster/reattach/OrderReattachTest$1MyHandler.class */
    public class C1MyHandler implements MessageHandler {
        final CountDownLatch latch = new CountDownLatch(1);
        int count;
        Exception failure;
        final /* synthetic */ LinkedBlockingDeque val$failureQueue;

        C1MyHandler(LinkedBlockingDeque linkedBlockingDeque) {
            this.val$failureQueue = linkedBlockingDeque;
        }

        public void onMessage(ClientMessage clientMessage) {
            if (this.count >= 500) {
                this.failure = new Exception("too many messages");
                this.latch.countDown();
            }
            if (clientMessage.getIntProperty("count").intValue() != this.count) {
                this.failure = new Exception("counter " + this.count + " was not as expected (" + clientMessage.getIntProperty("count") + ")");
                OrderReattachTest.this.log.warn("Failure on receiving message ", this.failure);
                this.failure.printStackTrace();
                this.latch.countDown();
            }
            this.count++;
            if (this.count % 100 == 0) {
                this.val$failureQueue.push(true);
            }
            if (this.count == 500) {
                this.latch.countDown();
            }
        }
    }

    @Test
    public void testOrderOnSendInVM() throws Throwable {
        doTestOrderOnSend(false);
    }

    public void doTestOrderOnSend(boolean z) throws Throwable {
        this.server = createServer(false, z);
        this.server.start();
        ServerLocator blockOnAcknowledge = createFactory(z).setReconnectAttempts(15).setConfirmationWindowSize(1048576).setBlockOnNonDurableSend(false).setBlockOnAcknowledge(false);
        ClientSessionFactory createSessionFactory = createSessionFactory(blockOnAcknowledge);
        final ClientSession createSession = createSessionFactory.createSession(false, true, true);
        final LinkedBlockingDeque<Boolean> linkedBlockingDeque = new LinkedBlockingDeque<>();
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        Thread thread = new Thread() { // from class: org.apache.activemq.artemis.tests.integration.cluster.reattach.OrderReattachTest.1
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                Boolean bool;
                RemotingConnectionImpl connection;
                countDownLatch.countDown();
                while (true) {
                    try {
                        try {
                            bool = (Boolean) linkedBlockingDeque.poll(60L, TimeUnit.SECONDS);
                            Thread.sleep(1L);
                            connection = createSession.getConnection();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                            return;
                        }
                    } catch (Exception e2) {
                        e2.printStackTrace();
                    }
                    if (!bool.booleanValue()) {
                        return;
                    } else {
                        connection.fail(new ActiveMQNotConnectedException("poop"));
                    }
                }
            }
        };
        thread.start();
        countDownLatch.await();
        try {
            doSend2(1, createSessionFactory, linkedBlockingDeque);
        } finally {
            try {
                createSession.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
            try {
                blockOnAcknowledge.close();
            } catch (Exception e2) {
            }
            try {
                createSessionFactory.close();
            } catch (Exception e3) {
                e3.printStackTrace();
            }
            linkedBlockingDeque.put(Boolean.valueOf(false));
            thread.join();
        }
    }

    public void doSend2(int i, ClientSessionFactory clientSessionFactory, LinkedBlockingDeque<Boolean> linkedBlockingDeque) throws Exception {
        ClientSession createSession = clientSessionFactory.createSession(false, false, false);
        HashSet<ClientConsumer> hashSet = new HashSet();
        HashSet hashSet2 = new HashSet();
        for (int i2 = 0; i2 < 10; i2++) {
            SimpleString simpleString = new SimpleString("sub" + i2);
            ClientSession createSession2 = clientSessionFactory.createSession(false, true, true);
            createSession2.createQueue(this.ADDRESS, simpleString, (SimpleString) null, false);
            hashSet.add(createSession2.createConsumer(simpleString));
            hashSet2.add(createSession2);
        }
        ClientSession createSession3 = clientSessionFactory.createSession(false, true, true);
        ClientProducer createProducer = createSession3.createProducer(this.ADDRESS);
        for (int i3 = 0; i3 < 500; i3++) {
            ClientMessage createMessage = createSession3.createMessage((byte) 3, false, 0L, System.currentTimeMillis(), (byte) 1);
            if (i3 % 10 == 0) {
            }
            createMessage.putIntProperty(new SimpleString("count"), i3);
            createProducer.send(createMessage);
        }
        Iterator it = hashSet2.iterator();
        while (it.hasNext()) {
            ((ClientSession) it.next()).start();
        }
        HashSet<C1MyHandler> hashSet3 = new HashSet();
        for (ClientConsumer clientConsumer : hashSet) {
            C1MyHandler c1MyHandler = new C1MyHandler(linkedBlockingDeque);
            clientConsumer.setMessageHandler(c1MyHandler);
            hashSet3.add(c1MyHandler);
        }
        for (C1MyHandler c1MyHandler2 : hashSet3) {
            Assert.assertTrue(c1MyHandler2.latch.await(60000L, TimeUnit.MILLISECONDS));
            if (c1MyHandler2.failure != null) {
                throw c1MyHandler2.failure;
            }
        }
        createSession3.close();
        Iterator it2 = hashSet2.iterator();
        while (it2.hasNext()) {
            ((ClientSession) it2.next()).close();
        }
        for (int i4 = 0; i4 < 10; i4++) {
            linkedBlockingDeque.push(true);
            createSession.deleteQueue(new SimpleString("sub" + i4));
        }
        createSession.close();
    }
}
