/*
 * Decompiled with CFR 0.152.
 */
package org.kie.hacep.core.infra.utils;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.kie.api.KieServices;
import org.kie.api.runtime.KieContainer;
import org.kie.api.runtime.KieSession;
import org.kie.api.runtime.KieSessionConfiguration;
import org.kie.api.runtime.conf.ClockTypeOption;
import org.kie.api.runtime.conf.KieSessionOption;
import org.kie.hacep.Config;
import org.kie.hacep.EnvConfig;
import org.kie.hacep.core.GlobalStatus;
import org.kie.hacep.core.infra.SessionSnapshooter;
import org.kie.hacep.core.infra.SnapshotInfos;
import org.kie.hacep.message.SnapshotMessage;
import org.kie.remote.TopicsConfig;
import org.kie.remote.command.RemoteCommand;
import org.kie.remote.command.SnapshotOnDemandCommand;
import org.kie.remote.impl.producer.Sender;
import org.kie.remote.util.SerializationUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SnapshotOnDemandUtils {
    private static final Logger logger = LoggerFactory.getLogger(SnapshotOnDemandUtils.class);

    public static SnapshotInfos askASnapshotOnDemand(EnvConfig config, SessionSnapshooter snapshooter) {
        LocalDateTime infosTime = snapshooter.getLastSnapshotTime();
        LocalDateTime limitAge = LocalDateTime.now().minusSeconds(config.getMaxSnapshotAge());
        if (infosTime != null && limitAge.isBefore(infosTime)) {
            if (logger.isInfoEnabled()) {
                logger.info("Deserialize a recent snapshot");
            }
            return snapshooter.deserialize();
        }
        if (logger.isInfoEnabled()) {
            logger.info("Build NewSnapshotOnDemand ");
        }
        return SnapshotOnDemandUtils.buildNewSnapshotOnDemand(config, limitAge);
    }

    private static SnapshotInfos buildNewSnapshotOnDemand(EnvConfig config, LocalDateTime limitAge) {
        SnapshotMessage snapshotMsg = SnapshotOnDemandUtils.askAndReadSnapshotOnDemand(config, limitAge);
        KieSession kSession = null;
        try (ByteArrayInputStream in = new ByteArrayInputStream(snapshotMsg.getSerializedSession());){
            KieSessionConfiguration conf = KieServices.get().newKieSessionConfiguration();
            conf.setOption((KieSessionOption)ClockTypeOption.get((String)"pseudo"));
            kSession = KieServices.get().getMarshallers().newMarshaller(SnapshotOnDemandUtils.getKieContainer().getKieBase()).unmarshall((InputStream)in, conf, null);
        }
        catch (IOException | ClassNotFoundException e) {
            logger.error(e.getMessage(), (Throwable)e);
        }
        return new SnapshotInfos(kSession, snapshotMsg.getFhManager(), snapshotMsg.getLastInsertedEventkey(), snapshotMsg.getLastInsertedEventOffset(), snapshotMsg.getTime());
    }

    private static KieContainer getKieContainer() {
        KieServices srv = KieServices.get();
        if (srv != null) {
            return srv.newKieClasspathContainer();
        }
        throw new RuntimeException("KieServices is null");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private static SnapshotMessage askAndReadSnapshotOnDemand(EnvConfig envConfig, LocalDateTime limitAge) {
        Properties props = Config.getProducerConfig("SnapshotOnDemandUtils.askASnapshotOnDemand");
        Sender sender = new Sender(props);
        sender.start();
        sender.sendCommand((RemoteCommand)new SnapshotOnDemandCommand(), TopicsConfig.getDefaultTopicsConfig().getEventsTopicName());
        sender.stop();
        KafkaConsumer consumer = SnapshotOnDemandUtils.getConfiguredSnapshotConsumer(envConfig);
        boolean snapshotReady = false;
        SnapshotMessage msg = null;
        try {
            GlobalStatus.canBecomeLeader = false;
            int counter = 0;
            while (!snapshotReady) {
                SnapshotMessage snapshotMsg;
                ConsumerRecords records = consumer.poll(Duration.of(Integer.valueOf(1000).intValue(), ChronoUnit.MILLIS));
                byte[] bytes = null;
                for (ConsumerRecord record : records) {
                    bytes = (byte[])record.value();
                }
                SnapshotMessage snapshotMessage = snapshotMsg = bytes != null ? (SnapshotMessage)SerializationUtil.deserialize(bytes) : null;
                if (snapshotMsg != null && limitAge.isBefore(snapshotMsg.getTime())) {
                    snapshotReady = true;
                    msg = snapshotMsg;
                    continue;
                }
                if (++counter <= envConfig.getMaxSnapshotRequestAttempts()) continue;
                GlobalStatus.nodeLive = false;
                String errorMessage = "Impossible to retrieve a snapshot and start after " + counter + " attempts";
                logger.error(errorMessage);
                throw new IllegalStateException(errorMessage);
            }
        }
        finally {
            consumer.close();
            GlobalStatus.canBecomeLeader = true;
        }
        return msg;
    }

    private static KafkaConsumer getConfiguredSnapshotConsumer(EnvConfig envConfig) {
        KafkaConsumer consumer = new KafkaConsumer(Config.getSnapshotConsumerConfig());
        List partitionsInfo = consumer.partitionsFor(envConfig.getSnapshotTopicName());
        List partitions = null;
        ArrayList<TopicPartition> partitionCollection = new ArrayList<TopicPartition>();
        if (partitionsInfo != null) {
            for (PartitionInfo partition : partitionsInfo) {
                if (partitions != null && !partitions.contains(partition.partition())) continue;
                partitionCollection.add(new TopicPartition(partition.topic(), partition.partition()));
            }
            if (!partitionCollection.isEmpty()) {
                consumer.assign(partitionCollection);
            }
        }
        consumer.assignment().forEach(topicPartition -> consumer.seekToBeginning(partitionCollection));
        return consumer;
    }
}

