/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.NettyConnectionId;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.NettyConnectionReader;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.NettyConnectionReaderImpl;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.NettyConnectionWriterImpl;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.NettyPayloadManager;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.NettyServiceProducer;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.TieredStorageNettyService;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.TieredStorageResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageConsumerSpec;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageResourceRegistry;
import org.apache.flink.util.Preconditions;

/**
 * Implementation of {@link TieredStorageNettyService}.
 *
 * <p>On the producer side it keeps the {@link NettyServiceProducer}s registered for each partition
 * and builds a {@link ResultSubpartitionView} that connects a subpartition to every one of them.
 * On the consumer side it pairs consumer registrations with input channels per (partition,
 * subpartition) and completes the consumer's future once both halves are known.
 */
public class TieredStorageNettyServiceImpl implements TieredStorageNettyService {

    /** Producers registered per partition; an entry is dropped by the resource-registry callback. */
    private final Map<TieredStoragePartitionId, List<NettyServiceProducer>>
            registeredServiceProducers = new ConcurrentHashMap<>();

    /**
     * Pending reader registrations, keyed by partition and then by subpartition.
     *
     * <p>NOTE(review): this is a plain {@link HashMap}; presumably the consumer-side methods are
     * only invoked from a single thread -- confirm against the callers.
     */
    private final Map<
                    TieredStoragePartitionId,
                    Map<TieredStorageSubpartitionId, List<NettyConnectionReaderRegistration>>>
            nettyConnectionReaderRegistrations = new HashMap<>();

    private final TieredStorageResourceRegistry resourceRegistry;

    /**
     * @param resourceRegistry registry that receives a cleanup callback the first time a producer
     *     is registered for a partition
     */
    public TieredStorageNettyServiceImpl(TieredStorageResourceRegistry resourceRegistry) {
        this.resourceRegistry = resourceRegistry;
    }

    /**
     * Adds {@code serviceProducer} to the producers of {@code partitionId}.
     *
     * <p>When this is the first producer for the partition, a callback that removes the
     * partition's producer list is registered with the resource registry.
     */
    @Override
    public void registerProducer(
            TieredStoragePartitionId partitionId, NettyServiceProducer serviceProducer) {
        this.registeredServiceProducers
                .computeIfAbsent(
                        partitionId,
                        ignore -> {
                            this.resourceRegistry.registerResource(
                                    partitionId,
                                    () -> this.registeredServiceProducers.remove(partitionId));
                            return new ArrayList<>();
                        })
                .add(serviceProducer);
    }

    /**
     * Registers a consumer for the given subpartition.
     *
     * <p>The first pending registration that has no consumer yet is claimed; if there is none, a
     * new registration is created.
     *
     * @return a future that is completed with the {@link NettyConnectionReader} once an input
     *     channel is attached to the claimed registration (immediately if it already is)
     */
    @Override
    public CompletableFuture<NettyConnectionReader> registerConsumer(
            TieredStoragePartitionId partitionId, TieredStorageSubpartitionId subpartitionId) {
        List<NettyConnectionReaderRegistration> registrations =
                this.getReaderRegistration(partitionId, subpartitionId);
        for (NettyConnectionReaderRegistration candidate : registrations) {
            Optional<CompletableFuture<NettyConnectionReader>> claimed =
                    candidate.trySetConsumer();
            if (claimed.isPresent()) {
                return claimed.get();
            }
        }

        NettyConnectionReaderRegistration fresh = new NettyConnectionReaderRegistration();
        registrations.add(fresh);
        return fresh.trySetConsumer()
                .orElseThrow(
                        () ->
                                new IllegalStateException(
                                        "A new registration must accept its first consumer."));
    }

    /**
     * Creates the {@link ResultSubpartitionView} for a subpartition.
     *
     * <p>For every producer registered for {@code partitionId}, a {@link NettyPayloadManager} and
     * a {@link NettyConnectionWriterImpl} are created and handed to the producer through {@code
     * connectionEstablished}. If no producer is registered for the partition, the view is built
     * from empty lists.
     */
    public ResultSubpartitionView createResultSubpartitionView(
            TieredStoragePartitionId partitionId,
            TieredStorageSubpartitionId subpartitionId,
            BufferAvailabilityListener availabilityListener) {
        List<NettyServiceProducer> serviceProducers =
                this.registeredServiceProducers.get(partitionId);
        if (serviceProducers == null) {
            return new TieredStorageResultSubpartitionView(
                    availabilityListener, new ArrayList<>(), new ArrayList<>(), new ArrayList<>());
        }

        List<NettyPayloadManager> nettyPayloadManagers = new ArrayList<>();
        List<NettyConnectionId> nettyConnectionIds = new ArrayList<>();
        for (NettyServiceProducer serviceProducer : serviceProducers) {
            NettyPayloadManager nettyPayloadManager = new NettyPayloadManager();
            NettyConnectionWriterImpl writer =
                    new NettyConnectionWriterImpl(nettyPayloadManager, availabilityListener);
            serviceProducer.connectionEstablished(subpartitionId, writer);
            nettyConnectionIds.add(writer.getNettyConnectionId());
            nettyPayloadManagers.add(nettyPayloadManager);
        }
        // Hand over the list that was just iterated instead of looking it up again: the entry
        // may have been removed by the release callback in the meantime, and a second lookup
        // could then yield null or a list that does not match the connections created above.
        return new TieredStorageResultSubpartitionView(
                availabilityListener, nettyPayloadManagers, nettyConnectionIds, serviceProducers);
    }

    /**
     * Attaches one input-channel supplier per consumer spec; the position in the lists is used as
     * the channel index.
     *
     * @throws IllegalStateException if the two lists differ in size
     */
    public void setupInputChannels(
            List<TieredStorageConsumerSpec> tieredStorageConsumerSpecs,
            List<Supplier<InputChannel>> inputChannelProviders) {
        Preconditions.checkState(
                tieredStorageConsumerSpecs.size() == inputChannelProviders.size());
        for (int index = 0; index < tieredStorageConsumerSpecs.size(); ++index) {
            TieredStorageConsumerSpec spec = tieredStorageConsumerSpecs.get(index);
            this.setupInputChannel(
                    index,
                    spec.getPartitionId(),
                    spec.getSubpartitionId(),
                    inputChannelProviders.get(index));
        }
    }

    /**
     * Offers the channel to every pending registration of the subpartition. If at least one
     * accepted it, the subpartition's registrations are dropped; otherwise a new registration
     * holding only the channel is stored so that a later consumer can claim it.
     */
    private void setupInputChannel(
            int index,
            TieredStoragePartitionId partitionId,
            TieredStorageSubpartitionId subpartitionId,
            Supplier<InputChannel> inputChannelProvider) {
        List<NettyConnectionReaderRegistration> registrations =
                this.getReaderRegistration(partitionId, subpartitionId);

        boolean hasSetChannel = false;
        for (NettyConnectionReaderRegistration registration : registrations) {
            // Non-short-circuit on purpose: every registration must be offered the channel.
            hasSetChannel |= registration.trySetChannel(index, inputChannelProvider);
        }
        if (hasSetChannel) {
            this.removeRegistration(partitionId, subpartitionId);
            return;
        }

        NettyConnectionReaderRegistration registration = new NettyConnectionReaderRegistration();
        registration.trySetChannel(index, inputChannelProvider);
        registrations.add(registration);
    }

    /** Drops the subpartition's registrations and, if it was the last one, the partition entry. */
    private void removeRegistration(
            TieredStoragePartitionId partitionId, TieredStorageSubpartitionId subpartitionId) {
        Map<TieredStorageSubpartitionId, List<NettyConnectionReaderRegistration>>
                subpartitionRegistrations =
                        this.nettyConnectionReaderRegistrations.get(partitionId);
        subpartitionRegistrations.remove(subpartitionId);
        if (subpartitionRegistrations.isEmpty()) {
            this.nettyConnectionReaderRegistrations.remove(partitionId);
        }
    }

    /** Returns the (lazily created) registration list of the given subpartition. */
    private List<NettyConnectionReaderRegistration> getReaderRegistration(
            TieredStoragePartitionId partitionId, TieredStorageSubpartitionId subpartitionId) {
        return this.nettyConnectionReaderRegistrations
                .computeIfAbsent(partitionId, ignore -> new HashMap<>())
                .computeIfAbsent(subpartitionId, ignore -> new ArrayList<>());
    }

    /**
     * Pairs one input channel with one consumer future. The future is completed with a {@link
     * NettyConnectionReaderImpl} as soon as both the channel and the consumer are set, regardless
     * of which of the two arrives first.
     */
    private static class NettyConnectionReaderRegistration {

        /** Index of the attached channel; a negative value means no channel is attached yet. */
        private int channelIndex = -1;

        @Nullable private Supplier<InputChannel> channelSupplier;

        /** Future handed to the consumer; {@code null} until a consumer claims the registration. */
        @Nullable private CompletableFuture<NettyConnectionReader> readerFuture;

        private NettyConnectionReaderRegistration() {}

        /**
         * Attaches the channel unless one is already attached.
         *
         * @return {@code true} if the channel was attached, {@code false} if the registration
         *     already had one
         * @throws IllegalArgumentException if {@code channelIndex} is negative
         * @throws NullPointerException if {@code channelSupplier} is {@code null}
         */
        public boolean trySetChannel(int channelIndex, Supplier<InputChannel> channelSupplier) {
            if (this.isChannelSet()) {
                return false;
            }
            Preconditions.checkArgument(channelIndex >= 0);
            this.channelIndex = channelIndex;
            this.channelSupplier = Preconditions.checkNotNull(channelSupplier);
            this.tryCreateNettyConnectionReader();
            return true;
        }

        /**
         * Claims the registration for a consumer unless it is already claimed.
         *
         * @return the consumer's future if the registration was claimed by this call, otherwise
         *     an empty {@link Optional}
         */
        public Optional<CompletableFuture<NettyConnectionReader>> trySetConsumer() {
            if (this.isReaderSet()) {
                return Optional.empty();
            }
            this.readerFuture = new CompletableFuture<>();
            // The channel may have been attached before the consumer arrived; complete the
            // future right away in that case, otherwise it would stay pending.
            this.tryCreateNettyConnectionReader();
            return Optional.of(this.readerFuture);
        }

        /** Completes the consumer's future once both the channel and the consumer are present. */
        private void tryCreateNettyConnectionReader() {
            if (this.isChannelSet() && this.isReaderSet()) {
                this.readerFuture.complete(new NettyConnectionReaderImpl(this.channelSupplier));
            }
        }

        private boolean isChannelSet() {
            return this.channelIndex >= 0;
        }

        private boolean isReaderSet() {
            return this.readerFuture != null;
        }
    }
}

