package org.apache.activemq.bugs;

import java.io.File;
import java.net.URI;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.sql.DataSource;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.Message;
import org.apache.activemq.network.ConditionalNetworkBridgeFilterFactory;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.Wait;
import org.apache.derby.jdbc.EmbeddedDataSource;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/activemq/bugs/AMQ4952Test.class */
public class AMQ4952Test {
    private static final Logger LOG = LoggerFactory.getLogger(AMQ4952Test.class);
    protected static final int MESSAGE_COUNT = 1;
    protected BrokerService consumerBroker;
    protected BrokerService producerBroker;
    protected ActiveMQQueue QUEUE_NAME = new ActiveMQQueue("duptest.store");
    private final CountDownLatch stopConsumerBroker = new CountDownLatch(1);
    private final CountDownLatch consumerBrokerRestarted = new CountDownLatch(1);
    private final CountDownLatch consumerRestartedAndMessageForwarded = new CountDownLatch(1);
    private EmbeddedDataSource localDataSource;

    @Parameterized.Parameter(0)
    public boolean enableCursorAudit;

    /* loaded from: input_file:org/apache/activemq/bugs/AMQ4952Test$MyTestBroker.class */
    class MyTestBroker extends BrokerFilter {
        public MyTestBroker(Broker broker) {
            super(broker);
        }

        public void send(ProducerBrokerExchange producerBrokerExchange, Message message) throws Exception {
            super.send(producerBrokerExchange, message);
            AMQ4952Test.LOG.error("Stopping broker on send:  " + message.getMessageId().getProducerSequenceId());
            AMQ4952Test.this.stopConsumerBroker.countDown();
            producerBrokerExchange.getConnectionContext().setDontSendReponse(true);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/activemq/bugs/AMQ4952Test$MyTestPlugin.class */
    public class MyTestPlugin implements BrokerPlugin {
        MyTestPlugin() {
        }

        public Broker installPlugin(Broker broker) throws Exception {
            return new MyTestBroker(broker);
        }
    }

    @Parameterized.Parameters(name = "enableAudit={0}")
    public static Iterable<Object[]> getTestParameters() {
        return Arrays.asList(new Object[]{Boolean.TRUE}, new Object[]{Boolean.FALSE});
    }

    @BeforeClass
    public static void dbHomeSysProp() throws Exception {
        System.setProperty("derby.system.home", new File(IOHelper.getDefaultDataDirectory()).getCanonicalPath());
    }

    @Test
    public void testConsumerBrokerRestart() throws Exception {
        Callable callable = new Callable() { // from class: org.apache.activemq.bugs.AMQ4952Test.1
            @Override // java.util.concurrent.Callable
            public Object call() throws Exception {
                int i = 0;
                Connection createConnection = new ActiveMQConnectionFactory("failover:(tcp://localhost:2006)?randomize=false&backup=false").createConnection();
                try {
                    createConnection.setClientID("consumer");
                    createConnection.start();
                    MessageConsumer createConsumer = createConnection.createSession(false, 2).createConsumer(AMQ4952Test.this.QUEUE_NAME);
                    while (true) {
                        TextMessage receive = createConsumer.receive(5000L);
                        if (receive == null) {
                            Integer valueOf = Integer.valueOf(i);
                            createConnection.close();
                            return valueOf;
                        }
                        i++;
                        AMQ4952Test.LOG.info("*** receivedMessageCount {} message has MessageID {} ", Integer.valueOf(i), receive.getJMSMessageID());
                        if (receive.getJMSMessageID().endsWith("1") && i == 1) {
                            AMQ4952Test.LOG.info("Waiting for restart...");
                            AMQ4952Test.this.consumerRestartedAndMessageForwarded.await(90L, TimeUnit.SECONDS);
                        }
                        receive.acknowledge();
                    }
                } catch (Throwable th) {
                    createConnection.close();
                    throw th;
                }
            }
        };
        Runnable runnable = new Runnable() { // from class: org.apache.activemq.bugs.AMQ4952Test.2
            @Override // java.lang.Runnable
            public void run() {
                try {
                    AMQ4952Test.this.stopConsumerBroker.await();
                    AMQ4952Test.LOG.info("********* STOPPING CONSUMER BROKER");
                    AMQ4952Test.this.consumerBroker.stop();
                    AMQ4952Test.this.consumerBroker.waitUntilStopped();
                    AMQ4952Test.LOG.info("***** STARTING CONSUMER BROKER");
                    AMQ4952Test.this.consumerBroker = AMQ4952Test.this.createConsumerBroker(false);
                    AMQ4952Test.LOG.info("***** CONSUMER BROKER STARTED!!");
                    AMQ4952Test.this.consumerBrokerRestarted.countDown();
                    Assert.assertTrue("message forwarded on time", Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.bugs.AMQ4952Test.2.1
                        public boolean isSatisified() throws Exception {
                            AMQ4952Test.LOG.info("ProducerBroker totalMessageCount: " + AMQ4952Test.this.producerBroker.getAdminView().getTotalMessageCount());
                            return AMQ4952Test.this.producerBroker.getAdminView().getTotalMessageCount() == 0;
                        }
                    }));
                    AMQ4952Test.this.consumerRestartedAndMessageForwarded.countDown();
                } catch (Exception e) {
                    AMQ4952Test.LOG.error("Exception when stopping/starting the consumerBroker ", e);
                }
            }
        };
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(2);
        newFixedThreadPool.execute(runnable);
        Future submit = newFixedThreadPool.submit(callable);
        produceMessages();
        int intValue = ((Integer) submit.get()).intValue();
        StringBuffer stringBuffer = new StringBuffer();
        boolean isMessageInJDBCStore = isMessageInJDBCStore(this.localDataSource, stringBuffer);
        LOG.debug("****number of messages received " + intValue);
        Assert.assertEquals("number of messages received", 2L, intValue);
        Assert.assertEquals("messages left in store", true, Boolean.valueOf(isMessageInJDBCStore));
        Assert.assertTrue("message is in dlq: " + stringBuffer.toString(), stringBuffer.toString().contains("DLQ"));
    }

    private void produceMessages() throws JMSException {
        Connection createConnection = new ActiveMQConnectionFactory("failover:(tcp://localhost:2003)?randomize=false&backup=false").createConnection();
        try {
            createConnection.setClientID("producer");
            createConnection.start();
            Session createSession = createConnection.createSession(false, 1);
            MessageProducer createProducer = createSession.createProducer(this.QUEUE_NAME);
            for (int i = 0; 1 > i; i++) {
                createProducer.send(createSession.createTextMessage("test msg " + i));
            }
        } finally {
            createConnection.close();
        }
    }

    @Before
    public void setUp() throws Exception {
        LOG.debug("Running with enableCursorAudit set to {}", Boolean.valueOf(this.enableCursorAudit));
        doSetUp();
    }

    @After
    public void tearDown() throws Exception {
        doTearDown();
    }

    protected void doTearDown() throws Exception {
        try {
            this.producerBroker.stop();
        } catch (Exception e) {
        }
        try {
            this.consumerBroker.stop();
        } catch (Exception e2) {
        }
    }

    protected void doSetUp() throws Exception {
        this.producerBroker = createProducerBroker();
        this.consumerBroker = createConsumerBroker(true);
    }

    protected BrokerService createProducerBroker() throws Exception {
        String[] strArr = {"2006"};
        HashMap hashMap = new HashMap();
        hashMap.put("networkTTL", "10");
        hashMap.put("conduitSubscriptions", "true");
        hashMap.put("decreaseNetworkConsumerPriority", "true");
        hashMap.put("dynamicOnly", "true");
        BrokerService brokerService = new BrokerService();
        brokerService.getManagementContext().setCreateConnector(false);
        brokerService.setDeleteAllMessagesOnStartup(true);
        brokerService.setBrokerName("BP");
        brokerService.setAdvisorySupport(false);
        TransportConnector transportConnector = new TransportConnector();
        transportConnector.setUri(new URI("tcp://localhost:2003"));
        ArrayList arrayList = new ArrayList();
        arrayList.add(transportConnector);
        brokerService.setTransportConnectors(arrayList);
        if (strArr != null && strArr.length > 0) {
            NetworkConnector addNetworkConnector = brokerService.addNetworkConnector(new StringBuilder("static:(failover:(tcp://localhost:2006)?maxReconnectAttempts=0)?useExponentialBackOff=false").toString());
            if (hashMap != null) {
                IntrospectionSupport.setProperties(addNetworkConnector, hashMap);
            }
            addNetworkConnector.setStaticallyIncludedDestinations(Arrays.asList(this.QUEUE_NAME));
        }
        JDBCPersistenceAdapter jDBCPersistenceAdapter = new JDBCPersistenceAdapter();
        EmbeddedDataSource embeddedDataSource = new EmbeddedDataSource();
        embeddedDataSource.setDatabaseName("target/derbyDBRemoteBroker");
        embeddedDataSource.setCreateDatabase("create");
        jDBCPersistenceAdapter.setDataSource(embeddedDataSource);
        brokerService.setPersistenceAdapter(jDBCPersistenceAdapter);
        PolicyEntry policyEntry = new PolicyEntry();
        policyEntry.setQueue(">");
        policyEntry.setEnableAudit(false);
        policyEntry.setUseCache(false);
        policyEntry.setExpireMessagesPeriod(0L);
        ConditionalNetworkBridgeFilterFactory conditionalNetworkBridgeFilterFactory = new ConditionalNetworkBridgeFilterFactory();
        conditionalNetworkBridgeFilterFactory.setReplayWhenNoConsumers(true);
        policyEntry.setNetworkBridgeFilterFactory(conditionalNetworkBridgeFilterFactory);
        PolicyMap policyMap = new PolicyMap();
        policyMap.setDefaultEntry(policyEntry);
        brokerService.setDestinationPolicy(policyMap);
        brokerService.start();
        brokerService.waitUntilStarted();
        return brokerService;
    }

    protected BrokerService createConsumerBroker(boolean z) throws Exception {
        BrokerService brokerService = new BrokerService();
        brokerService.getManagementContext().setCreateConnector(false);
        brokerService.setDeleteAllMessagesOnStartup(z);
        brokerService.setBrokerName("BC");
        TransportConnector transportConnector = new TransportConnector();
        transportConnector.setUri(new URI("tcp://localhost:2006"));
        ArrayList arrayList = new ArrayList();
        arrayList.add(transportConnector);
        brokerService.setTransportConnectors(arrayList);
        PolicyEntry policyEntry = new PolicyEntry();
        policyEntry.setQueue(">");
        policyEntry.setEnableAudit(this.enableCursorAudit);
        policyEntry.setExpireMessagesPeriod(0L);
        ConditionalNetworkBridgeFilterFactory conditionalNetworkBridgeFilterFactory = new ConditionalNetworkBridgeFilterFactory();
        conditionalNetworkBridgeFilterFactory.setReplayWhenNoConsumers(true);
        policyEntry.setNetworkBridgeFilterFactory(conditionalNetworkBridgeFilterFactory);
        PolicyMap policyMap = new PolicyMap();
        policyMap.setDefaultEntry(policyEntry);
        brokerService.setDestinationPolicy(policyMap);
        JDBCPersistenceAdapter jDBCPersistenceAdapter = new JDBCPersistenceAdapter();
        EmbeddedDataSource embeddedDataSource = new EmbeddedDataSource();
        embeddedDataSource.setDatabaseName("target/derbyDBLocalBroker");
        embeddedDataSource.setCreateDatabase("create");
        jDBCPersistenceAdapter.setDataSource(embeddedDataSource);
        brokerService.setPersistenceAdapter(jDBCPersistenceAdapter);
        if (z) {
            brokerService.setPlugins(new BrokerPlugin[]{new MyTestPlugin()});
        }
        this.localDataSource = embeddedDataSource;
        brokerService.start();
        brokerService.waitUntilStarted();
        return brokerService;
    }

    private boolean isMessageInJDBCStore(DataSource dataSource, StringBuffer stringBuffer) throws SQLException {
        boolean z = false;
        java.sql.Connection connection = dataSource.getConnection();
        PreparedStatement prepareStatement = connection.prepareStatement("select * from ACTIVEMQ_MSGS");
        ResultSet resultSet = null;
        try {
            StringBuffer stringBuffer2 = new StringBuffer();
            resultSet = prepareStatement.executeQuery();
            ResultSetMetaData metaData = resultSet.getMetaData();
            for (int i = 1; i <= metaData.getColumnCount(); i++) {
                if (i == 1) {
                    stringBuffer2.append("||");
                }
                stringBuffer2.append(metaData.getColumnName(i) + "||");
            }
            LOG.error(stringBuffer2.toString());
            while (resultSet.next()) {
                z = true;
                for (int i2 = 1; i2 <= metaData.getColumnCount(); i2++) {
                    if (i2 == 1) {
                        stringBuffer.append("|");
                    }
                    stringBuffer.append(resultSet.getString(i2) + "|");
                }
                LOG.error(stringBuffer.toString());
            }
            try {
                resultSet.close();
            } catch (Throwable th) {
            }
            try {
                prepareStatement.close();
            } catch (Throwable th2) {
            }
            connection.close();
            return z;
        } catch (Throwable th3) {
            try {
                resultSet.close();
            } catch (Throwable th4) {
            }
            try {
                prepareStatement.close();
            } catch (Throwable th5) {
            }
            connection.close();
            throw th3;
        }
    }
}
