package io.vertx.kafka.client.tests;

import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.kafka.client.consumer.KafkaReadStream;
import io.vertx.kafka.client.producer.KafkaWriteStream;
import io.vertx.kafka.client.serialization.BufferDeserializer;
import io.vertx.kafka.client.serialization.BufferSerializer;
import io.vertx.kafka.client.serialization.VertxSerdes;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:io/vertx/kafka/client/tests/CodecsTest.class */
/**
 * Tests for the Vert.x Kafka client codecs.
 *
 * <p>Two kinds of checks live here:
 * <ul>
 *   <li>in-memory round trips through the {@link Serde} returned by {@link VertxSerdes#serdeFrom(Class)}
 *       (see {@link #testSerializer(Class, Object)});</li>
 *   <li>end-to-end round trips that write records with a {@link KafkaWriteStream} and read them back
 *       with a {@link KafkaReadStream} (see {@code testCodec}).</li>
 * </ul>
 *
 * <p>NOTE(review): {@code kafkaCluster}, {@code close(...)} and {@code producer(...)} are not declared in
 * this class; they are presumably inherited from {@code KafkaClusterTestBase} — confirm there.
 */
public class CodecsTest extends KafkaClusterTestBase {
    /** Number of records written and then expected back in each end-to-end codec test. */
    private static final int NUM_MESSAGES = 100000;

    /** Topic name used for the serde round trips and as the suffix of each per-test topic. */
    private final String topic = "the_topic";
    private Vertx vertx;
    private KafkaWriteStream<?, ?> producer;
    private KafkaReadStream<?, ?> consumer;

    /** Simple value holder whose JSON form is used as serializer input. */
    public static class CustomData {
        private final String field1;
        private final int field2;

        public CustomData(String str, int i) {
            this.field1 = str;
            this.field2 = i;
        }

        /**
         * Builds the JSON representation of this object.
         *
         * @return a new {@link JsonObject} with the entries {@code field1} and {@code field2}
         */
        public JsonObject toJson() {
            return new JsonObject()
                .put("field1", this.field1)
                .put("field2", this.field2);
        }
    }

    /** Creates a fresh Vert.x instance for each test. */
    @Before
    public void beforeTest() {
        this.vertx = Vertx.vertx();
    }

    /**
     * Closes whatever producer/consumer the test registered, then the Vert.x instance.
     *
     * @param testContext the test context used to assert that Vert.x closed successfully
     */
    @After
    public void afterTest(TestContext testContext) {
        // The serde-only tests never assign these fields, so they may still be null here.
        close(testContext, this.producer);
        close(testContext, this.consumer);
        this.vertx.close(testContext.asyncAssertSuccess());
    }

    @Test
    public void testBufferSerializer() {
        testSerializer(Buffer.class, Buffer.buffer("Hello"));
    }

    @Test
    public void testJsonObjectSerializer() {
        JsonObject value = new JsonObject()
            .put("s", "the-string")
            .put("the-number", 3)
            .put("the-boolean", true);
        testSerializer(JsonObject.class, value);
    }

    @Test
    public void testJsonArraySerializer() {
        testSerializer(JsonArray.class, new JsonArray().add(3).add("s").add(true));
    }

    @Test
    public void testCustomDataOnJsonSerializer() {
        testSerializer(JsonObject.class, new CustomData("test", 0).toJson());
    }

    /**
     * Serializes and then deserializes a value with the serde registered for {@code cls} and asserts that
     * the result equals the input; repeats the round trip with {@code null}.
     *
     * @param cls the type whose serde is looked up through {@link VertxSerdes#serdeFrom(Class)}
     * @param t   the value to round-trip
     * @param <T> the value type
     */
    private <T> void testSerializer(Class<T> cls, T t) {
        Serde<T> serde = VertxSerdes.serdeFrom(cls);
        Deserializer<T> deserializer = serde.deserializer();
        Serializer<T> serializer = serde.serializer();

        Assert.assertEquals(
            "Should get the original value after serialization and deserialization",
            t,
            deserializer.deserialize(this.topic, serializer.serialize(this.topic, t)));

        Assert.assertNull(
            "Should support null in serialization and deserialization",
            deserializer.deserialize(this.topic, serializer.serialize(this.topic, null)));
    }

    @Test
    public void testStringCodec(TestContext testContext) throws Exception {
        testCodec(
            testContext,
            "testStringCodec",
            config -> producer(this.vertx, config, String.class, String.class),
            config -> KafkaReadStream.create(this.vertx, config, String.class, String.class),
            index -> "key-" + index,
            index -> "value-" + index);
    }

    @Test
    public void testBufferCodec(TestContext testContext) throws Exception {
        testCodec(
            testContext,
            "testBufferCodec",
            config -> producer(this.vertx, config, Buffer.class, Buffer.class),
            config -> KafkaReadStream.create(this.vertx, config, Buffer.class, Buffer.class),
            index -> Buffer.buffer("key-" + index),
            index -> Buffer.buffer("value-" + index));
    }

    /**
     * Same as {@link #testBufferCodec(TestContext)} but the (de)serializers are supplied through the
     * configuration properties instead of key/value classes.
     */
    @Test
    public void testBufferCodecString(TestContext testContext) throws Exception {
        testCodec(
            testContext,
            "testBufferCodecString",
            config -> {
                config.put("key.serializer", BufferSerializer.class);
                config.put("value.serializer", BufferSerializer.class);
                return KafkaWriteStream.create(this.vertx, config);
            },
            config -> {
                config.put("key.deserializer", BufferDeserializer.class);
                config.put("value.deserializer", BufferDeserializer.class);
                return KafkaReadStream.create(this.vertx, config);
            },
            index -> Buffer.buffer("key-" + index),
            index -> Buffer.buffer("value-" + index));
    }

    /**
     * Writes {@link #NUM_MESSAGES} records to the topic {@code prefix + "the_topic"} and asserts that a
     * consumer subscribed to that topic receives the same keys and values in the order they were written.
     * The test context's {@link Async} is completed once the last expected record has been handled.
     *
     * @param testContext     test context used for assertions, failure reporting and async completion
     * @param prefix          per-test prefix for the topic name, producer name and consumer group/client id
     * @param producerFactory builds the write stream from the producer properties
     * @param consumerFactory builds the read stream from the consumer properties
     * @param keyFactory      maps a record index to its key
     * @param valueFactory    maps a record index to its value
     * @param <K>             record key type
     * @param <V>             record value type
     * @throws Exception declared for callers; NOTE(review): presumably raised by the cluster helper — confirm
     */
    private <K, V> void testCodec(TestContext testContext,
                                  String prefix,
                                  Function<Properties, KafkaWriteStream<K, V>> producerFactory,
                                  Function<Properties, KafkaReadStream<K, V>> consumerFactory,
                                  Function<Integer, K> keyFactory,
                                  Function<Integer, V> valueFactory) throws Exception {
        String topicName = prefix + this.topic;

        Properties producerConfig = kafkaCluster.useTo().getProducerProperties(prefix + "the_producer");
        KafkaWriteStream<K, V> writeStream = producerFactory.apply(producerConfig);
        // Remember the stream so that afterTest() can close it.
        this.producer = writeStream;
        writeStream.exceptionHandler(testContext::fail);

        // Expected keys/values, appended in write order and popped from the head as records arrive.
        ConcurrentLinkedDeque<K> expectedKeys = new ConcurrentLinkedDeque<>();
        ConcurrentLinkedDeque<V> expectedValues = new ConcurrentLinkedDeque<>();
        for (int i = 0; i < NUM_MESSAGES; i++) {
            K key = keyFactory.apply(i);
            V value = valueFactory.apply(i);
            expectedKeys.add(key);
            expectedValues.add(value);
            // Every record goes to partition 0 of the topic.
            writeStream.write(new ProducerRecord<>(topicName, 0, key, value));
        }

        Async done = testContext.async();
        Properties consumerConfig = kafkaCluster.useTo().getConsumerProperties(
            prefix + "the_consumer", prefix + "the_consumer", OffsetResetStrategy.EARLIEST);
        KafkaReadStream<K, V> readStream = consumerFactory.apply(consumerConfig);
        // Remember the stream so that afterTest() can close it.
        this.consumer = readStream;

        AtomicInteger remaining = new AtomicInteger(NUM_MESSAGES);
        readStream.exceptionHandler(testContext::fail);
        readStream.handler(record -> {
            testContext.assertEquals(expectedKeys.pop(), record.key());
            testContext.assertEquals(expectedValues.pop(), record.value());
            if (remaining.decrementAndGet() == 0) {
                done.complete();
            }
        });
        readStream.subscribe(Collections.singleton(topicName));
    }
}
