package io.vertx.servicediscovery.types;

import com.jayway.awaitility.Awaitility;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.json.JsonObject;
import io.vertx.servicediscovery.Record;
import io.vertx.servicediscovery.ServiceDiscovery;
import io.vertx.servicediscovery.ServiceDiscoveryOptions;
import io.vertx.servicediscovery.ServiceReference;
import io.vertx.servicediscovery.impl.DiscoveryImpl;
import java.util.ArrayList;
import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.assertj.core.api.Assertions;
import org.hamcrest.core.Is;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:io/vertx/servicediscovery/types/MessageSourceTest.class */
public class MessageSourceTest {
    private Vertx vertx;
    private ServiceDiscovery discovery;

    @Before
    public void setUp() {
        this.vertx = Vertx.vertx();
        this.discovery = new DiscoveryImpl(this.vertx, new ServiceDiscoveryOptions());
    }

    @After
    public void tearDown() {
        this.discovery.close();
        AtomicBoolean atomicBoolean = new AtomicBoolean();
        this.vertx.close(asyncResult -> {
            atomicBoolean.set(true);
        });
        Awaitility.await().untilAtomic(atomicBoolean, Is.is(true));
    }

    @Test
    public void test() throws InterruptedException {
        Random random = new Random();
        this.vertx.setPeriodic(10L, l -> {
            this.vertx.eventBus().publish("data", Double.valueOf(random.nextDouble()));
        });
        Record createRecord = MessageSource.createRecord("Hello", "data");
        this.discovery.publish(createRecord, asyncResult -> {
        });
        Awaitility.await().until(() -> {
            return Boolean.valueOf(createRecord.getRegistration() != null);
        });
        AtomicReference atomicReference = new AtomicReference();
        this.discovery.getRecord(new JsonObject().put("name", "Hello"), asyncResult2 -> {
            atomicReference.set(asyncResult2.result());
        });
        Awaitility.await().until(() -> {
            return Boolean.valueOf(atomicReference.get() != null);
        });
        ServiceReference reference = this.discovery.getReference((Record) atomicReference.get());
        MessageConsumer messageConsumer = (MessageConsumer) reference.get();
        ArrayList arrayList = new ArrayList();
        messageConsumer.handler(message -> {
            arrayList.add(message.body());
        });
        Awaitility.await().until(() -> {
            return Boolean.valueOf(!arrayList.isEmpty());
        });
        reference.release();
        int size = arrayList.size();
        Thread.sleep(500L);
        Assertions.assertThat(arrayList.size()).isEqualTo(size);
        reference.release();
    }
}
