/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.query.affinity;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import org.apache.lucene.search.similarities.Similarity;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.LockObtainFailedException;
import org.hibernate.search.backend.IndexingMonitor;
import org.hibernate.search.backend.LuceneWork;
import org.hibernate.search.engine.service.spi.ServiceManager;
import org.hibernate.search.exception.SearchException;
import org.hibernate.search.indexes.spi.DirectoryBasedIndexManager;
import org.hibernate.search.spi.BuildContext;
import org.hibernate.search.spi.SearchIntegrator;
import org.hibernate.search.spi.WorkerBuildContext;
import org.hibernate.search.store.DirectoryProvider;
import org.infinispan.Cache;
import org.infinispan.commands.ReplicableCommand;
import org.infinispan.distribution.DistributionManager;
import org.infinispan.factories.ComponentRegistry;
import org.infinispan.hibernate.search.spi.InfinispanDirectoryProvider;
import org.infinispan.lucene.InvalidLockException;
import org.infinispan.lucene.impl.DirectoryExtensions;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachelistener.annotation.TopologyChanged;
import org.infinispan.notifications.cachelistener.event.TopologyChangedEvent;
import org.infinispan.query.affinity.AffinityErrorHandler;
import org.infinispan.query.affinity.AffinityShardIdentifierProvider;
import org.infinispan.query.affinity.AffinityUpdateCommand;
import org.infinispan.query.affinity.OperationFailedHandler;
import org.infinispan.query.backend.ComponentRegistryService;
import org.infinispan.query.backend.KeyTransformationHandler;
import org.infinispan.query.backend.QueryInterceptor;
import org.infinispan.query.backend.TransactionHelper;
import org.infinispan.query.logging.Log;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.transport.Address;
import org.infinispan.util.ByteString;
import org.infinispan.util.logging.LogFactory;

@Listener(observation=Listener.Observation.POST)
public class AffinityIndexManager
extends DirectoryBasedIndexManager
implements OperationFailedHandler {
    private static final Log log = (Log)LogFactory.getLog(AffinityIndexManager.class, Log.class);
    private static final int POLL_WAIT = 1000;
    private RpcManager rpcManager;
    private DistributionManager distributionManager;
    private KeyTransformationHandler keyTransformationHandler;
    private Cache cache;
    private volatile boolean hasOwnership = true;
    private volatile Address nextOwner = null;
    private final ReadWriteLock flushLock = new ReentrantReadWriteLock();
    private final Lock writeLock = this.flushLock.writeLock();
    private final Lock readLock = this.flushLock.readLock();
    private ExecutorService asyncExecutor;
    private TransactionHelper transactionHelper;
    private int segment;

    @Override
    public void operationsFailed(List<LuceneWork> failingOperations, Throwable cause) {
        Throwable rootCause;
        log.debugf(cause, "Operations '%s' failed in the backend", failingOperations);
        if (cause instanceof LockObtainFailedException) {
            this.lockObtainFailed(failingOperations);
        }
        if (cause instanceof SearchException && (rootCause = cause.getCause()) instanceof InvalidLockException) {
            log.warnf(cause, "Retrying failed operations", new Object[0]);
            this.retryOperations(failingOperations);
        }
    }

    private void lockObtainFailed(List<LuceneWork> failingOperations) {
        log.debugf("Operation %s failed due to lock already in used", failingOperations);
        try {
            Thread.sleep(1000L);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        Address lockHolder = this.getLockHolder(this.getIndexName());
        this.clearIfNeeded(lockHolder);
        this.retryOperations(failingOperations);
    }

    private void retryOperations(List<LuceneWork> failingOperations) {
        if (failingOperations.stream().anyMatch(w -> w.getIdInString() != null)) {
            log.debugf("Retrying operations %s", failingOperations);
            CompletableFuture.supplyAsync(() -> {
                this.performOperations(failingOperations, null);
                return null;
            }, this.asyncExecutor).whenComplete((aVoid, throwable) -> {
                if (throwable == null) {
                    log.debugf("Operation completed", new Object[0]);
                } else {
                    log.errorf((Throwable)throwable, "Error reapplying operation", new Object[0]);
                }
            });
        }
    }

    private void clearIfNeeded(Address lockHolder) {
        List members = this.rpcManager.getMembers();
        log.debugf("Current members are %s, lock holder is %s", members, lockHolder);
        if (!members.contains(lockHolder)) {
            Directory directory = this.getDirectoryProvider().getDirectory();
            log.debug("Forcing clear of index lock");
            ((DirectoryExtensions)directory).forceUnlock("write.lock");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void initialize(String indexName, Properties properties, Similarity similarity, WorkerBuildContext buildContext) {
        ServiceManager serviceManager = buildContext.getServiceManager();
        ComponentRegistryService componentRegistryService = (ComponentRegistryService)serviceManager.requestService(ComponentRegistryService.class);
        ComponentRegistry componentRegistry = componentRegistryService.getComponentRegistry();
        this.transactionHelper = new TransactionHelper((TransactionManager)componentRegistry.getComponent(TransactionManager.class));
        Transaction tx = this.transactionHelper.suspendTxIfExists();
        try {
            super.initialize(indexName, properties, similarity, buildContext);
        }
        finally {
            this.transactionHelper.resume(tx);
        }
        this.asyncExecutor = (ExecutorService)componentRegistry.getComponent(ExecutorService.class, "org.infinispan.executors.async");
        this.distributionManager = (DistributionManager)componentRegistry.getComponent(DistributionManager.class);
        this.rpcManager = (RpcManager)componentRegistry.getComponent(RpcManager.class);
        this.cache = (Cache)componentRegistry.getComponent(Cache.class);
        this.keyTransformationHandler = ((QueryInterceptor)((Object)componentRegistry.getComponent(QueryInterceptor.class))).getKeyTransformationHandler();
        SearchIntegrator component = (SearchIntegrator)componentRegistry.getComponent(SearchIntegrator.class);
        AffinityErrorHandler errorHandler = (AffinityErrorHandler)component.getErrorHandler();
        errorHandler.initialize(this);
        this.segment = AffinityShardIdentifierProvider.getSegment(indexName);
        this.cache.addListener((Object)this);
    }

    private void handleOwnershipLost(Address newOwner) {
        this.writeLock.lock();
        try {
            log.debugf("Ownership lost to '%s', closing index manager", newOwner);
            this.nextOwner = newOwner;
            this.flushAndReleaseResources();
            this.hasOwnership = false;
        }
        finally {
            this.writeLock.unlock();
        }
    }

    private void handleOwnershipAcquired() {
        this.writeLock.lock();
        try {
            log.debugf("Ownership acquired", new Object[0]);
            this.nextOwner = null;
            this.hasOwnership = true;
        }
        finally {
            this.writeLock.unlock();
        }
    }

    public void flushAndReleaseResources() {
        InfinispanDirectoryProvider directoryProvider = (InfinispanDirectoryProvider)this.getDirectoryProvider();
        int activeDeleteTasks = directoryProvider.pendingDeleteTasks();
        boolean wasInterrupted = false;
        long endTime = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(1000L);
        while (activeDeleteTasks > 0 && endTime - System.nanoTime() > 0L) {
            try {
                Thread.sleep(10L);
                log.debugf("Waiting for pending delete tasks, remaining: %s", activeDeleteTasks);
                activeDeleteTasks = directoryProvider.pendingDeleteTasks();
            }
            catch (InterruptedException e) {
                wasInterrupted = true;
            }
        }
        if (wasInterrupted) {
            Thread.currentThread().interrupt();
        }
        log.debugf("Flushing directory provider", new Object[0]);
        super.flushAndReleaseResources();
    }

    private Object stringToKey(String key) {
        return this.keyTransformationHandler.stringToKey(key, this.cache.getAdvancedCache().getClassLoader());
    }

    private int getSegment(Object key) {
        return this.distributionManager.getConsistentHash().getSegment(key);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private Address getLockHolder(String indexName) {
        log.debugf("Getting lock holder for %s", indexName);
        Transaction tx = this.transactionHelper.suspendTxIfExists();
        try {
            InfinispanDirectoryProvider directoryProvider = (InfinispanDirectoryProvider)this.getDirectoryProvider();
            Address address = directoryProvider.getLockOwner(indexName, "write.lock");
            return address;
        }
        finally {
            this.transactionHelper.resume(tx);
        }
    }

    private Address getLocation(LuceneWork work) {
        Address destination;
        Address localAddress = this.rpcManager.getAddress();
        if (work.getIdInString() == null) {
            return localAddress;
        }
        int segment = this.getSegment(this.stringToKey(work.getIdInString()));
        Address lockHolder = this.getLockHolder(this.replaceShard(this.getIndexName(), segment));
        log.debugf("Lock holder for %s is %s", this.getIndexName(), lockHolder);
        if (!this.hasOwnership) {
            log.debugf("Lost ownership, new owner is %s", this.nextOwner);
            destination = this.nextOwner;
        } else {
            destination = lockHolder != null && !localAddress.equals(lockHolder) ? lockHolder : localAddress;
        }
        return destination;
    }

    @SafeVarargs
    private static final <T> List<T> newList(T ... elements) {
        ArrayList list = new ArrayList(elements.length);
        Collections.addAll(list, elements);
        return list;
    }

    private Map<Address, List<LuceneWork>> partitionWorkByAddress(List<LuceneWork> works) {
        return works.stream().collect(Collectors.toMap(this::getLocation, xva$0 -> AffinityIndexManager.newList(xva$0), (w1, w2) -> {
            w1.addAll(w2);
            return w1;
        }));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void performOperations(List<LuceneWork> workList, IndexingMonitor monitor) {
        if (this.cache.getCacheConfiguration().clustering().cacheMode().isClustered()) {
            Address primaryOwner;
            Address localAddress = this.rpcManager.getAddress();
            this.readLock.lock();
            try {
                Map<Address, List<LuceneWork>> workByAddress = this.partitionWorkByAddress(workList);
                log.debugf("Applying work @ %s, workMap is %s", localAddress, workByAddress);
                List<LuceneWork> localWork = workByAddress.get(localAddress);
                if (localWork != null && !localWork.isEmpty()) {
                    log.debugf("About to apply work locally %s, hasOwnership=%s", localWork, this.hasOwnership);
                    super.performOperations(localWork, monitor);
                    log.debugf("Work applied", new Object[0]);
                    workByAddress.remove(localAddress);
                }
                workByAddress.entrySet().forEach(entry -> this.sendWork((List)entry.getValue(), (Address)entry.getKey()));
            }
            finally {
                this.readLock.unlock();
            }
            if (!this.distributionManager.isRehashInProgress() && !localAddress.equals(primaryOwner = this.distributionManager.getConsistentHash().locatePrimaryOwnerForSegment(this.segment))) {
                log.debugf("%s is not owner anymore, releasing resources", this.rpcManager.getAddress());
                this.handleOwnershipLost(primaryOwner);
            }
        } else {
            super.performOperations(workList, monitor);
        }
    }

    private void sendWork(List<LuceneWork> works, Address destination) {
        AffinityUpdateCommand indexUpdateCommand = new AffinityUpdateCommand(ByteString.fromString((String)this.cache.getName()));
        byte[] serializedModel = this.getSerializer().toSerializedModel(works);
        indexUpdateCommand.setSerializedWorkList(serializedModel);
        List<Address> dest = Collections.singletonList(destination);
        log.debugf("Sending works %s to %s", works, dest);
        CompletableFuture result = this.rpcManager.invokeRemotelyAsync(dest, (ReplicableCommand)indexUpdateCommand, this.rpcManager.getDefaultRpcOptions(false));
        result.whenComplete((responses, error) -> {
            if (error != null) {
                log.error("Error forwarding index job", (Throwable)error);
            }
        });
    }

    protected DirectoryProvider<?> createDirectoryProvider(String indexName, Properties cfg, WorkerBuildContext buildContext) {
        String shardName = indexName.substring(indexName.lastIndexOf(".") + 1);
        InfinispanDirectoryProvider directoryProvider = new InfinispanDirectoryProvider(Integer.valueOf(shardName).intValue());
        directoryProvider.initialize(indexName, cfg, (BuildContext)buildContext);
        return directoryProvider;
    }

    private String replaceShard(String indexName, int newSegment) {
        return indexName.substring(0, indexName.lastIndexOf(".") + 1).concat(String.valueOf(newSegment));
    }

    @TopologyChanged
    public void onTopologyChange(TopologyChangedEvent<?, ?> tce) {
        log.debugf("Topology changed notification for %s: %s", this.getIndexName(), tce);
        Address localAddress = this.rpcManager.getAddress();
        Address previousOwner = tce.getConsistentHashAtStart().locatePrimaryOwnerForSegment(this.segment);
        Address newOwner = tce.getConsistentHashAtEnd().locatePrimaryOwnerForSegment(this.segment);
        if (previousOwner.equals(localAddress) && !newOwner.equals(localAddress)) {
            this.handleOwnershipLost(newOwner);
        } else if (!previousOwner.equals(localAddress) && newOwner.equals(localAddress)) {
            this.handleOwnershipAcquired();
        }
    }
}

