package org.apache.activemq.artemis.tests.integration.jms.multiprotocol;

import jakarta.jms.BytesMessage;
import jakarta.jms.Connection;
import jakarta.jms.JMSException;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.LastValueQueue;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.extensions.parameterized.ParameterizedTestExtension;
import org.apache.activemq.artemis.tests.extensions.parameterized.Parameters;
import org.apache.activemq.artemis.tests.integration.jms.multiprotocol.MultiprotocolJMSClientTestSupport;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ExtendWith({ParameterizedTestExtension.class})
/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSNonDestructiveTest.class */
public class JMSNonDestructiveTest extends MultiprotocolJMSClientTestSupport {
    // Class logger; the owning class is resolved through MethodHandles rather than a class literal.
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    // Address/queue whose address settings enable default non-destructive consumption.
    private static final String NON_DESTRUCTIVE_QUEUE_NAME = "NON_DESTRUCTIVE_QUEUE";
    // Non-destructive address/queue that additionally gets an expiry delay of 100 in addConfiguration.
    private static final String NON_DESTRUCTIVE_EXPIRY_QUEUE_NAME = "NON_DESTRUCTIVE_EXPIRY_QUEUE";
    // Non-destructive address/queue that is also configured as a last-value queue.
    private static final String NON_DESTRUCTIVE_LVQ_QUEUE_NAME = "NON_DESTRUCTIVE_LVQ_QUEUE";
    // Second non-destructive last-value address/queue, used by the tombstone scenario.
    private static final String NON_DESTRUCTIVE_TOMBSTONE_LVQ_QUEUE_NAME = "NON_DESTRUCTIVE_LVQ_TOMBSTONE_QUEUE";
    // Test parameter forwarded to Configuration.setPersistenceEnabled in addConfiguration.
    protected final boolean persistenceEnabled;
    // Test parameter forwarded to Configuration.setMessageExpiryScanPeriod in addConfiguration.
    protected final long scanPeriod;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSNonDestructiveTest$Producer.class */
    public class Producer implements Runnable {
        private final MultiprotocolJMSClientTestSupport.ConnectionSupplier connectionSupplier;
        private final int messageCount;
        private final int groupCount;
        private final int offset;
        public boolean failed = false;

        Producer(MultiprotocolJMSClientTestSupport.ConnectionSupplier connectionSupplier, int i, int i2, int i3) {
            this.connectionSupplier = connectionSupplier;
            this.messageCount = i;
            this.groupCount = i2;
            this.offset = i3;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                Connection createConnection = this.connectionSupplier.createConnection();
                try {
                    Session createSession = createConnection.createSession(false, 2);
                    MessageProducer createProducer = createSession.createProducer(createSession.createQueue(JMSNonDestructiveTest.NON_DESTRUCTIVE_LVQ_QUEUE_NAME));
                    int i = this.offset * this.messageCount * this.groupCount;
                    int i2 = this.messageCount * this.groupCount;
                    for (int i3 = i; i3 < i2 + i; i3++) {
                        String str = (i3 % this.groupCount);
                        TextMessage createTextMessage = createSession.createTextMessage();
                        createTextMessage.setText(i3);
                        createTextMessage.setStringProperty("data", i3);
                        createTextMessage.setStringProperty("lastval", str);
                        createTextMessage.setStringProperty(Message.HDR_LAST_VALUE_NAME.toString(), str);
                        createProducer.send(createTextMessage);
                    }
                    if (createConnection != null) {
                        createConnection.close();
                    }
                } finally {
                }
            } catch (JMSException e) {
                e.printStackTrace();
                this.failed = true;
            }
        }
    }

    /**
     * Creates a test instance for one parameter combination supplied by {@link #data()}.
     *
     * @param z whether broker persistence is enabled
     * @param j message expiry scan period handed to the broker configuration
     */
    public JMSNonDestructiveTest(boolean z, long j) {
        this.scanPeriod = j;
        this.persistenceEnabled = z;
    }

    /**
     * Supplies the parameter combinations for this test class.
     *
     * @return three {@code {persistenceEnabled, scanPeriod}} pairs: {@code (false, 100)},
     *         {@code (true, 100)} and {@code (true, -1)}
     */
    @Parameters(name = "persistenceEnabled={0}, scanPeriod={1}")
    public static Collection<Object[]> data() {
        Object[][] combinations = {
            {false, 100},
            {true, 100},
            {true, -1}
        };
        return Arrays.asList(combinations);
    }

    /**
     * Applies the test parameters to the broker configuration and registers the address settings
     * for the four queues used by this class.
     *
     * @param activeMQServer broker being configured
     */
    @Override
    protected void addConfiguration(ActiveMQServer activeMQServer) {
        activeMQServer.getConfiguration().setPersistenceEnabled(this.persistenceEnabled);
        activeMQServer.getConfiguration().setMessageExpiryScanPeriod(this.scanPeriod);

        // Plain non-destructive queue.
        AddressSettings plainSettings = new AddressSettings().setDefaultNonDestructive(true);
        activeMQServer.getAddressSettingsRepository().addMatch(NON_DESTRUCTIVE_QUEUE_NAME, plainSettings);

        // Non-destructive queue whose messages get an expiry delay.
        AddressSettings expirySettings = new AddressSettings().setDefaultNonDestructive(true).setExpiryDelay(100L);
        activeMQServer.getAddressSettingsRepository().addMatch(NON_DESTRUCTIVE_EXPIRY_QUEUE_NAME, expirySettings);

        // The two last-value queues each receive their own settings instance.
        AddressSettings lvqSettings = new AddressSettings().setDefaultLastValueQueue(true).setDefaultNonDestructive(true);
        activeMQServer.getAddressSettingsRepository().addMatch(NON_DESTRUCTIVE_LVQ_QUEUE_NAME, lvqSettings);

        AddressSettings tombstoneLvqSettings = new AddressSettings().setDefaultLastValueQueue(true).setDefaultNonDestructive(true);
        activeMQServer.getAddressSettingsRepository().addMatch(NON_DESTRUCTIVE_TOMBSTONE_LVQ_QUEUE_NAME, tombstoneLvqSettings);
    }

    /**
     * Creates the inherited addresses and queues, then an ANYCAST address plus a same-named
     * ANYCAST queue for each of the four non-destructive queues used by this class.
     *
     * @param activeMQServer broker on which the addresses and queues are created
     * @throws Exception if the broker rejects an address or queue
     */
    @Override
    public void createAddressAndQueues(ActiveMQServer activeMQServer) throws Exception {
        super.createAddressAndQueues(activeMQServer);

        final String[] queueNames = {
            NON_DESTRUCTIVE_QUEUE_NAME,
            NON_DESTRUCTIVE_EXPIRY_QUEUE_NAME,
            NON_DESTRUCTIVE_LVQ_QUEUE_NAME,
            NON_DESTRUCTIVE_TOMBSTONE_LVQ_QUEUE_NAME
        };
        for (String queueName : queueNames) {
            // Address first, then the queue bound to it.
            activeMQServer.addAddressInfo(new AddressInfo(SimpleString.of(queueName), RoutingType.ANYCAST));
            activeMQServer.createQueue(QueueConfiguration.of(queueName).setRoutingType(RoutingType.ANYCAST));
        }
    }

    /** Runs every non-destructive scenario with an AMQP producer and an AMQP consumer. */
    @TestTemplate
    public void testNonDestructiveAMQPProducerAMQPConsumer() throws Exception {
        final MultiprotocolJMSClientTestSupport.ConnectionSupplier amqp = this.AMQPConnection;
        testNonDestructive(amqp, amqp);
    }

    /** Runs every non-destructive scenario with a Core producer and a Core consumer. */
    @TestTemplate
    public void testNonDestructiveCoreProducerCoreConsumer() throws Exception {
        final MultiprotocolJMSClientTestSupport.ConnectionSupplier core = this.CoreConnection;
        testNonDestructive(core, core);
    }

    /** Runs every non-destructive scenario with a Core producer and an AMQP consumer. */
    @TestTemplate
    public void testNonDestructiveCoreProducerAMQPConsumer() throws Exception {
        final MultiprotocolJMSClientTestSupport.ConnectionSupplier producerSide = this.CoreConnection;
        final MultiprotocolJMSClientTestSupport.ConnectionSupplier consumerSide = this.AMQPConnection;
        testNonDestructive(producerSide, consumerSide);
    }

    /** Runs every non-destructive scenario with an AMQP producer and a Core consumer. */
    @TestTemplate
    public void testNonDestructiveAMQPProducerCoreConsumer() throws Exception {
        final MultiprotocolJMSClientTestSupport.ConnectionSupplier producerSide = this.AMQPConnection;
        final MultiprotocolJMSClientTestSupport.ConnectionSupplier consumerSide = this.CoreConnection;
        testNonDestructive(producerSide, consumerSide);
    }

    /** Runs the consumer-first last-value-queue scenario over Core connections. */
    @TestTemplate
    public void testNonDestructiveLVQWithConsumerFirstCore() throws Exception {
        final MultiprotocolJMSClientTestSupport.ConnectionSupplier core = this.CoreConnection;
        testNonDestructiveLVQWithConsumerFirst(core);
    }

    /** Runs the consumer-first last-value-queue scenario over AMQP connections. */
    @TestTemplate
    public void testNonDestructiveLVQWithConsumerFirstAMQP() throws Exception {
        final MultiprotocolJMSClientTestSupport.ConnectionSupplier amqp = this.AMQPConnection;
        testNonDestructiveLVQWithConsumerFirst(amqp);
    }

    /**
     * Runs the full set of non-destructive scenarios, one after another, for a given
     * producer/consumer protocol pairing.
     *
     * <p>The scenarios share the same broker queues; each of the scenarios visible in this file
     * that uses {@code NON_DESTRUCTIVE_QUEUE} clears it via the queue control before returning,
     * so the order below is significant.
     *
     * @param connectionSupplier connection factory used for producing
     * @param connectionSupplier2 connection factory used for consuming
     * @throws Exception if any scenario fails
     */
    public void testNonDestructive(MultiprotocolJMSClientTestSupport.ConnectionSupplier connectionSupplier, MultiprotocolJMSClientTestSupport.ConnectionSupplier connectionSupplier2) throws Exception {
        testNonDestructiveSingle(connectionSupplier, connectionSupplier2);
        testNonDestructiveDualConsumer(connectionSupplier, connectionSupplier2);
        testNonDestructiveExpiry(connectionSupplier, connectionSupplier2);
        testNonDestructiveMulitpleMessages(connectionSupplier, connectionSupplier2);
        testNonDestructiveMulitpleMessagesDualConsumer(connectionSupplier, connectionSupplier2);
        testNonDestructiveLVQ(connectionSupplier, connectionSupplier2);
        testNonDestructiveLVQTombstone(connectionSupplier, connectionSupplier2);
    }

    /**
     * Sends one message and verifies that two successive consumers each receive it while the
     * queue's message count stays at 1; finally clears the queue through its management control.
     *
     * @param connectionSupplier connection factory used for producing
     * @param connectionSupplier2 connection factory used for consuming
     * @throws Exception if messaging or a management operation fails
     */
    public void testNonDestructiveSingle(MultiprotocolJMSClientTestSupport.ConnectionSupplier connectionSupplier, MultiprotocolJMSClientTestSupport.ConnectionSupplier connectionSupplier2) throws Exception {
        sendMessage(connectionSupplier, NON_DESTRUCTIVE_QUEUE_NAME);

        // The post office hands back a generic binding; narrow it to reach the queue.
        QueueBinding binding = (QueueBinding) this.server.getPostOffice().getBinding(SimpleString.of(NON_DESTRUCTIVE_QUEUE_NAME));
        Assertions.assertEquals(1L, binding.getQueue().getMessageCount(), "Ensure Message count");

        // First consumption must leave the message in place.
        receive(connectionSupplier2, NON_DESTRUCTIVE_QUEUE_NAME);
        Assertions.assertEquals(1L, binding.getQueue().getMessageCount(), "Ensure Message count");

        // A second consumer must see the very same message again.
        receive(connectionSupplier2, NON_DESTRUCTIVE_QUEUE_NAME);
        Assertions.assertEquals(1L, binding.getQueue().getMessageCount(), "Ensure Message count");

        // Clean up so that later scenarios start from an empty queue.
        QueueControl control = (QueueControl) this.server.getManagementService().getResource("queue." + NON_DESTRUCTIVE_QUEUE_NAME);
        control.removeAllMessages();
        Assertions.assertEquals(0L, binding.getQueue().getMessageCount(), "Message count after clearing queue via queue control should be 0");
    }

    /**
     * Sends one message and verifies, twice, that two concurrently open consumers both receive it
     * while the queue's message count stays at 1; finally clears the queue through its management
     * control.
     *
     * @param connectionSupplier connection factory used for producing
     * @param connectionSupplier2 connection factory used for consuming
     * @throws Exception if messaging or a management operation fails
     */
    public void testNonDestructiveDualConsumer(MultiprotocolJMSClientTestSupport.ConnectionSupplier connectionSupplier, MultiprotocolJMSClientTestSupport.ConnectionSupplier connectionSupplier2) throws Exception {
        sendMessage(connectionSupplier, NON_DESTRUCTIVE_QUEUE_NAME);

        // The post office hands back a generic binding; narrow it to reach the queue.
        QueueBinding binding = (QueueBinding) this.server.getPostOffice().getBinding(SimpleString.of(NON_DESTRUCTIVE_QUEUE_NAME));
        Assertions.assertEquals(1L, binding.getQueue().getMessageCount(), "Ensure Message count");

        receiveDualConsumer(connectionSupplier2, NON_DESTRUCTIVE_QUEUE_NAME);
        Assertions.assertEquals(1L, binding.getQueue().getMessageCount(), "Ensure Message count");

        receiveDualConsumer(connectionSupplier2, NON_DESTRUCTIVE_QUEUE_NAME);
        Assertions.assertEquals(1L, binding.getQueue().getMessageCount(), "Ensure Message count");

        // Clean up so that later scenarios start from an empty queue.
        QueueControl control = (QueueControl) this.server.getManagementService().getResource("queue." + NON_DESTRUCTIVE_QUEUE_NAME);
        control.removeAllMessages();
        Assertions.assertEquals(0L, binding.getQueue().getMessageCount(), "Message count after clearing queue via queue control should be 0");
    }

    /**
     * Sends one message to the expiry-enabled non-destructive queue, consumes it once, then checks
     * that a later consumer receives nothing and that the queue ends up empty.
     *
     * @param connectionSupplier connection factory used for producing
     * @param connectionSupplier2 connection factory used for consuming
     * @throws Exception if messaging or a management operation fails
     */
    public void testNonDestructiveExpiry(MultiprotocolJMSClientTestSupport.ConnectionSupplier connectionSupplier, MultiprotocolJMSClientTestSupport.ConnectionSupplier connectionSupplier2) throws Exception {
        sendMessage(connectionSupplier, NON_DESTRUCTIVE_EXPIRY_QUEUE_NAME);

        // The post office hands back a generic binding; narrow it to reach the queue.
        QueueBinding binding = (QueueBinding) this.server.getPostOffice().getBinding(SimpleString.of(NON_DESTRUCTIVE_EXPIRY_QUEUE_NAME));
        Assertions.assertEquals(1L, binding.getQueue().getMessageCount(), "Ensure Message count");

        // Consume once; the message is expected to still be counted afterwards.
        receive(connectionSupplier2, NON_DESTRUCTIVE_EXPIRY_QUEUE_NAME);
        Queue queue = binding.getQueue();
        // A bound method reference already rejects a null receiver, so no explicit null check is needed.
        Wait.assertEquals(1L, queue::getMessageCount);

        // NOTE(review): the outcome of this bounded wait is not checked; the steps below run regardless.
        Wait.waitFor(() -> binding.getQueue().getMessageCount() == 0, 200L);

        // Consume again: this time nothing should be delivered.
        receiveNull(connectionSupplier2, NON_DESTRUCTIVE_EXPIRY_QUEUE_NAME);
        Assertions.assertEquals(0L, binding.getQueue().getMessageCount(), "Ensure Message count");

        QueueControl control = (QueueControl) this.server.getManagementService().getResource("queue." + NON_DESTRUCTIVE_EXPIRY_QUEUE_NAME);
        control.removeAllMessages();
        Assertions.assertEquals(0L, binding.getQueue().getMessageCount(), "Message count after clearing queue via queue control should be 0");
    }

    /**
     * Sends three messages with bodies "0", "1" and "2" and verifies that two successive consumers
     * each receive all three in order; finally clears the queue through its management control.
     *
     * @param connectionSupplier connection factory used for producing
     * @param connectionSupplier2 connection factory used for consuming
     * @throws Exception if messaging or a management operation fails
     */
    public void testNonDestructiveMulitpleMessages(MultiprotocolJMSClientTestSupport.ConnectionSupplier connectionSupplier, MultiprotocolJMSClientTestSupport.ConnectionSupplier connectionSupplier2) throws Exception {
        final int messageCount = 3;
        for (int body = 0; body < messageCount; body++) {
            sendMessage(connectionSupplier, NON_DESTRUCTIVE_QUEUE_NAME, body);
        }

        // The post office hands back a generic binding; narrow it to reach the queue.
        QueueBinding binding = (QueueBinding) this.server.getPostOffice().getBinding(SimpleString.of(NON_DESTRUCTIVE_QUEUE_NAME));
        Assertions.assertEquals(3L, binding.getQueue().getMessageCount(), "Ensure Message count");

        // Two independent consumers must each see the complete, ordered set.
        receive(connectionSupplier2, NON_DESTRUCTIVE_QUEUE_NAME, messageCount);
        receive(connectionSupplier2, NON_DESTRUCTIVE_QUEUE_NAME, messageCount);

        // Clean up so that later scenarios start from an empty queue.
        QueueControl control = (QueueControl) this.server.getManagementService().getResource("queue." + NON_DESTRUCTIVE_QUEUE_NAME);
        control.removeAllMessages();
        Assertions.assertEquals(0L, binding.getQueue().getMessageCount(), "Message count after clearing queue via queue control should be 0");
    }

    /**
     * Sends three messages with bodies "0", "1" and "2" and verifies, twice, that two concurrently
     * open consumers both receive all three in order; finally clears the queue through its
     * management control.
     *
     * @param connectionSupplier connection factory used for producing
     * @param connectionSupplier2 connection factory used for consuming
     * @throws Exception if messaging or a management operation fails
     */
    public void testNonDestructiveMulitpleMessagesDualConsumer(MultiprotocolJMSClientTestSupport.ConnectionSupplier connectionSupplier, MultiprotocolJMSClientTestSupport.ConnectionSupplier connectionSupplier2) throws Exception {
        final int messageCount = 3;
        for (int body = 0; body < messageCount; body++) {
            sendMessage(connectionSupplier, NON_DESTRUCTIVE_QUEUE_NAME, body);
        }

        // The post office hands back a generic binding; narrow it to reach the queue.
        QueueBinding binding = (QueueBinding) this.server.getPostOffice().getBinding(SimpleString.of(NON_DESTRUCTIVE_QUEUE_NAME));
        Assertions.assertEquals(3L, binding.getQueue().getMessageCount(), "Ensure Message count");

        receiveDualConsumer(connectionSupplier2, NON_DESTRUCTIVE_QUEUE_NAME, messageCount);
        receiveDualConsumer(connectionSupplier2, NON_DESTRUCTIVE_QUEUE_NAME, messageCount);

        // Clean up so that later scenarios start from an empty queue.
        QueueControl control = (QueueControl) this.server.getManagementService().getResource("queue." + NON_DESTRUCTIVE_QUEUE_NAME);
        control.removeAllMessages();
        Assertions.assertEquals(0L, binding.getQueue().getMessageCount(), "Message count after clearing queue via queue control should be 0");
    }

    /**
     * Exercises the non-destructive last-value queue: after each send the queue must report a
     * message count of 1, and that count must be unchanged by repeated consumption. The queue is
     * cleared through its management control at the end.
     *
     * <p>The send/receive helpers used here are defined elsewhere in this class.
     *
     * @param connectionSupplier connection factory used for producing
     * @param connectionSupplier2 connection factory used for consuming
     * @throws Exception if messaging or a management operation fails
     */
    public void testNonDestructiveLVQ(MultiprotocolJMSClientTestSupport.ConnectionSupplier connectionSupplier, MultiprotocolJMSClientTestSupport.ConnectionSupplier connectionSupplier2) throws Exception {
        final String lastValueKey = Message.HDR_LAST_VALUE_NAME.toString();

        sendLVQ(connectionSupplier, NON_DESTRUCTIVE_LVQ_QUEUE_NAME, lastValueKey);

        // The post office hands back a generic binding; narrow it to reach the queue.
        QueueBinding binding = (QueueBinding) this.server.getPostOffice().getBinding(SimpleString.of(NON_DESTRUCTIVE_LVQ_QUEUE_NAME));
        Assertions.assertEquals(1L, binding.getQueue().getMessageCount(), "Ensure Message count");

        // Short pause before consuming, kept from the original test.
        Thread.sleep(10L);

        receiveLVQ(connectionSupplier2, NON_DESTRUCTIVE_LVQ_QUEUE_NAME, lastValueKey);
        Assertions.assertEquals(1L, binding.getQueue().getMessageCount(), "Ensure Message count");

        receiveLVQ(connectionSupplier2, NON_DESTRUCTIVE_LVQ_QUEUE_NAME, lastValueKey);
        Assertions.assertEquals(1L, binding.getQueue().getMessageCount(), "Ensure Message count");

        // A second send must leave the count at 1.
        sendLVQ(connectionSupplier, NON_DESTRUCTIVE_LVQ_QUEUE_NAME, lastValueKey);
        Assertions.assertEquals(1L, binding.getQueue().getMessageCount(), "Ensure Message count");

        Thread.sleep(10L);

        receiveLVQ(connectionSupplier2, NON_DESTRUCTIVE_LVQ_QUEUE_NAME, lastValueKey);
        Assertions.assertEquals(1L, binding.getQueue().getMessageCount(), "Ensure Message count");

        QueueControl control = (QueueControl) this.server.getManagementService().getResource("queue." + NON_DESTRUCTIVE_LVQ_QUEUE_NAME);
        control.removeAllMessages();
        Assertions.assertEquals(0L, binding.getQueue().getMessageCount(), "Message count after clearing queue via queue control should be 0");
    }

    /**
     * Verifies that a non-destructive last-value queue keeps delivering after a consumer that was
     * attached <em>before</em> the first message arrived has gone away.
     *
     * <p>Steps:
     * <ol>
     *   <li>A background task opens a consumer on the LVQ and waits up to 5 seconds for a message.</li>
     *   <li>Once that consumer is set up (or after 5 seconds), a first message is sent.</li>
     *   <li>After the background consumer has signalled completion (or after 5 seconds), a second
     *       message with the same last-value key is sent.</li>
     *   <li>A fresh consumer must still receive a message within 5 seconds.</li>
     * </ol>
     *
     * <p>NOTE(review): failures raised inside the background task end up in the discarded
     * {@code Future} and therefore do not fail the test by themselves; only the final receive is
     * decisive.
     *
     * @param connectionSupplier connection factory used for every connection in this scenario
     * @throws Exception if messaging fails or the test thread is interrupted
     */
    public void testNonDestructiveLVQWithConsumerFirst(MultiprotocolJMSClientTestSupport.ConnectionSupplier connectionSupplier) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(1);
        try {
            CountDownLatch consumerSetup = new CountDownLatch(1);
            CountDownLatch consumerComplete = new CountDownLatch(1);

            // Attach the consumer before anything is sent so the first message meets a waiting consumer.
            executor.submit(() -> {
                try (Connection connection = connectionSupplier.createConnection();
                     Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                     MessageConsumer consumer = session.createConsumer(session.createQueue(NON_DESTRUCTIVE_LVQ_QUEUE_NAME))) {
                    connection.start();
                    consumerSetup.countDown();
                    Assertions.assertNotNull(consumer.receive(5000L));
                    consumerComplete.countDown();
                } catch (Exception e) {
                    // Keep the cause so the failure is diagnosable from the task's Future.
                    Assertions.fail(e.getMessage(), e);
                }
                // Idempotent on a latch of one; signals completion after resources were closed.
                consumerComplete.countDown();
            });

            // Give the consumer thread a bounded amount of time to get set up.
            consumerSetup.await(5L, TimeUnit.SECONDS);

            try (Connection connection = connectionSupplier.createConnection();
                 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
                MessageProducer producer = session.createProducer(session.createQueue(NON_DESTRUCTIVE_LVQ_QUEUE_NAME));

                BytesMessage first = session.createBytesMessage();
                first.writeUTF("mills " + System.currentTimeMillis());
                first.setStringProperty("_AMQ_LVQ_NAME", "STOCK_NAME");
                producer.send(first);

                // Wait (bounded) for the early consumer to finish before replacing the value.
                consumerComplete.await(5L, TimeUnit.SECONDS);

                BytesMessage second = session.createBytesMessage();
                second.writeUTF("mills " + System.currentTimeMillis());
                second.setStringProperty("_AMQ_LVQ_NAME", "STOCK_NAME");
                producer.send(second);
            }

            // A brand-new consumer must still be served by the queue.
            try (Connection connection = connectionSupplier.createConnection();
                 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                 MessageConsumer consumer = session.createConsumer(session.createQueue(NON_DESTRUCTIVE_LVQ_QUEUE_NAME))) {
                connection.start();
                Assertions.assertNotNull(consumer.receive(5000L));
            }
        } finally {
            // Always release the worker thread, including when an assertion above failed.
            executor.shutdownNow();
        }
    }

    /**
     * Exercises tombstoning on the non-destructive last-value queue: a regular message is replaced
     * by a tombstone sent with a time-to-live argument of 500; after sleeping three times that
     * value the queue and its set of last-value keys must both be empty.
     *
     * <p>The send/receive helpers used here are defined elsewhere in this class.
     *
     * @param connectionSupplier connection factory used for producing
     * @param connectionSupplier2 connection factory used for consuming
     * @throws Exception if messaging fails or the test thread is interrupted
     */
    public void testNonDestructiveLVQTombstone(MultiprotocolJMSClientTestSupport.ConnectionSupplier connectionSupplier, MultiprotocolJMSClientTestSupport.ConnectionSupplier connectionSupplier2) throws Exception {
        // Value passed to sendLVQTombstone and used to size the final sleep.
        final int tombstoneTimeToLive = 500;
        final String lastValueKey = Message.HDR_LAST_VALUE_NAME.toString();

        // Narrow the generic binding and queue types so the last-value keys can be inspected.
        QueueBinding binding = (QueueBinding) this.server.getPostOffice().getBinding(SimpleString.of(NON_DESTRUCTIVE_TOMBSTONE_LVQ_QUEUE_NAME));
        LastValueQueue queue = (LastValueQueue) binding.getQueue();

        sendLVQ(connectionSupplier, NON_DESTRUCTIVE_TOMBSTONE_LVQ_QUEUE_NAME, lastValueKey);
        Assertions.assertEquals(1L, queue.getMessageCount(), "Ensure Message count");

        Thread.sleep(10L);
        receiveLVQ(connectionSupplier2, NON_DESTRUCTIVE_TOMBSTONE_LVQ_QUEUE_NAME, lastValueKey);

        // Replace the current value with a tombstone; the count must remain 1.
        sendLVQTombstone(connectionSupplier, NON_DESTRUCTIVE_TOMBSTONE_LVQ_QUEUE_NAME, lastValueKey, tombstoneTimeToLive);
        Assertions.assertEquals(1L, queue.getMessageCount(), "Ensure Message count");

        Thread.sleep(10L);
        receiveLVQTombstone(connectionSupplier2, NON_DESTRUCTIVE_TOMBSTONE_LVQ_QUEUE_NAME, lastValueKey);
        receiveLVQTombstone(connectionSupplier2, NON_DESTRUCTIVE_TOMBSTONE_LVQ_QUEUE_NAME, lastValueKey);
        Assertions.assertEquals(1, queue.getLastValueKeys().size(), "Ensure Message count");

        // Wait three times the tombstone's time-to-live before checking that everything is gone.
        Thread.sleep(tombstoneTimeToLive * 3);

        receiveLVQAssertEmpty(connectionSupplier2, NON_DESTRUCTIVE_TOMBSTONE_LVQ_QUEUE_NAME);
        Assertions.assertEquals(0L, queue.getMessageCount(), "Ensure Message count");
        Assertions.assertEquals(0, queue.getLastValueKeys().size(), "Ensure Message count");
    }

    /**
     * Verifies over Core connections that consuming from the non-destructive queue never lowers
     * its message count: the count follows the number of sends (1, then 2) and only drops to 0
     * after the queue is cleared through its management control.
     *
     * @throws Exception if messaging or a management operation fails
     */
    @TestTemplate
    public void testMessageCount() throws Exception {
        sendMessage(this.CoreConnection, NON_DESTRUCTIVE_QUEUE_NAME);

        // The post office hands back a generic binding; narrow it to reach the queue.
        QueueBinding binding = (QueueBinding) this.server.getPostOffice().getBinding(SimpleString.of(NON_DESTRUCTIVE_QUEUE_NAME));
        Assertions.assertEquals(1L, binding.getQueue().getMessageCount(), "Ensure Message count");

        receive(this.CoreConnection, NON_DESTRUCTIVE_QUEUE_NAME);
        Assertions.assertEquals(1L, binding.getQueue().getMessageCount(), "Ensure Message count");

        sendMessage(this.CoreConnection, NON_DESTRUCTIVE_QUEUE_NAME);
        Assertions.assertEquals(2L, binding.getQueue().getMessageCount(), "Ensure Message count");

        receive(this.CoreConnection, NON_DESTRUCTIVE_QUEUE_NAME);
        Assertions.assertEquals(2L, binding.getQueue().getMessageCount(), "Ensure Message count");

        QueueControl control = (QueueControl) this.server.getManagementService().getResource("queue." + NON_DESTRUCTIVE_QUEUE_NAME);
        control.removeAllMessages();
        Assertions.assertEquals(0L, binding.getQueue().getMessageCount(), "Message count after clearing queue via queue control should be 0");
    }

    /**
     * Opens a consumer on the given queue and asserts that exactly {@code i} text messages are
     * available, with bodies {@code "0"}, {@code "1"}, ... in that order, and nothing after them.
     *
     * @param connectionSupplier connection factory used for consuming
     * @param str name of the queue to consume from
     * @param i number of messages expected
     * @throws JMSException if a JMS operation fails
     */
    private void receive(MultiprotocolJMSClientTestSupport.ConnectionSupplier connectionSupplier, String str, int i) throws JMSException {
        try (Connection connection = connectionSupplier.createConnection()) {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageConsumer consumer = session.createConsumer(session.createQueue(str));
            for (int expected = 0; expected < i; expected++) {
                // receive() yields a generic JMS message; the test only ever sends text messages here.
                TextMessage received = (TextMessage) consumer.receive(200L);
                Assertions.assertNotNull(received);
                Assertions.assertEquals(Integer.toString(expected), received.getText());
            }
            // Nothing beyond the expected messages may be pending.
            Assertions.assertNull(consumer.receiveNoWait());
            consumer.close();
        }
    }

    /**
     * Opens a consumer on the given queue and asserts that a message arrives within 2 seconds.
     *
     * @param connectionSupplier connection factory used for consuming
     * @param str name of the queue to consume from
     * @throws JMSException if a JMS operation fails
     */
    private void receive(MultiprotocolJMSClientTestSupport.ConnectionSupplier connectionSupplier, String str) throws JMSException {
        // try-with-resources closes the connection exactly once on every path.
        try (Connection connection = connectionSupplier.createConnection()) {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageConsumer consumer = session.createConsumer(session.createQueue(str));
            Assertions.assertNotNull(consumer.receive(2000L));
            consumer.close();
        }
    }

    /**
     * Opens a consumer on the given queue and asserts that no message is immediately available.
     *
     * @param connectionSupplier connection factory used for consuming
     * @param str name of the queue to consume from
     * @throws JMSException if a JMS operation fails
     */
    private void receiveNull(MultiprotocolJMSClientTestSupport.ConnectionSupplier connectionSupplier, String str) throws JMSException {
        // try-with-resources closes the connection exactly once on every path.
        try (Connection connection = connectionSupplier.createConnection()) {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageConsumer consumer = session.createConsumer(session.createQueue(str));
            Assertions.assertNull(consumer.receiveNoWait());
            consumer.close();
        }
    }

    /**
     * Opens two consumers on separate connections against the same queue and asserts that each of
     * them receives a message within 2 seconds.
     *
     * @param connectionSupplier connection factory used for both consumers
     * @param str name of the queue to consume from
     * @throws JMSException if a JMS operation fails
     */
    private void receiveDualConsumer(MultiprotocolJMSClientTestSupport.ConnectionSupplier connectionSupplier, String str) throws JMSException {
        // Resources close in reverse order: second connection first, then the first.
        try (Connection firstConnection = connectionSupplier.createConnection();
             Connection secondConnection = connectionSupplier.createConnection()) {
            MessageConsumer firstConsumer = createConsumer(firstConnection, str);
            MessageConsumer secondConsumer = createConsumer(secondConnection, str);

            // receive() yields a generic JMS message; the test only ever sends text messages here.
            TextMessage firstReceived = (TextMessage) firstConsumer.receive(2000L);
            TextMessage secondReceived = (TextMessage) secondConsumer.receive(2000L);
            Assertions.assertNotNull(firstReceived);
            Assertions.assertNotNull(secondReceived);

            firstConsumer.close();
            secondConsumer.close();
        }
    }

    /**
     * Opens two consumers on separate connections against the same queue and asserts that both
     * receive exactly {@code i} text messages with bodies {@code "0"}, {@code "1"}, ... in that
     * order, and nothing after them.
     *
     * @param connectionSupplier connection factory used for both consumers
     * @param str name of the queue to consume from
     * @param i number of messages each consumer is expected to receive
     * @throws JMSException if a JMS operation fails
     */
    private void receiveDualConsumer(MultiprotocolJMSClientTestSupport.ConnectionSupplier connectionSupplier, String str, int i) throws JMSException {
        // Resources close in reverse order: second connection first, then the first.
        try (Connection firstConnection = connectionSupplier.createConnection();
             Connection secondConnection = connectionSupplier.createConnection()) {
            MessageConsumer firstConsumer = createConsumer(firstConnection, str);
            MessageConsumer secondConsumer = createConsumer(secondConnection, str);

            for (int expected = 0; expected < i; expected++) {
                // receive() yields a generic JMS message; the test only ever sends text messages here.
                TextMessage firstReceived = (TextMessage) firstConsumer.receive(200L);
                TextMessage secondReceived = (TextMessage) secondConsumer.receive(200L);
                Assertions.assertNotNull(firstReceived);
                Assertions.assertNotNull(secondReceived);
                Assertions.assertEquals(Integer.toString(expected), firstReceived.getText());
                Assertions.assertEquals(Integer.toString(expected), secondReceived.getText());
            }

            // Neither consumer may have anything further pending.
            Assertions.assertNull(firstConsumer.receiveNoWait());
            Assertions.assertNull(secondConsumer.receiveNoWait());

            firstConsumer.close();
            secondConsumer.close();
        }
    }

    /**
     * Starts {@code connection}, opens a non-transacted auto-acknowledge session on it and
     * returns a consumer for the queue named {@code str}.
     * <p>
     * The session is not returned to the caller and is not closed here.
     *
     * @param connection connection to start and create the session on
     * @param str name of the queue to consume from
     * @return a new consumer bound to the named queue
     * @throws JMSException if starting the connection or creating the session/consumer fails
     */
    private MessageConsumer createConsumer(Connection connection, String str) throws JMSException {
        connection.start();
        final Session autoAckSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        return autoAckSession.createConsumer(autoAckSession.createQueue(str));
    }

    /**
     * Convenience overload that sends one message with the payload value {@code 0}
     * to the queue named {@code str}.
     *
     * @param connectionSupplier factory used to create the producing connection
     * @param str name of the destination queue
     * @throws JMSException if sending fails
     */
    private void sendMessage(MultiprotocolJMSClientTestSupport.ConnectionSupplier connectionSupplier, String str) throws JMSException {
        final int defaultPayload = 0;
        sendMessage(connectionSupplier, str, defaultPayload);
    }

    /**
     * Sends a single text message to the queue named {@code str}; the message body is the
     * decimal string form of {@code i}.
     * <p>
     * Connection, session and producer are all closed (in reverse order of creation) whether
     * or not the send succeeds.
     *
     * @param connectionSupplier factory used to create the producing connection
     * @param str name of the destination queue
     * @param i value written, as text, into the message body
     * @throws JMSException if creating the resources, sending, or closing fails
     */
    private void sendMessage(MultiprotocolJMSClientTestSupport.ConnectionSupplier connectionSupplier, String str, int i) throws JMSException {
        try (Connection connection = connectionSupplier.createConnection();
             Session session = connection.createSession();
             MessageProducer producer = session.createProducer(session.createQueue(str))) {
            TextMessage message = session.createTextMessage();
            message.setText(Integer.toString(i));
            producer.send(message);
        }
    }

    /**
     * Receives one message from the queue named {@code str} (waiting up to one second) and
     * asserts that it carries the string property {@code str2} with value {@code "KEY"} and
     * the body {@code "how are you"}.
     *
     * @param connectionSupplier factory used to create the consuming connection
     * @param str name of the queue to consume from
     * @param str2 name of the string property expected to hold {@code "KEY"}
     * @throws JMSException if creating the resources, receiving, or closing fails
     */
    private void receiveLVQ(MultiprotocolJMSClientTestSupport.ConnectionSupplier connectionSupplier, String str, String str2) throws JMSException {
        try (Connection connection = connectionSupplier.createConnection();
             Session session = connection.createSession();
             MessageConsumer consumer = session.createConsumer(session.createQueue(str))) {
            // receive() is declared to return Message, so the cast is required.
            TextMessage received = (TextMessage) consumer.receive(1000L);
            Assertions.assertNotNull(received);
            Assertions.assertEquals("KEY", received.getStringProperty(str2));
            Assertions.assertEquals("how are you", received.getText());
        }
    }

    /**
     * Sends two text messages to the queue named {@code str}, both carrying the string
     * property {@code str2} set to {@code "KEY"}: first with body {@code "hello"}, then with
     * body {@code "how are you"}.
     * <p>
     * Connection, session and producer are all closed (in reverse order of creation) whether
     * or not the sends succeed.
     *
     * @param connectionSupplier factory used to create the producing connection
     * @param str name of the destination queue
     * @param str2 name of the string property set to {@code "KEY"} on both messages
     * @throws JMSException if creating the resources, sending, or closing fails
     */
    private void sendLVQ(MultiprotocolJMSClientTestSupport.ConnectionSupplier connectionSupplier, String str, String str2) throws JMSException {
        try (Connection connection = connectionSupplier.createConnection();
             Session session = connection.createSession();
             MessageProducer producer = session.createProducer(session.createQueue(str))) {
            TextMessage first = session.createTextMessage();
            first.setStringProperty(str2, "KEY");
            first.setText("hello");
            producer.send(first);

            // Same property value as the first message, different body.
            TextMessage second = session.createTextMessage();
            second.setStringProperty(str2, "KEY");
            second.setText("how are you");
            producer.send(second);
        }
    }

    /**
     * Receives one message from the queue named {@code str} (waiting up to one second) and
     * asserts that it carries the string property {@code str2} with value {@code "KEY"} and
     * the body {@code "tombstone"}.
     *
     * @param connectionSupplier factory used to create the consuming connection
     * @param str name of the queue to consume from
     * @param str2 name of the string property expected to hold {@code "KEY"}
     * @throws JMSException if creating the resources, receiving, or closing fails
     */
    private void receiveLVQTombstone(MultiprotocolJMSClientTestSupport.ConnectionSupplier connectionSupplier, String str, String str2) throws JMSException {
        try (Connection connection = connectionSupplier.createConnection();
             Session session = connection.createSession();
             MessageConsumer consumer = session.createConsumer(session.createQueue(str))) {
            // receive() is declared to return Message, so the cast is required.
            TextMessage received = (TextMessage) consumer.receive(1000L);
            Assertions.assertNotNull(received);
            Assertions.assertEquals("KEY", received.getStringProperty(str2));
            Assertions.assertEquals("tombstone", received.getText());
        }
    }

    /**
     * Asserts that a freshly created consumer on the queue named {@code str} has no message
     * immediately available ({@code receiveNoWait()} returns {@code null}).
     * <p>
     * Connection, session and consumer are all closed (in reverse order of creation) whether
     * or not the assertion passes.
     *
     * @param connectionSupplier factory used to create the consuming connection
     * @param str name of the queue to check
     * @throws JMSException if creating the resources, receiving, or closing fails
     */
    private void receiveLVQAssertEmpty(MultiprotocolJMSClientTestSupport.ConnectionSupplier connectionSupplier, String str) throws JMSException {
        try (Connection connection = connectionSupplier.createConnection();
             Session session = connection.createSession();
             MessageConsumer consumer = session.createConsumer(session.createQueue(str))) {
            Assertions.assertNull(consumer.receiveNoWait());
        }
    }

    /**
     * Sends one text message with body {@code "tombstone"} and the string property
     * {@code str2} set to {@code "KEY"} to the queue named {@code str}, using the four-argument
     * {@code send} overload so that {@code i} is applied as the message time-to-live.
     * <p>
     * Connection, session and producer are all closed (in reverse order of creation) whether
     * or not the send succeeds.
     *
     * @param connectionSupplier factory used to create the producing connection
     * @param str name of the destination queue
     * @param str2 name of the string property set to {@code "KEY"}
     * @param i time-to-live passed to {@code send} (milliseconds per the JMS API)
     * @throws JMSException if creating the resources, sending, or closing fails
     */
    private void sendLVQTombstone(MultiprotocolJMSClientTestSupport.ConnectionSupplier connectionSupplier, String str, String str2, int i) throws JMSException {
        try (Connection connection = connectionSupplier.createConnection();
             Session session = connection.createSession();
             MessageProducer producer = session.createProducer(session.createQueue(str))) {
            TextMessage tombstone = session.createTextMessage();
            tombstone.setStringProperty(str2, "KEY");
            tombstone.setText("tombstone");
            // send(message, deliveryMode, priority, timeToLive):
            // 2 is the value of DeliveryMode.PERSISTENT, 4 is the JMS default priority.
            producer.send(tombstone, 2, 4, i);
        }
    }

    /**
     * Runs the multiple-last-values scenario using the {@code CoreConnection} supplier.
     */
    @TestTemplate
    public void testMultipleLastValuesCore() throws Exception {
        testMultipleLastValues(CoreConnection);
    }

    /**
     * Runs the multiple-last-values scenario using the {@code AMQPConnection} supplier.
     */
    @TestTemplate
    public void testMultipleLastValuesAMQP() throws Exception {
        testMultipleLastValues(AMQPConnection);
    }

    /**
     * Starts five {@code Producer} tasks on their own threads while a client-acknowledge
     * consumer drains {@code NON_DESTRUCTIVE_LVQ_QUEUE_NAME}, then verifies that:
     * <ul>
     *   <li>no producer reported a failure,</li>
     *   <li>no message text was received more than once for the same {@code "lastval"} value,</li>
     *   <li>exactly 625 messages were received in total, and</li>
     *   <li>the queue settles at a message count of 5.</li>
     * </ul>
     * Consumption stops as soon as a {@code receive(500)} call times out.
     *
     * @param connectionSupplier factory for the consumer connection; also handed to each producer
     * @throws Exception if any messaging operation or assertion fails
     */
    private void testMultipleLastValues(MultiprotocolJMSClientTestSupport.ConnectionSupplier connectionSupplier) throws Exception {
        // Buckets are keyed by the String form of the "lastval" property because the lookup
        // below uses getStringProperty(); boxed Integer keys would never match a String key.
        Map<String, List<String>> textsByLastValue = new HashMap<>();
        for (int i = 0; i < 5; i++) {
            textsByLastValue.put(Integer.toString(i), new ArrayList<>());
        }
        // Message text -> number of times it was seen within a single bucket (only when > 1).
        Map<String, Integer> duplicates = new HashMap<>();
        List<Producer> producers = new ArrayList<>();
        int receivedCount = 0;

        try (Connection connection = connectionSupplier.createConnection()) {
            Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            MessageConsumer consumer = session.createConsumer(session.createQueue(NON_DESTRUCTIVE_LVQ_QUEUE_NAME));
            connection.start();

            // NOTE(review): 25 and 5 are presumably messages-per-group and group count
            // (5 producers * 25 * 5 = 625, the total asserted below) - confirm against Producer.
            for (int i = 0; i < 5; i++) {
                producers.add(new Producer(connectionSupplier, 25, 5, i));
            }
            for (Producer producer : producers) {
                new Thread(producer).start();
            }

            while (true) {
                // receive() is declared to return Message, so the cast is required.
                TextMessage received = (TextMessage) consumer.receive(500L);
                if (received == null) {
                    break;
                }
                receivedCount++;
                String lastValue = received.getStringProperty("lastval");
                List<String> bucket = textsByLastValue.get(lastValue);
                // Fail with a readable message instead of a NullPointerException.
                Assertions.assertNotNull(bucket, "Unexpected lastval property: " + lastValue);
                bucket.add(received.getText());
                received.acknowledge();
            }

            for (Producer producer : producers) {
                Assertions.assertFalse(producer.failed, "Producer failed!");
            }
        }

        for (Map.Entry<String, List<String>> entry : textsByLastValue.entrySet()) {
            List<String> texts = entry.getValue();
            StringBuilder joined = new StringBuilder();
            for (String text : texts) {
                int occurrences = Collections.frequency(texts, text);
                if (occurrences > 1) {
                    // Keyed on the text itself; the first recorded count for a text is kept.
                    duplicates.putIfAbsent(text, occurrences);
                }
                joined.append(text);
                joined.append(",");
            }
            logger.info("Messages received with lastval={} ({})", entry.getKey(), joined);
        }

        if (!duplicates.isEmpty()) {
            StringBuilder report = new StringBuilder();
            for (Map.Entry<String, Integer> duplicate : duplicates.entrySet()) {
                report.append(duplicate.getKey()).append("(").append(duplicate.getValue()).append("),");
            }
            Assertions.fail("Duplicate messages received " + report);
        }

        Assertions.assertEquals(625, receivedCount, "Got all messages produced");
        Wait.assertEquals(5L, () -> {
            return this.server.locateQueue(NON_DESTRUCTIVE_LVQ_QUEUE_NAME).getMessageCount();
        }, 2000L, 100L, false);
    }
}
