/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.store.jdbc;

import java.sql.PreparedStatement;
import java.sql.ResultSet;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.Message;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.jdbc.BrokenPersistenceAdapter;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.activemq.util.ByteSequence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JDBCCommitExceptionTest
extends TestCase {
    private static final Logger LOG = LoggerFactory.getLogger(JDBCCommitExceptionTest.class);
    protected static final int messagesExpected = 10;
    protected ActiveMQConnectionFactory factory;
    protected BrokerService broker;
    protected String connectionUri;
    protected BrokenPersistenceAdapter jdbc;

    public void setUp() throws Exception {
        this.broker = this.createBroker();
        this.broker.start();
        this.factory = new ActiveMQConnectionFactory(this.connectionUri + "?jms.prefetchPolicy.all=0&jms.redeliveryPolicy.maximumRedeliveries=" + 10);
    }

    public void tearDown() throws Exception {
        this.broker.stop();
    }

    public void testSqlException() throws Exception {
        this.doTestSqlException();
    }

    public void doTestSqlException() throws Exception {
        this.sendMessages(10);
        int messagesReceived = this.receiveMessages(10);
        this.dumpMessages();
        JDBCCommitExceptionTest.assertEquals((String)"Messages expected doesn't equal messages received", (int)10, (int)messagesReceived);
        this.broker.stop();
    }

    protected void dumpMessages() throws Exception {
        OpenWireFormat wireFormat = new OpenWireFormat();
        java.sql.Connection conn = ((JDBCPersistenceAdapter)this.broker.getPersistenceAdapter()).getDataSource().getConnection();
        PreparedStatement statement = conn.prepareStatement("SELECT ID, MSG FROM ACTIVEMQ_MSGS");
        ResultSet result = statement.executeQuery();
        LOG.info("Messages left in broker after test");
        while (result.next()) {
            long id = result.getLong(1);
            Message message = (Message)wireFormat.unmarshal(new ByteSequence(result.getBytes(2)));
            LOG.info("id: " + id + ", message SeqId: " + message.getMessageId().getBrokerSequenceId() + ", MSG: " + message);
        }
        statement.close();
        conn.close();
    }

    protected int receiveMessages(int messagesExpected) throws Exception {
        Connection connection = this.factory.createConnection();
        connection.start();
        Session session = connection.createSession(true, 0);
        this.jdbc.setShouldBreak(true);
        this.receiveMessages(messagesExpected, session);
        this.jdbc.setShouldBreak(false);
        return this.receiveMessages(messagesExpected, session);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected int receiveMessages(int messagesExpected, Session session) throws Exception {
        int messagesReceived = 0;
        for (int i = 0; i < messagesExpected; ++i) {
            Queue destination = session.createQueue("TEST");
            javax.jms.Message message = null;
            try (MessageConsumer consumer = session.createConsumer((Destination)destination);){
                LOG.debug("Receiving message " + (messagesReceived + 1) + " of " + messagesExpected);
                message = consumer.receive(2000L);
                LOG.info("Received : " + message);
                if (message == null) continue;
                session.commit();
                ++messagesReceived;
                continue;
            }
        }
        return messagesReceived;
    }

    protected void sendMessages(int messagesExpected) throws Exception {
        Connection connection = this.factory.createConnection();
        connection.start();
        Session session = connection.createSession(false, 1);
        Queue destination = session.createQueue("TEST");
        MessageProducer producer = session.createProducer((Destination)destination);
        producer.setDeliveryMode(2);
        for (int i = 0; i < messagesExpected; ++i) {
            LOG.debug("Sending message " + (i + 1) + " of " + messagesExpected);
            producer.send((javax.jms.Message)session.createTextMessage("test message " + (i + 1)));
        }
    }

    protected BrokerService createBroker() throws Exception {
        BrokerService broker = new BrokerService();
        this.jdbc = new BrokenPersistenceAdapter();
        this.jdbc.setUseLock(false);
        this.jdbc.deleteAllMessages();
        broker.setPersistenceAdapter((PersistenceAdapter)this.jdbc);
        broker.setPersistent(true);
        this.connectionUri = broker.addConnector("tcp://localhost:0").getPublishableConnectString();
        return broker;
    }
}

