package org.apache.activemq.artemis.tests.integration.amqp;

import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.transformer.Transformer;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/amqp/AmqpMessageDivertsTest.class */
/**
 * Integration tests for diverts applied to messages sent over AMQP.
 *
 * <p>The class doubles as the divert {@link Transformer}: the transformer tests register
 * this class by name on the divert, and {@link #transform(Message)} counts every
 * invocation in {@link #divertCount} so tests can wait for the divert to have run.
 */
public class AmqpMessageDivertsTest extends AmqpClientTestSupport implements Transformer {
    /** Number of times {@link #transform(Message)} has been invoked; reset by the transformer tests. */
    static final AtomicInteger divertCount = new AtomicInteger(0);

    /** Length of the string property that {@link #transform(Message)} may attach. */
    private static final int LARGE_STRING_LENGTH = 512000;

    /** Body size used for the bigger test messages. */
    private static final int LARGE_BODY_SIZE = 307200;

    private static final String DIVERT_NAME = "name";
    private static final String DIVERT_ROUTING_NAME = "routingName";

    String largeString = createLargeString();

    /**
     * Builds the string attached as the {@code largeString} property.
     *
     * @return a string of {@value #LARGE_STRING_LENGTH} characters cycling through 'a'..'t'
     */
    protected String createLargeString() {
        // StringBuilder: the buffer never leaves this method, so no synchronization is needed.
        StringBuilder builder = new StringBuilder(LARGE_STRING_LENGTH);
        for (int i = 0; i < LARGE_STRING_LENGTH; i++) {
            builder.append((char) ('a' + (i % 20)));
        }
        return builder.toString();
    }

    @Test(timeout = 60000)
    public void testQueueReceiverReadMessageWithDivert() throws Exception {
        runQueueReceiverReadMessageWithDivert(ComponentConfigurationRoutingType.ANYCAST.toString());
    }

    @Test(timeout = 60000)
    public void testQueueReceiverReadMessageWithDivertDefaultRouting() throws Exception {
        runQueueReceiverReadMessageWithDivert(ActiveMQDefaultConfiguration.getDefaultDivertRoutingType());
    }

    /**
     * Sends one message to the test queue and verifies it can be read from the queue
     * that an exclusive divert forwards to.
     *
     * @param routingType routing type name handed to {@code createDivert}
     * @throws Exception if broker setup, sending or receiving fails
     */
    public void runQueueReceiverReadMessageWithDivert(String routingType) throws Exception {
        final String divertQueueName = getQueueName() + "Divert";
        this.server.createQueue(new QueueConfiguration(SimpleString.toSimpleString(divertQueueName)).setRoutingType(RoutingType.ANYCAST));
        this.server.getActiveMQServerControl().createDivert(DIVERT_NAME, DIVERT_ROUTING_NAME, getQueueName(), divertQueueName, true, null, null, routingType);

        sendMessages(getQueueName(), 1);

        AmqpConnection connection = addConnection(createAmqpClient().connect());
        AmqpReceiver receiver = connection.createSession().createReceiver(divertQueueName);
        Queue divertQueue = getProxyToQueue(divertQueueName);

        // Poll rather than assert immediately so the check does not depend on routing timing.
        Wait.assertEquals(1L, divertQueue::getMessageCount);

        receiver.flow(1);
        Assert.assertNotNull(receiver.receive(5L, TimeUnit.SECONDS));
        receiver.close();

        // The message was received but never accepted, so the queue must still report it.
        Assert.assertEquals(1L, divertQueue.getMessageCount());
        connection.close();
    }

    @Test
    public void testDivertTransformerWithProperties() throws Exception {
        testDivertTransformerWithProperties(false);
    }

    @Test
    public void testDivertTransformerWithPropertiesRebootServer() throws Exception {
        testDivertTransformerWithProperties(true);
    }

    /**
     * Sends messages through a divert that uses this class as its transformer and checks
     * the properties the transformer adds on the diverted copies.
     *
     * <p>Two durable messages are always sent. With {@code rebootServer} the broker is
     * stopped and started before consuming; otherwise a third, non-durable message that
     * asks for the large string property is sent and verified as well.
     *
     * @param rebootServer whether to restart the broker between sending and receiving
     * @throws Exception if broker setup, sending or receiving fails
     */
    public void testDivertTransformerWithProperties(boolean rebootServer) throws Exception {
        divertCount.set(0);

        final String divertQueueName = getQueueName() + "Divert";
        this.server.createQueue(new QueueConfiguration(SimpleString.toSimpleString(divertQueueName)).setRoutingType(RoutingType.ANYCAST));
        this.server.getActiveMQServerControl().createDivert(DIVERT_NAME, DIVERT_ROUTING_NAME, getQueueName(), divertQueueName, true, null, AmqpMessageDivertsTest.class.getName(), ComponentConfigurationRoutingType.ANYCAST.toString());

        AmqpConnection connection = addConnection(createAmqpClient().connect());
        AmqpSession session = connection.createSession();
        Queue queueBeforeConsume = getProxyToQueue(divertQueueName);
        AmqpSender sender = session.createSender(getQueueName());

        sender.send(createTestMessage(true, false, 10));
        Wait.assertEquals(1L, queueBeforeConsume::getMessageCount);

        sender.send(createTestMessage(true, false, LARGE_BODY_SIZE));
        Wait.assertEquals(2L, queueBeforeConsume::getMessageCount);

        if (rebootServer) {
            Wait.assertEquals(2, divertCount::get);
            connection.close();
            this.server.stop();
            this.server.start();
            connection = addConnection(createAmqpClient().connect());
            session = connection.createSession();
        } else {
            sender.send(createTestMessage(false, true, LARGE_BODY_SIZE));
            Wait.assertEquals(3, divertCount::get);
        }

        AmqpReceiver receiver = session.createReceiver(divertQueueName);
        // Look the queue up again: after a restart the earlier reference belongs to the stopped server.
        Queue divertQueue = getProxyToQueue(divertQueueName);
        Wait.assertEquals(rebootServer ? 2L : 3L, divertQueue::getMessageCount);

        receiver.flow(2);
        for (int i = 0; i < 2; i++) {
            AmqpMessage received = receiver.receive(5L, TimeUnit.SECONDS);
            Assert.assertNotNull(received);
            Assert.assertEquals("here", received.getApplicationProperty("always"));
            Assert.assertEquals("mundo", received.getApplicationProperty("oi"));
            received.accept();
        }

        if (!rebootServer) {
            receiver.flow(1);
            AmqpMessage received = receiver.receive(5L, TimeUnit.SECONDS);
            // Fail with an assertion rather than a NullPointerException if nothing arrives in time.
            Assert.assertNotNull(received);
            Assert.assertEquals("mundo", received.getApplicationProperty("oi"));
            Assert.assertEquals(this.largeString, received.getApplicationProperty("largeString"));
            received.accept();
        }

        receiver.close();
        Wait.assertEquals(0L, divertQueue::getMessageCount);
        connection.close();
    }

    /**
     * Creates a message carrying the application properties the transformer tests rely on.
     *
     * @param durable  durability flag set on the message
     * @param addLarge value of the {@code addLarge} property read by {@link #transform(Message)}
     * @param bodySize size in bytes of the zero-filled body
     * @return the populated message
     */
    private static AmqpMessage createTestMessage(boolean durable, boolean addLarge, int bodySize) {
        AmqpMessage message = new AmqpMessage();
        message.setDurable(durable);
        message.setApplicationProperty("addLarge", addLarge);
        message.setApplicationProperty("always", "here");
        message.setBytes(new byte[bodySize]);
        return message;
    }

    /**
     * Transformer callback: counts the invocation, optionally attaches the large string
     * property, sets {@code oi=mundo} and re-encodes the message.
     *
     * @param message the message being diverted
     * @return the same message instance after modification
     */
    @Override
    public Message transform(Message message) {
        divertCount.incrementAndGet();
        // Null-safe: a message without the "addLarge" property is treated as false.
        if (Boolean.TRUE.equals(message.getBooleanProperty("addLarge"))) {
            message.putStringProperty("largeString", this.largeString);
        }
        message.putStringProperty("oi", "mundo");
        message.reencode();
        return message;
    }
}
