package org.apache.kafka.streams.processor.internals;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.DeleteRecordsResult;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.errors.LockException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskIdFormatException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/TaskManager.class */
public class TaskManager {
    private final Logger log;
    private final ChangelogReader changelogReader;
    private final UUID processId;
    private final String logPrefix;
    private final ActiveTaskCreator activeTaskCreator;
    private final StandbyTaskCreator standbyTaskCreator;
    private final InternalTopologyBuilder builder;
    private final Admin adminClient;
    private final StateDirectory stateDirectory;
    private final StreamThread.ProcessingMode processingMode;
    private Consumer<byte[], byte[]> mainConsumer;
    private DeleteRecordsResult deleteRecordsResult;
    private java.util.function.Consumer<Set<TopicPartition>> resetter;
    private final Map<TaskId, Task> tasks = new TreeMap();
    private final Map<TopicPartition, Task> partitionToTask = new HashMap();
    private boolean rebalanceInProgress = false;
    private final Set<TaskId> lockedTaskDirectories = new HashSet();

    /* JADX INFO: Access modifiers changed from: package-private */
    public TaskManager(ChangelogReader changelogReader, UUID uuid, String str, ActiveTaskCreator activeTaskCreator, StandbyTaskCreator standbyTaskCreator, InternalTopologyBuilder internalTopologyBuilder, Admin admin, StateDirectory stateDirectory, StreamThread.ProcessingMode processingMode) {
        this.changelogReader = changelogReader;
        this.processId = uuid;
        this.logPrefix = str;
        this.activeTaskCreator = activeTaskCreator;
        this.standbyTaskCreator = standbyTaskCreator;
        this.builder = internalTopologyBuilder;
        this.adminClient = admin;
        this.stateDirectory = stateDirectory;
        this.processingMode = processingMode;
        this.log = new LogContext(str).logger(getClass());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void setMainConsumer(Consumer<byte[], byte[]> consumer) {
        this.mainConsumer = consumer;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Consumer<byte[], byte[]> mainConsumer() {
        return this.mainConsumer;
    }

    public UUID processId() {
        return this.processId;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public InternalTopologyBuilder builder() {
        return this.builder;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean isRebalanceInProgress() {
        return this.rebalanceInProgress;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void handleRebalanceStart(Set<String> set) {
        this.builder.addSubscribedTopicsFromMetadata(set, this.logPrefix);
        tryToLockAllNonEmptyTaskDirectories();
        this.rebalanceInProgress = true;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void handleRebalanceComplete() {
        this.mainConsumer.pause(this.mainConsumer.assignment());
        releaseLockedUnassignedTaskDirectories();
        this.rebalanceInProgress = false;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void handleCorruption(Map<TaskId, Collection<TopicPartition>> map) throws TaskMigratedException {
        HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        for (Map.Entry<TaskId, Collection<TopicPartition>> entry : map.entrySet()) {
            Task task = this.tasks.get(entry.getKey());
            if (task.isActive()) {
                hashMap2.put(task, entry.getValue());
            } else {
                hashMap.put(task, entry.getValue());
            }
        }
        closeAndRevive(hashMap);
        commit((Collection) tasks().values().stream().filter(task2 -> {
            return task2.state() == Task.State.RUNNING || task2.state() == Task.State.RESTORING;
        }).filter(task3 -> {
            return !map.containsKey(task3.id());
        }).collect(Collectors.toSet()));
        closeAndRevive(hashMap2);
    }

    private void closeAndRevive(Map<Task, Collection<TopicPartition>> map) {
        for (Map.Entry<Task, Collection<TopicPartition>> entry : map.entrySet()) {
            Task key = entry.getKey();
            key.markChangelogAsCorrupted(entry.getValue());
            try {
                key.prepareCommit();
            } catch (RuntimeException e) {
                this.log.error("Error flushing cache for corrupted task {} ", key.id(), e);
            }
            try {
                key.suspend();
                key.postCommit(true);
            } catch (RuntimeException e2) {
                this.log.error("Error suspending corrupted task {} ", key.id(), e2);
            }
            key.closeDirty();
            if (key.isActive()) {
                Set assignment = mainConsumer().assignment();
                Set<TopicPartition> inputPartitions = key.inputPartitions();
                Set<TopicPartition> intersection = Utils.intersection(HashSet::new, assignment, new Set[]{inputPartitions});
                if (!intersection.equals(inputPartitions)) {
                    this.log.warn("Expected the current consumer assignment {} to contain the input partitions {}. Will proceed to recover.", assignment, inputPartitions);
                }
                mainConsumer().pause(intersection);
                for (Map.Entry entry2 : mainConsumer().committed(intersection).entrySet()) {
                    OffsetAndMetadata offsetAndMetadata = (OffsetAndMetadata) entry2.getValue();
                    if (offsetAndMetadata != null) {
                        mainConsumer().seek((TopicPartition) entry2.getKey(), offsetAndMetadata);
                        intersection.remove(entry2.getKey());
                    }
                }
                this.resetter.accept(intersection);
            }
            key.revive();
        }
    }

    public void handleAssignment(Map<TaskId, Set<TopicPartition>> map, Map<TaskId, Set<TopicPartition>> map2) {
        this.log.info("Handle new assignment with:\n\tNew active tasks: {}\n\tNew standby tasks: {}\n\tExisting active tasks: {}\n\tExisting standby tasks: {}", new Object[]{map.keySet(), map2.keySet(), activeTaskIds(), standbyTaskIds()});
        this.builder.addSubscribedTopicsFromAssignment((List) map.values().stream().flatMap((v0) -> {
            return v0.stream();
        }).collect(Collectors.toList()), this.logPrefix);
        LinkedHashMap<TaskId, RuntimeException> linkedHashMap = new LinkedHashMap<>();
        HashMap hashMap = new HashMap(map);
        HashMap hashMap2 = new HashMap(map2);
        Comparator comparing = Comparator.comparing((v0) -> {
            return v0.id();
        });
        TreeSet treeSet = new TreeSet(comparing);
        TreeSet treeSet2 = new TreeSet(comparing);
        TreeSet treeSet3 = new TreeSet(comparing);
        for (Task task : this.tasks.values()) {
            if (map.containsKey(task.id()) && task.isActive()) {
                updateInputPartitionsAndResume(task, map.get(task.id()));
                hashMap.remove(task.id());
            } else if (map2.containsKey(task.id()) && !task.isActive()) {
                updateInputPartitionsAndResume(task, map2.get(task.id()));
                hashMap2.remove(task.id());
            } else if (map.containsKey(task.id()) || map2.containsKey(task.id())) {
                treeSet.add(task);
            } else {
                treeSet2.add(task);
            }
        }
        handleCloseAndRecycle(treeSet, treeSet2, treeSet3, hashMap, hashMap2, linkedHashMap);
        if (!linkedHashMap.isEmpty()) {
            this.log.error("Hit exceptions while closing / recycling tasks: {}", linkedHashMap);
            for (Map.Entry<TaskId, RuntimeException> entry : linkedHashMap.entrySet()) {
                if (!(entry.getValue() instanceof TaskMigratedException)) {
                    if (!(entry.getValue() instanceof KafkaException)) {
                        throw new RuntimeException("Unexpected failure to close " + linkedHashMap.size() + " task(s) [" + linkedHashMap.keySet() + "]. First unexpected exception (for task " + entry.getKey() + ") follows.", entry.getValue());
                    }
                    throw entry.getValue();
                }
            }
            throw linkedHashMap.entrySet().iterator().next().getValue();
        }
        if (!hashMap.isEmpty()) {
            Iterator<Task> it = this.activeTaskCreator.createTasks(this.mainConsumer, hashMap).iterator();
            while (it.hasNext()) {
                addNewTask(it.next());
            }
        }
        if (hashMap2.isEmpty()) {
            return;
        }
        Iterator<Task> it2 = this.standbyTaskCreator.createTasks(hashMap2).iterator();
        while (it2.hasNext()) {
            addNewTask(it2.next());
        }
    }

    private void handleCloseAndRecycle(Set<Task> set, Set<Task> set2, Set<Task> set3, Map<TaskId, Set<TopicPartition>> map, Map<TaskId, Set<TopicPartition>> map2, LinkedHashMap<TaskId, RuntimeException> linkedHashMap) {
        Task createActiveTaskFromStandby;
        if (!set3.isEmpty()) {
            throw new IllegalArgumentException("Tasks to close-dirty should be empty");
        }
        ArrayList<Task> arrayList = new ArrayList(set2);
        arrayList.addAll(set);
        for (Task task : arrayList) {
            try {
                Map<TopicPartition, OffsetAndMetadata> prepareCommit = task.prepareCommit();
                if (!prepareCommit.isEmpty()) {
                    this.log.error("Task {} should has been committed when it was suspended, but it reports non-empty offsets {} to commit; it means it fails during last commit and hence should be closed dirty", task.id(), prepareCommit);
                    set3.add(task);
                } else if (!task.isActive()) {
                    task.suspend();
                    task.postCommit(true);
                }
            } catch (RuntimeException e) {
                this.log.error(String.format("Failed to checkpoint task %s. Attempting to close remaining tasks before re-throwing:", task.id()), e);
                linkedHashMap.putIfAbsent(task.id(), e);
                set3.add(task);
            }
        }
        set2.removeAll(set3);
        for (Task task2 : set2) {
            try {
                completeTaskCloseClean(task2);
                cleanUpTaskProducer(task2, linkedHashMap);
                this.tasks.remove(task2.id());
            } catch (RuntimeException e2) {
                this.log.error(String.format("Failed to close task %s cleanly. Attempting to close remaining tasks before re-throwing:", task2.id()), e2);
                linkedHashMap.putIfAbsent(task2.id(), e2);
                set3.add(task2);
            }
        }
        set.removeAll(set3);
        for (Task task3 : set) {
            try {
                if (task3.isActive()) {
                    createActiveTaskFromStandby = this.standbyTaskCreator.createStandbyTaskFromActive((StreamTask) task3, map2.remove(task3.id()));
                    cleanUpTaskProducer(task3, linkedHashMap);
                } else {
                    createActiveTaskFromStandby = this.activeTaskCreator.createActiveTaskFromStandby((StandbyTask) task3, map.remove(task3.id()), this.mainConsumer);
                }
                this.tasks.remove(task3.id());
                addNewTask(createActiveTaskFromStandby);
            } catch (RuntimeException e3) {
                this.log.error(String.format("Failed to recycle task %s cleanly. Attempting to close remaining tasks before re-throwing:", task3.id()), e3);
                linkedHashMap.putIfAbsent(task3.id(), e3);
                set3.add(task3);
            }
        }
        for (Task task4 : set3) {
            closeTaskDirty(task4);
            cleanUpTaskProducer(task4, linkedHashMap);
            this.tasks.remove(task4.id());
        }
    }

    private void cleanUpTaskProducer(Task task, Map<TaskId, RuntimeException> map) {
        if (task.isActive()) {
            try {
                this.activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(task.id());
            } catch (RuntimeException e) {
                this.log.error(String.format("Failed to close task %s cleanly. Attempting to close remaining tasks before re-throwing:", task.id()), e);
                map.putIfAbsent(task.id(), e);
            }
        }
    }

    private void updateInputPartitionsAndResume(Task task, Set<TopicPartition> set) {
        if (!task.inputPartitions().equals(set)) {
            this.log.debug("Update task {} inputPartitions: current {}, new {}", new Object[]{task, task.inputPartitions(), set});
            Iterator<TopicPartition> it = task.inputPartitions().iterator();
            while (it.hasNext()) {
                this.partitionToTask.remove(it.next());
            }
            Iterator<TopicPartition> it2 = set.iterator();
            while (it2.hasNext()) {
                this.partitionToTask.put(it2.next(), task);
            }
            task.update(set, this.builder.nodeToSourceTopics());
        }
        task.resume();
    }

    private void addNewTask(Task task) {
        if (this.tasks.put(task.id(), task) != null) {
            throw new IllegalStateException("Attempted to create a task that we already owned: " + task.id());
        }
        Iterator<TopicPartition> it = task.inputPartitions().iterator();
        while (it.hasNext()) {
            this.partitionToTask.put(it.next(), task);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean tryToCompleteRestoration() {
        boolean z = true;
        LinkedList<Task> linkedList = new LinkedList();
        for (Task task : this.tasks.values()) {
            try {
                task.initializeIfNeeded();
            } catch (LockException | TimeoutException e) {
                this.log.debug("Could not initialize {} due to the following exception; will retry", task.id(), e);
                z = false;
            }
            if (task.isActive()) {
                linkedList.add(task);
            }
        }
        if (z && !linkedList.isEmpty()) {
            Set<TopicPartition> completedChangelogs = this.changelogReader.completedChangelogs();
            for (Task task2 : linkedList) {
                if (completedChangelogs.containsAll(task2.changelogPartitions())) {
                    try {
                        task2.completeRestoration();
                    } catch (TimeoutException e2) {
                        this.log.debug("Could not complete restoration for {} due to {}; will retry", task2.id(), e2);
                        z = false;
                    }
                } else {
                    z = false;
                }
            }
        }
        if (z) {
            this.mainConsumer.resume(this.mainConsumer.assignment());
        }
        return z;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void handleRevocation(Collection<TopicPartition> collection) {
        HashSet hashSet = new HashSet(collection);
        HashSet hashSet2 = new HashSet();
        HashSet hashSet3 = new HashSet();
        HashMap hashMap = new HashMap();
        AtomicReference atomicReference = new AtomicReference(null);
        for (Task task : activeTaskIterable()) {
            if (hashSet.containsAll(task.inputPartitions())) {
                hashSet2.add(task);
                hashSet.removeAll(task.inputPartitions());
            } else if (task.commitNeeded()) {
                hashSet3.add(task);
            }
        }
        if (!hashSet.isEmpty()) {
            this.log.warn("The following partitions {} are missing from the task partitions. It could potentially due to race condition of consumer detecting the heartbeat failure, or the tasks have been cleaned up by the handleAssignment callback.", hashSet);
        }
        prepareCommitAndAddOffsetsToMap(hashSet2, hashMap);
        boolean z = !hashMap.isEmpty();
        if (z) {
            prepareCommitAndAddOffsetsToMap(hashSet3, hashMap);
        }
        try {
            commitOffsetsOrTransaction(hashMap);
        } catch (RuntimeException e) {
            this.log.error("Exception caught while committing those revoked tasks " + hashSet2, e);
            atomicReference.compareAndSet(null, e);
        }
        if (atomicReference.get() == null) {
            for (Task task2 : hashSet2) {
                try {
                    task2.postCommit(true);
                } catch (RuntimeException e2) {
                    this.log.error("Exception caught while post-committing task " + task2.id(), e2);
                    atomicReference.compareAndSet(null, e2);
                }
            }
            if (z) {
                for (Task task3 : hashSet3) {
                    try {
                        task3.postCommit(false);
                    } catch (RuntimeException e3) {
                        this.log.error("Exception caught while post-committing task " + task3.id(), e3);
                        atomicReference.compareAndSet(null, e3);
                    }
                }
            }
        }
        for (Task task4 : hashSet2) {
            try {
                task4.suspend();
            } catch (RuntimeException e4) {
                this.log.error("Caught the following exception while trying to suspend revoked task " + task4.id(), e4);
                atomicReference.compareAndSet(null, new StreamsException("Failed to suspend " + task4.id(), e4));
            }
        }
        if (atomicReference.get() != null) {
            throw ((RuntimeException) atomicReference.get());
        }
    }

    private void prepareCommitAndAddOffsetsToMap(Set<Task> set, Map<TaskId, Map<TopicPartition, OffsetAndMetadata>> map) {
        for (Task task : set) {
            Map<TopicPartition, OffsetAndMetadata> prepareCommit = task.prepareCommit();
            if (!prepareCommit.isEmpty()) {
                map.put(task.id(), prepareCommit);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void handleLostAll() {
        this.log.debug("Closing lost active tasks as zombies.");
        Iterator<Task> it = this.tasks.values().iterator();
        while (it.hasNext()) {
            Task next = it.next();
            if (next.isActive()) {
                closeTaskDirty(next);
                it.remove();
                try {
                    this.activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(next.id());
                } catch (RuntimeException e) {
                    this.log.warn("Error closing task producer for " + next.id() + " while handling lostAll", e);
                }
            }
        }
        if (this.processingMode == StreamThread.ProcessingMode.EXACTLY_ONCE_BETA) {
            this.activeTaskCreator.reInitializeThreadProducer();
        }
    }

    public Map<TaskId, Long> getTaskOffsetSums() {
        HashMap hashMap = new HashMap();
        for (TaskId taskId : Utils.union(HashSet::new, new Set[]{this.lockedTaskDirectories, this.tasks.keySet()})) {
            Task task = this.tasks.get(taskId);
            if (task == null || task.state() == Task.State.CREATED || task.state() == Task.State.CLOSED) {
                File checkpointFileFor = this.stateDirectory.checkpointFileFor(taskId);
                try {
                    if (checkpointFileFor.exists()) {
                        hashMap.put(taskId, Long.valueOf(sumOfChangelogOffsets(taskId, new OffsetCheckpoint(checkpointFileFor).read())));
                    }
                } catch (IOException e) {
                    this.log.warn(String.format("Exception caught while trying to read checkpoint for task %s:", taskId), e);
                }
            } else {
                Map<TopicPartition, Long> changelogOffsets = task.changelogOffsets();
                if (changelogOffsets.isEmpty()) {
                    this.log.debug("Skipping to encode apparently stateless (or non-logged) offset sum for task {}", taskId);
                } else {
                    hashMap.put(taskId, Long.valueOf(sumOfChangelogOffsets(taskId, changelogOffsets)));
                }
            }
        }
        return hashMap;
    }

    private void tryToLockAllNonEmptyTaskDirectories() {
        this.lockedTaskDirectories.clear();
        for (File file : this.stateDirectory.listNonEmptyTaskDirectories()) {
            try {
                TaskId parse = TaskId.parse(file.getName());
                try {
                    if (this.stateDirectory.lock(parse)) {
                        this.lockedTaskDirectories.add(parse);
                        if (!this.tasks.containsKey(parse)) {
                            this.log.debug("Temporarily locked unassigned task {} for the upcoming rebalance", parse);
                        }
                    }
                } catch (IOException e) {
                    this.log.warn(String.format("Exception caught while attempting to lock task %s:", parse), e);
                }
            } catch (TaskIdFormatException e2) {
            }
        }
    }

    private void releaseLockedUnassignedTaskDirectories() {
        AtomicReference atomicReference = new AtomicReference(null);
        Iterator<TaskId> it = this.lockedTaskDirectories.iterator();
        while (it.hasNext()) {
            TaskId next = it.next();
            if (!this.tasks.containsKey(next)) {
                try {
                    this.stateDirectory.unlock(next);
                    it.remove();
                } catch (IOException e) {
                    this.log.error(String.format("Caught the following exception while trying to unlock task %s", next), e);
                    atomicReference.compareAndSet(null, new StreamsException(String.format("Failed to unlock task directory %s", next), e));
                }
            }
        }
        RuntimeException runtimeException = (RuntimeException) atomicReference.get();
        if (runtimeException != null) {
            throw runtimeException;
        }
    }

    private long sumOfChangelogOffsets(TaskId taskId, Map<TopicPartition, Long> map) {
        long j = 0;
        for (Map.Entry<TopicPartition, Long> entry : map.entrySet()) {
            long longValue = entry.getValue().longValue();
            if (longValue == -2) {
                return -2L;
            }
            if (longValue != -4) {
                if (longValue < 0) {
                    throw new IllegalStateException("Expected not to get a sentinel offset, but got: " + entry);
                }
                j += longValue;
                if (j < 0) {
                    this.log.warn("Sum of changelog offsets for task {} overflowed, pinning to Long.MAX_VALUE", taskId);
                    return Long.MAX_VALUE;
                }
            }
        }
        return j;
    }

    private void closeTaskDirty(Task task) {
        try {
            task.prepareCommit();
        } catch (RuntimeException e) {
            this.log.error("Error flushing caches of dirty task {} ", task.id(), e);
        }
        try {
            task.suspend();
        } catch (RuntimeException e2) {
            this.log.error("Error suspending dirty task {} ", task.id(), e2);
        }
        cleanupTask(task);
        task.closeDirty();
    }

    private void completeTaskCloseClean(Task task) {
        cleanupTask(task);
        task.closeClean();
    }

    private void cleanupTask(Task task) {
        Iterator<TopicPartition> it = task.inputPartitions().iterator();
        while (it.hasNext()) {
            this.partitionToTask.remove(it.next());
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void shutdown(boolean z) {
        AtomicReference<RuntimeException> atomicReference = new AtomicReference<>(null);
        HashSet hashSet = new HashSet();
        hashSet.addAll(tryCloseCleanAllActiveTasks(z, atomicReference));
        hashSet.addAll(tryCloseCleanAllStandbyTasks(z, atomicReference));
        Iterator it = hashSet.iterator();
        while (it.hasNext()) {
            closeTaskDirty((Task) it.next());
        }
        for (Task task : activeTaskIterable()) {
            executeAndMaybeSwallow(z, () -> {
                this.activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(task.id());
            }, (java.util.function.Consumer<RuntimeException>) runtimeException -> {
                atomicReference.compareAndSet(null, runtimeException);
            }, (java.util.function.Consumer<RuntimeException>) runtimeException2 -> {
                this.log.warn("Ignoring an exception while closing task " + task.id() + " producer.", runtimeException2);
            });
        }
        ActiveTaskCreator activeTaskCreator = this.activeTaskCreator;
        activeTaskCreator.getClass();
        executeAndMaybeSwallow(z, activeTaskCreator::closeThreadProducerIfNeeded, (java.util.function.Consumer<RuntimeException>) runtimeException3 -> {
            atomicReference.compareAndSet(null, runtimeException3);
        }, (java.util.function.Consumer<RuntimeException>) runtimeException4 -> {
            this.log.warn("Ignoring an exception while closing thread producer.", runtimeException4);
        });
        this.tasks.clear();
        executeAndMaybeSwallow(z, this::releaseLockedUnassignedTaskDirectories, (java.util.function.Consumer<RuntimeException>) runtimeException5 -> {
            atomicReference.compareAndSet(null, runtimeException5);
        }, (java.util.function.Consumer<RuntimeException>) runtimeException6 -> {
            this.log.warn("Ignoring an exception while unlocking remaining task directories.", runtimeException6);
        });
        RuntimeException runtimeException7 = atomicReference.get();
        if (runtimeException7 != null) {
            throw new RuntimeException("Unexpected exception while closing task", runtimeException7);
        }
    }

    private Collection<Task> tryCloseCleanAllActiveTasks(boolean z, AtomicReference<RuntimeException> atomicReference) {
        if (!z) {
            return activeTaskIterable();
        }
        Comparator comparing = Comparator.comparing((v0) -> {
            return v0.id();
        });
        TreeSet treeSet = new TreeSet(comparing);
        TreeSet treeSet2 = new TreeSet(comparing);
        TreeSet<Task> treeSet3 = new TreeSet(comparing);
        HashMap hashMap = new HashMap();
        for (Task task : activeTaskIterable()) {
            try {
                Map<TopicPartition, OffsetAndMetadata> prepareCommit = task.prepareCommit();
                treeSet.add(task);
                if (!prepareCommit.isEmpty()) {
                    hashMap.put(task.id(), prepareCommit);
                }
                treeSet3.add(task);
            } catch (RuntimeException e) {
                atomicReference.compareAndSet(null, e);
                treeSet2.add(task);
            } catch (TaskMigratedException e2) {
                treeSet2.add(task);
            }
        }
        if (treeSet2.isEmpty()) {
            try {
                commitOffsetsOrTransaction(hashMap);
                for (Task task2 : activeTaskIterable()) {
                    try {
                        task2.postCommit(true);
                    } catch (RuntimeException e3) {
                        this.log.error("Exception caught while post-committing task " + task2.id(), e3);
                        atomicReference.compareAndSet(null, e3);
                        treeSet2.add(task2);
                        treeSet3.remove(task2);
                    }
                }
            } catch (RuntimeException e4) {
                this.log.error("Exception caught while committing tasks during shutdown", e4);
                atomicReference.compareAndSet(null, e4);
                treeSet3.removeAll(treeSet);
                treeSet2.addAll(treeSet);
            }
        } else {
            treeSet3.removeAll(treeSet);
            treeSet2.addAll(treeSet);
        }
        for (Task task3 : treeSet3) {
            try {
                task3.suspend();
                completeTaskCloseClean(task3);
            } catch (RuntimeException e5) {
                this.log.error("Exception caught while clean-closing task " + task3.id(), e5);
                atomicReference.compareAndSet(null, e5);
                treeSet2.add(task3);
            }
        }
        return treeSet2;
    }

    private Collection<Task> tryCloseCleanAllStandbyTasks(boolean z, AtomicReference<RuntimeException> atomicReference) {
        if (!z) {
            return standbyTaskIterable();
        }
        HashSet hashSet = new HashSet();
        for (Task task : standbyTaskIterable()) {
            try {
                task.prepareCommit();
                task.postCommit(true);
                task.suspend();
                completeTaskCloseClean(task);
            } catch (RuntimeException e) {
                atomicReference.compareAndSet(null, e);
                hashSet.add(task);
            } catch (TaskMigratedException e2) {
                hashSet.add(task);
            }
        }
        return hashSet;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Set<TaskId> activeTaskIds() {
        return (Set) activeTaskStream().map((v0) -> {
            return v0.id();
        }).collect(Collectors.toSet());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Set<TaskId> standbyTaskIds() {
        return (Set) standbyTaskStream().map((v0) -> {
            return v0.id();
        }).collect(Collectors.toSet());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Map<TaskId, Task> tasks() {
        return this.tasks;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Map<TaskId, Task> activeTaskMap() {
        return (Map) activeTaskStream().collect(Collectors.toMap((v0) -> {
            return v0.id();
        }, task -> {
            return task;
        }));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public List<Task> activeTaskIterable() {
        return (List) activeTaskStream().collect(Collectors.toList());
    }

    private Stream<Task> activeTaskStream() {
        return this.tasks.values().stream().filter((v0) -> {
            return v0.isActive();
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Map<TaskId, Task> standbyTaskMap() {
        return (Map) standbyTaskStream().collect(Collectors.toMap((v0) -> {
            return v0.id();
        }, task -> {
            return task;
        }));
    }

    private List<Task> standbyTaskIterable() {
        return (List) standbyTaskStream().collect(Collectors.toList());
    }

    private Stream<Task> standbyTaskStream() {
        return this.tasks.values().stream().filter(task -> {
            return !task.isActive();
        });
    }

    int commitAll() {
        return commit(this.tasks.values());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void addRecordsToTasks(ConsumerRecords<byte[], byte[]> consumerRecords) {
        for (TopicPartition topicPartition : consumerRecords.partitions()) {
            Task task = this.partitionToTask.get(topicPartition);
            if (task == null) {
                this.log.error("Unable to locate active task for received-record partition {}. Current tasks: {}", topicPartition, toString(">"));
                throw new NullPointerException("Task was unexpectedly missing for partition " + topicPartition);
            }
            task.addRecords(topicPartition, consumerRecords.records(topicPartition));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public int commit(Collection<Task> collection) {
        if (this.rebalanceInProgress) {
            return -1;
        }
        int i = 0;
        HashMap hashMap = new HashMap();
        for (Task task : collection) {
            if (task.commitNeeded()) {
                Map<TopicPartition, OffsetAndMetadata> prepareCommit = task.prepareCommit();
                if (task.isActive()) {
                    hashMap.put(task.id(), prepareCommit);
                }
            }
        }
        commitOffsetsOrTransaction(hashMap);
        for (Task task2 : collection) {
            if (task2.commitNeeded()) {
                i++;
                task2.postCommit(false);
            }
        }
        return i;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public int maybeCommitActiveTasksPerUserRequested() {
        if (this.rebalanceInProgress) {
            return -1;
        }
        for (Task task : activeTaskIterable()) {
            if (task.commitRequested() && task.commitNeeded()) {
                return commit(activeTaskIterable());
            }
        }
        return 0;
    }

    private void commitOffsetsOrTransaction(Map<TaskId, Map<TopicPartition, OffsetAndMetadata>> map) {
        this.log.info("Committing task offsets {}", map);
        if (map.isEmpty()) {
            return;
        }
        if (this.processingMode == StreamThread.ProcessingMode.EXACTLY_ONCE_ALPHA) {
            for (Map.Entry<TaskId, Map<TopicPartition, OffsetAndMetadata>> entry : map.entrySet()) {
                this.activeTaskCreator.streamsProducerForTask(entry.getKey()).commitTransaction(entry.getValue(), this.mainConsumer.groupMetadata());
            }
            return;
        }
        Map<TopicPartition, OffsetAndMetadata> map2 = (Map) map.values().stream().flatMap(map3 -> {
            return map3.entrySet().stream();
        }).collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, (v0) -> {
            return v0.getValue();
        }));
        if (this.processingMode == StreamThread.ProcessingMode.EXACTLY_ONCE_BETA) {
            this.activeTaskCreator.threadProducer().commitTransaction(map2, this.mainConsumer.groupMetadata());
            return;
        }
        try {
            this.mainConsumer.commitSync(map2);
        } catch (CommitFailedException e) {
            throw new TaskMigratedException("Consumer committing offsets failed, indicating the corresponding thread is no longer part of the group", e);
        } catch (TimeoutException e2) {
            throw new StreamsException("Timed out while committing offsets via consumer", e2);
        } catch (KafkaException e3) {
            throw new StreamsException("Error encountered committing offsets via consumer", e3);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public int process(int i, Time time) {
        int i2;
        int i3 = 0;
        long milliseconds = time.milliseconds();
        for (Task task : activeTaskIterable()) {
            long j = milliseconds;
            while (i2 < i) {
                try {
                    i2 = task.process(milliseconds) ? i2 + 1 : 0;
                } catch (RuntimeException e) {
                    this.log.error("Failed to process stream task {} due to the following error:", task.id(), e);
                    throw e;
                } catch (TaskMigratedException e2) {
                    this.log.info("Failed to process stream task {} since it got migrated to another thread already. Will trigger a new rebalance and close all tasks as zombies together.", task.id());
                    throw e2;
                }
            }
            milliseconds = time.milliseconds();
            i3 += i2;
            task.recordProcessBatchTime(milliseconds - j);
        }
        return i3;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void recordTaskProcessRatio(long j, long j2) {
        Iterator<Task> it = activeTaskIterable().iterator();
        while (it.hasNext()) {
            it.next().recordProcessTimeRatioAndBufferSize(j, j2);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public int punctuate() {
        int i = 0;
        for (Task task : activeTaskIterable()) {
            try {
                if (task.maybePunctuateStreamTime()) {
                    i++;
                }
                if (task.maybePunctuateSystemTime()) {
                    i++;
                }
            } catch (TaskMigratedException e) {
                this.log.info("Failed to punctuate stream task {} since it got migrated to another thread already. Will trigger a new rebalance and close all tasks as zombies together.", task.id());
                throw e;
            } catch (KafkaException e2) {
                this.log.error("Failed to punctuate stream task {} due to the following error:", task.id(), e2);
                throw e2;
            }
        }
        return i;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void maybePurgeCommittedRecords() {
        if (this.deleteRecordsResult == null || this.deleteRecordsResult.all().isDone()) {
            if (this.deleteRecordsResult != null && this.deleteRecordsResult.all().isCompletedExceptionally()) {
                this.log.debug("Previous delete-records request has failed: {}. Try sending the new request now", this.deleteRecordsResult.lowWatermarks());
            }
            HashMap hashMap = new HashMap();
            Iterator<Task> it = activeTaskIterable().iterator();
            while (it.hasNext()) {
                for (Map.Entry<TopicPartition, Long> entry : it.next().purgeableOffsets().entrySet()) {
                    hashMap.put(entry.getKey(), RecordsToDelete.beforeOffset(entry.getValue().longValue()));
                }
            }
            if (hashMap.isEmpty()) {
                return;
            }
            this.deleteRecordsResult = this.adminClient.deleteRecords(hashMap);
            this.log.trace("Sent delete-records request: {}", hashMap);
        }
    }

    public String toString() {
        return toString("");
    }

    public String toString(String str) {
        StringBuilder sb = new StringBuilder();
        sb.append("TaskManager\n");
        sb.append(str).append("\tMetadataState:\n");
        sb.append(str).append("\tTasks:\n");
        for (Task task : this.tasks.values()) {
            sb.append(str).append("\t\t").append(task.id()).append(" ").append(task.state()).append(" ").append(task.getClass().getSimpleName()).append('(').append(task.isActive() ? "active" : "standby").append(')');
        }
        return sb.toString();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Map<MetricName, Metric> producerMetrics() {
        return this.activeTaskCreator.producerMetrics();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Set<String> producerClientIds() {
        return this.activeTaskCreator.producerClientIds();
    }

    Set<TaskId> lockedTaskDirectories() {
        return Collections.unmodifiableSet(this.lockedTaskDirectories);
    }

    public static void executeAndMaybeSwallow(boolean z, Runnable runnable, java.util.function.Consumer<RuntimeException> consumer, java.util.function.Consumer<RuntimeException> consumer2) {
        try {
            runnable.run();
        } catch (RuntimeException e) {
            if (z) {
                consumer.accept(e);
            } else {
                consumer2.accept(e);
            }
        }
    }

    public static void executeAndMaybeSwallow(boolean z, Runnable runnable, String str, Logger logger) {
        executeAndMaybeSwallow(z, runnable, (java.util.function.Consumer<RuntimeException>) runtimeException -> {
            throw runtimeException;
        }, (java.util.function.Consumer<RuntimeException>) runtimeException2 -> {
            logger.debug("Ignoring error in unclean {}", str);
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean needsInitializationOrRestoration() {
        return tasks().values().stream().anyMatch((v0) -> {
            return v0.needsInitializationOrRestoration();
        });
    }

    public void setPartitionResetter(java.util.function.Consumer<Set<TopicPartition>> consumer) {
        this.resetter = consumer;
    }
}
