/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.LinkedList;
import java.util.Optional;
import java.util.Queue;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.HashSubpartitionBufferAccumulatorContext;
import org.apache.flink.util.Preconditions;

/**
 * Accumulates the records and events of one subpartition into buffers and passes every finished
 * buffer to the {@link HashSubpartitionBufferAccumulatorContext}.
 *
 * <p>Records are appended to {@link BufferBuilder}s requested from the context; a record that
 * does not fit into the current builder spills over into the following ones. Each event is
 * emitted in a heap-backed {@link NetworkBuffer} of its own.
 *
 * <p>NOTE(review): the class contains no synchronization; presumably it is used from a single
 * thread - confirm against the callers.
 */
public class HashSubpartitionBufferAccumulator {
    /** Identifier passed along with every buffer flushed to the context. */
    private final TieredStorageSubpartitionId subpartitionId;

    /**
     * Size used for capacity accounting; each requested builder is assumed to offer this many
     * writable bytes while empty (TODO confirm against the context implementation).
     */
    private final int bufferSize;

    /** Source of new buffer builders and sink for finished buffers. */
    private final HashSubpartitionBufferAccumulatorContext bufferAccumulatorContext;

    /** Builders in write order; the head is the one currently being written to. */
    private final Queue<BufferBuilder> unfinishedBuffers = new LinkedList<>();

    /**
     * Creates an accumulator for a single subpartition.
     *
     * @param subpartitionId id reported to the context together with each finished buffer
     * @param bufferSize number of bytes one requested buffer is expected to hold
     * @param bufferAccumulatorContext context used to request builders and to flush buffers
     */
    public HashSubpartitionBufferAccumulator(TieredStorageSubpartitionId subpartitionId, int bufferSize, HashSubpartitionBufferAccumulatorContext bufferAccumulatorContext) {
        this.subpartitionId = subpartitionId;
        this.bufferSize = bufferSize;
        this.bufferAccumulatorContext = bufferAccumulatorContext;
    }

    /**
     * Appends a record or an event, dispatching on {@code dataType.isEvent()}.
     *
     * @param record the bytes to append; for a non-event the buffer is consumed, for an event its
     *     position is left unchanged
     * @param dataType the data type attached to the payload
     * @throws IOException declared for callers; nothing in this class throws it directly
     */
    public void append(ByteBuffer record, Buffer.DataType dataType) throws IOException {
        if (dataType.isEvent()) {
            this.writeEvent(record, dataType);
        } else {
            this.writeRecord(record, dataType);
        }
    }

    /**
     * Finishes and flushes the builder currently being written (if it holds data), then closes
     * every builder still queued without flushing it.
     */
    public void close() {
        this.finishCurrentWritingBufferIfNotEmpty();
        BufferBuilder remaining;
        while ((remaining = this.unfinishedBuffers.poll()) != null) {
            remaining.close();
        }
    }

    /**
     * Emits an event in a buffer of its own: a partially written record buffer is finished first,
     * then the event bytes are wrapped in a heap segment and flushed immediately.
     */
    private void writeEvent(ByteBuffer event, Buffer.DataType dataType) {
        Preconditions.checkArgument(dataType.isEvent());
        this.finishCurrentWritingBufferIfNotEmpty();
        MemorySegment data = MemorySegmentFactory.wrap(HashSubpartitionBufferAccumulator.eventBytes(event));
        this.flushFinishedBuffer(new NetworkBuffer(data, FreeingBufferRecycler.INSTANCE, dataType, data.size()));
    }

    /**
     * Returns a byte array holding exactly the remaining bytes of {@code event}.
     *
     * <p>The backing array is reused without copying only when the buffer covers it completely.
     * Otherwise the remaining bytes are copied, which handles buffers whose position/limit select
     * only part of the backing array, sliced buffers with a non-zero array offset, and buffers
     * without an accessible array (direct or read-only), for which {@link ByteBuffer#array()}
     * would throw. The position of {@code event} is never modified.
     */
    private static byte[] eventBytes(ByteBuffer event) {
        if (event.hasArray()
                && event.arrayOffset() == 0
                && event.position() == 0
                && event.remaining() == event.array().length) {
            return event.array();
        }
        byte[] copy = new byte[event.remaining()];
        // Read through a duplicate so the caller's buffer keeps its position.
        event.duplicate().get(copy);
        return copy;
    }

    /** Makes sure enough builders are queued for the record, then writes it. */
    private void writeRecord(ByteBuffer record, Buffer.DataType dataType) {
        Preconditions.checkArgument(!dataType.isEvent());
        this.ensureCapacityForRecord(record);
        this.writeRecord(record);
    }

    /**
     * Requests builders from the context until the queued builders can hold all remaining bytes
     * of {@code record}.
     *
     * <p>Available capacity is the writable bytes of the head builder plus {@code bufferSize} for
     * every other queued builder. The loop advances by {@code bufferSize} per request, so it
     * relies on {@code bufferSize} being positive.
     */
    private void ensureCapacityForRecord(ByteBuffer record) {
        final int numRecordBytes = record.remaining();
        final BufferBuilder currentWritingBuffer = this.unfinishedBuffers.peek();
        int availableBytes = currentWritingBuffer == null
                ? 0
                : currentWritingBuffer.getWritableBytes() + this.bufferSize * (this.unfinishedBuffers.size() - 1);
        while (availableBytes < numRecordBytes) {
            this.unfinishedBuffers.add(this.bufferAccumulatorContext.requestBufferBlocking());
            availableBytes += this.bufferSize;
        }
    }

    /**
     * Copies the record into the queued builders, finishing and flushing each builder as soon as
     * it becomes full. Expects {@link #ensureCapacityForRecord(ByteBuffer)} to have run first, so
     * that a builder is always available while bytes remain.
     */
    private void writeRecord(ByteBuffer record) {
        while (record.hasRemaining()) {
            BufferBuilder currentWritingBuffer = Preconditions.checkNotNull(this.unfinishedBuffers.peek());
            currentWritingBuffer.append(record);
            if (currentWritingBuffer.isFull()) {
                this.finishCurrentWritingBuffer();
            }
        }
    }

    /**
     * Finishes the head builder unless there is none or its writable bytes still equal
     * {@code bufferSize}, i.e. nothing appears to have been written to it yet.
     */
    private void finishCurrentWritingBufferIfNotEmpty() {
        BufferBuilder currentWritingBuffer = this.unfinishedBuffers.peek();
        if (currentWritingBuffer != null && currentWritingBuffer.getWritableBytes() != this.bufferSize) {
            this.finishCurrentWritingBuffer();
        }
    }

    /**
     * Removes the head builder from the queue, finishes it, builds a {@link Buffer} from its
     * beginning, closes the builder and its consumer, and flushes the built buffer. Does nothing
     * when the queue is empty.
     */
    private void finishCurrentWritingBuffer() {
        BufferBuilder currentWritingBuffer = this.unfinishedBuffers.poll();
        if (currentWritingBuffer == null) {
            return;
        }
        currentWritingBuffer.finish();
        BufferConsumer bufferConsumer = currentWritingBuffer.createBufferConsumerFromBeginning();
        // Build the buffer before closing builder and consumer; the statement order is kept
        // exactly as in the original implementation.
        Buffer buffer = bufferConsumer.build();
        currentWritingBuffer.close();
        bufferConsumer.close();
        this.flushFinishedBuffer(buffer);
    }

    /** Hands a single finished buffer to the context for this subpartition. */
    private void flushFinishedBuffer(Buffer finishedBuffer) {
        this.bufferAccumulatorContext.flushAccumulatedBuffers(this.subpartitionId, Collections.singletonList(finishedBuffer));
    }
}

