package org.apache.activemq.artemis.tests.integration.routing;

import jakarta.jms.BytesMessage;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import java.io.File;
import java.io.PrintStream;
import java.net.URI;
import java.util.Iterator;
import java.util.Set;
import java.util.Stack;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.management.InstanceNotFoundException;
import javax.management.MBeanServer;
import javax.security.auth.Subject;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.management.AddressControl;
import org.apache.activemq.artemis.api.core.management.ConnectionRouterControl;
import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.config.routing.ConnectionRouterConfiguration;
import org.apache.activemq.artemis.core.security.CheckType;
import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ;
import org.apache.activemq.artemis.core.server.routing.KeyType;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager5;
import org.apache.activemq.artemis.spi.core.security.jaas.RolePrincipal;
import org.apache.activemq.artemis.spi.core.security.jaas.UserPrincipal;
import org.apache.activemq.artemis.tests.integration.management.ManagementControlHelper;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.Wait;
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.apache.qpid.jms.JmsConnectionListener;
import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
import org.junit.After;
import org.junit.Test;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/routing/ElasticQueueTest.class */
public class ElasticQueueTest extends ActiveMQTestBase {
    // Name of the queue every worker in this test produces to / consumes from; also the address-settings key.
    static final String qName = "EQ";
    // Same queue name as a SimpleString, used when building JMX address object names.
    static final SimpleString qNameSimple = SimpleString.toSimpleString(qName);
    // NOTE(review): the methods in this class use the literal 61616 rather than this field;
    // presumably it is the port of the first node's acceptor (node i listens on 61616 + i) - confirm.
    final int base_port = 61616;
    // Every Worker pushes itself here from its constructor so cleanup() can flag it as done.
    final Stack<Worker> workers = new Stack<>();
    // Embedded brokers created by prepareNodesAndStartCombinedHeadTail(); stopped in cleanup().
    final Stack<EmbeddedActiveMQ> nodes = new Stack<>();
    // NOTE(review): not referenced in the visible code, which uses the literal "role_name_sharder"
    // for the connection router name - presumably the same value, confirm.
    private final String roleNameSharder = "role_name_sharder";
    // Runs the producer/consumer workers; shut down with shutdownNow() in cleanup().
    private final ExecutorService executorService = Executors.newFixedThreadPool(3);
    // MBean server handed to every node and used to create the management proxies in the tests.
    MBeanServer mBeanServer = createMBeanServer();
    /**
     * Security manager used by every node in this test.
     *
     * <p>Only the user names {@code CONSUMER}, {@code PRODUCER} and {@code BOTH} are accepted; the
     * password is never inspected. An accepted user receives a {@code UserPrincipal} with its name and
     * a {@code RolePrincipal} named {@code "EQ_" + user}; {@code BOTH} additionally receives the
     * {@code EQ_PRODUCER} and {@code EQ_CONSUMER} roles. Every authorization check succeeds.
     */
    final ActiveMQSecurityManager5 customSecurityManager = new ActiveMQSecurityManager5() {
        /**
         * Builds the subject for a known user.
         *
         * @return the populated subject, or {@code null} when the user name is not one of the accepted names
         */
        public Subject authenticate(String user, String password, RemotingConnection remotingConnection, String securityDomain) {
            if (!validateUser(user, password)) {
                return null;
            }
            Subject subject = new Subject();
            subject.getPrincipals().add(new UserPrincipal(user));
            subject.getPrincipals().add(new RolePrincipal("EQ_" + user));
            if ("BOTH".equals(user)) {
                subject.getPrincipals().add(new RolePrincipal("EQ_PRODUCER"));
                subject.getPrincipals().add(new RolePrincipal("EQ_CONSUMER"));
            }
            return subject;
        }

        /** Authorization is not under test here: every check is allowed. */
        public boolean authorize(Subject subject, Set<Role> roles, CheckType checkType, String address) {
            return true;
        }

        /**
         * Accepts exactly the three known user names. The comparisons are constant-first so that a
         * {@code null} user name is rejected instead of raising a NullPointerException.
         */
        public boolean validateUser(String user, String password) {
            return "CONSUMER".equals(user) || "PRODUCER".equals(user) || "BOTH".equals(user);
        }

        /** Same acceptance rule as {@link #validateUser(String, String)}; roles and check type are ignored. */
        public boolean validateUserAndRole(String user, String password, Set<Role> roles, CheckType checkType) {
            return validateUser(user, password);
        }
    };
    // JMX object-name builders for the brokers named "Node0" and "Node1" in the default JMX domain;
    // the tests use them to look up the address and connection-router MBeans of each node.
    final ObjectNameBuilder node0NameBuilder = ObjectNameBuilder.create(ActiveMQDefaultConfiguration.getDefaultJmxDomain(), "Node0", true);
    final ObjectNameBuilder node1NameBuilder = ObjectNameBuilder.create(ActiveMQDefaultConfiguration.getDefaultJmxDomain(), "Node1", true);

    /* loaded from: input_file:org/apache/activemq/artemis/tests/integration/routing/ElasticQueueTest$ConnectionListener.class */
    /**
     * Connection listener that records two things for the worker that owns the connection:
     * every restored connection is counted, and a connection failure is published as a
     * {@link JMSException} so the worker loop can notice it. All other callbacks are ignored.
     */
    static class ConnectionListener implements JmsConnectionListener {
        /** Counter shared with the owning worker; incremented on every restored connection. */
        final AtomicInteger connectionCount;
        /** Holder for the failure seen by this listener; may be {@code null} when failures are not tracked. */
        final AtomicReference<JMSException> failureReason;

        /**
         * @param connectionCount counter to increment when the connection is restored
         * @param failureReason   holder that receives the connection failure, or {@code null} to ignore failures
         */
        ConnectionListener(AtomicInteger connectionCount, AtomicReference<JMSException> failureReason) {
            this.connectionCount = connectionCount;
            this.failureReason = failureReason;
        }

        public void onConnectionEstablished(URI remoteUri) {
        }

        /** Publishes the failure (wrapped, since the linked exception must be an Exception) to {@link #failureReason}. */
        public void onConnectionFailure(Throwable cause) {
            if (this.failureReason == null) {
                return;
            }
            JMSException failure = new JMSException("ConnectionFailureViaListener");
            failure.setLinkedException(new RuntimeException(cause));
            this.failureReason.set(failure);
        }

        public void onConnectionInterrupted(URI remoteUri) {
        }

        /** Counts the restored connection. */
        public void onConnectionRestored(URI remoteUri) {
            this.connectionCount.incrementAndGet();
        }

        public void onInboundMessage(JmsInboundMessageDispatch envelope) {
        }

        public void onSessionClosed(Session session, Throwable cause) {
        }

        public void onConsumerClosed(MessageConsumer consumer, Throwable cause) {
        }

        public void onProducerClosed(MessageProducer producer, Throwable cause) {
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/activemq/artemis/tests/integration/routing/ElasticQueueTest$EQConsumer.class */
    /**
     * Worker that connects as user {@code CONSUMER} and polls the queue {@link ElasticQueueTest#qName},
     * acknowledging each message and sleeping {@link #delayMillis} milliseconds between polls.
     * Whenever the connection fails it opens a new one, until {@code done} is set.
     */
    public class EQConsumer extends Worker {
        /** Number of messages received so far. */
        final AtomicInteger consumedCount = new AtomicInteger();
        /** Incremented for every connection created by this worker and for every restored connection. */
        final AtomicInteger connectionCount = new AtomicInteger();
        /** Pause between two polls, in milliseconds; tests change it while the worker runs. */
        final AtomicInteger delayMillis;
        private final String url;
        /** "PID" property of the last received message; volatile because the test thread reads it. */
        volatile long lastConsumed = 0L;

        /**
         * Creates a consumer with a 500 ms delay between polls.
         *
         * @param elasticQueueTest unused; kept because existing callers pass the enclosing test instance
         * @param str              connection URL
         */
        EQConsumer(ElasticQueueTest elasticQueueTest, String str) {
            this(str, 500);
        }

        /**
         * @param str connection URL
         * @param i   delay between two polls, in milliseconds
         */
        EQConsumer(String str, int i) {
            super();
            this.url = str;
            this.delayMillis = new AtomicInteger(i);
        }

        /**
         * Consume loop. A {@link JMSException} (failed connect, receive, acknowledge or close) ends the
         * current connection and the outer loop retries with a fresh one; any other exception stops
         * the worker.
         */
        @Override // java.lang.Runnable
        public void run() {
            while (!this.done.get()) {
                // try-with-resources guarantees the connection is closed on every exit path
                try (JmsConnection connection = (JmsConnection) new JmsConnectionFactory("CONSUMER", "PASSWORD", this.url).createConnection()) {
                    this.connectionCount.incrementAndGet();
                    AtomicReference<JMSException> connectionFailure = new AtomicReference<>();
                    connection.addConnectionListener(new ConnectionListener(this.connectionCount, connectionFailure));
                    connection.start();
                    Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
                    MessageConsumer consumer = session.createConsumer(session.createQueue(ElasticQueueTest.qName));
                    while (!this.done.get() && connectionFailure.get() == null) {
                        Message message = consumer.receiveNoWait();
                        if (message != null) {
                            this.consumedCount.incrementAndGet();
                            this.lastConsumed = message.getLongProperty("PID");
                            message.acknowledge();
                        }
                        TimeUnit.MILLISECONDS.sleep(this.delayMillis.get());
                    }
                } catch (JMSException retryWithNewConnection) {
                    // expected while brokers are stopped or reject the connection: loop and reconnect
                } catch (InterruptedException interrupted) {
                    // the executor is shutting down: keep the interrupt flag and stop
                    Thread.currentThread().interrupt();
                    return;
                } catch (Exception unexpected) {
                    unexpected.printStackTrace();
                    return;
                }
            }
        }

        /** @return the "PID" property of the last message received, or 0 if none was received yet */
        public long getLastConsumed() {
            return this.lastConsumed;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/activemq/artemis/tests/integration/routing/ElasticQueueTest$EQProducer.class */
    /**
     * Worker that connects as user {@code PRODUCER} and sends 1 KiB bytes messages to
     * {@link ElasticQueueTest#qName} as fast as it can, numbering them through the "PID" property.
     * A failed send or connection is logged and followed by a new connection, until {@code done} is set.
     */
    public class EQProducer extends Worker {
        /** Number of messages sent successfully; also the "PID" of the last one. */
        final AtomicInteger producedCount = new AtomicInteger();
        /** Incremented for every connection created by this worker and for every restored connection. */
        final AtomicInteger connectionCount = new AtomicInteger();
        private final String url;

        /** @param str connection URL */
        EQProducer(String str) {
            super();
            this.url = str;
        }

        /**
         * Produce loop. The counter is incremented only after {@code send} returns, so a message whose
         * send failed is retried with the same "PID" on the next connection.
         */
        @Override // java.lang.Runnable
        public void run() {
            // remembered outside the loop so the failure log can report where the last send went
            URI connectedUri = null;
            while (!this.done.get()) {
                JmsConnectionFactory factory = new JmsConnectionFactory("PRODUCER", "PASSWORD", this.url);
                // try-with-resources guarantees the connection is closed on every exit path
                try (JmsConnection connection = (JmsConnection) factory.createConnection()) {
                    this.connectionCount.incrementAndGet();
                    AtomicReference<JMSException> connectionFailure = new AtomicReference<>();
                    connection.addConnectionListener(new ConnectionListener(this.connectionCount, connectionFailure));
                    connection.start();
                    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                    MessageProducer producer = session.createProducer(session.createQueue(ElasticQueueTest.qName));
                    BytesMessage payload = session.createBytesMessage();
                    payload.writeBytes(new byte[1024]);
                    while (!this.done.get() && connectionFailure.get() == null) {
                        connectedUri = connection.getConnectedURI();
                        payload.setLongProperty("PID", this.producedCount.get() + 1);
                        producer.send(payload);
                        this.producedCount.incrementAndGet();
                    }
                } catch (JMSException expected) {
                    System.out.println("expected send failure: " + expected.toString() + " PID: " + this.producedCount.get() + ", uri: " + connectedUri);
                }
            }
        }

        /** @return the "PID" of the last message sent successfully (0 before the first send) */
        public long getLastProduced() {
            return this.producedCount.get();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/activemq/artemis/tests/integration/routing/ElasticQueueTest$EQProducerAsyncConsumer.class */
    public class EQProducerAsyncConsumer extends Worker {
        final AtomicInteger producedCount;
        final AtomicInteger connectionCount;
        final AtomicBoolean producerDone;
        final AtomicInteger consumerSleepMillis;
        private final String url;
        final AtomicInteger consumedCount;
        private final String user;
        private long lastConsumed;
        private AtomicReference<JmsConnection> currentConnection;

        EQProducerAsyncConsumer(String str, String str2) {
            super();
            this.producedCount = new AtomicInteger();
            this.connectionCount = new AtomicInteger();
            this.producerDone = new AtomicBoolean();
            this.consumerSleepMillis = new AtomicInteger(1000);
            this.consumedCount = new AtomicInteger();
            this.currentConnection = new AtomicReference<>();
            this.url = str;
            this.user = str2;
        }

        @Override // java.lang.Runnable
        public void run() {
            while (!this.done.get()) {
                JmsConnectionFactory jmsConnectionFactory = new JmsConnectionFactory(this.user, "PASSWORD", this.url);
                AtomicReference atomicReference = new AtomicReference();
                try {
                    JmsConnection createConnection = jmsConnectionFactory.createConnection();
                    try {
                        this.currentConnection.set(createConnection);
                        this.connectionCount.incrementAndGet();
                        createConnection.addConnectionListener(new ConnectionListener(this.connectionCount, atomicReference));
                        createConnection.start();
                        Session createSession = createConnection.createSession(false, 2);
                        createSession.createConsumer(createSession.createQueue(ElasticQueueTest.qName)).setMessageListener(message -> {
                            this.consumedCount.incrementAndGet();
                            try {
                                this.lastConsumed = message.getLongProperty("PID");
                                if (!this.producerDone.get()) {
                                    TimeUnit.MILLISECONDS.sleep(this.consumerSleepMillis.get());
                                }
                                message.acknowledge();
                            } catch (JMSException | InterruptedException e) {
                                JMSException jMSException = new JMSException("ERROR from onMessage");
                                jMSException.setLinkedException(e);
                                atomicReference.set(jMSException);
                                PrintStream printStream = System.out;
                                long j = this.lastConsumed;
                                this.connectionCount.get();
                                printStream.println("OnMessage Got: " + e + ", lastConsumed:" + j + ", connectionCount:" + printStream);
                            }
                        });
                        Session createSession2 = createConnection.createSession(false, 1);
                        MessageProducer createProducer = createSession2.createProducer(createSession2.createQueue(ElasticQueueTest.qName));
                        BytesMessage createBytesMessage = createSession2.createBytesMessage();
                        createBytesMessage.writeBytes(new byte[1024]);
                        while (!this.done.get()) {
                            if (atomicReference.get() != null) {
                                throw ((JMSException) atomicReference.get());
                                break;
                            } else if (this.producerDone.get()) {
                                TimeUnit.SECONDS.sleep(1L);
                            } else {
                                createBytesMessage.setLongProperty("PID", this.producedCount.get() + 1);
                                createProducer.send(createBytesMessage);
                                this.producedCount.incrementAndGet();
                            }
                        }
                        if (createConnection != null) {
                            createConnection.close();
                        }
                    } finally {
                    }
                } catch (JMSException | InterruptedException e) {
                    System.out.println("Exception: " + e.toString() + ", PC=" + this.producedCount.get());
                }
            }
        }

        public long getLastProduced() {
            return this.producedCount.get();
        }

        public long getLastConsumed() {
            return this.lastConsumed;
        }

        public String toString() {
            JmsConnection jmsConnection = this.currentConnection.get();
            return jmsConnection != null ? "EQProducerAsyncConsumer on:" + jmsConnection.getConnectedURI() : super.toString();
        }
    }

    /* loaded from: input_file:org/apache/activemq/artemis/tests/integration/routing/ElasticQueueTest$Worker.class */
    /**
     * Base class of the background producers/consumers. Each instance registers itself with the
     * enclosing test on construction so that {@code cleanup()} can ask it to stop.
     */
    abstract class Worker implements Runnable {
        /** Stop flag polled by the subclasses' run loops; starts out {@code false}. */
        final AtomicBoolean done = new AtomicBoolean(false);

        Worker() {
            // registration happens here so no caller can forget it
            workers.push(this);
        }
    }

    /**
     * Tears down whatever a test left behind: stops every node (best effort), flags every worker
     * as done, then interrupts the worker threads by shutting the executor down.
     */
    @After
    public void cleanup() {
        for (EmbeddedActiveMQ node : this.nodes) {
            try {
                node.stop();
            } catch (Throwable ignored) {
                // best effort: a node that was never started or is already stopped must not fail teardown
            }
        }
        this.nodes.clear();

        for (Worker worker : this.workers) {
            worker.done.set(true);
        }
        this.workers.clear();

        this.executorService.shutdownNow();
    }

    /**
     * Builds the failover URL listing one AMQP endpoint per node: the first node on port 61616 and
     * each following node on the next port.
     *
     * @param stack the nodes to list; only their number matters
     * @return a {@code failover:(...)} URL with randomized endpoints, no reconnect attempts and a 1 s send timeout
     */
    String urlForNodes(Stack<EmbeddedActiveMQ> stack) {
        final int firstPort = 61616;
        final int nodeCount = stack.size();
        StringBuilder url = new StringBuilder("failover:(");
        for (int offset = 0; offset < nodeCount; offset++) {
            if (offset > 0) {
                url.append(",");
            }
            url.append("amqp://localhost:").append(firstPort + offset);
        }
        url.append(")?failover.randomize=true&failover.maxReconnectAttempts=0&jms.sendTimeout=1000");
        return url.toString();
    }

    /**
     * Configures two embedded brokers, "Node0" and "Node1", from one shared template and starts only
     * Node0, with a local target filter that lets it serve both the PRODUCER and the CONSUMER key.
     *
     * <p>The template limits the queue address to 102400 bytes with the FAIL policy, kills slow
     * consumers, disables auto-deletion, and defines the connection router "role_name_sharder",
     * keyed on role name with the key filter {@code (?<=^EQ_).*}.
     *
     * @throws Exception if a configuration cannot be copied or Node0 fails to start
     */
    private void prepareNodesAndStartCombinedHeadTail() throws Exception {
        AddressSettings queueSettings = new AddressSettings();
        queueSettings.setMaxSizeBytes(102400L)
            .setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL)
            .setSlowConsumerPolicy(SlowConsumerPolicy.KILL)
            .setSlowConsumerThreshold(0L)
            .setSlowConsumerCheckPeriod(1L)
            .setAutoDeleteQueues(false)
            .setAutoDeleteAddresses(false);

        ConfigurationImpl template = new ConfigurationImpl();
        template.getAddressSettings().put(qName, queueSettings);

        ConnectionRouterConfiguration router = new ConnectionRouterConfiguration();
        router.setName("role_name_sharder")
            .setKeyType(KeyType.ROLE_NAME)
            .setKeyFilter("(?<=^EQ_).*");
        template.addConnectionRouter(router);

        for (int index = 0; index < 2; index++) {
            Configuration nodeConfig = template.copy();
            nodeConfig.setName("Node" + index);
            nodeConfig.setBrokerInstance(new File(getTestDirfile(), nodeConfig.getName()));
            // node N accepts on port 61616 + N and routes incoming connections through the router
            nodeConfig.addAcceptorConfiguration("tcp", "tcp://localhost:" + (61616 + index) + "?router=role_name_sharder;amqpCredits=1000;amqpLowCredits=300");

            EmbeddedActiveMQ node = new EmbeddedActiveMQ().setConfiguration(nodeConfig);
            this.nodes.add(node);
            node.setSecurityManager(this.customSecurityManager);
            node.setMbeanServer(this.mBeanServer);
        }

        EmbeddedActiveMQ firstNode = this.nodes.get(0);
        ((ConnectionRouterConfiguration) firstNode.getConfiguration().getConnectionRouters().get(0)).setLocalTargetFilter("PRODUCER|CONSUMER");
        firstNode.start();
    }

    /**
     * Scales from one node to two and back to one, with a separate producer and consumer.
     * Node0 first serves both roles until its address is full; Node1 is then started for producers
     * while Node0 keeps the consumers and is drained; finally Node0 is stopped and Node1 serves
     * both roles until everything produced has been consumed.
     */
    @Test(timeout = 60000)
    public void testScale0_1() throws Exception {
        prepareNodesAndStartCombinedHeadTail();

        // consumer first, then producer; each must have connected more than once before moving on
        final EQConsumer consumer = new EQConsumer(this, urlForNodes(this.nodes));
        this.executorService.submit(consumer);
        assertTrue(Wait.waitFor(() -> consumer.connectionCount.get() > 1, 5000L, 200L));

        final EQProducer producer = new EQProducer(urlForNodes(this.nodes));
        this.executorService.submit(producer);
        assertTrue(Wait.waitFor(() -> producer.connectionCount.get() > 1, 10000L, 200L));

        // Node0 fills up
        final AddressControl node0Address = (AddressControl) ManagementControlHelper.createProxy(this.node0NameBuilder.getAddressObjectName(qNameSimple), AddressControl.class, this.mBeanServer);
        assertTrue(Wait.waitFor(() -> {
            final int usage = node0Address.getAddressLimitPercent();
            System.out.println("Want 100% on Head&Tail, usage % " + usage);
            return usage == 100;
        }, 10000L, 500L));

        // Node0 keeps consumers only; Node1 is started for producers
        final ConnectionRouterControl node0Router = (ConnectionRouterControl) ManagementControlHelper.createProxy(this.node0NameBuilder.getConnectionRouterObjectName("role_name_sharder"), ConnectionRouterControl.class, this.mBeanServer);
        node0Router.setLocalTargetFilter("CONSUMER");
        final EmbeddedActiveMQ secondNode = this.nodes.get(1);
        ((ConnectionRouterConfiguration) secondNode.getConfiguration().getConnectionRouters().get(0)).setLocalTargetFilter("PRODUCER");
        secondNode.start();

        final AddressControl node1Address = (AddressControl) ManagementControlHelper.createProxy(this.node1NameBuilder.getAddressObjectName(qNameSimple), AddressControl.class, this.mBeanServer);
        assertTrue("Producer is on Head, Node1", Wait.waitFor(() -> {
            try {
                final int usage = node1Address.getAddressLimitPercent();
                System.out.println("Node1 (head) usage % " + usage);
                return usage > 10;
            } catch (InstanceNotFoundException notRegisteredYet) {
                // the address MBean does not exist yet: keep waiting
                return false;
            }
        }, 5000L, 200L));

        // consume at full speed until Node0 is empty
        consumer.delayMillis.set(0);
        assertTrue(Wait.waitFor(() -> {
            final int usage = node0Address.getAddressLimitPercent();
            System.out.println("Want 0, Node0 (tail) usage % " + usage);
            return usage == 0;
        }, 20000L, 500L));

        // Node0 accepts nobody, Node1 accepts both roles; then Node0 goes away
        node0Router.setLocalTargetFilter("");
        final ConnectionRouterControl node1Router = (ConnectionRouterControl) ManagementControlHelper.createProxy(this.node1NameBuilder.getConnectionRouterObjectName("role_name_sharder"), ConnectionRouterControl.class, this.mBeanServer);
        node1Router.setLocalTargetFilter("CONSUMER|PRODUCER");
        this.nodes.get(0).stop();

        // slow consumer again: Node1 fills up
        consumer.delayMillis.set(500);
        assertTrue("New head&tail, Node1 full", Wait.waitFor(() -> {
            final int usage = node1Address.getAddressLimitPercent();
            System.out.println("Node1 usage % " + usage);
            return usage == 100;
        }, 10000L, 200L));

        // stop producing and drain Node1
        producer.done.set(true);
        consumer.delayMillis.set(0);
        assertTrue(Wait.waitFor(() -> {
            final int usage = node1Address.getAddressLimitPercent();
            System.out.println("Want 0, on producer complete, Node1 usage % " + usage);
            return usage == 0;
        }, 10000L, 200L));
        assertTrue("Got all produced", Wait.waitFor(() -> pidInRange("head&tail", producer.getLastProduced(), consumer.getLastConsumed()), 4000L, 100L));

        consumer.done.set(true);
        this.nodes.get(1).stop();
    }

    /**
     * Same scale-out/scale-in scenario, but the main worker produces and consumes on ONE connection
     * authenticated as {@code PRODUCER}. While that worker is on Node1 (paused), a separate
     * {@code CONSUMER} worker drains what was left on Node0.
     */
    @Test(timeout = 60000)
    public void testScale0_1_CombinedProducerConsumerConnectionWithProducerRole() throws Exception {
        prepareNodesAndStartCombinedHeadTail();

        final EQProducerAsyncConsumer producerConsumer = new EQProducerAsyncConsumer(urlForNodes(this.nodes), "PRODUCER");
        this.executorService.submit(producerConsumer);

        // Node0 fills up and the worker has reconnected
        final AddressControl node0Address = (AddressControl) ManagementControlHelper.createProxy(this.node0NameBuilder.getAddressObjectName(qNameSimple), AddressControl.class, this.mBeanServer);
        assertTrue(Wait.waitFor(() -> {
            try {
                final int usage = node0Address.getAddressLimitPercent();
                System.out.println("Head&Tail usage % " + usage);
                return usage == 100;
            } catch (InstanceNotFoundException notRegisteredYet) {
                // the address MBean does not exist yet: keep waiting
                return false;
            }
        }, 10000L, 200L));
        assertTrue("producer got full error and reconnected", Wait.waitFor(() -> producerConsumer.connectionCount.get() > 2));

        // PID of the last message that can have reached Node0
        final long lastProduced = producerConsumer.getLastProduced();

        // Node0 keeps consumers only; Node1 is started for producers
        final ConnectionRouterControl node0Router = (ConnectionRouterControl) ManagementControlHelper.createProxy(this.node0NameBuilder.getConnectionRouterObjectName("role_name_sharder"), ConnectionRouterControl.class, this.mBeanServer);
        node0Router.setLocalTargetFilter("CONSUMER");
        ((ConnectionRouterConfiguration) this.nodes.get(1).getConfiguration().getConnectionRouters().get(0)).setLocalTargetFilter("PRODUCER");
        this.nodes.get(1).start();

        // pause the address on Node1 as soon as its MBean is available
        final AddressControl node1Address = (AddressControl) ManagementControlHelper.createProxy(this.node1NameBuilder.getAddressObjectName(qNameSimple), AddressControl.class, this.mBeanServer);
        assertTrue(Wait.waitFor(() -> {
            try {
                node1Address.pause();
                return true;
            } catch (InstanceNotFoundException notRegisteredYet) {
                return false;
            }
        }, 10000L, 200L));

        // a dedicated consumer drains Node0
        final EQConsumer tailConsumer = new EQConsumer(urlForNodes(this.nodes), 0);
        this.executorService.submit(tailConsumer);
        assertTrue(Wait.waitFor(() -> {
            final int usage = node0Address.getAddressLimitPercent();
            System.out.println("Tail usage % " + usage);
            return usage == 0;
        }, 10000L, 200L));
        assertTrue(Wait.waitFor(() -> {
            // read once so the logged value is the one that is compared
            final long consumed = tailConsumer.getLastConsumed();
            System.out.println("drain tail, lastProduced: " + lastProduced + ", consumed: " + consumed);
            return lastProduced == consumed;
        }, 5000L, 100L));
        tailConsumer.done.set(true);

        // Node0 accepts nobody and is stopped; Node1 resumes delivery and fills up
        node0Router.setLocalTargetFilter("");
        this.nodes.get(0).stop();
        node1Address.resume();
        assertTrue(Wait.waitFor(() -> {
            final int usage = node1Address.getAddressLimitPercent();
            System.out.println("Head&Tail usage % " + usage);
            return usage == 100;
        }, 10000L, 200L));

        // stop producing and let the async consumer drain Node1
        producerConsumer.producerDone.set(true);
        assertTrue(Wait.waitFor(() -> {
            final int usage = node1Address.getAddressLimitPercent();
            System.out.println("Node1 usage % " + usage + ", eqProducerConsumer: " + producerConsumer);
            return usage == 0;
        }, 10000L, 500L));
        assertTrue(Wait.waitFor(() -> pidInRange("head&tail", producerConsumer.getLastProduced(), producerConsumer.getLastConsumed()), 5000L, 100L));

        producerConsumer.done.set(true);
        this.nodes.get(1).stop();
    }

    /**
     * Scale-out scenario with a single worker that produces and consumes on one connection
     * authenticated as {@code BOTH}, i.e. holding both the EQ_PRODUCER and EQ_CONSUMER roles.
     * The routers' key filters decide which of the two roles each node looks at.
     */
    @Test(timeout = 60000)
    public void testScale0_1_CombinedRoleConnection() throws Exception {
        prepareNodesAndStartCombinedHeadTail();

        final EQProducerAsyncConsumer producerConsumer = new EQProducerAsyncConsumer(urlForNodes(this.nodes), "BOTH");
        this.executorService.submit(producerConsumer);

        // Node0 fills up
        final AddressControl node0Address = (AddressControl) ManagementControlHelper.createProxy(this.node0NameBuilder.getAddressObjectName(qNameSimple), AddressControl.class, this.mBeanServer);
        assertTrue(Wait.waitFor(() -> {
            try {
                final int usage = node0Address.getAddressLimitPercent();
                System.out.println("Head&Tail usage % " + usage);
                return usage == 100;
            } catch (InstanceNotFoundException notRegisteredYet) {
                // the address MBean does not exist yet: keep waiting
                return false;
            }
        }, 20000L, 200L));
        assertTrue("producer got full error and reconnected", Wait.waitFor(() -> producerConsumer.connectionCount.get() > 0));

        // Node0's router now keys on the CONSUMER role only, and its address is blocked
        final ConnectionRouterControl node0Router = (ConnectionRouterControl) ManagementControlHelper.createProxy(this.node0NameBuilder.getConnectionRouterObjectName("role_name_sharder"), ConnectionRouterControl.class, this.mBeanServer);
        node0Router.setTargetKeyFilter("(?<=^EQ_)CONSUMER");
        node0Address.block();
        System.out.println("Tail blocked!");

        // Node1: router keys on the PRODUCER role, no local target yet, queue pre-created
        final EmbeddedActiveMQ secondNode = this.nodes.get(1);
        ((ConnectionRouterConfiguration) secondNode.getConfiguration().getConnectionRouters().get(0)).setKeyFilter("(?<=^EQ_)PRODUCER");
        ((ConnectionRouterConfiguration) secondNode.getConfiguration().getConnectionRouters().get(0)).setLocalTargetFilter((String) null);
        final QueueConfiguration queueConfig = new QueueConfiguration(qName).setRoutingType(RoutingType.ANYCAST);
        secondNode.getConfiguration().getAddressConfigurations().add(new CoreAddressConfiguration().setName(qName).addRoutingType(RoutingType.ANYCAST).addQueueConfiguration(queueConfig));
        secondNode.start();

        // pause the address on Node1 as soon as its MBean is available
        final AddressControl node1Address = (AddressControl) ManagementControlHelper.createProxy(this.node1NameBuilder.getAddressObjectName(qNameSimple), AddressControl.class, this.mBeanServer);
        assertTrue(Wait.waitFor(() -> {
            try {
                node1Address.pause();
                return true;
            } catch (InstanceNotFoundException notRegisteredYet) {
                return false;
            }
        }, 5000L, 100L));

        final ConnectionRouterControl node1Router = (ConnectionRouterControl) ManagementControlHelper.createProxy(this.node1NameBuilder.getConnectionRouterObjectName("role_name_sharder"), ConnectionRouterControl.class, this.mBeanServer);
        node1Router.setLocalTargetFilter("PRODUCER");
        System.out.println("Head enabled for producers... limit: " + node1Address.getAddressLimitPercent());

        // acknowledge without delay until Node0 is empty
        producerConsumer.consumerSleepMillis.set(0);
        assertTrue(Wait.waitFor(() -> {
            final int usage = node0Address.getAddressLimitPercent();
            System.out.println("Want 0, tail usage % " + usage);
            return usage == 0;
        }, 20000L, 200L));
        System.out.println("Tail drained!");

        // Node0 gets no local target any more; Node1 resumes delivery and, with a slow listener, fills up
        node0Router.setLocalTargetFilter((String) null);
        node1Address.resume();
        producerConsumer.consumerSleepMillis.set(2000);
        assertTrue(Wait.waitFor(() -> {
            final int usage = node1Address.getAddressLimitPercent();
            System.out.println("want 100%, head&tail usage % " + usage);
            return usage == 100;
        }, 20000L, 200L));

        // stop producing and drain Node1
        producerConsumer.producerDone.set(true);
        assertTrue(Wait.waitFor(() -> {
            final int usage = node1Address.getAddressLimitPercent();
            System.out.println("Want 0, head&tail usage % " + usage);
            return usage == 0;
        }, 20000L, 200L));
        assertTrue(Wait.waitFor(() -> pidInRange("head&tail", producerConsumer.getLastProduced(), producerConsumer.getLastConsumed()), 20000L, 200L));

        producerConsumer.done.set(true);
        this.nodes.get(1).stop();
    }

    /**
     * Logs both counters and tells whether consumption has caught up with production.
     *
     * @param str label included in the log line
     * @param j   "PID" of the last message produced
     * @param j2  "PID" of the last message consumed
     * @return {@code true} when {@code j2} equals {@code j} or {@code j + 1}
     */
    private boolean pidInRange(String str, long j, long j2) {
        String report = String.format("pidInRange - %s, lastProduced: %d, lastConsumed: %d", str, j, j2);
        System.out.println(report);
        if (j2 == j) {
            return true;
        }
        return j2 == j + 1;
    }
}
