package org.infinispan.api.reactive.client.impl;

import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.processors.UnicastProcessor;
import org.infinispan.api.reactive.EntryStatus;
import org.infinispan.api.reactive.KeyValueEntry;
import org.infinispan.client.hotrod.logging.Log;
import org.infinispan.commons.logging.LogFactory;
import org.infinispan.query.api.continuous.ContinuousQuery;
import org.infinispan.query.api.continuous.ContinuousQueryListener;
import org.infinispan.query.dsl.Query;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;

/* loaded from: input_file:org/infinispan/api/reactive/client/impl/ContinuousQueryPublisherImpl.class */
public class ContinuousQueryPublisherImpl<K, V> implements Publisher<KeyValueEntry<K, V>> {
    private static final Log log = (Log) LogFactory.getLog(ContinuousQueryPublisherImpl.class, Log.class);
    private final ContinuousQuery<K, V> continuousQuery;
    private final boolean created;
    private final boolean updated;
    private final boolean deleted;
    private final Query<?> query;
    private Flowable<KeyValueEntry<K, V>> flowable;
    private ContinuousQueryListener<K, V> continuousQueryListener;

    public ContinuousQueryPublisherImpl(Query<?> query, ContinuousQuery<K, V> continuousQuery, boolean z, boolean z2, boolean z3) {
        this.query = query;
        this.continuousQuery = continuousQuery;
        this.created = z;
        this.updated = z2;
        this.deleted = z3;
    }

    public void subscribe(Subscriber<? super KeyValueEntry<K, V>> subscriber) {
        createContinuousQueryFlowable();
        this.flowable.subscribe(subscriber);
    }

    private synchronized void createContinuousQueryFlowable() {
        if (this.flowable == null) {
            UnicastProcessor<KeyValueEntry<K, V>> create = UnicastProcessor.create();
            this.continuousQueryListener = createContinuousQueryListener(create);
            this.continuousQuery.addContinuousQueryListener(this.query, this.continuousQueryListener);
            create.doOnError(th -> {
                log.error(th);
                this.continuousQuery.removeContinuousQueryListener(this.continuousQueryListener);
            }).doOnCancel(() -> {
                this.continuousQuery.removeContinuousQueryListener(this.continuousQueryListener);
            });
            this.flowable = create;
        }
    }

    private ContinuousQueryListener<K, V> createContinuousQueryListener(final UnicastProcessor<KeyValueEntry<K, V>> unicastProcessor) {
        return new ContinuousQueryListener<K, V>() { // from class: org.infinispan.api.reactive.client.impl.ContinuousQueryPublisherImpl.1
            public void resultJoining(K k, V v) {
                if (ContinuousQueryPublisherImpl.this.created) {
                    unicastProcessor.onNext(new KeyValueEntry(k, v, EntryStatus.CREATED));
                }
            }

            public void resultUpdated(K k, V v) {
                if (ContinuousQueryPublisherImpl.this.updated) {
                    unicastProcessor.onNext(new KeyValueEntry(k, v, EntryStatus.UPDATED));
                }
            }

            public void resultLeaving(K k) {
                if (ContinuousQueryPublisherImpl.this.deleted) {
                    unicastProcessor.onNext(new KeyValueEntry(k, (Object) null, EntryStatus.DELETED));
                }
            }
        };
    }
}
