package io.vertx.proton;

import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.core.net.NetServer;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import io.vertx.proton.MockServer;
import io.vertx.proton.impl.ProtonConnectionImpl;
import io.vertx.proton.impl.ProtonMetaDataSupportImpl;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.UnsignedLong;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.Data;
import org.apache.qpid.proton.amqp.messaging.Modified;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.amqp.transport.LinkError;
import org.apache.qpid.proton.amqp.transport.Target;
import org.apache.qpid.proton.codec.WritableBuffer;
import org.apache.qpid.proton.message.Message;
import org.apache.qpid.proton.message.impl.MessageImpl;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;

@RunWith(VertxUnitRunner.class)
/* loaded from: input_file:io/vertx/proton/ProtonClientTest.class */
public class ProtonClientTest extends MockServerTestBase {
    private static Logger LOG = LoggerFactory.getLogger(ProtonClientTest.class);
    private static final Symbol[] ANON_RELAY_ONLY = {ProtonConnectionImpl.ANONYMOUS_RELAY};
    private static final Symbol[] NO_CAPABILITIES = new Symbol[0];

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/vertx/proton/ProtonClientTest$ConPropValidator.class */
    public interface ConPropValidator {
        void validate(TestContext testContext, Map<Symbol, Object> map);
    }

    /* loaded from: input_file:io/vertx/proton/ProtonClientTest$ProductVersionConPropValidator.class */
    private class ProductVersionConPropValidator implements ConPropValidator {
        private String expectedProduct;
        private String expectedVersion;

        public ProductVersionConPropValidator(String str, String str2) {
            this.expectedProduct = str;
            this.expectedVersion = str2;
        }

        @Override // io.vertx.proton.ProtonClientTest.ConPropValidator
        public void validate(TestContext testContext, Map<Symbol, Object> map) {
            testContext.assertNotNull(map, "no properties map provided");
            testContext.assertTrue(map.containsKey(ProtonMetaDataSupportImpl.PRODUCT_KEY), "product not present");
            testContext.assertNotNull(map.get(ProtonMetaDataSupportImpl.VERSION_KEY), "unexpected product");
            testContext.assertEquals(this.expectedProduct, map.get(ProtonMetaDataSupportImpl.PRODUCT_KEY), "unexpected product");
            testContext.assertTrue(map.containsKey(ProtonMetaDataSupportImpl.VERSION_KEY), "version not present");
            testContext.assertNotNull(map.get(ProtonMetaDataSupportImpl.VERSION_KEY), "unexpected version");
            testContext.assertEquals(this.expectedVersion, map.get(ProtonMetaDataSupportImpl.VERSION_KEY), "unexpected version");
        }
    }

    @Test(timeout = 20000)
    public void testConnectionOpenResultReturnsConnection(TestContext testContext) {
        Async async = testContext.async();
        connect(testContext, protonConnection -> {
            protonConnection.openHandler(asyncResult -> {
                testContext.assertTrue(asyncResult.succeeded());
                ProtonConnection protonConnection = (ProtonConnection) asyncResult.result();
                testContext.assertNotNull(protonConnection, "opened connection result should not be null");
                protonConnection.disconnect();
                async.complete();
            }).open();
        });
    }

    @Test(timeout = 20000)
    public void testConnectionDisconnectedDuringCreation(TestContext testContext) {
        this.server.close();
        Async async = testContext.async();
        NetServer createNetServer = this.vertx.createNetServer();
        createNetServer.connectHandler(netSocket -> {
            netSocket.pause();
            this.vertx.setTimer(50L, l -> {
                netSocket.close();
            });
        });
        createNetServer.listen(asyncResult -> {
            testContext.assertTrue(asyncResult.succeeded());
            ProtonClient.create(this.vertx).connect("localhost", createNetServer.actualPort(), asyncResult -> {
                testContext.assertFalse(asyncResult.succeeded());
                async.complete();
            });
        });
        async.awaitSuccess();
    }

    @Test(timeout = 20000)
    public void testGetConnectionFromLink(TestContext testContext) {
        Async async = testContext.async();
        connect(testContext, protonConnection -> {
            ProtonSender createSender = protonConnection.createSender("some-address");
            testContext.assertNotNull(createSender);
            testContext.assertEquals(protonConnection, createSender.getSession().getConnection());
            testContext.assertTrue(protonConnection == createSender.getSession().getConnection());
            ProtonSession createSession = protonConnection.createSession();
            testContext.assertNotNull(createSession);
            ProtonSender createSender2 = createSession.createSender("some-address");
            testContext.assertNotNull(createSender2);
            testContext.assertEquals(createSession, createSender2.getSession());
            testContext.assertTrue(createSession == createSender2.getSession());
            testContext.assertEquals(protonConnection, createSender2.getSession().getConnection());
            testContext.assertTrue(protonConnection == createSender2.getSession().getConnection());
            protonConnection.disconnect();
            async.complete();
        });
    }

    @Test(timeout = 20000)
    public void testClientIdentification(TestContext testContext) {
        Async async = testContext.async();
        connect(testContext, protonConnection -> {
            protonConnection.setContainer("foo").openHandler(asyncResult -> {
                testContext.assertEquals("foo", protonConnection.getContainer());
                testContext.assertEquals("pong: foo", protonConnection.getRemoteContainer());
                protonConnection.disconnect();
                async.complete();
            }).open();
        });
    }

    @Test(timeout = 20000)
    public void testRemoteDisconnectHandling(TestContext testContext) {
        Async async = testContext.async();
        connect(testContext, protonConnection -> {
            protonConnection.open();
            testContext.assertFalse(protonConnection.isDisconnected());
            protonConnection.disconnectHandler(protonConnection -> {
                testContext.assertTrue(protonConnection.isDisconnected());
                LOG.trace("Client disconnect handler called");
                async.complete();
            });
            ProtonSender open = protonConnection.createSender((String) null).open();
            LOG.trace("Sending request for remote disconnect");
            open.send(ProtonHelper.message("command", "disconnect"));
        });
    }

    @Test(timeout = 20000)
    public void testLocalDisconnectHandling(TestContext testContext) {
        Async async = testContext.async();
        connect(testContext, protonConnection -> {
            testContext.assertFalse(protonConnection.isDisconnected());
            protonConnection.disconnectHandler(protonConnection -> {
                LOG.trace("Client disconnect handler called");
                testContext.assertTrue(protonConnection.isDisconnected());
                async.complete();
            });
            LOG.trace("Client disconnecting connection");
            protonConnection.disconnect();
        });
    }

    @Test(timeout = 20000)
    public void testSetVirtualHostOnConnect(TestContext testContext) {
        Async async = testContext.async();
        connect(testContext, new ProtonClientOptions().setVirtualHost("example.com"), protonConnection -> {
            testContext.assertFalse(protonConnection.isDisconnected());
            testContext.assertEquals("example.com", protonConnection.getHostname());
            async.complete();
        });
    }

    @Test(timeout = 20000)
    public void testRequestResponse(TestContext testContext) {
        sendReceiveEcho(testContext, "Hello World");
    }

    @Test(timeout = 20000)
    public void testTransferLargeMessage(TestContext testContext) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 5242880; i++) {
            sb.append(97 + (i % 26));
        }
        sendReceiveEcho(testContext, sb.toString());
    }

    @Test(timeout = 20000)
    public void testTransferLargeMessageWithSmallerSessionWindow(TestContext testContext) {
        Async async = testContext.async();
        int i = 2097152;
        testContext.assertTrue(5242880 >= 2 * 2097152);
        StringBuilder sb = new StringBuilder();
        for (int i2 = 0; i2 < 5242880; i2++) {
            sb.append(97 + (i2 % 26));
        }
        String sb2 = sb.toString();
        ProtonClient.create(this.vertx).connect("localhost", this.server.actualPort(), asyncResult -> {
            testContext.assertTrue(asyncResult.succeeded());
            ProtonConnection protonConnection = (ProtonConnection) asyncResult.result();
            protonConnection.open();
            ProtonSession createSession = protonConnection.createSession();
            createSession.setIncomingCapacity(i);
            createSession.open();
            ProtonReceiver createReceiver = createSession.createReceiver(MockServer.Addresses.echo.toString());
            createReceiver.handler((protonDelivery, message) -> {
                LOG.trace("Got message");
                testContext.assertEquals(sb2, (String) getMessageBody(testContext, message));
                async.complete();
                protonConnection.disconnect();
            });
            createReceiver.open();
            createSession.createSender(MockServer.Addresses.echo.toString()).open().send(ProtonHelper.message("echo", sb2));
        });
    }

    private void sendReceiveEcho(TestContext testContext, String str) {
        Async async = testContext.async();
        connect(testContext, protonConnection -> {
            protonConnection.open();
            protonConnection.createReceiver(MockServer.Addresses.echo.toString()).handler((protonDelivery, message) -> {
                testContext.assertEquals(str, (String) getMessageBody(testContext, message));
                protonConnection.disconnect();
                async.complete();
            }).open();
            protonConnection.createSender(MockServer.Addresses.echo.toString()).open().send(ProtonHelper.message("echo", str));
        });
    }

    @Test(timeout = 20000)
    public void testIsAnonymousRelaySupported(TestContext testContext) {
        Async async = testContext.async();
        connect(testContext, protonConnection -> {
            testContext.assertFalse(protonConnection.isAnonymousRelaySupported(), "Connection not yet open, so result should be false");
            protonConnection.openHandler(asyncResult -> {
                testContext.assertTrue(protonConnection.isAnonymousRelaySupported(), "Connection now open, server supports relay, should be true");
                protonConnection.disconnect();
                async.complete();
            }).open();
        });
    }

    @Test(timeout = 20000)
    public void testAnonymousRelayIsNotSupported(TestContext testContext) {
        this.server.getProtonServer().setAdvertiseAnonymousRelayCapability(false);
        Async async = testContext.async();
        connect(testContext, protonConnection -> {
            testContext.assertFalse(protonConnection.isAnonymousRelaySupported(), "Connection not yet open, so result should be false");
            protonConnection.openHandler(asyncResult -> {
                testContext.assertFalse(protonConnection.isAnonymousRelaySupported(), "Connection now open, server does not support relay, should be false");
                protonConnection.disconnect();
                async.complete();
            }).open();
        });
    }

    @Test(timeout = 20000)
    public void testAnonymousSenderEnforcesMessageHasAddress(TestContext testContext) {
        Async async = testContext.async();
        connect(testContext, protonConnection -> {
            protonConnection.open();
            ProtonSender createSender = protonConnection.createSender((String) null);
            createSender.open();
            Message message = Proton.message();
            message.setBody(new AmqpValue("bodyString"));
            try {
                createSender.send(message);
                testContext.fail("Send should have thrown IAE due to lack of message address");
            } catch (IllegalArgumentException e) {
                protonConnection.disconnect();
                async.complete();
            }
        });
    }

    @Test(timeout = 20000)
    public void testNonAnonymousSenderDoesNotEnforceMessageHasAddress(TestContext testContext) {
        Async async = testContext.async();
        connect(testContext, protonConnection -> {
            protonConnection.open();
            ProtonSender createSender = protonConnection.createSender(MockServer.Addresses.drop.toString());
            createSender.open();
            Message message = Proton.message();
            message.setBody(new AmqpValue("bodyString"));
            createSender.send(message);
            protonConnection.disconnect();
            async.complete();
        });
    }

    @Test(timeout = 20000)
    public void testDefaultAnonymousSenderSpecifiesLinkTarget(TestContext testContext) throws Exception {
        this.server.close();
        Async async = testContext.async();
        ProtonServer protonServer = null;
        try {
            protonServer = createServer(protonConnection -> {
                processConnectionAnonymousSenderSpecifiesLinkTarget(testContext, async, protonConnection);
            });
            ProtonClient.create(this.vertx).connect("localhost", protonServer.actualPort(), asyncResult -> {
                testContext.assertTrue(asyncResult.succeeded());
                ProtonConnection protonConnection2 = (ProtonConnection) asyncResult.result();
                protonConnection2.openHandler(asyncResult -> {
                    LOG.trace("Client connection opened");
                    ProtonSender createSender = protonConnection2.createSender((String) null);
                    createSender.open();
                    createSender.send(ProtonHelper.message("ignored", "content"));
                }).open();
            });
            async.awaitSuccess();
            if (protonServer != null) {
                protonServer.close();
            }
        } catch (Throwable th) {
            if (protonServer != null) {
                protonServer.close();
            }
            throw th;
        }
    }

    private void processConnectionAnonymousSenderSpecifiesLinkTarget(TestContext testContext, Async async, ProtonConnection protonConnection) {
        protonConnection.sessionOpenHandler(protonSession -> {
            protonSession.open();
        });
        protonConnection.receiverOpenHandler(protonReceiver -> {
            LOG.trace("Server receiver opened");
            protonReceiver.handler((protonDelivery, message) -> {
                if (LOG.isTraceEnabled()) {
                    LOG.trace("Server got msg: " + getMessageBody(testContext, message));
                }
                protonConnection.disconnect();
                async.complete();
            });
            Target remoteTarget = protonReceiver.getRemoteTarget();
            testContext.assertNotNull(remoteTarget, "Client did not set a link target");
            testContext.assertNull(remoteTarget.getAddress(), "Unexpected target address");
            protonReceiver.open();
        });
        protonConnection.openHandler(asyncResult -> {
            protonConnection.open();
        });
    }

    @Test(timeout = 20000)
    public void testConfigureDynamicReceiver(TestContext testContext) throws Exception {
        this.server.close();
        Async async = testContext.async();
        Async async2 = testContext.async();
        String str = "testConfigureDynamicReceiver:" + UUID.randomUUID();
        ProtonServer protonServer = null;
        try {
            protonServer = createServer(protonConnection -> {
                protonConnection.openHandler(asyncResult -> {
                    protonConnection.open();
                });
                protonConnection.sessionOpenHandler(protonSession -> {
                    protonSession.open();
                });
                protonConnection.senderOpenHandler(protonSender -> {
                    protonSender.closeHandler(asyncResult2 -> {
                        protonSender.close();
                    });
                    testContext.assertNotNull(protonSender.getRemoteSource(), "source should not be null");
                    Source remoteSource = protonSender.getRemoteSource();
                    testContext.assertTrue(remoteSource.getDynamic(), "expected dynamic source to be requested");
                    testContext.assertNull(remoteSource.getAddress(), "expected no source address to be set");
                    Source copy = remoteSource.copy();
                    copy.setAddress(str);
                    protonSender.setSource(copy);
                    LOG.trace("Server sender opened");
                    protonSender.open();
                    async2.complete();
                });
            });
            ProtonClient.create(this.vertx).connect("localhost", protonServer.actualPort(), asyncResult -> {
                testContext.assertTrue(asyncResult.succeeded());
                ProtonConnection protonConnection2 = (ProtonConnection) asyncResult.result();
                protonConnection2.open();
                ProtonReceiver createReceiver = protonConnection2.createReceiver((String) null, new ProtonLinkOptions().setDynamic(true));
                createReceiver.openHandler(asyncResult -> {
                    LOG.trace("Client link opened");
                    testContext.assertEquals(str, createReceiver.getRemoteAddress(), "unexpected remote address");
                    Source remoteSource = createReceiver.getRemoteSource();
                    testContext.assertTrue(remoteSource.getDynamic(), "expected dynamic source");
                    testContext.assertEquals(str, remoteSource.getAddress(), "unexpected source address");
                    async.complete();
                    protonConnection2.disconnect();
                });
                createReceiver.open();
            });
            async2.awaitSuccess();
            async.awaitSuccess();
            if (protonServer != null) {
                protonServer.close();
            }
        } catch (Throwable th) {
            if (protonServer != null) {
                protonServer.close();
            }
            throw th;
        }
    }

    @Test(timeout = 20000)
    public void testReceiveMultipleMessagesWithLowerPrefetch(TestContext testContext) {
        Async async = testContext.async();
        connect(testContext, protonConnection -> {
            protonConnection.open();
            AtomicInteger atomicInteger = new AtomicInteger(0);
            protonConnection.createReceiver(MockServer.Addresses.five_messages.toString()).setPrefetch(2).handler((protonDelivery, message) -> {
                int incrementAndGet = atomicInteger.incrementAndGet();
                validateMessage(testContext, incrementAndGet, String.valueOf(incrementAndGet), message);
                if (incrementAndGet == 5) {
                    LOG.trace("Got msg 5, completing async");
                    async.complete();
                    protonConnection.disconnect();
                }
            }).open();
        });
    }

    @Test(timeout = 20000)
    public void testDelayedInitialCreditWithPrefetchDisabled(TestContext testContext) {
        Async async = testContext.async();
        connect(testContext, protonConnection -> {
            protonConnection.open();
            AtomicInteger atomicInteger = new AtomicInteger(0);
            AtomicBoolean atomicBoolean = new AtomicBoolean();
            AtomicBoolean atomicBoolean2 = new AtomicBoolean();
            long currentTimeMillis = System.currentTimeMillis();
            ProtonReceiver createReceiver = protonConnection.createReceiver(MockServer.Addresses.two_messages.toString());
            createReceiver.handler((protonDelivery, message) -> {
                int incrementAndGet = atomicInteger.incrementAndGet();
                switch (incrementAndGet) {
                    case 1:
                        validateMessage(testContext, incrementAndGet, String.valueOf(incrementAndGet), message);
                        testContext.assertTrue(atomicBoolean.get(), "Initial credit not yet granted, so we should not have received message 1 yet!");
                        testContext.assertTrue(System.currentTimeMillis() > currentTimeMillis + 250, "Message received before expected time delay elapsed!");
                        LOG.trace("Got msg 1");
                        this.vertx.setTimer(250L, l -> {
                            LOG.trace("Granting additional credit");
                            atomicBoolean2.set(true);
                            createReceiver.flow(1);
                        });
                        return;
                    case 2:
                        validateMessage(testContext, incrementAndGet, String.valueOf(incrementAndGet), message);
                        testContext.assertTrue(atomicBoolean2.get(), "Additional credit not yet granted, so we should not have received message " + incrementAndGet + " yet!");
                        testContext.assertTrue(System.currentTimeMillis() > currentTimeMillis + 500, "Message received before expected time delay elapsed!");
                        LOG.trace("Got msg 2, completing async");
                        async.complete();
                        protonConnection.disconnect();
                        return;
                    default:
                        return;
                }
            }).setPrefetch(0).open();
            this.vertx.setTimer(250L, l -> {
                LOG.trace("Flowing initial credit");
                atomicBoolean.set(true);
                createReceiver.flow(1);
            });
        });
    }

    @Test(timeout = 20000)
    public void testImmediateInitialCreditWithPrefetchDisabled(TestContext testContext) {
        Async async = testContext.async();
        connect(testContext, protonConnection -> {
            protonConnection.open();
            AtomicInteger atomicInteger = new AtomicInteger(0);
            AtomicBoolean atomicBoolean = new AtomicBoolean();
            ProtonReceiver createReceiver = protonConnection.createReceiver(MockServer.Addresses.five_messages.toString());
            createReceiver.handler((protonDelivery, message) -> {
                int incrementAndGet = atomicInteger.incrementAndGet();
                switch (incrementAndGet) {
                    case 1:
                    case 2:
                    case 3:
                        validateMessage(testContext, incrementAndGet, String.valueOf(incrementAndGet), message);
                        return;
                    case 4:
                        validateMessage(testContext, incrementAndGet, String.valueOf(incrementAndGet), message);
                        this.vertx.setTimer(1000L, l -> {
                            LOG.trace("Flowing more credit");
                            atomicBoolean.set(true);
                            createReceiver.flow(1);
                        });
                        this.vertx.setTimer(500L, l2 -> {
                            LOG.trace("Checking msg 5 not received yet");
                            testContext.assertEquals(4, Integer.valueOf(atomicInteger.get()));
                        });
                        return;
                    case 5:
                        validateMessage(testContext, incrementAndGet, String.valueOf(incrementAndGet), message);
                        testContext.assertTrue(atomicBoolean.get(), "Additional credit not yet granted, so we should not have received message 5 yet!");
                        LOG.trace("Got msg 5, completing async");
                        async.complete();
                        protonConnection.disconnect();
                        return;
                    default:
                        return;
                }
            }).setPrefetch(0).flow(4).open();
        });
    }

    @Test(timeout = 20000)
    public void testDrainWithSomeCreditsUsed(TestContext testContext) throws Exception {
        doDrainWithSomeCreditUsedTestImpl(testContext, false);
    }

    @Test(timeout = 20000)
    public void testDrainWithSomeCreditsUsedSenderAutoDrained(TestContext testContext) throws Exception {
        doDrainWithSomeCreditUsedTestImpl(testContext, true);
    }

    private void doDrainWithSomeCreditUsedTestImpl(TestContext testContext, boolean z) throws Exception {
        this.server.close();
        int i = 5;
        MockServer doDrainTestServerSetup = doDrainTestServerSetup(testContext, z, !z, 2);
        Async async = testContext.async();
        AtomicInteger atomicInteger = new AtomicInteger(0);
        AtomicBoolean atomicBoolean = new AtomicBoolean();
        ProtonClient.create(this.vertx).connect("localhost", doDrainTestServerSetup.actualPort(), asyncResult -> {
            testContext.assertTrue(asyncResult.succeeded());
            ProtonConnection protonConnection = (ProtonConnection) asyncResult.result();
            protonConnection.open();
            ProtonReceiver createReceiver = protonConnection.createReceiver("some-address");
            createReceiver.handler((protonDelivery, message) -> {
                int incrementAndGet = atomicInteger.incrementAndGet();
                switch (incrementAndGet) {
                    case 1:
                    case 2:
                        validateMessage(testContext, incrementAndGet, String.valueOf(incrementAndGet), message);
                        testContext.assertFalse(atomicBoolean.get(), "Drain should not yet be completed!");
                        return;
                    default:
                        testContext.fail("Should only get 2 messages");
                        return;
                }
            }).setPrefetch(0).open();
            createReceiver.flow(i);
            createReceiver.drain(10000L, asyncResult -> {
                testContext.assertTrue(asyncResult.succeeded(), "Drain should have succeeded");
                testContext.assertEquals(2, Integer.valueOf(atomicInteger.get()), "Drain should not yet be completed! Unexpected message count");
                atomicBoolean.set(true);
                async.complete();
                protonConnection.disconnect();
            });
        });
    }

    @Test(timeout = 20000)
    public void testDrainWithAllCreditsUsed(TestContext testContext) throws Exception {
        this.server.close();
        int i = 5;
        MockServer doDrainTestServerSetup = doDrainTestServerSetup(testContext, true, false, 5);
        Async async = testContext.async();
        AtomicInteger atomicInteger = new AtomicInteger(0);
        AtomicBoolean atomicBoolean = new AtomicBoolean();
        ProtonClient.create(this.vertx).connect("localhost", doDrainTestServerSetup.actualPort(), asyncResult -> {
            testContext.assertTrue(asyncResult.succeeded());
            ProtonConnection protonConnection = (ProtonConnection) asyncResult.result();
            protonConnection.open();
            ProtonReceiver createReceiver = protonConnection.createReceiver("some-address");
            createReceiver.handler((protonDelivery, message) -> {
                int incrementAndGet = atomicInteger.incrementAndGet();
                switch (incrementAndGet) {
                    case 1:
                    case 2:
                    case 3:
                    case 4:
                    case 5:
                        validateMessage(testContext, incrementAndGet, String.valueOf(incrementAndGet), message);
                        testContext.assertFalse(atomicBoolean.get(), "Drain should not yet be completed!");
                        return;
                    default:
                        testContext.fail("Should only get 5 messages");
                        return;
                }
            }).setPrefetch(0).open();
            createReceiver.flow(i);
            createReceiver.drain(10000L, asyncResult -> {
                testContext.assertTrue(asyncResult.succeeded(), "Drain should have succeeded");
                testContext.assertEquals(Integer.valueOf(i), Integer.valueOf(atomicInteger.get()), "Drain should not yet be completed! Unexpected message count");
                atomicBoolean.set(true);
                async.complete();
                protonConnection.disconnect();
            });
        });
    }

    @Test(timeout = 20000)
    public void testDrainWithNoCredit(TestContext testContext) {
        Async async = testContext.async();
        connect(testContext, protonConnection -> {
            protonConnection.open();
            ProtonReceiver createReceiver = protonConnection.createReceiver(MockServer.Addresses.no_messages.toString());
            createReceiver.setPrefetch(0).open();
            createReceiver.drain(0L, asyncResult -> {
                testContext.assertTrue(asyncResult.succeeded(), "Drain should have succeeded");
                async.complete();
                protonConnection.disconnect();
            });
        });
    }

    @Test(timeout = 20000)
    public void testDrainTimeout(TestContext testContext) throws Exception {
        this.server.close();
        long j = 1000;
        MockServer doDrainTestServerSetup = doDrainTestServerSetup(testContext, false, false, 0);
        Async async = testContext.async();
        ProtonClient.create(this.vertx).connect("localhost", doDrainTestServerSetup.actualPort(), asyncResult -> {
            testContext.assertTrue(asyncResult.succeeded());
            ProtonConnection protonConnection = (ProtonConnection) asyncResult.result();
            protonConnection.open();
            ProtonReceiver createReceiver = protonConnection.createReceiver("some-address");
            createReceiver.setPrefetch(0).open();
            createReceiver.flow(1);
            long nanoTime = System.nanoTime();
            createReceiver.drain(j, asyncResult -> {
                long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nanoTime);
                testContext.assertTrue(asyncResult.failed(), "Drain should have failed due to timeout");
                testContext.assertTrue(millis >= j, "Timeout fired earlier than expected: " + millis);
                async.complete();
                protonConnection.disconnect();
            });
        });
    }

    private MockServer doDrainTestServerSetup(TestContext testContext, boolean z, boolean z2, int i) throws Exception {
        return new MockServer(this.vertx, protonConnection -> {
            protonConnection.openHandler(asyncResult -> {
                LOG.trace("Server connection open");
                protonConnection.closeHandler(asyncResult -> {
                    protonConnection.close();
                });
                protonConnection.open();
            });
            protonConnection.sessionOpenHandler(protonSession -> {
                LOG.trace("Server session open");
                protonSession.open();
            });
            protonConnection.senderOpenHandler(protonSender -> {
                if (!z) {
                    protonSender.setAutoDrained(false);
                }
                LOG.trace("Server sender open");
                Source remoteSource = protonSender.getRemoteSource();
                testContext.assertNotNull(remoteSource, "source should not be null");
                protonSender.setSource(remoteSource.copy());
                protonSender.open();
                protonSender.sendQueueDrainHandler(protonSender -> {
                    if (protonSender.getDrain()) {
                        for (int i2 = 1; i2 <= i; i2++) {
                            protonSender.send(ProtonHelper.message(String.valueOf(i2)));
                        }
                        if (z2) {
                            protonSender.drained();
                        }
                    }
                });
            });
        });
    }

    @Test(timeout = 20000)
    public void testRemoteCloseDefaultSessionWithError(TestContext testContext) throws Exception {
        remoteCloseDefaultSessionTestImpl(testContext, true);
    }

    @Test(timeout = 20000)
    public void testRemoteCloseDefaultSessionWithoutError(TestContext testContext) throws Exception {
        remoteCloseDefaultSessionTestImpl(testContext, false);
    }

    private void remoteCloseDefaultSessionTestImpl(TestContext testContext, boolean z) throws InterruptedException, ExecutionException {
        this.server.close();
        Async async = testContext.async();
        ProtonServer protonServer = null;
        try {
            protonServer = createServer(protonConnection -> {
                Promise promise = Promise.promise();
                protonConnection.sessionOpenHandler(protonSession -> {
                    LOG.trace("Server session open");
                    protonSession.open();
                    promise.complete(protonSession);
                });
                protonConnection.receiverOpenHandler(protonReceiver -> {
                    LOG.trace("Server receiver open");
                    protonReceiver.open();
                    testContext.assertTrue(promise.future().succeeded(), "Session future not [yet] succeeded");
                    LOG.trace("Server session close");
                    ProtonSession protonSession2 = (ProtonSession) promise.future().result();
                    if (z) {
                        ErrorCondition errorCondition = new ErrorCondition();
                        errorCondition.setCondition(AmqpError.INTERNAL_ERROR);
                        errorCondition.setDescription("error description");
                        protonSession2.setCondition(errorCondition);
                    }
                    protonSession2.close();
                });
                protonConnection.openHandler(asyncResult -> {
                    LOG.trace("Server connection open");
                    protonConnection.open();
                });
            });
            ProtonClient.create(this.vertx).connect("localhost", protonServer.actualPort(), asyncResult -> {
                testContext.assertTrue(asyncResult.succeeded());
                ProtonConnection protonConnection2 = (ProtonConnection) asyncResult.result();
                protonConnection2.openHandler(asyncResult -> {
                    testContext.assertTrue(asyncResult.succeeded(), "Connection open failed");
                    LOG.trace("Client connection opened");
                    protonConnection2.createSender((String) null).open();
                });
                protonConnection2.closeHandler(asyncResult2 -> {
                    LOG.trace("Connection close handler called (as espected): " + asyncResult2.cause());
                    async.complete();
                });
                protonConnection2.open();
            });
            async.awaitSuccess();
            if (protonServer != null) {
                protonServer.close();
            }
        } catch (Throwable th) {
            if (protonServer != null) {
                protonServer.close();
            }
            throw th;
        }
    }

    @Test(timeout = 20000)
    public void testReceiverOpenWithAtLeastOnceQos(TestContext testContext) throws Exception {
        doOpenLinkWithQosTestImpl(testContext, true, ProtonQoS.AT_LEAST_ONCE);
    }

    @Test(timeout = 20000)
    public void testReceiverOpenWithAtMostOnceQos(TestContext testContext) throws Exception {
        doOpenLinkWithQosTestImpl(testContext, true, ProtonQoS.AT_MOST_ONCE);
    }

    @Test(timeout = 20000)
    public void testSenderOpenWithAtLeastOnceQos(TestContext testContext) throws Exception {
        doOpenLinkWithQosTestImpl(testContext, false, ProtonQoS.AT_LEAST_ONCE);
    }

    @Test(timeout = 20000)
    public void testSenderOpenWithAtMostOnceQos(TestContext testContext) throws Exception {
        doOpenLinkWithQosTestImpl(testContext, false, ProtonQoS.AT_MOST_ONCE);
    }

    public void doOpenLinkWithQosTestImpl(TestContext testContext, boolean z, ProtonQoS protonQoS) throws Exception {
        this.server.close();
        Async async = testContext.async();
        Async async2 = testContext.async();
        ProtonServer protonServer = null;
        try {
            protonServer = createServer(protonConnection -> {
                protonConnection.openHandler(asyncResult -> {
                    protonConnection.open();
                });
                protonConnection.sessionOpenHandler(protonSession -> {
                    protonSession.open();
                });
                if (z) {
                    protonConnection.receiverOpenHandler(protonReceiver -> {
                        testContext.assertEquals(protonQoS, protonReceiver.getRemoteQoS(), "unexpected remote qos value");
                        LOG.trace("Server receiver opened");
                        protonReceiver.open();
                        async.complete();
                    });
                } else {
                    protonConnection.senderOpenHandler(protonSender -> {
                        testContext.assertEquals(protonQoS, protonSender.getRemoteQoS(), "unexpected remote qos value");
                        LOG.trace("Server sender opened");
                        protonSender.open();
                        async.complete();
                    });
                }
            });
            ProtonClient.create(this.vertx).connect("localhost", protonServer.actualPort(), asyncResult -> {
                testContext.assertTrue(asyncResult.succeeded());
                ProtonConnection protonConnection2 = (ProtonConnection) asyncResult.result();
                protonConnection2.openHandler(asyncResult -> {
                    LOG.trace("Client connection opened");
                    ProtonSender createSender = z ? protonConnection2.createSender((String) null) : protonConnection2.createReceiver("some-address");
                    createSender.setQoS(protonQoS);
                    ProtonSender protonSender = createSender;
                    createSender.openHandler(asyncResult -> {
                        LOG.trace("Client link opened");
                        testContext.assertEquals(protonQoS, protonSender.getRemoteQoS(), "unexpected remote qos value");
                        async2.complete();
                    });
                    createSender.open();
                }).open();
            });
            async.awaitSuccess();
            async2.awaitSuccess();
            if (protonServer != null) {
                protonServer.close();
            }
        } catch (Throwable th) {
            if (protonServer != null) {
                protonServer.close();
            }
            throw th;
        }
    }

    @Test(timeout = 20000)
    public void testConnectionPropertiesDefault(TestContext testContext) throws Exception {
        ProductVersionConPropValidator productVersionConPropValidator = new ProductVersionConPropValidator("vertx-proton", ProtonMetaDataSupportImpl.VERSION);
        doConnectionPropertiesTestImpl(testContext, false, null, productVersionConPropValidator, null, productVersionConPropValidator);
    }

    @Test(timeout = 20000)
    public void testConnectionPropertiesSetNonDefaultWithoutProductVersion(TestContext testContext) throws Exception {
        Symbol valueOf = Symbol.valueOf("custom-client-prop-key");
        String str = "custom-client-prop-value";
        Symbol valueOf2 = Symbol.valueOf("custom-server-prop-key");
        String str2 = "custom-server-prop-value";
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        linkedHashMap.put(valueOf, "custom-client-prop-value");
        LinkedHashMap linkedHashMap2 = new LinkedHashMap();
        linkedHashMap2.put(valueOf2, "custom-server-prop-value");
        doConnectionPropertiesTestImpl(testContext, true, linkedHashMap, (testContext2, map) -> {
            new ProductVersionConPropValidator("vertx-proton", ProtonMetaDataSupportImpl.VERSION).validate(testContext2, map);
            testContext.assertTrue(map.containsKey(valueOf), "custom client prop not present");
            testContext.assertEquals(str, map.get(valueOf), "unexpected custom client prop value");
        }, linkedHashMap2, (testContext3, map2) -> {
            new ProductVersionConPropValidator("vertx-proton", ProtonMetaDataSupportImpl.VERSION).validate(testContext3, map2);
            testContext.assertTrue(map2.containsKey(valueOf2), "custom server prop not present");
            testContext.assertEquals(str2, map2.get(valueOf2), "unexpected custom server prop value");
        });
    }

    @Test(timeout = 20000)
    public void testConnectionPropertiesSetWithCustomProductVersion(TestContext testContext) throws Exception {
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        linkedHashMap.put(ProtonMetaDataSupportImpl.PRODUCT_KEY, "custom-product");
        linkedHashMap.put(ProtonMetaDataSupportImpl.VERSION_KEY, "0.1.2.3.custom");
        ConPropValidator productVersionConPropValidator = new ProductVersionConPropValidator("custom-product", "0.1.2.3.custom");
        doConnectionPropertiesTestImpl(testContext, true, linkedHashMap, productVersionConPropValidator, linkedHashMap, productVersionConPropValidator);
    }

    @Test(timeout = 20000)
    public void testConnectionPropertiesSetExplicitNull(TestContext testContext) throws Exception {
        ConPropValidator conPropValidator = (testContext2, map) -> {
            testContext.assertNull(map, "expected no properties map");
        };
        doConnectionPropertiesTestImpl(testContext, true, null, conPropValidator, null, conPropValidator);
    }

    public void doConnectionPropertiesTestImpl(TestContext testContext, boolean z, Map<Symbol, Object> map, ConPropValidator conPropValidator, Map<Symbol, Object> map2, ConPropValidator conPropValidator2) throws Exception {
        this.server.close();
        Async async = testContext.async();
        Async async2 = testContext.async();
        ProtonServer protonServer = null;
        try {
            protonServer = createServer(protonConnection -> {
                protonConnection.openHandler(asyncResult -> {
                    if (z) {
                        protonConnection.setProperties(map2);
                    }
                    conPropValidator.validate(testContext, protonConnection.getRemoteProperties());
                    protonConnection.open();
                    async.complete();
                });
            });
            ProtonClient.create(this.vertx).connect("localhost", protonServer.actualPort(), asyncResult -> {
                testContext.assertTrue(asyncResult.succeeded());
                ProtonConnection protonConnection2 = (ProtonConnection) asyncResult.result();
                if (z) {
                    protonConnection2.setProperties(map);
                }
                protonConnection2.openHandler(asyncResult -> {
                    testContext.assertTrue(asyncResult.succeeded());
                    LOG.trace("Client connection opened");
                    conPropValidator2.validate(testContext, protonConnection2.getRemoteProperties());
                    async2.complete();
                }).open();
            });
            async.awaitSuccess();
            async2.awaitSuccess();
            if (protonServer != null) {
                protonServer.close();
            }
        } catch (Throwable th) {
            if (protonServer != null) {
                protonServer.close();
            }
            throw th;
        }
    }

    @Test(timeout = 20000)
    public void testDesiredAndOfferedCapabilities(TestContext testContext) throws Exception {
        Symbol valueOf = Symbol.valueOf("foo");
        Symbol valueOf2 = Symbol.valueOf("bar");
        Symbol valueOf3 = Symbol.valueOf("baz");
        doTestCapabilities(testContext, new Symbol[]{valueOf, valueOf2}, new Symbol[]{valueOf3}, new Symbol[]{valueOf3}, new Symbol[]{valueOf2}, true);
    }

    @Test(timeout = 20000)
    public void testNotSettingDesiredAndOfferedCapabilitiesRetainsAnonRelay(TestContext testContext) throws Exception {
        Symbol valueOf = Symbol.valueOf("foo");
        doTestCapabilities(testContext, null, new Symbol[]{valueOf}, new Symbol[]{valueOf}, null, false);
        doTestCapabilities(testContext, null, new Symbol[]{valueOf}, new Symbol[]{valueOf}, null, true);
    }

    private void doTestCapabilities(TestContext testContext, Symbol[] symbolArr, Symbol[] symbolArr2, Symbol[] symbolArr3, Symbol[] symbolArr4, boolean z) throws Exception {
        this.server.close();
        Async async = testContext.async();
        Async async2 = testContext.async();
        ProtonServer protonServer = null;
        try {
            protonServer = createServer(protonConnection -> {
                protonConnection.openHandler(asyncResult -> {
                    protonConnection.setDesiredCapabilities(symbolArr3);
                    if (z) {
                        protonConnection.setOfferedCapabilities(symbolArr4);
                    }
                    testContext.assertTrue(Arrays.equals(symbolArr2 == null ? NO_CAPABILITIES : symbolArr2, protonConnection.getRemoteOfferedCapabilities()));
                    testContext.assertTrue(Arrays.equals(symbolArr == null ? NO_CAPABILITIES : symbolArr, protonConnection.getRemoteDesiredCapabilities()));
                    protonConnection.open();
                    async.complete();
                });
            });
            ProtonClient.create(this.vertx).connect("localhost", protonServer.actualPort(), asyncResult -> {
                testContext.assertTrue(asyncResult.succeeded());
                ProtonConnection protonConnection2 = (ProtonConnection) asyncResult.result();
                protonConnection2.setOfferedCapabilities(symbolArr2);
                protonConnection2.setDesiredCapabilities(symbolArr);
                protonConnection2.openHandler(asyncResult -> {
                    testContext.assertTrue(asyncResult.succeeded());
                    LOG.trace("Client connection opened");
                    testContext.assertTrue(Arrays.equals(symbolArr3 == null ? NO_CAPABILITIES : symbolArr3, protonConnection2.getRemoteDesiredCapabilities()));
                    testContext.assertTrue(Arrays.equals(symbolArr4 == null ? z ? NO_CAPABILITIES : ANON_RELAY_ONLY : symbolArr4, protonConnection2.getRemoteOfferedCapabilities()));
                    async2.complete();
                }).open();
            });
            async.awaitSuccess();
            async2.awaitSuccess();
            if (protonServer != null) {
                protonServer.close();
            }
        } catch (Throwable th) {
            if (protonServer != null) {
                protonServer.close();
            }
            throw th;
        }
    }

    @Test(timeout = 20000)
    public void testDetachHandlingWithSender(TestContext testContext) throws Exception {
        doDetachHandlingTestImpl(testContext, true);
    }

    @Test(timeout = 20000)
    public void testDetachHandlingWithReceiver(TestContext testContext) throws Exception {
        doDetachHandlingTestImpl(testContext, false);
    }

    public void doDetachHandlingTestImpl(TestContext testContext, boolean z) throws Exception {
        this.server.close();
        Async async = testContext.async();
        Async async2 = testContext.async();
        Async async3 = testContext.async();
        Async async4 = testContext.async();
        AtomicBoolean atomicBoolean = new AtomicBoolean();
        AtomicBoolean atomicBoolean2 = new AtomicBoolean();
        ProtonServer protonServer = null;
        try {
            protonServer = createServer(protonConnection -> {
                protonConnection.openHandler(asyncResult -> {
                    protonConnection.open();
                });
                protonConnection.sessionOpenHandler(protonSession -> {
                    protonSession.open();
                });
                if (z) {
                    protonConnection.receiverOpenHandler(protonReceiver -> {
                        LOG.trace("Server receiver opened");
                        protonReceiver.open();
                        async2.complete();
                        protonReceiver.closeHandler(asyncResult2 -> {
                            atomicBoolean.set(true);
                        });
                        protonReceiver.detachHandler(asyncResult3 -> {
                            testContext.assertTrue(asyncResult3.succeeded(), "expected non-errored async result");
                            protonReceiver.detach();
                            async3.complete();
                        });
                    });
                } else {
                    protonConnection.senderOpenHandler(protonSender -> {
                        LOG.trace("Server sender opened");
                        protonSender.open();
                        async2.complete();
                        protonSender.closeHandler(asyncResult2 -> {
                            atomicBoolean.set(true);
                        });
                        protonSender.detachHandler(asyncResult3 -> {
                            testContext.assertTrue(asyncResult3.succeeded(), "expected non-errored async result");
                            protonSender.detach();
                            async3.complete();
                        });
                    });
                }
            });
            ProtonClient.create(this.vertx).connect("localhost", protonServer.actualPort(), asyncResult -> {
                testContext.assertTrue(asyncResult.succeeded());
                ProtonConnection protonConnection2 = (ProtonConnection) asyncResult.result();
                protonConnection2.openHandler(asyncResult -> {
                    LOG.trace("Client connection opened");
                    ProtonSender createSender = z ? protonConnection2.createSender((String) null) : protonConnection2.createReceiver("some-address");
                    createSender.closeHandler(asyncResult -> {
                        atomicBoolean2.set(true);
                    });
                    createSender.detachHandler(asyncResult2 -> {
                        LOG.trace("Client link detached");
                        async4.complete();
                    });
                    ProtonSender protonSender = createSender;
                    createSender.openHandler(asyncResult3 -> {
                        LOG.trace("Client link opened");
                        async.complete();
                        protonSender.detach();
                    });
                    createSender.open();
                }).open();
            });
            async2.awaitSuccess();
            async.awaitSuccess();
            async3.awaitSuccess();
            async4.awaitSuccess();
            if (protonServer != null) {
                protonServer.close();
            }
            testContext.assertFalse(atomicBoolean.get(), "server link close handler should not have fired");
            testContext.assertFalse(atomicBoolean2.get(), "client link close handler should not have fired");
        } catch (Throwable th) {
            if (protonServer != null) {
                protonServer.close();
            }
            throw th;
        }
    }

    @Test(timeout = 20000)
    public void testMaxMessageSizeWithSingleFrameExceedingLimit(TestContext testContext) throws Exception {
        doMaxMessageSizeTestImpl(testContext, 10000, 5000, 5100);
    }

    @Test(timeout = 20000)
    public void testMaxMessageSizeWithMultipleFramesExceedingOnLastFrameOfDelivery(TestContext testContext) throws Exception {
        doMaxMessageSizeTestImpl(testContext, 10000, 30000, 30100);
    }

    @Test(timeout = 20000)
    public void testMaxMessageSizeWithMultipleFramesExceedingInCentralFrameOfDelivery(TestContext testContext) throws Exception {
        doMaxMessageSizeTestImpl(testContext, 10000, 50000, 150000);
    }

    private void doMaxMessageSizeTestImpl(TestContext testContext, int i, int i2, int i3) throws Exception {
        this.server.close();
        Async async = testContext.async();
        Async async2 = testContext.async();
        Async async3 = testContext.async();
        Async async4 = testContext.async();
        Async async5 = testContext.async();
        AtomicInteger atomicInteger = new AtomicInteger();
        AtomicInteger atomicInteger2 = new AtomicInteger();
        UnsignedLong valueOf = UnsignedLong.valueOf(i2);
        UnsignedLong valueOf2 = UnsignedLong.valueOf(5 * valueOf.longValue());
        ProtonServer protonServer = null;
        try {
            protonServer = createServer(protonConnection -> {
                protonConnection.openHandler(asyncResult -> {
                    protonConnection.open();
                });
                protonConnection.sessionOpenHandler(protonSession -> {
                    protonSession.open();
                });
                protonConnection.senderOpenHandler(protonSender -> {
                    testContext.assertEquals(valueOf, protonSender.getRemoteMaxMessageSize(), "unexpected remote max message size at server");
                    testContext.assertNull(protonSender.getMaxMessageSize(), "Expected no value to be set");
                    protonSender.setMaxMessageSize(valueOf2);
                    testContext.assertEquals(valueOf2, protonSender.getMaxMessageSize(), "Expected value to now be set");
                    protonSender.sendQueueDrainHandler(protonSender -> {
                        if (async2.isCompleted()) {
                            return;
                        }
                        testContext.assertTrue(protonSender.getCredit() > 3, "Unexpectedly low credit: " + protonSender.getCredit());
                        async2.complete();
                        sendMessagesForMaxMessageSizeTest(i3, protonSender);
                    });
                    protonSender.detachHandler(asyncResult2 -> {
                        ErrorCondition remoteCondition = protonSender.getRemoteCondition();
                        testContext.assertNotNull(remoteCondition);
                        testContext.assertEquals(remoteCondition.getCondition(), LinkError.MESSAGE_SIZE_EXCEEDED);
                        testContext.assertTrue(asyncResult2.failed());
                        protonSender.detach();
                        async4.complete();
                    });
                    LOG.trace("Server sender opened");
                    protonSender.open();
                    async.complete();
                });
            });
            ProtonClient create = ProtonClient.create(this.vertx);
            ProtonClientOptions protonClientOptions = new ProtonClientOptions();
            protonClientOptions.setMaxFrameSize(i);
            create.connect(protonClientOptions, "localhost", protonServer.actualPort(), asyncResult -> {
                testContext.assertTrue(asyncResult.succeeded());
                ProtonConnection protonConnection2 = (ProtonConnection) asyncResult.result();
                protonConnection2.openHandler(asyncResult -> {
                    LOG.trace("Client connection opened");
                    ProtonReceiver createReceiver = protonConnection2.createReceiver("some-address");
                    testContext.assertNull(createReceiver.getMaxMessageSize(), "Expected no value to be set");
                    createReceiver.setMaxMessageSize(valueOf);
                    testContext.assertEquals(valueOf, createReceiver.getMaxMessageSize(), "Expected value to now be set");
                    createReceiver.openHandler(asyncResult -> {
                        LOG.trace("Client link opened");
                        testContext.assertEquals(valueOf2, createReceiver.getRemoteMaxMessageSize(), "unexpected remote max message size at client");
                        async3.complete();
                        createReceiver.handler((protonDelivery, message) -> {
                            validateMessage(testContext, atomicInteger.incrementAndGet(), "small-first-message", message);
                        });
                        createReceiver.maxMessageSizeExceededHandler(protonReceiver -> {
                            atomicInteger2.incrementAndGet();
                            async5.complete();
                        });
                    });
                    createReceiver.open();
                }).open();
            });
            async.awaitSuccess();
            async3.awaitSuccess();
            async2.awaitSuccess();
            async5.awaitSuccess();
            async4.awaitSuccess();
            testContext.assertEquals(1, Integer.valueOf(atomicInteger2.get()), "Unexpected number of handler executions");
            testContext.assertEquals(1, Integer.valueOf(atomicInteger.get()), "Unexpected number of deliveries");
            if (protonServer != null) {
                protonServer.close();
            }
        } catch (Throwable th) {
            if (protonServer != null) {
                protonServer.close();
            }
            throw th;
        }
    }

    private void sendMessagesForMaxMessageSizeTest(int i, ProtonSender protonSender) {
        Message create = Message.Factory.create();
        create.setBody(new AmqpValue("small-first-message"));
        protonSender.send(create);
        Message create2 = Message.Factory.create();
        byte[] bArr = new byte[i];
        for (int i2 = 0; i2 < bArr.length; i2++) {
            bArr[i2] = (byte) (i2 % 256);
        }
        create2.setBody(new Data(new Binary(bArr)));
        protonSender.send(create2);
        Message create3 = Message.Factory.create();
        create3.setBody(new AmqpValue("small-last-message"));
        protonSender.send(create3);
    }

    @Test(timeout = 20000)
    public void testLinkPropertiesAndCapabilities(TestContext testContext) throws Exception {
        this.server.close();
        Async async = testContext.async();
        Async async2 = testContext.async();
        Map singletonMap = Collections.singletonMap(Symbol.valueOf("client"), true);
        Map singletonMap2 = Collections.singletonMap(Symbol.valueOf("server"), true);
        Symbol valueOf = Symbol.valueOf("1");
        Symbol valueOf2 = Symbol.valueOf("1");
        Symbol valueOf3 = Symbol.valueOf("1");
        Symbol[] symbolArr = {valueOf, valueOf2};
        Symbol[] symbolArr2 = {valueOf3};
        Symbol[] symbolArr3 = {valueOf3, Symbol.valueOf("1")};
        Symbol[] symbolArr4 = {valueOf2};
        ProtonServer protonServer = null;
        try {
            protonServer = createServer(protonConnection -> {
                protonConnection.openHandler(asyncResult -> {
                    protonConnection.open();
                });
                protonConnection.sessionOpenHandler(protonSession -> {
                    protonSession.open();
                });
                protonConnection.senderOpenHandler(protonSender -> {
                    testContext.assertEquals(singletonMap, protonSender.getRemoteProperties());
                    testContext.assertTrue(Arrays.equals(symbolArr, protonSender.getRemoteDesiredCapabilities()));
                    testContext.assertTrue(Arrays.equals(symbolArr2, protonSender.getRemoteOfferedCapabilities()));
                    LOG.trace("Server sender opened");
                    protonSender.setProperties(singletonMap2);
                    protonSender.setDesiredCapabilities(symbolArr3);
                    protonSender.setOfferedCapabilities(symbolArr4);
                    protonSender.open();
                    async.complete();
                });
            });
            ProtonClient.create(this.vertx).connect("localhost", protonServer.actualPort(), asyncResult -> {
                testContext.assertTrue(asyncResult.succeeded());
                ProtonConnection protonConnection2 = (ProtonConnection) asyncResult.result();
                protonConnection2.openHandler(asyncResult -> {
                    LOG.trace("Client connection opened");
                    ProtonReceiver createReceiver = protonConnection2.createReceiver("some-address");
                    createReceiver.setProperties(singletonMap);
                    createReceiver.setDesiredCapabilities(symbolArr);
                    createReceiver.setOfferedCapabilities(symbolArr2);
                    createReceiver.openHandler(asyncResult -> {
                        LOG.trace("Client link opened");
                        testContext.assertEquals(singletonMap2, createReceiver.getRemoteProperties(), "Unexpected value for link properties");
                        testContext.assertTrue(Arrays.equals(symbolArr3, createReceiver.getRemoteDesiredCapabilities()));
                        testContext.assertTrue(Arrays.equals(symbolArr4, createReceiver.getRemoteOfferedCapabilities()));
                        async2.complete();
                    });
                    createReceiver.open();
                }).open();
            });
            async.awaitSuccess();
            async2.awaitSuccess();
            if (protonServer != null) {
                protonServer.close();
            }
        } catch (Throwable th) {
            if (protonServer != null) {
                protonServer.close();
            }
            throw th;
        }
    }

    @Test(timeout = 20000)
    public void testFailedMessageDecoding(TestContext testContext) throws Exception {
        this.server.close();
        Async async = testContext.async();
        Async async2 = testContext.async();
        ProtonServer protonServer = null;
        try {
            protonServer = createServer(protonConnection -> {
                protonConnection.openHandler(asyncResult -> {
                    protonConnection.open();
                });
                protonConnection.sessionOpenHandler(protonSession -> {
                    protonSession.open();
                });
                protonConnection.senderOpenHandler(protonSender -> {
                    protonSender.open();
                    AtomicInteger atomicInteger = new AtomicInteger();
                    protonSender.sendQueueDrainHandler(protonSender -> {
                        int incrementAndGet = atomicInteger.incrementAndGet();
                        switch (incrementAndGet) {
                            case 1:
                                testContext.assertEquals(1000, Integer.valueOf(protonSender.getCredit()), "Unexpected initial credit level when send handler fired for round 1");
                                protonSender.send(ProtonHelper.message(String.valueOf(incrementAndGet)), protonDelivery -> {
                                    testContext.assertTrue(protonDelivery.getRemoteState() instanceof Accepted, "Unexpected state for delivery 1 after update");
                                });
                                return;
                            case 2:
                                MessageImpl messageImpl = (MessageImpl) Mockito.mock(MessageImpl.class);
                                Mockito.when(Integer.valueOf(messageImpl.encode((WritableBuffer) Mockito.any(WritableBuffer.class)))).then(invocationOnMock -> {
                                    byte[] bArr = new byte[5];
                                    ((WritableBuffer) invocationOnMock.getArgument(0)).put(bArr, 0, bArr.length);
                                    return Integer.valueOf(bArr.length);
                                });
                                protonSender.send(messageImpl, protonDelivery2 -> {
                                    Modified remoteState = protonDelivery2.getRemoteState();
                                    testContext.assertTrue(remoteState instanceof Modified, "Unexpected state for delivery 2 after update");
                                    testContext.assertTrue(remoteState.getDeliveryFailed().booleanValue(), "Expected true");
                                    testContext.assertTrue(remoteState.getUndeliverableHere().booleanValue(), "Expected true");
                                });
                                return;
                            case 3:
                                protonSender.send(ProtonHelper.message(String.valueOf(incrementAndGet)), protonDelivery3 -> {
                                    testContext.assertTrue(protonDelivery3.getRemoteState() instanceof Accepted, "Unexpected state for delivery 3 after update");
                                    this.vertx.setTimer(500L, l -> {
                                        testContext.assertEquals(1000, Integer.valueOf(protonSender.getCredit()), "Unexpected credit level after messages processed");
                                        async.complete();
                                    });
                                });
                                return;
                            default:
                                return;
                        }
                    });
                });
            });
            ProtonClient.create(this.vertx).connect("localhost", protonServer.actualPort(), asyncResult -> {
                testContext.assertTrue(asyncResult.succeeded());
                ProtonConnection protonConnection2 = (ProtonConnection) asyncResult.result();
                protonConnection2.open();
                AtomicInteger atomicInteger = new AtomicInteger(0);
                protonConnection2.createReceiver("address").handler((protonDelivery, message) -> {
                    switch (atomicInteger.incrementAndGet()) {
                        case 1:
                            validateMessage(testContext, 1, "1", message);
                            return;
                        case 2:
                            validateMessage(testContext, 3, "3", message);
                            async2.complete();
                            return;
                        default:
                            return;
                    }
                }).open();
            });
            async.awaitSuccess();
            async2.awaitSuccess();
            if (protonServer != null) {
                protonServer.close();
            }
        } catch (Throwable th) {
            if (protonServer != null) {
                protonServer.close();
            }
            throw th;
        }
    }

    private ProtonServer createServer(Handler<ProtonConnection> handler) throws InterruptedException, ExecutionException {
        ProtonServer create = ProtonServer.create(this.vertx);
        create.connectHandler(handler);
        FutureHandler asyncResult = FutureHandler.asyncResult();
        create.listen(0, asyncResult);
        asyncResult.get();
        return create;
    }

    private void validateMessage(TestContext testContext, int i, Object obj, Message message) {
        Object messageBody = getMessageBody(testContext, message);
        if (LOG.isTraceEnabled()) {
            LOG.trace("Got msg " + i + ", body: " + messageBody);
        }
        testContext.assertEquals(obj, messageBody, "Unexpected message body");
    }

    private Object getMessageBody(TestContext testContext, Message message) {
        AmqpValue body = message.getBody();
        testContext.assertNotNull(body);
        testContext.assertTrue(body instanceof AmqpValue);
        return body.getValue();
    }
}
