package org.apache.activemq.artemis.tests.integration.openwire;

import jakarta.jms.Connection;
import jakarta.jms.JMSException;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Queue;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import java.io.PrintStream;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageConsumer;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.cli.commands.tools.PrintData;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.utils.Wait;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.LocalTransactionId;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.TransactionInfo;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.failover.FailoverTransport;
import org.apache.activemq.transport.tcp.TcpTransport;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/openwire/PrefetchRedeliveryCountOpenwireTest.class */
public class PrefetchRedeliveryCountOpenwireTest extends OpenWireTestBase {
    // Logger named after the class returned by MethodHandles.lookup().lookupClass().
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    /**
     * Sets the inherited {@code realStore} flag to {@code true} and then runs the parent set-up.
     * The flag is assigned first so that it is already in place when {@code super.setUp()} executes.
     *
     * @throws Exception if the parent set-up fails
     */
    @BeforeEach
    @Override
    public void setUp() throws Exception {
        realStore = true;
        super.setUp();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Registers address settings for the two queue names used by the tests in this class, after
     * letting the parent register its own.
     *
     * <p>Both entries use {@code ActiveMQ.DLQ} as dead letter address and differ only in the
     * max-delivery-attempts value: {@code 2} for {@code exampleQueue} and {@code -1} for
     * {@code exampleQueueTwo}.
     *
     * @param map address-settings map to add the two entries to
     */
    @Override
    public void configureAddressSettings(Map<String, AddressSettings> map) {
        super.configureAddressSettings(map);

        // NOTE(review): setAutoCreateAddresses is called twice in each chain (false, then true);
        // both calls are kept in their original order.
        AddressSettings twoAttempts = new AddressSettings()
            .setAutoCreateQueues(false)
            .setAutoCreateAddresses(false)
            .setDeadLetterAddress(SimpleString.of("ActiveMQ.DLQ"))
            .setAutoCreateAddresses(true)
            .setMaxDeliveryAttempts(2);
        map.put("exampleQueue", twoAttempts);

        // NOTE(review): -1 presumably means "no delivery-attempt limit" - confirm against AddressSettings.
        AddressSettings minusOneAttempts = new AddressSettings()
            .setAutoCreateQueues(false)
            .setAutoCreateAddresses(false)
            .setDeadLetterAddress(SimpleString.of("ActiveMQ.DLQ"))
            .setAutoCreateAddresses(true)
            .setMaxDeliveryAttempts(-1);
        map.put("exampleQueueTwo", minusOneAttempts);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Deliberately empty override: it neither touches the given configuration nor calls the
     * parent implementation.
     *
     * @param configuration server configuration; left unmodified
     */
    @Override
    public void extraServerConfig(Configuration configuration) {
        // nothing to add for these tests
    }

    /**
     * Runs the single-message consumer loop against a queue created as exclusive.
     *
     * @throws Exception if the shared scenario fails
     */
    @Test
    @Timeout(60)
    public void testConsumerSingleMessageLoopExclusive() throws Exception {
        final boolean exclusiveQueue = true;
        doTestConsumerSingleMessageLoop(exclusiveQueue);
    }

    /**
     * Runs the single-message consumer loop against a queue created as non-exclusive.
     *
     * @throws Exception if the shared scenario fails
     */
    @Test
    @Timeout(60)
    public void testConsumerSingleMessageLoopNonExclusive() throws Exception {
        final boolean exclusiveQueue = false;
        doTestConsumerSingleMessageLoop(exclusiveQueue);
    }

    /**
     * Sends 20 identical text messages to {@code exampleQueue} and then receives them one at a
     * time, opening a fresh consumer for every message and closing it right after the message
     * has been checked.
     *
     * <p>Each receive must return a non-null message carrying the sent text within five seconds.
     *
     * @param z whether the {@code exampleQueue} queue is created as exclusive
     * @throws Exception if queue creation, messaging or an assertion fails
     */
    public void doTestConsumerSingleMessageLoop(boolean z) throws Exception {
        final int messageCount = 20;
        final String body = "This is a text message";

        Connection connection = null;
        this.server.createQueue(QueueConfiguration.of(SimpleString.of("exampleQueue")).setRoutingType(RoutingType.ANYCAST).setExclusive(z));
        try {
            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
            factory.setWatchTopicAdvisories(false);
            ActiveMQQueue queue = new ActiveMQQueue("exampleQueue");

            connection = factory.createConnection();
            connection.start();

            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer producer = session.createProducer(queue);
            TextMessage outgoing = session.createTextMessage(body);
            for (int sent = 0; sent < messageCount; sent++) {
                producer.send(outgoing);
            }

            // One short-lived consumer per message: create, receive exactly one, close.
            for (int consumed = 0; consumed < messageCount; consumed++) {
                MessageConsumer consumer = session.createConsumer(queue);
                // receive() is declared to return Message, so the cast is required.
                TextMessage received = (TextMessage) consumer.receive(5000L);
                Assertions.assertNotNull(received);
                Assertions.assertEquals(body, received.getText());
                consumer.close();
            }
        } finally {
            // Single close on every exit path (success or failure).
            if (connection != null) {
                connection.close();
            }
        }
    }

    /**
     * Checks that messages on the exclusive queue {@code exampleQueueTwo} are received in
     * {@code SEQ} order across repeated reconnects when the client prefetch (2000) is much larger
     * than the number of messages consumed per connection (200).
     *
     * <p>10000 messages with an increasing {@code SEQ} int property are sent in one transaction.
     * Then, for every batch of 200, a new connection is opened, 200 messages are received and
     * verified, the session is committed, the connection's failover transport is stopped and the
     * connection is closed.
     *
     * @throws Exception if queue creation, messaging or an assertion fails
     */
    @Test
    @Timeout(60)
    public void testExclusiveConsumerOrderOnReconnectionLargePrefetch() throws Exception {
        final int totalMessages = 10000;
        final int batchSize = 200;
        final String body = "This is a text message";

        ActiveMQConnection connection = null;
        this.server.createQueue(QueueConfiguration.of(SimpleString.of("exampleQueueTwo")).setRoutingType(RoutingType.ANYCAST).setExclusive(true));
        try {
            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
            factory.setWatchTopicAdvisories(false);

            ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
            prefetchPolicy.setAll(2000);
            factory.setPrefetchPolicy(prefetchPolicy);

            RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
            redeliveryPolicy.setMaximumRedeliveries(4000);
            factory.setRedeliveryPolicy(redeliveryPolicy);

            ActiveMQQueue queue = new ActiveMQQueue("exampleQueueTwo");

            // createConnection() is declared to return Connection, so the cast is required.
            connection = (ActiveMQConnection) factory.createConnection();
            connection.start();
            Session producerSession = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            MessageProducer producer = producerSession.createProducer(queue);
            TextMessage outgoing = producerSession.createTextMessage(body);
            for (int seq = 0; seq < totalMessages; seq++) {
                outgoing.setIntProperty("SEQ", seq);
                producer.send(outgoing);
            }
            producerSession.commit();
            connection.close();

            for (int batchStart = 0; batchStart < totalMessages; batchStart += batchSize) {
                connection = (ActiveMQConnection) factory.createConnection();
                connection.start();
                Session consumerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
                MessageConsumer consumer = consumerSession.createConsumer(queue);
                for (int offset = 0; offset < batchSize; offset++) {
                    TextMessage received = (TextMessage) consumer.receive(5000L);
                    Assertions.assertNotNull(received, "null @ i=" + batchStart);
                    Assertions.assertEquals(batchStart + offset, received.getIntProperty("SEQ"));
                    Assertions.assertEquals(body, received.getText());
                }
                consumerSession.commit();

                // Stop the failover transport before the regular close of the connection.
                ((FailoverTransport) connection.getTransport().narrow(FailoverTransport.class)).stop();
                connection.close();
            }
        } finally {
            // Also covers the failure paths; on success this repeats the close of the last connection.
            if (connection != null) {
                connection.close();
            }
        }
    }

    /**
     * Checks batch ordering on the exclusive queue {@code exampleQueueTwo} when every fully
     * received batch is acknowledged inside a transaction that is never committed, and the
     * connection's TCP transport is then stopped.
     *
     * <p>1000 messages with an increasing {@code SEQ} int property are sent first. Two consumer
     * tasks then loop, each time opening a new connection and receiving up to 100 messages. The
     * first message of a batch must belong to the in-progress batch or the one directly after
     * it, and every message must sit at its expected position within the batch. The tasks stop
     * when an error is flagged or once 20 times the message count has been received in total.
     * This runs while {@code underLoad} is executing with two load producers.
     *
     * <p>{@link JMSException}, {@link NullPointerException} and
     * {@link ConcurrentModificationException} raised inside a consumer iteration are ignored;
     * any other throwable is recorded and fails the test.
     *
     * @throws Exception if set-up, messaging or an assertion fails
     */
    @Test
    @Timeout(60)
    public void testServerSideRollbackOnCloseOrder() throws Exception {
        final int totalMessages = 1000;
        final int batchSize = 100;
        final int numConsumers = 2;
        final int numLoadProducers = 2;

        // NOTE(review): plain ArrayList that consumer threads add to; not synchronized.
        final ArrayList<Throwable> errors = new ArrayList<>();
        final SimpleString queueName = SimpleString.of("exampleQueueTwo");
        this.server.createQueue(QueueConfiguration.of(queueName).setRoutingType(RoutingType.ANYCAST).setExclusive(true));
        final ActiveMQQueue queue = new ActiveMQQueue(queueName.toString());

        final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("failover:(tcp://localhost:61616?closeAsync=false)?startupMaxReconnectAttempts=10&maxReconnectAttempts=0&timeout=1000");
        factory.setWatchTopicAdvisories(false);
        factory.setConnectResponseTimeout(10000);

        ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
        prefetchPolicy.setAll(2000);
        factory.setPrefetchPolicy(prefetchPolicy);

        RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
        redeliveryPolicy.setRedeliveryDelay(0L);
        redeliveryPolicy.setMaximumRedeliveries(-1);
        factory.setRedeliveryPolicy(redeliveryPolicy);

        // Publish the whole sequence in one transaction; the connection is closed even if a send fails.
        try (Connection producerConnection = factory.createConnection()) {
            producerConnection.start();
            Session producerSession = producerConnection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            MessageProducer producer = producerSession.createProducer(queue);
            TextMessage outgoing = producerSession.createTextMessage("This is a text message");
            for (int seq = 0; seq < totalMessages; seq++) {
                outgoing.setIntProperty("SEQ", seq);
                producer.send(outgoing);
            }
            producerSession.commit();
        }

        final ExecutorService consumerPool = Executors.newCachedThreadPool();
        underLoad(numLoadProducers, () -> {
            final AtomicBoolean done = new AtomicBoolean(false);
            final AtomicInteger receivedCount = new AtomicInteger();
            final AtomicInteger inProgressBatch = new AtomicInteger();

            Runnable consumerTask = () -> {
                // Kept outside the loop so the finally block can always close the latest connection.
                Connection toCloseOnError = null;
                while (!done.get() && receivedCount.get() < 20 * totalMessages) {
                    try (Connection autoClosed = factory.createConnection()) {
                        toCloseOnError = autoClosed;
                        ActiveMQConnection consumerConnection = (ActiveMQConnection) autoClosed;
                        consumerConnection.setCloseTimeout(1);
                        consumerConnection.start();

                        Session consumerSession = consumerConnection.createSession(true, Session.SESSION_TRANSACTED);
                        ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) consumerSession.createConsumer(queue);

                        TextMessage lastReceived = null;
                        int received = 0;
                        while (received < batchSize) {
                            lastReceived = (TextMessage) consumer.receive(2000L);
                            if (lastReceived == null) {
                                break;
                            }
                            receivedCount.incrementAndGet();
                            int seq = lastReceived.getIntProperty("SEQ");
                            int batchOfMessage = seq / batchSize;

                            // Only the first message of a batch may move on to the next batch index.
                            if (received == 0 && inProgressBatch.get() != batchOfMessage) {
                                if (inProgressBatch.get() + 1 != batchOfMessage) {
                                    done.set(true);
                                    throw new AssertionError("@:" + receivedCount.get() + ", batch out of sequence, expected: " + (inProgressBatch.get() + 1) + ", but have: " + batchOfMessage + " @" + seq + ", Message: " + lastReceived);
                                }
                                inProgressBatch.incrementAndGet();
                                logger.info("@:{}, current batch increment to: {}, Received Seq: {}, Message: {}", receivedCount.get(), inProgressBatch.get(), seq, lastReceived);
                            }

                            Assertions.assertEquals((batchSize * inProgressBatch.get()) + received, seq, "@:" + receivedCount.get() + " batch out of order");
                            received++;
                        }

                        if (received != batchSize) {
                            // Incomplete batch: nothing to acknowledge, just close and try again.
                            continue;
                        }

                        // Begin a local transaction and acknowledge the whole batch inside it by
                        // sending the commands directly over the transport; no commit follows.
                        Transport transport = consumerConnection.getTransport();
                        TransactionInfo begin = new TransactionInfo(consumerConnection.getConnectionInfo().getConnectionId(), new LocalTransactionId(consumerConnection.getConnectionInfo().getConnectionId(), receivedCount.get()), TransactionInfo.BEGIN);
                        transport.request(begin);

                        ActiveMQMessage lastCommand = (ActiveMQMessage) lastReceived;
                        MessageAck ack = new MessageAck();
                        ack.setDestination(lastCommand.getDestination());
                        ack.setMessageID(lastCommand.getMessageId());
                        ack.setMessageCount(batchSize);
                        ack.setTransactionId(begin.getTransactionId());
                        ack.setConsumerId(consumer.getConsumerId());
                        transport.request(ack);

                        try {
                            ((TcpTransport) consumerConnection.getTransport().narrow(TcpTransport.class)).stop();
                        } catch (Throwable ignored) {
                            // best effort: failures while stopping the transport are not relevant here
                        }
                    } catch (JMSException | NullPointerException | ConcurrentModificationException expected) {
                        // tolerated while the transport is being torn down; the loop simply retries
                    } catch (Throwable unexpected) {
                        unexpected.printStackTrace();
                        errors.add(unexpected);
                        done.set(true);
                    } finally {
                        if (toCloseOnError != null) {
                            try {
                                toCloseOnError.close();
                            } catch (Throwable ignored) {
                                // best effort close
                            }
                        }
                    }
                }
            };

            for (int i = 0; i < numConsumers; i++) {
                consumerPool.submit(consumerTask);
            }
            consumerPool.shutdown();

            try {
                Assertions.assertTrue(consumerPool.awaitTermination(30L, TimeUnit.SECONDS));
                Assertions.assertTrue(errors.isEmpty());
            } catch (Throwable t) {
                errors.add(t);
            } finally {
                done.set(true);
                consumerPool.shutdownNow();
            }
            Assertions.assertTrue(errors.isEmpty(), "errors: " + errors);
        });
        Assertions.assertTrue(errors.isEmpty());
    }

    /**
     * Checks batch ordering on the exclusive queue {@code exampleQueueTwo} when a session commit
     * is started on another thread and the connection's TCP transport is stopped shortly after.
     *
     * <p>10000 messages with an increasing {@code SEQ} int property are sent first. Four consumer
     * tasks then loop while the queue still reports messages, each time opening a new connection
     * and receiving up to 200 messages. The first message of a batch must belong to the
     * in-progress batch or the one directly after it, and every message must sit at its expected
     * position within the batch. The test then waits for the queue's message count to reach zero.
     * This runs while {@code underLoad} is executing with four load producers.
     *
     * <p>{@link JMSException}, {@link InterruptedException}, {@link NullPointerException} and
     * {@link ConcurrentModificationException} raised inside a consumer iteration are ignored;
     * any other throwable is recorded and fails the test.
     *
     * @throws Exception if set-up, messaging or an assertion fails
     */
    @Test
    @Timeout(60)
    public void testExclusiveConsumerBatchOrderUnderLoad() throws Exception {
        final int totalMessages = 10000;
        final int batchSize = 200;
        final int numConsumers = 4;
        final int numLoadProducers = 4;

        // NOTE(review): plain ArrayList that consumer threads add to; not synchronized.
        final ArrayList<Throwable> errors = new ArrayList<>();
        final SimpleString queueName = SimpleString.of("exampleQueueTwo");
        this.server.createQueue(QueueConfiguration.of(queueName).setRoutingType(RoutingType.ANYCAST).setExclusive(true));
        final ActiveMQQueue queue = new ActiveMQQueue(queueName.toString());

        final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("failover:(tcp://localhost:61616?closeAsync=false)?startupMaxReconnectAttempts=10&maxReconnectAttempts=0&timeout=1000");
        factory.setWatchTopicAdvisories(false);
        factory.setConnectResponseTimeout(10000);

        ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
        prefetchPolicy.setAll(2000);
        factory.setPrefetchPolicy(prefetchPolicy);

        RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
        redeliveryPolicy.setRedeliveryDelay(0L);
        redeliveryPolicy.setMaximumRedeliveries(-1);
        factory.setRedeliveryPolicy(redeliveryPolicy);

        // Publish the whole sequence in one transaction; the connection is closed even if a send fails.
        try (Connection producerConnection = factory.createConnection()) {
            producerConnection.start();
            Session producerSession = producerConnection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            MessageProducer producer = producerSession.createProducer(queue);
            TextMessage outgoing = producerSession.createTextMessage("This is a text message");
            for (int seq = 0; seq < totalMessages; seq++) {
                outgoing.setIntProperty("SEQ", seq);
                producer.send(outgoing);
            }
            producerSession.commit();
        }

        final ExecutorService consumerPool = Executors.newCachedThreadPool();
        underLoad(numLoadProducers, () -> {
            final AtomicBoolean done = new AtomicBoolean(false);
            final AtomicInteger receivedCount = new AtomicInteger();
            final AtomicInteger inProgressBatch = new AtomicInteger();
            final ExecutorService commitExecutor = Executors.newCachedThreadPool();

            Runnable consumerTask = () -> {
                // Kept outside the loop so the finally block can always close the latest connection.
                Connection toCloseOnError = null;
                while (!done.get() && this.server.locateQueue(queueName).getMessageCount() > 0) {
                    try (Connection autoClosed = factory.createConnection()) {
                        toCloseOnError = autoClosed;
                        ActiveMQConnection consumerConnection = (ActiveMQConnection) autoClosed;
                        consumerConnection.setCloseTimeout(1);
                        consumerConnection.start();

                        Session consumerSession = consumerConnection.createSession(true, Session.SESSION_TRANSACTED);
                        MessageConsumer consumer = consumerSession.createConsumer(queue);

                        int received = 0;
                        while (received < batchSize) {
                            TextMessage next = (TextMessage) consumer.receive(2000L);
                            if (next == null) {
                                break;
                            }
                            receivedCount.incrementAndGet();
                            int seq = next.getIntProperty("SEQ");
                            int batchOfMessage = seq / batchSize;

                            // Only the first message of a batch may move on to the next batch index.
                            if (received == 0 && inProgressBatch.get() != batchOfMessage) {
                                if (inProgressBatch.get() + 1 != batchOfMessage) {
                                    done.set(true);
                                    throw new AssertionError("@:" + receivedCount.get() + ", batch out of sequence, expected: " + (inProgressBatch.get() + 1) + ", but have: " + batchOfMessage + " @" + seq + ", Message: " + next);
                                }
                                inProgressBatch.incrementAndGet();
                                logger.info("@:{}, current batch increment to: {}, Received Seq: {}, Message: {}", receivedCount.get(), inProgressBatch.get(), seq, next);
                            }

                            Assertions.assertEquals((batchSize * inProgressBatch.get()) + received, seq, "@:" + receivedCount.get() + " batch out of order");
                            received++;
                        }

                        if (received != batchSize) {
                            // Incomplete batch: nothing to commit, just close and try again.
                            continue;
                        }

                        // Start the commit on another thread, give it a short head start, then
                        // stop the transport so commit and connection failure can overlap.
                        CountDownLatch commitStarted = new CountDownLatch(1);
                        commitExecutor.submit(() -> {
                            try {
                                commitStarted.countDown();
                                consumerSession.commit();
                            } catch (Throwable ignored) {
                                // the commit may fail once the transport is stopped
                            }
                        });
                        commitStarted.await(1L, TimeUnit.SECONDS);
                        TimeUnit.MILLISECONDS.sleep(15L);

                        try {
                            ((TcpTransport) consumerConnection.getTransport().narrow(TcpTransport.class)).stop();
                        } catch (Throwable ignored) {
                            // best effort: failures while stopping the transport are not relevant here
                        }
                    } catch (JMSException | InterruptedException | NullPointerException | ConcurrentModificationException expected) {
                        // tolerated while the transport is being torn down; the loop simply retries
                    } catch (Throwable unexpected) {
                        unexpected.printStackTrace();
                        errors.add(unexpected);
                        done.set(true);
                    } finally {
                        if (toCloseOnError != null) {
                            try {
                                toCloseOnError.close();
                            } catch (Throwable ignored) {
                                // best effort close
                            }
                        }
                    }
                }
            };

            for (int i = 0; i < numConsumers; i++) {
                consumerPool.submit(consumerTask);
            }
            consumerPool.shutdown();

            try {
                // The supplier reports -1 as soon as an error was recorded, so the wait can never
                // succeed on an empty queue after a failure.
                // NOTE(review): the timeout is an AMQP client constant used in an OpenWire test;
                // this looks like a decompiler constant substitution - confirm the intended value.
                Wait.assertEquals(0L, () -> {
                    if (errors.isEmpty()) {
                        return this.server.locateQueue(queueName).getMessageCount();
                    }
                    return -1L;
                }, AmqpConnection.DEFAULT_CLOSE_TIMEOUT);
            } catch (Throwable t) {
                errors.add(t);
            } finally {
                done.set(true);
                commitExecutor.shutdownNow();
                consumerPool.shutdownNow();
            }
            Assertions.assertTrue(errors.isEmpty());
        });
        Assertions.assertTrue(errors.isEmpty());
    }

    public void underLoad(int i, Runnable runnable) throws Exception {
        org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory activeMQConnectionFactory;
        Queue activeMQQueue;
        SimpleString of = SimpleString.of("exampleQueue");
        this.server.createQueue(QueueConfiguration.of(of).setRoutingType(RoutingType.ANYCAST));
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(i + 1);
        if (1 != 0) {
            org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory activeMQConnectionFactory2 = new org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory();
            activeMQConnectionFactory2.setConfirmationWindowSize(1000000);
            activeMQConnectionFactory2.setBlockOnDurableSend(true);
            activeMQConnectionFactory2.setBlockOnNonDurableSend(true);
            activeMQConnectionFactory = activeMQConnectionFactory2;
            activeMQQueue = activeMQConnectionFactory2.createContext().createQueue(of.toString());
        } else {
            org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory activeMQConnectionFactory3 = new ActiveMQConnectionFactory("failover:(tcp://localhost:61616)?startupMaxReconnectAttempts=0&maxReconnectAttempts=0&timeout=1000");
            activeMQConnectionFactory3.setWatchTopicAdvisories(false);
            activeMQConnectionFactory3.setCloseTimeout(1);
            activeMQConnectionFactory3.setSendTimeout(2000);
            activeMQConnectionFactory = activeMQConnectionFactory3;
            activeMQQueue = new ActiveMQQueue(of.toString());
        }
        Queue queue = activeMQQueue;
        org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory activeMQConnectionFactory4 = activeMQConnectionFactory;
        AtomicBoolean atomicBoolean = new AtomicBoolean();
        Runnable runnable2 = () -> {
            try {
                Connection createConnection = activeMQConnectionFactory4.createConnection();
                try {
                    createConnection.start();
                    Session createSession = createConnection.createSession(true, 0);
                    MessageProducer createProducer = createSession.createProducer(queue);
                    TextMessage createTextMessage = createSession.createTextMessage("This is a text message");
                    int i2 = 1;
                    while (!atomicBoolean.get()) {
                        createProducer.send(createTextMessage);
                        int i3 = i2;
                        i2++;
                        if (i3 % 100 == 0) {
                            createSession.commit();
                        }
                    }
                    if (createConnection != null) {
                        createConnection.close();
                    }
                } finally {
                }
            } catch (Exception e) {
            }
        };
        for (int i2 = 0; i2 < i; i2++) {
            newFixedThreadPool.submit(runnable2);
        }
        newFixedThreadPool.submit(() -> {
            try {
                Connection createConnection = activeMQConnectionFactory4.createConnection();
                try {
                    createConnection.start();
                    MessageConsumer createConsumer = createConnection.createSession(false, 1).createConsumer(queue);
                    while (!atomicBoolean.get()) {
                        createConsumer.receive(200L);
                    }
                    if (createConnection != null) {
                        createConnection.close();
                    }
                } finally {
                }
            } catch (Exception e) {
            }
        });
        try {
            runnable.run();
            atomicBoolean.set(true);
            newFixedThreadPool.shutdown();
            if (!newFixedThreadPool.awaitTermination(30L, TimeUnit.SECONDS)) {
                newFixedThreadPool.shutdownNow();
            }
            logger.info("LOAD ADDED: " + this.server.locateQueue(of).getMessagesAdded());
        } catch (Throwable th) {
            atomicBoolean.set(true);
            newFixedThreadPool.shutdown();
            if (!newFixedThreadPool.awaitTermination(30L, TimeUnit.SECONDS)) {
                newFixedThreadPool.shutdownNow();
            }
            logger.info("LOAD ADDED: " + this.server.locateQueue(of).getMessagesAdded());
            throw th;
        }
    }

    /**
     * JUnit entry point for the exclusive-consumer, transactional-batch reconnection scenario.
     * All of the work is delegated to
     * {@link #doTestExclusiveConsumerTransactionalBatchOnReconnectionLargePrefetch()}; this
     * method only attaches the test annotations and the 120 second time limit.
     *
     * @throws Exception whatever the delegated scenario throws
     */
    @Test
    @Timeout(value = 120, unit = TimeUnit.SECONDS)
    public void testExclusiveConsumerTransactionalBatchOnReconnectionLargePrefetch() throws Exception {
        this.doTestExclusiveConsumerTransactionalBatchOnReconnectionLargePrefetch();
    }

    /**
     * Exercises ordered redelivery on an exclusive ANYCAST queue when a transacted consumer's
     * transport is stopped while its commit is in flight.
     *
     * <p>Steps performed:
     * <ol>
     *   <li>Creates the exclusive queue {@code exampleQueueTwo} and sends 1000 text messages,
     *       each tagged with an increasing {@code SEQ} int property.</li>
     *   <li>Repeatedly opens a new connection with a transacted session, receives up to 200
     *       messages and asserts that each {@code SEQ} sits at the expected offset inside its
     *       200-message batch.</li>
     *   <li>Submits the commit to a worker thread, stops the connection's
     *       {@link FailoverTransport}, and closes the connection on another worker thread.</li>
     *   <li>Once a receive times out, asserts that the queue's delivering count reaches zero;
     *       if that assertion fails, journal data is printed before the failure is rethrown.</li>
     * </ol>
     *
     * @throws Exception if broker setup, messaging, or any assertion fails
     */
    public void doTestExclusiveConsumerTransactionalBatchOnReconnectionLargePrefetch() throws Exception {
        final String queueName = "exampleQueueTwo";
        final String messageText = "This is a text message";
        final int messageCount = 1000;
        final int batchSize = 200;

        SimpleString queueAddress = SimpleString.of(queueName);
        this.server.createQueue(QueueConfiguration.of(queueAddress).setRoutingType(RoutingType.ANYCAST).setExclusive(true));
        AtomicInteger totalReceived = new AtomicInteger(0);

        // Created immediately before the try block so that the finally clause always shuts it down.
        ExecutorService executor = Executors.newFixedThreadPool(10);
        ActiveMQConnection connection = null;
        try {
            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
            factory.setWatchTopicAdvisories(false);
            RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
            redeliveryPolicy.setMaximumRedeliveries(4000);
            factory.setRedeliveryPolicy(redeliveryPolicy);
            ActiveMQQueue destination = new ActiveMQQueue(queueName);

            // Phase 1: populate the queue with sequence-numbered messages.
            connection = (ActiveMQConnection) factory.createConnection();
            connection.start();
            Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer producer = producerSession.createProducer(destination);
            TextMessage outgoing = producerSession.createTextMessage(messageText);
            for (int seq = 0; seq < messageCount; seq++) {
                outgoing.setIntProperty("SEQ", seq);
                producer.send(outgoing);
            }
            producerSession.close();
            connection.close();

            // Phase 2: consume in transacted batches, killing the transport around each commit.
            boolean drained = false;
            while (!drained) {
                connection = (ActiveMQConnection) factory.createConnection();
                connection.setCloseTimeout(1);
                connection.start();
                final Session transactedSession = connection.createSession(true, Session.SESSION_TRANSACTED);
                MessageConsumer consumer = transactedSession.createConsumer(destination);

                int receivedInBatch = 0;
                for (int offset = 0; offset < batchSize; offset++) {
                    TextMessage incoming = (TextMessage) consumer.receive(2000L);
                    if (incoming == null) {
                        // Nothing arrived within the receive timeout: treat the queue as drained.
                        drained = true;
                        break;
                    }
                    receivedInBatch++;
                    totalReceived.incrementAndGet();
                    Assertions.assertEquals(messageText, incoming.getText());
                    int seq = incoming.getIntProperty("SEQ");
                    // Each batch must start on a batch boundary and continue in order.
                    int expectedSeq = (batchSize * (seq / batchSize)) + offset;
                    Assertions.assertEquals(expectedSeq, seq, "@:" + receivedInBatch + ", out of order");
                }

                CountDownLatch commitStarted = new CountDownLatch(1);
                executor.submit(() -> {
                    try {
                        commitStarted.countDown();
                        transactedSession.commit();
                    } catch (JMSException e) {
                        // The transport is stopped concurrently, so a failed commit is tolerated here.
                        logger.debug("Commit failed while the transport was being stopped", e);
                    }
                });
                // Best-effort wait for the commit task to begin; the result is intentionally not asserted.
                commitStarted.await(1L, TimeUnit.SECONDS);
                ((FailoverTransport) connection.getTransport().narrow(FailoverTransport.class)).stop();

                // Lambdas may only capture effectively-final locals, and 'connection' is reassigned
                // on every iteration, so hand the current instance to the close task explicitly.
                final ActiveMQConnection connectionToClose = connection;
                executor.submit(() -> {
                    try {
                        connectionToClose.close();
                    } catch (JMSException e) {
                        logger.debug("Close failed after the transport was stopped", e);
                    }
                });
            }

            logger.info("Done after: {}, queue: {}", totalReceived.get(), this.server.locateQueue(queueAddress));
            try {
                Wait.assertEquals(0L, () -> this.server.locateQueue(queueAddress).getDeliveringCount(), 1000L);
            } catch (Throwable failure) {
                // Print journal data to help diagnose the failure. println(String) output is
                // suppressed until a line starting with "### Failed Transactions" is seen.
                final AtomicBoolean printing = new AtomicBoolean(false);
                PrintStream filteredOut = new PrintStream(System.out) {
                    @Override
                    public void println(String line) {
                        if (printing.get()) {
                            super.println(line);
                        } else if (line.startsWith("### Failed Transactions")) {
                            printing.set(true);
                            super.println(line);
                        }
                    }
                };
                Configuration configuration = this.server.getConfiguration();
                PrintData.printData(configuration.getBindingsLocation(), configuration.getJournalLocation(), configuration.getPagingLocation(), filteredOut, true, true, true, false, -1);
                throw failure;
            }
        } finally {
            try {
                if (connection != null) {
                    connection.close();
                }
            } finally {
                // Always stop the worker threads, even if closing the connection throws.
                executor.shutdownNow();
            }
        }
    }
}
