/*
 * Decompiled with CFR 0.152.
 */
package org.kie.hacep.core.infra.consumer;

import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.kie.hacep.Config;
import org.kie.hacep.EnvConfig;
import org.kie.hacep.consumer.DroolsConsumerHandler;
import org.kie.hacep.core.infra.DeafultSessionSnapShooter;
import org.kie.hacep.core.infra.OffsetManager;
import org.kie.hacep.core.infra.SnapshotInfos;
import org.kie.hacep.core.infra.consumer.ConsumerHandler;
import org.kie.hacep.core.infra.consumer.EventConsumer;
import org.kie.hacep.core.infra.consumer.ItemToProcess;
import org.kie.hacep.core.infra.election.State;
import org.kie.hacep.core.infra.utils.ConsumerUtils;
import org.kie.hacep.message.ControlMessage;
import org.kie.hacep.util.Printer;
import org.kie.hacep.util.PrinterUtil;
import org.kie.remote.DroolsExecutor;
import org.kie.remote.util.SerializationUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Kafka-backed {@link EventConsumer} that can run either as leader or as replica.
 *
 * <p>As a leader it polls the events topic with the primary consumer and hands every record to
 * the {@link DroolsConsumerHandler}. As a replica it alternates between the events topic
 * (primary consumer) and the control topic (secondary consumer); {@code processingKey} and
 * {@code processingKeyOffset} decide when to switch from one topic to the other.
 *
 * <p>The state flags are {@code volatile}; the consumer references are plain fields.
 * NOTE(review): {@link #updateStatus(State)} replaces the consumer instances without closing
 * the previous ones. {@code KafkaConsumer} must not be closed from a thread other than the one
 * polling it, so the replaced instances are left untouched here - confirm the threading model
 * before adding a close.
 *
 * @param <T> type of the record values read from Kafka
 */
public class DefaultKafkaConsumer<T> implements EventConsumer {

    private static final Logger logger = LoggerFactory.getLogger(DefaultKafkaConsumer.class);

    /** Offsets handed to {@link OffsetManager#store} on shutdown; nothing in this class adds entries. */
    private final Map<TopicPartition, OffsetAndMetadata> offsetsEvents = new HashMap<>();
    /** Primary consumer, assigned to the events topic. */
    private Consumer<String, T> kafkaConsumer;
    /** Secondary consumer, assigned to the control topic; {@code null} after a restart as leader. */
    private Consumer<String, T> kafkaSecondaryConsumer;
    private DroolsConsumerHandler consumerHandler;
    private volatile State currentState;
    private volatile String processingKey = "";
    private volatile long processingKeyOffset;
    private volatile boolean leader = false;
    private volatile boolean started = false;
    private volatile boolean processingLeader;
    private volatile boolean processingReplica = false;
    private volatile boolean pollingEvents;
    private volatile boolean pollingControl = true;
    private final int iterationBetweenSnapshot;
    /** Events polled by a replica but not yet processed. */
    private List<ConsumerRecord<String, T>> eventsBuffer;
    /** Control records polled by a replica but not yet processed. */
    private List<ConsumerRecord<String, T>> controlBuffer;
    private final AtomicInteger counter = new AtomicInteger(0);
    private SnapshotInfos snapshotInfos;
    private DeafultSessionSnapShooter snapShooter;
    private final Printer printer;
    private final EnvConfig config;
    /** Only set when {@code config.isUnderTest()} is true, otherwise {@code null}. */
    private final Logger loggerForTest;
    private volatile boolean askedSnapshotOnDemand;

    /**
     * Creates a consumer that is not yet connected to Kafka; call
     * {@link #initConsumer(ConsumerHandler)} before polling.
     *
     * @param config environment configuration (topic names, snapshot interval, test mode)
     */
    public DefaultKafkaConsumer(EnvConfig config) {
        this.config = config;
        this.iterationBetweenSnapshot = config.getIterationBetweenSnapshot();
        this.printer = PrinterUtil.getPrinter(config);
        this.loggerForTest = config.isUnderTest() ? PrinterUtil.getKafkaLoggerForTest(config) : null;
    }

    /**
     * Stores the handler and creates the Kafka consumers.
     *
     * @param consumerHandler must be a {@link DroolsConsumerHandler}
     */
    @Override
    public void initConsumer(ConsumerHandler consumerHandler) {
        this.consumerHandler = (DroolsConsumerHandler) consumerHandler;
        this.snapShooter = this.consumerHandler.getSnapshooter();
        this.kafkaConsumer = new KafkaConsumer<>(Config.getConsumerConfig("PrimaryConsumer"));
        if (!leader) {
            this.kafkaSecondaryConsumer = new KafkaConsumer<>(Config.getConsumerConfig("SecondaryConsumer"));
        }
    }

    /**
     * Reloads the snapshot infos and replaces the consumers with fresh instances.
     * The {@code leader} flag still holds the previous role when this runs.
     */
    private void restartConsumer() {
        logger.info("Restart Consumers");
        snapshotInfos = snapShooter.deserialize();
        kafkaConsumer = new KafkaConsumer<>(Config.getConsumerConfig("PrimaryConsumer"));
        // assign() runs before the secondary consumer is replaced, so it still uses the old one.
        assign();
        if (leader) {
            kafkaSecondaryConsumer = null;
        } else {
            kafkaSecondaryConsumer = new KafkaConsumer<>(Config.getConsumerConfig("SecondaryConsumer"));
        }
    }

    /**
     * Disables consumption, wakes up the consumers so a blocked poll returns, and stops the
     * handler. Safe to call before {@link #initConsumer(ConsumerHandler)}.
     */
    @Override
    public void stop() {
        stopConsume();
        stopPollingControl();
        stopPollingEvents();
        if (kafkaConsumer != null) {
            kafkaConsumer.wakeup();
        }
        if (kafkaSecondaryConsumer != null) {
            kafkaSecondaryConsumer.wakeup();
        }
        if (consumerHandler != null) {
            consumerHandler.stop();
        }
    }

    /**
     * Applies a new election state. On a running consumer a role change triggers a restart;
     * otherwise a LEADER or REPLICA state starts consumption, and a replica first loads a
     * snapshot on demand unless that is disabled in the configuration or was already requested.
     *
     * @param state the new state
     */
    @Override
    public void updateStatus(State state) {
        if (started) {
            updateOnRunningConsumer(state);
        } else {
            if (state.equals(State.REPLICA) && !config.isSkipOnDemanSnapshot() && !askedSnapshotOnDemand) {
                askAndProcessSnapshotOnDemand();
            }
            if (state.equals(State.LEADER) || state.equals(State.REPLICA)) {
                enableConsumeAndStartLoop(state);
            }
        }
        currentState = state;
    }

    /**
     * Asks the handler to initialize the session from an on-demand snapshot.
     *
     * @throws RuntimeException if the handler reports that it did not complete
     */
    private void askAndProcessSnapshotOnDemand() {
        askedSnapshotOnDemand = true;
        boolean completed = consumerHandler.initializeKieSessionFromSnapshotOnDemand(config);
        if (!completed) {
            throw new RuntimeException("Can't obtain a snapshot on demand");
        }
    }

    /** Assigns topics according to the current role. */
    private void assign() {
        if (leader) {
            assignAsALeader();
        } else {
            assignNotLeader();
        }
    }

    /** A leader only reads the events topic. */
    private void assignAsALeader() {
        assignConsumer(kafkaConsumer, config.getEventsTopicName());
    }

    /** A replica reads the events topic and the control topic. */
    private void assignNotLeader() {
        assignConsumer(kafkaConsumer, config.getEventsTopicName());
        assignConsumer(kafkaSecondaryConsumer, config.getControlTopicName());
    }

    /**
     * Assigns every partition of {@code topic} to {@code consumer} and positions it: at the
     * snapshot offset when snapshot infos are available, at the beginning otherwise.
     *
     * @throws RuntimeException if a snapshot is present and the topic has more than one partition
     */
    private void assignConsumer(Consumer<String, T> consumer, String topic) {
        List<PartitionInfo> partitionsInfo = consumer.partitionsFor(topic);
        List<TopicPartition> partitions = new ArrayList<>();
        if (partitionsInfo != null) {
            for (PartitionInfo info : partitionsInfo) {
                partitions.add(new TopicPartition(info.topic(), info.partition()));
            }
            if (!partitions.isEmpty()) {
                consumer.assign(partitions);
            }
        }
        if (snapshotInfos != null) {
            if (partitions.size() > 1) {
                throw new RuntimeException("The system must run with only one partition per topic");
            }
            // Seek the partitions that are actually assigned, so an empty metadata lookup
            // cannot end in a NoSuchElementException.
            consumer.assignment().forEach(
                    assigned -> consumer.seek(assigned, snapshotInfos.getOffsetDuringSnapshot()));
        } else if (!consumer.assignment().isEmpty()) {
            consumer.seekToBeginning(partitions);
        }
    }

    /**
     * Runs the consume loop on the calling thread until a consumer is woken up (see
     * {@link #stop()} and the JVM shutdown hook registered here) or an error occurs. In both
     * cases offsets are committed and stored and the consumers are closed exactly once; errors
     * other than {@link WakeupException} are propagated after that cleanup.
     *
     * @param durationMillis timeout of each Kafka poll, in milliseconds
     * @throws IllegalStateException if the consumers have not been created
     */
    @Override
    public void poll(int durationMillis) {
        // Validate before registering the hook, so a failed check leaves no hook behind.
        if (kafkaConsumer == null) {
            throw new IllegalStateException("Can't poll, kafkaConsumer not subscribed or null!");
        }
        if (kafkaSecondaryConsumer == null) {
            throw new IllegalStateException("Can't poll, kafkaConsumer not subscribed or null!");
        }

        final Thread mainThread = Thread.currentThread();
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            logger.info("Starting exit...\n");
            kafkaConsumer.wakeup();
            if (kafkaSecondaryConsumer != null) {
                kafkaSecondaryConsumer.wakeup();
            }
            try {
                // Wait for the poll loop to finish its cleanup before the JVM exits.
                mainThread.join();
            } catch (InterruptedException e) {
                logger.error(e.getMessage(), e);
                Thread.currentThread().interrupt();
            }
        }));

        try {
            while (true) {
                consume(durationMillis);
            }
        } catch (WakeupException ignored) {
            // Expected: wakeup() is how stop() and the shutdown hook end the loop.
        } finally {
            commitOffsetsAndClose();
        }
    }

    /**
     * Commits and stores the offsets, then closes the consumers. The secondary consumer is
     * skipped when absent, which is the case after a restart as leader.
     */
    private void commitOffsetsAndClose() {
        final Consumer<String, T> primary = kafkaConsumer;
        final Consumer<String, T> secondary = kafkaSecondaryConsumer;
        try {
            primary.commitSync();
            if (secondary != null) {
                secondary.commitSync();
            }
            if (logger.isDebugEnabled()) {
                for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsetsEvents.entrySet()) {
                    logger.debug("Consumer partition {} - lastOffset {}",
                                 entry.getKey().partition(),
                                 entry.getValue().offset());
                }
            }
            OffsetManager.store(offsetsEvents);
        } catch (WakeupException ignored) {
            // A pending wakeup can surface on commitSync; the consumers are closed below anyway.
        } finally {
            logger.info("Closing kafkaConsumer on the loop");
            try {
                primary.close();
            } finally {
                if (secondary != null) {
                    secondary.close();
                }
            }
        }
    }

    /** Restarts the consumers when the new state changes the current role. */
    private void updateOnRunningConsumer(State state) {
        if (state.equals(State.LEADER) && !leader) {
            DroolsExecutor.setAsLeader();
            restart(state);
        } else if (state.equals(State.REPLICA) && leader) {
            DroolsExecutor.setAsReplica();
            restart(state);
        }
    }

    /** Pauses consumption, recreates the consumers and resumes in the new role. */
    private void restart(State state) {
        stopConsume();
        restartConsumer();
        enableConsumeAndStartLoop(state);
    }

    /**
     * Switches the role flags for {@code state}, reads the last control message, assigns the
     * topics and enables consumption.
     */
    private void enableConsumeAndStartLoop(State state) {
        if (state.equals(State.LEADER) && !leader) {
            leader = true;
            DroolsExecutor.setAsLeader();
            stopLeaderProcessing();
        } else if (state.equals(State.REPLICA)) {
            final boolean wasLeader = leader;
            leader = false;
            kafkaSecondaryConsumer = new KafkaConsumer<>(Config.getConsumerConfig("SecondaryConsumer"));
            DroolsExecutor.setAsReplica();
            startProcessingNotLeader();
            if (wasLeader) {
                // A former leader starts on the events topic.
                startPollingEvents();
                stopPollingControl();
            } else {
                // A node that was already a replica starts on the control topic.
                stopPollingEvents();
                startPollingControl();
            }
        }
        setLastProcessedKey();
        assignAndStartConsume();
    }

    /** Loads id and offset of the last message on the control topic as the current processing key. */
    private void setLastProcessedKey() {
        ControlMessage lastControlMessage =
                ConsumerUtils.getLastEvent(config.getControlTopicName(), config.getPollTimeout());
        settingsOnAEmptyControlTopic(lastControlMessage);
        processingKey = lastControlMessage.getId();
        processingKeyOffset = lastControlMessage.getOffset();
    }

    /**
     * Adjusts the flags when the last control message carries no id: a leader starts
     * processing, a replica stops processing events and polls the control topic.
     */
    private void settingsOnAEmptyControlTopic(ControlMessage lastWrapper) {
        if (lastWrapper.getId() != null) {
            return;
        }
        if (leader) {
            startProcessingLeader();
        } else {
            stopProcessingNotLeader();
            stopPollingEvents();
            startPollingControl();
        }
    }

    private void assignAndStartConsume() {
        assign();
        startConsume();
    }

    /** One iteration of the poll loop; does nothing until consumption has been started. */
    private void consume(int millisTimeout) {
        if (!started) {
            return;
        }
        if (leader) {
            defaultProcessAsLeader(millisTimeout);
        } else {
            defaultProcessAsAReplica(millisTimeout);
        }
    }

    /** Polls the events topic and processes every returned record as leader. */
    private void defaultProcessAsLeader(int millisTimeout) {
        startPollingEvents();
        startProcessingLeader();
        ConsumerRecords<String, T> records = kafkaConsumer.poll(Duration.of(millisTimeout, ChronoUnit.MILLIS));
        for (ConsumerRecord<String, T> record : records) {
            processLeader(record, counter);
        }
    }

    /**
     * Processes one event as leader and commits its offset. When on-demand snapshots are
     * skipped, a periodic snapshot is taken instead.
     */
    private void processLeader(ConsumerRecord<String, T> record, AtomicInteger iterationCounter) {
        if (processingLeader) {
            if (config.isSkipOnDemanSnapshot()) {
                handleSnapshotBetweenIteration(record, iterationCounter);
            } else {
                consumerHandler.process(ItemToProcess.getItemToProcess(record), currentState);
            }
            processingKey = record.key();
            saveOffset(record, kafkaConsumer);
        }
        if (logger.isInfoEnabled() || config.isUnderTest()) {
            printer.prettyPrinter("DefaulKafkaConsumer.processLeader record:{}", record, processingLeader);
        }
    }

    /**
     * Processes the record, taking a snapshot and resetting the counter when the counter
     * reaches {@code iterationBetweenSnapshot}.
     */
    private void handleSnapshotBetweenIteration(ConsumerRecord<String, T> record, AtomicInteger iterationCounter) {
        int iteration = iterationCounter.incrementAndGet();
        if (iteration == iterationBetweenSnapshot) {
            iterationCounter.set(0);
            consumerHandler.processWithSnapshot(ItemToProcess.getItemToProcess(record), currentState);
        } else {
            consumerHandler.process(ItemToProcess.getItemToProcess(record), currentState);
        }
    }

    /**
     * One replica iteration: drains and polls the events topic while {@code pollingEvents} is
     * set, then does the same for the control topic while {@code pollingControl} is set. An
     * empty events poll switches the replica to the control topic.
     */
    private void defaultProcessAsAReplica(int millisTimeout) {
        if (pollingEvents) {
            if (eventsBuffer != null && !eventsBuffer.isEmpty()) {
                // Events read by a previous poll that are still waiting to be processed.
                consumeEventsFromBufferAsAReplica();
            }
            ConsumerRecords<String, T> events = kafkaConsumer.poll(Duration.of(millisTimeout, ChronoUnit.MILLIS));
            if (events.count() > 0) {
                eventsBuffer = recordsOfFirstPartition(events);
                consumeEventsFromBufferAsAReplica();
            } else {
                stopPollingEvents();
                startPollingControl();
            }
        }
        if (pollingControl) {
            if (controlBuffer != null && !controlBuffer.isEmpty()) {
                consumeControlFromBufferAsAReplica();
            }
            ConsumerRecords<String, T> controls =
                    kafkaSecondaryConsumer.poll(Duration.of(millisTimeout, ChronoUnit.MILLIS));
            if (controls.count() > 0) {
                controlBuffer = recordsOfFirstPartition(controls);
                consumeControlFromBufferAsAReplica();
            }
        }
    }

    /** Returns the records belonging to the partition of the first record in {@code records}. */
    private List<ConsumerRecord<String, T>> recordsOfFirstPartition(ConsumerRecords<String, T> records) {
        ConsumerRecord<String, T> first = records.iterator().next();
        return records.records(new TopicPartition(first.topic(), first.partition()));
    }

    /**
     * Processes buffered events until the buffer is exhausted or event polling is switched
     * off; the unprocessed remainder (if any) stays in the buffer.
     */
    private void consumeEventsFromBufferAsAReplica() {
        final List<ConsumerRecord<String, T>> buffer = eventsBuffer;
        final int end = buffer.size();
        int processed = 0;
        for (ConsumerRecord<String, T> record : buffer) {
            processEventsAsAReplica(record);
            processed++;
            if (!pollingEvents) {
                break;
            }
        }
        eventsBuffer = processed < end ? buffer.subList(processed, end) : null;
    }

    /**
     * Processes buffered control records until the buffer is exhausted or control polling is
     * switched off; the unprocessed remainder (if any) stays in the buffer.
     */
    private void consumeControlFromBufferAsAReplica() {
        if (config.isUnderTest()) {
            loggerForTest.warn("consumeControlFromBufferAsAReplica:{}", controlBuffer.size());
        }
        final List<ConsumerRecord<String, T>> buffer = controlBuffer;
        final int end = buffer.size();
        int processed = 0;
        for (ConsumerRecord<String, T> record : buffer) {
            processControlAsAReplica(record);
            processed++;
            if (!pollingControl) {
                break;
            }
        }
        controlBuffer = processed < end ? buffer.subList(processed, end) : null;
    }

    /**
     * Handles one event as replica. The event whose key equals {@code processingKey} is
     * processed and then switches the replica to the control topic; other events are processed
     * only while {@code processingReplica} is set.
     */
    private void processEventsAsAReplica(ConsumerRecord<String, T> record) {
        if (config.isUnderTest()) {
            loggerForTest.warn("DefaulKafkaConsumer.processEventsAsAReplica record:{}", record);
        }
        if (record.key().equals(processingKey)) {
            stopPollingEvents();
            startPollingControl();
            stopProcessingNotLeader();
            consumerHandler.process(ItemToProcess.getItemToProcess(record), currentState);
            saveOffset(record, kafkaConsumer);
            if (logger.isDebugEnabled()) {
                logger.debug("change topic, switch to consume control");
            }
        } else if (processingReplica) {
            consumerHandler.process(ItemToProcess.getItemToProcess(record), currentState);
            saveOffset(record, kafkaConsumer);
        }
    }

    /**
     * Handles one control record as replica. The record right after the last processed offset
     * (or the very first record of the topic) becomes the new processing key, has its side
     * effects applied, and switches the replica back to the events topic.
     */
    private void processControlAsAReplica(ConsumerRecord<String, T> record) {
        if (config.isUnderTest()) {
            loggerForTest.warn("DefaulKafkaConsumer.processControlAsAReplica record:{}", record);
        }
        final long offset = record.offset();
        if (offset == processingKeyOffset + 1L || offset == 0L) {
            processingKey = record.key();
            processingKeyOffset = offset;
            ControlMessage wr = (ControlMessage) SerializationUtil.deserialize((byte[]) record.value());
            consumerHandler.processSideEffectsOnReplica(wr.getSideEffects());
            stopPollingControl();
            startPollingEvents();
            startProcessingNotLeader();
            if (logger.isInfoEnabled()) {
                logger.info("change topic, switch to consume events");
            }
        }
        if (processingKey == null) {
            processingKey = record.key();
            processingKeyOffset = record.offset();
        }
        saveOffset(record, kafkaSecondaryConsumer);
    }

    /** Synchronously commits the offset following {@code record} on the given consumer. */
    private void saveOffset(ConsumerRecord<String, T> record, Consumer<String, T> consumer) {
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        offsets.put(new TopicPartition(record.topic(), record.partition()),
                    new OffsetAndMetadata(record.offset() + 1L));
        consumer.commitSync(offsets);
    }

    // ---- state flag switches ------------------------------------------------------------

    private void startConsume() {
        started = true;
    }

    private void stopConsume() {
        started = false;
    }

    private void startProcessingLeader() {
        processingLeader = true;
    }

    private void stopLeaderProcessing() {
        processingLeader = false;
    }

    private void startProcessingNotLeader() {
        processingReplica = true;
    }

    private void stopProcessingNotLeader() {
        processingReplica = false;
    }

    private void startPollingEvents() {
        pollingEvents = true;
    }

    private void stopPollingEvents() {
        pollingEvents = false;
    }

    private void startPollingControl() {
        pollingControl = true;
    }

    private void stopPollingControl() {
        pollingControl = false;
    }
}

