package org.apache.activemq.artemis.tests.integration.paging;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.LinkedListIterator;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/paging/PagingSendTest.class */
public class PagingSendTest extends ActiveMQTestBase {
    public static final SimpleString ADDRESS = new SimpleString("SimpleAddress");
    private ServerLocator locator;
    private ActiveMQServer server;

    protected boolean isNetty() {
        return false;
    }

    @Before
    public void setUp() throws Exception {
        super.setUp();
        new ConfigurationImpl();
        this.server = newActiveMQServer();
        this.server.start();
        waitForServerToStart(this.server);
        this.locator = createFactory(isNetty());
    }

    private ActiveMQServer newActiveMQServer() throws Exception {
        ActiveMQServer createServer = createServer(true, isNetty());
        createServer.getAddressSettingsRepository().addMatch("#", new AddressSettings().setPageSizeBytes(10240L).setMaxSizeBytes(20480L));
        return createServer;
    }

    @Test
    public void testSameMessageOverAndOverBlocking() throws Exception {
        dotestSameMessageOverAndOver(true);
    }

    @Test
    public void testSameMessageOverAndOverNonBlocking() throws Exception {
        dotestSameMessageOverAndOver(false);
    }

    public void dotestSameMessageOverAndOver(boolean z) throws Exception {
        this.locator.setBlockOnNonDurableSend(z).setBlockOnDurableSend(z).setBlockOnAcknowledge(z);
        ClientSessionFactory createSessionFactory = createSessionFactory(this.locator);
        ClientSession createSession = createSessionFactory.createSession((String) null, (String) null, false, true, true, false, 0);
        createSession.createQueue(ADDRESS, ADDRESS, (SimpleString) null, true);
        ClientProducer createProducer = createSession.createProducer(ADDRESS);
        ClientMessage createMessage = createSession.createMessage(true);
        createMessage.getBodyBuffer().writeBytes(new byte[1024]);
        for (int i = 0; i < 200; i++) {
            createProducer.send(createMessage);
        }
        createSession.close();
        ClientSession createSession2 = createSessionFactory.createSession((String) null, (String) null, false, true, true, false, 0);
        ClientConsumer createConsumer = createSession2.createConsumer(ADDRESS);
        createSession2.start();
        for (int i2 = 0; i2 < 200; i2++) {
            ClientMessage receive = createConsumer.receive(10000L);
            Assert.assertNotNull(receive);
            if (i2 == 100) {
                createSession2.commit();
            }
            receive.acknowledge();
        }
        createConsumer.close();
        createSession2.close();
    }

    @Test
    public void testOrderOverTX() throws Exception {
        ClientSessionFactory createSessionFactory = createSessionFactory(this.locator);
        ClientSession createSession = createSessionFactory.createSession(true, true, 0);
        createSession.createQueue(ADDRESS, ADDRESS, (SimpleString) null, true);
        final ClientSession createSession2 = createSessionFactory.createSession(false, false);
        final ClientProducer createProducer = createSession2.createProducer(ADDRESS);
        final AtomicInteger atomicInteger = new AtomicInteger(0);
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        Thread thread = new Thread() { // from class: org.apache.activemq.artemis.tests.integration.paging.PagingSendTest.1
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                int i = 0;
                for (int i2 = 0; i2 < 1000; i2++) {
                    try {
                        ClientMessage createMessage = createSession2.createMessage(true);
                        createMessage.getBodyBuffer().writeBytes(new byte[1024]);
                        createMessage.putIntProperty("count", i2);
                        createProducer.send(createMessage);
                        if (i2 % 100 == 0 && i2 > 0) {
                            createSession2.commit();
                            int i3 = i;
                            i++;
                            if (i3 > 2) {
                                countDownLatch.countDown();
                            }
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                        atomicInteger.incrementAndGet();
                        return;
                    }
                }
                createSession2.commit();
            }
        };
        ClientConsumer createConsumer = createSession.createConsumer(ADDRESS);
        createSession.start();
        thread.start();
        assertTrue(countDownLatch.await(10L, TimeUnit.SECONDS));
        for (int i = 0; i < 1000; i++) {
            ClientMessage receive = createConsumer.receive(10000L);
            Assert.assertNotNull(receive);
            assertEquals(i, receive.getIntProperty("count").intValue());
            receive.acknowledge();
        }
        thread.join();
        createSession.close();
        createSession2.close();
        assertEquals(0L, atomicInteger.get());
    }

    @Test
    public void testPagingDoesNotDuplicateBatchMessages() throws Exception {
        ClientSession createSession = createSessionFactory(this.locator).createSession(false, false);
        SimpleString simpleString = new SimpleString("testQueue");
        createSession.createQueue(simpleString, simpleString, (SimpleString) null, true);
        this.server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setPageSizeBytes(10240L).setMaxSizeBytes(16384L));
        sendMessageBatch(20, createSession, simpleString);
        checkBatchMessagesAreNotPagedTwice(this.server.locateQueue(simpleString));
        for (int i = 0; i < 10; i++) {
            assertEquals(20, processCountThroughIterator(r0));
        }
    }

    @Test
    public void testPagingDoesNotDuplicateBatchMessagesAfterPagingStarted() throws Exception {
        int i;
        ClientSession createSession = createSessionFactory(this.locator).createSession(false, false);
        SimpleString simpleString = new SimpleString("testQueue");
        createSession.createQueue(simpleString, simpleString, (SimpleString) null, true);
        this.server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setPageSizeBytes(10240L).setMaxSizeBytes(16384L));
        int i2 = 0;
        while (true) {
            i = i2;
            if (this.server.getPagingManager().getPageStore(simpleString).isPaging()) {
                break;
            }
            sendMessageBatch(20, createSession, simpleString);
            i2 = i + 20;
        }
        sendMessageBatch(20, createSession, simpleString);
        int i3 = i + 20;
        checkBatchMessagesAreNotPagedTwice(this.server.locateQueue(simpleString));
        for (int i4 = 0; i4 < 10; i4++) {
            assertEquals(i3, processCountThroughIterator(r0));
        }
    }

    public List<String> sendMessageBatch(int i, ClientSession clientSession, SimpleString simpleString) throws ActiveMQException {
        ArrayList arrayList = new ArrayList();
        ClientProducer createProducer = clientSession.createProducer(simpleString);
        for (int i2 = 0; i2 < i; i2++) {
            ClientMessage createMessage = clientSession.createMessage(true);
            createMessage.getBodyBuffer().writeBytes(new byte[1024]);
            String uuid = UUID.randomUUID().toString();
            createMessage.putStringProperty("id", uuid);
            createMessage.putIntProperty("seq", i2);
            arrayList.add(uuid);
            createProducer.send(createMessage);
        }
        clientSession.commit();
        return arrayList;
    }

    public void checkBatchMessagesAreNotPagedTwice(Queue queue) throws Exception {
        LinkedListIterator linkedListIterator = queue.totalIterator();
        HashSet hashSet = new HashSet();
        int i = 0;
        while (linkedListIterator.hasNext()) {
            if (!hashSet.add(((MessageReference) linkedListIterator.next()).getMessage().getStringProperty("id"))) {
                i++;
            }
        }
        assertTrue(i == 0);
    }

    protected int processCountThroughIterator(Queue queue) throws Exception {
        LinkedListIterator linkedListIterator = queue.totalIterator();
        int i = 0;
        while (linkedListIterator.hasNext()) {
            i++;
        }
        return i;
    }
}
