package org.apache.activemq.network;

import java.util.Arrays;
import java.util.Iterator;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.MapMessage;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.StreamMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMapMessage;
import org.apache.activemq.command.ActiveMQObjectMessage;
import org.apache.activemq.command.ActiveMQStreamMessage;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.usecases.DurableSubProcessWithRestartTest;
import org.apache.activemq.util.Wait;
import org.apache.activemq.xbean.BrokerFactoryBean;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.core.io.ClassPathResource;

/* loaded from: input_file:org/apache/activemq/network/CompressionOverNetworkTest.class */
public class CompressionOverNetworkTest {
    protected static final int RECEIVE_TIMEOUT_MILLS = 10000;
    protected static final int MESSAGE_COUNT = 10;
    private static final Logger LOG = LoggerFactory.getLogger(CompressionOverNetworkTest.class);
    protected AbstractApplicationContext context;
    protected Connection localConnection;
    protected Connection remoteConnection;
    protected BrokerService localBroker;
    protected BrokerService remoteBroker;
    protected Session localSession;
    protected Session remoteSession;
    protected ActiveMQDestination included;

    @Test
    public void testCompressedOverCompressedNetwork() throws Exception {
        this.localConnection.setUseCompression(true);
        MessageConsumer createConsumer = this.remoteSession.createConsumer(this.included);
        MessageProducer createProducer = this.localSession.createProducer(this.included);
        createProducer.setDeliveryMode(1);
        waitForConsumerRegistration(this.localBroker, 1, this.included);
        StringBuilder sb = new StringBuilder("test-");
        for (int i = 0; i < 100; i++) {
            sb.append(UUID.randomUUID().toString());
        }
        createProducer.send(this.localSession.createTextMessage(sb.toString()));
        ActiveMQTextMessage receive = createConsumer.receive(DurableSubProcessWithRestartTest.BROKER_RESTART);
        Assert.assertNotNull(receive);
        ActiveMQTextMessage activeMQTextMessage = receive;
        Assert.assertTrue(activeMQTextMessage.isCompressed());
        Assert.assertEquals(sb.toString(), activeMQTextMessage.getText());
    }

    @Test
    public void testTextMessageCompression() throws Exception {
        MessageConsumer createConsumer = this.remoteSession.createConsumer(this.included);
        MessageProducer createProducer = this.localSession.createProducer(this.included);
        createProducer.setDeliveryMode(1);
        waitForConsumerRegistration(this.localBroker, 1, this.included);
        StringBuilder sb = new StringBuilder("test-");
        for (int i = 0; i < 100; i++) {
            sb.append(UUID.randomUUID().toString());
        }
        createProducer.send(this.localSession.createTextMessage(sb.toString()));
        ActiveMQTextMessage receive = createConsumer.receive(DurableSubProcessWithRestartTest.BROKER_RESTART);
        Assert.assertNotNull(receive);
        ActiveMQTextMessage activeMQTextMessage = receive;
        Assert.assertTrue(activeMQTextMessage.isCompressed());
        Assert.assertEquals(sb.toString(), activeMQTextMessage.getText());
    }

    @Test
    public void testBytesMessageCompression() throws Exception {
        MessageConsumer createConsumer = this.remoteSession.createConsumer(this.included);
        MessageProducer createProducer = this.localSession.createProducer(this.included);
        createProducer.setDeliveryMode(1);
        waitForConsumerRegistration(this.localBroker, 1, this.included);
        StringBuilder sb = new StringBuilder("test-");
        for (int i = 0; i < 100; i++) {
            sb.append(UUID.randomUUID().toString());
        }
        byte[] bytes = sb.toString().getBytes("UTF-8");
        BytesMessage createBytesMessage = this.localSession.createBytesMessage();
        createBytesMessage.writeBytes(bytes);
        createProducer.send(createBytesMessage);
        ActiveMQBytesMessage receive = createConsumer.receive(DurableSubProcessWithRestartTest.BROKER_RESTART);
        Assert.assertNotNull(receive);
        ActiveMQBytesMessage activeMQBytesMessage = receive;
        Assert.assertTrue(activeMQBytesMessage.isCompressed());
        Assert.assertTrue(activeMQBytesMessage.getContent().getLength() < bytes.length);
        byte[] bArr = new byte[bytes.length];
        Assert.assertEquals(bytes.length, activeMQBytesMessage.readBytes(bArr));
        Assert.assertEquals(-1L, activeMQBytesMessage.readBytes(bArr));
        for (int i2 = 0; i2 < bytes.length; i2++) {
            Assert.assertEquals(bytes[i2], bArr[i2]);
        }
    }

    @Test
    public void testStreamMessageCompression() throws Exception {
        MessageConsumer createConsumer = this.remoteSession.createConsumer(this.included);
        MessageProducer createProducer = this.localSession.createProducer(this.included);
        createProducer.setDeliveryMode(1);
        waitForConsumerRegistration(this.localBroker, 1, this.included);
        StreamMessage createStreamMessage = this.localSession.createStreamMessage();
        for (int i = 0; i < 100; i++) {
            createStreamMessage.writeString("test string: " + i);
        }
        createProducer.send(createStreamMessage);
        ActiveMQStreamMessage receive = createConsumer.receive(DurableSubProcessWithRestartTest.BROKER_RESTART);
        Assert.assertNotNull(receive);
        ActiveMQStreamMessage activeMQStreamMessage = receive;
        Assert.assertTrue(activeMQStreamMessage.isCompressed());
        for (int i2 = 0; i2 < 100; i2++) {
            Assert.assertEquals("test string: " + i2, activeMQStreamMessage.readString());
        }
    }

    @Test
    public void testMapMessageCompression() throws Exception {
        MessageConsumer createConsumer = this.remoteSession.createConsumer(this.included);
        MessageProducer createProducer = this.localSession.createProducer(this.included);
        createProducer.setDeliveryMode(1);
        waitForConsumerRegistration(this.localBroker, 1, this.included);
        MapMessage createMapMessage = this.localSession.createMapMessage();
        for (int i = 0; i < 100; i++) {
            createMapMessage.setString(Integer.toString(i), "test string: " + i);
        }
        createProducer.send(createMapMessage);
        ActiveMQMapMessage receive = createConsumer.receive(DurableSubProcessWithRestartTest.BROKER_RESTART);
        Assert.assertNotNull(receive);
        ActiveMQMapMessage activeMQMapMessage = receive;
        Assert.assertTrue(activeMQMapMessage.isCompressed());
        for (int i2 = 0; i2 < 100; i2++) {
            Assert.assertEquals("test string: " + i2, activeMQMapMessage.getString(Integer.toString(i2)));
        }
    }

    @Test
    public void testObjectMessageCompression() throws Exception {
        MessageConsumer createConsumer = this.remoteSession.createConsumer(this.included);
        MessageProducer createProducer = this.localSession.createProducer(this.included);
        createProducer.setDeliveryMode(1);
        waitForConsumerRegistration(this.localBroker, 1, this.included);
        StringBuilder sb = new StringBuilder("test-");
        for (int i = 0; i < 100; i++) {
            sb.append(UUID.randomUUID().toString());
        }
        createProducer.send(this.localSession.createObjectMessage(sb.toString()));
        ActiveMQObjectMessage receive = createConsumer.receive(DurableSubProcessWithRestartTest.BROKER_RESTART);
        Assert.assertNotNull(receive);
        ActiveMQObjectMessage activeMQObjectMessage = receive;
        Assert.assertTrue(activeMQObjectMessage.isCompressed());
        Assert.assertEquals(sb.toString(), activeMQObjectMessage.getObject());
    }

    private void waitForConsumerRegistration(final BrokerService brokerService, final int i, final ActiveMQDestination activeMQDestination) throws Exception {
        Assert.assertTrue("Internal bridge consumers registered in time", Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.network.CompressionOverNetworkTest.1
            public boolean isSatisified() throws Exception {
                Object[] array = ((NetworkConnector) brokerService.getNetworkConnectors().get(0)).bridges.values().toArray();
                if (array.length <= 0) {
                    return false;
                }
                CompressionOverNetworkTest.LOG.info(brokerService + " bridges " + Arrays.toString(array));
                DemandForwardingBridgeSupport demandForwardingBridgeSupport = (DemandForwardingBridgeSupport) array[0];
                ConcurrentHashMap localSubscriptionMap = demandForwardingBridgeSupport.getLocalSubscriptionMap();
                CompressionOverNetworkTest.LOG.info(brokerService + " bridge " + demandForwardingBridgeSupport + ", localSubs: " + localSubscriptionMap);
                if (localSubscriptionMap.isEmpty()) {
                    return false;
                }
                for (DemandSubscription demandSubscription : localSubscriptionMap.values()) {
                    if (demandSubscription.getLocalInfo().getDestination().equals(activeMQDestination)) {
                        CompressionOverNetworkTest.LOG.info(brokerService + " DemandSubscription " + demandSubscription + ", size: " + demandSubscription.size());
                        return demandSubscription.size() >= i;
                    }
                }
                return false;
            }
        }));
    }

    @Before
    public void setUp() throws Exception {
        doSetUp(true);
    }

    @After
    public void tearDown() throws Exception {
        doTearDown();
    }

    protected void doTearDown() throws Exception {
        this.localConnection.close();
        this.remoteConnection.close();
        this.localBroker.stop();
        this.remoteBroker.stop();
    }

    protected void doSetUp(boolean z) throws Exception {
        this.localBroker = createLocalBroker();
        this.localBroker.setDeleteAllMessagesOnStartup(z);
        this.localBroker.start();
        this.localBroker.waitUntilStarted();
        this.remoteBroker = createRemoteBroker();
        this.remoteBroker.setDeleteAllMessagesOnStartup(z);
        this.remoteBroker.start();
        this.remoteBroker.waitUntilStarted();
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(this.localBroker.getVmConnectorURI());
        activeMQConnectionFactory.setAlwaysSyncSend(true);
        activeMQConnectionFactory.setDispatchAsync(false);
        this.localConnection = activeMQConnectionFactory.createConnection();
        this.localConnection.setClientID("clientId");
        this.localConnection.start();
        this.remoteConnection = new ActiveMQConnectionFactory(this.remoteBroker.getVmConnectorURI()).createConnection();
        this.remoteConnection.setClientID("clientId");
        this.remoteConnection.start();
        this.included = new ActiveMQTopic("include.test.bar");
        this.localSession = this.localConnection.createSession(false, 1);
        this.remoteSession = this.remoteConnection.createSession(false, 1);
    }

    protected String getRemoteBrokerURI() {
        return "org/apache/activemq/network/remoteBroker.xml";
    }

    protected String getLocalBrokerURI() {
        return "org/apache/activemq/network/localBroker.xml";
    }

    protected BrokerService createBroker(String str) throws Exception {
        new BrokerFactoryBean(new ClassPathResource(str));
        BrokerFactoryBean brokerFactoryBean = new BrokerFactoryBean(new ClassPathResource(str));
        brokerFactoryBean.afterPropertiesSet();
        BrokerService broker = brokerFactoryBean.getBroker();
        Iterator it = broker.getNetworkConnectors().iterator();
        while (it.hasNext()) {
            ((NetworkConnector) it.next()).setUseCompression(true);
        }
        return broker;
    }

    protected BrokerService createLocalBroker() throws Exception {
        return createBroker(getLocalBrokerURI());
    }

    protected BrokerService createRemoteBroker() throws Exception {
        return createBroker(getRemoteBrokerURI());
    }
}
