package org.kie.hacep;

import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.junit.Assert;
import org.junit.Test;
import org.kie.hacep.core.Bootstrap;
import org.kie.hacep.core.infra.election.State;
import org.kie.hacep.message.SnapshotMessage;
import org.kie.remote.RemoteKieSession;
import org.kie.remote.util.SerializationUtil;

/* loaded from: input_file:org/kie/hacep/PodAsLeaderSnapshotTest.class */
/**
 * Integration test that starts the engine, switches the pod to the
 * {@link State#LEADER} state and then checks what ends up on the events,
 * snapshot and control topics after a batch of stock ticket events is inserted.
 */
public class PodAsLeaderSnapshotTest extends KafkaFullTopicsTests {

    /**
     * Inserts a batch of stock ticket events while the pod is leader and verifies that:
     * <ul>
     *   <li>the events topic yields 11 records in a single poll,</li>
     *   <li>the snapshot topic yields exactly one record that deserializes to a
     *       populated {@link SnapshotMessage},</li>
     *   <li>the control topic eventually yields 11 records.</li>
     * </ul>
     * All three consumers are managed by try-with-resources, so each one is closed
     * whether the assertions pass or fail.
     */
    @Test(timeout = 30000)
    public void processMessagesAsLeaderAndCreateSnapshotTest() {
        Bootstrap.startEngine(this.envConfig);
        Bootstrap.getConsumerController().getCallback().updateStatus(State.LEADER);

        try (KafkaConsumer<?, ?> eventsConsumer = this.kafkaServerTest.getConsumer(
                     this.envConfig.getEventsTopicName(),
                     Config.getConsumerConfig("eventsProcessMessagesAsLeaderAndCreateSnapshotTest"));
             KafkaConsumer<?, ?> snapshotConsumer = this.kafkaServerTest.getConsumer(
                     this.envConfig.getSnapshotTopicName(),
                     Config.getSnapshotConsumerConfig());
             KafkaConsumer<?, ?> controlConsumer = this.kafkaServerTest.getConsumer(
                     this.envConfig.getControlTopicName(),
                     Config.getConsumerConfig("controlProcessMessagesAsLeaderAndCreateSnapshotTest"))) {

            this.kafkaServerTest.insertBatchStockTicketEvent(10, this.topicsConfig, RemoteKieSession.class);

            // NOTE(review): 10 events are inserted but 11 records are expected; presumably
            // one extra record is produced elsewhere (e.g. by the snapshot request) - confirm.
            Assert.assertEquals(11L, eventsConsumer.poll(Duration.ofSeconds(5L)).count());

            ConsumerRecords<?, ?> snapshotRecords = snapshotConsumer.poll(Duration.ofSeconds(5L));
            Assert.assertEquals(1L, snapshotRecords.count());
            for (ConsumerRecord<?, ?> snapshotRecord : snapshotRecords) {
                SnapshotMessage snapshotMessage =
                        (SnapshotMessage) SerializationUtil.deserialize((byte[]) snapshotRecord.value());
                Assert.assertNotNull(snapshotMessage);
                Assert.assertTrue(snapshotMessage.getLastInsertedEventOffset() > 0);
                Assert.assertFalse(snapshotMessage.getFhMapKeys().isEmpty());
                Assert.assertNotNull(snapshotMessage.getLastInsertedEventkey());
                Assert.assertEquals(9L, snapshotMessage.getFhMapKeys().size());
            }

            // Control records may arrive across several polls, so keep polling until the
            // expected number has been seen. The loop has no bound of its own; the JUnit
            // timeout on this method is what stops it if the records never arrive.
            int controlCount = controlConsumer.poll(Duration.ofMillis(5000L)).count();
            while (controlCount < 11) {
                controlCount += controlConsumer.poll(Duration.ofMillis(1000L)).count();
            }
            Assert.assertEquals(11L, controlCount);
        }
    }
}
