package org.apache.activemq.artemis.tests.integration.amqp;

import java.lang.invoke.MethodHandles;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Verifies that a message sent to an address with ingress timestamps enabled is delivered to an
 * AMQP receiver carrying an {@code x-opt-ingress-time} message annotation whose value lies
 * between the wall-clock readings taken immediately before and after the send.
 *
 * <p>The test is parameterized over two flags: whether the server is restarted between send and
 * receive ({@link #restart}) and whether the payload is the big or the tiny variant
 * ({@link #large}). Each combination is run with a message produced over CORE, AMQP and OpenWire.
 */
@RunWith(Parameterized.class)
public class AmqpIngressTimestampTest extends AmqpClientTestSupport {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    /** Name of the message annotation the test expects on the received message. */
    private static final String INGRESS_TIME_ANNOTATION = "x-opt-ingress-time";

    /** The unit of text repeated to build the message payload. */
    private static final String PAYLOAD_UNIT = "AB";

    /** How many times {@link #PAYLOAD_UNIT} is repeated when {@link #large} is set. */
    private static final int LARGE_PAYLOAD_REPEATS = 2_048_000;

    /** Value registered under the {@code amqpMinLargeMessageSize} AMQP acceptor parameter. */
    public int amqpMinLargeMessageSize = 102400;

    /** When true the server is stopped and started again between sending and receiving. */
    @Parameterized.Parameter(0)
    public boolean restart;

    /** When true the payload is {@link #LARGE_PAYLOAD_REPEATS} repetitions of {@code "AB"}. */
    @Parameterized.Parameter(1)
    public boolean large;

    /** Protocol used to produce the message; the consumer side always uses the AMQP client. */
    public enum Protocol {
        CORE,
        AMQP,
        OPENWIRE
    }

    /**
     * Supplies the {@code (restart, large)} combinations for the parameterized runner.
     *
     * @return all four combinations of the two boolean flags
     */
    @Parameterized.Parameters(name = "restart={0}, large={1}")
    public static Collection<Object[]> parameters() {
        return Arrays.asList(
            new Object[]{true, true},
            new Object[]{false, false},
            new Object[]{true, false},
            new Object[]{false, true});
    }

    @Test(timeout = 60000)
    public void testIngressTimestampSendCore() throws Exception {
        internalTestIngressTimestamp(Protocol.CORE);
    }

    @Test(timeout = 60000)
    public void testIngressTimestampSendAMQP() throws Exception {
        internalTestIngressTimestamp(Protocol.AMQP);
    }

    @Test(timeout = 60000)
    public void testIngressTimestampSendOpenWire() throws Exception {
        internalTestIngressTimestamp(Protocol.OPENWIRE);
    }

    /**
     * Sends one message over the given protocol to a freshly created anycast queue that has
     * ingress timestamps enabled, optionally restarts the server, then receives the message over
     * AMQP and checks its ingress-time annotation.
     *
     * @param protocol the protocol used to send the message; anything other than
     *                 {@link Protocol#CORE} or {@link Protocol#OPENWIRE} sends over AMQP
     * @throws Exception if sending, restarting or receiving fails
     */
    private void internalTestIngressTimestamp(Protocol protocol) throws Exception {
        final String queueName = RandomUtil.randomString();
        this.server.createQueue(new QueueConfiguration(queueName).setRoutingType(RoutingType.ANYCAST));
        this.server.getAddressSettingsRepository().addMatch(queueName, new AddressSettings().setEnableIngressTimestamp(true));

        // Bracket the send with wall-clock readings; the ingress timestamp must fall inside them.
        // NOTE(review): the `true` argument of the send helpers is presumably the durable flag
        // (needed for the restart variant) - confirm against AmqpClientTestSupport.
        final long beforeSend = System.currentTimeMillis();
        if (protocol == Protocol.CORE) {
            sendMessagesCore(queueName, 1, true, getMessagePayload());
        } else if (protocol == Protocol.OPENWIRE) {
            sendMessagesOpenWire(queueName, 1, true, getMessagePayload());
        } else {
            sendMessages(queueName, 1, true, getMessagePayload());
        }
        final long afterSend = System.currentTimeMillis();

        if (this.restart) {
            this.server.stop();
            this.server.start();
            assertTrue(this.server.waitForActivation(3L, TimeUnit.SECONDS));
        }

        AmqpConnection connection = addConnection(createAmqpClient().connect());
        AmqpReceiver receiver = connection.createSession().createReceiver(queueName);

        // Looked up after the optional restart, so this is the queue of the running server.
        Queue queueView = getProxyToQueue(queueName);
        Objects.requireNonNull(queueView, "queue " + queueName + " not found");
        Wait.assertEquals(1L, queueView::getMessageCount, 2000L, 100L, false);

        receiver.flow(1);
        AmqpMessage received = receiver.receive(5L, TimeUnit.SECONDS);
        assertNotNull(received);
        logger.info("{}", received);

        Object annotation = received.getMessageAnnotation(INGRESS_TIME_ANNOTATION);
        assertNotNull(annotation);
        assertTrue(annotation instanceof Long);
        long ingressTime = ((Long) annotation).longValue();
        // The failure message reports the actual bounds so a failing run can be diagnosed.
        assertTrue("Ingress timestamp " + ingressTime + " should be >= " + beforeSend + " and <= " + afterSend,
            ingressTime >= beforeSend && ingressTime <= afterSend);

        receiver.close();
        // The message is never accepted in this method; the queue is expected to still count it.
        assertEquals(1L, queueView.getMessageCount());
        connection.close();
    }

    /** Enables all three protocols exercised by the tests on the test acceptor. */
    @Override
    protected String getConfiguredProtocols() {
        return "AMQP,OPENWIRE,CORE";
    }

    /** Sets the body of an outgoing AMQP message to the payload selected by {@link #large}. */
    @Override
    protected void setData(AmqpMessage amqpMessage) throws Exception {
        amqpMessage.setBytes(getMessagePayload());
    }

    /**
     * Registers {@link #amqpMinLargeMessageSize} as the {@code amqpMinLargeMessageSize} acceptor
     * parameter, so the field and the configured value cannot drift apart.
     */
    @Override
    protected void configureAMQPAcceptorParameters(Map<String, Object> map) {
        map.put("amqpMinLargeMessageSize", amqpMinLargeMessageSize);
    }

    /**
     * Builds the message payload: {@code "AB"} repeated {@link #LARGE_PAYLOAD_REPEATS} times when
     * {@link #large} is set, otherwise a single {@code "AB"}.
     *
     * @return the payload encoded as UTF-8, independent of the platform default charset
     */
    private byte[] getMessagePayload() {
        final int repeats = this.large ? LARGE_PAYLOAD_REPEATS : 1;
        StringBuilder text = new StringBuilder(repeats * PAYLOAD_UNIT.length());
        for (int i = 0; i < repeats; i++) {
            text.append(PAYLOAD_UNIT);
        }
        return text.toString().getBytes(StandardCharsets.UTF_8);
    }
}
