package org.apache.activemq.artemis.tests.smoke.replicationflow;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase;
import org.apache.activemq.artemis.util.ServerUtil;
import org.apache.activemq.artemis.utils.ReusableLatch;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;

/* loaded from: input_file:org/apache/activemq/artemis/tests/smoke/replicationflow/ReplicationFlowControlTest.class */
public class ReplicationFlowControlTest extends SmokeTestBase {
    /** Name of the server instance started at the beginning of the scenario. */
    public static final String SERVER_NAME_0 = "replicated-static0";

    /** Name of the extra server instance started part-way through the send loop. */
    public static final String SERVER_NAME_1 = "replicated-static1";

    /** Index in the send loop at which the consumer threads are started. */
    static final int START_CONSUMERS = 10000;

    /** Index in the send loop at which the extra server is started. */
    static final int START_SERVER = 15000;

    /** Number of consumer threads created by {@code startConsumers}. */
    static final int NUMBER_OF_CONSUMERS = 10;

    /** Number of messages the producer sends, and the initial count of {@link #latch}. */
    static final int NUM_MESSAGES = 50000;

    /** Counted down by the consumers for every committed batch; the test waits for it to finish. */
    static final ReusableLatch latch = new ReusableLatch(NUM_MESSAGES);

    // The two holders below are shared with the consumer threads. They are final so the reference
    // itself can never be swapped under a running consumer; only the contained value changes.

    /** Consumers keep looping while this is {@code true}. */
    static final AtomicBoolean running = new AtomicBoolean(true);

    /** Running total of messages committed by all consumers (used for progress output). */
    static final AtomicInteger totalConsumed = new AtomicInteger(0);

    /** Process handle returned by {@code startServer} for {@link #SERVER_NAME_0}. */
    private static Process server0;

    /** Process handle returned by {@code startServer} for {@link #SERVER_NAME_1}. */
    private static Process server1;

    /** Consumer threads started by this test instance, kept so they can be joined at the end. */
    final ArrayList<Consumer> consumers = new ArrayList<>();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/activemq/artemis/tests/smoke/replicationflow/ReplicationFlowControlTest$Consumer.class */
    /**
     * Thread that receives from {@code exampleQueue} through a transacted JMS session. Messages are
     * committed in batches, and every committed batch is reported to the enclosing test's shared
     * {@code latch} and {@code totalConsumed} counter.
     */
    public static class Consumer extends Thread {
        /** Number of received messages after which the transaction is committed. */
        private static final int COMMIT_BATCH_SIZE = 100;

        /** How long a single {@code receive} call waits for a message, in milliseconds. */
        private static final long RECEIVE_TIMEOUT_MILLIS = 5000L;

        /** A progress line is printed every time this many messages have been received. */
        private static final int PROGRESS_LOG_INTERVAL = 1000;

        ConnectionFactory factory;
        Connection connection;
        Session session;
        Queue queue;
        MessageConsumer consumer;

        /** Messages received in the current, not yet committed, transaction. */
        int count;

        /** Messages received by this consumer since it was created. */
        int totalCount;

        final int consumerID;
        final boolean amqp;

        /**
         * @param z {@code true} to connect through the AMQP (Qpid JMS) client, {@code false} to use the
         *          Artemis core client
         * @param i identifier used in the thread name and in log output
         */
        Consumer(boolean z, int i) {
            super("amqp=" + z + ", id=" + i);
            this.amqp = z;
            this.consumerID = i;
        }

        @Override
        public String toString() {
            return "Consumer " + this.consumerID + ", amqp::" + this.amqp;
        }

        /**
         * Opens a fresh connection, transacted session and consumer on {@code exampleQueue}. Any
         * previous connection is discarded first, together with its uncommitted batch.
         *
         * @throws Exception if the connection, session or consumer cannot be created
         */
        void connect() throws Exception {
            // A stale connection may already be broken; a failing close must not block the reconnect.
            dropConnection();
            if (this.amqp) {
                this.factory = new JmsConnectionFactory("amqp://localhost:61616");
            } else {
                this.factory = new ActiveMQConnectionFactory();
            }
            this.connection = this.factory.createConnection();
            this.session = this.connection.createSession(true, Session.SESSION_TRANSACTED);
            this.queue = this.session.createQueue("exampleQueue");
            this.consumer = this.session.createConsumer(this.queue);
            this.connection.start();
        }

        /**
         * Receives messages until the enclosing test clears its {@code running} flag. A batch is
         * committed once it reaches {@link #COMMIT_BATCH_SIZE} messages, or when a receive times out
         * while a partial batch is pending. On any failure the connection is dropped and a new one is
         * opened on the next iteration.
         */
        @Override
        public void run() {
            try {
                while (ReplicationFlowControlTest.running.get()) {
                    try {
                        if (this.connection == null) {
                            connect();
                        }
                        if (this.consumer.receive(RECEIVE_TIMEOUT_MILLIS) == null) {
                            System.out.println("Consumer " + this + " couldn't get a message");
                            if (this.count > 0) {
                                commitBatch();
                            }
                        } else {
                            this.count++;
                            // Count only real deliveries so the progress line reflects messages, not loop turns.
                            this.totalCount++;
                            if (this.totalCount % PROGRESS_LOG_INTERVAL == 0) {
                                System.out.println(this + " received " + this.totalCount + " messages");
                            }
                            if (this.count == COMMIT_BATCH_SIZE) {
                                commitBatch();
                            }
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                        // The session state is unknown after a failure: discard it (and the uncommitted
                        // batch, which was never reported to the latch) so the next iteration reconnects.
                        dropConnection();
                    }
                }
            } finally {
                // Resources are released once, when the thread is done, not after every receive.
                try {
                    if (this.session != null) {
                        this.session.commit();
                    }
                } catch (Exception ignored) {
                    // Best-effort final commit; the connection is closed right below either way.
                }
                dropConnection();
            }
            System.out.println("Giving up the loop " + this);
        }

        /** Commits the open transaction and reports the committed messages to the shared latch and counter. */
        private void commitBatch() throws Exception {
            this.session.commit();
            ReplicationFlowControlTest.latch.countDown(this.count);
            ReplicationFlowControlTest.totalConsumed.addAndGet(this.count);
            this.count = 0;
        }

        /** Closes the current connection, ignoring close failures, and forgets the uncommitted batch. */
        private void dropConnection() {
            final Connection stale = this.connection;
            this.connection = null;
            this.session = null;
            this.consumer = null;
            this.count = 0;
            if (stale != null) {
                try {
                    stale.close();
                } catch (Exception ignored) {
                    // The connection is being discarded anyway; nothing useful can be done here.
                }
            }
        }
    }

    /**
     * Cleans up the data of both servers, disables the check thread and resets the state shared with
     * the consumer threads.
     *
     * @throws Exception if any of the setup steps fails
     */
    @Before
    public void before() throws Exception {
        cleanupData(SERVER_NAME_0);
        cleanupData(SERVER_NAME_1);
        disableCheckThread();
        // running, latch and totalConsumed are static, so they keep the values left behind by a
        // previous test method (running == false, latch counted down). Without this reset, consumers
        // of a later test would exit immediately and the latch would not wait for any message.
        running.set(true);
        totalConsumed.set(0);
        latch.setCount(NUM_MESSAGES);
    }

    /**
     * Runs the base-class teardown first, then calls {@code cleanupData} for both server names.
     *
     * @throws Exception if the base-class teardown or a cleanup call fails
     */
    @Override
    @After
    public void after() throws Exception {
        super.after();
        final String[] serverNames = {SERVER_NAME_0, SERVER_NAME_1};
        for (String serverName : serverNames) {
            cleanupData(serverName);
        }
    }

    /**
     * Runs the scenario with the failover flag off: no kill index is configured and the consumers
     * are started with AMQP enabled.
     */
    @Test
    public void testPageWhileSynchronizingReplica() throws Exception {
        final boolean failover = false;
        internalTest(failover);
    }

    /**
     * Runs the scenario with the failover flag on: a kill index is configured and the consumers are
     * started with AMQP disabled. Currently excluded from the run via {@code @Ignore}.
     */
    @Test
    @Ignore
    public void testPageWhileSyncFailover() throws Exception {
        final boolean failover = true;
        internalTest(failover);
    }

    /**
     * Starts the first server, sends {@code NUM_MESSAGES} 20 KiB messages to {@code exampleQueue}
     * through a transacted session, and waits for the consumers to commit all of them. Consumers are
     * started at send index {@code START_CONSUMERS} and the extra server at {@code START_SERVER}.
     *
     * @param z {@code true} for the failover variant (a kill index is configured and consumers do not
     *          use AMQP), {@code false} for the plain variant (no kill, AMQP enabled for consumers)
     * @throws Exception if a server cannot be started, a JMS operation fails, or the consumers do not
     *                   finish within the wait time
     */
    private void internalTest(boolean z) throws Exception {
        final boolean failover = z;
        // NOTE(review): the kill index equals NUM_MESSAGES, which the send loop below never reaches,
        // so the kill branch cannot fire as written -- confirm the intended index before enabling
        // the failover test.
        final int killServerAt = failover ? NUM_MESSAGES : -1;
        Connection connection = null;
        try {
            server0 = startServer(SERVER_NAME_0, 0, 30000);
            final ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
            connection = connectionFactory.createConnection();
            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
            MessageProducer producer = session.createProducer(session.createQueue("exampleQueue"));
            final BytesMessage message = session.createBytesMessage();
            message.writeBytes(new byte[20480]);

            for (int sent = 0; sent < NUM_MESSAGES; sent++) {
                producer.send(message);
                if (sent % 1000 == 0) {
                    System.out.println("Sent " + sent + " messages, consumed=" + totalConsumed.get());
                    session.commit();
                }
                if (sent == START_CONSUMERS) {
                    System.out.println("Starting consumers");
                    startConsumers(!failover);
                }
                if (killServerAt >= 0 && sent == killServerAt) {
                    session.commit();
                    System.out.println("Killing server");
                    ServerUtil.killServer(server0);
                    Thread.sleep(2000L);
                    // Re-establish the producer side after the kill.
                    connection.close();
                    connection = connectionFactory.createConnection();
                    session = connection.createSession(true, Session.SESSION_TRANSACTED);
                    producer = session.createProducer(session.createQueue("exampleQueue"));
                }
                if (sent == START_SERVER) {
                    System.out.println("Starting extra server");
                    server1 = startServer(SERVER_NAME_1, 0, 30000);
                }
            }
            session.commit();

            System.out.println("Awaiting all consumers to finish");
            if (!latch.await(10L, TimeUnit.SECONDS)) {
                fail("couldn't receive all messages");
            }

            // Graceful stop: give every consumer a chance to leave its loop before interrupting it.
            running.set(false);
            for (Consumer consumer : this.consumers) {
                consumer.join(10000L);
                if (consumer.isAlive()) {
                    consumer.interrupt();
                }
            }
        } finally {
            // Single cleanup path for both success and failure.
            running.set(false);
            try {
                if (connection != null) {
                    connection.close();
                }
            } finally {
                // Must run even when close() throws, otherwise consumer threads are left behind.
                for (Consumer consumer : this.consumers) {
                    consumer.interrupt();
                    consumer.join();
                }
            }
        }
    }

    /**
     * Creates and starts {@code NUMBER_OF_CONSUMERS} consumer threads and records them in
     * {@code consumers}.
     *
     * @param z when {@code true}, consumers with an even id use AMQP; when {@code false}, none do
     */
    void startConsumers(boolean z) {
        int id = 0;
        while (id < NUMBER_OF_CONSUMERS) {
            final boolean evenId = id % 2 == 0;
            final Consumer worker = new Consumer(z && evenId, id);
            worker.start();
            this.consumers.add(worker);
            id++;
        }
    }
}
