package org.hibernate.search.elasticsearch.processor.impl;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.hibernate.search.backend.LuceneWork;
import org.hibernate.search.backend.impl.lucene.MultiWriteDrainableLinkedList;
import org.hibernate.search.elasticsearch.client.impl.ElasticsearchClient;
import org.hibernate.search.elasticsearch.gson.impl.GsonProvider;
import org.hibernate.search.elasticsearch.logging.impl.Log;
import org.hibernate.search.elasticsearch.work.impl.BulkRequestFailedException;
import org.hibernate.search.elasticsearch.work.impl.BulkableElasticsearchWork;
import org.hibernate.search.elasticsearch.work.impl.ElasticsearchWork;
import org.hibernate.search.elasticsearch.work.impl.ElasticsearchWorkAggregator;
import org.hibernate.search.elasticsearch.work.impl.ElasticsearchWorkExecutionContext;
import org.hibernate.search.elasticsearch.work.impl.factory.ElasticsearchWorkFactory;
import org.hibernate.search.exception.ErrorHandler;
import org.hibernate.search.exception.impl.ErrorContextBuilder;
import org.hibernate.search.spi.BuildContext;
import org.hibernate.search.util.impl.CollectionHelper;
import org.hibernate.search.util.impl.Executors;
import org.hibernate.search.util.logging.impl.LoggerFactory;

/* loaded from: input_file:org/hibernate/search/elasticsearch/processor/impl/ElasticsearchWorkProcessor.class */
public class ElasticsearchWorkProcessor implements AutoCloseable {
    private static final Log LOG = (Log) LoggerFactory.make(Log.class);
    private static final int MAX_BULK_SIZE = 250;
    private final AsyncBackendRequestProcessor asyncProcessor = new AsyncBackendRequestProcessor();
    private final ErrorHandler errorHandler;
    private final ElasticsearchClient client;
    private final GsonProvider gsonProvider;
    private final ElasticsearchWorkFactory workFactory;
    private final ElasticsearchWorkExecutionContext parallelWorkExecutionContext;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/hibernate/search/elasticsearch/processor/impl/ElasticsearchWorkProcessor$AsyncBackendRequestProcessor.class */
    public class AsyncBackendRequestProcessor {
        private final ScheduledExecutorService scheduler;
        private final MultiWriteDrainableLinkedList<Iterable<ElasticsearchWork<?>>> asyncWorkQueue;
        private final AtomicBoolean asyncWorkerWasStarted;
        private volatile CountDownLatch lastAsyncWorkLatch;

        private AsyncBackendRequestProcessor() {
            this.asyncWorkQueue = new MultiWriteDrainableLinkedList<>();
            this.scheduler = Executors.newScheduledThreadPool("Elasticsearch AsyncBackendRequestProcessor");
            this.asyncWorkerWasStarted = new AtomicBoolean(false);
        }

        public void submit(Iterable<ElasticsearchWork<?>> iterable) {
            this.asyncWorkQueue.add(iterable);
            ensureStarted();
        }

        private void ensureStarted() {
            if (this.asyncWorkerWasStarted.get()) {
                return;
            }
            synchronized (this) {
                if (this.asyncWorkerWasStarted.compareAndSet(false, true)) {
                    try {
                        RequestProcessingRunnable requestProcessingRunnable = new RequestProcessingRunnable(this);
                        this.scheduler.schedule(requestProcessingRunnable, 100L, TimeUnit.MILLISECONDS);
                        this.lastAsyncWorkLatch = requestProcessingRunnable.latch;
                    } catch (Exception e) {
                        this.asyncWorkerWasStarted.set(false);
                        CountDownLatch countDownLatch = this.lastAsyncWorkLatch;
                        if (countDownLatch != null) {
                            countDownLatch.countDown();
                        }
                        throw e;
                    }
                }
            }
        }

        public void awaitCompletion() {
            CountDownLatch countDownLatch = this.lastAsyncWorkLatch;
            if (countDownLatch != null) {
                try {
                    countDownLatch.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw ElasticsearchWorkProcessor.LOG.interruptedWhileWaitingForRequestCompletion(e);
                }
            }
        }

        public void shutdown() {
            this.scheduler.shutdown();
            try {
                try {
                    this.scheduler.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
                    CountDownLatch countDownLatch = this.lastAsyncWorkLatch;
                    if (countDownLatch != null) {
                        countDownLatch.countDown();
                    }
                } catch (InterruptedException e) {
                    ElasticsearchWorkProcessor.LOG.interruptedWhileWaitingForIndexActivity(e);
                    CountDownLatch countDownLatch2 = this.lastAsyncWorkLatch;
                    if (countDownLatch2 != null) {
                        countDownLatch2.countDown();
                    }
                }
            } catch (Throwable th) {
                CountDownLatch countDownLatch3 = this.lastAsyncWorkLatch;
                if (countDownLatch3 != null) {
                    countDownLatch3.countDown();
                }
                throw th;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/hibernate/search/elasticsearch/processor/impl/ElasticsearchWorkProcessor$ProcessorWorkGroupBuilder.class */
    public class ProcessorWorkGroupBuilder implements ElasticsearchWorkAggregator {
        private final boolean refreshInBulkAPICall;
        private final List<ElasticsearchWork<?>> result = new ArrayList();
        private final List<BulkableElasticsearchWork<?>> bulkInProgress = new ArrayList();

        public ProcessorWorkGroupBuilder(boolean z) {
            this.refreshInBulkAPICall = z;
        }

        @Override // org.hibernate.search.elasticsearch.work.impl.ElasticsearchWorkAggregator
        public void addBulkable(BulkableElasticsearchWork<?> bulkableElasticsearchWork) {
            this.bulkInProgress.add(bulkableElasticsearchWork);
            if (this.bulkInProgress.size() >= ElasticsearchWorkProcessor.MAX_BULK_SIZE) {
                flushBulkInProgress();
            }
        }

        @Override // org.hibernate.search.elasticsearch.work.impl.ElasticsearchWorkAggregator
        public void addNonBulkable(ElasticsearchWork<?> elasticsearchWork) {
            flushBulkInProgress();
            this.result.add(elasticsearchWork);
        }

        private void flushBulkInProgress() {
            if (this.bulkInProgress.isEmpty()) {
                return;
            }
            if (this.bulkInProgress.size() == 1) {
                this.result.add(this.bulkInProgress.iterator().next());
            } else {
                this.result.add(ElasticsearchWorkProcessor.this.workFactory.bulk(this.bulkInProgress).refresh(this.refreshInBulkAPICall).build2());
            }
            this.bulkInProgress.clear();
        }

        /* JADX INFO: Access modifiers changed from: private */
        public List<ElasticsearchWork<?>> build() {
            flushBulkInProgress();
            return this.result;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/hibernate/search/elasticsearch/processor/impl/ElasticsearchWorkProcessor$RequestProcessingRunnable.class */
    public class RequestProcessingRunnable implements Runnable {
        private final AsyncBackendRequestProcessor asyncProcessor;
        private final CountDownLatch latch = new CountDownLatch(1);

        public RequestProcessingRunnable(AsyncBackendRequestProcessor asyncBackendRequestProcessor) {
            this.asyncProcessor = asyncBackendRequestProcessor;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                processAsyncWork();
            } finally {
                this.latch.countDown();
            }
        }

        private void processAsyncWork() {
            SequentialWorkExecutionContext sequentialWorkExecutionContext = new SequentialWorkExecutionContext(ElasticsearchWorkProcessor.this.client, ElasticsearchWorkProcessor.this.gsonProvider, ElasticsearchWorkProcessor.this.workFactory, ElasticsearchWorkProcessor.this, ElasticsearchWorkProcessor.this.errorHandler);
            synchronized (this.asyncProcessor) {
                while (true) {
                    Iterable drainToDetachedIterable = this.asyncProcessor.asyncWorkQueue.drainToDetachedIterable();
                    if (drainToDetachedIterable == null) {
                        this.asyncProcessor.asyncWorkerWasStarted.set(false);
                        sequentialWorkExecutionContext.flush();
                    } else {
                        ElasticsearchWorkProcessor.this.executeSafe(sequentialWorkExecutionContext, CollectionHelper.flatten(drainToDetachedIterable), false);
                    }
                }
            }
        }
    }

    public ElasticsearchWorkProcessor(BuildContext buildContext, ElasticsearchClient elasticsearchClient, GsonProvider gsonProvider, ElasticsearchWorkFactory elasticsearchWorkFactory) {
        this.errorHandler = buildContext.getErrorHandler();
        this.client = elasticsearchClient;
        this.gsonProvider = gsonProvider;
        this.workFactory = elasticsearchWorkFactory;
        this.parallelWorkExecutionContext = new ParallelWorkExecutionContext(elasticsearchClient, gsonProvider);
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        awaitAsyncProcessingCompletion();
        this.asyncProcessor.shutdown();
    }

    public <T> T executeSyncUnsafe(ElasticsearchWork<T> elasticsearchWork) {
        return elasticsearchWork.execute(this.parallelWorkExecutionContext);
    }

    public void executeSyncSafe(Iterable<ElasticsearchWork<?>> iterable) {
        SequentialWorkExecutionContext sequentialWorkExecutionContext = new SequentialWorkExecutionContext(this.client, this.gsonProvider, this.workFactory, this, this.errorHandler);
        executeSafe(sequentialWorkExecutionContext, iterable, true);
        sequentialWorkExecutionContext.flush();
    }

    public void executeAsync(ElasticsearchWork<?> elasticsearchWork) {
        this.asyncProcessor.submit(Collections.singleton(elasticsearchWork));
    }

    public void executeAsync(List<ElasticsearchWork<?>> list) {
        this.asyncProcessor.submit(list);
    }

    public void awaitAsyncProcessingCompletion() {
        this.asyncProcessor.awaitCompletion();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r14v0, types: [org.hibernate.search.elasticsearch.work.impl.BulkRequestFailedException, java.lang.Throwable] */
    public void executeSafe(SequentialWorkExecutionContext sequentialWorkExecutionContext, Iterable<ElasticsearchWork<?>> iterable, boolean z) {
        ErrorContextBuilder errorContextBuilder = new ErrorContextBuilder();
        for (ElasticsearchWork<?> elasticsearchWork : createRequestGroups(iterable, z)) {
            try {
                executeUnsafe(elasticsearchWork, sequentialWorkExecutionContext);
                Stream<LuceneWork> luceneWorks = elasticsearchWork.getLuceneWorks();
                errorContextBuilder.getClass();
                luceneWorks.forEach(errorContextBuilder::workCompleted);
            } catch (RuntimeException e) {
                handleError(errorContextBuilder, e, iterable, elasticsearchWork.getLuceneWorks());
                return;
            } catch (BulkRequestFailedException e2) {
                Stream<R> flatMap = e2.getSuccessfulItems().keySet().stream().flatMap((v0) -> {
                    return v0.getLuceneWorks();
                });
                errorContextBuilder.getClass();
                flatMap.forEach(errorContextBuilder::workCompleted);
                handleError(errorContextBuilder, e2, iterable, e2.getErroneousItems().stream().flatMap((v0) -> {
                    return v0.getLuceneWorks();
                }));
                return;
            }
        }
    }

    private void executeUnsafe(ElasticsearchWork<?> elasticsearchWork, ElasticsearchWorkExecutionContext elasticsearchWorkExecutionContext) {
        if (LOG.isTraceEnabled()) {
            LOG.tracef("Processing %s", elasticsearchWork);
        }
        elasticsearchWork.execute(elasticsearchWorkExecutionContext);
    }

    private void handleError(ErrorContextBuilder errorContextBuilder, Throwable th, Iterable<ElasticsearchWork<?>> iterable, Stream<LuceneWork> stream) {
        errorContextBuilder.allWorkToBeDone((Iterable) StreamSupport.stream(iterable.spliterator(), false).flatMap(elasticsearchWork -> {
            return elasticsearchWork.getLuceneWorks();
        }).collect(Collectors.toList()));
        errorContextBuilder.getClass();
        stream.forEach(errorContextBuilder::addWorkThatFailed);
        errorContextBuilder.errorThatOccurred(th);
        this.errorHandler.handle(errorContextBuilder.createErrorContext());
    }

    private List<ElasticsearchWork<?>> createRequestGroups(Iterable<ElasticsearchWork<?>> iterable, boolean z) {
        ProcessorWorkGroupBuilder processorWorkGroupBuilder = new ProcessorWorkGroupBuilder(z);
        Iterator<ElasticsearchWork<?>> it = iterable.iterator();
        while (it.hasNext()) {
            it.next().aggregate(processorWorkGroupBuilder);
        }
        return processorWorkGroupBuilder.build();
    }
}
