package io.vertx.test.core;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.eventbus.ReplyException;
import io.vertx.core.eventbus.ReplyFailure;
import io.vertx.core.spi.cluster.ClusterManager;
import io.vertx.test.core.EventBusTestBase;
import io.vertx.test.fakecluster.FakeClusterManager;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.hamcrest.CoreMatchers;
import org.junit.Test;

/**
 * Event-bus tests that run against several Vert.x nodes ({@code vertices}) joined through a
 * {@link FakeClusterManager}.
 *
 * <p>The send/reply/publish template methods inherited from {@link EventBusTestBase} are
 * implemented here so that the sender and the receiver live on different nodes.
 */
public class ClusteredEventBusTestBase extends EventBusTestBase {
    /** Address used by most tests in this class. */
    protected static final String ADDRESS1 = "some-address1";

    /**
     * Supplies the cluster manager used by the test nodes.
     *
     * @return a new {@link FakeClusterManager}
     */
    @Override
    protected ClusterManager getClusterManager() {
        return new FakeClusterManager();
    }

    /**
     * Sends {@code t} from node 0 to a consumer registered on node 1 and completes the test once
     * the message has been received and checked.
     *
     * <p>NOTE(review): the decompiler reported that this method's access modifier was changed from
     * {@code protected}; {@code public} is kept so the visible interface is not narrowed.
     *
     * @param t               the body to send
     * @param r               the body expected on the receiving side (checked only when
     *                        {@code consumer} is {@code null})
     * @param consumer        optional custom check of the received body; when {@code null} the
     *                        default checks (send flag, body, headers) are applied
     * @param deliveryOptions optional delivery options; when {@code null} the message is sent
     *                        without options and headers are not checked
     */
    @Override
    public <T, R> void testSend(T t, R r, Consumer<T> consumer, DeliveryOptions deliveryOptions) {
        if (this.vertices == null) {
            startNodes(2);
        }
        this.vertices[1].eventBus().<T>consumer(ADDRESS1).handler(message -> {
            if (consumer == null) {
                assertTrue(message.isSend());
                assertEquals(r, message.body());
                if (deliveryOptions != null) {
                    assertHeadersDelivered(deliveryOptions, message);
                }
            } else {
                consumer.accept(message.body());
            }
            testComplete();
        }).completionHandler(registered -> {
            // Only send once the consumer registration has completed.
            assertTrue(registered.succeeded());
            if (deliveryOptions == null) {
                this.vertices[0].eventBus().send(ADDRESS1, t);
            } else {
                this.vertices[0].eventBus().send(ADDRESS1, t, deliveryOptions);
            }
        });
        await();
    }

    /**
     * Convenience overload of {@link #testSend(Object, Object, Consumer, DeliveryOptions)} that
     * expects the received body to equal the sent body and uses no delivery options.
     */
    @Override
    protected <T> void testSend(T t, Consumer<T> consumer) {
        testSend(t, t, consumer, null);
    }

    /**
     * Convenience overload of {@link #testReply(Object, Object, Consumer, DeliveryOptions)} that
     * expects the received reply body to equal the replied body and uses no delivery options.
     */
    @Override
    protected <T> void testReply(T t, Consumer<T> consumer) {
        testReply(t, t, consumer, null);
    }

    /**
     * Sends a random string from node 0 to a consumer on node 1, which replies with {@code t};
     * the test completes once node 0 has received and checked the reply.
     *
     * <p>NOTE(review): the decompiler reported that this method's access modifier was changed from
     * {@code protected}; {@code public} is kept so the visible interface is not narrowed.
     *
     * @param t               the body used for the reply
     * @param r               the reply body expected by the sender (checked only when
     *                        {@code consumer} is {@code null})
     * @param consumer        optional custom check of the reply body; when {@code null} the
     *                        default checks (send flag, body, headers) are applied
     * @param deliveryOptions optional delivery options for the reply; headers are checked only
     *                        when the options and their headers are both non-null
     */
    @Override
    public <T, R> void testReply(T t, R r, Consumer<R> consumer, DeliveryOptions deliveryOptions) {
        if (this.vertices == null) {
            startNodes(2);
        }
        String request = TestUtils.randomUnicodeString(1000);
        this.vertices[1].eventBus().<String>consumer(ADDRESS1).handler(message -> {
            assertEquals(request, message.body());
            if (deliveryOptions == null) {
                message.reply(t);
            } else {
                message.reply(t, deliveryOptions);
            }
        }).completionHandler(registered -> {
            // Only send once the consumer registration has completed.
            assertTrue(registered.succeeded());
            this.vertices[0].eventBus().send(ADDRESS1, request, onSuccess((Message<R> reply) -> {
                if (consumer == null) {
                    assertTrue(reply.isSend());
                    assertEquals(r, reply.body());
                    if (deliveryOptions != null && deliveryOptions.getHeaders() != null) {
                        assertHeadersDelivered(deliveryOptions, reply);
                    }
                } else {
                    consumer.accept(reply.body());
                }
                testComplete();
            }));
        });
        await();
    }

    /**
     * Asserts that {@code message} carries exactly the headers configured on {@code options}.
     * Options without headers are treated as expecting zero headers.
     */
    private void assertHeadersDelivered(DeliveryOptions options, Message<?> message) {
        assertNotNull(message.headers());
        int expectedCount = options.getHeaders() != null ? options.getHeaders().size() : 0;
        assertEquals(expectedCount, message.headers().size());
        if (expectedCount != 0) {
            for (Map.Entry<String, String> header : options.getHeaders().entries()) {
                // Expected value first, actual value second.
                assertEquals(header.getValue(), message.headers().get(header.getKey()));
            }
        }
    }

    /**
     * Registers a consumer on node 0 (handler set via {@code handler(...)}) and sends to it
     * from node 1 once registration has completed.
     */
    @Test
    public void testRegisterRemote1() {
        startNodes(2);
        String body = TestUtils.randomUnicodeString(100);
        this.vertices[0].eventBus().<String>consumer(ADDRESS1).handler(message -> {
            assertEquals(body, message.body());
            testComplete();
        }).completionHandler(registered -> {
            assertTrue(registered.succeeded());
            this.vertices[1].eventBus().send(ADDRESS1, body);
        });
        await();
    }

    /**
     * Same as {@link #testRegisterRemote1()} but passes the handler directly to
     * {@code consumer(address, handler)}.
     */
    @Test
    public void testRegisterRemote2() {
        startNodes(2);
        String body = TestUtils.randomUnicodeString(100);
        this.vertices[0].eventBus().<String>consumer(ADDRESS1, message -> {
            assertEquals(body, message.body());
            testComplete();
        }).completionHandler(registered -> {
            assertTrue(registered.succeeded());
            this.vertices[1].eventBus().send(ADDRESS1, body);
        });
        await();
    }

    /**
     * Publishes {@code t} from node 0 to consumers registered on nodes 1 and 2 and completes the
     * test once both deliveries have been counted.
     *
     * @param t        the body to publish
     * @param consumer optional custom check of each received body; when {@code null} the default
     *                 checks (not a send, body equals {@code t}) are applied
     */
    @Override
    protected <T> void testPublish(T t, Consumer<T> consumer) {
        final int numNodes = 3;
        startNodes(numNodes);
        AtomicInteger receivedCount = new AtomicInteger();
        AtomicInteger registeredCount = new AtomicInteger();

        // Shared by both receiving nodes; completes the test after numNodes - 1 deliveries.
        Handler<Message<T>> messageHandler = message -> {
            if (consumer == null) {
                assertFalse(message.isSend());
                assertEquals(t, message.body());
            } else {
                consumer.accept(message.body());
            }
            if (receivedCount.incrementAndGet() == numNodes - 1) {
                testComplete();
            }
        };

        // Publishes exactly once, after the last of the numNodes - 1 registrations has completed.
        Handler<AsyncResult<Void>> registrationHandler = registered -> {
            assertTrue(registered.succeeded());
            if (registeredCount.incrementAndGet() == numNodes - 1) {
                this.vertices[0].eventBus().publish(ADDRESS1, t);
            }
        };

        this.vertices[2].eventBus().<T>consumer(ADDRESS1).handler(messageHandler).completionHandler(registrationHandler);
        this.vertices[1].eventBus().<T>consumer(ADDRESS1).handler(messageHandler).completionHandler(registrationHandler);
        // No unconditional publish here: publishing before both registrations have completed
        // could let an early delivery count toward the threshold, completing the test without
        // both nodes having received the message.
        await();
    }

    /**
     * Deploys a receiver verticle on node 0 that unregisters its consumer on the first message,
     * and a sender verticle on node 1 that keeps sending until the unregistration has completed.
     * The send issued after that point is expected to fail with
     * {@link ReplyFailure#NO_HANDLERS}. Both nodes are closed at the end of the test.
     */
    @Test
    public void testSendWhileUnsubscribing() throws Exception {
        startNodes(2);
        // Set to true by the receiver once its consumer has been unregistered.
        final AtomicBoolean unregistered = new AtomicBoolean();

        AbstractVerticle sender = new AbstractVerticle() {
            @Override
            public void start() throws Exception {
                getVertx().runOnContext(v -> sendMsg());
            }

            private void sendMsg() {
                if (unregistered.get()) {
                    getVertx().eventBus().send("whatever", "marseille", reply -> {
                        Throwable cause = reply.cause();
                        assertThat(cause, CoreMatchers.instanceOf(ReplyException.class));
                        ReplyException replyException = (ReplyException) cause;
                        assertEquals(ReplyFailure.NO_HANDLERS, replyException.failureType());
                        testComplete();
                    });
                } else {
                    // Keep sending every millisecond until the consumer has been unregistered.
                    getVertx().eventBus().send("whatever", "marseille");
                    getVertx().setTimer(1L, timerId -> sendMsg());
                }
            }
        };

        AbstractVerticle receiver = new AbstractVerticle() {
            boolean unregisterCalled;

            @Override
            public void start(Future<Void> startFuture) throws Exception {
                MessageConsumer<String> consumer = getVertx().eventBus().consumer("whatever");
                consumer.handler(message -> {
                    if (!unregisterCalled) {
                        consumer.unregister(done -> unregistered.set(true));
                        unregisterCalled = true;
                    }
                    message.reply("ok");
                }).completionHandler(startFuture);
            }
        };

        CountDownLatch deployLatch = new CountDownLatch(1);
        // Deploy the receiver first so its consumer exists before the sender starts.
        this.vertices[0].exceptionHandler(this::fail).deployVerticle(receiver, onSuccess(receiverId -> {
            this.vertices[1].exceptionHandler(this::fail).deployVerticle(sender, onSuccess(senderId -> {
                deployLatch.countDown();
            }));
        }));
        awaitLatch(deployLatch);
        await();

        CountDownLatch closeLatch = new CountDownLatch(2);
        this.vertices[0].close(closed -> closeLatch.countDown());
        this.vertices[1].close(closed -> closeLatch.countDown());
        awaitLatch(closeLatch);
    }

    /**
     * Sends a string from node 1 using {@link EventBusTestBase.StringLengthCodec} and checks two
     * things: the interceptor on the sending node sees the original string as the sent body, and
     * the consumer on node 0 receives an {@code Integer} equal to the string's length.
     */
    @Test
    public void testMessageBodyInterceptor() throws Exception {
        String content = TestUtils.randomUnicodeString(13);
        startNodes(2);
        // One completion from the consumer, one from the interceptor.
        waitFor(2);
        CountDownLatch registrationLatch = new CountDownLatch(1);
        this.vertices[0].eventBus().registerCodec(new EventBusTestBase.StringLengthCodec()).<Integer>consumer("whatever", message -> {
            assertEquals(content.length(), (int) message.body());
            complete();
        }).completionHandler(registered -> {
            assertTrue(registered.succeeded());
            registrationLatch.countDown();
        });
        awaitLatch(registrationLatch);
        EventBusTestBase.StringLengthCodec codec = new EventBusTestBase.StringLengthCodec();
        this.vertices[1].eventBus().registerCodec(codec).addInterceptor(sendContext -> {
            if ("whatever".equals(sendContext.message().address())) {
                assertEquals(content, sendContext.sentBody());
                complete();
            }
            sendContext.next();
        }).send("whatever", content, new DeliveryOptions().setCodecName(codec.name()));
        await();
    }
}
