package org.apache.activemq.artemis.tests.integration.routing;

import jakarta.jms.Connection;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.Message;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import jakarta.jms.TopicSubscriber;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.routing.ConnectionRouterConfiguration;
import org.apache.activemq.artemis.core.config.routing.NamedPropertyConfiguration;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireProtocolManagerFactory;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.routing.KeyType;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManagerFactory;
import org.apache.activemq.artemis.tests.extensions.parameterized.ParameterizedTestExtension;
import org.apache.activemq.artemis.tests.extensions.parameterized.Parameters;
import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
import org.apache.activemq.artemis.utils.Wait;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith({ParameterizedTestExtension.class})
/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/routing/AutoClientIDShardClusterTest.class */
public class AutoClientIDShardClusterTest extends RoutingTestBase {
    private final String protocol;
    final int numMessages = 50;
    AtomicInteger toSend = new AtomicInteger(50);
    Runnable producer = new Runnable() { // from class: org.apache.activemq.artemis.tests.integration.routing.AutoClientIDShardClusterTest.1
        final AtomicInteger producerSeq = new AtomicInteger();

        @Override // java.lang.Runnable
        public void run() {
            Connection createConnection;
            Session createSession;
            while (AutoClientIDShardClusterTest.this.toSend.get() > 0) {
                try {
                    createConnection = AutoClientIDShardClusterTest.this.createFactory(AutoClientIDShardClusterTest.this.protocol, "producer", "admin", "admin").createConnection();
                    try {
                        createConnection.start();
                        createSession = createConnection.createSession(false, 1);
                    } finally {
                    }
                } catch (Exception e) {
                }
                try {
                    MessageProducer createProducer = createSession.createProducer(createSession.createTopic(AutoClientIDShardClusterTest.this.name));
                    for (int i = 0; i < 10; i++) {
                        try {
                            if (AutoClientIDShardClusterTest.this.toSend.get() <= 0) {
                                break;
                            }
                            TextMessage createTextMessage = createSession.createTextMessage();
                            createTextMessage.setIntProperty("SEQ", this.producerSeq.get() + 1);
                            createProducer.send(createTextMessage);
                            this.producerSeq.incrementAndGet();
                            AutoClientIDShardClusterTest.this.toSend.decrementAndGet();
                        } finally {
                        }
                    }
                    TimeUnit.MILLISECONDS.sleep(100L);
                    if (createProducer != null) {
                        createProducer.close();
                    }
                    if (createSession != null) {
                        createSession.close();
                    }
                    if (createConnection != null) {
                        createConnection.close();
                    }
                } catch (Throwable th) {
                    if (createSession != null) {
                        try {
                            createSession.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                    throw th;
                }
            }
        }
    };

    /* loaded from: input_file:org/apache/activemq/artemis/tests/integration/routing/AutoClientIDShardClusterTest$DurableSub.class */
    class DurableSub implements Runnable {
        final String id;
        int lastReceived;
        int maxReceived;
        int receivedInOrder = -1;
        AtomicBoolean consumerDone = new AtomicBoolean();
        AtomicBoolean orderShot = new AtomicBoolean();
        CountDownLatch registered = new CountDownLatch(1);

        DurableSub(String str) {
            this.id = str;
        }

        @Override // java.lang.Runnable
        public void run() {
            Connection connection;
            Session createSession;
            TopicSubscriber createDurableSubscriber;
            Message receive;
            while (!this.consumerDone.get()) {
                try {
                    connection = null;
                    try {
                        connection = AutoClientIDShardClusterTest.this.createFactory(AutoClientIDShardClusterTest.this.protocol, "ClientId-" + this.id, "admin", "admin").createConnection();
                        connection.start();
                        createSession = connection.createSession(false, 1);
                        try {
                            createDurableSubscriber = createSession.createDurableSubscriber(createSession.createTopic(AutoClientIDShardClusterTest.this.name), "Sub-" + this.id);
                        } catch (Throwable th) {
                            if (createSession != null) {
                                try {
                                    createSession.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            }
                            throw th;
                            break;
                        }
                    } catch (Throwable th3) {
                        if (connection != null) {
                            connection.close();
                        }
                        throw th3;
                        break;
                    }
                } catch (Exception e) {
                }
                try {
                    this.registered.countDown();
                    for (int i = 0; i < 5 && (receive = createDurableSubscriber.receive(500L)) != null; i++) {
                        this.lastReceived = receive.getIntProperty("SEQ");
                        if (this.lastReceived > this.maxReceived) {
                            this.maxReceived = this.lastReceived;
                        }
                        if (this.receivedInOrder < 0) {
                            this.receivedInOrder = this.lastReceived;
                        } else if (this.receivedInOrder == this.lastReceived - 1) {
                            this.receivedInOrder++;
                        } else {
                            if (!this.orderShot.get()) {
                                System.err.println("Sub: " + this.id + ", received: out of order " + this.lastReceived + ", last in order: " + this.receivedInOrder);
                            }
                            this.orderShot.set(true);
                        }
                    }
                    TimeUnit.MILLISECONDS.sleep(500L);
                    if (createDurableSubscriber != null) {
                        createDurableSubscriber.close();
                    }
                    if (createSession != null) {
                        createSession.close();
                    }
                    if (connection != null) {
                        connection.close();
                    }
                } catch (Throwable th4) {
                    if (createDurableSubscriber != null) {
                        try {
                            createDurableSubscriber.close();
                        } catch (Throwable th5) {
                            th4.addSuppressed(th5);
                        }
                    }
                    throw th4;
                    break;
                }
            }
        }
    }

    @Parameters(name = "protocol: {0}")
    public static Collection<Object[]> data() {
        ArrayList arrayList = new ArrayList();
        for (String str : new String[]{"AMQP", "CORE", "OPENWIRE"}) {
            arrayList.add(new Object[]{str});
        }
        return arrayList;
    }

    public AutoClientIDShardClusterTest(String str) {
        this.protocol = str;
    }

    protected void setupServers() throws Exception {
        for (int i = 0; i < 2; i++) {
            setupPrimaryServer(i, true, ClusterTestBase.HAType.SharedNothingReplication, true, false);
            this.servers[i].addProtocolManagerFactory(new ProtonProtocolManagerFactory());
            this.servers[i].addProtocolManagerFactory(new OpenWireProtocolManagerFactory());
        }
        setupClusterConnection("cluster0", this.name, MessageLoadBalancingType.ON_DEMAND, 1, true, 0, 1);
        setupClusterConnection("cluster1", this.name, MessageLoadBalancingType.ON_DEMAND, 1, true, 1, 0);
        this.toSend.set(50);
    }

    @Disabled("not totally reliable, but does show the root cause of the problem being solved")
    @TestTemplate
    public void testWithoutOutSharding() throws Exception {
        setupServers();
        startServers(0, 1);
        DurableSub durableSub = new DurableSub("0");
        DurableSub durableSub2 = new DurableSub("1");
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(3);
        try {
            newFixedThreadPool.submit(durableSub);
            newFixedThreadPool.submit(durableSub2);
            Assertions.assertTrue(durableSub.registered.await(20L, TimeUnit.SECONDS));
            Assertions.assertTrue(durableSub2.registered.await(20L, TimeUnit.SECONDS));
            Assertions.assertTrue(waitForBindings(this.servers[0], this.name, true, 2, -1, 10000L));
            Assertions.assertTrue(waitForBindings(this.servers[1], this.name, true, 2, -1, 10000L));
            Assertions.assertTrue(waitForBindings(this.servers[0], this.name, false, 2, -1, 10000L));
            Assertions.assertTrue(waitForBindings(this.servers[1], this.name, false, 2, -1, 10000L));
            newFixedThreadPool.submit(this.producer);
            Assertions.assertTrue(Wait.waitFor(() -> {
                return this.toSend.get() == 0;
            }), "All sent");
            Assertions.assertTrue(Wait.waitFor(() -> {
                return durableSub.maxReceived == 50;
            }), "All received sub0");
            Assertions.assertTrue(Wait.waitFor(() -> {
                return durableSub2.maxReceived == 50;
            }), "All received sub1");
            Assertions.assertTrue(durableSub.orderShot.get() || durableSub2.orderShot.get());
            durableSub.consumerDone.set(true);
            durableSub2.consumerDone.set(true);
            newFixedThreadPool.shutdown();
            stopServers(0, 1);
        } catch (Throwable th) {
            durableSub.consumerDone.set(true);
            durableSub2.consumerDone.set(true);
            newFixedThreadPool.shutdown();
            stopServers(0, 1);
            throw th;
        }
    }

    @TestTemplate
    public void testWithConsistentHashClientIDModTwo() throws Exception {
        setupServers();
        addRouterWithClientIdConsistentHashMod();
        startServers(0, 1);
        DurableSub durableSub = new DurableSub("0");
        DurableSub durableSub2 = new DurableSub("1");
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(3);
        try {
            newFixedThreadPool.submit(durableSub);
            newFixedThreadPool.submit(durableSub2);
            Assertions.assertTrue(durableSub.registered.await(5L, TimeUnit.SECONDS));
            Assertions.assertTrue(durableSub2.registered.await(5L, TimeUnit.SECONDS));
            Assertions.assertTrue(waitForBindings(this.servers[0], this.name, true, 1, 1, 2000L));
            Assertions.assertTrue(waitForBindings(this.servers[1], this.name, true, 1, 1, 2000L));
            Assertions.assertTrue(waitForBindings(this.servers[0], this.name, false, 1, 1, 10000L));
            Assertions.assertTrue(waitForBindings(this.servers[1], this.name, false, 1, 1, 10000L));
            newFixedThreadPool.submit(this.producer);
            Assertions.assertTrue(Wait.waitFor(() -> {
                return this.toSend.get() == 0;
            }), "All sent");
            Assertions.assertTrue(Wait.waitFor(() -> {
                return durableSub.maxReceived == 50;
            }), "All received sub0");
            Assertions.assertTrue(Wait.waitFor(() -> {
                return durableSub2.maxReceived == 50;
            }), "All received sub1");
            Assertions.assertFalse(durableSub.orderShot.get() && durableSub2.orderShot.get());
            durableSub.consumerDone.set(true);
            durableSub2.consumerDone.set(true);
            newFixedThreadPool.shutdown();
            stopServers(0, 1);
        } catch (Throwable th) {
            durableSub.consumerDone.set(true);
            durableSub2.consumerDone.set(true);
            newFixedThreadPool.shutdown();
            stopServers(0, 1);
            throw th;
        }
    }

    private void addRouterWithClientIdConsistentHashMod() {
        for (int i = 0; i < 2; i++) {
            Configuration configuration = this.servers[i].getConfiguration();
            ConnectionRouterConfiguration name = new ConnectionRouterConfiguration().setName("bb1");
            name.setKeyType(KeyType.CLIENT_ID).setLocalTargetFilter("NULL|" + i);
            NamedPropertyConfiguration namedPropertyConfiguration = new NamedPropertyConfiguration();
            namedPropertyConfiguration.setName("CONSISTENT_HASH_MODULO");
            HashMap hashMap = new HashMap();
            hashMap.put("MODULO", String.valueOf(2));
            namedPropertyConfiguration.setProperties(hashMap);
            name.setPolicyConfiguration(namedPropertyConfiguration);
            configuration.setConnectionRouters(Collections.singletonList(name));
            getDefaultServerAcceptor(i).getParams().put("router", "bb1");
        }
    }

    protected ConnectionFactory createFactory(String str, String str2, String str3, String str4) throws Exception {
        StringBuilder sb = new StringBuilder();
        boolean z = -1;
        switch (str.hashCode()) {
            case 2013003:
                if (str.equals("AMQP")) {
                    z = true;
                    break;
                }
                break;
            case 2074527:
                if (str.equals("CORE")) {
                    z = false;
                    break;
                }
                break;
            case 279024079:
                if (str.equals("OPENWIRE")) {
                    z = 2;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                sb.append("(tcp://localhost:61616,tcp://localhost:61617)?connectionLoadBalancingPolicyClassName=org.apache.activemq.artemis.api.core.client.loadbalance.RandomConnectionLoadBalancingPolicy");
                sb.append("&clientID=");
                sb.append(str2);
                return new ActiveMQConnectionFactory(sb.toString(), str3, str4);
            case true:
                sb.append("failover:(amqp://localhost:61616,amqp://localhost:61617)?failover.randomize=true");
                sb.append("&jms.clientID=");
                sb.append(str2);
                return new JmsConnectionFactory(str3, str4, sb.toString());
            case true:
                sb.append("failover:(tcp://localhost:61616,tcp://localhost:61617)?randomize=true&maxReconnectAttempts=0&startupMaxReconnectAttempts=0");
                sb.append("&jms.clientID=");
                sb.append(str2);
                return new org.apache.activemq.ActiveMQConnectionFactory(str3, str4, sb.toString());
            default:
                throw new IllegalStateException("Unexpected value: " + str);
        }
    }
}
