package org.apache.activemq.artemis.tests.integration.amqp.connect;

import jakarta.jms.Connection;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
import org.apache.activemq.artemis.core.config.impl.SecurityConfiguration;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.core.journal.IOCompletion;
import org.apache.activemq.artemis.core.journal.Journal;
import org.apache.activemq.artemis.core.journal.JournalUpdateCallback;
import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
import org.apache.activemq.artemis.spi.core.security.jaas.InVMLoginModule;
import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.ReusableLatch;
import org.apache.activemq.artemis.utils.Wait;
import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
import org.apache.activemq.transport.amqp.client.AmqpTransferTagGenerator;
import org.apache.activemq.transport.netty.NettyTransportOptions;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

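/**
 * Integration tests for a synchronous AMQP mirror between two brokers: the default test broker
 * (named "fast" by the tests) mirrors towards a second broker named "slow", whose journal appends
 * are intercepted through a {@code StorageCallback}.
 */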
public class AMQPSyncMirrorTest extends AmqpClientTestSupport {
    // SLF4J logger; declared as a package-private instance field (not static).
    Logger logger = LoggerFactory.getLogger(AMQPSyncMirrorTest.class);
    // Name given to the mirror-target ("slow") broker.
    private static final String SLOW_SERVER_NAME = "slow";
    // Acceptor port of the slow broker; the mirror connection URI in configureMirrorTowardsSlow targets this port.
    private static final int SLOW_SERVER_PORT = 5673;
    // Mirror-target broker, assigned by each test scenario through createServerWithCallbackStorage.
    private ActiveMQServer slowServer;

    /**
     * Hook invoked by the instrumented journal (see createCallbackStorageManager) immediately
     * before a record is appended, letting a test observe or delay the write.
     */
    public interface StorageCallback {
        /**
         * @param isUpdate   {@code true} when reported from one of the update-record append methods
         * @param isTX       transaction flag as reported by the instrumented journal
         * @param txID       transaction id, or {@code -1} when the append is not transactional
         * @param id         record id, or {@code -1} when the journal hook does not report one
         * @param recordType journal record type byte
         * @param persister  persister handed to the journal; may be {@code null}
         * @param record     record being stored; may be {@code null}
         */
        void storage(boolean isUpdate, boolean isTX, long txID, long id, byte recordType, Persister persister, Object record);
    }

    /**
     * Runs the inherited set-up unchanged.
     *
     * @throws Exception if the parent set-up fails
     */
    @BeforeEach
    @Override
    public void setUp() throws Exception {
        super.setUp();
    }

    /**
     * @return the comma-separated protocol list for the acceptors built by this test
     */
    @Override
    protected String getConfiguredProtocols() {
        final String protocols = "AMQP,OPENWIRE,CORE";
        return protocols;
    }

    /** AMQP, non-transactional, small body (filler text repeated 100 times). */
    @Test
    public void testPersistedSendAMQP() throws Exception {
        final boolean transactional = false;
        final int fillerRepeats = 100;
        testPersistedSend("AMQP", transactional, fillerRepeats);
    }

    /** AMQP, non-transactional, large body (filler text repeated 200 * 1024 times). */
    @Test
    public void testPersistedSendAMQPLarge() throws Exception {
        final boolean transactional = false;
        final int fillerRepeats = 200 * 1024;
        testPersistedSend("AMQP", transactional, fillerRepeats);
    }

    /** CORE, non-transactional, small body (filler text repeated 100 times). */
    @Test
    public void testPersistedSendCore() throws Exception {
        final boolean transactional = false;
        final int fillerRepeats = 100;
        testPersistedSend("CORE", transactional, fillerRepeats);
    }

    /** CORE, non-transactional, large body (filler text repeated 200 * 1024 times). */
    @Test
    public void testPersistedSendCoreLarge() throws Exception {
        final boolean transactional = false;
        final int fillerRepeats = 200 * 1024;
        testPersistedSend("CORE", transactional, fillerRepeats);
    }

    /** AMQP, transactional, large body (filler text repeated 200 * 1024 times). */
    @Test
    public void testPersistedSendAMQPTXLarge() throws Exception {
        final boolean transactional = true;
        final int fillerRepeats = 200 * 1024;
        testPersistedSend("AMQP", transactional, fillerRepeats);
    }

    /** AMQP, transactional, small body (filler text repeated 100 times). */
    @Test
    public void testPersistedSendAMQPTX() throws Exception {
        final boolean transactional = true;
        final int fillerRepeats = 100;
        testPersistedSend("AMQP", transactional, fillerRepeats);
    }

    /** CORE, transactional, small body (filler text repeated 100 times). */
    @Test
    public void testPersistedSendCoreTX() throws Exception {
        final boolean transactional = true;
        final int fillerRepeats = 100;
        testPersistedSend("CORE", transactional, fillerRepeats);
    }

    /** CORE, transactional, large body (filler text repeated 200 * 1024 times). */
    @Test
    public void testPersistedSendCoreTXLarge() throws Exception {
        final boolean transactional = true;
        final int fillerRepeats = 200 * 1024;
        testPersistedSend("CORE", transactional, fillerRepeats);
    }

    /**
     * Sends and then consumes ten persistent messages through the fast broker while the slow
     * mirror target's journal writes are gated by semaphores. For every message the test holds
     * the gate, verifies the client operation (send/commit, and the ack where applicable) is still
     * pending, then opens the gate and verifies the operation completes and the mirrored queue
     * count moves accordingly.
     *
     * @param protocol      client protocol passed to {@code CFUtil.createConnectionFactory}
     * @param transactional whether the producer and consumer sessions are transacted
     * @param fillerRepeats number of times the filler text {@code "large Buffer..."} is repeated
     *                      to build the message body
     * @throws Exception on any broker, JMS or assertion failure
     */
    private void testPersistedSend(String protocol, boolean transactional, int fillerRepeats) throws Exception {
        final int numberOfMessages = 10;
        // Journal record type bytes the slow-side callback reacts to.
        // NOTE(review): presumably JournalRecordIds.ACKNOWLEDGE_REF, ADD_MESSAGE_PROTOCOL and
        // ADD_LARGE_MESSAGE respectively - confirm against JournalRecordIds.
        final byte ackRefRecord = 33;
        final byte addMessageProtocolRecord = 45;
        final byte addLargeMessageRecord = 30;

        ReusableLatch sendPending = new ReusableLatch(0);
        Semaphore semSend = new Semaphore(1);
        Semaphore semAck = new Semaphore(1);
        AtomicInteger errors = new AtomicInteger(0);
        try {
            AtomicInteger countStored = new AtomicInteger(0);
            slowServer = createServerWithCallbackStorage(SLOW_SERVER_PORT, SLOW_SERVER_NAME, (isUpdate, isTX, txID, id, recordType, persister, record) -> {
                if (logger.isDebugEnabled()) {
                    logger.debug("StorageCallback::slow isUpdate={}, isTX={}, txID={}, id={},recordType={}, record={}", isUpdate, isTX, txID, id, recordType, record);
                }
                // Transactional scenario: once at least one message add was seen, hold the
                // TX-flagged journal write until the test thread releases semSend.
                if (transactional && isTX) {
                    try {
                        if (countStored.get() > 0) {
                            countStored.incrementAndGet();
                            logger.debug("semSend.tryAcquire");
                            if (semSend.tryAcquire(20, TimeUnit.SECONDS)) {
                                logger.debug("acquired TX, now release");
                                semSend.release();
                            }
                        }
                    } catch (Exception e) {
                        logger.warn(e.getMessage(), e);
                    }
                }
                // Acknowledgements are held until the test thread releases semAck.
                if (recordType == ackRefRecord) {
                    logger.debug("slow ACK REF");
                    try {
                        if (semAck.tryAcquire(20, TimeUnit.SECONDS)) {
                            semAck.release();
                            logger.debug("slow acquired ACK semaphore");
                        } else {
                            logger.debug("Semaphore wasn't acquired");
                        }
                    } catch (Exception e) {
                        logger.warn(e.getMessage(), e);
                    }
                }
                // Message adds are always counted; only the non-transactional scenario gates them.
                if (recordType == addMessageProtocolRecord || recordType == addLargeMessageRecord) {
                    try {
                        countStored.incrementAndGet();
                        if (!transactional) {
                            logger.debug("semSend.tryAcquire");
                            if (semSend.tryAcquire(20, TimeUnit.SECONDS)) {
                                logger.debug("acquired non TX now release");
                                semSend.release();
                            }
                        }
                    } catch (Exception e) {
                        logger.warn(e.getMessage(), e);
                        errors.incrementAndGet();
                    }
                }
            });
            slowServer.setIdentity("slowServer");
            server.setIdentity("server");

            ExecutorService executorService = Executors.newFixedThreadPool(5);
            runAfter(executorService::shutdown);

            configureMirrorTowardsSlow(server);
            slowServer.getConfiguration().setName(SLOW_SERVER_NAME);
            server.getConfiguration().setName("fast");
            slowServer.start();
            server.start();
            waitForServerToStart(slowServer);
            waitForServerToStart(server);

            server.addAddressInfo(new AddressInfo(getQueueName()).addRoutingType(RoutingType.ANYCAST).setAutoCreated(false));
            server.createQueue(QueueConfiguration.of(getQueueName()).setRoutingType(RoutingType.ANYCAST).setAddress(getQueueName()).setAutoCreated(false));

            Wait.waitFor(() -> slowServer.locateQueue(getQueueName()) != null);
            Queue replicatedQueue = slowServer.locateQueue(getQueueName());
            // Fail with a clear assertion instead of a later NullPointerException.
            Assertions.assertNotNull(replicatedQueue, "queue was not mirrored to the slow server");

            // Declared as the JMS interface so the instanceof guard is meaningful; only the
            // Artemis core factory exposes the server locator.
            jakarta.jms.ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:5672");
            if (factory instanceof ActiveMQConnectionFactory) {
                ((ActiveMQConnectionFactory) factory).getServerLocator().setBlockOnAcknowledge(true);
            }

            Connection connection = factory.createConnection();
            runAfter(connection::close);
            Session session = connection.createSession(transactional, transactional ? Session.SESSION_TRANSACTED : Session.CLIENT_ACKNOWLEDGE);
            MessageProducer producer = session.createProducer(session.createQueue(getQueueName()));
            connection.start();
            producer.setDeliveryMode(jakarta.jms.DeliveryMode.PERSISTENT);

            StringBuilder bodyBuilder = new StringBuilder();
            for (int repeat = 0; repeat < fillerRepeats; repeat++) {
                bodyBuilder.append("large Buffer...");
            }
            final String body = bodyBuilder.toString();

            for (int i = 0; i < numberOfMessages; i++) {
                logger.debug("===>>> send message {}", i);
                // setStringProperty requires a String value, so the index is converted explicitly.
                final String messageIndex = String.valueOf(i);
                sendPending.countUp();
                logger.debug("semSend.acquire");
                semSend.acquire();
                if (transactional) {
                    CountDownLatch sent = new CountDownLatch(1);
                    executorService.execute(() -> {
                        try {
                            TextMessage outgoing = session.createTextMessage(body);
                            outgoing.setStringProperty("strProperty", messageIndex);
                            producer.send(outgoing);
                        } catch (Throwable e) {
                            errors.incrementAndGet();
                            logger.warn(e.getMessage(), e);
                        }
                        sent.countDown();
                    });
                    // The uncommitted send must not change the mirrored queue.
                    Wait.assertEquals(i, replicatedQueue::getMessageCount);
                    Assertions.assertTrue(sent.await(10, TimeUnit.SECONDS));
                    executorService.execute(() -> {
                        try {
                            session.commit();
                            sendPending.countDown();
                        } catch (Throwable e) {
                            logger.warn(e.getMessage(), e);
                        }
                    });
                } else {
                    executorService.execute(() -> {
                        try {
                            logger.debug("Entering non TX send with sendPending = {}", sendPending.getCount());
                            TextMessage outgoing = session.createTextMessage(body);
                            outgoing.setStringProperty("strProperty", messageIndex);
                            producer.send(outgoing);
                            sendPending.countDown();
                            logger.debug("leaving non TX send with sendPending = {}", sendPending.getCount());
                        } catch (Throwable e) {
                            logger.warn(e.getMessage(), e);
                            errors.incrementAndGet();
                        }
                    });
                }
                // While semSend is held the send/commit must still be pending.
                Assertions.assertFalse(sendPending.await(10, TimeUnit.MILLISECONDS), "sendPending.await() not supposed to succeed");
                logger.debug("semSend.release");
                semSend.release();
                Assertions.assertTrue(sendPending.await(10, TimeUnit.SECONDS));
                Wait.assertEquals(i + 1, replicatedQueue::getMessageCount);
            }

            if (!transactional) {
                Wait.assertEquals(numberOfMessages, countStored::get);
            }
            Wait.assertEquals(numberOfMessages, replicatedQueue::getMessageCount);

            connection.start();
            Session consumerSession = transactional
                ? connection.createSession(true, Session.AUTO_ACKNOWLEDGE)
                : connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            MessageConsumer consumer = consumerSession.createConsumer(consumerSession.createQueue(getQueueName()));

            for (int i = 0; i < numberOfMessages; i++) {
                logger.debug("===<<< Receiving message {}", i);
                Message received = consumer.receive(5000);
                Assertions.assertNotNull(received);
                semAck.acquire();
                CountDownLatch acked = new CountDownLatch(1);
                executorService.execute(() -> {
                    try {
                        if (transactional) {
                            consumerSession.commit();
                        } else {
                            received.acknowledge();
                        }
                    } catch (Exception e) {
                        logger.warn(e.getMessage(), e);
                        errors.incrementAndGet();
                    } finally {
                        // Always signal completion so the test thread never waits out the full timeout.
                        acked.countDown();
                    }
                });
                if (transactional || !protocol.equals("AMQP")) {
                    // While semAck is held the commit/ack must still be pending.
                    Assertions.assertFalse(acked.await(10, TimeUnit.MILLISECONDS));
                } else {
                    logger.debug("non transactional and amqp is always asynchronous. No need to verify anything");
                }
                semAck.release();
                Assertions.assertTrue(acked.await(10, TimeUnit.SECONDS));
                Wait.assertEquals(numberOfMessages - i - 1, replicatedQueue::getMessageCount);
            }

            Assertions.assertEquals(0, errors.get());
        } finally {
            // Unblock any journal callback still waiting, on success and on failure alike.
            semAck.release();
            semSend.release();
        }
    }

    /**
     * Builds the default ("fast") broker with a journal callback that only logs each record.
     * Registration with the test harness is done inside createServerWithCallbackStorage, so
     * addServer is not called a second time here.
     *
     * @return the configured, not yet started broker
     * @throws Exception if the broker cannot be created
     */
    @Override
    protected ActiveMQServer createServer() throws Exception {
        return createServerWithCallbackStorage(NettyTransportOptions.DEFAULT_TCP_PORT, "fastServer", (isUpdate, isTX, txID, id, recordType, persister, record) -> {
            if (logger.isDebugEnabled()) {
                logger.debug("StorageCallback::fast isUpdate={}, isTX={}, txID={}, id={},recordType={}, record={}", isUpdate, isTX, txID, id, recordType, record);
            }
        });
    }

    /**
     * Adds a broker connection named "mirror" to the given server, pointing at the slow broker's
     * port, with a durable, synchronous mirror element that also mirrors acknowledgements.
     *
     * @param activeMQServer the broker that will act as the mirror source
     */
    private void configureMirrorTowardsSlow(ActiveMQServer activeMQServer) {
        // Derive the URI from SLOW_SERVER_PORT so it cannot drift from the slow broker's acceptor.
        AMQPBrokerConnectConfiguration mirrorConnection = new AMQPBrokerConnectConfiguration("mirror", "tcp://localhost:" + SLOW_SERVER_PORT);
        mirrorConnection.setReconnectAttempts(-1); // -1: retry without limit
        mirrorConnection.setRetryInterval(100);
        mirrorConnection.addElement(new AMQPMirrorBrokerConnectionElement().setDurable(true).setSync(true).setMessageAcknowledgements(true));
        activeMQServer.getConfiguration().addAMQPConnection(mirrorConnection);
    }

    /**
     * Creates a broker whose storage manager is replaced by the instrumented one from
     * createCallbackStorageManager, so every intercepted journal append is reported to
     * {@code storageCallback} first. The broker is registered through addServer and returned
     * without being started.
     *
     * @param port            port used for the basic configuration and the single acceptor
     * @param name            configuration name of the broker
     * @param storageCallback hook notified before intercepted journal appends
     * @return the configured, not yet started broker
     * @throws Exception if the configuration cannot be built
     */
    private ActiveMQServer createServerWithCallbackStorage(int port, String name, final StorageCallback storageCallback) throws Exception {
        ActiveMQServerImpl broker = new ActiveMQServerImpl(createBasicConfig(port), mBeanServer, new ActiveMQJAASSecurityManager(InVMLoginModule.class.getName(), new SecurityConfiguration())) {
            @Override
            protected StorageManager createStorageManager() {
                return AMQPSyncMirrorTest.this.createCallbackStorageManager(getConfiguration(), getCriticalAnalyzer(), this.executorFactory, this.scheduledPool, this.ioExecutorFactory, this.ioCriticalErrorListener, storageCallback);
            }
        };

        Configuration configuration = broker.getConfiguration();
        configuration.setName(name);
        configuration.getAcceptorConfigurations().clear();
        // Pass the broker being configured; the slowServer field is not assigned yet when this runs.
        configuration.getAcceptorConfigurations().add(addAcceptorConfiguration(broker, port));
        configuration.setMessageExpiryScanPeriod(-1L);
        configuration.setJMXManagementEnabled(true);

        configureAddressPolicy(broker);
        configureBrokerSecurity(broker);
        addServer(broker);
        return broker;
    }

    /**
     * Builds a JournalStorageManager whose message journal reports selected append operations to
     * {@code storageCallback} before delegating to the real JournalImpl implementation.
     *
     * @param configuration           broker configuration
     * @param criticalAnalyzer        critical analyzer handed to the storage manager
     * @param executorFactory         general executor factory
     * @param scheduledExecutorService scheduled pool
     * @param executorFactory2        IO executor factory
     * @param iOCriticalErrorListener listener for critical IO errors
     * @param storageCallback         hook notified before each intercepted append
     * @return the instrumented storage manager
     */
    private StorageManager createCallbackStorageManager(Configuration configuration, CriticalAnalyzer criticalAnalyzer, ExecutorFactory executorFactory, ScheduledExecutorService scheduledExecutorService, ExecutorFactory executorFactory2, IOCriticalErrorListener iOCriticalErrorListener, final StorageCallback storageCallback) {
        return new JournalStorageManager(configuration, criticalAnalyzer, executorFactory, scheduledExecutorService, executorFactory2, iOCriticalErrorListener) {
            @Override
            protected Journal createMessageJournal(Configuration journalConfig, IOCriticalErrorListener journalErrorListener, int fileSize) {
                return new JournalImpl(
                    this.ioExecutorFactory,
                    fileSize,
                    journalConfig.getJournalMinFiles(),
                    journalConfig.getJournalPoolFiles(),
                    journalConfig.getJournalCompactMinFiles(),
                    journalConfig.getJournalCompactPercentage(),
                    journalConfig.getJournalFileOpenTimeout(),
                    this.journalFF,
                    "activemq-data",
                    "amq",
                    this.journalFF.getMaxIO(),
                    0,
                    journalErrorListener,
                    journalConfig.getJournalMaxAtticFiles()) {

                    // NOTE(review): the transactional add/update hooks report isTX=false; only the
                    // commit hook reports isTX=true. Looks deliberate, confirm before changing.
                    @Override
                    public void appendAddRecordTransactional(long txID, long id, byte recordType, Persister persister, Object record) throws Exception {
                        storageCallback.storage(false, false, txID, id, recordType, persister, record);
                        super.appendAddRecordTransactional(txID, id, recordType, persister, record);
                    }

                    @Override
                    public void appendAddRecord(long id, byte recordType, Persister persister, Object record, boolean sync, IOCompletion completion) throws Exception {
                        storageCallback.storage(false, false, -1L, id, recordType, persister, record);
                        super.appendAddRecord(id, recordType, persister, record, sync, completion);
                    }

                    @Override
                    public void appendUpdateRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception {
                        storageCallback.storage(true, false, -1L, id, recordType, null, record);
                        super.appendUpdateRecord(id, recordType, record, sync);
                    }

                    @Override
                    public void appendUpdateRecordTransactional(long txID, long id, byte recordType, EncodingSupport record) throws Exception {
                        storageCallback.storage(true, false, txID, id, recordType, null, record);
                        super.appendUpdateRecordTransactional(txID, id, recordType, record);
                    }

                    @Override
                    public void appendCommitRecord(long txID, boolean sync, IOCompletion completion, boolean lineUpContext) throws Exception {
                        // The transaction id is reported in both the txID and id slots; no record type applies.
                        storageCallback.storage(false, true, txID, txID, (byte) 0, null, null);
                        super.appendCommitRecord(txID, sync, completion, lineUpContext);
                    }

                    @Override
                    public void tryAppendUpdateRecord(long id, byte recordType, Persister persister, Object record, boolean sync, boolean replaceableUpdate, JournalUpdateCallback updateCallback, IOCompletion completion) throws Exception {
                        // NOTE(review): reports id as -1 although the record id is available here - confirm intent.
                        storageCallback.storage(true, false, -1L, -1L, recordType, persister, record);
                        super.tryAppendUpdateRecord(id, recordType, persister, record, sync, replaceableUpdate, updateCallback, completion);
                    }
                };
            }
        };
    }

    /** AMQP consumer, transacted session, standard acknowledgement. */
    @Test
    public void testSimpleACK_TX_AMQP() throws Exception {
        // NOTE(review): DEFAULT_TAG_POOL_SIZE is an AMQP client constant unrelated to body sizing;
        // presumably a decompiler substitution for a plain numeric literal - confirm.
        final int fillerRepeats = AmqpTransferTagGenerator.DEFAULT_TAG_POOL_SIZE;
        final boolean transacted = true;
        final boolean individualAck = false;
        testSimpleAckSync("AMQP", transacted, individualAck, fillerRepeats);
    }

    /** CORE consumer, transacted session, standard acknowledgement. */
    @Test
    public void testSimpleACK_TX_CORE() throws Exception {
        // NOTE(review): DEFAULT_TAG_POOL_SIZE is an AMQP client constant unrelated to body sizing;
        // presumably a decompiler substitution for a plain numeric literal - confirm.
        final int fillerRepeats = AmqpTransferTagGenerator.DEFAULT_TAG_POOL_SIZE;
        final boolean transacted = true;
        final boolean individualAck = false;
        testSimpleAckSync("CORE", transacted, individualAck, fillerRepeats);
    }

    /** AMQP consumer, non-transacted session, standard acknowledgement. */
    @Test
    public void testSimpleACK_NoTX_AMQP() throws Exception {
        // NOTE(review): DEFAULT_TAG_POOL_SIZE is an AMQP client constant unrelated to body sizing;
        // presumably a decompiler substitution for a plain numeric literal - confirm.
        final int fillerRepeats = AmqpTransferTagGenerator.DEFAULT_TAG_POOL_SIZE;
        final boolean transacted = false;
        final boolean individualAck = false;
        testSimpleAckSync("AMQP", transacted, individualAck, fillerRepeats);
    }

    /** CORE consumer, non-transacted session, standard acknowledgement. */
    @Test
    public void testSimpleACK_NoTX_CORE() throws Exception {
        // NOTE(review): DEFAULT_TAG_POOL_SIZE is an AMQP client constant unrelated to body sizing;
        // presumably a decompiler substitution for a plain numeric literal - confirm.
        final int fillerRepeats = AmqpTransferTagGenerator.DEFAULT_TAG_POOL_SIZE;
        final boolean transacted = false;
        final boolean individualAck = false;
        testSimpleAckSync("CORE", transacted, individualAck, fillerRepeats);
    }

    /** CORE consumer, non-transacted session, large body (filler text repeated 255 * 1024 times). */
    @Test
    public void testSimpleACK_NoTX_CORE_Large() throws Exception {
        final int fillerRepeats = 255 * 1024;
        final boolean transacted = false;
        final boolean individualAck = false;
        testSimpleAckSync("CORE", transacted, individualAck, fillerRepeats);
    }

    /** CORE consumer, transacted session, large body (filler text repeated 255 * 1024 times). */
    @Test
    public void testSimpleACK_TX_CORE_Large() throws Exception {
        final int fillerRepeats = 255 * 1024;
        final boolean transacted = true;
        final boolean individualAck = false;
        testSimpleAckSync("CORE", transacted, individualAck, fillerRepeats);
    }

    /** CORE consumer, individual-acknowledge mode, large body (filler text repeated 255 * 1024 times). */
    @Test
    public void testSimple_Core_Individual_Large() throws Exception {
        final int fillerRepeats = 255 * 1024;
        final boolean transacted = false;
        final boolean individualAck = true;
        testSimpleAckSync("CORE", transacted, individualAck, fillerRepeats);
    }

    /** CORE consumer, individual-acknowledge mode. */
    @Test
    public void testSimple_Core_Individual() throws Exception {
        // NOTE(review): DEFAULT_TAG_POOL_SIZE is an AMQP client constant unrelated to body sizing;
        // presumably a decompiler substitution for a plain numeric literal - confirm.
        final int fillerRepeats = AmqpTransferTagGenerator.DEFAULT_TAG_POOL_SIZE;
        final boolean transacted = false;
        final boolean individualAck = true;
        testSimpleAckSync("CORE", transacted, individualAck, fillerRepeats);
    }

    /**
     * Sends ten persistent messages to the fast broker and consumes them again, checking after
     * every send and every acknowledgement that the mirrored queue on the slow broker already
     * shows the expected message count. Also verifies that the mirror's store-and-forward queue
     * on the fast broker uses the BLOCK address-full policy.
     *
     * @param protocol      client protocol passed to {@code CFUtil.createConnectionFactory}
     * @param tx            whether the consumer session is transacted (committed after each message)
     * @param individualAck when not transacted, selects acknowledge mode 101 instead of CLIENT_ACKNOWLEDGE
     * @param fillerRepeats number of times the filler text {@code "large Buffer..."} is repeated
     *                      to build the message body
     * @throws Exception on any broker, JMS or assertion failure
     */
    public void testSimpleAckSync(String protocol, boolean tx, boolean individualAck, int fillerRepeats) throws Exception {
        final int numberOfMessages = 10;
        final String mirrorQueueName = "$ACTIVEMQ_ARTEMIS_MIRROR_mirror";
        // NOTE(review): presumably ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE - confirm.
        final int individualAcknowledgeMode = 101;

        slowServer = createServerWithCallbackStorage(SLOW_SERVER_PORT, SLOW_SERVER_NAME, (isUpdate, isTX, txID, id, recordType, persister, record) -> {
            // No journal instrumentation is needed for this scenario.
        });
        slowServer.setIdentity("slowServer");
        server.setIdentity("server");

        configureMirrorTowardsSlow(server);
        slowServer.getConfiguration().setName(SLOW_SERVER_NAME);
        server.getConfiguration().setName("fast");
        slowServer.start();
        server.start();
        waitForServerToStart(slowServer);
        waitForServerToStart(server);

        Wait.waitFor(() -> server.locateQueue(mirrorQueueName) != null, 5000L);
        Queue mirrorQueue = server.locateQueue(mirrorQueueName);
        Assertions.assertNotNull(mirrorQueue);
        Assertions.assertEquals(AddressFullMessagePolicy.BLOCK, mirrorQueue.getPagingStore().getAddressFullMessagePolicy());

        server.addAddressInfo(new AddressInfo(getQueueName()).addRoutingType(RoutingType.ANYCAST).setAutoCreated(false));
        server.createQueue(QueueConfiguration.of(getQueueName()).setRoutingType(RoutingType.ANYCAST).setAddress(getQueueName()).setAutoCreated(false));

        Wait.waitFor(() -> slowServer.locateQueue(getQueueName()) != null);
        Queue replicatedQueue = slowServer.locateQueue(getQueueName());
        // Fail with a clear assertion instead of a later NullPointerException.
        Assertions.assertNotNull(replicatedQueue, "queue was not mirrored to the slow server");

        // Declared as the JMS interface so the instanceof guard is meaningful; only the
        // Artemis core factory exposes the server locator.
        jakarta.jms.ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:5672");
        if (factory instanceof ActiveMQConnectionFactory) {
            ((ActiveMQConnectionFactory) factory).getServerLocator().setBlockOnAcknowledge(true);
        }

        Connection connection = factory.createConnection();
        runAfter(connection::close);
        Session producerSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        MessageProducer producer = producerSession.createProducer(producerSession.createQueue(getQueueName()));
        connection.start();
        producer.setDeliveryMode(jakarta.jms.DeliveryMode.PERSISTENT);

        StringBuilder bodyBuilder = new StringBuilder();
        for (int repeat = 0; repeat < fillerRepeats; repeat++) {
            bodyBuilder.append("large Buffer...");
        }
        final String body = bodyBuilder.toString();

        for (int i = 0; i < numberOfMessages; i++) {
            TextMessage outgoing = producerSession.createTextMessage(body);
            // setStringProperty requires a String value, so the index is converted explicitly.
            outgoing.setStringProperty("strProperty", String.valueOf(i));
            producer.send(outgoing);
            // With a sync mirror the send only returns once the target stored the message.
            Wait.assertEquals(i + 1, replicatedQueue::getMessageCount, 5000L);
        }
        Wait.assertEquals(numberOfMessages, replicatedQueue::getMessageCount);

        connection.start();
        final int acknowledgeMode;
        if (tx) {
            acknowledgeMode = Session.SESSION_TRANSACTED;
        } else if (individualAck) {
            acknowledgeMode = individualAcknowledgeMode;
        } else {
            acknowledgeMode = Session.CLIENT_ACKNOWLEDGE;
        }
        Session consumerSession = connection.createSession(tx, acknowledgeMode);
        MessageConsumer consumer = consumerSession.createConsumer(consumerSession.createQueue(getQueueName()));

        for (int i = 0; i < numberOfMessages; i++) {
            Message received = consumer.receive(5000L);
            Assertions.assertNotNull(received);
            received.acknowledge();
            if (tx) {
                consumerSession.commit();
            }
            Wait.assertEquals(numberOfMessages - i - 1, replicatedQueue::getMessageCount, 5000L);
        }
    }
}
