package org.apache.activemq.artemis.tests.integration.amqp;

import jakarta.jms.Connection;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import jakarta.jms.Topic;
import jakarta.jms.TopicSubscriber;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Hashtable;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
import javax.naming.InitialContext;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.utils.Wait;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.hamcrest.CoreMatchers;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/amqp/TopicDurableTests.class */
public class TopicDurableTests extends JMSClientTestSupport {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.activemq.artemis.tests.integration.amqp.AmqpTestSupport
    public void addConfiguration(ActiveMQServer activeMQServer) {
        activeMQServer.getConfiguration().setAddressQueueScanPeriod(100L);
    }

    @Test
    public void testMessageDurableSubscription() throws Exception {
        Connection createConnection = new JmsConnectionFactory(getBrokerQpidJMSConnectionURI() + "?jms.clientID=jmsTopicClient").createConnection();
        createConnection.start();
        logger.debug("testMessageDurableSubscription");
        Session createSession = createConnection.createSession(false, 1);
        Topic createTopic = createSession.createTopic("jmsTopic");
        TopicSubscriber createDurableSubscriber = createSession.createDurableSubscriber(createTopic, "sub1DurSub");
        TopicSubscriber createDurableSubscriber2 = createSession.createDurableSubscriber(createTopic, "sub2DurSub");
        MessageProducer createProducer = createSession.createProducer(createTopic);
        sendMessages(createProducer, generateMessages(createSession, "First", 100));
        logger.debug("First batch messages sent");
        List<Message> receiveMessages = receiveMessages(createDurableSubscriber, 100);
        List<Message> receiveMessages2 = receiveMessages(createDurableSubscriber2, 100);
        assertThat(Integer.valueOf(receiveMessages.size()), CoreMatchers.is(100));
        assertMessageContent(receiveMessages, "First");
        logger.debug("{} :First batch messages received", "sub1DurSub");
        assertThat(Integer.valueOf(receiveMessages2.size()), CoreMatchers.is(100));
        assertMessageContent(receiveMessages2, "First");
        logger.debug("{} :First batch messages received", "sub2DurSub");
        createDurableSubscriber.close();
        logger.debug("{} : closed", "sub1DurSub");
        sendMessages(createProducer, generateMessages(createSession, "Second", 100));
        logger.debug("Second batch messages sent");
        List<Message> receiveMessages3 = receiveMessages(createDurableSubscriber2, 100);
        assertThat(Integer.valueOf(receiveMessages3.size()), CoreMatchers.is(100));
        assertMessageContent(receiveMessages3, "Second");
        logger.debug("{} :Second batch messages received", "sub2DurSub");
        TopicSubscriber createDurableSubscriber3 = createSession.createDurableSubscriber(createTopic, "sub1DurSub");
        logger.debug("{} :connected", "sub1DurSub");
        List<Message> receiveMessages4 = receiveMessages(createDurableSubscriber3, 100);
        assertThat(Integer.valueOf(receiveMessages4.size()), CoreMatchers.is(100));
        assertMessageContent(receiveMessages4, "Second");
        logger.debug("{} :Second batch messages received", "sub1DurSub");
        createDurableSubscriber3.close();
        createDurableSubscriber2.close();
        createSession.unsubscribe("sub1DurSub");
        createSession.unsubscribe("sub2DurSub");
    }

    @Test
    public void testSharedNonDurableSubscription() throws Exception {
        for (int i = 0; i < 10; i++) {
            logger.debug("testSharedNonDurableSubscription; iteration: {}", Integer.valueOf(i));
            Connection createConnection = new JmsConnectionFactory(getBrokerQpidJMSConnectionURI()).createConnection();
            Hashtable hashtable = new Hashtable();
            hashtable.put("java.naming.factory.initial", "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
            hashtable.put("connectionfactory.qpidConnectionFactory", "amqp://localhost:5672");
            hashtable.put("topic.jmsTopic", "jmsTopic");
            Connection createConnection2 = ((ConnectionFactory) new InitialContext(hashtable).lookup("qpidConnectionFactory")).createConnection();
            createConnection.start();
            createConnection2.start();
            Session createSession = createConnection.createSession(false, 1);
            Session createSession2 = createConnection2.createSession(false, 1);
            Topic createTopic = createSession.createTopic("jmsTopic");
            MessageConsumer createSharedConsumer = createSession.createSharedConsumer(createTopic, "sharedConsumerNonDurable123");
            MessageConsumer createSharedConsumer2 = createSession2.createSharedConsumer(createTopic, "sharedConsumerNonDurable123");
            MessageConsumer createSharedConsumer3 = createSession2.createSharedConsumer(createTopic, "sharedConsumerNonDurable123");
            MessageProducer createProducer = createSession.createProducer(createTopic);
            createProducer.setDeliveryMode(1);
            List<Message> generateMessages = generateMessages(createSession, 10);
            List<CompletableFuture<List<Message>>> receiveMessagesAsync = receiveMessagesAsync(10, createSharedConsumer, createSharedConsumer2, createSharedConsumer3);
            sendMessages(createProducer, generateMessages);
            logger.debug("messages sent");
            assertThat("Each message should be received only by one consumer", Integer.valueOf(receiveMessagesAsync.get(0).get(20L, TimeUnit.SECONDS).size() + receiveMessagesAsync.get(1).get(20L, TimeUnit.SECONDS).size() + receiveMessagesAsync.get(2).get(20L, TimeUnit.SECONDS).size()), CoreMatchers.is(10));
            logger.debug("messages received");
            createConnection.stop();
            createConnection2.stop();
            createSharedConsumer.close();
            createSharedConsumer2.close();
            createSession.close();
            createSession2.close();
            createConnection.close();
            createConnection2.close();
            Wait.assertTrue(() -> {
                return this.server.getAddressInfo(SimpleString.toSimpleString("jmsTopic")) == null;
            }, 2000L, 100L);
        }
    }

    private void sendMessages(MessageProducer messageProducer, List<Message> list) {
        list.forEach(message -> {
            try {
                messageProducer.send(message);
            } catch (JMSException e) {
                e.printStackTrace();
            }
        });
    }

    protected List<Message> receiveMessages(MessageConsumer messageConsumer, int i) {
        return receiveMessages(messageConsumer, i, 0L);
    }

    protected List<Message> receiveMessages(MessageConsumer messageConsumer, int i, long j) {
        ArrayList arrayList = new ArrayList();
        IntStream.range(0, i).forEach(i2 -> {
            try {
                arrayList.add(j > 0 ? messageConsumer.receive(j) : messageConsumer.receive());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        });
        return arrayList;
    }

    protected void assertMessageContent(List<Message> list, String str) {
        list.forEach(message -> {
            try {
                assertTrue(((TextMessage) message).getText().contains(str));
            } catch (JMSException e) {
                e.printStackTrace();
            }
        });
    }

    protected List<Message> generateMessages(Session session, int i) {
        return generateMessages(session, "", i);
    }

    protected List<Message> generateMessages(Session session, String str, int i) {
        ArrayList arrayList = new ArrayList();
        StringBuilder sb = new StringBuilder();
        IntStream.range(0, i).forEach(i2 -> {
            try {
                arrayList.add(session.createTextMessage(sb.append(str).append("testMessage").append(i2).toString()));
                sb.setLength(0);
            } catch (JMSException e) {
                e.printStackTrace();
            }
        });
        return arrayList;
    }

    protected List<CompletableFuture<List<Message>>> receiveMessagesAsync(int i, MessageConsumer... messageConsumerArr) throws JMSException {
        AtomicInteger atomicInteger = new AtomicInteger(i);
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        for (int i2 = 0; i2 < messageConsumerArr.length; i2++) {
            int i3 = i2;
            arrayList.add(new CompletableFuture());
            arrayList2.add(new ArrayList());
            messageConsumerArr[i2].setMessageListener(message -> {
                logger.debug("Mesages received{} count: {}", message, Integer.valueOf(atomicInteger.get()));
                ((List) arrayList2.get(i3)).add(message);
                if (atomicInteger.decrementAndGet() == 0) {
                    for (int i4 = 0; i4 < messageConsumerArr.length; i4++) {
                        ((CompletableFuture) arrayList.get(i4)).complete((List) arrayList2.get(i4));
                    }
                }
            });
        }
        return arrayList;
    }
}
