package org.apache.kafka.clients.consumer.internals;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.internals.Utils;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackCompletedEvent;
import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackNeededEvent;
import org.apache.kafka.common.ClusterResource;
import org.apache.kafka.common.ClusterResourceListener;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.telemetry.internals.ClientTelemetryProvider;
import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.junit.jupiter.api.IndicativeSentencesGeneration;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.class */
public class MembershipManagerImpl implements MembershipManager, ClusterResourceListener {
    /** Comparator used for every sorted set of {@link TopicPartition} built by this class. */
    static final Utils.TopicPartitionComparator TOPIC_PARTITION_COMPARATOR = new Utils.TopicPartitionComparator();
    /** Comparator used for every sorted set of {@link TopicIdPartition} built by this class. */
    static final Utils.TopicIdPartitionComparator TOPIC_ID_PARTITION_COMPARATOR = new Utils.TopicIdPartitionComparator();
    /** Group id, as provided at construction time. */
    private final String groupId;
    /** Group instance id; when present, leaving the group uses epoch -2 instead of -1. */
    private final Optional<String> groupInstanceId;
    /** Timeout (ms) used to compute the deadline of the auto-commit issued before a reconciliation. */
    private final int rebalanceTimeoutMs;
    /** Server-side assignor, as provided at construction time. */
    private final Optional<String> serverAssignor;
    /** Subscription state that is updated whenever the owned partitions change. */
    private final SubscriptionState subscriptions;
    /** Metadata used to resolve topic ids into topic names and to request metadata updates. */
    private final ConsumerMetadata metadata;
    private final Logger log;
    /** Used to auto-commit before a reconciliation and to reset the auto-commit timer. */
    private final CommitRequestManager commitRequestManager;
    /** True from the moment a reconciliation starts until its callbacks complete. */
    private boolean reconciliationInProgress;
    /** Member epoch captured when the in-flight reconciliation started. */
    private int memberEpochOnReconciliationStart;
    /** True once this instance has been added as a cluster update listener on {@link #metadata}. */
    private boolean isRegisteredForMetadataUpdates;
    /** Telemetry reporter whose member-id label is refreshed when the member id changes. */
    private final Optional<ClientTelemetryReporter> clientTelemetryReporter;
    /** Queue used to hand rebalance listener callbacks over for execution. */
    private final BackgroundEventHandler backgroundEventHandler;
    private final Time time;
    /** Member id; empty until a heartbeat response provides one. */
    private String memberId = "";
    /** Current member epoch; starts at 0. */
    private int memberEpoch = 0;
    /** Future returned by {@link #leaveGroup()} while a leave operation is pending. */
    private Optional<CompletableFuture<Void>> leaveGroupInProgress = Optional.empty();
    /** Current state of the member; starts as {@link MemberState#UNSUBSCRIBED}. */
    private MemberState state = MemberState.UNSUBSCRIBED;
    /** Local cache of topic names for topic ids that were resolved from metadata. */
    private final Map<Uuid, String> assignedTopicNamesCache = new HashMap<>();
    /** Target assignment entries whose topic id has not been resolved to a name yet. */
    private final Map<Uuid, SortedSet<Integer>> assignmentUnresolved = new HashMap<>();
    /** Target assignment entries with resolved topic names, waiting to be reconciled. */
    private final SortedSet<TopicIdPartition> assignmentReadyToReconcile = new TreeSet<>(TOPIC_ID_PARTITION_COMPARATOR);
    /** Partitions currently owned, by topic id. Not final: replaced when a leave group is sent. */
    private Map<Uuid, SortedSet<Integer>> currentAssignment = new HashMap<>();
    /** Listeners notified on every member epoch change. */
    private final List<MemberStateListener> stateUpdatesListeners = new ArrayList<>();

    /**
     * Creates a membership manager. The member starts in the state and with the id/epoch
     * given by the field initializers; this constructor only wires in the collaborators.
     *
     * @param str                    group id
     * @param optional               group instance id
     * @param i                      rebalance timeout, in milliseconds
     * @param optional2              server-side assignor
     * @param subscriptionState      subscription state to keep in sync with the assignment
     * @param commitRequestManager   commit manager used around reconciliations
     * @param consumerMetadata       metadata used to resolve topic names
     * @param logContext             context used to create this class' logger
     * @param optional3              telemetry reporter
     * @param backgroundEventHandler handler receiving rebalance listener callback events
     * @param time                   clock
     */
    public MembershipManagerImpl(String str, Optional<String> optional, int i, Optional<String> optional2, SubscriptionState subscriptionState, CommitRequestManager commitRequestManager, ConsumerMetadata consumerMetadata, LogContext logContext, Optional<ClientTelemetryReporter> optional3, BackgroundEventHandler backgroundEventHandler, Time time) {
        // Group identity and configuration.
        groupId = str;
        groupInstanceId = optional;
        serverAssignor = optional2;
        rebalanceTimeoutMs = i;

        // Collaborators.
        subscriptions = subscriptionState;
        metadata = consumerMetadata;
        this.commitRequestManager = commitRequestManager;
        this.backgroundEventHandler = backgroundEventHandler;
        clientTelemetryReporter = optional3;
        this.time = time;
        log = logContext.logger(MembershipManagerImpl.class);
    }

    /**
     * Moves the member to {@code memberState}. Staying in the same state is always allowed;
     * otherwise the current state must be one of the target's valid previous states.
     *
     * @throws IllegalStateException if the transition is not valid
     */
    private void transitionTo(MemberState memberState) {
        boolean sameState = state.equals(memberState);
        if (!sameState) {
            boolean validTransition = memberState.getPreviousValidStates().contains(state);
            if (!validTransition) {
                String message = String.format("Invalid state transition from %s to %s", state, memberState);
                throw new IllegalStateException(message);
            }
        }
        log.trace("Member {} with epoch {} transitioned from {} to {}.", memberId, memberEpoch, state, memberState);
        state = memberState;
    }

    /** @return the group id this member was created with */
    @Override
    public String groupId() {
        return groupId;
    }

    /** @return the group instance id this member was created with */
    @Override
    public Optional<String> groupInstanceId() {
        return groupInstanceId;
    }

    /** @return the member id taken from the most recent heartbeat response (initially empty) */
    @Override
    public String memberId() {
        return memberId;
    }

    /** @return the current member epoch */
    @Override
    public int memberEpoch() {
        return memberEpoch;
    }

    /** @return true if the member is currently in the {@link MemberState#STALE} state */
    @Override
    public boolean isStaled() {
        return MemberState.STALE == state;
    }

    /**
     * Applies a successful heartbeat response: refreshes the member id and epoch, then
     * processes the assignment carried by the response, if any.
     *
     * @param consumerGroupHeartbeatResponseData response data; must not carry an error code
     * @throws IllegalArgumentException if the response carries an error
     */
    @Override
    public void onHeartbeatResponseReceived(ConsumerGroupHeartbeatResponseData consumerGroupHeartbeatResponseData) {
        if (consumerGroupHeartbeatResponseData.errorCode() != Errors.NONE.code()) {
            Errors error = Errors.forCode(consumerGroupHeartbeatResponseData.errorCode());
            throw new IllegalArgumentException(String.format("Unexpected error in Heartbeat response. Expected no error, but received: %s", error));
        }

        // Refresh the telemetry label only when a non-null member id differs from the known one.
        String receivedMemberId = consumerGroupHeartbeatResponseData.memberId();
        boolean memberIdChanged = receivedMemberId != null && !receivedMemberId.equals(memberId);
        if (memberIdChanged) {
            clientTelemetryReporter.ifPresent(reporter ->
                    reporter.updateMetricsLabels(Collections.singletonMap(ClientTelemetryProvider.GROUP_MEMBER_ID, receivedMemberId)));
        }

        // NOTE: the member id is overwritten even when the response carries null.
        memberId = receivedMemberId;
        updateMemberEpoch(consumerGroupHeartbeatResponseData.memberEpoch());

        ConsumerGroupHeartbeatResponseData.Assignment assignment = consumerGroupHeartbeatResponseData.assignment();
        if (assignment != null) {
            if (state.canHandleNewAssignment()) {
                processAssignmentReceived(assignment);
            } else {
                log.debug("Ignoring new assignment {} received from server because member is in {} state.", assignment, state);
            }
            return;
        }

        // No assignment in the response: the member is stable once nothing is pending.
        if (allPendingAssignmentsReconciled()) {
            transitionTo(MemberState.STABLE);
        }
    }

    /**
     * Records the received target assignment and, when it differs from the current
     * assignment, starts reconciling it. When it is identical, a member that is
     * RECONCILING or JOINING moves straight to STABLE.
     */
    private void processAssignmentReceived(ConsumerGroupHeartbeatResponseData.Assignment assignment) {
        replaceUnresolvedAssignmentWithNewAssignment(assignment);

        if (assignmentUnresolved.equals(currentAssignment)) {
            log.debug("Target assignment {} received from the broker is equals to the member current assignment {}. Nothing to reconcile.", assignmentUnresolved, currentAssignment);
            boolean waitingForAssignment = state == MemberState.RECONCILING || state == MemberState.JOINING;
            if (waitingForAssignment) {
                transitionTo(MemberState.STABLE);
            }
            return;
        }

        transitionTo(MemberState.RECONCILING);
        assignmentReadyToReconcile.clear();
        resolveMetadataForUnresolvedAssignment();
        reconcile();
    }

    /**
     * Discards any unresolved target assignment and replaces it with the topic ids and
     * partitions of the given assignment. Partitions are stored as a sorted set.
     */
    private void replaceUnresolvedAssignmentWithNewAssignment(ConsumerGroupHeartbeatResponseData.Assignment assignment) {
        assignmentUnresolved.clear();
        assignment.topicPartitions().forEach(topicPartitions ->
                assignmentUnresolved.put(topicPartitions.topicId(), new TreeSet<>(topicPartitions.partitions())));
    }

    /**
     * Handles the member being fenced.
     * <ul>
     *   <li>PREPARE_LEAVING: moves to UNSUBSCRIBED without rejoining.</li>
     *   <li>LEAVING or UNSUBSCRIBED: only logs; the member is leaving or already left.</li>
     *   <li>Otherwise: moves to FENCED, resets the epoch, invokes the onPartitionsLost
     *       callback for the assigned partitions and, once it completes (successfully or
     *       not), clears the subscription and transitions to JOINING.</li>
     * </ul>
     */
    @Override
    public void transitionToFenced() {
        if (state == MemberState.PREPARE_LEAVING) {
            log.debug("Member {} with epoch {} got fenced but it is already preparing to leave the group, so it will stop sending heartbeat and won't attempt to rejoin.", memberId, memberEpoch);
            transitionTo(MemberState.UNSUBSCRIBED);
            return;
        }
        if (state == MemberState.LEAVING) {
            log.debug("Member {} with epoch {} got fenced but it is already leaving the group with state {}, so it won't attempt to rejoin.", memberId, memberEpoch, state);
            return;
        }
        if (state == MemberState.UNSUBSCRIBED) {
            log.debug("Member {} with epoch {} got fenced but it already left the group, so it won't attempt to rejoin.", memberId, memberEpoch);
            return;
        }

        transitionTo(MemberState.FENCED);
        resetEpoch();
        log.debug("Member {} with epoch {} transitioned to {} state. It will release its assignment and rejoin the group.", memberId, memberEpoch, MemberState.FENCED);

        invokeOnPartitionsLostCallback(subscriptions.assignedPartitions()).whenComplete((result, error) -> {
            if (error != null) {
                log.error("onPartitionsLost callback invocation failed while releasing assignment after member got fenced. Member will rejoin the group anyways.", error);
            }
            // Drop the whole assignment (typed empty set) before rejoining.
            updateSubscription(new TreeSet<>(TOPIC_ID_PARTITION_COMPARATOR), true);
            transitionToJoining();
        });
    }

    /**
     * Moves the member to FATAL and tells epoch listeners that no valid epoch or member
     * id remains. Unless the member had already left the group (UNSUBSCRIBED), the
     * onPartitionsLost callback is invoked for the assigned partitions and the
     * subscription is cleared once it completes, whether or not the callback failed.
     */
    @Override
    public void transitionToFatal() {
        MemberState previousState = state;
        transitionTo(MemberState.FATAL);
        log.error("Member {} with epoch {} transitioned to {} state", memberId, memberEpoch, MemberState.FATAL);
        notifyEpochChange(Optional.empty(), Optional.empty());

        if (previousState == MemberState.UNSUBSCRIBED) {
            log.debug("Member {} with epoch {} got fatal error from the broker but it already left the group, so onPartitionsLost callback won't be triggered.", memberId, memberEpoch);
            return;
        }

        invokeOnPartitionsLostCallback(subscriptions.assignedPartitions()).whenComplete((result, error) -> {
            if (error != null) {
                // The message previously read "assignmentafter" (missing space).
                log.error("onPartitionsLost callback invocation failed while releasing assignment after member failed with fatal error.", error);
            }
            updateSubscription(new TreeSet<>(TOPIC_ID_PARTITION_COMPARATOR), true);
        });
    }

    /** Starts joining the group when the subscription changes while the member is UNSUBSCRIBED. */
    @Override
    public void onSubscriptionUpdated() {
        boolean notInGroup = MemberState.UNSUBSCRIBED == state;
        if (!notInGroup) {
            return;
        }
        transitionToJoining();
    }

    /**
     * Makes {@code sortedSet} the owned assignment, both in the subscription state and in
     * the current assignment tracked by this manager.
     *
     * @param sortedSet partitions to own from now on
     * @param z         when true, also clears pending assignments and the topic-name cache
     */
    private void updateSubscription(SortedSet<TopicIdPartition> sortedSet, boolean z) {
        SortedSet<TopicPartition> ownedPartitions = toTopicPartitionSet(sortedSet);
        subscriptions.assignFromSubscribed(ownedPartitions);
        updateCurrentAssignment(sortedSet);
        if (!z) {
            return;
        }
        clearPendingAssignmentsAndLocalNamesCache();
    }

    /**
     * Resets the epoch and moves the member to JOINING, clearing pending assignments and
     * registering for metadata updates. Does nothing but log a warning if the member is FATAL.
     */
    @Override
    public void transitionToJoining() {
        boolean fatal = MemberState.FATAL == state;
        if (!fatal) {
            resetEpoch();
            transitionTo(MemberState.JOINING);
            clearPendingAssignmentsAndLocalNamesCache();
            registerForMetadataUpdates();
        } else {
            log.warn("No action taken to join the group with the updated subscription because the member is in FATAL state");
        }
    }

    /** Registers this manager as a cluster update listener on the metadata, at most once. */
    private void registerForMetadataUpdates() {
        if (!isRegisteredForMetadataUpdates) {
            metadata.addClusterUpdateListener(this);
            isRegisteredForMetadataUpdates = true;
        }
    }

    /**
     * Starts leaving the group: releases the assignment through the revoked/lost callback,
     * clears the subscription and moves on to sending the leave group heartbeat.
     *
     * @return an already-completed future if the member is UNSUBSCRIBED or FATAL; the
     *         pending future if a leave is already in progress; otherwise a new future
     *         tracking this leave operation
     */
    @Override
    public CompletableFuture<Void> leaveGroup() {
        if (state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL) {
            return CompletableFuture.completedFuture(null);
        }
        if (state == MemberState.PREPARE_LEAVING || state == MemberState.LEAVING) {
            // A leave is already under way; hand back the same future.
            return leaveGroupInProgress.get();
        }

        transitionTo(MemberState.PREPARE_LEAVING);
        CompletableFuture<Void> leaveResult = new CompletableFuture<>();
        leaveGroupInProgress = Optional.of(leaveResult);

        invokeOnPartitionsRevokedOrLostToReleaseAssignment().whenComplete((result, error) -> {
            if (error != null) {
                // Best effort: a failed callback must not stop the member from leaving,
                // but the failure should not go unnoticed either.
                log.error("Member {} callback to release assignment failed. It will proceed to clear its assignment and send a leave group heartbeat", memberId, error);
            }
            updateSubscription(new TreeSet<>(TOPIC_ID_PARTITION_COMPARATOR), true);
            transitionToSendingLeaveGroup();
        });
        return leaveResult;
    }

    /**
     * Releases the currently assigned partitions: nothing to do when there are none;
     * revokes them when the member epoch is positive; otherwise reports them as lost.
     *
     * @return future completing when the chosen release path completes
     */
    private CompletableFuture<Void> invokeOnPartitionsRevokedOrLostToReleaseAssignment() {
        SortedSet<TopicPartition> droppedPartitions = new TreeSet<>(TOPIC_PARTITION_COMPARATOR);
        droppedPartitions.addAll(subscriptions.assignedPartitions());

        if (droppedPartitions.isEmpty()) {
            return CompletableFuture.completedFuture(null);
        }
        if (memberEpoch > 0) {
            return revokePartitions(droppedPartitions);
        }
        return invokeOnPartitionsLostCallback(droppedPartitions);
    }

    /**
     * Moves the member to LEAVING so that a leave group heartbeat is sent. The member
     * epoch is set to -2 when a group instance id is present and to -1 otherwise, and
     * the current assignment is replaced with an empty one.
     *
     * <p>If the member is already FATAL or UNSUBSCRIBED no heartbeat will be sent. In that
     * case any pending leave operation is completed here: nothing else would complete
     * the future returned by {@link #leaveGroup()}, and its caller would wait forever.
     */
    void transitionToSendingLeaveGroup() {
        boolean fatal = state == MemberState.FATAL;
        boolean alreadyOut = state == MemberState.UNSUBSCRIBED;
        if (fatal || alreadyOut) {
            if (fatal) {
                log.warn("Member {} with epoch {} won't send leave group request because it is in FATAL state", memberId, memberEpoch);
            } else {
                log.warn("Member {} won't send leave group request because it is already out of the group.", memberId);
            }
            // Completes normally, matching what leaveGroup() returns for these states.
            leaveGroupInProgress.ifPresent(pendingLeave -> pendingLeave.complete(null));
            leaveGroupInProgress = Optional.empty();
            return;
        }

        int leaveEpoch = groupInstanceId.isPresent() ? -2 : -1;
        updateMemberEpoch(leaveEpoch);
        currentAssignment = new HashMap<>();
        transitionTo(MemberState.LEAVING);
    }

    /**
     * Forwards an epoch change to every registered state listener.
     *
     * @param optional  new member epoch, or empty when there is no valid epoch
     * @param optional2 member id, or empty when there is no valid epoch
     */
    private void notifyEpochChange(Optional<Integer> optional, Optional<String> optional2) {
        stateUpdatesListeners.forEach(listener -> listener.onMemberEpochUpdated(optional, optional2));
    }

    /** @return true when the member is ACKNOWLEDGING, LEAVING or JOINING */
    @Override
    public boolean shouldHeartbeatNow() {
        switch (state()) {
            case ACKNOWLEDGING:
            case LEAVING:
            case JOINING:
                return true;
            default:
                return false;
        }
    }

    /**
     * Updates the member state after a heartbeat request has been sent.
     * <ul>
     *   <li>STALE: the member transitions to JOINING.</li>
     *   <li>ACKNOWLEDGING: moves to STABLE if nothing is pending, otherwise back to
     *       RECONCILING.</li>
     *   <li>LEAVING: the leave heartbeat is out, so the member becomes UNSUBSCRIBED.</li>
     *   <li>Any other state: no change.</li>
     * </ul>
     */
    @Override
    public void onHeartbeatRequestSent() {
        MemberState stateWhenSent = state();
        if (isStaled()) {
            // The placeholder names the member, so log the member id (it used to be
            // filled with the member epoch).
            log.debug("Member {} is staled and is therefore leaving the group.  It will rejoin upon the next poll.", memberId);
            transitionToJoining();
            return;
        }

        if (stateWhenSent == MemberState.ACKNOWLEDGING) {
            if (allPendingAssignmentsReconciled()) {
                transitionTo(MemberState.STABLE);
            } else {
                log.debug("Member {} with epoch {} transitioned to {} after a heartbeat was sent to ack a previous reconciliation. New assignments are ready to be reconciled.", memberId, memberEpoch, MemberState.RECONCILING);
                transitionTo(MemberState.RECONCILING);
            }
        } else if (stateWhenSent == MemberState.LEAVING) {
            transitionToUnsubscribed();
        }
    }

    /**
     * Called when a heartbeat could not be sent. A LEAVING member is moved to
     * UNSUBSCRIBED anyway; every other state is left untouched.
     */
    @Override
    public void onHeartbeatRequestSkipped() {
        if (MemberState.LEAVING != state) {
            return;
        }
        log.debug("Heartbeat for leaving group could not be sent. Member {} with epoch {} will transition to {}.", memberId, memberEpoch, MemberState.UNSUBSCRIBED);
        transitionToUnsubscribed();
    }

    /**
     * Moves the member to UNSUBSCRIBED and completes the pending leave operation, if any.
     * A missing pending future is tolerated instead of failing with
     * {@link java.util.NoSuchElementException}.
     */
    private void transitionToUnsubscribed() {
        transitionTo(MemberState.UNSUBSCRIBED);
        leaveGroupInProgress.ifPresent(pendingLeave -> pendingLeave.complete(null));
        leaveGroupInProgress = Optional.empty();
    }

    /** @return true when no assignment is waiting for metadata nor waiting to be reconciled */
    private boolean allPendingAssignmentsReconciled() {
        if (!assignmentUnresolved.isEmpty()) {
            return false;
        }
        return assignmentReadyToReconcile.isEmpty();
    }

    /** @return true when the member is UNSUBSCRIBED or FATAL */
    @Override
    public boolean shouldSkipHeartbeat() {
        switch (state()) {
            case UNSUBSCRIBED:
            case FATAL:
                return true;
            default:
                return false;
        }
    }

    /**
     * Marks the member as stale: sets the epoch to -1, clears the subscription and all
     * pending assignments, and moves to the STALE state.
     */
    @Override
    public void transitionToStale() {
        // NOTE(review): the epoch is written directly rather than through
        // updateMemberEpoch, so state listeners are not notified here. Kept as-is;
        // confirm whether that is intended.
        memberEpoch = -1;
        updateSubscription(new TreeSet<>(TOPIC_ID_PARTITION_COMPARATOR), true);
        transitionTo(MemberState.STALE);
    }

    /**
     * Reconciles the assignment that is ready (topic names resolved) against the
     * partitions currently owned. Triggers an auto-commit and then, whether or not the
     * commit succeeded, revokes the partitions that are no longer assigned and assigns
     * the new ones.
     *
     * @return true if a reconciliation was started; false if one is already in progress
     *         or the ready assignment equals the owned partitions
     */
    boolean reconcile() {
        if (reconciliationInProgress) {
            log.debug("Ignoring reconciliation attempt. Another reconciliation is already in progress. Assignment {} will be handled in the next reconciliation loop.", assignmentReadyToReconcile);
            return false;
        }

        // Work on snapshots: the ready set may change while callbacks are pending.
        SortedSet<TopicIdPartition> assignedTopicIdPartitions = new TreeSet<>(TOPIC_ID_PARTITION_COMPARATOR);
        assignedTopicIdPartitions.addAll(assignmentReadyToReconcile);
        SortedSet<TopicPartition> ownedPartitions = new TreeSet<>(TOPIC_PARTITION_COMPARATOR);
        ownedPartitions.addAll(subscriptions.assignedPartitions());

        SortedSet<TopicPartition> assignedTopicPartitions = toTopicPartitionSet(assignedTopicIdPartitions);
        if (assignedTopicPartitions.equals(ownedPartitions)) {
            log.debug("Ignoring reconciliation attempt. Target assignment ready to reconcile {} is equal to the member current assignment {}.", assignedTopicPartitions, ownedPartitions);
            return false;
        }

        markReconciliationInProgress();

        // added = assigned - owned
        SortedSet<TopicPartition> addedPartitions = new TreeSet<>(TOPIC_PARTITION_COMPARATOR);
        addedPartitions.addAll(assignedTopicPartitions);
        addedPartitions.removeAll(ownedPartitions);

        // revoked = owned - assigned
        SortedSet<TopicPartition> revokedPartitions = new TreeSet<>(TOPIC_PARTITION_COMPARATOR);
        revokedPartitions.addAll(ownedPartitions);
        revokedPartitions.removeAll(assignedTopicPartitions);

        log.info("Updating assignment with\n\tAssigned partitions:                       {}\n\tCurrent owned partitions:                  {}\n\tAdded partitions (assigned - owned):       {}\n\tRevoked partitions (owned - assigned):     {}\n", assignedTopicIdPartitions, ownedPartitions, addedPartitions, revokedPartitions);

        long commitDeadlineMs = getExpirationTimeForTimeout(rebalanceTimeoutMs);
        commitRequestManager.maybeAutoCommitAllConsumedNow(Optional.of(commitDeadlineMs), true).whenComplete((result, error) -> {
            if (error != null) {
                log.error("Auto-commit request before reconciling new assignment failed. Will proceed with the reconciliation anyway.", error);
            } else {
                log.debug("Auto-commit before reconciling new assignment completed successfully.");
            }
            revokeAndAssign(assignedTopicIdPartitions, revokedPartitions, addedPartitions);
        });
        return true;
    }

    /**
     * Computes the absolute deadline for a timeout starting now.
     *
     * @param j timeout in milliseconds
     * @return current time plus {@code j}, or {@link Long#MAX_VALUE} if the sum is negative
     */
    long getExpirationTimeForTimeout(long j) {
        long expiration = time.milliseconds() + j;
        // A negative sum is treated as overflow and clamped.
        return expiration < 0 ? Long.MAX_VALUE : expiration;
    }

    /**
     * Runs the two reconciliation phases: revokes the partitions to give up, then assigns
     * the new target assignment. After each phase it checks that the member is still
     * RECONCILING with the epoch the reconciliation started with; if not, the
     * reconciliation is interrupted. The in-progress flag is always cleared at the end.
     *
     * @param sortedSet  full target assignment being reconciled
     * @param sortedSet2 partitions to revoke (owned but no longer assigned)
     * @param sortedSet3 partitions newly added (assigned but not yet owned)
     */
    private void revokeAndAssign(SortedSet<TopicIdPartition> sortedSet, SortedSet<TopicPartition> sortedSet2, SortedSet<TopicPartition> sortedSet3) {
        CompletableFuture<Void> revocationResult;
        if (sortedSet2.isEmpty()) {
            revocationResult = CompletableFuture.completedFuture(null);
        } else {
            revocationResult = revokePartitions(sortedSet2);
        }

        CompletableFuture<Void> reconciliationResult = revocationResult.thenCompose(ignored -> {
            boolean memberHasRejoined = memberEpochOnReconciliationStart != memberEpoch;
            if (state == MemberState.RECONCILING && !memberHasRejoined) {
                commitRequestManager.resetAutoCommitTimer();
                return assignPartitions(sortedSet, sortedSet3);
            }
            log.debug("Revocation callback completed but the member already transitioned out of the reconciling state for epoch {} into {} state with epoch {}. Interrupting reconciliation as it's not relevant anymore,", memberEpochOnReconciliationStart, state, memberEpoch);
            String reason = interruptedReconciliationErrorMessage();
            CompletableFuture<Void> interrupted = new CompletableFuture<>();
            interrupted.completeExceptionally(new KafkaException("Interrupting reconciliation after revocation. " + reason));
            return interrupted;
        });

        // Typed futures let the lambda be inferred, so no cast is needed here.
        reconciliationResult.whenComplete((result, error) -> {
            markReconciliationCompleted();
            if (error != null) {
                log.error("Reconciliation failed.", error);
                return;
            }
            boolean memberHasRejoined = memberEpochOnReconciliationStart != memberEpoch;
            if (state != MemberState.RECONCILING || memberHasRejoined) {
                log.error("Interrupting reconciliation after partitions assigned callback completed. " + interruptedReconciliationErrorMessage());
            } else {
                transitionTo(MemberState.ACKNOWLEDGING);
                // Only what was just reconciled is removed; newer entries stay pending.
                assignmentReadyToReconcile.removeAll(sortedSet);
            }
        });
    }

    /**
     * Rebuilds the current assignment in place from the given partitions, grouping
     * partition numbers by topic id in sorted sets.
     *
     * @param set partitions that make up the new current assignment
     */
    void updateCurrentAssignment(Set<TopicIdPartition> set) {
        currentAssignment.clear();
        for (TopicIdPartition topicIdPartition : set) {
            currentAssignment
                    .computeIfAbsent(topicIdPartition.topicId(), topicId -> new TreeSet<>())
                    .add(topicIdPartition.partition());
        }
    }

    /**
     * Converts topic-id partitions into plain topic partitions.
     *
     * @param sortedSet partitions carrying topic id and name
     * @return a new set, ordered by {@link #TOPIC_PARTITION_COMPARATOR}
     */
    private SortedSet<TopicPartition> toTopicPartitionSet(SortedSet<TopicIdPartition> sortedSet) {
        SortedSet<TopicPartition> topicPartitions = new TreeSet<>(TOPIC_PARTITION_COMPARATOR);
        for (TopicIdPartition topicIdPartition : sortedSet) {
            topicPartitions.add(topicIdPartition.topicPartition());
        }
        return topicPartitions;
    }

    /**
     * @return the reason a reconciliation was interrupted: the state the member moved
     *         into, or a re-join message if the member is still RECONCILING
     */
    private String interruptedReconciliationErrorMessage() {
        if (state == MemberState.RECONCILING) {
            return "The member has re-joined the group.";
        }
        return "The member already transitioned out of the reconciling state into " + state;
    }

    /** Flags a reconciliation as running and remembers the member epoch it started with. */
    void markReconciliationInProgress() {
        memberEpochOnReconciliationStart = memberEpoch;
        reconciliationInProgress = true;
    }

    /** Clears the flag marking a reconciliation as running. */
    void markReconciliationCompleted() {
        reconciliationInProgress = false;
    }

    /**
     * Rebuilds the ready-to-reconcile set from the unresolved assignment: each topic id
     * whose name can be found is moved from the unresolved map into the ready set. If
     * any topic id remains unresolved, a metadata update is requested.
     */
    private void resolveMetadataForUnresolvedAssignment() {
        assignmentReadyToReconcile.clear();

        Iterator<Map.Entry<Uuid, SortedSet<Integer>>> unresolvedEntries = assignmentUnresolved.entrySet().iterator();
        while (unresolvedEntries.hasNext()) {
            Map.Entry<Uuid, SortedSet<Integer>> entry = unresolvedEntries.next();
            Uuid topicId = entry.getKey();
            SortedSet<Integer> partitions = entry.getValue();

            Optional<String> topicName = findTopicNameInGlobalOrLocalCache(topicId);
            if (topicName.isPresent()) {
                addToAssignmentReadyToReconcile(topicId, topicName.get(), partitions);
                // Remove through the iterator to stay safe while iterating.
                unresolvedEntries.remove();
            }
        }

        if (!assignmentUnresolved.isEmpty()) {
            log.debug("Topic Ids {} received in target assignment were not found in metadata and are not currently assigned. Requesting a metadata update now to resolve topic names.", assignmentUnresolved.keySet());
            metadata.requestUpdate(true);
        }
    }

    /**
     * Looks up a topic name, first in the metadata and then in the local cache. A name
     * found in the metadata is also stored in the local cache.
     *
     * @param uuid topic id to resolve
     * @return the topic name, or empty if neither source knows the id
     */
    private Optional<String> findTopicNameInGlobalOrLocalCache(Uuid uuid) {
        String nameFromMetadata = metadata.topicNames().get(uuid);
        if (nameFromMetadata != null) {
            assignedTopicNamesCache.put(uuid, nameFromMetadata);
            return Optional.of(nameFromMetadata);
        }
        return Optional.ofNullable(assignedTopicNamesCache.get(uuid));
    }

    /**
     * Adds one entry per partition to the ready-to-reconcile set.
     *
     * @param uuid      topic id
     * @param str       resolved topic name
     * @param sortedSet partition numbers of that topic
     */
    private void addToAssignmentReadyToReconcile(Uuid uuid, String str, SortedSet<Integer> sortedSet) {
        for (Integer partition : sortedSet) {
            TopicPartition topicPartition = new TopicPartition(str, partition);
            assignmentReadyToReconcile.add(new TopicIdPartition(uuid, topicPartition));
        }
    }

    /**
     * Revokes the given partitions: marks them as pending revocation and invokes the
     * onPartitionsRevoked callback.
     *
     * @param set partitions to revoke
     * @return future completing when the callback completes, failing with the callback's
     *         error if it fails; fails right away with a {@link KafkaException}, without
     *         invoking the callback, when the member is in FATAL state
     */
    CompletableFuture<Void> revokePartitions(Set<TopicPartition> set) {
        // Literal separator: production code must not take it from a test-framework constant.
        log.info("Revoking previously assigned partitions {}", org.apache.kafka.common.utils.Utils.join(set, ", "));
        logPausedPartitionsBeingRevoked(set);
        markPendingRevocationToPauseFetching(set);

        CompletableFuture<Void> revocationResult = new CompletableFuture<>();
        if (state == MemberState.FATAL) {
            String reason = String.format("Member %s with epoch %s received a fatal error while waiting for a revocation commit to complete. Will abort revocation without triggering user callback.", memberId, memberEpoch);
            log.debug(reason);
            revocationResult.completeExceptionally(new KafkaException(reason));
            return revocationResult;
        }

        invokeOnPartitionsRevokedCallback(set).whenComplete((result, error) -> {
            if (error != null) {
                log.error("onPartitionsRevoked callback invocation failed for partitions {}", set, error);
                revocationResult.completeExceptionally(error);
            } else {
                revocationResult.complete(null);
            }
        });
        return revocationResult;
    }

    /**
     * Makes {@code sortedSet} the owned assignment, invokes the onPartitionsAssigned
     * callback for the added partitions, and drops cached topic names that no longer
     * belong to an assigned topic.
     *
     * @param sortedSet  full new assignment
     * @param sortedSet2 partitions newly added, passed to the callback
     * @return future completing when the callback completes
     */
    private CompletableFuture<Void> assignPartitions(SortedSet<TopicIdPartition> sortedSet, SortedSet<TopicPartition> sortedSet2) {
        updateSubscription(sortedSet, false);
        CompletableFuture<Void> callbackResult = invokeOnPartitionsAssignedCallback(sortedSet2);

        Set<String> assignedTopics = sortedSet.stream()
                .map(TopicIdPartition::topic)
                .collect(Collectors.toSet());
        assignedTopicNamesCache.values().retainAll(assignedTopics);
        return callbackResult;
    }

    /**
     * Flags the given partitions as pending revocation in the subscription state.
     *
     * @param set partitions about to be revoked
     */
    private void markPendingRevocationToPauseFetching(Set<TopicPartition> set) {
        log.debug("Marking partitions pending for revocation: {}", set);
        subscriptions.markPendingRevocation(set);
    }

    /**
     * Enqueues the onPartitionsRevoked callback for the given partitions.
     *
     * @return the callback's future, or an already-completed future when the set is empty
     *         or no rebalance listener is registered
     */
    private CompletableFuture<Void> invokeOnPartitionsRevokedCallback(Set<TopicPartition> set) {
        if (set.isEmpty() || !subscriptions.rebalanceListener().isPresent()) {
            return CompletableFuture.completedFuture(null);
        }
        return enqueueConsumerRebalanceListenerCallback(ConsumerRebalanceListenerMethodName.ON_PARTITIONS_REVOKED, set);
    }

    /**
     * Enqueues the onPartitionsAssigned callback for the given partitions, even when the
     * set is empty.
     *
     * @return the callback's future, or an already-completed future when no rebalance
     *         listener is registered
     */
    private CompletableFuture<Void> invokeOnPartitionsAssignedCallback(Set<TopicPartition> set) {
        if (!subscriptions.rebalanceListener().isPresent()) {
            return CompletableFuture.completedFuture(null);
        }
        return enqueueConsumerRebalanceListenerCallback(ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED, set);
    }

    /**
     * Enqueues the onPartitionsLost callback for the given partitions.
     *
     * @return the callback's future, or an already-completed future when the set is empty
     *         or no rebalance listener is registered
     */
    CompletableFuture<Void> invokeOnPartitionsLostCallback(Set<TopicPartition> set) {
        if (set.isEmpty() || !subscriptions.rebalanceListener().isPresent()) {
            return CompletableFuture.completedFuture(null);
        }
        return enqueueConsumerRebalanceListenerCallback(ConsumerRebalanceListenerMethodName.ON_PARTITIONS_LOST, set);
    }

    /**
     * Hands a rebalance listener callback invocation to the background event handler.
     *
     * @param consumerRebalanceListenerMethodName listener method to invoke
     * @param set                                 partitions to pass; copied into a sorted set
     * @return the event's future
     */
    private CompletableFuture<Void> enqueueConsumerRebalanceListenerCallback(ConsumerRebalanceListenerMethodName consumerRebalanceListenerMethodName, Set<TopicPartition> set) {
        SortedSet<TopicPartition> sortedPartitions = new TreeSet<>(TOPIC_PARTITION_COMPARATOR);
        sortedPartitions.addAll(set);

        ConsumerRebalanceListenerCallbackNeededEvent event = new ConsumerRebalanceListenerCallbackNeededEvent(consumerRebalanceListenerMethodName, sortedPartitions);
        backgroundEventHandler.add(event);
        log.debug("The event to trigger the {} method execution was enqueued successfully", consumerRebalanceListenerMethodName.fullyQualifiedMethodName());
        return event.future();
    }

    /**
     * Propagates the outcome of a rebalance listener callback to the future carried by
     * the event: completes it normally on success, exceptionally with the reported error
     * otherwise.
     */
    @Override
    public void consumerRebalanceListenerCallbackCompleted(ConsumerRebalanceListenerCallbackCompletedEvent consumerRebalanceListenerCallbackCompletedEvent) {
        ConsumerRebalanceListenerMethodName methodName = consumerRebalanceListenerCallbackCompletedEvent.methodName();
        Optional<KafkaException> error = consumerRebalanceListenerCallbackCompletedEvent.error();
        CompletableFuture<Void> future = consumerRebalanceListenerCallbackCompletedEvent.future();

        if (error.isPresent()) {
            KafkaException callbackError = error.get();
            log.warn("The {} method completed with an error ({}); signaling to continue to the next phase of rebalance", methodName.fullyQualifiedMethodName(), callbackError.getMessage());
            future.completeExceptionally(callbackError);
            return;
        }

        log.debug("The {} method completed successfully; signaling to continue to the next phase of rebalance", methodName.fullyQualifiedMethodName());
        future.complete(null);
    }

    /**
     * Logs which of the partitions being revoked are currently paused, if any.
     *
     * @param set partitions being revoked
     */
    private void logPausedPartitionsBeingRevoked(Set<TopicPartition> set) {
        // NOTE(review): the returned set is narrowed in place; this presumes
        // pausedPartitions() hands back a fresh collection. Confirm against SubscriptionState.
        Set<TopicPartition> revokedPausedPartitions = subscriptions.pausedPartitions();
        revokedPausedPartitions.retainAll(set);
        if (!revokedPausedPartitions.isEmpty()) {
            // Literal separator: production code must not take it from a test-framework constant.
            log.info("The pause flag in partitions [{}] will be removed due to revocation.", org.apache.kafka.common.utils.Utils.join(revokedPausedPartitions, ", "));
        }
    }

    /** Drops the unresolved assignment, the ready-to-reconcile assignment and the topic-name cache. */
    private void clearPendingAssignmentsAndLocalNamesCache() {
        assignedTopicNamesCache.clear();
        assignmentReadyToReconcile.clear();
        assignmentUnresolved.clear();
    }

    /** Sets the member epoch back to 0, notifying listeners if that changes it. */
    private void resetEpoch() {
        final int initialEpoch = 0;
        updateMemberEpoch(initialEpoch);
    }

    /**
     * Stores the new member epoch and, only if it differs from the previous one, notifies
     * the listeners: with the epoch and member id when the epoch is positive, with empty
     * values otherwise.
     *
     * @param i new member epoch
     */
    private void updateMemberEpoch(int i) {
        boolean epochChanged = memberEpoch != i;
        memberEpoch = i;
        if (!epochChanged) {
            return;
        }
        if (memberEpoch > 0) {
            notifyEpochChange(Optional.of(memberEpoch), Optional.ofNullable(memberId));
        } else {
            notifyEpochChange(Optional.empty(), Optional.empty());
        }
    }

    /** @return the current member state */
    @Override
    public MemberState state() {
        return state;
    }

    /** @return the server-side assignor this member was created with */
    @Override
    public Optional<String> serverAssignor() {
        return serverAssignor;
    }

    /**
     * @return the current assignment, by topic id. This is the internal map itself, not
     *         a copy.
     */
    @Override
    public Map<Uuid, SortedSet<Integer>> currentAssignment() {
        return currentAssignment;
    }

    /** @return read-only view of the topic ids whose names have not been resolved yet */
    Set<Uuid> topicsWaitingForMetadata() {
        Set<Uuid> unresolvedTopicIds = assignmentUnresolved.keySet();
        return Collections.unmodifiableSet(unresolvedTopicIds);
    }

    /** @return read-only view of the assignment that is ready to be reconciled */
    Set<TopicIdPartition> assignmentReadyToReconcile() {
        return Collections.unmodifiableSet(assignmentReadyToReconcile);
    }

    /** @return true while a reconciliation is running */
    boolean reconciliationInProgress() {
        return reconciliationInProgress;
    }

    /**
     * Cluster update callback: retries resolving topic names for the unresolved
     * assignment and triggers a reconciliation if anything is ready.
     */
    @Override
    public void onUpdate(ClusterResource clusterResource) {
        resolveMetadataForUnresolvedAssignment();
        if (!assignmentReadyToReconcile.isEmpty()) {
            reconcile();
        }
    }

    /**
     * Registers a listener to be notified of member epoch changes.
     *
     * @param memberStateListener listener to add
     * @throws IllegalArgumentException if the listener is null
     */
    @Override
    public void registerStateListener(MemberStateListener memberStateListener) {
        if (memberStateListener != null) {
            stateUpdatesListeners.add(memberStateListener);
            return;
        }
        throw new IllegalArgumentException("State updates listener cannot be null");
    }
}
