package org.apache.activemq.artemis.tests.integration.proton;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Random;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TemporaryQueue;
import javax.jms.TextMessage;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/proton/ProtonTest.class */
public class ProtonTest extends ActiveMQTestBase {
    ConnectionFactory factory;
    private final int protocol;
    private ActiveMQServer server;
    private final String coreAddress = "jms.queue.exampleQueue";
    private final String address;
    private Connection connection;

    /* loaded from: input_file:org/apache/activemq/artemis/tests/integration/proton/ProtonTest$AnythingSerializable.class */
    public static class AnythingSerializable implements Serializable {
        private int count;

        public AnythingSerializable(int i) {
            this.count = i;
        }

        public int getCount() {
            return this.count;
        }
    }

    @Parameterized.Parameters(name = "{0}")
    public static Collection getParameters() {
        return Arrays.asList(new Object[]{"AMQP", 0}, new Object[]{"ActiveMQ (InVM)", 1}, new Object[]{"ActiveMQ (Netty)", 2}, new Object[]{"AMQP_ANONYMOUS", 3});
    }

    public ProtonTest(String str, int i) {
        this.protocol = i;
        if (i == 0 || i == 3) {
            this.address = this.coreAddress;
        } else {
            this.address = "exampleQueue";
        }
    }

    @Before
    public void setUp() throws Exception {
        super.setUp();
        disableCheckThread();
        this.server = createServer(true, true);
        HashMap hashMap = new HashMap();
        hashMap.put("port", "5672");
        hashMap.put("protocols", "AMQP");
        this.server.getConfiguration().getAcceptorConfigurations().add(new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, hashMap));
        this.server.start();
        this.server.createQueue(new SimpleString(this.coreAddress), new SimpleString(this.coreAddress), (SimpleString) null, true, false);
        this.server.createQueue(new SimpleString(this.coreAddress + "1"), new SimpleString(this.coreAddress + "1"), (SimpleString) null, true, false);
        this.server.createQueue(new SimpleString(this.coreAddress + "2"), new SimpleString(this.coreAddress + "2"), (SimpleString) null, true, false);
        this.server.createQueue(new SimpleString(this.coreAddress + "3"), new SimpleString(this.coreAddress + "3"), (SimpleString) null, true, false);
        this.server.createQueue(new SimpleString(this.coreAddress + "4"), new SimpleString(this.coreAddress + "4"), (SimpleString) null, true, false);
        this.server.createQueue(new SimpleString(this.coreAddress + "5"), new SimpleString(this.coreAddress + "5"), (SimpleString) null, true, false);
        this.server.createQueue(new SimpleString(this.coreAddress + "6"), new SimpleString(this.coreAddress + "6"), (SimpleString) null, true, false);
        this.server.createQueue(new SimpleString(this.coreAddress + "7"), new SimpleString(this.coreAddress + "7"), (SimpleString) null, true, false);
        this.server.createQueue(new SimpleString(this.coreAddress + "8"), new SimpleString(this.coreAddress + "8"), (SimpleString) null, true, false);
        this.server.createQueue(new SimpleString(this.coreAddress + "9"), new SimpleString(this.coreAddress + "9"), (SimpleString) null, true, false);
        this.server.createQueue(new SimpleString(this.coreAddress + "10"), new SimpleString(this.coreAddress + "10"), (SimpleString) null, true, false);
        this.server.createQueue(new SimpleString("amqp_testtopic"), new SimpleString("amqp_testtopic"), (SimpleString) null, true, false);
        this.server.createQueue(new SimpleString("amqp_testtopic1"), new SimpleString("amqp_testtopic1"), (SimpleString) null, true, false);
        this.server.createQueue(new SimpleString("amqp_testtopic2"), new SimpleString("amqp_testtopic2"), (SimpleString) null, true, false);
        this.server.createQueue(new SimpleString("amqp_testtopic3"), new SimpleString("amqp_testtopic3"), (SimpleString) null, true, false);
        this.server.createQueue(new SimpleString("amqp_testtopic4"), new SimpleString("amqp_testtopic4"), (SimpleString) null, true, false);
        this.server.createQueue(new SimpleString("amqp_testtopic5"), new SimpleString("amqp_testtopic5"), (SimpleString) null, true, false);
        this.server.createQueue(new SimpleString("amqp_testtopic6"), new SimpleString("amqp_testtopic6"), (SimpleString) null, true, false);
        this.server.createQueue(new SimpleString("amqp_testtopic7"), new SimpleString("amqp_testtopic7"), (SimpleString) null, true, false);
        this.server.createQueue(new SimpleString("amqp_testtopic8"), new SimpleString("amqp_testtopic8"), (SimpleString) null, true, false);
        this.server.createQueue(new SimpleString("amqp_testtopic9"), new SimpleString("amqp_testtopic9"), (SimpleString) null, true, false);
        this.server.createQueue(new SimpleString("amqp_testtopic10"), new SimpleString("amqp_testtopic10"), (SimpleString) null, true, false);
        this.connection = createConnection();
    }

    @After
    public void tearDown() throws Exception {
        try {
            Thread.sleep(250L);
            if (this.connection != null) {
                this.connection.close();
            }
            long currentTimeMillis = System.currentTimeMillis() + 1000;
            while (currentTimeMillis > System.currentTimeMillis() && this.server.getRemotingService().getConnections().size() != 0) {
                Thread.sleep(1L);
            }
            Assert.assertEquals("The remoting connection wasn't removed after connection.close()", 0L, this.server.getRemotingService().getConnections().size());
            this.server.stop();
        } finally {
            super.tearDown();
        }
    }

    @Test
    public void testTemporaryQueue() throws Throwable {
        Session createSession = this.connection.createSession(false, 1);
        TemporaryQueue createTemporaryQueue = createSession.createTemporaryQueue();
        System.out.println("queue:" + createTemporaryQueue.getQueueName());
        MessageProducer createProducer = createSession.createProducer(createTemporaryQueue);
        TextMessage createTextMessage = createSession.createTextMessage();
        createTextMessage.setText("Message temporary");
        createProducer.send(createTextMessage);
        MessageConsumer createConsumer = createSession.createConsumer(createTemporaryQueue);
        this.connection.start();
        Assert.assertNotNull(createConsumer.receive(5000L));
    }

    public void testBrowser() throws Throwable {
        boolean z = false;
        for (int i = 0; i < 10; i++) {
            z = runWithTimeout(new ActiveMQTestBase.RunnerWithEX() { // from class: org.apache.activemq.artemis.tests.integration.proton.ProtonTest.1
                public void run() throws Throwable {
                    Queue createQueue = ProtonTest.this.createQueue(ProtonTest.this.address);
                    Session createSession = ProtonTest.this.connection.createSession(false, 1);
                    MessageProducer createProducer = createSession.createProducer(createQueue);
                    for (int i2 = 0; i2 < 50; i2++) {
                        TextMessage createTextMessage = createSession.createTextMessage();
                        createTextMessage.setText("msg:" + i2);
                        createProducer.send(createTextMessage);
                    }
                    ProtonTest.this.connection.close();
                    org.apache.activemq.artemis.core.server.Queue bindable = ProtonTest.this.server.getPostOffice().getBinding(new SimpleString(ProtonTest.this.coreAddress)).getBindable();
                    ProtonTest.this.connection = ProtonTest.this.createConnection();
                    Enumeration enumeration = ProtonTest.this.connection.createSession(false, 1).createBrowser(createQueue).getEnumeration();
                    int i3 = 0;
                    while (enumeration.hasMoreElements()) {
                        TextMessage textMessage = (Message) enumeration.nextElement();
                        Assert.assertNotNull("" + i3, textMessage);
                        Assert.assertTrue("" + textMessage, textMessage instanceof TextMessage);
                        int i4 = i3;
                        i3++;
                        Assert.assertEquals(textMessage.getText(), "msg:" + i4);
                    }
                    Assert.assertEquals(i3, 50);
                    ProtonTest.this.connection.close();
                    Assert.assertEquals(ProtonTest.this.getMessageCount(bindable), 50);
                }
            }, 1000L);
            if (z) {
                break;
            }
            System.err.println("Had to make it fail!!!");
            tearDown();
            setUp();
        }
        Assert.assertTrue("Test had to interrupt on all occasions.. this is beyond the expected for the test", z);
    }

    @Test
    public void testConnection() throws Exception {
        Session createSession = this.connection.createSession(false, 1);
        MessageConsumer createConsumer = createSession.createConsumer(createQueue(this.address));
        org.apache.activemq.artemis.core.server.Queue locateQueue = this.server.locateQueue(SimpleString.toSimpleString(this.coreAddress));
        assertEquals(1L, locateQueue.getConsumerCount());
        createConsumer.close();
        for (int i = 0; i < 100 && locateQueue.getConsumerCount() != 0; i++) {
            Thread.sleep(500L);
        }
        assertEquals(0L, locateQueue.getConsumerCount());
        createSession.close();
    }

    @Test
    public void testMessagesSentTransactional() throws Exception {
        Queue createQueue = createQueue(this.address);
        Session createSession = this.connection.createSession(true, 0);
        MessageProducer createProducer = createSession.createProducer(createQueue);
        new Random().nextBytes(new byte[2048]);
        for (int i = 0; i < 1000; i++) {
            TextMessage createTextMessage = createSession.createTextMessage();
            createTextMessage.setText("msg:" + i);
            createProducer.send(createTextMessage);
        }
        createSession.commit();
        this.connection.close();
        org.apache.activemq.artemis.core.server.Queue queue = (org.apache.activemq.artemis.core.server.Queue) this.server.getPostOffice().getBinding(new SimpleString(this.coreAddress)).getBindable();
        long currentTimeMillis = System.currentTimeMillis() + 5000;
        while (currentTimeMillis > System.currentTimeMillis() && getMessageCount(queue) != 1000) {
            Thread.sleep(1L);
        }
        Assert.assertEquals(1000, getMessageCount(queue));
    }

    @Test
    public void testMessagesSentTransactionalRolledBack() throws Exception {
        Queue createQueue = createQueue(this.address);
        Session createSession = this.connection.createSession(true, 0);
        MessageProducer createProducer = createSession.createProducer(createQueue);
        new Random().nextBytes(new byte[2048]);
        for (int i = 0; i < 1; i++) {
            TextMessage createTextMessage = createSession.createTextMessage();
            createTextMessage.setText("msg:" + i);
            createProducer.send(createTextMessage);
        }
        createSession.close();
        this.connection.close();
        Assert.assertEquals(getMessageCount((org.apache.activemq.artemis.core.server.Queue) this.server.getPostOffice().getBinding(new SimpleString(this.coreAddress)).getBindable()), 0L);
    }

    @Test
    public void testCancelMessages() throws Exception {
        long currentTimeMillis = System.currentTimeMillis();
        Queue createQueue = createQueue(this.address);
        Session createSession = this.connection.createSession(false, 1);
        MessageProducer createProducer = createSession.createProducer(createQueue);
        new Random().nextBytes(new byte[2048]);
        for (int i = 0; i < 10; i++) {
            TextMessage createTextMessage = createSession.createTextMessage();
            createTextMessage.setText("msg:" + i);
            createProducer.send(createTextMessage);
        }
        this.connection.close();
        org.apache.activemq.artemis.core.server.Queue queue = (org.apache.activemq.artemis.core.server.Queue) this.server.getPostOffice().getBinding(new SimpleString(this.coreAddress)).getBindable();
        long currentTimeMillis2 = System.currentTimeMillis() + 5000;
        while (currentTimeMillis2 > System.currentTimeMillis() && getMessageCount(queue) != 10) {
            Thread.sleep(1L);
        }
        Assert.assertEquals(10, getMessageCount(queue));
        this.connection = createConnection();
        MessageConsumer createConsumer = this.connection.createSession(false, 1).createConsumer(createQueue);
        Thread.sleep(100L);
        createConsumer.close();
        this.connection.close();
        Assert.assertEquals(10, getMessageCount(queue));
        System.out.println("taken = " + ((System.currentTimeMillis() - currentTimeMillis) / 1000));
    }

    @Test
    public void testClientAckMessages() throws Exception {
        long currentTimeMillis = System.currentTimeMillis();
        Queue createQueue = createQueue(this.address);
        Session createSession = this.connection.createSession(false, 1);
        MessageProducer createProducer = createSession.createProducer(createQueue);
        new Random().nextBytes(new byte[2048]);
        for (int i = 0; i < 10; i++) {
            TextMessage createTextMessage = createSession.createTextMessage();
            createTextMessage.setText("msg:" + i);
            createProducer.send(createTextMessage);
        }
        this.connection.close();
        org.apache.activemq.artemis.core.server.Queue queue = (org.apache.activemq.artemis.core.server.Queue) this.server.getPostOffice().getBinding(new SimpleString(this.coreAddress)).getBindable();
        long currentTimeMillis2 = System.currentTimeMillis() + 5000;
        while (currentTimeMillis2 > System.currentTimeMillis() && getMessageCount(queue) != 10) {
            Thread.sleep(1L);
        }
        Assert.assertEquals(10, getMessageCount(queue));
        this.connection = createConnection();
        MessageConsumer createConsumer = this.connection.createSession(false, 2).createConsumer(createQueue);
        for (int i2 = 0; i2 < 10; i2++) {
            TextMessage receive = createConsumer.receive(5000L);
            if (receive == null) {
                System.out.println("ProtonTest.testManyMessages");
            }
            Assert.assertNotNull("" + i2, receive);
            Assert.assertTrue("" + receive, receive instanceof TextMessage);
            Assert.assertEquals(receive.getText(), "msg:" + i2);
            receive.acknowledge();
        }
        createConsumer.close();
        this.connection.close();
        Assert.assertEquals(0L, getMessageCount(queue));
        System.out.println("taken = " + ((System.currentTimeMillis() - currentTimeMillis) / 1000));
    }

    @Test
    public void testMessagesReceivedInParallel() throws Throwable {
        long currentTimeMillis = System.currentTimeMillis();
        final Queue createQueue = createQueue(this.address);
        final ArrayList arrayList = new ArrayList();
        Thread thread = new Thread(new Runnable() { // from class: org.apache.activemq.artemis.tests.integration.proton.ProtonTest.2
            @Override // java.lang.Runnable
            public void run() {
                Connection connection = null;
                try {
                    try {
                        connection = ProtonTest.this.createConnection();
                        connection.start();
                        MessageConsumer createConsumer = connection.createSession(false, 1).createConsumer(createQueue);
                        long j = 0;
                        for (int i = 50000; i > 0; i--) {
                            try {
                                long j2 = j + 1;
                                j = j2;
                                if (j2 % 1000 == 0) {
                                    System.out.println("received " + j + " messages");
                                }
                                Assert.assertNotNull("Could not receive message count=" + i + " on consumer", createConsumer.receive(5000L));
                            } catch (JMSException e) {
                                e.printStackTrace();
                            }
                        }
                        try {
                            if (connection != ProtonTest.this.connection) {
                                connection.close();
                            }
                        } catch (Throwable th) {
                        }
                    } catch (Throwable th2) {
                        arrayList.add(th2);
                        th2.printStackTrace();
                        try {
                            if (connection != ProtonTest.this.connection) {
                                connection.close();
                            }
                        } catch (Throwable th3) {
                        }
                    }
                } catch (Throwable th4) {
                    try {
                        if (connection != ProtonTest.this.connection) {
                            connection.close();
                        }
                    } catch (Throwable th5) {
                    }
                    throw th4;
                }
            }
        });
        Session createSession = this.connection.createSession(false, 1);
        thread.start();
        MessageProducer createProducer = createSession.createProducer(createQueue);
        createProducer.setDeliveryMode(1);
        for (int i = 0; i < 50000; i++) {
            BytesMessage createBytesMessage = createSession.createBytesMessage();
            createBytesMessage.writeUTF("Hello world!!!!" + i);
            createBytesMessage.setIntProperty("count", i);
            createProducer.send(createBytesMessage);
        }
        thread.join();
        Iterator it = arrayList.iterator();
        if (it.hasNext()) {
            throw ((Throwable) it.next());
        }
        org.apache.activemq.artemis.core.server.Queue queue = (org.apache.activemq.artemis.core.server.Queue) this.server.getPostOffice().getBinding(new SimpleString(this.coreAddress)).getBindable();
        this.connection.close();
        Assert.assertEquals(0L, getMessageCount(queue));
        long currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
        System.out.println("Microbenchamrk ran in " + currentTimeMillis2 + " milliseconds, sending/receiving 50000");
        System.out.println(((int) ((50000.0d / currentTimeMillis2) * 1000.0d)) + " messages per second");
    }

    @Test
    public void testSimpleBinary() throws Throwable {
        long currentTimeMillis = System.currentTimeMillis();
        Queue createQueue = createQueue(this.address);
        Session createSession = this.connection.createSession(false, 1);
        byte[] bArr = new byte[16];
        for (int i = 0; i <= 15; i++) {
            bArr[i] = (byte) i;
        }
        MessageProducer createProducer = createSession.createProducer(createQueue);
        for (int i2 = 0; i2 < 500; i2++) {
            System.out.println("Sending " + i2);
            BytesMessage createBytesMessage = createSession.createBytesMessage();
            createBytesMessage.writeBytes(bArr);
            createBytesMessage.setIntProperty("count", i2);
            createProducer.send(createBytesMessage);
        }
        MessageConsumer createConsumer = this.connection.createSession(false, 1).createConsumer(createQueue);
        for (int i3 = 0; i3 < 500; i3++) {
            BytesMessage receive = createConsumer.receive(5000L);
            Assert.assertNotNull("Could not receive message count=" + i3 + " on consumer", receive);
            receive.reset();
            byte[] bArr2 = new byte[(int) receive.getBodyLength()];
            receive.readBytes(bArr2);
            System.out.println("Received " + ByteUtil.bytesToHex(bArr2, 1) + " count - " + receive.getIntProperty("count"));
            Assert.assertArrayEquals(bArr, bArr2);
        }
        System.out.println("taken = " + ((System.currentTimeMillis() - currentTimeMillis) / 1000));
    }

    @Test
    public void testSimpleDefault() throws Throwable {
        long currentTimeMillis = System.currentTimeMillis();
        Queue createQueue = createQueue(this.address);
        Session createSession = this.connection.createSession(false, 1);
        byte[] bArr = new byte[16];
        for (int i = 0; i <= 15; i++) {
            bArr[i] = (byte) i;
        }
        MessageProducer createProducer = createSession.createProducer(createQueue);
        for (int i2 = 0; i2 < 500; i2++) {
            System.out.println("Sending " + i2);
            Message createMessage = createSession.createMessage();
            createMessage.setIntProperty("count", i2);
            createProducer.send(createMessage);
        }
        MessageConsumer createConsumer = this.connection.createSession(false, 1).createConsumer(createQueue);
        for (int i3 = 0; i3 < 500; i3++) {
            Assert.assertNotNull("Could not receive message count=" + i3 + " on consumer", createConsumer.receive(5000L));
        }
        System.out.println("taken = " + ((System.currentTimeMillis() - currentTimeMillis) / 1000));
    }

    @Test
    public void testSimpleMap() throws Throwable {
        long currentTimeMillis = System.currentTimeMillis();
        Queue createQueue = createQueue(this.address);
        Session createSession = this.connection.createSession(false, 1);
        MessageProducer createProducer = createSession.createProducer(createQueue);
        for (int i = 0; i < 100; i++) {
            System.out.println("Sending " + i);
            MapMessage createMapMessage = createSession.createMapMessage();
            createMapMessage.setInt("i", i);
            createMapMessage.setIntProperty("count", i);
            createProducer.send(createMapMessage);
        }
        MessageConsumer createConsumer = this.connection.createSession(false, 1).createConsumer(createQueue);
        for (int i2 = 0; i2 < 100; i2++) {
            Assert.assertNotNull("Could not receive message count=" + i2 + " on consumer", createConsumer.receive(5000L));
            Assert.assertEquals(i2, r0.getInt("i"));
            Assert.assertEquals(i2, r0.getIntProperty("count"));
        }
        System.out.println("taken = " + ((System.currentTimeMillis() - currentTimeMillis) / 1000));
    }

    @Test
    public void testSimpleStream() throws Throwable {
        Queue createQueue = createQueue(this.address);
        Session createSession = this.connection.createSession(false, 1);
        MessageProducer createProducer = createSession.createProducer(createQueue);
        for (int i = 0; i < 100; i++) {
            StreamMessage createStreamMessage = createSession.createStreamMessage();
            createStreamMessage.writeInt(i);
            createStreamMessage.writeBoolean(true);
            createStreamMessage.writeString("test");
            createProducer.send(createStreamMessage);
        }
        MessageConsumer createConsumer = this.connection.createSession(false, 1).createConsumer(createQueue);
        for (int i2 = 0; i2 < 100; i2++) {
            StreamMessage receive = createConsumer.receive(5000L);
            Assert.assertNotNull("Could not receive message count=" + i2 + " on consumer", receive);
            Assert.assertEquals(i2, receive.readInt());
            Assert.assertEquals(true, Boolean.valueOf(receive.readBoolean()));
            Assert.assertEquals("test", receive.readString());
        }
    }

    @Test
    public void testSimpleText() throws Throwable {
        long currentTimeMillis = System.currentTimeMillis();
        Queue createQueue = createQueue(this.address);
        Session createSession = this.connection.createSession(false, 1);
        MessageProducer createProducer = createSession.createProducer(createQueue);
        for (int i = 0; i < 100; i++) {
            System.out.println("Sending " + i);
            createProducer.send(createSession.createTextMessage("text" + i));
        }
        MessageConsumer createConsumer = this.connection.createSession(false, 1).createConsumer(createQueue);
        for (int i2 = 0; i2 < 100; i2++) {
            TextMessage receive = createConsumer.receive(5000L);
            Assert.assertNotNull("Could not receive message count=" + i2 + " on consumer", receive);
            Assert.assertEquals("text" + i2, receive.getText());
        }
        System.out.println("taken = " + ((System.currentTimeMillis() - currentTimeMillis) / 1000));
    }

    @Test
    public void testSimpleObject() throws Throwable {
        long currentTimeMillis = System.currentTimeMillis();
        Queue createQueue = createQueue(this.address);
        Session createSession = this.connection.createSession(false, 1);
        MessageProducer createProducer = createSession.createProducer(createQueue);
        for (int i = 0; i < 1; i++) {
            System.out.println("Sending " + i);
            createProducer.send(createSession.createObjectMessage(new AnythingSerializable(i)));
        }
        MessageConsumer createConsumer = this.connection.createSession(false, 1).createConsumer(createQueue);
        for (int i2 = 0; i2 < 1; i2++) {
            Assert.assertNotNull("Could not receive message count=" + i2 + " on consumer", createConsumer.receive(5000L));
            Assert.assertEquals(i2, ((AnythingSerializable) r0.getObject()).getCount());
        }
        System.out.println("taken = " + ((System.currentTimeMillis() - currentTimeMillis) / 1000));
    }

    @Test
    public void testSelector() throws Exception {
        Queue createQueue = createQueue(this.address);
        Session createSession = this.connection.createSession(false, 1);
        MessageProducer createProducer = createSession.createProducer(createQueue);
        TextMessage createTextMessage = createSession.createTextMessage();
        createTextMessage.setText("msg:0");
        createProducer.send(createTextMessage);
        TextMessage createTextMessage2 = createSession.createTextMessage();
        createTextMessage2.setText("msg:1");
        createTextMessage2.setStringProperty("color", "RED");
        createProducer.send(createTextMessage2);
        this.connection.start();
        TextMessage receive = createSession.createConsumer(createQueue, "color = 'RED'").receive(5000L);
        Assert.assertNotNull(receive);
        Assert.assertEquals("msg:1", receive.getText());
        Assert.assertEquals(receive.getStringProperty("color"), "RED");
        this.connection.close();
    }

    @Test
    public void testProperties() throws Exception {
        Queue createQueue = createQueue(this.address);
        Session createSession = this.connection.createSession(false, 1);
        MessageProducer createProducer = createSession.createProducer(createQueue);
        TextMessage createTextMessage = createSession.createTextMessage();
        createTextMessage.setText("msg:0");
        createTextMessage.setBooleanProperty("true", true);
        createTextMessage.setBooleanProperty("false", false);
        createTextMessage.setStringProperty("foo", "bar");
        createTextMessage.setDoubleProperty("double", 66.6d);
        createTextMessage.setFloatProperty("float", 56.789f);
        createTextMessage.setIntProperty("int", 8);
        createTextMessage.setByteProperty("byte", (byte) 10);
        createProducer.send(createTextMessage);
        createProducer.send(createTextMessage);
        this.connection.start();
        MessageConsumer createConsumer = createSession.createConsumer(createQueue);
        TextMessage receive = createConsumer.receive(5000L);
        Assert.assertNotNull(receive);
        Assert.assertEquals("msg:0", receive.getText());
        Assert.assertEquals(Boolean.valueOf(receive.getBooleanProperty("true")), true);
        Assert.assertEquals(Boolean.valueOf(receive.getBooleanProperty("false")), false);
        Assert.assertEquals(receive.getStringProperty("foo"), "bar");
        Assert.assertEquals(receive.getDoubleProperty("double"), 66.6d, 1.0E-4d);
        Assert.assertEquals(receive.getFloatProperty("float"), 56.78900146484375d, 1.0E-4d);
        Assert.assertEquals(receive.getIntProperty("int"), 8L);
        Assert.assertEquals(receive.getByteProperty("byte"), 10L);
        Assert.assertNotNull(createConsumer.receive(5000L));
        this.connection.close();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Queue createQueue(String str) throws Exception {
        Session createSession = this.connection.createSession(false, 1);
        try {
            Queue createQueue = createSession.createQueue(str);
            createSession.close();
            return createQueue;
        } catch (Throwable th) {
            createSession.close();
            throw th;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Connection createConnection() throws JMSException {
        Connection createConnection;
        if (this.protocol == 3) {
            this.factory = new JmsConnectionFactory("amqp://localhost:5672");
            createConnection = this.factory.createConnection();
            createConnection.setExceptionListener(new ExceptionListener() { // from class: org.apache.activemq.artemis.tests.integration.proton.ProtonTest.3
                public void onException(JMSException jMSException) {
                    jMSException.printStackTrace();
                }
            });
            createConnection.start();
        } else if (this.protocol == 0) {
            this.factory = new JmsConnectionFactory("guest", "guest", "amqp://localhost:5672");
            createConnection = this.factory.createConnection();
            createConnection.setExceptionListener(new ExceptionListener() { // from class: org.apache.activemq.artemis.tests.integration.proton.ProtonTest.4
                public void onException(JMSException jMSException) {
                    jMSException.printStackTrace();
                }
            });
            createConnection.start();
        } else {
            if (this.protocol == 1) {
                new TransportConfiguration(INVM_CONNECTOR_FACTORY);
                this.factory = new ActiveMQConnectionFactory("vm:/0");
            } else {
                this.factory = new ActiveMQConnectionFactory();
            }
            createConnection = this.factory.createConnection("guest", "guest");
            createConnection.setExceptionListener(new ExceptionListener() { // from class: org.apache.activemq.artemis.tests.integration.proton.ProtonTest.5
                public void onException(JMSException jMSException) {
                    jMSException.printStackTrace();
                }
            });
            createConnection.start();
        }
        return createConnection;
    }
}
