package org.apache.activemq.broker;

import java.util.Arrays;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.TestSupport;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.transport.failover.FailoverTransport;
import org.apache.activemq.usecases.DurableSubProcessWithRestartTest;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/activemq/broker/RedeliveryRestartTest.class */
public class RedeliveryRestartTest extends TestSupport {
    private static final transient Logger LOG = LoggerFactory.getLogger(RedeliveryRestartTest.class);
    ActiveMQConnection connection;
    BrokerService broker = null;
    String queueName = "redeliveryRestartQ";

    @Parameterized.Parameter
    public TestSupport.PersistenceAdapterChoice persistenceAdapterChoice = TestSupport.PersistenceAdapterChoice.KahaDB;

    @Parameterized.Parameters(name = "Store={0}")
    public static Iterable<Object[]> data() {
        return Arrays.asList(new Object[]{TestSupport.PersistenceAdapterChoice.KahaDB}, new Object[]{TestSupport.PersistenceAdapterChoice.JDBC}, new Object[]{TestSupport.PersistenceAdapterChoice.LevelDB});
    }

    @Before
    public void setUp() throws Exception {
        super.setUp();
        this.broker = new BrokerService();
        configureBroker(this.broker);
        this.broker.setDeleteAllMessagesOnStartup(true);
        this.broker.start();
    }

    @After
    public void tearDown() throws Exception {
        if (this.connection != null) {
            this.connection.close();
        }
        this.broker.stop();
        super.tearDown();
    }

    protected void configureBroker(BrokerService brokerService) throws Exception {
        PolicyMap policyMap = new PolicyMap();
        PolicyEntry policyEntry = new PolicyEntry();
        policyEntry.setPersistJMSRedelivered(true);
        policyMap.setDefaultEntry(policyEntry);
        brokerService.setDestinationPolicy(policyMap);
        setPersistenceAdapter(brokerService, this.persistenceAdapterChoice);
        brokerService.addConnector("tcp://0.0.0.0:0");
    }

    @Test(timeout = 60000)
    public void testValidateRedeliveryFlagAfterRestartNoTx() throws Exception {
        this.connection = new ActiveMQConnectionFactory("failover:(" + ((TransportConnector) this.broker.getTransportConnectors().get(0)).getPublishableConnectString() + ")?jms.prefetchPolicy.all=0").createConnection();
        this.connection.start();
        populateDestination(10, this.queueName, this.connection);
        Session createSession = this.connection.createSession(false, 2);
        Queue createQueue = createSession.createQueue(this.queueName);
        MessageConsumer createConsumer = createSession.createConsumer(createQueue);
        for (int i = 0; i < 5; i++) {
            TextMessage receive = createConsumer.receive(20000L);
            LOG.info("not redelivered? got: " + receive);
            assertNotNull("got the message", receive);
            assertEquals("first delivery", 1L, receive.getLongProperty("JMSXDeliveryCount"));
            assertEquals("not a redelivery", false, receive.getJMSRedelivered());
        }
        createConsumer.close();
        restartBroker();
        ((FailoverTransport) this.connection.getTransport().narrow(FailoverTransport.class)).add(true, ((TransportConnector) this.broker.getTransportConnectors().get(0)).getPublishableConnectString());
        MessageConsumer createConsumer2 = createSession.createConsumer(createQueue);
        for (int i2 = 0; i2 < 5; i2++) {
            TextMessage receive2 = createConsumer2.receive(4000L);
            LOG.info("redelivered? got: " + receive2);
            assertNotNull("got the message again", receive2);
            assertEquals("re delivery flag", true, receive2.getJMSRedelivered());
            assertEquals("redelivery count survives restart", 2L, receive2.getLongProperty("JMSXDeliveryCount"));
            receive2.acknowledge();
        }
        for (int i3 = 0; i3 < 5; i3++) {
            TextMessage receive3 = createConsumer2.receive(20000L);
            LOG.info("not redelivered? got: " + receive3);
            assertNotNull("got the message", receive3);
            assertEquals("not a redelivery", false, receive3.getJMSRedelivered());
            assertEquals("first delivery", 1L, receive3.getLongProperty("JMSXDeliveryCount"));
            receive3.acknowledge();
        }
        this.connection.close();
    }

    @Test(timeout = 60000)
    public void testValidateRedeliveryFlagAfterRestart() throws Exception {
        this.connection = new ActiveMQConnectionFactory("failover:(" + ((TransportConnector) this.broker.getTransportConnectors().get(0)).getPublishableConnectString() + ")?jms.prefetchPolicy.all=0").createConnection();
        this.connection.start();
        populateDestination(10, this.queueName, this.connection);
        Session createSession = this.connection.createSession(true, 0);
        Queue createQueue = createSession.createQueue(this.queueName);
        MessageConsumer createConsumer = createSession.createConsumer(createQueue);
        for (int i = 0; i < 5; i++) {
            TextMessage receive = createConsumer.receive(20000L);
            LOG.info("not redelivered? got: " + receive);
            assertNotNull("got the message", receive);
            assertEquals("first delivery", 1L, receive.getLongProperty("JMSXDeliveryCount"));
            assertEquals("not a redelivery", false, receive.getJMSRedelivered());
        }
        createSession.rollback();
        createConsumer.close();
        restartBroker();
        ((FailoverTransport) this.connection.getTransport().narrow(FailoverTransport.class)).add(true, ((TransportConnector) this.broker.getTransportConnectors().get(0)).getPublishableConnectString());
        MessageConsumer createConsumer2 = createSession.createConsumer(createQueue);
        for (int i2 = 0; i2 < 5; i2++) {
            TextMessage receive2 = createConsumer2.receive(4000L);
            LOG.info("redelivered? got: " + receive2);
            assertNotNull("got the message again", receive2);
            assertEquals("redelivery count survives restart", 2L, receive2.getLongProperty("JMSXDeliveryCount"));
            assertEquals("re delivery flag", true, receive2.getJMSRedelivered());
        }
        createSession.commit();
        for (int i3 = 0; i3 < 5; i3++) {
            TextMessage receive3 = createConsumer2.receive(20000L);
            LOG.info("not redelivered? got: " + receive3);
            assertNotNull("got the message", receive3);
            assertEquals("first delivery", 1L, receive3.getLongProperty("JMSXDeliveryCount"));
            assertEquals("not a redelivery", false, receive3.getJMSRedelivered());
        }
        createSession.commit();
        this.connection.close();
    }

    @Test(timeout = 60000)
    public void testValidateRedeliveryFlagAfterRecovery() throws Exception {
        this.connection = new ActiveMQConnectionFactory(((TransportConnector) this.broker.getTransportConnectors().get(0)).getPublishableConnectString() + "?jms.prefetchPolicy.all=0").createConnection();
        this.connection.start();
        populateDestination(1, this.queueName, this.connection);
        Session createSession = this.connection.createSession(true, 0);
        Queue createQueue = createSession.createQueue(this.queueName);
        TextMessage receive = createSession.createConsumer(createQueue).receive(5000L);
        LOG.info("got: " + receive);
        assertNotNull("got the message", receive);
        assertEquals("first delivery", 1L, receive.getLongProperty("JMSXDeliveryCount"));
        assertEquals("not a redelivery", false, receive.getJMSRedelivered());
        stopBrokerWithStoreFailure(this.broker, this.persistenceAdapterChoice);
        this.broker = createRestartedBroker();
        this.broker.start();
        this.connection.close();
        this.connection = new ActiveMQConnectionFactory(((TransportConnector) this.broker.getTransportConnectors().get(0)).getPublishableConnectString()).createConnection();
        this.connection.start();
        Session createSession2 = this.connection.createSession(true, 0);
        TextMessage receive2 = createSession2.createConsumer(createQueue).receive(DurableSubProcessWithRestartTest.BROKER_RESTART);
        assertNotNull("got the message again", receive2);
        assertEquals("redelivery count survives restart", 2L, receive2.getLongProperty("JMSXDeliveryCount"));
        assertEquals("re delivery flag", true, receive2.getJMSRedelivered());
        createSession2.commit();
        this.connection.close();
    }

    private void restartBroker() throws Exception {
        this.broker.stop();
        this.broker.waitUntilStopped();
        this.broker = createRestartedBroker();
        this.broker.start();
    }

    private BrokerService createRestartedBroker() throws Exception {
        this.broker = new BrokerService();
        configureBroker(this.broker);
        return this.broker;
    }

    private void populateDestination(int i, String str, Connection connection) throws JMSException {
        Session createSession = connection.createSession(false, 1);
        MessageProducer createProducer = createSession.createProducer(createSession.createQueue(str));
        for (int i2 = 1; i2 <= i; i2++) {
            createProducer.send(createSession.createTextMessage("<hello id='" + i2 + "'/>"));
        }
        createProducer.close();
        createSession.close();
    }
}
