package org.apache.activemq.bugs;

import java.io.File;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.kahadb.KahaDBStore;
import org.apache.activemq.util.IOHelper;

/* loaded from: input_file:org/apache/activemq/bugs/AMQ2512Test.class */
public class AMQ2512Test extends EmbeddedBrokerTestSupport {
    private static Connection connection;
    private static final String QUEUE_NAME = "dee.q";
    private static final int INITIAL_MESSAGES_CNT = 1000;
    private static final int WORKER_INTERNAL_ITERATIONS = 100;
    private static final String PRP_INITIAL_ID = "initial-id";
    private static final String PRP_WORKER_ID = "worker-id";
    private static final byte[] payload = new byte[5120];
    private static final String TEXT = new String(payload);
    private static final int TOTAL_MESSAGES_CNT = 101000;
    private static final CountDownLatch LATCH = new CountDownLatch(TOTAL_MESSAGES_CNT);
    private static final AtomicInteger ON_MSG_COUNTER = new AtomicInteger();

    /* loaded from: input_file:org/apache/activemq/bugs/AMQ2512Test$Consumer.class */
    private static final class Consumer implements MessageListener {
        private final String name;
        private final Session session;
        private final MessageProducer producer;

        private Consumer(String str) {
            this.name = str;
            try {
                this.session = AMQ2512Test.connection.createSession(false, 2);
                Queue createQueue = this.session.createQueue("dee.q?consumer.prefetchSize=10");
                this.producer = this.session.createProducer(createQueue);
                this.producer.setDeliveryMode(2);
                this.session.createConsumer(createQueue).setMessageListener(this);
            } catch (JMSException e) {
                e.printStackTrace();
                throw new RuntimeException((Throwable) e);
            }
        }

        public void onMessage(Message message) {
            TextMessage textMessage = (TextMessage) message;
            try {
                try {
                    if (!textMessage.propertyExists(AMQ2512Test.PRP_WORKER_ID)) {
                        for (int i = 0; i < 100; i++) {
                            TextMessage createTextMessage = this.session.createTextMessage(textMessage.getText());
                            createTextMessage.setStringProperty(AMQ2512Test.PRP_WORKER_ID, this.name + "-" + i);
                            createTextMessage.setStringProperty(AMQ2512Test.PRP_INITIAL_ID, textMessage.getStringProperty(AMQ2512Test.PRP_INITIAL_ID));
                            this.producer.send(createTextMessage);
                        }
                    }
                    textMessage.acknowledge();
                    int andIncrement = AMQ2512Test.ON_MSG_COUNTER.getAndIncrement();
                    if (andIncrement % 1000 == 0) {
                        System.out.println("message received: " + andIncrement);
                    }
                    AMQ2512Test.LATCH.countDown();
                } catch (JMSException e) {
                    e.printStackTrace();
                    throw new RuntimeException((Throwable) e);
                }
            } catch (Throwable th) {
                int andIncrement2 = AMQ2512Test.ON_MSG_COUNTER.getAndIncrement();
                if (andIncrement2 % 1000 == 0) {
                    System.out.println("message received: " + andIncrement2);
                }
                AMQ2512Test.LATCH.countDown();
                throw th;
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void close() {
            if (this.session != null) {
                try {
                    this.session.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                    throw new RuntimeException((Throwable) e);
                }
            }
        }
    }

    public void testKahaDBFailure() throws Exception {
        connection = new ActiveMQConnectionFactory(this.bindAddress).createConnection();
        Session createSession = connection.createSession(false, 1);
        MessageProducer createProducer = createSession.createProducer(createSession.createQueue(QUEUE_NAME));
        createProducer.setDeliveryMode(2);
        connection.start();
        long nanoTime = System.nanoTime();
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 20; i++) {
            arrayList.add(new Consumer("worker-" + i));
        }
        for (int i2 = 0; i2 < 1000; i2++) {
            TextMessage createTextMessage = createSession.createTextMessage(TEXT);
            createTextMessage.setStringProperty(PRP_INITIAL_ID, "initial-" + i2);
            createProducer.send(createTextMessage);
        }
        LATCH.await();
        long nanoTime2 = System.nanoTime();
        System.out.println("Total execution time = " + TimeUnit.MILLISECONDS.convert(nanoTime2 - nanoTime, TimeUnit.NANOSECONDS) + " [ms].");
        System.out.println("Rate = " + (101000 / TimeUnit.SECONDS.convert(nanoTime2 - nanoTime, TimeUnit.NANOSECONDS)) + " [msg/s].");
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            ((Consumer) it.next()).close();
        }
        connection.close();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.activemq.EmbeddedBrokerTestSupport
    public void setUp() throws Exception {
        this.bindAddress = "tcp://0.0.0.0:61617";
        super.setUp();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.activemq.EmbeddedBrokerTestSupport
    public BrokerService createBroker() throws Exception {
        File file = new File("target/test-amq-2512/datadb");
        IOHelper.mkdirs(file);
        IOHelper.deleteChildren(file);
        KahaDBStore kahaDBStore = new KahaDBStore();
        kahaDBStore.setDirectory(file);
        BrokerService brokerService = new BrokerService();
        brokerService.setPersistenceAdapter(kahaDBStore);
        kahaDBStore.setEnableJournalDiskSyncs(false);
        brokerService.setDataDirectoryFile(file);
        brokerService.setUseJmx(false);
        brokerService.addConnector(this.bindAddress);
        return brokerService;
    }
}
