package org.fusesource.fabric.bridge.internal;

import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/fusesource/fabric/bridge/internal/BatchMessageListenerContainerTest.class */
public class BatchMessageListenerContainerTest extends AbstractConnectorTestSupport {
    static final Logger LOG = LoggerFactory.getLogger(BatchMessageListenerContainerTest.class);
    private BatchMessageListenerContainer listenerContainer;

    @Before
    public void setUp() throws Exception {
        LOG.debug("Creating listener container");
        this.listenerContainer = new BatchMessageListenerContainer();
        this.listenerContainer.setAutoStartup(false);
        this.listenerContainer.setConnectionFactory(new ActiveMQConnectionFactory("vm://local?broker.persistent=false&broker.brokerName=local&jms.prefetchPolicy.queuePrefetch=10"));
        this.listenerContainer.setConcurrentConsumers(10);
        this.listenerContainer.setDestinationName("fabric.bridge.testQueue");
        this.listenerContainer.setReceiveTimeout(1L);
    }

    @After
    public void tearDown() throws Exception {
        LOG.debug("Destroying listener container");
        this.listenerContainer.stop();
        this.listenerContainer.destroy();
        this.listenerContainer = null;
    }

    @Test(expected = IllegalArgumentException.class)
    public void testSetBatchMessageListener() {
        this.listenerContainer.setBatchMessageListener(new Object());
    }

    @Test
    public void testSetBatchSize() {
        final CountDownLatch countDownLatch = new CountDownLatch(10);
        final ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        this.listenerContainer.setBatchSize(10L);
        this.listenerContainer.setBatchTimeout(5000L);
        this.listenerContainer.setBatchMessageListener(new SessionAwareBatchMessageListener<Message>() { // from class: org.fusesource.fabric.bridge.internal.BatchMessageListenerContainerTest.1
            public void onMessages(List<Message> list, Session session) throws JMSException {
                countDownLatch.countDown();
                int size = list.size();
                concurrentLinkedQueue.add(Integer.valueOf(size));
                BatchMessageListenerContainerTest.this.logBatch("Batch size [", size, session);
            }
        });
        this.listenerContainer.afterPropertiesSet();
        sendMessages("vm://local?broker.persistent=false&broker.brokerName=local&jms.prefetchPolicy.queuePrefetch=10", "fabric.bridge.testQueue", 100, null);
        this.listenerContainer.start();
        try {
            assertTrue("Test timed out", countDownLatch.await(15L, TimeUnit.SECONDS));
        } catch (InterruptedException e) {
            fail("Batch test interrupted");
        }
        int size = concurrentLinkedQueue.size();
        for (int i = 0; i < size; i++) {
            assertTrue("Batch size exceeded", ((Integer) concurrentLinkedQueue.poll()).intValue() <= 10);
        }
    }

    @Test
    public void testSetBatchTimeout() {
        final CountDownLatch countDownLatch = new CountDownLatch(10);
        final ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        this.listenerContainer.setBatchTimeout(1000L);
        this.listenerContainer.setBatchSize(5000L);
        this.listenerContainer.setBatchMessageListener(new SessionAwareBatchMessageListener<Message>() { // from class: org.fusesource.fabric.bridge.internal.BatchMessageListenerContainerTest.2
            public void onMessages(List<Message> list, Session session) throws JMSException {
                countDownLatch.countDown();
                int jMSTimestamp = (int) (list.get(list.size() - 1).getJMSTimestamp() - list.get(0).getJMSTimestamp());
                concurrentLinkedQueue.add(Integer.valueOf(jMSTimestamp));
                BatchMessageListenerContainerTest.this.logBatch("Batch duration [", jMSTimestamp, session);
            }
        });
        this.listenerContainer.afterPropertiesSet();
        this.listenerContainer.start();
        sendMessages("vm://local?broker.persistent=false&broker.brokerName=local&jms.prefetchPolicy.queuePrefetch=10", "fabric.bridge.testQueue", 100, null);
        try {
            assertTrue("Test timed out", countDownLatch.await(15L, TimeUnit.SECONDS));
        } catch (InterruptedException e) {
            fail("Batch test interrupted");
        }
        int size = concurrentLinkedQueue.size();
        for (int i = 0; i < size; i++) {
            assertTrue("Batch exceeded timeout", ((long) ((Integer) concurrentLinkedQueue.poll()).intValue()) <= 1000);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void logBatch(String str, int i, Session session) {
        StringBuffer stringBuffer = new StringBuffer(str);
        stringBuffer.append(i);
        stringBuffer.append("] in Session[");
        stringBuffer.append(session.toString());
        stringBuffer.append("]");
        LOG.info(stringBuffer.toString());
    }
}
