/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.bugs;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AMQ2832Test {
    private static final Logger LOG = LoggerFactory.getLogger(AMQ2832Test.class);
    BrokerService broker = null;
    private final Destination destination = new ActiveMQQueue("AMQ2832Test");
    final String payload = new String(new byte[1024]);

    protected void startBroker(boolean delete) throws Exception {
        this.broker = new BrokerService();
        this.broker.setDeleteAllMessagesOnStartup(delete);
        this.broker.setPersistent(true);
        this.broker.setUseJmx(false);
        this.broker.addConnector("tcp://localhost:0");
        this.configurePersistence(this.broker, delete);
        this.broker.start();
        LOG.info("Starting broker..");
    }

    protected void configurePersistence(BrokerService brokerService, boolean deleteAllOnStart) throws Exception {
        KahaDBPersistenceAdapter adapter = (KahaDBPersistenceAdapter)brokerService.getPersistenceAdapter();
        adapter.setJournalMaxFileLength(20480);
        adapter.setCheckpointInterval(500L);
        adapter.setCleanupInterval(500L);
        if (!deleteAllOnStart) {
            adapter.setForceRecoverIndex(true);
        }
    }

    @After
    public void tearDown() throws Exception {
        if (this.broker != null) {
            this.broker.stop();
            this.broker.waitUntilStopped();
        }
    }

    @Test
    public void testAckChain() throws Exception {
        this.startBroker(true);
        StagedConsumer consumer = new StagedConsumer();
        this.produceMessagesToConsumeMultipleDataFiles(5);
        consumer.receive(3);
        this.produceAndConsumeImmediately(20, consumer);
        consumer.receive(2).acknowledge();
        consumer.close();
        this.broker.stop();
        this.broker.waitUntilStopped();
        this.startBroker(false);
        consumer = new StagedConsumer();
        Message message = consumer.receive(1);
        Assert.assertNotNull((String)"One message stays unacked from db-1.log", (Object)message);
        message.acknowledge();
        message = consumer.receive(1);
        Assert.assertNull((String)"There should not be any unconsumed messages any more", (Object)message);
        consumer.close();
    }

    private void produceAndConsumeImmediately(int numOfMsgs, StagedConsumer consumer) throws Exception {
        for (int i = 0; i < numOfMsgs; ++i) {
            this.produceMessagesToConsumeMultipleDataFiles(1);
            consumer.receive(1).acknowledge();
        }
    }

    @Test
    public void testAckRemovedMessageReplayedAfterRecovery() throws Exception {
        this.startBroker(true);
        StagedConsumer consumer = new StagedConsumer();
        int numMessagesAvailable = this.produceMessagesToConsumeMultipleDataFiles(20);
        Message firstUnacked = consumer.receive(10);
        LOG.info("first unacked: " + firstUnacked.getJMSMessageID());
        Message secondUnacked = consumer.receive(1);
        LOG.info("second unacked: " + secondUnacked.getJMSMessageID());
        numMessagesAvailable -= 11;
        numMessagesAvailable += this.produceMessagesToConsumeMultipleDataFiles(10);
        LOG.info("Acking firstUnacked: " + firstUnacked.getJMSMessageID());
        firstUnacked.acknowledge();
        consumer.receive(numMessagesAvailable += this.produceMessagesToConsumeMultipleDataFiles(10)).acknowledge();
        consumer.close();
        this.broker.stop();
        this.broker.waitUntilStopped();
        this.startBroker(false);
        consumer = new StagedConsumer();
        Message msg = consumer.receive(1, 5);
        Assert.assertNotNull((String)"One messages left after recovery", (Object)msg);
        msg.acknowledge();
        msg = consumer.receive(1, 5);
        Assert.assertEquals((String)("Only one messages left after recovery: " + msg), null, (Object)msg);
        consumer.close();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private int produceMessagesToConsumeMultipleDataFiles(int numToSend) throws Exception {
        int sent = 0;
        Connection connection = new ActiveMQConnectionFactory(((TransportConnector)this.broker.getTransportConnectors().get(0)).getConnectUri()).createConnection();
        connection.start();
        try {
            Session session = connection.createSession(false, 1);
            MessageProducer producer = session.createProducer(this.destination);
            for (int i = 0; i < numToSend; ++i) {
                producer.send(this.createMessage(session, i));
                ++sent;
            }
        }
        finally {
            connection.close();
        }
        return sent;
    }

    private Message createMessage(Session session, int i) throws Exception {
        return session.createTextMessage(this.payload + "::" + i);
    }

    private class StagedConsumer {
        Connection connection;
        MessageConsumer consumer;

        StagedConsumer() throws Exception {
            this.connection = new ActiveMQConnectionFactory("failover://" + ((TransportConnector)AMQ2832Test.this.broker.getTransportConnectors().get(0)).getConnectUri().toString()).createConnection();
            this.connection.start();
            this.consumer = this.connection.createSession(false, 4).createConsumer(AMQ2832Test.this.destination);
        }

        public Message receive(int numToReceive) throws Exception {
            return this.receive(numToReceive, 2);
        }

        public Message receive(int numToReceive, int timeoutInSeconds) throws Exception {
            Message msg = null;
            while (numToReceive > 0) {
                while ((msg = this.consumer.receive(1000L)) == null && --timeoutInSeconds > 0) {
                }
                if (numToReceive > 1) {
                    msg.acknowledge();
                }
                if (msg != null) {
                    LOG.debug("received: " + msg.getJMSMessageID());
                }
                --numToReceive;
            }
            return msg;
        }

        void close() throws JMSException {
            this.consumer.close();
            this.connection.close();
        }
    }
}

