package org.apache.activemq.artemis.tests.integration.cluster.bridge;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.management.MBeanServer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Interceptor;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.config.BridgeConfiguration;
import org.apache.activemq.artemis.core.config.CoreQueueConfiguration;
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.journal.TransactionFailureCallback;
import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
import org.apache.activemq.artemis.core.persistence.impl.journal.DescribeJournal;
import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendLargeMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.cluster.Bridge;
import org.apache.activemq.artemis.core.server.cluster.Transformer;
import org.apache.activemq.artemis.core.server.cluster.impl.BridgeImpl;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.ServiceRegistryImpl;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.apache.activemq.artemis.utils.LinkedListIterator;
import org.apache.activemq.artemis.utils.ReusableLatch;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.class */
public class BridgeTest extends ActiveMQTestBase {
    // Source broker: the tests in this class configure the bridge ("bridge1") and its source queue ("queue0") on it.
    private ActiveMQServer server0;
    // Target broker: its connector is the one registered as the bridge's static connector.
    private ActiveMQServer server1;
    // Client locator; each test assigns it through addServerLocator(...).
    private ServerLocator locator;
    // Parameterized flag ("isNetty={0}"); getConnector() uses it to pick the Netty or in-VM connector factory.
    private final boolean netty;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.activemq.artemis.tests.integration.cluster.bridge.BridgeTest$1MyInterceptor, reason: invalid class name */
    /* loaded from: input_file:org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest$1MyInterceptor.class */
    public class C1MyInterceptor implements Interceptor {
        public boolean ignoreSends = true;
        public CountDownLatch latch;

        C1MyInterceptor(int i) {
            this.latch = new CountDownLatch(i);
        }

        public boolean intercept(Packet packet, RemotingConnection remotingConnection) throws ActiveMQException {
            if ((!this.ignoreSends || !(packet instanceof SessionSendMessage)) && ((!this.ignoreSends || !(packet instanceof SessionSendLargeMessage)) && (!this.ignoreSends || !(packet instanceof SessionSendContinuationMessage) || ((SessionSendContinuationMessage) packet).isContinues()))) {
                IntegrationTestLogger.LOGGER.info(packet);
                return true;
            }
            IntegrationTestLogger.LOGGER.info("IGNORED: " + packet);
            this.latch.countDown();
            return false;
        }
    }

    /* loaded from: input_file:org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest$StopInterceptor.class */
    public static class StopInterceptor implements Interceptor {
        static ActiveMQServer serverToStop;
        static Thread thread;
        static final ReusableLatch latch = new ReusableLatch(0);
        static int count = 0;

        public static void reset() {
            latch.setCount(1);
            serverToStop = null;
            count = 0;
            thread = null;
        }

        public synchronized boolean intercept(Packet packet, RemotingConnection remotingConnection) throws ActiveMQException {
            if (!(packet instanceof SessionSendMessage)) {
                return true;
            }
            int i = count + 1;
            count = i;
            if (i != 100) {
                return true;
            }
            try {
                System.out.println("Stopping server after " + count + " messages");
                thread = new Thread("***Server Restarter***") { // from class: org.apache.activemq.artemis.tests.integration.cluster.bridge.BridgeTest.StopInterceptor.1
                    @Override // java.lang.Thread, java.lang.Runnable
                    public void run() {
                        try {
                            System.out.println("Stopping server");
                            StopInterceptor.latch.countDown();
                            StopInterceptor.serverToStop.stop(false);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                };
                thread.start();
                latch.await();
                return true;
            } catch (Exception e) {
                e.printStackTrace();
                return true;
            }
        }
    }

    /**
     * Supplies the constructor arguments for the parameterized runs: one run with
     * {@code true} and one with {@code false} for the {@code isNetty} flag.
     *
     * @return one single-element {@code Object[]} per run
     */
    @Parameterized.Parameters(name = "isNetty={0}")
    public static Collection<Object[]> getParameters() {
        return Arrays.asList(new Object[]{true}, new Object[]{false});
    }

    /**
     * @param z value of the parameterized {@code isNetty} flag for this run
     */
    public BridgeTest(boolean z) {
        netty = z;
    }

    /**
     * @return the {@code isNetty} flag this instance was constructed with
     */
    protected boolean isNetty() {
        return netty;
    }

    /**
     * Resets the static {@link StopInterceptor} state, then runs the base-class set-up.
     */
    @Override
    @Before
    public void setUp() throws Exception {
        // Static interceptor state must be cleared first so nothing leaks in from a previous test.
        StopInterceptor.reset();
        super.setUp();
    }

    /**
     * @return the connector factory class name matching the current {@code isNetty} flag
     */
    private String getConnector() {
        if (isNetty()) {
            return NETTY_CONNECTOR_FACTORY;
        }
        return INVM_CONNECTOR_FACTORY;
    }

    /** Simple bridge scenario: regular messages, file flag off. */
    @Test
    public void testSimpleBridge() throws Exception {
        final boolean largeMessage = false;
        final boolean useFiles = false;
        internaltestSimpleBridge(largeMessage, useFiles);
    }

    /** Simple bridge scenario: regular messages, file flag on. */
    @Test
    public void testSimpleBridgeFiles() throws Exception {
        final boolean largeMessage = false;
        final boolean useFiles = true;
        internaltestSimpleBridge(largeMessage, useFiles);
    }

    /** Simple bridge scenario: stream-backed (large) messages, file flag off. */
    @Test
    public void testSimpleBridgeLargeMessageNullPersistence() throws Exception {
        final boolean largeMessage = true;
        final boolean useFiles = false;
        internaltestSimpleBridge(largeMessage, useFiles);
    }

    /** Simple bridge scenario: stream-backed (large) messages, file flag on. */
    @Test
    public void testSimpleBridgeLargeMessageFiles() throws Exception {
        final boolean largeMessage = true;
        final boolean useFiles = true;
        internaltestSimpleBridge(largeMessage, useFiles);
    }

    /**
     * Sends ten 5 MiB messages to "testAddress" on server0 and expects the bridge to deliver
     * them, in order, to "queue1" on server1. Prints the elapsed time at the end.
     */
    @Test
    public void testLargeMessageBridge() throws Exception {
        final long startedAt = System.currentTimeMillis();
        final int numMessages = 10;

        // Brokers: server0 hosts the bridge, server1 receives the forwarded messages.
        Map<String, Object> server0Params = new HashMap<>();
        server0 = createClusteredServerWithParams(isNetty(), 0, true, server0Params);

        Map<String, Object> server1Params = new HashMap<>();
        addTargetParameters(server1Params);
        server1 = createClusteredServerWithParams(isNetty(), 1, true, server1Params);

        TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params);
        TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params);

        // server0 only needs to know the connector pointing at server1.
        Map<String, TransportConfiguration> connectors = new HashMap<>();
        connectors.put(server1tc.getName(), server1tc);
        server0.getConfiguration().setConnectorConfigurations(connectors);

        ArrayList<String> staticConnectors = new ArrayList<>();
        staticConnectors.add(server1tc.getName());

        BridgeConfiguration bridgeConfiguration = new BridgeConfiguration()
            .setName("bridge1")
            .setQueueName("queue0")
            .setForwardingAddress("forwardAddress")
            .setRetryInterval(1000L)
            .setReconnectAttemptsOnSameNode(-1)
            .setUseDuplicateDetection(false)
            .setConfirmationWindowSize(25 * 1024 * 1024)
            .setStaticConnectors(staticConnectors);

        ArrayList<BridgeConfiguration> bridgeConfigs = new ArrayList<>();
        bridgeConfigs.add(bridgeConfiguration);
        server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);

        ArrayList<CoreQueueConfiguration> queueConfigs0 = new ArrayList<>();
        queueConfigs0.add(new CoreQueueConfiguration().setAddress("testAddress").setName("queue0"));
        server0.getConfiguration().setQueueConfigurations(queueConfigs0);

        ArrayList<CoreQueueConfiguration> queueConfigs1 = new ArrayList<>();
        queueConfigs1.add(new CoreQueueConfiguration().setAddress("forwardAddress").setName("queue1"));
        server1.getConfiguration().setQueueConfigurations(queueConfigs1);

        // The target is started before the source.
        server1.start();
        server0.start();

        locator = addServerLocator(ActiveMQClient.createServerLocatorWithoutHA(server0tc, server1tc));
        ClientSessionFactory sf0 = addSessionFactory(locator.createSessionFactory(server0tc));
        ClientSessionFactory sf1 = addSessionFactory(locator.createSessionFactory(server1tc));

        ClientSession session0 = sf0.createSession(false, true, true);
        ClientSession session1 = sf1.createSession(false, true, true);

        ClientProducer producer0 = session0.createProducer(new SimpleString("testAddress"));
        ClientConsumer consumer1 = session1.createConsumer("queue1");
        session1.start();

        byte[] payload = new byte[5 * 1024 * 1024];
        SimpleString propKey = new SimpleString("testkey");

        for (int sent = 0; sent < numMessages; sent++) {
            ClientMessage message = session0.createMessage(true);
            message.putIntProperty(propKey, sent);
            message.getBodyBuffer().writeBytes(payload);
            producer0.send(message);
        }

        for (int expected = 0; expected < numMessages; expected++) {
            ClientMessage message = consumer1.receive(500000L);
            Assert.assertNotNull(message);
            Assert.assertEquals(Integer.valueOf(expected), message.getObjectProperty(propKey));
            readLargeMessages(message, 10);
            message.acknowledge();
        }

        // Nothing beyond the ten sent messages may have arrived.
        Assert.assertNull(consumer1.receiveImmediate());

        session0.close();
        session1.close();
        sf0.close();
        sf1.close();
        closeFields();

        if (server0.getConfiguration().isPersistenceEnabled()) {
            assertEquals(0L, loadQueues(server0).size());
        }

        long elapsed = System.currentTimeMillis() - startedAt;
        System.out.println(elapsed + "ms");
    }

    /**
     * Shared body of the simple-bridge tests: sends ten messages to "testAddress" on server0 and
     * expects them, in order, on "queue1" of server1.
     *
     * @param z  when true each message additionally gets a fake large-stream body
     * @param z2 forwarded as the third argument of {@code createClusteredServerWithParams} for both servers
     */
    public void internaltestSimpleBridge(boolean z, boolean z2) throws Exception {
        final boolean largeMessage = z;
        final boolean useFiles = z2;
        final int numMessages = 10;

        Map<String, Object> server0Params = new HashMap<>();
        server0 = createClusteredServerWithParams(isNetty(), 0, useFiles, server0Params);

        Map<String, Object> server1Params = new HashMap<>();
        addTargetParameters(server1Params);
        server1 = createClusteredServerWithParams(isNetty(), 1, useFiles, server1Params);

        TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params);
        TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params);

        Map<String, TransportConfiguration> connectors = new HashMap<>();
        connectors.put(server1tc.getName(), server1tc);
        server0.getConfiguration().setConnectorConfigurations(connectors);

        ArrayList<String> staticConnectors = new ArrayList<>();
        staticConnectors.add(server1tc.getName());

        BridgeConfiguration bridgeConfiguration = new BridgeConfiguration()
            .setName("bridge1")
            .setQueueName("queue0")
            .setForwardingAddress("forwardAddress")
            .setRetryInterval(1000L)
            .setReconnectAttemptsOnSameNode(-1)
            .setUseDuplicateDetection(false)
            .setConfirmationWindowSize(5 * 1024)
            .setStaticConnectors(staticConnectors);

        ArrayList<BridgeConfiguration> bridgeConfigs = new ArrayList<>();
        bridgeConfigs.add(bridgeConfiguration);
        server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);

        ArrayList<CoreQueueConfiguration> queueConfigs0 = new ArrayList<>();
        queueConfigs0.add(new CoreQueueConfiguration().setAddress("testAddress").setName("queue0"));
        server0.getConfiguration().setQueueConfigurations(queueConfigs0);

        ArrayList<CoreQueueConfiguration> queueConfigs1 = new ArrayList<>();
        queueConfigs1.add(new CoreQueueConfiguration().setAddress("forwardAddress").setName("queue1"));
        server1.getConfiguration().setQueueConfigurations(queueConfigs1);

        // The target is started before the source.
        server1.start();
        server0.start();

        locator = addServerLocator(ActiveMQClient.createServerLocatorWithoutHA(server0tc, server1tc));
        ClientSessionFactory sf0 = addSessionFactory(locator.createSessionFactory(server0tc));
        ClientSessionFactory sf1 = addSessionFactory(locator.createSessionFactory(server1tc));

        ClientSession session0 = sf0.createSession(false, true, true);
        ClientSession session1 = sf1.createSession(false, true, true);

        ClientProducer producer0 = session0.createProducer(new SimpleString("testAddress"));
        ClientConsumer consumer1 = session1.createConsumer("queue1");
        session1.start();

        byte[] payload = new byte[1024];
        SimpleString propKey = new SimpleString("testkey");

        for (int sent = 0; sent < numMessages; sent++) {
            ClientMessage message = session0.createMessage(true);
            if (largeMessage) {
                message.setBodyInputStream(ActiveMQTestBase.createFakeLargeStream(10 * 1024L));
            }
            message.putIntProperty(propKey, sent);
            message.getBodyBuffer().writeBytes(payload);
            producer0.send(message);
        }

        for (int expected = 0; expected < numMessages; expected++) {
            ClientMessage message = consumer1.receive(5000L);
            Assert.assertNotNull(message);
            Assert.assertEquals(Integer.valueOf(expected), message.getObjectProperty(propKey));
            if (largeMessage) {
                readLargeMessages(message, 10);
            }
            message.acknowledge();
        }

        // Nothing beyond the ten sent messages may have arrived.
        Assert.assertNull(consumer1.receiveImmediate());

        session0.close();
        session1.close();
        sf0.close();
        sf1.close();
        closeFields();

        if (server0.getConfiguration().isPersistenceEnabled()) {
            assertEquals(0L, loadQueues(server0).size());
        }
    }

    /** Message-loss scenario with a regular message. */
    @Test
    public void testLostMessageSimpleMessage() throws Exception {
        final boolean largeMessage = false;
        internalTestMessageLoss(largeMessage);
    }

    /** Message-loss scenario with a stream-backed (large) message. */
    @Test
    public void testLostMessageLargeMessage() throws Exception {
        final boolean largeMessage = true;
        internalTestMessageLoss(largeMessage);
    }

    /**
     * Shared body of the lost-message tests. An incoming interceptor on server1 rejects send
     * packets until it has rejected three of them; the interceptor is then removed and the single
     * message sent to server0 must still arrive on "queue1" of server1.
     *
     * @param z when true the message additionally gets a fake large-stream body
     */
    public void internalTestMessageLoss(boolean z) throws Exception {
        final boolean largeMessage = z;
        final int numMessages = 1;
        C1MyInterceptor interceptor = new C1MyInterceptor(3);

        Map<String, Object> server0Params = new HashMap<>();
        server0 = createClusteredServerWithParams(isNetty(), 0, true, server0Params);

        Map<String, Object> server1Params = new HashMap<>();
        addTargetParameters(server1Params);
        server1 = createClusteredServerWithParams(isNetty(), 1, true, server1Params);

        TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params);
        TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params);

        Map<String, TransportConfiguration> connectors = new HashMap<>();
        connectors.put(server1tc.getName(), server1tc);
        server0.getConfiguration().setConnectorConfigurations(connectors);

        ArrayList<String> staticConnectors = new ArrayList<>();
        staticConnectors.add(server1tc.getName());

        BridgeConfiguration bridgeConfiguration = new BridgeConfiguration()
            .setName("bridge1")
            .setQueueName("queue0")
            .setForwardingAddress("forwardAddress")
            .setRetryInterval(100L)
            .setReconnectAttempts(-1)
            .setReconnectAttemptsOnSameNode(-1)
            .setUseDuplicateDetection(false)
            .setConfirmationWindowSize(512)
            .setStaticConnectors(staticConnectors)
            .setCallTimeout(500L);

        ArrayList<BridgeConfiguration> bridgeConfigs = new ArrayList<>();
        bridgeConfigs.add(bridgeConfiguration);
        server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);

        ArrayList<CoreQueueConfiguration> queueConfigs0 = new ArrayList<>();
        queueConfigs0.add(new CoreQueueConfiguration().setAddress("testAddress").setName("queue0"));
        server0.getConfiguration().setQueueConfigurations(queueConfigs0);

        ArrayList<CoreQueueConfiguration> queueConfigs1 = new ArrayList<>();
        queueConfigs1.add(new CoreQueueConfiguration().setAddress("forwardAddress").setName("queue1"));
        server1.getConfiguration().setQueueConfigurations(queueConfigs1);

        // The interceptor is installed on the target before the source (and its bridge) starts.
        server1.start();
        server1.getRemotingService().addIncomingInterceptor(interceptor);
        server0.start();

        locator = addServerLocator(ActiveMQClient.createServerLocatorWithoutHA(server0tc, server1tc));
        ClientSessionFactory sf0 = addSessionFactory(locator.createSessionFactory(server0tc));
        ClientSessionFactory sf1 = addSessionFactory(locator.createSessionFactory(server1tc));

        ClientSession session0 = sf0.createSession(false, true, true);
        ClientSession session1 = sf1.createSession(false, true, true);

        ClientProducer producer0 = session0.createProducer(new SimpleString("testAddress"));
        ClientConsumer consumer1 = session1.createConsumer("queue1");
        session1.start();

        byte[] payload = new byte[1024];
        SimpleString propKey = new SimpleString("testkey");

        for (int sent = 0; sent < numMessages; sent++) {
            ClientMessage message = session0.createMessage(true);
            if (largeMessage) {
                message.setBodyInputStream(ActiveMQTestBase.createFakeLargeStream(10 * 1024L));
            }
            message.putIntProperty(propKey, sent);
            message.getBodyBuffer().writeBytes(payload);
            producer0.send(message);
        }

        // Wait until the interceptor has rejected the expected number of packets.
        assertTrue("where is the countDown?", interceptor.latch.await(30L, TimeUnit.SECONDS));
        interceptor.ignoreSends = false;
        server1.getRemotingService().removeIncomingInterceptor(interceptor);
        IntegrationTestLogger.LOGGER.info("No longer ignoring packets.");

        for (int expected = 0; expected < numMessages; expected++) {
            ClientMessage message = consumer1.receive(30000L);
            Assert.assertNotNull(message);
            Assert.assertEquals(Integer.valueOf(expected), message.getObjectProperty(propKey));
            if (largeMessage) {
                readLargeMessages(message, 10);
            }
            message.acknowledge();
        }

        // Exactly one delivery is expected.
        Assert.assertNull(consumer1.receiveImmediate());

        session0.close();
        session1.close();
        sf0.close();
        sf1.close();
        closeFields();

        assertEquals("there should be no queues", 0L, loadQueues(server0).size());
    }

    /**
     * Adds the transport parameter that sets the target server apart from the source:
     * "port" = 61617 when running with Netty, otherwise "serverId" = 1.
     *
     * @param map parameter map to add the entry to
     */
    private void addTargetParameters(Map<String, Object> map) {
        if (!isNetty()) {
            map.put("serverId", 1);
            return;
        }
        map.put("port", 61617);
    }

    /**
     * Reads {@code i} consecutive 1024-byte chunks from the message body buffer, discarding the data.
     *
     * @param clientMessage message whose body is consumed
     * @param i             number of 1024-byte reads to perform
     */
    private void readLargeMessages(ClientMessage clientMessage, int i) {
        final byte[] chunk = new byte[1024];
        int remaining = i;
        while (remaining > 0) {
            clientMessage.getBodyBuffer().readBytes(chunk);
            remaining--;
        }
    }

    /** Filtered bridge scenario: regular messages, file flag off. */
    @Test
    public void testWithFilter() throws Exception {
        final boolean largeMessage = false;
        final boolean useFiles = false;
        internalTestWithFilter(largeMessage, useFiles);
    }

    /** Filtered bridge scenario: regular messages, file flag on. */
    @Test
    public void testWithFilterFiles() throws Exception {
        final boolean largeMessage = false;
        final boolean useFiles = true;
        internalTestWithFilter(largeMessage, useFiles);
    }

    /** Filtered bridge scenario: stream-backed (large) messages, file flag off. */
    @Test
    public void testWithFilterLargeMessages() throws Exception {
        final boolean largeMessage = true;
        final boolean useFiles = false;
        internalTestWithFilter(largeMessage, useFiles);
    }

    /** Filtered bridge scenario: stream-backed (large) messages, file flag on. */
    @Test
    public void testWithFilterLargeMessagesFiles() throws Exception {
        final boolean largeMessage = true;
        final boolean useFiles = true;
        internalTestWithFilter(largeMessage, useFiles);
    }

    /**
     * Shared body of the filtered-bridge tests. The bridge on server0 carries the filter
     * {@code animal='goat'}: ten "monkey" messages must not reach "queue1" on server1, while ten
     * "goat" messages must arrive there in order. When {@code z2} is set, the journal of server0
     * is expected to still hold exactly one queue with ten messages (the unforwarded ones).
     *
     * @param z  when true each message gets a fake large-stream body
     * @param z2 forwarded as the third argument of {@code createClusteredServerWithParams} for both
     *           servers; also enables the final journal check
     */
    public void internalTestWithFilter(boolean z, boolean z2) throws Exception {
        final boolean largeMessage = z;
        final boolean useFiles = z2;
        final int numMessages = 10;

        Map<String, Object> server0Params = new HashMap<>();
        server0 = createClusteredServerWithParams(isNetty(), 0, useFiles, server0Params);

        Map<String, Object> server1Params = new HashMap<>();
        addTargetParameters(server1Params);
        server1 = createClusteredServerWithParams(isNetty(), 1, useFiles, server1Params);

        Map<String, TransportConfiguration> connectors = new HashMap<>();
        TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params);
        TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params);
        connectors.put(server1tc.getName(), server1tc);
        server0.getConfiguration().setConnectorConfigurations(connectors);

        ArrayList<String> staticConnectors = new ArrayList<>();
        staticConnectors.add(server1tc.getName());

        BridgeConfiguration bridgeConfiguration = new BridgeConfiguration()
            .setName("bridge1")
            .setQueueName("queue0")
            .setForwardingAddress("forwardAddress")
            .setFilterString("animal='goat'")
            .setRetryInterval(1000L)
            .setReconnectAttemptsOnSameNode(-1)
            .setUseDuplicateDetection(false)
            .setConfirmationWindowSize(0)
            .setStaticConnectors(staticConnectors);

        ArrayList<BridgeConfiguration> bridgeConfigs = new ArrayList<>();
        bridgeConfigs.add(bridgeConfiguration);
        server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);

        ArrayList<CoreQueueConfiguration> queueConfigs0 = new ArrayList<>();
        queueConfigs0.add(new CoreQueueConfiguration().setAddress("testAddress").setName("queue0"));
        server0.getConfiguration().setQueueConfigurations(queueConfigs0);

        ArrayList<CoreQueueConfiguration> queueConfigs1 = new ArrayList<>();
        queueConfigs1.add(new CoreQueueConfiguration().setAddress("forwardAddress").setName("queue1"));
        server1.getConfiguration().setQueueConfigurations(queueConfigs1);

        // The target is started before the source.
        server1.start();
        server0.start();

        locator = addServerLocator(ActiveMQClient.createServerLocatorWithoutHA(server0tc, server1tc));
        ClientSessionFactory sf0 = locator.createSessionFactory(server0tc);
        ClientSessionFactory sf1 = locator.createSessionFactory(server1tc);

        ClientSession session0 = sf0.createSession(false, true, true);
        ClientSession session1 = sf1.createSession(false, true, true);

        ClientProducer producer0 = session0.createProducer(new SimpleString("testAddress"));
        ClientConsumer consumer1 = session1.createConsumer("queue1");
        session1.start();

        SimpleString propKey = new SimpleString("testkey");
        SimpleString selectorKey = new SimpleString("animal");

        // First batch does not match the bridge filter and must stay on server0.
        for (int sent = 0; sent < numMessages; sent++) {
            ClientMessage message = session0.createMessage(true);
            message.putIntProperty(propKey, sent);
            message.putStringProperty(selectorKey, new SimpleString("monkey"));
            if (largeMessage) {
                message.setBodyInputStream(ActiveMQTestBase.createFakeLargeStream(10 * 1024L));
            }
            producer0.send(message);
        }

        Assert.assertNull(consumer1.receiveImmediate());

        // Second batch matches the filter and must be forwarded.
        for (int sent = 0; sent < numMessages; sent++) {
            ClientMessage message = session0.createMessage(true);
            message.putIntProperty(propKey, sent);
            message.putStringProperty(selectorKey, new SimpleString("goat"));
            if (largeMessage) {
                message.setBodyInputStream(ActiveMQTestBase.createFakeLargeStream(10 * 1024L));
            }
            producer0.send(message);
        }

        for (int expected = 0; expected < numMessages; expected++) {
            ClientMessage message = consumer1.receive(4000L);
            Assert.assertNotNull(message);
            Assert.assertEquals("goat", message.getStringProperty(selectorKey));
            Assert.assertEquals(Integer.valueOf(expected), message.getObjectProperty(propKey));
            message.acknowledge();
            if (largeMessage) {
                readLargeMessages(message, 10);
            }
        }

        session0.commit();
        session1.commit();

        Assert.assertNull(consumer1.receiveImmediate());

        session0.close();
        session1.close();
        sf0.close();
        sf1.close();
        closeFields();

        if (useFiles) {
            Map<Long, AtomicInteger> counters = loadQueues(server0);
            assertEquals(1L, counters.size());
            // Look the single queue's counter up once and reuse it for both assertions.
            Long queueId = counters.keySet().iterator().next();
            AtomicInteger messageCount = counters.get(queueId);
            assertNotNull(messageCount);
            assertEquals(10L, messageCount.intValue());
        }
    }

    /**
     * Starts only server0, sends 100 messages into the bridged queue, and starts server1
     * afterwards. After a one-second pause the queue "forwardAddress" is created on server1 and
     * all 100 messages are expected there.
     */
    @Test
    public void testStartLater() throws Exception {
        final int numMessages = 100;
        final String forwardAddress = "jms.queue.forwardAddress";
        final String forwardQueue = "forwardAddress";

        Map<String, Object> server0Params = new HashMap<>();
        server0 = createClusteredServerWithParams(isNetty(), 0, true, server0Params);

        Map<String, Object> server1Params = new HashMap<>();
        addTargetParameters(server1Params);
        server1 = createClusteredServerWithParams(isNetty(), 1, true, server1Params);

        Map<String, TransportConfiguration> connectors = new HashMap<>();
        TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params);
        TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params);
        connectors.put(server1tc.getName(), server1tc);
        server0.getConfiguration().setConnectorConfigurations(connectors);

        ArrayList<String> staticConnectors = new ArrayList<>();
        staticConnectors.add(server1tc.getName());

        BridgeConfiguration bridgeConfiguration = new BridgeConfiguration()
            .setName("bridge1")
            .setQueueName("queue0")
            .setForwardingAddress(forwardAddress)
            .setRetryInterval(100L)
            .setReconnectAttemptsOnSameNode(-1)
            .setUseDuplicateDetection(false)
            .setConfirmationWindowSize(1024)
            .setStaticConnectors(staticConnectors);

        ArrayList<BridgeConfiguration> bridgeConfigs = new ArrayList<>();
        bridgeConfigs.add(bridgeConfiguration);
        server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);

        ArrayList<CoreQueueConfiguration> queueConfigs0 = new ArrayList<>();
        queueConfigs0.add(new CoreQueueConfiguration().setAddress("testAddress").setName("queue0"));
        server0.getConfiguration().setQueueConfigurations(queueConfigs0);

        // Only the source is running while the messages are produced.
        server0.start();

        locator = addServerLocator(ActiveMQClient.createServerLocatorWithoutHA(server0tc, server1tc));
        ClientSessionFactory sf0 = locator.createSessionFactory(server0tc);
        ClientSession session0 = sf0.createSession(false, true, true);
        ClientProducer producer0 = session0.createProducer(new SimpleString("testAddress"));

        SimpleString propKey = new SimpleString("testkey");
        SimpleString selectorKey = new SimpleString("animal");

        for (int sent = 0; sent < numMessages; sent++) {
            ClientMessage message = session0.createMessage(true);
            message.getBodyBuffer().writeBytes(new byte[1024]);
            message.putIntProperty(propKey, sent);
            message.putStringProperty(selectorKey, new SimpleString("monkey" + sent));
            producer0.send(message);
        }

        // Bring the target up late, then pause before connecting a client to it.
        server1.start();
        Thread.sleep(1000L);

        ClientSessionFactory sf1 = locator.createSessionFactory(server1tc);
        ClientSession session1 = sf1.createSession(false, true, true);

        // Best effort: a failure to create the queue is printed and the test carries on.
        try {
            session1.createQueue(forwardAddress, forwardQueue);
        } catch (Throwable th) {
            th.printStackTrace();
        }

        ClientConsumer consumer1 = session1.createConsumer(forwardQueue);
        session1.start();

        for (int received = 0; received < numMessages; received++) {
            ClientMessage message = consumer1.receive(5000L);
            assertNotNull(message);
            message.acknowledge();
        }
        session1.commit();

        Assert.assertNull(consumer1.receiveImmediate());

        consumer1.close();
        session1.deleteQueue(forwardQueue);
        session1.close();
        sf1.close();

        server1.stop();

        session0.close();
        sf0.close();
        closeFields();

        assertEquals(0L, loadQueues(server0).size());
    }

    /**
     * Checks that the bridge does not re-deliver messages whose bridge duplicate IDs are already
     * registered in the target server's duplicate-ID cache.
     * <p>
     * 1000 durable messages are sent to {@code testAddress} on server0 before server1 is started.
     * The message IDs of the first 100 references in {@code queue0} are then added to server1's
     * bridge duplicate cache, and the test expects exactly messages 100..999 (by the
     * {@code testkey} property) to arrive on the {@code forwardAddress} queue.
     */
    @Test
    public void testWithDuplicates() throws Exception {
        final int totalMessages = 1000;
        final int preloadedDuplicates = 100;

        HashMap<String, Object> sourceParams = new HashMap<>();
        this.server0 = createClusteredServerWithParams(isNetty(), 0, true, sourceParams);
        HashMap<String, Object> targetParams = new HashMap<>();
        addTargetParameters(targetParams);
        this.server1 = createClusteredServerWithParams(isNetty(), 1, true, targetParams);

        TransportConfiguration sourceConnector = new TransportConfiguration(getConnector(), sourceParams);
        TransportConfiguration targetConnector = new TransportConfiguration(getConnector(), targetParams);
        HashMap<String, TransportConfiguration> connectors = new HashMap<>();
        connectors.put(targetConnector.getName(), targetConnector);
        this.server0.getConfiguration().setConnectorConfigurations(connectors);

        ArrayList<String> staticConnectorNames = new ArrayList<>();
        staticConnectorNames.add(targetConnector.getName());
        BridgeConfiguration bridgeConfig = new BridgeConfiguration().setName("bridge1").setQueueName("queue0").setForwardingAddress("jms.queue.forwardAddress").setRetryInterval(100L).setReconnectAttemptsOnSameNode(-1).setConfirmationWindowSize(0).setStaticConnectors(staticConnectorNames);
        ArrayList<BridgeConfiguration> bridgeConfigs = new ArrayList<>();
        bridgeConfigs.add(bridgeConfig);
        this.server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);

        CoreQueueConfiguration sourceQueue = new CoreQueueConfiguration().setAddress("testAddress").setName("queue0");
        ArrayList<CoreQueueConfiguration> sourceQueues = new ArrayList<>();
        sourceQueues.add(sourceQueue);
        this.server0.getConfiguration().setQueueConfigurations(sourceQueues);

        // Only the source server is started for now; server1 is started after the sends below.
        this.server0.start();
        this.locator = addServerLocator(ActiveMQClient.createServerLocatorWithoutHA(new TransportConfiguration[]{sourceConnector, targetConnector}));
        ClientSessionFactory sourceFactory = this.locator.createSessionFactory(sourceConnector);
        ClientSession sourceSession = sourceFactory.createSession(false, true, true);
        ClientProducer producer = sourceSession.createProducer(new SimpleString("testAddress"));
        SimpleString sequenceKey = new SimpleString("testkey");
        SimpleString animalKey = new SimpleString("animal");
        for (int i = 0; i < totalMessages; i++) {
            ClientMessage message = sourceSession.createMessage(true);
            message.getBodyBuffer().writeBytes(new byte[1024]);
            message.putIntProperty(sequenceKey, i);
            message.putStringProperty(animalKey, new SimpleString("monkey" + i));
            producer.send(message);
        }
        this.server1.start();

        // Collect the IDs of the first references in queue0; they become the "already seen" set.
        long[] preloadedIds = new long[preloadedDuplicates];
        LinkedListIterator queueIterator = this.server0.locateQueue(new SimpleString("queue0")).iterator();
        try {
            for (int i = 0; i < preloadedIds.length; i++) {
                // Fail with a clear message instead of letting next() blow up on a short queue.
                Assert.assertTrue("queue0 ran out of messages at index " + i, queueIterator.hasNext());
                preloadedIds[i] = ((MessageReference) queueIterator.next()).getMessage().getMessageID();
            }
        } finally {
            // Release the iterator even when the loop above fails.
            queueIterator.close();
        }

        DuplicateIDCache duplicateCache = this.server1.getPostOffice().getDuplicateIDCache(PostOfficeImpl.BRIDGE_CACHE_STR.concat("jms.queue.forwardAddress"));
        TransactionImpl cacheTx = new TransactionImpl(this.server1.getStorageManager());
        for (long messageId : preloadedIds) {
            duplicateCache.addToCache(BridgeImpl.getDuplicateBytes(this.server0.getNodeManager().getUUID(), messageId), cacheTx);
        }
        cacheTx.commit();
        Thread.sleep(1000L);

        ClientSessionFactory targetFactory = this.locator.createSessionFactory(targetConnector);
        ClientSession targetSession = targetFactory.createSession(false, true, true);
        try {
            targetSession.createQueue("jms.queue.forwardAddress", "forwardAddress");
        } catch (Throwable t) {
            // Deliberately non-fatal: a failure to create the queue is only printed.
            t.printStackTrace();
        }
        ClientConsumer consumer = targetSession.createConsumer("forwardAddress");
        targetSession.start();
        for (int expected = preloadedDuplicates; expected < totalMessages; expected++) {
            ClientMessage received = consumer.receive(5000L);
            assertNotNull(received);
            assertEquals(expected, received.getIntProperty(sequenceKey).intValue());
            received.acknowledge();
        }
        targetSession.commit();
        Assert.assertNull(consumer.receiveImmediate());

        consumer.close();
        targetSession.deleteQueue("forwardAddress");
        targetSession.close();
        targetFactory.close();
        this.server1.stop();
        sourceSession.close();
        sourceFactory.close();
        closeFields();
        // With everything stopped, nothing should be left in server0's journal queues.
        assertEquals(0L, loadQueues(this.server0).size());
    }

    /**
     * Closes the shared locator and stops both servers held in this test's fields.
     * <p>
     * Every step runs even if an earlier one throws, so a failing {@code close()} or
     * {@code stop()} cannot leave the remaining server running.
     *
     * @throws Exception if closing the locator or stopping either server fails
     */
    private void closeFields() throws Exception {
        try {
            this.locator.close();
        } finally {
            try {
                this.server0.stop();
            } finally {
                this.server1.stop();
            }
        }
    }

    /** Runs the shared transformer scenario, passing {@code false} as its flag. */
    @Test
    public void testWithTransformer() throws Exception {
        final boolean flag = false;
        internaltestWithTransformer(flag);
    }

    /** Runs the shared transformer scenario, passing {@code true} as its flag. */
    @Test
    public void testWithTransformerFiles() throws Exception {
        final boolean flag = true;
        internaltestWithTransformer(flag);
    }

    /**
     * Sends ten messages through a bridge configured with {@code SimpleTransformer} and checks
     * that each one arrives on {@code queue1} with the {@code wibble} property equal to
     * {@code "bong"} and the body {@code "dee be dee be dee be dee"}.
     *
     * @param z NOTE(review): this flag is not read anywhere in the method body, so both callers
     *          currently exercise the same configuration; confirm whether it was meant to select
     *          file-backed servers
     * @throws Exception on any server, session or assertion failure
     */
    public void internaltestWithTransformer(boolean z) throws Exception {
        // Source (server0) and target (server1) servers.
        HashMap<String, Object> sourceParams = new HashMap<>();
        this.server0 = createClusteredServerWithParams(isNetty(), 0, false, sourceParams);
        HashMap<String, Object> targetParams = new HashMap<>();
        addTargetParameters(targetParams);
        this.server1 = createClusteredServerWithParams(isNetty(), 1, false, targetParams);

        TransportConfiguration sourceConnector = new TransportConfiguration(getConnector(), sourceParams);
        TransportConfiguration targetConnector = new TransportConfiguration(getConnector(), targetParams);
        HashMap<String, TransportConfiguration> connectors = new HashMap<>();
        connectors.put(targetConnector.getName(), targetConnector);
        this.server0.getConfiguration().setConnectorConfigurations(connectors);

        // Bridge from queue0 on server0 to forwardAddress on server1, with the transformer attached.
        ArrayList<String> staticConnectorNames = new ArrayList<>();
        staticConnectorNames.add(targetConnector.getName());
        BridgeConfiguration bridgeConfig = new BridgeConfiguration().setName("bridge1").setQueueName("queue0").setForwardingAddress("forwardAddress").setTransformerClassName(SimpleTransformer.class.getName()).setRetryInterval(1000L).setReconnectAttemptsOnSameNode(-1).setUseDuplicateDetection(false).setConfirmationWindowSize(1024).setStaticConnectors(staticConnectorNames);
        ArrayList<BridgeConfiguration> bridgeConfigs = new ArrayList<>();
        bridgeConfigs.add(bridgeConfig);
        this.server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);

        ArrayList<CoreQueueConfiguration> sourceQueues = new ArrayList<>();
        sourceQueues.add(new CoreQueueConfiguration().setAddress("testAddress").setName("queue0"));
        this.server0.getConfiguration().setQueueConfigurations(sourceQueues);

        ArrayList<CoreQueueConfiguration> targetQueues = new ArrayList<>();
        targetQueues.add(new CoreQueueConfiguration().setAddress("forwardAddress").setName("queue1"));
        this.server1.getConfiguration().setQueueConfigurations(targetQueues);

        // Target first, then source.
        this.server1.start();
        this.server0.start();

        this.locator = addServerLocator(ActiveMQClient.createServerLocatorWithoutHA(new TransportConfiguration[]{sourceConnector, targetConnector}));
        ClientSessionFactory sourceFactory = this.locator.createSessionFactory(sourceConnector);
        ClientSessionFactory targetFactory = this.locator.createSessionFactory(targetConnector);
        ClientSession sourceSession = sourceFactory.createSession(false, true, true);
        ClientSession targetSession = targetFactory.createSession(false, true, true);
        ClientProducer producer = sourceSession.createProducer(new SimpleString("testAddress"));
        ClientConsumer consumer = targetSession.createConsumer("queue1");
        targetSession.start();

        SimpleString wibbleKey = new SimpleString("wibble");
        int sent = 0;
        while (sent < 10) {
            ClientMessage outgoing = sourceSession.createMessage(true);
            outgoing.putStringProperty(wibbleKey, new SimpleString("bing"));
            outgoing.getBodyBuffer().writeString("doo be doo be doo be doo");
            producer.send(outgoing);
            sent++;
        }

        int seen = 0;
        while (seen < 10) {
            ClientMessage incoming = consumer.receive(200L);
            Assert.assertNotNull(incoming);
            SimpleString transformedProperty = (SimpleString) incoming.getObjectProperty(wibbleKey);
            Assert.assertEquals(new SimpleString("bong"), transformedProperty);
            String transformedBody = incoming.getBodyBuffer().readString();
            Assert.assertEquals("dee be dee be dee be dee", transformedBody);
            incoming.acknowledge();
            seen++;
        }
        Assert.assertNull(consumer.receiveImmediate());

        sourceSession.close();
        targetSession.close();
        sourceFactory.close();
        targetFactory.close();

        // The journal is only inspected when server0 was configured with persistence.
        boolean persistent = this.server0.getConfiguration().isPersistenceEnabled();
        if (persistent) {
            assertEquals(0L, loadQueues(this.server0).size());
        }
    }

    /**
     * Pushes bursts of traffic through the bridge in three consecutive rounds.
     * <p>
     * In every round one consumer thread receives 300 messages from {@code queue1} on the
     * target server while two producer threads each send 150 messages to {@code testAddress}
     * on the source server. A shared semaphore (10000 initial permits) is released once per
     * consumed message and acquired once per sent message with a 10 second timeout.
     * Any failure inside a worker thread is counted in a shared error counter that must be
     * zero after each round. Finally the source server's journal queues must be empty.
     */
    @Test
    public void testSawtoothLoad() throws Exception {
        final int rounds = 3;
        final int messagesPerProducer = 150;
        final int messagesPerConsumer = 300;

        HashMap<String, Object> sourceParams = new HashMap<>();
        ActiveMQServer sourceServer = createClusteredServerWithParams(isNetty(), 0, true, sourceParams);
        sourceServer.getConfiguration().setThreadPoolMaxSize(10);
        HashMap<String, Object> targetParams = new HashMap<>();
        addTargetParameters(targetParams);
        ActiveMQServer targetServer = createClusteredServerWithParams(isNetty(), 1, true, targetParams);
        targetServer.getConfiguration().setThreadPoolMaxSize(10);

        final TransportConfiguration sourceConnector = new TransportConfiguration(getConnector(), sourceParams);
        final TransportConfiguration targetConnector = new TransportConfiguration(getConnector(), targetParams);
        HashMap<String, TransportConfiguration> connectors = new HashMap<>();
        connectors.put(targetConnector.getName(), targetConnector);
        sourceServer.getConfiguration().setConnectorConfigurations(connectors);

        ArrayList<String> staticConnectorNames = new ArrayList<>();
        staticConnectorNames.add(targetConnector.getName());
        BridgeConfiguration bridgeConfig = new BridgeConfiguration().setName("bridge1").setQueueName("queue0").setForwardingAddress("forwardAddress").setRetryInterval(1000L).setReconnectAttemptsOnSameNode(-1).setUseDuplicateDetection(false).setConfirmationWindowSize(0).setStaticConnectors(staticConnectorNames);
        ArrayList<BridgeConfiguration> bridgeConfigs = new ArrayList<>();
        bridgeConfigs.add(bridgeConfig);
        sourceServer.getConfiguration().setBridgeConfigurations(bridgeConfigs);

        ArrayList<CoreQueueConfiguration> sourceQueues = new ArrayList<>();
        sourceQueues.add(new CoreQueueConfiguration().setAddress("testAddress").setName("queue0"));
        sourceServer.getConfiguration().setQueueConfigurations(sourceQueues);

        ArrayList<CoreQueueConfiguration> targetQueues = new ArrayList<>();
        targetQueues.add(new CoreQueueConfiguration().setAddress("forwardAddress").setName("queue1"));
        targetServer.getConfiguration().setQueueConfigurations(targetQueues);

        try {
            targetServer.start();
            sourceServer.start();

            final AtomicInteger errors = new AtomicInteger(0);
            final Semaphore inFlight = new Semaphore(10000);

            /** Receives and acknowledges a fixed number of messages from queue1 on the target server. */
            class ConsumerThread extends Thread {
                private final int nmsg;

                ConsumerThread(int nmsg) {
                    this.nmsg = nmsg;
                }

                @Override
                public void run() {
                    ServerLocator consumerLocator = null;
                    ClientSessionFactory factory = null;
                    ClientSession session = null;
                    try {
                        consumerLocator = BridgeTest.this.addServerLocator(ActiveMQClient.createServerLocatorWithoutHA(new TransportConfiguration[]{targetConnector}));
                        factory = BridgeTest.this.createSessionFactory(consumerLocator);
                        session = factory.createSession(false, false);
                        session.start();
                        ClientConsumer consumer = session.createConsumer("queue1");
                        for (int i = 0; i < nmsg; i++) {
                            ClientMessage received = consumer.receive(5000L);
                            Assert.assertNotNull(received);
                            received.acknowledge();
                            // Hand a permit back so a producer may send its next message.
                            inFlight.release();
                            if (i % 1000 == 0) {
                                session.commit();
                            }
                        }
                        session.commit();
                    } catch (Throwable t) {
                        t.printStackTrace();
                        errors.incrementAndGet();
                    } finally {
                        try {
                            if (session != null) {
                                session.close();
                            }
                            if (factory != null) {
                                factory.close();
                            }
                            if (consumerLocator != null) {
                                consumerLocator.close();
                            }
                        } catch (Exception e) {
                            errors.incrementAndGet();
                        }
                    }
                }
            }

            /** Sends a fixed number of durable messages to testAddress on the source server. */
            class ProducerThread extends Thread {
                private final int nmsg;

                ProducerThread(int nmsg) {
                    this.nmsg = nmsg;
                }

                @Override
                public void run() {
                    ServerLocator producerLocator = BridgeTest.this.addServerLocator(ActiveMQClient.createServerLocatorWithoutHA(new TransportConfiguration[]{sourceConnector}));
                    producerLocator.setBlockOnDurableSend(false).setBlockOnNonDurableSend(false);
                    ClientSessionFactory factory = null;
                    ClientSession session = null;
                    try {
                        factory = BridgeTest.this.createSessionFactory(producerLocator);
                        session = factory.createSession(false, true, true);
                        ClientProducer producer = session.createProducer(new SimpleString("testAddress"));
                        for (int i = 0; i < nmsg; i++) {
                            // Stop producing as soon as any worker has reported a failure.
                            Assert.assertEquals(0L, errors.get());
                            ClientMessage message = session.createMessage(true);
                            message.putIntProperty("seq", i);
                            if (i % 100 == 0) {
                                message.setPriority((byte) (RandomUtil.randomPositiveInt() % 9));
                            } else {
                                message.setPriority((byte) 5);
                            }
                            message.getBodyBuffer().writeBytes(new byte[50]);
                            producer.send(message);
                            Assert.assertTrue(inFlight.tryAcquire(1, 10L, TimeUnit.SECONDS));
                        }
                    } catch (Throwable t) {
                        t.printStackTrace(System.out);
                        errors.incrementAndGet();
                    } finally {
                        try {
                            if (session != null) {
                                session.close();
                            }
                            if (factory != null) {
                                factory.close();
                            }
                            producerLocator.close();
                        } catch (Exception e) {
                            errors.incrementAndGet();
                        }
                    }
                }
            }

            for (int round = 0; round < rounds; round++) {
                ArrayList<Thread> workers = new ArrayList<>();
                workers.add(new ConsumerThread(messagesPerConsumer));
                workers.add(new ProducerThread(messagesPerProducer));
                workers.add(new ProducerThread(messagesPerProducer));
                for (Thread worker : workers) {
                    worker.start();
                }
                for (Thread worker : workers) {
                    worker.join();
                }
                assertEquals(0L, errors.get());
            }
            assertEquals(0L, loadQueues(sourceServer).size());
        } finally {
            try {
                sourceServer.stop();
            } catch (Exception ignored) {
                // Best-effort shutdown: a stop failure must not mask the test outcome.
            }
            try {
                targetServer.stop();
            } catch (Exception ignored) {
                // Best-effort shutdown: a stop failure must not mask the test outcome.
            }
        }
    }

    /**
     * Sends 200 durable messages through a duplicate-detecting bridge while the source server
     * is stopped and restarted part-way, then verifies that every message arrives on
     * {@code queue1} exactly once.
     * <p>
     * The target server is configured with {@code StopInterceptor} as an incoming interceptor
     * and the source server is handed to it via {@code StopInterceptor.serverToStop}; the test
     * waits on the interceptor's latch and thread before starting the source server again.
     * NOTE(review): the interceptor's implementation is outside this method; presumably it is
     * what stops the source server mid-transfer - confirm against its definition.
     * Out-of-order arrival is reported on stderr but tolerated; missing, duplicated or surplus
     * messages fail the test.
     */
    @Test
    public void testBridgeWithPaging() throws Exception {
        final int messageCount = 200;
        ActiveMQServer sourceServer = null;
        ActiveMQServer targetServer = null;
        try {
            HashMap<String, Object> sourceParams = new HashMap<>();
            sourceServer = createClusteredServerWithParams(isNetty(), 0, true, 1024, 10240, sourceParams);
            HashMap<String, Object> targetParams = new HashMap<>();
            addTargetParameters(targetParams);
            targetServer = createClusteredServerWithParams(isNetty(), 1, true, 1024, 10240, targetParams);
            targetServer.getConfiguration().setJournalBufferTimeout_AIO(10).setJournalBufferTimeout_NIO(10);

            TransportConfiguration sourceConnector = new TransportConfiguration(getConnector(), sourceParams);
            TransportConfiguration targetConnector = new TransportConfiguration(getConnector(), targetParams);
            HashMap<String, TransportConfiguration> connectors = new HashMap<>();
            connectors.put(targetConnector.getName(), targetConnector);
            sourceServer.getConfiguration().setConnectorConfigurations(connectors);
            sourceServer.getConfiguration().setIDCacheSize(20000).setJournalBufferTimeout_NIO(10).setJournalBufferTimeout_AIO(10);

            ArrayList<String> staticConnectorNames = new ArrayList<>();
            staticConnectorNames.add(targetConnector.getName());
            BridgeConfiguration bridgeConfig = new BridgeConfiguration().setName("bridge1").setQueueName("queue0").setForwardingAddress("forwardAddress").setRetryInterval(1L).setReconnectAttemptsOnSameNode(-1).setUseDuplicateDetection(false).setConfirmationWindowSize(1).setStaticConnectors(staticConnectorNames);
            bridgeConfig.setCallTimeout(1000L);
            // Overrides the 'false' set in the builder chain above: duplicate detection is on.
            bridgeConfig.setUseDuplicateDetection(true);
            ArrayList<BridgeConfiguration> bridgeConfigs = new ArrayList<>();
            bridgeConfigs.add(bridgeConfig);
            sourceServer.getConfiguration().setBridgeConfigurations(bridgeConfigs);

            ArrayList<CoreQueueConfiguration> sourceQueues = new ArrayList<>();
            sourceQueues.add(new CoreQueueConfiguration().setAddress("testAddress").setName("queue0"));
            sourceServer.getConfiguration().setQueueConfigurations(sourceQueues);

            ArrayList<CoreQueueConfiguration> targetQueues = new ArrayList<>();
            targetQueues.add(new CoreQueueConfiguration().setAddress("forwardAddress").setName("queue1"));
            targetServer.getConfiguration().setQueueConfigurations(targetQueues);

            ArrayList<String> interceptorClassNames = new ArrayList<>();
            interceptorClassNames.add(StopInterceptor.class.getName());
            targetServer.getConfiguration().setIncomingInterceptorClassNames(interceptorClassNames);
            StopInterceptor.serverToStop = sourceServer;

            targetServer.start();
            sourceServer.start();

            this.locator = addServerLocator(ActiveMQClient.createServerLocatorWithoutHA(new TransportConfiguration[]{sourceConnector, targetConnector}));
            ClientSessionFactory sourceFactory = this.locator.createSessionFactory(sourceConnector);
            ClientSessionFactory targetFactory = this.locator.createSessionFactory(targetConnector);
            ClientSession sourceSession = sourceFactory.createSession(false, false, true);
            ClientSession targetSession = targetFactory.createSession(false, true, true);
            ClientProducer producer = sourceSession.createProducer(new SimpleString("testAddress"));
            ClientConsumer consumer = targetSession.createConsumer("queue1");
            targetSession.start();

            SimpleString sequenceKey = new SimpleString("testkey");
            for (int i = 0; i < messageCount; i++) {
                ClientMessage message = sourceSession.createMessage(true);
                message.getBodyBuffer().writeBytes(new byte[512]);
                message.putIntProperty(sequenceKey, i);
                producer.send(message);
            }
            sourceSession.commit();

            // Wait for the interceptor to signal, then for its worker thread to finish.
            assertTrue(StopInterceptor.latch.await(1L, TimeUnit.HOURS));
            StopInterceptor.thread.join(15000L);
            if (StopInterceptor.thread.isAlive()) {
                System.out.println(threadDump("Still alive, stop didn't work!!!"));
                fail("Thread that should restart the server still alive");
            }
            sourceServer.start();

            // Count how often each sequence number is received.
            HashMap<Integer, AtomicInteger> receivedCounts = new HashMap<>();
            for (int i = 0; i < messageCount; i++) {
                ClientMessage received = consumer.receive(5000L);
                if (received == null) {
                    // Missing messages are reported by the verification loop below.
                    break;
                }
                Integer key = received.getIntProperty(sequenceKey);
                if (key.intValue() != i) {
                    // Report the message's own sequence number, not its (usually null) counter.
                    System.err.println("Message " + key + " received out of order, expected to be " + i + " it's acceptable but not the ideal!");
                }
                AtomicInteger count = receivedCounts.get(key);
                if (count == null) {
                    count = new AtomicInteger();
                    receivedCounts.put(key, count);
                }
                count.incrementAndGet();
                if (i % 500 == 0) {
                    System.out.println("received " + i);
                }
            }

            boolean failed = false;
            if (consumer.receiveImmediate() != null) {
                System.err.println("Unexpected message received");
                failed = true;
            }
            for (int i = 0; i < messageCount; i++) {
                AtomicInteger count = receivedCounts.get(Integer.valueOf(i));
                if (count == null) {
                    System.err.println("Msg " + i + " wasn't received");
                    failed = true;
                } else if (count.get() > 1) {
                    System.err.println("msg " + i + " was received " + count.get() + " times");
                    failed = true;
                }
            }
            assertFalse("Test failed", failed);

            sourceSession.close();
            targetSession.close();
            sourceFactory.close();
            targetFactory.close();
        } finally {
            // Single cleanup path for both success and failure.
            if (this.locator != null) {
                this.locator.close();
            }
            if (sourceServer != null) {
                try {
                    sourceServer.stop();
                } catch (Throwable ignored) {
                    // Best-effort shutdown: do not mask the test outcome.
                }
            }
            if (targetServer != null) {
                try {
                    targetServer.stop();
                } catch (Throwable ignored) {
                    // Best-effort shutdown: do not mask the test outcome.
                }
            }
        }
        // Checked only after both servers are stopped.
        assertEquals(0L, loadQueues(sourceServer).size());
    }

    /**
     * Sends 50 durable messages whose bodies come from {@code createFakeLargeStream(1024)}
     * through the bridge and verifies, for each one received on {@code queue1}, the
     * {@code testkey} sequence property and every one of the 1024 body bytes against
     * {@code getSamplebyte}.
     * <p>
     * The locator and both servers are released in a single {@code finally} block; the source
     * server's journal queues are checked for leftovers only after both servers have stopped.
     */
    @Test
    public void testBridgeWithLargeMessage() throws Exception {
        final int messageCount = 50;
        final int bodySize = 1024;
        ActiveMQServer sourceServer = null;
        ActiveMQServer targetServer = null;
        ServerLocator serverLocator = null;
        try {
            HashMap<String, Object> sourceParams = new HashMap<>();
            sourceServer = createClusteredServerWithParams(isNetty(), 0, true, 10240, 1048576, sourceParams);
            HashMap<String, Object> targetParams = new HashMap<>();
            addTargetParameters(targetParams);
            targetServer = createClusteredServerWithParams(isNetty(), 1, true, targetParams);

            TransportConfiguration sourceConnector = new TransportConfiguration(getConnector(), sourceParams);
            TransportConfiguration targetConnector = new TransportConfiguration(getConnector(), targetParams);
            HashMap<String, TransportConfiguration> connectors = new HashMap<>();
            connectors.put(targetConnector.getName(), targetConnector);
            sourceServer.getConfiguration().setConnectorConfigurations(connectors);

            ArrayList<String> staticConnectorNames = new ArrayList<>();
            staticConnectorNames.add(targetConnector.getName());
            BridgeConfiguration bridgeConfig = new BridgeConfiguration().setName("bridge1").setQueueName("queue0").setForwardingAddress("forwardAddress").setRetryInterval(1000L).setReconnectAttemptsOnSameNode(-1).setUseDuplicateDetection(false).setConfirmationWindowSize(1024).setStaticConnectors(staticConnectorNames);
            ArrayList<BridgeConfiguration> bridgeConfigs = new ArrayList<>();
            bridgeConfigs.add(bridgeConfig);
            sourceServer.getConfiguration().setBridgeConfigurations(bridgeConfigs);

            ArrayList<CoreQueueConfiguration> sourceQueues = new ArrayList<>();
            sourceQueues.add(new CoreQueueConfiguration().setAddress("testAddress").setName("queue0"));
            sourceServer.getConfiguration().setQueueConfigurations(sourceQueues);

            ArrayList<CoreQueueConfiguration> targetQueues = new ArrayList<>();
            targetQueues.add(new CoreQueueConfiguration().setAddress("forwardAddress").setName("queue1"));
            targetServer.getConfiguration().setQueueConfigurations(targetQueues);

            targetServer.start();
            sourceServer.start();

            serverLocator = addServerLocator(ActiveMQClient.createServerLocatorWithoutHA(new TransportConfiguration[]{sourceConnector, targetConnector}));
            ClientSessionFactory sourceFactory = serverLocator.createSessionFactory(sourceConnector);
            ClientSessionFactory targetFactory = serverLocator.createSessionFactory(targetConnector);
            ClientSession sourceSession = sourceFactory.createSession(false, true, true);
            ClientSession targetSession = targetFactory.createSession(false, true, true);
            ClientProducer producer = sourceSession.createProducer(new SimpleString("testAddress"));
            ClientConsumer consumer = targetSession.createConsumer("queue1");
            targetSession.start();

            SimpleString sequenceKey = new SimpleString("testkey");
            for (int i = 0; i < messageCount; i++) {
                ClientMessage message = sourceSession.createMessage(true);
                message.setBodyInputStream(createFakeLargeStream(1024L));
                message.putIntProperty(sequenceKey, i);
                producer.send(message);
            }
            sourceSession.commit();

            for (int i = 0; i < messageCount; i++) {
                ClientMessage received = consumer.receive(5000L);
                Assert.assertNotNull(received);
                Assert.assertEquals(Integer.valueOf(i), received.getObjectProperty(sequenceKey));
                ActiveMQBuffer body = received.getBodyBuffer();
                for (int offset = 0; offset < bodySize; offset++) {
                    assertEquals(getSamplebyte(offset), body.readByte());
                }
                received.acknowledge();
            }
            targetSession.commit();
            Assert.assertNull(consumer.receiveImmediate());

            sourceSession.close();
            targetSession.close();
            sourceFactory.close();
            targetFactory.close();
        } finally {
            // Single cleanup path for both success and failure.
            if (serverLocator != null) {
                serverLocator.close();
            }
            if (sourceServer != null) {
                try {
                    sourceServer.stop();
                } catch (Throwable ignored) {
                    // Best-effort shutdown: do not mask the test outcome.
                }
            }
            if (targetServer != null) {
                try {
                    targetServer.stop();
                } catch (Throwable ignored) {
                    // Best-effort shutdown: do not mask the test outcome.
                }
            }
        }
        // Checked only after both servers are stopped.
        assertEquals(0L, loadQueues(sourceServer).size());
    }

    /**
     * Checks that a bridge configured without a forwarding address still delivers messages to the
     * target server.
     *
     * <p>Server 0 hosts {@code queue0} on {@code testAddress} and bridges it to server 1; the bridge
     * configuration never calls {@code setForwardingAddress}. Server 1 hosts {@code queue1}, bound to
     * the same {@code testAddress}. Ten messages produced on server 0 must then be consumed, in send
     * order, from {@code queue1} on server 1, after which the queue must be empty.
     *
     * @throws Exception if server start-up, messaging or cleanup fails
     */
    @Test
    public void testNullForwardingAddress() throws Exception {
        final int numMessages = 10;
        // The bridge below retries its connection every 1000 ms, so the per-message wait has to be
        // comfortably longer than that or a single reconnect makes the test fail spuriously.
        final long receiveTimeoutMillis = 5000L;

        Map<String, Object> server0Params = new HashMap<>();
        this.server0 = createClusteredServerWithParams(isNetty(), 0, false, server0Params);
        Map<String, Object> server1Params = new HashMap<>();
        addTargetParameters(server1Params);
        this.server1 = createClusteredServerWithParams(isNetty(), 1, false, server1Params);

        TransportConfiguration server0Connector = new TransportConfiguration(getConnector(), server0Params);
        TransportConfiguration server1Connector = new TransportConfiguration(getConnector(), server1Params);
        Map<String, TransportConfiguration> connectors = new HashMap<>();
        connectors.put(server1Connector.getName(), server1Connector);
        this.server0.getConfiguration().setConnectorConfigurations(connectors);

        ArrayList<String> staticConnectors = new ArrayList<>();
        staticConnectors.add(server1Connector.getName());
        // Deliberately no setForwardingAddress(...): that omission is what this test is about.
        BridgeConfiguration bridgeConfiguration = new BridgeConfiguration()
            .setName("bridge1")
            .setQueueName("queue0")
            .setRetryInterval(1000L)
            .setReconnectAttemptsOnSameNode(-1)
            .setUseDuplicateDetection(false)
            .setConfirmationWindowSize(5120)
            .setStaticConnectors(staticConnectors);
        ArrayList<BridgeConfiguration> bridgeConfigurations = new ArrayList<>();
        bridgeConfigurations.add(bridgeConfiguration);
        this.server0.getConfiguration().setBridgeConfigurations(bridgeConfigurations);

        ArrayList<CoreQueueConfiguration> server0Queues = new ArrayList<>();
        server0Queues.add(new CoreQueueConfiguration().setAddress("testAddress").setName("queue0"));
        this.server0.getConfiguration().setQueueConfigurations(server0Queues);

        ArrayList<CoreQueueConfiguration> server1Queues = new ArrayList<>();
        server1Queues.add(new CoreQueueConfiguration().setAddress("testAddress").setName("queue1"));
        this.server1.getConfiguration().setQueueConfigurations(server1Queues);

        // Target first, so it is already up when server 0 brings the bridge online.
        this.server1.start();
        this.server0.start();
        this.locator = addServerLocator(ActiveMQClient.createServerLocatorWithoutHA(new TransportConfiguration[]{server0Connector, server1Connector}));

        try {
            ClientSessionFactory sourceFactory = this.locator.createSessionFactory(server0Connector);
            ClientSessionFactory targetFactory = this.locator.createSessionFactory(server1Connector);
            ClientSession sourceSession = sourceFactory.createSession(false, true, true);
            ClientSession targetSession = targetFactory.createSession(false, true, true);
            ClientProducer producer = sourceSession.createProducer(new SimpleString("testAddress"));
            ClientConsumer consumer = targetSession.createConsumer("queue1");
            targetSession.start();

            byte[] payload = new byte[1024];
            SimpleString indexKey = new SimpleString("testkey");
            for (int sent = 0; sent < numMessages; sent++) {
                ClientMessage message = sourceSession.createMessage(true);
                message.putIntProperty(indexKey, sent);
                message.getBodyBuffer().writeBytes(payload);
                producer.send(message);
            }

            for (int expected = 0; expected < numMessages; expected++) {
                ClientMessage received = consumer.receive(receiveTimeoutMillis);
                Assert.assertNotNull(received);
                Assert.assertEquals(Integer.valueOf(expected), received.getObjectProperty(indexKey));
                received.acknowledge();
            }
            Assert.assertNull(consumer.receiveImmediate());

            sourceSession.close();
            targetSession.close();
            sourceFactory.close();
            targetFactory.close();
        } finally {
            // Run the shared cleanup even when an assertion above fails, so a broken run does not
            // leave state behind for later tests. NOTE(review): closeFields() is declared elsewhere
            // in this class; presumably it releases the locator and both servers - confirm.
            closeFields();
        }
    }

    /**
     * Checks that a transformer registered in the service registry under a bridge's name is the
     * transformer reported by the bridge once it has been deployed.
     *
     * <p>A no-op {@link Transformer} is registered for {@code myBridge}, an in-VM server is started
     * with that registry, a bridge of the same name is deployed, and the deployed bridge's
     * transformer is compared with the registered instance.
     *
     * @throws Exception if the server cannot be started or the queue/bridge cannot be deployed
     */
    @Test
    public void testInjectedTransformer() throws Exception {
        final String bridgeName = "myBridge";
        final String connectorName = "in-vm";
        final SimpleString address = new SimpleString("myAddress");
        final SimpleString queue = new SimpleString("myQueue");

        ServiceRegistryImpl serviceRegistry = new ServiceRegistryImpl();
        // Never actually invoked by this test; only its identity is checked at the end.
        Transformer transformer = new Transformer() {
            @Override
            public ServerMessage transform(ServerMessage message) {
                return null;
            }
        };
        serviceRegistry.addBridgeTransformer(bridgeName, transformer);

        // The null arguments are cast so the intended ActiveMQServerImpl constructor is selected.
        ActiveMQServer server = addServer(new ActiveMQServerImpl(createDefaultInVMConfig().addConnectorConfiguration(connectorName, new TransportConfiguration(INVM_CONNECTOR_FACTORY)), (MBeanServer) null, (ActiveMQSecurityManager) null, (ActiveMQServer) null, serviceRegistry));
        server.start();
        server.waitForActivation(100L, TimeUnit.MILLISECONDS);
        server.deployQueue(address, queue, (SimpleString) null, false, false);

        ArrayList<String> staticConnectors = new ArrayList<>();
        staticConnectors.add(connectorName);
        server.deployBridge(new BridgeConfiguration().setName(bridgeName).setQueueName(queue.toString()).setForwardingAddress(address.toString()).setStaticConnectors(staticConnectors));

        // Hold the lookup result as the Bridge interface and downcast only where the
        // implementation-specific getTransformer() accessor is needed.
        Bridge bridge = server.getClusterManager().getBridges().get(bridgeName);
        assertNotNull(bridge);
        assertEquals(transformer, ((BridgeImpl) bridge).getTransformer());
    }

    /**
     * Reads the given server's message journal from disk and counts, per queue id, how many
     * type-32 records it contains.
     *
     * <p>A fresh {@link JournalImpl} is opened over the server's configured journal location, all
     * records are loaded, and every record whose user record type is 32 is decoded as a
     * {@code DescribeJournal.ReferenceDescribe} and tallied under its {@code refEncoding.queueID}.
     * The journal is stopped before returning, including when loading or decoding fails.
     *
     * @param activeMQServer server whose journal configuration (file size, min/pool files,
     *                       location) is used to open the journal
     * @return a map from queue id to the number of matching records; empty when there are none
     * @throws Exception if the journal cannot be started, loaded, decoded or stopped
     */
    protected Map<Long, AtomicInteger> loadQueues(ActiveMQServer activeMQServer) throws Exception {
        // NOTE(review): 32 is presumably JournalRecordIds.ADD_REF (a message reference added to a
        // queue) - confirm, and switch to the named constant if it is importable here.
        final byte referenceRecordType = 32;

        NIOSequentialFileFactory fileFactory = new NIOSequentialFileFactory(activeMQServer.getConfiguration().getJournalLocation(), 1);
        JournalImpl journal = new JournalImpl(activeMQServer.getConfiguration().getJournalFileSize(), activeMQServer.getConfiguration().getJournalMinFiles(), activeMQServer.getConfiguration().getJournalPoolFiles(), 0, 0, fileFactory, "activemq-data", "amq", 1);

        ArrayList<RecordInfo> records = new ArrayList<>();
        // Required by load(...) but not inspected afterwards.
        ArrayList<org.apache.activemq.artemis.core.journal.PreparedTransactionInfo> preparedTransactions = new ArrayList<>();
        Map<Long, AtomicInteger> referenceCounts = new HashMap<>();

        journal.start();
        try {
            journal.load(records, preparedTransactions, (TransactionFailureCallback) null);

            for (RecordInfo record : records) {
                // Every record is decoded, not only the counted ones, matching the long-standing
                // behaviour of this helper.
                Object decoded = DescribeJournal.newObjectEncoding(record);
                if (record.getUserRecordType() != referenceRecordType) {
                    continue;
                }
                long queueId = ((DescribeJournal.ReferenceDescribe) decoded).refEncoding.queueID;
                AtomicInteger count = referenceCounts.get(queueId);
                if (count == null) {
                    referenceCounts.put(queueId, new AtomicInteger(1));
                } else {
                    count.incrementAndGet();
                }
            }
        } finally {
            // Always release the journal, even if loading or decoding blew up.
            journal.stop();
        }
        return referenceCounts;
    }
}
