/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.bugs;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.XAConnection;
import javax.jms.XASession;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQXAConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.DestinationInterceptor;
import org.apache.activemq.broker.region.virtual.VirtualDestination;
import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
import org.apache.activemq.broker.region.virtual.VirtualTopic;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.kahadb.FilteredKahaDBPersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.store.kahadb.MultiKahaDBPersistenceAdapter;
import org.apache.activemq.util.TestUtils;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(value=Parameterized.class)
public class MKahaDBTxVirtualTopicTest {
    static final Logger LOG = LoggerFactory.getLogger(MKahaDBTxVirtualTopicTest.class);
    private static final int maxFileLength = 32768;
    private static final int CLEANUP_INTERVAL_MILLIS = 500;
    @Parameterized.Parameter(value=0)
    public boolean concurrentSendOption;
    BrokerService broker;

    @Parameterized.Parameters(name="concurrentSend:{0}")
    public static Collection<Object[]> data() {
        return Arrays.asList({Boolean.TRUE}, {Boolean.FALSE});
    }

    @After
    public void tearDown() throws Exception {
        if (this.broker != null) {
            this.broker.stop();
        }
    }

    protected BrokerService createBroker(PersistenceAdapter kaha) throws Exception {
        BrokerService broker = new BrokerService();
        broker.setAdvisorySupport(false);
        broker.setUseJmx(true);
        broker.setBrokerName("localhost");
        broker.setPersistenceAdapter(kaha);
        VirtualDestinationInterceptor interceptor = new VirtualDestinationInterceptor();
        VirtualTopic virtualTopic = new VirtualTopic();
        virtualTopic.setName("VirtualTopic.>");
        virtualTopic.setPrefix("Consumer.*.*.");
        virtualTopic.setConcurrentSend(this.concurrentSendOption);
        VirtualDestination[] virtualDestinations = new VirtualDestination[]{virtualTopic};
        interceptor.setVirtualDestinations(virtualDestinations);
        broker.setDestinationInterceptors(new DestinationInterceptor[]{interceptor});
        return broker;
    }

    @Test
    public void testConcurrentSendOkWithSplitStores() throws Exception {
        this.prepareBrokerWithMultiStore(true);
        this.broker.start();
        this.broker.waitUntilStarted();
        Assert.assertTrue((String)"Broker doesn't have an Admin View.", (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                return MKahaDBTxVirtualTopicTest.this.broker.getAdminView() != null;
            }
        }));
        ActiveMQXAConnectionFactory activeMQXAConnectionFactory = new ActiveMQXAConnectionFactory("vm://localhost");
        this.init((ActiveMQConnectionFactory)activeMQXAConnectionFactory);
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("vm://localhost");
        this.init(activeMQConnectionFactory);
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        String[] consumerIds = new String[]{"A.A", "A.B", "B.A", "B.B"};
        ConsumerHolder[] consumerHolders = new ConsumerHolder[consumerIds.length];
        int consumerHolderIdx = 0;
        for (String consumerId : consumerIds) {
            consumerHolders[consumerHolderIdx++] = this.createConsumer(activeMQConnectionFactory, consumerId);
        }
        XAConnection xaConnection = activeMQXAConnectionFactory.createXAConnection();
        xaConnection.start();
        XASession xas = xaConnection.createXASession();
        MessageProducer producer = xas.createProducer(null);
        ActiveMQTopic virtualTopic = new ActiveMQTopic("VirtualTopic.A");
        BytesMessage message = xas.createBytesMessage();
        message.writeBytes(new byte[100]);
        XAResource xaResource = xas.getXAResource();
        int numMessages = 500;
        for (int i = 0; i < 500; ++i) {
            message.setIntProperty("C", i);
            Xid xid = TestUtils.createXid();
            xaResource.start(xid, 0);
            producer.send((Destination)virtualTopic, (Message)message);
            xaResource.end(xid, 0x4000000);
            xaResource.commit(xid, true);
        }
        String[] arr$ = consumerIds;
        int len$ = arr$.length;
        for (int i$ = 0; i$ < len$; ++i$) {
            final String consumerId = arr$[i$];
            Assert.assertTrue((boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

                public boolean isSatisified() throws Exception {
                    org.apache.activemq.broker.region.Destination destination = MKahaDBTxVirtualTopicTest.this.broker.getDestination((ActiveMQDestination)new ActiveMQQueue("Consumer." + consumerId + ".VirtualTopic.A"));
                    LOG.info("message count for: " + consumerId + ", " + destination.getMessageStore().getMessageCount());
                    return 500 == destination.getMessageStore().getMessageCount();
                }
            }));
        }
        for (int i = 0; i < 500; ++i) {
            for (ConsumerHolder consumerHolder : consumerHolders) {
                Message m = consumerHolder.consumer.receive(4000L);
                if (m != null && i == 50) {
                    LOG.info("@ 50 Got: " + m.getIntProperty("C"));
                }
                if (!consumerHolder.session.getTransacted()) continue;
                consumerHolder.session.commit();
            }
        }
        for (final String consumerId : consumerIds) {
            Assert.assertTrue((boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

                public boolean isSatisified() throws Exception {
                    org.apache.activemq.broker.region.Destination destination = MKahaDBTxVirtualTopicTest.this.broker.getDestination((ActiveMQDestination)new ActiveMQQueue("Consumer." + consumerId + ".VirtualTopic.A"));
                    return 0 == destination.getMessageStore().getMessageCount();
                }
            }));
        }
    }

    private void init(ActiveMQConnectionFactory f) {
        f.setWatchTopicAdvisories(false);
        f.setAlwaysSyncSend(true);
    }

    protected KahaDBPersistenceAdapter createStore(boolean delete) throws IOException {
        KahaDBPersistenceAdapter kaha = new KahaDBPersistenceAdapter();
        kaha.setJournalMaxFileLength(32768);
        kaha.setCleanupInterval(500L);
        if (delete) {
            kaha.deleteAllMessages();
        }
        return kaha;
    }

    public void prepareBrokerWithMultiStore(boolean deleteAllMessages) throws Exception {
        MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter = new MultiKahaDBPersistenceAdapter();
        if (deleteAllMessages) {
            multiKahaDBPersistenceAdapter.deleteAllMessages();
        }
        ArrayList<FilteredKahaDBPersistenceAdapter> adapters = new ArrayList<FilteredKahaDBPersistenceAdapter>();
        adapters.add(this.createFilteredKahaDBByDestinationPrefix("Consumer.A", deleteAllMessages));
        adapters.add(this.createFilteredKahaDBByDestinationPrefix("Consumer.B", deleteAllMessages));
        adapters.add(this.createFilteredKahaDBByDestinationPrefix(null, deleteAllMessages));
        multiKahaDBPersistenceAdapter.setFilteredPersistenceAdapters(adapters);
        multiKahaDBPersistenceAdapter.setJournalMaxFileLength(4096);
        multiKahaDBPersistenceAdapter.setJournalCleanupInterval(10L);
        this.broker = this.createBroker((PersistenceAdapter)multiKahaDBPersistenceAdapter);
    }

    private FilteredKahaDBPersistenceAdapter createFilteredKahaDBByDestinationPrefix(String destinationPrefix, boolean deleteAllMessages) throws IOException {
        FilteredKahaDBPersistenceAdapter template = new FilteredKahaDBPersistenceAdapter();
        template.setPersistenceAdapter((PersistenceAdapter)this.createStore(deleteAllMessages));
        if (destinationPrefix != null) {
            template.setQueue(destinationPrefix + ".>");
        }
        return template;
    }

    private ConsumerHolder createConsumer(ActiveMQConnectionFactory f, String id) throws JMSException {
        ConsumerHolder consumerHolder = new ConsumerHolder();
        consumerHolder.connection = f.createConnection();
        consumerHolder.connection.start();
        consumerHolder.session = consumerHolder.connection.createSession(false, 1);
        consumerHolder.consumer = consumerHolder.session.createConsumer((Destination)new ActiveMQQueue("Consumer." + id + ".VirtualTopic.A"));
        return consumerHolder;
    }

    class ConsumerHolder {
        Connection connection;
        Session session;
        MessageConsumer consumer;

        ConsumerHolder() {
        }
    }
}

