package org.apache.activemq.artemis.tests.integration.cluster.distribution;

import java.lang.reflect.Field;
import java.util.Map;
import java.util.Random;
import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
import org.apache.activemq.artemis.core.server.cluster.MessageFlowRecord;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterConnectionConfigTest.class */
public class ClusterConnectionConfigTest extends ClusterTestBase {
    @Override // org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase
    @Before
    public void setUp() throws Exception {
        super.setUp();
        start();
    }

    private void start() throws Exception {
        setupServers();
    }

    protected boolean isNetty() {
        return true;
    }

    protected void setupServers() throws Exception {
        setupServer(0, isFileStorage(), isNetty());
        setupServer(1, isFileStorage(), isNetty());
    }

    protected void setupCluster(MessageLoadBalancingType messageLoadBalancingType, ClusterTestBase.ClusterConfigCallback clusterConfigCallback) throws Exception {
        setupClusterConnection("cluster0", "queues", messageLoadBalancingType, 1, isNetty(), clusterConfigCallback, 0, 1, 2);
        setupClusterConnection("cluster1", "queues", messageLoadBalancingType, 1, isNetty(), clusterConfigCallback, 1, 0, 2);
    }

    @Test
    public void testRedistributionFlowControl() throws Exception {
        int nextInt = new Random().nextInt(Integer.MAX_VALUE);
        setupCluster(MessageLoadBalancingType.ON_DEMAND, clusterConnectionConfiguration -> {
            clusterConnectionConfiguration.setProducerWindowSize(nextInt);
        });
        startServers(0, 1);
        setupSessionFactory(0, isNetty());
        setupSessionFactory(1, isNetty());
        createQueue(0, "queues.testaddress", "queue0", null, false);
        createQueue(1, "queues.testaddress", "queue0", null, false);
        addConsumer(1, 1, "queue0", null);
        waitForBindings(0, "queues.testaddress", 1, 0, true);
        waitForBindings(1, "queues.testaddress", 1, 1, true);
        waitForBindings(0, "queues.testaddress", 1, 1, false);
        waitForBindings(1, "queues.testaddress", 1, 0, false);
        send(0, "queues.testaddress", 1, false, null);
        Thread.sleep(5000L);
        makeSureForwardingFlowControl(nextInt, 0, 1);
        removeConsumer(1);
    }

    private void makeSureForwardingFlowControl(int i, int... iArr) throws NoSuchFieldException, IllegalAccessException {
        for (int i2 : iArr) {
            Map records = this.servers[i2].getClusterManager().getClusterConnection("cluster" + i2).getRecords();
            assertEquals(1L, records.size());
            ((MessageFlowRecord) ((Map.Entry) records.entrySet().iterator().next()).getValue()).getClass().getDeclaredField("targetLocator").setAccessible(true);
            assertEquals(i, ((ServerLocatorInternal) r0.get(r0)).getProducerWindowSize());
        }
    }
}
