package org.apache.activemq.artemis.tests.integration.stomp;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/stomp/ConcurrentStompTest.class */
public class ConcurrentStompTest extends StompTestBase {
    private Socket stompSocket_2;
    private ByteArrayOutputStream inputBuffer_2;

    /* JADX WARN: Type inference failed for: r0v30, types: [org.apache.activemq.artemis.tests.integration.stomp.ConcurrentStompTest$1] */
    @Test
    public void testSendManyMessages() throws Exception {
        try {
            sendFrame("CONNECT\nlogin: brianm\npasscode: wombats\n\n��");
            Assert.assertTrue(receiveFrame(10000L).startsWith("CONNECTED"));
            this.stompSocket_2 = createSocket();
            this.inputBuffer_2 = new ByteArrayOutputStream();
            sendFrame(this.stompSocket_2, "CONNECT\nlogin: brianm\npasscode: wombats\n\n��");
            Assert.assertTrue(receiveFrame(this.stompSocket_2, this.inputBuffer_2, 10000L).startsWith("CONNECTED"));
            final CountDownLatch countDownLatch = new CountDownLatch(1000);
            sendFrame(this.stompSocket_2, "SUBSCRIBE\ndestination:" + getQueuePrefix() + getQueueName() + "\nack:auto\n\n��");
            Thread.sleep(2000L);
            new Thread() { // from class: org.apache.activemq.artemis.tests.integration.stomp.ConcurrentStompTest.1
                @Override // java.lang.Thread, java.lang.Runnable
                public void run() {
                    int i = 0;
                    while (true) {
                        try {
                            String receiveFrame = ConcurrentStompTest.this.receiveFrame(ConcurrentStompTest.this.stompSocket_2, ConcurrentStompTest.this.inputBuffer_2, 10000L);
                            Assert.assertTrue(receiveFrame.startsWith("MESSAGE"));
                            Assert.assertTrue(receiveFrame.indexOf("destination:") > 0);
                            int i2 = i;
                            i++;
                            System.out.println("<<< " + i2);
                            countDownLatch.countDown();
                        } catch (Exception e) {
                            return;
                        }
                    }
                }
            }.start();
            String str = "SEND\ndestination:" + getQueuePrefix() + getQueueName() + "\n";
            for (int i = 1; i <= 1000; i++) {
                System.out.println(">>> " + i);
                sendFrame(str + "count:" + i + "\n\n��");
            }
            assertTrue(countDownLatch.await(60L, TimeUnit.SECONDS));
            this.stompSocket_2.close();
            this.inputBuffer_2.close();
        } catch (Throwable th) {
            this.stompSocket_2.close();
            this.inputBuffer_2.close();
            throw th;
        }
    }

    public void sendFrame(Socket socket, String str) throws Exception {
        byte[] bytes = str.getBytes(StandardCharsets.UTF_8);
        OutputStream outputStream = socket.getOutputStream();
        for (byte b : bytes) {
            outputStream.write(b);
        }
        outputStream.flush();
    }

    public String receiveFrame(Socket socket, ByteArrayOutputStream byteArrayOutputStream, long j) throws Exception {
        socket.setSoTimeout((int) j);
        InputStream inputStream = socket.getInputStream();
        while (true) {
            int read = inputStream.read();
            if (read < 0) {
                throw new IOException("socket closed.");
            }
            if (read == 0) {
                int read2 = inputStream.read();
                if (read2 != 10) {
                    System.out.println(new String(byteArrayOutputStream.toByteArray(), StandardCharsets.UTF_8));
                }
                Assert.assertEquals("Expecting stomp frame to terminate with ��\n", read2, 10L);
                byte[] byteArray = byteArrayOutputStream.toByteArray();
                byteArrayOutputStream.reset();
                return new String(byteArray, StandardCharsets.UTF_8);
            }
            byteArrayOutputStream.write(read);
        }
    }
}
