package org.kie.hacep.consumer;

import java.util.Queue;
import org.kie.api.KieServices;
import org.kie.hacep.EnvConfig;
import org.kie.hacep.core.GlobalStatus;
import org.kie.hacep.core.KieSessionContext;
import org.kie.hacep.core.infra.DeafultSessionSnapShooter;
import org.kie.hacep.core.infra.SnapshotInfos;
import org.kie.hacep.core.infra.consumer.ConsumerHandler;
import org.kie.hacep.core.infra.consumer.ItemToProcess;
import org.kie.hacep.core.infra.election.State;
import org.kie.hacep.core.infra.utils.SnapshotOnDemandUtils;
import org.kie.hacep.message.ControlMessage;
import org.kie.hacep.util.PrinterUtil;
import org.kie.remote.DroolsExecutor;
import org.kie.remote.command.RemoteCommand;
import org.kie.remote.command.VisitableCommand;
import org.kie.remote.impl.producer.Producer;
import org.kie.remote.util.SerializationUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/kie/hacep/consumer/DroolsConsumerHandler.class */
/**
 * {@link ConsumerHandler} that deserializes consumed items into {@link RemoteCommand}s and
 * dispatches them to a {@link CommandHandler} bound to this handler's {@link KieSessionContext}.
 *
 * <p>When a command is processed in the {@link State#LEADER} state, the side effects collected
 * by {@link DroolsExecutor} are published synchronously on the control topic as a
 * {@link ControlMessage}.
 */
public class DroolsConsumerHandler implements ConsumerHandler {
    private static final Logger logger = LoggerFactory.getLogger(DroolsConsumerHandler.class);

    /** Only assigned when {@link EnvConfig#isUnderTest()} is true at construction time. */
    private Logger loggerForTest;
    private final Producer producer;
    private final DeafultSessionSnapShooter snapshooter;
    private final EnvConfig config;
    private KieSessionContext kieSessionContext;
    /** Always bound to the current {@link #kieSessionContext}; rebuilt whenever that is replaced. */
    private CommandHandler commandHandler;
    private SnapshotInfos infos;
    /** Set by {@link #stop()}; suppresses the snapshot step of {@link #processWithSnapshot}. */
    private boolean shutdown;

    /**
     * Creates the handler, its snapshooter and the initial session context.
     *
     * @param producer  producer used to publish control messages; also handed to the command handler
     * @param envConfig environment configuration
     */
    public DroolsConsumerHandler(Producer producer, EnvConfig envConfig) {
        this.config = envConfig;
        this.producer = producer;
        this.snapshooter = new DeafultSessionSnapShooter(this.config);
        initializeKieSessionFromSnapshot(this.config);
        this.commandHandler = new CommandHandler(this.kieSessionContext, this.config, producer, this.snapshooter);
        if (this.config.isUnderTest()) {
            this.loggerForTest = PrinterUtil.getKafkaLoggerForTest(envConfig);
        }
    }

    /**
     * Builds the initial session context. When on-demand snapshots are skipped the context is
     * restored from whatever the snapshooter deserializes; otherwise a classpath session is used.
     */
    private void initializeKieSessionFromSnapshot(EnvConfig envConfig) {
        if (envConfig.isSkipOnDemanSnapshot()) {
            this.infos = this.snapshooter.deserialize();
            this.kieSessionContext = createSessionHolder(this.infos);
            return;
        }
        this.kieSessionContext = new KieSessionContext();
        createClasspathSession(this.kieSessionContext);
    }

    /**
     * Requests a snapshot on demand and replaces the session context with one built from it.
     *
     * @param envConfig environment configuration
     * @return {@code false} when on-demand snapshots are skipped (nothing is changed),
     *         {@code true} once the session context has been replaced
     */
    public boolean initializeKieSessionFromSnapshotOnDemand(EnvConfig envConfig) {
        if (envConfig.isSkipOnDemanSnapshot()) {
            return false;
        }
        this.infos = SnapshotOnDemandUtils.askASnapshotOnDemand(envConfig, this.snapshooter);
        this.kieSessionContext = createSessionHolder(this.infos);
        // The command handler was constructed with the previous context; rebuild it so that
        // commands run against the same context that processWithSnapshot serializes.
        this.commandHandler = new CommandHandler(this.kieSessionContext, this.config, this.producer, this.snapshooter);
        // NOTE(review): the superseded session is not disposed here - confirm whether it should be.
        return true;
    }

    /** @return the snapshooter created by this handler */
    public DeafultSessionSnapShooter getSnapshooter() {
        return this.snapshooter;
    }

    /**
     * Deserializes the item's payload into a {@link RemoteCommand} and processes it.
     *
     * @param itemToProcess consumed item whose object is the serialized command bytes
     * @param state         current state of this node
     */
    @Override // org.kie.hacep.core.infra.consumer.ConsumerHandler
    public void process(ItemToProcess itemToProcess, State state) {
        byte[] payload = (byte[]) itemToProcess.getObject();
        process((RemoteCommand) SerializationUtil.deserialize(payload), state);
    }

    /**
     * Processes a command and, in the leader state, publishes the collected side effects on
     * the control topic.
     *
     * @param remoteCommand command to execute
     * @param state         current state of this node
     */
    @Override // org.kie.hacep.core.infra.consumer.ConsumerHandler
    public void process(RemoteCommand remoteCommand, State state) {
        logForTest("Remote command on process:{}", remoteCommand);
        processCommand(remoteCommand, state);
        if (!state.equals(State.LEADER)) {
            return;
        }
        // Leader only: drain the side effects produced by the command and publish them.
        // Raw type kept: the generic signatures of getAndReset/ControlMessage are not visible here.
        Queue sideEffects = DroolsExecutor.getInstance().getAndReset();
        this.producer.produceSync(this.config.getControlTopicName(), remoteCommand.getId(), new ControlMessage(remoteCommand.getId(), sideEffects));
        logForTest("sideEffectOnLeader:{}", sideEffects);
    }

    /**
     * Hands the given side effects to {@link DroolsExecutor}.
     *
     * @param queue side effects to append
     */
    public void processSideEffectsOnReplica(Queue<Object> queue) {
        DroolsExecutor.getInstance().appendSideEffects(queue);
        logForTest("sideEffectOnReplica:{}", queue);
    }

    /**
     * Processes the item and then serializes the session context together with the item's key
     * and offset, unless {@link #stop()} has been called in the meantime.
     *
     * @param itemToProcess consumed item
     * @param state         current state of this node
     */
    @Override // org.kie.hacep.core.infra.consumer.ConsumerHandler
    public void processWithSnapshot(ItemToProcess itemToProcess, State state) {
        logger.info("SNAPSHOT");
        process(itemToProcess, state);
        if (!this.shutdown) {
            this.snapshooter.serialize(this.kieSessionContext, itemToProcess.getKey(), itemToProcess.getOffset());
        }
    }

    /** Marks the handler as shut down and disposes the current session, if there is one. */
    @Override // org.kie.hacep.core.infra.consumer.ConsumerHandler
    public void stop() {
        this.shutdown = true;
        // createClasspathSession may return without initialising the context, so the session
        // itself is checked too (presumably null in that case - verify against KieSessionContext).
        if (this.kieSessionContext != null && this.kieSessionContext.getKieSession() != null) {
            this.kieSessionContext.getKieSession().dispose();
        }
    }

    /**
     * Runs the command through the command handler when this node is leader or the command is
     * permitted for replicas. Any failure marks the node as not live before being rethrown.
     */
    private void processCommand(RemoteCommand remoteCommand, State state) {
        boolean executable = state.equals(State.LEADER) || remoteCommand.isPermittedForReplicas();
        if (!executable) {
            return;
        }
        try {
            ((VisitableCommand) remoteCommand).accept(this.commandHandler);
        } catch (Throwable th) {
            GlobalStatus.nodeLive = false;
            throw th;
        }
    }

    /** Writes a warn-level line to the test logger, only when the configuration is under test. */
    private void logForTest(String format, Object arg) {
        if (this.config.isUnderTest()) {
            this.loggerForTest.warn(format, arg);
        }
    }

    /**
     * Creates a new session context, initialised from the snapshot when one is given and from
     * the classpath otherwise.
     */
    private KieSessionContext createSessionHolder(SnapshotInfos snapshotInfos) {
        KieSessionContext context = new KieSessionContext();
        if (snapshotInfos == null) {
            createClasspathSession(context);
            return context;
        }
        logger.info("start consumer with:{}", snapshotInfos);
        initSessionHolder(snapshotInfos, context);
        return context;
    }

    /**
     * Initialises the context with a new session from the classpath container. When no
     * {@link KieServices} is available an error is logged and the context is left untouched.
     */
    private void createClasspathSession(KieSessionContext kieSessionContext) {
        KieServices services = KieServices.get();
        if (services == null) {
            logger.error("KieService is null");
            return;
        }
        logger.info("Creating new Kie Session");
        kieSessionContext.init(services.newKieClasspathContainer().newKieSession());
    }

    /**
     * Initialises the context from the snapshot, or with a new classpath session when the
     * snapshot carries no session.
     */
    private void initSessionHolder(SnapshotInfos snapshotInfos, KieSessionContext kieSessionContext) {
        if (snapshotInfos.getKieSession() == null) {
            kieSessionContext.init(KieServices.get().newKieClasspathContainer().newKieSession());
            return;
        }
        logger.info("Applying snapshot");
        kieSessionContext.initFromSnapshot(snapshotInfos);
    }
}
