package org.apache.activemq.artemis.tests.integration.cluster.failover;

import java.io.File;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.transport.amqp.client.AmqpTransferTagGenerator;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/cluster/failover/BackupSyncLargeMessageTest.class */
/**
 * Runs the backup-synchronization scenarios of {@link BackupSyncJournalTest} with large
 * message bodies: the body hooks delegate to the large-message helpers and the server locator
 * is configured with a minimum large-message size.
 *
 * <p>The additional tests in this class count the {@code .msg} files found in the servers'
 * large-messages directories to check that files are removed once messages are consumed.
 */
public class BackupSyncLargeMessageTest extends BackupSyncJournalTest {

    /**
     * Verifies a received message by delegating to {@code assertLargeMessageBody}.
     *
     * @param i index of the message being checked
     * @param clientMessage the received message
     */
    @Override
    public void assertMessageBody(int i, ClientMessage clientMessage) {
        assertLargeMessageBody(i, clientMessage);
    }

    /**
     * Returns the parent's locator after setting its minimum large-message size.
     *
     * @return the configured server locator
     * @throws Exception if the parent fails to create the locator
     */
    @Override
    public ServerLocatorInternal getServerLocator() throws Exception {
        // NOTE(review): an AMQP tag-pool constant is an odd source for a large-message
        // threshold; presumably the decompiler matched the numeric value to the wrong
        // constant - confirm against the original source before changing it.
        return super.getServerLocator().setMinLargeMessageSize(AmqpTransferTagGenerator.DEFAULT_TAG_POOL_SIZE);
    }

    /**
     * Reports that retention is not supported by this test variant.
     *
     * @return always {@code false}
     */
    @Override
    protected boolean supportsRetention() {
        return false;
    }

    /**
     * Populates an outgoing message by delegating to {@code setLargeMessageBody}.
     *
     * @param i index of the message being built
     * @param clientMessage the message to populate
     */
    @Override
    public void setBody(int i, ClientMessage clientMessage) {
        setLargeMessageBody(i, clientMessage);
    }

    /**
     * Sends 200 messages, syncs the backup, consumes the first half and fails over; then
     * expects the backup's large-messages directory to hold only the unconsumed half.
     */
    @Test
    public void testDeleteLargeMessages() throws Exception {
        setNumberOfMessages(200);
        File backupLargeMessagesDir = new File(this.backupServer.getServer().getConfiguration().getLargeMessagesDirectory());
        Assertions.assertEquals(0, getAllMessageFileIds(backupLargeMessagesDir).size(), "Should not have any large messages... previous test failed to clean up?");

        createProducerSendSomeMessages();
        startBackupFinishSyncing();
        receiveMsgsInRange(0, getNumberOfMessages() / 2);
        finishSyncAndFailover();

        int expectedRemaining = getNumberOfMessages() / 2;
        // Poll for up to five seconds until the file count reaches the expected value.
        long deadline = System.currentTimeMillis() + 5000;
        while (getAllMessageFileIds(backupLargeMessagesDir).size() != expectedRemaining && System.currentTimeMillis() < deadline) {
            Thread.sleep(50L);
        }
        Assertions.assertEquals(expectedRemaining, getAllMessageFileIds(backupLargeMessagesDir).size(), "we really ought to delete these after delivery");
    }

    /**
     * Starts the backup, consumes half of the messages, then finishes syncing and stops the
     * primary; afterwards both servers' large-messages directories must contain the same file
     * ids, and only the unconsumed half.
     */
    @Test
    public void testDeleteLargeMessagesDuringSync() throws Exception {
        setNumberOfMessages(200);
        File backupLargeMessagesDir = new File(this.backupServer.getServer().getConfiguration().getLargeMessagesDirectory());
        File primaryLargeMessagesDir = new File(this.primaryServer.getServer().getConfiguration().getLargeMessagesDirectory());
        Assertions.assertEquals(0, getAllMessageFileIds(backupLargeMessagesDir).size(), "Should not have any large messages... previous test failed to clean up?");

        createProducerSendSomeMessages();
        this.backupServer.start();
        waitForComponent(this.backupServer.getServer(), 5L);
        receiveMsgsInRange(0, getNumberOfMessages() / 2);
        startBackupFinishSyncing();
        Thread.sleep(500L);

        this.primaryServer.getServer().stop();
        this.backupServer.getServer().waitForActivation(10L, TimeUnit.SECONDS);
        this.backupServer.stop();

        // Backup directory is listed first, then the primary one, as the comparison operands.
        Set<Long> backupFileIds = getAllMessageFileIds(backupLargeMessagesDir);
        Set<Long> primaryFileIds = getAllMessageFileIds(primaryLargeMessagesDir);
        Assertions.assertEquals(primaryFileIds, backupFileIds, "primary and backup should have the same files ");
        Assertions.assertEquals(getNumberOfMessages() / 2, backupFileIds.size(), "we really ought to delete these after delivery: " + backupFileIds);
        // Deliberately re-lists the directory instead of reusing the earlier snapshot.
        Assertions.assertEquals(getNumberOfMessages() / 2, getAllMessageFileIds(backupLargeMessagesDir).size(), "we really ought to delete these after delivery");
    }

    /**
     * Starts the backup while a background thread is sending one streamed large message
     * followed by 20 regular messages, then crashes the session's server and checks that every
     * message can still be consumed with the expected content.
     */
    @Test
    public void testBackupStartsWhenPrimaryIsReceivingLargeMessage() throws Exception {
        // Single source of truth for the stream size, the verification loop and its message.
        final long largeMessageSize = 1024000L;

        ClientSession session = addClientSession(this.sessionFactory.createSession(true, true));
        session.createQueue(QueueConfiguration.of(FailoverTestBase.ADDRESS));
        ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
        ClientMessage largeMessage = session.createMessage(true);
        largeMessage.setBodyInputStream(ActiveMQTestBase.createFakeLargeStream(largeMessageSize));

        AtomicBoolean sendFailed = new AtomicBoolean(false);
        CountDownLatch senderStarted = new CountDownLatch(1);
        CountDownLatch senderFinished = new CountDownLatch(1);
        Executors.defaultThreadFactory().newThread(() -> {
            try {
                senderStarted.countDown();
                producer.send(largeMessage);
                sendMessages(session, producer, 20);
                session.commit();
            } catch (ActiveMQException e) {
                e.printStackTrace();
                sendFailed.set(true);
            } finally {
                // Always release the waiting test thread, whatever the outcome of the send.
                senderFinished.countDown();
            }
        }).start();

        // Only start the backup once the sender thread is running.
        waitForLatch(senderStarted);
        startBackupFinishSyncing();
        ActiveMQTestBase.waitForLatch(senderFinished);
        crash(session);
        Assertions.assertFalse(sendFailed.get(), "no exceptions while sending message");

        session.start();
        ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
        ClientMessage received = consumer.receive(2000L);
        // Fail with a readable message instead of a NullPointerException when nothing arrives.
        Assertions.assertNotNull(received, "expected the large message but receive timed out");
        ActiveMQBuffer body = received.getBodyBuffer();
        for (int i = 0; i < largeMessageSize; i++) {
            Assertions.assertTrue(body.readable(), "large msg , expecting " + largeMessageSize + " bytes, got " + i);
            Assertions.assertEquals(ActiveMQTestBase.getSamplebyte(i), body.readByte(), "equal at " + i);
        }
        receiveMessages(consumer, 0, 20, true);
        Assertions.assertNull(consumer.receiveImmediate(), "there should be no more messages!");
        consumer.close();
        session.commit();
    }

    /**
     * Collects the numeric ids of the {@code .msg} files directly inside a directory.
     *
     * <p>The id is the part of the file name before its first dot, parsed as a {@code long}.
     *
     * @param file directory to list
     * @return sorted set of ids; empty when the directory cannot be listed
     * @throws NumberFormatException if a {@code .msg} file name does not start with a number
     */
    private Set<Long> getAllMessageFileIds(File file) {
        Set<Long> ids = new TreeSet<>();
        String[] names = file.list();
        if (names == null) {
            // File.list() yields null when the path is not a directory or an I/O error occurs.
            return ids;
        }
        for (String name : names) {
            if (name.endsWith(".msg")) {
                ids.add(Long.valueOf(name.split("\\.")[0]));
            }
        }
        return ids;
    }
}
