/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.bugs;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Set;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.TopicSubscriber;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.jmx.TopicViewMBean;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.TopicRegion;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.leveldb.LevelDBStoreFactory;
import org.apache.activemq.security.AuthenticationUser;
import org.apache.activemq.security.AuthorizationEntry;
import org.apache.activemq.security.AuthorizationMap;
import org.apache.activemq.security.AuthorizationPlugin;
import org.apache.activemq.security.DefaultAuthorizationMap;
import org.apache.activemq.security.SimpleAuthenticationPlugin;
import org.apache.activemq.security.TempDestinationAuthorizationEntry;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.PersistenceAdapterFactory;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(value=Parameterized.class)
public class AMQ6254Test {
    private static final Logger LOG = LoggerFactory.getLogger(AMQ6254Test.class);
    private static final String KAHADB = "KahaDB";
    private static final String LEVELDB = "LevelDB";
    private BrokerService brokerService;
    private String topicA = "alphabet.a";
    private String topicB = "alphabet.b";
    private String persistenceAdapterName;
    private boolean pluginsEnabled;

    @Parameterized.Parameters(name="{0} -> plugins = {1}")
    public static Collection<Object[]> data() {
        return Arrays.asList({KAHADB, true}, {KAHADB, false}, {LEVELDB, true}, {LEVELDB, false});
    }

    public AMQ6254Test(String persistenceAdapterName, boolean pluginsEnabled) {
        this.persistenceAdapterName = persistenceAdapterName;
        this.pluginsEnabled = pluginsEnabled;
    }

    @Test(timeout=60000L)
    public void testReactivateKeepaliveSubscription() throws Exception {
        Connection connection = this.createConnection();
        connection.setClientID("cliID");
        connection.start();
        Session session = connection.createSession(false, 1);
        TopicSubscriber subscriber = session.createDurableSubscriber(session.createTopic("alphabet.>"), "alphabet.>");
        connection = this.createConnection();
        connection.start();
        session = connection.createSession(false, 1);
        MessageProducer producer = session.createProducer((Destination)session.createTopic(this.topicA));
        producer.send((Message)session.createTextMessage("Hello A"));
        TextMessage message = (TextMessage)subscriber.receive(2000L);
        Assert.assertNotNull((String)"Message not received.", (Object)message);
        Assert.assertEquals((Object)"Hello A", (Object)message.getText());
        Assert.assertTrue((String)"Should have only one consumer", (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                return AMQ6254Test.this.getProxyToTopic(AMQ6254Test.this.topicA).getConsumerCount() == 1L;
            }
        }));
        subscriber.close();
        Assert.assertTrue((String)"Should have one message consumed", (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                return AMQ6254Test.this.getProxyToTopic(AMQ6254Test.this.topicA).getDequeueCount() == 1L;
            }
        }));
        connection.close();
        Assert.assertTrue((String)"Should have only one destination", (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                org.apache.activemq.broker.region.Destination destA = AMQ6254Test.this.getDestination(AMQ6254Test.this.topicA);
                return destA.getDestinationStatistics().getConsumers().getCount() == 1L;
            }
        }));
        Assert.assertTrue((String)"Should have only one inactive subscription", (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                return AMQ6254Test.this.brokerService.getAdminView().getInactiveDurableTopicSubscribers().length == 1;
            }
        }));
        this.brokerService.stop();
        this.brokerService.waitUntilStopped();
        LOG.info("Broker stopped");
        this.brokerService = this.createBroker(false);
        this.brokerService.start();
        this.brokerService.waitUntilStarted();
        LOG.info("Broker restarted");
        connection = this.createConnection();
        connection.setClientID("cliID");
        connection.start();
        session = connection.createSession(false, 1);
        subscriber = session.createDurableSubscriber(session.createTopic("alphabet.>"), "alphabet.>");
        connection = this.createConnection();
        connection.start();
        session = connection.createSession(false, 1);
        producer = session.createProducer((Destination)session.createTopic(this.topicA));
        producer.send((Message)session.createTextMessage("Hello Again A"));
        message = (TextMessage)subscriber.receive(2000L);
        Assert.assertNotNull((String)"Message not received.", (Object)message);
        Assert.assertEquals((Object)"Hello Again A", (Object)message.getText());
        Assert.assertTrue((String)"Should have only one destination", (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                org.apache.activemq.broker.region.Destination destA = AMQ6254Test.this.getDestination(AMQ6254Test.this.topicA);
                return destA.getDestinationStatistics().getConsumers().getCount() == 1L;
            }
        }));
        subscriber.close();
        connection.close();
    }

    private org.apache.activemq.broker.region.Destination getDestination(String topicName) {
        RegionBroker regionBroker = (RegionBroker)this.brokerService.getRegionBroker();
        TopicRegion topicRegion = (TopicRegion)regionBroker.getTopicRegion();
        Set destinations = topicRegion.getDestinations((ActiveMQDestination)new ActiveMQTopic(topicName));
        Assert.assertEquals((long)1L, (long)destinations.size());
        return (org.apache.activemq.broker.region.Destination)destinations.iterator().next();
    }

    private Connection createConnection() throws Exception {
        String connectionURI = ((TransportConnector)this.brokerService.getTransportConnectors().get(0)).getPublishableConnectString();
        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(connectionURI);
        return cf.createConnection("system", "manager");
    }

    @Before
    public void setUp() throws Exception {
        this.brokerService = this.createBroker(true);
        this.brokerService.start();
        this.brokerService.waitUntilStarted();
    }

    @After
    public void tearDown() throws Exception {
        if (this.brokerService != null) {
            this.brokerService.stop();
            this.brokerService.waitUntilStopped();
            this.brokerService = null;
        }
    }

    protected BrokerService createBroker(boolean deleteAllMessages) throws Exception {
        BrokerService answer = new BrokerService();
        answer.setKeepDurableSubsActive(true);
        answer.setUseJmx(true);
        answer.setPersistent(true);
        answer.setDeleteAllMessagesOnStartup(deleteAllMessages);
        answer.setAdvisorySupport(false);
        switch (this.persistenceAdapterName) {
            case "KahaDB": {
                answer.setPersistenceAdapter((PersistenceAdapter)new KahaDBPersistenceAdapter());
                break;
            }
            case "LevelDB": {
                answer.setPersistenceFactory((PersistenceAdapterFactory)new LevelDBStoreFactory());
            }
        }
        answer.addConnector("tcp://localhost:0");
        if (this.pluginsEnabled) {
            BrokerPlugin authorizationPlugin;
            ArrayList<BrokerPlugin> plugins = new ArrayList<BrokerPlugin>();
            BrokerPlugin authenticationPlugin = this.configureAuthentication();
            if (authenticationPlugin != null) {
                plugins.add(this.configureAuthorization());
            }
            if ((authorizationPlugin = this.configureAuthorization()) != null) {
                plugins.add(this.configureAuthentication());
            }
            if (!plugins.isEmpty()) {
                BrokerPlugin[] array = new BrokerPlugin[plugins.size()];
                answer.setPlugins(plugins.toArray(array));
            }
        }
        ActiveMQDestination[] destinations = new ActiveMQDestination[]{new ActiveMQTopic(this.topicA), new ActiveMQTopic(this.topicB)};
        answer.setDestinations(destinations);
        return answer;
    }

    protected BrokerPlugin configureAuthentication() throws Exception {
        ArrayList<AuthenticationUser> users = new ArrayList<AuthenticationUser>();
        users.add(new AuthenticationUser("system", "manager", "users,admins"));
        SimpleAuthenticationPlugin authenticationPlugin = new SimpleAuthenticationPlugin(users);
        return authenticationPlugin;
    }

    protected BrokerPlugin configureAuthorization() throws Exception {
        ArrayList<AuthorizationEntry> authorizationEntries = new ArrayList<AuthorizationEntry>();
        AuthorizationEntry entry = new AuthorizationEntry();
        entry.setQueue(">");
        entry.setRead("admins");
        entry.setWrite("admins");
        entry.setAdmin("admins");
        authorizationEntries.add(entry);
        entry = new AuthorizationEntry();
        entry.setQueue("USERS.>");
        entry.setRead("users");
        entry.setWrite("users");
        entry.setAdmin("users");
        authorizationEntries.add(entry);
        entry = new AuthorizationEntry();
        entry.setQueue("GUEST.>");
        entry.setRead("guests");
        entry.setWrite("guests,users");
        entry.setAdmin("guests,users");
        authorizationEntries.add(entry);
        entry = new AuthorizationEntry();
        entry.setTopic(">");
        entry.setRead("admins");
        entry.setWrite("admins");
        entry.setAdmin("admins");
        authorizationEntries.add(entry);
        entry = new AuthorizationEntry();
        entry.setTopic("USERS.>");
        entry.setRead("users");
        entry.setWrite("users");
        entry.setAdmin("users");
        authorizationEntries.add(entry);
        entry = new AuthorizationEntry();
        entry.setTopic("GUEST.>");
        entry.setRead("guests");
        entry.setWrite("guests,users");
        entry.setAdmin("guests,users");
        authorizationEntries.add(entry);
        entry = new AuthorizationEntry();
        entry.setTopic("ActiveMQ.Advisory.>");
        entry.setRead("guests,users");
        entry.setWrite("guests,users");
        entry.setAdmin("guests,users");
        authorizationEntries.add(entry);
        TempDestinationAuthorizationEntry tempEntry = new TempDestinationAuthorizationEntry();
        tempEntry.setRead("admins");
        tempEntry.setWrite("admins");
        tempEntry.setAdmin("admins");
        DefaultAuthorizationMap authorizationMap = new DefaultAuthorizationMap(authorizationEntries);
        authorizationMap.setTempDestinationAuthorizationEntry(tempEntry);
        AuthorizationPlugin authorizationPlugin = new AuthorizationPlugin((AuthorizationMap)authorizationMap);
        return authorizationPlugin;
    }

    protected TopicViewMBean getProxyToTopic(String name) throws MalformedObjectNameException, JMSException {
        ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Topic,destinationName=" + name);
        TopicViewMBean proxy = (TopicViewMBean)this.brokerService.getManagementContext().newProxyInstance(queueViewMBeanName, TopicViewMBean.class, true);
        return proxy;
    }
}

