package org.kie.hacep.core.infra;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.kie.api.KieServices;
import org.kie.api.marshalling.KieMarshallers;
import org.kie.api.runtime.KieContainer;
import org.kie.api.runtime.KieSession;
import org.kie.api.runtime.KieSessionConfiguration;
import org.kie.api.runtime.conf.ClockTypeOption;
import org.kie.hacep.Config;
import org.kie.hacep.EnvConfig;
import org.kie.hacep.core.KieSessionContext;
import org.kie.hacep.message.SnapshotMessage;
import org.kie.remote.impl.producer.EventProducer;
import org.kie.remote.util.SerializationUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/openshift-kie-hacep-7.26.0-SNAPSHOT.jar:org/kie/hacep/core/infra/DeafultSessionSnapShooter.class */
/**
 * {@link SessionSnapshooter} that stores a marshalled {@link KieSession} as a
 * {@link SnapshotMessage} on the snapshot Kafka topic and restores it from there.
 *
 * <p>Every snapshot is published under the same record key ({@code "LAST-SNAPSHOT"});
 * readers keep the value of the last record returned by a single poll from the
 * beginning of the topic.
 */
public class DeafultSessionSnapShooter implements SessionSnapshooter {

    /** Record key under which every snapshot is published. */
    private static final String SNAPSHOT_KEY = "LAST-SNAPSHOT";

    /** Maximum time a single poll of the snapshot topic may block. */
    private static final Duration POLL_TIMEOUT = Duration.ofMillis(1000);

    private static final Logger logger = LoggerFactory.getLogger(DeafultSessionSnapShooter.class);

    /** Remains {@code null} when {@link KieServices} was unavailable at construction time. */
    private final KieContainer kieContainer;
    private final EnvConfig envConfig;

    /**
     * Creates the snapshooter and, when {@link KieServices} is available, a classpath
     * container whose KieBase is used later to unmarshall stored sessions.
     *
     * @param envConfig environment configuration providing the snapshot topic name
     */
    public DeafultSessionSnapShooter(EnvConfig envConfig) {
        this.envConfig = envConfig;
        KieServices kieServices = KieServices.get();
        if (kieServices == null) {
            logger.error("KieServices is null");
            this.kieContainer = null;
        } else {
            this.kieContainer = kieServices.newKieClasspathContainer();
        }
    }

    /**
     * Marshalls the session held by {@code kieSessionContext} and publishes it
     * synchronously to the snapshot topic.
     *
     * <p>An {@link IOException} raised while marshalling is logged and not rethrown.
     * The producer is stopped whether or not publishing succeeds.
     *
     * @param kieSessionContext       context holding the session to snapshot
     * @param lastInsertedEventKey    key stored alongside the session bytes in the message
     * @param lastInsertedEventOffset offset stored alongside the session bytes in the message
     */
    @Override // org.kie.hacep.core.infra.SessionSnapshooter
    public void serialize(KieSessionContext kieSessionContext, String lastInsertedEventKey, long lastInsertedEventOffset) {
        KieMarshallers marshallers = KieServices.get().getMarshallers();
        EventProducer producer = new EventProducer();
        // Started outside the try block so that stop() is only invoked on a started producer.
        producer.start(Config.getSnapshotProducerConfig());
        try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
            KieSession session = kieSessionContext.getKieSession();
            marshallers.newMarshaller(session.getKieBase()).marshall(out, session);
            SnapshotMessage message = new SnapshotMessage(UUID.randomUUID().toString(),
                                                          out.toByteArray(),
                                                          kieSessionContext.getFhManager(),
                                                          lastInsertedEventKey,
                                                          lastInsertedEventOffset,
                                                          LocalDateTime.now());
            producer.produceSync(envConfig.getSnapshotTopicName(), SNAPSHOT_KEY, message);
        } catch (IOException e) {
            logger.error(e.getMessage(), e);
        } finally {
            // Previously skipped when marshalling or publishing failed, leaking the producer.
            producer.stop();
        }
    }

    /**
     * Reads the last snapshot from the snapshot topic and rebuilds the session from it.
     *
     * @return the restored snapshot data, or {@code null} when {@link KieServices} is
     *         unavailable or no snapshot record was read. If unmarshalling fails the error
     *         is logged and the returned {@link SnapshotInfos} carries a {@code null} session.
     */
    @Override // org.kie.hacep.core.infra.SessionSnapshooter
    public SnapshotInfos deserialize() {
        KieServices kieServices = KieServices.get();
        if (kieServices == null) {
            return null;
        }
        SnapshotMessage snapshotMessage = readLastSnapshotMessage();
        if (snapshotMessage == null) {
            return null;
        }
        KieSession kieSession = null;
        try (ByteArrayInputStream in = new ByteArrayInputStream(snapshotMessage.getSerializedSession())) {
            KieSessionConfiguration sessionConfiguration = kieServices.newKieSessionConfiguration();
            sessionConfiguration.setOption(ClockTypeOption.get("pseudo"));
            kieSession = kieServices.getMarshallers()
                    .newMarshaller(kieContainer.getKieBase())
                    .unmarshall(in, sessionConfiguration, null);
        } catch (IOException | ClassNotFoundException e) {
            logger.error(e.getMessage(), e);
        }
        return new SnapshotInfos(kieSession,
                                 snapshotMessage.getFhManager(),
                                 snapshotMessage.getLastInsertedEventkey(),
                                 snapshotMessage.getLastInsertedEventOffset(),
                                 snapshotMessage.getTime());
    }

    /**
     * Returns the timestamp carried by the last snapshot read from the snapshot topic.
     *
     * @return the snapshot time, or {@code null} when no snapshot record was read
     */
    @Override // org.kie.hacep.core.infra.SessionSnapshooter
    public LocalDateTime getLastSnapshotTime() {
        SnapshotMessage snapshotMessage = readLastSnapshotMessage();
        return snapshotMessage != null ? snapshotMessage.getTime() : null;
    }

    /**
     * Polls the snapshot topic once from the beginning and deserializes the value of the
     * last record returned. The consumer is closed even when polling fails.
     *
     * @return the last snapshot message, or {@code null} when the poll returned no value
     */
    private SnapshotMessage readLastSnapshotMessage() {
        byte[] lastValue = null;
        try (KafkaConsumer<String, byte[]> consumer = getConfiguredSnapshotConsumer()) {
            // NOTE(review): a single poll is issued; records not delivered within
            // POLL_TIMEOUT or within this one batch are not considered.
            for (ConsumerRecord<String, byte[]> snapshotRecord : consumer.poll(POLL_TIMEOUT)) {
                lastValue = snapshotRecord.value();
            }
        }
        return lastValue != null ? (SnapshotMessage) SerializationUtil.deserialize(lastValue) : null;
    }

    /**
     * Creates a consumer assigned to every partition of the snapshot topic and positioned
     * at the beginning of each. The caller is responsible for closing it.
     *
     * @return the configured consumer; it has no assignment when the topic reports no partitions
     */
    private KafkaConsumer<String, byte[]> getConfiguredSnapshotConsumer() {
        KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(Config.getSnapshotConsumerConfig());
        try {
            List<PartitionInfo> partitionInfos = consumer.partitionsFor(envConfig.getSnapshotTopicName());
            if (partitionInfos != null) {
                List<TopicPartition> partitions = new ArrayList<>(partitionInfos.size());
                for (PartitionInfo partitionInfo : partitionInfos) {
                    partitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
                }
                if (!partitions.isEmpty()) {
                    consumer.assign(partitions);
                    // One call covers every assigned partition; no need to repeat it per partition.
                    consumer.seekToBeginning(partitions);
                }
            }
            return consumer;
        } catch (RuntimeException e) {
            // Do not leak the consumer when topic metadata lookup or assignment fails.
            consumer.close();
            throw e;
        }
    }
}
