package org.apache.activemq.artemis.tests.integration.server;

import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.integration.cluster.failover.FailoverTestBase;
import org.apache.activemq.artemis.tests.integration.replication.ReplicationOrderTest;
import org.apache.activemq.artemis.tests.util.TransportConfigurationUtils;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/server/RetroactiveAddressFailoverTest.class */
/**
 * Checks that the messages retained for a retroactive address are still
 * available after the client session has failed over from the primary broker
 * to the backup broker.
 *
 * <p>The test sends a first batch of messages, triggers a crash through the
 * base-class {@code crash(...)} helper, sends a second batch, and then binds a
 * new queue to the address and verifies that it receives exactly the most
 * recent {@link #RETROACTIVE_MESSAGE_COUNT} messages, in order.
 */
public class RetroactiveAddressFailoverTest extends FailoverTestBase {
    /** Number of messages sent before the crash is triggered. */
    private static final int MESSAGES_BEFORE_FAILOVER = 5;

    /** Retroactive message count configured for the test address on both brokers. */
    private static final int RETROACTIVE_MESSAGE_COUNT = 10;

    /** Total number of messages sent over the whole test (before plus after the crash). */
    private static final int TOTAL_MESSAGES = 15;

    /** Name of the int property carrying each message's sequence number. */
    private static final String SEQUENCE_PROPERTY = "xxx";

    /** Upper bound, in milliseconds, for each blocking receive on the consumer. */
    private static final long RECEIVE_TIMEOUT_MILLIS = 1000L;

    protected ServerLocator locator;
    protected ClientSessionFactoryInternal sf;
    // Prefix and delimiter handed to ResourceNames when deriving the retroactive
    // resource queue name; presumably these match the brokers' defaults -- confirm
    // if the broker configuration used by the base class ever changes.
    String internalNamingPrefix = "$.artemis.internal.";
    String delimiter = ".";

    /**
     * Runs the base-class set-up, then creates a blocking-send locator with
     * reconnection enabled and a session factory obtained via
     * {@code createSessionFactoryAndWaitForTopology(locator, 2)}.
     *
     * @throws Exception if the base set-up or the session factory creation fails
     */
    @Override
    @BeforeEach
    public void setUp() throws Exception {
        super.setUp();
        this.locator = getServerLocator()
            .setBlockOnNonDurableSend(true)
            .setBlockOnDurableSend(true)
            .setReconnectAttempts(ReplicationOrderTest.NUM)
            .setRetryInterval(100L);
        this.sf = createSessionFactoryAndWaitForTopology(this.locator, 2);
    }

    /**
     * Sends messages to a retroactive address before and after a crash and
     * verifies that a queue created afterwards receives the last
     * {@link #RETROACTIVE_MESSAGE_COUNT} messages in sequence order.
     *
     * @throws Exception on any broker or client failure
     */
    @Test
    public void testFailover() throws Exception {
        final ActiveMQServer primary = this.primaryServer.getServer();
        final ActiveMQServer backup = this.backupServer.getServer();
        final ClientSession session = addClientSession(this.sf.createSession(true, true));
        final SimpleString queueName = SimpleString.of("simpleQueue");
        final SimpleString addressName = SimpleString.of("myAddress");
        final SimpleString retroactiveQueueName = ResourceNames.getRetroactiveResourceQueueName(
            this.internalNamingPrefix, this.delimiter, addressName, RoutingType.MULTICAST);

        // Both brokers get the same retroactive setting so it applies whichever one is serving.
        primary.getAddressSettingsRepository().addMatch(
            addressName.toString(), new AddressSettings().setRetroactiveMessageCount(RETROACTIVE_MESSAGE_COUNT));
        backup.getAddressSettingsRepository().addMatch(
            addressName.toString(), new AddressSettings().setRetroactiveMessageCount(RETROACTIVE_MESSAGE_COUNT));
        primary.addAddressInfo(new AddressInfo(addressName));

        final ClientProducer producer = addClientProducer(session.createProducer(addressName));

        sendMessages(session, producer, 0, MESSAGES_BEFORE_FAILOVER);
        Wait.assertTrue(() -> hasMessageCount(primary, retroactiveQueueName, MESSAGES_BEFORE_FAILOVER));

        crash(session);
        Wait.assertTrue(() -> hasMessageCount(backup, retroactiveQueueName, MESSAGES_BEFORE_FAILOVER));

        // More messages are sent in total than the retroactive count, so only the
        // newest RETROACTIVE_MESSAGE_COUNT are expected to remain afterwards.
        sendMessages(session, producer, MESSAGES_BEFORE_FAILOVER, TOTAL_MESSAGES);
        Wait.assertTrue(() -> hasMessageCount(backup, retroactiveQueueName, RETROACTIVE_MESSAGE_COUNT));

        session.createQueue(
            QueueConfiguration.of(queueName).setAddress(addressName).setRoutingType(RoutingType.ANYCAST));
        Wait.assertTrue(() -> backup.locateQueue(queueName) != null);
        Wait.assertTrue(() -> hasMessageCount(backup, queueName, RETROACTIVE_MESSAGE_COUNT));

        final ClientConsumer consumer = session.createConsumer(queueName);
        // Starting the session once is enough; it does not need repeating per receive.
        session.start();
        final int firstRetained = TOTAL_MESSAGES - RETROACTIVE_MESSAGE_COUNT;
        for (int expected = firstRetained; expected < TOTAL_MESSAGES; expected++) {
            final ClientMessage received = consumer.receive(RECEIVE_TIMEOUT_MILLIS);
            Assertions.assertNotNull(received, "no message received for sequence " + expected);
            received.acknowledge();
            Assertions.assertEquals(expected, received.getIntProperty(SEQUENCE_PROPERTY).intValue());
        }
        consumer.close();
        session.deleteQueue(queueName);
    }

    /**
     * Sends one durable message per sequence number in {@code [fromInclusive, toExclusive)},
     * storing the sequence number in the {@link #SEQUENCE_PROPERTY} int property.
     *
     * @param session       session used to create the messages
     * @param producer      producer used to send them
     * @param fromInclusive first sequence number to send
     * @param toExclusive   sequence number at which to stop (not sent)
     * @throws Exception if sending fails
     */
    private static void sendMessages(ClientSession session, ClientProducer producer, int fromInclusive, int toExclusive) throws Exception {
        for (int sequence = fromInclusive; sequence < toExclusive; sequence++) {
            final ClientMessage message = session.createMessage(true);
            message.putIntProperty(SEQUENCE_PROPERTY, sequence);
            producer.send(message);
        }
    }

    /**
     * Polling predicate for {@link Wait}: reports whether the named queue exists on
     * {@code server} and currently holds exactly {@code expected} messages.
     *
     * <p>A missing queue yields {@code false} rather than a {@link NullPointerException},
     * so the caller keeps polling until the queue appears or the wait times out.
     *
     * @param server    broker to query
     * @param queueName queue to look up
     * @param expected  required message count
     * @return {@code true} only if the queue was found and its message count equals {@code expected}
     */
    private static boolean hasMessageCount(ActiveMQServer server, SimpleString queueName, long expected) {
        final Queue queue = server.locateQueue(queueName);
        return queue != null && queue.getMessageCount() == expected;
    }

    /**
     * Supplies an in-VM acceptor configuration.
     *
     * @param z forwarded unchanged to {@link TransportConfigurationUtils#getInVMAcceptor(boolean)}
     * @return the in-VM acceptor transport configuration
     */
    @Override
    protected TransportConfiguration getAcceptorTransportConfiguration(boolean z) {
        return TransportConfigurationUtils.getInVMAcceptor(z);
    }

    /**
     * Supplies an in-VM connector configuration.
     *
     * @param z forwarded unchanged to {@link TransportConfigurationUtils#getInVMConnector(boolean)}
     * @return the in-VM connector transport configuration
     */
    @Override
    protected TransportConfiguration getConnectorTransportConfiguration(boolean z) {
        return TransportConfigurationUtils.getInVMConnector(z);
    }
}
