package org.apache.activemq.artemis.tests.integration.client;

import java.util.concurrent.CountDownLatch;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
import javax.naming.Context;
import javax.naming.NamingException;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.apache.activemq.artemis.tests.util.JMSTestBase;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/client/FlowControlOnIgnoreLargeMessageBodyTest.class */
public class FlowControlOnIgnoreLargeMessageBodyTest extends JMSTestBase {
    /** Topic the producer publishes to and the consumers subscribe to; looked up from JNDI in {@code setUp()}. */
    private Topic topic;
    /** Number of messages the producer is asked to send and each consumer is expected to receive. */
    private static int TOTAL_MESSAGES_COUNT = 20000;
    /** Size in bytes of every message body (153600 = 150 * 1024). */
    private static int MSG_SIZE = 153600;
    /** Name of the int message property carrying the 1-based sequence number of each message. */
    private static final String ATTR_MSG_COUNTER = "msgIdex";
    /** Logger used for periodic progress output and for warnings. */
    IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
    /** Number of concurrent durable consumers the test is meant to run. */
    private final int CONSUMERS_COUNT = 5;
    /** Timeout handed to each consumer's {@code receive(...)} call (JMS receive timeouts are in milliseconds). */
    protected int receiveTimeout = 10000;
    /**
     * Shared failure flag: set by worker threads when something goes wrong, polled by their
     * loops so they stop early, and asserted to be {@code false} at the end of the test.
     */
    private volatile boolean error = false;

    /* loaded from: input_file:org/apache/activemq/artemis/tests/integration/client/FlowControlOnIgnoreLargeMessageBodyTest$LoadConsumer.class */
    class LoadConsumer extends Thread {
        private final ConnectionFactory cf;
        private final Topic topic;
        private volatile boolean requestForStop;
        private volatile boolean stopped;
        private volatile int receivedMessages;
        private final int numberOfMessages;
        private int receiveTimeout;
        private final CountDownLatch consumerCreated;

        LoadConsumer(CountDownLatch countDownLatch, String str, Topic topic, ConnectionFactory connectionFactory, int i, int i2) {
            super(str);
            this.requestForStop = false;
            this.stopped = false;
            this.receivedMessages = 0;
            this.receiveTimeout = 0;
            this.cf = connectionFactory;
            this.topic = topic;
            this.receiveTimeout = i;
            this.numberOfMessages = i2;
            this.consumerCreated = countDownLatch;
        }

        public void sendStopRequest() {
            this.stopped = false;
            this.requestForStop = true;
        }

        public boolean isStopped() {
            return this.stopped;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            Connection connection = null;
            Session session = null;
            this.stopped = false;
            this.requestForStop = false;
            System.out.println("Starting consumer for " + this.topic + " - " + getName());
            try {
                try {
                    connection = this.cf.createConnection();
                    connection.setClientID(getName());
                    connection.start();
                    session = connection.createSession(true, 0);
                    TopicSubscriber createDurableSubscriber = session.createDurableSubscriber(this.topic, getName());
                    this.consumerCreated.countDown();
                    int i = 0;
                    while (i < this.numberOfMessages && !this.requestForStop && !FlowControlOnIgnoreLargeMessageBodyTest.this.error) {
                        if (i == 0) {
                            System.out.println("Starting to consume for " + this.topic + " - " + getName());
                        }
                        BytesMessage receive = createDurableSubscriber.receive(this.receiveTimeout);
                        if (receive == null) {
                            System.out.println("Cannot get message in specified timeout: " + this.topic + " - " + getName());
                            FlowControlOnIgnoreLargeMessageBodyTest.this.error = true;
                        } else {
                            i++;
                            if (receive.getIntProperty(FlowControlOnIgnoreLargeMessageBodyTest.ATTR_MSG_COUNTER) != i) {
                                FlowControlOnIgnoreLargeMessageBodyTest.this.error = true;
                            }
                        }
                        if (i % 10 == 0) {
                            session.commit();
                        }
                        if (i % 100 == 0) {
                            FlowControlOnIgnoreLargeMessageBodyTest.this.log.info("## " + getName() + " " + this.topic + " received " + i);
                        }
                        this.receivedMessages = i;
                    }
                    session.commit();
                    if (session != null) {
                        try {
                            session.close();
                        } catch (JMSException e) {
                            System.err.println("Cannot close session " + e.getMessage());
                        }
                    }
                    if (connection != null) {
                        try {
                            connection.close();
                        } catch (JMSException e2) {
                            System.err.println("Cannot close connection " + e2.getMessage());
                        }
                    }
                } catch (Exception e3) {
                    System.out.println("Exception in consumer " + getName() + " : " + e3.getMessage());
                    e3.printStackTrace();
                    if (session != null) {
                        try {
                            session.close();
                        } catch (JMSException e4) {
                            System.err.println("Cannot close session " + e4.getMessage());
                        }
                    }
                    if (connection != null) {
                        try {
                            connection.close();
                        } catch (JMSException e5) {
                            System.err.println("Cannot close connection " + e5.getMessage());
                        }
                    }
                }
                this.stopped = true;
                System.out.println("Stopping consumer for " + this.topic + " - " + getName() + ", received " + getReceivedMessages());
            } catch (Throwable th) {
                if (session != null) {
                    try {
                        session.close();
                    } catch (JMSException e6) {
                        System.err.println("Cannot close session " + e6.getMessage());
                    }
                }
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (JMSException e7) {
                        System.err.println("Cannot close connection " + e7.getMessage());
                    }
                }
                throw th;
            }
        }

        public int getReceivedMessages() {
            return this.receivedMessages;
        }
    }

    /* loaded from: input_file:org/apache/activemq/artemis/tests/integration/client/FlowControlOnIgnoreLargeMessageBodyTest$LoadProducer.class */
    /**
     * Worker thread that publishes a fixed number of persistent {@link BytesMessage}s of
     * {@code MSG_SIZE} bytes to a topic inside a transacted session, committing every 10 messages.
     *
     * <p>Every message carries its 1-based sequence number in the {@code ATTR_MSG_COUNTER} int
     * property. The loop ends early when a stop is requested or the enclosing test's shared
     * {@code error} flag is raised; any exception raises that flag.
     */
    class LoadProducer extends Thread {
        private final ConnectionFactory cf;
        private final Topic topic;
        private final int messagesCount;
        private volatile boolean requestForStop;
        private volatile boolean stopped;
        private int sentMessages;

        /**
         * @param name          thread name
         * @param topic         destination topic
         * @param cf            factory used to open the connection
         * @param messagesCount number of messages to send
         */
        LoadProducer(String name, Topic topic, ConnectionFactory cf, int messagesCount) throws Exception {
            super(name);
            this.topic = topic;
            this.cf = cf;
            this.messagesCount = messagesCount;
        }

        /**
         * Asks the send loop to finish after the current iteration.
         * The {@code stopped} flag is deliberately left untouched so that {@link #isStopped()}
         * stays accurate when the thread has already finished.
         */
        public void sendStopRequest() {
            this.requestForStop = true;
        }

        /** @return {@code true} once {@link #run()} has finished and released its connection */
        public boolean isStopped() {
            return this.stopped;
        }

        /**
         * Sends the messages, then performs exactly one best-effort commit and connection close
         * regardless of whether sending succeeded.
         */
        @Override
        public void run() {
            this.stopped = false;
            Connection connection = null;
            Session session = null;
            FlowControlOnIgnoreLargeMessageBodyTest.this.log.info("Starting producer for " + this.topic + " - " + getName());
            try {
                connection = this.cf.createConnection();
                session = connection.createSession(true, Session.SESSION_TRANSACTED);
                MessageProducer producer = session.createProducer(this.topic);
                producer.setDeliveryMode(DeliveryMode.PERSISTENT);
                for (int i = 1; i <= this.messagesCount && !this.requestForStop && !FlowControlOnIgnoreLargeMessageBodyTest.this.error; i++) {
                    this.sentMessages++;
                    BytesMessage message = session.createBytesMessage();
                    message.setIntProperty(FlowControlOnIgnoreLargeMessageBodyTest.ATTR_MSG_COUNTER, i);
                    message.writeBytes(new byte[FlowControlOnIgnoreLargeMessageBodyTest.MSG_SIZE]);
                    producer.send(message);
                    if (i % 10 == 0) {
                        session.commit();
                    }
                    if (i % 100 == 0) {
                        FlowControlOnIgnoreLargeMessageBodyTest.this.log.info("Address " + this.topic + " sent " + i + " messages");
                    }
                }
                System.out.println("Ending producer for " + this.topic + " - " + getName() + " messages " + this.sentMessages);
            } catch (Exception e) {
                // Tell the consumers to give up instead of waiting for messages that will never come.
                FlowControlOnIgnoreLargeMessageBodyTest.this.error = true;
                e.printStackTrace();
            } finally {
                // Null checks matter: createConnection()/createSession() may be what failed.
                if (session != null) {
                    try {
                        // Flushes the tail of the last, possibly incomplete, batch of 10.
                        session.commit();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
            this.stopped = true;
        }

        /** @return number of messages handed to the JMS producer so far (incremented just before each send) */
        public int getSentMessages() {
            return this.sentMessages;
        }
    }

    /**
     * Runs the base-class set-up, then creates the topic {@code topicIn} on the JMS server with
     * the JNDI binding {@code /topic/topicIn} and resolves that binding into {@link #topic}.
     *
     * @throws Exception if the base set-up, the topic creation or the JNDI lookup fails
     */
    @Before
    @Override
    public void setUp() throws Exception {
        super.setUp();

        final String topicName = "topicIn";
        final String jndiBinding = "/topic/topicIn";
        final String[] bindings = {jndiBinding};

        // First argument is passed as true, exactly as before.
        jmsServer.createTopic(true, topicName, bindings);
        topic = (Topic) namingContext.lookup(jndiBinding);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Overrides the {@code JMSTestBase} hook and always answers {@code false}.
     * NOTE(review): presumably this makes the base class start the server without a persistent
     * store — confirm against JMSTestBase.
     *
     * @return always {@code false}
     */
    @Override // org.apache.activemq.artemis.tests.util.JMSTestBase
    public boolean usePersistence() {
        return false;
    }

    /**
     * Starts {@code CONSUMERS_COUNT} durable consumers, waits until all of them have subscribed,
     * then runs one producer that publishes {@code TOTAL_MESSAGES_COUNT} messages and verifies
     * that the producer sent, and every consumer received, exactly that many messages without
     * any worker raising the shared {@code error} flag.
     *
     * @throws Exception if a worker cannot be created or the test thread is interrupted while
     *                   waiting; such failures now fail the test instead of being logged and ignored
     */
    @Test
    public void testFlowControl() throws Exception {
        LoadProducer producer = new LoadProducer("producer", this.topic, this.cf, TOTAL_MESSAGES_COUNT);
        LoadConsumer[] consumers = new LoadConsumer[this.CONSUMERS_COUNT];
        CountDownLatch consumersReady = new CountDownLatch(consumers.length);
        for (int i = 0; i < consumers.length; i++) {
            consumers[i] = new LoadConsumer(consumersReady, "consumer " + i, this.topic, this.cf, this.receiveTimeout, TOTAL_MESSAGES_COUNT);
        }
        for (LoadConsumer consumer : consumers) {
            consumer.start();
        }

        // Durable subscriptions must exist before the first message is published,
        // otherwise consumers would miss the beginning of the sequence.
        waitForLatch(consumersReady);

        producer.start();
        producer.join();
        for (LoadConsumer consumer : consumers) {
            consumer.join();
        }

        String errorMessage = null;
        if (producer.getSentMessages() != TOTAL_MESSAGES_COUNT) {
            errorMessage = "Producer did not send defined count of messages";
        } else {
            for (LoadConsumer consumer : consumers) {
                if (consumer.getReceivedMessages() != TOTAL_MESSAGES_COUNT) {
                    errorMessage = "Consumer did not receive defined count of messages";
                    break;
                }
            }
        }

        if (errorMessage != null) {
            System.err.println(" ERROR ERROR ERROR ERROR ERROR ERROR ERROR ERROR ERROR ");
            System.err.println(errorMessage);
        } else {
            System.out.println(" OK ");
        }

        assertFalse(this.error);
        assertNull(errorMessage);
    }
}
