package org.apache.activemq.artemis.jms.tests;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.TopicConnection;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.jms.JMSFactoryType;
import org.apache.activemq.artemis.jms.client.ActiveMQJMSConnectionFactory;
import org.apache.activemq.artemis.jms.tests.util.ProxyAssertSupport;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/activemq/artemis/jms/tests/AcknowledgementTest.class */
public class AcknowledgementTest extends JMSTestCase {

    /* loaded from: input_file:org/apache/activemq/artemis/jms/tests/AcknowledgementTest$LatchListener.class */
    private abstract class LatchListener implements MessageListener {
        protected Session sess;
        boolean failed;
        protected CountDownLatch latch = new CountDownLatch(1);
        protected int count = 0;

        LatchListener(Session session) {
            this.sess = session;
        }

        public void waitForMessages() throws InterruptedException {
            ProxyAssertSupport.assertTrue("failed to receive all messages", this.latch.await(2000L, TimeUnit.MILLISECONDS));
        }

        public abstract void onMessage(Message message);
    }

    /* loaded from: input_file:org/apache/activemq/artemis/jms/tests/AcknowledgementTest$MessageListenerAutoAck.class */
    private class MessageListenerAutoAck extends LatchListener {
        MessageListenerAutoAck(Session session) {
            super(session);
        }

        @Override // org.apache.activemq.artemis.jms.tests.AcknowledgementTest.LatchListener
        public void onMessage(Message message) {
            try {
                this.count++;
                TextMessage textMessage = (TextMessage) message;
                if (this.count == 1) {
                    AcknowledgementTest.this.assertRemainingMessages(3);
                    if (!"a".equals(textMessage.getText())) {
                        this.failed = true;
                        this.latch.countDown();
                    }
                }
                if (this.count == 2) {
                    AcknowledgementTest.this.assertRemainingMessages(2);
                    if (!"b".equals(textMessage.getText())) {
                        this.failed = true;
                        this.latch.countDown();
                    }
                }
                if (this.count == 3) {
                    AcknowledgementTest.this.assertRemainingMessages(1);
                    if (!"c".equals(textMessage.getText())) {
                        this.failed = true;
                        this.latch.countDown();
                    }
                    this.sess.recover();
                }
                if (this.count == 4) {
                    AcknowledgementTest.this.assertRemainingMessages(1);
                    if (!"c".equals(textMessage.getText())) {
                        this.failed = true;
                        this.latch.countDown();
                    }
                    this.latch.countDown();
                }
            } catch (Exception e) {
                this.failed = true;
                this.latch.countDown();
            }
        }
    }

    /* loaded from: input_file:org/apache/activemq/artemis/jms/tests/AcknowledgementTest$MessageListenerClientAck.class */
    private class MessageListenerClientAck extends LatchListener {
        MessageListenerClientAck(Session session) {
            super(session);
        }

        @Override // org.apache.activemq.artemis.jms.tests.AcknowledgementTest.LatchListener
        public void onMessage(Message message) {
            try {
                this.count++;
                TextMessage textMessage = (TextMessage) message;
                if (this.count == 1) {
                    AcknowledgementTest.this.assertRemainingMessages(3);
                    if (!"a".equals(textMessage.getText())) {
                        AcknowledgementTest.this.log.trace("Expected a but got " + textMessage.getText());
                        this.failed = true;
                        this.latch.countDown();
                    }
                }
                if (this.count == 2) {
                    AcknowledgementTest.this.assertRemainingMessages(3);
                    if (!"b".equals(textMessage.getText())) {
                        AcknowledgementTest.this.log.trace("Expected b but got " + textMessage.getText());
                        this.failed = true;
                        this.latch.countDown();
                    }
                }
                if (this.count == 3) {
                    AcknowledgementTest.this.assertRemainingMessages(3);
                    if (!"c".equals(textMessage.getText())) {
                        AcknowledgementTest.this.log.trace("Expected c but got " + textMessage.getText());
                        this.failed = true;
                        this.latch.countDown();
                    }
                    AcknowledgementTest.this.log.trace("calling recover");
                    this.sess.recover();
                }
                if (this.count == 4) {
                    AcknowledgementTest.this.assertRemainingMessages(3);
                    if (!"a".equals(textMessage.getText())) {
                        AcknowledgementTest.this.log.trace("Expected a but got " + textMessage.getText());
                        this.failed = true;
                        this.latch.countDown();
                    }
                    AcknowledgementTest.this.log.trace("*** calling acknowledge");
                    textMessage.acknowledge();
                    AcknowledgementTest.this.assertRemainingMessages(2);
                    AcknowledgementTest.this.log.trace("calling recover");
                    this.sess.recover();
                }
                if (this.count == 5) {
                    AcknowledgementTest.this.assertRemainingMessages(2);
                    if (!"b".equals(textMessage.getText())) {
                        AcknowledgementTest.this.log.trace("Expected b but got " + textMessage.getText());
                        this.failed = true;
                        this.latch.countDown();
                    }
                    AcknowledgementTest.this.log.trace("calling recover");
                    this.sess.recover();
                }
                if (this.count == 6) {
                    AcknowledgementTest.this.assertRemainingMessages(2);
                    if (!"b".equals(textMessage.getText())) {
                        AcknowledgementTest.this.log.trace("Expected b but got " + textMessage.getText());
                        this.failed = true;
                        this.latch.countDown();
                    }
                }
                if (this.count == 7) {
                    AcknowledgementTest.this.assertRemainingMessages(2);
                    if (!"c".equals(textMessage.getText())) {
                        AcknowledgementTest.this.log.trace("Expected c but got " + textMessage.getText());
                        this.failed = true;
                        this.latch.countDown();
                    }
                    textMessage.acknowledge();
                    AcknowledgementTest.this.assertRemainingMessages(0);
                    this.latch.countDown();
                }
            } catch (Exception e) {
                AcknowledgementTest.this.log.error("Caught exception", e);
                this.failed = true;
                this.latch.countDown();
            }
        }
    }

    /* loaded from: input_file:org/apache/activemq/artemis/jms/tests/AcknowledgementTest$MessageListenerDupsOK.class */
    private class MessageListenerDupsOK extends LatchListener {
        MessageListenerDupsOK(Session session) {
            super(session);
        }

        @Override // org.apache.activemq.artemis.jms.tests.AcknowledgementTest.LatchListener
        public void onMessage(Message message) {
            try {
                this.count++;
                TextMessage textMessage = (TextMessage) message;
                if (this.count == 1) {
                    AcknowledgementTest.this.assertRemainingMessages(3);
                    if (!"a".equals(textMessage.getText())) {
                        this.failed = true;
                        this.latch.countDown();
                    }
                }
                if (this.count == 2) {
                    AcknowledgementTest.this.assertRemainingMessages(3);
                    if (!"b".equals(textMessage.getText())) {
                        this.failed = true;
                        this.latch.countDown();
                    }
                }
                if (this.count == 3) {
                    AcknowledgementTest.this.assertRemainingMessages(3);
                    if (!"c".equals(textMessage.getText())) {
                        this.failed = true;
                        this.latch.countDown();
                    }
                    this.sess.recover();
                }
                if (this.count == 4) {
                    AcknowledgementTest.this.assertRemainingMessages(1);
                    if (!"c".equals(textMessage.getText())) {
                        this.failed = true;
                        this.latch.countDown();
                    }
                    this.latch.countDown();
                }
            } catch (Exception e) {
                this.failed = true;
                this.latch.countDown();
            }
        }
    }

    /* loaded from: input_file:org/apache/activemq/artemis/jms/tests/AcknowledgementTest$MessageListenerTransactionalAck.class */
    private class MessageListenerTransactionalAck extends LatchListener {
        MessageListenerTransactionalAck(Session session) {
            super(session);
        }

        @Override // org.apache.activemq.artemis.jms.tests.AcknowledgementTest.LatchListener
        public void onMessage(Message message) {
            try {
                this.count++;
                TextMessage textMessage = (TextMessage) message;
                if (this.count == 1) {
                    AcknowledgementTest.this.assertRemainingMessages(3);
                    if (!"a".equals(textMessage.getText())) {
                        this.failed = true;
                        this.latch.countDown();
                    }
                }
                if (this.count == 2) {
                    AcknowledgementTest.this.assertRemainingMessages(3);
                    if (!"b".equals(textMessage.getText())) {
                        this.failed = true;
                        this.latch.countDown();
                    }
                }
                if (this.count == 3) {
                    AcknowledgementTest.this.assertRemainingMessages(3);
                    if (!"c".equals(textMessage.getText())) {
                        this.failed = true;
                        this.latch.countDown();
                    }
                    AcknowledgementTest.this.log.trace("Rollback");
                    this.sess.rollback();
                }
                if (this.count == 4) {
                    AcknowledgementTest.this.assertRemainingMessages(3);
                    if (!"a".equals(textMessage.getText())) {
                        this.failed = true;
                        this.latch.countDown();
                    }
                }
                if (this.count == 5) {
                    AcknowledgementTest.this.assertRemainingMessages(3);
                    if (!"b".equals(textMessage.getText())) {
                        this.failed = true;
                        this.latch.countDown();
                    }
                    AcknowledgementTest.this.log.trace("commit");
                    this.sess.commit();
                    AcknowledgementTest.this.assertRemainingMessages(1);
                }
                if (this.count == 6) {
                    AcknowledgementTest.this.assertRemainingMessages(1);
                    if (!"c".equals(textMessage.getText())) {
                        this.failed = true;
                        this.latch.countDown();
                    }
                    AcknowledgementTest.this.log.trace("recover");
                    this.sess.rollback();
                }
                if (this.count == 7) {
                    AcknowledgementTest.this.assertRemainingMessages(1);
                    if (!"c".equals(textMessage.getText())) {
                        this.failed = true;
                        this.latch.countDown();
                    }
                    AcknowledgementTest.this.log.trace("Commit");
                    this.sess.commit();
                    AcknowledgementTest.this.assertRemainingMessages(0);
                    this.latch.countDown();
                }
            } catch (Exception e) {
                this.failed = true;
                this.latch.countDown();
            }
        }
    }

    @Test
    public void testPersistentMessagesForTopicDropped() throws Exception {
        TopicConnection createTopicConnection = createTopicConnection();
        TopicSession createTopicSession = createTopicConnection.createTopicSession(true, 0);
        TopicPublisher createPublisher = createTopicSession.createPublisher(ActiveMQServerTestCase.topic1);
        createPublisher.setDeliveryMode(2);
        createPublisher.publish(createTopicSession.createTextMessage("testing123"));
        createTopicSession.commit();
        createTopicConnection.close();
        checkEmpty(ActiveMQServerTestCase.topic1);
    }

    @Test
    public void testPersistentMessagesForTopicDropped2() throws Exception {
        TopicConnection createTopicConnection = createTopicConnection();
        createTopicConnection.start();
        TopicSession createTopicSession = createTopicConnection.createTopicSession(true, 0);
        TopicPublisher createPublisher = createTopicSession.createPublisher(ActiveMQServerTestCase.topic1);
        TopicSubscriber createSubscriber = createTopicSession.createSubscriber(ActiveMQServerTestCase.topic1);
        createPublisher.setDeliveryMode(2);
        createPublisher.publish(createTopicSession.createTextMessage("testing123"));
        createTopicSession.commit();
        TextMessage receive = createSubscriber.receive(3000L);
        ProxyAssertSupport.assertNotNull(receive);
        ProxyAssertSupport.assertEquals("testing123", receive.getText());
        createTopicSession.rollback();
        createTopicConnection.close();
        checkEmpty(ActiveMQServerTestCase.topic1);
    }

    @Test
    public void testRollbackRecover() throws Exception {
        TopicConnection createTopicConnection = createTopicConnection();
        TopicSession createTopicSession = createTopicConnection.createTopicSession(true, 0);
        TopicPublisher createPublisher = createTopicSession.createPublisher(ActiveMQServerTestCase.topic1);
        TopicSubscriber createSubscriber = createTopicSession.createSubscriber(ActiveMQServerTestCase.topic1);
        createTopicConnection.start();
        createPublisher.publish(createTopicSession.createTextMessage("testing123"));
        createTopicSession.commit();
        TextMessage receive = createSubscriber.receive(3000L);
        ProxyAssertSupport.assertNotNull(receive);
        ProxyAssertSupport.assertEquals("testing123", receive.getText());
        createTopicSession.rollback();
        TextMessage receive2 = createSubscriber.receive(3000L);
        ProxyAssertSupport.assertNotNull(receive2);
        ProxyAssertSupport.assertEquals("testing123", receive2.getText());
        createTopicConnection.close();
        TopicConnection createTopicConnection2 = createTopicConnection();
        createTopicConnection2.start();
        TopicSession createTopicSession2 = createTopicConnection2.createTopicSession(true, 0);
        TopicPublisher createPublisher2 = createTopicSession2.createPublisher(ActiveMQServerTestCase.topic1);
        TopicSubscriber createSubscriber2 = createTopicSession2.createSubscriber(ActiveMQServerTestCase.topic1);
        TextMessage createTextMessage = createTopicSession2.createTextMessage("testing456");
        createPublisher2.publish(createTextMessage);
        createTopicSession2.commit();
        TextMessage receive3 = createSubscriber2.receive(3000L);
        ProxyAssertSupport.assertNotNull(receive3);
        ProxyAssertSupport.assertEquals("testing456", receive3.getText());
        createTopicSession2.commit();
        createPublisher2.publish(createTextMessage);
        createTopicSession2.commit();
        TextMessage receive4 = createSubscriber2.receive(3000L);
        ProxyAssertSupport.assertNotNull(receive4);
        ProxyAssertSupport.assertEquals("testing456", receive4.getText());
        createTopicSession2.rollback();
        TextMessage receive5 = createSubscriber2.receive(3000L);
        ProxyAssertSupport.assertNotNull(receive5);
        ProxyAssertSupport.assertEquals("testing456", receive5.getText());
        createTopicSession2.commit();
    }

    @Test
    public void testTransactionalAcknowledgement() throws Exception {
        Connection createConnection = createConnection();
        Session createSession = createConnection.createSession(true, 0);
        MessageProducer createProducer = createSession.createProducer(this.queue1);
        Session createSession2 = createConnection.createSession(true, 0);
        MessageConsumer createConsumer = createSession2.createConsumer(this.queue1);
        createConnection.start();
        for (int i = 0; i < 20; i++) {
            createProducer.send(createSession.createMessage());
        }
        assertRemainingMessages(0);
        createSession.rollback();
        for (int i2 = 0; i2 < 20; i2++) {
            createProducer.send(createSession.createMessage());
        }
        assertRemainingMessages(0);
        createSession.commit();
        assertRemainingMessages(20);
        int i3 = 0;
        while (createConsumer.receive(200L) != null) {
            i3++;
        }
        assertRemainingMessages(20);
        ProxyAssertSupport.assertEquals(i3, 20);
        createSession2.rollback();
        assertRemainingMessages(20);
        for (int i4 = 0; i4 < 20; i4++) {
            createConsumer.receive();
        }
        assertRemainingMessages(20);
        createSession2.commit();
        assertRemainingMessages(0);
        checkEmpty(this.queue1);
    }

    @Test
    public void testClientAcknowledgeNoAcknowledgement() throws Exception {
        Connection createConnection = createConnection();
        Session createSession = createConnection.createSession(false, 2);
        MessageProducer createProducer = createSession.createProducer(this.queue1);
        Session createSession2 = createConnection.createSession(false, 2);
        MessageConsumer createConsumer = createSession2.createConsumer(this.queue1);
        createConnection.start();
        for (int i = 0; i < 20; i++) {
            createProducer.send(createSession.createMessage());
        }
        assertRemainingMessages(20);
        this.log.trace("Sent messages");
        int i2 = 0;
        while (createConsumer.receive(1000L) != null) {
            i2++;
        }
        assertRemainingMessages(20);
        this.log.trace("Received " + i2 + " messages");
        ProxyAssertSupport.assertEquals(i2, 20);
        createSession2.recover();
        assertRemainingMessages(20);
        this.log.trace("Session recover called");
        Message message = null;
        int i3 = 0;
        while (i3 < 20) {
            message = createConsumer.receive();
            this.log.trace("Received message " + i3);
            i3++;
        }
        assertRemainingMessages(20);
        this.log.trace("Received " + i3 + " messages after recover");
        message.acknowledge();
        assertRemainingMessages(0);
        checkEmpty(this.queue1);
        createConnection.close();
    }

    @Test
    public void testIndividualClientAcknowledge() throws Exception {
        Connection createConnection = createConnection();
        Session createSession = createConnection.createSession(false, 2);
        MessageProducer createProducer = createSession.createProducer(this.queue1);
        Session createSession2 = createConnection.createSession(false, 2);
        MessageConsumer createConsumer = createSession2.createConsumer(this.queue1);
        createConnection.start();
        for (int i = 0; i < 20; i++) {
            createProducer.send(createSession.createMessage());
        }
        assertRemainingMessages(20);
        for (int i2 = 0; i2 < 20; i2++) {
            Message receive = createConsumer.receive(200L);
            ProxyAssertSupport.assertNotNull(receive);
            assertRemainingMessages(20 - i2);
            receive.acknowledge();
            assertRemainingMessages(20 - (i2 + 1));
        }
        assertRemainingMessages(0);
        createSession2.recover();
        ProxyAssertSupport.assertNull(createConsumer.receive(200L));
    }

    @Test
    public void testBulkClientAcknowledge() throws Exception {
        Connection createConnection = createConnection();
        Session createSession = createConnection.createSession(false, 2);
        MessageProducer createProducer = createSession.createProducer(this.queue1);
        Session createSession2 = createConnection.createSession(false, 2);
        MessageConsumer createConsumer = createSession2.createConsumer(this.queue1);
        createConnection.start();
        for (int i = 0; i < 20; i++) {
            createProducer.send(createSession.createMessage());
        }
        assertRemainingMessages(20);
        this.log.trace("Sent messages");
        Message message = null;
        int i2 = 0;
        for (int i3 = 0; i3 < 20; i3++) {
            message = createConsumer.receive(200L);
            if (message == null) {
                break;
            }
            i2++;
        }
        assertRemainingMessages(20);
        ProxyAssertSupport.assertNotNull(message);
        message.acknowledge();
        assertRemainingMessages(0);
        this.log.trace("Received " + i2 + " messages");
        ProxyAssertSupport.assertEquals(i2, 20);
        createSession2.recover();
        this.log.trace("Session recover called");
        Message receive = createConsumer.receive(200L);
        this.log.trace("Message is:" + receive);
        ProxyAssertSupport.assertNull(receive);
    }

    @Test
    public void testPartialClientAcknowledge() throws Exception {
        Connection connection = null;
        try {
            connection = createConnection();
            Session createSession = connection.createSession(false, 2);
            MessageProducer createProducer = createSession.createProducer(this.queue1);
            Session createSession2 = connection.createSession(false, 2);
            MessageConsumer createConsumer = createSession2.createConsumer(this.queue1);
            connection.start();
            for (int i = 0; i < 20; i++) {
                createProducer.send(createSession.createMessage());
            }
            assertRemainingMessages(20);
            this.log.trace("Sent messages");
            int i2 = 0;
            Message message = null;
            for (int i3 = 0; i3 < 20; i3++) {
                message = createConsumer.receive(200L);
                if (message == null) {
                    break;
                }
                if (i2 == 10) {
                    message.acknowledge();
                }
                i2++;
            }
            assertRemainingMessages(9);
            ProxyAssertSupport.assertNotNull(message);
            this.log.trace("Received " + i2 + " messages");
            ProxyAssertSupport.assertEquals(i2, 20);
            createSession2.recover();
            this.log.trace("Session recover called");
            int i4 = 0;
            while (createConsumer.receive(200L) != null) {
                i4++;
            }
            ProxyAssertSupport.assertEquals(9, i4);
            if (connection != null) {
                connection.close();
            }
            removeAllMessages(this.queue1.getQueueName(), true);
        } catch (Throwable th) {
            if (connection != null) {
                connection.close();
            }
            removeAllMessages(this.queue1.getQueueName(), true);
            throw th;
        }
    }

    @Test
    public void testAutoAcknowledge() throws Exception {
        Connection createConnection = createConnection();
        Session createSession = createConnection.createSession(false, 1);
        MessageProducer createProducer = createSession.createProducer(this.queue1);
        Session createSession2 = createConnection.createSession(false, 1);
        MessageConsumer createConsumer = createSession2.createConsumer(this.queue1);
        createConnection.start();
        for (int i = 0; i < 20; i++) {
            createProducer.send(createSession.createMessage());
        }
        assertRemainingMessages(20);
        int i2 = 0;
        Message message = null;
        for (int i3 = 0; i3 < 20; i3++) {
            assertRemainingMessages(20 - i3);
            message = createConsumer.receive(200L);
            assertRemainingMessages(20 - (i3 + 1));
            if (message == null) {
                break;
            }
            i2++;
        }
        assertRemainingMessages(0);
        ProxyAssertSupport.assertNotNull(message);
        this.log.trace("Received " + i2 + " messages");
        ProxyAssertSupport.assertEquals(i2, 20);
        createSession2.recover();
        this.log.trace("Session recover called");
        Message receive = createConsumer.receive(200L);
        this.log.trace("Message is:" + receive);
        ProxyAssertSupport.assertNull(receive);
    }

    @Test
    public void testDupsOKAcknowledgeQueue() throws Exception {
        Connection createConnection = createConnection();
        Session createSession = createConnection.createSession(false, 3);
        MessageProducer createProducer = createSession.createProducer(this.queue1);
        Session createSession2 = createConnection.createSession(false, 3);
        MessageConsumer createConsumer = createSession2.createConsumer(this.queue1);
        createConnection.start();
        for (int i = 0; i < 20; i++) {
            createProducer.send(createSession.createMessage());
        }
        assertRemainingMessages(20);
        int i2 = 0;
        Message message = null;
        for (int i3 = 0; i3 < 20; i3++) {
            message = createConsumer.receive(200L);
            if (message == null) {
                break;
            }
            i2++;
        }
        assertRemainingMessages(20);
        ProxyAssertSupport.assertNotNull(message);
        this.log.trace("Received " + i2 + " messages");
        ProxyAssertSupport.assertEquals(i2, 20);
        createSession2.recover();
        this.log.trace("Session recover called");
        Message receive = createConsumer.receive(200L);
        this.log.trace("Message is:" + receive);
        ProxyAssertSupport.assertNull(receive);
        createConnection.close();
        assertRemainingMessages(0);
    }

    @Test
    public void testDupsOKAcknowledgeTopic() throws Exception {
        deployConnectionFactory(null, "MyConnectionFactory2", -1, -1, -1, -1, false, false, 10, true, "mycf");
        Connection connection = null;
        try {
            connection = ((ConnectionFactory) this.ic.lookup("/mycf")).createConnection();
            Session createSession = connection.createSession(false, 1);
            MessageProducer createProducer = createSession.createProducer(ActiveMQServerTestCase.topic1);
            Session createSession2 = connection.createSession(false, 3);
            MessageConsumer createConsumer = createSession2.createConsumer(ActiveMQServerTestCase.topic1);
            connection.start();
            for (int i = 0; i < 19; i++) {
                createProducer.send(createSession.createMessage());
            }
            this.log.trace("Sent messages");
            for (int i2 = 0; i2 < 19; i2++) {
                ProxyAssertSupport.assertNotNull(createConsumer.receive(200L));
            }
            createSession2.close();
            if (connection != null) {
                connection.close();
            }
            ActiveMQServerTestCase.undeployConnectionFactory("MyConnectionFactory2");
        } catch (Throwable th) {
            if (connection != null) {
                connection.close();
            }
            ActiveMQServerTestCase.undeployConnectionFactory("MyConnectionFactory2");
            throw th;
        }
    }

    @Test
    public void testLazyAcknowledge() throws Exception {
        Connection createConnection = createConnection();
        Session createSession = createConnection.createSession(false, 3);
        MessageProducer createProducer = createSession.createProducer(this.queue1);
        Session createSession2 = createConnection.createSession(false, 3);
        MessageConsumer createConsumer = createSession2.createConsumer(this.queue1);
        createConnection.start();
        for (int i = 0; i < 20; i++) {
            createProducer.send(createSession.createMessage());
        }
        assertRemainingMessages(20);
        this.log.trace("Sent messages");
        int i2 = 0;
        Message message = null;
        for (int i3 = 0; i3 < 20; i3++) {
            message = createConsumer.receive(200L);
            if (message == null) {
                break;
            }
            i2++;
        }
        ProxyAssertSupport.assertNotNull(message);
        assertRemainingMessages(20);
        this.log.trace("Received " + i2 + " messages");
        ProxyAssertSupport.assertEquals(i2, 20);
        createSession2.recover();
        this.log.trace("Session recover called");
        Message receive = createConsumer.receive(200L);
        this.log.trace("Message is:" + receive);
        ProxyAssertSupport.assertNull(receive);
        createConnection.close();
        assertRemainingMessages(0);
    }

    @Test
    public void testMessageListenerAutoAck() throws Exception {
        Connection createConnection = createConnection();
        Session createSession = createConnection.createSession(false, 1);
        MessageProducer createProducer = createSession.createProducer(this.queue1);
        this.log.trace("Sending messages");
        TextMessage createTextMessage = createSession.createTextMessage("a");
        TextMessage createTextMessage2 = createSession.createTextMessage("b");
        TextMessage createTextMessage3 = createSession.createTextMessage("c");
        createProducer.send(createTextMessage);
        createProducer.send(createTextMessage2);
        createProducer.send(createTextMessage3);
        this.log.trace("Sent messages");
        createSession.close();
        assertRemainingMessages(3);
        createConnection.start();
        Session createSession2 = createConnection.createSession(false, 1);
        this.log.trace("Creating consumer");
        MessageConsumer createConsumer = createSession2.createConsumer(this.queue1);
        this.log.trace("Created consumer");
        MessageListenerAutoAck messageListenerAutoAck = new MessageListenerAutoAck(createSession2);
        this.log.trace("Setting message listener");
        createConsumer.setMessageListener(messageListenerAutoAck);
        this.log.trace("Set message listener");
        messageListenerAutoAck.waitForMessages();
        Thread.sleep(500L);
        assertRemainingMessages(0);
        ProxyAssertSupport.assertFalse(messageListenerAutoAck.failed);
    }

    @Test
    public void testRecoverAutoACK() throws Exception {
        Connection createConnection = createConnection();
        Session createSession = createConnection.createSession(false, 1);
        MessageProducer createProducer = createSession.createProducer(this.queue1);
        createProducer.setDeliveryMode(2);
        createProducer.send(createSession.createTextMessage("one"));
        createProducer.send(createSession.createTextMessage("two"));
        createConnection.close();
        assertRemainingMessages(2);
        Connection createConnection2 = createConnection();
        createConnection2.start();
        Session createSession2 = createConnection2.createSession(false, 1);
        MessageConsumer createConsumer = createSession2.createConsumer(this.queue1);
        TextMessage receive = createConsumer.receive(1000L);
        ProxyAssertSupport.assertNotNull(receive);
        ProxyAssertSupport.assertEquals("one", receive.getText());
        createSession2.recover();
        ProxyAssertSupport.assertEquals("two", createConsumer.receive(1000L).getText());
        TextMessage receiveNoWait = createConsumer.receiveNoWait();
        if (receiveNoWait != null) {
            System.out.println("Message received " + receiveNoWait.getText());
        }
        Assert.assertNull(receiveNoWait);
        createConsumer.close();
        assertRemainingMessages(0);
    }

    @Test
    public void testMessageListenerDupsOK() throws Exception {
        Connection createConnection = createConnection();
        Session createSession = createConnection.createSession(false, 1);
        MessageProducer createProducer = createSession.createProducer(this.queue1);
        this.log.trace("Sending messages");
        TextMessage createTextMessage = createSession.createTextMessage("a");
        TextMessage createTextMessage2 = createSession.createTextMessage("b");
        TextMessage createTextMessage3 = createSession.createTextMessage("c");
        createProducer.send(createTextMessage);
        createProducer.send(createTextMessage2);
        createProducer.send(createTextMessage3);
        this.log.trace("Sent messages");
        createSession.close();
        assertRemainingMessages(3);
        createConnection.start();
        Session createSession2 = createConnection.createSession(false, 3);
        this.log.trace("Creating consumer");
        MessageConsumer createConsumer = createSession2.createConsumer(this.queue1);
        this.log.trace("Created consumer");
        MessageListenerDupsOK messageListenerDupsOK = new MessageListenerDupsOK(createSession2);
        this.log.trace("Setting message listener");
        createConsumer.setMessageListener(messageListenerDupsOK);
        this.log.trace("Set message listener");
        messageListenerDupsOK.waitForMessages();
        createConsumer.close();
        assertRemainingMessages(0);
        ProxyAssertSupport.assertFalse(messageListenerDupsOK.failed);
    }

    @Test
    public void testMessageListenerClientAck() throws Exception {
        Connection createConnection = createConnection();
        Session createSession = createConnection.createSession(false, 1);
        MessageProducer createProducer = createSession.createProducer(this.queue1);
        TextMessage createTextMessage = createSession.createTextMessage("a");
        TextMessage createTextMessage2 = createSession.createTextMessage("b");
        TextMessage createTextMessage3 = createSession.createTextMessage("c");
        createProducer.send(createTextMessage);
        createProducer.send(createTextMessage2);
        createProducer.send(createTextMessage3);
        createSession.close();
        assertRemainingMessages(3);
        createConnection.start();
        Session createSession2 = createConnection.createSession(false, 2);
        MessageConsumer createConsumer = createSession2.createConsumer(this.queue1);
        MessageListenerClientAck messageListenerClientAck = new MessageListenerClientAck(createSession2);
        createConsumer.setMessageListener(messageListenerClientAck);
        messageListenerClientAck.waitForMessages();
        Thread.sleep(500L);
        assertRemainingMessages(0);
        createConnection.close();
        ProxyAssertSupport.assertFalse(messageListenerClientAck.failed);
    }

    @Test
    public void testMessageListenerTransactionalAck() throws Exception {
        Connection createConnection = createConnection();
        Session createSession = createConnection.createSession(false, 1);
        MessageProducer createProducer = createSession.createProducer(this.queue1);
        TextMessage createTextMessage = createSession.createTextMessage("a");
        TextMessage createTextMessage2 = createSession.createTextMessage("b");
        TextMessage createTextMessage3 = createSession.createTextMessage("c");
        createProducer.send(createTextMessage);
        createProducer.send(createTextMessage2);
        createProducer.send(createTextMessage3);
        createSession.close();
        assertRemainingMessages(3);
        createConnection.start();
        Session createSession2 = createConnection.createSession(true, 0);
        MessageConsumer createConsumer = createSession2.createConsumer(this.queue1);
        MessageListenerTransactionalAck messageListenerTransactionalAck = new MessageListenerTransactionalAck(createSession2);
        createConsumer.setMessageListener(messageListenerTransactionalAck);
        messageListenerTransactionalAck.waitForMessages();
        Thread.sleep(500L);
        assertRemainingMessages(0);
        createConnection.close();
        ProxyAssertSupport.assertFalse(messageListenerTransactionalAck.failed);
    }

    @Test
    public void testTransactionalIgnoreACK() throws Exception {
        Connection createConnection = createConnection();
        Session createSession = createConnection.createSession(true, 0);
        MessageProducer createProducer = createSession.createProducer(this.queue1);
        Session createSession2 = createConnection.createSession(true, 0);
        MessageConsumer createConsumer = createSession2.createConsumer(this.queue1);
        createConnection.start();
        for (int i = 0; i < 20; i++) {
            Message createMessage = createSession.createMessage();
            createMessage.acknowledge();
            createProducer.send(createMessage);
        }
        assertRemainingMessages(0);
        createSession.rollback();
        for (int i2 = 0; i2 < 20; i2++) {
            Message createMessage2 = createSession.createMessage();
            createMessage2.acknowledge();
            createProducer.send(createMessage2);
        }
        assertRemainingMessages(0);
        createSession.commit();
        assertRemainingMessages(20);
        int i3 = 0;
        while (true) {
            Message receive = createConsumer.receive(200L);
            if (receive == null) {
                break;
            }
            receive.acknowledge();
            i3++;
        }
        assertRemainingMessages(20);
        ProxyAssertSupport.assertEquals(i3, 20);
        createSession2.rollback();
        assertRemainingMessages(20);
        for (int i4 = 0; i4 < 20; i4++) {
            createConsumer.receive();
        }
        assertRemainingMessages(20);
        createSession2.commit();
        assertRemainingMessages(0);
        checkEmpty(this.queue1);
    }

    @Test
    public void testNonBlockingAckPerf() throws Exception {
        getJmsServerManager().createConnectionFactory("testsuitecf1", false, JMSFactoryType.CF, NETTY_CONNECTOR, (String) null, ActiveMQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD, ActiveMQClient.DEFAULT_CONNECTION_TTL, 30000L, 30000L, false, 102400, false, 1048576, -1, -1, 65536, -1, true, true, true, false, false, ActiveMQClient.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME, 1048576, 1048576, true, 5, -1, 2000L, ActiveMQClient.DEFAULT_RETRY_INTERVAL_MULTIPLIER, ActiveMQClient.DEFAULT_MAX_RETRY_INTERVAL, 0, false, (String) null, new String[]{"/testsuitecf1"});
        getJmsServerManager().createConnectionFactory("testsuitecf2", false, JMSFactoryType.CF, NETTY_CONNECTOR, (String) null, ActiveMQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD, ActiveMQClient.DEFAULT_CONNECTION_TTL, 30000L, 30000L, false, 102400, false, 1048576, -1, -1, 65536, -1, true, true, true, false, false, ActiveMQClient.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME, 1048576, 1048576, true, 5, -1, 2000L, ActiveMQClient.DEFAULT_RETRY_INTERVAL_MULTIPLIER, ActiveMQClient.DEFAULT_MAX_RETRY_INTERVAL, 0, false, (String) null, new String[]{"/testsuitecf2"});
        ActiveMQJMSConnectionFactory activeMQJMSConnectionFactory = (ActiveMQJMSConnectionFactory) getInitialContext().lookup("/testsuitecf1");
        activeMQJMSConnectionFactory.setBlockOnAcknowledge(false);
        ActiveMQJMSConnectionFactory activeMQJMSConnectionFactory2 = (ActiveMQJMSConnectionFactory) getInitialContext().lookup("/testsuitecf2");
        activeMQJMSConnectionFactory2.setBlockOnAcknowledge(true);
        send(activeMQJMSConnectionFactory, this.queue1, 100);
        send(activeMQJMSConnectionFactory2, this.queue2, 100);
        long consume = consume(activeMQJMSConnectionFactory, this.queue1, 100);
        long consume2 = consume(activeMQJMSConnectionFactory2, this.queue2, 100);
        this.log.info("BlockOnAcknowledge=false MessageCount=100 TimeToConsume=" + consume);
        this.log.info("BlockOnAcknowledge=true MessageCount=100 TimeToConsume=" + consume2);
        Assert.assertTrue(consume < consume2 / 2);
    }

    private long send(ConnectionFactory connectionFactory, Destination destination, int i) throws JMSException {
        Connection createConnection = connectionFactory.createConnection();
        Throwable th = null;
        try {
            createConnection.start();
            Session createSession = createConnection.createSession(true, 2);
            Throwable th2 = null;
            try {
                try {
                    MessageProducer createProducer = createSession.createProducer(destination);
                    TextMessage createTextMessage = createSession.createTextMessage("testing123");
                    long nanoTime = System.nanoTime();
                    for (int i2 = 0; i2 < i; i2++) {
                        createProducer.send(createTextMessage);
                    }
                    createSession.commit();
                    long nanoTime2 = System.nanoTime() - nanoTime;
                    if (createSession != null) {
                        if (0 != 0) {
                            try {
                                createSession.close();
                            } catch (Throwable th3) {
                                th2.addSuppressed(th3);
                            }
                        } else {
                            createSession.close();
                        }
                    }
                    return nanoTime2;
                } finally {
                }
            } catch (Throwable th4) {
                if (createSession != null) {
                    if (th2 != null) {
                        try {
                            createSession.close();
                        } catch (Throwable th5) {
                            th2.addSuppressed(th5);
                        }
                    } else {
                        createSession.close();
                    }
                }
                throw th4;
            }
        } finally {
            if (createConnection != null) {
                if (0 != 0) {
                    try {
                        createConnection.close();
                    } catch (Throwable th6) {
                        th.addSuppressed(th6);
                    }
                } else {
                    createConnection.close();
                }
            }
        }
    }

    private long consume(ConnectionFactory connectionFactory, Destination destination, int i) throws JMSException {
        Connection createConnection = connectionFactory.createConnection();
        Throwable th = null;
        try {
            createConnection.start();
            Session createSession = createConnection.createSession(false, 2);
            Throwable th2 = null;
            try {
                try {
                    MessageConsumer createConsumer = createSession.createConsumer(destination);
                    long nanoTime = System.nanoTime();
                    for (int i2 = 0; i2 < i; i2++) {
                        Message receive = createConsumer.receive(100L);
                        if (receive != null) {
                            receive.acknowledge();
                        }
                    }
                    long nanoTime2 = System.nanoTime() - nanoTime;
                    if (createSession != null) {
                        if (0 != 0) {
                            try {
                                createSession.close();
                            } catch (Throwable th3) {
                                th2.addSuppressed(th3);
                            }
                        } else {
                            createSession.close();
                        }
                    }
                    return nanoTime2;
                } finally {
                }
            } catch (Throwable th4) {
                if (createSession != null) {
                    if (th2 != null) {
                        try {
                            createSession.close();
                        } catch (Throwable th5) {
                            th2.addSuppressed(th5);
                        }
                    } else {
                        createSession.close();
                    }
                }
                throw th4;
            }
        } finally {
            if (createConnection != null) {
                if (0 != 0) {
                    try {
                        createConnection.close();
                    } catch (Throwable th6) {
                        th.addSuppressed(th6);
                    }
                } else {
                    createConnection.close();
                }
            }
        }
    }
}
