/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.store;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractVmConcurrentDispatchTest {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractVmConcurrentDispatchTest.class);
    private final MessageType messageType;
    private final boolean reduceMemoryFootPrint;
    protected static final boolean[] booleanVals = new boolean[]{true, false};
    protected static boolean[] reduceMemoryFootPrintVals = booleanVals;
    @Rule
    public TemporaryFolder dataFileDir = new TemporaryFolder(new File("target"));
    private BrokerService broker;
    private final AtomicBoolean failure = new AtomicBoolean();
    private CountDownLatch ready;
    private URI connectionURI;
    private URI vmConnectionURI;
    private final boolean USE_VM_TRANSPORT = true;
    private final int NUM_CONSUMERS = 30;
    private final int NUM_PRODUCERS = 1;
    private final int NUM_TASKS = 31;
    private int i = 0;
    private String MessageId = null;
    private int MessageCount = 0;

    public AbstractVmConcurrentDispatchTest(MessageType messageType, boolean reduceMemoryFootPrint) {
        this.messageType = messageType;
        this.reduceMemoryFootPrint = reduceMemoryFootPrint;
    }

    @Before
    public void setUp() throws Exception {
        this.broker = new BrokerService();
        TransportConnector connector = this.broker.addConnector("tcp://0.0.0.0:0");
        this.broker.setDeleteAllMessagesOnStartup(true);
        PolicyMap policyMap = new PolicyMap();
        PolicyEntry defaultPolicy = new PolicyEntry();
        defaultPolicy.setReduceMemoryFootprint(this.reduceMemoryFootPrint);
        policyMap.setDefaultEntry(defaultPolicy);
        this.broker.setDestinationPolicy(policyMap);
        this.broker.setDataDirectoryFile(this.dataFileDir.getRoot());
        this.configurePersistenceAdapter(this.broker);
        this.broker.start();
        this.broker.waitUntilStarted();
        this.ready = new CountDownLatch(31);
        this.connectionURI = connector.getPublishableConnectURI();
        this.vmConnectionURI = this.broker.getVmConnectorURI();
    }

    @After
    public void tearDown() throws Exception {
        if (this.broker != null) {
            this.broker.stop();
        }
    }

    protected abstract void configurePersistenceAdapter(BrokerService var1) throws IOException;

    @Test(timeout=180000L)
    public void testMessagesAreValid() throws Exception {
        int i;
        ExecutorService tasks = Executors.newFixedThreadPool(31);
        for (i = 0; i < 30; ++i) {
            LOG.info("Created Consumer: {}", (Object)(i + 1));
            tasks.execute(new HelloWorldConsumer());
        }
        for (i = 0; i < 1; ++i) {
            LOG.info("Created Producer: {}", (Object)(i + 1));
            tasks.execute(new HelloWorldProducer());
        }
        Assert.assertTrue((boolean)this.ready.await(20L, TimeUnit.SECONDS));
        try {
            tasks.shutdown();
            tasks.awaitTermination(20L, TimeUnit.SECONDS);
        }
        catch (Exception exception) {
            // empty catch block
        }
        Assert.assertFalse((String)"Test Encountered a null bodied message", (boolean)this.failure.get());
    }

    public URI getBrokerURI() {
        return this.vmConnectionURI;
    }

    public class HelloWorldConsumer
    implements Runnable,
    ExceptionListener {
        String queueName;

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            try {
                ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(AbstractVmConcurrentDispatchTest.this.getBrokerURI());
                Connection connection = connectionFactory.createConnection();
                connection.start();
                Session session = connection.createSession(false, 4);
                HelloWorldConsumer helloWorldConsumer = this;
                synchronized (helloWorldConsumer) {
                    this.queueName = "Consumer.Q" + AbstractVmConcurrentDispatchTest.this.i + ".VirtualTopic.AMQ6218Test";
                    AbstractVmConcurrentDispatchTest.this.i++;
                    LOG.info(this.queueName);
                }
                Queue destination = session.createQueue(this.queueName);
                MessageConsumer consumer = session.createConsumer((Destination)destination);
                AbstractVmConcurrentDispatchTest.this.ready.countDown();
                while (!AbstractVmConcurrentDispatchTest.this.failure.get()) {
                    Message message = consumer.receive(500L);
                    if (message == null) continue;
                    HelloWorldConsumer helloWorldConsumer2 = this;
                    synchronized (helloWorldConsumer2) {
                        if (AbstractVmConcurrentDispatchTest.this.MessageId != null) {
                            if (message.getJMSMessageID().equalsIgnoreCase(AbstractVmConcurrentDispatchTest.this.MessageId)) {
                                AbstractVmConcurrentDispatchTest.this.MessageCount++;
                            } else {
                                LOG.info("Count of message " + AbstractVmConcurrentDispatchTest.this.MessageId + " is " + AbstractVmConcurrentDispatchTest.this.MessageCount);
                                AbstractVmConcurrentDispatchTest.this.MessageCount = 1;
                                AbstractVmConcurrentDispatchTest.this.MessageId = message.getJMSMessageID();
                            }
                        } else {
                            AbstractVmConcurrentDispatchTest.this.MessageId = message.getJMSMessageID();
                            AbstractVmConcurrentDispatchTest.this.MessageCount = 1;
                        }
                    }
                    String text = null;
                    if (AbstractVmConcurrentDispatchTest.this.messageType.equals((Object)MessageType.OBJECT) && message instanceof ObjectMessage) {
                        ObjectMessage objectMessage = (ObjectMessage)message;
                        text = (String)((Object)objectMessage.getObject());
                    } else if (AbstractVmConcurrentDispatchTest.this.messageType.equals((Object)MessageType.TEXT) && message instanceof TextMessage) {
                        TextMessage textMessage = (TextMessage)message;
                        text = textMessage.getText();
                    } else if (AbstractVmConcurrentDispatchTest.this.messageType.equals((Object)MessageType.MAP) && message instanceof MapMessage) {
                        MapMessage mapMessage = (MapMessage)message;
                        text = mapMessage.getString("text");
                    } else {
                        LOG.info(this.queueName + " Message is not a instanceof " + (Object)((Object)AbstractVmConcurrentDispatchTest.this.messageType) + " message id: " + message.getJMSMessageID() + message);
                    }
                    if (text == null) {
                        LOG.warn(this.queueName + " text received as a null " + message);
                        AbstractVmConcurrentDispatchTest.this.failure.set(true);
                    } else {
                        LOG.info(this.queueName + " text " + text + " message id: " + message.getJMSMessageID());
                    }
                    message.acknowledge();
                }
                connection.close();
            }
            catch (Exception e) {
                LOG.error("Caught: ", (Throwable)e);
            }
        }

        public synchronized void onException(JMSException ex) {
            LOG.error("JMS Exception occurred.  Shutting down client.");
        }
    }

    public class HelloWorldProducer
    implements Runnable {
        @Override
        public void run() {
            try {
                ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(AbstractVmConcurrentDispatchTest.this.getBrokerURI());
                Connection connection = connectionFactory.createConnection();
                connection.start();
                Session session = connection.createSession(false, 1);
                Topic destination = session.createTopic("VirtualTopic.AMQ6218Test");
                MessageProducer producer = session.createProducer((Destination)destination);
                LOG.info("Producer: {}", (Object)destination);
                producer.setDeliveryMode(2);
                producer.setPriority(4);
                producer.setTimeToLive(0L);
                AbstractVmConcurrentDispatchTest.this.ready.countDown();
                int j = 0;
                while (!AbstractVmConcurrentDispatchTest.this.failure.get()) {
                    String text = "AMQ Message Number :" + ++j;
                    MapMessage message = null;
                    if (AbstractVmConcurrentDispatchTest.this.messageType.equals((Object)MessageType.MAP)) {
                        MapMessage mapMessage = session.createMapMessage();
                        mapMessage.setString("text", text);
                        message = mapMessage;
                    } else if (AbstractVmConcurrentDispatchTest.this.messageType.equals((Object)MessageType.OBJECT)) {
                        ObjectMessage objectMessage = session.createObjectMessage();
                        objectMessage.setObject((Serializable)((Object)text));
                        message = objectMessage;
                    } else {
                        message = session.createTextMessage(text);
                    }
                    producer.send((Message)message);
                    LOG.info("Sent message: {}", (Object)message.getJMSMessageID());
                }
                connection.close();
            }
            catch (Exception e) {
                LOG.error("Caught: " + e);
                e.printStackTrace();
            }
        }
    }

    protected static enum MessageType {
        TEXT,
        MAP,
        OBJECT;

    }
}

