/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.bugs;

import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.leveldb.LevelDBStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.store.kahadb.disk.journal.DataFile;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AMQ2832Test {
    private static final Logger LOG = LoggerFactory.getLogger(AMQ2832Test.class);
    // Broker under test; doStartBroker() replaces it with a new instance on every (re)start.
    BrokerService broker = null;
    // Connection factory bound to connectionUri; rebuilt by doStartBroker().
    private ActiveMQConnectionFactory cf;
    // Queue used by produceMessagesToConsumeMultipleDataFiles() and by StagedConsumer.
    private final Destination destination = new ActiveMQQueue("AMQ2832Test");
    // URI handed to cf; assigned in doStartBroker().
    private String connectionUri;
    // Filler text built from 1024 zero bytes; createMessage() prefixes every message body with it.
    final String payload = new String(new byte[1024]);

    /**
     * Starts a new broker that deletes all stored messages on startup and does not
     * force an index recovery.
     *
     * @throws Exception if the broker cannot be configured or started
     */
    protected void startBroker() throws Exception {
        final boolean deleteAllMessages = true;
        final boolean forceIndexRecovery = false;
        this.doStartBroker(deleteAllMessages, forceIndexRecovery);
    }

    /**
     * Stops the current broker, if there is one, and starts a new broker that keeps
     * the stored messages and does not force an index recovery.
     *
     * @throws Exception if stopping or starting the broker fails
     */
    protected void restartBroker() throws Exception {
        final BrokerService running = this.broker;
        if (running != null) {
            running.stop();
            running.waitUntilStopped();
        }
        this.doStartBroker(false, false);
    }

    /**
     * Stops the current broker, if there is one, and starts a new broker that keeps
     * the stored messages and is configured with forced index recovery.
     *
     * @throws Exception if stopping or starting the broker fails
     */
    protected void recoverBroker() throws Exception {
        final boolean brokerPresent = this.broker != null;
        if (brokerPresent) {
            this.broker.stop();
            this.broker.waitUntilStopped();
        }
        final boolean deleteAllMessages = false;
        final boolean forceIndexRecovery = true;
        this.doStartBroker(deleteAllMessages, forceIndexRecovery);
    }

    /**
     * Creates, configures and starts a new persistent, JMX-enabled broker with a TCP
     * connector on an ephemeral port, and rebuilds the VM connection factory.
     *
     * @param delete  whether the broker deletes all stored messages on startup
     * @param recover passed on to {@link #configurePersistence(BrokerService, boolean)}
     * @throws Exception if configuring or starting the broker fails
     */
    private void doStartBroker(boolean delete, boolean recover) throws Exception {
        final BrokerService service = new BrokerService();
        // Publish the instance right away so tearDown() can stop it even if setup fails below.
        this.broker = service;

        service.setDeleteAllMessagesOnStartup(delete);
        service.setPersistent(true);
        service.setUseJmx(true);
        service.addConnector("tcp://localhost:0");
        this.configurePersistence(service, recover);

        final String vmUri = "vm://localhost?create=false";
        this.connectionUri = vmUri;
        this.cf = new ActiveMQConnectionFactory(vmUri);

        service.start();
        LOG.info("Starting broker..");
    }

    /**
     * Tunes the broker's KahaDB persistence adapter with a 20 KiB journal file limit
     * and 5 second checkpoint and cleanup intervals.
     *
     * @param brokerService broker whose persistence adapter is configured; the adapter
     *                      is cast to {@link KahaDBPersistenceAdapter}
     * @param recover       when {@code true}, index recovery is forced on the adapter
     * @throws Exception if the persistence adapter cannot be obtained
     */
    protected void configurePersistence(BrokerService brokerService, boolean recover) throws Exception {
        final PersistenceAdapter configured = brokerService.getPersistenceAdapter();
        final KahaDBPersistenceAdapter kahaDb = (KahaDBPersistenceAdapter) configured;

        final long intervalMillis = 5 * 1000L;
        kahaDb.setJournalMaxFileLength(20 * 1024);
        kahaDb.setCheckpointInterval(intervalMillis);
        kahaDb.setCleanupInterval(intervalMillis);

        if (!recover) {
            return;
        }
        kahaDb.setForceRecoverIndex(true);
    }

    /**
     * Stops the broker after each test and waits until it has fully stopped.
     * Does nothing when no broker was created.
     *
     * @throws Exception if stopping the broker fails
     */
    @After
    public void tearDown() throws Exception {
        if (this.broker == null) {
            return;
        }
        this.broker.stop();
        this.broker.waitUntilStopped();
    }

    /**
     * Produces messages in three batches, leaves exactly one received message
     * unacknowledged, restarts the broker with forced index recovery and checks that
     * exactly one message is delivered afterwards.
     *
     * <p>Both consumers are closed in {@code finally} blocks so a failing assertion
     * does not leave a failover connection open.
     *
     * @throws Exception on any broker or JMS failure
     */
    @Test
    public void testAckRemovedMessageReplayedAfterRecovery() throws Exception {
        this.startBroker();

        StagedConsumer consumer = new StagedConsumer();
        try {
            int numMessagesAvailable = this.produceMessagesToConsumeMultipleDataFiles(20);

            // receive(10) acknowledges the first nine messages and returns the tenth unacknowledged.
            Message firstUnacked = consumer.receive(10);
            Assert.assertNotNull("first unacked message expected", firstUnacked);
            LOG.info("first unacked: " + firstUnacked.getJMSMessageID());

            // This message is never acknowledged by the test.
            Message secondUnacked = consumer.receive(1);
            Assert.assertNotNull("second unacked message expected", secondUnacked);
            LOG.info("second unacked: " + secondUnacked.getJMSMessageID());
            numMessagesAvailable -= 11;

            // Send another batch before acknowledging the first held-back message.
            numMessagesAvailable += this.produceMessagesToConsumeMultipleDataFiles(10);
            LOG.info("Acking firstUnacked: " + firstUnacked.getJMSMessageID());
            firstUnacked.acknowledge();

            // Send a final batch, then consume and acknowledge everything still available.
            numMessagesAvailable += this.produceMessagesToConsumeMultipleDataFiles(10);
            Message lastReceived = consumer.receive(numMessagesAvailable);
            Assert.assertNotNull("all remaining messages expected", lastReceived);
            lastReceived.acknowledge();
        } finally {
            consumer.close();
        }

        // recoverBroker() stops the running broker itself before starting the new one.
        this.recoverBroker();

        consumer = new StagedConsumer();
        try {
            Message msg = consumer.receive(1, 5);
            Assert.assertNotNull("One messages left after recovery", msg);
            msg.acknowledge();

            // Nothing else may be delivered after the single remaining message.
            msg = consumer.receive(1, 5);
            Assert.assertNull("Only one messages left after recovery: " + msg, msg);
        } finally {
            consumer.close();
        }
    }

    /**
     * Checks that a drained queue stays empty after journal cleanup and a forced
     * index recovery.
     *
     * <p>Steps: create an inactive durable subscription, send one message to a queue,
     * fill further journal files via a disposable queue, drain the first queue, restart
     * the broker, remove the disposable queue, wait until at most three journal files
     * remain, recover the broker and assert that the drained queue yields no messages.
     *
     * <p>Returns without asserting anything when the persistence adapter is a
     * {@link LevelDBStore}.
     *
     * @throws Exception on any broker or JMS failure
     */
    @Test
    public void testAlternateLossScenario() throws Exception {
        this.startBroker();
        PersistenceAdapter pa = this.broker.getPersistenceAdapter();
        if (pa instanceof LevelDBStore) {
            return;
        }

        ActiveMQQueue queue = new ActiveMQQueue("MyQueue");
        ActiveMQQueue disposable = new ActiveMQQueue("MyDisposableQueue");
        ActiveMQTopic topic = new ActiveMQTopic("MyDurableTopic");

        this.createInactiveDurableSub(topic);
        Assert.assertEquals(1, this.getNumberOfJournalFiles());

        this.produceMessages(queue, 1);
        Assert.assertEquals(1, this.getNumberOfJournalFiles());

        this.produceMessages(disposable, 50);
        int dataFilesCount = this.getNumberOfJournalFiles();
        Assert.assertTrue("More than one journal file expected, was " + dataFilesCount, dataFilesCount > 1);

        this.drainQueue(queue);

        this.produceMessages(disposable, 50);
        int grownFilesCount = this.getNumberOfJournalFiles();
        Assert.assertTrue("Journal expected to grow beyond " + dataFilesCount + " files, was " + grownFilesCount,
                dataFilesCount < grownFilesCount);

        this.restartBroker();
        this.broker.getAdminView().removeQueue(disposable.getQueueName());

        // Wait first and build the failure message afterwards, so it reports the file
        // count at the time of failure rather than the count from before the wait.
        boolean reclaimed = Wait.waitFor(new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                return AMQ2832Test.this.getNumberOfJournalFiles() <= 3;
            }
        }, TimeUnit.MINUTES.toMillis(3L));
        Assert.assertTrue("At most three journal files expected, was " + this.getNumberOfJournalFiles(), reclaimed);

        this.recoverBroker();
        Assert.assertEquals("No messages expected on the drained queue after recovery", 0, this.drainQueue(queue));
    }

    /**
     * Counts the non-null data files currently listed in the KahaDB journal's file map.
     *
     * @return number of non-null entries in the journal file map
     * @throws IOException if the persistence adapter cannot be obtained
     */
    private int getNumberOfJournalFiles() throws IOException {
        KahaDBPersistenceAdapter adapter = (KahaDBPersistenceAdapter) this.broker.getPersistenceAdapter();
        // Typed collection: iterating a raw Collection as DataFile does not compile.
        Collection<DataFile> files = adapter.getStore().getJournal().getFileMap().values();
        int reality = 0;
        for (DataFile file : files) {
            if (file != null) {
                ++reality;
            }
        }
        return reality;
    }

    /**
     * Registers a durable subscription named "Inactive" (client id "Inactive") on the
     * topic, closes the subscriber and its connection, and then publishes one message
     * to the topic.
     *
     * @param topic topic to subscribe to and publish on
     * @throws Exception on any JMS failure
     */
    private void createInactiveDurableSub(Topic topic) throws Exception {
        Connection connection = this.cf.createConnection();
        try {
            connection.setClientID("Inactive");
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            TopicSubscriber consumer = session.createDurableSubscriber(topic, "Inactive");
            consumer.close();
        } finally {
            // Always release the connection, even if registering the subscription fails.
            connection.close();
        }
        // Publish only after the subscriber connection is closed.
        this.produceMessages(topic, 1);
    }

    /**
     * Consumes messages from the queue until a receive call times out after five
     * seconds, and reports how many were consumed.
     *
     * @param queue queue to drain
     * @return number of messages received
     * @throws Exception on any JMS failure
     */
    private int drainQueue(Queue queue) throws Exception {
        Connection connection = this.cf.createConnection();
        try {
            connection.setClientID("Inactive");
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageConsumer consumer = session.createConsumer(queue);
            int count = 0;
            while (consumer.receive(5000L) != null) {
                ++count;
            }
            consumer.close();
            return count;
        } finally {
            // Always release the connection, even if consuming fails part-way.
            connection.close();
        }
    }

    /**
     * Sends {@code numToSend} text messages to the destination over a new connection
     * to the broker's first transport connector.
     *
     * @param destination target queue or topic
     * @param numToSend   number of messages to send
     * @return number of messages sent
     * @throws Exception on any JMS failure; the connection is closed regardless
     */
    private int produceMessages(Destination destination, int numToSend) throws Exception {
        final TransportConnector firstConnector = (TransportConnector) this.broker.getTransportConnectors().get(0);
        final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(firstConnector.getConnectUri());
        final Connection connection = factory.createConnection();
        connection.start();

        int sent = 0;
        try {
            final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            final MessageProducer producer = session.createProducer(destination);
            // The running count doubles as the index embedded in each message body.
            while (sent < numToSend) {
                producer.send(this.createMessage(session, sent));
                sent++;
            }
        } finally {
            connection.close();
        }
        return sent;
    }

    /**
     * Sends {@code numToSend} messages to this test's shared queue.
     *
     * @param numToSend number of messages to send
     * @return number of messages sent
     * @throws Exception on any JMS failure
     */
    private int produceMessagesToConsumeMultipleDataFiles(int numToSend) throws Exception {
        final int sentCount = this.produceMessages(this.destination, numToSend);
        return sentCount;
    }

    /**
     * Builds a text message whose body is the filler payload followed by
     * {@code "::"} and the given index.
     *
     * @param session session used to create the message
     * @param i       index appended to the body
     * @return the new text message
     * @throws Exception if the message cannot be created
     */
    private Message createMessage(Session session, int i) throws Exception {
        final String body = new StringBuilder(this.payload).append("::").append(i).toString();
        return session.createTextMessage(body);
    }

    /**
     * Consumer on the test queue that connects through a failover transport to the
     * broker's first transport connector. {@code receive} acknowledges every message
     * except the last one it returns, which the caller may acknowledge later.
     */
    private class StagedConsumer {
        Connection connection;
        MessageConsumer consumer;

        /**
         * Opens the connection, starts it and creates the consumer.
         *
         * @throws Exception if connecting or creating the consumer fails; the
         *                   connection is closed before the failure is rethrown
         */
        StagedConsumer() throws Exception {
            TransportConnector connector = (TransportConnector) AMQ2832Test.this.broker.getTransportConnectors().get(0);
            this.connection = new ActiveMQConnectionFactory("failover://" + connector.getConnectUri().toString()).createConnection();
            try {
                this.connection.start();
                // NOTE(review): ack mode 4 is presumably ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE - confirm.
                this.consumer = this.connection.createSession(false, 4).createConsumer(AMQ2832Test.this.destination);
            } catch (Exception e) {
                try {
                    this.connection.close();
                } catch (JMSException ignored) {
                    // Keep the original failure; the close error is secondary.
                }
                throw e;
            }
        }

        /**
         * Receives {@code numToReceive} messages with a total retry budget of two seconds.
         *
         * @param numToReceive number of messages to receive
         * @return the last message received, unacknowledged, or {@code null} on timeout
         * @throws Exception on any JMS failure
         */
        public Message receive(int numToReceive) throws Exception {
            return this.receive(numToReceive, 2);
        }

        /**
         * Receives {@code numToReceive} messages, acknowledging all but the last.
         *
         * <p>Each receive attempt waits one second. {@code timeoutInSeconds} is a retry
         * budget shared by the whole call, not a per-message timeout. If no message
         * arrives within the remaining budget the method returns {@code null}
         * immediately instead of dereferencing the missing message.
         *
         * @param numToReceive     number of messages to receive
         * @param timeoutInSeconds total number of one-second attempts allowed to come up empty
         * @return the last message received, unacknowledged, or {@code null} if a
         *         message could not be received in time
         * @throws Exception on any JMS failure
         */
        public Message receive(int numToReceive, int timeoutInSeconds) throws Exception {
            Message msg = null;
            for (; numToReceive > 0; --numToReceive) {
                do {
                    msg = this.consumer.receive(1000L);
                } while (msg == null && --timeoutInSeconds > 0);

                if (msg == null) {
                    // Timed out: stop here rather than acknowledging a null message.
                    return null;
                }
                if (numToReceive > 1) {
                    msg.acknowledge();
                }
                LOG.debug("received: " + msg.getJMSMessageID());
            }
            // The last message is handed back unacknowledged.
            return msg;
        }

        /**
         * Closes the consumer and then the connection; the connection is closed even
         * if closing the consumer fails.
         *
         * @throws JMSException if either close call fails
         */
        void close() throws JMSException {
            try {
                this.consumer.close();
            } finally {
                this.connection.close();
            }
        }
    }
}

