package org.apache.activemq.artemis.tests.integration.amqp;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import org.apache.activemq.artemis.protocol.amqp.broker.AmqpInterceptor;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.apache.qpid.proton.amqp.messaging.Header;
import org.apache.qpid.proton.amqp.messaging.Properties;
import org.junit.Test;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveInterceptorTest.class */
public class AmqpSendReceiveInterceptorTest extends AmqpClientTestSupport {
    private static final String ADDRESS = "address";
    private static final String MESSAGE_ID = "messageId";
    private static final String CORRELATION_ID = "correlationId";
    private static final String MESSAGE_TEXT = "messageText";
    private static final String DURABLE = "durable";
    private static final String PRIORITY = "priority";
    private static final String REPLY_TO = "replyTo";
    private static final String TIME_TO_LIVE = "timeToLive";

    @Test(timeout = 60000)
    public void testCreateQueueReceiver() throws Exception {
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        this.server.getRemotingService().addIncomingInterceptor(new AmqpInterceptor() { // from class: org.apache.activemq.artemis.tests.integration.amqp.AmqpSendReceiveInterceptorTest.1
            public boolean intercept(AMQPMessage aMQPMessage, RemotingConnection remotingConnection) throws ActiveMQException {
                countDownLatch.countDown();
                return true;
            }
        });
        AmqpConnection addConnection = addConnection(createAmqpClient().connect());
        AmqpSession createSession = addConnection.createSession();
        AmqpSender createSender = createSession.createSender(getTestName());
        AmqpMessage amqpMessage = new AmqpMessage();
        amqpMessage.setMessageId("msg1");
        amqpMessage.setText("Test-Message");
        createSender.send(amqpMessage);
        assertTrue(countDownLatch.await(5L, TimeUnit.SECONDS));
        final CountDownLatch countDownLatch2 = new CountDownLatch(1);
        this.server.getRemotingService().addOutgoingInterceptor(new AmqpInterceptor() { // from class: org.apache.activemq.artemis.tests.integration.amqp.AmqpSendReceiveInterceptorTest.2
            public boolean intercept(AMQPMessage aMQPMessage, RemotingConnection remotingConnection) throws ActiveMQException {
                countDownLatch2.countDown();
                return true;
            }
        });
        AmqpReceiver createReceiver = createSession.createReceiver(getTestName());
        createReceiver.flow(2);
        assertNotNull(createReceiver.receive(5L, TimeUnit.SECONDS));
        assertEquals(countDownLatch2.getCount(), 0L);
        createSender.close();
        createReceiver.close();
        addConnection.close();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean checkMessageProperties(AMQPMessage aMQPMessage, Map<String, Object> map) {
        assertNotNull(aMQPMessage);
        assertNotNull(this.server.getNodeID());
        assertNotNull(aMQPMessage.getConnectionID());
        assertEquals(aMQPMessage.getAddress(), map.get(ADDRESS));
        assertEquals(Boolean.valueOf(aMQPMessage.isDurable()), map.get(DURABLE));
        Properties properties = aMQPMessage.getProperties();
        assertEquals(properties.getCorrelationId(), map.get(CORRELATION_ID));
        assertEquals(properties.getReplyTo(), map.get(REPLY_TO));
        assertEquals(properties.getMessageId(), map.get(MESSAGE_ID));
        Header header = aMQPMessage.getHeader();
        assertEquals(header.getDurable(), map.get(DURABLE));
        assertEquals(header.getTtl().toString(), map.get(TIME_TO_LIVE).toString());
        assertEquals(header.getPriority().toString(), map.get(PRIORITY).toString());
        return true;
    }

    @Test(timeout = 60000)
    public void testCheckInterceptedMessageProperties() throws Exception {
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        String testName = getTestName();
        final HashMap hashMap = new HashMap();
        hashMap.put(ADDRESS, testName);
        hashMap.put(MESSAGE_ID, "lala200");
        hashMap.put(CORRELATION_ID, "lala-corrId");
        hashMap.put(MESSAGE_TEXT, "Test intercepted message");
        hashMap.put(DURABLE, false);
        hashMap.put(PRIORITY, (short) 8);
        hashMap.put(REPLY_TO, "reply-to-myQueue");
        hashMap.put(TIME_TO_LIVE, 10000L);
        this.server.getRemotingService().addIncomingInterceptor(new AmqpInterceptor() { // from class: org.apache.activemq.artemis.tests.integration.amqp.AmqpSendReceiveInterceptorTest.3
            public boolean intercept(AMQPMessage aMQPMessage, RemotingConnection remotingConnection) throws ActiveMQException {
                countDownLatch.countDown();
                return AmqpSendReceiveInterceptorTest.this.checkMessageProperties(aMQPMessage, hashMap);
            }
        });
        AmqpConnection addConnection = addConnection(createAmqpClient().connect());
        AmqpSession createSession = addConnection.createSession();
        AmqpSender createSender = createSession.createSender(getTestName());
        AmqpMessage amqpMessage = new AmqpMessage();
        amqpMessage.setMessageId("lala200");
        amqpMessage.setCorrelationId("lala-corrId");
        amqpMessage.setText("Test intercepted message");
        amqpMessage.setDurable(false);
        amqpMessage.setPriority((short) 8);
        amqpMessage.setReplyToAddress("reply-to-myQueue");
        amqpMessage.setTimeToLive(10000L);
        createSender.send(amqpMessage);
        assertTrue(countDownLatch.await(2L, TimeUnit.SECONDS));
        final CountDownLatch countDownLatch2 = new CountDownLatch(1);
        this.server.getRemotingService().addOutgoingInterceptor(new AmqpInterceptor() { // from class: org.apache.activemq.artemis.tests.integration.amqp.AmqpSendReceiveInterceptorTest.4
            public boolean intercept(AMQPMessage aMQPMessage, RemotingConnection remotingConnection) throws ActiveMQException {
                countDownLatch2.countDown();
                return AmqpSendReceiveInterceptorTest.this.checkMessageProperties(aMQPMessage, hashMap);
            }
        });
        AmqpReceiver createReceiver = createSession.createReceiver(getTestName());
        createReceiver.flow(2);
        assertNotNull(createReceiver.receive(5L, TimeUnit.SECONDS));
        assertEquals(countDownLatch2.getCount(), 0L);
        createSender.close();
        createReceiver.close();
        addConnection.close();
    }
}
