package org.kie.remote.impl.consumer;

import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.kie.remote.TopicsConfig;
import org.kie.remote.message.ResultMessage;
import org.kie.remote.util.SerializationUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX WARN: Classes with same name are omitted:
  input_file:_bootstrap/openshift-kie-thorntail.war:WEB-INF/lib/kie-remote-7.25.0-SNAPSHOT.jar:org/kie/remote/impl/consumer/KafkaListenerThread.class
 */
/* loaded from: input_file:m2repo/org/kie/kie-remote/7.25.0-SNAPSHOT/kie-remote-7.25.0-SNAPSHOT.jar:org/kie/remote/impl/consumer/KafkaListenerThread.class */
/**
 * {@link ListenerThread} that reads records from the Kafka topic returned by
 * {@link TopicsConfig#getKieSessionInfosTopicName()} and passes every
 * {@link ResultMessage} it finds, together with the pending-requests map, to
 * {@code complete(...)}.
 *
 * <p>The consumer is created, assigned and positioned in the constructor; {@link #run()} polls
 * until {@link #stop()} is called or an exception occurs, then closes the consumer.
 * {@code KafkaConsumer} is not safe for multi-threaded access, so {@link #stop()} only flips a
 * volatile flag and leaves closing to the polling thread.
 */
public class KafkaListenerThread implements ListenerThread {

    private static final Logger logger = LoggerFactory.getLogger(KafkaListenerThread.class);

    /** Upper bound for one poll; this is also the longest {@link #stop()} can go unnoticed. */
    private static final Duration POLL_TIMEOUT = Duration.ofMillis(1000L);

    private final Properties configuration;
    private final TopicsConfig topicsConfig;
    private final Map<String, CompletableFuture<Object>> requestsStore;
    // Keys are never inspected by this class; values are deserialized from raw bytes.
    private KafkaConsumer<Object, byte[]> consumer;
    private volatile boolean running = true;

    /**
     * Creates the listener and eagerly prepares its Kafka consumer.
     *
     * @param properties   configuration handed to the {@code KafkaConsumer} constructor
     * @param topicsConfig supplies the name of the topic to read from
     * @param map          store of pending requests forwarded to {@code complete(...)} for each
     *                     received {@link ResultMessage} (presumably keyed by request id; confirm
     *                     against the callers)
     */
    public KafkaListenerThread(Properties properties, TopicsConfig topicsConfig, Map<String, CompletableFuture<Object>> map) {
        this.configuration = properties;
        this.topicsConfig = topicsConfig;
        this.requestsStore = map;
        prepareConsumer();
    }

    /**
     * Builds the consumer, manually assigns every partition of the topic to it and positions each
     * partition on its own last record (end offset - 1), or on offset 0 when the partition is
     * empty.
     */
    private void prepareConsumer() {
        this.consumer = new KafkaConsumer<>(this.configuration);
        String topic = this.topicsConfig.getKieSessionInfosTopicName();

        List<PartitionInfo> partitionInfos = this.consumer.partitionsFor(topic);
        List<TopicPartition> partitions = new ArrayList<>();
        if (partitionInfos != null) {
            for (PartitionInfo info : partitionInfos) {
                partitions.add(new TopicPartition(topic, info.partition()));
            }
        }
        this.consumer.assign(partitions);

        // Seek per partition: reusing one partition's end offset for all of them would replay or
        // skip records (or land out of range) as soon as the topic has more than one partition.
        for (Map.Entry<TopicPartition, Long> entry : this.consumer.endOffsets(partitions).entrySet()) {
            long endOffset = entry.getValue();
            // An empty partition has end offset 0; clamp so we never seek to -1.
            this.consumer.seek(entry.getKey(), Math.max(endOffset - 1L, 0L));
        }
    }

    /**
     * Polls the topic until {@link #stop()} is called or an exception is thrown. Each record value
     * is deserialized and, when it is a {@link ResultMessage}, passed to {@code complete(...)}.
     * Any exception is logged and ends the loop. The consumer is closed exactly once on the way
     * out, so a finished listener cannot be run again.
     */
    @Override
    public void run() {
        // The try/finally must wrap the whole loop: closing inside the loop would leave the next
        // iteration polling an already closed consumer.
        try {
            while (this.running) {
                for (ConsumerRecord<Object, byte[]> record : this.consumer.poll(POLL_TIMEOUT)) {
                    Object message = SerializationUtil.deserialize(record.value());
                    if (message instanceof ResultMessage) {
                        complete(this.requestsStore, (ResultMessage) message, logger);
                    }
                }
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        } finally {
            this.consumer.close();
        }
    }

    /**
     * Asks the polling loop to finish. The request takes effect once the poll in progress returns
     * (at most {@link #POLL_TIMEOUT} later); the consumer is then closed by {@link #run()}.
     */
    @Override
    public void stop() {
        this.running = false;
    }
}
