package org.kie.hacep.core.infra.consumer;

import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.kie.hacep.Config;
import org.kie.hacep.EnvConfig;
import org.kie.hacep.consumer.DroolsConsumerHandler;
import org.kie.hacep.core.infra.DefaultSessionSnapShooter;
import org.kie.hacep.core.infra.OffsetManager;
import org.kie.hacep.core.infra.SnapshotInfos;
import org.kie.hacep.core.infra.election.State;
import org.kie.hacep.core.infra.utils.ConsumerUtils;
import org.kie.hacep.message.ControlMessage;
import org.kie.hacep.util.Printer;
import org.kie.hacep.util.PrinterUtil;
import org.kie.remote.DroolsExecutor;
import org.kie.remote.util.SerializationUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/openshift-kie-hacep-7.28.0.Final.jar:org/kie/hacep/core/infra/consumer/DefaultKafkaConsumer.class */
public class DefaultKafkaConsumer<T> implements EventConsumer {
    private Consumer<String, T> kafkaConsumer;
    private Consumer<String, T> kafkaSecondaryConsumer;
    private DroolsConsumerHandler consumerHandler;
    private volatile long processingKeyOffset;
    private volatile long lastProcessedControlOffset;
    private volatile long lastProcessedEventOffset;
    private volatile boolean started;
    private int iterationBetweenSnapshot;
    private List<ConsumerRecord<String, T>> eventsBuffer;
    private List<ConsumerRecord<String, T>> controlBuffer;
    private AtomicInteger counter;
    private SnapshotInfos snapshotInfos;
    private DefaultSessionSnapShooter snapShooter;
    private Printer printer;
    private EnvConfig config;
    private Logger loggerForTest;
    private volatile boolean askedSnapshotOnDemand;
    private Logger logger = LoggerFactory.getLogger((Class<?>) DefaultKafkaConsumer.class);
    private Map<TopicPartition, OffsetAndMetadata> offsetsEvents = new HashMap();
    private volatile String processingKey = "";
    private volatile boolean exit = false;
    private volatile State currentState = State.REPLICA;
    private volatile PolledTopic polledTopic = PolledTopic.CONTROL;

    /* loaded from: input_file:WEB-INF/lib/openshift-kie-hacep-7.28.0.Final.jar:org/kie/hacep/core/infra/consumer/DefaultKafkaConsumer$PolledTopic.class */
    public enum PolledTopic {
        EVENTS,
        CONTROL,
        NONE
    }

    public DefaultKafkaConsumer(EnvConfig envConfig) {
        this.config = envConfig;
        if (this.config.isSkipOnDemanSnapshot()) {
            this.counter = new AtomicInteger(0);
        }
        this.iterationBetweenSnapshot = this.config.getIterationBetweenSnapshot();
        this.printer = PrinterUtil.getPrinter(this.config);
        if (this.config.isUnderTest()) {
            this.loggerForTest = PrinterUtil.getKafkaLoggerForTest(this.config);
        }
    }

    @Override // org.kie.hacep.core.infra.consumer.EventConsumer
    public void initConsumer(ConsumerHandler consumerHandler) {
        this.consumerHandler = (DroolsConsumerHandler) consumerHandler;
        this.snapShooter = this.consumerHandler.getSnapshooter();
        this.kafkaConsumer = new KafkaConsumer(Config.getConsumerConfig("PrimaryConsumer"));
        if (this.currentState.equals(State.REPLICA)) {
            this.kafkaSecondaryConsumer = new KafkaConsumer(Config.getConsumerConfig("SecondaryConsumer"));
        }
    }

    protected void restartConsumer() {
        if (this.logger.isInfoEnabled()) {
            this.logger.info("Restart Consumers");
        }
        this.snapshotInfos = this.snapShooter.deserialize();
        this.kafkaConsumer = new KafkaConsumer(Config.getConsumerConfig("PrimaryConsumer"));
        assign();
        if (this.currentState.equals(State.REPLICA)) {
            this.kafkaSecondaryConsumer = new KafkaConsumer(Config.getConsumerConfig("SecondaryConsumer"));
        } else {
            this.kafkaSecondaryConsumer = null;
        }
    }

    @Override // org.kie.hacep.core.infra.consumer.EventConsumer
    public void stop() {
        stopConsume();
        this.kafkaConsumer.wakeup();
        if (this.kafkaSecondaryConsumer != null) {
            this.kafkaSecondaryConsumer.wakeup();
        }
        this.exit = true;
        this.consumerHandler.stop();
    }

    @Override // org.kie.hacep.core.infra.election.LeadershipCallback
    public void updateStatus(State state) {
        boolean z = !state.equals(this.currentState);
        if (this.currentState == null || z) {
            this.currentState = state;
        }
        if (this.started && z && !this.currentState.equals(State.BECOMING_LEADER)) {
            updateOnRunningConsumer(state);
            return;
        }
        if (this.started) {
            return;
        }
        if (state.equals(State.REPLICA) && !this.config.isSkipOnDemanSnapshot() && !this.askedSnapshotOnDemand) {
            if (this.logger.isInfoEnabled()) {
                this.logger.info("askAndProcessSnapshotOnDemand:");
            }
            askAndProcessSnapshotOnDemand();
        }
        if (state.equals(State.LEADER) || state.equals(State.REPLICA)) {
            if (this.logger.isInfoEnabled()) {
                this.logger.info("enableConsumeAndStartLoop:{}", state);
            }
            enableConsumeAndStartLoop(state);
        }
    }

    protected void askAndProcessSnapshotOnDemand() {
        this.askedSnapshotOnDemand = true;
        boolean initializeKieSessionFromSnapshotOnDemand = this.consumerHandler.initializeKieSessionFromSnapshotOnDemand(this.config);
        if (this.logger.isInfoEnabled()) {
            this.logger.info("askAndProcessSnapshotOnDemand:{}", Boolean.valueOf(initializeKieSessionFromSnapshotOnDemand));
        }
        if (!initializeKieSessionFromSnapshotOnDemand) {
            throw new RuntimeException("Can't obtain a snapshot on demand");
        }
    }

    protected void assign() {
        if (this.currentState.equals(State.LEADER)) {
            assignAsALeader();
        } else {
            assignReplica();
        }
    }

    protected void assignAsALeader() {
        assignConsumer(this.kafkaConsumer, this.config.getEventsTopicName());
    }

    protected void assignReplica() {
        assignConsumer(this.kafkaConsumer, this.config.getEventsTopicName());
        assignConsumer(this.kafkaSecondaryConsumer, this.config.getControlTopicName());
    }

    protected void assignConsumer(Consumer<String, T> consumer, String str) {
        List<PartitionInfo> partitionsFor = consumer.partitionsFor(str);
        ArrayList arrayList = new ArrayList();
        if (partitionsFor != null) {
            for (PartitionInfo partitionInfo : partitionsFor) {
                arrayList.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
            }
            if (!arrayList.isEmpty()) {
                consumer.assign(arrayList);
            }
        }
        if (this.snapshotInfos != null) {
            if (arrayList.size() > 1) {
                throw new RuntimeException("The system must run with only one partition per topic");
            }
            consumer.assignment().forEach(topicPartition -> {
                consumer.seek((TopicPartition) arrayList.iterator().next(), this.snapshotInfos.getOffsetDuringSnapshot());
            });
        } else if (this.currentState.equals(State.LEADER)) {
            consumer.assignment().forEach(topicPartition2 -> {
                consumer.seek((TopicPartition) arrayList.iterator().next(), this.lastProcessedEventOffset);
            });
        } else if (this.currentState.equals(State.REPLICA)) {
            consumer.assignment().forEach(topicPartition3 -> {
                consumer.seek((TopicPartition) arrayList.iterator().next(), this.lastProcessedControlOffset);
            });
        }
    }

    @Override // org.kie.hacep.core.infra.consumer.EventConsumer
    public void poll(int i) {
        Thread currentThread = Thread.currentThread();
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            this.logger.info("Starting exit...\n");
            this.kafkaConsumer.wakeup();
            if (this.kafkaSecondaryConsumer != null) {
                this.kafkaSecondaryConsumer.wakeup();
            }
            try {
                currentThread.join();
            } catch (InterruptedException e) {
                this.logger.error(e.getMessage(), (Throwable) e);
            }
        }));
        if (this.kafkaConsumer == null) {
            throw new IllegalStateException("Can't poll, kafkaConsumer not subscribed or null!");
        }
        if (this.kafkaSecondaryConsumer == null) {
            throw new IllegalStateException("Can't poll, kafkaConsumer not subscribed or null!");
        }
        while (!this.exit) {
            try {
                consume(i);
            } catch (WakeupException e) {
                try {
                    this.kafkaConsumer.commitSync();
                    this.kafkaSecondaryConsumer.commitSync();
                    if (this.logger.isDebugEnabled()) {
                        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : this.offsetsEvents.entrySet()) {
                            this.logger.debug("Consumer partition %s - lastOffset %s\n", Integer.valueOf(entry.getKey().partition()), Long.valueOf(entry.getValue().offset()));
                        }
                    }
                    OffsetManager.store(this.offsetsEvents);
                    this.logger.info("Closing kafkaConsumer on the loop");
                    this.kafkaConsumer.close();
                    this.kafkaSecondaryConsumer.close();
                    return;
                } catch (WakeupException e2) {
                    this.logger.info("Closing kafkaConsumer on the loop");
                    this.kafkaConsumer.close();
                    this.kafkaSecondaryConsumer.close();
                    return;
                } catch (Throwable th) {
                    this.logger.info("Closing kafkaConsumer on the loop");
                    this.kafkaConsumer.close();
                    this.kafkaSecondaryConsumer.close();
                    throw th;
                }
            } catch (Throwable th2) {
                try {
                    this.kafkaConsumer.commitSync();
                    this.kafkaSecondaryConsumer.commitSync();
                    if (this.logger.isDebugEnabled()) {
                        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry2 : this.offsetsEvents.entrySet()) {
                            this.logger.debug("Consumer partition %s - lastOffset %s\n", Integer.valueOf(entry2.getKey().partition()), Long.valueOf(entry2.getValue().offset()));
                        }
                    }
                    OffsetManager.store(this.offsetsEvents);
                    this.logger.info("Closing kafkaConsumer on the loop");
                    this.kafkaConsumer.close();
                    this.kafkaSecondaryConsumer.close();
                } catch (WakeupException e3) {
                    this.logger.info("Closing kafkaConsumer on the loop");
                    this.kafkaConsumer.close();
                    this.kafkaSecondaryConsumer.close();
                } catch (Throwable th3) {
                    this.logger.info("Closing kafkaConsumer on the loop");
                    this.kafkaConsumer.close();
                    this.kafkaSecondaryConsumer.close();
                    throw th3;
                }
                throw th2;
            }
        }
        try {
            this.kafkaConsumer.commitSync();
            this.kafkaSecondaryConsumer.commitSync();
            if (this.logger.isDebugEnabled()) {
                for (Map.Entry<TopicPartition, OffsetAndMetadata> entry3 : this.offsetsEvents.entrySet()) {
                    this.logger.debug("Consumer partition %s - lastOffset %s\n", Integer.valueOf(entry3.getKey().partition()), Long.valueOf(entry3.getValue().offset()));
                }
            }
            OffsetManager.store(this.offsetsEvents);
            this.logger.info("Closing kafkaConsumer on the loop");
            this.kafkaConsumer.close();
            this.kafkaSecondaryConsumer.close();
        } catch (WakeupException e4) {
            this.logger.info("Closing kafkaConsumer on the loop");
            this.kafkaConsumer.close();
            this.kafkaSecondaryConsumer.close();
        } catch (Throwable th4) {
            this.logger.info("Closing kafkaConsumer on the loop");
            this.kafkaConsumer.close();
            this.kafkaSecondaryConsumer.close();
            throw th4;
        }
    }

    protected void updateOnRunningConsumer(State state) {
        this.logger.info("updateOnRunning COnsumer");
        if (state.equals(State.LEADER)) {
            DroolsExecutor.setAsLeader();
            restart(state);
        } else if (state.equals(State.REPLICA)) {
            DroolsExecutor.setAsReplica();
            restart(state);
        }
    }

    protected void restart(State state) {
        stopConsume();
        restartConsumer();
        enableConsumeAndStartLoop(state);
    }

    protected void enableConsumeAndStartLoop(State state) {
        if (state.equals(State.LEADER)) {
            this.currentState = State.LEADER;
            DroolsExecutor.setAsLeader();
        } else if (state.equals(State.REPLICA)) {
            this.currentState = State.REPLICA;
            this.kafkaSecondaryConsumer = new KafkaConsumer(Config.getConsumerConfig("SecondaryConsumer"));
            DroolsExecutor.setAsReplica();
        }
        setLastProcessedKey();
        assignAndStartConsume();
    }

    protected void setLastProcessedKey() {
        ControlMessage lastEvent = ConsumerUtils.getLastEvent(this.config.getControlTopicName(), Integer.valueOf(this.config.getPollTimeout()));
        settingsOnAEmptyControlTopic(lastEvent);
        this.processingKey = lastEvent.getId();
        this.processingKeyOffset = lastEvent.getOffset();
    }

    protected void settingsOnAEmptyControlTopic(ControlMessage controlMessage) {
        if (controlMessage.getId() == null && this.currentState.equals(State.REPLICA)) {
            pollControl();
        }
    }

    protected void assignAndStartConsume() {
        assign();
        startConsume();
    }

    protected void consume(int i) {
        if (this.started) {
            if (this.currentState.equals(State.LEADER)) {
                defaultProcessAsLeader(i);
            } else {
                defaultProcessAsAReplica(i);
            }
        }
    }

    protected void defaultProcessAsLeader(int i) {
        pollEvents();
        if (this.eventsBuffer != null && this.eventsBuffer.size() > 0) {
            consumeEventsFromBufferAsALeader();
        }
        ConsumerRecords<String, T> poll = this.kafkaConsumer.poll(Duration.of(i, ChronoUnit.MILLIS));
        if (poll.isEmpty()) {
            pollControl();
            return;
        }
        ConsumerRecord<String, T> next = poll.iterator().next();
        this.eventsBuffer = poll.records(new TopicPartition(next.topic(), next.partition()));
        consumeEventsFromBufferAsALeader();
    }

    protected void processLeader(ConsumerRecord<String, T> consumerRecord) {
        if (this.config.isSkipOnDemanSnapshot()) {
            handleSnapshotBetweenIteration(consumerRecord);
        } else {
            this.consumerHandler.process(ItemToProcess.getItemToProcess(consumerRecord), this.currentState);
        }
        this.processingKey = consumerRecord.key();
        saveOffset(consumerRecord, this.kafkaConsumer);
        if (this.logger.isInfoEnabled() || this.config.isUnderTest()) {
            this.printer.prettyPrinter("DefaulImprovedKafkaConsumer.processLeader record:{}", consumerRecord, true);
        }
    }

    protected void consumeEventsFromBufferAsALeader() {
        if (this.config.isUnderTest()) {
            this.loggerForTest.warn("Pre consumeEventsFromBufferAsALeader eventsBufferSize:{}", Integer.valueOf(this.eventsBuffer.size()));
        }
        int i = 0;
        int size = this.eventsBuffer.size();
        Iterator<ConsumerRecord<String, T>> it = this.eventsBuffer.iterator();
        if (it.hasNext()) {
            processLeader(it.next());
            i = 0 + 1;
            if (size > i) {
                this.eventsBuffer = this.eventsBuffer.subList(i, size);
            }
        }
        if (this.config.isUnderTest()) {
            this.loggerForTest.warn("post consumeEventsFromBufferAsALeader eventsBufferSize:{}", Integer.valueOf(this.eventsBuffer.size()));
        }
        if (size == i) {
            this.eventsBuffer = null;
        }
    }

    protected void handleSnapshotBetweenIteration(ConsumerRecord<String, T> consumerRecord) {
        if (this.counter.incrementAndGet() != this.iterationBetweenSnapshot) {
            this.consumerHandler.process(ItemToProcess.getItemToProcess(consumerRecord), this.currentState);
        } else {
            this.counter.set(0);
            this.consumerHandler.processWithSnapshot(ItemToProcess.getItemToProcess(consumerRecord), this.currentState);
        }
    }

    protected void defaultProcessAsAReplica(int i) {
        if (this.polledTopic.equals(PolledTopic.EVENTS)) {
            if (this.eventsBuffer != null && this.eventsBuffer.size() > 0) {
                consumeEventsFromBufferAsAReplica();
            }
            ConsumerRecords<String, T> poll = this.kafkaConsumer.poll(Duration.of(i, ChronoUnit.MILLIS));
            if (poll.isEmpty()) {
                pollControl();
            } else {
                ConsumerRecord<String, T> next = poll.iterator().next();
                this.eventsBuffer = poll.records(new TopicPartition(next.topic(), next.partition()));
                consumeEventsFromBufferAsAReplica();
            }
        }
        if (this.polledTopic.equals(PolledTopic.CONTROL)) {
            if (this.controlBuffer != null && this.controlBuffer.size() > 0) {
                consumeControlFromBufferAsAReplica();
            }
            ConsumerRecords<String, T> poll2 = this.kafkaSecondaryConsumer.poll(Duration.of(i, ChronoUnit.MILLIS));
            if (poll2.count() > 0) {
                ConsumerRecord<String, T> next2 = poll2.iterator().next();
                this.controlBuffer = poll2.records(new TopicPartition(next2.topic(), next2.partition()));
                consumeControlFromBufferAsAReplica();
            }
        }
    }

    protected void consumeEventsFromBufferAsAReplica() {
        if (this.config.isUnderTest()) {
            this.loggerForTest.warn("consumeEventsFromBufferAsAReplica eventsBufferSize:{}", Integer.valueOf(this.eventsBuffer.size()));
        }
        int i = 0;
        int size = this.eventsBuffer.size();
        Iterator<ConsumerRecord<String, T>> it = this.eventsBuffer.iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            processEventsAsAReplica(it.next());
            i++;
            if (this.polledTopic.equals(PolledTopic.CONTROL)) {
                if (size > i) {
                    this.eventsBuffer = this.eventsBuffer.subList(i, size);
                }
            }
        }
        if (size == i) {
            this.eventsBuffer = null;
        }
    }

    protected void consumeControlFromBufferAsAReplica() {
        if (this.config.isUnderTest()) {
            this.loggerForTest.warn("consumeControlFromBufferAsAReplica:{}", Integer.valueOf(this.controlBuffer.size()));
        }
        int i = 0;
        int size = this.controlBuffer.size();
        Iterator<ConsumerRecord<String, T>> it = this.controlBuffer.iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            processControlAsAReplica(it.next());
            i++;
            if (this.polledTopic.equals(PolledTopic.EVENTS)) {
                if (size > i) {
                    this.controlBuffer = this.controlBuffer.subList(i, size);
                }
            }
        }
        if (size == i) {
            this.controlBuffer = null;
        }
    }

    protected void processEventsAsAReplica(ConsumerRecord<String, T> consumerRecord) {
        ItemToProcess itemToProcess = ItemToProcess.getItemToProcess(consumerRecord);
        if (!consumerRecord.key().equals(this.processingKey)) {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("processEventsAsAReplica still {} events in the eventsBuffer to consume and processing item:{}.", Integer.valueOf(this.eventsBuffer.size()), itemToProcess);
            }
            this.consumerHandler.process(ItemToProcess.getItemToProcess(consumerRecord), this.currentState);
            saveOffset(consumerRecord, this.kafkaConsumer);
            return;
        }
        this.lastProcessedEventOffset = consumerRecord.offset();
        pollControl();
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("processEventsAsAReplica change topic, switch to consume control, still {} events in the eventsBuffer to consume and processing item:{}.", Integer.valueOf(this.eventsBuffer.size()), itemToProcess);
        }
        this.consumerHandler.process(itemToProcess, this.currentState);
        saveOffset(consumerRecord, this.kafkaConsumer);
    }

    protected void processControlAsAReplica(ConsumerRecord<String, T> consumerRecord) {
        if (consumerRecord.offset() == this.processingKeyOffset + 1 || consumerRecord.offset() == 0) {
            this.lastProcessedControlOffset = consumerRecord.offset();
            this.processingKey = consumerRecord.key();
            this.processingKeyOffset = consumerRecord.offset();
            this.consumerHandler.processSideEffectsOnReplica(((ControlMessage) SerializationUtil.deserialize((byte[]) consumerRecord.value())).getSideEffects());
            pollEvents();
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("change topic, switch to consume events");
            }
        }
        if (this.processingKey == null) {
            this.processingKey = consumerRecord.key();
            this.processingKeyOffset = consumerRecord.offset();
        }
        saveOffset(consumerRecord, this.kafkaSecondaryConsumer);
    }

    protected void saveOffset(ConsumerRecord<String, T> consumerRecord, Consumer<String, T> consumer) {
        HashMap hashMap = new HashMap();
        hashMap.put(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()), new OffsetAndMetadata(consumerRecord.offset() + 1));
        consumer.commitSync(hashMap);
    }

    protected void startConsume() {
        this.started = true;
    }

    protected void stopConsume() {
        this.started = false;
    }

    protected void pollControl() {
        this.polledTopic = PolledTopic.CONTROL;
    }

    protected void pollEvents() {
        this.polledTopic = PolledTopic.EVENTS;
    }
}
