package org.apache.flink.runtime.io.network.partition;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import javax.annotation.Nullable;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/HashBasedDataBuffer.class */
public class HashBasedDataBuffer implements DataBuffer {
    private final BufferPool bufferPool;
    private final int numGuaranteedBuffers;
    private final ArrayDeque<BufferConsumer>[] buffers;
    private long numTotalBytes;
    private long numTotalRecords;
    private boolean isFull;
    private boolean isFinished;
    private boolean isReleased;
    private final BufferBuilder[] builders;
    private int numBuffersOccupied;
    private int readOrderIndex;
    private final int[] subpartitionReadOrder;
    private long numTotalBytesRead;

    public HashBasedDataBuffer(BufferPool bufferPool, int i, int i2, @Nullable int[] iArr) {
        Preconditions.checkArgument(i2 > 0, "No guaranteed buffers for sort.");
        this.bufferPool = (BufferPool) Preconditions.checkNotNull(bufferPool);
        this.numGuaranteedBuffers = i2;
        this.builders = new BufferBuilder[i];
        this.buffers = new ArrayDeque[i];
        for (int i3 = 0; i3 < i; i3++) {
            this.buffers[i3] = new ArrayDeque<>();
        }
        this.subpartitionReadOrder = new int[i];
        if (iArr != null) {
            Preconditions.checkArgument(iArr.length == i, "Illegal data read order.");
            System.arraycopy(iArr, 0, this.subpartitionReadOrder, 0, i);
        } else {
            for (int i4 = 0; i4 < i; i4++) {
                this.subpartitionReadOrder[i4] = i4;
            }
        }
    }

    @Override // org.apache.flink.runtime.io.network.partition.DataBuffer
    public boolean append(ByteBuffer byteBuffer, int i, Buffer.DataType dataType) throws IOException {
        Preconditions.checkArgument(byteBuffer.hasRemaining(), "Cannot append empty data.");
        Preconditions.checkState(!this.isFull, "Sort buffer is already full.");
        Preconditions.checkState(!this.isFinished, "Sort buffer is already finished.");
        Preconditions.checkState(!this.isReleased, "Sort buffer is already released.");
        int remaining = byteBuffer.remaining();
        if (dataType.isBuffer()) {
            writeRecord(byteBuffer, i);
        } else {
            writeEvent(byteBuffer, i, dataType);
        }
        this.isFull = byteBuffer.hasRemaining();
        if (!this.isFull) {
            this.numTotalRecords++;
        }
        this.numTotalBytes += remaining - byteBuffer.remaining();
        return this.isFull;
    }

    private void writeEvent(ByteBuffer byteBuffer, int i, Buffer.DataType dataType) {
        BufferBuilder bufferBuilder = this.builders[i];
        if (bufferBuilder != null) {
            bufferBuilder.finish();
            this.buffers[i].add(bufferBuilder.createBufferConsumerFromBeginning());
            bufferBuilder.close();
            this.builders[i] = null;
        }
        MemorySegment allocateUnpooledOffHeapMemory = MemorySegmentFactory.allocateUnpooledOffHeapMemory(byteBuffer.remaining());
        allocateUnpooledOffHeapMemory.put(0, byteBuffer, allocateUnpooledOffHeapMemory.size());
        this.buffers[i].add(new BufferConsumer(new NetworkBuffer(allocateUnpooledOffHeapMemory, FreeingBufferRecycler.INSTANCE, dataType), allocateUnpooledOffHeapMemory.size()));
    }

    private void writeRecord(ByteBuffer byteBuffer, int i) throws IOException {
        do {
            BufferBuilder bufferBuilder = this.builders[i];
            if (bufferBuilder == null) {
                bufferBuilder = requestBufferFromPool();
                if (bufferBuilder == null) {
                    return;
                }
                this.numBuffersOccupied++;
                this.builders[i] = bufferBuilder;
            }
            bufferBuilder.append(byteBuffer);
            if (bufferBuilder.isFull()) {
                bufferBuilder.finish();
                this.buffers[i].add(bufferBuilder.createBufferConsumerFromBeginning());
                bufferBuilder.close();
                this.builders[i] = null;
            }
        } while (byteBuffer.hasRemaining());
    }

    private BufferBuilder requestBufferFromPool() throws IOException {
        try {
            return this.numBuffersOccupied < this.numGuaranteedBuffers ? this.bufferPool.requestBufferBuilderBlocking() : this.bufferPool.requestBufferBuilder();
        } catch (InterruptedException e) {
            throw new IOException("Interrupted while requesting buffer.", e);
        }
    }

    @Override // org.apache.flink.runtime.io.network.partition.DataBuffer
    public BufferWithChannel getNextBuffer(MemorySegment memorySegment) {
        Preconditions.checkState(this.isFull, "Sort buffer is not ready to be read.");
        Preconditions.checkState(!this.isReleased, "Sort buffer is already released.");
        BufferWithChannel bufferWithChannel = null;
        if (!hasRemaining() || this.readOrderIndex >= this.subpartitionReadOrder.length) {
            return null;
        }
        int i = this.subpartitionReadOrder[this.readOrderIndex];
        while (bufferWithChannel == null) {
            BufferConsumer poll = this.buffers[i].poll();
            if (poll != null) {
                bufferWithChannel = new BufferWithChannel(poll.build(), i);
                this.numBuffersOccupied -= bufferWithChannel.getBuffer().isBuffer() ? 1 : 0;
                this.numTotalBytesRead += bufferWithChannel.getBuffer().readableBytes();
                poll.close();
            } else {
                int i2 = this.readOrderIndex + 1;
                this.readOrderIndex = i2;
                if (i2 >= this.subpartitionReadOrder.length) {
                    break;
                }
                i = this.subpartitionReadOrder[this.readOrderIndex];
            }
        }
        return bufferWithChannel;
    }

    @Override // org.apache.flink.runtime.io.network.partition.DataBuffer
    public long numTotalRecords() {
        return this.numTotalRecords;
    }

    @Override // org.apache.flink.runtime.io.network.partition.DataBuffer
    public long numTotalBytes() {
        return this.numTotalBytes;
    }

    @Override // org.apache.flink.runtime.io.network.partition.DataBuffer
    public boolean hasRemaining() {
        return this.numTotalBytesRead < this.numTotalBytes;
    }

    @Override // org.apache.flink.runtime.io.network.partition.DataBuffer
    public void reset() {
        Preconditions.checkState(!this.isFinished, "Sort buffer has been finished.");
        Preconditions.checkState(!this.isReleased, "Sort buffer has been released.");
        this.isFull = false;
        this.readOrderIndex = 0;
    }

    @Override // org.apache.flink.runtime.io.network.partition.DataBuffer
    public void finish() {
        Preconditions.checkState(!this.isFull, "DataBuffer must not be full.");
        Preconditions.checkState(!this.isFinished, "DataBuffer is already finished.");
        this.isFull = true;
        this.isFinished = true;
        for (int i = 0; i < this.builders.length; i++) {
            BufferBuilder bufferBuilder = this.builders[i];
            if (bufferBuilder != null) {
                bufferBuilder.finish();
                this.buffers[i].add(bufferBuilder.createBufferConsumerFromBeginning());
                bufferBuilder.close();
                this.builders[i] = null;
            }
        }
    }

    @Override // org.apache.flink.runtime.io.network.partition.DataBuffer
    public boolean isFinished() {
        return this.isFinished;
    }

    @Override // org.apache.flink.runtime.io.network.partition.DataBuffer
    public void release() {
        if (this.isReleased) {
            return;
        }
        this.isReleased = true;
        for (int i = 0; i < this.builders.length; i++) {
            BufferBuilder bufferBuilder = this.builders[i];
            if (bufferBuilder != null) {
                bufferBuilder.close();
                this.builders[i] = null;
            }
        }
        for (ArrayDeque<BufferConsumer> arrayDeque : this.buffers) {
            BufferConsumer poll = arrayDeque.poll();
            while (true) {
                BufferConsumer bufferConsumer = poll;
                if (bufferConsumer != null) {
                    bufferConsumer.close();
                    poll = arrayDeque.poll();
                }
            }
        }
    }

    @Override // org.apache.flink.runtime.io.network.partition.DataBuffer
    public boolean isReleased() {
        return this.isReleased;
    }
}
