package io.vertx.amqp;

import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import io.vertx.proton.ProtonClient;
import io.vertx.proton.ProtonConnection;
import io.vertx.proton.ProtonSender;
import java.util.HashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.apache.qpid.proton.message.Message;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;

@RunWith(VertxUnitRunner.class)
/* loaded from: input_file:io/vertx/amqp/ReceptionTest.class */
public class ReceptionTest extends ArtemisTestBase {

    @Rule
    public TestName name = new TestName();
    private Vertx vertx;

    /**
     * Creates the Vert.x instance used by the test that is about to run.
     */
    @Before
    public void setUp() {
        vertx = Vertx.vertx();
    }

    /**
     * Runs the base-class tear-down and then requests that the Vert.x instance
     * created in {@link #setUp()} be closed.
     *
     * <p>The close is placed in a {@code finally} block so that the Vert.x instance
     * is still released when the base-class tear-down throws. The close is only
     * requested here; its completion is not awaited.
     *
     * @throws InterruptedException if the base-class tear-down is interrupted
     */
    @Override // io.vertx.amqp.ArtemisTestBase
    @After
    public void tearDown() throws InterruptedException {
        try {
            super.tearDown();
        } finally {
            if (this.vertx != null) {
                this.vertx.close();
            }
        }
    }

    /**
     * Sends a single message (body plus one application property) with a plain
     * Proton sender and checks that an {@link AmqpReceiver} attached to the same
     * address receives it with the expected body and application property.
     *
     * <p>Two {@link Async} markers are used: one completes once the sender has seen
     * the delivery accepted and settled, the other once the receiver has validated
     * the message and the AMQP client has been closed.
     *
     * @param testContext the vertx-unit context used for assertions and asyncs
     */
    @Test(timeout = 20000)
    public void testReceiveBasicMessage(TestContext testContext) {
        String testName = this.name.getMethodName();
        String sentContent = "myMessageContent-" + testName;
        String propKey = "appPropKey";
        String propValue = "appPropValue";
        Async receivedAsync = testContext.async();
        Async sentAsync = testContext.async();
        AmqpClient client = AmqpClient.create(this.vertx, new AmqpClientOptions().setHost(this.host).setPort(this.port).setPassword(this.password).setUsername(this.username));
        // Each nested callback uses its own parameter name: a lambda parameter may not
        // redeclare a parameter of an enclosing lambda.
        client.connect(connectResult -> {
            testContext.assertTrue(connectResult.succeeded());
            connectResult.result().createReceiver(testName, receiverResult -> {
                // Fail with an assertion rather than a NullPointerException if creation failed.
                testContext.assertTrue(receiverResult.succeeded());
                AmqpReceiver receiver = receiverResult.result();
                testContext.assertEquals(testName, receiver.address(), "address was not as expected");
                receiver.handler(amqpMessage -> {
                    testContext.assertNotNull(amqpMessage, "message was null");
                    testContext.assertNotNull(amqpMessage.bodyAsString(), "amqp message body content was null");
                    testContext.assertEquals(sentContent, amqpMessage.bodyAsString(), "amqp message body was not as expected");
                    JsonObject applicationProperties = amqpMessage.applicationProperties();
                    testContext.assertTrue(applicationProperties != null, "application properties element not present");
                    testContext.assertTrue(applicationProperties.containsKey(propKey), "expected property key element not present");
                    testContext.assertEquals(propValue, applicationProperties.getValue(propKey), "app property value not as expected");
                    client.close(closeResult -> {
                        testContext.assertTrue(closeResult.succeeded());
                        receivedAsync.complete();
                    });
                });
                // The receiver is in place; now publish the message over a separate Proton connection.
                ProtonClient.create(this.vertx).connect(this.host, this.port, this.username, this.password, protonResult -> {
                    testContext.assertTrue(protonResult.succeeded());
                    Message message = Proton.message();
                    message.setBody(new AmqpValue(sentContent));
                    HashMap<String, Object> properties = new HashMap<>();
                    properties.put(propKey, propValue);
                    message.setApplicationProperties(new ApplicationProperties(properties));
                    ProtonConnection senderConnection = protonResult.result().open();
                    senderConnection.createSender(testName).open().send(message, protonDelivery -> {
                        testContext.assertNotNull(protonDelivery.getRemoteState(), "message had no remote state");
                        testContext.assertTrue(protonDelivery.getRemoteState() instanceof Accepted, "message was not accepted");
                        testContext.assertTrue(protonDelivery.remotelySettled(), "message was not settled");
                        // Disconnect the transport once the peer has confirmed the close.
                        senderConnection.closeHandler(closed -> {
                            senderConnection.disconnect();
                        }).close();
                        sentAsync.complete();
                    });
                });
            });
        });
        sentAsync.awaitSuccess();
        receivedAsync.awaitSuccess();
    }

    /**
     * Sends a single message with a plain Proton sender and checks that the handler
     * registered on an {@link AmqpReceiver} for the same address receives it with
     * the expected body.
     *
     * <p>One {@link Async} completes when the sender has seen the delivery accepted
     * and settled; the other completes when the receiver has validated the message
     * and the AMQP client has been closed successfully.
     *
     * @param testContext the vertx-unit context used for assertions and asyncs
     */
    @Test(timeout = 20000)
    public void testReceiveBasicMessageAsStream(TestContext testContext) {
        String testName = this.name.getMethodName();
        String sentContent = "myMessageContent-" + testName;
        Async receivedAsync = testContext.async();
        Async sentAsync = testContext.async();
        AmqpClient client = AmqpClient.create(this.vertx, new AmqpClientOptions().setHost(this.host).setPort(this.port).setUsername(this.username).setPassword(this.password));
        // Each nested callback uses its own parameter name: a lambda parameter may not
        // redeclare a parameter of an enclosing lambda.
        client.connect(connectResult -> {
            // Assert success before dereferencing the result, as the sibling tests do.
            testContext.assertTrue(connectResult.succeeded());
            connectResult.result().createReceiver(testName, receiverResult -> {
                testContext.assertTrue(receiverResult.succeeded());
                receiverResult.result().handler(amqpMessage -> {
                    testContext.assertNotNull(amqpMessage, "message was null");
                    String body = amqpMessage.bodyAsString();
                    testContext.assertNotNull(body, "amqp message body content was null");
                    testContext.assertEquals(sentContent, body, "amqp message body was not as expected");
                    client.close(closeResult -> {
                        testContext.assertTrue(closeResult.succeeded());
                        receivedAsync.complete();
                    });
                });
                // The receiver is in place; now publish the message over a separate Proton connection.
                ProtonClient.create(this.vertx).connect(this.host, this.port, this.username, this.password, protonResult -> {
                    testContext.assertTrue(protonResult.succeeded());
                    Message message = Proton.message();
                    message.setBody(new AmqpValue(sentContent));
                    ProtonConnection senderConnection = protonResult.result().open();
                    senderConnection.createSender(testName).open().send(message, protonDelivery -> {
                        testContext.assertNotNull(protonDelivery.getRemoteState(), "message had no remote state");
                        testContext.assertTrue(protonDelivery.getRemoteState() instanceof Accepted, "message was not accepted");
                        testContext.assertTrue(protonDelivery.remotelySettled(), "message was not settled");
                        // Disconnect the transport once the peer has confirmed the close.
                        senderConnection.closeHandler(closed -> {
                            senderConnection.disconnect();
                        }).close();
                        sentAsync.complete();
                    });
                });
            });
        });
        sentAsync.awaitSuccess();
        receivedAsync.awaitSuccess();
    }

    /**
     * Creates a receiver, sends several messages to its address, and only registers
     * the message handler after a delay. The test then checks that every message is
     * still handed to the late-registered handler with the expected body.
     *
     * <p>One {@link Async} is completed by {@code sendAFewMessages} once the last
     * delivery has been acknowledged; the other completes once the handler has seen
     * the expected number of messages and the AMQP client has been closed.
     *
     * @param testContext the vertx-unit context used for assertions and asyncs
     */
    @Test(timeout = 20000)
    public void testReceiveMultipleMessageAfterDelayedHandlerAddition(TestContext testContext) {
        final int msgCount = 5;
        final long handlerDelayMillis = 500L;
        String testName = this.name.getMethodName();
        String sentContent = "myMessageContent-" + testName;
        Async receivedAsync = testContext.async();
        Async sentAsync = testContext.async();
        AmqpClient client = AmqpClient.create(this.vertx, new AmqpClientOptions().setHost(this.host).setPort(this.port).setPassword(this.password).setUsername(this.username));
        // Each nested callback uses its own parameter name: a lambda parameter may not
        // redeclare a parameter of an enclosing lambda.
        client.connect(connectResult -> {
            testContext.assertTrue(connectResult.succeeded());
            connectResult.result().createReceiver(testName, receiverResult -> {
                testContext.assertTrue(receiverResult.succeeded());
                AmqpReceiver receiver = receiverResult.result();
                sendAFewMessages(testContext, testName, sentContent, sentAsync, msgCount);
                // Register the handler only after the delay, i.e. after the sends were started.
                this.vertx.setTimer(handlerDelayMillis, timerId -> {
                    AtomicInteger received = new AtomicInteger();
                    receiver.handler(amqpMessage -> {
                        int msgNum = received.incrementAndGet();
                        String body = amqpMessage.bodyAsString();
                        testContext.assertNotNull(body, "amqp message " + msgNum + " body content was null");
                        testContext.assertEquals(sentContent, body, "amqp message " + msgNum + " body not as expected");
                        if (msgNum == msgCount) {
                            client.close(closeResult -> {
                                testContext.assertTrue(closeResult.succeeded());
                                receivedAsync.complete();
                            });
                        }
                    });
                });
            });
        });
        sentAsync.awaitSuccess();
        receivedAsync.awaitSuccess();
    }

    /**
     * Opens a plain Proton connection and sends the same message {@code i} times to
     * the given address, asserting for each delivery that it was accepted and
     * remotely settled. When the acknowledgement for the last message arrives, the
     * connection is closed and {@code async} is completed.
     *
     * @param testContext the vertx-unit context used for assertions
     * @param str the address the sender is attached to
     * @param str2 the string placed in the body of every message
     * @param async completed once the final delivery has been acknowledged
     * @param i the number of messages to send
     */
    private void sendAFewMessages(TestContext testContext, String str, String str2, Async async, int i) {
        ProtonClient.create(this.vertx).connect(this.host, this.port, this.username, this.password, connectResult -> {
            testContext.assertTrue(connectResult.succeeded());
            Message message = Proton.message();
            message.setBody(new AmqpValue(str2));
            ProtonConnection connection = connectResult.result().open();
            ProtonSender sender = connection.createSender(str).open();
            for (int n = 1; n <= i; n++) {
                // Lambdas can only capture effectively final locals, so copy the loop index.
                final int msgNum = n;
                sender.send(message, protonDelivery -> {
                    testContext.assertNotNull(protonDelivery.getRemoteState(), "message " + msgNum + " had no remote state");
                    testContext.assertTrue(protonDelivery.getRemoteState() instanceof Accepted, "message " + msgNum + " was not accepted");
                    testContext.assertTrue(protonDelivery.remotelySettled(), "message " + msgNum + " was not settled");
                    if (msgNum == i) {
                        // Distinct parameter name: a lambda parameter may not redeclare the
                        // parameter of the enclosing connect callback.
                        connection.closeHandler(closed -> {
                            connection.disconnect();
                        }).close();
                        async.complete();
                    }
                });
            }
        });
    }

    /**
     * Sends five messages to a receiver that pauses itself after the second one and
     * resumes from a timer. The test checks that every message arrives with the
     * expected body and that no message after the pause point is delivered before
     * the pause delay has elapsed.
     *
     * <p>One {@link Async} is completed by {@code sendAFewMessages} once the last
     * delivery has been acknowledged; the other completes once all messages have
     * been received and the AMQP client has been closed.
     *
     * @param testContext the vertx-unit context used for assertions and asyncs
     */
    @Test(timeout = 20000)
    public void testReceiveMultipleMessageAfterPause(TestContext testContext) {
        final int pauseCount = 2;
        final int msgCount = 5;
        final long pauseDelayMillis = 500L;
        String testName = this.name.getMethodName();
        String sentContent = "myMessageContent-" + testName;
        Async receivedAsync = testContext.async();
        Async sentAsync = testContext.async();
        AmqpClient client = AmqpClient.create(this.vertx, new AmqpClientOptions().setHost(this.host).setPort(this.port).setUsername(this.username).setPassword(this.password));
        // Each nested callback uses its own parameter name: a lambda parameter may not
        // redeclare a parameter of an enclosing lambda.
        client.connect(connectResult -> {
            testContext.assertTrue(connectResult.succeeded());
            AtomicInteger received = new AtomicInteger();
            AtomicLong pauseStartMillis = new AtomicLong();
            connectResult.result().createReceiver(testName, receiverResult -> {
                testContext.assertTrue(receiverResult.succeeded());
                AmqpReceiver receiver = receiverResult.result();
                receiver.handler(amqpMessage -> {
                    int msgNum = received.incrementAndGet();
                    String body = amqpMessage.bodyAsString();
                    testContext.assertNotNull(body, "amqp message " + msgNum + " body content was null");
                    testContext.assertEquals(sentContent, body, "amqp message " + msgNum + " body not as expected");
                    if (msgNum == pauseCount) {
                        // Stop delivery, remember when, and schedule the resume.
                        receiver.pause();
                        pauseStartMillis.set(System.currentTimeMillis());
                        this.vertx.setTimer(pauseDelayMillis, timerId -> {
                            receiver.resume();
                        });
                    }
                    if (msgNum > pauseCount) {
                        long pausedAt = pauseStartMillis.get();
                        testContext.assertTrue(pausedAt > 0, "pause start not initialised before receiving msg" + msgNum);
                        // Messages after the pause point must not arrive until the delay has
                        // passed. The previous form (now + delay > pauseStart) was always true.
                        testContext.assertTrue(System.currentTimeMillis() >= pausedAt + pauseDelayMillis, "delivery occurred before expected");
                    }
                    if (msgNum == msgCount) {
                        client.close(closeResult -> {
                            testContext.assertTrue(closeResult.succeeded());
                            receivedAsync.complete();
                        });
                    }
                });
                sendAFewMessages(testContext, testName, sentContent, sentAsync, msgCount);
            });
        });
        sentAsync.awaitSuccess();
        receivedAsync.awaitSuccess();
    }
}
