package org.apache.activemq.bugs;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.XAConnection;
import javax.jms.XASession;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQXAConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.DestinationInterceptor;
import org.apache.activemq.broker.region.virtual.VirtualDestination;
import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
import org.apache.activemq.broker.region.virtual.VirtualTopic;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.kahadb.FilteredKahaDBPersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.store.kahadb.MultiKahaDBPersistenceAdapter;
import org.apache.activemq.transport.nio.NIOSSLConcurrencyTest;
import org.apache.activemq.util.TestUtils;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/activemq/bugs/MKahaDBTxVirtualTopicTest.class */
public class MKahaDBTxVirtualTopicTest {
    static final Logger LOG = LoggerFactory.getLogger(MKahaDBTxVirtualTopicTest.class);
    private static final int maxFileLength = 32768;
    private static final int CLEANUP_INTERVAL_MILLIS = 500;

    @Parameterized.Parameter(0)
    public boolean concurrentSendOption;
    BrokerService broker;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/activemq/bugs/MKahaDBTxVirtualTopicTest$ConsumerHolder.class */
    public class ConsumerHolder {
        Connection connection;
        Session session;
        MessageConsumer consumer;

        ConsumerHolder() {
        }
    }

    @Parameterized.Parameters(name = "concurrentSend:{0}")
    public static Collection<Object[]> data() {
        return Arrays.asList(new Object[]{Boolean.TRUE}, new Object[]{Boolean.FALSE});
    }

    @After
    public void tearDown() throws Exception {
        if (this.broker != null) {
            this.broker.stop();
        }
    }

    protected BrokerService createBroker(PersistenceAdapter persistenceAdapter) throws Exception {
        BrokerService brokerService = new BrokerService();
        brokerService.setAdvisorySupport(false);
        brokerService.setUseJmx(true);
        brokerService.setBrokerName("localhost");
        brokerService.setPersistenceAdapter(persistenceAdapter);
        DestinationInterceptor virtualDestinationInterceptor = new VirtualDestinationInterceptor();
        VirtualDestination virtualTopic = new VirtualTopic();
        virtualTopic.setName("VirtualTopic.>");
        virtualTopic.setPrefix("Consumer.*.*.");
        virtualTopic.setConcurrentSend(this.concurrentSendOption);
        virtualDestinationInterceptor.setVirtualDestinations(new VirtualDestination[]{virtualTopic});
        brokerService.setDestinationInterceptors(new DestinationInterceptor[]{virtualDestinationInterceptor});
        return brokerService;
    }

    @Test
    public void testConcurrentSendOkWithSplitStores() throws Exception {
        prepareBrokerWithMultiStore(true);
        this.broker.start();
        this.broker.waitUntilStarted();
        Assert.assertTrue("Broker doesn't have an Admin View.", Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.bugs.MKahaDBTxVirtualTopicTest.1
            public boolean isSatisified() throws Exception {
                return MKahaDBTxVirtualTopicTest.this.broker.getAdminView() != null;
            }
        }));
        ActiveMQXAConnectionFactory activeMQXAConnectionFactory = new ActiveMQXAConnectionFactory("vm://localhost");
        init(activeMQXAConnectionFactory);
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("vm://localhost");
        init(activeMQConnectionFactory);
        activeMQConnectionFactory.createConnection().start();
        String[] strArr = {"A.A", "A.B", "B.A", "B.B"};
        ConsumerHolder[] consumerHolderArr = new ConsumerHolder[strArr.length];
        int i = 0;
        for (String str : strArr) {
            int i2 = i;
            i++;
            consumerHolderArr[i2] = createConsumer(activeMQConnectionFactory, str);
        }
        XAConnection createXAConnection = activeMQXAConnectionFactory.createXAConnection();
        createXAConnection.start();
        XASession createXASession = createXAConnection.createXASession();
        MessageProducer createProducer = createXASession.createProducer((Destination) null);
        ActiveMQTopic activeMQTopic = new ActiveMQTopic("VirtualTopic.A");
        BytesMessage createBytesMessage = createXASession.createBytesMessage();
        createBytesMessage.writeBytes(new byte[100]);
        XAResource xAResource = createXASession.getXAResource();
        for (int i3 = 0; i3 < 500; i3++) {
            createBytesMessage.setIntProperty("C", i3);
            Xid createXid = TestUtils.createXid();
            xAResource.start(createXid, 0);
            createProducer.send(activeMQTopic, createBytesMessage);
            xAResource.end(createXid, 67108864);
            xAResource.commit(createXid, true);
        }
        for (final String str2 : strArr) {
            Assert.assertTrue(Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.bugs.MKahaDBTxVirtualTopicTest.2
                public boolean isSatisified() throws Exception {
                    org.apache.activemq.broker.region.Destination destination = MKahaDBTxVirtualTopicTest.this.broker.getDestination(new ActiveMQQueue("Consumer." + str2 + ".VirtualTopic.A"));
                    MKahaDBTxVirtualTopicTest.LOG.info("message count for: " + str2 + ", " + destination.getMessageStore().getMessageCount());
                    return 500 == destination.getMessageStore().getMessageCount();
                }
            }));
        }
        for (int i4 = 0; i4 < 500; i4++) {
            for (ConsumerHolder consumerHolder : consumerHolderArr) {
                Message receive = consumerHolder.consumer.receive(4000L);
                if (receive != null && i4 == 50) {
                    LOG.info("@ 50 Got: " + receive.getIntProperty("C"));
                }
                if (consumerHolder.session.getTransacted()) {
                    consumerHolder.session.commit();
                }
            }
        }
        for (final String str3 : strArr) {
            Assert.assertTrue(Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.bugs.MKahaDBTxVirtualTopicTest.3
                public boolean isSatisified() throws Exception {
                    return 0 == MKahaDBTxVirtualTopicTest.this.broker.getDestination(new ActiveMQQueue(new StringBuilder().append("Consumer.").append(str3).append(".VirtualTopic.A").toString())).getMessageStore().getMessageCount();
                }
            }));
        }
    }

    private void init(ActiveMQConnectionFactory activeMQConnectionFactory) {
        activeMQConnectionFactory.setWatchTopicAdvisories(false);
        activeMQConnectionFactory.setAlwaysSyncSend(true);
    }

    protected KahaDBPersistenceAdapter createStore(boolean z) throws IOException {
        KahaDBPersistenceAdapter kahaDBPersistenceAdapter = new KahaDBPersistenceAdapter();
        kahaDBPersistenceAdapter.setJournalMaxFileLength(maxFileLength);
        kahaDBPersistenceAdapter.setCleanupInterval(500L);
        if (z) {
            kahaDBPersistenceAdapter.deleteAllMessages();
        }
        return kahaDBPersistenceAdapter;
    }

    public void prepareBrokerWithMultiStore(boolean z) throws Exception {
        MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter = new MultiKahaDBPersistenceAdapter();
        if (z) {
            multiKahaDBPersistenceAdapter.deleteAllMessages();
        }
        ArrayList arrayList = new ArrayList();
        arrayList.add(createFilteredKahaDBByDestinationPrefix("Consumer.A", z));
        arrayList.add(createFilteredKahaDBByDestinationPrefix("Consumer.B", z));
        arrayList.add(createFilteredKahaDBByDestinationPrefix(null, z));
        multiKahaDBPersistenceAdapter.setFilteredPersistenceAdapters(arrayList);
        multiKahaDBPersistenceAdapter.setJournalMaxFileLength(NIOSSLConcurrencyTest.MESSAGE_SIZE);
        multiKahaDBPersistenceAdapter.setJournalCleanupInterval(10L);
        this.broker = createBroker(multiKahaDBPersistenceAdapter);
    }

    private FilteredKahaDBPersistenceAdapter createFilteredKahaDBByDestinationPrefix(String str, boolean z) throws IOException {
        FilteredKahaDBPersistenceAdapter filteredKahaDBPersistenceAdapter = new FilteredKahaDBPersistenceAdapter();
        filteredKahaDBPersistenceAdapter.setPersistenceAdapter(createStore(z));
        if (str != null) {
            filteredKahaDBPersistenceAdapter.setQueue(str + ".>");
        }
        return filteredKahaDBPersistenceAdapter;
    }

    private ConsumerHolder createConsumer(ActiveMQConnectionFactory activeMQConnectionFactory, String str) throws JMSException {
        ConsumerHolder consumerHolder = new ConsumerHolder();
        consumerHolder.connection = activeMQConnectionFactory.createConnection();
        consumerHolder.connection.start();
        consumerHolder.session = consumerHolder.connection.createSession(false, 1);
        consumerHolder.consumer = consumerHolder.session.createConsumer(new ActiveMQQueue("Consumer." + str + ".VirtualTopic.A"));
        return consumerHolder;
    }
}
