package org.apache.activemq.artemis.tests.integration.client;

import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.MessageHandler;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.class */
public class SlowConsumerTest extends ActiveMQTestBase {
    private boolean isNetty;
    private ActiveMQServer server;
    private final SimpleString QUEUE = new SimpleString("ConsumerTestQueue");
    private ServerLocator locator;

    @Parameterized.Parameters(name = "isNetty={0}")
    public static Collection getParameters() {
        return Arrays.asList(new Object[]{true}, new Object[]{false});
    }

    public SlowConsumerTest(boolean z) {
        this.isNetty = false;
        this.isNetty = z;
    }

    @Before
    public void setUp() throws Exception {
        super.setUp();
        this.server = createServer(false, this.isNetty);
        AddressSettings slowConsumerPolicy = new AddressSettings().setSlowConsumerCheckPeriod(2L).setSlowConsumerThreshold(10L).setSlowConsumerPolicy(SlowConsumerPolicy.KILL);
        this.server.start();
        this.server.getAddressSettingsRepository().addMatch(this.QUEUE.toString(), slowConsumerPolicy);
        this.locator = createFactory(this.isNetty);
    }

    @Test
    public void testSlowConsumerKilled() throws Exception {
        ClientSession addClientSession = addClientSession(createSessionFactory(this.locator).createSession(false, true, true, false));
        addClientSession.createQueue(this.QUEUE, this.QUEUE, (SimpleString) null, false);
        ClientProducer addClientProducer = addClientProducer(addClientSession.createProducer(this.QUEUE));
        for (int i = 0; i < 25; i++) {
            addClientProducer.send(createTextMessage(addClientSession, "m" + i));
        }
        ClientConsumer addClientConsumer = addClientConsumer(addClientSession.createConsumer(this.QUEUE));
        addClientSession.start();
        Thread.sleep(3000L);
        try {
            addClientConsumer.receiveImmediate();
            fail();
        } catch (ActiveMQObjectClosedException e) {
            assertEquals(e.getType(), ActiveMQExceptionType.OBJECT_CLOSED);
        }
    }

    @Test
    public void testSlowConsumerNotification() throws Exception {
        ClientSession addClientSession = addClientSession(createSessionFactory(this.locator).createSession(false, true, true, false));
        addClientSession.createQueue(this.QUEUE, this.QUEUE, (SimpleString) null, false);
        AddressSettings slowConsumerPolicy = new AddressSettings().setSlowConsumerCheckPeriod(2L).setSlowConsumerThreshold(10L).setSlowConsumerPolicy(SlowConsumerPolicy.NOTIFY);
        this.server.getAddressSettingsRepository().removeMatch(this.QUEUE.toString());
        this.server.getAddressSettingsRepository().addMatch(this.QUEUE.toString(), slowConsumerPolicy);
        ClientProducer addClientProducer = addClientProducer(addClientSession.createProducer(this.QUEUE));
        for (int i = 0; i < 25; i++) {
            addClientProducer.send(createTextMessage(addClientSession, "m" + i));
        }
        SimpleString randomSimpleString = RandomUtil.randomSimpleString();
        addClientSession.createQueue(ActiveMQDefaultConfiguration.getDefaultManagementNotificationAddress(), randomSimpleString, (SimpleString) null, false);
        ClientConsumer createConsumer = addClientSession.createConsumer(randomSimpleString.toString(), ManagementHelper.HDR_NOTIFICATION_TYPE + "='" + CoreNotificationType.CONSUMER_SLOW + "'");
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        createConsumer.setMessageHandler(new MessageHandler() { // from class: org.apache.activemq.artemis.tests.integration.client.SlowConsumerTest.1
            public void onMessage(ClientMessage clientMessage) {
                Assert.assertEquals(CoreNotificationType.CONSUMER_SLOW.toString(), clientMessage.getObjectProperty(ManagementHelper.HDR_NOTIFICATION_TYPE).toString());
                Assert.assertEquals(SlowConsumerTest.this.QUEUE.toString(), clientMessage.getObjectProperty(ManagementHelper.HDR_ADDRESS).toString());
                Assert.assertEquals(1, clientMessage.getIntProperty(ManagementHelper.HDR_CONSUMER_COUNT));
                if (SlowConsumerTest.this.isNetty) {
                    Assert.assertTrue(clientMessage.getSimpleStringProperty(ManagementHelper.HDR_REMOTE_ADDRESS).toString().startsWith("/127.0.0.1"));
                } else {
                    Assert.assertEquals(SimpleString.toSimpleString("invm:0"), clientMessage.getSimpleStringProperty(ManagementHelper.HDR_REMOTE_ADDRESS));
                }
                Assert.assertNotNull(clientMessage.getSimpleStringProperty(ManagementHelper.HDR_CONNECTION_NAME));
                Assert.assertNotNull(clientMessage.getSimpleStringProperty(ManagementHelper.HDR_CONSUMER_NAME));
                Assert.assertNotNull(clientMessage.getSimpleStringProperty(ManagementHelper.HDR_SESSION_NAME));
                try {
                    clientMessage.acknowledge();
                } catch (ActiveMQException e) {
                    e.printStackTrace();
                }
                countDownLatch.countDown();
            }
        });
        addClientConsumer(addClientSession.createConsumer(this.QUEUE));
        addClientSession.start();
        assertTrue(countDownLatch.await(3L, TimeUnit.SECONDS));
    }

    @Test
    public void testSlowConsumerSpared() throws Exception {
        ClientSession addClientSession = addClientSession(createSessionFactory(this.locator).createSession(true, true));
        addClientSession.createQueue(this.QUEUE, this.QUEUE, (SimpleString) null, false);
        ClientProducer addClientProducer = addClientProducer(addClientSession.createProducer(this.QUEUE));
        for (int i = 0; i < 5; i++) {
            addClientProducer.send(createTextMessage(addClientSession, "m" + i));
        }
        ClientConsumer addClientConsumer = addClientConsumer(addClientSession.createConsumer(this.QUEUE));
        addClientSession.start();
        Thread.sleep(3000L);
        for (int i2 = 0; i2 < 5; i2++) {
            assertNotNull(addClientConsumer.receive(500L));
        }
    }

    @Test
    public void testFastThenSlowConsumerSpared() throws Exception {
        ClientMessage receive;
        this.locator.setAckBatchSize(0);
        ClientSessionFactory createSessionFactory = createSessionFactory(this.locator);
        ClientSession addClientSession = addClientSession(createSessionFactory.createSession(true, true));
        final ClientSession addClientSession2 = addClientSession(createSessionFactory.createSession(true, true));
        addClientSession.createQueue(this.QUEUE, this.QUEUE, (SimpleString) null, false);
        final ClientProducer addClientProducer = addClientProducer(addClientSession2.createProducer(this.QUEUE));
        final AtomicLong atomicLong = new AtomicLong(0L);
        new Thread(new Runnable() { // from class: org.apache.activemq.artemis.tests.integration.client.SlowConsumerTest.2
            @Override // java.lang.Runnable
            public void run() {
                long currentTimeMillis = System.currentTimeMillis();
                ClientMessage createTextMessage = SlowConsumerTest.this.createTextMessage(addClientSession2, "m", true);
                while (System.currentTimeMillis() < currentTimeMillis + 3000) {
                    try {
                        addClientProducer.send(createTextMessage);
                        atomicLong.incrementAndGet();
                    } catch (ActiveMQException e) {
                        e.printStackTrace();
                        return;
                    }
                }
                long currentTimeMillis2 = System.currentTimeMillis();
                while (System.currentTimeMillis() < currentTimeMillis2 + 10000) {
                    try {
                        addClientProducer.send(createTextMessage);
                        atomicLong.incrementAndGet();
                        Thread.sleep(1000L);
                    } catch (Exception e2) {
                        e2.printStackTrace();
                        return;
                    }
                }
            }
        }).start();
        ClientConsumer addClientConsumer = addClientConsumer(addClientSession.createConsumer(this.QUEUE));
        addClientSession.start();
        long j = 0;
        do {
            receive = addClientConsumer.receive(1500L);
            if (receive != null) {
                receive.acknowledge();
                j++;
            }
        } while (receive != null);
        assertEquals(atomicLong.longValue(), j);
    }

    @Test
    public void testSlowWildcardConsumer() throws Exception {
        SimpleString simpleString = new SimpleString("a.b");
        SimpleString simpleString2 = new SimpleString("a.c");
        SimpleString simpleString3 = new SimpleString("a.*");
        SimpleString simpleString4 = new SimpleString("Q1");
        SimpleString simpleString5 = new SimpleString("Q2");
        SimpleString simpleString6 = new SimpleString("Q");
        this.server.getAddressSettingsRepository().addMatch(simpleString3.toString(), new AddressSettings().setSlowConsumerCheckPeriod(2L).setSlowConsumerThreshold(10L).setSlowConsumerPolicy(SlowConsumerPolicy.KILL));
        ClientSession addClientSession = addClientSession(createSessionFactory(this.locator).createSession(false, true, true, false));
        addClientSession.createQueue(simpleString, simpleString4, (SimpleString) null, false);
        addClientSession.createQueue(simpleString2, simpleString5, (SimpleString) null, false);
        addClientSession.createQueue(simpleString3, simpleString6, (SimpleString) null, false);
        ClientProducer createProducer = addClientSession.createProducer(simpleString);
        ClientProducer createProducer2 = addClientSession.createProducer(simpleString2);
        for (int i = 0; i < 20; i++) {
            createProducer.send(createTextMessage(addClientSession, "m1" + i));
            createProducer2.send(createTextMessage(addClientSession, "m2" + i));
        }
        ClientConsumer addClientConsumer = addClientConsumer(addClientSession.createConsumer(simpleString6));
        addClientSession.start();
        Thread.sleep(3000L);
        try {
            addClientConsumer.receiveImmediate();
            fail();
        } catch (ActiveMQObjectClosedException e) {
            assertEquals(e.getType(), ActiveMQExceptionType.OBJECT_CLOSED);
        }
    }
}
