package org.apache.activemq.bugs;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import javax.jms.Connection;
import javax.jms.MessageProducer;
import javax.jms.Session;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.perf.NetworkedSyncTest;
import org.apache.activemq.transport.stomp.StompConnection;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:org/apache/activemq/bugs/SlowConsumerTest.class */
/**
 * Test that starts an embedded broker, produces {@link #MESSAGES_COUNT} text messages to a
 * queue from one thread and expects a second thread to consume all of them.
 *
 * <p>NOTE: this file is decompiler output. The body of the consuming thread could not be
 * recovered (see the stub in {@link #testRemoveSubscriber()}), so as it stands the consumer
 * thread throws immediately and the final assertion cannot be satisfied by the visible code.
 *
 * <p>The class also carries two STOMP-style framing helpers ({@link #sendFrame(String)} and
 * {@link #receiveFrame(long)}) that operate on {@code stompSocket}; nothing in the visible code
 * assigns that socket.
 */
public class SlowConsumerTest extends TestCase {
    private static final Log LOG = LogFactory.getLog(SlowConsumerTest.class);

    /** Number of messages the producer sends and the test expects to be counted. */
    private static final int MESSAGES_COUNT = 10000;

    /** Not read by the visible code; presumably used by the unrecovered consumer body. */
    protected int messageLogFrequency = 2500;

    /** Not read by the visible code; presumably the consumer's receive timeout in ms. */
    protected long messageReceiveTimeout = StompConnection.RECEIVE_TIMEOUT;

    /** Socket used by the frame helpers; never assigned in the visible code. */
    private Socket stompSocket;

    /** Accumulates the bytes of the frame currently being read by {@link #receiveFrame(long)}. */
    private ByteArrayOutputStream inputBuffer;

    /** Running count of consumed messages; incremented through {@code access$104}. */
    private int messagesCount;

    /**
     * Starts a persistent, JMX-enabled broker, sends {@link #MESSAGES_COUNT} messages from a
     * max-priority producer thread, starts the consumer thread one second later, waits for the
     * consumer to finish and asserts that every message was counted.
     *
     * <p>The connection is closed and the broker stopped in {@code finally} blocks so that a
     * failing run does not leave the connector port bound for subsequent tests.
     *
     * @throws Exception if the broker, the connection or the thread handling fails
     */
    public void testRemoveSubscriber() throws Exception {
        BrokerService brokerService = new BrokerService();
        brokerService.setPersistent(true);
        brokerService.setUseJmx(true);
        brokerService.setDeleteAllMessagesOnStartup(true);
        brokerService.addConnector(NetworkedSyncTest.broker1URL).setName("Default");
        brokerService.start();
        try {
            final Connection connection =
                    new ActiveMQConnectionFactory(NetworkedSyncTest.broker1URL).createConnection();
            try {
                connection.start();

                Thread producerThread = new Thread("Producing thread") {
                    @Override
                    public void run() {
                        try {
                            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                            MessageProducer producer = session.createProducer(
                                    new ActiveMQQueue(SlowConsumerTest.this.getDestinationName()));
                            for (int i = 0; i < MESSAGES_COUNT; i++) {
                                producer.send(session.createTextMessage("" + i));
                                LOG.debug("Sending: " + i);
                            }
                            producer.close();
                            session.close();
                        } catch (Throwable th) {
                            // Best effort: a producer failure is reported but does not abort the
                            // test directly; the final count assertion decides the outcome.
                            th.printStackTrace();
                        }
                    }
                };
                producerThread.setPriority(Thread.MAX_PRIORITY);
                producerThread.start();

                // The consumer is started one second after the producer.
                Thread.sleep(1000L);

                Thread consumerThread = new Thread("Consuming thread") {
                    /*
                     * NOTE: the decompiler could not recover this method body (it reported 322
                     * skipped instructions), so the stub below only throws. One fragment it
                     * reported as lost:
                     *
                     *     org.apache.activemq.bugs.SlowConsumerTest.LOG.warn("Got null message at count: " + r5.this$0.messagesCount + ". Continuing...");
                     *
                     * The real body must be restored from the original sources or the bytecode
                     * dump before this test can pass.
                     */
                    @Override
                    public void run() {
                        throw new UnsupportedOperationException("Method not decompiled: org.apache.activemq.bugs.SlowConsumerTest.AnonymousClass2.run():void");
                    }
                };
                consumerThread.start();
                consumerThread.join();

                assertEquals(MESSAGES_COUNT, this.messagesCount);
            } finally {
                connection.close();
            }
        } finally {
            brokerService.stop();
        }
    }

    /**
     * Writes {@code str}, encoded as UTF-8, to the socket's output stream and flushes it.
     *
     * @param str the frame text to send
     * @throws IllegalStateException if no socket has been assigned
     * @throws Exception if encoding or writing fails
     */
    public void sendFrame(String str) throws Exception {
        if (this.stompSocket == null) {
            throw new IllegalStateException("stompSocket has not been initialised");
        }
        byte[] bytes = str.getBytes("UTF-8");
        OutputStream outputStream = this.stompSocket.getOutputStream();
        // One bulk write instead of one write call per byte on the raw socket stream.
        outputStream.write(bytes);
        outputStream.flush();
    }

    /**
     * Reads bytes from the socket until a NUL byte is seen and returns everything before it,
     * decoded as UTF-8. The single byte following the NUL is read and discarded.
     *
     * <p>Bytes read before an I/O failure stay in the internal buffer, so a later call continues
     * the same frame.
     *
     * @param j socket read timeout in milliseconds; {@code 0} means wait indefinitely, values
     *          above {@link Integer#MAX_VALUE} are clamped
     * @return the frame content without the terminating NUL
     * @throws IllegalArgumentException if {@code j} is negative
     * @throws IllegalStateException if no socket has been assigned
     * @throws IOException if the stream ends before a NUL byte is read
     * @throws Exception if reading or decoding otherwise fails
     */
    public String receiveFrame(long j) throws Exception {
        if (this.stompSocket == null) {
            throw new IllegalStateException("stompSocket has not been initialised");
        }
        if (j < 0) {
            throw new IllegalArgumentException("timeout can't be negative: " + j);
        }
        // A plain (int) cast would wrap large values (e.g. 2^32 -> 0, i.e. "wait forever").
        this.stompSocket.setSoTimeout((int) Math.min(j, (long) Integer.MAX_VALUE));
        if (this.inputBuffer == null) {
            // Nothing in the visible code creates the buffer, so create it on first use
            // rather than losing the first byte to a NullPointerException.
            this.inputBuffer = new ByteArrayOutputStream();
        }
        InputStream inputStream = this.stompSocket.getInputStream();
        while (true) {
            int read = inputStream.read();
            if (read < 0) {
                throw new IOException("socket closed.");
            }
            if (read == 0) {
                // Discard the byte after the NUL terminator (presumably a trailing newline).
                inputStream.read();
                byte[] byteArray = this.inputBuffer.toByteArray();
                this.inputBuffer.reset();
                return new String(byteArray, "UTF-8");
            }
            this.inputBuffer.write(read);
        }
    }

    /**
     * Builds the queue name used by the test: the runtime class name, a dot, and the test name.
     *
     * @return the destination name for the currently running test
     */
    protected String getDestinationName() {
        return getClass().getName() + "." + getName();
    }

    /**
     * Compiler-generated accessor that pre-increments {@code messagesCount} on behalf of an
     * inner class and returns the new value. Not called by any recovered source code.
     */
    static /* synthetic */ int access$104(SlowConsumerTest slowConsumerTest) {
        int i = slowConsumerTest.messagesCount + 1;
        slowConsumerTest.messagesCount = i;
        return i;
    }
}
