package org.apache.activemq.artemis.tests.integration.amqp.largemessages;

import java.net.URI;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.core.config.BridgeConfiguration;
import org.apache.activemq.artemis.core.config.DivertConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/amqp/largemessages/AMQPLargeMessageOverCoreBridgeTest.class */
public class AMQPLargeMessageOverCoreBridgeTest extends AmqpClientTestSupport {
    private ActiveMQServer server2;

    @Before
    public void setServers() throws Exception {
        this.server.setIdentity("server1");
        this.server.getConfiguration().addAcceptorConfiguration("flow", "tcp://localhost:6666?protocols=CORE");
        this.server.start();
        createAddressAndQueues(this.server);
        this.server2 = createServer(5673, false);
        this.server2.start();
        createAddressAndQueues(this.server2);
    }

    private void receiveTextMessages(int i, String str, String str2, int i2) throws Exception {
        AmqpConnection addConnection = addConnection(createAmqpClient(new URI("tcp://localhost:" + i)).connect());
        AmqpReceiver createReceiver = addConnection.createSession().createReceiver(str);
        createReceiver.flow(i2 + 1);
        for (int i3 = 0; i3 < i2; i3++) {
            AmqpMessage receive = createReceiver.receive(10L, TimeUnit.SECONDS);
            assertNotNull(receive);
            Assert.assertEquals(str2, receive.getText());
            receive.accept();
        }
        Assert.assertNull(createReceiver.receiveNoWait());
        createReceiver.close();
        addConnection.close();
    }

    @Override // org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport
    protected ActiveMQServer createServer() throws Exception {
        return createServer(5672, false);
    }

    public void sendTextMessages(int i, String str, String str2, int i2) throws Exception {
        AmqpConnection addConnection = addConnection(createAmqpClient(new URI("tcp://localhost:" + i)).connect());
        try {
            AmqpSender createSender = addConnection.createSession().createSender(str);
            for (int i3 = 0; i3 < i2; i3++) {
                AmqpMessage amqpMessage = new AmqpMessage();
                amqpMessage.setMessageId("MessageID:" + i3);
                amqpMessage.setDurable(true);
                amqpMessage.setText(str2);
                createSender.send(amqpMessage);
            }
        } finally {
            addConnection.close();
        }
    }

    @Test(timeout = 60000)
    public void testCoreBridgeDivert() throws Exception {
        internalTest(true);
    }

    @Test(timeout = 60000)
    public void testCoreBridgeNoDivert() throws Exception {
        internalTest(false);
    }

    private void internalTest(boolean z) throws Exception {
        this.server2.getConfiguration().addConnectorConfiguration("otherside", "tcp://localhost:6666");
        if (z) {
            this.server2.deployDivert(new DivertConfiguration().setName("divert1").setRoutingName("divert1").setAddress(getQueueName(0)).setForwardingAddress(getQueueName(1)));
        }
        this.server2.deployBridge(new BridgeConfiguration().setName(getTestName()).setQueueName(getQueueName(1)).setForwardingAddress(getQueueName(2)).setConfirmationWindowSize(10).setStaticConnectors(Arrays.asList("otherside")));
        StringBuffer stringBuffer = new StringBuffer();
        while (stringBuffer.length() < 307200) {
            stringBuffer.append("Some large stuff ");
        }
        sendTextMessages(5673, getQueueName(z ? 0 : 1), stringBuffer.toString(), 10);
        this.server.stop();
        this.server.start();
        receiveTextMessages(5672, getQueueName(2), stringBuffer.toString(), 10);
        if (z) {
            receiveTextMessages(5673, getQueueName(0), stringBuffer.toString(), 10);
        } else {
            receiveTextMessages(5673, getQueueName(0), stringBuffer.toString(), 0);
        }
        receiveTextMessages(5673, getQueueName(1), stringBuffer.toString(), 0);
    }
}
