package org.infinispan.query.affinity;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.search.similarities.Similarity;
import org.apache.lucene.store.LockObtainFailedException;
import org.hibernate.search.backend.IndexingMonitor;
import org.hibernate.search.backend.LuceneWork;
import org.hibernate.search.exception.SearchException;
import org.hibernate.search.indexes.spi.DirectoryBasedIndexManager;
import org.hibernate.search.spi.SearchIntegrator;
import org.hibernate.search.spi.WorkerBuildContext;
import org.hibernate.search.store.DirectoryProvider;
import org.infinispan.Cache;
import org.infinispan.distribution.DistributionManager;
import org.infinispan.factories.ComponentRegistry;
import org.infinispan.hibernate.search.spi.InfinispanDirectoryProvider;
import org.infinispan.lucene.InvalidLockException;
import org.infinispan.lucene.impl.DirectoryExtensions;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachelistener.annotation.TopologyChanged;
import org.infinispan.notifications.cachelistener.event.TopologyChangedEvent;
import org.infinispan.query.backend.ComponentRegistryService;
import org.infinispan.query.backend.KeyTransformationHandler;
import org.infinispan.query.backend.QueryInterceptor;
import org.infinispan.query.backend.TransactionHelper;
import org.infinispan.query.logging.Log;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.transport.Address;
import org.infinispan.util.ByteString;
import org.infinispan.util.logging.LogFactory;

@Listener(observation = Listener.Observation.POST)
/* loaded from: input_file:org/infinispan/query/affinity/AffinityIndexManager.class */
public class AffinityIndexManager extends DirectoryBasedIndexManager implements OperationFailedHandler {
    private static final Log log = (Log) LogFactory.getLog(AffinityIndexManager.class, Log.class);
    private static final int POLL_WAIT = 1000;
    private RpcManager rpcManager;
    private DistributionManager distributionManager;
    private KeyTransformationHandler keyTransformationHandler;
    private Cache cache;
    private volatile boolean hasOwnership = true;
    private volatile Address nextOwner = null;
    private final ReadWriteLock flushLock = new ReentrantReadWriteLock();
    private final Lock writeLock = this.flushLock.writeLock();
    private final Lock readLock = this.flushLock.readLock();
    private ExecutorService asyncExecutor;
    private TransactionHelper transactionHelper;
    private int segment;

    @Override // org.infinispan.query.affinity.OperationFailedHandler
    public void operationsFailed(List<LuceneWork> list, Throwable th) {
        log.debugf(th, "Operations '%s' failed in the backend", list);
        if (th instanceof LockObtainFailedException) {
            lockObtainFailed(list);
        }
        if ((th instanceof SearchException) && (th.getCause() instanceof InvalidLockException)) {
            log.warnf(th, "Retrying failed operations", new Object[0]);
            retryOperations(list);
        }
    }

    private void lockObtainFailed(List<LuceneWork> list) {
        log.debugf("Operation %s failed due to lock already in used", list);
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        clearIfNeeded(getLockHolder(getIndexName()));
        retryOperations(list);
    }

    private void retryOperations(List<LuceneWork> list) {
        if (list.stream().anyMatch(luceneWork -> {
            return luceneWork.getIdInString() != null;
        })) {
            log.debugf("Retrying operations %s", list);
            CompletableFuture.supplyAsync(() -> {
                performOperations(list, null);
                return null;
            }, this.asyncExecutor).whenComplete((obj, th) -> {
                if (th == null) {
                    log.debugf("Operation completed", new Object[0]);
                } else {
                    log.errorf(th, "Error reapplying operation", new Object[0]);
                }
            });
        }
    }

    private void clearIfNeeded(Address address) {
        List members = this.rpcManager.getMembers();
        log.debugf("Current members are %s, lock holder is %s", members, address);
        if (members.contains(address)) {
            return;
        }
        Object directory = getDirectoryProvider().getDirectory();
        log.debug("Forcing clear of index lock");
        ((DirectoryExtensions) directory).forceUnlock(IndexWriter.WRITE_LOCK_NAME);
    }

    @Override // org.hibernate.search.indexes.spi.DirectoryBasedIndexManager, org.hibernate.search.indexes.spi.IndexManager
    public void initialize(String str, Properties properties, Similarity similarity, WorkerBuildContext workerBuildContext) {
        ComponentRegistry componentRegistry = ((ComponentRegistryService) workerBuildContext.getServiceManager().requestService(ComponentRegistryService.class)).getComponentRegistry();
        this.transactionHelper = new TransactionHelper((TransactionManager) componentRegistry.getComponent(TransactionManager.class));
        Transaction suspendTxIfExists = this.transactionHelper.suspendTxIfExists();
        try {
            super.initialize(str, properties, similarity, workerBuildContext);
            this.transactionHelper.resume(suspendTxIfExists);
            this.asyncExecutor = (ExecutorService) componentRegistry.getComponent(ExecutorService.class, "org.infinispan.executors.async");
            this.distributionManager = (DistributionManager) componentRegistry.getComponent(DistributionManager.class);
            this.rpcManager = (RpcManager) componentRegistry.getComponent(RpcManager.class);
            this.cache = (Cache) componentRegistry.getComponent(Cache.class);
            this.keyTransformationHandler = ((QueryInterceptor) componentRegistry.getComponent(QueryInterceptor.class)).getKeyTransformationHandler();
            ((AffinityErrorHandler) ((SearchIntegrator) componentRegistry.getComponent(SearchIntegrator.class)).getErrorHandler()).initialize(this);
            this.segment = AffinityShardIdentifierProvider.getSegment(str);
            this.cache.addListener(this);
        } catch (Throwable th) {
            this.transactionHelper.resume(suspendTxIfExists);
            throw th;
        }
    }

    private void handleOwnershipLost(Address address) {
        this.writeLock.lock();
        try {
            log.debugf("Ownership lost to '%s', closing index manager", address);
            this.nextOwner = address;
            flushAndReleaseResources();
            this.hasOwnership = false;
        } finally {
            this.writeLock.unlock();
        }
    }

    private void handleOwnershipAcquired() {
        this.writeLock.lock();
        try {
            log.debugf("Ownership acquired", new Object[0]);
            this.nextOwner = null;
            this.hasOwnership = true;
        } finally {
            this.writeLock.unlock();
        }
    }

    @Override // org.hibernate.search.indexes.spi.DirectoryBasedIndexManager, org.hibernate.search.indexes.spi.IndexManager
    public void flushAndReleaseResources() {
        InfinispanDirectoryProvider infinispanDirectoryProvider = (InfinispanDirectoryProvider) getDirectoryProvider();
        int pendingDeleteTasks = infinispanDirectoryProvider.pendingDeleteTasks();
        boolean z = false;
        long nanoTime = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(1000L);
        while (pendingDeleteTasks > 0 && nanoTime - System.nanoTime() > 0) {
            try {
                Thread.sleep(10L);
                log.debugf("Waiting for pending delete tasks, remaining: %s", pendingDeleteTasks);
                pendingDeleteTasks = infinispanDirectoryProvider.pendingDeleteTasks();
            } catch (InterruptedException e) {
                z = true;
            }
        }
        if (z) {
            Thread.currentThread().interrupt();
        }
        log.debugf("Flushing directory provider", new Object[0]);
        super.flushAndReleaseResources();
    }

    private Object stringToKey(String str) {
        return this.keyTransformationHandler.stringToKey(str, this.cache.getAdvancedCache().getClassLoader());
    }

    private int getSegment(Object obj) {
        return this.distributionManager.getConsistentHash().getSegment(obj);
    }

    private Address getLockHolder(String str) {
        log.debugf("Getting lock holder for %s", str);
        Transaction suspendTxIfExists = this.transactionHelper.suspendTxIfExists();
        try {
            Address lockOwner = ((InfinispanDirectoryProvider) getDirectoryProvider()).getLockOwner(str, IndexWriter.WRITE_LOCK_NAME);
            this.transactionHelper.resume(suspendTxIfExists);
            return lockOwner;
        } catch (Throwable th) {
            this.transactionHelper.resume(suspendTxIfExists);
            throw th;
        }
    }

    private Address getLocation(LuceneWork luceneWork) {
        Address address;
        Address address2 = this.rpcManager.getAddress();
        if (luceneWork.getIdInString() == null) {
            return address2;
        }
        Address lockHolder = getLockHolder(replaceShard(getIndexName(), getSegment(stringToKey(luceneWork.getIdInString()))));
        log.debugf("Lock holder for %s is %s", getIndexName(), lockHolder);
        if (this.hasOwnership) {
            address = (lockHolder == null || address2.equals(lockHolder)) ? address2 : lockHolder;
        } else {
            log.debugf("Lost ownership, new owner is %s", this.nextOwner);
            address = this.nextOwner;
        }
        return address;
    }

    /* JADX INFO: Access modifiers changed from: private */
    @SafeVarargs
    public final <T> List<T> newList(T... tArr) {
        ArrayList arrayList = new ArrayList(tArr.length);
        Collections.addAll(arrayList, tArr);
        return arrayList;
    }

    private Map<Address, List<LuceneWork>> partitionWorkByAddress(List<LuceneWork> list) {
        return (Map) list.stream().collect(Collectors.toMap(this::getLocation, luceneWork -> {
            return this.newList(luceneWork);
        }, (list2, list3) -> {
            list2.addAll(list3);
            return list2;
        }));
    }

    @Override // org.hibernate.search.indexes.spi.DirectoryBasedIndexManager, org.hibernate.search.indexes.spi.IndexManager
    public void performOperations(List<LuceneWork> list, IndexingMonitor indexingMonitor) {
        if (!this.cache.getCacheConfiguration().clustering().cacheMode().isClustered()) {
            super.performOperations(list, indexingMonitor);
            return;
        }
        Address address = this.rpcManager.getAddress();
        this.readLock.lock();
        try {
            Map<Address, List<LuceneWork>> partitionWorkByAddress = partitionWorkByAddress(list);
            log.debugf("Applying work @ %s, workMap is %s", address, partitionWorkByAddress);
            List<LuceneWork> list2 = partitionWorkByAddress.get(address);
            if (list2 != null && !list2.isEmpty()) {
                log.debugf("About to apply work locally %s, hasOwnership=%s", list2, Boolean.valueOf(this.hasOwnership));
                super.performOperations(list2, indexingMonitor);
                log.debugf("Work applied", new Object[0]);
                partitionWorkByAddress.remove(address);
            }
            partitionWorkByAddress.entrySet().forEach(entry -> {
                sendWork((List) entry.getValue(), (Address) entry.getKey());
            });
            this.readLock.unlock();
            if (this.distributionManager.isRehashInProgress()) {
                return;
            }
            Address locatePrimaryOwnerForSegment = this.distributionManager.getConsistentHash().locatePrimaryOwnerForSegment(this.segment);
            if (address.equals(locatePrimaryOwnerForSegment)) {
                return;
            }
            log.debugf("%s is not owner anymore, releasing resources", this.rpcManager.getAddress());
            handleOwnershipLost(locatePrimaryOwnerForSegment);
        } catch (Throwable th) {
            this.readLock.unlock();
            throw th;
        }
    }

    private void sendWork(List<LuceneWork> list, Address address) {
        AffinityUpdateCommand affinityUpdateCommand = new AffinityUpdateCommand(ByteString.fromString(this.cache.getName()));
        affinityUpdateCommand.setSerializedWorkList(getSerializer().toSerializedModel(list));
        List singletonList = Collections.singletonList(address);
        log.debugf("Sending works %s to %s", list, singletonList);
        this.rpcManager.invokeRemotelyAsync(singletonList, affinityUpdateCommand, this.rpcManager.getDefaultRpcOptions(false)).whenComplete((map, th) -> {
            if (th != null) {
                log.error("Error forwarding index job", th);
            }
        });
    }

    @Override // org.hibernate.search.indexes.spi.DirectoryBasedIndexManager
    protected DirectoryProvider<?> createDirectoryProvider(String str, Properties properties, WorkerBuildContext workerBuildContext) {
        InfinispanDirectoryProvider infinispanDirectoryProvider = new InfinispanDirectoryProvider(Integer.valueOf(str.substring(str.lastIndexOf(".") + 1)).intValue());
        infinispanDirectoryProvider.initialize(str, properties, workerBuildContext);
        return infinispanDirectoryProvider;
    }

    private String replaceShard(String str, int i) {
        return str.substring(0, str.lastIndexOf(".") + 1).concat(String.valueOf(i));
    }

    @TopologyChanged
    public void onTopologyChange(TopologyChangedEvent<?, ?> topologyChangedEvent) {
        log.debugf("Topology changed notification for %s: %s", getIndexName(), topologyChangedEvent);
        Address address = this.rpcManager.getAddress();
        Address locatePrimaryOwnerForSegment = topologyChangedEvent.getConsistentHashAtStart().locatePrimaryOwnerForSegment(this.segment);
        Address locatePrimaryOwnerForSegment2 = topologyChangedEvent.getConsistentHashAtEnd().locatePrimaryOwnerForSegment(this.segment);
        if (locatePrimaryOwnerForSegment.equals(address) && !locatePrimaryOwnerForSegment2.equals(address)) {
            handleOwnershipLost(locatePrimaryOwnerForSegment2);
        } else {
            if (locatePrimaryOwnerForSegment.equals(address) || !locatePrimaryOwnerForSegment2.equals(address)) {
                return;
            }
            handleOwnershipAcquired();
        }
    }
}
