package org.apache.activemq.artemis.tests.integration.persistence;

import jakarta.jms.Connection;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Queue;
import jakarta.jms.Session;
import java.io.File;
import java.io.PrintStream;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.nativo.jlibaio.LibaioContext;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.Wait;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/persistence/SyncSendTest.class */
public class SyncSendTest extends ActiveMQTestBase {
    private static long totalRecordTime = -1;
    private static final int RECORDS = 300;
    private static final int MEASURE_RECORDS = 100;
    private static final int WRAMP_UP = 100;
    private final String storage;
    private final String protocol;
    ActiveMQServer server;

    @Parameterized.Parameters(name = "storage={0}, protocol={1}")
    public static Collection getParameters() {
        Object[] objArr = {"core", "openwire", "amqp"};
        ArrayList arrayList = new ArrayList();
        for (Object obj : new Object[]{"libaio", "nio", "null"}) {
            if (!obj.equals("libaio") || LibaioContext.isLoaded()) {
                for (Object obj2 : objArr) {
                    arrayList.add(new Object[]{obj, obj2});
                }
            }
        }
        return arrayList;
    }

    public SyncSendTest(String str, String str2) {
        this.storage = str;
        this.protocol = str2;
    }

    protected ConfigurationImpl createBasicConfig(int i) {
        ConfigurationImpl createBasicConfig = super.createBasicConfig(i);
        createBasicConfig.setJournalDatasync(true).setJournalSyncNonTransactional(true).setJournalSyncTransactional(true);
        return createBasicConfig;
    }

    public void setUp() throws Exception {
        super.setUp();
        if (this.storage.equals("null")) {
            this.server = createServer(false, true);
        } else {
            this.server = createServer(true, true);
        }
        if (this.storage.equals("libaio")) {
            this.server.getConfiguration().setJournalType(JournalType.ASYNCIO);
        } else {
            this.server.getConfiguration().setJournalType(JournalType.NIO);
        }
        this.server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setAutoCreateAddresses(false).setAutoCreateQueues(false));
        this.server.getConfiguration().setJournalSyncTransactional(true).setJournalSyncNonTransactional(true).setJournalDatasync(true);
        this.server.start();
    }

    private long getTimePerSync() throws Exception {
        if (this.storage.equals("null")) {
            return 0L;
        }
        if (totalRecordTime < 0) {
            File newFile = this.temporaryFolder.newFile();
            System.out.println("File::" + newFile);
            FileChannel channel = new RandomAccessFile(newFile, "rw").getChannel();
            channel.position(0L);
            ByteBuffer allocate = ByteBuffer.allocate(10);
            allocate.put(new byte[10]);
            allocate.position(0);
            Assert.assertEquals(10L, channel.write(allocate));
            channel.force(true);
            long nanoTime = System.nanoTime();
            for (int i = 0; i < 200; i++) {
                if (i == 100) {
                    nanoTime = System.nanoTime();
                }
                channel.position(0L);
                allocate.position(0);
                allocate.putInt(i);
                allocate.position(0);
                Assert.assertEquals(10L, channel.write(allocate));
                channel.force(false);
            }
            totalRecordTime = ((System.nanoTime() - nanoTime) / 100) * 300;
            System.out.println("total time = " + totalRecordTime);
        }
        return totalRecordTime;
    }

    @Test
    public void testSendConsumeAudoACK() throws Exception {
        long timePerSync = getTimePerSync();
        this.server.createQueue(new QueueConfiguration("queue").setRoutingType(RoutingType.ANYCAST).setAutoCreateAddress(true));
        Connection createConnection = newCF().createConnection();
        try {
            Session createSession = createConnection.createSession(false, 1);
            Queue createQueue = createSession.createQueue("queue");
            MessageProducer createProducer = createSession.createProducer(createQueue);
            long nanoTime = System.nanoTime();
            for (int i = 0; i < 400; i++) {
                if (i == 100) {
                    nanoTime = System.nanoTime();
                }
                createProducer.send(createSession.createMessage());
            }
            long nanoTime2 = System.nanoTime();
            PrintStream printStream = System.out;
            TimeUnit.NANOSECONDS.toMillis(nanoTime2 - nanoTime);
            printStream.println("end - start = " + (nanoTime2 - nanoTime) + " milliseconds = " + printStream);
            PrintStream printStream2 = System.out;
            TimeUnit.NANOSECONDS.toMillis(timePerSync);
            printStream2.println("RECORD TIME = " + timePerSync + " milliseconds = " + printStream2);
            if (nanoTime2 - nanoTime < timePerSync * 0.7d) {
                Assert.fail("Messages are being sent too fast! Faster than the disk would be able to sync!");
            }
            createConnection.start();
            MessageConsumer createConsumer = createSession.createConsumer(createQueue);
            for (int i2 = 0; i2 < 400; i2++) {
                if (i2 == 100) {
                    nanoTime = System.nanoTime();
                }
                Assert.assertNotNull(createConsumer.receive(5000L));
            }
            long nanoTime3 = System.nanoTime();
            PrintStream printStream3 = System.out;
            TimeUnit.NANOSECONDS.toMillis(nanoTime3 - nanoTime);
            printStream3.println("end - start = " + (nanoTime3 - nanoTime) + " milliseconds = " + printStream3);
            PrintStream printStream4 = System.out;
            TimeUnit.NANOSECONDS.toMillis(timePerSync);
            printStream4.println("RECORD TIME = " + timePerSync + " milliseconds = " + printStream4);
            if (!this.protocol.equals("amqp") && nanoTime3 - nanoTime < timePerSync * 0.7d) {
                Assert.fail("Messages are being acked too fast! Faster than the disk would be able to sync!");
            }
            org.apache.activemq.artemis.core.server.Queue locateQueue = this.server.locateQueue(SimpleString.toSimpleString("queue"));
            Objects.requireNonNull(locateQueue);
            Wait.assertEquals(0L, locateQueue::getMessageCount);
            Objects.requireNonNull(locateQueue);
            Wait.assertEquals(0, locateQueue::getDeliveringCount);
            createConnection.close();
        } catch (Throwable th) {
            createConnection.close();
            throw th;
        }
    }

    private ConnectionFactory newCF() {
        if (this.protocol.equals("core")) {
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
            activeMQConnectionFactory.setBlockOnAcknowledge(true);
            return activeMQConnectionFactory;
        }
        if (this.protocol.equals("amqp")) {
            JmsConnectionFactory jmsConnectionFactory = new JmsConnectionFactory("amqp://localhost:61616");
            jmsConnectionFactory.setForceAsyncAcks(true);
            return jmsConnectionFactory;
        }
        org.apache.activemq.ActiveMQConnectionFactory activeMQConnectionFactory2 = new org.apache.activemq.ActiveMQConnectionFactory("tcp://localhost:61616?wireFormat.cacheEnabled=true");
        activeMQConnectionFactory2.setSendAcksAsync(false);
        return activeMQConnectionFactory2;
    }
}
