package org.apache.kafka.common.network;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.nio.channels.UnresolvedAddressException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Count;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:META-INF/repository/fuse-eap-distro-6.3.0.redhat-309.zip:modules/system/layers/fuse/org/apache/kafka/clients/main/kafka-clients-0.10.2.0.jar:org/apache/kafka/common/network/Selector.class */
public class Selector implements Selectable {
    // Idle-timeout value meaning "never expire": the constructor creates no IdleExpiryManager for any negative timeout.
    public static final long NO_IDLE_TIMEOUT_MS = -1;
    private static final Logger log = LoggerFactory.getLogger(Selector.class);
    // Underlying NIO selector that all socket channels are registered with.
    private final java.nio.channels.Selector nioSelector;
    // Active connections, keyed by connection id.
    private final Map<String, KafkaChannel> channels;
    // Results exposed through completedSends()/completedReceives(); emptied by clear() at the start of each poll().
    private final List<Send> completedSends;
    private final List<NetworkReceive> completedReceives;
    // Receives queued per channel by addToStagedReceives() until addToCompletedReceives() moves them to completedReceives.
    private final Map<KafkaChannel, Deque<NetworkReceive>> stagedReceives;
    // Keys whose SocketChannel.connect() returned true inside connect(); processed on the next poll().
    private final Set<SelectionKey> immediatelyConnectedKeys;
    // Channels that were disconnected while they still had staged receives; clear() closes them later.
    private final Map<String, KafkaChannel> closingChannels;
    // Connection ids reported through disconnected()/connected(); emptied by clear() at the start of each poll().
    private final List<String> disconnected;
    private final List<String> connected;
    // Destinations whose send() failed; clear() moves them into 'disconnected' on the next poll().
    private final List<String> failedSends;
    // Clock used for metric timestamps and idle tracking.
    private final Time time;
    private final SelectorMetrics sensors;
    // Prefix for metric group names ("<prefix>-metrics", "<prefix>-node-metrics").
    private final String metricGrpPrefix;
    // Tags attached to every metric registered by this selector.
    private final Map<String, String> metricTags;
    // Factory that builds the KafkaChannel for each registered selection key.
    private final ChannelBuilder channelBuilder;
    // Passed unchanged to ChannelBuilder.buildChannel(); the convenience constructor passes -1.
    private final int maxReceiveSize;
    // When false, SelectorMetrics registers no per-node sensors.
    private final boolean metricsPerConnection;
    // Null when the idle timeout passed to the constructor is negative.
    private final IdleExpiryManager idleExpiryManager;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:META-INF/repository/fuse-eap-distro-6.3.0.redhat-309.zip:modules/system/layers/fuse/org/apache/kafka/clients/main/kafka-clients-0.10.2.0.jar:org/apache/kafka/common/network/Selector$IdleExpiryManager.class */
    /**
     * Tracks the last-activity time of each connection and reports the least recently active one
     * once it has been idle for longer than the configured maximum.
     *
     * <p>Connections are kept in an access-ordered {@link LinkedHashMap}, so the first entry is
     * always the one that was updated longest ago.
     */
    public static class IdleExpiryManager {
        /** Connection id to last-activity time in nanoseconds, least recently updated first. */
        private final Map<String, Long> lruConnections = new LinkedHashMap<>(16, 0.75f, true);
        private final long connectionsMaxIdleNanos;
        /** No expiry check is performed until the supplied time exceeds this value. */
        private long nextIdleCloseCheckTime;

        /**
         * @param time clock used to schedule the first expiry check
         * @param j    maximum idle time in milliseconds
         */
        public IdleExpiryManager(Time time, long j) {
            this.connectionsMaxIdleNanos = j * 1000 * 1000;
            this.nextIdleCloseCheckTime = time.nanoseconds() + this.connectionsMaxIdleNanos;
        }

        /**
         * Records activity on a connection, making it the most recently used entry.
         *
         * @param str connection id
         * @param j   activity time in nanoseconds
         */
        public void update(String str, long j) {
            this.lruConnections.put(str, j);
        }

        /**
         * Returns the least recently active connection if it has been idle for too long.
         *
         * @param j current time in nanoseconds
         * @return the expired entry (connection id and last-activity nanos), or {@code null} when no
         *         check is due yet, nothing is tracked, or the oldest connection has not expired
         */
        public Map.Entry<String, Long> pollExpiredConnection(long j) {
            if (j <= this.nextIdleCloseCheckTime) {
                return null;
            }
            if (this.lruConnections.isEmpty()) {
                // Nothing to expire; look again one full idle period from now.
                this.nextIdleCloseCheckTime = j + this.connectionsMaxIdleNanos;
                return null;
            }
            Map.Entry<String, Long> oldest = this.lruConnections.entrySet().iterator().next();
            long lastActiveNanos = oldest.getValue();
            // The next check is due when the oldest connection would reach its idle limit.
            this.nextIdleCloseCheckTime = lastActiveNanos + this.connectionsMaxIdleNanos;
            return j > this.nextIdleCloseCheckTime ? oldest : null;
        }

        /**
         * Stops tracking a connection.
         *
         * @param str connection id
         */
        public void remove(String str) {
            this.lruConnections.remove(str);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:META-INF/repository/fuse-eap-distro-6.3.0.redhat-309.zip:modules/system/layers/fuse/org/apache/kafka/clients/main/kafka-clients-0.10.2.0.jar:org/apache/kafka/common/network/Selector$SelectorMetrics.class */
    /**
     * Owns the sensors and metrics of the enclosing {@link Selector}: selector-wide sensors created
     * in the constructor and, when per-connection metrics are enabled, per-node sensors created on
     * demand. Everything registered here is removed again by {@link #close()}.
     */
    public class SelectorMetrics {
        private final Metrics metrics;
        public final Sensor connectionClosed;
        public final Sensor connectionCreated;
        public final Sensor bytesTransferred;
        public final Sensor bytesSent;
        public final Sensor bytesReceived;
        public final Sensor selectTime;
        public final Sensor ioTime;
        /** Metrics added directly to the registry (not through a sensor); removed in close(). */
        private final List<MetricName> topLevelMetricNames = new ArrayList<>();
        /** Every sensor created through {@link #sensor(String, Sensor...)}; removed in close(). */
        private final List<Sensor> sensors = new ArrayList<>();

        /**
         * Registers the selector-wide sensors and the connection-count gauge.
         *
         * @param metrics registry that receives the sensors and metrics
         */
        public SelectorMetrics(Metrics metrics) {
            this.metrics = metrics;
            final Map<String, String> tags = Selector.this.metricTags;
            final String groupName = Selector.this.metricGrpPrefix + "-metrics";

            // Sensor names are suffixed with the concatenated "key-value" tag pairs.
            StringBuilder tagsSuffix = new StringBuilder();
            for (Map.Entry<String, String> tag : tags.entrySet()) {
                tagsSuffix.append(tag.getKey());
                tagsSuffix.append("-");
                tagsSuffix.append(tag.getValue());
            }
            final String suffix = tagsSuffix.toString();

            this.connectionClosed = sensor("connections-closed:" + suffix);
            this.connectionClosed.add(metrics.metricName("connection-close-rate", groupName, "Connections closed per second in the window.", tags), new Rate());

            this.connectionCreated = sensor("connections-created:" + suffix);
            this.connectionCreated.add(metrics.metricName("connection-creation-rate", groupName, "New connections established per second in the window.", tags), new Rate());

            this.bytesTransferred = sensor("bytes-sent-received:" + suffix);
            this.bytesTransferred.add(metrics.metricName("network-io-rate", groupName, "The average number of network operations (reads or writes) on all connections per second.", tags), new Rate(new Count()));

            // bytesSent and bytesReceived are created with bytesTransferred as their parent sensor.
            this.bytesSent = sensor("bytes-sent:" + suffix, this.bytesTransferred);
            this.bytesSent.add(metrics.metricName("outgoing-byte-rate", groupName, "The average number of outgoing bytes sent per second to all servers.", tags), new Rate());
            this.bytesSent.add(metrics.metricName("request-rate", groupName, "The average number of requests sent per second.", tags), new Rate(new Count()));
            this.bytesSent.add(metrics.metricName("request-size-avg", groupName, "The average size of all requests in the window..", tags), new Avg());
            this.bytesSent.add(metrics.metricName("request-size-max", groupName, "The maximum size of any request sent in the window.", tags), new Max());

            this.bytesReceived = sensor("bytes-received:" + suffix, this.bytesTransferred);
            this.bytesReceived.add(metrics.metricName("incoming-byte-rate", groupName, "Bytes/second read off all sockets", tags), new Rate());
            this.bytesReceived.add(metrics.metricName("response-rate", groupName, "Responses received sent per second.", tags), new Rate(new Count()));

            this.selectTime = sensor("select-time:" + suffix);
            this.selectTime.add(metrics.metricName("select-rate", groupName, "Number of times the I/O layer checked for new I/O to perform per second", tags), new Rate(new Count()));
            this.selectTime.add(metrics.metricName("io-wait-time-ns-avg", groupName, "The average length of time the I/O thread spent waiting for a socket ready for reads or writes in nanoseconds.", tags), new Avg());
            this.selectTime.add(metrics.metricName("io-wait-ratio", groupName, "The fraction of time the I/O thread spent waiting.", tags), new Rate(TimeUnit.NANOSECONDS));

            this.ioTime = sensor("io-time:" + suffix);
            this.ioTime.add(metrics.metricName("io-time-ns-avg", groupName, "The average length of time for I/O per select call in nanoseconds.", tags), new Avg());
            this.ioTime.add(metrics.metricName("io-ratio", groupName, "The fraction of time the I/O thread spent doing I/O", tags), new Rate(TimeUnit.NANOSECONDS));

            // Gauge that reads the live size of the selector's channel map.
            MetricName connectionCount = metrics.metricName("connection-count", groupName, "The current number of active connections.", tags);
            this.topLevelMetricNames.add(connectionCount);
            this.metrics.addMetric(connectionCount, new Measurable() { // from class: org.apache.kafka.common.network.Selector.SelectorMetrics.1
                @Override // org.apache.kafka.common.metrics.Measurable
                public double measure(MetricConfig metricConfig, long j) {
                    return Selector.this.channels.size();
                }
            });
        }

        /** Obtains a sensor from the registry and remembers it so close() can remove it. */
        private Sensor sensor(String str, Sensor... sensorArr) {
            Sensor created = this.metrics.sensor(str, sensorArr);
            this.sensors.add(created);
            return created;
        }

        /**
         * Registers the per-node sensors (bytes-sent, bytes-received, latency) for a connection
         * unless they already exist. Does nothing for an empty id or when per-connection metrics
         * are disabled.
         *
         * @param str connection id
         */
        public void maybeRegisterConnectionMetrics(String str) {
            if (str.isEmpty() || !Selector.this.metricsPerConnection) {
                return;
            }
            // The bytes-sent sensor stands in for the whole set: all three are registered together.
            String bytesSentName = "node-" + str + ".bytes-sent";
            if (this.metrics.getSensor(bytesSentName) != null) {
                return;
            }
            String groupName = Selector.this.metricGrpPrefix + "-node-metrics";
            Map<String, String> tags = new LinkedHashMap<>(Selector.this.metricTags);
            tags.put("node-id", "node-" + str);

            Sensor nodeSent = sensor(bytesSentName);
            nodeSent.add(this.metrics.metricName("outgoing-byte-rate", groupName, tags), new Rate());
            nodeSent.add(this.metrics.metricName("request-rate", groupName, "The average number of requests sent per second.", tags), new Rate(new Count()));
            nodeSent.add(this.metrics.metricName("request-size-avg", groupName, "The average size of all requests in the window..", tags), new Avg());
            nodeSent.add(this.metrics.metricName("request-size-max", groupName, "The maximum size of any request sent in the window.", tags), new Max());

            Sensor nodeReceived = sensor("node-" + str + ".bytes-received");
            nodeReceived.add(this.metrics.metricName("incoming-byte-rate", groupName, tags), new Rate());
            nodeReceived.add(this.metrics.metricName("response-rate", groupName, "The average number of responses received per second.", tags), new Rate(new Count()));

            Sensor nodeLatency = sensor("node-" + str + ".latency");
            nodeLatency.add(this.metrics.metricName("request-latency-avg", groupName, tags), new Avg());
            nodeLatency.add(this.metrics.metricName("request-latency-max", groupName, tags), new Max());
        }

        /**
         * Records sent bytes on the selector-wide sensor and, if it exists, the per-node sensor.
         *
         * @param str connection id; an empty id skips the per-node sensor
         * @param j   number of bytes sent
         */
        public void recordBytesSent(String str, long j) {
            long nowMs = Selector.this.time.milliseconds();
            this.bytesSent.record(j, nowMs);
            if (str.isEmpty()) {
                return;
            }
            Sensor nodeSent = this.metrics.getSensor("node-" + str + ".bytes-sent");
            if (nodeSent != null) {
                nodeSent.record(j, nowMs);
            }
        }

        /**
         * Records received bytes on the selector-wide sensor and, if it exists, the per-node sensor.
         *
         * @param str connection id; an empty id skips the per-node sensor
         * @param i   number of bytes received
         */
        public void recordBytesReceived(String str, int i) {
            long nowMs = Selector.this.time.milliseconds();
            this.bytesReceived.record(i, nowMs);
            if (str.isEmpty()) {
                return;
            }
            Sensor nodeReceived = this.metrics.getSensor("node-" + str + ".bytes-received");
            if (nodeReceived != null) {
                nodeReceived.record(i, nowMs);
            }
        }

        /** Removes every metric and sensor this instance registered. */
        public void close() {
            for (MetricName metricName : this.topLevelMetricNames) {
                this.metrics.removeMetric(metricName);
            }
            for (Sensor registered : this.sensors) {
                this.metrics.removeSensor(registered.name());
            }
        }
    }

    /**
     * Creates a selector and opens its underlying NIO selector.
     *
     * @param i              maximum receive size handed to the channel builder for every channel
     * @param j              idle timeout in milliseconds; a negative value disables idle expiry
     * @param metrics        registry that receives this selector's sensors and metrics
     * @param time           clock used for metric timestamps and idle tracking
     * @param str            prefix for metric group names
     * @param map            tags attached to every metric
     * @param z              whether per-connection (per-node) sensors are registered
     * @param channelBuilder factory for the {@link KafkaChannel} of each connection
     * @throws KafkaException if the NIO selector cannot be opened
     */
    public Selector(int i, long j, Metrics metrics, Time time, String str, Map<String, String> map, boolean z, ChannelBuilder channelBuilder) {
        try {
            this.nioSelector = java.nio.channels.Selector.open();
        } catch (IOException e) {
            throw new KafkaException(e);
        }
        this.maxReceiveSize = i;
        this.time = time;
        this.metricGrpPrefix = str;
        this.metricTags = map;
        this.channels = new HashMap<>();
        this.completedSends = new ArrayList<>();
        this.completedReceives = new ArrayList<>();
        this.stagedReceives = new HashMap<>();
        this.immediatelyConnectedKeys = new HashSet<>();
        this.closingChannels = new HashMap<>();
        this.connected = new ArrayList<>();
        this.disconnected = new ArrayList<>();
        this.failedSends = new ArrayList<>();
        // SelectorMetrics reads metricGrpPrefix, metricTags and channels, so it is built after them.
        this.sensors = new SelectorMetrics(metrics);
        this.channelBuilder = channelBuilder;
        this.metricsPerConnection = z;
        this.idleExpiryManager = j < 0 ? null : new IdleExpiryManager(time, j);
    }

    /**
     * Convenience constructor: passes -1 as the maximum receive size, uses no metric tags and
     * enables per-connection metrics.
     *
     * @param j              idle timeout in milliseconds; a negative value disables idle expiry
     * @param metrics        registry that receives this selector's sensors and metrics
     * @param time           clock used for metric timestamps and idle tracking
     * @param str            prefix for metric group names
     * @param channelBuilder factory for the {@link KafkaChannel} of each connection
     */
    public Selector(long j, Metrics metrics, Time time, String str, ChannelBuilder channelBuilder) {
        this(-1, j, metrics, time, str, new HashMap<String, String>(), true, channelBuilder);
    }

    /**
     * Starts a non-blocking connection to the given address and registers it under the given id.
     * If the connection completes immediately, the key is queued in
     * {@code immediatelyConnectedKeys} so the next {@code poll()} can finish it.
     *
     * <p>On any failure after the socket has been opened, the socket is closed and partial
     * bookkeeping is undone before the exception propagates.
     *
     * @param str               connection id
     * @param inetSocketAddress address to connect to
     * @param i                 socket send buffer size, or -1 to keep the default
     * @param i2                socket receive buffer size, or -1 to keep the default
     * @throws IllegalStateException if a connection with this id already exists
     * @throws IOException           if the address cannot be resolved or the connection cannot be initiated
     */
    @Override // org.apache.kafka.common.network.Selectable
    public void connect(String str, InetSocketAddress inetSocketAddress, int i, int i2) throws IOException {
        if (this.channels.containsKey(str)) {
            throw new IllegalStateException("There is already a connection for id " + str);
        }
        SocketChannel socketChannel = SocketChannel.open();
        SelectionKey key = null;
        try {
            socketChannel.configureBlocking(false);
            Socket socket = socketChannel.socket();
            socket.setKeepAlive(true);
            if (i != -1) {
                socket.setSendBufferSize(i);
            }
            if (i2 != -1) {
                socket.setReceiveBufferSize(i2);
            }
            socket.setTcpNoDelay(true);

            boolean connectedImmediately = socketChannel.connect(inetSocketAddress);
            key = socketChannel.register(this.nioSelector, SelectionKey.OP_CONNECT);
            KafkaChannel channel = this.channelBuilder.buildChannel(str, key, this.maxReceiveSize);
            key.attach(channel);
            this.channels.put(str, channel);
            if (connectedImmediately) {
                // No OP_CONNECT event will fire for this key, so hand it to poll() directly.
                log.debug("Immediately connected to node {}", channel.id());
                this.immediatelyConnectedKeys.add(key);
                key.interestOps(0);
            }
        } catch (UnresolvedAddressException e) {
            IOException wrapped = new IOException("Can't resolve address: " + inetSocketAddress, e);
            abortConnect(str, socketChannel, key, wrapped);
            throw wrapped;
        } catch (IOException | RuntimeException e) {
            abortConnect(str, socketChannel, key, e);
            throw e;
        }
    }

    /**
     * Undoes a partially completed connect(): removes the id and key from the selector's
     * bookkeeping and closes the socket. A failure to close is attached to {@code cause} as a
     * suppressed exception so the original error is the one that propagates.
     */
    private void abortConnect(String id, SocketChannel socketChannel, SelectionKey key, Exception cause) {
        // Safe to remove: connect() verified that no entry existed for this id before it started.
        this.channels.remove(id);
        if (key != null) {
            this.immediatelyConnectedKeys.remove(key);
        }
        try {
            socketChannel.close();
        } catch (IOException closeFailure) {
            cause.addSuppressed(closeFailure);
        }
    }

    /**
     * Registers an existing socket channel with this selector for read events and tracks it
     * under the given id.
     *
     * @param str           connection id
     * @param socketChannel channel to register
     * @throws ClosedChannelException if the channel is closed
     */
    public void register(String str, SocketChannel socketChannel) throws ClosedChannelException {
        SelectionKey key = socketChannel.register(this.nioSelector, SelectionKey.OP_READ);
        KafkaChannel channel;
        try {
            channel = this.channelBuilder.buildChannel(str, key, this.maxReceiveSize);
        } catch (RuntimeException e) {
            // Do not leave a registered key without an attachment behind; cancel it before rethrowing.
            key.cancel();
            throw e;
        }
        key.attach(channel);
        this.channels.put(str, channel);
    }

    /** Wakes up the underlying NIO selector. */
    @Override // org.apache.kafka.common.network.Selectable
    public void wakeup() {
        nioSelector.wakeup();
    }

    /**
     * Closes every active connection, then the NIO selector, the metrics and the channel builder.
     * The selector, metrics and channel builder are released even if closing a connection throws.
     */
    @Override // org.apache.kafka.common.network.Selectable
    public void close() {
        // Copy the ids first: close(String) removes entries from 'channels' while we iterate.
        List<String> connectionIds = new ArrayList<>(this.channels.keySet());
        try {
            for (String connectionId : connectionIds) {
                close(connectionId);
            }
        } finally {
            try {
                this.nioSelector.close();
            } catch (IOException | SecurityException e) {
                log.error("Exception closing nioSelector:", e);
            } finally {
                try {
                    this.sensors.close();
                } finally {
                    this.channelBuilder.close();
                }
            }
        }
    }

    /**
     * Queues a send on the channel for its destination. If the destination is already closing,
     * or its selection key has been cancelled, the destination is recorded in {@code failedSends}
     * instead (and in the cancelled-key case the channel is closed).
     *
     * @param send the request to send
     * @throws IllegalStateException if there is no connection for the destination
     */
    @Override // org.apache.kafka.common.network.Selectable
    public void send(Send send) {
        final String connectionId = send.destination();
        if (!closingChannels.containsKey(connectionId)) {
            KafkaChannel channel = channelOrFail(connectionId, false);
            try {
                channel.setSend(send);
            } catch (CancelledKeyException e) {
                failedSends.add(connectionId);
                close(channel, false);
            }
        } else {
            failedSends.add(connectionId);
        }
    }

    /**
     * Runs one round of I/O: resets the per-poll results, selects ready keys, processes them,
     * promotes staged receives to completed receives, and closes at most one idle connection.
     *
     * @param j maximum time to block in select, in milliseconds; must be &gt;= 0
     * @throws IllegalArgumentException if the timeout is negative
     * @throws IOException              if the select call fails
     */
    @Override // org.apache.kafka.common.network.Selectable
    public void poll(long j) throws IOException {
        if (j < 0) {
            throw new IllegalArgumentException("timeout should be >= 0");
        }
        clear();

        // Do not block when there is already work waiting from a previous round.
        boolean workPending = hasStagedReceives() || !immediatelyConnectedKeys.isEmpty();
        long timeout = workPending ? 0 : j;

        long selectStartNanos = time.nanoseconds();
        int readyKeys = select(timeout);
        long selectEndNanos = time.nanoseconds();
        sensors.selectTime.record(selectEndNanos - selectStartNanos, time.milliseconds());

        if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
            pollSelectionKeys(nioSelector.selectedKeys(), false, selectEndNanos);
            pollSelectionKeys(immediatelyConnectedKeys, true, selectEndNanos);
        }
        addToCompletedReceives();

        long ioEndNanos = time.nanoseconds();
        sensors.ioTime.record(ioEndNanos - selectEndNanos, time.milliseconds());
        maybeCloseOldestConnection(selectEndNanos);
    }

    /**
     * Handles the pending I/O for every key in the given collection: finishes connections,
     * continues channel preparation, stages completed reads, records completed writes and closes
     * channels whose key is no longer valid or whose processing threw.
     *
     * <p>NOTE(review): the decompiler could not recover this method; the previous body
     * unconditionally threw {@code UnsupportedOperationException}, which made every
     * {@code poll()} with ready keys fail. This body is a reconstruction based on the helpers and
     * state in this class that exist only to serve it (staged receives, immediately-connected
     * keys, per-connection metrics, idle tracking) and on the KafkaChannel API, which is not
     * visible in this file. It must be verified against the original bytecode before release.
     *
     * @param selectionKeys          keys to process; each key is removed from the collection as it is handled
     * @param isImmediatelyConnected true when the keys come from {@code immediatelyConnectedKeys}
     * @param currentTimeNanos       time recorded as the channels' last activity
     */
    private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys, boolean isImmediatelyConnected, long currentTimeNanos) {
        Iterator<SelectionKey> iterator = selectionKeys.iterator();
        while (iterator.hasNext()) {
            SelectionKey key = iterator.next();
            iterator.remove();
            KafkaChannel channel = channel(key);

            // Register all per-connection metrics at once and note the activity for idle tracking.
            this.sensors.maybeRegisterConnectionMetrics(channel.id());
            if (this.idleExpiryManager != null) {
                this.idleExpiryManager.update(channel.id(), currentTimeNanos);
            }

            try {
                // Complete connections that finished their handshake, normally or immediately.
                if (isImmediatelyConnected || key.isConnectable()) {
                    if (!channel.finishConnect()) {
                        continue;
                    }
                    this.connected.add(channel.id());
                    this.sensors.connectionCreated.record();
                    SocketChannel socketChannel = (SocketChannel) key.channel();
                    log.debug("Created socket with SO_RCVBUF = {}, SO_SNDBUF = {}, SO_TIMEOUT = {} to node {}",
                            socketChannel.socket().getReceiveBufferSize(),
                            socketChannel.socket().getSendBufferSize(),
                            socketChannel.socket().getSoTimeout(),
                            channel.id());
                }

                // A connected channel that is not ready yet still has preparation work to do.
                if (channel.isConnected() && !channel.ready()) {
                    channel.prepare();
                }

                // Read only when nothing is staged for this channel, so receives stay in order.
                if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
                    NetworkReceive networkReceive;
                    while ((networkReceive = channel.read()) != null) {
                        addToStagedReceives(channel, networkReceive);
                    }
                }

                // Write when the socket has room; a non-null result means a send completed.
                if (channel.ready() && key.isWritable()) {
                    Send send = channel.write();
                    if (send != null) {
                        this.completedSends.add(send);
                        this.sensors.recordBytesSent(channel.id(), send.size());
                    }
                }

                // Cancel any defunct sockets.
                if (!key.isValid()) {
                    close(channel, true);
                }
            } catch (Exception e) {
                String description = channel.socketDescription();
                if (e instanceof IOException) {
                    log.debug("Connection with {} disconnected", description, e);
                } else {
                    log.warn("Unexpected error from {}; closing connection", description, e);
                }
                close(channel, true);
            }
        }
    }

    /** @return the live list of sends completed since the last {@code clear()}; not a copy */
    @Override // org.apache.kafka.common.network.Selectable
    public List<Send> completedSends() {
        return completedSends;
    }

    /** @return the live list of receives completed since the last {@code clear()}; not a copy */
    @Override // org.apache.kafka.common.network.Selectable
    public List<NetworkReceive> completedReceives() {
        return completedReceives;
    }

    /** @return the live list of connection ids recorded as disconnected since the last {@code clear()}; not a copy */
    @Override // org.apache.kafka.common.network.Selectable
    public List<String> disconnected() {
        return disconnected;
    }

    /** @return the live list of connection ids recorded as connected since the last {@code clear()}; not a copy */
    @Override // org.apache.kafka.common.network.Selectable
    public List<String> connected() {
        return connected;
    }

    /**
     * Mutes the channel with the given id, looking in both active and closing channels.
     *
     * @param str connection id
     * @throws IllegalStateException if no such channel exists
     */
    @Override // org.apache.kafka.common.network.Selectable
    public void mute(String str) {
        KafkaChannel target = channelOrFail(str, true);
        mute(target);
    }

    /** Delegates to {@code KafkaChannel.mute()}. */
    private void mute(KafkaChannel kafkaChannel) {
        kafkaChannel.mute();
    }

    /**
     * Unmutes the channel with the given id, looking in both active and closing channels.
     *
     * @param str connection id
     * @throws IllegalStateException if no such channel exists
     */
    @Override // org.apache.kafka.common.network.Selectable
    public void unmute(String str) {
        KafkaChannel target = channelOrFail(str, true);
        unmute(target);
    }

    /** Delegates to {@code KafkaChannel.unmute()}. */
    private void unmute(KafkaChannel kafkaChannel) {
        kafkaChannel.unmute();
    }

    /** Mutes every active channel (closing channels are not touched). */
    @Override // org.apache.kafka.common.network.Selectable
    public void muteAll() {
        for (KafkaChannel channel : channels.values()) {
            mute(channel);
        }
    }

    /** Unmutes every active channel (closing channels are not touched). */
    @Override // org.apache.kafka.common.network.Selectable
    public void unmuteAll() {
        for (KafkaChannel channel : channels.values()) {
            unmute(channel);
        }
    }

    /**
     * Closes the least recently active connection if the idle-expiry manager reports it as
     * expired. At most one connection is closed per call. Does nothing when idle expiry is
     * disabled.
     *
     * @param j current time in nanoseconds
     */
    private void maybeCloseOldestConnection(long j) {
        if (this.idleExpiryManager == null) {
            return;
        }
        Map.Entry<String, Long> expired = this.idleExpiryManager.pollExpiredConnection(j);
        if (expired == null) {
            return;
        }
        String connectionId = expired.getKey();
        KafkaChannel channel = this.channels.get(connectionId);
        if (channel == null) {
            // Stale entry: the connection is already gone. Drop it, otherwise it stays the oldest
            // entry forever and prevents any other idle connection from being expired.
            this.idleExpiryManager.remove(connectionId);
            return;
        }
        if (log.isTraceEnabled()) {
            long idleMillis = ((j - expired.getValue()) / 1000) / 1000;
            log.trace("About to close the idle connection from {} due to being idle for {} millis", connectionId, idleMillis);
        }
        close(channel, true);
    }

    /**
     * Resets the per-poll result lists. Closing channels are finally closed (and reported as
     * disconnected) once they have no staged receives left or a send to them has failed; any
     * remaining failed sends are then reported as disconnected.
     */
    private void clear() {
        completedSends.clear();
        completedReceives.clear();
        connected.clear();
        disconnected.clear();

        for (Iterator<KafkaChannel> closing = closingChannels.values().iterator(); closing.hasNext();) {
            KafkaChannel channel = closing.next();
            Deque<NetworkReceive> staged = stagedReceives.get(channel);
            // Always consume a matching failed-send entry, whether or not the channel is closed now.
            boolean sendFailed = failedSends.remove(channel.id());
            boolean drained = staged == null || staged.isEmpty();
            if (drained || sendFailed) {
                doClose(channel, true);
                closing.remove();
            }
        }

        disconnected.addAll(failedSends);
        failedSends.clear();
    }

    /**
     * Selects ready keys, without blocking when the timeout is zero.
     *
     * @param j timeout in milliseconds; must be &gt;= 0
     * @return the value returned by the underlying selectNow()/select(timeout) call
     * @throws IllegalArgumentException if the timeout is negative
     * @throws IOException              if the underlying select call fails
     */
    private int select(long j) throws IOException {
        if (j < 0) {
            throw new IllegalArgumentException("timeout should be >= 0");
        }
        if (j == 0) {
            return nioSelector.selectNow();
        }
        return nioSelector.select(j);
    }

    /**
     * Closes the active connection with the given id without reporting it as disconnected.
     * Unknown ids are ignored.
     *
     * @param str connection id
     */
    @Override // org.apache.kafka.common.network.Selectable
    public void close(String str) {
        KafkaChannel channel = channels.get(str);
        if (channel == null) {
            return;
        }
        close(channel, false);
    }

    /**
     * Disconnects a channel and stops tracking it as active.
     *
     * <p>When {@code z} is true and the channel still has staged receives, the channel is parked
     * in {@code closingChannels} so those receives can still be delivered; it is closed later by
     * {@code clear()}. Otherwise the channel is closed immediately.
     *
     * @param kafkaChannel channel to close
     * @param z            whether outstanding staged receives should still be processed and the
     *                     disconnect reported
     */
    private void close(KafkaChannel kafkaChannel, boolean z) {
        kafkaChannel.disconnect();

        Deque<NetworkReceive> staged = stagedReceives.get(kafkaChannel);
        boolean hasOutstanding = z && staged != null && !staged.isEmpty();
        if (hasOutstanding) {
            if (!kafkaChannel.isMute()) {
                // Deliver one staged receive right away; the rest follow on later polls.
                addToCompletedReceives(kafkaChannel, staged);
                if (staged.isEmpty()) {
                    stagedReceives.remove(kafkaChannel);
                }
            }
            closingChannels.put(kafkaChannel.id(), kafkaChannel);
        } else {
            doClose(kafkaChannel, z);
        }

        channels.remove(kafkaChannel.id());
        if (idleExpiryManager != null) {
            idleExpiryManager.remove(kafkaChannel.id());
        }
    }

    /**
     * Closes the channel, records the close on the metrics, and discards its staged receives.
     * An {@link IOException} from the channel is logged rather than propagated.
     *
     * @param kafkaChannel channel to close
     * @param z            whether to add the channel id to {@code disconnected}
     */
    private void doClose(KafkaChannel kafkaChannel, boolean z) {
        try {
            kafkaChannel.close();
        } catch (IOException e) {
            log.error("Exception closing connection to node {}:", kafkaChannel.id(), e);
        }
        sensors.connectionClosed.record();
        stagedReceives.remove(kafkaChannel);
        if (!z) {
            return;
        }
        disconnected.add(kafkaChannel.id());
    }

    /**
     * @param str connection id
     * @return true if an active channel with this id exists and reports itself ready
     */
    @Override // org.apache.kafka.common.network.Selectable
    public boolean isChannelReady(String str) {
        KafkaChannel channel = channels.get(str);
        if (channel == null) {
            return false;
        }
        return channel.ready();
    }

    /**
     * Looks up a channel by id.
     *
     * @param str connection id
     * @param z   whether closing channels are searched when no active channel matches
     * @return the matching channel, never null
     * @throws IllegalStateException if no matching channel is found
     */
    private KafkaChannel channelOrFail(String str, boolean z) {
        KafkaChannel found = channels.get(str);
        if (found != null) {
            return found;
        }
        if (z) {
            found = closingChannels.get(str);
        }
        if (found != null) {
            return found;
        }
        throw new IllegalStateException("Attempt to retrieve channel for which there is no connection. Connection id " + str + " existing connections " + channels.keySet());
    }

    /** @return a new list containing the currently active channels */
    public List<KafkaChannel> channels() {
        return new ArrayList<KafkaChannel>(this.channels.values());
    }

    /**
     * @param str connection id
     * @return the active channel with this id, or null if there is none
     */
    public KafkaChannel channel(String str) {
        return channels.get(str);
    }

    /**
     * @param str connection id
     * @return the closing channel with this id, or null if there is none
     */
    public KafkaChannel closingChannel(String str) {
        return closingChannels.get(str);
    }

    /** Returns the key's attachment cast to KafkaChannel; connect() and register() attach one to each key they create. */
    private KafkaChannel channel(SelectionKey selectionKey) {
        Object attachment = selectionKey.attachment();
        return (KafkaChannel) attachment;
    }

    /** @return true if the channel has an entry in {@code stagedReceives} */
    private boolean hasStagedReceive(KafkaChannel kafkaChannel) {
        return stagedReceives.containsKey(kafkaChannel);
    }

    /** @return true if at least one channel with staged receives is not muted */
    private boolean hasStagedReceives() {
        for (KafkaChannel channel : stagedReceives.keySet()) {
            if (!channel.isMute()) {
                return true;
            }
        }
        return false;
    }

    /**
     * Appends a receive to the channel's staging queue, creating the queue on first use.
     *
     * @param kafkaChannel   channel the receive was read from
     * @param networkReceive the receive to stage
     */
    private void addToStagedReceives(KafkaChannel kafkaChannel, NetworkReceive networkReceive) {
        Deque<NetworkReceive> staged = this.stagedReceives.get(kafkaChannel);
        if (staged == null) {
            staged = new ArrayDeque<>();
            this.stagedReceives.put(kafkaChannel, staged);
        }
        staged.add(networkReceive);
    }

    /**
     * Moves one staged receive per unmuted channel into {@code completedReceives} and drops
     * staging queues that become empty. Muted channels are skipped.
     */
    private void addToCompletedReceives() {
        if (stagedReceives.isEmpty()) {
            return;
        }
        Iterator<Map.Entry<KafkaChannel, Deque<NetworkReceive>>> entries = stagedReceives.entrySet().iterator();
        while (entries.hasNext()) {
            Map.Entry<KafkaChannel, Deque<NetworkReceive>> entry = entries.next();
            KafkaChannel channel = entry.getKey();
            if (channel.isMute()) {
                continue;
            }
            Deque<NetworkReceive> staged = entry.getValue();
            addToCompletedReceives(channel, staged);
            if (staged.isEmpty()) {
                entries.remove();
            }
        }
    }

    /**
     * Moves the oldest staged receive of a channel into {@code completedReceives} and records its
     * size on the bytes-received metrics. Does nothing if the queue is empty.
     *
     * @param kafkaChannel channel the receives belong to
     * @param deque        that channel's staging queue
     */
    private void addToCompletedReceives(KafkaChannel kafkaChannel, Deque<NetworkReceive> deque) {
        NetworkReceive receive = deque.poll();
        if (receive == null) {
            // Empty queue: avoid publishing a null receive and failing on the payload access below.
            return;
        }
        this.completedReceives.add(receive);
        this.sensors.recordBytesReceived(kafkaChannel.id(), receive.payload().limit());
    }
}
