package org.hibernate.search.backend.elasticsearch.orchestration.impl;

import java.util.concurrent.CompletableFuture;
import org.hibernate.search.backend.elasticsearch.cfg.ElasticsearchIndexSettings;
import org.hibernate.search.backend.elasticsearch.link.impl.ElasticsearchLink;
import org.hibernate.search.backend.elasticsearch.resources.impl.BackendThreads;
import org.hibernate.search.backend.elasticsearch.work.impl.ElasticsearchWorkExecutionContext;
import org.hibernate.search.backend.elasticsearch.work.impl.IndexingWork;
import org.hibernate.search.engine.backend.orchestration.spi.BatchingExecutor;
import org.hibernate.search.engine.cfg.spi.ConfigurationProperty;
import org.hibernate.search.engine.cfg.spi.ConfigurationPropertySource;
import org.hibernate.search.engine.reporting.FailureHandler;
import org.hibernate.search.util.common.data.impl.SimpleHashFunction;
import org.hibernate.search.util.common.impl.Closer;

/* loaded from: input_file:org/hibernate/search/backend/elasticsearch/orchestration/impl/ElasticsearchBatchingWorkOrchestrator.class */
/**
 * Work orchestrator that distributes batched Elasticsearch indexing works across a fixed
 * array of {@link BatchingExecutor}s.
 *
 * <p>The number of executors, the size passed to each executor and the maximum bulk size
 * are read from the index configuration when the orchestrator starts (see
 * {@link #doStart(ConfigurationPropertySource)}). Each submitted work is routed to one
 * executor chosen from the work's queuing key (see {@link #doSubmit(ElasticsearchBatchedWork)}).
 *
 * <p>The executor array only exists between {@code doStart} and {@code doStop}; the other
 * methods dereference it without a null check.
 */
public class ElasticsearchBatchingWorkOrchestrator extends AbstractElasticsearchWorkOrchestrator<ElasticsearchBatchedWork<?>> implements ElasticsearchSerialWorkOrchestrator {
    /** Number of executors (queues) to create; defaults to 10. */
    private static final ConfigurationProperty<Integer> QUEUE_COUNT = ConfigurationProperty.forKey(ElasticsearchIndexSettings.INDEXING_QUEUE_COUNT).asInteger().withDefault(10).build();
    /** Size handed to each executor on creation; defaults to 1000. */
    private static final ConfigurationProperty<Integer> QUEUE_SIZE = ConfigurationProperty.forKey(ElasticsearchIndexSettings.INDEXING_QUEUE_SIZE).asInteger().withDefault(1000).build();
    /** Maximum bulk size handed to each work bulker; defaults to 100. */
    private static final ConfigurationProperty<Integer> MAX_BULK_SIZE = ConfigurationProperty.forKey(ElasticsearchIndexSettings.INDEXING_MAX_BULK_SIZE).asInteger().withDefault(100).build();
    private final BackendThreads threads;
    private final FailureHandler failureHandler;
    /** Created by {@link #doStart(ConfigurationPropertySource)}; {@code null} until then. */
    private BatchingExecutor<ElasticsearchBatchedWorkProcessor>[] executors;

    /**
     * Creates an orchestrator; no executor is created until the orchestrator is started.
     *
     * @param str name forwarded to the superclass; also used as the prefix of executor names
     * @param backendThreads provider of the work executor on which the batching executors run
     * @param elasticsearchLink link forwarded to the superclass
     * @param failureHandler failure handler handed to every batching executor
     */
    public ElasticsearchBatchingWorkOrchestrator(String str, BackendThreads backendThreads, ElasticsearchLink elasticsearchLink, FailureHandler failureHandler) {
        super(str, elasticsearchLink);
        this.threads = backendThreads;
        this.failureHandler = failureHandler;
    }

    /**
     * Wraps the given indexing work in an {@link ElasticsearchBatchedWork} bound to a new
     * future, submits the wrapper, and returns that future.
     *
     * @param indexingWork the work to submit
     * @param <T> the result type of the work
     * @return the future that was handed to the batched work wrapper
     */
    @Override // org.hibernate.search.backend.elasticsearch.orchestration.impl.ElasticsearchSerialWorkOrchestrator
    public <T> CompletableFuture<T> submit(IndexingWork<T> indexingWork) {
        CompletableFuture<T> future = new CompletableFuture<>();
        ElasticsearchBatchedWork<T> batchedWork = new ElasticsearchBatchedWork<>(indexingWork, future);
        submit(batchedWork);
        return future;
    }

    /**
     * Reads the queue settings, creates one {@link BatchingExecutor} per queue, then starts
     * them all on the backend's work executor.
     *
     * <p>All executors are created before any of them is started.
     *
     * @param configurationPropertySource source of the queue count, queue size and max bulk size
     */
    protected void doStart(ConfigurationPropertySource configurationPropertySource) {
        int queueCount = QUEUE_COUNT.get(configurationPropertySource);
        int queueSize = QUEUE_SIZE.get(configurationPropertySource);
        int maxBulkSize = MAX_BULK_SIZE.get(configurationPropertySource);
        ElasticsearchWorkExecutionContext executionContext = createWorkExecutionContext();

        // Java cannot create a generic array directly; the raw array is only ever filled
        // with BatchingExecutor<ElasticsearchBatchedWorkProcessor> instances below.
        @SuppressWarnings("unchecked")
        BatchingExecutor<ElasticsearchBatchedWorkProcessor>[] created = new BatchingExecutor[queueCount];
        this.executors = created;

        for (int index = 0; index < this.executors.length; index++) {
            // NOTE(review): the boolean argument is presumably a fairness flag -- confirm
            // against the BatchingExecutor constructor.
            this.executors[index] = new BatchingExecutor<>(
                    name() + " - " + index,
                    createProcessor(executionContext, maxBulkSize),
                    queueSize,
                    true,
                    this.failureHandler);
        }
        for (BatchingExecutor<ElasticsearchBatchedWorkProcessor> executor : this.executors) {
            executor.start(this.threads.getWorkExecutor());
        }
    }

    /**
     * Submits the work to the executor selected by {@link SimpleHashFunction#pick} from the
     * work's queuing key.
     *
     * <p>NOTE(review): presumably this keeps works that share a queuing key on the same
     * executor -- confirm against {@code SimpleHashFunction}.
     *
     * <p>Kept {@code public}: the decompiler reported that the access modifier of this
     * method was changed from {@code protected}.
     *
     * @param elasticsearchBatchedWork the work to enqueue
     * @throws InterruptedException propagated from the executor's {@code submit}
     */
    public void doSubmit(ElasticsearchBatchedWork<?> elasticsearchBatchedWork) throws InterruptedException {
        BatchingExecutor<ElasticsearchBatchedWorkProcessor> target =
                SimpleHashFunction.pick(this.executors, elasticsearchBatchedWork.getQueuingKey());
        target.submit(elasticsearchBatchedWork);
    }

    /**
     * Combines the completion futures of all executors.
     *
     * @return a future that completes once every executor's {@code completion()} future
     *         has completed
     */
    protected CompletableFuture<?> completion() {
        CompletableFuture<?>[] completions = new CompletableFuture<?>[this.executors.length];
        for (int index = 0; index < this.executors.length; index++) {
            completions[index] = this.executors[index].completion();
        }
        return CompletableFuture.allOf(completions);
    }

    /**
     * Stops every executor through a {@link Closer}.
     *
     * <p>try-with-resources guarantees the closer is closed exactly once. The previous
     * hand-written form called {@code close()} inside the {@code try}, so a failure during
     * that call was caught by the catch-all handler, which then called {@code close()} a
     * second time.
     */
    protected void doStop() {
        try (Closer<RuntimeException> closer = new Closer<>()) {
            closer.pushAll(BatchingExecutor::stop, this.executors);
        }
    }

    /**
     * Builds the processor used by one executor: a sequence builder plus a bulker that
     * creates bulk works through the link's work builder factory.
     *
     * @param executionContext execution context handed to the sequence builder
     * @param maxBulkSize value handed to the bulker as its last constructor argument
     * @return a new processor with its own sequence builder and bulker
     */
    private ElasticsearchBatchedWorkProcessor createProcessor(ElasticsearchWorkExecutionContext executionContext, int maxBulkSize) {
        ElasticsearchDefaultWorkSequenceBuilder sequenceBuilder = new ElasticsearchDefaultWorkSequenceBuilder(executionContext);
        ElasticsearchDefaultWorkBulker bulker = new ElasticsearchDefaultWorkBulker(
                sequenceBuilder,
                (list, documentRefreshStrategy) ->
                        this.link.getWorkBuilderFactory().bulk(list).refresh(documentRefreshStrategy).build(),
                maxBulkSize);
        return new ElasticsearchBatchedWorkProcessor(sequenceBuilder, bulker);
    }

    /**
     * Bridge method as recovered by the decompiler; it forwards to the superclass
     * {@code submit}. Kept verbatim so the set of declared methods is unchanged.
     *
     * @param elasticsearchBatchedWork the work to submit
     */
    @Override // org.hibernate.search.backend.elasticsearch.orchestration.impl.ElasticsearchSerialWorkOrchestrator
    public /* bridge */ /* synthetic */ void submit(ElasticsearchBatchedWork elasticsearchBatchedWork) {
        super.submit((Object) elasticsearchBatchedWork);
    }
}
