package org.springframework.data.mongodb.core;

import com.mongodb.client.model.changestream.FullDocument;
import java.time.Instant;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Consumer;
import org.bson.BsonTimestamp;
import org.bson.BsonValue;
import org.bson.Document;
import org.springframework.data.mongodb.core.ChangeStreamOptions;
import org.springframework.data.mongodb.core.ReactiveChangeStreamOperation;
import org.springframework.data.mongodb.core.aggregation.Aggregation;
import org.springframework.data.mongodb.core.aggregation.MatchOperation;
import org.springframework.data.mongodb.core.query.Collation;
import org.springframework.data.mongodb.core.query.CriteriaDefinition;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import reactor.core.publisher.Flux;

/* loaded from: input_file:BOOT-INF/lib/spring-data-mongodb-4.0.4.jar:org/springframework/data/mongodb/core/ReactiveChangeStreamOperationSupport.class */
class ReactiveChangeStreamOperationSupport implements ReactiveChangeStreamOperation {
    private final ReactiveMongoTemplate template;

    /* loaded from: input_file:BOOT-INF/lib/spring-data-mongodb-4.0.4.jar:org/springframework/data/mongodb/core/ReactiveChangeStreamOperationSupport$ReactiveChangeStreamSupport.class */
    static class ReactiveChangeStreamSupport<T> implements ReactiveChangeStreamOperation.ReactiveChangeStream<T>, ReactiveChangeStreamOperation.ChangeStreamWithFilterAndProjection<T> {
        private final ReactiveMongoTemplate template;
        private final Class<?> domainType;
        private final Class<T> returnType;

        @Nullable
        private final String collection;

        @Nullable
        private final ChangeStreamOptions options;

        private ReactiveChangeStreamSupport(ReactiveMongoTemplate reactiveMongoTemplate, Class<?> cls, Class<T> cls2, @Nullable String str, @Nullable ChangeStreamOptions changeStreamOptions) {
            this.template = reactiveMongoTemplate;
            this.domainType = cls;
            this.returnType = cls2;
            this.collection = str;
            this.options = changeStreamOptions;
        }

        @Override // org.springframework.data.mongodb.core.ReactiveChangeStreamOperation.ChangeStreamWithCollection
        public ReactiveChangeStreamOperation.ChangeStreamWithFilterAndProjection<T> watchCollection(String str) {
            Assert.hasText(str, "Collection name must not be null nor empty");
            return new ReactiveChangeStreamSupport(this.template, this.domainType, this.returnType, str, this.options);
        }

        @Override // org.springframework.data.mongodb.core.ReactiveChangeStreamOperation.ChangeStreamWithCollection
        public ReactiveChangeStreamOperation.ChangeStreamWithFilterAndProjection<T> watchCollection(Class<?> cls) {
            Assert.notNull(cls, "Collection type not be null");
            return watchCollection(this.template.getCollectionName(cls));
        }

        @Override // org.springframework.data.mongodb.core.ReactiveChangeStreamOperation.ResumingChangeStream
        public ReactiveChangeStreamOperation.TerminatingChangeStream<T> resumeAt(Object obj) {
            return withOptions(changeStreamOptionsBuilder -> {
                if (obj instanceof Instant) {
                    changeStreamOptionsBuilder.resumeAt((Instant) obj);
                } else if (obj instanceof BsonTimestamp) {
                    changeStreamOptionsBuilder.resumeAt((BsonTimestamp) obj);
                }
            });
        }

        @Override // org.springframework.data.mongodb.core.ReactiveChangeStreamOperation.ResumingChangeStream
        public ReactiveChangeStreamOperation.TerminatingChangeStream<T> resumeAfter(Object obj) {
            Assert.isInstanceOf((Class<?>) BsonValue.class, obj, "Token must be a BsonValue");
            return withOptions(changeStreamOptionsBuilder -> {
                changeStreamOptionsBuilder.resumeAfter((BsonValue) obj);
            });
        }

        @Override // org.springframework.data.mongodb.core.ReactiveChangeStreamOperation.ResumingChangeStream
        public ReactiveChangeStreamOperation.TerminatingChangeStream<T> startAfter(Object obj) {
            Assert.isInstanceOf((Class<?>) BsonValue.class, obj, "Token must be a BsonValue");
            return withOptions(changeStreamOptionsBuilder -> {
                changeStreamOptionsBuilder.startAfter((BsonValue) obj);
            });
        }

        @Override // org.springframework.data.mongodb.core.ReactiveChangeStreamOperation.ChangeStreamWithOptions
        public ReactiveChangeStreamSupport<T> withOptions(Consumer<ChangeStreamOptions.ChangeStreamOptionsBuilder> consumer) {
            ChangeStreamOptions.ChangeStreamOptionsBuilder initOptionsBuilder = initOptionsBuilder();
            consumer.accept(initOptionsBuilder);
            return new ReactiveChangeStreamSupport<>(this.template, this.domainType, this.returnType, this.collection, initOptionsBuilder.build());
        }

        @Override // org.springframework.data.mongodb.core.ReactiveChangeStreamOperation.ChangeStreamWithFilterAndProjection
        public <R> ReactiveChangeStreamOperation.ChangeStreamWithFilterAndProjection<R> as(Class<R> cls) {
            Assert.notNull(cls, "ResultType must not be null");
            return new ReactiveChangeStreamSupport(this.template, this.domainType, cls, this.collection, this.options);
        }

        @Override // org.springframework.data.mongodb.core.ReactiveChangeStreamOperation.ChangeStreamWithFilterAndProjection
        public ReactiveChangeStreamOperation.ChangeStreamWithFilterAndProjection<T> filter(Aggregation aggregation) {
            return withOptions(changeStreamOptionsBuilder -> {
                changeStreamOptionsBuilder.filter(aggregation);
            });
        }

        @Override // org.springframework.data.mongodb.core.ReactiveChangeStreamOperation.ChangeStreamWithFilterAndProjection
        public ReactiveChangeStreamOperation.ChangeStreamWithFilterAndProjection<T> filter(CriteriaDefinition criteriaDefinition) {
            MatchOperation match = Aggregation.match(criteriaDefinition);
            return filter(!Document.class.equals(this.domainType) ? Aggregation.newAggregation(this.domainType, match) : Aggregation.newAggregation(match));
        }

        @Override // org.springframework.data.mongodb.core.ReactiveChangeStreamOperation.TerminatingChangeStream
        public Flux<ChangeStreamEvent<T>> listen() {
            return this.template.changeStream(this.collection, this.options != null ? this.options : ChangeStreamOptions.empty(), this.returnType);
        }

        private ChangeStreamOptions.ChangeStreamOptionsBuilder initOptionsBuilder() {
            ChangeStreamOptions.ChangeStreamOptionsBuilder builder = ChangeStreamOptions.builder();
            if (this.options == null) {
                return builder;
            }
            this.options.getFilter().ifPresent(obj -> {
                if (obj instanceof Aggregation) {
                    builder.filter((Aggregation) obj);
                } else {
                    builder.filter((Document[]) ((List) obj).toArray(new Document[0]));
                }
            });
            Optional<FullDocument> fullDocumentLookup = this.options.getFullDocumentLookup();
            Objects.requireNonNull(builder);
            fullDocumentLookup.ifPresent(builder::fullDocumentLookup);
            Optional<Collation> collation = this.options.getCollation();
            Objects.requireNonNull(builder);
            collation.ifPresent(builder::collation);
            if (this.options.isResumeAfter()) {
                Optional<BsonValue> resumeToken = this.options.getResumeToken();
                Objects.requireNonNull(builder);
                resumeToken.ifPresent(builder::resumeAfter);
                Optional<BsonTimestamp> resumeBsonTimestamp = this.options.getResumeBsonTimestamp();
                Objects.requireNonNull(builder);
                resumeBsonTimestamp.ifPresent((v1) -> {
                    r1.resumeAfter(v1);
                });
            } else if (this.options.isStartAfter()) {
                Optional<BsonValue> resumeToken2 = this.options.getResumeToken();
                Objects.requireNonNull(builder);
                resumeToken2.ifPresent(builder::startAfter);
            } else {
                Optional<Instant> resumeTimestamp = this.options.getResumeTimestamp();
                Objects.requireNonNull(builder);
                resumeTimestamp.ifPresent(builder::resumeAt);
                Optional<BsonTimestamp> resumeBsonTimestamp2 = this.options.getResumeBsonTimestamp();
                Objects.requireNonNull(builder);
                resumeBsonTimestamp2.ifPresent(builder::resumeAt);
            }
            return builder;
        }

        @Override // org.springframework.data.mongodb.core.ReactiveChangeStreamOperation.ChangeStreamWithOptions
        public /* bridge */ /* synthetic */ ReactiveChangeStreamOperation.ReactiveChangeStream withOptions(Consumer consumer) {
            return withOptions((Consumer<ChangeStreamOptions.ChangeStreamOptionsBuilder>) consumer);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ReactiveChangeStreamOperationSupport(ReactiveMongoTemplate reactiveMongoTemplate) {
        this.template = reactiveMongoTemplate;
    }

    @Override // org.springframework.data.mongodb.core.ReactiveChangeStreamOperation
    public <T> ReactiveChangeStreamOperation.ReactiveChangeStream<T> changeStream(Class<T> cls) {
        Assert.notNull(cls, "DomainType must not be null");
        return new ReactiveChangeStreamSupport(this.template, cls, cls, null, null);
    }
}
