package org.kie.hacep;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.junit.Assert;
import org.junit.Test;
import org.kie.hacep.core.Bootstrap;
import org.kie.hacep.core.infra.election.State;
import org.kie.hacep.message.ControlMessage;
import org.kie.remote.RemoteKieSession;
import org.kie.remote.command.FireUntilHaltCommand;
import org.kie.remote.command.InsertCommand;
import org.kie.remote.command.RemoteCommand;
import org.kie.remote.util.SerializationUtil;

/* loaded from: input_file:org/kie/hacep/PodAsLeaderTest.class */
public class PodAsLeaderTest extends KafkaFullTopicsTests {
    @Test(timeout = 30000)
    public void processOneSentMessageAsLeaderTest() {
        Bootstrap.startEngine(this.envConfig);
        Bootstrap.getConsumerController().getCallback().updateStatus(State.LEADER);
        KafkaConsumer consumer = this.kafkaServerTest.getConsumer(this.envConfig.getEventsTopicName(), Config.getConsumerConfig("eventsConsumerProcessOneSentMessageAsLeaderTest"));
        KafkaConsumer consumer2 = this.kafkaServerTest.getConsumer(this.envConfig.getControlTopicName(), Config.getConsumerConfig("controlConsumerProcessOneSentMessageAsLeaderTest"));
        Properties properties = (Properties) Config.getProducerConfig("InsertBactchStockTickets").clone();
        properties.put("skip.listener.autostart", true);
        this.kafkaServerTest.insertBatchStockTicketEvent(1, this.topicsConfig, RemoteKieSession.class, properties);
        try {
            try {
                ConsumerRecords poll = consumer.poll(5000L);
                Assert.assertEquals(2L, poll.count());
                Iterator it = poll.iterator();
                ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                Assert.assertEquals(consumerRecord.topic(), this.envConfig.getEventsTopicName());
                RemoteCommand remoteCommand = (RemoteCommand) SerializationUtil.deserialize((byte[]) consumerRecord.value());
                Assert.assertEquals(consumerRecord.offset(), 0L);
                Assert.assertNotNull(remoteCommand.getId());
                Assert.assertTrue(remoteCommand instanceof FireUntilHaltCommand);
                ConsumerRecord consumerRecord2 = (ConsumerRecord) it.next();
                Assert.assertEquals(consumerRecord2.topic(), this.envConfig.getEventsTopicName());
                RemoteCommand remoteCommand2 = (RemoteCommand) SerializationUtil.deserialize((byte[]) consumerRecord2.value());
                Assert.assertEquals(consumerRecord2.offset(), 1L);
                Assert.assertNotNull(remoteCommand2.getId());
                Assert.assertTrue(remoteCommand2 instanceof InsertCommand);
                ArrayList arrayList = new ArrayList();
                while (arrayList.size() < 2) {
                    arrayList.add((ControlMessage) SerializationUtil.deserialize((byte[]) ((ConsumerRecord) consumer2.poll(2000L).iterator().next()).value()));
                }
                Assert.assertEquals(2L, arrayList.size());
                Iterator it2 = arrayList.iterator();
                ControlMessage controlMessage = (ControlMessage) it2.next();
                ControlMessage controlMessage2 = (ControlMessage) it2.next();
                Assert.assertEquals(controlMessage.getId(), consumerRecord.key());
                Assert.assertTrue(controlMessage.getSideEffects().isEmpty());
                Assert.assertEquals(controlMessage2.getId(), consumerRecord2.key());
                Assert.assertTrue(!controlMessage2.getSideEffects().isEmpty());
                consumer.close();
                consumer2.close();
            } catch (Exception e) {
                throw new RuntimeException(e.getMessage(), e);
            }
        } catch (Throwable th) {
            consumer.close();
            consumer2.close();
            throw th;
        }
    }
}
