package org.apache.activemq.store.jdbc;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.Message;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.perf.NetworkedSyncTest;
import org.apache.activemq.util.ByteSequence;
import org.apache.derby.jdbc.EmbeddedDataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/store/jdbc/JDBCCommitExceptionTest.class */
/**
 * Exercises message consumption against a JDBC-backed broker whose persistence
 * adapter can be switched into a "broken" mode via
 * {@link BrokenPersistenceAdapter#setShouldBreak(boolean)}.
 *
 * <p>The test sends {@link #messagesExpected} persistent messages to queue
 * {@code TEST}, attempts to consume them once while the adapter is flagged to
 * break and once more after the flag is cleared, and asserts that the second
 * pass receives every message.
 */
public class JDBCCommitExceptionTest extends TestCase {
    private static final Logger LOG = LoggerFactory.getLogger(JDBCCommitExceptionTest.class);

    /** Number of messages sent, and the number expected to be received. */
    protected static final int messagesExpected = 10;

    /** Client factory; prefetch is disabled and up to 10 redeliveries are allowed. */
    protected ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616?jms.prefetchPolicy.all=0&jms.redeliveryPolicy.maximumRedeliveries=10");
    protected BrokerService broker;
    protected EmbeddedDataSource dataSource;
    protected Connection dbConnection;
    protected BrokenPersistenceAdapter jdbc;

    /** Creates and starts the broker used by the test. */
    public void setUp() throws Exception {
        this.broker = createBroker();
        this.broker.start();
    }

    /** Stops the broker if {@link #setUp()} managed to create one. */
    public void tearDown() throws Exception {
        // Guard against setUp() failing before the field was assigned; an NPE
        // here would otherwise mask the original failure.
        if (this.broker != null) {
            this.broker.stop();
        }
    }

    public void testSqlException() throws Exception {
        doTestSqlException();
    }

    /**
     * Sends the expected number of messages, consumes them via
     * {@link #receiveMessages(int)}, logs whatever is left in the message table
     * and asserts that every message was received.
     */
    public void doTestSqlException() throws Exception {
        sendMessages(messagesExpected);
        int received = receiveMessages(messagesExpected);
        dumpMessages();
        assertEquals("Messages expected doesn't equal messages received", messagesExpected, received);
        this.broker.stop();
    }

    /**
     * Logs every row of {@code ACTIVEMQ_MSGS} (row id, broker sequence id and
     * the unmarshalled message) for post-test diagnostics.
     */
    protected void dumpMessages() throws Exception {
        OpenWireFormat wireFormat = new OpenWireFormat();
        // NOTE(review): relies on getDataSource() being reachable on the adapter
        // type returned by the broker - confirm against the adapter API.
        // try-with-resources closes the result set, statement and connection
        // even when the query or unmarshalling fails.
        try (Connection connection = this.broker.getPersistenceAdapter().getDataSource().getConnection();
             PreparedStatement statement = connection.prepareStatement("SELECT ID, MSG FROM ACTIVEMQ_MSGS");
             ResultSet rows = statement.executeQuery()) {
            LOG.info("Messages left in broker after test");
            while (rows.next()) {
                long id = rows.getLong(1);
                Message message = (Message) wireFormat.unmarshal(new ByteSequence(rows.getBytes(2)));
                LOG.info("id: {}, message SeqId: {}, MSG: {}", id, message.getMessageId().getBrokerSequenceId(), message);
            }
        }
    }

    /**
     * Runs two receive passes over one transacted session: the first with the
     * persistence adapter flagged to break (its result is discarded), the
     * second with the flag cleared.
     *
     * @param i number of receive attempts per pass
     * @return number of messages received and committed in the second pass
     */
    protected int receiveMessages(int i) throws Exception {
        javax.jms.Connection connection = this.factory.createConnection();
        try {
            connection.start();
            // transacted = true; acknowledge mode 0 is Session.SESSION_TRANSACTED
            Session session = connection.createSession(true, 0);
            this.jdbc.setShouldBreak(true);
            receiveMessages(i, session);
            this.jdbc.setShouldBreak(false);
            return receiveMessages(i, session);
        } finally {
            // Closing the connection also closes its sessions and consumers.
            connection.close();
        }
    }

    /**
     * Makes {@code i} receive attempts on queue {@code TEST}, each with a fresh
     * consumer and a 2 second timeout. A received message is committed and
     * counted; any exception raised during the attempt is logged and the
     * session is rolled back.
     *
     * @param i       number of receive attempts
     * @param session transacted session to receive on
     * @return number of messages whose commit completed without an exception
     */
    protected int receiveMessages(int i, Session session) throws Exception {
        int received = 0;
        for (int attempt = 0; attempt < i; attempt++) {
            MessageConsumer consumer = session.createConsumer(session.createQueue("TEST"));
            try {
                LOG.debug("Receiving message {} of {}", received + 1, i);
                javax.jms.Message message = consumer.receive(2000L);
                LOG.info("Received : {}", message);
                if (message != null) {
                    session.commit();
                    // Only counted once the commit has gone through.
                    received++;
                }
            } catch (Exception e) {
                // e.toString() keeps the exception inside the message text
                // rather than having the logger treat it as a stack trace.
                LOG.debug("Caught exception {}", e.toString());
                session.rollback();
            } finally {
                consumer.close();
            }
        }
        return received;
    }

    /**
     * Sends {@code i} text messages ("test message 1" .. "test message i") to
     * queue {@code TEST} using a non-transacted session.
     *
     * @param i number of messages to send
     */
    protected void sendMessages(int i) throws Exception {
        javax.jms.Connection connection = this.factory.createConnection();
        try {
            connection.start();
            // non-transacted; acknowledge mode 1 is Session.AUTO_ACKNOWLEDGE
            Session session = connection.createSession(false, 1);
            MessageProducer producer = session.createProducer(session.createQueue("TEST"));
            // delivery mode 2 is DeliveryMode.PERSISTENT
            producer.setDeliveryMode(2);
            for (int sent = 0; sent < i; sent++) {
                LOG.debug("Sending message {} of {}", sent + 1, i);
                producer.send(session.createTextMessage("test message " + (sent + 1)));
            }
        } finally {
            connection.close();
        }
    }

    /**
     * Builds a persistent broker backed by a {@link BrokenPersistenceAdapter}
     * on an embedded Derby database at {@code target/derbyDb}. Existing
     * messages are deleted so each run starts from an empty store.
     *
     * @return the configured, not yet started, broker
     */
    protected BrokerService createBroker() throws Exception {
        BrokerService brokerService = new BrokerService();
        this.jdbc = new BrokenPersistenceAdapter();
        this.dataSource = new EmbeddedDataSource();
        this.dataSource.setDatabaseName("target/derbyDb");
        this.dataSource.setCreateDatabase("create");
        this.jdbc.setDataSource(this.dataSource);
        this.jdbc.setUseDatabaseLock(false);
        this.jdbc.deleteAllMessages();
        brokerService.setPersistenceAdapter(this.jdbc);
        brokerService.setPersistent(true);
        // NOTE(review): presumably matches the tcp://localhost:61616 URL used by
        // the client factory - confirm against NetworkedSyncTest.
        brokerService.addConnector(NetworkedSyncTest.broker1URL);
        return brokerService;
    }
}
