package org.apache.kafka.clients.producer.internals;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.RequestCompletionHandler;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.AddOffsetsToTxnRequest;
import org.apache.kafka.common.requests.AddOffsetsToTxnResponse;
import org.apache.kafka.common.requests.AddPartitionsToTxnRequest;
import org.apache.kafka.common.requests.AddPartitionsToTxnResponse;
import org.apache.kafka.common.requests.EndTxnRequest;
import org.apache.kafka.common.requests.EndTxnResponse;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.InitProducerIdRequest;
import org.apache.kafka.common.requests.InitProducerIdResponse;
import org.apache.kafka.common.requests.TransactionResult;
import org.apache.kafka.common.requests.TxnOffsetCommitRequest;
import org.apache.kafka.common.requests.TxnOffsetCommitResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:META-INF/repository/fuse-eap-distro-6.3.0.redhat-441.zip:modules/system/layers/fuse/org/apache/kafka/clients/main/kafka-clients-0.11.0.1.jar:org/apache/kafka/clients/producer/internals/TransactionManager.class */
public class TransactionManager {
    private static final Logger log = LoggerFactory.getLogger(TransactionManager.class);
    private static final int NO_INFLIGHT_REQUEST_CORRELATION_ID = -1;
    private final String transactionalId;
    private final int transactionTimeoutMs;
    public final String logPrefix;
    private final Map<TopicPartition, Integer> sequenceNumbers;
    private final PriorityQueue<TxnRequestHandler> pendingRequests;
    private final Set<TopicPartition> newPartitionsInTransaction;
    private final Set<TopicPartition> pendingPartitionsInTransaction;
    private final Set<TopicPartition> partitionsInTransaction;
    private final Map<TopicPartition, TxnOffsetCommitRequest.CommittedOffset> pendingTxnOffsetCommits;
    private final long retryBackoffMs;
    private static final long ADD_PARTITIONS_RETRY_BACKOFF_MS = 20;
    private int inFlightRequestCorrelationId;
    private Node transactionCoordinator;
    private Node consumerGroupCoordinator;
    private volatile State currentState;
    private volatile RuntimeException lastError;
    private volatile ProducerIdAndEpoch producerIdAndEpoch;
    private volatile boolean transactionStarted;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:META-INF/repository/fuse-eap-distro-6.3.0.redhat-441.zip:modules/system/layers/fuse/org/apache/kafka/clients/main/kafka-clients-0.11.0.1.jar:org/apache/kafka/clients/producer/internals/TransactionManager$AddOffsetsToTxnHandler.class */
    public class AddOffsetsToTxnHandler extends TxnRequestHandler {
        private final AddOffsetsToTxnRequest.Builder builder;
        private final Map<TopicPartition, OffsetAndMetadata> offsets;

        private AddOffsetsToTxnHandler(AddOffsetsToTxnRequest.Builder builder, Map<TopicPartition, OffsetAndMetadata> map) {
            super(TransactionManager.this);
            this.builder = builder;
            this.offsets = map;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        @Override // org.apache.kafka.clients.producer.internals.TransactionManager.TxnRequestHandler
        public AddOffsetsToTxnRequest.Builder requestBuilder() {
            return this.builder;
        }

        @Override // org.apache.kafka.clients.producer.internals.TransactionManager.TxnRequestHandler
        Priority priority() {
            return Priority.ADD_PARTITIONS_OR_OFFSETS;
        }

        @Override // org.apache.kafka.clients.producer.internals.TransactionManager.TxnRequestHandler
        public void handleResponse(AbstractResponse abstractResponse) {
            Errors error = ((AddOffsetsToTxnResponse) abstractResponse).error();
            if (error == Errors.NONE) {
                TransactionManager.log.debug("{}Successfully added partition for consumer group {} to transaction", TransactionManager.this.logPrefix, this.builder.consumerGroupId());
                TransactionManager.this.pendingRequests.add(TransactionManager.this.txnOffsetCommitHandler(this.result, this.offsets, this.builder.consumerGroupId()));
                TransactionManager.this.transactionStarted = true;
                return;
            }
            if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR) {
                TransactionManager.this.lookupCoordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION, TransactionManager.this.transactionalId);
                reenqueue();
                return;
            }
            if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS) {
                reenqueue();
                return;
            }
            if (error == Errors.INVALID_PRODUCER_EPOCH) {
                fatalError(error.exception());
                return;
            }
            if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED) {
                fatalError(error.exception());
            } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                abortableError(new GroupAuthorizationException(this.builder.consumerGroupId()));
            } else {
                fatalError(new KafkaException("Unexpected error in AddOffsetsToTxnResponse: " + error.message()));
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:META-INF/repository/fuse-eap-distro-6.3.0.redhat-441.zip:modules/system/layers/fuse/org/apache/kafka/clients/main/kafka-clients-0.11.0.1.jar:org/apache/kafka/clients/producer/internals/TransactionManager$AddPartitionsToTxnHandler.class */
    public class AddPartitionsToTxnHandler extends TxnRequestHandler {
        private final AddPartitionsToTxnRequest.Builder builder;
        private long retryBackoffMs;

        private AddPartitionsToTxnHandler(AddPartitionsToTxnRequest.Builder builder) {
            super(TransactionManager.this);
            this.builder = builder;
            this.retryBackoffMs = TransactionManager.this.retryBackoffMs;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        @Override // org.apache.kafka.clients.producer.internals.TransactionManager.TxnRequestHandler
        public AddPartitionsToTxnRequest.Builder requestBuilder() {
            return this.builder;
        }

        @Override // org.apache.kafka.clients.producer.internals.TransactionManager.TxnRequestHandler
        Priority priority() {
            return Priority.ADD_PARTITIONS_OR_OFFSETS;
        }

        @Override // org.apache.kafka.clients.producer.internals.TransactionManager.TxnRequestHandler
        public void handleResponse(AbstractResponse abstractResponse) {
            Map<TopicPartition, Errors> errors = ((AddPartitionsToTxnResponse) abstractResponse).errors();
            boolean z = false;
            HashSet hashSet = new HashSet();
            this.retryBackoffMs = TransactionManager.this.retryBackoffMs;
            for (Map.Entry<TopicPartition, Errors> entry : errors.entrySet()) {
                TopicPartition key = entry.getKey();
                Errors value = entry.getValue();
                if (value != Errors.NONE) {
                    if (value == Errors.COORDINATOR_NOT_AVAILABLE || value == Errors.NOT_COORDINATOR) {
                        TransactionManager.this.lookupCoordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION, TransactionManager.this.transactionalId);
                        reenqueue();
                        return;
                    }
                    if (value == Errors.CONCURRENT_TRANSACTIONS) {
                        maybeOverrideRetryBackoffMs();
                        reenqueue();
                        return;
                    }
                    if (value == Errors.COORDINATOR_LOAD_IN_PROGRESS || value == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
                        reenqueue();
                        return;
                    }
                    if (value == Errors.INVALID_PRODUCER_EPOCH) {
                        fatalError(value.exception());
                        return;
                    }
                    if (value == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED) {
                        fatalError(value.exception());
                        return;
                    }
                    if (value == Errors.INVALID_PRODUCER_ID_MAPPING || value == Errors.INVALID_TXN_STATE) {
                        fatalError(new KafkaException(value.exception()));
                        return;
                    } else if (value == Errors.TOPIC_AUTHORIZATION_FAILED) {
                        hashSet.add(key.topic());
                    } else if (value == Errors.OPERATION_NOT_ATTEMPTED) {
                        TransactionManager.log.debug("{}Did not attempt to add partition {} to transaction because other partitions in the batch had errors.", TransactionManager.this.logPrefix, key);
                        z = true;
                    } else {
                        TransactionManager.log.error("{}Could not add partition {} due to unexpected error {}", new Object[]{TransactionManager.this.logPrefix, key, value});
                        z = true;
                    }
                }
            }
            Set<TopicPartition> keySet = errors.keySet();
            TransactionManager.this.pendingPartitionsInTransaction.removeAll(keySet);
            if (!hashSet.isEmpty()) {
                abortableError(new TopicAuthorizationException(hashSet));
                return;
            }
            if (z) {
                abortableError(new KafkaException("Could not add partitions to transaction due to errors: " + errors));
                return;
            }
            TransactionManager.log.debug("{}Successfully added partitions {} to transaction", TransactionManager.this.logPrefix, keySet);
            TransactionManager.this.partitionsInTransaction.addAll(keySet);
            TransactionManager.this.transactionStarted = true;
            this.result.done();
        }

        @Override // org.apache.kafka.clients.producer.internals.TransactionManager.TxnRequestHandler
        public long retryBackoffMs() {
            return Math.min(TransactionManager.this.retryBackoffMs, this.retryBackoffMs);
        }

        private void maybeOverrideRetryBackoffMs() {
            if (TransactionManager.this.partitionsInTransaction.isEmpty()) {
                this.retryBackoffMs = 20L;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:META-INF/repository/fuse-eap-distro-6.3.0.redhat-441.zip:modules/system/layers/fuse/org/apache/kafka/clients/main/kafka-clients-0.11.0.1.jar:org/apache/kafka/clients/producer/internals/TransactionManager$EndTxnHandler.class */
    public class EndTxnHandler extends TxnRequestHandler {
        private final EndTxnRequest.Builder builder;

        private EndTxnHandler(EndTxnRequest.Builder builder) {
            super(TransactionManager.this);
            this.builder = builder;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        @Override // org.apache.kafka.clients.producer.internals.TransactionManager.TxnRequestHandler
        public EndTxnRequest.Builder requestBuilder() {
            return this.builder;
        }

        @Override // org.apache.kafka.clients.producer.internals.TransactionManager.TxnRequestHandler
        Priority priority() {
            return Priority.END_TXN;
        }

        @Override // org.apache.kafka.clients.producer.internals.TransactionManager.TxnRequestHandler
        boolean isEndTxn() {
            return true;
        }

        @Override // org.apache.kafka.clients.producer.internals.TransactionManager.TxnRequestHandler
        public void handleResponse(AbstractResponse abstractResponse) {
            Errors error = ((EndTxnResponse) abstractResponse).error();
            if (error == Errors.NONE) {
                TransactionManager.this.completeTransaction();
                this.result.done();
                return;
            }
            if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR) {
                TransactionManager.this.lookupCoordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION, TransactionManager.this.transactionalId);
                reenqueue();
                return;
            }
            if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS) {
                reenqueue();
                return;
            }
            if (error == Errors.INVALID_PRODUCER_EPOCH) {
                fatalError(error.exception());
                return;
            }
            if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED) {
                fatalError(error.exception());
            } else if (error == Errors.INVALID_TXN_STATE) {
                fatalError(error.exception());
            } else {
                fatalError(new KafkaException("Unhandled error in EndTxnResponse: " + error.message()));
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:META-INF/repository/fuse-eap-distro-6.3.0.redhat-441.zip:modules/system/layers/fuse/org/apache/kafka/clients/main/kafka-clients-0.11.0.1.jar:org/apache/kafka/clients/producer/internals/TransactionManager$FindCoordinatorHandler.class */
    public class FindCoordinatorHandler extends TxnRequestHandler {
        private final FindCoordinatorRequest.Builder builder;

        private FindCoordinatorHandler(FindCoordinatorRequest.Builder builder) {
            super(TransactionManager.this);
            this.builder = builder;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        @Override // org.apache.kafka.clients.producer.internals.TransactionManager.TxnRequestHandler
        public FindCoordinatorRequest.Builder requestBuilder() {
            return this.builder;
        }

        @Override // org.apache.kafka.clients.producer.internals.TransactionManager.TxnRequestHandler
        Priority priority() {
            return Priority.FIND_COORDINATOR;
        }

        @Override // org.apache.kafka.clients.producer.internals.TransactionManager.TxnRequestHandler
        FindCoordinatorRequest.CoordinatorType coordinatorType() {
            return null;
        }

        @Override // org.apache.kafka.clients.producer.internals.TransactionManager.TxnRequestHandler
        String coordinatorKey() {
            return null;
        }

        @Override // org.apache.kafka.clients.producer.internals.TransactionManager.TxnRequestHandler
        public void handleResponse(AbstractResponse abstractResponse) {
            FindCoordinatorResponse findCoordinatorResponse = (FindCoordinatorResponse) abstractResponse;
            Errors error = findCoordinatorResponse.error();
            if (error == Errors.NONE) {
                Node node = findCoordinatorResponse.node();
                switch (this.builder.coordinatorType()) {
                    case GROUP:
                        TransactionManager.this.consumerGroupCoordinator = node;
                        break;
                    case TRANSACTION:
                        TransactionManager.this.transactionCoordinator = node;
                        break;
                }
                this.result.done();
                return;
            }
            if (error == Errors.COORDINATOR_NOT_AVAILABLE) {
                reenqueue();
                return;
            }
            if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED) {
                fatalError(error.exception());
            } else if (findCoordinatorResponse.error() == Errors.GROUP_AUTHORIZATION_FAILED) {
                abortableError(new GroupAuthorizationException(this.builder.coordinatorKey()));
            } else {
                fatalError(new KafkaException(String.format("Could not find a coordinator with type %s with key %s due tounexpected error: %s", this.builder.coordinatorType(), this.builder.coordinatorKey(), findCoordinatorResponse.error().message())));
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:META-INF/repository/fuse-eap-distro-6.3.0.redhat-441.zip:modules/system/layers/fuse/org/apache/kafka/clients/main/kafka-clients-0.11.0.1.jar:org/apache/kafka/clients/producer/internals/TransactionManager$InitProducerIdHandler.class */
    public class InitProducerIdHandler extends TxnRequestHandler {
        private final InitProducerIdRequest.Builder builder;

        private InitProducerIdHandler(InitProducerIdRequest.Builder builder) {
            super(TransactionManager.this);
            this.builder = builder;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        @Override // org.apache.kafka.clients.producer.internals.TransactionManager.TxnRequestHandler
        public InitProducerIdRequest.Builder requestBuilder() {
            return this.builder;
        }

        @Override // org.apache.kafka.clients.producer.internals.TransactionManager.TxnRequestHandler
        Priority priority() {
            return Priority.INIT_PRODUCER_ID;
        }

        @Override // org.apache.kafka.clients.producer.internals.TransactionManager.TxnRequestHandler
        public void handleResponse(AbstractResponse abstractResponse) {
            InitProducerIdResponse initProducerIdResponse = (InitProducerIdResponse) abstractResponse;
            Errors error = initProducerIdResponse.error();
            if (error == Errors.NONE) {
                TransactionManager.this.setProducerIdAndEpoch(new ProducerIdAndEpoch(initProducerIdResponse.producerId(), initProducerIdResponse.epoch()));
                TransactionManager.this.transitionTo(State.READY);
                TransactionManager.this.lastError = null;
                this.result.done();
                return;
            }
            if (error == Errors.NOT_COORDINATOR || error == Errors.COORDINATOR_NOT_AVAILABLE) {
                TransactionManager.this.lookupCoordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION, TransactionManager.this.transactionalId);
                reenqueue();
            } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS) {
                reenqueue();
            } else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED) {
                fatalError(error.exception());
            } else {
                fatalError(new KafkaException("Unexpected error in InitProducerIdResponse; " + error.message()));
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:META-INF/repository/fuse-eap-distro-6.3.0.redhat-441.zip:modules/system/layers/fuse/org/apache/kafka/clients/main/kafka-clients-0.11.0.1.jar:org/apache/kafka/clients/producer/internals/TransactionManager$Priority.class */
    public enum Priority {
        FIND_COORDINATOR(0),
        INIT_PRODUCER_ID(1),
        ADD_PARTITIONS_OR_OFFSETS(2),
        END_TXN(3);

        final int priority;

        Priority(int i) {
            this.priority = i;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:META-INF/repository/fuse-eap-distro-6.3.0.redhat-441.zip:modules/system/layers/fuse/org/apache/kafka/clients/main/kafka-clients-0.11.0.1.jar:org/apache/kafka/clients/producer/internals/TransactionManager$State.class */
    public enum State {
        UNINITIALIZED,
        INITIALIZING,
        READY,
        IN_TRANSACTION,
        COMMITTING_TRANSACTION,
        ABORTING_TRANSACTION,
        ABORTABLE_ERROR,
        FATAL_ERROR;

        /* JADX INFO: Access modifiers changed from: private */
        public boolean isTransitionValid(State state, State state2) {
            switch (state2) {
                case INITIALIZING:
                    return state == UNINITIALIZED;
                case READY:
                    return state == INITIALIZING || state == COMMITTING_TRANSACTION || state == ABORTING_TRANSACTION;
                case IN_TRANSACTION:
                    return state == READY;
                case COMMITTING_TRANSACTION:
                    return state == IN_TRANSACTION;
                case ABORTING_TRANSACTION:
                    return state == IN_TRANSACTION || state == ABORTABLE_ERROR;
                case ABORTABLE_ERROR:
                    return state == IN_TRANSACTION || state == COMMITTING_TRANSACTION || state == ABORTABLE_ERROR;
                case FATAL_ERROR:
                default:
                    return true;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:META-INF/repository/fuse-eap-distro-6.3.0.redhat-441.zip:modules/system/layers/fuse/org/apache/kafka/clients/main/kafka-clients-0.11.0.1.jar:org/apache/kafka/clients/producer/internals/TransactionManager$TxnOffsetCommitHandler.class */
    public class TxnOffsetCommitHandler extends TxnRequestHandler {
        private final TxnOffsetCommitRequest.Builder builder;

        private TxnOffsetCommitHandler(TransactionalRequestResult transactionalRequestResult, TxnOffsetCommitRequest.Builder builder) {
            super(transactionalRequestResult);
            this.builder = builder;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        @Override // org.apache.kafka.clients.producer.internals.TransactionManager.TxnRequestHandler
        public TxnOffsetCommitRequest.Builder requestBuilder() {
            return this.builder;
        }

        @Override // org.apache.kafka.clients.producer.internals.TransactionManager.TxnRequestHandler
        Priority priority() {
            return Priority.ADD_PARTITIONS_OR_OFFSETS;
        }

        @Override // org.apache.kafka.clients.producer.internals.TransactionManager.TxnRequestHandler
        FindCoordinatorRequest.CoordinatorType coordinatorType() {
            return FindCoordinatorRequest.CoordinatorType.GROUP;
        }

        @Override // org.apache.kafka.clients.producer.internals.TransactionManager.TxnRequestHandler
        String coordinatorKey() {
            return this.builder.consumerGroupId();
        }

        @Override // org.apache.kafka.clients.producer.internals.TransactionManager.TxnRequestHandler
        public void handleResponse(AbstractResponse abstractResponse) {
            boolean z = false;
            boolean z2 = false;
            for (Map.Entry<TopicPartition, Errors> entry : ((TxnOffsetCommitResponse) abstractResponse).errors().entrySet()) {
                TopicPartition key = entry.getKey();
                Errors value = entry.getValue();
                if (value == Errors.NONE) {
                    TransactionManager.log.debug("{}Successfully added offsets {} from consumer group {} to transaction.", new Object[]{TransactionManager.this.logPrefix, this.builder.offsets(), this.builder.consumerGroupId()});
                    TransactionManager.this.pendingTxnOffsetCommits.remove(key);
                } else if (value == Errors.COORDINATOR_NOT_AVAILABLE || value == Errors.NOT_COORDINATOR || value == Errors.REQUEST_TIMED_OUT) {
                    z2 = true;
                    if (!z) {
                        z = true;
                        TransactionManager.this.lookupCoordinator(FindCoordinatorRequest.CoordinatorType.GROUP, this.builder.consumerGroupId());
                    }
                } else {
                    if (value != Errors.UNKNOWN_TOPIC_OR_PARTITION) {
                        if (value == Errors.GROUP_AUTHORIZATION_FAILED) {
                            abortableError(new GroupAuthorizationException(this.builder.consumerGroupId()));
                            return;
                        } else if (value == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED || value == Errors.INVALID_PRODUCER_EPOCH || value == Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT) {
                            fatalError(value.exception());
                            return;
                        } else {
                            fatalError(new KafkaException("Unexpected error in TxnOffsetCommitResponse: " + value.message()));
                            return;
                        }
                    }
                    z2 = true;
                }
            }
            if (z2 && this.result.isSuccessful()) {
                if (TransactionManager.this.pendingTxnOffsetCommits.isEmpty()) {
                    return;
                }
                reenqueue();
            } else {
                this.result.done();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:META-INF/repository/fuse-eap-distro-6.3.0.redhat-441.zip:modules/system/layers/fuse/org/apache/kafka/clients/main/kafka-clients-0.11.0.1.jar:org/apache/kafka/clients/producer/internals/TransactionManager$TxnRequestHandler.class */
    public abstract class TxnRequestHandler implements RequestCompletionHandler {
        protected final TransactionalRequestResult result;
        private boolean isRetry;

        TxnRequestHandler(TransactionalRequestResult transactionalRequestResult) {
            this.isRetry = false;
            this.result = transactionalRequestResult;
        }

        TxnRequestHandler(TransactionManager transactionManager) {
            this(new TransactionalRequestResult());
        }

        void fatalError(RuntimeException runtimeException) {
            this.result.setError(runtimeException);
            TransactionManager.this.transitionToFatalError(runtimeException);
            this.result.done();
        }

        void abortableError(RuntimeException runtimeException) {
            this.result.setError(runtimeException);
            TransactionManager.this.transitionToAbortableError(runtimeException);
            this.result.done();
        }

        void fail(RuntimeException runtimeException) {
            this.result.setError(runtimeException);
            this.result.done();
        }

        void reenqueue() {
            synchronized (TransactionManager.this) {
                this.isRetry = true;
                TransactionManager.this.enqueueRequest(this);
            }
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public long retryBackoffMs() {
            return TransactionManager.this.retryBackoffMs;
        }

        @Override // org.apache.kafka.clients.RequestCompletionHandler
        public void onComplete(ClientResponse clientResponse) {
            if (clientResponse.requestHeader().correlationId() != TransactionManager.this.inFlightRequestCorrelationId) {
                fatalError(new RuntimeException("Detected more than one in-flight transactional request."));
                return;
            }
            TransactionManager.this.clearInFlightRequestCorrelationId();
            if (clientResponse.wasDisconnected()) {
                TransactionManager.log.debug("{}Disconnected from {}. Will retry.", TransactionManager.this.logPrefix, clientResponse.destination());
                if (needsCoordinator()) {
                    TransactionManager.this.lookupCoordinator(coordinatorType(), coordinatorKey());
                }
                reenqueue();
                return;
            }
            if (clientResponse.versionMismatch() != null) {
                fatalError(clientResponse.versionMismatch());
                return;
            }
            if (!clientResponse.hasResponse()) {
                fatalError(new KafkaException("Could not execute transactional request for unknown reasons"));
                return;
            }
            TransactionManager.log.trace("{}Received transactional response {} for request {}", new Object[]{TransactionManager.this.logPrefix, clientResponse.responseBody(), requestBuilder()});
            synchronized (TransactionManager.this) {
                handleResponse(clientResponse.responseBody());
            }
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public boolean needsCoordinator() {
            return coordinatorType() != null;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public FindCoordinatorRequest.CoordinatorType coordinatorType() {
            return FindCoordinatorRequest.CoordinatorType.TRANSACTION;
        }

        String coordinatorKey() {
            return TransactionManager.this.transactionalId;
        }

        void setRetry() {
            this.isRetry = true;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public boolean isRetry() {
            return this.isRetry;
        }

        boolean isEndTxn() {
            return false;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract AbstractRequest.Builder<?> requestBuilder();

        abstract void handleResponse(AbstractResponse abstractResponse);

        abstract Priority priority();
    }

    public TransactionManager(String str, int i, long j) {
        this.inFlightRequestCorrelationId = -1;
        this.currentState = State.UNINITIALIZED;
        this.lastError = null;
        this.transactionStarted = false;
        this.producerIdAndEpoch = new ProducerIdAndEpoch(-1L, (short) -1);
        this.sequenceNumbers = new HashMap();
        this.transactionalId = str;
        this.logPrefix = str == null ? "" : "[TransactionalId " + str + "] ";
        this.transactionTimeoutMs = i;
        this.transactionCoordinator = null;
        this.consumerGroupCoordinator = null;
        this.newPartitionsInTransaction = new HashSet();
        this.pendingPartitionsInTransaction = new HashSet();
        this.partitionsInTransaction = new HashSet();
        this.pendingTxnOffsetCommits = new HashMap();
        this.pendingRequests = new PriorityQueue<>(10, new Comparator<TxnRequestHandler>() { // from class: org.apache.kafka.clients.producer.internals.TransactionManager.1
            @Override // java.util.Comparator
            public int compare(TxnRequestHandler txnRequestHandler, TxnRequestHandler txnRequestHandler2) {
                return Integer.compare(txnRequestHandler.priority().priority, txnRequestHandler2.priority().priority);
            }
        });
        this.retryBackoffMs = j;
    }

    TransactionManager() {
        this(null, 0, 100L);
    }

    public synchronized TransactionalRequestResult initializeTransactions() {
        ensureTransactional();
        transitionTo(State.INITIALIZING);
        setProducerIdAndEpoch(ProducerIdAndEpoch.NONE);
        this.sequenceNumbers.clear();
        InitProducerIdHandler initProducerIdHandler = new InitProducerIdHandler(new InitProducerIdRequest.Builder(this.transactionalId, this.transactionTimeoutMs));
        enqueueRequest(initProducerIdHandler);
        return initProducerIdHandler.result;
    }

    public synchronized void beginTransaction() {
        ensureTransactional();
        maybeFailWithError();
        transitionTo(State.IN_TRANSACTION);
    }

    public synchronized TransactionalRequestResult beginCommit() {
        ensureTransactional();
        maybeFailWithError();
        transitionTo(State.COMMITTING_TRANSACTION);
        return beginCompletingTransaction(TransactionResult.COMMIT);
    }

    public synchronized TransactionalRequestResult beginAbort() {
        ensureTransactional();
        if (this.currentState != State.ABORTABLE_ERROR) {
            maybeFailWithError();
        }
        transitionTo(State.ABORTING_TRANSACTION);
        this.newPartitionsInTransaction.clear();
        return beginCompletingTransaction(TransactionResult.ABORT);
    }

    private TransactionalRequestResult beginCompletingTransaction(TransactionResult transactionResult) {
        if (!this.newPartitionsInTransaction.isEmpty()) {
            enqueueRequest(addPartitionsToTransactionHandler());
        }
        EndTxnHandler endTxnHandler = new EndTxnHandler(new EndTxnRequest.Builder(this.transactionalId, this.producerIdAndEpoch.producerId, this.producerIdAndEpoch.epoch, transactionResult));
        enqueueRequest(endTxnHandler);
        return endTxnHandler.result;
    }

    public synchronized TransactionalRequestResult sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> map, String str) {
        ensureTransactional();
        maybeFailWithError();
        if (this.currentState != State.IN_TRANSACTION) {
            throw new KafkaException("Cannot send offsets to transaction either because the producer is not in an active transaction");
        }
        log.debug("{}Begin adding offsets {} for consumer group {} to transaction", new Object[]{this.logPrefix, map, str});
        AddOffsetsToTxnHandler addOffsetsToTxnHandler = new AddOffsetsToTxnHandler(new AddOffsetsToTxnRequest.Builder(this.transactionalId, this.producerIdAndEpoch.producerId, this.producerIdAndEpoch.epoch, str), map);
        enqueueRequest(addOffsetsToTxnHandler);
        return addOffsetsToTxnHandler.result;
    }

    public synchronized void maybeAddPartitionToTransaction(TopicPartition topicPartition) {
        failIfNotReadyForSend();
        if (isPartitionAdded(topicPartition) || isPartitionPendingAdd(topicPartition)) {
            return;
        }
        log.debug("{}Begin adding new partition {} to transaction", this.logPrefix, topicPartition);
        this.newPartitionsInTransaction.add(topicPartition);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public RuntimeException lastError() {
        return this.lastError;
    }

    public synchronized void failIfNotReadyForSend() {
        if (hasError()) {
            throw new KafkaException("Cannot perform send because at least one previous transactional or idempotent request has failed with errors.", this.lastError);
        }
        if (isTransactional()) {
            if (!hasProducerId()) {
                throw new IllegalStateException("Cannot perform a 'send' before completing a call to initTransactions when transactions are enabled.");
            }
            if (this.currentState != State.IN_TRANSACTION) {
                throw new IllegalStateException("Cannot call send in state " + this.currentState);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized boolean isSendToPartitionAllowed(TopicPartition topicPartition) {
        if (hasFatalError()) {
            return false;
        }
        return !isTransactional() || this.partitionsInTransaction.contains(topicPartition);
    }

    public String transactionalId() {
        return this.transactionalId;
    }

    public boolean hasProducerId() {
        return this.producerIdAndEpoch.isValid();
    }

    public boolean isTransactional() {
        return this.transactionalId != null;
    }

    synchronized boolean hasPartitionsToAdd() {
        return (this.newPartitionsInTransaction.isEmpty() && this.pendingPartitionsInTransaction.isEmpty()) ? false : true;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized boolean isCompleting() {
        return this.currentState == State.COMMITTING_TRANSACTION || this.currentState == State.ABORTING_TRANSACTION;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized boolean hasError() {
        return this.currentState == State.ABORTABLE_ERROR || this.currentState == State.FATAL_ERROR;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized boolean isAborting() {
        return this.currentState == State.ABORTING_TRANSACTION;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized void transitionToAbortableError(RuntimeException runtimeException) {
        if (this.currentState == State.ABORTING_TRANSACTION) {
            log.debug("Skipping transition to abortable error state since the transaction is already being aborted. Underlying exception: ", runtimeException);
        } else {
            transitionTo(State.ABORTABLE_ERROR, runtimeException);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized void transitionToFatalError(RuntimeException runtimeException) {
        transitionTo(State.FATAL_ERROR, runtimeException);
    }

    synchronized boolean isPartitionAdded(TopicPartition topicPartition) {
        return this.partitionsInTransaction.contains(topicPartition);
    }

    synchronized boolean isPartitionPendingAdd(TopicPartition topicPartition) {
        return this.newPartitionsInTransaction.contains(topicPartition) || this.pendingPartitionsInTransaction.contains(topicPartition);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ProducerIdAndEpoch producerIdAndEpoch() {
        return this.producerIdAndEpoch;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean hasProducerId(long j) {
        return this.producerIdAndEpoch.producerId == j;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean hasProducerIdAndEpoch(long j, short s) {
        ProducerIdAndEpoch producerIdAndEpoch = this.producerIdAndEpoch;
        return producerIdAndEpoch.producerId == j && producerIdAndEpoch.epoch == s;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void setProducerIdAndEpoch(ProducerIdAndEpoch producerIdAndEpoch) {
        log.info("{}ProducerId set to {} with epoch {}", new Object[]{this.logPrefix, Long.valueOf(producerIdAndEpoch.producerId), Short.valueOf(producerIdAndEpoch.epoch)});
        this.producerIdAndEpoch = producerIdAndEpoch;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized void resetProducerId() {
        if (isTransactional()) {
            throw new IllegalStateException("Cannot reset producer state for a transactional producer. You must either abort the ongoing transaction or reinitialize the transactional producer instead");
        }
        setProducerIdAndEpoch(ProducerIdAndEpoch.NONE);
        this.sequenceNumbers.clear();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized Integer sequenceNumber(TopicPartition topicPartition) {
        Integer num = this.sequenceNumbers.get(topicPartition);
        if (num == null) {
            num = 0;
            this.sequenceNumbers.put(topicPartition, null);
        }
        return num;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized void incrementSequenceNumber(TopicPartition topicPartition, int i) {
        Integer num = this.sequenceNumbers.get(topicPartition);
        if (num == null) {
            throw new IllegalStateException("Attempt to increment sequence number for a partition with no current sequence.");
        }
        this.sequenceNumbers.put(topicPartition, Integer.valueOf(num.intValue() + i));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized TxnRequestHandler nextRequestHandler(boolean z) {
        if (!this.newPartitionsInTransaction.isEmpty()) {
            enqueueRequest(addPartitionsToTransactionHandler());
        }
        TxnRequestHandler peek = this.pendingRequests.peek();
        if (peek == null) {
            return null;
        }
        if (peek.isEndTxn() && z) {
            return null;
        }
        this.pendingRequests.poll();
        if (maybeTerminateRequestWithError(peek)) {
            log.trace("{}Not sending transactional request {} because we are in an error state", this.logPrefix, peek.requestBuilder());
            return null;
        }
        if (peek.isEndTxn() && !this.transactionStarted) {
            peek.result.done();
            if (this.currentState != State.FATAL_ERROR) {
                log.debug("{}Not sending EndTxn for completed transaction since no partitions or offsets were successfully added", this.logPrefix);
                completeTransaction();
            }
            peek = this.pendingRequests.poll();
        }
        if (peek != null) {
            log.trace("{}Request {} dequeued for sending", this.logPrefix, peek.requestBuilder());
        }
        return peek;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized void retry(TxnRequestHandler txnRequestHandler) {
        txnRequestHandler.setRetry();
        enqueueRequest(txnRequestHandler);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Node coordinator(FindCoordinatorRequest.CoordinatorType coordinatorType) {
        switch (coordinatorType) {
            case GROUP:
                return this.consumerGroupCoordinator;
            case TRANSACTION:
                return this.transactionCoordinator;
            default:
                throw new IllegalStateException("Received an invalid coordinator type: " + coordinatorType);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void lookupCoordinator(TxnRequestHandler txnRequestHandler) {
        lookupCoordinator(txnRequestHandler.coordinatorType(), txnRequestHandler.coordinatorKey());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void setInFlightRequestCorrelationId(int i) {
        this.inFlightRequestCorrelationId = i;
    }

    void clearInFlightRequestCorrelationId() {
        this.inFlightRequestCorrelationId = -1;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean hasInFlightRequest() {
        return this.inFlightRequestCorrelationId != -1;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean hasFatalError() {
        return this.currentState == State.FATAL_ERROR;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean hasAbortableError() {
        return this.currentState == State.ABORTABLE_ERROR;
    }

    synchronized boolean transactionContainsPartition(TopicPartition topicPartition) {
        return this.partitionsInTransaction.contains(topicPartition);
    }

    synchronized boolean hasPendingOffsetCommits() {
        return !this.pendingTxnOffsetCommits.isEmpty();
    }

    synchronized boolean hasOngoingTransaction() {
        return this.currentState == State.IN_TRANSACTION || isCompleting() || hasAbortableError();
    }

    synchronized boolean isReady() {
        return isTransactional() && this.currentState == State.READY;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void transitionTo(State state) {
        transitionTo(state, null);
    }

    private synchronized void transitionTo(State state, RuntimeException runtimeException) {
        if (!this.currentState.isTransitionValid(this.currentState, state)) {
            throw new KafkaException((this.transactionalId == null ? "" : "TransactionalId " + this.transactionalId + ": ") + "Invalid transition attempted from state " + this.currentState.name() + " to state " + state.name());
        }
        if (state != State.FATAL_ERROR && state != State.ABORTABLE_ERROR) {
            this.lastError = null;
        } else {
            if (runtimeException == null) {
                throw new IllegalArgumentException("Cannot transition to " + state + " with an null exception");
            }
            this.lastError = runtimeException;
        }
        if (this.lastError != null) {
            log.debug("{}Transition from state {} to error state {}", new Object[]{this.logPrefix, this.currentState, state, this.lastError});
        } else {
            log.debug("{}Transition from state {} to {}", new Object[]{this.logPrefix, this.currentState, state});
        }
        this.currentState = state;
    }

    private void ensureTransactional() {
        if (!isTransactional()) {
            throw new IllegalStateException("Transactional method invoked on a non-transactional producer.");
        }
    }

    private void maybeFailWithError() {
        if (hasError()) {
            throw new KafkaException("Cannot execute transactional method because we are in an error state", this.lastError);
        }
    }

    private boolean maybeTerminateRequestWithError(TxnRequestHandler txnRequestHandler) {
        if (!hasError()) {
            return false;
        }
        if (hasAbortableError() && (txnRequestHandler instanceof FindCoordinatorHandler)) {
            return false;
        }
        txnRequestHandler.fail(this.lastError);
        return true;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void enqueueRequest(TxnRequestHandler txnRequestHandler) {
        log.debug("{}Enqueuing transactional request {}", this.logPrefix, txnRequestHandler.requestBuilder());
        this.pendingRequests.add(txnRequestHandler);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public synchronized void lookupCoordinator(FindCoordinatorRequest.CoordinatorType coordinatorType, String str) {
        switch (coordinatorType) {
            case GROUP:
                this.consumerGroupCoordinator = null;
                break;
            case TRANSACTION:
                this.transactionCoordinator = null;
                break;
            default:
                throw new IllegalStateException("Invalid coordinator type: " + coordinatorType);
        }
        enqueueRequest(new FindCoordinatorHandler(new FindCoordinatorRequest.Builder(coordinatorType, str)));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public synchronized void completeTransaction() {
        transitionTo(State.READY);
        this.lastError = null;
        this.transactionStarted = false;
        this.newPartitionsInTransaction.clear();
        this.pendingPartitionsInTransaction.clear();
        this.partitionsInTransaction.clear();
    }

    private synchronized TxnRequestHandler addPartitionsToTransactionHandler() {
        this.pendingPartitionsInTransaction.addAll(this.newPartitionsInTransaction);
        this.newPartitionsInTransaction.clear();
        return new AddPartitionsToTxnHandler(new AddPartitionsToTxnRequest.Builder(this.transactionalId, this.producerIdAndEpoch.producerId, this.producerIdAndEpoch.epoch, new ArrayList(this.pendingPartitionsInTransaction)));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public TxnOffsetCommitHandler txnOffsetCommitHandler(TransactionalRequestResult transactionalRequestResult, Map<TopicPartition, OffsetAndMetadata> map, String str) {
        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : map.entrySet()) {
            OffsetAndMetadata value = entry.getValue();
            this.pendingTxnOffsetCommits.put(entry.getKey(), new TxnOffsetCommitRequest.CommittedOffset(value.offset(), value.metadata()));
        }
        return new TxnOffsetCommitHandler(transactionalRequestResult, new TxnOffsetCommitRequest.Builder(this.transactionalId, str, this.producerIdAndEpoch.producerId, this.producerIdAndEpoch.epoch, this.pendingTxnOffsetCommits));
    }
}
