/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.bugs;

import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.DurableTopicSubscription;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.TopicRegion;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.util.SubscriptionKey;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Checks that the durable-topic prefetch configured through a {@link PolicyEntry} is what the
 * broker-side {@link ConsumerInfo} of a durable subscription reports, both when the subscriber
 * first connects and after the broker has been restarted on the same KahaDB directory and the
 * subscriber has reconnected.
 */
public class AMQ4062Test {
    private static final int PREFETCH_SIZE_5 = 5;
    /** Directory (relative to the working directory) shared by the first and the restarted broker. */
    private static final String DATA_DIRECTORY = "createData";
    /** 32 MiB, i.e. the same value as the original literal 0x2000000. */
    private static final int JOURNAL_MAX_FILE_LENGTH = 32 * 1024 * 1024;
    /** Port 0: the address actually used is read back from the connector after start. */
    private static final String BIND_URI = "tcp://localhost:0";
    private static final String TOPIC_NAME = "topic2";
    private static final int MESSAGE_COUNT = 100;

    private BrokerService service;
    private PolicyEntry policy;
    private ConcurrentMap<SubscriptionKey, DurableTopicSubscription> durableSubscriptions;
    private String connectionUri;

    /**
     * Starts a persistent broker that wipes any previously stored messages.
     *
     * @throws Exception if the broker cannot be configured or started
     */
    @Before
    public void startBroker() throws Exception {
        this.service = this.newConfiguredBroker();
        this.service.setDeleteAllMessagesOnStartup(true);
        this.startAndRecordUri();
    }

    /**
     * Starts a fresh broker on the same data directory without deleting stored messages and with
     * {@code keepDurableSubsActive} switched off.
     *
     * @throws Exception if the broker cannot be configured or started
     */
    public void restartBroker() throws Exception {
        this.service = this.newConfiguredBroker();
        this.service.setKeepDurableSubsActive(false);
        this.startAndRecordUri();
    }

    /**
     * Stops the current broker, if any, and waits until it has stopped.
     *
     * @throws Exception if stopping the broker fails
     */
    @After
    public void stopBroker() throws Exception {
        if (this.service == null) {
            // Nothing was started (or it was already stopped); avoid an NPE during teardown.
            return;
        }
        this.service.stop();
        this.service.waitUntilStopped();
        this.service = null;
    }

    /**
     * Subscribes durably, publishes messages, restarts the broker and re-subscribes, asserting the
     * policy prefetch on the subscription before and after the restart.
     *
     * @throws Exception on any broker or JMS failure
     */
    @Test
    public void testDirableSubPrefetchRecovered() throws Exception {
        PrefetchConsumer consumer = new PrefetchConsumer(true, this.connectionUri);
        consumer.recieve();
        this.durableSubscriptions = this.getDurableSubscriptions();
        ConsumerInfo info = this.getConsumerInfo(this.durableSubscriptions);
        Assert.assertNotNull("no durable subscription named " + PrefetchConsumer.SUBSCRIPTION_NAME, info);
        Assert.assertEquals(PREFETCH_SIZE_5, info.getPrefetchSize());

        // Release the listener before publishing; this consumer closes itself in onMessage.
        consumer.a.countDown();
        new Producer(this.connectionUri).send();

        // Drop the cached map so it is re-read from the restarted broker.
        this.durableSubscriptions = null;
        this.stopBroker();
        this.restartBroker();
        this.durableSubscriptions = this.getDurableSubscriptions();

        PrefetchConsumer recovered = new PrefetchConsumer(false, this.connectionUri);
        try {
            recovered.recieve();
            recovered.a.countDown();
            ConsumerInfo recoveredInfo = this.getConsumerInfo(this.durableSubscriptions);
            Assert.assertNotNull("no durable subscription named " + PrefetchConsumer.SUBSCRIPTION_NAME
                    + " after restart", recoveredInfo);
            Assert.assertEquals(PREFETCH_SIZE_5, recoveredInfo.getPrefetchSize());
        } finally {
            // This consumer never closes itself (needAck == false), so release its connection here.
            recovered.close();
        }
    }

    /** Builds an unstarted broker with the settings common to the first start and the restart. */
    private BrokerService newConfiguredBroker() throws Exception {
        BrokerService broker = new BrokerService();
        broker.setPersistent(true);
        broker.setUseJmx(false);

        KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
        adapter.setDirectory(new File(DATA_DIRECTORY));
        adapter.setJournalMaxFileLength(JOURNAL_MAX_FILE_LENGTH);
        broker.setPersistenceAdapter(adapter);

        this.policy = new PolicyEntry();
        this.policy.setTopic(">");
        this.policy.setDurableTopicPrefetch(PREFETCH_SIZE_5);
        PolicyMap policyMap = new PolicyMap();
        policyMap.setDefaultEntry(this.policy);
        broker.setDestinationPolicy(policyMap);

        broker.addConnector(BIND_URI);
        return broker;
    }

    /** Starts {@link #service}, waits for it, and stores the first connector's client URI. */
    private void startAndRecordUri() throws Exception {
        this.service.start();
        this.service.waitUntilStarted();
        TransportConnector connector = this.service.getTransportConnectors().get(0);
        this.connectionUri = connector.getPublishableConnectString();
    }

    /**
     * Returns the cached subscription map, or reads the private {@code durableSubscriptions}
     * field of the current broker's {@link TopicRegion} via reflection and caches it.
     */
    private ConcurrentMap<SubscriptionKey, DurableTopicSubscription> getDurableSubscriptions() throws NoSuchFieldException, IllegalAccessException {
        if (this.durableSubscriptions != null) {
            return this.durableSubscriptions;
        }
        RegionBroker regionBroker = (RegionBroker) this.service.getRegionBroker();
        TopicRegion region = (TopicRegion) regionBroker.getTopicRegion();
        Field field = TopicRegion.class.getDeclaredField("durableSubscriptions");
        field.setAccessible(true);
        // The field is read reflectively, so its generic type cannot be checked here.
        @SuppressWarnings("unchecked")
        ConcurrentMap<SubscriptionKey, DurableTopicSubscription> subscriptions =
                (ConcurrentMap<SubscriptionKey, DurableTopicSubscription>) field.get(region);
        this.durableSubscriptions = subscriptions;
        return this.durableSubscriptions;
    }

    /**
     * Finds the {@link ConsumerInfo} of the subscription named
     * {@link PrefetchConsumer#SUBSCRIPTION_NAME}.
     *
     * @param durableSubscriptions subscriptions to search
     * @return the matching consumer info, or {@code null} when no subscription has that name
     */
    private ConsumerInfo getConsumerInfo(ConcurrentMap<SubscriptionKey, DurableTopicSubscription> durableSubscriptions) {
        for (Subscription sub : durableSubscriptions.values()) {
            ConsumerInfo info = sub.getConsumerInfo();
            // Constant on the left keeps the comparison safe if a name is ever null.
            if (PrefetchConsumer.SUBSCRIPTION_NAME.equals(info.getSubscriptionName())) {
                return info;
            }
        }
        return null;
    }

    /** Publishes a fixed batch of text messages to the test topic. */
    public class Producer {
        protected final String user = ActiveMQConnection.DEFAULT_USER;
        private final String password = ActiveMQConnection.DEFAULT_PASSWORD;
        private final String uri;
        private boolean transacted;

        /**
         * @param uri broker URI to connect to
         */
        public Producer(String uri) {
            this.uri = uri;
        }

        /**
         * Connects, sends {@code 100} messages to the test topic and disconnects.
         *
         * @throws Exception if connecting or sending fails
         */
        public void send() throws Exception {
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(this.user, this.password, this.uri);
            ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection();
            try {
                connection.start();
                Session session = connection.createSession(this.transacted, Session.AUTO_ACKNOWLEDGE);
                Topic destination = session.createTopic(TOPIC_NAME);
                MessageProducer producer = session.createProducer(destination);
                // 2 is the JMS persistent delivery mode (javax.jms.DeliveryMode.PERSISTENT).
                producer.setDeliveryMode(2);
                for (int i = 0; i < MESSAGE_COUNT; ++i) {
                    TextMessage text = session.createTextMessage("hello from producer");
                    producer.send(text);
                }
                producer.close();
                session.close();
            } finally {
                // Always release the connection, even when sending failed part-way.
                connection.close();
            }
        }
    }

    /**
     * Durable subscriber on the test topic whose listener blocks until the latch {@link #a} is
     * released. When created with {@code needAck == true} it acknowledges the first message it
     * handles and then closes its own consumer, session and connection.
     */
    public class PrefetchConsumer
    implements MessageListener {
        public static final String SUBSCRIPTION_NAME = "A_NAME_ABC_DEF";
        private final String user = ActiveMQConnection.DEFAULT_USER;
        private final String password = ActiveMQConnection.DEFAULT_PASSWORD;
        private final String uri;
        private boolean transacted;
        ActiveMQConnection connection;
        Session session;
        MessageConsumer consumer;
        private final boolean needAck;
        /** Gate for {@link #onMessage(Message)}; the test counts it down to let messages through. */
        final CountDownLatch a = new CountDownLatch(1);

        /**
         * @param needAck whether the listener acknowledges and then shuts this consumer down
         * @param uri broker URI to connect to
         */
        public PrefetchConsumer(boolean needAck, String uri) {
            this.needAck = needAck;
            this.uri = uri;
        }

        /**
         * Connects with client id {@code "3"}, creates the durable subscription in a
         * client-acknowledge session and registers this object as its listener.
         *
         * @throws Exception if connecting or subscribing fails
         */
        public void recieve() throws Exception {
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(this.user, this.password, this.uri);
            this.connection = (ActiveMQConnection) connectionFactory.createConnection();
            this.connection.setClientID("3");
            this.connection.start();
            this.session = this.connection.createSession(this.transacted, Session.CLIENT_ACKNOWLEDGE);
            Topic destination = this.session.createTopic(TOPIC_NAME);
            this.consumer = this.session.createDurableSubscriber(destination, SUBSCRIPTION_NAME);
            this.consumer.setMessageListener(this);
        }

        /**
         * Closes the connection opened by {@link #recieve()}, if there is one.
         *
         * @throws JMSException if closing the connection fails
         */
        public void close() throws JMSException {
            if (this.connection != null) {
                this.connection.close();
            }
        }

        /**
         * Waits for the latch, then (only when {@code needAck} is set) acknowledges the message
         * and closes consumer, session and connection.
         */
        @Override
        public void onMessage(Message message) {
            try {
                this.a.await();
            } catch (InterruptedException e) {
                // Preserve the interrupt for the delivering thread and skip further JMS work.
                Thread.currentThread().interrupt();
                return;
            }
            if (!this.needAck) {
                return;
            }
            try {
                message.acknowledge();
                this.consumer.close();
                this.session.close();
                this.connection.close();
            } catch (JMSException ignored) {
                // Best effort: the test stops the broker right after publishing, so this
                // acknowledge/close sequence may run against a broker that is going away.
            }
        }
    }
}

