package org.apache.activemq.artemis.tests.integration.amqp;

import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.transport.amqp.AmqpSupport;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.apache.activemq.transport.amqp.client.AmqpUnknownFilterType;
import org.apache.activemq.transport.amqp.client.AmqpValidator;
import org.apache.qpid.proton.amqp.UnsignedInteger;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
import org.apache.qpid.proton.engine.Receiver;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverWithFiltersTest.class */
public class AmqpReceiverWithFiltersTest extends AmqpClientTestSupport {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    @Timeout(60)
    @Test
    public void testUnsupportedFiltersAreNotListedAsSupported() throws Exception {
        AmqpClient createAmqpClient = createAmqpClient();
        createAmqpClient.setValidator(new AmqpValidator() { // from class: org.apache.activemq.artemis.tests.integration.amqp.AmqpReceiverWithFiltersTest.1
            @Override // org.apache.activemq.transport.amqp.client.AmqpValidator
            public void inspectOpenedResource(Receiver receiver) {
                if (receiver.getRemoteSource() == null) {
                    markAsInvalid("Link opened with null source.");
                }
                if (AmqpSupport.findFilter(receiver.getRemoteSource().getFilter(), AmqpUnknownFilterType.UNKNOWN_FILTER_IDS) != null) {
                    markAsInvalid("Broker should not return unsupported filter on attach.");
                }
            }
        });
        HashMap hashMap = new HashMap();
        hashMap.put(AmqpUnknownFilterType.UNKNOWN_FILTER_NAME, AmqpUnknownFilterType.UNKNOWN_FILTER);
        Source source = new Source();
        source.setAddress(getQueueName());
        source.setFilter(hashMap);
        source.setDurable(TerminusDurability.NONE);
        source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
        AmqpConnection addConnection = addConnection(createAmqpClient.connect());
        addConnection.createSession().createReceiver(source);
        Assertions.assertEquals(1L, this.server.getTotalConsumerCount());
        addConnection.getStateInspector().assertValid();
        addConnection.close();
    }

    @Timeout(60)
    @Test
    public void testSupportedFiltersAreListedAsSupported() throws Exception {
        AmqpClient createAmqpClient = createAmqpClient();
        createAmqpClient.setValidator(new AmqpValidator() { // from class: org.apache.activemq.artemis.tests.integration.amqp.AmqpReceiverWithFiltersTest.2
            @Override // org.apache.activemq.transport.amqp.client.AmqpValidator
            public void inspectOpenedResource(Receiver receiver) {
                if (receiver.getRemoteSource() == null) {
                    markAsInvalid("Link opened with null source.");
                }
                if (AmqpSupport.findFilter(receiver.getRemoteSource().getFilter(), org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.JMS_SELECTOR_FILTER_IDS) == null) {
                    markAsInvalid("Broker should return selector filter on attach.");
                }
            }
        });
        AmqpConnection addConnection = addConnection(createAmqpClient.connect());
        addConnection.createSession().createReceiver(getQueueName(), "color = red");
        addConnection.getStateInspector().assertValid();
        addConnection.close();
    }

    @Timeout(60)
    @Test
    public void testReceivedUnsignedFilter() throws Exception {
        AmqpConnection connect = createAmqpClient().connect();
        try {
            AmqpSession createSession = connect.createSession();
            AmqpSender createSender = createSession.createSender(getQueueName());
            for (int i = 0; i < 101; i++) {
                AmqpMessage amqpMessage = new AmqpMessage();
                amqpMessage.setText("Test-Message");
                amqpMessage.setApplicationProperty("myNewID", new UnsignedInteger(i));
                createSender.send(amqpMessage);
            }
            AmqpReceiver createReceiver = createSession.createReceiver(getQueueName(), "myNewID < 50");
            ArrayList arrayList = new ArrayList(100);
            createReceiver.flow(204);
            for (int i2 = 0; i2 < 50; i2++) {
                AmqpMessage receive = createReceiver.receive(5L, TimeUnit.SECONDS);
                Assertions.assertNotNull(receive);
                logger.debug("Read message: {}", receive.getApplicationProperty("myNewID"));
                Assertions.assertNotNull(receive);
                arrayList.add(receive);
            }
            Assertions.assertNull(createReceiver.receiveNoWait());
            connect.close();
        } catch (Throwable th) {
            connect.close();
            throw th;
        }
    }
}
