package org.apache.activemq.artemis.tests.integration.amqp;

import java.lang.invoke.MethodHandles;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.AddressQueryResult;
import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Exercises re-attaching to a durable AMQP subscription when the receiver addresses the
 * subscription through the acceptor's multicast prefix.
 *
 * <p>The suite runs once per {@link RoutingType} so that the broker's <em>default</em> address
 * routing type is either ANYCAST or MULTICAST. In both cases the tests assert that the address
 * created through the {@code multicast://} prefix reports MULTICAST as its only routing type.
 */
@RunWith(Parameterized.class)
public class AmqpDurableReceiverReconnectWithMulticastPrefixTest extends JMSClientTestSupport {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    /** Prefix registered on the AMQP acceptor for anycast addresses. */
    private static final String ANYCAST_PREFIX = "anycast://";

    /** Prefix registered on the AMQP acceptor for multicast addresses. */
    private static final String MULTICAST_PREFIX = "multicast://";

    /** Un-prefixed address name, used for sending and for broker-side queries. */
    private static final String ADDRESS = "test-address";

    /** Address as the durable receiver specifies it, i.e. with the multicast prefix. */
    private static final String PREFIXED_ADDRESS = MULTICAST_PREFIX + ADDRESS;

    private static final String CONTAINER_ID = "myContainerID";
    private static final String SUBSCRIPTION_NAME = "mySubscription";

    /** Default address routing type applied to the broker for the current parameterized run. */
    @Parameterized.Parameter(0)
    public RoutingType routingType;

    /**
     * Supplies the parameter sets for the suite.
     *
     * @return one single-element parameter array per routing type under test
     */
    @Parameterized.Parameters(name = "routingType={0}")
    public static Collection<Object[]> parameters() {
        return Arrays.asList(new Object[]{RoutingType.ANYCAST}, new Object[]{RoutingType.MULTICAST});
    }

    /**
     * @return the protocol list for the acceptor; only AMQP is used by this test
     */
    @Override
    protected String getConfiguredProtocols() {
        return "AMQP";
    }

    /**
     * Intentionally empty: no address or queue is pre-created here, so the address and the
     * subscription queue asserted on by the tests only come into existence through the test's
     * own client calls.
     */
    @Override
    public void createAddressAndQueues(ActiveMQServer activeMQServer) throws Exception {
    }

    /**
     * Registers the anycast and multicast address prefixes on the AMQP acceptor.
     *
     * @param map acceptor parameter map to add the prefixes to
     */
    @Override
    protected void configureAMQPAcceptorParameters(Map<String, Object> map) {
        map.put("anycastPrefix", ANYCAST_PREFIX);
        map.put("multicastPrefix", MULTICAST_PREFIX);
    }

    /**
     * Switches the journal to NIO, enables queue auto-creation and applies the parameterized
     * {@link #routingType} as the default address routing type.
     *
     * <p>The settings are applied to the first entry of the configured address settings; a
     * catch-all {@code "#"} entry is added first when none is configured.
     *
     * @param activeMQServer the server whose configuration is adjusted
     */
    @Override
    public void configureAddressPolicy(ActiveMQServer activeMQServer) {
        Configuration configuration = activeMQServer.getConfiguration();
        configuration.setJournalType(JournalType.NIO);

        Map<String, AddressSettings> settingsByMatch = configuration.getAddressSettings();
        if (settingsByMatch.isEmpty()) {
            settingsByMatch.put("#", new AddressSettings());
        }

        AddressSettings settings = settingsByMatch.values().iterator().next();
        settings.setAutoCreateQueues(true);
        settings.setDefaultAddressRoutingType(this.routingType);

        logger.info("server config, isauto? {}", settings.isAutoCreateQueues());
        logger.info("server config, default address routing type? {}", settings.getDefaultAddressRoutingType());
    }

    /**
     * Creates and detaches a durable subscription, re-attaches to it, and then checks that a
     * message sent <em>after</em> the re-attach is delivered to the receiver.
     */
    @Test(timeout = 60000)
    public void testReattachToDurableNodeAndTryAndReceiveNewlySentMessage() throws Exception {
        AmqpConnection connection = openConnection();
        AmqpSession session = connection.createSession();
        createDetachedSubscription(session);

        AmqpReceiver receiver = session.createDurableReceiver(PREFIXED_ADDRESS, SUBSCRIPTION_NAME);
        receiver.flow(1);
        assertEquals(0L, lookupSubscriptionQueue().getMessageCount());

        AmqpSender sender = session.createSender(ADDRESS);
        sendTestMessage(sender);

        assertNotNull(receiver.receive(5L, TimeUnit.SECONDS));
        assertEquals(1L, lookupSubscriptionQueue().getDeliveringCount());

        sender.close();
        receiver.close();
        connection.close();
    }

    /**
     * Creates and detaches a durable subscription, sends a message while no receiver is
     * attached, and then checks that the message is delivered once a receiver re-attaches.
     */
    @Test(timeout = 60000)
    public void testReattachToDurableNodeAndTryAndReceivePreviouslySentMessage() throws Exception {
        AmqpConnection connection = openConnection();
        AmqpSession session = connection.createSession();
        createDetachedSubscription(session);

        AmqpSender sender = session.createSender(ADDRESS);
        sendTestMessage(sender);
        // The message must be retained on the subscription queue while nobody is attached.
        assertEquals(1L, lookupSubscriptionQueue().getMessageCount());

        AmqpReceiver receiver = session.createDurableReceiver(PREFIXED_ADDRESS, SUBSCRIPTION_NAME);
        receiver.flow(1);

        assertNotNull(receiver.receive(5L, TimeUnit.SECONDS));
        assertEquals(1L, lookupSubscriptionQueue().getDeliveringCount());

        sender.close();
        receiver.close();
        connection.close();
    }

    /** Opens a connection that carries the fixed container id used by the subscription queue name. */
    private AmqpConnection openConnection() throws Exception {
        AmqpConnection connection = addConnection(createAmqpClient().createConnection());
        connection.setContainerId(CONTAINER_ID);
        connection.connect();
        return connection;
    }

    /**
     * Creates the durable subscription via the multicast-prefixed address, detaches it, and
     * asserts that the address is MULTICAST-only and the subscription queue is empty.
     */
    private void createDetachedSubscription(AmqpSession session) throws Exception {
        session.createDurableReceiver(PREFIXED_ADDRESS, SUBSCRIPTION_NAME).detach();

        AddressQueryResult address = getProxyToAddress(ADDRESS);
        assertNotNull(address);
        assertEquals(Set.of(RoutingType.MULTICAST), address.getRoutingTypes());
        assertEquals(0L, lookupSubscriptionQueue().getMessageCount());
    }

    /** Sends the single text message both tests expect to receive. */
    private void sendTestMessage(AmqpSender sender) throws Exception {
        AmqpMessage message = new AmqpMessage();
        message.setMessageId("msg:1");
        message.setText("Test-Message");
        sender.send(message);
    }

    /**
     * Finds the broker-side queue backing the durable subscription, looked up under the name
     * {@code <containerId>.<subscriptionName>}.
     *
     * @return the subscription queue
     * @throws AssertionError if no binding exists under that name or it is not a local queue binding
     */
    private Queue lookupSubscriptionQueue() {
        SimpleString queueName = SimpleString.toSimpleString(CONTAINER_ID + "." + SUBSCRIPTION_NAME);
        // Only the runtime type matters here: a missing binding (null) and a binding of any
        // other kind are both rejected by the instanceof test below.
        Object binding = this.server.getPostOffice().getBinding(queueName);
        if (binding instanceof LocalQueueBinding) {
            return ((LocalQueueBinding) binding).getQueue();
        }
        throw new AssertionError("Should have found an existing queue binding for the durable subscription");
    }

    /**
     * Queries the server for the given address.
     *
     * @param str un-prefixed address name
     * @return the server's query result for that address
     */
    private AddressQueryResult getProxyToAddress(String str) throws Exception {
        return this.server.addressQuery(SimpleString.toSimpleString(str));
    }
}
