package org.hibernate.search.mapper.orm.coordination.databasepolling.impl;

import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Function;
import org.hibernate.search.engine.cfg.ConfigurationPropertySource;
import org.hibernate.search.engine.cfg.spi.ConfigurationProperty;
import org.hibernate.search.engine.cfg.spi.OptionalConfigurationProperty;
import org.hibernate.search.engine.environment.bean.BeanHolder;
import org.hibernate.search.engine.environment.bean.BeanReference;
import org.hibernate.search.engine.environment.bean.BeanResolver;
import org.hibernate.search.mapper.orm.automaticindexing.spi.AutomaticIndexingConfigurationContext;
import org.hibernate.search.mapper.orm.cfg.HibernateOrmMapperSettings;
import org.hibernate.search.mapper.orm.cfg.impl.HibernateOrmMapperImplSettings;
import org.hibernate.search.mapper.orm.coordination.common.spi.CooordinationStrategy;
import org.hibernate.search.mapper.orm.coordination.common.spi.CoordinationStrategyPreStopContext;
import org.hibernate.search.mapper.orm.coordination.common.spi.CoordinationStrategyStartContext;
import org.hibernate.search.mapper.orm.coordination.databasepolling.impl.DefaultOutboxEventFinder;
import org.hibernate.search.mapper.orm.logging.impl.Log;
import org.hibernate.search.util.common.data.impl.RangeHashTable;
import org.hibernate.search.util.common.impl.Closer;
import org.hibernate.search.util.common.logging.impl.LoggerFactory;

/* loaded from: input_file:org/hibernate/search/mapper/orm/coordination/databasepolling/impl/DatabasePollingCooordinationStrategy.class */
/**
 * Coordination strategy that routes automatic indexing events through an outbox
 * ({@link OutboxEventSendingPlan}) and, when indexing processors are enabled,
 * starts one {@link OutboxEventBackgroundProcessor} per assigned shard to consume them.
 *
 * <p>Lifecycle: {@link #start} resolves the outbox event finder provider and optionally
 * creates and starts the processors, {@link #preStop} asks every processor to prepare for
 * shutdown, and {@link #stop} releases the processors, the executor and the provider holder.
 */
public class DatabasePollingCooordinationStrategy implements CooordinationStrategy {
    private static final Log log = LoggerFactory.make(Log.class, MethodHandles.lookup());

    /** Whether sharding is configured statically; defaults to {@code false}. */
    private static final ConfigurationProperty<Boolean> SHARDS_STATIC = ConfigurationProperty.forKey(HibernateOrmMapperSettings.CoordinationRadicals.SHARDS_STATIC).asBoolean().withDefault(false).build();
    /** Total number of shards; required when static sharding is enabled. */
    private static final OptionalConfigurationProperty<Integer> SHARDS_TOTAL_COUNT = ConfigurationProperty.forKey(HibernateOrmMapperSettings.CoordinationRadicals.SHARDS_TOTAL_COUNT).asInteger().build();
    /** Indices of the shards assigned to this node; required when static sharding is enabled. */
    private static final OptionalConfigurationProperty<List<Integer>> SHARDS_ASSIGNED = ConfigurationProperty.forKey(HibernateOrmMapperSettings.CoordinationRadicals.SHARDS_ASSIGNED).asInteger().multivalued().build();
    /** Whether background indexing processors are started at all; defaults to {@code true}. */
    private static final ConfigurationProperty<Boolean> PROCESSORS_INDEXING_ENABLED = ConfigurationProperty.forKey(HibernateOrmMapperSettings.CoordinationRadicals.PROCESSORS_INDEXING_ENABLED).asBoolean().withDefault(true).build();
    /** Polling interval handed to each processor; defaults to 100 (presumably milliseconds - TODO confirm). */
    private static final ConfigurationProperty<Integer> PROCESSORS_INDEXING_POLLING_INTERVAL = ConfigurationProperty.forKey(HibernateOrmMapperSettings.CoordinationRadicals.PROCESSORS_INDEXING_POLLING_INTERVAL).asInteger().withDefault(100).build();
    /** Batch size handed to each processor; defaults to 50. */
    private static final ConfigurationProperty<Integer> PROCESSORS_INDEXING_BATCH_SIZE = ConfigurationProperty.forKey(HibernateOrmMapperSettings.CoordinationRadicals.PROCESSORS_INDEXING_BATCH_SIZE).asInteger().withDefault(50).build();
    /** Optional custom provider of outbox event finders; a default provider is used when unset. */
    private static final OptionalConfigurationProperty<BeanReference<? extends OutboxEventFinderProvider>> PROCESSORS_INDEXING_OUTBOX_EVENT_FINDER_PROVIDER = ConfigurationProperty.forKey(HibernateOrmMapperImplSettings.CoordinationRadicals.PROCESSORS_INDEXING_OUTBOX_EVENT_FINDER_PROVIDER).asBeanReference(OutboxEventFinderProvider.class).build();

    /** Prefix used for the executor name and for each processor name. */
    public static final String PROCESSOR_NAME_PREFIX = "Outbox event processor";

    private BeanHolder<? extends OutboxEventFinderProvider> finderProviderHolder;
    // The three fields below stay null when indexing processors are disabled.
    private ScheduledExecutorService scheduledExecutor;
    private List<Integer> assignedShardIndices;
    private RangeHashTable<OutboxEventBackgroundProcessor> indexingProcessors;

    /**
     * Registers an {@link OutboxEventSendingPlan} factory as the destination of indexing events.
     *
     * @param automaticIndexingConfigurationContext the context on which the sending plan factory is registered
     */
    @Override // org.hibernate.search.mapper.orm.coordination.common.spi.CooordinationStrategy
    public void configureAutomaticIndexing(AutomaticIndexingConfigurationContext automaticIndexingConfigurationContext) {
        // NOTE(review): "mo152session" looks like a decompiler-generated alias for session() - confirm against the real API.
        automaticIndexingConfigurationContext.sendIndexingEventsTo(
                sessionContext -> new OutboxEventSendingPlan(sessionContext.mo152session()), true);
    }

    /**
     * Resolves the outbox event finder provider (custom if configured, default otherwise) and,
     * if indexing processors are enabled, creates and starts them.
     *
     * @param coordinationStrategyStartContext gives access to configuration, bean resolution,
     *        thread pools and the mapping
     * @return an already-completed future
     */
    @Override // org.hibernate.search.mapper.orm.coordination.common.spi.CooordinationStrategy
    public CompletableFuture<?> start(CoordinationStrategyStartContext coordinationStrategyStartContext) {
        ConfigurationPropertySource propertySource = coordinationStrategyStartContext.configurationPropertySource();
        BeanResolver beanResolver = coordinationStrategyStartContext.beanResolver();
        // A bound method reference null-checks its receiver, so no explicit requireNonNull is needed.
        Optional<BeanHolder<? extends OutboxEventFinderProvider>> customProviderHolder =
                PROCESSORS_INDEXING_OUTBOX_EVENT_FINDER_PROVIDER.getAndMap(propertySource, beanResolver::resolve);
        if (customProviderHolder.isPresent()) {
            this.finderProviderHolder = customProviderHolder.get();
            log.debugf("Outbox processing will use custom outbox event finder provider '%s'.", this.finderProviderHolder.get());
        } else {
            this.finderProviderHolder = BeanHolder.of(new DefaultOutboxEventFinder.Provider());
        }

        if (PROCESSORS_INDEXING_ENABLED.get(coordinationStrategyStartContext.configurationPropertySource())) {
            initializeProcessors(coordinationStrategyStartContext);
        } else {
            log.indexingProcessorDisabled();
        }
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Determines the shard layout, then creates one background processor per assigned shard
     * and starts them all.
     */
    private void initializeProcessors(CoordinationStrategyStartContext startContext) {
        ConfigurationPropertySource propertySource = startContext.configurationPropertySource();

        final int totalShardCount;
        if (SHARDS_STATIC.get(propertySource)) {
            // Both properties are mandatory with static sharding: a missing one raises the logger-built exception.
            totalShardCount = SHARDS_TOTAL_COUNT.getAndMapOrThrow(
                    propertySource, this::checkTotalShardCount, log::missingPropertyForStaticSharding);
            this.assignedShardIndices = SHARDS_ASSIGNED.getAndMapOrThrow(
                    propertySource,
                    shardIndices -> checkAssignedShardIndices(propertySource, totalShardCount, shardIndices),
                    log::missingPropertyForStaticSharding);
        } else {
            log.warnf("Dynamic sharding is not implemented yet; defaulting to static sharding assuming a single node");
            totalShardCount = 1;
            this.assignedShardIndices = Collections.singletonList(0);
        }

        int pollingInterval = PROCESSORS_INDEXING_POLLING_INTERVAL.get(propertySource);
        int batchSize = PROCESSORS_INDEXING_BATCH_SIZE.get(propertySource);

        // One executor thread per shard handled by this node.
        this.scheduledExecutor = startContext.threadPoolProvider()
                .newScheduledExecutor(this.assignedShardIndices.size(), PROCESSOR_NAME_PREFIX);
        this.indexingProcessors = new RangeHashTable<>(OutboxEventSendingPlan.HASH_FUNCTION, totalShardCount);

        for (int shardIndex : this.assignedShardIndices) {
            this.indexingProcessors.set(shardIndex, new OutboxEventBackgroundProcessor(
                    PROCESSOR_NAME_PREFIX + " - " + shardIndex,
                    startContext.mapping(),
                    this.scheduledExecutor,
                    // With a single shard no predicate is passed; otherwise events are
                    // restricted to the hash range of this shard's bucket.
                    this.finderProviderHolder.get().create(totalShardCount == 1
                            ? Optional.empty()
                            : Optional.of(new EntityIdHashRangeOutboxEventPredicate(this.indexingProcessors.rangeForBucket(shardIndex)))),
                    pollingInterval,
                    batchSize));
        }

        // Start only after every processor has been created.
        for (int shardIndex : this.assignedShardIndices) {
            this.indexingProcessors.get(shardIndex).start();
        }
    }

    /**
     * Validates the configured total shard count.
     *
     * @return the same value, if strictly positive
     * @throws RuntimeException the exception built by {@code log.invalidTotalShardCount()} otherwise
     */
    private Integer checkTotalShardCount(Integer totalShardCount) {
        if (totalShardCount <= 0) {
            throw log.invalidTotalShardCount();
        }
        return totalShardCount;
    }

    /**
     * Validates that every assigned shard index lies in {@code [0, totalShardCount)}.
     *
     * @return a new list holding the indices with duplicates removed (order not preserved)
     * @throws RuntimeException the exception built by {@code log.invalidShardIndex(...)} for an out-of-range index
     */
    private List<Integer> checkAssignedShardIndices(ConfigurationPropertySource propertySource, int totalShardCount,
            List<Integer> shardIndices) {
        for (Integer shardIndex : shardIndices) {
            if (shardIndex < 0 || shardIndex >= totalShardCount) {
                throw log.invalidShardIndex(totalShardCount, SHARDS_TOTAL_COUNT.resolveOrRaw(propertySource));
            }
        }
        // Going through a HashSet drops duplicate indices.
        return new ArrayList<>(new HashSet<>(shardIndices));
    }

    /**
     * Asks every processor to prepare for shutdown.
     *
     * @param coordinationStrategyPreStopContext unused
     * @return a future completing when all processors' pre-stop futures have completed, or an
     *         already-completed future when no processors were created
     */
    @Override // org.hibernate.search.mapper.orm.coordination.common.spi.CooordinationStrategy
    public CompletableFuture<?> preStop(CoordinationStrategyPreStopContext coordinationStrategyPreStopContext) {
        if (this.indexingProcessors == null) {
            // Processors were never initialized (e.g. indexing processors disabled).
            return CompletableFuture.completedFuture(null);
        }
        CompletableFuture<?>[] preStopFutures = new CompletableFuture<?>[this.assignedShardIndices.size()];
        int position = 0;
        for (int shardIndex : this.assignedShardIndices) {
            preStopFutures[position++] = this.indexingProcessors.get(shardIndex).preStop();
        }
        return CompletableFuture.allOf(preStopFutures);
    }

    /**
     * Releases, in this order, the processors, the scheduled executor and the finder provider holder.
     *
     * <p>Uses try-with-resources so that {@code Closer.close()} runs exactly once: the previous
     * hand-written try/catch invoked it a second time whenever the first call failed.
     */
    @Override // org.hibernate.search.mapper.orm.coordination.common.spi.CooordinationStrategy
    public void stop() {
        try (Closer<RuntimeException> closer = new Closer<>()) {
            closer.pushAll(OutboxEventBackgroundProcessor::stop, this.indexingProcessors);
            closer.push(ScheduledExecutorService::shutdownNow, this.scheduledExecutor);
            closer.push(BeanHolder::close, this.finderProviderHolder);
        }
    }
}
