package io.vertx.proton.streams.tck;

import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.proton.MockServer;
import io.vertx.proton.ProtonClient;
import io.vertx.proton.ProtonConnection;
import io.vertx.proton.ProtonHelper;
import io.vertx.proton.ProtonSender;
import io.vertx.proton.streams.Delivery;
import io.vertx.proton.streams.ProtonStreams;
import io.vertx.proton.streams.impl.ProtonPublisherImpl;
import java.lang.reflect.Method;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.message.Message;
import org.reactivestreams.Publisher;
import org.reactivestreams.tck.PublisherVerification;
import org.reactivestreams.tck.TestEnvironment;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;

/* loaded from: input_file:io/vertx/proton/streams/tck/DeliveryPublisherVerificationTckTest.class */
public class DeliveryPublisherVerificationTckTest extends PublisherVerification<Delivery> {
    private static final Logger LOG = LoggerFactory.getLogger(DeliveryPublisherVerificationTckTest.class);
    private static final long DEFAULT_TIMEOUT = 200;
    private static final long DEFAULT_GC_TIMEOUT = 300;
    protected TestServer server;
    Vertx vertx;
    protected String testName;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:io/vertx/proton/streams/tck/DeliveryPublisherVerificationTckTest$TestServer.class */
    public class TestServer extends MockServer {
        private final AtomicBoolean detachLink;

        public TestServer(Vertx vertx, Handler<ProtonConnection> handler) throws ExecutionException, InterruptedException {
            super(vertx, handler);
            this.detachLink = new AtomicBoolean();
        }

        boolean getDetachLink() {
            return this.detachLink.get();
        }

        void setDetachLink(boolean z) {
            this.detachLink.set(z);
        }
    }

    public DeliveryPublisherVerificationTckTest() {
        super(new TestEnvironment(DEFAULT_TIMEOUT), DEFAULT_GC_TIMEOUT);
        this.vertx = Vertx.vertx();
        this.testName = "unknown";
    }

    public long maxElementsFromPublisher() {
        return publisherUnableToSignalOnComplete();
    }

    @BeforeMethod
    public void testName(Method method) throws Exception {
        this.testName = method.getName();
        LOG.trace("#### Test: " + this.testName);
    }

    @BeforeMethod
    public void setUp() throws Exception {
        super.setUp();
        try {
            this.server = createServer();
        } catch (Exception e) {
            throw new RuntimeException("Problem creating test server", e);
        }
    }

    @AfterMethod
    public void tearDown() throws Exception {
        if (this.server != null) {
            this.server.close();
            this.server = null;
        }
    }

    public Publisher<Delivery> createPublisher(long j) {
        int actualPort = this.server.actualPort();
        ProtonClient create = ProtonClient.create(this.vertx);
        AtomicReference atomicReference = new AtomicReference();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        create.connect("localhost", actualPort, asyncResult -> {
            if (asyncResult.succeeded()) {
                ProtonConnection protonConnection = (ProtonConnection) asyncResult.result();
                protonConnection.open();
                ProtonPublisherImpl createDeliveryConsumer = ProtonStreams.createDeliveryConsumer(protonConnection, this.testName);
                createDeliveryConsumer.setEmitOnConnectionEnd(false);
                atomicReference.set(createDeliveryConsumer);
            } else {
                LOG.error("Connection failed");
            }
            countDownLatch.countDown();
        });
        try {
            LOG.trace("Awaiting connection");
            LOG.trace("Client connected: " + countDownLatch.await(2L, TimeUnit.SECONDS));
            return (Publisher) atomicReference.get();
        } catch (InterruptedException e) {
            throw new RuntimeException("Interrupted while creating publisher", e);
        }
    }

    public Publisher<Delivery> createFailedPublisher() {
        this.server.setDetachLink(true);
        return createPublisher(0L);
    }

    private TestServer createServer() throws Exception {
        return new TestServer(this.vertx, protonConnection -> {
            protonConnection.openHandler(asyncResult -> {
                LOG.trace("Client connected: " + protonConnection.getRemoteContainer());
                protonConnection.open();
            }).closeHandler(asyncResult2 -> {
                LOG.trace("Client closing amqp connection: " + protonConnection.getRemoteContainer());
                protonConnection.close();
                protonConnection.disconnect();
            }).disconnectHandler(protonConnection -> {
                LOG.trace("Client socket disconnected: " + protonConnection.getRemoteContainer());
                protonConnection.disconnect();
            }).sessionOpenHandler(protonSession -> {
                protonSession.open();
            });
            protonConnection.senderOpenHandler(protonSender -> {
                if (!this.server.getDetachLink()) {
                    LOG.trace("Sending to client from: " + protonSender.getRemoteSource().getAddress());
                    protonSender.setSource(protonSender.getRemoteSource());
                    AtomicLong atomicLong = new AtomicLong();
                    AtomicLong atomicLong2 = new AtomicLong();
                    protonSender.sendQueueDrainHandler(protonSender -> {
                        while (!protonSender.sendQueueFull()) {
                            LOG.trace("Sending message to client");
                            Message message = ProtonHelper.message("Hello World from Server!" + atomicLong.incrementAndGet());
                            atomicLong2.incrementAndGet();
                            protonSender.send(message, protonDelivery -> {
                                LOG.trace("The message was received by the client.");
                            });
                        }
                    });
                    protonSender.closeHandler(asyncResult3 -> {
                        ((ProtonSender) asyncResult3.result()).close();
                    });
                }
                protonSender.open();
                if (this.server.getDetachLink()) {
                    protonSender.setCondition(new ErrorCondition(Symbol.getSymbol("Failed Publisher Requested"), ""));
                    protonSender.close();
                }
            });
        });
    }
}
