package org.apache.activemq.artemis.tests.integration.amqp;

import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.class */
public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
    /**
     * Sends a message whose absolute expiry time lies five seconds in the past, then checks that
     * the queue reports one message, that a receiver granted one credit gets nothing back from
     * {@code receiveNoWait()}, and that the queue's expired-message counter reaches one.
     *
     * @throws Exception if any client or broker interaction fails
     */
    @Test(timeout = 60000)
    public void testSendMessageThatIsAlreadyExpiredUsingAbsoluteTime() throws Exception {
        final AmqpClient client = createAmqpClient();
        final AmqpConnection connection = addConnection(client.connect());
        final AmqpSession session = connection.createSession();
        final AmqpSender sender = session.createSender(getQueueName());

        final Queue queueView = getProxyToQueue(getQueueName());
        assertNotNull(queueView);

        // Expiry timestamp is already in the past at send time.
        final AmqpMessage message = new AmqpMessage();
        message.setAbsoluteExpiryTime(System.currentTimeMillis() - 5000);
        message.setText("Test-Message");
        sender.send(message);
        sender.close();

        assertEquals(1L, queueView.getMessageCount());

        // The receiver must not be handed the expired message.
        final AmqpReceiver receiver = session.createReceiver(getQueueName());
        receiver.flow(1);
        assertNull(receiver.receiveNoWait());

        Wait.assertEquals(1L, queueView::getMessagesExpired);

        connection.close();
    }

    /**
     * Sends a durable message with a time-to-live of 1 ms and an application property, checks
     * that it is not delivered from the source queue and is counted as expired, restarts the
     * server, and then reads the message back from the address returned by
     * {@code getDeadLetterAddress()}, verifying its TTL value and application property.
     *
     * @throws Exception if any client or broker interaction fails
     */
    @Test(timeout = 60000)
    public void testExpiryThroughTTL() throws Exception {
        AmqpConnection connection = addConnection(createAmqpClient().connect());
        AmqpSession session = connection.createSession();
        AmqpSender sender = session.createSender(getQueueName());

        Queue queueView = getProxyToQueue(getQueueName());
        assertNotNull(queueView);

        AmqpMessage message = new AmqpMessage();
        message.setTimeToLive(1L);
        message.setText("Test-Message");
        message.setDurable(true);
        message.setApplicationProperty("key1", "Value1");
        sender.send(message);
        sender.close();

        // Let the 1 ms TTL elapse before attempting to consume.
        Thread.sleep(100L);

        AmqpReceiver receiver = session.createReceiver(getQueueName());
        receiver.flow(1);
        assertNull(receiver.receiveNoWait());

        Wait.assertEquals(1L, queueView::getMessagesExpired);

        connection.close();

        // Restart the server before reading from the dead letter address.
        server.stop();
        server.start();

        AmqpConnection reconnected = addConnection(createAmqpClient().connect());
        AmqpReceiver expiryReceiver = reconnected.createSession().createReceiver(getDeadLetterAddress());
        expiryReceiver.flow(1);
        AmqpMessage expired = expiryReceiver.receive(5L, TimeUnit.SECONDS);

        // Null-check first: a missing message must fail as an assertion, not as an NPE
        // thrown from the property checks below.
        Assert.assertNotNull("No message received from the dead letter address", expired);
        Assert.assertEquals(1L, expired.getTimeToLive());
        Assert.assertEquals("Value1", expired.getApplicationProperty("key1"));

        reconnected.close();
    }

    /**
     * Sends a message whose absolute expiry time is five seconds in the future, then checks that
     * the queue reports one message, that a receiver obtains a message within five seconds, and
     * that the queue's expired-message counter is still zero.
     *
     * @throws Exception if any client or broker interaction fails
     */
    @Test(timeout = 60000)
    public void testSendMessageThatIsNotExpiredUsingAbsoluteTime() throws Exception {
        final AmqpClient client = createAmqpClient();
        final AmqpConnection connection = addConnection(client.connect());
        final AmqpSession session = connection.createSession();
        final AmqpSender sender = session.createSender(getQueueName());

        final Queue queueView = getProxyToQueue(getQueueName());
        assertNotNull(queueView);

        // Expiry timestamp is still in the future at send time.
        final AmqpMessage message = new AmqpMessage();
        message.setAbsoluteExpiryTime(System.currentTimeMillis() + 5000);
        message.setText("Test-Message");
        sender.send(message);
        sender.close();

        assertEquals(1L, queueView.getMessageCount());

        final AmqpReceiver receiver = session.createReceiver(getQueueName());
        receiver.flow(1);
        assertNotNull(receiver.receive(5L, TimeUnit.SECONDS));

        assertEquals(0L, queueView.getMessagesExpired());

        connection.close();
    }

    /**
     * Sends a message that carries both an absolute expiry time five seconds in the past and a
     * time-to-live of 60 seconds. The test expects the message to be treated as expired: nothing
     * is returned by {@code receiveNoWait()} and the queue's expired-message counter reaches one.
     *
     * @throws Exception if any client or broker interaction fails
     */
    @Test(timeout = 60000)
    public void testSendMessageThatIsExiredUsingAbsoluteTimeWithLongTTL() throws Exception {
        final AmqpClient client = createAmqpClient();
        final AmqpConnection connection = addConnection(client.connect());
        final AmqpSession session = connection.createSession();
        final AmqpSender sender = session.createSender(getQueueName());

        final Queue queueView = getProxyToQueue(getQueueName());
        assertNotNull(queueView);

        // Past absolute expiry combined with a long TTL.
        final AmqpMessage message = new AmqpMessage();
        message.setAbsoluteExpiryTime(System.currentTimeMillis() - 5000);
        message.setTimeToLive(60000L);
        message.setText("Test-Message");
        sender.send(message);
        sender.close();

        assertEquals(1L, queueView.getMessageCount());

        final AmqpReceiver receiver = session.createReceiver(getQueueName());
        receiver.flow(1);
        assertNull(receiver.receiveNoWait());

        Wait.assertEquals(1L, queueView::getMessagesExpired);

        connection.close();
    }

    /**
     * Sends a message with an absolute expiry time of zero and a time-to-live of one second,
     * sleeps for one second, then checks that nothing is returned by {@code receiveNoWait()} and
     * that the queue's expired-message counter reaches one.
     *
     * @throws Exception if any client or broker interaction fails
     */
    @Test(timeout = 60000)
    public void testSendMessageThatIsExpiredUsingTTLWhenAbsoluteIsZero() throws Exception {
        final AmqpClient client = createAmqpClient();
        final AmqpConnection connection = addConnection(client.connect());
        final AmqpSession session = connection.createSession();
        final AmqpSender sender = session.createSender(getQueueName());

        final Queue queueView = getProxyToQueue(getQueueName());
        assertNotNull(queueView);

        // Zero absolute expiry; only the TTL carries an expiry value.
        final AmqpMessage message = new AmqpMessage();
        message.setAbsoluteExpiryTime(0L);
        message.setTimeToLive(1000L);
        message.setText("Test-Message");
        sender.send(message);
        sender.close();

        assertEquals(1L, queueView.getMessageCount());

        // Wait out the one second TTL before trying to consume.
        Thread.sleep(1000L);

        final AmqpReceiver receiver = session.createReceiver(getQueueName());
        receiver.flow(1);
        assertNull(receiver.receiveNoWait());

        Wait.assertEquals(1L, queueView::getMessagesExpired);

        connection.close();
    }

    /**
     * Sends a message that carries an absolute expiry time five seconds in the future together
     * with a time-to-live of 10 ms, sleeps for 50 ms so the TTL interval has passed, and then
     * checks that the message is still received and the expired-message counter is zero.
     *
     * @throws Exception if any client or broker interaction fails
     */
    @Test(timeout = 60000)
    public void testSendMessageThatIsNotExpiredUsingAbsoluteTimeWithElspsedTTL() throws Exception {
        final AmqpClient client = createAmqpClient();
        final AmqpConnection connection = addConnection(client.connect());
        final AmqpSession session = connection.createSession();
        final AmqpSender sender = session.createSender(getQueueName());

        final Queue queueView = getProxyToQueue(getQueueName());
        assertNotNull(queueView);

        // Future absolute expiry combined with a very short TTL.
        final AmqpMessage message = new AmqpMessage();
        message.setAbsoluteExpiryTime(System.currentTimeMillis() + 5000);
        message.setTimeToLive(10L);
        message.setText("Test-Message");
        sender.send(message);
        sender.close();

        // Sleep past the 10 ms TTL.
        Thread.sleep(50L);

        assertEquals(1L, queueView.getMessageCount());

        final AmqpReceiver receiver = session.createReceiver(getQueueName());
        receiver.flow(1);
        assertNotNull(receiver.receive(5L, TimeUnit.SECONDS));

        assertEquals(0L, queueView.getMessagesExpired());

        connection.close();
    }

    /**
     * Sends a message with a time-to-live of five seconds, then checks that the queue reports
     * one message, that a receiver obtains a message within five seconds, and that the queue's
     * expired-message counter is zero.
     *
     * @throws Exception if any client or broker interaction fails
     */
    @Test(timeout = 60000)
    public void testSendMessageThatIsNotExpiredUsingTimeToLive() throws Exception {
        final AmqpClient client = createAmqpClient();
        final AmqpConnection connection = addConnection(client.connect());
        final AmqpSession session = connection.createSession();
        final AmqpSender sender = session.createSender(getQueueName());

        final Queue queueView = getProxyToQueue(getQueueName());
        assertNotNull(queueView);

        final AmqpMessage message = new AmqpMessage();
        message.setTimeToLive(5000L);
        message.setText("Test-Message");
        sender.send(message);
        sender.close();

        assertEquals(1L, queueView.getMessageCount());

        final AmqpReceiver receiver = session.createReceiver(getQueueName());
        receiver.flow(1);
        assertNotNull(receiver.receive(5L, TimeUnit.SECONDS));

        assertEquals(0L, queueView.getMessagesExpired());

        connection.close();
    }

    /**
     * Sends a message with a time-to-live of 10 ms, sleeps for 50 ms, then checks that the queue
     * still reports one message, that nothing is returned by {@code receiveNoWait()}, and that
     * the queue's expired-message counter reaches one.
     *
     * @throws Exception if any client or broker interaction fails
     */
    @Test(timeout = 60000)
    public void testSendMessageThenAllowToExpiredUsingTimeToLive() throws Exception {
        final AmqpClient client = createAmqpClient();
        final AmqpConnection connection = addConnection(client.connect());
        final AmqpSession session = connection.createSession();
        final AmqpSender sender = session.createSender(getQueueName());

        final Queue queueView = getProxyToQueue(getQueueName());
        assertNotNull(queueView);

        final AmqpMessage message = new AmqpMessage();
        message.setTimeToLive(10L);
        message.setText("Test-Message");
        sender.send(message);
        sender.close();

        // Sleep past the 10 ms TTL.
        Thread.sleep(50L);

        assertEquals(1L, queueView.getMessageCount());

        final AmqpReceiver receiver = session.createReceiver(getQueueName());
        receiver.flow(1);
        assertNull(receiver.receiveNoWait());

        Wait.assertEquals(1L, queueView::getMessagesExpired);

        connection.close();
    }

    /**
     * Runs the shared expiry scenario in {@code internalSendExpiry} with the flag set to
     * {@code false}, i.e. without stopping and starting the server in between.
     *
     * @throws Throwable if the scenario fails
     */
    @Test(timeout = 60000)
    public void testExpiredMessageLandsInDLQ() throws Throwable {
        final boolean restartServer = false;
        internalSendExpiry(restartServer);
    }

    /**
     * Runs the shared expiry scenario in {@code internalSendExpiry} with the flag set to
     * {@code true}, so the server is stopped and started before the message is read back.
     *
     * @throws Throwable if the scenario fails
     */
    @Test(timeout = 60000)
    public void testExpiredMessageLandsInDLQAndExistsAfterRestart() throws Throwable {
        final boolean restartServer = true;
        internalSendExpiry(restartServer);
    }

    /**
     * Shared scenario for the dead-letter expiry tests.
     *
     * <p>Sends a durable message carrying a delivery annotation and an absolute expiry time one
     * second in the future, waits up to five seconds for a message to appear on the queue located
     * via {@code getDeadLetterAddress()}, optionally restarts the server, and then reads the
     * message back from that address. The received message must carry the original queue name in
     * its {@code HDR_ORIGINAL_ADDRESS} message annotation, must no longer carry the
     * {@code shouldDisappear} delivery annotation, and must be the only message available.
     *
     * @param restartServer when {@code true}, the server is stopped and started again after the
     *                      message has been seen on the dead letter queue and before it is consumed
     * @throws Throwable if any client or broker interaction, or any assertion, fails
     */
    public void internalSendExpiry(boolean restartServer) throws Throwable {
        AmqpClient client = createAmqpClient();
        AmqpConnection connection = client.connect();
        try {
            AmqpSession session = connection.createSession();
            AmqpSender sender = session.createSender(getQueueName());

            AmqpMessage message = new AmqpMessage();
            message.setDurable(true);
            message.setText("Test-Message");
            message.setDeliveryAnnotation("shouldDisappear", 1);
            message.setAbsoluteExpiryTime(System.currentTimeMillis() + 1000);
            sender.send(message);

            final Queue dlq = server.locateQueue(SimpleString.toSimpleString(getDeadLetterAddress()));
            // Fail with a clear message rather than an NPE inside the wait condition below.
            Assert.assertNotNull("Dead letter queue not found", dlq);

            assertTrue("Message not moved to DLQ", Wait.waitFor(() -> dlq.getMessageCount() > 0, 5000L, 500L));

            connection.close();

            if (restartServer) {
                server.stop();
                server.start();
            }

            // Reconnect with the same client; the finally block closes this new connection.
            connection = client.connect();
            session = connection.createSession();

            AmqpReceiver receiver = session.createReceiver(getDeadLetterAddress());
            receiver.flow(20);
            AmqpMessage received = receiver.receive(5L, TimeUnit.SECONDS);
            Assert.assertNotNull(received);
            Assert.assertEquals(getQueueName(), received.getMessageAnnotation(Message.HDR_ORIGINAL_ADDRESS.toString()));
            Assert.assertNull(received.getDeliveryAnnotation("shouldDisappear"));
            Assert.assertNull(receiver.receiveNoWait());
        } finally {
            // Single cleanup point for both the success and the failure path.
            connection.close();
        }
    }
}
