package io.vertx.proton.streams.tck;

import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.proton.MockServer;
import io.vertx.proton.ProtonClient;
import io.vertx.proton.ProtonConnection;
import io.vertx.proton.ProtonReceiver;
import io.vertx.proton.impl.ProtonConnectionImpl;
import io.vertx.proton.streams.impl.ProtonSubscriberImpl;
import io.vertx.proton.streams.impl.ProtonSubscriberWrapperImpl;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.message.Message;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.reactivestreams.tck.SubscriberWhiteboxVerification;
import org.reactivestreams.tck.TestEnvironment;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;

/* loaded from: input_file:io/vertx/proton/streams/tck/MessageSubscriberWhiteboxVerificationTckTest.class */
public class MessageSubscriberWhiteboxVerificationTckTest extends SubscriberWhiteboxVerification<Message> {
    private static final Logger LOG = LoggerFactory.getLogger(MessageSubscriberWhiteboxVerificationTckTest.class);
    private static final long DEFAULT_TIMEOUT = 500;
    TestServer server;
    Vertx vertx;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/vertx/proton/streams/tck/MessageSubscriberWhiteboxVerificationTckTest$TestServer.class */
    /**
     * {@link MockServer} specialisation used by this test that additionally carries a
     * "detach link" flag consulted by the receiver-open handler installed in
     * {@code createServer()}.
     */
    public class TestServer extends MockServer {
        /** Starts out {@code false}; nothing in this class changes it afterwards. */
        private final AtomicBoolean detachLink = new AtomicBoolean();

        /**
         * Delegates straight to the {@link MockServer} constructor.
         *
         * @param vertx   the Vert.x instance handed to the mock server
         * @param handler the connection handler handed to the mock server
         * @throws ExecutionException   propagated from the {@link MockServer} constructor
         * @throws InterruptedException propagated from the {@link MockServer} constructor
         */
        public TestServer(Vertx vertx, Handler<ProtonConnection> handler) throws ExecutionException, InterruptedException {
            super(vertx, handler);
        }

        /**
         * @return the current value of the detach-link flag
         */
        boolean getDetachLink() {
            return detachLink.get();
        }
    }

    /**
     * Builds the verification around a {@link TestEnvironment} configured with
     * {@code DEFAULT_TIMEOUT}, then creates the Vert.x instance kept in the {@code vertx} field.
     */
    public MessageSubscriberWhiteboxVerificationTckTest() {
        super(new TestEnvironment(DEFAULT_TIMEOUT));
        vertx = Vertx.vertx();
    }

    /**
     * Runs the superclass set-up and then starts a fresh test server for the test about to run.
     *
     * @throws Exception        if the superclass set-up fails
     * @throws RuntimeException if the test server cannot be created; the original failure is
     *                          preserved as the cause
     */
    @BeforeMethod
    @Override
    public void setUp() throws Exception {
        super.setUp();
        try {
            this.server = createServer();
        } catch (Exception e) {
            // Wrapped so the failure is clearly attributed to server creation rather than the TCK set-up.
            throw new RuntimeException("Problem creating test server", e);
        }
    }

    /**
     * Closes the test server created for the test that just ran, if there is one, and clears
     * the {@code server} field afterwards.
     *
     * @throws Exception propagated from closing the server
     */
    @AfterMethod
    public void tearDown() throws Exception {
        if (this.server == null) {
            // Nothing was started (or it was already torn down).
            return;
        }
        this.server.close();
        this.server = null;
    }

    /**
     * Creates the subscriber under test. A client connection is opened to the test server on
     * {@code localhost}, and on success a {@link ProtonSubscriberWrapperImpl} for the address
     * {@code "myAddress"} is built whose callbacks are reported to the supplied probe.
     *
     * <p>The calling thread waits up to two seconds for the connection attempt to finish. If the
     * attempt fails or does not finish in time, no subscriber has been stored and {@code null}
     * is returned.
     *
     * @param whiteboxSubscriberProbe the TCK probe that must be told about every signal the
     *                                subscriber receives
     * @return the instrumented subscriber, or {@code null} if the connection was not established
     * @throws RuntimeException if the calling thread is interrupted while waiting for the connection
     */
    @Override
    public Subscriber<Message> createSubscriber(SubscriberWhiteboxVerification.WhiteboxSubscriberProbe<Message> whiteboxSubscriberProbe) {
        int actualPort = this.server.actualPort();
        ProtonClient client = ProtonClient.create(this.vertx);
        AtomicReference<Subscriber<Message>> subscriberRef = new AtomicReference<>();
        CountDownLatch connectionAttempted = new CountDownLatch(1);
        client.connect("localhost", actualPort, asyncResult -> {
            if (asyncResult.succeeded()) {
                ProtonConnection connection = asyncResult.result();
                connection.open();
                // ProtonSubscriberImpl needs the implementation type, hence the downcast.
                subscriberRef.set(instrumentedSubscriber((ProtonConnectionImpl) connection, whiteboxSubscriberProbe));
            } else {
                LOG.error("Connection failed", asyncResult.cause());
            }
            // Released on success and failure alike so the caller never waits longer than needed.
            connectionAttempted.countDown();
        });
        try {
            LOG.trace("Awaiting connection");
            LOG.trace("Client connected: " + connectionAttempted.await(2L, TimeUnit.SECONDS));
            return subscriberRef.get();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while creating subscriber", e);
        }
    }

    /**
     * Wraps a new {@link ProtonSubscriberImpl} for {@code "myAddress"} so that every subscriber
     * signal is registered with the probe in addition to being passed to the real implementation.
     */
    private Subscriber<Message> instrumentedSubscriber(ProtonConnectionImpl connection,
            SubscriberWhiteboxVerification.WhiteboxSubscriberProbe<Message> probe) {
        return new ProtonSubscriberWrapperImpl(new ProtonSubscriberImpl("myAddress", connection)) {
            @Override
            public void onSubscribe(final Subscription subscription) {
                super.onSubscribe(subscription);
                // Hand the TCK a puppet through which it can drive demand and cancellation.
                probe.registerOnSubscribe(new SubscriberWhiteboxVerification.SubscriberPuppet() {
                    @Override
                    public void triggerRequest(long elements) {
                        subscription.request(elements);
                    }

                    @Override
                    public void signalCancel() {
                        subscription.cancel();
                    }
                });
            }

            @Override
            public void onNext(Message message) {
                probe.registerOnNext(message);
                super.onNext(message);
            }

            @Override
            public void onError(Throwable failure) {
                probe.registerOnError(failure);
                super.onError(failure);
            }

            @Override
            public void onComplete() {
                probe.registerOnComplete();
                super.onComplete();
            }
        };
    }

    /**
     * Creates the test element for the given index: a message whose body is an
     * {@link AmqpValue} holding the string {@code "hello" + i}.
     *
     * @param i the element index, appended to the body text
     * @return a new message carrying that body
     */
    @Override
    public Message createElement(int i) {
        Message message = Proton.message();
        message.setBody(new AmqpValue("hello" + i));
        return message;
    }

    /**
     * Alias left over from decompilation, which had renamed {@code createElement}; kept only so
     * that any existing reference to this name keeps working.
     *
     * @param i the element index
     * @return the result of {@link #createElement(int)}
     * @deprecated use {@link #createElement(int)}
     */
    @Deprecated
    public Message m15createElement(int i) {
        return createElement(i);
    }

    /**
     * Creates the test server and installs the handlers applied to every incoming connection:
     * open, close and disconnect are answered in kind, sessions are opened as they arrive, and
     * incoming receiver links are handled by {@link #handleReceiverOpen(ProtonReceiver)}.
     *
     * @return the newly created server
     * @throws Exception propagated from the {@link TestServer} constructor
     */
    private TestServer createServer() throws Exception {
        return new TestServer(this.vertx, serverConnection -> {
            serverConnection.openHandler(opened -> {
                LOG.trace("Client connected: " + serverConnection.getRemoteContainer());
                serverConnection.open();
            }).closeHandler(closed -> {
                LOG.trace("Client closing amqp connection: " + serverConnection.getRemoteContainer());
                serverConnection.close();
                serverConnection.disconnect();
            }).disconnectHandler(disconnected -> {
                // The parameter needs a name distinct from the enclosing lambda's parameter.
                LOG.trace("Client socket disconnected: " + serverConnection.getRemoteContainer());
                serverConnection.disconnect();
            }).sessionOpenHandler(session -> {
                session.open();
            });
            serverConnection.receiverOpenHandler(this::handleReceiverOpen);
        });
    }

    /**
     * Handles a receiver link opened by the client. Unless the server's detach-link flag is set,
     * the link is given the remote target, a message handler that trace-logs each message, and a
     * close handler that closes the link in return. The link is then opened; if the detach-link
     * flag is set it is immediately closed again with an error condition.
     */
    private void handleReceiverOpen(ProtonReceiver receiver) {
        if (!this.server.getDetachLink()) {
            LOG.trace("Receiving from client to: " + receiver.getRemoteTarget().getAddress());
            receiver.setTarget(receiver.getRemoteTarget());
            receiver.handler((delivery, message) -> {
                String address = message.getAddress();
                if (address == null) {
                    // Fall back to the link's target address when the message carries none.
                    address = receiver.getRemoteTarget().getAddress();
                }
                // Only AmqpValue bodies are inspected; any other body is logged as "unknown".
                Object body = message.getBody();
                String content = body instanceof AmqpValue ? (String) ((AmqpValue) body).getValue() : "unknown";
                LOG.trace("message to:" + address + ", body: " + content);
            });
            receiver.closeHandler(closeResult -> {
                ((ProtonReceiver) closeResult.result()).close();
            });
        }
        receiver.open();
        if (this.server.getDetachLink()) {
            receiver.setCondition(new ErrorCondition(Symbol.getSymbol("Failed Subscriber Requested"), ""));
            receiver.close();
        }
    }
}
