package org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.disk;

import java.time.Duration;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiFunction;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
import org.apache.flink.runtime.io.network.buffer.CompositeBuffer;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.PartitionFileReader;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.NettyConnectionId;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.NettyConnectionWriter;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.NettyPayload;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.NettyServiceProducer;
import org.apache.flink.util.FatalExitExceptionHandler;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskIOScheduler.class */
public class DiskIOScheduler implements Runnable, BufferRecycler, NettyServiceProducer {
    private static final Logger LOG = LoggerFactory.getLogger(DiskIOScheduler.class);

    /** Identifier of the partition this scheduler reads; passed to the partition file reader. */
    private final TieredStoragePartitionId partitionId;

    /** Executor on which read rounds are submitted, either immediately or after a short delay. */
    private final ScheduledExecutorService ioExecutor;

    /** Pool that read buffers are requested from and recycled to. */
    private final BatchShuffleReadBufferPool bufferPool;

    /**
     * Read-ahead limit supplied at construction time. It is not referenced by any method that
     * survived decompilation; presumably the per-reader load loop consults it (TODO confirm).
     */
    private final int maxBufferReadAhead;

    /**
     * Upper bound checked in {@code triggerScheduling()}: a new round is only started while {@code
     * numRequestedBuffers} plus the pool's buffers-per-request does not exceed this value.
     */
    private final int maxRequestedBuffers;

    /** Added to the pool's last buffer operation timestamp to obtain the request deadline. */
    private final Duration bufferRequestTimeout;

    /**
     * Called with (subpartition id, next buffer index); a non-null result is used as the id of the
     * next segment to read, {@code null} means no segment is known yet.
     */
    private final BiFunction<Integer, Integer, Integer> firstBufferIndexInSegmentRetriever;

    /** Reader that supplies read priorities and is released together with this scheduler. */
    private final PartitionFileReader partitionFileReader;

    /** True from the moment a read round is submitted until that round has finished. */
    @GuardedBy("lock")
    private boolean isRunning;

    /** Increased by the number of buffers consumed per round, decreased on every recycle. */
    @GuardedBy("lock")
    private int numRequestedBuffers;

    /** Set once by {@code release()}; afterwards no readers are scheduled any more. */
    @GuardedBy("lock")
    private boolean isReleased;

    /** Monitor guarding the mutable scheduler state declared in this class. */
    private final Object lock = new Object();

    /** Readers of all currently established connections, keyed by their connection id. */
    @GuardedBy("lock")
    private final Map<NettyConnectionId, ScheduledSubpartitionReader> allScheduledReaders = new HashMap<>();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskIOScheduler$ScheduledSubpartitionReader.class */
    public class ScheduledSubpartitionReader implements Comparable<ScheduledSubpartitionReader> {
        // Subpartition whose data this reader loads.
        private final TieredStorageSubpartitionId subpartitionId;
        // Connection writer that receives every payload produced by this reader.
        private final NettyConnectionWriter nettyConnectionWriter;
        // Id of the segment to read next; a negative value means no segment has been resolved yet.
        private int nextSegmentId;
        // Index assigned to the next buffer payload written to the connection writer.
        private int nextBufferIndex;
        // Sort key recomputed by prepareForScheduling(); smaller values are ordered first.
        private long priority;
        // Set by failReader() so the connection writer is closed at most once.
        private boolean isFailed;

        // Passed to the partition file reader when computing the priority. It is never assigned
        // in the code that survived decompilation, so it is null there.
        @Nullable
        private PartitionFileReader.ReadProgress readProgress;

        /**
         * Creates a reader bound to one subpartition and one connection writer.
         *
         * @param tieredStorageSubpartitionId the subpartition to read
         * @param nettyConnectionWriter the writer that receives the payloads of this reader
         */
        private ScheduledSubpartitionReader(TieredStorageSubpartitionId tieredStorageSubpartitionId, NettyConnectionWriter nettyConnectionWriter) {
            this.subpartitionId = tieredStorageSubpartitionId;
            this.nettyConnectionWriter = nettyConnectionWriter;
            // Negative id: the segment to read has not been resolved yet.
            this.nextSegmentId = -1;
        }

        /* JADX INFO: Access modifiers changed from: private */
        /* JADX WARN: Code restructure failed: missing block: B:30:0x007d, code lost:
        
            r11.add(r0);
         */
        /*
            Code decompiled incorrectly, please refer to instructions dump.
            To view partially-correct add '--show-bad-code' argument
        */
        /**
         * Decompiler stub: the real body of this method could not be recovered, so in this file it
         * unconditionally throws {@link UnsupportedOperationException}.
         *
         * <p>NOTE(review): judging by the name and by the caller in {@code readBuffersFromFile()},
         * the original presumably polls segments from {@code r11}, fills them with disk data and
         * hands them to the connection writer - restore it from the upstream sources before use.
         *
         * @param r11 queue of free memory segments offered to this reader
         * @param r12 recycler passed by the caller (the scheduler passes itself)
         * @throws java.io.IOException declared by the original signature; never thrown by the stub
         */
        public void loadDiskDataToBuffers(java.util.Queue<org.apache.flink.core.memory.MemorySegment> r11, org.apache.flink.runtime.io.network.buffer.BufferRecycler r12) throws java.io.IOException {
            /*
                Method dump skipped, instructions count: 234
                To view this dump add '--comments-level debug' option
            */
            // The caller catches Exception, so as long as this stub is in place every reader
            // that gets scheduled is failed with this UnsupportedOperationException.
            throw new UnsupportedOperationException("Method not decompiled: org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.disk.DiskIOScheduler.ScheduledSubpartitionReader.loadDiskDataToBuffers(java.util.Queue, org.apache.flink.runtime.io.network.buffer.BufferRecycler):void");
        }

        /**
         * Orders readers by ascending {@code priority}, as computed by the last call to {@link
         * #prepareForScheduling()}.
         *
         * @param scheduledSubpartitionReader the reader to compare with; validated through {@code
         *     Preconditions.checkArgument} to be non-null
         * @return the result of {@link Long#compare(long, long)} on both priorities
         */
        @Override // java.lang.Comparable
        public int compareTo(ScheduledSubpartitionReader scheduledSubpartitionReader) {
            boolean otherIsPresent = scheduledSubpartitionReader != null;
            Preconditions.checkArgument(otherIsPresent);
            long ownPriority = getPriority();
            long otherPriority = scheduledSubpartitionReader.getPriority();
            return Long.compare(ownPriority, otherPriority);
        }

        /**
         * Refreshes the scheduling priority of this reader (access widened from private by the
         * decompiler).
         *
         * <p>If no segment is resolved yet, a lookup is attempted first. A reader that still has no
         * segment afterwards gets {@link Long#MAX_VALUE} so that it sorts last; otherwise the
         * priority is obtained from the partition file reader.
         */
        public void prepareForScheduling() {
            if (nextSegmentId < 0) {
                updateSegmentId();
            }
            if (nextSegmentId < 0) {
                // Still nothing to read for this reader.
                priority = Long.MAX_VALUE;
                return;
            }
            priority =
                    DiskIOScheduler.this.partitionFileReader.getPriority(
                            DiskIOScheduler.this.partitionId,
                            subpartitionId,
                            nextSegmentId,
                            nextBufferIndex,
                            readProgress);
        }

        /**
         * Writes every buffer of {@code list} to the connection writer, except for a trailing
         * partial buffer, which is handed back to the caller instead.
         *
         * @param list the buffers in the order they should be written
         * @return the last element if it is a partial {@link CompositeBuffer}, otherwise {@code
         *     null}
         */
        private CompositeBuffer writeFullBuffersAndGetPartialBuffer(List<Buffer> list) {
            CompositeBuffer trailingPartial = null;
            Iterator<Buffer> remaining = list.iterator();
            while (remaining.hasNext()) {
                Buffer current = remaining.next();
                // Only the very last buffer is allowed to be held back as partial.
                boolean isLast = !remaining.hasNext();
                if (isLast && isPartialBuffer(current)) {
                    trailingPartial = (CompositeBuffer) current;
                    continue;
                }
                writeNettyBufferAndUpdateSegmentId(current);
            }
            return trailingPartial;
        }

        /**
         * Tells whether {@code buffer} is a {@link CompositeBuffer} that still reports a positive
         * missing length.
         */
        private boolean isPartialBuffer(Buffer buffer) {
            if (!(buffer instanceof CompositeBuffer)) {
                return false;
            }
            CompositeBuffer composite = (CompositeBuffer) buffer;
            return composite.missingLength() > 0;
        }

        /**
         * Wraps {@code buffer} into a payload carrying the current buffer index, advances that
         * index and writes the payload. When the buffer marks the end of a segment, the current
         * segment id is reset and the next one is looked up.
         */
        private void writeNettyBufferAndUpdateSegmentId(Buffer buffer) {
            int bufferIndex = nextBufferIndex++;
            NettyPayload payload =
                    NettyPayload.newBuffer(buffer, bufferIndex, subpartitionId.getSubpartitionId());
            writeToNettyConnectionWriter(payload);
            if (buffer.getDataType() != Buffer.DataType.END_OF_SEGMENT) {
                return;
            }
            // Segment finished: forget it and try to resolve the following one right away.
            nextSegmentId = -1;
            updateSegmentId();
        }

        /**
         * Enqueues {@code nettyPayload} and notifies availability unless both the queued payload
         * count and the queued buffer payload count are already greater than one.
         */
        private void writeToNettyConnectionWriter(NettyPayload nettyPayload) {
            nettyConnectionWriter.writeNettyPayload(nettyPayload);
            boolean alreadyBacklogged =
                    nettyConnectionWriter.numQueuedPayloads() > 1
                            && nettyConnectionWriter.numQueuedBufferPayloads() > 1;
            if (alreadyBacklogged) {
                return;
            }
            notifyAvailable();
        }

        /** Returns the priority computed by the most recent {@link #prepareForScheduling()}. */
        private long getPriority() {
            return priority;
        }

        /** Forwards an availability notification to the connection writer. */
        private void notifyAvailable() {
            nettyConnectionWriter.notifyAvailable();
        }

        /**
         * Fails this reader once (access widened from private by the decompiler): the connection
         * writer is closed with {@code th} and then notified. Repeated calls are no-ops.
         *
         * @param th the failure cause handed to the connection writer
         */
        public void failReader(Throwable th) {
            if (!isFailed) {
                isFailed = true;
                nettyConnectionWriter.close(th);
                nettyConnectionWriter.notifyAvailable();
            }
        }

        /**
         * Asks the retriever for the segment that belongs to the current buffer index. A non-null
         * answer becomes the next segment id and is announced through a segment payload; a {@code
         * null} answer leaves the state untouched.
         */
        private void updateSegmentId() {
            Integer retrieved =
                    DiskIOScheduler.this.firstBufferIndexInSegmentRetriever.apply(
                            Integer.valueOf(subpartitionId.getSubpartitionId()),
                            Integer.valueOf(nextBufferIndex));
            if (retrieved == null) {
                return;
            }
            int resolvedSegmentId = retrieved.intValue();
            nextSegmentId = resolvedSegmentId;
            writeToNettyConnectionWriter(NettyPayload.newSegment(resolvedSegmentId));
        }

        /**
         * Returns the id of the connection this reader writes to (access widened from private by
         * the decompiler).
         */
        public NettyConnectionId getId() {
            NettyConnectionId connectionId = nettyConnectionWriter.getNettyConnectionId();
            return connectionId;
        }
    }

    /**
     * Creates the scheduler and registers it as a requester at the given buffer pool.
     *
     * @param tieredStoragePartitionId id of the partition to read
     * @param batchShuffleReadBufferPool pool that provides the read buffers; must not be null
     * @param scheduledExecutorService executor that runs the read rounds; must not be null
     * @param i maximum number of requested buffers (stored as {@code maxRequestedBuffers})
     * @param duration buffer request timeout; must not be null
     * @param i2 read-ahead limit (stored as {@code maxBufferReadAhead})
     * @param biFunction maps (subpartition id, buffer index) to a segment id, or {@code null}
     * @param partitionFileReader reader used to access the partition file
     */
    public DiskIOScheduler(TieredStoragePartitionId tieredStoragePartitionId, BatchShuffleReadBufferPool batchShuffleReadBufferPool, ScheduledExecutorService scheduledExecutorService, int i, Duration duration, int i2, BiFunction<Integer, Integer, Integer> biFunction, PartitionFileReader partitionFileReader) {
        // Null-checked collaborators, validated in the same order as before.
        this.bufferPool = (BatchShuffleReadBufferPool) Preconditions.checkNotNull(batchShuffleReadBufferPool);
        this.ioExecutor = (ScheduledExecutorService) Preconditions.checkNotNull(scheduledExecutorService);
        this.bufferRequestTimeout = (Duration) Preconditions.checkNotNull(duration);

        // Plain assignments that need no validation.
        this.partitionId = tieredStoragePartitionId;
        this.partitionFileReader = partitionFileReader;
        this.firstBufferIndexInSegmentRetriever = biFunction;
        this.maxRequestedBuffers = i;
        this.maxBufferReadAhead = i2;

        // Registration happens last, once all fields are assigned.
        this.bufferPool.registerRequester(this);
    }

    /**
     * Executes one read round: loads data for the scheduled readers, accounts for the buffers
     * that were consumed and arranges the next round. If nothing was read, the next scheduling
     * attempt is delayed by 5 milliseconds; otherwise it is attempted immediately.
     */
    @Override // java.lang.Runnable
    public synchronized void run() {
        final int numBuffersRead = readBuffersFromFile();
        synchronized (lock) {
            isRunning = false;
            numRequestedBuffers += numBuffersRead;
        }
        if (numBuffersRead != 0) {
            triggerScheduling();
            return;
        }
        // Nothing was read in this round: retry a little later.
        ioExecutor.schedule(this::triggerScheduling, 5L, TimeUnit.MILLISECONDS);
    }

    /**
     * Registers a reader for a newly established connection and tries to start a read round.
     *
     * @param tieredStorageSubpartitionId the subpartition the connection consumes
     * @param nettyConnectionWriter the writer of the connection; its id becomes the reader's key
     */
    @Override // org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.NettyServiceProducer
    public void connectionEstablished(TieredStorageSubpartitionId tieredStorageSubpartitionId, NettyConnectionWriter nettyConnectionWriter) {
        synchronized (lock) {
            Preconditions.checkState(!isReleased, "DiskIOScheduler is already released.");
            NettyConnectionId connectionId = nettyConnectionWriter.getNettyConnectionId();
            ScheduledSubpartitionReader reader =
                    new ScheduledSubpartitionReader(tieredStorageSubpartitionId, nettyConnectionWriter);
            allScheduledReaders.put(connectionId, reader);
            triggerScheduling();
        }
    }

    /**
     * Drops the reader that belongs to a broken connection, if one is registered.
     *
     * @param nettyConnectionId id of the connection that went away
     */
    @Override // org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.NettyServiceProducer
    public void connectionBroken(NettyConnectionId nettyConnectionId) {
        synchronized (lock) {
            allScheduledReaders.remove(nettyConnectionId);
        }
    }

    /**
     * Returns a memory segment to the buffer pool, lowers the requested-buffer counter by one and
     * tries to start another read round.
     *
     * @param memorySegment the segment to give back
     */
    @Override // org.apache.flink.runtime.io.network.buffer.BufferRecycler
    public void recycle(MemorySegment memorySegment) {
        synchronized (lock) {
            bufferPool.recycle(memorySegment);
            numRequestedBuffers -= 1;
            triggerScheduling();
        }
    }

    /**
     * Releases the scheduler. The first call marks it released, forgets all readers, releases the
     * partition file reader and unregisters from the buffer pool; later calls do nothing.
     */
    public void release() {
        synchronized (lock) {
            if (!isReleased) {
                isReleased = true;
                allScheduledReaders.clear();
                partitionFileReader.release();
                bufferPool.unregisterRequester(this);
            }
        }
    }

    /**
     * Performs the I/O of one read round.
     *
     * <p>The scheduled readers are sorted by priority, a batch of memory segments is requested
     * and each reader, in order, is offered the segments that are still free. A reader whose load
     * fails is removed and failed individually. Any other exception fails all readers of this
     * round.
     *
     * <p>Segments still left in the batch are returned to the pool in a {@code finally} block.
     * This also covers the case where an exception escapes after the batch was allocated, for
     * example while failing a reader; such segments were previously never given back.
     *
     * @return the number of allocated segments that were taken by readers, or 0 if there was
     *     nothing to read, no segment could be obtained, or the round failed
     */
    private int readBuffersFromFile() {
        List<ScheduledSubpartitionReader> scheduledReaders = sortScheduledReaders();
        if (scheduledReaders.isEmpty()) {
            return 0;
        }
        Queue<MemorySegment> buffers = null;
        try {
            buffers = allocateBuffers();
            int numAllocated = buffers.size();
            if (numAllocated <= 0) {
                return 0;
            }
            for (ScheduledSubpartitionReader reader : scheduledReaders) {
                if (buffers.isEmpty()) {
                    break;
                }
                try {
                    reader.loadDiskDataToBuffers(buffers, this);
                } catch (Exception e) {
                    // A single failing reader must not affect the others in this round.
                    failScheduledReaders(Collections.singletonList(reader), e);
                    LOG.debug("Failed to read shuffle data.", e);
                }
            }
            // Evaluated before the finally block hands the leftovers back and clears the queue.
            return numAllocated - buffers.size();
        } catch (Exception e) {
            failScheduledReaders(scheduledReaders, e);
            LOG.error("Failed to request buffers for data reading.", e);
            return 0;
        } finally {
            if (buffers != null) {
                releaseBuffers(buffers);
            }
        }
    }

    /**
     * Takes a snapshot of all registered readers, refreshes the priority of each and returns the
     * snapshot sorted by that priority.
     *
     * @return a new, sorted list of readers; empty if the scheduler has been released
     */
    private List<ScheduledSubpartitionReader> sortScheduledReaders() {
        synchronized (lock) {
            List<ScheduledSubpartitionReader> readers = new ArrayList<>();
            if (isReleased) {
                return readers;
            }
            readers.addAll(allScheduledReaders.values());
            for (ScheduledSubpartitionReader reader : readers) {
                reader.prepareForScheduling();
            }
            Collections.sort(readers);
            return readers;
        }
    }

    /**
     * Requests a batch of memory segments from the buffer pool, retrying until segments are
     * available, the scheduler is released, or the request times out.
     *
     * <p>The deadline is the pool's last buffer operation timestamp plus the configured request
     * timeout. When the current deadline has passed, it is recomputed once more and the request
     * only fails if the refreshed deadline has passed as well. (The previous code compared the
     * current time with itself, so it always failed as soon as the first deadline expired.)
     *
     * <p>NOTE(review): this loop does not sleep on its own; presumably {@code requestBuffers()}
     * blocks for a short time when the pool is exhausted - confirm against the pool.
     *
     * @return the obtained segments, or an empty queue if the scheduler was released meanwhile
     * @throws TimeoutException if no segment became available before the deadline
     * @throws Exception if requesting buffers from the pool fails
     */
    private Queue<MemorySegment> allocateBuffers() throws Exception {
        long deadlineMillis = getBufferRequestTimeoutTime();
        while (true) {
            List<MemorySegment> segments = bufferPool.requestBuffers();
            if (!segments.isEmpty()) {
                return new ArrayDeque<>(segments);
            }
            synchronized (lock) {
                if (isReleased) {
                    return new ArrayDeque<>();
                }
            }
            if (System.currentTimeMillis() < deadlineMillis) {
                continue;
            }
            // The deadline moves whenever the pool sees a buffer operation, so refresh it and
            // only give up if the fresh deadline has expired too.
            long nowMillis = System.currentTimeMillis();
            deadlineMillis = getBufferRequestTimeoutTime();
            if (nowMillis >= deadlineMillis) {
                throw new TimeoutException(String.format("Buffer request timeout, this means there is a fierce contention of the batch shuffle read memory, please increase '%s'.", TaskManagerOptions.NETWORK_BATCH_SHUFFLE_READ_MEMORY.key()));
            }
        }
    }

    /**
     * Removes each of the given readers from the registry and fails it with {@code th}. The lock
     * is only held for the removal, not while the reader is being failed.
     *
     * @param list the readers to fail
     * @param th the failure cause passed to every reader
     */
    private void failScheduledReaders(List<ScheduledSubpartitionReader> list, Throwable th) {
        list.forEach(
                failedReader -> {
                    synchronized (lock) {
                        allScheduledReaders.remove(failedReader.getId());
                    }
                    failedReader.failReader(th);
                });
    }

    /**
     * Gives all segments in {@code queue} back to the buffer pool and empties the queue. Any
     * throwable raised while doing so is passed to the fatal exit handler.
     *
     * @param queue the unused segments; an empty queue is ignored
     */
    private void releaseBuffers(Queue<MemorySegment> queue) {
        if (!queue.isEmpty()) {
            try {
                bufferPool.recycle(queue);
                queue.clear();
            } catch (Throwable fatal) {
                FatalExitExceptionHandler.INSTANCE.uncaughtException(Thread.currentThread(), fatal);
            }
        }
    }

    /**
     * Submits a read round to the I/O executor if none is in flight, at least one reader is
     * registered, and the buffer budget allows another request. A throwable escaping the round is
     * logged and passed to the fatal exit handler.
     */
    private void triggerScheduling() {
        synchronized (lock) {
            if (isRunning || allScheduledReaders.isEmpty()) {
                return;
            }
            // One more request must still fit below the configured maximum ...
            boolean nextRequestFits =
                    numRequestedBuffers + bufferPool.getNumBuffersPerRequest() <= maxRequestedBuffers;
            if (!nextRequestFits) {
                return;
            }
            // ... and this requester must be below the pool's average per requester.
            if (!(numRequestedBuffers < bufferPool.getAverageBuffersPerRequester())) {
                return;
            }
            isRunning = true;
            ioExecutor.execute(() -> {
                try {
                    run();
                } catch (Throwable failure) {
                    LOG.error("Failed to read data.", failure);
                    FatalExitExceptionHandler.INSTANCE.uncaughtException(Thread.currentThread(), failure);
                }
            });
        }
    }

    /**
     * Computes the current buffer request deadline: the pool's last buffer operation timestamp
     * plus the configured request timeout in milliseconds.
     */
    private long getBufferRequestTimeoutTime() {
        long lastOperationMillis = bufferPool.getLastBufferOperationTimestamp();
        return lastOperationMillis + bufferRequestTimeout.toMillis();
    }
}
