/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;

import java.io.IOException;
import java.util.List;
import java.util.Optional;
import javax.annotation.Nullable;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.NettyConnectionId;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.NettyPayload;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.NettyPayloadManager;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.NettyServiceProducer;
import org.apache.flink.util.Preconditions;

public class TieredStorageResultSubpartitionView
implements ResultSubpartitionView {
    private final BufferAvailabilityListener availabilityListener;
    private final List<NettyPayloadManager> nettyPayloadManagers;
    private final List<NettyServiceProducer> serviceProducers;
    private final List<NettyConnectionId> nettyConnectionIds;
    private volatile boolean isReleased = false;
    private int requiredSegmentId = 0;
    private boolean stopSendingData = false;
    private int managerIndexContainsCurrentSegment = -1;
    private int currentSequenceNumber = -1;

    public TieredStorageResultSubpartitionView(BufferAvailabilityListener availabilityListener, List<NettyPayloadManager> nettyPayloadManagers, List<NettyConnectionId> nettyConnectionIds, List<NettyServiceProducer> serviceProducers) {
        this.availabilityListener = availabilityListener;
        this.nettyPayloadManagers = nettyPayloadManagers;
        this.nettyConnectionIds = nettyConnectionIds;
        this.serviceProducers = serviceProducers;
    }

    @Override
    @Nullable
    public ResultSubpartition.BufferAndBacklog getNextBuffer() throws IOException {
        if (this.stopSendingData || !this.findCurrentNettyPayloadQueue()) {
            return null;
        }
        NettyPayloadManager nettyPayloadManager = this.nettyPayloadManagers.get(this.managerIndexContainsCurrentSegment);
        Optional<Buffer> nextBuffer = this.readNettyPayload(nettyPayloadManager);
        if (nextBuffer.isPresent()) {
            boolean bl = this.stopSendingData = nextBuffer.get().getDataType() == Buffer.DataType.END_OF_SEGMENT;
            if (this.stopSendingData) {
                this.managerIndexContainsCurrentSegment = -1;
            }
            ++this.currentSequenceNumber;
            return ResultSubpartition.BufferAndBacklog.fromBufferAndLookahead(nextBuffer.get(), this.getDataType(nettyPayloadManager.peek()), this.getBacklog(), this.currentSequenceNumber);
        }
        return null;
    }

    @Override
    public ResultSubpartitionView.AvailabilityWithBacklog getAvailabilityAndBacklog(int numCreditsAvailable) {
        if (this.findCurrentNettyPayloadQueue()) {
            boolean availability;
            NettyPayloadManager currentQueue = this.nettyPayloadManagers.get(this.managerIndexContainsCurrentSegment);
            boolean bl = availability = numCreditsAvailable > 0;
            if (numCreditsAvailable == 0 && this.isEventOrError(currentQueue)) {
                availability = true;
            }
            return new ResultSubpartitionView.AvailabilityWithBacklog(availability, this.getBacklog());
        }
        return new ResultSubpartitionView.AvailabilityWithBacklog(false, 0);
    }

    @Override
    public void notifyRequiredSegmentId(int segmentId) {
        if (segmentId > this.requiredSegmentId) {
            this.requiredSegmentId = segmentId;
            this.stopSendingData = false;
            this.availabilityListener.notifyDataAvailable();
        }
    }

    @Override
    public void releaseAllResources() throws IOException {
        if (this.isReleased) {
            return;
        }
        this.isReleased = true;
        for (int index = 0; index < this.nettyPayloadManagers.size(); ++index) {
            this.releaseQueue(this.nettyPayloadManagers.get(index), this.serviceProducers.get(index), this.nettyConnectionIds.get(index));
        }
    }

    @Override
    public boolean isReleased() {
        return this.isReleased;
    }

    @Override
    public Throwable getFailureCause() {
        return null;
    }

    @Override
    public int unsynchronizedGetNumberOfQueuedBuffers() {
        if (this.findCurrentNettyPayloadQueue()) {
            return this.getBacklog();
        }
        return 0;
    }

    @Override
    public int getNumberOfQueuedBuffers() {
        if (this.findCurrentNettyPayloadQueue()) {
            return this.getBacklog();
        }
        return 0;
    }

    @Override
    public void notifyDataAvailable() {
        throw new UnsupportedOperationException("Method notifyDataAvailable should never be called.");
    }

    @Override
    public void resumeConsumption() {
        throw new UnsupportedOperationException("Method resumeConsumption should never be called.");
    }

    @Override
    public void acknowledgeAllDataProcessed() {
    }

    @Override
    public void notifyNewBufferSize(int newBufferSize) {
        throw new UnsupportedOperationException("Method notifyNewBufferSize should never be called.");
    }

    private Optional<Buffer> readNettyPayload(NettyPayloadManager nettyPayloadManager) throws IOException {
        NettyPayload nettyPayload = nettyPayloadManager.poll();
        if (nettyPayload == null) {
            return Optional.empty();
        }
        Preconditions.checkState(nettyPayload.getSegmentId() == -1);
        Optional<Throwable> error = nettyPayload.getError();
        if (error.isPresent()) {
            this.releaseAllResources();
            throw new IOException(error.get());
        }
        return nettyPayload.getBuffer();
    }

    private int getBacklog() {
        return this.managerIndexContainsCurrentSegment == -1 ? 0 : this.nettyPayloadManagers.get(this.managerIndexContainsCurrentSegment).getBacklog();
    }

    private boolean isEventOrError(NettyPayloadManager nettyPayloadManager) {
        NettyPayload nettyPayload = nettyPayloadManager.peek();
        return nettyPayload != null && (nettyPayload.getError().isPresent() || nettyPayload.getBuffer().isPresent() && !nettyPayload.getBuffer().get().isBuffer());
    }

    private Buffer.DataType getDataType(NettyPayload nettyPayload) {
        if (nettyPayload == null || !nettyPayload.getBuffer().isPresent()) {
            return Buffer.DataType.NONE;
        }
        return nettyPayload.getBuffer().get().getDataType();
    }

    private void releaseQueue(NettyPayloadManager nettyPayloadManager, NettyServiceProducer serviceProducer, NettyConnectionId id) {
        NettyPayload nettyPayload;
        while ((nettyPayload = nettyPayloadManager.poll()) != null) {
            nettyPayload.getBuffer().ifPresent(Buffer::recycleBuffer);
        }
        serviceProducer.connectionBroken(id);
    }

    private boolean findCurrentNettyPayloadQueue() {
        if (this.managerIndexContainsCurrentSegment != -1 && !this.stopSendingData) {
            return true;
        }
        for (int managerIndex = 0; managerIndex < this.nettyPayloadManagers.size(); ++managerIndex) {
            NettyPayload firstNettyPayload = this.nettyPayloadManagers.get(managerIndex).peek();
            if (firstNettyPayload == null || firstNettyPayload.getSegmentId() != this.requiredSegmentId) continue;
            this.managerIndexContainsCurrentSegment = managerIndex;
            NettyPayload segmentId = this.nettyPayloadManagers.get(this.managerIndexContainsCurrentSegment).poll();
            Preconditions.checkState(segmentId.getSegmentId() != -1);
            return true;
        }
        return false;
    }
}

