package org.jboss.as.clustering.infinispan.persistence.jdbc;

import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Scheduler;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.util.Objects;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Predicate;
import java.util.function.Supplier;
import org.infinispan.commons.configuration.ConfiguredBy;
import org.infinispan.commons.executors.BlockingResource;
import org.infinispan.commons.util.IntSet;
import org.infinispan.persistence.jdbc.common.TableOperations;
import org.infinispan.persistence.jdbc.common.connectionfactory.ConnectionFactory;
import org.infinispan.persistence.jdbc.stringbased.JdbcStringBasedStore;
import org.infinispan.persistence.spi.InitializationContext;
import org.infinispan.persistence.spi.MarshallableEntry;
import org.jboss.logging.Logger;
import org.reactivestreams.Publisher;

@ConfiguredBy(JDBCStoreConfiguration.class)
/* loaded from: input_file:org/jboss/as/clustering/infinispan/persistence/jdbc/JDBCStore.class */
public class JDBCStore<K, V> extends JdbcStringBasedStore<K, V> {
    private static final Logger LOGGER = Logger.getLogger(JDBCStore.class);
    private static final Runnable INTERRUPT = () -> {
        Thread.currentThread().interrupt();
    };
    private Executor executor;

    public CompletionStage<Void> start(InitializationContext initializationContext) {
        this.executor = initializationContext.getBlockingManager().asExecutor(getClass().getSimpleName());
        return super.start(initializationContext);
    }

    public Publisher<MarshallableEntry<K, V>> publishEntries(IntSet intSet, Predicate<? super K> predicate, boolean z) {
        TableOperations tableOperations = this.tableOperations;
        ConnectionFactory connectionFactory = this.connectionFactory;
        Objects.requireNonNull(connectionFactory);
        Supplier supplier = connectionFactory::getConnection;
        ConnectionFactory connectionFactory2 = this.connectionFactory;
        Objects.requireNonNull(connectionFactory2);
        return blocking(tableOperations.publishEntries(supplier, connectionFactory2::releaseConnection, intSet, predicate, z));
    }

    public Publisher<K> publishKeys(IntSet intSet, Predicate<? super K> predicate) {
        TableOperations tableOperations = this.tableOperations;
        ConnectionFactory connectionFactory = this.connectionFactory;
        Objects.requireNonNull(connectionFactory);
        Supplier supplier = connectionFactory::getConnection;
        ConnectionFactory connectionFactory2 = this.connectionFactory;
        Objects.requireNonNull(connectionFactory2);
        return blocking(tableOperations.publishKeys(supplier, connectionFactory2::releaseConnection, intSet, predicate));
    }

    private <T> Flowable<T> blocking(Flowable<T> flowable) {
        if (Thread.currentThread().getThreadGroup() instanceof BlockingResource) {
            return flowable;
        }
        final LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue();
        this.executor.execute(new Runnable() { // from class: org.jboss.as.clustering.infinispan.persistence.jdbc.JDBCStore.1
            @Override // java.lang.Runnable
            public void run() {
                while (!Thread.interrupted()) {
                    try {
                        ((Runnable) linkedBlockingQueue.take()).run();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } catch (Throwable th) {
                        JDBCStore.LOGGER.warn(th.getLocalizedMessage(), th);
                    }
                }
            }
        });
        Scheduler from = Schedulers.from(new Executor() { // from class: org.jboss.as.clustering.infinispan.persistence.jdbc.JDBCStore.2
            @Override // java.util.concurrent.Executor
            public void execute(Runnable runnable) {
                try {
                    linkedBlockingQueue.put(runnable);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }, false, true);
        return flowable.subscribeOn(from).unsubscribeOn(from).observeOn(from).doAfterTerminate(() -> {
            linkedBlockingQueue.put(INTERRUPT);
        });
    }
}
