package io.vertx.kafka.client.tests;

import io.vertx.core.Vertx;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import io.vertx.kafka.client.consumer.KafkaReadStream;
import io.vertx.kafka.client.consumer.impl.KafkaConsumerImpl;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;
import org.junit.Test;
import org.junit.runner.RunWith;

/**
 * Feeds a {@link MockConsumer} with records in small batches and checks that the consumer built
 * on top of it hands every record to its handler in strictly increasing offset order, even though
 * the consumer is paused, committed and resumed between batches.
 */
@RunWith(VertxUnitRunner.class)
public class KafkaReadStreamMockTest extends KafkaTestBase {
    /** Maximum number of records pushed into the mock consumer per call to {@link #sendNextBatch}. */
    private static final int SEND_BATCH = 5;
    /** Total number of records generated for the test. */
    private static final int TOTAL_MESSAGES = 400;
    /** Topic used for every generated record and for the partition assignment. */
    private static final String TOPIC = "topic";
    /** The single partition all records are published to. */
    private static final int PARTITION = 0;

    /** Records that have been generated but not yet handed to the mock consumer. */
    private final LinkedList<ConsumerRecord<String, String>> recordsMock = new LinkedList<>();
    /** Id of the most recently armed debounce timer, or {@code null} when none has been armed. */
    private Long timer = null;

    /** Fills {@link #recordsMock} with {@link #TOTAL_MESSAGES} records whose offsets start at 0. */
    private void initRecords() {
        for (int offset = 0; offset < TOTAL_MESSAGES; offset++) {
            this.recordsMock.add(
                new ConsumerRecord<>(TOPIC, PARTITION, offset, "key-" + offset, "value-" + offset));
        }
    }

    /**
     * Creates a mock consumer whose beginning offset for the test partition is 0.
     *
     * @return the configured mock consumer
     */
    private MockConsumer<String, String> createMockConsumer() {
        MockConsumer<String, String> mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
        HashMap<TopicPartition, Long> beginningOffsets = new HashMap<>();
        beginningOffsets.put(new TopicPartition(TOPIC, PARTITION), 0L);
        mockConsumer.updateBeginningOffsets(beginningOffsets);
        return mockConsumer;
    }

    /**
     * Moves up to {@link #SEND_BATCH} pending records into the given mock consumer; does nothing
     * once all records have been sent.
     *
     * @param mockConsumer the consumer that receives the records
     */
    private void sendNextBatch(MockConsumer<String, String> mockConsumer) {
        for (int sent = 0; sent < SEND_BATCH && !this.recordsMock.isEmpty(); sent++) {
            mockConsumer.addRecord(this.recordsMock.pop());
        }
    }

    /**
     * Cancels the debounce timer armed by a previous record, if any.
     *
     * @param vertx the Vert.x instance the timer was registered with
     */
    private void cancelPendingTimer(Vertx vertx) {
        if (this.timer != null) {
            vertx.cancelTimer(this.timer);
            this.timer = null;
        }
    }

    /**
     * Asserts that records are received with consecutive offsets from 0 to
     * {@code TOTAL_MESSAGES - 1} while the consumer is paused, committed and resumed 5 ms after
     * each burst of records.
     *
     * @param testContext the vertx-unit context used for assertions and asynchronous completion
     */
    @Test
    public void shouldNotLoseMessages(TestContext testContext) {
        Vertx vertx = Vertx.vertx();
        Async async = testContext.async();
        initRecords();
        MockConsumer<String, String> mockConsumer = createMockConsumer();
        KafkaConsumerImpl<String, String> consumer =
            new KafkaConsumerImpl<>(KafkaReadStream.create(vertx, mockConsumer));
        // Offset of the last record seen; starts at -1 so the first expected offset is 0.
        AtomicLong lastOffset = new AtomicLong(-1L);

        consumer.handler(record -> {
            long offset = record.offset();
            testContext.assertEquals(lastOffset.incrementAndGet(), offset);

            // Every record re-arms the timer, so drop whatever the previous record scheduled.
            // On the last record this also prevents a stale timer from touching a closed consumer.
            cancelPendingTimer(vertx);

            if (offset == TOTAL_MESSAGES - 1) {
                // Release the consumer and the Vert.x instance before finishing the test so no
                // event-loop or worker threads are left behind.
                consumer.close(closed -> vertx.close(stopped -> {
                    if (closed.failed()) {
                        testContext.fail(closed.cause());
                    } else if (stopped.failed()) {
                        testContext.fail(stopped.cause());
                    } else {
                        async.complete();
                    }
                }));
                return;
            }

            this.timer = vertx.setTimer(5L, timerId -> {
                consumer.pause();
                testContext.assertEquals(0L, consumer.demand());
                vertx.getOrCreateContext().runOnContext(afterPause -> {
                    consumer.commit();
                    consumer.resume();
                    testContext.assertEquals(Long.MAX_VALUE, consumer.demand());
                    sendNextBatch(mockConsumer);
                    // A second batch is queued on a later context turn.
                    vertx.getOrCreateContext().runOnContext(afterResume -> sendNextBatch(mockConsumer));
                });
            });
        });
        consumer.exceptionHandler(testContext::fail);

        LinkedHashSet<io.vertx.kafka.client.common.TopicPartition> partitions = new LinkedHashSet<>();
        partitions.add(new io.vertx.kafka.client.common.TopicPartition(TOPIC, PARTITION));
        consumer.assign(partitions, assigned -> {
            if (assigned.failed()) {
                // Without an assignment no record would ever arrive and the test would hang.
                testContext.fail(assigned.cause());
                return;
            }
            sendNextBatch(mockConsumer);
        });
    }
}
