package org.apache.activemq.store;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.usecases.DurableSubSelectorDelayTest;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/store/AbstractVmConcurrentDispatchTest.class */
public abstract class AbstractVmConcurrentDispatchTest {
    // Body type the producer sends and the consumers expect to receive.
    private final MessageType messageType;
    // Value applied to the default destination policy's reduceMemoryFootprint flag in setUp().
    private final boolean reduceMemoryFootPrint;
    // Broker under test; created in setUp() and stopped in tearDown().
    private BrokerService broker;
    // Counted down once by every consumer/producer task after it has finished its JMS setup;
    // the test awaits it before starting its wait window.
    private CountDownLatch ready;
    // Publishable URI of the TCP connector added in setUp().
    private URI connectionURI;
    // VM-transport URI of the broker; this is what getBrokerURI() hands to the clients.
    private URI vmConnectionURI;
    private static final Logger LOG = LoggerFactory.getLogger(AbstractVmConcurrentDispatchTest.class);
    // NOTE(review): presumably parameter value sets for parameterized subclasses - confirm against subclasses.
    protected static final boolean[] booleanVals = {true, false};
    protected static boolean[] reduceMemoryFootPrintVals = booleanVals;

    // Temporary folder created under "target"; its root is used as the broker data directory.
    @Rule
    public TemporaryFolder dataFileDir = new TemporaryFolder(new File("target"));
    // Set by a consumer that receives a message whose body cannot be extracted;
    // every consumer and producer loop exits once it is true.
    private final AtomicBoolean failure = new AtomicBoolean();
    // NOTE(review): looks like a switch between the VM and TCP URIs - confirm intended use.
    private final boolean USE_VM_TRANSPORT = true;
    // Task counts: 30 consumers + 1 producer = 31 tasks (matches the latch count and pool size).
    private final int NUM_CONSUMERS = 30;
    private final int NUM_PRODUCERS = 1;
    private final int NUM_TASKS = 31;
    // Next index used to build a consumer's queue name ("Consumer.Q<i>...").
    private int i = 0;
    // JMS message id most recently recorded by any consumer (null until the first delivery).
    private String MessageId = null;
    // Number of consecutive deliveries recorded for MessageId.
    private int MessageCount = 0;

    /* loaded from: input_file:org/apache/activemq/store/AbstractVmConcurrentDispatchTest$HelloWorldConsumer.class */
    public class HelloWorldConsumer implements Runnable, ExceptionListener {
        String queueName;

        public HelloWorldConsumer() {
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                Connection createConnection = new ActiveMQConnectionFactory(AbstractVmConcurrentDispatchTest.this.getBrokerURI()).createConnection();
                createConnection.start();
                Session createSession = createConnection.createSession(false, 4);
                synchronized (this) {
                    this.queueName = "Consumer.Q" + AbstractVmConcurrentDispatchTest.this.i + ".VirtualTopic.AMQ6218Test";
                    AbstractVmConcurrentDispatchTest.access$408(AbstractVmConcurrentDispatchTest.this);
                    AbstractVmConcurrentDispatchTest.LOG.info(this.queueName);
                }
                MessageConsumer createConsumer = createSession.createConsumer(createSession.createQueue(this.queueName));
                AbstractVmConcurrentDispatchTest.this.ready.countDown();
                while (!AbstractVmConcurrentDispatchTest.this.failure.get()) {
                    ObjectMessage receive = createConsumer.receive(500L);
                    if (receive != null) {
                        synchronized (this) {
                            if (AbstractVmConcurrentDispatchTest.this.MessageId == null) {
                                AbstractVmConcurrentDispatchTest.this.MessageId = receive.getJMSMessageID();
                                AbstractVmConcurrentDispatchTest.this.MessageCount = 1;
                            } else if (receive.getJMSMessageID().equalsIgnoreCase(AbstractVmConcurrentDispatchTest.this.MessageId)) {
                                AbstractVmConcurrentDispatchTest.access$608(AbstractVmConcurrentDispatchTest.this);
                            } else {
                                AbstractVmConcurrentDispatchTest.LOG.info("Count of message " + AbstractVmConcurrentDispatchTest.this.MessageId + " is " + AbstractVmConcurrentDispatchTest.this.MessageCount);
                                AbstractVmConcurrentDispatchTest.this.MessageCount = 1;
                                AbstractVmConcurrentDispatchTest.this.MessageId = receive.getJMSMessageID();
                            }
                        }
                        String str = null;
                        if (AbstractVmConcurrentDispatchTest.this.messageType.equals(MessageType.OBJECT) && (receive instanceof ObjectMessage)) {
                            str = (String) receive.getObject();
                        } else if (AbstractVmConcurrentDispatchTest.this.messageType.equals(MessageType.TEXT) && (receive instanceof TextMessage)) {
                            str = ((TextMessage) receive).getText();
                        } else if (AbstractVmConcurrentDispatchTest.this.messageType.equals(MessageType.MAP) && (receive instanceof MapMessage)) {
                            str = ((MapMessage) receive).getString("text");
                        } else {
                            AbstractVmConcurrentDispatchTest.LOG.info(this.queueName + " Message is not a instanceof " + AbstractVmConcurrentDispatchTest.this.messageType + " message id: " + receive.getJMSMessageID() + receive);
                        }
                        if (str == null) {
                            AbstractVmConcurrentDispatchTest.LOG.warn(this.queueName + " text received as a null " + receive);
                            AbstractVmConcurrentDispatchTest.this.failure.set(true);
                        } else {
                            AbstractVmConcurrentDispatchTest.LOG.info(this.queueName + " text " + str + " message id: " + receive.getJMSMessageID());
                        }
                        receive.acknowledge();
                    }
                }
                createConnection.close();
            } catch (Exception e) {
                AbstractVmConcurrentDispatchTest.LOG.error("Caught: ", e);
            }
        }

        public synchronized void onException(JMSException jMSException) {
            AbstractVmConcurrentDispatchTest.LOG.error("JMS Exception occurred.  Shutting down client.");
        }
    }

    /* loaded from: input_file:org/apache/activemq/store/AbstractVmConcurrentDispatchTest$HelloWorldProducer.class */
    public class HelloWorldProducer implements Runnable {
        public HelloWorldProducer() {
        }

        @Override // java.lang.Runnable
        public void run() {
            MapMessage createTextMessage;
            try {
                Connection createConnection = new ActiveMQConnectionFactory(AbstractVmConcurrentDispatchTest.this.getBrokerURI()).createConnection();
                createConnection.start();
                Session createSession = createConnection.createSession(false, 1);
                Topic createTopic = createSession.createTopic("VirtualTopic.AMQ6218Test");
                MessageProducer createProducer = createSession.createProducer(createTopic);
                AbstractVmConcurrentDispatchTest.LOG.info("Producer: {}", createTopic);
                createProducer.setDeliveryMode(2);
                createProducer.setPriority(4);
                createProducer.setTimeToLive(0L);
                AbstractVmConcurrentDispatchTest.this.ready.countDown();
                int i = 0;
                while (!AbstractVmConcurrentDispatchTest.this.failure.get()) {
                    i++;
                    String str = "AMQ Message Number :" + i;
                    if (AbstractVmConcurrentDispatchTest.this.messageType.equals(MessageType.MAP)) {
                        MapMessage createMapMessage = createSession.createMapMessage();
                        createMapMessage.setString("text", str);
                        createTextMessage = createMapMessage;
                    } else if (AbstractVmConcurrentDispatchTest.this.messageType.equals(MessageType.OBJECT)) {
                        MapMessage createObjectMessage = createSession.createObjectMessage();
                        createObjectMessage.setObject(str);
                        createTextMessage = createObjectMessage;
                    } else {
                        createTextMessage = createSession.createTextMessage(str);
                    }
                    createProducer.send(createTextMessage);
                    AbstractVmConcurrentDispatchTest.LOG.info("Sent message: {}", createTextMessage.getJMSMessageID());
                }
                createConnection.close();
            } catch (Exception e) {
                AbstractVmConcurrentDispatchTest.LOG.error("Caught: " + e);
                e.printStackTrace();
            }
        }
    }

    /* loaded from: input_file:org/apache/activemq/store/AbstractVmConcurrentDispatchTest$MessageType.class */
    /** JMS message body kinds exercised by this test: text, map and object messages. */
    protected enum MessageType {
        TEXT, MAP, OBJECT
    }

    /**
     * Stores the test parameters.
     *
     * @param messageType body type the producer sends and the consumers expect
     * @param z value for the default destination policy's reduceMemoryFootprint setting
     */
    public AbstractVmConcurrentDispatchTest(MessageType messageType, boolean z) {
        this.reduceMemoryFootPrint = z;
        this.messageType = messageType;
    }

    /**
     * Starts a fresh broker for the test: one TCP connector on an ephemeral port, all stored
     * messages deleted on startup, a default destination policy carrying the configured
     * reduceMemoryFootprint flag, and the subclass-provided persistence adapter. Also creates
     * the readiness latch and captures the TCP and VM connection URIs.
     */
    @Before
    public void setUp() throws Exception {
        final BrokerService service = new BrokerService();
        // Publish the broker to the field first so tearDown() can stop it if a later step fails.
        this.broker = service;
        final TransportConnector tcpConnector = service.addConnector("tcp://0.0.0.0:0");
        service.setDeleteAllMessagesOnStartup(true);

        final PolicyEntry defaultPolicy = new PolicyEntry();
        defaultPolicy.setReduceMemoryFootprint(this.reduceMemoryFootPrint);
        final PolicyMap policies = new PolicyMap();
        policies.setDefaultEntry(defaultPolicy);
        service.setDestinationPolicy(policies);

        service.setDataDirectoryFile(this.dataFileDir.getRoot());
        configurePersistenceAdapter(service);

        service.start();
        service.waitUntilStarted();

        // One count per consumer/producer task.
        this.ready = new CountDownLatch(this.NUM_TASKS);
        this.connectionURI = tcpConnector.getPublishableConnectURI();
        this.vmConnectionURI = service.getVmConnectorURI();
    }

    /** Stops the broker if setUp() got far enough to create one. */
    @After
    public void tearDown() throws Exception {
        final BrokerService running = this.broker;
        if (running == null) {
            return;
        }
        running.stop();
    }

    protected abstract void configurePersistenceAdapter(BrokerService brokerService) throws IOException;

    /**
     * Starts all consumers and the producer on a fixed thread pool, waits for every task to
     * report ready, lets them run, and finally asserts that no consumer flagged a message
     * whose body could not be read.
     *
     * <p>The tasks loop until the {@code failure} flag is set, so in a passing run
     * {@code awaitTermination} is expected to time out; it effectively serves as the
     * 20 second window during which messages flow.
     */
    @Test(timeout = DurableSubSelectorDelayTest.RUNTIME)
    public void testMessagesAreValid() throws Exception {
        ExecutorService workers = Executors.newFixedThreadPool(NUM_TASKS);
        for (int consumer = 1; consumer <= NUM_CONSUMERS; consumer++) {
            LOG.info("Created Consumer: {}", consumer);
            workers.execute(new HelloWorldConsumer());
        }
        for (int producer = 1; producer <= NUM_PRODUCERS; producer++) {
            LOG.info("Created Producer: {}", producer);
            workers.execute(new HelloWorldProducer());
        }
        Assert.assertTrue(this.ready.await(20L, TimeUnit.SECONDS));
        try {
            workers.shutdown();
            workers.awaitTermination(20L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // Keep the best-effort wait, but do not lose the interrupt: restore the flag
            // and still evaluate the failure flag below.
            Thread.currentThread().interrupt();
            LOG.warn("Interrupted while waiting for consumer/producer tasks", e);
        }
        Assert.assertFalse("Test Encountered a null bodied message", this.failure.get());
    }

    /**
     * Returns the URI that the consumers and the producer connect to. With
     * {@code USE_VM_TRANSPORT} fixed to {@code true} this is always the broker's VM URI.
     */
    public URI getBrokerURI() {
        return this.USE_VM_TRANSPORT ? this.vmConnectionURI : this.connectionURI;
    }

    /** Compiler-style accessor: post-increments the queue index {@code i} and returns its previous value. */
    static /* synthetic */ int access$408(AbstractVmConcurrentDispatchTest abstractVmConcurrentDispatchTest) {
        return abstractVmConcurrentDispatchTest.i++;
    }

    /** Compiler-style accessor: post-increments {@code MessageCount} and returns its previous value. */
    static /* synthetic */ int access$608(AbstractVmConcurrentDispatchTest abstractVmConcurrentDispatchTest) {
        return abstractVmConcurrentDispatchTest.MessageCount++;
    }
}
