package org.apache.activemq.artemis.tests.integration.cluster.distribution;

import java.util.Iterator;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
import org.apache.activemq.artemis.core.server.cluster.MessageFlowRecord;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/cluster/distribution/ExpireWhileLoadBalanceTest.class */
/**
 * Cluster integration test: sends durable messages carrying an expiration to a
 * clustered address on node 0 while every bridge of node 0's cluster
 * connections is paused, then expects each message to be consumable from the
 * queue configured as the expiry (and dead-letter) address.
 */
public class ExpireWhileLoadBalanceTest extends ClusterTestBase {
    /** Number of servers in the cluster; nodes are indexed {@code 0 .. SERVER_COUNT - 1}. */
    private static final int SERVER_COUNT = 3;

    /** Number of messages produced, and therefore expected on the expiry queue. */
    private static final int MESSAGE_COUNT = 1000;

    /** Value handed to {@code setMessageExpiryScanPeriod} on every server. */
    private static final long EXPIRY_SCAN_PERIOD = 100L;

    /** Value handed to {@code setExpiration} on every produced message. */
    private static final long MESSAGE_EXPIRATION = 500L;

    /** Timeout handed to each {@code receive} call on the expiry queue consumer. */
    private static final long RECEIVE_TIMEOUT = 2000L;

    /** Address the test messages are sent to. */
    private static final String TEST_ADDRESS = "queues.testaddress";

    /** Name used for both the expiry address and the queue bound to it. */
    private static final String EXPIRY_QUEUE = "expiryQueue";

    /**
     * Configures and starts the three servers, links them with one STRICT
     * cluster connection per node, and creates a session factory for each node.
     *
     * @throws Exception if the base set-up or any server/cluster set-up step fails
     */
    @Override
    @Before
    public void setUp() throws Exception {
        super.setUp();
        for (int node = 0; node < SERVER_COUNT; node++) {
            setupServer(node, isFileStorage(), true);
        }
        // A short scan period is configured before the servers are started.
        for (int node = 0; node < SERVER_COUNT; node++) {
            this.servers[node].getConfiguration().setMessageExpiryScanPeriod(EXPIRY_SCAN_PERIOD);
        }
        // One cluster connection per node; the trailing arguments list the node
        // itself followed by the two other nodes.
        setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.STRICT, 1, true, 0, 1, 2);
        setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.STRICT, 1, true, 1, 0, 2);
        setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.STRICT, 1, true, 2, 0, 1);
        startServers(0, 1, 2);
        for (int node = 0; node < SERVER_COUNT; node++) {
            setupSessionFactory(node, true);
        }
    }

    /**
     * Sends {@link #MESSAGE_COUNT} messages with an expiration to the test
     * address on node 0 while node 0's bridges are paused, then asserts that
     * the same number of messages can be received from the expiry queue.
     *
     * @throws Exception if any broker or client operation fails
     */
    @Test
    public void testSend() throws Exception {
        for (int node = 0; node < SERVER_COUNT; node++) {
            waitForTopology(getServer(node), SERVER_COUNT);
        }

        SimpleString expiryQueue = SimpleString.toSimpleString(EXPIRY_QUEUE);
        AddressSettings settings = new AddressSettings();
        settings.setDeadLetterAddress(expiryQueue);
        settings.setExpiryAddress(expiryQueue);
        for (int node = 0; node < SERVER_COUNT; node++) {
            createQueue(node, TEST_ADDRESS, "queue0", null, true);
            // The cast selects the SimpleString overload for the null argument.
            getServer(node).createQueue(expiryQueue, expiryQueue, (SimpleString) null, true, false);
            getServer(node).getAddressSettingsRepository().addMatch("#", settings);
        }

        pauseClusterBridges(0);

        // NOTE(review): createSession(false, false) combined with the explicit
        // commits below suggests a transacted session — confirm against the client API.
        ClientSession session = this.sfs[0].createSession(false, false);
        try {
            sendExpiringMessages(session);
            session.commit();

            session.start();
            receiveAllFromExpiryQueue(session);
            session.commit();
        } finally {
            // Close the session even when an assertion or broker call fails,
            // so a failing run does not leave it open.
            session.close();
        }
    }

    /** Pauses the bridge of every message flow record on every cluster connection of the given node. */
    private void pauseClusterBridges(int node) throws Exception {
        for (ClusterConnection connection : getServer(node).getClusterManager().getClusterConnections()) {
            for (MessageFlowRecord record : connection.getRecords().values()) {
                record.getBridge().pause();
            }
        }
    }

    /** Produces {@link #MESSAGE_COUNT} durable messages with an expiration to the test address (no commit). */
    private void sendExpiringMessages(ClientSession session) throws Exception {
        ClientProducer producer = session.createProducer(TEST_ADDRESS);
        for (int sent = 0; sent < MESSAGE_COUNT; sent++) {
            ClientMessage message = session.createMessage(true);
            // NOTE(review): 500 may be interpreted as an absolute timestamp rather
            // than a relative time-to-live — confirm against setExpiration's contract.
            message.setExpiration(MESSAGE_EXPIRATION);
            producer.send(message);
        }
    }

    /** Receives and acknowledges {@link #MESSAGE_COUNT} messages from the expiry queue, failing on a timeout. */
    private void receiveAllFromExpiryQueue(ClientSession session) throws Exception {
        ClientConsumer consumer = session.createConsumer(EXPIRY_QUEUE);
        for (int received = 0; received < MESSAGE_COUNT; received++) {
            ClientMessage message = consumer.receive(RECEIVE_TIMEOUT);
            Assert.assertNotNull(message);
            message.acknowledge();
        }
    }
}
