package org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.remote;

import java.util.Arrays;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.PartitionFileWriter;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemoryManager;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageResourceRegistry;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierProducerAgent.class */
/**
 * {@link TierProducerAgent} that hands written buffers to a {@link RemoteCacheManager} and caps
 * the number of buffers written to each segment of a subpartition.
 *
 * <p>A per-subpartition counter tracks how many buffers the current segment has received. Once
 * the cap is reached, the next {@link #tryWrite} finishes the segment, resets the counter and
 * rejects the buffer.
 */
public class RemoteTierProducerAgent implements TierProducerAgent {
    /** Maximum number of buffers accepted into one segment of a subpartition. */
    private final int numBuffersPerSegment;

    /** Number of subpartitions the cache manager was created with (1 in single-subpartition mode). */
    private final int numCacheSubpartitions;

    private final RemoteCacheManager cacheDataManager;
    private final TieredStorageMemoryManager memoryManager;

    /** Buffers written so far to the current segment, indexed by subpartition id. */
    private final int[] currentSubpartitionSegmentWriteBuffers;

    /**
     * Creates the agent, its cache manager, and registers the release callback for the partition.
     *
     * @param tieredStoragePartitionId partition this agent writes for; passed to the cache manager
     *     and used as the key when registering the release callback
     * @param i number of subpartitions; sizes the per-subpartition buffer counters
     * @param i2 size of one segment, in the same unit as {@code i3} (presumably bytes - TODO
     *     confirm against the caller)
     * @param i3 size of one buffer; must be positive and not larger than {@code i2}
     * @param z when {@code true}, the cache manager is created with a single subpartition instead
     *     of {@code i} (presumably the broadcast-only case - TODO confirm)
     * @param partitionFileWriter writer handed to the cache manager
     * @param tieredStorageMemoryManager memory manager handed to the cache manager and used to
     *     transfer buffer ownership in {@link #tryWrite}
     * @param tieredStorageResourceRegistry registry in which the release callback is registered
     * @throws IllegalArgumentException if {@code i3} is not positive or {@code i2 < i3}
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public RemoteTierProducerAgent(TieredStoragePartitionId tieredStoragePartitionId, int i, int i2, int i3, boolean z, PartitionFileWriter partitionFileWriter, TieredStorageMemoryManager tieredStorageMemoryManager, TieredStorageResourceRegistry tieredStorageResourceRegistry) {
        // Guard the divisor first: a zero buffer size would otherwise surface as an
        // ArithmeticException, and a negative one as a non-positive per-segment limit.
        Preconditions.checkArgument(i3 > 0, "Buffer size must be positive.");
        Preconditions.checkArgument(i2 >= i3, "One segment should contain at least one buffer.");
        // Integer division: a trailing partial buffer does not count towards the limit.
        this.numBuffersPerSegment = i2 / i3;
        this.memoryManager = tieredStorageMemoryManager;
        this.numCacheSubpartitions = z ? 1 : i;
        this.cacheDataManager = new RemoteCacheManager(tieredStoragePartitionId, this.numCacheSubpartitions, tieredStorageMemoryManager, partitionFileWriter);
        this.currentSubpartitionSegmentWriteBuffers = new int[i];
        // Every subpartition starts with an empty segment.
        Arrays.fill(this.currentSubpartitionSegmentWriteBuffers, 0);
        tieredStorageResourceRegistry.registerResource(tieredStoragePartitionId, this::releaseAllResources);
    }

    /**
     * Starts segment {@code i} for the given subpartition in the cache manager.
     *
     * @param tieredStorageSubpartitionId subpartition whose segment is started
     * @param i id of the segment to start
     * @return always {@code true}
     */
    @Override // org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent
    public boolean tryStartNewSegment(TieredStorageSubpartitionId tieredStorageSubpartitionId, int i) {
        this.cacheDataManager.startSegment(tieredStorageSubpartitionId.getSubpartitionId(), i);
        return true;
    }

    /**
     * Appends {@code buffer} to the current segment of the subpartition if the segment has room.
     *
     * <p>When the segment already holds the maximum number of buffers, the segment is finished,
     * the counter is reset and the buffer is not written.
     *
     * @param tieredStorageSubpartitionId subpartition to write to
     * @param buffer buffer to append
     * @param obj current owner of {@code buffer}; ownership is transferred to this agent when
     *     {@code buffer.isBuffer()} is {@code true}
     * @return {@code true} if the buffer was appended, {@code false} if the segment was full
     */
    @Override // org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent
    public boolean tryWrite(TieredStorageSubpartitionId tieredStorageSubpartitionId, Buffer buffer, Object obj) {
        int subpartitionId = tieredStorageSubpartitionId.getSubpartitionId();
        // Compared with ">=" rather than "count + 1 > limit" so the check cannot overflow.
        if (this.currentSubpartitionSegmentWriteBuffers[subpartitionId] >= this.numBuffersPerSegment) {
            this.cacheDataManager.finishSegment(subpartitionId);
            this.currentSubpartitionSegmentWriteBuffers[subpartitionId] = 0;
            return false;
        }
        if (buffer.isBuffer()) {
            this.memoryManager.transferBufferOwnership(obj, this, buffer);
        }
        this.currentSubpartitionSegmentWriteBuffers[subpartitionId]++;
        this.cacheDataManager.appendBuffer(buffer, subpartitionId);
        return true;
    }

    /**
     * Finishes the current segment of every subpartition known to the cache manager, then closes
     * the cache manager.
     */
    @Override // org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent, java.lang.AutoCloseable
    public void close() {
        // Iterate only over the subpartitions the cache manager was created with. In
        // single-subpartition mode that is one, so indices beyond it are never passed on.
        // NOTE(review): assumes RemoteCacheManager only accepts ids below the count given to its
        // constructor - confirm against RemoteCacheManager.finishSegment.
        for (int subpartitionId = 0; subpartitionId < this.numCacheSubpartitions; subpartitionId++) {
            this.cacheDataManager.finishSegment(subpartitionId);
        }
        this.cacheDataManager.close();
    }

    /** Release callback registered with the resource registry; releases the cache manager. */
    private void releaseAllResources() {
        this.cacheDataManager.release();
    }
}
