package org.apache.activemq.artemis.tests.integration.client;

import java.lang.invoke.MethodHandles;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
import org.apache.activemq.artemis.core.journal.LoaderCallback;
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/client/RedeliveryConsumerTest.class */
public class RedeliveryConsumerTest extends ActiveMQTestBase {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    ActiveMQServer server;
    final SimpleString ADDRESS = new SimpleString("address");
    ClientSessionFactory factory;
    private ServerLocator locator;

    /* renamed from: org.apache.activemq.artemis.tests.integration.client.RedeliveryConsumerTest$1ConsumerThread, reason: invalid class name */
    /* loaded from: input_file:org/apache/activemq/artemis/tests/integration/client/RedeliveryConsumerTest$1ConsumerThread.class */
    class C1ConsumerThread extends Thread {
        long delay;
        int errors;
        final /* synthetic */ CountDownLatch val$aligned;
        final /* synthetic */ CountDownLatch val$startRollback;

        /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
        C1ConsumerThread(int i, CountDownLatch countDownLatch, CountDownLatch countDownLatch2) {
            super("RedeliveryCollisionAvoidance::" + i);
            this.val$aligned = countDownLatch;
            this.val$startRollback = countDownLatch2;
            this.delay = 0L;
            this.errors = 0;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            try {
                ServerLocator createInVMNonHALocator = RedeliveryConsumerTest.this.createInVMNonHALocator();
                try {
                    createInVMNonHALocator.setConsumerWindowSize(0);
                    ClientSession createSession = createInVMNonHALocator.createSessionFactory().createSession(false, false, false);
                    createSession.start();
                    ClientConsumer createConsumer = createSession.createConsumer(RedeliveryConsumerTest.this.ADDRESS);
                    ClientMessage receive = createConsumer.receive(5000L);
                    Assert.assertNotNull(receive);
                    receive.acknowledge();
                    this.val$aligned.countDown();
                    this.val$startRollback.await();
                    createSession.rollback();
                    long currentTimeMillis = System.currentTimeMillis();
                    ClientMessage receive2 = createConsumer.receive(5000L);
                    this.delay = System.currentTimeMillis() - currentTimeMillis;
                    Assert.assertNotNull(receive2);
                    receive2.acknowledge();
                    createSession.commit();
                    if (createInVMNonHALocator != null) {
                        createInVMNonHALocator.close();
                    }
                } finally {
                }
            } catch (Exception e) {
                e.printStackTrace();
                this.errors++;
            }
        }
    }

    @Test
    public void testRedeliveryMessageStrict() throws Exception {
        testDedeliveryMessageOnPersistent(true);
    }

    @Test
    public void testRedeliveryMessageSimpleCancel() throws Exception {
        testDedeliveryMessageOnPersistent(false);
    }

    @Test
    public void testDeliveryNonPersistent() throws Exception {
        testDelivery(false);
    }

    @Test
    public void testDeliveryPersistent() throws Exception {
        testDelivery(true);
    }

    public void testDelivery(boolean z) throws Exception {
        setUp(true);
        ClientSession createSession = this.factory.createSession(false, false, false);
        ClientProducer createProducer = createSession.createProducer(this.ADDRESS);
        for (int i = 0; i < 10; i++) {
            createProducer.send(createTextMessage(createSession, Integer.toString(i), z));
        }
        createSession.commit();
        createSession.close();
        ClientSession createSession2 = this.factory.createSession((String) null, (String) null, false, true, true, true, 0);
        createSession2.start();
        for (int i2 = 0; i2 < 5; i2++) {
            ClientConsumer createConsumer = createSession2.createConsumer(this.ADDRESS, (SimpleString) null, true);
            for (int i3 = 0; i3 < 10; i3++) {
                ClientMessage receive = createConsumer.receive(1000L);
                Assert.assertNotNull("element i=" + i3 + " loopAck = " + i2 + " was expected", receive);
                receive.acknowledge();
                Assert.assertEquals(Integer.toString(i3), getTextMessage(receive));
                Assert.assertEquals(0L, receive.getDeliveryCount());
            }
            createSession2.commit();
            createConsumer.close();
        }
        createSession2.close();
        ClientSession createSession3 = this.factory.createSession(false, false, false);
        createSession3.start();
        ClientConsumer createConsumer2 = createSession3.createConsumer(this.ADDRESS);
        for (int i4 = 0; i4 < 5; i4++) {
            for (int i5 = 0; i5 < 10; i5++) {
                ClientMessage receive2 = createConsumer2.receive(1000L);
                Assert.assertNotNull(receive2);
                Assert.assertEquals(Integer.toString(i5), getTextMessage(receive2));
                Assert.assertEquals(1L, receive2.getDeliveryCount());
            }
            createSession3.rollback();
        }
        if (z) {
            createSession3.close();
            this.server.stop();
            this.server.start();
            this.factory = createSessionFactory(this.locator);
            createSession3 = this.factory.createSession(false, false, false);
            createSession3.start();
            createConsumer2 = createSession3.createConsumer(this.ADDRESS);
        }
        for (int i6 = 1; i6 <= 5; i6++) {
            for (int i7 = 0; i7 < 10; i7++) {
                ClientMessage receive3 = createConsumer2.receive(1000L);
                Assert.assertNotNull(receive3);
                receive3.acknowledge();
                Assert.assertEquals(Integer.toString(i7), getTextMessage(receive3));
                Assert.assertEquals(i6, receive3.getDeliveryCount());
            }
            if (i6 < 5) {
                if (z) {
                    createSession3.close();
                    this.server.stop();
                    this.server.start();
                    this.factory = createSessionFactory(this.locator);
                    createSession3 = this.factory.createSession(false, false, false);
                    createSession3.start();
                    createConsumer2 = createSession3.createConsumer(this.ADDRESS);
                } else {
                    createSession3.rollback();
                }
            }
        }
        createSession3.close();
    }

    protected void testDedeliveryMessageOnPersistent(boolean z) throws Exception {
        setUp(z);
        ClientSession createSession = this.factory.createSession(false, false, false);
        logger.debug("created");
        createSession.createProducer(this.ADDRESS).send(createTextMessage(createSession, "Hello"));
        createSession.commit();
        createSession.close();
        ClientSession createSession2 = this.factory.createSession(false, false, false);
        createSession2.start();
        Assert.assertEquals(1L, createSession2.createConsumer(this.ADDRESS).receive(1000L).getDeliveryCount());
        createSession2.stop();
        if (!z) {
            createSession2.rollback(true);
            createSession2.close();
        }
        this.server.stop();
        createSession2.close();
        this.server.start();
        this.factory = createSessionFactory(this.locator);
        ClientSession createSession3 = this.factory.createSession(false, true, false);
        createSession3.start();
        Assert.assertNotNull(createSession3.createConsumer(this.ADDRESS).receive(1000L));
        Assert.assertEquals(z ? 2L : 2L, r0.getDeliveryCount());
        createSession3.close();
    }

    @Test
    public void testInfiniteDedeliveryMessageOnPersistent() throws Exception {
        internaltestInfiniteDedeliveryMessageOnPersistent(false);
    }

    private void internaltestInfiniteDedeliveryMessageOnPersistent(boolean z) throws Exception {
        setUp(z);
        ClientSession createSession = this.factory.createSession(false, false, false);
        logger.debug("created");
        createSession.createProducer(this.ADDRESS).send(createTextMessage(createSession, "Hello"));
        createSession.commit();
        createSession.close();
        int i = 1;
        for (int i2 = 0; i2 < 700; i2++) {
            ClientSession createSession2 = this.factory.createSession(false, false, false);
            createSession2.start();
            ClientMessage receive = createSession2.createConsumer(this.ADDRESS).receive(5000L);
            assertNotNull(receive);
            assertEquals(i, receive.getDeliveryCount());
            if (i2 % 100 == 0) {
                i++;
                receive.acknowledge();
                createSession2.rollback();
            }
            createSession2.close();
        }
        this.factory.close();
        this.server.stop();
        setUp(false);
        for (int i3 = 0; i3 < 700; i3++) {
            ClientSession createSession3 = this.factory.createSession(false, false, false);
            createSession3.start();
            assertNotNull(createSession3.createConsumer(this.ADDRESS).receive(5000L));
            assertEquals(i, r0.getDeliveryCount());
            createSession3.close();
        }
        this.server.stop();
        JournalImpl journalImpl = new JournalImpl(this.server.getConfiguration().getJournalFileSize(), 2, 2, 0, 0, new NIOSequentialFileFactory(this.server.getConfiguration().getJournalLocation(), 1), "activemq-data", "amq", 1);
        final AtomicInteger atomicInteger = new AtomicInteger();
        journalImpl.start();
        journalImpl.load(new LoaderCallback() { // from class: org.apache.activemq.artemis.tests.integration.client.RedeliveryConsumerTest.1
            public void failedTransaction(long j, List<RecordInfo> list, List<RecordInfo> list2) {
            }

            public void updateRecord(RecordInfo recordInfo) {
                if (recordInfo.userRecordType == 34) {
                    atomicInteger.incrementAndGet();
                }
            }

            public void deleteRecord(long j) {
            }

            public void addRecord(RecordInfo recordInfo) {
            }

            public void addPreparedTransaction(PreparedTransactionInfo preparedTransactionInfo) {
            }
        });
        journalImpl.stop();
        assertEquals(7L, atomicInteger.get());
    }

    @Test
    public void testRedeliveryCollisionAvoidance() throws Exception {
        setUp(false);
        ((AddressSettings) this.server.getAddressSettingsRepository().getMatch(this.ADDRESS.toString())).setRedeliveryDelay(1000L).setRedeliveryCollisionAvoidanceFactor(0.5d);
        ClientSession createSession = this.factory.createSession(false, false, false);
        ClientProducer createProducer = createSession.createProducer(this.ADDRESS);
        for (int i = 0; i < 10; i++) {
            createProducer.send(createTextMessage(createSession, "Hello" + i));
        }
        createSession.commit();
        createSession.close();
        CountDownLatch countDownLatch = new CountDownLatch(10);
        CountDownLatch countDownLatch2 = new CountDownLatch(1);
        C1ConsumerThread[] c1ConsumerThreadArr = new C1ConsumerThread[10];
        for (int i2 = 0; i2 < 10; i2++) {
            c1ConsumerThreadArr[i2] = new C1ConsumerThread(i2, countDownLatch, countDownLatch2);
            c1ConsumerThreadArr[i2].start();
        }
        countDownLatch.await();
        countDownLatch2.countDown();
        try {
            for (C1ConsumerThread c1ConsumerThread : c1ConsumerThreadArr) {
                c1ConsumerThread.join(60000L);
                assertFalse(c1ConsumerThread.isAlive());
                assertEquals("There are Errors on the test thread", 0L, c1ConsumerThread.errors);
            }
            long j = 0;
            long j2 = Long.MAX_VALUE;
            for (C1ConsumerThread c1ConsumerThread2 : c1ConsumerThreadArr) {
                if (c1ConsumerThread2.delay < j2) {
                    j2 = c1ConsumerThread2.delay;
                }
                if (c1ConsumerThread2.delay > j) {
                    j = c1ConsumerThread2.delay;
                }
            }
            assertTrue(((double) (j - j2)) > ((double) 1000) * 0.05d);
            this.factory.close();
        } finally {
            for (C1ConsumerThread c1ConsumerThread3 : c1ConsumerThreadArr) {
                if (c1ConsumerThread3.isAlive()) {
                    c1ConsumerThread3.interrupt();
                }
                c1ConsumerThread3.join(1000L);
            }
        }
    }

    private void setUp(boolean z) throws Exception {
        this.server = createServer(true, createDefaultInVMConfig().setPersistDeliveryCountBeforeDelivery(z));
        this.server.start();
        this.locator = createInVMNonHALocator();
        this.factory = createSessionFactory(this.locator);
        ClientSession addClientSession = addClientSession(this.factory.createSession(false, false, false));
        try {
            addClientSession.createQueue(new QueueConfiguration(this.ADDRESS));
        } catch (ActiveMQException e) {
        }
        addClientSession.close();
    }
}
