package org.apache.kafka.clients;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.network.NetworkReceive;
import org.apache.kafka.common.network.Selectable;
import org.apache.kafka.common.network.Send;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.ProtoUtils;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.ApiVersionsRequest;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.requests.ResponseHeader;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:META-INF/repository/fuse-eap-distro-6.3.0.redhat-322.zip:modules/system/layers/fuse/org/apache/kafka/clients/main/kafka-clients-0.10.2.0.jar:org/apache/kafka/clients/NetworkClient.class */
public class NetworkClient implements KafkaClient {
    private static final Logger log = LoggerFactory.getLogger(NetworkClient.class);
    private final Selectable selector;
    private final MetadataUpdater metadataUpdater;
    private final Random randOffset;
    private final ClusterConnectionStates connectionStates;
    private final InFlightRequests inFlightRequests;
    private final int socketSendBuffer;
    private final int socketReceiveBuffer;
    private final String clientId;
    private int correlation;
    private final int requestTimeoutMs;
    private final long reconnectBackoffMs;
    private final Time time;
    private final boolean discoverBrokerVersions;
    private final Map<String, NodeApiVersions> nodeApiVersions;
    private final Set<String> nodesNeedingApiVersionsFetch;
    private final List<ClientResponse> abortedSends;

    /* loaded from: input_file:META-INF/repository/fuse-eap-distro-6.3.0.redhat-322.zip:modules/system/layers/fuse/org/apache/kafka/clients/main/kafka-clients-0.10.2.0.jar:org/apache/kafka/clients/NetworkClient$DefaultMetadataUpdater.class */
    class DefaultMetadataUpdater implements MetadataUpdater {
        private final Metadata metadata;
        private boolean metadataFetchInProgress = false;

        DefaultMetadataUpdater(Metadata metadata) {
            this.metadata = metadata;
        }

        @Override // org.apache.kafka.clients.MetadataUpdater
        public List<Node> fetchNodes() {
            return this.metadata.fetch().nodes();
        }

        @Override // org.apache.kafka.clients.MetadataUpdater
        public boolean isUpdateDue(long j) {
            return !this.metadataFetchInProgress && this.metadata.timeToNextUpdate(j) == 0;
        }

        @Override // org.apache.kafka.clients.MetadataUpdater
        public long maybeUpdate(long j) {
            long max = Math.max(this.metadata.timeToNextUpdate(j), this.metadataFetchInProgress ? NetworkClient.this.requestTimeoutMs : 0L);
            if (max > 0) {
                return max;
            }
            Node leastLoadedNode = NetworkClient.this.leastLoadedNode(j);
            if (leastLoadedNode != null) {
                return maybeUpdate(j, leastLoadedNode);
            }
            NetworkClient.log.debug("Give up sending metadata request since no node is available");
            return NetworkClient.this.reconnectBackoffMs;
        }

        @Override // org.apache.kafka.clients.MetadataUpdater
        public void handleDisconnection(String str) {
            Node nodeById;
            Cluster fetch = this.metadata.fetch();
            if (fetch.isBootstrapConfigured() && (nodeById = fetch.nodeById(Integer.parseInt(str))) != null) {
                NetworkClient.log.warn("Bootstrap broker {}:{} disconnected", nodeById.host(), Integer.valueOf(nodeById.port()));
            }
            this.metadataFetchInProgress = false;
        }

        @Override // org.apache.kafka.clients.MetadataUpdater
        public void handleCompletedMetadataResponse(RequestHeader requestHeader, long j, MetadataResponse metadataResponse) {
            this.metadataFetchInProgress = false;
            Cluster cluster = metadataResponse.cluster();
            Map<String, Errors> errors = metadataResponse.errors();
            if (!errors.isEmpty()) {
                NetworkClient.log.warn("Error while fetching metadata with correlation id {} : {}", Integer.valueOf(requestHeader.correlationId()), errors);
            }
            if (cluster.nodes().size() > 0) {
                this.metadata.update(cluster, j);
            } else {
                NetworkClient.log.trace("Ignoring empty metadata response with correlation id {}.", Integer.valueOf(requestHeader.correlationId()));
                this.metadata.failedUpdate(j);
            }
        }

        @Override // org.apache.kafka.clients.MetadataUpdater
        public void requestUpdate() {
            this.metadata.requestUpdate();
        }

        private boolean isAnyNodeConnecting() {
            Iterator<Node> it = fetchNodes().iterator();
            while (it.hasNext()) {
                if (NetworkClient.this.connectionStates.isConnecting(it.next().idString())) {
                    return true;
                }
            }
            return false;
        }

        private long maybeUpdate(long j, Node node) {
            String idString = node.idString();
            if (NetworkClient.this.canSendRequest(idString)) {
                this.metadataFetchInProgress = true;
                MetadataRequest.Builder allTopics = this.metadata.needMetadataForAllTopics() ? MetadataRequest.Builder.allTopics() : new MetadataRequest.Builder(new ArrayList(this.metadata.topics()));
                NetworkClient.log.debug("Sending metadata request {} to node {}", allTopics, Integer.valueOf(node.id()));
                NetworkClient.this.sendInternalMetadataRequest(allTopics, idString, j);
                return NetworkClient.this.requestTimeoutMs;
            }
            if (isAnyNodeConnecting()) {
                return NetworkClient.this.reconnectBackoffMs;
            }
            if (!NetworkClient.this.connectionStates.canConnect(idString, j)) {
                return Long.MAX_VALUE;
            }
            NetworkClient.log.debug("Initialize connection to node {} for sending metadata request", Integer.valueOf(node.id()));
            NetworkClient.this.initiateConnect(node, j);
            return NetworkClient.this.reconnectBackoffMs;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:META-INF/repository/fuse-eap-distro-6.3.0.redhat-322.zip:modules/system/layers/fuse/org/apache/kafka/clients/main/kafka-clients-0.10.2.0.jar:org/apache/kafka/clients/NetworkClient$InFlightRequest.class */
    public static class InFlightRequest {
        final RequestHeader header;
        final String destination;
        final RequestCompletionHandler callback;
        final boolean expectResponse;
        final boolean isInternalRequest;
        final Send send;
        final long sendTimeMs;
        final long createdTimeMs;

        public InFlightRequest(RequestHeader requestHeader, long j, String str, RequestCompletionHandler requestCompletionHandler, boolean z, boolean z2, Send send, long j2) {
            this.header = requestHeader;
            this.destination = str;
            this.callback = requestCompletionHandler;
            this.expectResponse = z;
            this.isInternalRequest = z2;
            this.send = send;
            this.sendTimeMs = j2;
            this.createdTimeMs = j;
        }

        public ClientResponse completed(AbstractResponse abstractResponse, long j) {
            return new ClientResponse(this.header, this.callback, this.destination, this.createdTimeMs, j, false, null, abstractResponse);
        }

        public ClientResponse disconnected(long j) {
            return new ClientResponse(this.header, this.callback, this.destination, this.createdTimeMs, j, true, null, null);
        }
    }

    public NetworkClient(Selectable selectable, Metadata metadata, String str, int i, long j, int i2, int i3, int i4, Time time, boolean z) {
        this(null, metadata, selectable, str, i, j, i2, i3, i4, time, z);
    }

    public NetworkClient(Selectable selectable, MetadataUpdater metadataUpdater, String str, int i, long j, int i2, int i3, int i4, Time time, boolean z) {
        this(metadataUpdater, null, selectable, str, i, j, i2, i3, i4, time, z);
    }

    private NetworkClient(MetadataUpdater metadataUpdater, Metadata metadata, Selectable selectable, String str, int i, long j, int i2, int i3, int i4, Time time, boolean z) {
        this.nodeApiVersions = new HashMap();
        this.nodesNeedingApiVersionsFetch = new HashSet();
        this.abortedSends = new LinkedList();
        if (metadataUpdater != null) {
            this.metadataUpdater = metadataUpdater;
        } else {
            if (metadata == null) {
                throw new IllegalArgumentException("`metadata` must not be null");
            }
            this.metadataUpdater = new DefaultMetadataUpdater(metadata);
        }
        this.selector = selectable;
        this.clientId = str;
        this.inFlightRequests = new InFlightRequests(i);
        this.connectionStates = new ClusterConnectionStates(j);
        this.socketSendBuffer = i2;
        this.socketReceiveBuffer = i3;
        this.correlation = 0;
        this.randOffset = new Random();
        this.requestTimeoutMs = i4;
        this.reconnectBackoffMs = j;
        this.time = time;
        this.discoverBrokerVersions = z;
    }

    @Override // org.apache.kafka.clients.KafkaClient
    public boolean ready(Node node, long j) {
        if (node.isEmpty()) {
            throw new IllegalArgumentException("Cannot connect to empty node " + node);
        }
        if (isReady(node, j)) {
            return true;
        }
        if (!this.connectionStates.canConnect(node.idString(), j)) {
            return false;
        }
        initiateConnect(node, j);
        return false;
    }

    @Override // org.apache.kafka.clients.KafkaClient
    public void close(String str) {
        this.selector.close(str);
        for (InFlightRequest inFlightRequest : this.inFlightRequests.clearAll(str)) {
            if (inFlightRequest.isInternalRequest && inFlightRequest.header.apiKey() == ApiKeys.METADATA.id) {
                this.metadataUpdater.handleDisconnection(inFlightRequest.destination);
            }
        }
        this.connectionStates.remove(str);
    }

    @Override // org.apache.kafka.clients.KafkaClient
    public long connectionDelay(Node node, long j) {
        return this.connectionStates.connectionDelay(node.idString(), j);
    }

    @Override // org.apache.kafka.clients.KafkaClient
    public boolean connectionFailed(Node node) {
        return this.connectionStates.connectionState(node.idString()).equals(ConnectionState.DISCONNECTED);
    }

    @Override // org.apache.kafka.clients.KafkaClient
    public boolean isReady(Node node, long j) {
        return !this.metadataUpdater.isUpdateDue(j) && canSendRequest(node.idString());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean canSendRequest(String str) {
        return this.connectionStates.isReady(str) && this.selector.isChannelReady(str) && this.inFlightRequests.canSendMore(str);
    }

    @Override // org.apache.kafka.clients.KafkaClient
    public void send(ClientRequest clientRequest, long j) {
        doSend(clientRequest, false, j);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void sendInternalMetadataRequest(MetadataRequest.Builder builder, String str, long j) {
        doSend(newClientRequest(str, builder, j, true), true, j);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v21, types: [java.lang.Object, org.apache.kafka.common.requests.AbstractRequest] */
    private void doSend(ClientRequest clientRequest, boolean z, long j) {
        String destination = clientRequest.destination();
        if (!z && !canSendRequest(destination)) {
            throw new IllegalStateException("Attempt to send a request to node " + destination + " which is not ready.");
        }
        AbstractRequest.Builder<?> requestBuilder = clientRequest.requestBuilder();
        try {
            NodeApiVersions nodeApiVersions = this.nodeApiVersions.get(destination);
            if (nodeApiVersions != null) {
                requestBuilder.setVersion(nodeApiVersions.usableVersion(clientRequest.apiKey()));
            } else if (this.discoverBrokerVersions && log.isTraceEnabled()) {
                log.trace("No version information found when sending message of type {} to node {}. Assuming version {}.", new Object[]{clientRequest.apiKey(), destination, Short.valueOf(requestBuilder.version())});
            }
            ?? build = requestBuilder.build();
            RequestHeader makeHeader = clientRequest.makeHeader();
            if (log.isDebugEnabled()) {
                if (makeHeader.apiVersion() == ProtoUtils.latestVersion(clientRequest.apiKey().id)) {
                    log.trace("Sending {} to node {}.", (Object) build, destination);
                } else {
                    log.debug("Using older server API v{} to send {} to node {}.", new Object[]{Short.valueOf(makeHeader.apiVersion()), build, destination});
                }
            }
            InFlightRequest inFlightRequest = new InFlightRequest(makeHeader, clientRequest.createdTimeMs(), clientRequest.destination(), clientRequest.callback(), clientRequest.expectResponse(), z, build.toSend(destination, makeHeader), j);
            this.inFlightRequests.add(inFlightRequest);
            this.selector.send(inFlightRequest.send);
        } catch (UnsupportedVersionException e) {
            log.debug("Version mismatch when attempting to send {} to {}", new Object[]{clientRequest.toString(), clientRequest.destination(), e});
            this.abortedSends.add(new ClientResponse(clientRequest.makeHeader(), clientRequest.callback(), clientRequest.destination(), j, j, false, e, null));
        }
    }

    @Override // org.apache.kafka.clients.KafkaClient
    public List<ClientResponse> poll(long j, long j2) {
        try {
            this.selector.poll(Utils.min(j, this.metadataUpdater.maybeUpdate(j2), this.requestTimeoutMs));
        } catch (IOException e) {
            log.error("Unexpected error during I/O", e);
        }
        long milliseconds = this.time.milliseconds();
        ArrayList arrayList = new ArrayList();
        handleAbortedSends(arrayList);
        handleCompletedSends(arrayList, milliseconds);
        handleCompletedReceives(arrayList, milliseconds);
        handleDisconnections(arrayList, milliseconds);
        handleConnections();
        handleInitiateApiVersionRequests(milliseconds);
        handleTimedOutRequests(arrayList, milliseconds);
        Iterator<ClientResponse> it = arrayList.iterator();
        while (it.hasNext()) {
            try {
                it.next().onComplete();
            } catch (Exception e2) {
                log.error("Uncaught error in request completion:", e2);
            }
        }
        return arrayList;
    }

    @Override // org.apache.kafka.clients.KafkaClient
    public int inFlightRequestCount() {
        return this.inFlightRequests.inFlightRequestCount();
    }

    @Override // org.apache.kafka.clients.KafkaClient
    public int inFlightRequestCount(String str) {
        return this.inFlightRequests.inFlightRequestCount(str);
    }

    @Override // org.apache.kafka.clients.KafkaClient
    public void wakeup() {
        this.selector.wakeup();
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        this.selector.close();
    }

    @Override // org.apache.kafka.clients.KafkaClient
    public Node leastLoadedNode(long j) {
        List<Node> fetchNodes = this.metadataUpdater.fetchNodes();
        int i = Integer.MAX_VALUE;
        Node node = null;
        int nextInt = this.randOffset.nextInt(fetchNodes.size());
        for (int i2 = 0; i2 < fetchNodes.size(); i2++) {
            Node node2 = fetchNodes.get((nextInt + i2) % fetchNodes.size());
            int inFlightRequestCount = this.inFlightRequests.inFlightRequestCount(node2.idString());
            if (inFlightRequestCount == 0 && this.connectionStates.isReady(node2.idString())) {
                log.trace("Found least loaded node {} connected with no in-flight requests", node2);
                return node2;
            }
            if (!this.connectionStates.isBlackedOut(node2.idString(), j) && inFlightRequestCount < i) {
                i = inFlightRequestCount;
                node = node2;
            } else if (log.isTraceEnabled()) {
                log.trace("Removing node {} from least loaded node selection: is-blacked-out: {}, in-flight-requests: {}", new Object[]{node2, Boolean.valueOf(this.connectionStates.isBlackedOut(node2.idString(), j)), Integer.valueOf(inFlightRequestCount)});
            }
        }
        if (node != null) {
            log.trace("Found least loaded node {}", node);
        } else {
            log.trace("Least loaded node selection failed to find an available node");
        }
        return node;
    }

    public static AbstractResponse parseResponse(ByteBuffer byteBuffer, RequestHeader requestHeader) {
        ResponseHeader parse = ResponseHeader.parse(byteBuffer);
        short apiKey = requestHeader.apiKey();
        Struct read = ProtoUtils.responseSchema(apiKey, requestHeader.apiVersion()).read(byteBuffer);
        correlate(requestHeader, parse);
        return AbstractResponse.getResponse(apiKey, read);
    }

    private void processDisconnection(List<ClientResponse> list, String str, long j) {
        this.connectionStates.disconnected(str, j);
        this.nodeApiVersions.remove(str);
        this.nodesNeedingApiVersionsFetch.remove(str);
        for (InFlightRequest inFlightRequest : this.inFlightRequests.clearAll(str)) {
            log.trace("Cancelled request {} due to node {} being disconnected", inFlightRequest, str);
            if (inFlightRequest.isInternalRequest && inFlightRequest.header.apiKey() == ApiKeys.METADATA.id) {
                this.metadataUpdater.handleDisconnection(inFlightRequest.destination);
            } else {
                list.add(inFlightRequest.disconnected(j));
            }
        }
    }

    private void handleTimedOutRequests(List<ClientResponse> list, long j) {
        List<String> nodesWithTimedOutRequests = this.inFlightRequests.getNodesWithTimedOutRequests(j, this.requestTimeoutMs);
        for (String str : nodesWithTimedOutRequests) {
            this.selector.close(str);
            log.debug("Disconnecting from node {} due to request timeout.", str);
            processDisconnection(list, str, j);
        }
        if (nodesWithTimedOutRequests.isEmpty()) {
            return;
        }
        this.metadataUpdater.requestUpdate();
    }

    private void handleAbortedSends(List<ClientResponse> list) {
        list.addAll(this.abortedSends);
        this.abortedSends.clear();
    }

    private void handleCompletedSends(List<ClientResponse> list, long j) {
        for (Send send : this.selector.completedSends()) {
            InFlightRequest lastSent = this.inFlightRequests.lastSent(send.destination());
            if (!lastSent.expectResponse) {
                this.inFlightRequests.completeLastSent(send.destination());
                list.add(lastSent.completed(null, j));
            }
        }
    }

    private void handleCompletedReceives(List<ClientResponse> list, long j) {
        for (NetworkReceive networkReceive : this.selector.completedReceives()) {
            InFlightRequest completeNext = this.inFlightRequests.completeNext(networkReceive.source());
            AbstractResponse parseResponse = parseResponse(networkReceive.payload(), completeNext.header);
            log.trace("Completed receive from node {}, for key {}, received {}", new Object[]{completeNext.destination, Short.valueOf(completeNext.header.apiKey()), parseResponse});
            if (completeNext.isInternalRequest && (parseResponse instanceof MetadataResponse)) {
                this.metadataUpdater.handleCompletedMetadataResponse(completeNext.header, j, (MetadataResponse) parseResponse);
            } else if (completeNext.isInternalRequest && (parseResponse instanceof ApiVersionsResponse)) {
                handleApiVersionsResponse(list, completeNext, j, (ApiVersionsResponse) parseResponse);
            } else {
                list.add(completeNext.completed(parseResponse, j));
            }
        }
    }

    private void handleApiVersionsResponse(List<ClientResponse> list, InFlightRequest inFlightRequest, long j, ApiVersionsResponse apiVersionsResponse) {
        String str = inFlightRequest.destination;
        if (apiVersionsResponse.errorCode() != Errors.NONE.code()) {
            log.warn("Node {} got error {} when making an ApiVersionsRequest.  Disconnecting.", str, Errors.forCode(apiVersionsResponse.errorCode()));
            this.selector.close(str);
            processDisconnection(list, str, j);
        } else {
            NodeApiVersions nodeApiVersions = new NodeApiVersions(apiVersionsResponse.apiVersions());
            this.nodeApiVersions.put(str, nodeApiVersions);
            this.connectionStates.ready(str);
            if (log.isDebugEnabled()) {
                log.debug("Recorded API versions for node {}: {}", str, nodeApiVersions);
            }
        }
    }

    private void handleDisconnections(List<ClientResponse> list, long j) {
        for (String str : this.selector.disconnected()) {
            log.debug("Node {} disconnected.", str);
            processDisconnection(list, str, j);
        }
        if (this.selector.disconnected().size() > 0) {
            this.metadataUpdater.requestUpdate();
        }
    }

    private void handleConnections() {
        for (String str : this.selector.connected()) {
            if (this.discoverBrokerVersions) {
                this.connectionStates.checkingApiVersions(str);
                this.nodesNeedingApiVersionsFetch.add(str);
                log.debug("Completed connection to node {}.  Fetching API versions.", str);
            } else {
                this.connectionStates.ready(str);
                log.debug("Completed connection to node {}.  Ready.", str);
            }
        }
    }

    private void handleInitiateApiVersionRequests(long j) {
        Iterator<String> it = this.nodesNeedingApiVersionsFetch.iterator();
        while (it.hasNext()) {
            String next = it.next();
            if (this.selector.isChannelReady(next) && this.inFlightRequests.canSendMore(next)) {
                log.debug("Initiating API versions fetch from node {}.", next);
                doSend(newClientRequest(next, new ApiVersionsRequest.Builder(), j, true), true, j);
                it.remove();
            }
        }
    }

    private static void correlate(RequestHeader requestHeader, ResponseHeader responseHeader) {
        if (requestHeader.correlationId() != responseHeader.correlationId()) {
            throw new IllegalStateException("Correlation id for response (" + responseHeader.correlationId() + ") does not match request (" + requestHeader.correlationId() + "), request header: " + requestHeader);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void initiateConnect(Node node, long j) {
        String idString = node.idString();
        try {
            log.debug("Initiating connection to node {} at {}:{}.", new Object[]{Integer.valueOf(node.id()), node.host(), Integer.valueOf(node.port())});
            this.connectionStates.connecting(idString, j);
            this.selector.connect(idString, new InetSocketAddress(node.host(), node.port()), this.socketSendBuffer, this.socketReceiveBuffer);
        } catch (IOException e) {
            this.connectionStates.disconnected(idString, j);
            this.metadataUpdater.requestUpdate();
            log.debug("Error connecting to node {} at {}:{}:", new Object[]{Integer.valueOf(node.id()), node.host(), Integer.valueOf(node.port()), e});
        }
    }

    @Override // org.apache.kafka.clients.KafkaClient
    public ClientRequest newClientRequest(String str, AbstractRequest.Builder<?> builder, long j, boolean z) {
        return newClientRequest(str, builder, j, z, null);
    }

    @Override // org.apache.kafka.clients.KafkaClient
    public ClientRequest newClientRequest(String str, AbstractRequest.Builder<?> builder, long j, boolean z, RequestCompletionHandler requestCompletionHandler) {
        int i = this.correlation;
        this.correlation = i + 1;
        return new ClientRequest(str, builder, i, this.clientId, j, z, requestCompletionHandler);
    }

    public boolean discoverBrokerVersions() {
        return this.discoverBrokerVersions;
    }
}
