package org.kie.hacep;

import java.time.Duration;
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.junit.Assert;
import org.junit.Test;
import org.kie.hacep.core.Bootstrap;
import org.kie.hacep.core.InfraFactory;
import org.kie.hacep.core.infra.election.State;
import org.kie.hacep.sample.kjar.StockTickEvent;
import org.kie.remote.CommonConfig;
import org.kie.remote.RemoteKieSession;
import org.kie.remote.TopicsConfig;
import org.kie.remote.command.FireUntilHaltCommand;
import org.kie.remote.command.InsertCommand;
import org.kie.remote.command.RemoteCommand;
import org.kie.remote.impl.consumer.Listener;
import org.kie.remote.message.ControlMessage;
import org.kie.remote.util.SerializationUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/kie/hacep/PodAsReplicaTest.class */
public class PodAsReplicaTest extends KafkaFullTopicsTests {
    private Logger logger = LoggerFactory.getLogger(PodAsReplicaTest.class);

    @Test(timeout = 30000)
    public void processOneSentMessageAsLeaderAndThenReplicaTest() {
        Bootstrap.startEngine(this.envConfig);
        Bootstrap.getConsumerController().getCallback().updateStatus(State.LEADER);
        KafkaConsumer consumer = this.kafkaServerTest.getConsumer(this.envConfig.getEventsTopicName(), Config.getConsumerConfig("eventsConsumerProcessOneSentMessageAsLeaderTest"));
        KafkaConsumer consumer2 = this.kafkaServerTest.getConsumer(this.envConfig.getControlTopicName(), Config.getConsumerConfig("controlConsumerProcessOneSentMessageAsLeaderTest"));
        KafkaConsumer stringConsumer = this.kafkaServerTest.getStringConsumer("testlogs");
        this.kafkaServerTest.insertBatchStockTicketEvent(1, this.topicsConfig, RemoteKieSession.class, new Listener(CommonConfig.getTestProperties(), InfraFactory.getListenerThread(TopicsConfig.getDefaultTopicsConfig(), this.envConfig.isLocal(), CommonConfig.getTestProperties())));
        try {
            try {
                this.logger.warn("Checks on Events topic");
                ConsumerRecords poll = consumer.poll(Duration.ofSeconds(2L));
                AtomicReference atomicReference = new AtomicReference();
                AtomicInteger atomicInteger = new AtomicInteger(0);
                AtomicInteger atomicInteger2 = new AtomicInteger(0);
                while (atomicInteger.get() < 2) {
                    poll.forEach(obj -> {
                        ConsumerRecord consumerRecord = (ConsumerRecord) obj;
                        Assert.assertNotNull(consumerRecord);
                        Assert.assertEquals(consumerRecord.topic(), this.envConfig.getEventsTopicName());
                        InsertCommand insertCommand = (RemoteCommand) SerializationUtil.deserialize((byte[]) consumerRecord.value());
                        Assert.assertEquals(consumerRecord.offset(), atomicInteger.get());
                        Assert.assertNotNull(insertCommand.getId());
                        if (atomicInteger.get() == 0) {
                            Assert.assertTrue(insertCommand instanceof FireUntilHaltCommand);
                        }
                        if (atomicInteger.get() == 1) {
                            Assert.assertTrue(insertCommand instanceof InsertCommand);
                            InsertCommand insertCommand2 = insertCommand;
                            Assert.assertEquals("DEFAULT", insertCommand2.getEntryPoint());
                            Assert.assertNotNull(insertCommand2.getId());
                            Assert.assertNotNull(insertCommand2.getFactHandle());
                            Assert.assertEquals("RHT", ((StockTickEvent) insertCommand2.getFactHandle().getObject()).getCompany());
                        }
                        atomicInteger.incrementAndGet();
                        if (atomicInteger.get() > 2) {
                            throw new RuntimeException("Found " + atomicInteger.get() + " messages, more than the 2 expected.");
                        }
                        atomicReference.set(consumerRecord);
                    });
                    this.logger.warn("Attempt number:{}", Integer.valueOf(atomicInteger2.incrementAndGet()));
                    if (atomicInteger2.get() == 10) {
                        throw new RuntimeException("No Events message available after " + atomicInteger2 + "attempts.");
                    }
                }
                this.logger.warn("Checks on Control topic");
                atomicInteger.set(0);
                atomicInteger2.set(0);
                while (atomicInteger.get() < 2) {
                    consumer2.poll(Duration.ofSeconds(1L)).forEach(obj2 -> {
                        ConsumerRecord<String, byte[]> consumerRecord = (ConsumerRecord) obj2;
                        this.logger.warn("Control message found:{}", consumerRecord);
                        Assert.assertEquals(consumerRecord.topic(), this.envConfig.getControlTopicName());
                        ControlMessage controlMessage = (ControlMessage) SerializationUtil.deserialize((byte[]) consumerRecord.value());
                        Assert.assertEquals(consumerRecord.offset(), atomicInteger.get());
                        if (atomicInteger.get() == 0) {
                            Assert.assertTrue(controlMessage.getSideEffects().isEmpty());
                        }
                        if (atomicInteger.get() == 1) {
                            Assert.assertFalse(controlMessage.getSideEffects().isEmpty());
                            checkInsertSideEffects((ConsumerRecord) atomicReference.get(), consumerRecord);
                        }
                        atomicInteger.incrementAndGet();
                    });
                    this.logger.warn("Attempt number:{}", Integer.valueOf(atomicInteger2.incrementAndGet()));
                    if (atomicInteger2.get() == 10) {
                        throw new RuntimeException("No Events message available after " + atomicInteger2 + "attempts.");
                    }
                }
                Assert.assertEquals(0L, consumer.poll(Duration.ofSeconds(2L)).count());
                Assert.assertEquals(0L, consumer2.poll(Duration.ofSeconds(2L)).count());
                this.logger.warn("Switch as a replica");
                Bootstrap.getConsumerController().getCallback().updateStatus(State.REPLICA);
                ConsumerRecords poll2 = stringConsumer.poll(Duration.ofSeconds(5L));
                ArrayList<String> arrayList = new ArrayList();
                poll2.forEach(consumerRecord -> {
                    Assert.assertNotNull(consumerRecord);
                    arrayList.add(consumerRecord.value());
                    if (this.envConfig.isUnderTest()) {
                        this.logger.warn("msg:{}", consumerRecord.value());
                    }
                });
                String str = null;
                String str2 = null;
                for (String str3 : arrayList) {
                    if (str3.startsWith("sideEffectOn")) {
                        if (str3.endsWith(":null")) {
                            Assert.fail("SideEffects null");
                        }
                        if (str3.startsWith("sideEffectOnReplica:")) {
                            str2 = str3.substring(str3.indexOf("["));
                        }
                        if (str3.startsWith("sideEffectOnLeader:")) {
                            str = str3.substring(str3.indexOf("["));
                        }
                    }
                }
                Assert.assertNotNull(str);
                Assert.assertNotNull(str2);
                Assert.assertEquals(str, str2);
                consumer.close();
                consumer2.close();
                stringConsumer.close();
            } catch (Exception e) {
                this.logger.error(e.getMessage(), e);
                consumer.close();
                consumer2.close();
                stringConsumer.close();
            }
        } catch (Throwable th) {
            consumer.close();
            consumer2.close();
            stringConsumer.close();
            throw th;
        }
    }

    private void checkInsertSideEffects(ConsumerRecord<String, byte[]> consumerRecord, ConsumerRecord<String, byte[]> consumerRecord2) {
        Assert.assertEquals(consumerRecord2.topic(), this.envConfig.getControlTopicName());
        ControlMessage controlMessage = (ControlMessage) SerializationUtil.deserialize((byte[]) consumerRecord2.value());
        Assert.assertEquals(1L, consumerRecord2.offset());
        Assert.assertTrue(!controlMessage.getSideEffects().isEmpty());
        Assert.assertTrue(controlMessage.getSideEffects().size() == 1);
        Assert.assertEquals(consumerRecord2.key(), consumerRecord.key());
    }
}
