/*
 * Decompiled with CFR 0.152.
 */
package org.kie.hacep.core.infra;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.kie.api.KieServices;
import org.kie.api.marshalling.KieMarshallers;
import org.kie.api.runtime.KieContainer;
import org.kie.api.runtime.KieSession;
import org.kie.api.runtime.KieSessionConfiguration;
import org.kie.api.runtime.conf.ClockTypeOption;
import org.kie.api.runtime.conf.KieSessionOption;
import org.kie.hacep.Config;
import org.kie.hacep.EnvConfig;
import org.kie.hacep.core.KieSessionContext;
import org.kie.hacep.core.infra.SessionSnapshooter;
import org.kie.hacep.core.infra.SnapshotInfos;
import org.kie.hacep.message.SnapshotMessage;
import org.kie.remote.impl.producer.EventProducer;
import org.kie.remote.message.Message;
import org.kie.remote.util.SerializationUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link SessionSnapshooter} implementation that stores a marshalled {@link KieSession}
 * as a {@link SnapshotMessage} on the snapshot Kafka topic and reads it back from there.
 *
 * <p>Snapshots are published under a single fixed record key. When reading, the topic is
 * consumed from the beginning with one poll and the last record returned by that poll is
 * treated as the current snapshot.
 */
public class DeafultSessionSnapShooter
implements SessionSnapshooter {
    /** Record key under which every snapshot is published. */
    private static final String SNAPSHOT_KEY = "LAST-SNAPSHOT";
    /** Maximum time a single poll of the snapshot topic may block. */
    private static final Duration POLL_TIMEOUT = Duration.ofMillis(1000);
    private static final Logger logger = LoggerFactory.getLogger(DeafultSessionSnapShooter.class);

    /** Classpath container used to obtain the KieBase for unmarshalling; null when KieServices was unavailable. */
    private final KieContainer kieContainer;
    private final EnvConfig envConfig;

    /**
     * Creates the snapshooter and builds a classpath {@link KieContainer}.
     * If {@link KieServices#get()} returns {@code null} an error is logged and the
     * container is left unset.
     *
     * @param envConfig environment configuration providing the snapshot topic name
     */
    public DeafultSessionSnapShooter(EnvConfig envConfig) {
        this.envConfig = envConfig;
        KieServices srv = KieServices.get();
        KieContainer container = null;
        if (srv != null) {
            container = srv.newKieClasspathContainer();
        } else {
            logger.error("KieServices is null");
        }
        this.kieContainer = container;
    }

    /**
     * Marshals the session held by {@code kieSessionContext} and publishes it synchronously
     * to the snapshot topic. An {@link IOException} raised while marshalling is logged and
     * not rethrown. The producer is always stopped, even when marshalling or sending fails.
     *
     * @param kieSessionContext context holding the session and the fact handle manager to snapshot
     * @param lastInsertedEventkey key of the last event inserted before the snapshot
     * @param lastInsertedEventOffset offset of the last event inserted before the snapshot
     */
    @Override
    public void serialize(KieSessionContext kieSessionContext, String lastInsertedEventkey, long lastInsertedEventOffset) {
        KieMarshallers marshallers = KieServices.get().getMarshallers();
        EventProducer producer = new EventProducer();
        producer.start(Config.getSnapshotProducerConfig());
        try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
            KieSession session = kieSessionContext.getKieSession();
            marshallers.newMarshaller(session.getKieBase()).marshall(out, session);
            SnapshotMessage message = new SnapshotMessage(
                    UUID.randomUUID().toString(),
                    out.toByteArray(),
                    kieSessionContext.getFhManager(),
                    lastInsertedEventkey,
                    lastInsertedEventOffset,
                    LocalDateTime.now());
            producer.produceSync(this.envConfig.getSnapshotTopicName(), SNAPSHOT_KEY, (Message) message);
        } catch (IOException e) {
            logger.error(e.getMessage(), e);
        } finally {
            // Stop the producer on every path so a failed snapshot does not leak it.
            producer.stop();
        }
    }

    /**
     * Reads the last snapshot from the snapshot topic and rebuilds the session from it,
     * using a session configuration with the "pseudo" clock type.
     *
     * @return the snapshot data, or {@code null} when KieServices or the KieContainer is
     *         unavailable or no snapshot record was read. If unmarshalling fails the error
     *         is logged and the returned {@link SnapshotInfos} carries a {@code null} session.
     */
    @Override
    public SnapshotInfos deserialize() {
        KieServices srv = KieServices.get();
        if (srv == null || this.kieContainer == null) {
            return null;
        }
        SnapshotMessage snapshotMsg = readLastSnapshotMessage();
        if (snapshotMsg == null) {
            return null;
        }
        KieSession kSession = null;
        try (ByteArrayInputStream in = new ByteArrayInputStream(snapshotMsg.getSerializedSession())) {
            KieSessionConfiguration conf = srv.newKieSessionConfiguration();
            conf.setOption(ClockTypeOption.get("pseudo"));
            kSession = srv.getMarshallers().newMarshaller(this.kieContainer.getKieBase()).unmarshall(in, conf, null);
        } catch (IOException | ClassNotFoundException e) {
            logger.error(e.getMessage(), e);
        }
        return new SnapshotInfos(
                kSession,
                snapshotMsg.getFhManager(),
                snapshotMsg.getLastInsertedEventkey(),
                snapshotMsg.getLastInsertedEventOffset(),
                snapshotMsg.getTime());
    }

    /**
     * Returns the timestamp stored in the last snapshot on the snapshot topic.
     *
     * @return the snapshot time, or {@code null} when no snapshot record was read
     */
    @Override
    public LocalDateTime getLastSnapshotTime() {
        SnapshotMessage snapshotMsg = readLastSnapshotMessage();
        return snapshotMsg != null ? snapshotMsg.getTime() : null;
    }

    /**
     * Polls the snapshot topic once from the beginning and deserializes the value of the
     * last record returned. The consumer is closed before the payload is deserialized and
     * also when polling fails.
     *
     * <p>NOTE(review): only a single poll of {@link #POLL_TIMEOUT} is performed, so the
     * record picked is the last one of that batch; confirm this is always the newest
     * snapshot for the expected topic size and retention settings.
     *
     * @return the last snapshot message read, or {@code null} when the topic has no
     *         partitions or the poll returned no records
     */
    private SnapshotMessage readLastSnapshotMessage() {
        byte[] lastPayload = null;
        try (KafkaConsumer<String, byte[]> consumer = getConfiguredSnapshotConsumer()) {
            if (consumer.assignment().isEmpty()) {
                // Polling an unassigned consumer throws IllegalStateException; treat it as "no snapshot".
                logger.warn("No partitions available for snapshot topic {}", this.envConfig.getSnapshotTopicName());
                return null;
            }
            ConsumerRecords<String, byte[]> records = consumer.poll(POLL_TIMEOUT);
            for (ConsumerRecord<String, byte[]> record : records) {
                lastPayload = record.value();
            }
        }
        return lastPayload != null ? (SnapshotMessage) SerializationUtil.deserialize(lastPayload) : null;
    }

    /**
     * Creates a consumer assigned to every partition of the snapshot topic and positioned
     * at the beginning of each of them. When the topic reports no partitions the consumer
     * is returned unassigned. The caller owns the returned consumer and must close it.
     *
     * @return the configured consumer
     */
    private KafkaConsumer<String, byte[]> getConfiguredSnapshotConsumer() {
        KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(Config.getSnapshotConsumerConfig());
        try {
            List<PartitionInfo> partitionsInfo = consumer.partitionsFor(this.envConfig.getSnapshotTopicName());
            List<TopicPartition> topicPartitions = new ArrayList<>();
            if (partitionsInfo != null) {
                for (PartitionInfo partition : partitionsInfo) {
                    topicPartitions.add(new TopicPartition(partition.topic(), partition.partition()));
                }
            }
            if (!topicPartitions.isEmpty()) {
                consumer.assign(topicPartitions);
                // One call rewinds all assigned partitions; no need to repeat it per partition.
                consumer.seekToBeginning(topicPartitions);
            }
            return consumer;
        } catch (RuntimeException e) {
            // Do not leak the consumer when the setup fails half-way.
            consumer.close();
            throw e;
        }
    }
}

