package org.apache.flink.connector.mongodb.source.enumerator.assigner;

import com.mongodb.MongoNamespace;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import java.io.IOException;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.mongodb.common.config.MongoConnectionOptions;
import org.apache.flink.connector.mongodb.source.config.MongoReadOptions;
import org.apache.flink.connector.mongodb.source.enumerator.MongoSourceEnumState;
import org.apache.flink.connector.mongodb.source.enumerator.splitter.MongoSplitters;
import org.apache.flink.connector.mongodb.source.split.MongoScanSourceSplit;
import org.apache.flink.connector.mongodb.source.split.MongoSourceSplit;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Split assigner that hands out {@link MongoScanSourceSplit}s for the collection named by the
 * connection options.
 *
 * <p>The assigner keeps two work queues: collections that have not been split yet and scan splits
 * that have been produced but not yet handed out. Splits that were handed out are tracked in
 * {@code assignedScanSplits} until they are given back via {@link #addSplitsBack(Collection)}.
 * All of this bookkeeping is seeded from, and written back to, a {@link MongoSourceEnumState}.
 */
@Internal
public class MongoScanSplitAssigner implements MongoSplitAssigner {
    private static final Logger LOG = LoggerFactory.getLogger(MongoScanSplitAssigner.class);
    private final MongoConnectionOptions connectionOptions;
    private final MongoReadOptions readOptions;
    /** Collection ids ({@code database.collection}) that still have to be split. */
    private final LinkedList<String> remainingCollections;
    /** Collection ids that have already been passed to the splitter. */
    private final List<String> alreadyProcessedCollections;
    /** Splits that were produced but not yet returned by {@link #getNext()}. */
    private final LinkedList<MongoScanSourceSplit> remainingScanSplits;
    /** Splits returned by {@link #getNext()}, keyed by split id. */
    private final Map<String, MongoScanSourceSplit> assignedScanSplits;
    /** Whether the configured collection has already been queued for splitting. */
    private boolean initialized;
    /** Client used for splitting; created in {@link #open()}, released in {@link #close()}. */
    private MongoClient mongoClient;

    /**
     * Creates an assigner whose bookkeeping starts from the given enumerator state.
     *
     * <p>The two queues are copied into new {@link LinkedList}s; the processed-collection list and
     * the assigned-split map are used as returned by the state object.
     *
     * @param mongoConnectionOptions connection settings (URI, database, collection)
     * @param mongoReadOptions read settings forwarded to the splitter
     * @param mongoSourceEnumState state to start from
     */
    public MongoScanSplitAssigner(MongoConnectionOptions mongoConnectionOptions, MongoReadOptions mongoReadOptions, MongoSourceEnumState mongoSourceEnumState) {
        this.connectionOptions = mongoConnectionOptions;
        this.readOptions = mongoReadOptions;
        this.remainingCollections = new LinkedList<>(mongoSourceEnumState.getRemainingCollections());
        this.alreadyProcessedCollections = mongoSourceEnumState.getAlreadyProcessedCollections();
        this.remainingScanSplits = new LinkedList<>(mongoSourceEnumState.getRemainingScanSplits());
        this.assignedScanSplits = mongoSourceEnumState.getAssignedScanSplits();
        this.initialized = mongoSourceEnumState.isInitialized();
    }

    /**
     * Prepares the assigner for use.
     *
     * <p>The Mongo client is created whenever it does not exist yet, including when the assigner
     * was built from a state that is already marked initialized: such a state may still carry
     * unsplit collections, and {@link #getNext()} needs a client to split them. The configured
     * collection is queued only on the first initialization so it is not split twice.
     */
    @Override // org.apache.flink.connector.mongodb.source.enumerator.assigner.MongoSplitAssigner
    public void open() {
        LOG.info("Mongo scan split assigner is opening.");
        if (this.mongoClient == null) {
            this.mongoClient = MongoClients.create(this.connectionOptions.getUri());
        }
        if (!this.initialized) {
            String collectionId = String.format("%s.%s", this.connectionOptions.getDatabase(), this.connectionOptions.getCollection());
            this.remainingCollections.add(collectionId);
            this.initialized = true;
        }
    }

    /**
     * Returns the next split to hand out and records it as assigned.
     *
     * <p>While no split is queued, the next remaining collection is taken, passed to
     * {@link MongoSplitters#split}, and recorded as processed; the resulting splits are queued.
     *
     * @return the next split, or {@link Optional#empty()} when neither queued splits nor
     *     remaining collections are left
     */
    @Override // org.apache.flink.connector.mongodb.source.enumerator.assigner.MongoSplitAssigner
    public Optional<MongoSourceSplit> getNext() {
        // Keep splitting collections until one of them yields a split or none are left.
        while (this.remainingScanSplits.isEmpty()) {
            String collectionId = this.remainingCollections.poll();
            if (collectionId == null) {
                return Optional.empty();
            }
            this.remainingScanSplits.addAll(MongoSplitters.split(this.mongoClient, this.readOptions, new MongoNamespace(collectionId)));
            this.alreadyProcessedCollections.add(collectionId);
        }
        MongoScanSourceSplit nextSplit = this.remainingScanSplits.poll();
        this.assignedScanSplits.put(nextSplit.splitId(), nextSplit);
        return Optional.of(nextSplit);
    }

    /**
     * Puts previously handed-out splits back into the queue of remaining splits and removes them
     * from the assigned map. Elements that are not {@link MongoScanSourceSplit}s are skipped.
     *
     * @param collection splits to return to this assigner
     */
    @Override // org.apache.flink.connector.mongodb.source.enumerator.assigner.MongoSplitAssigner
    public void addSplitsBack(Collection<MongoSourceSplit> collection) {
        for (MongoSourceSplit returnedSplit : collection) {
            if (returnedSplit instanceof MongoScanSourceSplit) {
                this.remainingScanSplits.add((MongoScanSourceSplit) returnedSplit);
                this.assignedScanSplits.remove(returnedSplit.splitId());
            }
        }
    }

    /**
     * Builds an enumerator state from the assigner's current bookkeeping.
     *
     * @param checkpointId checkpoint identifier; not used by this implementation
     * @return a state object constructed from the current queues, processed list, assigned map
     *     and initialized flag
     */
    @Override // org.apache.flink.connector.mongodb.source.enumerator.assigner.MongoSplitAssigner
    public MongoSourceEnumState snapshotState(long checkpointId) {
        return new MongoSourceEnumState(this.remainingCollections, this.alreadyProcessedCollections, this.remainingScanSplits, this.assignedScanSplits, this.initialized);
    }

    /**
     * Reports whether all work has been handed out.
     *
     * @return {@code true} when both the remaining-collection queue and the remaining-split queue
     *     are empty
     * @throws IllegalStateException if the assigner has not been initialized
     */
    @Override // org.apache.flink.connector.mongodb.source.enumerator.assigner.MongoSplitAssigner
    public boolean noMoreSplits() {
        Preconditions.checkState(this.initialized, "The noMoreSplits method was called but not initialized.");
        return this.remainingCollections.isEmpty() && this.remainingScanSplits.isEmpty();
    }

    /**
     * Closes the Mongo client if one was created.
     *
     * @throws IOException declared by the interface contract; not thrown directly here
     */
    @Override // org.apache.flink.connector.mongodb.source.enumerator.assigner.MongoSplitAssigner
    public void close() throws IOException {
        if (this.mongoClient != null) {
            this.mongoClient.close();
            LOG.info("Mongo scan split assigner is closed.");
        }
    }
}
