package io.vertx.kafka.client.common.tracing;

import io.vertx.core.Context;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.spi.tracing.SpanKind;
import io.vertx.core.spi.tracing.TagExtractor;
import io.vertx.core.spi.tracing.VertxTracer;
import io.vertx.core.tracing.TracingOptions;
import io.vertx.core.tracing.TracingPolicy;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.kafka.client.common.KafkaClientOptions;
import io.vertx.kafka.client.consumer.KafkaReadStream;
import io.vertx.kafka.client.producer.KafkaProducer;
import io.vertx.kafka.client.producer.KafkaProducerRecord;
import io.vertx.kafka.client.producer.KafkaWriteStream;
import io.vertx.kafka.client.producer.impl.KafkaProducerImpl;
import io.vertx.kafka.client.tests.KafkaClusterTestBase;
import java.util.Collections;
import java.util.Date;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:io/vertx/kafka/client/common/tracing/TracingTest.class */
public class TracingTest extends KafkaClusterTestBase {
    /** Context-local key under which the test tracer stores the name of the consumer span. */
    private static final String CONTEXT_CONSUMER_SPAN = "--received-rq--";

    /** Vert.x instance created for each test with the test tracer installed. */
    private Vertx vertx;

    /** Producer stream under test; closed in {@code afterTest}. */
    private KafkaWriteStream<String, String> producer;

    /** Consumer stream under test; closed in {@code afterTest}. */
    private KafkaReadStream<String, String> consumer;

    /** Tracer stub that records and asserts every tracing callback. */
    private TestTracer tracer;

    /* loaded from: input_file:io/vertx/kafka/client/common/tracing/TracingTest$TestTracer.class */
    private static class TestTracer implements VertxTracer<String, String> {
        private final TestContext ctx;
        private String topic;
        private Async done;
        private LongAdder failuresCount;
        private LongAdder sentCount;
        private LongAdder receivedCount;
        private String peerAddress;
        private String host;
        private String port;

        private TestTracer(TestContext testContext) {
            this.ctx = testContext;
        }

        public <R> String receiveRequest(Context context, SpanKind spanKind, TracingPolicy tracingPolicy, R r, String str, Iterable<Map.Entry<String, String>> iterable, TagExtractor<R> tagExtractor) {
            this.receivedCount.decrement();
            if (this.receivedCount.intValue() < 0) {
                this.ctx.fail("Unexpected call to receiveRequest");
            }
            this.ctx.assertEquals(SpanKind.MESSAGING, spanKind);
            context.putLocal(TracingTest.CONTEXT_CONSUMER_SPAN, "span-" + this.receivedCount.intValue());
            Map extract = tagExtractor.extract(r);
            this.ctx.assertEquals("kafka_receive", str);
            this.ctx.assertEquals(this.peerAddress, extract.get("peer.address"));
            this.ctx.assertEquals(this.host, extract.get("peer.hostname"));
            this.ctx.assertEquals(this.port, extract.get("peer.port"));
            this.ctx.assertEquals(this.topic, extract.get("message_bus.destination"));
            this.ctx.assertEquals("kafka", extract.get("peer.service"));
            this.done.countDown();
            return "SPAN-CONSUMER";
        }

        public <R> void sendResponse(Context context, R r, String str, Throwable th, TagExtractor<R> tagExtractor) {
            this.ctx.assertEquals("SPAN-CONSUMER", str);
            context.removeLocal(TracingTest.CONTEXT_CONSUMER_SPAN);
            this.done.countDown();
        }

        public <R> String sendRequest(Context context, SpanKind spanKind, TracingPolicy tracingPolicy, R r, String str, BiConsumer<String, String> biConsumer, TagExtractor<R> tagExtractor) {
            this.sentCount.decrement();
            if (this.sentCount.intValue() < 0) {
                this.ctx.fail("Unexpected call to sendRequest");
            }
            this.ctx.assertEquals(SpanKind.MESSAGING, spanKind);
            Map extract = tagExtractor.extract(r);
            this.ctx.assertEquals("kafka_send", str);
            this.ctx.assertEquals(this.peerAddress, extract.get("peer.address"));
            this.ctx.assertEquals(this.host, extract.get("peer.hostname"));
            this.ctx.assertEquals(this.port, extract.get("peer.port"));
            this.ctx.assertEquals(this.topic, extract.get("message_bus.destination"));
            this.ctx.assertEquals("kafka", extract.get("peer.service"));
            this.done.countDown();
            return "SPAN-PRODUCER";
        }

        public <R> void receiveResponse(Context context, R r, String str, Throwable th, TagExtractor<R> tagExtractor) {
            this.ctx.assertEquals("SPAN-PRODUCER", str);
            if (th != null) {
                this.failuresCount.increment();
            }
            this.done.countDown();
        }

        void init(String str, int i, int i2) {
            init(str, i, i2, "localhost:9092,localhost:9093", "localhost", "9093");
        }

        void init(String str, int i, int i2, String str2, String str3, String str4) {
            this.sentCount = new LongAdder();
            this.sentCount.add(i);
            this.receivedCount = new LongAdder();
            this.receivedCount.add(i2);
            this.done = this.ctx.async((2 * i) + (2 * i2));
            this.topic = str;
            this.failuresCount = new LongAdder();
            this.peerAddress = str2;
            this.host = str3;
            this.port = str4;
        }

        void assertAllDone(int i) {
            this.done.await(2000L);
            this.ctx.assertEquals(Integer.valueOf(i), Integer.valueOf(this.failuresCount.intValue()));
            this.ctx.assertEquals(0, Integer.valueOf(this.sentCount.intValue()));
            this.ctx.assertEquals(0, Integer.valueOf(this.receivedCount.intValue()));
        }

        public /* bridge */ /* synthetic */ void receiveResponse(Context context, Object obj, Object obj2, Throwable th, TagExtractor tagExtractor) {
            receiveResponse(context, (Context) obj, (String) obj2, th, (TagExtractor<Context>) tagExtractor);
        }

        /* renamed from: sendRequest, reason: collision with other method in class */
        public /* bridge */ /* synthetic */ Object m1sendRequest(Context context, SpanKind spanKind, TracingPolicy tracingPolicy, Object obj, String str, BiConsumer biConsumer, TagExtractor tagExtractor) {
            return sendRequest(context, spanKind, tracingPolicy, (TracingPolicy) obj, str, (BiConsumer<String, String>) biConsumer, (TagExtractor<TracingPolicy>) tagExtractor);
        }

        public /* bridge */ /* synthetic */ void sendResponse(Context context, Object obj, Object obj2, Throwable th, TagExtractor tagExtractor) {
            sendResponse(context, (Context) obj, (String) obj2, th, (TagExtractor<Context>) tagExtractor);
        }

        /* renamed from: receiveRequest, reason: collision with other method in class */
        public /* bridge */ /* synthetic */ Object m2receiveRequest(Context context, SpanKind spanKind, TracingPolicy tracingPolicy, Object obj, String str, Iterable iterable, TagExtractor tagExtractor) {
            return receiveRequest(context, spanKind, tracingPolicy, (TracingPolicy) obj, str, (Iterable<Map.Entry<String, String>>) iterable, (TagExtractor<TracingPolicy>) tagExtractor);
        }
    }

    /**
     * Creates a fresh {@link TestTracer} for the running test and a Vert.x instance whose
     * tracing factory always returns that tracer.
     */
    @Before
    public void beforeTest(TestContext testContext) {
        tracer = new TestTracer(testContext);
        TracingOptions tracing = new TracingOptions().setFactory(ignoredOptions -> tracer);
        VertxOptions options = new VertxOptions().setTracingOptions(tracing);
        vertx = Vertx.vertx(options);
    }

    /**
     * Closes the producer and consumer streams used by the test, then closes the Vert.x
     * instance and asserts that this close succeeds.
     */
    @After
    public void afterTest(TestContext testContext) {
        // Streams first, then the Vert.x instance they were created on.
        close(testContext, producer);
        close(testContext, consumer);
        vertx.close(testContext.asyncAssertSuccess());
    }

    /**
     * Builds a String/String producer stream from the cluster's producer properties.
     *
     * @param testContext the test context; any stream exception fails the test through it
     * @param consumer    callback that may adjust the client options before the stream is created
     * @return the configured producer stream
     */
    private KafkaWriteStream<String, String> configureProducer(TestContext testContext, Consumer<KafkaClientOptions> consumer) {
        KafkaClientOptions options = new KafkaClientOptions()
            .setConfig(mapConfig(kafkaCluster.useTo().getProducerProperties("testTracing_producer")))
            .setConfig("key.serializer", StringSerializer.class)
            .setConfig("value.serializer", StringSerializer.class);
        consumer.accept(options);
        KafkaWriteStream<String, String> stream = producer(this.vertx, options);
        // The method reference already null-checks testContext; no separate getClass() call is needed.
        stream.exceptionHandler(testContext::fail);
        return stream;
    }

    /**
     * Builds a String/String consumer stream from the cluster's consumer properties, reading
     * from the earliest offset.
     *
     * @param testContext the test context; any stream exception fails the test through it
     * @param consumer    callback that may adjust the client options before the stream is created
     * @return the configured consumer stream
     */
    private KafkaReadStream<String, String> configureConsumer(TestContext testContext, Consumer<KafkaClientOptions> consumer) {
        KafkaClientOptions options = new KafkaClientOptions()
            .setConfig(mapConfig(kafkaCluster.useTo().getConsumerProperties("testTracing_consumer", "testTracing_consumer", OffsetResetStrategy.EARLIEST)))
            .setConfig("key.deserializer", StringDeserializer.class)
            .setConfig("value.deserializer", StringDeserializer.class);
        consumer.accept(options);
        KafkaReadStream<String, String> stream = KafkaReadStream.create(this.vertx, options);
        // The method reference already null-checks testContext; no separate getClass() call is needed.
        stream.exceptionHandler(testContext::fail);
        return stream;
    }

    /**
     * Sends 1000 records through a traced producer and consumer and checks that every send and
     * receive is traced, and that the record handler sees the span name the tracer stored in the
     * context for that record.
     */
    @Test
    public void testTracing(TestContext testContext) {
        final String topic = "TestTracing";
        final int numMessages = 1000;

        Properties producerProperties = kafkaCluster.useTo().getProducerProperties("testTracing_producer");
        producerProperties.put("key.serializer", StringSerializer.class);
        producerProperties.put("value.serializer", StringSerializer.class);
        this.producer = producer(this.vertx, producerProperties);
        this.producer.exceptionHandler(testContext::fail);
        KafkaProducer<String, String> kafkaProducer = new KafkaProducerImpl<>(this.vertx, this.producer);

        Properties consumerProperties = kafkaCluster.useTo().getConsumerProperties("testTracing_consumer", "testTracing_consumer", OffsetResetStrategy.EARLIEST);
        consumerProperties.put("key.deserializer", StringDeserializer.class);
        consumerProperties.put("value.deserializer", StringDeserializer.class);
        this.consumer = KafkaReadStream.create(this.vertx, consumerProperties);
        this.consumer.exceptionHandler(testContext::fail);

        // Counts down in step with the tracer's received counter, which names the spans.
        AtomicInteger remaining = new AtomicInteger(numMessages);
        this.consumer.handler(consumerRecord -> {
            Context currentContext = Vertx.currentContext();
            testContext.assertNotNull(currentContext);
            String span = currentContext.getLocal(CONTEXT_CONSUMER_SPAN);
            testContext.assertEquals("span-" + remaining.decrementAndGet(), span);
        });
        this.consumer.subscribe(Collections.singleton(topic));
        this.tracer.init(topic, numMessages, numMessages);
        for (int i = 0; i < numMessages; i++) {
            kafkaProducer.write(KafkaProducerRecord.create(topic, "key-" + i, "value-" + i, 0));
        }
        this.tracer.assertAllDone(0);
    }

    /**
     * Sends one valid and one invalid record and checks that both sends are traced and that
     * exactly one of them is reported to the tracer as a failure.
     */
    @Test
    public void testTracingFailure(TestContext testContext) {
        Properties producerProperties = kafkaCluster.useTo().getProducerProperties("testTracing_producer");
        producerProperties.put("key.serializer", StringSerializer.class);
        producerProperties.put("value.serializer", StringSerializer.class);
        this.tracer.init("topic", 2, 0);
        // Value type is Object so a non-String value can be passed to the producer.
        KafkaProducer<String, Object> failingProducer = KafkaProducer.create(this.vertx, producerProperties);
        failingProducer.write(KafkaProducerRecord.create("topic", "key", "valid string"));
        // NOTE(review): the Date value is presumably rejected by the configured StringSerializer;
        // the test asserts exactly one failed send.
        failingProducer.write(KafkaProducerRecord.create("topic", "key", new Date()));
        this.tracer.assertAllDone(1);
    }

    /**
     * With tracing set to ALWAYS on the producer and IGNORE on the consumer, checks that all
     * ten sends are traced, no receive is traced, and all ten records still reach the handler.
     */
    @Test
    public void testTracingIgnoreConsumer(TestContext testContext) {
        final String topic = "TestTracingIgnoreC";
        final int numMessages = 10;

        Async async = testContext.async(numMessages);
        this.producer = configureProducer(testContext, options -> options.setTracingPolicy(TracingPolicy.ALWAYS));
        KafkaProducer<String, String> kafkaProducer = new KafkaProducerImpl<>(this.vertx, this.producer);
        this.consumer = configureConsumer(testContext, options -> options.setTracingPolicy(TracingPolicy.IGNORE));
        this.consumer.handler(consumerRecord -> async.countDown());
        this.consumer.subscribe(Collections.singleton(topic));
        // Zero expected receives: any receiveRequest callback fails the test.
        this.tracer.init(topic, numMessages, 0);
        for (int i = 0; i < numMessages; i++) {
            kafkaProducer.write(KafkaProducerRecord.create(topic, "key-" + i, "value-" + i, 0));
        }
        this.tracer.assertAllDone(0);
        async.awaitSuccess(10000L);
    }

    /**
     * With tracing set to IGNORE on the producer and ALWAYS on the consumer, checks that no
     * send is traced, all ten receives are traced, and all ten records reach the handler.
     */
    @Test
    public void testTracingIgnoreProducer(TestContext testContext) {
        final String topic = "TestTracingIgnoreP";
        final int numMessages = 10;

        Async async = testContext.async(numMessages);
        this.producer = configureProducer(testContext, options -> options.setTracingPolicy(TracingPolicy.IGNORE));
        KafkaProducer<String, String> kafkaProducer = new KafkaProducerImpl<>(this.vertx, this.producer);
        this.consumer = configureConsumer(testContext, options -> options.setTracingPolicy(TracingPolicy.ALWAYS));
        this.consumer.handler(consumerRecord -> async.countDown());
        this.consumer.subscribe(Collections.singleton(topic));
        // Zero expected sends: any sendRequest callback fails the test.
        this.tracer.init(topic, 0, numMessages);
        for (int i = 0; i < numMessages; i++) {
            kafkaProducer.write(KafkaProducerRecord.create(topic, "key-" + i, "value-" + i, 0));
        }
        this.tracer.assertAllDone(0);
        async.awaitSuccess(10000L);
    }

    /**
     * With the trace peer address explicitly set to {@code null} on both clients, checks that
     * ten sends and ten receives are traced against the tracer's default expected peer tags.
     */
    @Test
    public void testTracingNullPeerAddress(TestContext testContext) {
        final String topic = "TestTracingNullAddr";
        final int numMessages = 10;

        Async async = testContext.async(numMessages);
        this.producer = configureProducer(testContext, options -> options.setTracePeerAddress(null));
        KafkaProducer<String, String> kafkaProducer = new KafkaProducerImpl<>(this.vertx, this.producer);
        this.consumer = configureConsumer(testContext, options -> options.setTracePeerAddress(null));
        this.consumer.handler(consumerRecord -> async.countDown());
        this.consumer.subscribe(Collections.singleton(topic));
        this.tracer.init(topic, numMessages, numMessages);
        for (int i = 0; i < numMessages; i++) {
            kafkaProducer.write(KafkaProducerRecord.create(topic, "key-" + i, "value-" + i, 0));
        }
        this.tracer.assertAllDone(0);
        async.awaitSuccess(10000L);
    }

    /**
     * With a custom trace peer address on both clients, checks that ten sends and ten receives
     * are traced and that the peer address, host and port tags reflect that address.
     */
    @Test
    public void testTracingOtherPeerAddress(TestContext testContext) {
        final String topic = "TestTracingOtherAddr";
        final String peerAddress = "http://other:9090";
        final int numMessages = 10;

        Async async = testContext.async(numMessages);
        this.producer = configureProducer(testContext, options -> options.setTracePeerAddress(peerAddress));
        KafkaProducer<String, String> kafkaProducer = new KafkaProducerImpl<>(this.vertx, this.producer);
        this.consumer = configureConsumer(testContext, options -> options.setTracePeerAddress(peerAddress));
        this.consumer.handler(consumerRecord -> async.countDown());
        this.consumer.subscribe(Collections.singleton(topic));
        this.tracer.init(topic, numMessages, numMessages, peerAddress, "other", "9090");
        for (int i = 0; i < numMessages; i++) {
            kafkaProducer.write(KafkaProducerRecord.create(topic, "key-" + i, "value-" + i, 0));
        }
        this.tracer.assertAllDone(0);
        async.awaitSuccess(10000L);
    }
}
