package org.apache.activemq.artemis.tests.integration.stomp;

import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.jms.BytesMessage;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.TextMessage;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/stomp/StompTest.class */
public class StompTest extends StompTestBase {
    /** Shared integration-test logger used for the diagnostic messages emitted by these tests. */
    private static final IntegrationTestLogger log = IntegrationTestLogger.LOGGER;

    /**
     * Sends 1000 identical STOMP SEND frames to the test queue and verifies that a JMS
     * {@link MessageListener} registered on that queue is notified once per message
     * within 60 seconds.
     *
     * @throws Exception if connecting, sending or waiting fails
     */
    @Test
    public void testSendManyMessages() throws Exception {
        final int messageCount = 1000;
        MessageConsumer consumer = this.session.createConsumer(this.queue);

        // Every STOMP frame must be terminated by a NUL octet.
        sendFrame("CONNECT\nlogin: brianm\npasscode: wombats\n\n\u0000");
        Assert.assertTrue(receiveFrame(10000L).startsWith("CONNECTED"));

        final CountDownLatch latch = new CountDownLatch(messageCount);
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                latch.countDown();
            }
        });

        String sendFrame = "SEND\ndestination:" + getQueuePrefix() + getQueueName() + "\n\nHello World\u0000";
        for (int sent = 0; sent < messageCount; sent++) {
            sendFrame(sendFrame);
        }

        // The latch only reaches zero once every message has been delivered to the listener.
        Assert.assertTrue(latch.await(60L, TimeUnit.SECONDS));
    }

    /**
     * Sends a CONNECT frame carrying a {@code request-id} header and verifies that the
     * reply is a CONNECTED frame echoing it back as {@code response-id:1}.
     *
     * @throws Exception if sending or receiving the frame fails
     */
    @Test
    public void testConnect() throws Exception {
        // Every STOMP frame must be terminated by a NUL octet.
        sendFrame("CONNECT\nlogin: brianm\npasscode: wombats\nrequest-id: 1\n\n\u0000");

        String reply = receiveFrame(10000L);
        Assert.assertTrue(reply.startsWith("CONNECTED"));
        Assert.assertTrue(reply.contains("response-id:1"));
    }

    /**
     * Connects, sends a DISCONNECT frame and then checks via {@code assertChannelClosed()}
     * that the channel is closed.
     *
     * @throws Exception if sending or receiving a frame fails
     */
    @Test
    public void testDisconnectAndError() throws Exception {
        // Every STOMP frame must be terminated by a NUL octet.
        sendFrame("CONNECT\nlogin: brianm\npasscode: wombats\nrequest-id: 1\n\n\u0000");

        String reply = receiveFrame(10000L);
        Assert.assertTrue(reply.startsWith("CONNECTED"));
        Assert.assertTrue(reply.contains("response-id:1"));

        sendFrame("DISCONNECT\n\n\u0000");
        waitForFrameToTakeEffect();

        // The SEND frame that used to be built here was never sent, so it has been removed;
        // the closed channel is verified directly instead.
        assertChannelClosed();
    }

    /**
     * Sends a single STOMP SEND frame to the test queue and verifies that a JMS consumer
     * receives it as a {@link TextMessage} with the expected body, priority 4 and a
     * timestamp less than one second away from the current time.
     *
     * @throws Exception if sending or receiving fails
     */
    @Test
    public void testSendMessage() throws Exception {
        MessageConsumer consumer = this.session.createConsumer(this.queue);

        // Every STOMP frame must be terminated by a NUL octet.
        sendFrame("CONNECT\nlogin: brianm\npasscode: wombats\n\n\u0000");
        Assert.assertTrue(receiveFrame(10000L).startsWith("CONNECTED"));

        sendFrame("SEND\ndestination:" + getQueuePrefix() + getQueueName() + "\n\nHello World\u0000");

        // receive() is declared to return Message, so the cast is required.
        TextMessage message = (TextMessage) consumer.receive(1000L);
        Assert.assertNotNull(message);
        Assert.assertEquals("Hello World", message.getText());
        Assert.assertEquals("getJMSPriority", 4L, message.getJMSPriority());

        long timestampDrift = Math.abs(System.currentTimeMillis() - message.getJMSTimestamp());
        Assert.assertTrue(timestampDrift < 1000);
    }

    /**
     * Sends a STOMP SEND frame to a randomly named queue and verifies that a JMS consumer
     * created afterwards on that queue receives the message. Also asserts that a binding
     * for {@code jms.queue.<name>} exists while the consumer is open and is gone once the
     * consumer has been closed.
     *
     * @throws Exception if sending or receiving fails
     */
    @Test
    public void testSendMessageToNonExistentQueue() throws Exception {
        String queueName = RandomUtil.randomString();
        SimpleString bindingName = new SimpleString("jms.queue." + queueName);

        // Every STOMP frame must be terminated by a NUL octet.
        sendFrame("CONNECT\nlogin: brianm\npasscode: wombats\n\n\u0000");
        Assert.assertTrue(receiveFrame(10000L).startsWith("CONNECTED"));

        sendFrame("SEND\ndestination:" + getQueuePrefix() + queueName + "\n\nHello World\u0000");
        // Result intentionally ignored.
        // NOTE(review): presumably this just gives the SEND time to be processed - confirm.
        receiveFrame(1000L);

        MessageConsumer consumer = this.session.createConsumer(ActiveMQJMSClient.createQueue(queueName));
        // receive() is declared to return Message, so the cast is required.
        TextMessage message = (TextMessage) consumer.receive(1000L);
        Assert.assertNotNull(message);
        Assert.assertEquals("Hello World", message.getText());
        Assert.assertEquals("getJMSPriority", 4L, message.getJMSPriority());

        long timestampDrift = Math.abs(System.currentTimeMillis() - message.getJMSTimestamp());
        Assert.assertTrue(timestampDrift < 1500);

        // The binding must be present while the consumer is open and removed after it closes.
        Assert.assertNotNull(this.server.getActiveMQServer().getPostOffice().getBinding(bindingName));
        consumer.close();
        Assert.assertNull(this.server.getActiveMQServer().getPostOffice().getBinding(bindingName));
    }

    /**
     * Sends CONNECT and SEND frames that are each followed by an extra newline after the
     * terminator, so the next frame on the wire starts with a leading newline, and verifies
     * that the message is still delivered to a JMS consumer.
     *
     * @throws Exception if sending or receiving fails
     */
    @Test
    public void testSendMessageWithLeadingNewLine() throws Exception {
        MessageConsumer consumer = this.session.createConsumer(this.queue);

        // Each frame ends with the NUL terminator followed by one extra newline.
        sendFrame("CONNECT\nlogin: brianm\npasscode: wombats\n\n\u0000\n");
        Assert.assertTrue(receiveFrame(10000L).startsWith("CONNECTED"));

        sendFrame("SEND\ndestination:" + getQueuePrefix() + getQueueName() + "\n\nHello World\u0000\n");

        // receive() is declared to return Message, so the cast is required.
        TextMessage message = (TextMessage) consumer.receive(1000L);
        Assert.assertNotNull(message);
        Assert.assertEquals("Hello World", message.getText());

        long timestampDrift = Math.abs(System.currentTimeMillis() - message.getJMSTimestamp());
        Assert.assertTrue(timestampDrift < 1000);
    }

    /**
     * Sends a SEND frame with a {@code receipt} header and verifies that a RECEIPT frame
     * with the matching {@code receipt-id} is returned and that the message reaches a JMS
     * consumer.
     *
     * @throws Exception if sending or receiving fails
     */
    @Test
    public void testSendMessageWithReceipt() throws Exception {
        MessageConsumer consumer = this.session.createConsumer(this.queue);

        // Every STOMP frame must be terminated by a NUL octet.
        sendFrame("CONNECT\nlogin: brianm\npasscode: wombats\n\n\u0000");
        Assert.assertTrue(receiveFrame(10000L).startsWith("CONNECTED"));

        sendFrame("SEND\ndestination:" + getQueuePrefix() + getQueueName() + "\nreceipt: 1234\n\nHello World\u0000");

        String receipt = receiveFrame(10000L);
        Assert.assertTrue(receipt.startsWith("RECEIPT"));
        Assert.assertTrue(receipt.contains("receipt-id:1234"));

        // receive() is declared to return Message, so the cast is required.
        TextMessage message = (TextMessage) consumer.receive(1000L);
        Assert.assertNotNull(message);
        Assert.assertEquals("Hello World", message.getText());

        long timestampDrift = Math.abs(System.currentTimeMillis() - message.getJMSTimestamp());
        Assert.assertTrue(timestampDrift < 1000);
    }

    /**
     * Sends a SEND frame with a {@code content-length} header and a four-byte binary body
     * that itself contains NUL bytes, and verifies that a JMS consumer receives a
     * {@link BytesMessage} with exactly those bytes.
     *
     * @throws Exception if sending or receiving fails
     */
    @Test
    public void testSendMessageWithContentLength() throws Exception {
        MessageConsumer consumer = this.session.createConsumer(this.queue);

        // Every STOMP frame must be terminated by a NUL octet.
        sendFrame("CONNECT\nlogin: brianm\npasscode: wombats\n\n\u0000");
        Assert.assertTrue(receiveFrame(10000L).startsWith("CONNECTED"));

        byte[] body = {1, 0, 0, 4};
        String headers = "SEND\ndestination:" + getQueuePrefix() + getQueueName()
            + "\ncontent-length:" + body.length + "\n\n";

        ByteArrayOutputStream frameBytes = new ByteArrayOutputStream();
        frameBytes.write(headers.getBytes(StandardCharsets.UTF_8));
        frameBytes.write(body);
        frameBytes.write(0); // frame terminator
        frameBytes.flush();

        // Decode with the same charset that was used to encode the headers rather than
        // relying on the platform default.
        sendFrame(new String(frameBytes.toByteArray(), StandardCharsets.UTF_8));

        // receive() is declared to return Message, so the cast is required.
        BytesMessage message = (BytesMessage) consumer.receive(10000L);
        Assert.assertNotNull(message);
        Assert.assertEquals(body.length, message.getBodyLength());
        for (byte expected : body) {
            Assert.assertEquals(expected, message.readByte());
        }
    }

    /**
     * Sends a SEND frame carrying a {@code JMSXGroupID} header and verifies that the JMS
     * message received exposes it as the string property {@code JMSXGroupID}.
     *
     * @throws Exception if sending or receiving fails
     */
    @Test
    public void testJMSXGroupIdCanBeSet() throws Exception {
        MessageConsumer consumer = this.session.createConsumer(this.queue);

        // Every STOMP frame must be terminated by a NUL octet.
        sendFrame("CONNECT\nlogin: brianm\npasscode: wombats\n\n\u0000");
        Assert.assertTrue(receiveFrame(10000L).startsWith("CONNECTED"));

        sendFrame("SEND\ndestination:" + getQueuePrefix() + getQueueName() + "\nJMSXGroupID: TEST\n\nHello World\u0000");

        // receive() is declared to return Message, so the cast is required.
        TextMessage message = (TextMessage) consumer.receive(1000L);
        Assert.assertNotNull(message);
        Assert.assertEquals("Hello World", message.getText());
        Assert.assertEquals("TEST", message.getStringProperty("JMSXGroupID"));
    }

    /**
     * Sends a SEND frame with custom {@code foo} and {@code bar} headers and verifies that
     * a JMS consumer created with the selector {@code foo = 'abc'} receives the message
     * with both headers available as string properties.
     *
     * @throws Exception if sending or receiving fails
     */
    @Test
    public void testSendMessageWithCustomHeadersAndSelector() throws Exception {
        MessageConsumer consumer = this.session.createConsumer(this.queue, "foo = 'abc'");

        // Every STOMP frame must be terminated by a NUL octet.
        sendFrame("CONNECT\nlogin: brianm\npasscode: wombats\n\n\u0000");
        Assert.assertTrue(receiveFrame(10000L).startsWith("CONNECTED"));

        sendFrame("SEND\nfoo:abc\nbar:123\ndestination:" + getQueuePrefix() + getQueueName() + "\n\nHello World\u0000");

        // receive() is declared to return Message, so the cast is required.
        TextMessage message = (TextMessage) consumer.receive(1000L);
        Assert.assertNotNull(message);
        Assert.assertEquals("Hello World", message.getText());
        Assert.assertEquals("foo", "abc", message.getStringProperty("foo"));
        Assert.assertEquals("bar", "123", message.getStringProperty("bar"));
    }

    /**
     * Sends a SEND frame with {@code correlation-id}, {@code persistent}, {@code priority},
     * {@code type} and {@code JMSXGroupID} headers plus two custom headers, and verifies
     * that each one shows up on the received JMS message (correlation id, type, priority,
     * delivery mode 2, and string properties).
     *
     * @throws Exception if sending or receiving fails
     */
    @Test
    public void testSendMessageWithStandardHeaders() throws Exception {
        MessageConsumer consumer = this.session.createConsumer(this.queue);

        // Every STOMP frame must be terminated by a NUL octet.
        sendFrame("CONNECT\nlogin: brianm\npasscode: wombats\n\n\u0000");
        Assert.assertTrue(receiveFrame(10000L).startsWith("CONNECTED"));

        sendFrame("SEND\ncorrelation-id:c123\npersistent:true\npriority:3\ntype:t345\nJMSXGroupID:abc\nfoo:abc\nbar:123\ndestination:"
            + getQueuePrefix() + getQueueName() + "\n\nHello World\u0000");

        // receive() is declared to return Message, so the cast is required.
        TextMessage message = (TextMessage) consumer.receive(1000L);
        Assert.assertNotNull(message);
        Assert.assertEquals("Hello World", message.getText());
        Assert.assertEquals("JMSCorrelationID", "c123", message.getJMSCorrelationID());
        Assert.assertEquals("getJMSType", "t345", message.getJMSType());
        Assert.assertEquals("getJMSPriority", 3L, message.getJMSPriority());
        Assert.assertEquals(2L, message.getJMSDeliveryMode());
        Assert.assertEquals("foo", "abc", message.getStringProperty("foo"));
        Assert.assertEquals("bar", "123", message.getStringProperty("bar"));
        Assert.assertEquals("JMSXGroupID", "abc", message.getStringProperty("JMSXGroupID"));
    }

    /**
     * Sends a SEND frame that includes a 1024-character {@code longHeader} value alongside
     * the standard headers and verifies that the received JMS message carries all of them,
     * including the full-length {@code longHeader} property.
     *
     * @throws Exception if sending or receiving fails
     */
    @Test
    public void testSendMessageWithLongHeaders() throws Exception {
        final int longHeaderLength = 1024;
        MessageConsumer consumer = this.session.createConsumer(this.queue);

        // Every STOMP frame must be terminated by a NUL octet.
        sendFrame("CONNECT\nlogin: brianm\npasscode: wombats\n\n\u0000");
        Assert.assertTrue(receiveFrame(10000L).startsWith("CONNECTED"));

        // No synchronisation is needed for a method-local buffer, so StringBuilder suffices.
        StringBuilder longValue = new StringBuilder(longHeaderLength);
        for (int i = 0; i < longHeaderLength; i++) {
            longValue.append('a');
        }

        sendFrame("SEND\ncorrelation-id:c123\npersistent:true\npriority:3\ntype:t345\nJMSXGroupID:abc\nfoo:abc\n"
            + "longHeader:" + longValue + "\n"
            + "destination:" + getQueuePrefix() + getQueueName() + "\n\nHello World\u0000");

        // receive() is declared to return Message, so the cast is required.
        TextMessage message = (TextMessage) consumer.receive(1000L);
        Assert.assertNotNull(message);
        Assert.assertEquals("Hello World", message.getText());
        Assert.assertEquals("JMSCorrelationID", "c123", message.getJMSCorrelationID());
        Assert.assertEquals("getJMSType", "t345", message.getJMSType());
        Assert.assertEquals("getJMSPriority", 3L, message.getJMSPriority());
        Assert.assertEquals(2L, message.getJMSDeliveryMode());
        Assert.assertEquals("foo", "abc", message.getStringProperty("foo"));
        Assert.assertEquals("longHeader", 1024L, message.getStringProperty("longHeader").length());
        Assert.assertEquals("JMSXGroupID", "abc", message.getStringProperty("JMSXGroupID"));
    }

    /**
     * Subscribes to the test queue with {@code ack:auto}, sends one message and verifies
     * that a MESSAGE frame containing it is received. After disconnecting, a JMS consumer
     * on the same queue must find no message left.
     *
     * @throws Exception if sending or receiving fails
     */
    @Test
    public void testSubscribeWithAutoAck() throws Exception {
        // Every STOMP frame must be terminated by a NUL octet.
        sendFrame("CONNECT\nlogin: brianm\npasscode: wombats\n\n\u0000");
        Assert.assertTrue(receiveFrame(100000L).startsWith("CONNECTED"));

        // The SUBSCRIBE frame deliberately carries a (meaningless) body.
        sendFrame("SUBSCRIBE\ndestination:" + getQueuePrefix() + getQueueName() + "\nack:auto\n\nfff\u0000");

        sendMessage(getName());

        String frame = receiveFrame(10000L);
        System.out.println("-------- frame received: " + frame);
        Assert.assertTrue(frame.startsWith("MESSAGE"));
        Assert.assertTrue(frame.indexOf("destination:") > 0);
        Assert.assertTrue(frame.indexOf(getName()) > 0);

        sendFrame("DISCONNECT\n\n\n\u0000");

        // Nothing may remain on the queue for another consumer.
        MessageConsumer consumer = this.session.createConsumer(this.queue);
        Assert.assertNull(consumer.receive(1000L));
    }

    /**
     * Subscribes with {@code ack:auto}, sends a five-byte binary message and verifies that
     * the MESSAGE frame received reports a content length of 5, contains no
     * {@code type: null} header and includes the raw bytes.
     *
     * @throws Exception if sending or receiving fails
     */
    @Test
    public void testSubscribeWithAutoAckAndBytesMessage() throws Exception {
        // Every STOMP frame must be terminated by a NUL octet.
        sendFrame("CONNECT\nlogin: brianm\npasscode: wombats\n\n\u0000");
        Assert.assertTrue(receiveFrame(100000L).startsWith("CONNECTED"));

        sendFrame("SUBSCRIBE\ndestination:" + getQueuePrefix() + getQueueName() + "\nack:auto\n\n\u0000");

        byte[] payload = {1, 2, 3, 4, 5};
        sendMessage(payload, (Destination) this.queue);

        String frame = receiveFrame(10000L);
        System.out.println("Message: " + frame);
        Assert.assertTrue(frame.startsWith("MESSAGE"));

        // Header names are matched case-insensitively.
        Matcher contentLength = Pattern.compile("Content-length:\\s*(\\d+)", Pattern.CASE_INSENSITIVE).matcher(frame);
        Assert.assertTrue(contentLength.find());
        Assert.assertEquals("5", contentLength.group(1));

        Matcher nullType = Pattern.compile("type:\\s*null", Pattern.CASE_INSENSITIVE).matcher(frame);
        Assert.assertFalse(nullType.find());

        // Explicit charset so the comparison does not depend on the platform default.
        Assert.assertTrue(frame.indexOf(new String(payload, StandardCharsets.UTF_8)) > -1);

        sendFrame("DISCONNECT\n\n\n\u0000");
    }

    /**
     * Subscribes with {@code ack:auto}, sends a JMS {@link BytesMessage} carrying one
     * property of each primitive/string kind, and verifies that the MESSAGE frame received
     * contains a header for every property as well as the body text.
     *
     * @throws Exception if sending or receiving fails
     */
    @Test
    public void testSubscribeWithMessageSentWithProperties() throws Exception {
        // Every STOMP frame must be terminated by a NUL octet.
        sendFrame("CONNECT\nlogin: brianm\npasscode: wombats\n\n\u0000");
        Assert.assertTrue(receiveFrame(100000L).startsWith("CONNECTED"));

        sendFrame("SUBSCRIBE\ndestination:" + getQueuePrefix() + getQueueName() + "\nack:auto\n\n\u0000");

        MessageProducer producer = this.session.createProducer(this.queue);
        BytesMessage message = this.session.createBytesMessage();
        message.setStringProperty("S", "value");
        message.setBooleanProperty("n", false);
        message.setByteProperty("byte", (byte) 9);
        message.setDoubleProperty("d", 2.0d);
        message.setFloatProperty("f", 6.0f);
        message.setIntProperty("i", 10);
        message.setLongProperty("l", 121L);
        message.setShortProperty("s", (short) 12);
        message.writeBytes("Hello World".getBytes(StandardCharsets.UTF_8));
        producer.send(message);

        String frame = receiveFrame(10000L);
        Assert.assertNotNull(frame);
        Assert.assertTrue(frame.startsWith("MESSAGE"));

        // Every property set above must appear as a header somewhere after the command line.
        String[] expectedHeaders = {"S:", "n:", "byte:", "d:", "f:", "i:", "l:", "s:"};
        for (String header : expectedHeaders) {
            Assert.assertTrue(frame.indexOf(header) > 0);
        }
        Assert.assertTrue(frame.indexOf("Hello World") > 0);

        sendFrame("DISCONNECT\n\n\n\u0000");
    }

    /**
     * Subscribes with an explicit {@code id} header and verifies that the MESSAGE frame
     * received for a sent message contains a {@code subscription:} header.
     *
     * @throws Exception if sending or receiving fails
     */
    @Test
    public void testSubscribeWithID() throws Exception {
        // Every STOMP frame must be terminated by a NUL octet.
        sendFrame("CONNECT\nlogin: brianm\npasscode: wombats\n\n\u0000");
        Assert.assertTrue(receiveFrame(100000L).startsWith("CONNECTED"));

        sendFrame("SUBSCRIBE\ndestination:" + getQueuePrefix() + getQueueName() + "\nack:auto\nid: mysubid\n\n\u0000");

        sendMessage(getName());

        String frame = receiveFrame(10000L);
        Assert.assertTrue(frame.startsWith("MESSAGE"));
        Assert.assertTrue(frame.indexOf("destination:") > 0);
        Assert.assertTrue(frame.indexOf("subscription:") > 0);
        Assert.assertTrue(frame.indexOf(getName()) > 0);

        sendFrame("DISCONNECT\n\n\n\u0000");
    }

    /**
     * Subscribes with {@code ack:auto}, sends a message whose body contains non-ASCII
     * characters and verifies that the MESSAGE frame received contains the same text.
     *
     * @throws Exception if sending or receiving fails
     */
    @Test
    public void testBodyWithUTF8() throws Exception {
        // "A" + e-circumflex + n-tilde + u-diaeresis + "C", written with Unicode escapes so
        // the literal does not depend on the encoding of this source file.
        final String text = "A\u00EA\u00F1\u00FCC";

        // Every STOMP frame must be terminated by a NUL octet.
        sendFrame("CONNECT\nlogin: brianm\npasscode: wombats\n\n\u0000");
        Assert.assertTrue(receiveFrame(100000L).startsWith("CONNECTED"));

        sendFrame("SUBSCRIBE\ndestination:" + getQueuePrefix() + getQueueName() + "\nack:auto\n\n\u0000");

        System.out.println(text);
        sendMessage(text);

        String frame = receiveFrame(10000L);
        System.out.println(frame);
        Assert.assertTrue(frame.startsWith("MESSAGE"));
        Assert.assertTrue(frame.indexOf("destination:") > 0);
        Assert.assertTrue(frame.indexOf(text) > 0);

        sendFrame("DISCONNECT\n\n\n\u0000");
    }

    /**
     * Subscribes with {@code ack:auto} and verifies, for two consecutive batches of ten
     * messages, that the frames are received in the same order the messages were sent.
     *
     * @throws Exception if sending or receiving fails
     */
    @Test
    public void testMessagesAreInOrder() throws Exception {
        final int batchSize = 10;
        String[] bodies = new String[batchSize];

        // Every STOMP frame must be terminated by a NUL octet.
        sendFrame("CONNECT\nlogin: brianm\npasscode: wombats\n\n\u0000");
        Assert.assertTrue(receiveFrame(100000L).startsWith("CONNECTED"));

        sendFrame("SUBSCRIBE\ndestination:" + getQueuePrefix() + getQueueName() + "\nack:auto\n\n\u0000");

        // First batch.
        for (int i = 0; i < batchSize; i++) {
            bodies[i] = getName() + i;
            sendMessage(bodies[i]);
        }
        for (int i = 0; i < batchSize; i++) {
            String frame = receiveFrame(1000L);
            Assert.assertTrue("Message not in order", frame.contains(bodies[i]));
        }

        waitForFrameToTakeEffect();

        // Second batch, with distinguishable bodies.
        for (int i = 0; i < batchSize; i++) {
            bodies[i] = getName() + ":second:" + i;
            sendMessage(bodies[i]);
        }
        for (int i = 0; i < batchSize; i++) {
            String frame = receiveFrame(1000L);
            Assert.assertTrue("Message not in order", frame.contains(bodies[i]));
        }

        sendFrame("DISCONNECT\n\n\n\u0000");
    }

    /**
     * Subscribes with the selector {@code foo = 'zzz'}, sends one non-matching and one
     * matching message, and verifies that the first MESSAGE frame received is the matching
     * one.
     *
     * @throws Exception if sending or receiving fails
     */
    @Test
    public void testSubscribeWithAutoAckAndSelector() throws Exception {
        // Every STOMP frame must be terminated by a NUL octet.
        sendFrame("CONNECT\nlogin: brianm\npasscode: wombats\n\n\u0000");
        Assert.assertTrue(receiveFrame(100000L).startsWith("CONNECTED"));

        sendFrame("SUBSCRIBE\ndestination:" + getQueuePrefix() + getQueueName()
            + "\nselector: foo = 'zzz'\nack:auto\n\n\u0000");

        sendMessage("Ignored message", "foo", "1234");
        sendMessage("Real message", "foo", "zzz");

        String frame = receiveFrame(10000L);
        Assert.assertTrue(frame.startsWith("MESSAGE"));
        Assert.assertTrue("Should have received the real message but got: " + frame,
            frame.indexOf("Real message") > 0);

        sendFrame("DISCONNECT\n\n\n\u0000");
    }

    /**
     * Subscribes with {@code ack:client}, acknowledges the received message by its
     * {@code message-id} and, after disconnecting, verifies that a JMS consumer on the
     * same queue finds no message left.
     *
     * @throws Exception if sending or receiving fails
     */
    @Test
    public void testSubscribeWithClientAck() throws Exception {
        // Every STOMP frame must be terminated by a NUL octet.
        sendFrame("CONNECT\nlogin: brianm\npasscode: wombats\n\n\u0000");
        Assert.assertTrue(receiveFrame(10000L).startsWith("CONNECTED"));

        sendFrame("SUBSCRIBE\ndestination:" + getQueuePrefix() + getQueueName() + "\nack:client\n\n\u0000");

        sendMessage(getName());

        String frame = receiveFrame(10000L);
        Assert.assertTrue(frame.startsWith("MESSAGE"));

        // Extract the message id (header name matched case-insensitively) for the ACK.
        Matcher messageId = Pattern.compile("message-id:\\s*(\\S+)", Pattern.CASE_INSENSITIVE).matcher(frame);
        Assert.assertTrue(messageId.find());

        sendFrame("ACK\nmessage-id: " + messageId.group(1) + "\n\n\u0000");
        sendFrame("DISCONNECT\n\n\n\u0000");

        // The acknowledged message must not be available to another consumer.
        MessageConsumer consumer = this.session.createConsumer(this.queue);
        Assert.assertNull(consumer.receive(1000L));
    }

    /**
     * Subscribes with {@code ack:client}, receives a message without acknowledging it and
     * disconnects. A JMS consumer on the same queue must then receive the message with
     * its redelivered flag set.
     *
     * @throws Exception if sending or receiving fails
     */
    @Test
    public void testRedeliveryWithClientAck() throws Exception {
        // Every STOMP frame must be terminated by a NUL octet.
        sendFrame("CONNECT\nlogin: brianm\npasscode: wombats\n\n\u0000");
        Assert.assertTrue(receiveFrame(10000L).startsWith("CONNECTED"));

        sendFrame("SUBSCRIBE\ndestination:" + getQueuePrefix() + getQueueName() + "\nack:client\n\n\u0000");

        sendMessage(getName());
        Assert.assertTrue(receiveFrame(10000L).startsWith("MESSAGE"));

        // No ACK is sent before disconnecting.
        sendFrame("DISCONNECT\n\n\n\u0000");

        MessageConsumer consumer = this.session.createConsumer(this.queue);
        Message redelivered = consumer.receive(1000L);
        Assert.assertNotNull(redelivered);
        Assert.assertTrue(redelivered.getJMSRedelivered());
    }

    /**
     * Runs the client-ack-then-auto-ack scenario in the variant where the first connection
     * is replaced without sending a DISCONNECT frame.
     *
     * @throws Exception if the shared scenario fails
     */
    @Test
    public void testSubscribeWithClientAckThenConsumingAgainWithAutoAckWithNoDisconnectFrame() throws Exception {
        final boolean sendDisconnect = false;
        assertSubscribeWithClientAckThenConsumeWithAutoAck(sendDisconnect);
    }

    /**
     * Runs the client-ack-then-auto-ack scenario in the variant where an explicit
     * DISCONNECT frame is sent before reconnecting.
     *
     * @throws Exception if the shared scenario fails
     */
    @Test
    public void testSubscribeWithClientAckThenConsumingAgainWithAutoAckWithExplicitDisconnect() throws Exception {
        final boolean sendDisconnect = true;
        assertSubscribeWithClientAckThenConsumeWithAutoAck(sendDisconnect);
    }

    /**
     * Shared scenario: a message received on an {@code ack:client} subscription is left
     * unacknowledged, the connection is replaced, and a second subscription without an
     * {@code ack} header must receive that message again. After one more reconnect, a
     * freshly sent message must be the next one delivered.
     *
     * @param sendDisconnect {@code true} to send a DISCONNECT frame before the first
     *                       reconnect; {@code false} to reconnect without one
     * @throws Exception if sending or receiving fails
     */
    protected void assertSubscribeWithClientAckThenConsumeWithAutoAck(boolean sendDisconnect) throws Exception {
        // Every STOMP frame must be terminated by a NUL octet.
        final String connectFrame = "CONNECT\nlogin: brianm\npasscode: wombats\n\n\u0000";
        final String disconnectFrame = "DISCONNECT\n\n\n\u0000";
        final String destination = getQueuePrefix() + getQueueName();

        // First connection: receive with client ack but never acknowledge.
        sendFrame(connectFrame);
        Assert.assertTrue(receiveFrame(10000L).startsWith("CONNECTED"));
        sendFrame("SUBSCRIBE\ndestination:" + destination + "\nack:client\n\n\u0000");
        sendMessage(getName());
        Assert.assertTrue(receiveFrame(10000L).startsWith("MESSAGE"));

        log.info("Reconnecting!");
        if (sendDisconnect) {
            sendFrame(disconnectFrame);
            waitForFrameToTakeEffect();
            reconnect();
        } else {
            reconnect(100L);
            waitForFrameToTakeEffect();
        }

        // Second connection: the unacknowledged message must be delivered again.
        sendFrame(connectFrame);
        Assert.assertTrue(receiveFrame(10000L).startsWith("CONNECTED"));
        sendFrame("SUBSCRIBE\ndestination:" + destination + "\n\n\u0000");
        Assert.assertTrue(receiveFrame(10000L).startsWith("MESSAGE"));
        sendFrame(disconnectFrame);
        waitForFrameToTakeEffect();
        reconnect();

        // Third connection: only the newly sent message is expected.
        sendFrame(connectFrame);
        Assert.assertTrue(receiveFrame(10000L).startsWith("CONNECTED"));
        sendFrame("SUBSCRIBE\ndestination:" + destination + "\nreceipt: 1234\n\n\u0000");
        Assert.assertTrue(receiveFrame(10000L).startsWith("RECEIPT"));

        sendMessage("shouldBeNextMessage");
        String frame = receiveFrame(10000L);
        Assert.assertTrue(frame.startsWith("MESSAGE"));
        System.out.println(frame);
        Assert.assertTrue(frame.contains("shouldBeNextMessage"));
    }

    /**
     * Subscribes to the test queue, receives one message, unsubscribes by destination and
     * verifies that a message sent afterwards produces no frame on the connection.
     *
     * @throws Exception if sending or receiving fails
     */
    @Test
    public void testUnsubscribe() throws Exception {
        // Every STOMP frame must be terminated by a NUL octet.
        sendFrame("CONNECT\nlogin: brianm\npasscode: wombats\n\n\u0000");
        Assert.assertTrue(receiveFrame(100000L).startsWith("CONNECTED"));

        sendFrame("SUBSCRIBE\ndestination:" + getQueuePrefix() + getQueueName() + "\nack:auto\n\n\u0000");

        sendMessage("first message");
        Assert.assertTrue(receiveFrame(10000L).startsWith("MESSAGE"));

        // Request a receipt so the test proceeds only after waitForReceipt() returns.
        sendFrame("UNSUBSCRIBE\ndestination:" + getQueuePrefix() + getQueueName() + "\nreceipt:567\n\n\n\u0000");
        waitForReceipt();

        sendMessage("second message");

        String frame = receiveFrame(1000L);
        log.info("Received frame: " + frame);
        Assert.assertNull("No message should have been received since subscription was removed", frame);
    }

    /**
     * Subscribes with an explicit {@code id}, receives one message, unsubscribes using
     * that id and verifies that a message sent afterwards produces no frame on the
     * connection.
     *
     * @throws Exception if sending or receiving fails
     */
    @Test
    public void testUnsubscribeWithID() throws Exception {
        // Every STOMP frame must be terminated by a NUL octet.
        sendFrame("CONNECT\nlogin: brianm\npasscode: wombats\n\n\u0000");
        Assert.assertTrue(receiveFrame(100000L).startsWith("CONNECTED"));

        sendFrame("SUBSCRIBE\ndestination:" + getQueuePrefix() + getQueueName() + "\nid: mysubid\nack:auto\n\n\u0000");

        sendMessage("first message");
        Assert.assertTrue(receiveFrame(10000L).startsWith("MESSAGE"));

        // Request a receipt so the test proceeds only after waitForReceipt() returns.
        sendFrame("UNSUBSCRIBE\nid:mysubid\nreceipt: 345\n\n\n\u0000");
        waitForReceipt();

        sendMessage("second message");

        String frame = receiveFrame(1000L);
        log.info("Received frame: " + frame);
        Assert.assertNull("No message should have been received since subscription was removed", frame);
    }

    /**
     * Sends a message inside STOMP transaction {@code tx1} and verifies that a JMS
     * consumer sees nothing before the COMMIT and receives the message after it.
     *
     * @throws Exception if sending or receiving fails
     */
    @Test
    public void testTransactionCommit() throws Exception {
        MessageConsumer consumer = this.session.createConsumer(this.queue);

        // Every STOMP frame must be terminated by a NUL octet.
        sendFrame("CONNECT\nlogin: brianm\npasscode: wombats\n\n\u0000");
        Assert.assertTrue(receiveFrame(1000L).startsWith("CONNECTED"));

        sendFrame("BEGIN\ntransaction: tx1\n\n\n\u0000");
        sendFrame("SEND\ndestination:" + getQueuePrefix() + getQueueName()
            + "\ntransaction: tx1\nreceipt: 123\n\n\nHello World\u0000");
        waitForReceipt();

        // Nothing is visible to the consumer until the transaction is committed.
        Assert.assertNull(consumer.receive(100L));

        sendFrame("COMMIT\ntransaction: tx1\nreceipt:456\n\n\n\u0000");
        waitForReceipt();

        Assert.assertNotNull("Should have received a message", consumer.receive(1000L));
    }

    /**
     * Runs two complete BEGIN/SEND/COMMIT cycles that reuse the transaction id
     * {@code tx1} and verifies that a JMS consumer receives a message after each commit.
     *
     * @throws Exception if sending or receiving fails
     */
    @Test
    public void testSuccessiveTransactionsWithSameID() throws Exception {
        final int cycles = 2;
        MessageConsumer consumer = this.session.createConsumer(this.queue);

        // Every STOMP frame must be terminated by a NUL octet.
        sendFrame("CONNECT\nlogin: brianm\npasscode: wombats\n\n\u0000");
        Assert.assertTrue(receiveFrame(1000L).startsWith("CONNECTED"));

        final String beginFrame = "BEGIN\ntransaction: tx1\n\n\n\u0000";
        final String sendFrame = "SEND\ndestination:" + getQueuePrefix() + getQueueName()
            + "\ntransaction: tx1\n\n\nHello World\u0000";
        final String commitFrame = "COMMIT\ntransaction: tx1\n\n\n\u0000";

        // The same transaction id is reused for each cycle.
        for (int cycle = 0; cycle < cycles; cycle++) {
            sendFrame(beginFrame);
            sendFrame(sendFrame);
            sendFrame(commitFrame);
            Assert.assertNotNull("Should have received a message", consumer.receive(1000L));
        }
    }

    /**
     * Sends BEGIN twice with the same transaction id and verifies that an ERROR frame is
     * received.
     *
     * @throws Exception if sending or receiving fails
     */
    @Test
    public void testBeginSameTransactionTwice() throws Exception {
        // Every STOMP frame must be terminated by a NUL octet.
        sendFrame("CONNECT\nlogin: brianm\npasscode: wombats\n\n\u0000");
        Assert.assertTrue(receiveFrame(1000L).startsWith("CONNECTED"));

        final String beginFrame = "BEGIN\ntransaction: tx1\n\n\n\u0000";
        sendFrame(beginFrame);
        // Second BEGIN for a transaction id that is already open.
        sendFrame(beginFrame);

        Assert.assertTrue(receiveFrame(1000L).startsWith("ERROR"));
    }

    /**
     * Sends a first message in transaction {@code tx1} and aborts it, then sends a second
     * message in a new {@code tx1} and commits it. The JMS consumer must receive the
     * second message as its first delivery.
     *
     * @throws Exception if sending or receiving fails
     */
    @Test
    public void testTransactionRollback() throws Exception {
        MessageConsumer consumer = this.session.createConsumer(this.queue);

        // Every STOMP frame must be terminated by a NUL octet.
        sendFrame("CONNECT\nlogin: brianm\npasscode: wombats\n\n\u0000");
        Assert.assertTrue(receiveFrame(1000L).startsWith("CONNECTED"));

        final String destination = getQueuePrefix() + getQueueName();

        // Aborted transaction.
        sendFrame("BEGIN\ntransaction: tx1\n\n\n\u0000");
        sendFrame("SEND\ndestination:" + destination + "\ntransaction: tx1\n\nfirst message\u0000");
        sendFrame("ABORT\ntransaction: tx1\n\n\n\u0000");

        // Committed transaction, with a receipt to wait on.
        sendFrame("BEGIN\ntransaction: tx1\n\n\n\u0000");
        sendFrame("SEND\ndestination:" + destination + "\ntransaction: tx1\n\nsecond message\u0000");
        sendFrame("COMMIT\ntransaction: tx1\nreceipt:789\n\n\n\u0000");
        waitForReceipt();

        // receive() is declared to return Message, so the cast is required.
        TextMessage message = (TextMessage) consumer.receive(1000L);
        Assert.assertNotNull(message);
        Assert.assertEquals("second message", message.getText());
    }

    /**
     * Subscribes to the test topic, verifies that a published message arrives as a MESSAGE
     * frame, then unsubscribes and verifies that a further published message produces no
     * frame.
     *
     * @throws Exception if sending or receiving fails
     */
    @Test
    public void testSubscribeToTopic() throws Exception {
        // Every STOMP frame must be terminated by a NUL octet.
        sendFrame("CONNECT\nlogin: brianm\npasscode: wombats\n\n\u0000");
        Assert.assertTrue(receiveFrame(100000L).startsWith("CONNECTED"));

        sendFrame("SUBSCRIBE\ndestination:" + getTopicPrefix() + getTopicName() + "\nreceipt: 12\n\n\n\u0000");
        Assert.assertTrue(receiveFrame(10000L).startsWith("RECEIPT"));

        sendMessage(getName(), (Destination) this.topic);

        String frame = receiveFrame(10000L);
        Assert.assertTrue(frame.startsWith("MESSAGE"));
        Assert.assertTrue(frame.indexOf("destination:") > 0);
        Assert.assertTrue(frame.indexOf(getName()) > 0);

        sendFrame("UNSUBSCRIBE\ndestination:" + getTopicPrefix() + getTopicName() + "\nreceipt: 1234\n\n\n\u0000");
        Assert.assertTrue(receiveFrame(10000L).startsWith("RECEIPT"));

        sendMessage(getName(), (Destination) this.topic);

        String afterUnsubscribe = receiveFrame(1000L);
        log.info("Received frame: " + afterUnsubscribe);
        Assert.assertNull("No message should have been received since subscription was removed", afterUnsubscribe);

        sendFrame("DISCONNECT\n\n\n\u0000");
    }

    /**
     * Subscribes to a randomly named queue, verifies that a message sent to it arrives as
     * a MESSAGE frame and that a binding for {@code jms.queue.<name>} exists. After
     * unsubscribing, the binding must be gone and a further message must produce no frame.
     *
     * @throws Exception if sending or receiving fails
     */
    @Test
    public void testSubscribeToNonExistentQueue() throws Exception {
        String queueName = RandomUtil.randomString();
        SimpleString bindingName = new SimpleString("jms.queue." + queueName);

        // Every STOMP frame must be terminated by a NUL octet.
        sendFrame("CONNECT\nlogin: brianm\npasscode: wombats\n\n\u0000");
        Assert.assertTrue(receiveFrame(100000L).startsWith("CONNECTED"));

        sendFrame("SUBSCRIBE\ndestination:" + getQueuePrefix() + queueName + "\nreceipt: 12\n\n\n\u0000");
        Assert.assertTrue(receiveFrame(10000L).startsWith("RECEIPT"));

        sendMessage(getName(), (Destination) ActiveMQJMSClient.createQueue(queueName));

        String frame = receiveFrame(10000L);
        Assert.assertTrue(frame.startsWith("MESSAGE"));
        Assert.assertTrue(frame.indexOf("destination:") > 0);
        Assert.assertTrue(frame.indexOf(getName()) > 0);

        // The binding must exist while the subscription is active.
        Assert.assertNotNull(this.server.getActiveMQServer().getPostOffice().getBinding(bindingName));

        sendFrame("UNSUBSCRIBE\ndestination:" + getQueuePrefix() + queueName + "\nreceipt: 1234\n\n\n\u0000");
        Assert.assertTrue(receiveFrame(10000L).startsWith("RECEIPT"));

        // The binding must be removed once the subscription is gone.
        Assert.assertNull(this.server.getActiveMQServer().getPostOffice().getBinding(bindingName));

        sendMessage(getName(), (Destination) ActiveMQJMSClient.createQueue(queueName));

        String afterUnsubscribe = receiveFrame(1000L);
        log.info("Received frame: " + afterUnsubscribe);
        Assert.assertNull("No message should have been received since subscription was removed", afterUnsubscribe);

        sendFrame("DISCONNECT\n\n\n\u0000");
    }

    /**
     * Creates a durable topic subscription (client id plus {@code durable-subscriber-name}),
     * disconnects, publishes a message while disconnected, then reconnects with the same
     * client id and subscription and verifies that the message is delivered.
     *
     * @throws Exception if sending or receiving fails
     */
    @Test
    public void testDurableSubscriberWithReconnection() throws Exception {
        // Every STOMP frame must be terminated by a NUL octet.
        final String connectFrame = "CONNECT\nlogin: brianm\npasscode: wombats\nclient-id: myclientid\n\n\u0000";
        final String disconnectFrame = "DISCONNECT\n\n\n\u0000";
        final String subscribeFrame = "SUBSCRIBE\ndestination:" + getTopicPrefix() + getTopicName()
            + "\ndurable-subscriber-name: " + getName() + "\n\n\n\u0000";

        sendFrame(connectFrame);
        Assert.assertTrue(receiveFrame(100000L).startsWith("CONNECTED"));

        sendFrame(subscribeFrame);
        waitForFrameToTakeEffect();

        sendFrame(disconnectFrame);
        waitForFrameToTakeEffect();

        // Published while no connection is subscribed.
        sendMessage(getName(), (Destination) this.topic);

        reconnect(100L);
        sendFrame(connectFrame);
        Assert.assertTrue(receiveFrame(100000L).startsWith("CONNECTED"));

        sendFrame(subscribeFrame);

        String frame = receiveFrame(10000L);
        Assert.assertTrue(frame.startsWith("MESSAGE"));
        Assert.assertTrue(frame.indexOf("destination:") > 0);
        Assert.assertTrue(frame.indexOf(getName()) > 0);

        sendFrame("UNSUBSCRIBE\ndestination:" + getTopicPrefix() + getTopicName() + "\nreceipt: 1234\n\n\n\u0000");
        Assert.assertTrue(receiveFrame(10000L).startsWith("RECEIPT"));

        sendFrame(disconnectFrame);
    }

    /**
     * Creates a durable topic subscription and verifies that sending the identical
     * SUBSCRIBE frame a second time on the same connection yields an ERROR frame.
     *
     * @throws Exception if sending or receiving fails
     */
    @Test
    public void testDurableSubscriber() throws Exception {
        // Every STOMP frame must be terminated by a NUL octet.
        sendFrame("CONNECT\nlogin: brianm\npasscode: wombats\nclient-id: myclientid\n\n\u0000");
        Assert.assertTrue(receiveFrame(100000L).startsWith("CONNECTED"));

        String subscribeFrame = "SUBSCRIBE\ndestination:" + getTopicPrefix() + getTopicName()
            + "\nreceipt: 12\ndurable-subscriber-name: " + getName() + "\n\n\n\u0000";

        sendFrame(subscribeFrame);
        Assert.assertTrue(receiveFrame(10000L).startsWith("RECEIPT"));

        // The same durable subscription requested again must be rejected.
        sendFrame(subscribeFrame);
        Assert.assertTrue(receiveFrame(10000L).startsWith("ERROR"));

        sendFrame("DISCONNECT\n\n\n\u0000");
    }

    /**
     * Subscribes to the test topic with {@code no-local: true} and verifies that a message
     * sent over the same STOMP connection is not delivered back, while a message published
     * through {@code sendMessage} is delivered.
     *
     * @throws Exception if sending or receiving fails
     */
    @Test
    public void testSubscribeToTopicWithNoLocal() throws Exception {
        // Every STOMP frame must be terminated by a NUL octet.
        sendFrame("CONNECT\nlogin: brianm\npasscode: wombats\n\n\u0000");
        Assert.assertTrue(receiveFrame(100000L).startsWith("CONNECTED"));

        sendFrame("SUBSCRIBE\ndestination:" + getTopicPrefix() + getTopicName()
            + "\nreceipt: 12\nno-local: true\n\n\n\u0000");
        Assert.assertTrue(receiveFrame(10000L).startsWith("RECEIPT"));

        // Published from this very connection: must not come back.
        sendFrame("SEND\ndestination:" + getTopicPrefix() + getTopicName() + "\n\nHello World\u0000");

        String localFrame = receiveFrame(2000L);
        log.info("Received frame: " + localFrame);
        Assert.assertNull("No message should have been received since the subscription is no-local", localFrame);

        // Published through the sendMessage helper: must be delivered.
        sendMessage(getName(), (Destination) this.topic);

        String frame = receiveFrame(10000L);
        Assert.assertTrue(frame.startsWith("MESSAGE"));
        Assert.assertTrue(frame.indexOf("destination:") > 0);
        Assert.assertTrue(frame.indexOf(getName()) > 0);

        sendFrame("DISCONNECT\n\n\n\u0000");
    }

    /**
     * Verifies that an ACK sent with a {@code transaction} header is not undone when that
     * transaction is aborted.
     *
     * <p>The test subscribes to the queue with {@code ack:client}, receives one message,
     * extracts its {@code message-id} header, then sends BEGIN, ACK (naming the transaction)
     * and ABORT. No further frame is expected afterwards, i.e. the message must not be
     * delivered again.
     *
     * @throws Exception propagated from the frame and message helpers
     */
    @Test
    public void testClientAckNotPartOfTransaction() throws Exception {
        // Every STOMP frame ends with a NUL octet; it is written as an escape so that it
        // cannot be mangled by the source file's encoding.
        sendFrame("CONNECT\nlogin: brianm\npasscode: wombats\n\n\u0000");
        String connected = receiveFrame(100000L);
        Assert.assertNotNull("No reply received for CONNECT", connected);
        Assert.assertTrue(connected.startsWith("CONNECTED"));

        sendFrame("SUBSCRIBE\ndestination:" + getQueuePrefix() + getQueueName() + "\nack:client\n\n\n\u0000");
        sendMessage(getName());

        String delivered = receiveFrame(10000L);
        Assert.assertNotNull("No MESSAGE frame received", delivered);
        Assert.assertTrue(delivered.startsWith("MESSAGE"));
        Assert.assertTrue(delivered.contains("destination:"));
        Assert.assertTrue(delivered.contains(getName()));
        Assert.assertTrue(delivered.contains("message-id:"));

        // Pull the id out of the "message-id" header, tolerating any header-name casing.
        Matcher idMatcher = Pattern.compile("message-id:\\s*(\\S+)", Pattern.CASE_INSENSITIVE).matcher(delivered);
        Assert.assertTrue(idMatcher.find());
        String messageId = idMatcher.group(1);

        // Acknowledge inside tx1, then abort tx1.
        sendFrame("BEGIN\ntransaction: tx1\n\n\n\u0000");
        sendFrame("ACK\nmessage-id:" + messageId + "\ntransaction: tx1\n\nsecond message\u0000");
        sendFrame("ABORT\ntransaction: tx1\n\n\n\u0000");

        Assert.assertNull("No message should have been received as the message was acked even though the transaction has been aborted", receiveFrame(1000L));

        sendFrame("UNSUBSCRIBE\ndestination:" + getQueuePrefix() + getQueueName() + "\n\n\u0000");
        sendFrame("DISCONNECT\n\n\n\u0000");
    }

    /**
     * Verifies that JMS consumers and a STOMP subscriber on the same topic all receive the
     * messages published to it.
     *
     * <p>Two JMS consumers and one STOMP subscription are created on the topic. For each of
     * 1000 messages sent by a JMS producer, both JMS consumers must receive a message and the
     * STOMP connection must receive a MESSAGE frame containing the test name. After the JMS
     * consumers are closed and the STOMP subscription is removed with UNSUBSCRIBE, one more
     * message is published and no frame may arrive on the STOMP connection.
     *
     * @throws Exception propagated from the JMS calls and the frame helpers
     */
    @Test
    public void testMultiProtocolConsumers() throws Exception {
        MessageConsumer firstJmsConsumer = this.session.createConsumer(this.topic);
        MessageConsumer secondJmsConsumer = this.session.createConsumer(this.topic);

        // Every STOMP frame ends with a NUL octet; it is written as an escape so that it
        // cannot be mangled by the source file's encoding.
        sendFrame("CONNECT\nlogin: brianm\npasscode: wombats\n\n\u0000");
        String connected = receiveFrame(5000L);
        Assert.assertNotNull("No reply received for CONNECT", connected);
        Assert.assertTrue(connected.startsWith("CONNECTED"));

        sendFrame("SUBSCRIBE\ndestination:" + getTopicPrefix() + getTopicName() + "\nreceipt: 12\n\n\n\u0000");
        String subscribeReceipt = receiveFrame(5000L);
        Assert.assertNotNull("No receipt received for SUBSCRIBE", subscribeReceipt);
        Assert.assertTrue(subscribeReceipt.startsWith("RECEIPT"));

        MessageProducer producer = this.session.createProducer(this.topic);
        TextMessage textMessage = this.session.createTextMessage(getName());
        for (int sent = 1; sent <= 1000; sent++) {
            producer.send(textMessage);

            // Each published message must reach both JMS consumers and the STOMP subscriber.
            Assert.assertNotNull(firstJmsConsumer.receive(5000L));
            Assert.assertNotNull(secondJmsConsumer.receive(5000L));
            String delivered = receiveFrame(5000L);
            Assert.assertNotNull("No MESSAGE frame received for message #" + sent, delivered);
            Assert.assertTrue(delivered.startsWith("MESSAGE"));
            Assert.assertTrue(delivered.contains("destination:"));
            Assert.assertTrue(delivered.contains(getName()));
        }
        firstJmsConsumer.close();
        secondJmsConsumer.close();

        sendFrame("UNSUBSCRIBE\ndestination:" + getTopicPrefix() + getTopicName() + "\nreceipt: 1234\n\n\n\u0000");
        String unsubscribeReceipt = receiveFrame(5000L);
        Assert.assertNotNull("No receipt received for UNSUBSCRIBE", unsubscribeReceipt);
        Assert.assertTrue(unsubscribeReceipt.startsWith("RECEIPT"));

        // With the subscription gone, a new message must not show up on the STOMP connection.
        sendMessage(getName(), (Destination) this.topic);
        String afterUnsubscribe = receiveFrame(5000L);
        log.info("Received frame: " + afterUnsubscribe);
        Assert.assertNull("No message should have been received since subscription was removed", afterUnsubscribe);

        sendFrame("DISCONNECT\n\n\n\u0000");
    }

    /**
     * Verifies that an ACK for a message id that was never delivered is answered with an
     * ERROR frame.
     *
     * <p>The test connects, sends an ACK frame with {@code message-id:888888} without having
     * subscribed or received anything, and expects the next frame to start with
     * {@code ERROR}.
     *
     * @throws Exception propagated from the frame send/receive helpers
     */
    @Test
    public void testUnexpectedAck() throws Exception {
        // Every STOMP frame ends with a NUL octet; it is written as an escape so that it
        // cannot be mangled by the source file's encoding.
        sendFrame("CONNECT\nlogin: brianm\npasscode: wombats\n\n\u0000");
        String connected = receiveFrame(100000L);
        Assert.assertNotNull("No reply received for CONNECT", connected);
        Assert.assertTrue(connected.startsWith("CONNECTED"));

        sendFrame("ACK\nmessage-id:888888\n\n\u0000");
        String reply = receiveFrame(100000L);
        Assert.assertNotNull(reply);
        // Use the class logger rather than stdout, like the other tests in this class.
        log.info("received frame: " + reply);
        Assert.assertTrue(reply.startsWith("ERROR"));

        sendFrame("DISCONNECT\n\n\n\u0000");
    }
}
