package org.hibernate.search.elasticsearch.processor.impl;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Supplier;
import org.hibernate.search.elasticsearch.client.impl.ElasticsearchClient;
import org.hibernate.search.elasticsearch.gson.impl.GsonProvider;
import org.hibernate.search.elasticsearch.logging.impl.Log;
import org.hibernate.search.elasticsearch.work.impl.ElasticsearchWork;
import org.hibernate.search.elasticsearch.work.impl.ElasticsearchWorkExecutionContext;
import org.hibernate.search.elasticsearch.work.impl.factory.ElasticsearchWorkFactory;
import org.hibernate.search.exception.ErrorHandler;
import org.hibernate.search.spi.BuildContext;
import org.hibernate.search.util.impl.Throwables;
import org.hibernate.search.util.logging.impl.LoggerFactory;

/* loaded from: input_file:org/hibernate/search/elasticsearch/processor/impl/ElasticsearchWorkProcessor.class */
/**
 * Entry point for executing {@link ElasticsearchWork}s, either directly
 * ({@link #executeSyncUnsafe(ElasticsearchWork)} / {@link #executeAsyncUnsafe(ElasticsearchWork)})
 * or through orchestrators created by this class.
 *
 * <p>One shared "stream" orchestrator is created with the processor and released by {@link #close()}.
 * Orchestrators returned by {@link #createNonStreamOrchestrator(String, boolean)} are not tracked here,
 * so this class does not close them.
 */
public class ElasticsearchWorkProcessor implements AutoCloseable {
    // Minimum/maximum bulk sizes handed to the work bulker (see createBulker).
    private static final int NON_STREAM_MIN_BULK_SIZE = 2;
    private static final int STREAM_MIN_BULK_SIZE = 1;
    private static final int MAX_BULK_SIZE = 250;
    // Per-batch changeset limits handed to the batching orchestrators.
    private static final int NON_STREAM_MAX_CHANGESETS_PER_BATCH = 2500;
    private static final int STREAM_MAX_CHANGESETS_PER_BATCH = 5000;
    private static final Log LOG = (Log) LoggerFactory.make(Log.class);

    private final ErrorHandler errorHandler;
    private final ElasticsearchClient client;
    private final GsonProvider gsonProvider;
    private final ElasticsearchWorkFactory workFactory;
    private final ElasticsearchWorkExecutionContext parallelWorkExecutionContext;
    private final BarrierElasticsearchWorkOrchestrator streamOrchestrator;

    /**
     * Creates the processor and its shared stream orchestrator.
     *
     * @param buildContext provides the {@link ErrorHandler} passed on to orchestrators and execution contexts
     * @param elasticsearchClient client passed on to the execution contexts
     * @param gsonProvider Gson provider passed on to the execution contexts
     * @param elasticsearchWorkFactory factory used to build bulk works
     */
    public ElasticsearchWorkProcessor(BuildContext buildContext, ElasticsearchClient elasticsearchClient, GsonProvider gsonProvider, ElasticsearchWorkFactory elasticsearchWorkFactory) {
        this.errorHandler = buildContext.getErrorHandler();
        this.client = elasticsearchClient;
        this.gsonProvider = gsonProvider;
        this.workFactory = elasticsearchWorkFactory;
        this.parallelWorkExecutionContext = new ImmutableElasticsearchWorkExecutionContext(elasticsearchClient, gsonProvider);

        // Must be built here rather than in a field initializer: field initializers run before
        // the constructor body, so the orchestrator would otherwise capture a null errorHandler.
        this.streamOrchestrator = createBatchingSharedOrchestrator(
                "Elasticsearch async stream work orchestrator",
                STREAM_MAX_CHANGESETS_PER_BATCH,
                false,
                createParallelOrchestrator(this::createIndexMonitorBufferingWorkExecutionContext, STREAM_MIN_BULK_SIZE, false));
    }

    /**
     * Waits for the stream orchestrator to complete, then closes it.
     *
     * <p>The orchestrator is closed exactly once, whether or not waiting succeeded. If the wait is
     * interrupted, the thread's interrupt flag is restored and the exception built by the logger is thrown.
     */
    @Override // java.lang.AutoCloseable
    public void close() {
        try {
            this.streamOrchestrator.awaitCompletion();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw LOG.interruptedWhileWaitingForRequestCompletion(e);
        } finally {
            this.streamOrchestrator.close();
        }
    }

    /**
     * Executes a work and blocks until its result is available.
     *
     * @param elasticsearchWork the work to execute
     * @param <T> the result type of the work
     * @return the result of the work
     */
    public <T> T executeSyncUnsafe(ElasticsearchWork<T> elasticsearchWork) {
        try {
            return executeAsyncUnsafe(elasticsearchWork).join();
        } catch (CompletionException e) {
            // join() wraps failures in CompletionException; unwrap and rethrow the underlying cause.
            throw Throwables.expectRuntimeException(e.getCause());
        }
    }

    /**
     * Starts a work immediately, without going through any orchestrator,
     * using the processor's shared execution context.
     *
     * @param elasticsearchWork the work to execute
     * @param <T> the result type of the work
     * @return a future completed with the result of the work
     */
    public <T> CompletableFuture<T> executeAsyncUnsafe(ElasticsearchWork<T> elasticsearchWork) {
        return start(elasticsearchWork, this.parallelWorkExecutionContext);
    }

    /**
     * Creates a new batching orchestrator backed by a serial changeset orchestrator.
     *
     * @param indexName name appended to the orchestrator's descriptive name
     * @param refreshAfterWrite when {@code true}, a refreshing execution context is used and the
     *        bulk works are built with {@code refresh(true)}; otherwise an index-monitor-buffering
     *        context is used and bulk works are built with {@code refresh(false)}
     * @return a new orchestrator; it is not tracked by this processor
     */
    public BarrierElasticsearchWorkOrchestrator createNonStreamOrchestrator(String indexName, boolean refreshAfterWrite) {
        Supplier<FlushableElasticsearchWorkExecutionContext> contextSupplier;
        if (refreshAfterWrite) {
            contextSupplier = this::createRefreshingWorkExecutionContext;
        } else {
            contextSupplier = this::createIndexMonitorBufferingWorkExecutionContext;
        }
        return createBatchingSharedOrchestrator(
                "Elasticsearch non-stream work orchestrator for index " + indexName,
                NON_STREAM_MAX_CHANGESETS_PER_BATCH,
                true,
                createSerialOrchestrator(contextSupplier, NON_STREAM_MIN_BULK_SIZE, refreshAfterWrite));
    }

    /**
     * @return the shared stream orchestrator created with this processor
     */
    public BarrierElasticsearchWorkOrchestrator getStreamOrchestrator() {
        return this.streamOrchestrator;
    }

    /** Logs the work at trace level, then executes it in the given context. */
    private <T> CompletableFuture<T> start(ElasticsearchWork<T> elasticsearchWork, ElasticsearchWorkExecutionContext elasticsearchWorkExecutionContext) {
        LOG.tracef("Processing %s", elasticsearchWork);
        return elasticsearchWork.execute(elasticsearchWorkExecutionContext);
    }

    /**
     * Wraps a delegate orchestrator in a batching orchestrator that uses this processor's error handler.
     * NOTE(review): the int is presumably the maximum number of changesets per batch and the boolean
     * presumably controls ordering of queued changesets; confirm against
     * BatchingSharedElasticsearchWorkOrchestrator.
     */
    private BatchingSharedElasticsearchWorkOrchestrator createBatchingSharedOrchestrator(String name, int maxChangesetsPerBatch, boolean flag, FlushableElasticsearchWorkOrchestrator delegate) {
        return new BatchingSharedElasticsearchWorkOrchestrator(name, maxChangesetsPerBatch, flag, delegate, this.errorHandler);
    }

    /** Builds a serial changeset orchestrator whose sequence builder and bulker share the given context supplier. */
    private FlushableElasticsearchWorkOrchestrator createSerialOrchestrator(Supplier<FlushableElasticsearchWorkExecutionContext> contextSupplier, int minBulkSize, boolean refreshInBulkApiCall) {
        ElasticsearchWorkSequenceBuilder sequenceBuilder = createSequenceBuilder(contextSupplier);
        ElasticsearchWorkBulker bulker = createBulker(sequenceBuilder, minBulkSize, refreshInBulkApiCall);
        return new SerialChangesetsElasticsearchWorkOrchestrator(sequenceBuilder, bulker);
    }

    /** Builds a parallel changeset orchestrator whose sequence builder and bulker share the given context supplier. */
    private FlushableElasticsearchWorkOrchestrator createParallelOrchestrator(Supplier<FlushableElasticsearchWorkExecutionContext> contextSupplier, int minBulkSize, boolean refreshInBulkApiCall) {
        ElasticsearchWorkSequenceBuilder sequenceBuilder = createSequenceBuilder(contextSupplier);
        ElasticsearchWorkBulker bulker = createBulker(sequenceBuilder, minBulkSize, refreshInBulkApiCall);
        return new ParallelChangesetsElasticsearchWorkOrchestrator(sequenceBuilder, bulker);
    }

    /** Builds a sequence builder that runs works through {@link #start} and creates one contextual error handler per request. */
    private ElasticsearchWorkSequenceBuilder createSequenceBuilder(Supplier<FlushableElasticsearchWorkExecutionContext> contextSupplier) {
        return new DefaultElasticsearchWorkSequenceBuilder(
                this::start,
                contextSupplier,
                () -> new DefaultContextualErrorHandler(this.errorHandler));
    }

    /**
     * Builds a bulker that turns lists of works into a single bulk work with the requested refresh setting.
     * NOTE(review): build2() looks like a decompiler-renamed build(); kept as-is because the
     * declaring type is not visible here. Confirm against the bulk work builder.
     */
    private ElasticsearchWorkBulker createBulker(ElasticsearchWorkSequenceBuilder sequenceBuilder, int minBulkSize, boolean refreshInBulkApiCall) {
        return new DefaultElasticsearchWorkBulker(
                sequenceBuilder,
                works -> this.workFactory.bulk(works).refresh(refreshInBulkApiCall).build2(),
                minBulkSize,
                MAX_BULK_SIZE);
    }

    /** Creates a fresh index-monitor-buffering execution context. */
    private FlushableElasticsearchWorkExecutionContext createIndexMonitorBufferingWorkExecutionContext() {
        return new IndexMonitorBufferingElasticsearchWorkExecutionContext(this.client, this.gsonProvider, this.errorHandler);
    }

    /** Creates a fresh refreshing execution context; this processor is passed to it as a constructor argument. */
    private FlushableElasticsearchWorkExecutionContext createRefreshingWorkExecutionContext() {
        return new RefreshingElasticsearchWorkExecutionContext(this.client, this.gsonProvider, this.workFactory, this, this.errorHandler);
    }
}
