package org.apache.activemq.broker;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import junit.framework.Test;
import junit.textui.TestRunner;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.transport.failover.FailoverTransport;
import org.apache.activemq.usecases.DurableSubProcessWithRestartTest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/broker/RedeliveryRestartTest.class */
/**
 * Checks that the JMS redelivery flag and the {@code JMSXDeliveryCount} property of
 * messages that were delivered but not committed are still reported correctly after
 * the broker has been restarted or has recovered from a journal failure.
 *
 * <p>The broker is configured with a KahaDB persistence adapter that has
 * {@code rewriteOnRedelivery} enabled, and every client connection is opened with
 * {@code jms.transactedIndividualAck=true}.
 */
public class RedeliveryRestartTest extends BrokerRestartTestSupport {
    private static final Logger LOG = LoggerFactory.getLogger(RedeliveryRestartTest.class);

    /** Message property that carries the number of delivery attempts. */
    private static final String DELIVERY_COUNT_PROPERTY = "JMSXDeliveryCount";

    /** Connection URI option appended to every client connection used by these tests. */
    private static final String INDIVIDUAL_ACK_OPTION = "?jms.transactedIndividualAck=true";

    /**
     * Enables redelivery rewriting on the KahaDB adapter, shortens its cleanup interval
     * to 500 and adds a TCP connector on an ephemeral port.
     *
     * <p>NOTE(review): the decompiler reported that this method's visibility was widened
     * from {@code protected}; confirm against the superclass before narrowing it again.
     *
     * @param brokerService the broker about to be started
     * @throws Exception if the superclass configuration or adding the connector fails
     */
    @Override
    public void configureBroker(BrokerService brokerService) throws Exception {
        super.configureBroker(brokerService);
        // getPersistenceAdapter() returns the general adapter type; the setters below are KahaDB specific.
        KahaDBPersistenceAdapter kahaDb = (KahaDBPersistenceAdapter) brokerService.getPersistenceAdapter();
        kahaDb.setRewriteOnRedelivery(true);
        kahaDb.setCleanupInterval(500L);
        brokerService.addConnector("tcp://0.0.0.0:0");
    }

    /**
     * Receives five of ten messages in a transaction, rolls back, restarts the broker and
     * verifies that exactly those five come back flagged as redelivered with a delivery
     * count of 2, while the remaining five arrive as first deliveries.
     *
     * @throws Exception on any broker or JMS failure
     */
    public void testValidateRedeliveryFlagAfterRestart() throws Exception {
        ActiveMQConnection connection = (ActiveMQConnection) new ActiveMQConnectionFactory(
                "failover:(" + brokerUri() + ")" + INDIVIDUAL_ACK_OPTION).createConnection();
        try {
            connection.start();
            populateDestination(10, this.queueName, connection);

            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
            Queue queue = session.createQueue(this.queueName);
            MessageConsumer consumer = session.createConsumer(queue);
            for (int i = 0; i < 5; i++) {
                TextMessage message = (TextMessage) consumer.receive(20000L);
                LOG.info("not redelivered? got: {}", message);
                assertFirstDelivery(message);
            }
            session.rollback();
            consumer.close();

            restartBroker();

            // The connector is bound to port 0, so the restarted broker may publish a different
            // address; hand the new one to the failover transport so the connection can resume.
            ((FailoverTransport) connection.getTransport().narrow(FailoverTransport.class))
                    .add(true, brokerUri());

            MessageConsumer resumedConsumer = session.createConsumer(queue);
            for (int i = 0; i < 5; i++) {
                TextMessage message = (TextMessage) resumedConsumer.receive(4000L);
                LOG.info("redelivered? got: {}", message);
                assertRedelivered(message);
            }
            session.commit();

            // The second half of the queue was never delivered before the restart.
            for (int i = 0; i < 5; i++) {
                TextMessage message = (TextMessage) resumedConsumer.receive(20000L);
                LOG.info("not redelivered? got: {}", message);
                assertFirstDelivery(message);
            }
            session.commit();
        } finally {
            connection.close();
        }
    }

    /**
     * Receives one message in a transaction that is never completed, closes the KahaDB
     * journal underneath the running broker, waits for the broker to stop, starts a
     * replacement broker and verifies that the message is delivered again with the
     * redelivered flag set and a delivery count of 2.
     *
     * @throws Exception on any broker or JMS failure
     */
    public void testValidateRedeliveryFlagAfterRecovery() throws Exception {
        ActiveMQConnection firstConnection = (ActiveMQConnection) new ActiveMQConnectionFactory(
                brokerUri() + INDIVIDUAL_ACK_OPTION).createConnection();
        try {
            firstConnection.start();
            populateDestination(1, this.queueName, firstConnection);

            Session firstSession = firstConnection.createSession(true, Session.SESSION_TRANSACTED);
            Queue queue = firstSession.createQueue(this.queueName);
            TextMessage message = (TextMessage) firstSession.createConsumer(queue).receive(20000L);
            LOG.info("got: {}", message);
            assertFirstDelivery(message);

            // Close the journal while the delivery is still uncommitted; the test then relies on
            // the broker stopping by itself (presumably once it next touches the closed journal).
            KahaDBPersistenceAdapter kahaDb = (KahaDBPersistenceAdapter) this.broker.getPersistenceAdapter();
            kahaDb.getStore().getJournal().close();
            this.broker.waitUntilStopped();

            this.broker = createRestartedBroker();
            this.broker.start();

            ActiveMQConnection secondConnection = (ActiveMQConnection) new ActiveMQConnectionFactory(
                    brokerUri() + INDIVIDUAL_ACK_OPTION).createConnection();
            try {
                secondConnection.start();
                Session secondSession = secondConnection.createSession(true, Session.SESSION_TRANSACTED);
                // NOTE(review): the timeout is borrowed from an unrelated test's constant; this looks
                // like a decompiler substitution for a numeric literal - confirm the intended value.
                TextMessage redelivered = (TextMessage) secondSession.createConsumer(queue)
                        .receive(DurableSubProcessWithRestartTest.BROKER_RESTART);
                assertRedelivered(redelivered);
                secondSession.commit();
            } finally {
                secondConnection.close();
            }
        } finally {
            // This connection points at the broker that was stopped above, so closing it is
            // best effort only and must not turn a passing test into a failure.
            closeQuietly(firstConnection);
        }
    }

    /**
     * Returns the publishable connect string of the current broker's first transport connector.
     *
     * @return the URI clients should use to reach the broker
     * @throws Exception if the connector cannot produce its connect string
     */
    private String brokerUri() throws Exception {
        return ((TransportConnector) this.broker.getTransportConnectors().get(0)).getPublishableConnectString();
    }

    /**
     * Asserts that a message was received and is marked as a first delivery.
     *
     * @param message the received message, or {@code null} if the receive timed out
     * @throws JMSException if the message headers or properties cannot be read
     */
    private static void assertFirstDelivery(TextMessage message) throws JMSException {
        assertNotNull("got the message", message);
        assertEquals("first delivery", 1L, message.getLongProperty(DELIVERY_COUNT_PROPERTY));
        assertFalse("not a redelivery", message.getJMSRedelivered());
    }

    /**
     * Asserts that a message was received and is marked as its second delivery.
     *
     * @param message the received message, or {@code null} if the receive timed out
     * @throws JMSException if the message headers or properties cannot be read
     */
    private static void assertRedelivered(TextMessage message) throws JMSException {
        assertNotNull("got the message again", message);
        assertEquals("redelivery count survives restart", 2L, message.getLongProperty(DELIVERY_COUNT_PROPERTY));
        assertTrue("re delivery flag", message.getJMSRedelivered());
    }

    /**
     * Closes a connection, logging instead of propagating any {@link JMSException}.
     *
     * @param connection the connection to close
     */
    private static void closeQuietly(Connection connection) {
        try {
            connection.close();
        } catch (JMSException e) {
            LOG.debug("Ignoring failure while closing connection", e);
        }
    }

    /**
     * Sends {@code count} text messages of the form {@code <hello id='N'/>}, with N running
     * from 1 to {@code count}, to the named queue using a non-transacted auto-acknowledge session.
     *
     * @param count number of messages to send
     * @param destinationName name of the target queue
     * @param connection connection used to create the producing session
     * @throws JMSException if the session, producer or a send fails
     */
    private void populateDestination(int count, String destinationName, Connection connection) throws JMSException {
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        try {
            MessageProducer producer = session.createProducer(session.createQueue(destinationName));
            try {
                for (int id = 1; id <= count; id++) {
                    producer.send(session.createTextMessage("<hello id='" + id + "'/>"));
                }
            } finally {
                producer.close();
            }
        } finally {
            session.close();
        }
    }

    /**
     * Builds the JUnit suite for this class through the inherited {@code suite(Class)} helper.
     *
     * @return the test suite
     */
    public static Test suite() {
        return suite(RedeliveryRestartTest.class);
    }

    /**
     * Runs the suite with the JUnit text runner.
     *
     * @param strArr command line arguments (unused)
     */
    public static void main(String[] strArr) {
        TestRunner.run(suite());
    }
}
