/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.broker.region;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.util.DefaultTestAppender;
import org.apache.activemq.util.Wait;
import org.apache.log4j.Appender;
import org.apache.log4j.Level;
import org.apache.log4j.spi.LoggingEvent;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Stress tests that publish to a large number of short-lived topics while the
 * broker is configured (see {@link #createBroker()}) to garbage-collect
 * inactive destinations, optionally while a wildcard ({@code ">"}) topic
 * consumer is being added and removed concurrently.
 *
 * <p>Two of the tests watch the {@link RegionBroker} log4j logger and fail if
 * an ERROR event whose message starts with {@code "Failed to remove inactive"}
 * is observed.
 */
public class DestinationGCStressTest {
    protected static final Logger logger = LoggerFactory.getLogger(DestinationGCStressTest.class);

    /** Wildcard topic name used for the "subscribe to everything" consumer. */
    private static final String WILDCARD_TOPIC = ">";

    /** Prefix of the RegionBroker ERROR message that the tests treat as a failure. */
    private static final String GC_FAILURE_PREFIX = "Failed to remove inactive";

    private BrokerService brokerService;

    /** Creates the broker via {@link #createBroker()}, starts it and waits until it is started. */
    @Before
    public void setUp() throws Exception {
        this.brokerService = this.createBroker();
        this.brokerService.start();
        this.brokerService.waitUntilStarted();
    }

    /** Stops the broker (if one was created) and waits until it is stopped. */
    @After
    public void tearDown() throws Exception {
        if (this.brokerService != null) {
            this.brokerService.stop();
            this.brokerService.waitUntilStopped();
        }
    }

    /**
     * Builds (but does not start) a non-persistent broker without JMX or
     * advisory support whose default destination policy enables GC of
     * inactive destinations.
     *
     * @return the configured, not yet started broker
     * @throws Exception declared for subclasses; nothing in this body is expected to throw
     */
    protected BrokerService createBroker() throws Exception {
        BrokerService broker = new BrokerService();
        broker.setPersistent(false);
        broker.setUseJmx(false);
        // NOTE(review): purge period / inactivity timeout are presumably in milliseconds,
        // i.e. GC is made as aggressive as possible - confirm against BrokerService/PolicyEntry.
        broker.setSchedulePeriodForDestinationPurge(1);
        broker.setMaxPurgedDestinationsPerSweep(100);
        broker.setAdvisorySupport(false);

        PolicyEntry entry = new PolicyEntry();
        entry.setGcInactiveDestinations(true);
        entry.setInactiveTimeoutBeforeGC(1L);

        PolicyMap map = new PolicyMap();
        map.setDefaultEntry(entry);
        broker.setDestinationPolicy(map);
        return broker;
    }

    /**
     * Publishes to topics {@code A.19999 .. A.1} while a wildcard consumer is
     * registered, and fails if RegionBroker logs a GC failure meanwhile.
     */
    @Test(timeout=60000L)
    public void testClashWithPublishAndGC() throws Exception {
        org.apache.log4j.Logger log4jLogger = org.apache.log4j.Logger.getLogger(RegionBroker.class);
        final AtomicBoolean failed = new AtomicBoolean(false);
        Appender appender = this.newGcFailureAppender(failed);
        log4jLogger.addAppender(appender);
        try {
            final AtomicInteger max = new AtomicInteger(20000);
            final ActiveMQConnectionFactory factory = this.newConnectionFactory();
            Connection connection = factory.createConnection();
            ExecutorService executorService = Executors.newCachedThreadPool();
            try {
                connection.start();
                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                // The consumer is never read from; it only has to exist so that a
                // wildcard subscription is present while destinations come and go.
                session.createConsumer(new ActiveMQTopic(WILDCARD_TOPIC));

                executorService.submit(this.newPublisher(factory, max));
                this.shutdownAndAwait(executorService);
                logger.info("Done");
            }
            finally {
                // No-op when the workers already finished; otherwise stop them before
                // the connection and (in tearDown) the broker go away.
                executorService.shutdownNow();
                connection.close();
            }
        }
        finally {
            log4jLogger.removeAppender(appender);
        }
        Assert.assertFalse("failed on unexpected log event", failed.get());
    }

    /**
     * Publishes to topics {@code A.9999 .. A.1} while another task adds and
     * removes a wildcard consumer 1000 times, then waits for the topic region
     * to become empty. Fails if RegionBroker logs a GC failure meanwhile.
     */
    @Test(timeout=60000L)
    public void testAddRemoveWildcardWithGc() throws Exception {
        org.apache.log4j.Logger log4jLogger = org.apache.log4j.Logger.getLogger(RegionBroker.class);
        final AtomicBoolean failed = new AtomicBoolean(false);
        Appender appender = this.newGcFailureAppender(failed);
        log4jLogger.addAppender(appender);
        try {
            final AtomicInteger max = new AtomicInteger(10000);
            final ActiveMQConnectionFactory factory = this.newConnectionFactory();
            Connection connection = factory.createConnection();
            ExecutorService executorService = Executors.newCachedThreadPool();
            try {
                connection.start();
                final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

                executorService.submit(this.newPublisher(factory, max));
                executorService.submit(new Runnable(){

                    @Override
                    public void run() {
                        for (int i = 0; i < 1000; ++i) {
                            try {
                                MessageConsumer messageConsumer = session.createConsumer(new ActiveMQTopic(WILDCARD_TOPIC));
                                messageConsumer.close();
                            }
                            catch (Exception e) {
                                // Best effort: keep churning, but leave a trace instead of hiding the failure.
                                logger.warn("wildcard consumer add/remove iteration {} failed: {}", i, e.toString());
                            }
                        }
                    }
                });
                this.shutdownAndAwait(executorService);
                logger.info("Done");

                boolean drained = Wait.waitFor(new Wait.Condition(){

                    @Override
                    public boolean isSatisified() throws Exception {
                        int len = DestinationGCStressTest.this.topicCount();
                        logger.info("Num topics: " + len);
                        return len == 0;
                    }
                });
                if (!drained) {
                    // Historically this outcome was ignored; it is only reported, not asserted,
                    // so the pass/fail contract of the test is unchanged.
                    logger.warn("topic region was not emptied by GC within the wait period");
                }
            }
            finally {
                executorService.shutdownNow();
                connection.close();
            }
        }
        finally {
            log4jLogger.removeAppender(appender);
        }
        Assert.assertFalse("failed on unexpected log event", failed.get());
    }

    /**
     * Publishes to topics {@code A.19999 .. A.1} while another task repeatedly
     * adds a wildcard consumer, checks that no destination currently in the
     * topic region has an empty consumer list, and removes the consumer again.
     */
    @Test(timeout=60000L)
    public void testAllDestsSeeSub() throws Exception {
        final AtomicInteger foundDestWithMissingSub = new AtomicInteger(0);
        final AtomicInteger max = new AtomicInteger(20000);
        final ActiveMQConnectionFactory factory = this.newConnectionFactory();
        Connection connection = factory.createConnection();
        ExecutorService executorService = Executors.newCachedThreadPool();
        try {
            connection.start();
            final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            executorService.submit(this.newPublisher(factory, max));
            executorService.submit(new Runnable(){

                @Override
                public void run() {
                    for (int i = 0; i < 1000; ++i) {
                        try {
                            MessageConsumer messageConsumer = session.createConsumer(new ActiveMQTopic(WILDCARD_TOPIC));
                            if (DestinationGCStressTest.this.destMissingSub(foundDestWithMissingSub)) {
                                // Stop at the first violation; the consumer is left open and is
                                // cleaned up when the owning connection is closed.
                                break;
                            }
                            messageConsumer.close();
                        }
                        catch (Exception e) {
                            // Best effort: keep churning, but leave a trace instead of hiding the failure.
                            logger.warn("wildcard consumer add/check/remove iteration {} failed: {}", i, e.toString());
                        }
                    }
                }
            });
            this.shutdownAndAwait(executorService);
        }
        finally {
            executorService.shutdownNow();
            connection.close();
        }
        Assert.assertEquals("no dests missing sub", 0L, (long)foundDestWithMissingSub.get());
    }

    /**
     * Scans the destinations currently in the topic region and reports the
     * first one whose consumer list is empty.
     *
     * @param tally incremented once when such a destination is found
     * @return {@code true} if a destination without consumers was found
     */
    private boolean destMissingSub(AtomicInteger tally) {
        for (Destination destination : ((RegionBroker)this.brokerService.getRegionBroker()).getTopicRegion().getDestinationMap().values()) {
            if (destination.getConsumers().isEmpty()) {
                tally.incrementAndGet();
                return true;
            }
        }
        return false;
    }

    /** Returns the number of entries currently in the topic region's destination map. */
    private int topicCount() {
        return ((RegionBroker)this.brokerService.getRegionBroker()).getTopicRegion().getDestinationMap().size();
    }

    /** Creates a factory for the already running in-VM broker with topic advisory watching disabled. */
    private ActiveMQConnectionFactory newConnectionFactory() {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost?create=false");
        factory.setWatchTopicAdvisories(false);
        return factory;
    }

    /**
     * Creates an appender that sets {@code failed} when it sees an ERROR event
     * whose message starts with {@link #GC_FAILURE_PREFIX}.
     *
     * @param failed flag raised on the first (and any later) matching event
     */
    private Appender newGcFailureAppender(final AtomicBoolean failed) {
        return new DefaultTestAppender(){

            @Override
            public void doAppend(LoggingEvent event) {
                Object message = event.getMessage();
                if (message != null && Level.ERROR.equals(event.getLevel()) && message.toString().startsWith(GC_FAILURE_PREFIX)) {
                    logger.info("received unexpected log message: " + message);
                    failed.set(true);
                }
            }
        };
    }

    /**
     * Creates a task that opens its own connection and sends one (shared,
     * empty) text message to topic {@code "A." + j} for every value {@code j}
     * obtained by decrementing {@code remaining} while the result is positive.
     * The connection is closed when the task ends, normally or not.
     *
     * @param factory   source of the publisher's connection
     * @param remaining shared countdown; the task stops once it reaches zero
     */
    private Runnable newPublisher(final ActiveMQConnectionFactory factory, final AtomicInteger remaining) {
        return new Runnable(){

            @Override
            public void run() {
                Connection c = null;
                try {
                    c = factory.createConnection();
                    c.start();
                    Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
                    // Anonymous producer: the destination is supplied per send.
                    MessageProducer producer = s.createProducer(null);
                    TextMessage message = s.createTextMessage();
                    int j;
                    while ((j = remaining.decrementAndGet()) > 0) {
                        producer.send(new ActiveMQTopic("A." + j), message);
                    }
                }
                catch (Exception e) {
                    logger.error("publisher failed", e);
                }
                finally {
                    if (c != null) {
                        try {
                            c.close();
                        }
                        catch (Exception e) {
                            logger.warn("failed to close publisher connection: {}", e.toString());
                        }
                    }
                }
            }
        };
    }

    /**
     * Stops accepting new tasks and waits up to 60 seconds for the submitted
     * ones to finish, logging a warning if they do not.
     */
    private void shutdownAndAwait(ExecutorService executorService) throws InterruptedException {
        executorService.shutdown();
        if (!executorService.awaitTermination(60L, TimeUnit.SECONDS)) {
            logger.warn("worker tasks did not finish within 60 seconds");
        }
    }
}

