package org.apache.activemq;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.Connection;
import javax.jms.ConnectionConsumer;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.ServerSession;
import javax.jms.ServerSessionPool;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/ExpiredAckAsyncConsumerTest.class */
public class ExpiredAckAsyncConsumerTest {
    private static final Logger LOG = LoggerFactory.getLogger(ExpiredAckAsyncConsumerTest.class);
    private BrokerService broker;
    private Connection connection;
    private ConnectionConsumer connectionConsumer;
    private Queue queue;
    private AtomicBoolean finished = new AtomicBoolean();
    private AtomicBoolean failed = new AtomicBoolean();

    /* loaded from: input_file:org/apache/activemq/ExpiredAckAsyncConsumerTest$TestMessageListener.class */
    private class TestMessageListener implements MessageListener {
        private TestMessageListener() {
        }

        public void onMessage(Message message) {
            try {
                Thread.sleep(1000L);
                ExpiredAckAsyncConsumerTest.LOG.info("got message: " + ((TextMessage) message).getText());
            } catch (Exception e) {
                ExpiredAckAsyncConsumerTest.LOG.error("in onMessage", e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/activemq/ExpiredAckAsyncConsumerTest$TestServerSession.class */
    public class TestServerSession implements ServerSession {
        TestServerSessionPool pool;
        Session session;

        public TestServerSession(TestServerSessionPool testServerSessionPool) throws JMSException {
            this.pool = testServerSessionPool;
            this.session = testServerSessionPool.connection.createSession(false, 1);
            this.session.setMessageListener(new TestMessageListener());
        }

        public Session getSession() throws JMSException {
            return this.session;
        }

        /* JADX WARN: Type inference failed for: r0v0, types: [org.apache.activemq.ExpiredAckAsyncConsumerTest$TestServerSession$1] */
        public void start() throws JMSException {
            new Thread() { // from class: org.apache.activemq.ExpiredAckAsyncConsumerTest.TestServerSession.1
                @Override // java.lang.Thread, java.lang.Runnable
                public void run() {
                    if (ExpiredAckAsyncConsumerTest.this.finished.get()) {
                        return;
                    }
                    try {
                        TestServerSession.this.session.run();
                        TestServerSession.this.pool.addSession();
                    } catch (Exception e) {
                    }
                }
            }.start();
        }
    }

    /* loaded from: input_file:org/apache/activemq/ExpiredAckAsyncConsumerTest$TestServerSessionPool.class */
    private class TestServerSessionPool implements ServerSessionPool {
        Connection connection;
        LinkedBlockingQueue<TestServerSession> serverSessions = new LinkedBlockingQueue<>(10);

        public TestServerSessionPool(Connection connection) throws JMSException {
            this.connection = connection;
            for (int i = 0; i < 15; i++) {
                addSession();
            }
        }

        public ServerSession getServerSession() throws JMSException {
            try {
                return this.serverSessions.take();
            } catch (InterruptedException e) {
                throw new RuntimeException("could not get session");
            }
        }

        public void addSession() {
            try {
                this.serverSessions.add(new TestServerSession(this));
            } catch (Exception e) {
            }
        }
    }

    @Before
    public void setUp() throws Exception {
        this.broker = new BrokerService();
        this.broker.addConnector(JmsMultipleBrokersTestSupport.AUTO_ASSIGN_TRANSPORT);
        this.broker.setDeleteAllMessagesOnStartup(true);
        PolicyMap policyMap = new PolicyMap();
        policyMap.setDefaultEntry(new PolicyEntry());
        this.broker.setDestinationPolicy(policyMap);
        this.broker.start();
        this.broker.waitUntilStarted();
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(((TransportConnector) this.broker.getTransportConnectors().get(0)).getConnectUri().toString());
        activeMQConnectionFactory.setExceptionListener(new ExceptionListener() { // from class: org.apache.activemq.ExpiredAckAsyncConsumerTest.1
            public void onException(JMSException jMSException) {
                ExpiredAckAsyncConsumerTest.this.failed.set(true);
            }
        });
        this.connection = activeMQConnectionFactory.createConnection();
        this.queue = createQueue();
        this.connection.start();
    }

    @After
    public void tearDown() throws Exception {
        this.connectionConsumer.close();
        this.connection.close();
        this.broker.stop();
        this.broker.waitUntilStopped();
    }

    @Test(timeout = 60000)
    public void testAsyncMessageExpiration() throws Exception {
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(1);
        final Session createSession = this.connection.createSession(false, 1);
        final MessageProducer createProducer = createSession.createProducer(this.queue);
        createProducer.setTimeToLive(10L);
        newFixedThreadPool.submit(new Runnable() { // from class: org.apache.activemq.ExpiredAckAsyncConsumerTest.2
            @Override // java.lang.Runnable
            public void run() {
                try {
                    Thread.sleep(100L);
                    int i = 0;
                    while (!ExpiredAckAsyncConsumerTest.this.failed.get() && i < 30) {
                        createProducer.send(createSession.createTextMessage("Hello World: " + i));
                        ExpiredAckAsyncConsumerTest.LOG.info("sending: " + i);
                        i++;
                        Thread.sleep(100L);
                    }
                    ExpiredAckAsyncConsumerTest.this.finished.set(true);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
        this.connectionConsumer = this.connection.createConnectionConsumer(this.queue, (String) null, new TestServerSessionPool(this.connection), 1000);
        Assert.assertTrue("received messages", Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.ExpiredAckAsyncConsumerTest.3
            public boolean isSatisified() throws Exception {
                return ExpiredAckAsyncConsumerTest.this.finished.get();
            }
        }));
        Assert.assertFalse("An exception was received on receive", this.failed.get());
    }

    protected Queue createQueue() {
        return new ActiveMQQueue("TEST");
    }
}
