package org.kie.hacep;

import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.kie.hacep.core.Bootstrap;
import org.kie.hacep.core.infra.election.State;
import org.kie.hacep.message.ControlMessage;
import org.kie.hacep.sample.kjar.StockTickEvent;
import org.kie.remote.RemoteKieSession;
import org.kie.remote.TopicsConfig;
import org.kie.remote.command.FireUntilHaltCommand;
import org.kie.remote.command.InsertCommand;
import org.kie.remote.command.RemoteCommand;
import org.kie.remote.util.SerializationUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/kie/hacep/PodAsReplicaTest.class */
public class PodAsReplicaTest {
    private KafkaUtilTest kafkaServerTest;
    private EnvConfig config;
    private TopicsConfig topicsConfig;
    private final String TEST_KAFKA_LOGGER_TOPIC = "logs";
    private final String TEST_TOPIC = "test";
    private Logger logger = LoggerFactory.getLogger(PodAsReplicaTest.class);
    private Logger kafkaLogger = LoggerFactory.getLogger("org.hacep");

    @Before
    public void setUp() throws Exception {
        this.config = KafkaUtilTest.getEnvConfig();
        this.topicsConfig = TopicsConfig.getDefaultTopicsConfig();
        this.kafkaServerTest = new KafkaUtilTest();
        this.kafkaServerTest.startServer();
        this.kafkaServerTest.createTopics("logs", "test", this.config.getEventsTopicName(), this.config.getControlTopicName(), this.config.getSnapshotTopicName(), this.config.getKieSessionInfosTopicName());
    }

    @After
    public void tearDown() {
        try {
            Bootstrap.stopEngine();
        } catch (ConcurrentModificationException e) {
        }
        this.kafkaServerTest.shutdownServer();
    }

    @Test(timeout = 60000)
    public void processOneSentMessageAsLeaderAndThenReplicaTest() {
        Bootstrap.startEngine(this.config);
        Bootstrap.getConsumerController().getCallback().updateStatus(State.LEADER);
        KafkaConsumer consumer = this.kafkaServerTest.getConsumer("", this.config.getEventsTopicName(), Config.getConsumerConfig("eventsConsumerProcessOneSentMessageAsLeaderTest"));
        KafkaConsumer consumer2 = this.kafkaServerTest.getConsumer("", this.config.getControlTopicName(), Config.getConsumerConfig("controlConsumerProcessOneSentMessageAsLeaderTest"));
        KafkaConsumer stringConsumer = this.kafkaServerTest.getStringConsumer("logs");
        this.kafkaServerTest.insertBatchStockTicketEvent(1, this.topicsConfig, RemoteKieSession.class);
        try {
            try {
                ConsumerRecords poll = consumer.poll(5000L);
                Assert.assertEquals(2L, poll.count());
                Iterator it = poll.iterator();
                ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                Assert.assertEquals(consumerRecord.topic(), this.config.getEventsTopicName());
                RemoteCommand remoteCommand = (RemoteCommand) SerializationUtil.deserialize((byte[]) consumerRecord.value());
                Assert.assertEquals(consumerRecord.offset(), 0L);
                Assert.assertNotNull(remoteCommand.getId());
                Assert.assertTrue(remoteCommand instanceof FireUntilHaltCommand);
                ConsumerRecord<String, byte[]> consumerRecord2 = (ConsumerRecord) it.next();
                Assert.assertEquals(consumerRecord2.topic(), this.config.getEventsTopicName());
                InsertCommand insertCommand = (RemoteCommand) SerializationUtil.deserialize((byte[]) consumerRecord2.value());
                Assert.assertEquals(consumerRecord2.offset(), 1L);
                Assert.assertNotNull(insertCommand.getId());
                InsertCommand insertCommand2 = insertCommand;
                Assert.assertEquals(insertCommand2.getEntryPoint(), "DEFAULT");
                Assert.assertNotNull(insertCommand2.getId());
                Assert.assertNotNull(insertCommand2.getFactHandle());
                Assert.assertEquals(((StockTickEvent) insertCommand2.getFactHandle().getObject()).getCompany(), "RHT");
                ConsumerRecords waitForControlMessage = waitForControlMessage(consumer2);
                Iterator it2 = waitForControlMessage.iterator();
                checkFireSideEffects((ConsumerRecord) it2.next());
                if (waitForControlMessage.count() == 2) {
                    checkInsertSideEffects(consumerRecord2, (ConsumerRecord) it2.next());
                } else {
                    checkInsertSideEffects(consumerRecord2, (ConsumerRecord) waitForControlMessage(consumer2).iterator().next());
                }
                Assert.assertEquals(0L, consumer.poll(5000L).count());
                Assert.assertEquals(0L, consumer2.poll(5000L).count());
                Bootstrap.getConsumerController().getCallback().updateStatus(State.REPLICA);
                Iterator it3 = stringConsumer.poll(20000L).iterator();
                ArrayList<String> arrayList = new ArrayList();
                while (it3.hasNext()) {
                    arrayList.add(((ConsumerRecord) it3.next()).value());
                }
                String str = null;
                String str2 = null;
                for (String str3 : arrayList) {
                    if (str3.startsWith("sideEffectOn")) {
                        if (str3.endsWith(":null")) {
                            Assert.fail("SideEffects null");
                        }
                        if (str3.startsWith("sideEffectOnReplica:")) {
                            str2 = str3.substring(str3.indexOf("["));
                        }
                        if (str3.startsWith("sideEffectOnLeader:")) {
                            str = str3.substring(str3.indexOf("["));
                        }
                    }
                }
                Assert.assertNotNull(str);
                Assert.assertNotNull(str2);
                Assert.assertEquals(str, str2);
                consumer.close();
                consumer2.close();
                stringConsumer.close();
            } catch (Exception e) {
                this.logger.error(e.getMessage(), e);
                consumer.close();
                consumer2.close();
                stringConsumer.close();
            }
        } catch (Throwable th) {
            consumer.close();
            consumer2.close();
            stringConsumer.close();
            throw th;
        }
    }

    private ConsumerRecords waitForControlMessage(KafkaConsumer kafkaConsumer) throws InterruptedException {
        ConsumerRecords poll = kafkaConsumer.poll(5000L);
        while (true) {
            ConsumerRecords consumerRecords = poll;
            if (consumerRecords.count() != 0) {
                return consumerRecords;
            }
            Thread.sleep(10L);
            poll = kafkaConsumer.poll(5000L);
        }
    }

    private void checkFireSideEffects(ConsumerRecord<String, byte[]> consumerRecord) {
        Assert.assertEquals(consumerRecord.topic(), this.config.getControlTopicName());
        ControlMessage controlMessage = (ControlMessage) SerializationUtil.deserialize((byte[]) consumerRecord.value());
        Assert.assertEquals(consumerRecord.offset(), 0L);
        Assert.assertTrue(controlMessage.getSideEffects().isEmpty());
    }

    private void checkInsertSideEffects(ConsumerRecord<String, byte[]> consumerRecord, ConsumerRecord<String, byte[]> consumerRecord2) {
        Assert.assertEquals(consumerRecord2.topic(), this.config.getControlTopicName());
        ControlMessage controlMessage = (ControlMessage) SerializationUtil.deserialize((byte[]) consumerRecord2.value());
        Assert.assertEquals(consumerRecord2.offset(), 1L);
        Assert.assertTrue(!controlMessage.getSideEffects().isEmpty());
        Assert.assertTrue(controlMessage.getSideEffects().size() == 1);
        controlMessage.getSideEffects().iterator().next().toString();
        Assert.assertEquals(consumerRecord2.key(), consumerRecord.key());
    }
}
