/*
 * Decompiled with CFR 0.152.
 */
package org.kie.hacep.core.infra.consumer;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.kie.hacep.Config;
import org.kie.hacep.EnvConfig;
import org.kie.hacep.consumer.DroolsConsumerHandler;
import org.kie.hacep.core.infra.DefaultSessionSnapShooter;
import org.kie.hacep.core.infra.OffsetManager;
import org.kie.hacep.core.infra.SnapshotInfos;
import org.kie.hacep.core.infra.consumer.ConsumerHandler;
import org.kie.hacep.core.infra.consumer.EventConsumer;
import org.kie.hacep.core.infra.consumer.ItemToProcess;
import org.kie.hacep.core.infra.election.State;
import org.kie.hacep.core.infra.utils.ConsumerUtils;
import org.kie.hacep.message.ControlMessage;
import org.kie.hacep.util.Printer;
import org.kie.hacep.util.PrinterUtil;
import org.kie.remote.DroolsExecutor;
import org.kie.remote.util.SerializationUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Kafka-backed {@link EventConsumer} that feeds records to a {@link DroolsConsumerHandler}.
 *
 * <p>Two Kafka consumers are used:
 * <ul>
 *   <li>the <em>primary</em> consumer is assigned to the events topic;</li>
 *   <li>the <em>secondary</em> consumer is assigned to the control topic and only exists while
 *       the node is a {@link State#REPLICA}.</li>
 * </ul>
 *
 * <p>As a {@link State#LEADER} the instance processes every polled event. As a replica it
 * alternates between the events topic and the control topic (see {@link PolledTopic}): it reads
 * events until it meets the key last seen on the control topic, then switches to the control
 * topic to learn the next key.
 *
 * <p>NOTE(review): state changes ({@link #updateStatus(State)}) and the poll loop
 * ({@link #poll()}) look like they run on different threads (several fields are {@code volatile}),
 * but the consumer references themselves are plain fields and replaced consumers are never
 * closed — confirm the threading model before relying on it.
 *
 * @param <T> type of the record values handled by the Kafka consumers
 */
public class DefaultKafkaConsumer<T>
implements EventConsumer {
    private static final Logger logger = LoggerFactory.getLogger(DefaultKafkaConsumer.class);

    // NOTE(review): nothing in this class ever puts entries into this map, so the value handed
    // to OffsetManager.store(...) at the end of poll() is always empty — confirm whether
    // saveOffset(...) was meant to record offsets here.
    private final Map<TopicPartition, OffsetAndMetadata> offsetsEvents = new HashMap<>();
    private Consumer<String, T> kafkaConsumer;
    private Consumer<String, T> kafkaSecondaryConsumer;
    private DroolsConsumerHandler consumerHandler;
    private volatile String processingKey = "";
    private volatile long processingKeyOffset;
    private volatile long lastProcessedControlOffset;
    private volatile long lastProcessedEventOffset;
    private volatile boolean started;
    private volatile boolean exit = false;
    private volatile State currentState = State.REPLICA;
    private volatile PolledTopic polledTopic = PolledTopic.CONTROL;
    private final int iterationBetweenSnapshot;
    private List<ConsumerRecord<String, T>> eventsBuffer;
    private List<ConsumerRecord<String, T>> controlBuffer;
    /** Only created when on-demand snapshots are skipped; {@code null} otherwise. */
    private final AtomicInteger counter;
    private SnapshotInfos snapshotInfos;
    private DefaultSessionSnapShooter snapShooter;
    private final Printer printer;
    private final EnvConfig envConfig;
    /** Only created when the configuration says the code runs under test; {@code null} otherwise. */
    private final Logger loggerForTest;
    private volatile boolean askedSnapshotOnDemand;

    /**
     * Creates a consumer bound to the given environment configuration.
     *
     * @param config environment configuration read for topic names, poll settings and flags
     */
    public DefaultKafkaConsumer(EnvConfig config) {
        this.envConfig = config;
        this.counter = config.isSkipOnDemanSnapshot() ? new AtomicInteger(0) : null;
        this.iterationBetweenSnapshot = config.getIterationBetweenSnapshot();
        this.printer = PrinterUtil.getPrinter(config);
        this.loggerForTest = config.isUnderTest() ? PrinterUtil.getKafkaLoggerForTest(config) : null;
    }

    /**
     * Stores the handler and creates the Kafka consumers.
     *
     * <p>The secondary consumer is only created when the current state is
     * {@link State#REPLICA} (which is the initial state of a new instance).
     *
     * @param consumerHandler handler to use; must be a {@link DroolsConsumerHandler}
     * @throws ClassCastException if {@code consumerHandler} is not a {@link DroolsConsumerHandler}
     */
    @Override
    public void initConsumer(ConsumerHandler consumerHandler) {
        this.consumerHandler = (DroolsConsumerHandler) consumerHandler;
        this.snapShooter = this.consumerHandler.getSnapshooter();
        this.kafkaConsumer = new KafkaConsumer<>(Config.getConsumerConfig("PrimaryConsumer"));
        if (this.currentState.equals(State.REPLICA)) {
            this.kafkaSecondaryConsumer = new KafkaConsumer<>(Config.getConsumerConfig("SecondaryConsumer"));
        }
    }

    /**
     * Reloads the snapshot information and replaces the Kafka consumers with fresh instances,
     * assigning them according to the current state.
     *
     * <p>NOTE(review): the consumers being replaced are not closed here; closing them from this
     * method may be unsafe if the poll loop is still using them — confirm before adding a close.
     */
    protected void restartConsumer() {
        if (logger.isInfoEnabled()) {
            logger.info("Restart Consumers");
        }
        this.snapshotInfos = this.snapShooter.deserialize();
        this.kafkaConsumer = new KafkaConsumer<>(Config.getConsumerConfig("PrimaryConsumer"));
        boolean replica = this.currentState.equals(State.REPLICA);
        if (replica) {
            // Create the secondary consumer BEFORE assign(): assignReplica() dereferences it, and
            // it may be null (set below during a previous non-replica restart) or stale.
            this.kafkaSecondaryConsumer = new KafkaConsumer<>(Config.getConsumerConfig("SecondaryConsumer"));
        }
        this.assign();
        if (!replica) {
            this.kafkaSecondaryConsumer = null;
        }
    }

    /**
     * Stops consuming, wakes up any consumer blocked in a Kafka call, asks the poll loop to exit
     * and stops the handler. Safe to call before {@link #initConsumer(ConsumerHandler)}.
     */
    @Override
    public void stop() {
        this.stopConsume();
        if (this.kafkaConsumer != null) {
            this.kafkaConsumer.wakeup();
        }
        if (this.kafkaSecondaryConsumer != null) {
            this.kafkaSecondaryConsumer.wakeup();
        }
        this.exit = true;
        if (this.consumerHandler != null) {
            this.consumerHandler.stop();
        }
    }

    /**
     * Records the new state and reacts to it.
     *
     * <ul>
     *   <li>Already consuming and the state changed to anything but
     *       {@link State#BECOMING_LEADER}: the consumers are restarted for the new state.</li>
     *   <li>Not consuming yet: a replica first loads a snapshot on demand (unless disabled by
     *       configuration or already requested); then, for {@link State#LEADER} or
     *       {@link State#REPLICA}, consumption is enabled.</li>
     * </ul>
     *
     * @param state the new state; must not be {@code null}
     */
    @Override
    public void updateStatus(State state) {
        boolean changedState = !state.equals(this.currentState);
        if (changedState) {
            this.currentState = state;
        }
        if (this.started && changedState && !this.currentState.equals(State.BECOMING_LEADER)) {
            this.updateOnRunningConsumer(state);
        } else if (!this.started) {
            boolean replica = state.equals(State.REPLICA);
            if (replica && !this.envConfig.isSkipOnDemanSnapshot() && !this.askedSnapshotOnDemand) {
                if (logger.isInfoEnabled()) {
                    logger.info("askAndProcessSnapshotOnDemand:");
                }
                this.askAndProcessSnapshotOnDemand();
            }
            if (replica || state.equals(State.LEADER)) {
                if (logger.isInfoEnabled()) {
                    logger.info("enableConsumeAndStartLoop:{}", state);
                }
                this.enableConsumeAndStartLoop(state);
            }
        }
    }

    /**
     * Asks the handler to initialize its session from an on-demand snapshot. The request is
     * flagged as done before it is attempted, so it is not repeated by later status updates.
     *
     * @throws RuntimeException if the handler reports that the snapshot could not be obtained
     */
    protected void askAndProcessSnapshotOnDemand() {
        this.askedSnapshotOnDemand = true;
        boolean completed = this.consumerHandler.initializeKieSessionFromSnapshotOnDemand(this.envConfig);
        if (logger.isInfoEnabled()) {
            logger.info("askAndProcessSnapshotOnDemand:{}", completed);
        }
        if (!completed) {
            throw new RuntimeException("Can't obtain a snapshot on demand");
        }
    }

    /** Assigns the consumers for the current state: leader assignment for LEADER, replica assignment otherwise. */
    protected void assign() {
        if (this.currentState.equals(State.LEADER)) {
            this.assignAsALeader();
        } else {
            this.assignReplica();
        }
    }

    /** Assigns the primary consumer to the events topic. */
    protected void assignAsALeader() {
        this.assignConsumer(this.kafkaConsumer, this.envConfig.getEventsTopicName());
    }

    /** Assigns the primary consumer to the events topic and the secondary consumer to the control topic. */
    protected void assignReplica() {
        this.assignConsumer(this.kafkaConsumer, this.envConfig.getEventsTopicName());
        this.assignConsumer(this.kafkaSecondaryConsumer, this.envConfig.getControlTopicName());
    }

    /**
     * Assigns {@code kafkaConsumer} to all partitions of {@code topic} and positions it.
     *
     * <p>The start offset is the snapshot offset when snapshot information is available;
     * otherwise the last processed event offset for a leader or the last processed control
     * offset for a replica. In any other state no seek is performed.
     *
     * @param kafkaConsumer consumer to assign
     * @param topic         topic whose partitions are assigned
     * @throws RuntimeException if snapshot information is available and the topic has more than
     *                          one partition
     */
    protected void assignConsumer(Consumer<String, T> kafkaConsumer, String topic) {
        List<PartitionInfo> partitionsInfo = kafkaConsumer.partitionsFor(topic);
        List<TopicPartition> partitionCollection = new ArrayList<>();
        if (partitionsInfo != null) {
            for (PartitionInfo partition : partitionsInfo) {
                partitionCollection.add(new TopicPartition(partition.topic(), partition.partition()));
            }
            if (!partitionCollection.isEmpty()) {
                kafkaConsumer.assign(partitionCollection);
            }
        }
        if (this.snapshotInfos != null) {
            if (partitionCollection.size() > 1) {
                throw new RuntimeException("The system must run with only one partition per topic");
            }
            this.seekAssignedPartitions(kafkaConsumer, this.snapshotInfos.getOffsetDuringSnapshot());
        } else if (this.currentState.equals(State.LEADER)) {
            this.seekAssignedPartitions(kafkaConsumer, this.lastProcessedEventOffset);
        } else if (this.currentState.equals(State.REPLICA)) {
            this.seekAssignedPartitions(kafkaConsumer, this.lastProcessedControlOffset);
        }
    }

    /**
     * Seeks every partition currently assigned to {@code consumer} to {@code offset}.
     * Seeking the assigned partition itself (rather than the first discovered one) keeps this
     * safe when partition discovery returned nothing.
     */
    private void seekAssignedPartitions(Consumer<String, T> consumer, long offset) {
        for (TopicPartition topicPartition : consumer.assignment()) {
            consumer.seek(topicPartition, offset);
        }
    }

    /**
     * Runs the consume loop on the calling thread until {@link #stop()} is called or a consumer
     * is woken up, then commits and closes the consumers.
     *
     * <p>A JVM shutdown hook is registered that wakes the consumers up and waits for the calling
     * thread to finish. A {@link WakeupException} ends the loop silently; any other exception is
     * propagated after the consumers have been committed and closed.
     *
     * <p>NOTE(review): while consumption is not started the loop calls {@link #consume()}
     * repeatedly without blocking; and a new shutdown hook is added on every call.
     *
     * @throws IllegalStateException if either consumer has not been created
     */
    @Override
    public void poll() {
        // Validate before registering the hook so a failed call does not leave behind a hook
        // that dereferences a missing consumer at JVM shutdown.
        if (this.kafkaConsumer == null) {
            throw new IllegalStateException("Can't poll, kafkaConsumer not subscribed or null!");
        }
        if (this.kafkaSecondaryConsumer == null) {
            throw new IllegalStateException("Can't poll, kafkaSecondaryConsumer not subscribed or null!");
        }
        this.registerShutdownHook(Thread.currentThread());
        try {
            while (!this.exit) {
                this.consume();
            }
        } catch (WakeupException ignored) {
            // Expected way out of a blocking Kafka call on stop()/shutdown; nothing to do.
        } finally {
            this.commitAndCloseConsumers();
        }
    }

    /** Registers a JVM shutdown hook that wakes the consumers up and waits for {@code pollingThread}. */
    private void registerShutdownHook(Thread pollingThread) {
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            logger.info("Starting exit...\n");
            this.kafkaConsumer.wakeup();
            if (this.kafkaSecondaryConsumer != null) {
                this.kafkaSecondaryConsumer.wakeup();
            }
            try {
                pollingThread.join();
            } catch (InterruptedException e) {
                logger.error(e.getMessage(), e);
                Thread.currentThread().interrupt();
            }
        }));
    }

    /**
     * Commits both consumers, stores the recorded offsets and always closes the consumers.
     * The secondary consumer is optional here because {@link #restartConsumer()} sets it to
     * {@code null} when the node is not a replica.
     */
    private void commitAndCloseConsumers() {
        Consumer<String, T> primary = this.kafkaConsumer;
        Consumer<String, T> secondary = this.kafkaSecondaryConsumer;
        try {
            primary.commitSync();
            if (secondary != null) {
                secondary.commitSync();
            }
            if (logger.isDebugEnabled()) {
                for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : this.offsetsEvents.entrySet()) {
                    logger.debug("Consumer partition {} - lastOffset {}", entry.getKey().partition(), entry.getValue().offset());
                }
            }
            OffsetManager.store(this.offsetsEvents);
        } catch (WakeupException ignored) {
            // A pending wakeup can surface on commit; the consumers are closed below regardless.
        } finally {
            logger.info("Closing kafkaConsumer on the loop");
            try {
                primary.close();
            } finally {
                if (secondary != null) {
                    secondary.close();
                }
            }
        }
    }

    /**
     * Switches the executor role and restarts the consumers when the new state is
     * {@link State#LEADER} or {@link State#REPLICA}; other states are ignored.
     *
     * @param state the state the running consumer moved to
     */
    protected void updateOnRunningConsumer(State state) {
        logger.info("updateOnRunning COnsumer");
        if (state.equals(State.LEADER)) {
            DroolsExecutor.setAsLeader();
            this.restart(state);
        } else if (state.equals(State.REPLICA)) {
            DroolsExecutor.setAsReplica();
            this.restart(state);
        }
    }

    /**
     * Pauses consumption, recreates the consumers and resumes consumption for {@code state}.
     *
     * @param state state to resume with
     */
    protected void restart(State state) {
        this.stopConsume();
        this.restartConsumer();
        this.enableConsumeAndStartLoop(state);
    }

    /**
     * Applies {@code state} (leader or replica), loads the last processed control key, assigns
     * the consumers and enables consumption.
     *
     * <p>NOTE(review): for a replica a new secondary consumer is always created here, replacing
     * (without closing) whatever instance was there before.
     *
     * @param state state to start consuming with
     */
    protected void enableConsumeAndStartLoop(State state) {
        if (state.equals(State.LEADER)) {
            this.currentState = State.LEADER;
            DroolsExecutor.setAsLeader();
        } else if (state.equals(State.REPLICA)) {
            this.currentState = State.REPLICA;
            this.kafkaSecondaryConsumer = new KafkaConsumer<>(Config.getConsumerConfig("SecondaryConsumer"));
            DroolsExecutor.setAsReplica();
        }
        this.setLastProcessedKey();
        this.assignAndStartConsume();
    }

    /** Reads the last message of the control topic and remembers its id and offset as the processing key. */
    protected void setLastProcessedKey() {
        ControlMessage lastControlMessage = ConsumerUtils.getLastEvent(this.envConfig.getControlTopicName(), this.envConfig.getPollTimeout());
        this.settingsOnAEmptyControlTopic(lastControlMessage);
        this.processingKey = lastControlMessage.getId();
        this.processingKeyOffset = lastControlMessage.getOffset();
    }

    /**
     * When the given control message carries no id and the node is a replica, starts by polling
     * the control topic.
     *
     * @param lastWrapper last control message read from the control topic
     */
    protected void settingsOnAEmptyControlTopic(ControlMessage lastWrapper) {
        if (lastWrapper.getId() == null && this.currentState.equals(State.REPLICA)) {
            this.pollControl();
        }
    }

    /** Assigns the consumers for the current state and enables consumption. */
    protected void assignAndStartConsume() {
        this.assign();
        this.startConsume();
    }

    /** Performs one consume iteration for the current state; does nothing while consumption is not started. */
    protected void consume() {
        if (!this.started) {
            return;
        }
        if (this.currentState.equals(State.LEADER)) {
            this.defaultProcessAsLeader();
        } else {
            this.defaultProcessAsAReplica();
        }
    }

    /**
     * One leader iteration: drains any buffered events, then polls the events topic and
     * processes what was returned. When the poll returns nothing the polled topic is switched
     * to control.
     */
    protected void defaultProcessAsLeader() {
        this.pollEvents();
        if (this.eventsBuffer != null && !this.eventsBuffer.isEmpty()) {
            this.consumeEventsFromBufferAsALeader();
        }
        ConsumerRecords<String, T> records = this.kafkaConsumer.poll(this.envConfig.getPollDuration());
        if (!records.isEmpty() && this.eventsBuffer == null) {
            // Only the partition of the first record is buffered.
            ConsumerRecord<String, T> first = records.iterator().next();
            this.eventsBuffer = records.records(new TopicPartition(first.topic(), first.partition()));
            this.consumeEventsFromBufferAsALeader();
        } else {
            this.pollControl();
        }
    }

    /**
     * Processes one event as a leader, remembers its key and commits its offset.
     *
     * @param record event to process
     */
    protected void processLeader(ConsumerRecord<String, T> record) {
        if (this.envConfig.isSkipOnDemanSnapshot()) {
            this.handleSnapshotBetweenIteration(record);
        } else {
            this.consumerHandler.process(ItemToProcess.getItemToProcess(record), this.currentState);
        }
        this.processingKey = record.key();
        this.saveOffset(record, this.kafkaConsumer);
        if (logger.isInfoEnabled() || this.envConfig.isUnderTest()) {
            this.printer.prettyPrinter("DefaulImprovedKafkaConsumer.processLeader record:{}", record, true);
        }
    }

    /** Processes every buffered event as a leader and clears the buffer. */
    protected void consumeEventsFromBufferAsALeader() {
        for (ConsumerRecord<String, T> record : this.eventsBuffer) {
            this.processLeader(record);
        }
        this.eventsBuffer = null;
    }

    /**
     * Processes one event, asking the handler to also take a snapshot each time the configured
     * number of iterations has been reached (the iteration counter is then reset).
     *
     * @param record event to process
     */
    protected void handleSnapshotBetweenIteration(ConsumerRecord<String, T> record) {
        int iteration = this.counter.incrementAndGet();
        if (iteration == this.iterationBetweenSnapshot) {
            this.counter.set(0);
            this.consumerHandler.processWithSnapshot(ItemToProcess.getItemToProcess(record), this.currentState);
        } else {
            this.consumerHandler.process(ItemToProcess.getItemToProcess(record), this.currentState);
        }
    }

    /**
     * One replica iteration. Depending on the polled topic it consumes events (primary
     * consumer) and/or control messages (secondary consumer); both sections can run in the same
     * call when event processing switches the polled topic to control.
     */
    protected void defaultProcessAsAReplica() {
        if (this.polledTopic == PolledTopic.EVENTS) {
            if (this.eventsBuffer != null && !this.eventsBuffer.isEmpty()) {
                this.consumeEventsFromBufferAsAReplica();
            }
            // Draining may stop early and keep the remaining events buffered; polling again in
            // that case would overwrite the buffer and drop them, so only poll when it is empty.
            if (this.eventsBuffer == null || this.eventsBuffer.isEmpty()) {
                ConsumerRecords<String, T> records = this.kafkaConsumer.poll(this.envConfig.getPollDuration());
                if (records.isEmpty()) {
                    this.pollControl();
                } else {
                    ConsumerRecord<String, T> first = records.iterator().next();
                    this.eventsBuffer = records.records(new TopicPartition(first.topic(), first.partition()));
                    this.consumeEventsFromBufferAsAReplica();
                }
            }
        }
        if (this.polledTopic == PolledTopic.CONTROL) {
            if (this.controlBuffer != null && !this.controlBuffer.isEmpty()) {
                this.consumeControlFromBufferAsAReplica();
            }
            ConsumerRecords<String, T> records = this.kafkaSecondaryConsumer.poll(this.envConfig.getPollDuration());
            if (!records.isEmpty()) {
                ConsumerRecord<String, T> first = records.iterator().next();
                this.controlBuffer = records.records(new TopicPartition(first.topic(), first.partition()));
                this.consumeControlFromBufferAsAReplica();
            }
        }
    }

    /**
     * Processes buffered events as a replica until the buffer is exhausted or processing an
     * event switches the polled topic to control. Unprocessed events stay in the buffer; the
     * buffer is cleared when everything was processed.
     */
    protected void consumeEventsFromBufferAsAReplica() {
        if (this.envConfig.isUnderTest()) {
            this.loggerForTest.warn("consumeEventsFromBufferAsAReplica eventsBufferSize:{}", this.eventsBuffer.size());
        }
        List<ConsumerRecord<String, T>> buffer = this.eventsBuffer;
        int end = buffer.size();
        int processed = 0;
        for (ConsumerRecord<String, T> record : buffer) {
            this.processEventsAsAReplica(record);
            ++processed;
            if (this.polledTopic == PolledTopic.CONTROL) {
                break;
            }
        }
        this.eventsBuffer = processed < end ? buffer.subList(processed, end) : null;
    }

    /** Processes every buffered control record as a replica and clears the buffer. */
    protected void consumeControlFromBufferAsAReplica() {
        for (ConsumerRecord<String, T> record : this.controlBuffer) {
            this.processControlAsAReplica(record);
        }
        this.controlBuffer = null;
    }

    /**
     * Processes one event as a replica and commits its offset. When the event key equals the
     * current processing key, the event offset is remembered and the polled topic is switched
     * to control before the event is processed.
     *
     * @param record event to process; a {@code null} key never matches the processing key
     */
    protected void processEventsAsAReplica(ConsumerRecord<String, T> record) {
        ItemToProcess item = ItemToProcess.getItemToProcess(record);
        String expectedKey = this.processingKey;
        if (expectedKey != null && expectedKey.equals(record.key())) {
            this.lastProcessedEventOffset = record.offset();
            this.pollControl();
            if (logger.isDebugEnabled()) {
                logger.debug("processEventsAsAReplica change topic, switch to consume control, still {} events in the eventsBuffer to consume and processing item:{}.", this.eventsBuffer.size(), item);
            }
        } else if (logger.isDebugEnabled()) {
            logger.debug("processEventsAsAReplica still {} events in the eventsBuffer to consume and processing item:{}.", this.eventsBuffer.size(), item);
        }
        this.consumerHandler.process(item, this.currentState);
        this.saveOffset(record, this.kafkaConsumer);
    }

    /**
     * Processes one control record as a replica and commits its offset.
     *
     * <p>A record is applied when its offset directly follows the current processing-key offset
     * or is {@code 0}: its key and offset become the new processing key, its side effects are
     * handed to the handler and the polled topic is switched to events. Independently, when no
     * processing key is set yet, the record's key and offset are adopted.
     *
     * @param record control record; its value is expected to be a serialized
     *               {@link ControlMessage} as a {@code byte[]}
     */
    protected void processControlAsAReplica(ConsumerRecord<String, T> record) {
        long offset = record.offset();
        if (offset == this.processingKeyOffset + 1L || offset == 0L) {
            this.lastProcessedControlOffset = offset;
            this.processingKey = record.key();
            this.processingKeyOffset = offset;
            ControlMessage wr = (ControlMessage) SerializationUtil.deserialize((byte[]) record.value());
            this.consumerHandler.processSideEffectsOnReplica(wr.getSideEffects());
            this.pollEvents();
            if (logger.isDebugEnabled()) {
                logger.debug("change topic, switch to consume events");
            }
        }
        if (this.processingKey == null) {
            this.processingKey = record.key();
            this.processingKeyOffset = offset;
        }
        this.saveOffset(record, this.kafkaSecondaryConsumer);
    }

    /**
     * Synchronously commits the offset following {@code record} on {@code kafkaConsumer}.
     *
     * @param record        record that has just been processed
     * @param kafkaConsumer consumer the record was read from
     */
    protected void saveOffset(ConsumerRecord<String, T> record, Consumer<String, T> kafkaConsumer) {
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        offsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1L));
        kafkaConsumer.commitSync(offsets);
    }

    /** Enables consumption in the poll loop. */
    protected void startConsume() {
        this.started = true;
    }

    /** Disables consumption in the poll loop. */
    protected void stopConsume() {
        this.started = false;
    }

    /** Makes the control topic the next one to be polled. */
    protected void pollControl() {
        this.polledTopic = PolledTopic.CONTROL;
    }

    /** Makes the events topic the next one to be polled. */
    protected void pollEvents() {
        this.polledTopic = PolledTopic.EVENTS;
    }

    /** Topic the consumer polls on its next iteration. */
    public static enum PolledTopic {
        EVENTS,
        CONTROL,
        NONE;

    }
}

