package org.apache.activemq.artemis.tests.integration.client;

import java.lang.management.ManagementFactory;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import javax.management.MBeanServer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Interceptor;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.impl.SecurityConfiguration;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.journal.TransactionFailureCallback;
import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveMessage;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueFactory;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.QueueFactoryImpl;
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager;
import org.apache.activemq.artemis.spi.core.security.jaas.InVMLoginModule;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.ReusableLatch;
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.class */
public class HangConsumerTest extends ActiveMQTestBase {
    // Server under test; assigned and started in setUp().
    private ActiveMQServer server;
    // Queue most recently created by a test; testHangDuplicateQueues also assigns it from inside its queue factory.
    private Queue queue;
    // In-VM locator assigned in setUp().
    private ServerLocator locator;
    // Name of the queue the tests create, produce to and consume from.
    private final SimpleString QUEUE = new SimpleString("ConsumerTestQueue");
    // Counted down when MyCallback.sendMessage is entered and counted back up before it returns.
    ReusableLatch inCall = new ReusableLatch(1);
    // Acquired by MyCallback.sendMessage around the delegated send; blockConsumers() acquires it and
    // releaseConsumers() releases it, which is how a test stalls the delivery callback.
    Semaphore callbackSemaphore = new Semaphore(1);

    /* loaded from: input_file:org/apache/activemq/artemis/tests/integration/client/HangConsumerTest$HangInterceptor.class */
    /**
     * Interceptor that can stall {@link SessionReceiveMessage} packets and optionally fail the
     * next intercepted packet with a pre-set exception.
     *
     * <p>While the semaphore's single permit is held (see {@link #close()}), any thread that
     * intercepts a {@code SessionReceiveMessage} waits inside {@link #intercept} until
     * {@link #open()} hands the permit back.
     */
    class HangInterceptor implements Interceptor {
        /** Gate for intercepted receive packets; one permit means "open". */
        Semaphore semaphore = new Semaphore(1);

        /** Counted down when a receive packet reaches the gate, counted up once it has passed. */
        ReusableLatch reusableLatch = new ReusableLatch(1);

        /** When non-null, thrown (and cleared) by the next call to {@link #intercept}. */
        volatile ActiveMQException pendingException = null;

        /** Takes the gate's permit so subsequent receive packets wait in {@link #intercept}. */
        public void close() throws Exception {
            this.semaphore.acquire();
        }

        /** Returns a permit to the gate, letting a waiting receive packet continue. */
        public void open() throws Exception {
            this.semaphore.release();
        }

        /**
         * Stalls receive packets on the gate, then either throws the pending exception or accepts
         * the packet.
         *
         * @return {@code true} when no exception is pending
         * @throws ActiveMQException the value of {@link #pendingException}, if one was set
         */
        public boolean intercept(Packet packet, RemotingConnection remotingConnection) throws ActiveMQException {
            if (packet instanceof SessionReceiveMessage) {
                HangConsumerTest.this.instanceLog.debug("Receiving message");
                try {
                    // Signal arrival first so a test waiting on the latch knows we are at the gate.
                    this.reusableLatch.countDown();
                    // Pass through the gate without keeping the permit.
                    this.semaphore.acquire();
                    this.semaphore.release();
                    this.reusableLatch.countUp();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }

            if (this.pendingException != null) {
                ActiveMQException failure = this.pendingException;
                this.pendingException = null;
                throw failure;
            }
            return true;
        }
    }

    /* loaded from: input_file:org/apache/activemq/artemis/tests/integration/client/HangConsumerTest$MyActiveMQServer.class */
    /**
     * Server whose sessions are built with the incoming {@link SessionCallback} wrapped in a
     * {@link MyCallback}, so the test can stall message delivery.
     */
    class MyActiveMQServer extends ActiveMQServerImpl {
        MyActiveMQServer(Configuration configuration, MBeanServer mBeanServer, ActiveMQSecurityManager activeMQSecurityManager) {
            super(configuration, mBeanServer, activeMQSecurityManager);
        }

        /**
         * Builds a {@link ServerSessionImpl} from the given arguments and this server's own
         * components, substituting a {@link MyCallback} wrapper for {@code sessionCallback}.
         * The {@code z5} argument is not used here.
         */
        protected ServerSessionImpl internalCreateSession(String str, String str2, String str3, String str4, int i, RemotingConnection remotingConnection, boolean z, boolean z2, boolean z3, boolean z4, String str5, SessionCallback sessionCallback, OperationContext operationContext, boolean z5, Map<SimpleString, RoutingType> map, String str6) throws Exception {
            // str5 is handed to the session as a SimpleString; null stays null.
            SimpleString str5Converted = null;
            if (str5 != null) {
                str5Converted = new SimpleString(str5);
            }
            MyCallback wrappedCallback = new MyCallback(sessionCallback);

            return new ServerSessionImpl(
                str,
                str2,
                str3,
                str4,
                i,
                z,
                z2,
                z3,
                getConfiguration().isPersistDeliveryCountBeforeDelivery(),
                z4,
                remotingConnection,
                getStorageManager(),
                getPostOffice(),
                getResourceManager(),
                getSecurityStore(),
                getManagementService(),
                this,
                getConfiguration().getManagementAddress(),
                str5Converted,
                wrappedCallback,
                operationContext,
                getPagingManager(),
                map,
                str6);
        }
    }

    /* loaded from: input_file:org/apache/activemq/artemis/tests/integration/client/HangConsumerTest$MyCallback.class */
    /**
     * {@link SessionCallback} wrapper that lets the test stall {@link #sendMessage}.
     *
     * <p>Producer-credit, large-message and {@code closed} notifications are forwarded to the
     * wrapped callback; the remaining methods answer with fixed values or do nothing.
     */
    class MyCallback implements SessionCallback {
        final SessionCallback targetCallback;

        MyCallback(SessionCallback sessionCallback) {
            this.targetCallback = sessionCallback;
        }

        // ---- delivery ---------------------------------------------------------------------

        /**
         * Forwards the message to the wrapped callback while holding the test's
         * {@code callbackSemaphore}.
         *
         * <p>{@code inCall} is counted down on entry and counted up on every exit path, so a test
         * can wait for a delivery to be parked here.
         *
         * @return the wrapped callback's result, or {@code -1} if the wait for the semaphore was
         *         interrupted
         */
        public int sendMessage(MessageReference messageReference, Message message, ServerConsumer serverConsumer, int i) {
            HangConsumerTest.this.inCall.countDown();

            try {
                HangConsumerTest.this.callbackSemaphore.acquire();
            } catch (InterruptedException interrupted) {
                HangConsumerTest.this.inCall.countUp();
                return -1;
            }

            try {
                return this.targetCallback.sendMessage(messageReference, message, serverConsumer, i);
            } finally {
                HangConsumerTest.this.callbackSemaphore.release();
                HangConsumerTest.this.inCall.countUp();
            }
        }

        public int sendLargeMessage(MessageReference messageReference, Message message, ServerConsumer serverConsumer, long j, int i) {
            return this.targetCallback.sendLargeMessage(messageReference, message, serverConsumer, j, i);
        }

        public int sendLargeMessageContinuation(ServerConsumer serverConsumer, byte[] bArr, boolean z, boolean z2) {
            return this.targetCallback.sendLargeMessageContinuation(serverConsumer, bArr, z, z2);
        }

        /** No-op. */
        public void afterDelivery() throws Exception {
        }

        // ---- producer credits -------------------------------------------------------------

        public void sendProducerCreditsMessage(int i, SimpleString simpleString) {
            this.targetCallback.sendProducerCreditsMessage(i, simpleString);
        }

        public void sendProducerCreditsFailMessage(int i, SimpleString simpleString) {
            this.targetCallback.sendProducerCreditsFailMessage(i, simpleString);
        }

        // ---- fixed answers / no-ops -------------------------------------------------------

        /** Always {@code true}. */
        public boolean hasCredits(ServerConsumer serverConsumer) {
            return true;
        }

        /** Always {@code true}. */
        public boolean isWritable(ReadyListener readyListener, Object obj) {
            return true;
        }

        /** Always {@code false}. */
        public boolean updateDeliveryCountAfterCancel(ServerConsumer serverConsumer, MessageReference messageReference, boolean z) {
            return false;
        }

        /** No-op. */
        public void browserFinished(ServerConsumer serverConsumer) {
        }

        /** No-op. */
        public void disconnect(ServerConsumer serverConsumer, SimpleString simpleString) {
        }

        // ---- lifecycle --------------------------------------------------------------------

        public void closed() {
            this.targetCallback.closed();
        }
    }

    /**
     * Starts a {@link MyActiveMQServer} built from the default in-VM configuration (message expiry
     * scan period set to 10) and creates the in-VM locator used by the tests.
     */
    @Before
    public void setUp() throws Exception {
        super.setUp();

        Configuration configuration = createDefaultInVMConfig().setMessageExpiryScanPeriod(10L);
        MBeanServer platformMBeanServer = ManagementFactory.getPlatformMBeanServer();
        ActiveMQJAASSecurityManager securityManager = new ActiveMQJAASSecurityManager(InVMLoginModule.class.getName(), new SecurityConfiguration());

        MyActiveMQServer blockingServer = new MyActiveMQServer(configuration, platformMBeanServer, securityManager);
        this.server = addServer(blockingServer);
        this.server.start();

        this.locator = createInVMNonHALocator();
    }

    /**
     * Stalls the delivery callback, keeps producing, and verifies that the queue's counters remain
     * callable and that both messages are delivered once the callback is released.
     */
    @Test
    public void testHangOnDelivery() throws Exception {
        this.queue = this.server.createQueue(new QueueConfiguration(this.QUEUE).setRoutingType(RoutingType.ANYCAST));
        try {
            ClientSessionFactory producerFactory = this.locator.createSessionFactory();
            ClientSession producerSession = producerFactory.createSession(false, false, false);
            ServerLocator consumerLocator = createInVMNonHALocator();
            ClientSession consumerSession = consumerLocator.createSessionFactory().createSession();

            ClientProducer producer = producerSession.createProducer(this.QUEUE);
            ClientConsumer consumer = consumerSession.createConsumer(this.QUEUE);

            producer.send(producerSession.createMessage(true));

            // Take the callback's permit before committing so the first delivery parks in MyCallback.
            blockConsumers();
            producerSession.commit();
            consumerSession.start();
            awaitBlocking();

            // A second send/commit while the delivery is parked.
            producer.send(producerSession.createMessage(true));
            producerSession.commit();

            // Results are intentionally discarded: these calls only need to return.
            this.queue.getMessagesAdded();
            this.queue.getMessageCount();

            releaseConsumers();
            consumerSession.rollback();

            this.queue.flushExecutor();
            Wait.waitFor(() -> getMessageCount(this.queue) == 2);
            Wait.assertEquals(2L, this.queue::getMessageCount);
            Wait.assertEquals(2L, this.queue::getMessagesAdded);

            ClientMessage first = consumer.receive(5000L);
            Assert.assertNotNull(first);
            first.acknowledge();

            ClientMessage second = consumer.receive(5000L);
            Assert.assertNotNull(second);
            second.acknowledge();

            producerSession.commit();
            consumerSession.commit();
            producerSession.close();
            consumerSession.close();
        } finally {
            // Always hand a permit back so no delivery thread stays parked after the test.
            releaseConsumers();
        }
    }

    /** Returns one permit to {@code callbackSemaphore}, unblocking a delivery parked in {@link MyCallback#sendMessage}. */
    protected void releaseConsumers() {
        Semaphore gate = this.callbackSemaphore;
        gate.release();
    }

    /**
     * Asserts that {@code inCall.await(5000)} reports success; {@code inCall} is counted down
     * when {@link MyCallback#sendMessage} is entered.
     */
    protected void awaitBlocking() throws InterruptedException {
        boolean deliveryEntered = this.inCall.await(5000L);
        assertTrue(deliveryEntered);
    }

    /** Takes a permit from {@code callbackSemaphore} so the next {@link MyCallback#sendMessage} call waits. */
    protected void blockConsumers() throws InterruptedException {
        Semaphore gate = this.callbackSemaphore;
        gate.acquire();
    }

    /**
     * Parks a {@code destroyQueue} call inside {@code deleteMatchingReferences}, tries to create
     * the same queue again in the meantime, and then checks that the server can be restarted.
     */
    @Test
    public void testHangDuplicateQueues() throws Exception {
        final Semaphore deleteGate = new Semaphore(1);
        final CountDownLatch deleteReached = new CountDownLatch(1);

        // Factory whose queues signal and then wait on the gate inside deleteMatchingReferences.
        QueueFactory blockingFactory = new QueueFactoryImpl(this.server.getExecutorFactory(), this.server.getScheduledPool(), this.server.getAddressSettingsRepository(), this.server.getStorageManager(), this.server) {
            public Queue createQueueWith(QueueConfiguration queueConfiguration, PagingManager pagingManager) {
                PageSubscription subscription = getPageSubscription(queueConfiguration, pagingManager);
                PagingStore store = null;
                if (subscription != null) {
                    store = subscription.getPagingStore();
                }

                HangConsumerTest.this.queue = new QueueImpl(queueConfiguration, store, subscription, this.scheduledExecutor, this.postOffice, this.storageManager, this.addressSettingsRepository, this.executorFactory.getExecutor(), this.server) {
                    public boolean allowsReferenceCallback() {
                        return false;
                    }

                    public synchronized int deleteMatchingReferences(int i, Filter filter) throws Exception {
                        // Tell the test we got here, then wait until it releases the gate.
                        deleteReached.countDown();
                        deleteGate.acquire();
                        deleteGate.release();
                        return super.deleteMatchingReferences(i, filter);
                    }

                    public void deliverScheduledMessages() {
                    }
                };
                return HangConsumerTest.this.queue;
            }
        };
        blockingFactory.setPostOffice(this.server.getPostOffice());
        this.server.replaceQueueFactory(blockingFactory);

        this.queue = this.server.createQueue(new QueueConfiguration(this.QUEUE).setRoutingType(RoutingType.ANYCAST));

        // Close the gate before anything can reach deleteMatchingReferences.
        deleteGate.acquire();

        ClientSession session = this.locator.createSessionFactory().createSession(false, false, false);
        ClientProducer producer = session.createProducer(this.QUEUE);
        producer.send(session.createMessage(true));
        session.commit();

        Thread destroyer = new Thread() {
            @Override
            public void run() {
                try {
                    HangConsumerTest.this.server.destroyQueue(HangConsumerTest.this.QUEUE);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        };
        destroyer.start();

        Assert.assertTrue(deleteReached.await(10L, TimeUnit.SECONDS));

        try {
            // Attempt to create the queue again while the destroy is still parked.
            this.server.createQueue(new QueueConfiguration(this.QUEUE).setRoutingType(RoutingType.ANYCAST));
        } catch (Exception e) {
        }

        deleteGate.release();

        this.server.stop();
        destroyer.join();
        session.close();

        this.server.start();
        waitForServerToStart(this.server);
        this.server.stop();
    }

    /**
     * Writes a second queue binding for the test queue's name straight into the storage manager,
     * then checks that the server can still be stopped and started again.
     */
    @Test
    public void testForceDuplicationOnBindings() throws Exception {
        this.queue = this.server.createQueue(new QueueConfiguration(this.QUEUE).setRoutingType(RoutingType.ANYCAST));

        ClientSession session = this.locator.createSessionFactory().createSession(false, false, false);
        ClientProducer producer = session.createProducer(this.QUEUE);
        producer.send(session.createMessage(true));
        session.commit();

        StorageManager storage = this.server.getStorageManager();
        long duplicateQueueId = storage.generateID();
        long bindingTxId = storage.generateID();

        // A bare QueueImpl under the same name; every collaborator is passed as null.
        QueueImpl duplicateQueue = new QueueImpl(duplicateQueueId, this.QUEUE, this.QUEUE, (Filter) null, (SimpleString) null, true, false, false, (ScheduledExecutorService) null, (PostOffice) null, (StorageManager) null, (HierarchicalRepository) null, (ArtemisExecutor) null, (ActiveMQServer) null, (QueueFactory) null);
        LocalQueueBinding duplicateBinding = new LocalQueueBinding(this.QUEUE, duplicateQueue, this.server.getNodeID());

        storage.addQueueBinding(bindingTxId, duplicateBinding);
        storage.commitBindings(bindingTxId);

        this.server.stop();
        this.server.start();
        waitForServerToStart(this.server);
        this.server.stop();
    }

    /**
     * Makes the client-side interceptor throw while a message is being received, then verifies
     * that a fresh session on the same factory can still receive a message from the queue.
     */
    @Test
    public void testExceptionWhileDelivering() throws Exception {
        this.queue = this.server.createQueue(new QueueConfiguration(this.QUEUE).setRoutingType(RoutingType.ANYCAST));

        HangInterceptor interceptor = new HangInterceptor();
        try {
            this.locator.addIncomingInterceptor(interceptor);
            ClientSessionFactory factory = this.locator.createSessionFactory();

            ClientSession firstSession = factory.createSession(false, false, false);
            ClientProducer producer = firstSession.createProducer(this.QUEUE);
            // The consumer is created for its side effect only; the test never reads from it.
            firstSession.createConsumer(this.QUEUE);

            producer.send(firstSession.createMessage(true));
            firstSession.commit();

            // Close the gate, start delivery, and wait for the receive packet to reach it.
            interceptor.close();
            firstSession.start();
            Assert.assertTrue(interceptor.reusableLatch.await(10L, TimeUnit.SECONDS));

            // Arm the failure and let the parked packet continue into it.
            interceptor.pendingException = new ActiveMQException();
            interceptor.open();
            firstSession.close();

            ClientSession secondSession = factory.createSession(false, false);
            secondSession.start();
            ClientConsumer consumer = secondSession.createConsumer(this.QUEUE);
            ClientMessage received = consumer.receive(5000L);
            Assert.assertNotNull(received);
            received.acknowledge();
            secondSession.commit();
        } finally {
            interceptor.open();
        }
    }

    /**
     * Restarts the server several times, creating queue {@code "tt"} only when it cannot be
     * located, and after every stop reads the bindings journal directly to assert that it holds
     * exactly one record whose user record type is {@code 21}.
     *
     * <p>The server is stopped once, in the outer {@code finally}, rather than at the end of
     * every iteration: stopping it inside the loop would undo the restart that the next
     * iteration's {@code locateQueue}/{@code createQueue} calls rely on.
     */
    @Test
    public void testDuplicateDestinationsOnTopic() throws Exception {
        final int restarts = 5;
        try {
            for (int i = 0; i < restarts; i++) {
                if (this.server.locateQueue(SimpleString.toSimpleString("tt")) == null) {
                    this.server.createQueue(new QueueConfiguration("tt").setRoutingType(RoutingType.ANYCAST).setFilterString("__AMQX=-1"));
                }
                this.server.stop();

                JournalImpl bindingsJournal = new JournalImpl(1048576, 2, 2, 0, 0, new NIOSequentialFileFactory(this.server.getConfiguration().getBindingsLocation(), (IOCriticalErrorListener) null, 1), "activemq-bindings", "bindings", 1);
                bindingsJournal.start();
                try {
                    List<RecordInfo> records = new LinkedList<>();
                    // Null arguments keep their original casts so overload resolution is unchanged.
                    bindingsJournal.load(records, (List) null, (TransactionFailureCallback) null);

                    int bindings = 0;
                    for (RecordInfo record : records) {
                        this.instanceLog.debug("info: " + record);
                        // NOTE(review): 21 is presumably the queue-binding record id - confirm against JournalRecordIds.
                        if (record.getUserRecordType() == 21) {
                            bindings++;
                        }
                    }
                    assertEquals(1L, bindings);
                    this.instanceLog.debug("Bindings: " + bindings);
                } finally {
                    // Stop the journal even when the assertion or the load fails.
                    bindingsJournal.stop();
                }

                // Restart for the next pass; after the last pass the server stays stopped.
                if (i < restarts - 1) {
                    this.server.start();
                }
            }
        } finally {
            try {
                this.server.stop();
            } catch (Throwable ignored) {
                // Best-effort cleanup: a failure here must not mask the test's own outcome.
            }
        }
    }
}
