package org.apache.activemq.bugs;

import java.io.File;
import java.util.Timer;
import java.util.TimerTask;
import java.util.Vector;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TransactionRolledBackException;
import junit.framework.Assert;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.AutoFailTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.DestinationStatistics;
import org.apache.activemq.broker.util.LoggingBrokerPlugin;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.store.amq.AMQPersistenceAdapterFactory;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.usecases.DurableSubProcessWithRestartTest;
import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/bugs/AMQ2149Test.class */
public class AMQ2149Test extends AutoFailTestSupport {
    private static final Logger LOG = LoggerFactory.getLogger(AMQ2149Test.class);
    // Transport connector the embedded broker listens on (see createBroker).
    private static final String BROKER_CONNECTOR = "tcp://localhost:61617";
    // Client URL: failover transport wrapping the same host/port as BROKER_CONNECTOR.
    private static final String DEFAULT_BROKER_URL = "failover:(tcp://localhost:61617)?maxReconnectDelay=1000&useExponentialBackOff=false";
    // Default delay, in milliseconds, handed to Timer.schedule for each broker restart.
    private static final long DEFAULT_BROKER_STOP_PERIOD = 20000;
    // Default number of messages each Sender produces (upper bound of its send loop).
    private static final long DEFAULT_NUM_TO_SEND = 1400;
    // Cap on the number of restarts performed by the restart task; the task body compares against the same value (5).
    static final int MAX_BROKER_RESTARTS = 5;
    // Broker currently in use; replaced by every createBroker call, including restarts.
    BrokerService broker;
    // Per-test data directory, set in setUp to "target/" + test name.
    private File dataDirFile;
    // Same value as the message property key ("seqNum") that Sender sets and Receiver reads.
    private final String SEQ_NUM_PROPERTY = "seqNum";
    // Same value as the payload length produced by buildLongString.
    final int MESSAGE_LENGTH_BYTES = 76800;
    // Same value as the default assigned to sleepBetweenSend.
    final long SLEEP_BETWEEN_SEND_MS = 25;
    // Same value as the destination count used by the non-transactional verifyOrderedMessageReceipt overloads.
    final int NUM_SENDERS_AND_RECEIVERS = 10;
    // Guards broker stop/restart in the restart task and the broker stop in tearDown.
    final Object brokerLock = new Object();
    // Per-test tunables; setUp resets them and individual tests may override them before starting.
    long brokerStopPeriod = DEFAULT_BROKER_STOP_PERIOD;
    long numtoSend = DEFAULT_NUM_TO_SEND;
    long sleepBetweenSend = 25;
    String brokerURL = DEFAULT_BROKER_URL;
    // Incremented by the restart task after each stop/start cycle.
    int numBrokerRestarts = 0;
    // Failures recorded by senders, receivers and the restart task; asserted empty at the end of a run.
    Vector<Throwable> exceptions = new Vector<>();
    // Broker plugins installed by the force-recover scenario.
    final LoggingBrokerPlugin[] plugins = {new LoggingBrokerPlugin()};

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.activemq.bugs.AMQ2149Test$1RestartTask, reason: invalid class name */
    /* loaded from: input_file:org/apache/activemq/bugs/AMQ2149Test$1RestartTask.class */
    /**
     * Timer task that stops the current broker, starts a replacement through
     * {@link AMQ2149Test#createBroker(Configurer)} and then re-schedules itself until
     * {@link AMQ2149Test#MAX_BROKER_RESTARTS} restarts have been performed.
     *
     * <p>Failures while stopping or starting the broker are logged and recorded in
     * {@code exceptions}; they do not prevent the next restart from being scheduled.
     */
    public class C1RestartTask extends TimerTask {
        /** Configuration hook applied to every replacement broker; may be {@code null}. */
        final /* synthetic */ Configurer val$configurer;
        /** Timer this task re-schedules itself on. */
        final /* synthetic */ Timer val$timer;

        C1RestartTask(Configurer configurer, Timer timer) {
            this.val$configurer = configurer;
            this.val$timer = timer;
        }

        @Override // java.util.TimerTask, java.lang.Runnable
        public void run() {
            // Stop and restart under brokerLock; tearDown takes the same lock before stopping the broker.
            synchronized (AMQ2149Test.this.brokerLock) {
                AMQ2149Test.LOG.info("stopping broker..");
                try {
                    AMQ2149Test.this.broker.stop();
                    AMQ2149Test.this.broker.waitUntilStopped();
                } catch (Exception e) {
                    AMQ2149Test.LOG.error("ex on broker stop", e);
                    AMQ2149Test.this.exceptions.add(e);
                }
                AMQ2149Test.LOG.info("restarting broker");
                try {
                    AMQ2149Test.this.createBroker(this.val$configurer);
                    AMQ2149Test.this.broker.waitUntilStarted();
                } catch (Exception e2) {
                    AMQ2149Test.LOG.error("ex on broker restart", e2);
                    AMQ2149Test.this.exceptions.add(e2);
                }
            }

            final int restartsSoFar = ++AMQ2149Test.this.numBrokerRestarts;
            if (restartsSoFar >= AMQ2149Test.MAX_BROKER_RESTARTS) {
                AMQ2149Test.LOG.info("no longer stopping broker on reaching Max restarts: " + AMQ2149Test.MAX_BROKER_RESTARTS);
                return;
            }

            try {
                this.val$timer.schedule(new C1RestartTask(this.val$configurer, this.val$timer), AMQ2149Test.this.brokerStopPeriod);
            } catch (IllegalStateException ignored) {
                // Timer.schedule throws this once the timer has been cancelled, which the
                // tests do when they finish; there is nothing left to restart in that case.
                AMQ2149Test.LOG.debug("restart timer already cancelled, not rescheduling", ignored);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/activemq/bugs/AMQ2149Test$Receiver.class */
    public class Receiver implements MessageListener {
        private final Destination dest;
        private final Connection connection;
        private final Session session;
        private final MessageConsumer messageConsumer;
        private final boolean transactional;
        private volatile long nextExpectedSeqNum = 0;
        private String lastId = null;
        final int TRANSACITON_BATCH = 500;

        public Receiver(Destination destination, boolean z) throws JMSException {
            this.dest = destination;
            this.transactional = z;
            this.connection = new ActiveMQConnectionFactory(AMQ2149Test.this.brokerURL).createConnection();
            this.connection.setClientID(destination.toString());
            this.session = this.connection.createSession(z, z ? 0 : 1);
            if (ActiveMQDestination.transform(destination).isTopic()) {
                this.messageConsumer = this.session.createDurableSubscriber((Topic) destination, destination.toString());
            } else {
                this.messageConsumer = this.session.createConsumer(destination);
            }
            this.messageConsumer.setMessageListener(this);
            this.connection.start();
        }

        public void close() throws JMSException {
            this.connection.close();
        }

        public long getNextExpectedSeqNo() {
            return this.nextExpectedSeqNum;
        }

        public void onMessage(Message message) {
            try {
                long longProperty = message.getLongProperty("seqNum");
                if (longProperty % 500 == 0) {
                    AMQ2149Test.LOG.info(this.dest + " received " + longProperty);
                    if (this.transactional) {
                        AMQ2149Test.LOG.info("committing..");
                        this.session.commit();
                    }
                }
                if (longProperty != this.nextExpectedSeqNum) {
                    AMQ2149Test.LOG.warn(this.dest + " received " + longProperty + " in msg: " + message.getJMSMessageID() + " expected " + this.nextExpectedSeqNum + ", lastId: " + this.lastId + ", message:" + message);
                    Assert.fail(this.dest + " received " + longProperty + " expected " + this.nextExpectedSeqNum);
                }
                this.nextExpectedSeqNum++;
                this.lastId = message.getJMSMessageID();
            } catch (TransactionRolledBackException e) {
                AMQ2149Test.LOG.info("got rollback: " + e);
                this.nextExpectedSeqNum -= 499;
            } catch (Throwable th) {
                AMQ2149Test.LOG.error(this.dest + " onMessage error", th);
                AMQ2149Test.this.exceptions.add(th);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/activemq/bugs/AMQ2149Test$Sender.class */
    public class Sender implements Runnable {
        private final Destination dest;
        private final Connection connection;
        private final Session session;
        private final MessageProducer messageProducer;
        private volatile long nextSequenceNumber = 0;

        public Sender(Destination destination) throws JMSException {
            this.dest = destination;
            this.connection = new ActiveMQConnectionFactory(AMQ2149Test.this.brokerURL).createConnection();
            this.session = this.connection.createSession(false, 1);
            this.messageProducer = this.session.createProducer(destination);
            this.messageProducer.setDeliveryMode(2);
            this.connection.start();
        }

        @Override // java.lang.Runnable
        public void run() {
            String buildLongString = AMQ2149Test.this.buildLongString();
            while (this.nextSequenceNumber < AMQ2149Test.this.numtoSend) {
                try {
                    TextMessage createTextMessage = this.session.createTextMessage(buildLongString);
                    createTextMessage.setLongProperty("seqNum", this.nextSequenceNumber);
                    this.nextSequenceNumber++;
                    this.messageProducer.send(createTextMessage);
                    if (this.nextSequenceNumber % 500 == 0) {
                        AMQ2149Test.LOG.info(this.dest + " sent " + this.nextSequenceNumber);
                    }
                } catch (Exception e) {
                    AMQ2149Test.LOG.error(this.dest + " send error", e);
                    AMQ2149Test.this.exceptions.add(e);
                }
                if (AMQ2149Test.this.sleepBetweenSend > 0) {
                    try {
                        Thread.sleep(AMQ2149Test.this.sleepBetweenSend);
                    } catch (InterruptedException e2) {
                        AMQ2149Test.LOG.warn(this.dest + " sleep interrupted", e2);
                    }
                }
            }
            try {
                this.connection.close();
            } catch (JMSException e3) {
            }
        }
    }

    /**
     * Creates a new broker, stores it in {@code broker}, configures it and starts it.
     *
     * @param configurer optional hook invoked with the broker just before it is started; may be {@code null}
     * @throws Exception if configuring or starting the broker fails
     */
    public void createBroker(Configurer configurer) throws Exception {
        final BrokerService service = new BrokerService();
        // Publish the instance before configuring it, as callers read this.broker.
        this.broker = service;
        configurePersistenceAdapter(service);

        final SystemUsage usage = new SystemUsage();
        final MemoryUsage memory = new MemoryUsage();
        // 153600000 == 76800 * 200 * 10, i.e. MESSAGE_LENGTH_BYTES * 200 * NUM_SENDERS_AND_RECEIVERS.
        memory.setLimit(153600000L);
        usage.setMemoryUsage(memory);
        service.setSystemUsage(usage);

        service.addConnector(BROKER_CONNECTOR);
        service.setBrokerName(getName());
        service.setDataDirectoryFile(this.dataDirFile);

        if (null != configurer) {
            configurer.configure(service);
        }
        service.start();
    }

    /**
     * Installs an AMQ persistence adapter factory on the broker, pointing at the test's data directory.
     *
     * @param brokerService broker to configure
     * @throws Exception declared for the benefit of overriding implementations
     */
    protected void configurePersistenceAdapter(BrokerService brokerService) throws Exception {
        final AMQPersistenceAdapterFactory persistenceFactory = new AMQPersistenceAdapterFactory();
        persistenceFactory.setDataDirectory(this.dataDirFile);
        brokerService.setPersistenceFactory(persistenceFactory);
    }

    /**
     * Configures the auto-fail settings and resets the per-test tunables to their defaults.
     *
     * <p>NOTE(review): {@code super.setUp()} is not invoked here; confirm that
     * {@code AutoFailTestSupport} does not rely on it to apply the auto-fail settings.
     */
    @Override // org.apache.activemq.AutoFailTestSupport
    public void setUp() throws Exception {
        // 30 minutes, expressed in milliseconds.
        setMaxTestTime(30L * 60L * 1000L);
        setAutoFail(true);

        final String testName = getName();
        this.dataDirFile = new File("target/" + testName);

        this.brokerURL = DEFAULT_BROKER_URL;
        this.sleepBetweenSend = 25L;
        this.brokerStopPeriod = DEFAULT_BROKER_STOP_PERIOD;
        this.numtoSend = DEFAULT_NUM_TO_SEND;
    }

    /**
     * Stops the current broker, if any, while holding {@code brokerLock}, then discards the
     * recorded exceptions.
     */
    @Override // org.apache.activemq.AutoFailTestSupport
    public void tearDown() throws Exception {
        synchronized (this.brokerLock) {
            final boolean brokerPresent = this.broker != null;
            if (brokerPresent) {
                this.broker.stop();
                this.broker.waitUntilStopped();
            }
        }
        this.exceptions.clear();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Builds the message payload: a string of 76800 random decimal digits.
     *
     * @return the generated payload
     */
    public String buildLongString() {
        final int length = 76800;
        final char[] digits = new char[length];
        for (int pos = 0; pos < length; pos++) {
            final int digit = (int) (Math.random() * 10.0d);
            digits[pos] = (char) ('0' + digit);
        }
        return new String(digits);
    }

    /**
     * Baseline scenario without broker restarts: starts a broker with all messages deleted,
     * verifies ordered receipt and then asserts that enqueue and dequeue counts match.
     *
     * <p>NOTE(review): the method name does not start with "test"; presumably this keeps
     * name-based JUnit discovery from running it. Confirm before renaming.
     */
    public void vanilaVerify_testOrder() throws Exception {
        final Configurer purgeOnStart = new Configurer() {
            @Override
            public void configure(BrokerService brokerService) throws Exception {
                brokerService.deleteAllMessages();
            }
        };
        createBroker(purgeOnStart);
        verifyOrderedMessageReceipt();
        verifyStats(false);
    }

    /**
     * Ordered-receipt scenario with periodic broker restarts and the persistence factory's
     * persistent index switched off, both on first start and on every restart.
     *
     * <p>NOTE(review): the method name does not start with "test"; presumably this keeps
     * name-based JUnit discovery from running it. Confirm before renaming.
     */
    public void noProblem_testOrderWithRestartAndVMIndex() throws Exception {
        final Configurer firstStart = new Configurer() {
            @Override
            public void configure(BrokerService brokerService) throws Exception {
                brokerService.getPersistenceFactory().setPersistentIndex(false);
                brokerService.deleteAllMessages();
            }
        };
        final Configurer onRestart = new Configurer() {
            @Override
            public void configure(BrokerService brokerService) throws Exception {
                brokerService.getPersistenceFactory().setPersistentIndex(false);
            }
        };

        createBroker(firstStart);
        final Timer restartTimer = new Timer();
        schedualRestartTask(restartTimer, onRestart);
        try {
            verifyOrderedMessageReceipt();
        } finally {
            // Stop further restarts whether or not verification succeeded.
            restartTimer.cancel();
        }
        verifyStats(true);
    }

    /**
     * Verifies ordered receipt on the default destinations while the broker is periodically
     * restarted. Messages are deleted on first start only; restarts apply no extra configuration.
     */
    public void testOrderWithRestart() throws Exception {
        final Configurer firstStart = new Configurer() {
            @Override
            public void configure(BrokerService brokerService) throws Exception {
                brokerService.deleteAllMessages();
            }
        };
        final Configurer onRestart = new Configurer() {
            @Override
            public void configure(BrokerService brokerService) throws Exception {
                // intentionally empty: restarted brokers keep their stored messages
            }
        };

        createBroker(firstStart);
        final Timer restartTimer = new Timer();
        schedualRestartTask(restartTimer, onRestart);
        try {
            verifyOrderedMessageReceipt();
        } finally {
            // Stop further restarts whether or not verification succeeded.
            restartTimer.cancel();
        }
        verifyStats(true);
    }

    /**
     * Verifies ordered receipt with destination type 2 while the broker is periodically
     * restarted without any restart-time configuration.
     */
    public void testTopicOrderWithRestart() throws Exception {
        final Configurer firstStart = new Configurer() {
            @Override
            public void configure(BrokerService brokerService) throws Exception {
                brokerService.deleteAllMessages();
            }
        };
        createBroker(firstStart);

        final Timer restartTimer = new Timer();
        schedualRestartTask(restartTimer, null);
        // NOTE(review): 2 is presumably ActiveMQDestination.TOPIC_TYPE — confirm.
        final byte destinationType = (byte) 2;
        try {
            verifyOrderedMessageReceipt(destinationType);
        } finally {
            // Stop further restarts whether or not verification succeeded.
            restartTimer.cancel();
        }
        verifyStats(true);
    }

    /**
     * Runs the transactional ordered-receipt scenario with destination type 1.
     */
    public void testQueueTransactionalOrderWithRestart() throws Exception {
        // NOTE(review): 1 is presumably ActiveMQDestination.QUEUE_TYPE — confirm.
        final byte destinationType = (byte) 1;
        doTestTransactionalOrderWithRestart(destinationType);
    }

    /**
     * Runs the transactional ordered-receipt scenario with destination type 2.
     */
    public void testTopicTransactionalOrderWithRestart() throws Exception {
        // NOTE(review): 2 is presumably ActiveMQDestination.TOPIC_TYPE — confirm.
        final byte destinationType = (byte) 2;
        doTestTransactionalOrderWithRestart(destinationType);
    }

    /**
     * Shared body of the transactional scenarios: one sender/receiver pair using a transacted
     * consumer, with overridden message count, send pause and restart period.
     *
     * <p>NOTE(review): the message count and restart period are taken from constants of
     * unrelated classes; this looks like a decompiler substituting constants with matching
     * values. Confirm the intended literals.
     *
     * @param b destination type passed through to {@code ActiveMQDestination.createDestination}
     */
    public void doTestTransactionalOrderWithRestart(byte b) throws Exception {
        this.numtoSend = DurableSubProcessWithRestartTest.BROKER_RESTART;
        this.sleepBetweenSend = 3L;
        this.brokerStopPeriod = Wait.MAX_WAIT_MILLIS;

        final Configurer firstStart = new Configurer() {
            @Override
            public void configure(BrokerService brokerService) throws Exception {
                brokerService.deleteAllMessages();
            }
        };
        createBroker(firstStart);

        final Timer restartTimer = new Timer();
        schedualRestartTask(restartTimer, null);
        try {
            verifyOrderedMessageReceipt(b, 1, true);
        } finally {
            // Stop further restarts whether or not verification succeeded.
            restartTimer.cancel();
        }
        verifyStats(true);
    }

    /**
     * Ordered-receipt scenario with periodic broker restarts, where every broker start forces
     * recovery of the reference store and installs the logging plugins.
     *
     * <p>NOTE(review): the method name does not start with "test"; presumably this keeps
     * name-based JUnit discovery from running it. Confirm before renaming.
     */
    public void eaiserToRepoduce_testOrderWithRestartWithForceRecover() throws Exception {
        final Configurer firstStart = new Configurer() {
            @Override
            public void configure(BrokerService brokerService) throws Exception {
                brokerService.getPersistenceFactory().setForceRecoverReferenceStore(true);
                brokerService.setPlugins(AMQ2149Test.this.plugins);
                brokerService.deleteAllMessages();
            }
        };
        final Configurer onRestart = new Configurer() {
            @Override
            public void configure(BrokerService brokerService) throws Exception {
                brokerService.getPersistenceFactory().setForceRecoverReferenceStore(true);
                brokerService.setPlugins(AMQ2149Test.this.plugins);
            }
        };

        createBroker(firstStart);
        final Timer restartTimer = new Timer();
        schedualRestartTask(restartTimer, onRestart);
        try {
            verifyOrderedMessageReceipt();
        } finally {
            // Stop further restarts whether or not verification succeeded.
            restartTimer.cancel();
        }
        verifyStats(true);
    }

    /**
     * Walks every destination in the broker's queue region and either asserts that its
     * enqueue and dequeue counts are equal or, after restarts, only logs them.
     *
     * @param z {@code true} if the broker was restarted during the run, in which case the counts are logged but not asserted
     */
    private void verifyStats(boolean z) throws Exception {
        for (org.apache.activemq.broker.region.Destination destination : this.broker.getRegionBroker().getQueueRegion().getDestinationMap().values()) {
            final DestinationStatistics stats = destination.getDestinationStatistics();
            if (!z) {
                assertEquals("qneue/dequeue match for: " + destination.getName(), stats.getEnqueues().getCount(), stats.getDequeues().getCount());
                continue;
            }
            final String summary = "with restart: not asserting qneue/dequeue stat match for: " + destination.getName() + " " + stats.getEnqueues().getCount() + " <= " + stats.getDequeues().getCount();
            LOG.info(summary);
        }
    }

    /**
     * Schedules the first broker restart {@code brokerStopPeriod} milliseconds from now; the
     * task re-schedules itself afterwards.
     *
     * @param timer timer that runs the restart task
     * @param configurer hook applied to each restarted broker; may be {@code null}
     */
    private void schedualRestartTask(Timer timer, Configurer configurer) {
        final TimerTask firstRestart = new C1RestartTask(configurer, timer);
        timer.schedule(firstRestart, this.brokerStopPeriod);
    }

    /**
     * Verifies ordered receipt on 10 non-transactional destinations of the given type.
     *
     * @param b destination type passed through to {@code ActiveMQDestination.createDestination}
     */
    private void verifyOrderedMessageReceipt(byte b) throws Exception {
        final int destinationCount = 10;
        final boolean transactional = false;
        verifyOrderedMessageReceipt(b, destinationCount, transactional);
    }

    /**
     * Verifies ordered receipt on 10 non-transactional destinations of type 1.
     */
    private void verifyOrderedMessageReceipt() throws Exception {
        // NOTE(review): 1 is presumably ActiveMQDestination.QUEUE_TYPE — confirm.
        final byte destinationType = (byte) 1;
        final int destinationCount = 10;
        final boolean transactional = false;
        verifyOrderedMessageReceipt(destinationType, destinationCount, transactional);
    }

    /**
     * Starts {@code i} sender/receiver pairs on destinations {@code test.dest.0 .. test.dest.(i-1)},
     * waits for the senders to finish and for every receiver to reach {@code numtoSend}, then
     * asserts that the 30 minute deadline was met and that no exception was recorded.
     *
     * <p>Receivers that are still open when this method exits (timeout, assertion failure or
     * a setup error part-way through) are closed on a best-effort basis so their connections
     * do not outlive the test.
     *
     * @param b destination type passed to {@code ActiveMQDestination.createDestination}
     * @param i number of sender/receiver pairs
     * @param z {@code true} to have receivers consume in a transacted session
     * @throws Exception if setup fails, the wait is interrupted, or closing a finished receiver fails
     */
    private void verifyOrderedMessageReceipt(byte b, int i, boolean z) throws Exception {
        final Vector<Thread> senderThreads = new Vector<>();
        final Vector<Receiver> receivers = new Vector<>();
        try {
            for (int index = 0; index < i; index++) {
                ActiveMQDestination destination = ActiveMQDestination.createDestination("test.dest." + index, b);
                // The receiver is registered before its sender starts producing.
                receivers.add(new Receiver(destination, z));
                Thread senderThread = new Thread(new Sender(destination));
                senderThread.start();
                senderThreads.add(senderThread);
            }

            final long deadline = System.currentTimeMillis() + 1800000;
            while (!senderThreads.isEmpty() && this.exceptions.isEmpty() && System.currentTimeMillis() < deadline) {
                Thread senderThread = senderThreads.firstElement();
                // Bounded join so recorded exceptions and the deadline are re-checked periodically.
                senderThread.join(DurableSubProcessWithRestartTest.BROKER_RESTART);
                if (!senderThread.isAlive()) {
                    senderThreads.remove(senderThread);
                }
            }
            LOG.info("senders done...");

            while (!receivers.isEmpty() && System.currentTimeMillis() < deadline) {
                Receiver receiver = receivers.firstElement();
                if (receiver.getNextExpectedSeqNo() >= this.numtoSend || !this.exceptions.isEmpty()) {
                    // Remove first so a failing close is not retried by the cleanup below.
                    receivers.remove(receiver);
                    receiver.close();
                } else {
                    // Poll instead of spinning a CPU core while the receiver catches up.
                    Thread.sleep(50L);
                }
            }

            assertTrue("No timeout waiting for senders/receivers to complete", System.currentTimeMillis() < deadline);
            if (!this.exceptions.isEmpty()) {
                LOG.error("first recorded exception", this.exceptions.get(0));
            }
            assertTrue("No exceptions", this.exceptions.isEmpty());
        } finally {
            for (Receiver leftover : receivers) {
                try {
                    leftover.close();
                } catch (JMSException closeFailure) {
                    LOG.warn("failed to close receiver during cleanup", closeFailure);
                }
            }
        }
    }
}
