/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.netty;

import java.io.IOException;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.io.network.NetworkSequenceViewReader;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.netty.PartitionRequestQueue;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
import org.apache.flink.util.Preconditions;

class CreditBasedSequenceNumberingViewReader
implements BufferAvailabilityListener,
NetworkSequenceViewReader {
    private final Object requestLock = new Object();
    private final InputChannelID receiverId;
    private final PartitionRequestQueue requestQueue;
    private final int initialCredit;
    private volatile ResultSubpartitionView subpartitionView;
    private boolean isRegisteredAsAvailable = false;
    private int numCreditsAvailable;

    CreditBasedSequenceNumberingViewReader(InputChannelID receiverId, int initialCredit, PartitionRequestQueue requestQueue) {
        Preconditions.checkArgument(initialCredit >= 0, "Must be non-negative.");
        this.receiverId = receiverId;
        this.initialCredit = initialCredit;
        this.numCreditsAvailable = initialCredit;
        this.requestQueue = requestQueue;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void requestSubpartitionView(ResultPartitionProvider partitionProvider, ResultPartitionID resultPartitionId, int subPartitionIndex) throws IOException {
        Object object = this.requestLock;
        synchronized (object) {
            if (this.subpartitionView != null) {
                throw new IllegalStateException("Subpartition already requested");
            }
            this.subpartitionView = partitionProvider.createSubpartitionView(resultPartitionId, subPartitionIndex, this);
        }
        this.notifyDataAvailable();
    }

    @Override
    public void addCredit(int creditDeltas) {
        this.numCreditsAvailable += creditDeltas;
    }

    @Override
    public void notifyRequiredSegmentId(int segmentId) {
        this.subpartitionView.notifyRequiredSegmentId(segmentId);
    }

    @Override
    public void resumeConsumption() {
        if (this.initialCredit == 0) {
            this.numCreditsAvailable = 0;
        }
        this.subpartitionView.resumeConsumption();
    }

    @Override
    public void acknowledgeAllRecordsProcessed() {
        this.subpartitionView.acknowledgeAllDataProcessed();
    }

    @Override
    public void setRegisteredAsAvailable(boolean isRegisteredAvailable) {
        this.isRegisteredAsAvailable = isRegisteredAvailable;
    }

    @Override
    public boolean isRegisteredAsAvailable() {
        return this.isRegisteredAsAvailable;
    }

    @Override
    public ResultSubpartitionView.AvailabilityWithBacklog getAvailabilityAndBacklog() {
        return this.subpartitionView.getAvailabilityAndBacklog(this.numCreditsAvailable);
    }

    private Buffer.DataType getNextDataType(ResultSubpartition.BufferAndBacklog bufferAndBacklog) {
        Buffer.DataType nextDataType = bufferAndBacklog.getNextDataType();
        if (this.numCreditsAvailable > 0 || nextDataType.isEvent()) {
            return nextDataType;
        }
        return Buffer.DataType.NONE;
    }

    @Override
    public InputChannelID getReceiverId() {
        return this.receiverId;
    }

    @Override
    public void notifyNewBufferSize(int newBufferSize) {
        this.subpartitionView.notifyNewBufferSize(newBufferSize);
    }

    @VisibleForTesting
    int getNumCreditsAvailable() {
        return this.numCreditsAvailable;
    }

    @VisibleForTesting
    ResultSubpartitionView.AvailabilityWithBacklog hasBuffersAvailable() {
        return this.subpartitionView.getAvailabilityAndBacklog(Integer.MAX_VALUE);
    }

    @Override
    @Nullable
    public InputChannel.BufferAndAvailability getNextBuffer() throws IOException {
        ResultSubpartition.BufferAndBacklog next = this.subpartitionView.getNextBuffer();
        if (next != null) {
            if (next.buffer().isBuffer() && --this.numCreditsAvailable < 0) {
                throw new IllegalStateException("no credit available");
            }
            Buffer.DataType nextDataType = this.getNextDataType(next);
            return new InputChannel.BufferAndAvailability(next.buffer(), nextDataType, next.buffersInBacklog(), next.getSequenceNumber());
        }
        return null;
    }

    @Override
    public boolean needAnnounceBacklog() {
        return this.initialCredit == 0 && this.numCreditsAvailable == 0;
    }

    @Override
    public boolean isReleased() {
        return this.subpartitionView.isReleased();
    }

    @Override
    public Throwable getFailureCause() {
        return this.subpartitionView.getFailureCause();
    }

    @Override
    public void releaseAllResources() throws IOException {
        this.subpartitionView.releaseAllResources();
    }

    @Override
    public void notifyDataAvailable() {
        this.requestQueue.notifyReaderNonEmpty(this);
    }

    @Override
    public void notifyPriorityEvent(int prioritySequenceNumber) {
        this.notifyDataAvailable();
    }

    public String toString() {
        return "CreditBasedSequenceNumberingViewReader{requestLock=" + this.requestLock + ", receiverId=" + this.receiverId + ", numCreditsAvailable=" + this.numCreditsAvailable + ", isRegisteredAsAvailable=" + this.isRegisteredAsAvailable + '}';
    }
}

