package org.kie.hacep;

import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.junit.Assert;
import org.junit.Test;
import org.kie.hacep.core.Bootstrap;
import org.kie.hacep.core.infra.election.State;
import org.kie.hacep.message.SnapshotMessage;
import org.kie.remote.RemoteKieSession;
import org.kie.remote.util.SerializationUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/kie/hacep/PodAsLeaderSnapshotTest.class */
/**
 * Integration test that starts the engine, forces the pod into the {@link State#LEADER}
 * state, inserts a batch of stock-ticket events and then verifies that the events,
 * control and snapshot topics each receive the expected number of records.
 */
public class PodAsLeaderSnapshotTest extends KafkaFullTopicsTests {

    /** Maximum number of polls per topic before the test gives up with an exception. */
    private static final int MAX_POLL_ATTEMPTS = 30;

    /** How long a single poll may block waiting for records. */
    private static final Duration POLL_TIMEOUT = Duration.ofSeconds(3L);

    private static final Logger logger = LoggerFactory.getLogger("org.hacep");

    /**
     * Inserts 10 stock-ticket events while acting as leader and checks that:
     * <ul>
     *   <li>11 records become available on the events topic,</li>
     *   <li>11 records become available on the control topic,</li>
     *   <li>exactly 1 well-formed snapshot record becomes available on the snapshot topic.</li>
     * </ul>
     * NOTE(review): 10 events are inserted but 11 records are expected on the events and
     * control topics; the origin of the extra record is not visible here - confirm.
     *
     * <p>NOTE(review): the JUnit timeout (30 s) is shorter than the worst-case polling
     * budget (30 attempts x 3 s per topic), so the timeout will normally fire before the
     * attempt limit does.
     *
     * @throws RuntimeException if a topic has not delivered enough records after
     *                          {@link #MAX_POLL_ATTEMPTS} polls
     */
    @Test(timeout = 30000)
    public void processMessagesAsLeaderAndCreateSnapshotTest() {
        Bootstrap.startEngine(this.envConfig);
        Bootstrap.getConsumerController().getCallback().updateStatus(State.LEADER);

        // try-with-resources guarantees that ALL three consumers are closed exactly once,
        // whether the test passes, an assertion fails, or a poll throws.
        try (KafkaConsumer<?, ?> eventsConsumer = this.kafkaServerTest.getConsumer(
                     this.envConfig.getEventsTopicName(),
                     Config.getConsumerConfig("eventsProcessMessagesAsLeaderAndCreateSnapshotTest"));
             KafkaConsumer<?, ?> snapshotConsumer = this.kafkaServerTest.getConsumer(
                     this.envConfig.getSnapshotTopicName(),
                     Config.getSnapshotConsumerConfig());
             KafkaConsumer<?, ?> controlConsumer = this.kafkaServerTest.getConsumer(
                     this.envConfig.getControlTopicName(),
                     Config.getConsumerConfig("controlProcessMessagesAsLeaderAndCreateSnapshotTest"))) {

            this.kafkaServerTest.insertBatchStockTicketEvent(10, this.topicsConfig, RemoteKieSession.class);

            logger.warn("Checks on Events Topic");
            int eventsReceived = 0;
            int attempts = 0;
            while (eventsReceived < 11) {
                eventsReceived += eventsConsumer.poll(POLL_TIMEOUT).count();
                attempts++;
                logger.warn("Attempt number on events topic:{}", Integer.valueOf(attempts));
                if (attempts == MAX_POLL_ATTEMPTS) {
                    throw new RuntimeException("No enough Events message available " + eventsReceived + " after " + attempts + "attempts.");
                }
            }
            Assert.assertEquals(11L, eventsReceived);

            logger.warn("Checks on Control Topic");
            int controlReceived = 0;
            attempts = 0;
            while (controlReceived < 11) {
                controlReceived += controlConsumer.poll(POLL_TIMEOUT).count();
                attempts++;
                logger.warn("Attempt number on control topic:{}", Integer.valueOf(attempts));
                if (attempts == MAX_POLL_ATTEMPTS) {
                    throw new RuntimeException("No enough Control message available " + controlReceived + " after " + attempts + "attempts.");
                }
            }
            Assert.assertEquals(11L, controlReceived);

            logger.warn("Checks on Snapshot Topic");
            int snapshotsReceived = 0;
            attempts = 0;
            while (snapshotsReceived < 1) {
                ConsumerRecords<?, ?> snapshotRecords = snapshotConsumer.poll(POLL_TIMEOUT);
                snapshotsReceived += snapshotRecords.count();
                for (ConsumerRecord<?, ?> snapshotRecord : snapshotRecords) {
                    // The record value is cast to byte[] and handed to the project deserializer.
                    assertValidSnapshot((SnapshotMessage) SerializationUtil.deserialize((byte[]) snapshotRecord.value()));
                }
                attempts++;
                logger.warn("Attempt number on snapshot topic:{}", Integer.valueOf(attempts));
                if (attempts == MAX_POLL_ATTEMPTS) {
                    throw new RuntimeException("No enough Snapshot message available " + snapshotsReceived + " after " + attempts + "attempts.");
                }
            }
            Assert.assertEquals(1L, snapshotsReceived);
        }
    }

    /**
     * Asserts that a deserialized snapshot is non-null, has a positive last-inserted event
     * offset, a non-null last-inserted event key, and exactly 9 fact-handle map keys.
     *
     * @param snapshot the snapshot message read from the snapshot topic
     */
    private static void assertValidSnapshot(SnapshotMessage snapshot) {
        Assert.assertNotNull(snapshot);
        Assert.assertTrue(snapshot.getLastInsertedEventOffset() > 0);
        Assert.assertFalse(snapshot.getFhMapKeys().isEmpty());
        Assert.assertNotNull(snapshot.getLastInsertedEventkey());
        Assert.assertEquals(9L, snapshot.getFhMapKeys().size());
    }
}
