package org.apache.activemq.bugs;

import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.Iterator;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.DurableTopicSubscription;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.TopicRegion;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.util.SubscriptionKey;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/activemq/bugs/AMQ4062Test.class */
public class AMQ4062Test {
    private BrokerService service;
    private PolicyEntry policy;
    private ConcurrentMap<SubscriptionKey, DurableTopicSubscription> durableSubscriptions;
    private static final int PREFETCH_SIZE_5 = 5;
    private String connectionUri;

    /* loaded from: input_file:org/apache/activemq/bugs/AMQ4062Test$PrefetchConsumer.class */
    public class PrefetchConsumer implements MessageListener {
        public static final String SUBSCRIPTION_NAME = "A_NAME_ABC_DEF";
        private final String uri;
        private boolean transacted;
        ActiveMQConnection connection;
        Session session;
        MessageConsumer consumer;
        private boolean needAck;
        private final String user = ActiveMQConnection.DEFAULT_USER;
        private final String password = ActiveMQConnection.DEFAULT_PASSWORD;
        CountDownLatch a = new CountDownLatch(1);

        public PrefetchConsumer(boolean z, String str) {
            this.needAck = false;
            this.needAck = z;
            this.uri = str;
        }

        public void recieve() throws Exception {
            this.connection = new ActiveMQConnectionFactory(this.user, this.password, this.uri).createConnection();
            this.connection.setClientID("3");
            this.connection.start();
            this.session = this.connection.createSession(this.transacted, 2);
            this.consumer = this.session.createDurableSubscriber(this.session.createTopic("topic2"), SUBSCRIPTION_NAME);
            this.consumer.setMessageListener(this);
        }

        public void onMessage(Message message) {
            try {
                this.a.await();
            } catch (InterruptedException e) {
            }
            if (this.needAck) {
                try {
                    message.acknowledge();
                    this.consumer.close();
                    this.session.close();
                    this.connection.close();
                } catch (JMSException e2) {
                }
            }
        }
    }

    /* loaded from: input_file:org/apache/activemq/bugs/AMQ4062Test$Producer.class */
    public class Producer {
        protected final String user = ActiveMQConnection.DEFAULT_USER;
        private final String password = ActiveMQConnection.DEFAULT_PASSWORD;
        private final String uri;
        private boolean transacted;

        public Producer(String str) {
            this.uri = str;
        }

        public void send() throws Exception {
            ActiveMQConnection createConnection = new ActiveMQConnectionFactory(this.user, this.password, this.uri).createConnection();
            createConnection.start();
            ActiveMQSession createSession = createConnection.createSession(this.transacted, 1);
            MessageProducer createProducer = createSession.createProducer(createSession.createTopic("topic2"));
            createProducer.setDeliveryMode(2);
            for (int i = 0; i < 100; i++) {
                createProducer.send(createSession.createTextMessage("hello from producer"));
            }
            createProducer.close();
            createSession.close();
            createConnection.close();
        }
    }

    @Before
    public void startBroker() throws IOException, Exception {
        this.service = new BrokerService();
        this.service.setPersistent(true);
        this.service.setDeleteAllMessagesOnStartup(true);
        this.service.setUseJmx(false);
        KahaDBPersistenceAdapter kahaDBPersistenceAdapter = new KahaDBPersistenceAdapter();
        kahaDBPersistenceAdapter.setDirectory(new File("createData"));
        kahaDBPersistenceAdapter.setJournalMaxFileLength(33554432);
        this.service.setPersistenceAdapter(kahaDBPersistenceAdapter);
        this.policy = new PolicyEntry();
        this.policy.setTopic(">");
        this.policy.setDurableTopicPrefetch(5);
        PolicyMap policyMap = new PolicyMap();
        policyMap.setDefaultEntry(this.policy);
        this.service.setDestinationPolicy(policyMap);
        this.service.addConnector(JmsMultipleBrokersTestSupport.AUTO_ASSIGN_TRANSPORT);
        this.service.start();
        this.service.waitUntilStarted();
        this.connectionUri = ((TransportConnector) this.service.getTransportConnectors().get(0)).getPublishableConnectString();
    }

    public void restartBroker() throws IOException, Exception {
        this.service = new BrokerService();
        this.service.setPersistent(true);
        this.service.setUseJmx(false);
        this.service.setKeepDurableSubsActive(false);
        KahaDBPersistenceAdapter kahaDBPersistenceAdapter = new KahaDBPersistenceAdapter();
        kahaDBPersistenceAdapter.setDirectory(new File("createData"));
        kahaDBPersistenceAdapter.setJournalMaxFileLength(33554432);
        this.service.setPersistenceAdapter(kahaDBPersistenceAdapter);
        this.policy = new PolicyEntry();
        this.policy.setTopic(">");
        this.policy.setDurableTopicPrefetch(5);
        PolicyMap policyMap = new PolicyMap();
        policyMap.setDefaultEntry(this.policy);
        this.service.setDestinationPolicy(policyMap);
        this.service.addConnector(JmsMultipleBrokersTestSupport.AUTO_ASSIGN_TRANSPORT);
        this.service.start();
        this.service.waitUntilStarted();
        this.connectionUri = ((TransportConnector) this.service.getTransportConnectors().get(0)).getPublishableConnectString();
    }

    @After
    public void stopBroker() throws Exception {
        this.service.stop();
        this.service.waitUntilStopped();
        this.service = null;
    }

    @Test
    public void testDirableSubPrefetchRecovered() throws Exception {
        PrefetchConsumer prefetchConsumer = new PrefetchConsumer(true, this.connectionUri);
        prefetchConsumer.recieve();
        this.durableSubscriptions = getDurableSubscriptions();
        Assert.assertEquals(5L, getConsumerInfo(this.durableSubscriptions).getPrefetchSize());
        prefetchConsumer.a.countDown();
        new Producer(this.connectionUri).send();
        this.service.stop();
        this.service.waitUntilStopped();
        this.durableSubscriptions = null;
        stopBroker();
        restartBroker();
        getDurableSubscriptions();
        getConsumerInfo(this.durableSubscriptions);
        PrefetchConsumer prefetchConsumer2 = new PrefetchConsumer(false, this.connectionUri);
        prefetchConsumer2.recieve();
        prefetchConsumer2.a.countDown();
        Assert.assertEquals(5L, getConsumerInfo(this.durableSubscriptions).getPrefetchSize());
    }

    private ConcurrentMap<SubscriptionKey, DurableTopicSubscription> getDurableSubscriptions() throws NoSuchFieldException, IllegalAccessException {
        if (this.durableSubscriptions != null) {
            return this.durableSubscriptions;
        }
        TopicRegion topicRegion = this.service.getRegionBroker().getTopicRegion();
        Field declaredField = TopicRegion.class.getDeclaredField("durableSubscriptions");
        declaredField.setAccessible(true);
        this.durableSubscriptions = (ConcurrentMap) declaredField.get(topicRegion);
        return this.durableSubscriptions;
    }

    private ConsumerInfo getConsumerInfo(ConcurrentMap<SubscriptionKey, DurableTopicSubscription> concurrentMap) {
        Iterator<DurableTopicSubscription> it = concurrentMap.values().iterator();
        while (it.hasNext()) {
            ConsumerInfo consumerInfo = it.next().getConsumerInfo();
            if (consumerInfo.getSubscriptionName().equals(PrefetchConsumer.SUBSCRIPTION_NAME)) {
                return consumerInfo;
            }
        }
        return null;
    }
}
