package org.kie.kogito.persistence.mongodb.storage;

import com.mongodb.client.MongoChangeStreamCursor;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Aggregates;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.client.model.changestream.FullDocument;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.infrastructure.Infrastructure;
import java.util.Collections;
import java.util.function.Function;
import org.bson.BsonDocument;
import org.bson.conversions.Bson;
import org.kie.kogito.persistence.mongodb.model.ModelUtils;
import org.kie.kogito.persistence.mongodb.model.MongoEntityMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/kie/kogito/persistence/mongodb/storage/StorageUtils.class */
public class StorageUtils {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) StorageUtils.class);

    private StorageUtils() {
    }

    public static <V, E> Multi<V> watchCollectionEntries(MongoCollection<E> mongoCollection, Bson bson, MongoEntityMapper<V, E> mongoEntityMapper) {
        return createMulti(mongoCollection, bson, changeStreamDocument -> {
            Object fullDocument = changeStreamDocument.getFullDocument();
            if (fullDocument == null) {
                return null;
            }
            return mongoEntityMapper.mapToModel(fullDocument);
        });
    }

    public static <E> Multi<String> watchCollectionKeys(MongoCollection<E> mongoCollection, Bson bson) {
        return createMulti(mongoCollection, bson, changeStreamDocument -> {
            BsonDocument documentKey = changeStreamDocument.getDocumentKey();
            if (documentKey == null) {
                return null;
            }
            return documentKey.getString(ModelUtils.MONGO_ID).getValue();
        });
    }

    private static <T, E> Multi<T> createMulti(MongoCollection<E> mongoCollection, Bson bson, Function<ChangeStreamDocument<E>, T> function) {
        MongoChangeStreamCursor<ChangeStreamDocument<E>> cursor = mongoCollection.watch(Collections.singletonList(Aggregates.match(bson))).fullDocument(FullDocument.UPDATE_LOOKUP).cursor();
        return (Multi<T>) Multi.createFrom().emitter(multiEmitter -> {
            while (cursor.hasNext()) {
                try {
                    multiEmitter.emit(cursor.next());
                } catch (IllegalStateException e) {
                    LOGGER.warn("MongoDB cursor exception: " + e.getMessage());
                    return;
                }
            }
        }).runSubscriptionOn(Infrastructure.getDefaultWorkerPool()).onTermination().invoke(() -> {
            cursor.close();
        }).onFailure().recoverWithCompletion().onItem().transform(function);
    }
}
