package org.apache.activemq.transport.stomp;

import java.io.IOException;
import java.net.Socket;
import java.net.URI;
import java.util.HashMap;
import java.util.UUID;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/transport/stomp/StompMissingMessageTest.class */
public class StompMissingMessageTest {
    private static final Logger LOG = LoggerFactory.getLogger(StompMissingMessageTest.class);
    protected String bindAddress = "stomp://localhost:61613";
    protected String confUri = "xbean:org/apache/activemq/transport/stomp/stomp-auth-broker.xml";
    protected String jmsUri = "vm://localhost";
    private BrokerService broker;
    protected String destination;

    @Before
    public void setUp() throws Exception {
        this.broker = BrokerFactory.createBroker(new URI(this.confUri));
        this.broker.setDeleteAllMessagesOnStartup(true);
        this.broker.start();
        this.broker.waitUntilStarted();
        this.destination = "/topic/" + getTopicName();
    }

    @After
    public void tearDown() throws Exception {
        if (this.broker != null) {
            this.broker.stop();
            this.broker.waitUntilStopped();
        }
    }

    @Test
    public void testProducerConsumerLoop() throws Exception {
        int i = 0;
        for (int i2 = 1; i2 <= 1000; i2 += 2) {
            if (doTestProducerConsumer(i2) != null) {
                i++;
            }
        }
        Assert.assertEquals(500L, i);
    }

    public String doTestProducerConsumer(int i) throws Exception {
        String str = null;
        Assert.assertEquals("Should not be any consumers", 0L, this.broker.getAdminView().getTopicSubscribers().length);
        StompConnection stompConnect = stompConnect();
        StompConnection stompConnect2 = stompConnect();
        subscribe(stompConnect2, Integer.toString(i));
        sendMessage(stompConnect, i);
        try {
            StompFrame receive = stompConnect2.receive();
            LOG.debug("Consumer got frame: " + ((String) null));
            Assert.assertEquals(i, Integer.valueOf(receive.getBody()).intValue());
            str = receive.getBody();
        } catch (Exception e) {
            Assert.fail("Consumer[" + i + "] got error while consuming: " + e.getMessage());
        }
        unsubscribe(stompConnect2, Integer.toString(i));
        stompDisconnect(stompConnect2);
        stompDisconnect(stompConnect);
        return str;
    }

    @Test
    public void testProducerDurableConsumerLoop() throws Exception {
        int i = 0;
        for (int i2 = 1; i2 <= 1000; i2 += 2) {
            if (doTestProducerDurableConsumer(i2) != null) {
                i++;
            }
        }
        Assert.assertEquals(500L, i);
    }

    public String doTestProducerDurableConsumer(int i) throws Exception {
        String str = null;
        Assert.assertEquals("Should not be any consumers", 0L, this.broker.getAdminView().getTopicSubscribers().length);
        StompConnection stompConnect = stompConnect();
        StompConnection stompConnect2 = stompConnect("test");
        subscribe(stompConnect2, Integer.toString(i), true);
        sendMessage(stompConnect, i);
        try {
            StompFrame receive = stompConnect2.receive();
            LOG.debug("Consumer got frame: " + ((String) null));
            Assert.assertEquals(i, Integer.valueOf(receive.getBody()).intValue());
            str = receive.getBody();
        } catch (Exception e) {
            Assert.fail("Consumer[" + i + "] got error while consuming: " + e.getMessage());
        }
        unsubscribe(stompConnect2, Integer.toString(i));
        stompDisconnect(stompConnect2);
        stompDisconnect(stompConnect);
        return str;
    }

    protected void subscribe(StompConnection stompConnection, String str) throws Exception {
        subscribe(stompConnection, str, false);
    }

    protected void subscribe(StompConnection stompConnection, String str, boolean z) throws Exception {
        HashMap hashMap = new HashMap();
        hashMap.put("id", str);
        if (z) {
            hashMap.put("activemq.subscriptionName", str);
        }
        hashMap.put("receipt", UUID.randomUUID().toString());
        stompConnection.subscribe(this.destination, "auto", hashMap);
        StompFrame receive = stompConnection.receive();
        Assert.assertEquals("RECEIPT", receive.getAction());
        Assert.assertEquals(hashMap.get("receipt"), (String) receive.getHeaders().get("receipt-id"));
    }

    protected void unsubscribe(StompConnection stompConnection, String str) throws Exception {
        HashMap hashMap = new HashMap();
        hashMap.put("id", str);
        hashMap.put("receipt", UUID.randomUUID().toString());
        stompConnection.unsubscribe(this.destination, hashMap);
        StompFrame receive = stompConnection.receive();
        Assert.assertEquals("RECEIPT", receive.getAction());
        Assert.assertEquals(hashMap.get("receipt"), (String) receive.getHeaders().get("receipt-id"));
    }

    protected void sendMessage(StompConnection stompConnection, int i) throws Exception {
        HashMap hashMap = new HashMap();
        hashMap.put("receipt", UUID.randomUUID().toString());
        stompConnection.send(this.destination, Integer.toString(i), (String) null, hashMap);
        StompFrame receive = stompConnection.receive();
        Assert.assertEquals("RECEIPT", receive.getAction());
        Assert.assertEquals(hashMap.get("receipt"), (String) receive.getHeaders().get("receipt-id"));
    }

    protected StompConnection stompConnect() throws Exception {
        return stompConnect(null);
    }

    protected StompConnection stompConnect(String str) throws Exception {
        StompConnection stompConnection = new StompConnection();
        stompConnection.open(createSocket(new URI(this.bindAddress)));
        stompConnection.connect("system", "manager", str);
        return stompConnection;
    }

    protected Socket createSocket(URI uri) throws IOException {
        return new Socket("127.0.0.1", uri.getPort());
    }

    protected String getTopicName() {
        return getClass().getName() + ".Messages";
    }

    protected void stompDisconnect(StompConnection stompConnection) throws Exception {
        if (stompConnection != null) {
            stompConnection.disconnect(UUID.randomUUID().toString());
            if (!stompConnection.receive().getAction().equals("RECEIPT")) {
                throw new Exception("Failed to receive receipt for disconnect.");
            }
            stompConnection.close();
        }
    }
}
