/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.hybrid.tiered.shuffle;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nullable;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.api.EndOfData;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.StopMode;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageIdMappingUtils;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.TieredStorageNettyServiceImpl;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemoryManager;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemorySpec;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageProducerClient;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageProducerMetricUpdate;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageResourceRegistry;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.function.SupplierWithException;

public class TieredResultPartition
extends ResultPartition {
    private final TieredStoragePartitionId partitionId;
    private final TieredStorageProducerClient tieredStorageProducerClient;
    private final TieredStorageResourceRegistry tieredStorageResourceRegistry;
    private final TieredStorageNettyServiceImpl nettyService;
    private final List<TieredStorageMemorySpec> tieredStorageMemorySpecs;
    private final TieredStorageMemoryManager storageMemoryManager;
    private boolean hasNotifiedEndOfUserRecords;

    public TieredResultPartition(String owningTaskName, int partitionIndex, ResultPartitionID partitionId, ResultPartitionType partitionType, int numSubpartitions, int numTargetKeyGroups, ResultPartitionManager partitionManager, @Nullable BufferCompressor bufferCompressor, SupplierWithException<BufferPool, IOException> bufferPoolFactory, TieredStorageProducerClient tieredStorageProducerClient, TieredStorageResourceRegistry tieredStorageResourceRegistry, TieredStorageNettyServiceImpl nettyService, List<TieredStorageMemorySpec> tieredStorageMemorySpecs, TieredStorageMemoryManager storageMemoryManager) {
        super(owningTaskName, partitionIndex, partitionId, partitionType, numSubpartitions, numTargetKeyGroups, partitionManager, bufferCompressor, bufferPoolFactory);
        this.partitionId = TieredStorageIdMappingUtils.convertId(partitionId);
        this.tieredStorageProducerClient = tieredStorageProducerClient;
        this.tieredStorageResourceRegistry = tieredStorageResourceRegistry;
        this.nettyService = nettyService;
        this.tieredStorageMemorySpecs = tieredStorageMemorySpecs;
        this.storageMemoryManager = storageMemoryManager;
    }

    @Override
    protected void setupInternal() throws IOException {
        if (this.isReleased()) {
            throw new IOException("Result partition has been released.");
        }
        this.storageMemoryManager.setup(this.bufferPool, this.tieredStorageMemorySpecs);
        this.tieredStorageResourceRegistry.registerResource(this.partitionId, this.storageMemoryManager::release);
    }

    @Override
    public void setMetricGroup(TaskIOMetricGroup metrics) {
        super.setMetricGroup(metrics);
        this.storageMemoryManager.setMetricGroup(metrics);
        this.tieredStorageProducerClient.setMetricStatisticsUpdater(this::updateProducerMetricStatistics);
    }

    @Override
    public void emitRecord(ByteBuffer record, int consumerId) throws IOException {
        this.resultPartitionBytes.inc(consumerId, record.remaining());
        this.emit(record, consumerId, Buffer.DataType.DATA_BUFFER, false);
    }

    @Override
    public void broadcastRecord(ByteBuffer record) throws IOException {
        this.resultPartitionBytes.incAll(record.remaining());
        this.broadcast(record, Buffer.DataType.DATA_BUFFER);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) throws IOException {
        Buffer buffer = EventSerializer.toBuffer(event, isPriorityEvent);
        try {
            ByteBuffer serializedEvent = buffer.getNioBufferReadable();
            this.broadcast(serializedEvent, buffer.getDataType());
        }
        finally {
            buffer.recycleBuffer();
        }
    }

    private void broadcast(ByteBuffer record, Buffer.DataType dataType) throws IOException {
        this.checkInProduceState();
        this.emit(record, 0, dataType, true);
    }

    private void emit(ByteBuffer record, int consumerId, Buffer.DataType dataType, boolean isBroadcast) throws IOException {
        this.tieredStorageProducerClient.write(record, TieredStorageIdMappingUtils.convertId(consumerId), dataType, isBroadcast);
    }

    private void updateProducerMetricStatistics(TieredStorageProducerMetricUpdate metricStatistics) {
        this.numBuffersOut.inc((long)metricStatistics.numWriteBuffersDelta());
        this.numBytesOut.inc((long)metricStatistics.numWriteBytesDelta());
    }

    @Override
    public ResultSubpartitionView createSubpartitionView(int subpartitionId, BufferAvailabilityListener availabilityListener) throws IOException {
        Preconditions.checkState(!this.isReleased(), "ResultPartition already released.");
        return this.nettyService.createResultSubpartitionView(this.partitionId, new TieredStorageSubpartitionId(subpartitionId), availabilityListener);
    }

    @Override
    public void finish() throws IOException {
        this.broadcastEvent(EndOfPartitionEvent.INSTANCE, false);
        this.tieredStorageProducerClient.close();
        Preconditions.checkState(!this.isReleased(), "Result partition is already released.");
        super.finish();
    }

    @Override
    public void close() {
        super.close();
    }

    @Override
    protected void releaseInternal() {
        this.tieredStorageResourceRegistry.clearResourceFor(this.partitionId);
    }

    @Override
    public void notifyEndOfData(StopMode mode) throws IOException {
        if (!this.hasNotifiedEndOfUserRecords) {
            this.broadcastEvent(new EndOfData(mode), false);
            this.hasNotifiedEndOfUserRecords = true;
        }
    }

    @Override
    public CompletableFuture<?> getAvailableFuture() {
        return AVAILABLE;
    }

    @Override
    public void alignedBarrierTimeout(long checkpointId) throws IOException {
    }

    @Override
    public void abortCheckpoint(long checkpointId, CheckpointException cause) {
    }

    @Override
    public void flushAll() {
    }

    @Override
    public void flush(int subpartitionIndex) {
    }

    @Override
    public CompletableFuture<Void> getAllDataProcessedFuture() {
        return FutureUtils.completedVoidFuture();
    }

    @Override
    public void onSubpartitionAllDataProcessed(int subpartition) {
    }

    @Override
    public int getNumberOfQueuedBuffers() {
        return 0;
    }

    @Override
    public long getSizeOfQueuedBuffersUnsafe() {
        return 0L;
    }

    @Override
    public int getNumberOfQueuedBuffers(int targetSubpartition) {
        return 0;
    }
}

