/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition;

import java.io.File;
import java.io.IOException;
import java.util.Optional;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.io.disk.FileChannelManager;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.BufferPoolFactory;
import org.apache.flink.runtime.io.network.buffer.BufferPoolOwner;
import org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartitionType;
import org.apache.flink.runtime.io.network.partition.PipelinedSubpartition;
import org.apache.flink.runtime.io.network.partition.ReleaseOnConsumptionResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.MemoryArchitecture;
import org.apache.flink.util.function.FunctionWithException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Factory that builds {@link ResultPartition} instances together with their
 * {@link ResultSubpartition}s and the function used to create their {@link BufferPool}.
 *
 * <p>The factory holds the shared collaborators (partition manager, file channel manager,
 * buffer pool factory) and the buffer sizing parameters that apply to every partition it
 * creates.
 */
public class ResultPartitionFactory {
    private static final Logger LOG = LoggerFactory.getLogger(ResultPartitionFactory.class);

    private final ResultPartitionManager partitionManager;
    private final FileChannelManager channelManager;
    private final BufferPoolFactory bufferPoolFactory;
    private final BoundedBlockingSubpartitionType blockingSubpartitionType;
    private final int networkBuffersPerChannel;
    private final int floatingNetworkBuffersPerGate;
    private final int networkBufferSize;
    private final boolean forcePartitionReleaseOnConsumption;

    /**
     * Creates a factory with the given collaborators and sizing parameters.
     *
     * @param partitionManager manager handed to every created partition
     * @param channelManager source of the spill files used by blocking subpartitions
     * @param bufferPoolFactory factory used to create the buffer pool of a partition
     * @param blockingSubpartitionType implementation used for blocking subpartitions
     * @param networkBuffersPerChannel per-subpartition share of the buffer limit of bounded partitions
     * @param floatingNetworkBuffersPerGate constant share of the buffer limit of bounded partitions
     * @param networkBufferSize buffer size passed to blocking subpartitions
     * @param forcePartitionReleaseOnConsumption if {@code true}, every created partition is a
     *     {@link ReleaseOnConsumptionResultPartition}, regardless of its type
     */
    public ResultPartitionFactory(ResultPartitionManager partitionManager, FileChannelManager channelManager, BufferPoolFactory bufferPoolFactory, BoundedBlockingSubpartitionType blockingSubpartitionType, int networkBuffersPerChannel, int floatingNetworkBuffersPerGate, int networkBufferSize, boolean forcePartitionReleaseOnConsumption) {
        this.partitionManager = partitionManager;
        this.channelManager = channelManager;
        this.bufferPoolFactory = bufferPoolFactory;
        this.blockingSubpartitionType = blockingSubpartitionType;
        this.networkBuffersPerChannel = networkBuffersPerChannel;
        this.floatingNetworkBuffersPerGate = floatingNetworkBuffersPerGate;
        this.networkBufferSize = networkBufferSize;
        this.forcePartitionReleaseOnConsumption = forcePartitionReleaseOnConsumption;
    }

    /**
     * Creates a result partition from a deployment descriptor.
     *
     * <p>Partition id, type, number of subpartitions and max parallelism are read from the
     * descriptor; the buffer pool function comes from {@link #createBufferPoolFactory}.
     *
     * @param taskNameWithSubtaskAndId name of the owning task, also used in the debug log
     * @param desc deployment descriptor of the partition
     * @return the created partition with all subpartitions initialized
     */
    public ResultPartition create(String taskNameWithSubtaskAndId, ResultPartitionDeploymentDescriptor desc) {
        return create(
                taskNameWithSubtaskAndId,
                desc.getShuffleDescriptor().getResultPartitionID(),
                desc.getPartitionType(),
                desc.getNumberOfSubpartitions(),
                desc.getMaxParallelism(),
                createBufferPoolFactory(desc.getNumberOfSubpartitions(), desc.getPartitionType()));
    }

    /**
     * Creates a result partition with explicitly given properties.
     *
     * <p>A {@link ReleaseOnConsumptionResultPartition} is created when release-on-consumption
     * is forced or the type is not blocking; otherwise a plain {@link ResultPartition} is
     * created.
     *
     * @param taskNameWithSubtaskAndId name of the owning task, also used in the debug log
     * @param id id of the partition
     * @param type type of the partition
     * @param numberOfSubpartitions number of subpartitions to create
     * @param maxParallelism max parallelism passed on to the partition
     * @param bufferPoolFactory function the partition uses to create its buffer pool
     * @return the created partition with all subpartitions initialized
     * @throws FlinkRuntimeException if creating a blocking subpartition fails with an
     *     {@link IOException}
     */
    @VisibleForTesting
    public ResultPartition create(String taskNameWithSubtaskAndId, ResultPartitionID id, ResultPartitionType type, int numberOfSubpartitions, int maxParallelism, FunctionWithException<BufferPoolOwner, BufferPool, IOException> bufferPoolFactory) {
        // The array is handed to the partition first and filled afterwards, because each
        // subpartition needs a reference to its parent partition.
        ResultSubpartition[] subpartitions = new ResultSubpartition[numberOfSubpartitions];

        final ResultPartition partition;
        if (forcePartitionReleaseOnConsumption || !type.isBlocking()) {
            partition = new ReleaseOnConsumptionResultPartition(taskNameWithSubtaskAndId, id, type, subpartitions, maxParallelism, partitionManager, bufferPoolFactory);
        } else {
            partition = new ResultPartition(taskNameWithSubtaskAndId, id, type, subpartitions, maxParallelism, partitionManager, bufferPoolFactory);
        }

        createSubpartitions(partition, type, blockingSubpartitionType, subpartitions);

        // Log the partition that was just initialized, not the factory itself.
        LOG.debug("{}: Initialized {}", taskNameWithSubtaskAndId, partition);
        return partition;
    }

    /**
     * Fills {@code subpartitions} with blocking subpartitions for blocking partition types and
     * with {@link PipelinedSubpartition}s otherwise.
     */
    private void createSubpartitions(ResultPartition partition, ResultPartitionType type, BoundedBlockingSubpartitionType blockingSubpartitionType, ResultSubpartition[] subpartitions) {
        if (type.isBlocking()) {
            initializeBoundedBlockingPartitions(subpartitions, partition, blockingSubpartitionType, networkBufferSize, channelManager);
            return;
        }
        for (int index = 0; index < subpartitions.length; ++index) {
            subpartitions[index] = new PipelinedSubpartition(index, partition);
        }
    }

    /**
     * Creates one blocking subpartition per array slot, each with its own spill file obtained
     * from the channel manager.
     *
     * <p>If creation fails with an {@link IOException}, the subpartitions created before the
     * failing one are released and the exception is rethrown wrapped in a
     * {@link FlinkRuntimeException}.
     */
    private static void initializeBoundedBlockingPartitions(ResultSubpartition[] subpartitions, ResultPartition parent, BoundedBlockingSubpartitionType blockingSubpartitionType, int networkBufferSize, FileChannelManager channelManager) {
        // Declared outside the try block so the catch clause knows how many were created.
        int created = 0;
        try {
            for (created = 0; created < subpartitions.length; ++created) {
                File spillFile = channelManager.createChannel().getPathFile();
                subpartitions[created] = blockingSubpartitionType.create(created, parent, spillFile, networkBufferSize);
            }
        } catch (IOException e) {
            // Undo the partial work: slots [0, created) hold subpartitions that were created.
            releasePartitionsQuietly(subpartitions, created);
            throw new FlinkRuntimeException(e);
        }
    }

    /**
     * Releases the subpartitions at indices {@code [0, until)}, suppressing exceptions via
     * {@link ExceptionUtils#suppressExceptions}.
     */
    private static void releasePartitionsQuietly(ResultSubpartition[] partitions, int until) {
        for (int index = 0; index < until; ++index) {
            ResultSubpartition subpartition = partitions[index];
            ExceptionUtils.suppressExceptions(subpartition::release);
        }
    }

    /**
     * Returns the function a partition uses to create its buffer pool.
     *
     * <p>The pool is requested with {@code numberOfSubpartitions + 1} required segments. For
     * bounded types the maximum is
     * {@code numberOfSubpartitions * networkBuffersPerChannel + floatingNetworkBuffersPerGate};
     * for other types it is {@link Integer#MAX_VALUE}.
     *
     * @param numberOfSubpartitions number of subpartitions of the partition
     * @param type type of the partition
     * @return function creating the buffer pool for a given {@link BufferPoolOwner}
     */
    @VisibleForTesting
    FunctionWithException<BufferPoolOwner, BufferPool, IOException> createBufferPoolFactory(int numberOfSubpartitions, ResultPartitionType type) {
        return owner -> {
            final int maxNumberOfMemorySegments;
            if (type.isBounded()) {
                maxNumberOfMemorySegments = numberOfSubpartitions * networkBuffersPerChannel + floatingNetworkBuffersPerGate;
            } else {
                maxNumberOfMemorySegments = Integer.MAX_VALUE;
            }

            // The owner is only passed for types without back pressure.
            // NOTE(review): presumably so the pool can ask the owner to release memory -
            // confirm against BufferPoolFactory#createBufferPool.
            final Optional<BufferPoolOwner> poolOwner;
            if (type.hasBackPressure()) {
                poolOwner = Optional.empty();
            } else {
                poolOwner = Optional.of(owner);
            }

            return bufferPoolFactory.createBufferPool(numberOfSubpartitions + 1, maxNumberOfMemorySegments, poolOwner);
        };
    }

    /**
     * Picks the blocking subpartition implementation from the detected memory architecture:
     * {@code FILE_MMAP} on 64 bit, {@code FILE} on 32 bit, and {@code FILE} (with a warning)
     * when the architecture is neither of the two.
     */
    static BoundedBlockingSubpartitionType getBoundedBlockingType() {
        switch (MemoryArchitecture.get()) {
            case _64_BIT:
                return BoundedBlockingSubpartitionType.FILE_MMAP;
            case _32_BIT:
                return BoundedBlockingSubpartitionType.FILE;
            default:
                LOG.warn("Cannot determine memory architecture. Using pure file-based shuffle.");
                return BoundedBlockingSubpartitionType.FILE;
        }
    }
}

