/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.bugs;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AMQ4368Test {
    private static final Logger LOG = LoggerFactory.getLogger(AMQ4368Test.class);
    private BrokerService broker;
    private ActiveMQConnectionFactory connectionFactory;
    private final Destination destination = new ActiveMQQueue("large_message_queue");
    private String connectionUri;

    @Before
    public void setUp() throws Exception {
        this.broker = this.createBroker();
        this.connectionUri = this.broker.addConnector("tcp://localhost:0").getPublishableConnectString();
        this.broker.start();
        this.connectionFactory = new ActiveMQConnectionFactory(this.connectionUri);
    }

    @After
    public void tearDown() throws Exception {
        this.broker.stop();
        this.broker.waitUntilStopped();
    }

    protected BrokerService createBroker() throws Exception {
        BrokerService broker = new BrokerService();
        PolicyEntry policy = new PolicyEntry();
        policy.setUseCache(false);
        broker.setDestinationPolicy(new PolicyMap());
        broker.getDestinationPolicy().setDefaultEntry(policy);
        KahaDBPersistenceAdapter kahadb = new KahaDBPersistenceAdapter();
        kahadb.setCheckForCorruptJournalFiles(true);
        kahadb.setCleanupInterval(1000L);
        kahadb.deleteAllMessages();
        broker.setPersistenceAdapter((PersistenceAdapter)kahadb);
        broker.getSystemUsage().getMemoryUsage().setLimit(0x6400000L);
        return broker;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testENTMQ220() throws InterruptedException, JMSException {
        LOG.info("Start test.");
        CountDownLatch producer1Started = new CountDownLatch(1);
        CountDownLatch producer2Started = new CountDownLatch(1);
        CountDownLatch listener1Started = new CountDownLatch(1);
        ProducingClient producer1 = new ProducingClient("1", producer1Started);
        ProducingClient producer2 = new ProducingClient("2", producer2Started);
        ConsumingClient listener1 = new ConsumingClient("subscriber-1", listener1Started);
        try {
            producer1.start();
            producer2.start();
            listener1.start();
            producer1Started.await(15L, TimeUnit.SECONDS);
            producer2Started.await(15L, TimeUnit.SECONDS);
            listener1Started.await(15L, TimeUnit.SECONDS);
            long lastSize = listener1.size.get();
            for (int i = 0; i < 10; ++i) {
                Thread.sleep(2000L);
                long size = listener1.size.get();
                LOG.info("Listener 1: consumed: " + (size - lastSize));
                Assert.assertTrue((String)("No messages received on iteration: " + i), (size > lastSize ? 1 : 0) != 0);
                lastSize = size;
            }
        }
        finally {
            LOG.info("Stopping clients");
            producer1.stop();
            producer2.stop();
            listener1.stop();
        }
    }

    class ConsumingClient
    extends Client {
        public ConsumingClient(String name, CountDownLatch startedLatch) {
            super(name, startedLatch);
        }

        @Override
        protected void work() throws Exception {
            MessageConsumer consumer = this.session.createConsumer(AMQ4368Test.this.destination);
            this.startedLatch.countDown();
            while (!this.done.get()) {
                Message msg = consumer.receive(100L);
                if (msg == null) continue;
                this.size.incrementAndGet();
            }
        }
    }

    class ProducingClient
    extends Client {
        ProducingClient(String name, CountDownLatch startedLatch) {
            super(name, startedLatch);
        }

        private String createMessage() {
            StringBuffer stringBuffer = new StringBuffer();
            for (long i = 0L; i < 1000000L; ++i) {
                stringBuffer.append("1234567890");
            }
            return stringBuffer.toString();
        }

        @Override
        protected void work() throws Exception {
            String data = this.createMessage();
            MessageProducer producer = this.session.createProducer(AMQ4368Test.this.destination);
            this.startedLatch.countDown();
            while (!this.done.get()) {
                producer.send((Message)this.session.createTextMessage(data));
                long i = this.size.incrementAndGet();
                if (i % 1000L != 0L) continue;
                LOG.info("produced " + i + ".");
            }
        }
    }

    abstract class Client
    implements Runnable {
        private final String name;
        final AtomicBoolean done = new AtomicBoolean();
        CountDownLatch startedLatch;
        CountDownLatch doneLatch = new CountDownLatch(1);
        Connection connection;
        Session session;
        final AtomicLong size = new AtomicLong();

        Client(String name, CountDownLatch startedLatch) {
            this.name = name;
            this.startedLatch = startedLatch;
        }

        public void start() {
            LOG.info("Starting: " + this.name);
            new Thread((Runnable)this, this.name).start();
        }

        public void stopAsync() {
            this.done.set(true);
        }

        public void stop() throws InterruptedException {
            this.stopAsync();
            if (!this.doneLatch.await(20L, TimeUnit.MILLISECONDS)) {
                try {
                    this.connection.close();
                    this.doneLatch.await();
                }
                catch (Exception exception) {
                    // empty catch block
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            try {
                this.connection = this.createConnection();
                this.connection.start();
                try {
                    this.session = this.createSession();
                    this.work();
                }
                finally {
                    try {
                        this.connection.close();
                    }
                    catch (JMSException ignore) {}
                    LOG.info("Stopped: " + this.name);
                }
            }
            catch (Exception e) {
                e.printStackTrace();
                this.done.set(true);
            }
            finally {
                this.doneLatch.countDown();
            }
        }

        protected Session createSession() throws JMSException {
            return this.connection.createSession(false, 1);
        }

        protected Connection createConnection() throws JMSException {
            return AMQ4368Test.this.connectionFactory.createConnection();
        }

        protected abstract void work() throws Exception;
    }
}

