/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.Connection;
import javax.jms.ConnectionConsumer;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.ServerSession;
import javax.jms.ServerSessionPool;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.BlockJUnit4ClassRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(value=BlockJUnit4ClassRunner.class)
public class OnePrefetchAsyncConsumerTest
extends EmbeddedBrokerTestSupport {
    private static final Logger LOG = LoggerFactory.getLogger(OnePrefetchAsyncConsumerTest.class);
    private Connection connection;
    private ConnectionConsumer connectionConsumer;
    private Queue queue;
    private final AtomicBoolean completed = new AtomicBoolean();
    private final AtomicBoolean success = new AtomicBoolean();

    @Ignore(value="https://issues.apache.org/jira/browse/AMQ-5126")
    @Test(timeout=60000L)
    public void testPrefetchExtension() throws Exception {
        Session session = this.connection.createSession(true, 1);
        MessageProducer producer = session.createProducer((Destination)this.queue);
        producer.send((Message)session.createTextMessage("Msg1"));
        producer.send((Message)session.createTextMessage("Msg2"));
        producer.send((Message)session.createTextMessage("Msg3"));
        session.commit();
        OnePrefetchAsyncConsumerTest.assertTrue((String)"test completed on time", (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                return OnePrefetchAsyncConsumerTest.this.completed.get();
            }
        }));
        OnePrefetchAsyncConsumerTest.assertTrue((String)"Attempted to retrieve more than one ServerSession at a time", (boolean)this.success.get());
    }

    @Override
    protected ConnectionFactory createConnectionFactory() throws Exception {
        return new ActiveMQConnectionFactory(((TransportConnector)this.broker.getTransportConnectors().get(0)).getPublishableConnectString());
    }

    @Override
    @Before
    public void setUp() throws Exception {
        this.setAutoFail(true);
        this.bindAddress = "tcp://localhost:0";
        super.setUp();
        this.connection = this.createConnection();
        this.queue = this.createQueue();
        this.connectionConsumer = this.connection.createConnectionConsumer((Destination)this.queue, null, (ServerSessionPool)new TestServerSessionPool(this.connection), 1);
        this.connection.start();
    }

    @Override
    @After
    public void tearDown() throws Exception {
        this.connectionConsumer.close();
        this.connection.close();
        super.tearDown();
    }

    @Override
    protected BrokerService createBroker() throws Exception {
        BrokerService answer = super.createBroker();
        PolicyMap policyMap = new PolicyMap();
        PolicyEntry defaultEntry = new PolicyEntry();
        defaultEntry.setUsePrefetchExtension(false);
        policyMap.setDefaultEntry(defaultEntry);
        answer.setDestinationPolicy(policyMap);
        return answer;
    }

    protected Queue createQueue() {
        return new ActiveMQQueue(this.getDestinationString());
    }

    private class TestMessageListener
    implements MessageListener {
        private TestMessageListener() {
        }

        public void onMessage(Message message) {
            try {
                String text = ((TextMessage)message).getText();
                LOG.info("got message: " + text);
                if (text.equals("Msg3")) {
                    OnePrefetchAsyncConsumerTest.this.success.set(true);
                    OnePrefetchAsyncConsumerTest.this.completed.set(true);
                } else if (text.equals("Msg2")) {
                    TimeUnit.SECONDS.sleep(4L);
                }
            }
            catch (JMSException e) {
                LOG.error("in onMessage", (Throwable)e);
            }
            catch (InterruptedException e) {
                LOG.error("in onMessage", (Throwable)e);
            }
        }
    }

    private class TestServerSession
    implements ServerSession {
        TestServerSessionPool pool;
        Session session;

        public TestServerSession(TestServerSessionPool pool) throws JMSException {
            this.pool = pool;
            this.session = pool.connection.createSession(true, 1);
            this.session.setMessageListener((MessageListener)new TestMessageListener());
        }

        public Session getSession() throws JMSException {
            return this.session;
        }

        public void start() throws JMSException {
            new Thread(){

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 */
                @Override
                public void run() {
                    TestServerSession.this.session.run();
                    LOG.debug("Waiting on pool");
                    TestServerSessionPool testServerSessionPool = TestServerSession.this.pool;
                    synchronized (testServerSessionPool) {
                        try {
                            LOG.debug("About to call session.commit");
                            TestServerSession.this.session.commit();
                            LOG.debug("Commit completed");
                        }
                        catch (JMSException e) {
                            LOG.error("In start", (Throwable)e);
                        }
                        TestServerSession.this.pool.serverSessionInUse = false;
                    }
                }
            }.start();
        }
    }

    private class TestServerSessionPool
    implements ServerSessionPool {
        Connection connection;
        TestServerSession serverSession;
        boolean serverSessionInUse = false;

        public TestServerSessionPool(Connection connection) throws JMSException {
            this.connection = connection;
            this.serverSession = new TestServerSession(this);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public ServerSession getServerSession() throws JMSException {
            TestServerSessionPool testServerSessionPool = this;
            synchronized (testServerSessionPool) {
                if (this.serverSessionInUse) {
                    LOG.info("asked for session while in use, not serialised delivery");
                    OnePrefetchAsyncConsumerTest.this.success.set(false);
                    OnePrefetchAsyncConsumerTest.this.completed.set(true);
                }
                this.serverSessionInUse = true;
                return this.serverSession;
            }
        }
    }
}

