package org.apache.activemq.store.kahadb;

import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.BaseDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.usage.StoreUsage;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/store/kahadb/MKahaDBStoreLimitTest.class */
/**
 * Tests store usage reporting for a broker whose persistence is provided by a
 * {@link MultiKahaDBPersistenceAdapter} wrapping a single
 * {@link FilteredKahaDBPersistenceAdapter} that carries its own {@link StoreUsage}.
 *
 * <p>Each test starts an embedded broker through {@link #createBroker}; the broker is
 * stopped again by {@link #stopBroker()} after every test.
 */
public class MKahaDBStoreLimitTest {
    private static final Logger LOG = LoggerFactory.getLogger(MKahaDBStoreLimitTest.class);

    /** Number of messages {@link #consume(Destination)} expects to receive. */
    private static final int MESSAGES_TO_CONSUME = 5;

    /** How long {@link #consume(Destination)} waits for each message, in milliseconds. */
    private static final long RECEIVE_TIMEOUT_MILLIS = 4000L;

    final ActiveMQQueue queueA = new ActiveMQQueue("Q.A");
    final ActiveMQQueue queueB = new ActiveMQQueue("Q.B");

    private BrokerService broker;

    /** Stops the broker created by the test, if any, and waits for it to finish stopping. */
    @After
    public void stopBroker() throws Exception {
        if (this.broker != null) {
            this.broker.stop();
            this.broker.waitUntilStopped();
        }
    }

    /**
     * Creates (but does not start) a broker that uses the given persistence adapter, with
     * JMX, advisory support and scheduler support disabled and all messages deleted on startup.
     *
     * @param multiKahaDBPersistenceAdapter the persistence adapter the broker should use
     * @return the new broker, which is also stored in {@link #broker} for tear-down
     */
    private BrokerService createBroker(MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter) throws Exception {
        this.broker = new BrokerService();
        this.broker.setPersistenceAdapter(multiKahaDBPersistenceAdapter);
        this.broker.setUseJmx(false);
        this.broker.setAdvisorySupport(false);
        this.broker.setSchedulerSupport(false);
        this.broker.setDeleteAllMessagesOnStartup(true);
        return this.broker;
    }

    /**
     * With a per-destination filtered adapter, checks that the broker and each queue report
     * non-zero store usage after messages are produced, and that Q.B (40 messages) reports
     * more usage than Q.A (20 messages).
     */
    @Test
    public void testPerDestUsage() throws Exception {
        KahaDBPersistenceAdapter kahaDb = new KahaDBPersistenceAdapter();
        kahaDb.setJournalMaxFileLength(5120);
        kahaDb.setCleanupInterval(1000L);

        StoreUsage perDestinationUsage = new StoreUsage();
        perDestinationUsage.setPercentLimit(10);
        perDestinationUsage.setTotal(10485760L); // 10 MiB

        FilteredKahaDBPersistenceAdapter filtered = new FilteredKahaDBPersistenceAdapter();
        filtered.setUsage(perDestinationUsage);
        filtered.setPersistenceAdapter(kahaDb);
        filtered.setPerDestination(true);

        createBroker(multiAdapterFor(filtered)).start();

        produceMessages(this.queueA, 20);
        // Sending zero messages still opens a producer on Q.B, so the lookup below can find it.
        produceMessages(this.queueB, 0);

        logStoreUsage("Store global u: {}, %:{}", brokerStoreUsage());
        Assert.assertTrue("some usage", brokerStoreUsage().getUsage() > 0);

        BaseDestination destA = lookupDestination(this.queueA);
        BaseDestination destB = lookupDestination(this.queueB);
        Assert.assertNotNull("destination Q.A", destA);
        Assert.assertNotNull("destination Q.B", destB);

        logStoreUsage("Store A u: {}, %: {}", destA.getSystemUsage().getStoreUsage());
        Assert.assertTrue(destA.getSystemUsage().getStoreUsage().getUsage() > 0);

        produceMessages(this.queueB, 40);
        Assert.assertTrue(destB.getSystemUsage().getStoreUsage().getUsage() > 0);
        Assert.assertTrue(destB.getSystemUsage().getStoreUsage().getUsage()
                > destA.getSystemUsage().getStoreUsage().getUsage());

        logStoreUsage("Store B u: {}, %: {}", destB.getSystemUsage().getStoreUsage());
        logStoreUsage("Store global u: {}, %:{}", brokerStoreUsage());

        consume(this.queueA);
        consume(this.queueB);

        // Usage after consumption is only logged, not asserted.
        logStoreUsage("Store global u: {}, %:{}", brokerStoreUsage());
        logStoreUsage("Store A u: {}, %: {}", destA.getSystemUsage().getStoreUsage());
        logStoreUsage("Store B u: {}, %: {}", destB.getSystemUsage().getStoreUsage());
    }

    /**
     * With an adapter bound explicitly to Q.A whose store usage total (512 KiB) is smaller than
     * the broker's (1 MiB), checks that Q.A reports a higher percent usage than the broker.
     */
    @Test
    public void testExplicitAdapter() throws Exception {
        KahaDBPersistenceAdapter kahaDb = new KahaDBPersistenceAdapter();
        kahaDb.setJournalMaxFileLength(25600);

        StoreUsage queueUsage = new StoreUsage();
        queueUsage.setPercentLimit(50);
        queueUsage.setTotal(524288L); // 512 KiB

        FilteredKahaDBPersistenceAdapter filtered = new FilteredKahaDBPersistenceAdapter();
        filtered.setUsage(queueUsage);
        filtered.setDestination(this.queueA);
        filtered.setPersistenceAdapter(kahaDb);

        BrokerService brokerService = createBroker(multiAdapterFor(filtered));
        brokerService.getSystemUsage().getStoreUsage().setTotal(1048576L); // 1 MiB
        brokerService.start();

        produceMessages(this.queueA, 20);

        logStoreUsage("Store global u: {}, %:{}", brokerStoreUsage());
        Assert.assertTrue("some usage", brokerStoreUsage().getUsage() > 0);

        BaseDestination destA = lookupDestination(this.queueA);
        Assert.assertNotNull("destination Q.A", destA);

        logStoreUsage("Store A u: {}, %: {}", destA.getSystemUsage().getStoreUsage());
        Assert.assertTrue("limited store has more % usage than parent",
                destA.getSystemUsage().getStoreUsage().getPercentUsage() > brokerStoreUsage().getPercentUsage());
    }

    /**
     * With an adapter bound to Q.A whose store usage limit is 45056 bytes, checks that a
     * background producer sending 20 messages does not complete: some messages arrive, the
     * enqueue count stops changing, and the producer has not signalled completion.
     */
    @Test
    public void testExplicitAdapterBlockingProducer() throws Exception {
        KahaDBPersistenceAdapter kahaDb = new KahaDBPersistenceAdapter();
        kahaDb.setJournalMaxFileLength(8192);
        kahaDb.setIndexDirectory(new File(IOHelper.getDefaultDataDirectory()));

        StoreUsage queueUsage = new StoreUsage();
        queueUsage.setLimit(45056L); // 44 KiB

        FilteredKahaDBPersistenceAdapter filtered = new FilteredKahaDBPersistenceAdapter();
        filtered.setUsage(queueUsage);
        filtered.setDestination(this.queueA);
        filtered.setPersistenceAdapter(kahaDb);

        createBroker(multiAdapterFor(filtered)).start();

        final AtomicBoolean producerDone = new AtomicBoolean();
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            executor.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        produceMessages(queueA, 20);
                        producerDone.set(true);
                    } catch (Exception e) {
                        // A failure here is expected when the test tears the producer down;
                        // log it rather than hide it so unexpected failures can be diagnosed.
                        LOG.info("Producer for {} did not complete", queueA, e);
                    }
                }
            });

            Assert.assertTrue("some messages got to dest", Wait.waitFor(new Wait.Condition() {
                @Override
                public boolean isSatisified() throws Exception {
                    BaseDestination dest = lookupDestination(queueA);
                    return dest != null && dest.getDestinationStatistics().getMessages().getCount() > 4;
                }
            }));

            BaseDestination destA = lookupDestination(this.queueA);
            Assert.assertNotNull("destination Q.A", destA);

            // Poll until the enqueue count is unchanged across a 500 ms pause.
            long enqueues;
            do {
                enqueues = destA.getDestinationStatistics().getEnqueues().getCount();
                LOG.info("Dest Enqueues: {}", enqueues);
                TimeUnit.MILLISECONDS.sleep(500L);
            } while (enqueues != destA.getDestinationStatistics().getEnqueues().getCount());

            Assert.assertFalse("expect producer to block", producerDone.get());

            logStoreUsage("Store global u: {}, %:{}", brokerStoreUsage());
            Assert.assertTrue("some usage", brokerStoreUsage().getUsage() > 0);

            logStoreUsage("Store A u: {}, %: {}", destA.getSystemUsage().getStoreUsage());
            Assert.assertTrue("limited store has more % usage than parent",
                    destA.getSystemUsage().getStoreUsage().getPercentUsage() > brokerStoreUsage().getPercentUsage());
        } finally {
            // Always shut the executor down so a failed assertion does not leak the producer thread.
            executor.shutdownNow();
        }
    }

    /**
     * Wraps a single filtered adapter in a new {@link MultiKahaDBPersistenceAdapter}.
     *
     * @param filtered the only filtered adapter to register
     * @return the configured multi adapter
     */
    private static MultiKahaDBPersistenceAdapter multiAdapterFor(FilteredKahaDBPersistenceAdapter filtered) {
        List<FilteredKahaDBPersistenceAdapter> adapters = new ArrayList<FilteredKahaDBPersistenceAdapter>();
        adapters.add(filtered);
        MultiKahaDBPersistenceAdapter multiAdapter = new MultiKahaDBPersistenceAdapter();
        multiAdapter.setFilteredPersistenceAdapters(adapters);
        return multiAdapter;
    }

    /** Returns the broker-level store usage, fetched afresh on every call. */
    private StoreUsage brokerStoreUsage() {
        return this.broker.getSystemUsage().getStoreUsage();
    }

    /**
     * Looks up the broker-side destination for a queue.
     *
     * @param queue the queue to look up
     * @return the destination, or {@code null} if the broker's destination map has no entry for it
     */
    private BaseDestination lookupDestination(ActiveMQQueue queue) {
        return (BaseDestination) this.broker.getRegionBroker().getDestinationMap().get(queue);
    }

    /**
     * Logs the usage and percent usage of a store.
     *
     * @param format an SLF4J format with two placeholders: usage first, then percent usage
     * @param storeUsage the usage to report
     */
    private static void logStoreUsage(String format, StoreUsage storeUsage) {
        LOG.info(format, storeUsage.getUsage(), storeUsage.getPercentUsage());
    }

    /**
     * Receives {@value #MESSAGES_TO_CONSUME} messages from the destination over a VM transport
     * connection, failing if any receive returns {@code null} within the timeout.
     *
     * @param destination the destination to consume from
     */
    private void consume(Destination destination) throws Exception {
        Connection connection = new ActiveMQConnectionFactory("vm://localhost?create=false").createConnection();
        try {
            connection.start();
            MessageConsumer consumer =
                    connection.createSession(false, Session.AUTO_ACKNOWLEDGE).createConsumer(destination);
            for (int i = 0; i < MESSAGES_TO_CONSUME; i++) {
                Assert.assertNotNull("message[" + i + "]", consumer.receive(RECEIVE_TIMEOUT_MILLIS));
            }
        } finally {
            // Close even when an assertion fails so the connection is not leaked into later tests.
            connection.close();
        }
    }

    /**
     * Sends the same 1024-byte {@link BytesMessage} to the destination the given number of times
     * over a VM transport connection.
     *
     * @param destination the destination to send to
     * @param messageCount how many sends to perform; zero only creates the producer
     */
    private void produceMessages(Destination destination, int messageCount) throws Exception {
        Connection connection = new ActiveMQConnectionFactory("vm://localhost?create=false").createConnection();
        try {
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer producer = session.createProducer(destination);
            BytesMessage message = session.createBytesMessage();
            message.writeBytes(new byte[1024]);
            for (int sent = 0; sent < messageCount; sent++) {
                producer.send(message);
            }
        } finally {
            connection.close();
        }
    }
}
