package org.apache.activemq.artemis.tests.integration.cluster.crossprotocol;

import jakarta.jms.Connection;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import java.lang.invoke.MethodHandles;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireProtocolManagerFactory;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManagerFactory;
import org.apache.activemq.artemis.tests.extensions.parameterized.Parameter;
import org.apache.activemq.artemis.tests.extensions.parameterized.ParameterizedTestExtension;
import org.apache.activemq.artemis.tests.extensions.parameterized.Parameters;
import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Two-server cluster test, parameterized over the AMQP, CORE and OPENWIRE client protocols.
 *
 * <p>A producer connected to the first broker port sends messages whose {@code "large"} string
 * property starts at {@value #INITIAL_HEADER_LENGTH} characters and grows by one character per
 * iteration, while both servers are configured with a journal buffer of
 * {@value #JOURNAL_BUFFER_SIZE}. Individual send failures are logged and ignored. The test then
 * checks that messages from a fresh producer can still be consumed through the second broker port.
 */
@ExtendWith({ParameterizedTestExtension.class})
public class LargeHeadersClusterTest extends ClusterTestBase {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    private static final SimpleString queueName = SimpleString.of("queues.0");
    private static final SimpleString EXPIRY_ADDRESS = SimpleString.of("queues.expiry");
    private static final int NUMBER_OF_MESSAGES = 154;

    /** Port of server 0; server {@code i} is addressed at {@code BASE_PORT + i}. */
    private static final int BASE_PORT = 61616;
    /** Initial length, in characters, of the growing {@code "large"} property. */
    private static final int INITIAL_HEADER_LENGTH = 9500;
    /** Number of loop iterations per producer; each iteration sends the message twice. */
    private static final int SEND_ITERATIONS = 1000;
    /** Journal buffer size applied to both NIO and AIO on both servers. */
    private static final int JOURNAL_BUFFER_SIZE = 20480;
    /** Log code that must show up while sending growing headers over CORE and OPENWIRE. */
    private static final String EXPECTED_LOG_CODE = "AMQ144012";

    /** Client protocol under test; one of {@code "AMQP"}, {@code "CORE"} or {@code "OPENWIRE"}. */
    @Parameter(index = 0)
    public String protocol;

    /**
     * Supplies the protocol names the test template is run with.
     *
     * @return one single-element argument array per protocol
     */
    @Parameters(name = "protocol={0}")
    public static Collection<Object[]> getParameters() {
        return Arrays.asList(new Object[]{"AMQP"}, new Object[]{"CORE"}, new Object[]{"OPENWIRE"});
    }

    @Override // org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase, org.apache.activemq.artemis.tests.util.ActiveMQTestBase
    @BeforeEach
    public void setUp() throws Exception {
        super.setUp();
    }

    /**
     * Configures and starts both servers, then creates the expiry queue and the test queue on each.
     *
     * @param messageLoadBalancingType load-balancing mode passed to both cluster connections
     * @throws Exception if configuring or starting the servers fails
     */
    private void startServers(MessageLoadBalancingType messageLoadBalancingType) throws Exception {
        setupServers();
        setRedistributionDelay(0L);
        setupCluster(messageLoadBalancingType);

        // The same settings instance is registered on both servers for every "queues.*" address.
        AddressSettings queueSettings = new AddressSettings()
            .setRedistributionDelay(0L)
            .setExpiryAddress(EXPIRY_ADDRESS);
        getServer(0).getAddressSettingsRepository().addMatch("queues.*", queueSettings);
        getServer(1).getAddressSettingsRepository().addMatch("queues.*", queueSettings);

        startServers(0);
        startServers(1);

        createQueue(EXPIRY_ADDRESS);
        createQueue(queueName);
    }

    /**
     * Creates an ANYCAST queue with the given name on both servers.
     *
     * @param simpleString name of the queue to create
     * @throws Exception if either server fails to create the queue
     */
    private void createQueue(SimpleString simpleString) throws Exception {
        QueueConfiguration anycastQueue = QueueConfiguration.of(simpleString).setRoutingType(RoutingType.ANYCAST);
        this.servers[0].createQueue(anycastQueue);
        this.servers[1].createQueue(anycastQueue);
    }

    /**
     * @return always {@code true}; the value is forwarded to the server and cluster setup calls
     */
    protected boolean isNetty() {
        return true;
    }

    /**
     * Builds a JMS connection factory for the protocol under test.
     *
     * @param i server index; the factory targets {@code localhost} on port {@code 61616 + i}
     * @return a factory for the configured {@link #protocol}; the test is failed for an unknown protocol
     */
    private ConnectionFactory getJmsConnectionFactory(int i) {
        int port = BASE_PORT + i;
        switch (this.protocol) {
            case "AMQP":
                return new JmsConnectionFactory("amqp://localhost:" + port);
            case "OPENWIRE":
                return new ActiveMQConnectionFactory("tcp://localhost:" + port);
            case "CORE":
                return new org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory("tcp://localhost:" + port);
            default:
                Assertions.fail("Protocol " + this.protocol + " unknown");
                return null; // not reached: fail() always throws
        }
    }

    /**
     * Sends messages with an ever-growing header, then verifies that messages from a new producer
     * are still delivered to a consumer on the other server.
     *
     * @throws Exception if the servers cannot be started or a JMS resource cannot be created
     */
    @TestTemplate
    public void testGrowingHeaders() throws Exception {
        startServers(MessageLoadBalancingType.ON_DEMAND);
        ConnectionFactory producerFactory = getJmsConnectionFactory(0);
        ConnectionFactory consumerFactory = getJmsConnectionFactory(1);

        sendGrowingHeaders(producerFactory);

        // Drain whatever made it through; only the side effect of consuming matters here.
        consumeAll(consumerFactory, 1, message -> logger.debug("received {}", message));

        sendFromNewSender(producerFactory);

        AtomicBoolean receivedFromNewSender = new AtomicBoolean(false);
        consumeAll(consumerFactory, SEND_ITERATIONS, message -> {
            try {
                if (message.getBooleanProperty("newSender")) {
                    receivedFromNewSender.set(true);
                }
            } catch (Exception e) {
                // Best effort: an unreadable property must not abort the drain, but should be visible.
                logger.warn("could not read newSender property", e);
            }
        });
        Assertions.assertTrue(receivedFromNewSender.get());
    }

    /**
     * Sends {@value #SEND_ITERATIONS} pairs of messages whose {@code "large"} property grows by one
     * character per iteration. Send failures are logged and skipped. Unless the protocol is AMQP,
     * asserts that {@value #EXPECTED_LOG_CODE} was logged while sending.
     *
     * @param factory connection factory used for the producer
     * @throws Exception if the connection, session, producer or log handler cannot be set up or closed
     */
    private void sendGrowingHeaders(ConnectionFactory factory) throws Exception {
        try (Connection connection = factory.createConnection()) {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer producer = session.createProducer(session.createQueue(queueName.toString()));

            StringBuilder largeHeader = new StringBuilder(INITIAL_HEADER_LENGTH + SEND_ITERATIONS);
            for (int i = 0; i < INITIAL_HEADER_LENGTH; i++) {
                largeHeader.append("-");
            }

            try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) {
                for (int i = 0; i < SEND_ITERATIONS; i++) {
                    try {
                        if (i % 100 == 0) {
                            logger.info("Sent {} messages", i);
                        }
                        TextMessage message = session.createTextMessage("hello " + i);
                        message.setStringProperty("large", largeHeader.toString());
                        message.setBooleanProperty("newSender", false);
                        // The same message is deliberately sent twice per iteration.
                        producer.send(message);
                        producer.send(message);
                        // Only grows when both sends succeeded; a failure retries the same size.
                        largeHeader.append("-");
                    } catch (Throwable e) {
                        logger.warn("error at {}", i, e);
                    }
                }
                if (!this.protocol.equals("AMQP")) {
                    Assertions.assertTrue(loggerHandler.findText(EXPECTED_LOG_CODE));
                }
            }
        }
    }

    /**
     * Sends {@value #SEND_ITERATIONS} pairs of small messages flagged with {@code newSender=true}
     * over a fresh connection. Send failures are logged and skipped.
     *
     * @param factory connection factory used for the producer
     * @throws JMSException if the connection, session or producer cannot be created or closed
     */
    private void sendFromNewSender(ConnectionFactory factory) throws JMSException {
        try (Connection connection = factory.createConnection()) {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer producer = session.createProducer(session.createQueue(queueName.toString()));
            for (int i = 0; i < SEND_ITERATIONS; i++) {
                try {
                    if (i % 100 == 0) {
                        logger.info("Sent {} messages", i);
                    }
                    TextMessage message = session.createTextMessage("newSender " + i);
                    message.setBooleanProperty("newSender", true);
                    producer.send(message);
                    producer.send(message);
                } catch (Throwable e) {
                    // Use a fixed format string; an exception message may be null or contain "{}".
                    logger.warn("error at {}", i, e);
                }
            }
        }
    }

    /**
     * Opens a consumer on the test queue and receives until the queue stays empty.
     *
     * @param factory  connection factory used for the consumer
     * @param expected number of messages to wait for with the long receive timeout
     * @param handler  callback invoked for every received message
     * @return number of messages received
     * @throws JMSException if a JMS resource cannot be created or a receive fails
     */
    private int consumeAll(ConnectionFactory factory, int expected, Consumer<Message> handler) throws JMSException {
        try (Connection connection = factory.createConnection()) {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageConsumer consumer = session.createConsumer(session.createQueue(queueName.toString()));
            connection.start();
            return receiveAllMessages(consumer, expected, handler);
        }
    }

    /**
     * Receives messages until a receive call times out.
     *
     * <p>While fewer than {@code i} messages have arrived each receive waits up to 10 seconds;
     * afterwards the wait drops to 1 second.
     *
     * @param messageConsumer consumer to read from
     * @param i               number of messages to wait for with the long timeout
     * @param consumer        optional callback invoked per message; may be {@code null}
     * @return number of messages received before the first timeout
     * @throws JMSException if a receive call fails
     */
    private int receiveAllMessages(MessageConsumer messageConsumer, int i, Consumer<Message> consumer) throws JMSException {
        int received = 0;
        while (true) {
            long timeout = received < i ? 10000L : 1000L;
            Message message = messageConsumer.receive(timeout);
            if (message == null) {
                return received;
            }
            received++;
            if (consumer != null) {
                consumer.accept(message);
            }
        }
    }

    /**
     * Sets up one cluster connection in each direction between server 0 and server 1 for the
     * {@code "queues"} address.
     *
     * @param messageLoadBalancingType load-balancing mode for both connections
     * @throws Exception if a cluster connection cannot be configured
     */
    protected void setupCluster(MessageLoadBalancingType messageLoadBalancingType) throws Exception {
        setupClusterConnection("cluster0", "queues", messageLoadBalancingType, 1, isNetty(), 0, 1);
        setupClusterConnection("cluster1", "queues", messageLoadBalancingType, 1, isNetty(), 1, 0);
    }

    /**
     * Intentionally a no-op in this class; redistribution delay is configured through address
     * settings in {@link #setupServers()} and {@code startServers}.
     *
     * @param j ignored
     */
    protected void setRedistributionDelay(long j) {
    }

    /**
     * Creates both servers, registers the AMQP and OpenWire protocol managers on each, fixes the
     * journal buffer size, and replaces the address settings with a single {@code "#"} match that
     * uses a redistribution delay of 10.
     *
     * @throws Exception if a server cannot be set up
     */
    protected void setupServers() throws Exception {
        setupServer(0, isFileStorage(), isNetty());
        setupServer(1, isFileStorage(), isNetty());

        for (int node = 0; node <= 1; node++) {
            this.servers[node].addProtocolManagerFactory(new ProtonProtocolManagerFactory());
        }
        for (int node = 0; node <= 1; node++) {
            this.servers[node].addProtocolManagerFactory(new OpenWireProtocolManagerFactory());
        }
        for (int node = 0; node <= 1; node++) {
            this.servers[node].getConfiguration().setJournalBufferSize_NIO(JOURNAL_BUFFER_SIZE);
            this.servers[node].getConfiguration().setJournalBufferSize_AIO(JOURNAL_BUFFER_SIZE);
        }
        for (int node = 0; node <= 1; node++) {
            this.servers[node].getConfiguration().getAddressSettings().clear();
            this.servers[node].getConfiguration().addAddressSetting("#", new AddressSettings().setRedistributionDelay(10L));
        }
    }

    /**
     * Closes all consumers, session factories and server locators, then stops and clears both servers.
     *
     * @throws Exception if shutting down any resource fails
     */
    protected void stopServers() throws Exception {
        closeAllConsumers();
        closeAllSessionFactories();
        closeAllServerLocatorsFactories();
        stopServers(0, 1);
        clearServer(0, 1);
    }

    /**
     * Extends the base configuration with a message expiry scan period of 100.
     *
     * <p>NOTE(review): the decompiler reports that this override was widened from
     * {@code protected} to {@code public}; the visibility is kept as found.
     *
     * @param i server index passed through to the superclass
     * @return the base configuration with the expiry scan period applied
     */
    @Override // org.apache.activemq.artemis.tests.util.ActiveMQTestBase
    public ConfigurationImpl createBasicConfig(int i) {
        ConfigurationImpl config = super.createBasicConfig(i);
        config.setMessageExpiryScanPeriod(100L);
        return config;
    }
}
