package org.apache.activemq.artemis.tests.integration.cluster.distribution;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionWithDiscoveryTest.class */
/**
 * Cluster test that configures five discovery-based servers and checks that messages whose
 * acknowledgements were left in prepared XA transactions on node 0 can, after node 0 is
 * restarted and those transactions are rolled back, be consumed through node 1.
 */
public class MessageRedistributionWithDiscoveryTest extends ClusterTestBase {
    /** Number of servers configured by {@link #setupCluster(MessageLoadBalancingType)}. */
    private static final int SERVER_COUNT = 5;

    /** Number of messages sent, prepared and finally expected on the remote consumer. */
    private static final int MESSAGE_COUNT = 100;

    /** Address every queue in this test is bound to. */
    private static final String ADDRESS = "queues.testaddress";

    /** Queue name used on every node. */
    private static final String QUEUE = "queue0";

    /** Receive timeout (ms) while consuming locally on node 0. */
    private static final long LOCAL_RECEIVE_TIMEOUT_MS = 5000L;

    /** Receive timeout (ms) while waiting for messages to show up on node 1. */
    private static final long REMOTE_RECEIVE_TIMEOUT_MS = 15000L;

    protected final String groupAddress = ActiveMQTestBase.getUDPDiscoveryAddress();
    protected final int groupPort = ActiveMQTestBase.getUDPDiscoveryPort();

    /**
     * Transport selector passed to the cluster set-up helpers.
     *
     * @return always {@code false} in this class
     */
    protected boolean isNetty() {
        return false;
    }

    /**
     * Runs the base-class set-up and then configures the cluster servers.
     *
     * @throws Exception if the base set-up or the cluster configuration fails
     */
    @Override
    @Before
    public void setUp() throws Exception {
        super.setUp();
        setupCluster();
    }

    /**
     * Configures the cluster with {@link MessageLoadBalancingType#ON_DEMAND}.
     *
     * @throws Exception if configuring any server fails
     */
    protected void setupCluster() throws Exception {
        setupCluster(MessageLoadBalancingType.ON_DEMAND);
    }

    /**
     * Configures servers {@code 0..4} with the given load-balancing type.
     *
     * @param messageLoadBalancingType load-balancing type used for every cluster connection
     * @throws Exception if configuring any server fails
     */
    protected void setupCluster(MessageLoadBalancingType messageLoadBalancingType) throws Exception {
        for (int node = 0; node < SERVER_COUNT; node++) {
            setServer(messageLoadBalancingType, node);
        }
    }

    /**
     * Configures a single server: creates it with discovery, sets redelivery and redistribution
     * delays to zero for every address ({@code "#"}), and adds a discovery cluster connection
     * named {@code "cluster" + i} for the {@code "queues"} address prefix.
     *
     * @param messageLoadBalancingType load-balancing type for the cluster connection
     * @param i index of the server to configure
     * @throws Exception if the server or cluster connection cannot be configured
     */
    protected void setServer(MessageLoadBalancingType messageLoadBalancingType, int i) throws Exception {
        setupLiveServerWithDiscovery(i, this.groupAddress, this.groupPort, isFileStorage(), isNetty(), false);

        AddressSettings noDelaySettings = new AddressSettings().setRedeliveryDelay(0L).setRedistributionDelay(0L);
        this.servers[i].getAddressSettingsRepository().addMatch("#", noDelaySettings);

        setupDiscoveryClusterConnection("cluster" + i, i, "dg1", "queues", messageLoadBalancingType, 1, isNetty());
    }

    /**
     * Sends {@value #MESSAGE_COUNT} messages to node 0, acknowledges each one inside its own
     * prepared (but not committed) XA transaction, restarts node 0 alongside nodes 1 and 2,
     * rolls every prepared transaction back and asserts that all messages are then received by
     * a consumer attached to node 1.
     *
     * @throws Exception on any broker, session or XA failure
     */
    @Test
    public void testRedistributeWithPreparedAndRestart() throws Exception {
        startServers(0);
        setupSessionFactory(0, isNetty());
        createQueue(0, ADDRESS, QUEUE, null, true);

        // Phase 1: publish the messages on node 0, committing after each send.
        ClientSession producerSession = this.sfs[0].createSession(false, false, false);
        ClientProducer producer = producerSession.createProducer(ADDRESS);
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            ClientMessage message = producerSession.createMessage(true);
            message.putIntProperty("key", i);
            producer.send(message);
            producerSession.commit();
        }
        producerSession.close();

        // Phase 2: acknowledge every message in its own XA branch and leave it prepared.
        ClientSession xaSession = this.sfs[0].createSession(true, false, false);
        ClientConsumer localConsumer = xaSession.createConsumer(QUEUE);
        xaSession.start();
        List<Xid> preparedXids = new ArrayList<>(MESSAGE_COUNT);
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            Xid xid = newXID();
            xaSession.start(xid, XAResource.TMNOFLAGS);

            ClientMessage received = localConsumer.receive(LOCAL_RECEIVE_TIMEOUT_MS);
            // Fail with a meaningful message instead of a NullPointerException on timeout.
            Assert.assertNotNull("message " + i + " was not received on node 0", received);
            received.acknowledge();

            xaSession.end(xid, XAResource.TMSUCCESS);
            xaSession.prepare(xid);
            preparedXids.add(xid);
        }
        xaSession.close();

        // Phase 3: stop node 0 and re-create its configuration so it can be started again.
        this.sfs[0].close();
        this.sfs[0] = null;
        this.servers[0].stop();
        this.servers[0] = null;
        setServer(MessageLoadBalancingType.ON_DEMAND, 0);

        // Phase 4: bring up nodes 1 and 2 and attach a consumer to node 1 before node 0 returns.
        startServers(1, 2);
        setupSessionFactory(1, isNetty());
        setupSessionFactory(2, isNetty());
        ClientSession remoteSession = this.sfs[1].createSession(false, false);
        createQueue(1, ADDRESS, QUEUE, null, true);
        createQueue(2, ADDRESS, QUEUE, null, true);
        remoteSession.start();
        ClientConsumer remoteConsumer = remoteSession.createConsumer(QUEUE);

        startServers(0);
        setupSessionFactory(0, isNetty());

        // NOTE(review): arguments are presumably (node, address, bindingCount, consumerCount,
        // local) - confirm against ClusterTestBase.waitForBindings.
        waitForBindings(0, ADDRESS, 1, 0, true);
        waitForBindings(0, ADDRESS, 2, 1, false);
        waitForBindings(1, ADDRESS, 1, 1, true);
        waitForBindings(2, ADDRESS, 1, 0, true);
        waitForBindings(1, ADDRESS, 2, 0, false);
        waitForBindings(2, ADDRESS, 2, 1, false);

        // Phase 5: roll back every prepared branch on the restarted node 0.
        ClientSession recoverySession = this.sfs[0].createSession(true, false, false);
        for (Xid xid : preparedXids) {
            recoverySession.rollback(xid);
        }

        // Phase 6: every message must now be receivable through the consumer on node 1.
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            ClientMessage redistributed = remoteConsumer.receive(REMOTE_RECEIVE_TIMEOUT_MS);
            Assert.assertNotNull(redistributed);
            redistributed.acknowledge();
        }
        remoteSession.commit();
    }
}
