package org.apache.activemq.artemis.tests.integration.openwire.interop;

import jakarta.jms.BytesMessage;
import jakarta.jms.Connection;
import jakarta.jms.MapMessage;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.ObjectMessage;
import jakarta.jms.Queue;
import jakarta.jms.Session;
import jakarta.jms.StreamMessage;
import jakarta.jms.TextMessage;
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageConsumer;
import org.apache.activemq.ActiveMQMessageProducer;
import org.apache.activemq.artemis.api.core.ActiveMQDisconnectedException;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireInterceptor;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest;
import org.apache.activemq.artemis.utils.Wait;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Command;
import org.apache.activemq.transport.TransportListener;
import org.junit.Before;
import org.junit.Test;

/**
 * Interoperability tests that exchange JMS messages between OpenWire clients and Core JMS clients,
 * in both directions, and check that the received bodies, properties and destinations match what
 * was sent.
 */
public class GeneralInteropTest extends BasicOpenWireTest {

    /**
     * Minimal serializable payload carried inside object messages by these tests.
     * The fields are public so the sent and the received instance can be compared field by field.
     */
    private static class SimpleSerializable implements Serializable {
        private static final long serialVersionUID = -1034113865185130710L;

        public String objName = "simple-serializable";
        public int intVal = 9999;
        public long longVal = 88_888_888L;

        private SimpleSerializable() {
            // Private: not meant to be constructed outside the enclosing test class.
        }
    }

    /**
     * Prepares the fixture by delegating to the base-class set-up; this class adds no state of its own.
     *
     * @throws Exception if the base-class set-up fails
     */
    @Before
    @Override
    public void setUp() throws Exception {
        super.setUp();
    }

    /**
     * Sends one message of each JMS body type (text, map, object, stream, bytes and plain) through a
     * Core JMS producer and checks that an OpenWire consumer receives each of them with the expected
     * content.
     *
     * @throws Exception if sending, receiving or reading a message fails
     */
    @Test
    public void testReceivingFromCore() throws Exception {
        final String text = "HelloWorld";
        sendTextMessageUsingCoreJms(this.queueName, text);

        this.connection.start();
        final Session session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final ActiveMQDestination destination = createDestination(session, ActiveMQDestination.QUEUE_TYPE);
        final MessageConsumer consumer = session.createConsumer(destination);

        // Text message.
        final TextMessage textMessage = (TextMessage) consumer.receive(5000L);
        assertNotNull("text message not received", textMessage);
        assertEquals(text, textMessage.getText());
        assertEquals(destination, textMessage.getJMSDestination());

        // Map message: values mirror what sendMapMessageUsingCoreJms stores.
        sendMapMessageUsingCoreJms(this.queueName);
        final MapMessage mapMessage = (MapMessage) consumer.receive(5000L);
        assertNotNull("map message not received", mapMessage);
        assertEquals(destination, mapMessage.getJMSDestination());
        assertTrue(mapMessage.getBoolean("aboolean"));
        assertEquals(4, mapMessage.getByte("abyte"));
        final byte[] mapBytes = mapMessage.getBytes("abytes");
        assertEquals(2, mapBytes.length);
        assertEquals(4, mapBytes[0]);
        assertEquals(5, mapBytes[1]);
        assertEquals('a', mapMessage.getChar("achar"));
        assertEquals(4.4d, mapMessage.getDouble("adouble"), 0.0d);
        assertEquals(4.5f, mapMessage.getFloat("afloat"), 0.0f);
        assertEquals(40, mapMessage.getInt("aint"));
        assertEquals(80L, mapMessage.getLong("along"));
        assertEquals(65, mapMessage.getShort("ashort"));
        assertEquals("hello", mapMessage.getString("astring"));

        // Object message.
        final SimpleSerializable sent = new SimpleSerializable();
        sendObjectMessageUsingCoreJms(this.queueName, sent);
        final ObjectMessage objectMessage = (ObjectMessage) consumer.receive(5000L);
        assertNotNull("object message not received", objectMessage);
        final SimpleSerializable received = (SimpleSerializable) objectMessage.getObject();
        assertEquals(sent.objName, received.objName);
        assertEquals(sent.intVal, received.intVal);
        assertEquals(sent.longVal, received.longVal);

        // Stream message: fields are read back in the order sendStreamMessageUsingCoreJms writes them.
        sendStreamMessageUsingCoreJms(this.queueName);
        final StreamMessage streamMessage = (StreamMessage) consumer.receive(5000L);
        assertNotNull("stream message not received", streamMessage);
        assertEquals(destination, streamMessage.getJMSDestination());
        assertTrue(streamMessage.readBoolean());
        assertEquals(2, streamMessage.readByte());
        // Keep a reference to the buffer so its content can actually be verified.
        final byte[] streamBytes = new byte[2];
        streamMessage.readBytes(streamBytes);
        assertEquals(6, streamBytes[0]);
        assertEquals(7, streamBytes[1]);
        assertEquals('b', streamMessage.readChar());
        assertEquals(6.5d, streamMessage.readDouble(), 0.0d);
        assertEquals(93.9f, streamMessage.readFloat(), 0.0f);
        assertEquals(7657, streamMessage.readInt());
        assertEquals(239999L, streamMessage.readLong());
        assertEquals(-31314, streamMessage.readShort());
        assertEquals("hello streammessage", streamMessage.readString());

        // Bytes message: raw payload first, then the typed values appended by the sender.
        final byte[] payload = text.getBytes(StandardCharsets.UTF_8);
        sendBytesMessageUsingCoreJms(this.queueName, payload);
        final BytesMessage bytesMessage = (BytesMessage) consumer.receive(5000L);
        assertNotNull("bytes message not received", bytesMessage);
        final byte[] receivedPayload = new byte[payload.length];
        bytesMessage.readBytes(receivedPayload);
        for (int i = 0; i < payload.length; i++) {
            assertEquals("failed at " + i, payload[i], receivedPayload[i]);
        }
        assertTrue(bytesMessage.readBoolean());
        assertEquals(99999L, bytesMessage.readLong());
        assertEquals('h', bytesMessage.readChar());
        assertEquals(987, bytesMessage.readInt());
        assertEquals(1099, bytesMessage.readShort());
        assertEquals("hellobytes", bytesMessage.readUTF());

        // Plain message carrying only properties.
        sendMessageUsingCoreJms(this.queueName);
        final Message plainMessage = consumer.receive(5000L);
        assertNotNull("plain message not received", plainMessage);
        assertEquals(destination, plainMessage.getJMSDestination());
        assertEquals("HelloMessage", plainMessage.getStringProperty("stringProperty"));
        assertFalse(plainMessage.getBooleanProperty("booleanProperty"));
        assertEquals(99999L, plainMessage.getLongProperty("longProperty"));
        assertEquals(979, plainMessage.getIntProperty("intProperty"));
        assertEquals(1099, plainMessage.getShortProperty("shortProperty"));
    }

    /**
     * Sends 100 numbered text messages through a Core JMS producer and checks that an OpenWire
     * consumer receives them in order, each with the expected text and destination.
     *
     * @throws Exception if sending or receiving fails
     */
    @Test
    public void testMutipleReceivingFromCore() throws Exception {
        final String text = "HelloWorld";
        final int count = 100;
        sendMultipleTextMessagesUsingCoreJms(this.queueName, text, count);

        this.connection.start();
        final Session session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final ActiveMQDestination destination = createDestination(session, ActiveMQDestination.QUEUE_TYPE);
        final MessageConsumer consumer = session.createConsumer(destination);

        for (int i = 0; i < count; i++) {
            final TextMessage message = (TextMessage) consumer.receive(5000L);
            // Fail with the index instead of an anonymous NullPointerException on timeout.
            assertNotNull("message " + i + " not received", message);
            assertEquals(text + i, message.getText());
            assertEquals(destination, message.getJMSDestination());
        }
    }

    /**
     * Consumes Core-produced messages through an OpenWire failover connection in client-acknowledge
     * mode while an incoming interceptor rejects exactly one {@code MessageAck} command.
     *
     * <p>The test sets the prefetch to 10 and expects the delivering count of the queue to stay at 10
     * after every acknowledgement. A non-durable queue {@code DLQ1} on address {@code DLA} is
     * registered as dead-letter address for the test queue.
     *
     * @throws Exception if broker set-up, sending, receiving or acknowledging fails
     */
    @Test
    public void testFailoverReceivingFromCore() throws Exception {
        final SimpleString deadLetterAddress = new SimpleString("DLA");
        final SimpleString deadLetterQueue = new SimpleString("DLQ1");
        this.server.createQueue(new QueueConfiguration(deadLetterQueue).setAddress(deadLetterAddress).setDurable(false));
        this.server.getAddressSettingsRepository().addMatch(this.queueName, new AddressSettings().setDeadLetterAddress(deadLetterAddress));

        sendMultipleTextMessagesUsingCoreJms(this.queueName, "HelloWorld", 100);

        final String failoverUrl = "failover:(tcp://localhost:61616)?randomize=false&timeout=400&reconnectDelay=500"
            + "&useExponentialBackOff=false&initialReconnectDelay=500"
            + "&nested.wireFormat.maxInactivityDuration=500"
            + "&nested.wireFormat.maxInactivityDurationInitalDelay=500"
            + "&nested.soTimeout=500&nested.connectionTimeout=400"
            + "&jms.connectResponseTimeout=400&jms.sendTimeout=400&jms.closeTimeout=400";
        final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(failoverUrl);
        factory.setSendAcksAsync(false);
        factory.setOptimizeAcknowledge(false);
        factory.getPrefetchPolicy().setAll(10);

        // try-with-resources closes the connection exactly once, on success and on failure.
        try (Connection failoverConnection = factory.createConnection()) {
            failoverConnection.setClientID("test.consumer.queue." + this.queueName);
            failoverConnection.start();
            final Session session = failoverConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            final Queue queue = session.createQueue(this.queueName);
            final QueueControl queueControl = (QueueControl) this.server.getManagementService().getResource("queue." + this.queueName);
            final QueueControl dlqControl = (QueueControl) this.server.getManagementService().getResource("queue." + deadLetterQueue.toString());
            final MessageConsumer consumer = session.createConsumer(queue);

            // First message: acknowledged normally.
            final Message first = consumer.receive(5000L);
            assertNotNull(first);
            assertTrue(first instanceof TextMessage);
            assertEquals("HelloWorld0", ((TextMessage) first).getText());
            first.acknowledge();
            Wait.assertEquals(1L, () -> queueControl.getMessagesAcknowledged(), 3000L, 100L);
            Wait.assertEquals(10, () -> queueControl.getDeliveringCount(), 3000L, 100L);

            final Message second = consumer.receive(5000L);
            assertNotNull(second);
            assertTrue(second instanceof TextMessage);
            assertEquals("HelloWorld1", ((TextMessage) second).getText());

            // One-shot interceptor: returns false for the first MessageAck it sees and then
            // unregisters itself, so every later command passes through untouched.
            this.server.getRemotingService().addIncomingInterceptor(new OpenWireInterceptor() {
                @Override
                public boolean intercept(Command command, RemotingConnection remotingConnection) throws ActiveMQException {
                    if (!command.isMessageAck()) {
                        return true;
                    }
                    GeneralInteropTest.this.server.getRemotingService().removeIncomingInterceptor(this);
                    return false;
                }
            });
            second.acknowledge();
            // NOTE(review): the result of Wait.waitFor is ignored here, so this only waits for one of
            // the two accepted outcomes and does not fail the test if neither is reached - confirm intent.
            Wait.waitFor(() -> (dlqControl.getMessageCount() == 1 && queueControl.getMessagesAcknowledged() == 1)
                || (dlqControl.getMessageCount() == 0 && queueControl.getMessagesAcknowledged() == 2), 3000L, 100L);
            Wait.assertEquals(10, () -> queueControl.getDeliveringCount(), 3000L, 100L);

            final Message third = consumer.receive(5000L);
            assertNotNull(third);
            assertTrue(third instanceof TextMessage);
            assertEquals("HelloWorld2", ((TextMessage) third).getText());
            third.acknowledge();
            // NOTE(review): same as above - the outcome of the wait is not asserted.
            Wait.waitFor(() -> (dlqControl.getMessageCount() == 1 && queueControl.getMessagesAcknowledged() == 2)
                || (dlqControl.getMessageCount() == 0 && queueControl.getMessagesAcknowledged() == 3), 3000L, 100L);
            Wait.assertEquals(10, () -> queueControl.getDeliveringCount(), 30000L, 100L);
        }
    }

    /**
     * Acknowledges a message after the OpenWire failover transport was interrupted and checks the
     * outcome on the broker.
     *
     * <p>After the second message is received, the matching server-side remoting connection is failed
     * and the test waits for the client to report the interruption. The late acknowledgement is then
     * expected to leave the acknowledged count at 1, and {@code HelloWorld1} is expected to be
     * delivered once more before it is finally acknowledged.
     *
     * @throws Exception if sending, receiving, acknowledging or waiting fails
     */
    @Test
    public void testFailoverReceivingFromCoreWithAckAfterInterrupt() throws Exception {
        sendMultipleTextMessagesUsingCoreJms(this.queueName, "HelloWorld", 100);

        final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("failover:(tcp://localhost:61616?wireFormat.MaxInactivityDuration=5000)");
        factory.setSendAcksAsync(false);
        factory.setOptimizeAcknowledge(false);
        factory.getPrefetchPolicy().setAll(10);

        // try-with-resources closes the connection exactly once, on success and on failure.
        try (ActiveMQConnection failoverConnection = (ActiveMQConnection) factory.createConnection()) {
            failoverConnection.setClientID("test.consumer.queue." + this.queueName);
            failoverConnection.start();
            final Session session = failoverConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            final Queue queue = session.createQueue(this.queueName);
            final QueueControl queueControl = (QueueControl) this.server.getManagementService().getResource("queue." + this.queueName);
            final MessageConsumer consumer = session.createConsumer(queue);

            // First message: acknowledged normally.
            final Message first = consumer.receive(5000L);
            assertNotNull(first);
            assertTrue(first instanceof TextMessage);
            assertEquals("HelloWorld0", ((TextMessage) first).getText());
            first.acknowledge();
            Wait.assertEquals(1L, () -> queueControl.getMessagesAcknowledged(), 3000L, 100L);
            Wait.assertEquals(10, () -> queueControl.getDeliveringCount(), 3000L, 100L);

            final Message second = consumer.receive(5000L);
            assertNotNull(second);
            assertTrue(second instanceof TextMessage);
            assertEquals("HelloWorld1", ((TextMessage) second).getText());

            // The latch is released once the client transport reports an interruption.
            final CountDownLatch interrupted = new CountDownLatch(1);
            failoverConnection.addTransportListener(new TransportListener() {
                @Override
                public void onCommand(Object obj) {
                }

                @Override
                public void onException(IOException iOException) {
                }

                @Override
                public void transportInterupted() {
                    interrupted.countDown();
                }

                @Override
                public void transportResumed() {
                }
            });

            // Fail the server-side connection of every session whose name appears in the client
            // session's string form.
            for (ServerSession serverSession : this.server.getSessions()) {
                if (session.toString().contains(serverSession.getName())) {
                    serverSession.getRemotingConnection().fail(new ActiveMQDisconnectedException());
                }
            }
            assertTrue(interrupted.await(10L, TimeUnit.SECONDS));

            // Acknowledging after the interruption must not change the acknowledged count.
            second.acknowledge();
            Wait.assertEquals(1L, () -> queueControl.getMessagesAcknowledged(), 3000L, 100L);
            Wait.assertEquals(10, () -> queueControl.getDeliveringCount(), 3000L, 100L);

            // The same message is expected again and is acknowledged for real this time.
            final Message redelivered = consumer.receive(5000L);
            assertNotNull(redelivered);
            assertTrue(redelivered instanceof TextMessage);
            assertEquals("HelloWorld1", ((TextMessage) redelivered).getText());
            redelivered.acknowledge();
            Wait.assertEquals(2L, () -> queueControl.getMessagesAcknowledged(), 3000L, 100L);
            Wait.assertEquals(10, () -> queueControl.getDeliveringCount(), 30000L, 100L);
        }
    }

    /**
     * Receives a single Core-produced message in client-acknowledge mode without acknowledging it,
     * closes the consumer, and checks that a second consumer on the same session receives a message
     * with the same JMS message id.
     *
     * @throws Exception if sending or receiving fails
     */
    @Test
    public void testReceiveTwiceTheSameCoreMessage() throws Exception {
        sendMultipleTextMessagesUsingCoreJms(this.queueName, "HelloAgain", 1);

        final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("failover:(tcp://localhost:61616?wireFormat.MaxInactivityDuration=5000)");
        // try-with-resources closes the connection exactly once, on success and on failure.
        try (Connection failoverConnection = factory.createConnection()) {
            failoverConnection.setClientID("clientId");
            failoverConnection.start();
            final Session session = failoverConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            final Queue queue = session.createQueue(this.queueName);

            final MessageConsumer firstConsumer = session.createConsumer(queue);
            final Message firstDelivery = firstConsumer.receive(4000L);
            assertNotNull(firstDelivery);
            final String messageId = firstDelivery.getJMSMessageID();
            // Closed without acknowledging the message.
            firstConsumer.close();

            final Message secondDelivery = session.createConsumer(queue).receive(4000L);
            assertNotNull(secondDelivery);
            assertEquals(messageId, secondDelivery.getJMSMessageID());
        }
    }

    /**
     * Sends {@code i} text messages to a queue over a fresh Core JMS connection that uses the
     * client id {@code "PROD"}. The text of message {@code n} is {@code str2 + n}, counting from 0.
     *
     * @param str  name of the target queue
     * @param str2 text prefix of every message
     * @param i    number of messages to send
     * @throws Exception if the connection cannot be created or a send fails
     */
    private void sendMultipleTextMessagesUsingCoreJms(String str, String str2, int i) throws Exception {
        // try-with-resources closes the connection exactly once, on success and on failure.
        try (Connection coreConnection = this.coreCf.createConnection()) {
            coreConnection.setClientID("PROD");
            final Session session = coreConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            final MessageProducer producer = session.createProducer(session.createQueue(str));
            for (int index = 0; index < i; index++) {
                producer.send(session.createTextMessage(str2 + index));
            }
        }
    }

    /**
     * Sends one body-less message carrying boolean, long, byte, int, short and string properties
     * to a queue over a fresh Core JMS connection.
     *
     * @param str name of the target queue
     * @throws Exception if the connection cannot be created or the send fails
     */
    private void sendMessageUsingCoreJms(String str) throws Exception {
        // try-with-resources closes the connection exactly once, on success and on failure.
        try (Connection coreConnection = this.coreCf.createConnection()) {
            final Session session = coreConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            final Message message = session.createMessage();
            message.setBooleanProperty("booleanProperty", false);
            message.setLongProperty("longProperty", 99999L);
            message.setByteProperty("byteProperty", (byte) 5);
            message.setIntProperty("intProperty", 979);
            message.setShortProperty("shortProperty", (short) 1099);
            message.setStringProperty("stringProperty", "HelloMessage");
            session.createProducer(session.createQueue(str)).send(message);
        }
    }

    /**
     * Sends one bytes message to a queue over a fresh Core JMS connection. The body consists of the
     * given raw bytes followed by a boolean, a long, a char, an int, a short and a UTF string.
     *
     * @param str  name of the target queue
     * @param bArr raw bytes written at the start of the body
     * @throws Exception if the connection cannot be created or the send fails
     */
    private void sendBytesMessageUsingCoreJms(String str, byte[] bArr) throws Exception {
        // try-with-resources closes the connection exactly once, on success and on failure.
        try (Connection coreConnection = this.coreCf.createConnection()) {
            final Session session = coreConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            final BytesMessage message = session.createBytesMessage();
            message.writeBytes(bArr);
            message.writeBoolean(true);
            message.writeLong(99999L);
            message.writeChar('h');
            message.writeInt(987);
            message.writeShort((short) 1099);
            message.writeUTF("hellobytes");
            session.createProducer(session.createQueue(str)).send(message);
        }
    }

    /**
     * Sends one object message wrapping the given payload to a queue over a fresh Core JMS connection.
     *
     * @param str          name of the target queue
     * @param serializable payload of the object message
     * @throws Exception if the connection cannot be created or the send fails
     */
    private void sendObjectMessageUsingCoreJms(String str, Serializable serializable) throws Exception {
        // try-with-resources closes the connection exactly once, on success and on failure.
        try (Connection coreConnection = this.coreCf.createConnection()) {
            final Session session = coreConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            final ObjectMessage message = session.createObjectMessage(serializable);
            session.createProducer(session.createQueue(str)).send(message);
        }
    }

    /**
     * Sends one stream message to a queue over a fresh Core JMS connection. The stream holds, in
     * order: boolean, byte, two-element byte array, char, double, float, int, long, short and string.
     *
     * @param str name of the target queue
     * @throws Exception if the connection cannot be created or the send fails
     */
    private void sendStreamMessageUsingCoreJms(String str) throws Exception {
        // try-with-resources closes the connection exactly once, on success and on failure.
        try (Connection coreConnection = this.coreCf.createConnection()) {
            final Session session = coreConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            final StreamMessage message = session.createStreamMessage();
            message.writeBoolean(true);
            message.writeByte((byte) 2);
            message.writeBytes(new byte[]{6, 7});
            message.writeChar('b');
            message.writeDouble(6.5d);
            message.writeFloat(93.9f);
            message.writeInt(7657);
            message.writeLong(239999L);
            message.writeShort((short) -31314);
            message.writeString("hello streammessage");
            session.createProducer(session.createQueue(str)).send(message);
        }
    }

    /**
     * Sends one map message to a queue over a fresh Core JMS connection. The map has one entry per
     * value type: boolean, byte, byte array, char, double, float, int, long, short and string.
     *
     * @param str name of the target queue
     * @throws Exception if the connection cannot be created or the send fails
     */
    private void sendMapMessageUsingCoreJms(String str) throws Exception {
        // try-with-resources closes the connection exactly once, on success and on failure.
        try (Connection coreConnection = this.coreCf.createConnection()) {
            final Session session = coreConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            final MapMessage message = session.createMapMessage();
            message.setBoolean("aboolean", true);
            message.setByte("abyte", (byte) 4);
            message.setBytes("abytes", new byte[]{4, 5});
            message.setChar("achar", 'a');
            message.setDouble("adouble", 4.4d);
            message.setFloat("afloat", 4.5f);
            message.setInt("aint", 40);
            message.setLong("along", 80L);
            message.setShort("ashort", (short) 65);
            message.setString("astring", "hello");
            session.createProducer(session.createQueue(str)).send(message);
        }
    }

    /**
     * Sends one text message to a queue over a fresh Core JMS connection.
     *
     * @param str  name of the target queue
     * @param str2 text of the message
     * @throws Exception if the connection cannot be created or the send fails
     */
    private void sendTextMessageUsingCoreJms(String str, String str2) throws Exception {
        // try-with-resources closes the connection exactly once, on success and on failure.
        try (Connection coreConnection = this.coreCf.createConnection()) {
            final Session session = coreConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            final TextMessage message = session.createTextMessage(str2);
            session.createProducer(session.createQueue(str)).send(message);
        }
    }

    /**
     * Sends one message of each JMS body type (text, map, object, stream, bytes and plain) through
     * the OpenWire connection and checks that a Core JMS consumer receives each of them with the
     * expected content and destination.
     *
     * @throws Exception if sending, receiving or reading a message fails
     */
    @Test
    public void testSendingToCoreJms() throws Exception {
        final String text = "HelloWorld";

        // try-with-resources closes the connection exactly once, on success and on failure.
        try (Connection coreConnection = this.coreCf.createConnection()) {
            coreConnection.start();
            final Session session = coreConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            final Queue queue = session.createQueue(this.queueName);
            final MessageConsumer consumer = session.createConsumer(queue);

            // Text message.
            sendTextMessageUsingOpenWire(text);
            final TextMessage textMessage = (TextMessage) consumer.receive(5000L);
            assertNotNull("text message not received", textMessage);
            assertEquals(text, textMessage.getText());
            assertEquals(textMessage.getJMSDestination(), queue);

            // Map message: values mirror what sendMapMessageUsingOpenWire stores.
            sendMapMessageUsingOpenWire();
            final MapMessage mapMessage = (MapMessage) consumer.receive(5000L);
            assertNotNull("map message not received", mapMessage);
            assertEquals(mapMessage.getJMSDestination(), queue);
            assertTrue(mapMessage.getBoolean("aboolean"));
            assertEquals(4, mapMessage.getByte("abyte"));
            final byte[] mapBytes = mapMessage.getBytes("abytes");
            assertEquals(2, mapBytes.length);
            assertEquals(4, mapBytes[0]);
            assertEquals(5, mapBytes[1]);
            assertEquals('a', mapMessage.getChar("achar"));
            assertEquals(4.4d, mapMessage.getDouble("adouble"), 0.0d);
            assertEquals(4.5f, mapMessage.getFloat("afloat"), 0.0f);
            assertEquals(40, mapMessage.getInt("aint"));
            assertEquals(80L, mapMessage.getLong("along"));
            assertEquals(65, mapMessage.getShort("ashort"));
            assertEquals("hello", mapMessage.getString("astring"));

            // Object message.
            final SimpleSerializable sent = new SimpleSerializable();
            sendObjectMessageUsingOpenWire(sent);
            final ObjectMessage objectMessage = (ObjectMessage) consumer.receive(5000L);
            assertNotNull("object message not received", objectMessage);
            assertEquals(objectMessage.getJMSDestination(), queue);
            final SimpleSerializable received = (SimpleSerializable) objectMessage.getObject();
            assertEquals(sent.objName, received.objName);
            assertEquals(sent.intVal, received.intVal);
            assertEquals(sent.longVal, received.longVal);

            // Stream message: fields are read back in the order sendStreamMessageUsingOpenWire writes them.
            sendStreamMessageUsingOpenWire(this.queueName);
            final StreamMessage streamMessage = (StreamMessage) consumer.receive(5000L);
            assertNotNull("stream message not received", streamMessage);
            assertEquals(streamMessage.getJMSDestination(), queue);
            assertTrue(streamMessage.readBoolean());
            assertEquals(2, streamMessage.readByte());
            // Keep a reference to the buffer so its content can actually be verified.
            final byte[] streamBytes = new byte[2];
            streamMessage.readBytes(streamBytes);
            assertEquals(6, streamBytes[0]);
            assertEquals(7, streamBytes[1]);
            assertEquals('b', streamMessage.readChar());
            assertEquals(6.5d, streamMessage.readDouble(), 0.0d);
            assertEquals(93.9f, streamMessage.readFloat(), 0.0f);
            assertEquals(7657, streamMessage.readInt());
            assertEquals(239999L, streamMessage.readLong());
            assertEquals(-31314, streamMessage.readShort());
            assertEquals("hello streammessage", streamMessage.readString());

            // Bytes message: only the leading raw payload is verified here.
            final byte[] payload = text.getBytes(StandardCharsets.UTF_8);
            sendBytesMessageUsingOpenWire(payload);
            final BytesMessage bytesMessage = (BytesMessage) consumer.receive(5000L);
            assertNotNull("bytes message not received", bytesMessage);
            assertEquals(bytesMessage.getJMSDestination(), queue);
            final byte[] receivedPayload = new byte[payload.length];
            bytesMessage.readBytes(receivedPayload);
            for (int i = 0; i < payload.length; i++) {
                assertEquals(payload[i], receivedPayload[i]);
            }

            // Plain message carrying only properties.
            sendMessageUsingOpenWire(this.queueName);
            final Message plainMessage = consumer.receive(5000L);
            assertNotNull("plain message not received", plainMessage);
            assertEquals(plainMessage.getJMSDestination(), queue);
            assertEquals("HelloMessage", plainMessage.getStringProperty("stringProperty"));
            assertFalse(plainMessage.getBooleanProperty("booleanProperty"));
            assertEquals(99999L, plainMessage.getLongProperty("longProperty"));
            assertEquals(979, plainMessage.getIntProperty("intProperty"));
            assertEquals(1099, plainMessage.getShortProperty("shortProperty"));
        }
    }

    /**
     * Sends 100 numbered text messages through the OpenWire connection and checks that a Core JMS
     * consumer receives them in order, each with the expected destination and text.
     *
     * @throws Exception if sending or receiving fails
     */
    @Test
    public void testMultipleSendingToCoreJms() throws Exception {
        final String text = "HelloWorld";
        final int count = 100;

        // try-with-resources closes the connection exactly once, on success and on failure.
        try (Connection coreConnection = this.coreCf.createConnection()) {
            coreConnection.start();
            final Session session = coreConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            final Queue queue = session.createQueue(this.queueName);
            final MessageConsumer consumer = session.createConsumer(queue);

            sendMultipleTextMessagesUsingOpenWire(text, count);

            for (int i = 0; i < count; i++) {
                final TextMessage message = (TextMessage) consumer.receive(5000L);
                // Fail with the index instead of an anonymous NullPointerException on timeout.
                assertNotNull("message " + i + " not received", message);
                assertEquals(message.getJMSDestination(), queue);
                assertEquals(text + i, message.getText());
            }
        }
    }

    /**
     * Sends {@code i} text messages to the test queue destination over the shared OpenWire
     * connection. The text of message {@code n} is {@code str + n}, counting from 0.
     *
     * @param str text prefix of every message
     * @param i   number of messages to send
     * @throws Exception if the session cannot be created or a send fails
     */
    private void sendMultipleTextMessagesUsingOpenWire(String str, int i) throws Exception {
        final Session session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final ActiveMQDestination destination = createDestination(session, ActiveMQDestination.QUEUE_TYPE);
        // Typed as the JMS interface: Session.createProducer does not return the ActiveMQ class.
        final MessageProducer producer = session.createProducer(destination);
        for (int index = 0; index < i; index++) {
            producer.send(session.createTextMessage(str + index));
        }
    }

    /**
     * Sends one body-less message carrying boolean, long, byte, int, short and string properties
     * to the test queue destination over the shared OpenWire connection.
     *
     * @param str not used by this method; the destination comes from {@code createDestination}
     * @throws Exception if the session cannot be created or the send fails
     */
    private void sendMessageUsingOpenWire(String str) throws Exception {
        final Session session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final ActiveMQDestination destination = createDestination(session, ActiveMQDestination.QUEUE_TYPE);
        // Typed as the JMS interface: Session.createProducer does not return the ActiveMQ class.
        final MessageProducer producer = session.createProducer(destination);
        final Message message = session.createMessage();
        message.setBooleanProperty("booleanProperty", false);
        message.setLongProperty("longProperty", 99999L);
        message.setByteProperty("byteProperty", (byte) 5);
        message.setIntProperty("intProperty", 979);
        message.setShortProperty("shortProperty", (short) 1099);
        message.setStringProperty("stringProperty", "HelloMessage");
        producer.send(message);
    }

    /**
     * Sends one bytes message to the test queue destination over the shared OpenWire connection.
     * The body consists of the given raw bytes followed by a boolean, a long, a char, an int,
     * a short and a UTF string.
     *
     * @param bArr raw bytes written at the start of the body
     * @throws Exception if the session cannot be created or the send fails
     */
    private void sendBytesMessageUsingOpenWire(byte[] bArr) throws Exception {
        final Session session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final ActiveMQDestination destination = createDestination(session, ActiveMQDestination.QUEUE_TYPE);
        // Typed as the JMS interface: Session.createProducer does not return the ActiveMQ class.
        final MessageProducer producer = session.createProducer(destination);
        final BytesMessage message = session.createBytesMessage();
        message.writeBytes(bArr);
        message.writeBoolean(true);
        message.writeLong(99999L);
        message.writeChar('h');
        message.writeInt(987);
        message.writeShort((short) 1099);
        message.writeUTF("hellobytes");
        producer.send(message);
    }

    /**
     * Sends one stream message to the test queue destination over the shared OpenWire connection.
     * The stream holds, in order: boolean, byte, two-element byte array, char, double, float, int,
     * long, short and string.
     *
     * @param str not used by this method; the destination comes from {@code createDestination}
     * @throws Exception if the session cannot be created or the send fails
     */
    private void sendStreamMessageUsingOpenWire(String str) throws Exception {
        final Session session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final ActiveMQDestination destination = createDestination(session, ActiveMQDestination.QUEUE_TYPE);
        // Typed as the JMS interface: Session.createProducer does not return the ActiveMQ class.
        final MessageProducer producer = session.createProducer(destination);
        final StreamMessage message = session.createStreamMessage();
        message.writeBoolean(true);
        message.writeByte((byte) 2);
        message.writeBytes(new byte[]{6, 7});
        message.writeChar('b');
        message.writeDouble(6.5d);
        message.writeFloat(93.9f);
        message.writeInt(7657);
        message.writeLong(239999L);
        message.writeShort((short) -31314);
        message.writeString("hello streammessage");
        producer.send(message);
    }

    /**
     * Sends one object message wrapping the given payload to the test queue destination over the
     * shared OpenWire connection.
     *
     * @param simpleSerializable payload of the object message
     * @throws Exception if the session cannot be created or the send fails
     */
    private void sendObjectMessageUsingOpenWire(SimpleSerializable simpleSerializable) throws Exception {
        final Session session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final ActiveMQDestination destination = createDestination(session, ActiveMQDestination.QUEUE_TYPE);
        final MessageProducer producer = session.createProducer(destination);
        final ObjectMessage message = session.createObjectMessage(simpleSerializable);
        producer.send(message);
    }

    /**
     * Sends one map message to the test queue destination over the shared OpenWire connection.
     * The map has one entry per value type: boolean, byte, byte array, char, double, float, int,
     * long, short and string.
     *
     * @throws Exception if the session cannot be created or the send fails
     */
    private void sendMapMessageUsingOpenWire() throws Exception {
        final Session session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final ActiveMQDestination destination = createDestination(session, ActiveMQDestination.QUEUE_TYPE);
        // Typed as the JMS interface: Session.createProducer does not return the ActiveMQ class.
        final MessageProducer producer = session.createProducer(destination);
        final MapMessage message = session.createMapMessage();
        message.setBoolean("aboolean", true);
        message.setByte("abyte", (byte) 4);
        message.setBytes("abytes", new byte[]{4, 5});
        message.setChar("achar", 'a');
        message.setDouble("adouble", 4.4d);
        message.setFloat("afloat", 4.5f);
        message.setInt("aint", 40);
        message.setLong("along", 80L);
        message.setShort("ashort", (short) 65);
        message.setString("astring", "hello");
        producer.send(message);
    }

    /**
     * Sends one text message to the test queue destination over the shared OpenWire connection.
     *
     * @param str text of the message
     * @throws Exception if the session cannot be created or the send fails
     */
    private void sendTextMessageUsingOpenWire(String str) throws Exception {
        final Session session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final ActiveMQDestination destination = createDestination(session, ActiveMQDestination.QUEUE_TYPE);
        final MessageProducer producer = session.createProducer(destination);
        final TextMessage message = session.createTextMessage(str);
        producer.send(message);
    }
}
