package org.kie.kogito.persistence.mongodb.storage;

import com.mongodb.client.model.Aggregates;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.client.model.changestream.FullDocument;
import com.mongodb.reactivestreams.client.MongoCollection;
import java.util.Collections;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.kie.kogito.persistence.mongodb.model.ModelUtils;
import org.kie.kogito.persistence.mongodb.model.MongoEntityMapper;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/* loaded from: input_file:org/kie/kogito/persistence/mongodb/storage/StorageUtils.class */
public class StorageUtils {

    /* loaded from: input_file:org/kie/kogito/persistence/mongodb/storage/StorageUtils$ObjectListenerSubscriber.class */
    private static class ObjectListenerSubscriber<V, E> implements Subscriber<ChangeStreamDocument<Document>> {
        Subscription subscription;
        BiConsumer<String, V> consumer;
        MongoEntityMapper<V, E> mongoEntityMapper;

        ObjectListenerSubscriber(BiConsumer<String, V> biConsumer, MongoEntityMapper<V, E> mongoEntityMapper) {
            this.consumer = biConsumer;
            this.mongoEntityMapper = mongoEntityMapper;
        }

        public void onSubscribe(Subscription subscription) {
            this.subscription = subscription;
            this.subscription.request(Long.MAX_VALUE);
        }

        public void onNext(ChangeStreamDocument<Document> changeStreamDocument) {
            this.consumer.accept((String) Optional.ofNullable(changeStreamDocument.getDocumentKey()).map(bsonDocument -> {
                return bsonDocument.getString(ModelUtils.MONGO_ID).getValue();
            }).orElse(null), Optional.ofNullable((Document) changeStreamDocument.getFullDocument()).map(document -> {
                MongoEntityMapper<V, E> mongoEntityMapper = this.mongoEntityMapper;
                Class<E> entityClass = this.mongoEntityMapper.getEntityClass();
                MongoEntityMapper<V, E> mongoEntityMapper2 = this.mongoEntityMapper;
                Objects.requireNonNull(mongoEntityMapper2);
                return mongoEntityMapper.mapToModel(ModelUtils.documentToObject(document, entityClass, mongoEntityMapper2::convertToModelAttribute));
            }).orElse(null));
        }

        public void onError(Throwable th) {
            onComplete();
            throw new MongoObjectListenerException(th);
        }

        public void onComplete() {
            if (Objects.nonNull(this.subscription)) {
                this.subscription.cancel();
            }
        }
    }

    private StorageUtils() {
    }

    public static <V, E> void watchCollection(MongoCollection<E> mongoCollection, Bson bson, BiConsumer<String, V> biConsumer, MongoEntityMapper<V, E> mongoEntityMapper) {
        mongoCollection.watch(Collections.singletonList(Aggregates.match(bson))).fullDocument(FullDocument.UPDATE_LOOKUP).subscribe(new ObjectListenerSubscriber(biConsumer, mongoEntityMapper));
        try {
            TimeUnit.MILLISECONDS.sleep(1500L);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
