package org.apache.flink.runtime.io.network.partition;

import java.io.IOException;
import java.nio.ByteBuffer;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.metrics.TimerGauge;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.SupplierWithException;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.class */
public abstract class BufferWritingResultPartition extends ResultPartition {
    /** The subpartitions of this result partition; assigned once in the constructor from the array passed in. */
    protected final ResultSubpartition[] subpartitions;
    /** One slot per subpartition holding the builder currently open for unicast writes; a {@code null} slot means no builder is open. */
    private final BufferBuilder[] unicastBufferBuilders;
    /** The builder currently open for broadcast writes, or {@code null} when none is open. */
    private BufferBuilder broadcastBufferBuilder;
    /** Timer started and stopped around blocking buffer requests; replaced with the metric group's gauge in {@code setMetricGroup}. */
    private TimerGauge hardBackPressuredTimeMsPerSecond;
    /** Running total of bytes accounted for by the emit, broadcast, add and finish paths of this class. */
    private long totalWrittenBytes;

    public BufferWritingResultPartition(String str, int i, ResultPartitionID resultPartitionID, ResultPartitionType resultPartitionType, ResultSubpartition[] resultSubpartitionArr, int i2, ResultPartitionManager resultPartitionManager, @Nullable BufferCompressor bufferCompressor, SupplierWithException<BufferPool, IOException> supplierWithException) {
        super(str, i, resultPartitionID, resultPartitionType, resultSubpartitionArr.length, i2, resultPartitionManager, bufferCompressor, supplierWithException);
        this.hardBackPressuredTimeMsPerSecond = new TimerGauge();
        this.subpartitions = (ResultSubpartition[]) Preconditions.checkNotNull(resultSubpartitionArr);
        this.unicastBufferBuilders = new BufferBuilder[resultSubpartitionArr.length];
    }

    /**
     * Sanity-checks the buffer pool: the number of required memory segments it reports must be at
     * least the number of subpartitions.
     *
     * @throws IOException declared by the overridden method; not thrown by the visible code
     */
    @Override
    protected void setupInternal() throws IOException {
        final int requiredSegments = this.bufferPool.getNumberOfRequiredMemorySegments();
        final int subpartitionCount = getNumberOfSubpartitions();
        final boolean enoughGuaranteedBuffers = requiredSegments >= subpartitionCount;
        Preconditions.checkState(enoughGuaranteedBuffers, "Bug in result partition setup logic: Buffer pool has not enough guaranteed buffers for this result partition.");
    }

    /**
     * Sums the queued-buffer counts reported by every subpartition, using each subpartition's
     * {@code unsynchronizedGetNumberOfQueuedBuffers()}.
     *
     * @return the total across all subpartitions
     */
    @Override
    public int getNumberOfQueuedBuffers() {
        final ResultSubpartition[] all = this.subpartitions;
        int queued = 0;
        for (int idx = 0; idx < all.length; idx++) {
            queued += all[idx].unsynchronizedGetNumberOfQueuedBuffers();
        }
        return queued;
    }

    /**
     * Computes {@code totalWrittenBytes} minus the sum of the per-subpartition values returned by
     * {@code getTotalNumberOfBytesUnsafe()}, where negative per-subpartition values count as zero.
     *
     * @return the difference described above
     */
    @Override
    public long getSizeOfQueuedBuffersUnsafe() {
        final ResultSubpartition[] all = this.subpartitions;
        long reportedBySubpartitions = 0L;
        for (int idx = 0; idx < all.length; idx++) {
            final long reported = all[idx].getTotalNumberOfBytesUnsafe();
            // Negative readings contribute nothing, exactly like clamping with max(0, x).
            if (reported > 0L) {
                reportedBySubpartitions += reported;
            }
        }
        return this.totalWrittenBytes - reportedBySubpartitions;
    }

    /**
     * Returns the queued-buffer count of a single subpartition.
     *
     * @param i index of the subpartition; must satisfy {@code 0 <= i < numSubpartitions}
     * @return the value of that subpartition's {@code unsynchronizedGetNumberOfQueuedBuffers()}
     */
    @Override
    public int getNumberOfQueuedBuffers(int i) {
        final boolean indexInRange = i >= 0 && i < this.numSubpartitions;
        Preconditions.checkArgument(indexInRange);
        final ResultSubpartition target = this.subpartitions[i];
        return target.unsynchronizedGetNumberOfQueuedBuffers();
    }

    /**
     * Flushes one subpartition, optionally finishing the open builders first.
     *
     * @param i index of the subpartition to flush
     * @param z when {@code true}, the broadcast builder and the unicast builder of subpartition
     *     {@code i} are finished before the flush
     */
    public void flushSubpartition(int i, boolean z) {
        final boolean finishOpenBuilders = z;
        if (finishOpenBuilders) {
            finishBroadcastBufferBuilder();
            finishUnicastBufferBuilder(i);
        }
        final ResultSubpartition target = this.subpartitions[i];
        target.flush();
    }

    /**
     * Flushes every subpartition, optionally finishing all open builders first.
     *
     * @param z when {@code true}, the broadcast builder and all unicast builders are finished
     *     before the subpartitions are flushed
     */
    public void flushAllSubpartitions(boolean z) {
        final boolean finishOpenBuilders = z;
        if (finishOpenBuilders) {
            finishBroadcastBufferBuilder();
            finishUnicastBufferBuilders();
        }
        final ResultSubpartition[] all = this.subpartitions;
        for (int idx = 0; idx < all.length; idx++) {
            all[idx].flush();
        }
    }

    /**
     * Writes one record to a single subpartition, spilling over into further buffers as needed.
     *
     * <p>The record's remaining byte count is added to {@code totalWrittenBytes} up front. While
     * bytes remain after an append, the current unicast builder is finished and a continuation
     * builder is requested. A builder that ends up full is finished immediately.
     *
     * @param byteBuffer the serialized record; consumed until nothing remains
     * @param i index of the target subpartition
     * @throws IOException if requesting a buffer or adding to the subpartition fails
     */
    @Override
    public void emitRecord(ByteBuffer byteBuffer, int i) throws IOException {
        this.totalWrittenBytes += byteBuffer.remaining();
        BufferBuilder current = appendUnicastDataForNewRecord(byteBuffer, i);
        while (byteBuffer.hasRemaining()) {
            // The record did not fit: close the full builder and continue in a fresh one.
            finishUnicastBufferBuilder(i);
            current = appendUnicastDataForRecordContinuation(byteBuffer, i);
        }
        if (current.isFull()) {
            finishUnicastBufferBuilder(i);
        }
    }

    /**
     * Writes one record to all subpartitions through the shared broadcast builder, spilling over
     * into further buffers as needed.
     *
     * <p>{@code totalWrittenBytes} grows by the record size multiplied by the number of
     * subpartitions. While bytes remain after an append, the current broadcast builder is finished
     * and a continuation builder is requested. A builder that ends up full is finished immediately.
     *
     * @param byteBuffer the serialized record; consumed until nothing remains
     * @throws IOException if requesting a buffer or adding to a subpartition fails
     */
    @Override
    public void broadcastRecord(ByteBuffer byteBuffer) throws IOException {
        // Widen before multiplying: an int * int product could overflow before being added to the long total.
        this.totalWrittenBytes += (long) byteBuffer.remaining() * this.numSubpartitions;
        BufferBuilder current = appendBroadcastDataForNewRecord(byteBuffer);
        while (byteBuffer.hasRemaining()) {
            // The record did not fit: close the full builder and continue in a fresh one.
            finishBroadcastBufferBuilder();
            current = appendBroadcastDataForRecordContinuation(byteBuffer);
        }
        if (current.isFull()) {
            finishBroadcastBufferBuilder();
        }
    }

    /**
     * Serializes an event and adds a copy of it to every subpartition.
     *
     * <p>All open builders (broadcast and unicast) are finished first. The serialized event's
     * byte count, multiplied by the number of subpartitions, is added to {@code
     * totalWrittenBytes}. The consumer created here is closed when the method exits, whether it
     * completes normally or throws.
     *
     * @param abstractEvent the event to broadcast
     * @param z flag passed through to {@code EventSerializer.toBufferConsumer}
     * @throws IOException if serialization or adding to a subpartition fails
     */
    @Override
    public void broadcastEvent(AbstractEvent abstractEvent, boolean z) throws IOException {
        checkInProduceState();
        finishBroadcastBufferBuilder();
        finishUnicastBufferBuilders();
        try (BufferConsumer eventConsumer = EventSerializer.toBufferConsumer(abstractEvent, z)) {
            // Widen before multiplying so the per-subpartition product cannot overflow an int.
            this.totalWrittenBytes += (long) eventConsumer.getWrittenBytes() * this.numSubpartitions;
            for (ResultSubpartition subpartition : this.subpartitions) {
                // Each subpartition gets its own copy; the original is released by try-with-resources.
                subpartition.add(eventConsumer.copy(), 0);
            }
        }
    }

    /**
     * Forwards the aligned-barrier timeout notification to every subpartition.
     *
     * @param j value passed unchanged to each subpartition (presumably the checkpoint id — confirm
     *     against the interface)
     * @throws IOException if a subpartition's handler throws it
     */
    @Override
    public void alignedBarrierTimeout(long j) throws IOException {
        final ResultSubpartition[] all = this.subpartitions;
        for (int idx = 0; idx < all.length; idx++) {
            all[idx].alignedBarrierTimeout(j);
        }
    }

    /**
     * Forwards a checkpoint abort notification to every subpartition.
     *
     * @param j value passed unchanged to each subpartition (presumably the checkpoint id — confirm
     *     against the interface)
     * @param checkpointException the cause, passed unchanged to each subpartition
     */
    @Override
    public void abortCheckpoint(long j, CheckpointException checkpointException) {
        final ResultSubpartition[] all = this.subpartitions;
        for (int idx = 0; idx < all.length; idx++) {
            all[idx].abortCheckpoint(j, checkpointException);
        }
    }

    /**
     * Registers the metric group with the superclass and switches this partition's back-pressure
     * timer to the gauge returned by the group's {@code getHardBackPressuredTimePerSecond()}.
     *
     * @param taskIOMetricGroup the metric group to take the gauge from
     */
    @Override
    public void setMetricGroup(TaskIOMetricGroup taskIOMetricGroup) {
        super.setMetricGroup(taskIOMetricGroup);
        final TimerGauge groupGauge = taskIOMetricGroup.getHardBackPressuredTimePerSecond();
        this.hardBackPressuredTimeMsPerSecond = groupGauge;
    }

    /**
     * Creates a read view on one subpartition.
     *
     * @param i index of the subpartition; validated against {@code numSubpartitions}
     * @param bufferAvailabilityListener listener handed to the subpartition's {@code createReadView}
     * @return the view created by the subpartition
     * @throws IOException if the subpartition fails to create the view
     */
    @Override
    public ResultSubpartitionView createSubpartitionView(int i, BufferAvailabilityListener bufferAvailabilityListener) throws IOException {
        Preconditions.checkElementIndex(i, this.numSubpartitions, "Subpartition not found.");
        final boolean notReleased = !isReleased();
        Preconditions.checkState(notReleased, "Partition released.");
        final ResultSubpartition target = this.subpartitions[i];
        final ResultSubpartitionView view = target.createReadView(bufferAvailabilityListener);
        LOG.debug("Created {}", view);
        return view;
    }

    /**
     * Finishes the partition: closes all open builders, finishes every subpartition and then
     * delegates to the superclass.
     *
     * <p>The value returned by each subpartition's {@code finish()} is added to {@code
     * totalWrittenBytes}.
     *
     * @throws IOException if finishing a subpartition or the superclass fails
     */
    @Override
    public void finish() throws IOException {
        finishBroadcastBufferBuilder();
        finishUnicastBufferBuilders();
        // The decompiled loop indexed an undeclared temporary; iterate the subpartition array directly.
        for (ResultSubpartition subpartition : this.subpartitions) {
            this.totalWrittenBytes += subpartition.finish();
        }
        super.finish();
    }

    /**
     * Releases every subpartition on a best-effort basis: a failure in one subpartition is logged
     * and does not stop the remaining ones from being released.
     */
    @Override
    protected void releaseInternal() {
        final ResultSubpartition[] all = this.subpartitions;
        for (int idx = 0; idx < all.length; idx++) {
            final ResultSubpartition current = all[idx];
            try {
                current.release();
            } catch (Throwable failure) {
                // Deliberately swallowed after logging so the other subpartitions still get released.
                LOG.error("Error during release of result subpartition: " + failure.getMessage(), failure);
            }
        }
    }

    /**
     * Closes any builders that are still open (broadcast first, then each unicast slot), clears
     * the references, and finally closes the superclass.
     */
    @Override
    public void close() {
        final BufferBuilder openBroadcast = this.broadcastBufferBuilder;
        if (openBroadcast != null) {
            openBroadcast.close();
            this.broadcastBufferBuilder = null;
        }
        final BufferBuilder[] slots = this.unicastBufferBuilders;
        for (int idx = 0; idx < slots.length; idx++) {
            final BufferBuilder openUnicast = slots[idx];
            if (openUnicast == null) {
                continue;
            }
            openUnicast.close();
            slots[idx] = null;
        }
        super.close();
    }

    /**
     * Appends the start of a new record to the unicast builder of the given subpartition,
     * requesting a new builder (and registering it with the subpartition) if none is open.
     *
     * @param byteBuffer the record bytes; as much as fits is appended and committed
     * @param i index of the target subpartition
     * @return the builder the data was appended to
     * @throws ArrayIndexOutOfBoundsException if {@code i} is not a valid subpartition index
     * @throws IOException if requesting a buffer or adding to the subpartition fails
     */
    private BufferBuilder appendUnicastDataForNewRecord(ByteBuffer byteBuffer, int i) throws IOException {
        // '>=' rather than '>': an index equal to the length is out of range too and should be
        // rejected by this explicit check, not by the array access below.
        if (i < 0 || i >= this.unicastBufferBuilders.length) {
            throw new ArrayIndexOutOfBoundsException(i);
        }
        BufferBuilder builder = this.unicastBufferBuilders[i];
        if (builder == null) {
            builder = requestNewUnicastBufferBuilder(i);
            // Register the still-empty builder; the record's size is passed as the minimum for resizing.
            addToSubpartition(builder, i, 0, byteBuffer.remaining());
        }
        builder.appendAndCommit(byteBuffer);
        return builder;
    }

    /**
     * Adds a consumer for the given builder to a subpartition and then resizes the builder based
     * on the value the subpartition returns.
     *
     * @param bufferBuilder builder whose consumer (created from the beginning) is added
     * @param i index of the target subpartition
     * @param i2 second argument forwarded to {@code ResultSubpartition.add}
     * @param i3 lower bound used when trimming the builder
     * @throws IOException if the subpartition rejects the add with an I/O failure
     */
    private void addToSubpartition(BufferBuilder bufferBuilder, int i, int i2, int i3) throws IOException {
        // Resolve the target first so an invalid index fails before a consumer is created.
        final ResultSubpartition target = this.subpartitions[i];
        final BufferConsumer consumer = bufferBuilder.createBufferConsumerFromBeginning();
        final int sizeFromSubpartition = target.add(consumer, i2);
        resizeBuffer(bufferBuilder, sizeFromSubpartition, i3);
    }

    /**
     * Adds an existing consumer to a subpartition and accounts its written bytes in {@code
     * totalWrittenBytes}.
     *
     * @param i index of the target subpartition
     * @param bufferConsumer the consumer to add
     * @param i2 second argument forwarded to {@code ResultSubpartition.add}
     * @return whatever the subpartition's {@code add} returns
     * @throws IOException if the subpartition rejects the add with an I/O failure
     */
    public int addToSubpartition(int i, BufferConsumer bufferConsumer, int i2) throws IOException {
        final int consumerBytes = bufferConsumer.getWrittenBytes();
        this.totalWrittenBytes += consumerBytes;
        final ResultSubpartition target = this.subpartitions[i];
        return target.add(bufferConsumer, i2);
    }

    /**
     * Trims the builder to the larger of the two sizes, but only when {@code i} is positive.
     *
     * @param bufferBuilder the builder to trim
     * @param i size derived from the subpartition(s); non-positive values leave the builder untouched
     * @param i2 minimum size to keep
     */
    private void resizeBuffer(BufferBuilder bufferBuilder, int i, int i2) {
        if (i <= 0) {
            return;
        }
        final int targetSize = Math.max(i2, i);
        bufferBuilder.trim(targetSize);
    }

    /**
     * Continues a partially written record in a freshly requested unicast builder and registers
     * that builder with the subpartition.
     *
     * <p>The number of bytes appended is passed to the subpartition add and also used as the
     * minimum size when resizing.
     *
     * @param byteBuffer the rest of the record
     * @param i index of the target subpartition
     * @return the newly requested builder
     * @throws IOException if requesting a buffer or adding to the subpartition fails
     */
    private BufferBuilder appendUnicastDataForRecordContinuation(ByteBuffer byteBuffer, int i) throws IOException {
        final BufferBuilder continuation = requestNewUnicastBufferBuilder(i);
        final int bytesAppended = continuation.appendAndCommit(byteBuffer);
        addToSubpartition(continuation, i, bytesAppended, bytesAppended);
        return continuation;
    }

    /**
     * Appends the start of a new record to the broadcast builder, requesting a new builder (and
     * creating consumers for all subpartitions) if none is open.
     *
     * @param byteBuffer the record bytes; as much as fits is appended and committed
     * @return the builder the data was appended to
     * @throws IOException if requesting a buffer or adding to a subpartition fails
     */
    private BufferBuilder appendBroadcastDataForNewRecord(ByteBuffer byteBuffer) throws IOException {
        final BufferBuilder open = this.broadcastBufferBuilder;
        if (open != null) {
            open.appendAndCommit(byteBuffer);
            return open;
        }
        final BufferBuilder fresh = requestNewBroadcastBufferBuilder();
        // Consumers are created before any data is written; the record size is the resize minimum.
        createBroadcastBufferConsumers(fresh, 0, byteBuffer.remaining());
        fresh.appendAndCommit(byteBuffer);
        return fresh;
    }

    /**
     * Continues a partially written record in a freshly requested broadcast builder and creates
     * consumers for it on all subpartitions.
     *
     * <p>The number of bytes appended is passed to the subpartition adds and also used as the
     * minimum size when resizing.
     *
     * @param byteBuffer the rest of the record
     * @return the newly requested builder
     * @throws IOException if requesting a buffer or adding to a subpartition fails
     */
    private BufferBuilder appendBroadcastDataForRecordContinuation(ByteBuffer byteBuffer) throws IOException {
        final BufferBuilder continuation = requestNewBroadcastBufferBuilder();
        final int bytesAppended = continuation.appendAndCommit(byteBuffer);
        createBroadcastBufferConsumers(continuation, bytesAppended, bytesAppended);
        return continuation;
    }

    /**
     * Registers a broadcast builder with every subpartition and resizes the builder afterwards.
     *
     * <p>One consumer is created from the beginning of the builder and a copy of it is added to
     * each subpartition. The smallest value returned by the adds, ignoring {@code -1}, is used for
     * resizing. If every add returns {@code -1}, or there are no subpartitions, the value stays at
     * {@link Integer#MAX_VALUE}. The consumer created here is closed when the method exits,
     * whether it completes normally or throws.
     *
     * @param bufferBuilder the broadcast builder to register
     * @param i second argument forwarded to each {@code ResultSubpartition.add}
     * @param i2 minimum size to keep when resizing the builder
     * @throws IOException if adding to a subpartition fails
     */
    private void createBroadcastBufferConsumers(BufferBuilder bufferBuilder, int i, int i2) throws IOException {
        try (BufferConsumer sharedConsumer = bufferBuilder.createBufferConsumerFromBeginning()) {
            int smallestReturned = Integer.MAX_VALUE;
            for (ResultSubpartition subpartition : this.subpartitions) {
                final int returned = subpartition.add(sharedConsumer.copy(), i);
                // -1 is excluded from the minimum (presumably it signals a failed add — confirm
                // against ResultSubpartition.add).
                if (returned != -1) {
                    smallestReturned = Math.min(smallestReturned, returned);
                }
            }
            resizeBuffer(bufferBuilder, smallestReturned, i2);
        }
    }

    /**
     * Requests a new builder for unicast writes to the given subpartition and stores it in that
     * subpartition's slot.
     *
     * <p>Checks the produce state and finishes any open broadcast builder before requesting.
     *
     * @param i index of the target subpartition
     * @return the newly requested builder
     * @throws IOException if the produce-state check or the buffer request fails
     */
    private BufferBuilder requestNewUnicastBufferBuilder(int i) throws IOException {
        checkInProduceState();
        ensureUnicastMode();
        final BufferBuilder fresh = requestNewBufferBuilderFromPool(i);
        this.unicastBufferBuilders[i] = fresh;
        return fresh;
    }

    /**
     * Requests a new builder for broadcast writes and stores it as the current broadcast builder.
     *
     * <p>Checks the produce state and finishes all open unicast builders before requesting. The
     * request to the pool is made with subpartition index {@code 0}.
     *
     * @return the newly requested builder
     * @throws IOException if the produce-state check or the buffer request fails
     */
    private BufferBuilder requestNewBroadcastBufferBuilder() throws IOException {
        checkInProduceState();
        ensureBroadcastMode();
        final BufferBuilder fresh = requestNewBufferBuilderFromPool(0);
        this.broadcastBufferBuilder = fresh;
        return fresh;
    }

    /**
     * Obtains a builder from the buffer pool, first without blocking and, if that yields nothing,
     * with a blocking request that is timed by the hard-back-pressure gauge.
     *
     * @param i subpartition index passed to the pool
     * @return the builder handed out by the pool
     * @throws IOException if the pool fails, or if the thread is interrupted while blocked (the
     *     {@link InterruptedException} is attached as the cause)
     */
    private BufferBuilder requestNewBufferBuilderFromPool(int i) throws IOException {
        final BufferBuilder immediate = this.bufferPool.requestBufferBuilder(i);
        if (immediate != null) {
            return immediate;
        }
        this.hardBackPressuredTimeMsPerSecond.markStart();
        try {
            return this.bufferPool.requestBufferBuilderBlocking(i);
        } catch (InterruptedException e) {
            // Keep the original exception as the cause so the interruption is diagnosable.
            // NOTE(review): the interrupt flag is not restored, matching the previous behavior.
            throw new IOException("Interrupted while waiting for buffer", e);
        } finally {
            // Always stop the timer, including on failure, so the gauge does not keep measuring
            // back pressure for a request that is no longer pending.
            this.hardBackPressuredTimeMsPerSecond.markEnd();
        }
    }

    /**
     * Finishes and closes the unicast builder of one subpartition, if one is open, and updates the
     * byte and buffer counters with the number of bytes the builder reports.
     *
     * @param i index of the subpartition whose builder is finished
     */
    private void finishUnicastBufferBuilder(int i) {
        final BufferBuilder open = this.unicastBufferBuilders[i];
        if (open == null) {
            return;
        }
        final int finishedBytes = open.finish();
        this.resultPartitionBytes.inc(i, finishedBytes);
        this.numBytesOut.inc(finishedBytes);
        this.numBuffersOut.inc();
        // Clear the slot before closing, as the original does.
        this.unicastBufferBuilders[i] = null;
        open.close();
    }

    /** Finishes the open unicast builder of every subpartition, in index order. */
    private void finishUnicastBufferBuilders() {
        int subpartitionIndex = 0;
        while (subpartitionIndex < this.numSubpartitions) {
            finishUnicastBufferBuilder(subpartitionIndex);
            subpartitionIndex++;
        }
    }

    /**
     * Finishes and closes the broadcast builder, if one is open, and updates the counters.
     *
     * <p>The bytes reported by the builder are recorded for all subpartitions. {@code numBytesOut}
     * grows by that byte count multiplied by the number of subpartitions, and {@code
     * numBuffersOut} grows by the number of subpartitions.
     */
    private void finishBroadcastBufferBuilder() {
        final BufferBuilder open = this.broadcastBufferBuilder;
        if (open == null) {
            return;
        }
        // Capture the result once: the decompiled code referenced an undeclared temporary here.
        final int finishedBytes = open.finish();
        this.resultPartitionBytes.incAll(finishedBytes);
        // Widen before multiplying so the product cannot overflow an int.
        this.numBytesOut.inc((long) finishedBytes * this.numSubpartitions);
        this.numBuffersOut.inc(this.numSubpartitions);
        open.close();
        this.broadcastBufferBuilder = null;
    }

    /** Prepares for unicast writing by finishing the broadcast builder, if one is open. */
    private void ensureUnicastMode() {
        finishBroadcastBufferBuilder();
    }

    /** Prepares for broadcast writing by finishing all open unicast builders. */
    private void ensureBroadcastMode() {
        finishUnicastBufferBuilders();
    }

    /**
     * Exposes the gauge currently used to time blocking buffer requests.
     *
     * @return the gauge instance held by this partition
     */
    @VisibleForTesting
    public TimerGauge getHardBackPressuredTimeMsPerSecond() {
        return this.hardBackPressuredTimeMsPerSecond;
    }

    /**
     * Exposes the subpartition array.
     *
     * @return the internal array itself, not a copy, so changes made by the caller are visible
     *     to this partition
     */
    @VisibleForTesting
    public ResultSubpartition[] getAllPartitions() {
        return this.subpartitions;
    }
}
