/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.store.jdbc;

import java.io.IOException;
import java.lang.reflect.Field;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
import java.util.Properties;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.sql.DataSource;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.activemq.store.jdbc.TransactionContext;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.derby.jdbc.EmbeddedDataSource;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Exercises an embedded broker backed by a JDBC persistence adapter whose
 * batch execution can be made to fail on demand.
 *
 * <p>The failure is injected by {@link CommitFailurePersistenceAdapter}: while
 * its flag is enabled, every {@code TransactionContext} it hands out throws an
 * {@link SQLException} from {@code executeBatch()}. The tests then check what
 * ends up in the {@code activemq_msgs} table and in the broker-side queue.
 */
public class JmsTransactionCommitFailureTest {
    private static final Log LOGGER = LogFactory.getLog(JmsTransactionCommitFailureTest.class);
    private static final String OUTPUT_DIR = "target/" + JmsTransactionCommitFailureTest.class.getSimpleName();
    private Properties originalSystemProps;
    private DataSource dataSource;
    private CommitFailurePersistenceAdapter persistenceAdapter;
    private BrokerService broker;
    private ConnectionFactory connectionFactory;
    /** Next value used for the {@code MessageId} property and the {@code Message-N} body. */
    private int messageCounter = 1;

    /**
     * Redirects the Derby error log into {@link #OUTPUT_DIR}, then builds and
     * starts the broker on top of the failure-injecting persistence adapter.
     *
     * @throws Exception if the broker cannot be created or started
     */
    @Before
    public void setUp() throws Exception {
        this.originalSystemProps = System.getProperties();
        // Work on a copy so that tearDown() can put the untouched original back.
        Properties systemProps = (Properties) this.originalSystemProps.clone();
        systemProps.setProperty("derby.stream.error.file", OUTPUT_DIR + "/derby.log");
        System.setProperties(systemProps);
        this.dataSource = this.createDataSource();
        this.persistenceAdapter = new CommitFailurePersistenceAdapter(this.dataSource);
        this.broker = this.createBroker(this.persistenceAdapter);
        this.broker.start();
        this.connectionFactory = this.createConnectionFactory(this.broker.getBrokerName());
    }

    /**
     * Creates an embedded Derby data source located under {@link #OUTPUT_DIR},
     * configured to create the database on first use.
     *
     * @return the configured data source
     */
    private DataSource createDataSource() {
        EmbeddedDataSource derby = new EmbeddedDataSource();
        derby.setDatabaseName(OUTPUT_DIR + "/derby-db");
        derby.setCreateDatabase("create");
        return derby;
    }

    /**
     * Builds (but does not start) a broker named after this test class, with
     * JMX and advisory support switched off and all messages deleted on startup.
     *
     * @param persistenceAdapter adapter to install; when {@code null} the
     *                           broker's persistence settings are left untouched
     * @return the configured broker
     * @throws IOException declared because installing the persistence adapter may throw it
     */
    private BrokerService createBroker(PersistenceAdapter persistenceAdapter) throws IOException {
        String brokerName = JmsTransactionCommitFailureTest.class.getSimpleName();
        BrokerService service = new BrokerService();
        service.setDataDirectory(OUTPUT_DIR + "/activemq");
        service.setBrokerName(brokerName);
        service.setDeleteAllMessagesOnStartup(true);
        service.setAdvisorySupport(false);
        service.setUseJmx(false);
        if (persistenceAdapter != null) {
            service.setPersistent(true);
            service.setPersistenceAdapter(persistenceAdapter);
        }
        return service;
    }

    /**
     * Creates a connection factory that reaches the broker through the
     * {@code vm://} transport, with topic advisory watching disabled.
     *
     * @param brokerName name of the broker to connect to
     * @return the configured connection factory
     */
    private ConnectionFactory createConnectionFactory(String brokerName) {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://" + brokerName);
        factory.setWatchTopicAdvisories(false);
        return factory;
    }

    /**
     * Asks the embedded Derby database to shut down. Does nothing when the
     * data source is not an {@link EmbeddedDataSource}.
     */
    private void stopDataSource() {
        if (!(this.dataSource instanceof EmbeddedDataSource)) {
            return;
        }
        EmbeddedDataSource derby = (EmbeddedDataSource) this.dataSource;
        derby.setShutdownDatabase("shutdown");
        try {
            // Requesting a connection with the shutdown attribute set is what triggers
            // the shutdown. Should a connection be handed back anyway, close it.
            derby.getConnection().close();
        }
        catch (SQLException expected) {
            // NOTE(review): Derby is documented to signal a completed shutdown with an
            // SQLException, so this is treated as the normal outcome rather than a failure.
            LOGGER.debug("Derby shutdown request returned an SQLException", expected);
        }
    }

    /**
     * Stops the broker if one is present and clears the field so that a
     * repeated call is a no-op.
     *
     * @throws Exception if the broker fails to stop
     */
    private void stopBroker() throws Exception {
        if (this.broker != null) {
            this.broker.stop();
            this.broker = null;
        }
    }

    /**
     * Stops the broker, shuts the database down and restores the system
     * properties captured in {@link #setUp()}. Each step runs even when the
     * previous one throws.
     *
     * @throws Exception if stopping the broker fails
     */
    @After
    public void tearDown() throws Exception {
        try {
            try {
                this.stopBroker();
            }
            finally {
                // Must run even if the broker failed to stop, otherwise the
                // embedded database stays open.
                this.stopDataSource();
            }
        }
        finally {
            // Null only if setUp() failed before capturing the properties; passing
            // null to System.setProperties would reset them instead of restoring.
            if (this.originalSystemProps != null) {
                System.setProperties(this.originalSystemProps);
            }
        }
    }

    /**
     * Verifies that a transacted send which fails in the store leaves no rows
     * behind, and that the same messages can be sent again afterwards and are
     * received in order.
     *
     * @throws Exception on any unexpected JMS or SQL failure
     */
    @Test
    public void testJmsTransactionCommitFailure() throws Exception {
        String queueName = "testJmsTransactionCommitFailure";
        this.sendMessage(queueName, 1);
        Assert.assertEquals(1L, this.getMessageCount());

        this.persistenceAdapter.setCommitFailureEnabled(true);
        try {
            LOGGER.warn("Attempt to send Message-2/Message-3 (first time)...");
            this.sendMessage(queueName, 2);
            LOGGER.warn("Message-2/Message-3 successfuly sent (first time)");
            Assert.fail("Sending was expected to fail while commit failure injection is enabled");
        }
        catch (JMSException jmse) {
            LOGGER.warn("Attempt to send Message-2/Message-3 failed", jmse);
            // sendMessage() already advanced the counter for the two messages that
            // were never stored; rewind it so the retry reuses ids 2 and 3.
            this.messageCounter -= 2;
            Assert.assertEquals(1L, this.getMessageCount());
        }
        finally {
            // Reset even when an assertion above fails, as the other tests do.
            this.persistenceAdapter.setCommitFailureEnabled(false);
        }

        LOGGER.warn("Attempt to send Message-2/Message-3 (second time)...");
        this.sendMessage(queueName, 2);
        LOGGER.warn("Message-2/Message-3 successfuly sent (second time)");

        int expectedMessageCount = 3;
        Assert.assertEquals(expectedMessageCount, this.getMessageCount());
        for (int i = 1; i <= expectedMessageCount; ++i) {
            Message message = this.receiveMessage(queueName, 10000L);
            LOGGER.warn(i + ". Message received (" + message + ")");
            Assert.assertNotNull(message);
            Assert.assertTrue(message instanceof TextMessage);
            Assert.assertEquals(i, message.getIntProperty("MessageId"));
            Assert.assertEquals("Message-" + i, ((TextMessage) message).getText());
        }
        Assert.assertEquals(0L, this.getMessageCount());
        Assert.assertNull(this.receiveMessage(queueName, 4000L));
    }

    /**
     * Checks that repeated failing transacted sends leave nothing in the
     * broker queue's {@code indexOrderedCursorUpdates} list.
     *
     * @throws Exception on any unexpected JMS or reflection failure
     */
    @Test
    public void testQueueMemoryLeak() throws Exception {
        this.assertNoCursorUpdatesLeftAfterFailedSends("testMemoryLeak", true);
    }

    /**
     * Same check as {@link #testQueueMemoryLeak()} but with non-transacted sends.
     *
     * @throws Exception on any unexpected JMS or reflection failure
     */
    @Test
    public void testQueueMemoryLeakNoTx() throws Exception {
        this.assertNoCursorUpdatesLeftAfterFailedSends("testMemoryLeak", false);
    }

    /**
     * Sends one message normally, then attempts ten two-message sends with
     * commit failure injection enabled, and finally asserts that the broker
     * queue's private {@code indexOrderedCursorUpdates} list is empty.
     *
     * @param queueName  queue to send to and inspect
     * @param transacted whether the failing sends use a transacted session
     * @throws Exception on any unexpected JMS or reflection failure
     */
    private void assertNoCursorUpdatesLeftAfterFailedSends(String queueName, boolean transacted) throws Exception {
        this.sendMessage(queueName, 1);
        this.persistenceAdapter.setCommitFailureEnabled(true);
        try {
            for (int attempt = 0; attempt < 10; ++attempt) {
                try {
                    this.sendMessage(queueName, 2, transacted);
                }
                catch (JMSException ignored) {
                    // Deliberately ignored: this loop only exists to drive sends while
                    // the store is failing; the check below looks at the broker state.
                }
            }
        }
        finally {
            this.persistenceAdapter.setCommitFailureEnabled(false);
        }

        org.apache.activemq.broker.region.Destination destination =
                this.broker.getDestination(new ActiveMQQueue(queueName));
        // Fail loudly rather than skipping the check: without this the test would
        // pass without having verified anything.
        Assert.assertTrue("Expected a broker region Queue for '" + queueName + "' but got: " + destination,
                destination instanceof Queue);

        // The list is private to Queue, so it has to be read reflectively.
        Field pendingUpdatesField = Queue.class.getDeclaredField("indexOrderedCursorUpdates");
        pendingUpdatesField.setAccessible(true);
        List<?> pendingUpdates = (List<?>) pendingUpdatesField.get(destination);
        Assert.assertEquals(0, pendingUpdates.size());
    }

    /**
     * Sends {@code count} messages in a single transacted session.
     *
     * @param queueName target queue
     * @param count     number of messages to send
     * @throws JMSException if sending or committing fails
     */
    private void sendMessage(String queueName, int count) throws JMSException {
        this.sendMessage(queueName, count, true);
    }

    /**
     * Sends {@code count} text messages over a fresh connection. Each message
     * carries the current {@link #messageCounter} value as its {@code MessageId}
     * property and in its {@code Message-N} body; the counter is advanced per
     * message whether or not the send later succeeds.
     *
     * <p>NOTE(review): the JMS objects are managed with try-with-resources, which
     * requires them to be {@code AutoCloseable} in the JMS API on the classpath.
     *
     * @param queueName  target queue
     * @param count      number of messages to send
     * @param transacted {@code true} to use a transacted session committed after
     *                   the last send, {@code false} for an auto-acknowledge session
     * @throws JMSException if creating the JMS objects, sending or committing fails
     */
    private void sendMessage(String queueName, int count, boolean transacted) throws JMSException {
        int acknowledgeMode = transacted ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE;
        try (Connection con = this.connectionFactory.createConnection();
             Session session = con.createSession(transacted, acknowledgeMode)) {
            Destination destination = session.createQueue(queueName);
            try (MessageProducer producer = session.createProducer(destination)) {
                for (int i = 0; i < count; ++i) {
                    TextMessage message = session.createTextMessage();
                    message.setIntProperty("MessageId", this.messageCounter);
                    message.setText("Message-" + this.messageCounter++);
                    producer.send(message);
                }
                if (transacted) {
                    session.commit();
                }
            }
        }
    }

    /**
     * Receives at most one message from the queue inside a transacted session
     * and commits the session.
     *
     * @param queueName      queue to consume from
     * @param receiveTimeout timeout passed to {@code MessageConsumer.receive(long)}
     * @return the received message, or {@code null} if none was returned
     * @throws JMSException if any JMS operation fails
     */
    private Message receiveMessage(String queueName, long receiveTimeout) throws JMSException {
        Message message = null;
        try (Connection con = this.connectionFactory.createConnection()) {
            con.start();
            try (Session session = con.createSession(true, Session.SESSION_TRANSACTED)) {
                Destination destination = session.createQueue(queueName);
                try (MessageConsumer consumer = session.createConsumer(destination)) {
                    message = consumer.receive(receiveTimeout);
                    session.commit();
                }
            }
            finally {
                con.stop();
            }
        }
        return message;
    }

    /**
     * Counts the rows of the {@code activemq_msgs} table directly through the
     * data source.
     *
     * @return the row count, or {@code -1} if the query produced no row
     * @throws SQLException if the query fails
     */
    private long getMessageCount() throws SQLException {
        long messageCount = -1L;
        try (java.sql.Connection con = this.dataSource.getConnection();
             Statement stmt = con.createStatement();
             ResultSet rs = stmt.executeQuery("select count(*) from activemq_msgs")) {
            if (rs.next()) {
                messageCount = rs.getLong(1);
            }
        }
        return messageCount;
    }

    /**
     * JDBC persistence adapter whose transaction contexts throw from
     * {@code executeBatch()} while commit failure injection is enabled.
     */
    private static class CommitFailurePersistenceAdapter
    extends JDBCPersistenceAdapter {
        private boolean isCommitFailureEnabled;
        /** Copy of the last isolation level set, re-applied to each context created here. */
        private int transactionIsolation;

        /**
         * @param dataSource data source the adapter stores messages in
         */
        public CommitFailurePersistenceAdapter(DataSource dataSource) {
            this.setDataSource(dataSource);
        }

        /**
         * Switches failure injection on or off.
         *
         * @param isCommitFailureEnabled {@code true} to make {@code executeBatch()} throw
         */
        public void setCommitFailureEnabled(boolean isCommitFailureEnabled) {
            this.isCommitFailureEnabled = isCommitFailureEnabled;
        }

        /**
         * Forwards the isolation level to the superclass and remembers it for
         * the contexts created by {@link #getTransactionContext()}.
         *
         * @param transactionIsolation isolation level to use
         */
        @Override
        public void setTransactionIsolation(int transactionIsolation) {
            super.setTransactionIsolation(transactionIsolation);
            this.transactionIsolation = transactionIsolation;
        }

        /**
         * Creates a transaction context that throws an {@link SQLException}
         * from {@code executeBatch()} while failure injection is enabled and
         * otherwise delegates to the regular implementation.
         *
         * @return a new transaction context bound to this adapter
         * @throws IOException declared by the overridden method
         */
        @Override
        public TransactionContext getTransactionContext() throws IOException {
            TransactionContext answer = new TransactionContext(this){

                @Override
                public void executeBatch() throws SQLException {
                    // The flag is read on every call, so toggling it affects contexts
                    // that were created earlier as well.
                    if (CommitFailurePersistenceAdapter.this.isCommitFailureEnabled) {
                        throw new SQLException("Test commit failure exception");
                    }
                    super.executeBatch();
                }
            };
            // Only applied once a positive level has been set through the setter.
            if (this.transactionIsolation > 0) {
                answer.setTransactionIsolation(this.transactionIsolation);
            }
            return answer;
        }
    }
}

