/*
 * Decompiled with CFR 0.152.
 */
package org.kie.hacep;

import java.time.Duration;
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.junit.Assert;
import org.junit.Test;
import org.kie.hacep.Config;
import org.kie.hacep.EnvConfig;
import org.kie.hacep.KafkaFullTopicsTests;
import org.kie.hacep.core.Bootstrap;
import org.kie.hacep.core.infra.election.State;
import org.kie.hacep.sample.kjar.StockTickEvent;
import org.kie.remote.RemoteFactHandle;
import org.kie.remote.RemoteKieSession;
import org.kie.remote.command.FireUntilHaltCommand;
import org.kie.remote.command.InsertCommand;
import org.kie.remote.command.RemoteCommand;
import org.kie.remote.message.ControlMessage;
import org.kie.remote.util.SerializationUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Test that starts the engine, marks the pod as {@code LEADER}, sends one stock-tick event and
 * checks what lands on the events and control topics; it then switches the pod to
 * {@code REPLICA} and compares the side effects reported on the Kafka log topic for both roles.
 */
public class PodAsReplicaTest extends KafkaFullTopicsTests {

    /** Number of records the test expects on the events topic and on the control topic. */
    private static final int EXPECTED_MESSAGES = 2;

    /** Maximum number of polls issued per topic before the test gives up. */
    private static final int MAX_POLL_ATTEMPTS = 10;

    private final Logger logger = LoggerFactory.getLogger(PodAsReplicaTest.class);

    /**
     * Runs the leader-then-replica scenario end to end.
     *
     * <p>Any failure (assertion or exception) propagates to JUnit; nothing is swallowed. The three
     * consumers are closed by try-with-resources even when the scenario fails part-way.
     */
    @Test(timeout = 30000L)
    public void processOneSentMessageAsLeaderAndThenReplicaTest() {
        Bootstrap.startEngine((EnvConfig) this.envConfig);
        Bootstrap.getConsumerController().getCallback().updateStatus(State.LEADER);
        try (KafkaConsumer<?, ?> eventsConsumer = this.kafkaServerTest.getConsumer(
                     this.envConfig.getEventsTopicName(),
                     Config.getConsumerConfig("eventsConsumerProcessOneSentMessageAsLeaderTest"));
             KafkaConsumer<?, ?> controlConsumer = this.kafkaServerTest.getConsumer(
                     this.envConfig.getControlTopicName(),
                     Config.getConsumerConfig("controlConsumerProcessOneSentMessageAsLeaderTest"));
             KafkaConsumer<?, ?> kafkaLogConsumer = this.kafkaServerTest.getStringConsumer("testlogs")) {
            this.kafkaServerTest.insertBatchStockTicketEvent(1, this.topicsConfig, RemoteKieSession.class);

            this.logger.warn("Checks on Events topic");
            ConsumerRecord<?, ?> lastEvent = this.checkEventsTopic(eventsConsumer);

            this.logger.warn("Checks on Control topic");
            this.checkControlTopic(controlConsumer, lastEvent);

            // Nothing else may show up on either topic once the expected records were consumed.
            Assert.assertEquals(0, eventsConsumer.poll(Duration.ofSeconds(2L)).count());
            Assert.assertEquals(0, controlConsumer.poll(Duration.ofSeconds(2L)).count());

            this.logger.warn("Switch as a replica");
            Bootstrap.getConsumerController().getCallback().updateStatus(State.REPLICA);
            this.checkSideEffectsOnLog(kafkaLogConsumer);
        }
    }

    /**
     * Polls the events topic until {@link #EXPECTED_MESSAGES} records have been verified.
     *
     * <p>The record at offset 0 must carry a {@link FireUntilHaltCommand}; the record at offset 1
     * must carry an {@link InsertCommand} on the {@code DEFAULT} entry point whose fact is a
     * {@link StockTickEvent} for company {@code RHT}.
     *
     * @param eventsConsumer consumer reading the events topic
     * @return the last record that was verified
     */
    private ConsumerRecord<?, ?> checkEventsTopic(KafkaConsumer<?, ?> eventsConsumer) {
        ConsumerRecord<?, ?> lastEvent = null;
        int processed = 0;
        int attempts = 0;
        while (processed < EXPECTED_MESSAGES) {
            // Poll on every attempt; re-reading an old batch can never yield new records.
            ConsumerRecords<?, ?> eventsRecords = eventsConsumer.poll(Duration.ofSeconds(2L));
            for (ConsumerRecord<?, ?> eventsRecord : eventsRecords) {
                if (processed == EXPECTED_MESSAGES) {
                    Assert.fail("Found " + (processed + 1) + " messages, more than the "
                            + EXPECTED_MESSAGES + " expected.");
                }
                Assert.assertNotNull(eventsRecord);
                Assert.assertEquals(this.envConfig.getEventsTopicName(), eventsRecord.topic());
                RemoteCommand remoteCommand =
                        (RemoteCommand) SerializationUtil.deserialize((byte[]) eventsRecord.value());
                Assert.assertEquals(processed, eventsRecord.offset());
                Assert.assertNotNull(remoteCommand.getId());
                if (processed == 0) {
                    Assert.assertTrue(remoteCommand instanceof FireUntilHaltCommand);
                } else {
                    Assert.assertTrue(remoteCommand instanceof InsertCommand);
                    InsertCommand insertCommand = (InsertCommand) remoteCommand;
                    Assert.assertEquals("DEFAULT", insertCommand.getEntryPoint());
                    Assert.assertNotNull(insertCommand.getId());
                    RemoteFactHandle remoteFactHandle = insertCommand.getFactHandle();
                    Assert.assertNotNull(remoteFactHandle);
                    StockTickEvent eventsTicket = (StockTickEvent) remoteFactHandle.getObject();
                    Assert.assertEquals("RHT", eventsTicket.getCompany());
                }
                processed++;
                lastEvent = eventsRecord;
            }
            attempts++;
            this.logger.warn("Attempt number:{}", attempts);
            // Give up only if the expected records are still missing after the last attempt.
            if (processed < EXPECTED_MESSAGES && attempts == MAX_POLL_ATTEMPTS) {
                Assert.fail("No Events message available after " + attempts + " attempts.");
            }
        }
        return lastEvent;
    }

    /**
     * Polls the control topic until {@link #EXPECTED_MESSAGES} records have been verified.
     *
     * <p>The record at offset 0 must have no side effects; the record at offset 1 must have side
     * effects and is cross-checked against {@code lastEvent}.
     *
     * @param controlConsumer consumer reading the control topic
     * @param lastEvent       last record read from the events topic
     */
    private void checkControlTopic(KafkaConsumer<?, ?> controlConsumer, ConsumerRecord<?, ?> lastEvent) {
        int processed = 0;
        int attempts = 0;
        while (processed < EXPECTED_MESSAGES) {
            ConsumerRecords<?, ?> controlRecords = controlConsumer.poll(Duration.ofSeconds(1L));
            for (ConsumerRecord<?, ?> control : controlRecords) {
                this.logger.warn("Control message found:{}", control);
                Assert.assertEquals(this.envConfig.getControlTopicName(), control.topic());
                ControlMessage controlMessage =
                        (ControlMessage) SerializationUtil.deserialize((byte[]) control.value());
                Assert.assertEquals(processed, control.offset());
                if (processed == 0) {
                    Assert.assertTrue(controlMessage.getSideEffects().isEmpty());
                }
                if (processed == 1) {
                    Assert.assertFalse(controlMessage.getSideEffects().isEmpty());
                    this.checkInsertSideEffects(lastEvent, control);
                }
                processed++;
            }
            attempts++;
            this.logger.warn("Attempt number:{}", attempts);
            if (processed < EXPECTED_MESSAGES && attempts == MAX_POLL_ATTEMPTS) {
                Assert.fail("No Control message available after " + attempts + " attempts.");
            }
        }
    }

    /**
     * Reads the Kafka log topic once and asserts that the side effects logged for the leader and
     * for the replica are both present and equal.
     *
     * @param kafkaLogConsumer consumer reading the log topic; record values are expected to be strings
     */
    private void checkSideEffectsOnLog(KafkaConsumer<?, ?> kafkaLogConsumer) {
        String sideEffectOnLeader = null;
        String sideEffectOnReplica = null;
        for (ConsumerRecord<?, ?> logRecord : kafkaLogConsumer.poll(Duration.ofSeconds(5L))) {
            Assert.assertNotNull(logRecord);
            String item = (String) logRecord.value();
            if (this.envConfig.isUnderTest()) {
                this.logger.warn("msg:{}", item);
            }
            if (item == null || !item.startsWith("sideEffectOn")) {
                continue;
            }
            if (item.endsWith(":null")) {
                Assert.fail("SideEffects null");
            }
            if (item.startsWith("sideEffectOnReplica:")) {
                sideEffectOnReplica = sideEffectPayload(item);
            }
            if (item.startsWith("sideEffectOnLeader:")) {
                sideEffectOnLeader = sideEffectPayload(item);
            }
        }
        Assert.assertNotNull("No sideEffectOnLeader entry found on the log topic", sideEffectOnLeader);
        Assert.assertNotNull("No sideEffectOnReplica entry found on the log topic", sideEffectOnReplica);
        Assert.assertEquals(sideEffectOnLeader, sideEffectOnReplica);
    }

    /**
     * Returns the part of a side-effect log line starting at its first {@code '['}.
     *
     * @param logLine a log line beginning with {@code sideEffectOn}
     * @return the substring from the first {@code '['} to the end of the line
     */
    private static String sideEffectPayload(String logLine) {
        int start = logLine.indexOf('[');
        Assert.assertTrue("Missing '[' in side-effect log line: " + logLine, start >= 0);
        return logLine.substring(start);
    }

    /**
     * Verifies the control record produced for the insert: it must be on the control topic at
     * offset 1, carry exactly one side effect, and share its key with the originating event.
     *
     * @param eventsRecord  the insert record read from the events topic
     * @param controlRecord the matching record read from the control topic
     */
    private void checkInsertSideEffects(ConsumerRecord<?, ?> eventsRecord, ConsumerRecord<?, ?> controlRecord) {
        Assert.assertEquals(this.envConfig.getControlTopicName(), controlRecord.topic());
        ControlMessage controlMessage =
                (ControlMessage) SerializationUtil.deserialize((byte[]) controlRecord.value());
        Assert.assertEquals(1L, controlRecord.offset());
        Assert.assertFalse(controlMessage.getSideEffects().isEmpty());
        Assert.assertEquals(1, controlMessage.getSideEffects().size());
        Assert.assertEquals(eventsRecord.key(), controlRecord.key());
    }
}

