/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.bugs;

import java.util.ArrayList;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Sends sequence-numbered messages to a queue on an embedded, persistent broker whose
 * destinations run with a small memory limit and producer flow control disabled, and
 * verifies that a consumer started a little later receives them in exactly the order
 * they were sent.
 *
 * <p>One producer thread and one consumer thread share a single JMS connection. Any
 * problem found on a worker thread is counted in {@link #Num_error}; the test passes
 * only when that counter is still zero after both threads have finished.
 */
public class AMQ3167Test {
    /** Memory limit handed to {@code PolicyEntry.setMemoryLimit} for every queue. */
    protected static final int MEMORY_LIMIT = 16384;

    private static final long NANOS_PER_SECOND = 1000000000L;
    /** How long the producer is allowed to keep sending in {@link #testQueueLostMessage()}. */
    private static final long PRODUCER_RUN_SECONDS = 10L;
    /** Extra time the consumer gets after the producer deadline to drain the queue. */
    private static final long CONSUMER_GRACE_SECONDS = 5L;
    /** Timeout of a single {@code receive} call, so the consumer re-checks its exit conditions. */
    private static final long RECEIVE_TIMEOUT_MS = 1000L;

    /** When {@code true}, {@link #log(String)} prints to {@code System.err}. */
    protected static boolean Debug_f = false;

    protected BrokerService embeddedBroker;
    /** {@code System.nanoTime()} deadline for the producer; 0 disables the deadline. */
    protected long Producer_stop_time = 0L;
    /** {@code System.nanoTime()} deadline for the consumer; 0 disables the deadline. */
    protected long Consumer_stop_time = 0L;
    /** Delay between starting the producer and starting the consumer. */
    protected long Consumer_startup_delay_ms = 2000L;
    /** When {@code true}, both workers stop as soon as any error has been recorded. */
    protected boolean Stop_after_error = true;
    protected Connection JMS_conn;
    /**
     * Number of error events recorded by the worker threads. Volatile because it is polled
     * by both workers and the test thread; increments go through {@link #recordError}.
     * A single fault may be counted more than once (at detection and again when the worker
     * terminates), so only zero versus non-zero is meaningful.
     */
    protected volatile long Num_error = 0L;
    /** First throwable that terminated a worker thread, kept for the final assertion message. */
    private volatile Throwable firstFailure;

    /**
     * Opens a connection to the embedded broker through its VM connector.
     *
     * @param username user name passed to the connection factory (may be {@code null})
     * @param password password passed to the connection factory (may be {@code null})
     * @return a new, not yet started connection
     * @throws JMSException if the connection cannot be created
     */
    protected Connection createUnsecuredConnection(String username, String password) throws JMSException {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(this.embeddedBroker.getVmConnectorURI());
        return factory.createConnection(username, password);
    }

    /** Starts the embedded broker and opens and starts the shared JMS connection. */
    @Before
    public void testPrep() throws Exception {
        this.embeddedBroker = new BrokerService();
        this.configureBroker(this.embeddedBroker);
        this.embeddedBroker.start();
        this.embeddedBroker.waitUntilStarted();
        this.JMS_conn = this.createUnsecuredConnection(null, null);
        this.JMS_conn.start();
    }

    /**
     * Closes the JMS connection and shuts the broker down. Both fields are null-checked so a
     * partially completed {@link #testPrep()} does not cause a secondary failure here, and
     * the broker is stopped even when closing the connection throws.
     */
    @After
    public void testCleanup() throws Exception {
        try {
            if (this.JMS_conn != null) {
                // close() rather than stop(): stop() only pauses delivery and leaves the
                // connection and its sessions allocated.
                this.JMS_conn.close();
            }
        } finally {
            if (this.embeddedBroker != null) {
                this.embeddedBroker.stop();
                this.embeddedBroker.waitUntilStopped();
            }
        }
    }

    /**
     * Applies the broker settings used by this test: no JMX, persistence enabled under
     * {@code target/AMQ3167Test}, and the destination policy from
     * {@link #configureDestinationPolicy(BrokerService)}.
     */
    protected void configureBroker(BrokerService broker_svc) throws Exception {
        broker_svc.setBrokerName("testbroker1");
        broker_svc.setUseJmx(false);
        broker_svc.setPersistent(true);
        broker_svc.setDataDirectory("target/AMQ3167Test");
        // The data directory is fixed, so messages left over from an earlier run would be
        // delivered first and break the sequence check. Start from an empty store.
        broker_svc.setDeleteAllMessagesOnStartup(true);
        this.configureDestinationPolicy(broker_svc);
    }

    /**
     * Installs a policy for all queues ({@code ">"}) that sets the memory limit to
     * {@link #MEMORY_LIMIT} and disables producer flow control.
     */
    protected void configureDestinationPolicy(BrokerService broker_svc) {
        PolicyEntry queuePolicy = new PolicyEntry();
        queuePolicy.setQueue(">");
        queuePolicy.setMemoryLimit(MEMORY_LIMIT);
        queuePolicy.setProducerFlowControl(false);

        ArrayList<PolicyEntry> entries = new ArrayList<PolicyEntry>();
        entries.add(queuePolicy);

        PolicyMap policyMap = new PolicyMap();
        policyMap.setPolicyEntries(entries);
        broker_svc.setDestinationPolicy(policyMap);
    }

    /**
     * Runs one producer and one consumer against {@code lostmsgtest.queue} with a producer
     * deadline of {@value #PRODUCER_RUN_SECONDS} seconds and a further
     * {@value #CONSUMER_GRACE_SECONDS} seconds for the consumer, then asserts that no
     * errors were recorded.
     */
    @Test
    public void testQueueLostMessage() throws Exception {
        ActiveMQDestination dest =
                ActiveMQDestination.createDestination("lostmsgtest.queue", ActiveMQDestination.QUEUE_TYPE);
        this.Producer_stop_time = System.nanoTime() + PRODUCER_RUN_SECONDS * NANOS_PER_SECOND;
        this.Consumer_stop_time = this.Producer_stop_time + CONSUMER_GRACE_SECONDS * NANOS_PER_SECOND;
        this.runLostMsgTest(dest, 1000000, 1, 1, false);
        Assert.assertEquals(this.describeFailure(), 0L, this.Num_error);
    }

    /** Prints {@code msg} to {@code System.err} when {@link #Debug_f} is set. */
    protected static void log(String msg) {
        if (Debug_f) {
            System.err.println(msg);
        }
    }

    /**
     * Starts a producer thread, waits {@link #Consumer_startup_delay_ms}, starts a consumer
     * thread, and blocks until both have finished.
     *
     * @param dest destination to send to and receive from
     * @param num_msg maximum number of messages to send and to receive
     * @param num_send_per_sess when greater than 1, the producer uses a transacted session
     *        and commits after every {@code num_send_per_sess} messages
     * @param num_recv_per_sess when greater than 1, the consumer uses client acknowledgement
     *        and acknowledges after every {@code num_recv_per_sess} messages; otherwise
     *        auto-acknowledge is used
     * @param topic_f not used by this method
     * @throws Exception if a session, producer or consumer cannot be created, or the
     *         calling thread is interrupted while waiting
     */
    protected void runLostMsgTest(Destination dest, int num_msg, int num_send_per_sess, int num_recv_per_sess, boolean topic_f) throws Exception {
        String tag = "prod";
        AMQ3167Test.log(">> Starting producer " + tag);
        Session prodSess = this.JMS_conn.createSession(num_send_per_sess > 1, Session.AUTO_ACKNOWLEDGE);
        MessageProducer prod = prodSess.createProducer(dest);
        producerThread prod_thread = new producerThread(prodSess, prod, tag, num_msg, num_send_per_sess);
        prod_thread.start();
        AMQ3167Test.log("Started producer " + tag);

        AMQ3167Test.log("Waiting before starting consumers");
        Thread.sleep(this.Consumer_startup_delay_ms);

        tag = "cons";
        AMQ3167Test.log(">> Starting consumer");
        int ack_mode = num_recv_per_sess > 1 ? Session.CLIENT_ACKNOWLEDGE : Session.AUTO_ACKNOWLEDGE;
        Session consSess = this.JMS_conn.createSession(false, ack_mode);
        MessageConsumer cons = consSess.createConsumer(dest);
        consumerThread cons_thread = new consumerThread(consSess, cons, tag, num_msg, num_recv_per_sess);
        cons_thread.start();
        AMQ3167Test.log("Started consumer " + tag);

        AMQ3167Test.log("< waiting for producer.");
        prod_thread.join();
        AMQ3167Test.log("< waiting for consumer.");
        cons_thread.join();
        AMQ3167Test.log("Shutting down");
    }

    /**
     * Counts one error event and remembers the first non-null cause. Synchronized because
     * {@code ++} on a volatile field is not atomic and both workers may report concurrently.
     */
    private synchronized void recordError(Throwable cause) {
        ++this.Num_error;
        if (cause != null && this.firstFailure == null) {
            this.firstFailure = cause;
        }
    }

    /** Builds the message shown when the final zero-errors assertion fails. */
    private String describeFailure() {
        Throwable failure = this.firstFailure;
        if (failure == null) {
            return "worker threads recorded errors";
        }
        return "worker threads recorded errors; first failure: " + failure;
    }

    /**
     * Tells whether a {@code System.nanoTime()} deadline has been reached. A deadline of 0
     * means "no deadline". The comparison is done on the difference, as the
     * {@code System.nanoTime()} contract requires, so it stays correct for negative values
     * and across numeric overflow.
     */
    private static boolean deadlinePassed(long deadline) {
        return deadline != 0L && System.nanoTime() - deadline >= 0L;
    }

    /**
     * Worker that receives up to {@code numMsg} messages and checks that their {@code seq}
     * property counts up from 0 without gaps.
     */
    protected class consumerThread
    extends Thread {
        protected Session msgSess;
        protected MessageConsumer msgCons;
        protected String consumerTag;
        protected int numMsg;
        protected int numPerSess;

        consumerThread(Session sess, MessageConsumer cons, String tag, int num_msg, int sess_size) {
            this.msgSess = sess;
            this.msgCons = cons;
            this.consumerTag = tag;
            this.numMsg = num_msg;
            this.numPerSess = sess_size;
        }

        /**
         * Receives messages until {@code numMsg} have arrived, the consumer deadline has
         * passed, or (with {@code Stop_after_error}) an error has been recorded. When
         * {@code numPerSess} is greater than 1, messages are acknowledged in batches of
         * that size, with a final acknowledge for a trailing partial batch.
         *
         * @throws Exception if receiving or acknowledging fails, or a message arrives with
         *         an unexpected sequence number
         */
        public void execTest() throws Exception {
            Message lastReceived = null;
            int sess_start = 0;
            int cur = 0;
            while (cur < this.numMsg
                    && !this.didTimeOut()
                    && !(AMQ3167Test.this.Stop_after_error && AMQ3167Test.this.Num_error != 0L)) {
                Message msg = this.msgCons.receive(RECEIVE_TIMEOUT_MS);
                if (msg == null) {
                    continue;
                }
                lastReceived = msg;
                this.checkMessage(msg, cur);
                // Advance for every received message, independent of the batching mode, so
                // the next message is checked against the next sequence number.
                ++cur;
                if (this.numPerSess > 1 && cur - sess_start >= this.numPerSess) {
                    msg.acknowledge();
                    sess_start = cur;
                }
            }
            // Acknowledge a trailing partial batch via the last message actually received;
            // the most recent receive() may have timed out and returned null.
            if (this.numPerSess > 1 && cur - sess_start > 0 && lastReceived != null) {
                lastReceived.acknowledge();
            }
            if (cur < this.numMsg) {
                AMQ3167Test.log("* Consumer " + this.consumerTag + " timed out");
            }
        }

        /** Returns {@code true} once the enclosing test's consumer deadline has been reached. */
        protected boolean didTimeOut() {
            return AMQ3167Test.deadlinePassed(AMQ3167Test.this.Consumer_stop_time);
        }

        /**
         * Compares the message's {@code seq} property with the expected value; on mismatch
         * records an error and fails.
         *
         * @param msg received message
         * @param exp_seq sequence number the message is expected to carry
         * @throws JMSException if the property cannot be read
         */
        protected void checkMessage(Message msg, int exp_seq) throws JMSException {
            int seq = msg.getIntProperty("seq");
            if (exp_seq != seq) {
                AMQ3167Test.this.recordError(null);
                Assert.fail("*** Consumer " + this.consumerTag + " expected seq " + exp_seq + "; received " + seq);
            }
        }

        /**
         * Runs {@link #execTest()}. Any throwable is recorded as an error and rethrown
         * wrapped, with the original as cause, so it ends this thread with a full trace.
         */
        @Override
        public void run() {
            try {
                AMQ3167Test.log("- running consumer " + this.consumerTag);
                this.execTest();
                AMQ3167Test.log("- finished running consumer " + this.consumerTag);
            }
            catch (Throwable thrown) {
                AMQ3167Test.this.recordError(thrown);
                throw new Error("consumer " + this.consumerTag + " failed", thrown);
            }
        }

        @Override
        public String toString() {
            return this.consumerTag;
        }
    }

    /**
     * Worker that sends up to {@code numMsg} text messages, each carrying its position in
     * the stream in the {@code seq} property.
     */
    protected class producerThread
    extends Thread {
        protected Session msgSess;
        protected MessageProducer msgProd;
        protected String producerTag;
        protected int numMsg;
        protected int numPerSess;
        /** Not read by this class; the effective deadline is the enclosing test's {@code Producer_stop_time}. */
        protected long producer_stop_time = 0L;

        producerThread(Session sess, MessageProducer prod, String tag, int num_msg, int sess_size) {
            this.msgSess = sess;
            this.msgProd = prod;
            this.producerTag = tag;
            this.numMsg = num_msg;
            this.numPerSess = sess_size;
        }

        /**
         * Sends messages until {@code numMsg} have been sent, the producer deadline has
         * passed, or (with {@code Stop_after_error}) an error has been recorded. When
         * {@code numPerSess} is greater than 1, the session is committed in batches of that
         * size, with a final commit for a trailing partial batch.
         *
         * @throws Exception if creating, sending or committing a message fails
         */
        public void execTest() throws Exception {
            int sess_start = 0;
            int cur = 0;
            while (cur < this.numMsg
                    && !this.didTimeOut()
                    && !(AMQ3167Test.this.Stop_after_error && AMQ3167Test.this.Num_error != 0L)) {
                TextMessage msg = this.msgSess.createTextMessage("test message from " + this.producerTag);
                msg.setStringProperty("testprodtag", this.producerTag);
                msg.setIntProperty("seq", cur);
                if (msg instanceof ActiveMQMessage) {
                    ((ActiveMQMessage) msg).setResponseRequired(true);
                }
                this.msgProd.send(msg);
                // Advance for every sent message, independent of the batching mode, so each
                // message gets its own sequence number and the numMsg bound is effective.
                ++cur;
                if (this.numPerSess > 1 && cur - sess_start >= this.numPerSess) {
                    this.msgSess.commit();
                    sess_start = cur;
                }
            }
            if (this.numPerSess > 1 && cur - sess_start > 0) {
                this.msgSess.commit();
            }
            if (cur < this.numMsg) {
                AMQ3167Test.log("* Producer " + this.producerTag + " timed out at " + System.nanoTime()
                        + " (stop time " + AMQ3167Test.this.Producer_stop_time + ")");
            }
        }

        /** Returns {@code true} once the enclosing test's producer deadline has been reached. */
        protected boolean didTimeOut() {
            return AMQ3167Test.deadlinePassed(AMQ3167Test.this.Producer_stop_time);
        }

        /**
         * Runs {@link #execTest()}. Any throwable is recorded as an error and rethrown
         * wrapped, with the original as cause, so it ends this thread with a full trace.
         */
        @Override
        public void run() {
            try {
                AMQ3167Test.log("- running producer " + this.producerTag);
                this.execTest();
                AMQ3167Test.log("- finished running producer " + this.producerTag);
            }
            catch (Throwable thrown) {
                AMQ3167Test.this.recordError(thrown);
                throw new Error("producer " + this.producerTag + " failed", thrown);
            }
        }

        @Override
        public String toString() {
            return this.producerTag;
        }
    }
}

