package org.infinispan.persistence.remote.upgrade;

import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.infinispan.Cache;
import org.infinispan.client.hotrod.MetadataValue;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.commons.CacheException;
import org.infinispan.commons.io.UnsignedNumeric;
import org.infinispan.commons.marshall.AbstractExternalizer;
import org.infinispan.commons.marshall.WrappedByteArray;
import org.infinispan.commons.util.CloseableIterator;
import org.infinispan.commons.util.Util;
import org.infinispan.container.versioning.NumericVersion;
import org.infinispan.context.Flag;
import org.infinispan.factories.threads.DefaultThreadFactory;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.metadata.EmbeddedMetadata;
import org.infinispan.metadata.Metadata;
import org.infinispan.metadata.impl.InternalMetadataImpl;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryRemoved;
import org.infinispan.notifications.cachelistener.event.CacheEntryRemovedEvent;
import org.infinispan.notifications.cachelistener.filter.CacheEventConverter;
import org.infinispan.persistence.manager.PersistenceManager;
import org.infinispan.persistence.remote.RemoteStore;
import org.infinispan.persistence.remote.logging.Log;
import org.infinispan.util.logging.LogFactory;

/* loaded from: input_file:org/infinispan/persistence/remote/upgrade/MigrationTask.class */
public class MigrationTask implements Function<EmbeddedCacheManager, Integer> {
    private static final Log log = (Log) LogFactory.getLog(MigrationTask.class, Log.class);
    private static final String THREAD_NAME = "RollingUpgrade-MigrationTask";
    private final String cacheName;
    private final Set<Integer> segments;
    private final int readBatch;
    private final int threads;
    private final Set<WrappedByteArray> deletedKeys = ConcurrentHashMap.newKeySet();

    /* loaded from: input_file:org/infinispan/persistence/remote/upgrade/MigrationTask$Externalizer.class */
    public static class Externalizer extends AbstractExternalizer<MigrationTask> {
        public Set<Class<? extends MigrationTask>> getTypeClasses() {
            return Collections.singleton(MigrationTask.class);
        }

        public void writeObject(ObjectOutput objectOutput, MigrationTask migrationTask) throws IOException {
            objectOutput.writeObject(migrationTask.cacheName);
            UnsignedNumeric.writeUnsignedInt(objectOutput, migrationTask.readBatch);
            UnsignedNumeric.writeUnsignedInt(objectOutput, migrationTask.threads);
            BitSet bitSet = new BitSet();
            Iterator it = migrationTask.segments.iterator();
            while (it.hasNext()) {
                bitSet.set(((Integer) it.next()).intValue());
            }
            UnsignedNumeric.writeUnsignedInt(objectOutput, bitSet.toByteArray().length);
            objectOutput.write(bitSet.toByteArray());
        }

        /* renamed from: readObject, reason: merged with bridge method [inline-methods] */
        public MigrationTask m33readObject(ObjectInput objectInput) throws IOException, ClassNotFoundException {
            String str = (String) objectInput.readObject();
            int readUnsignedInt = UnsignedNumeric.readUnsignedInt(objectInput);
            int readUnsignedInt2 = UnsignedNumeric.readUnsignedInt(objectInput);
            byte[] bArr = new byte[UnsignedNumeric.readUnsignedInt(objectInput)];
            objectInput.read(bArr);
            return new MigrationTask(str, (Set) BitSet.valueOf(bArr).stream().boxed().collect(Collectors.toSet()), readUnsignedInt, readUnsignedInt2);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    @Listener(clustered = true)
    /* loaded from: input_file:org/infinispan/persistence/remote/upgrade/MigrationTask$RemoveListener.class */
    public class RemoveListener {
        private RemoveListener() {
        }

        @CacheEntryRemoved
        public void entryRemoved(CacheEntryRemovedEvent cacheEntryRemovedEvent) {
            MigrationTask.this.deletedKeys.add(new WrappedByteArray((byte[]) cacheEntryRemovedEvent.getKey()));
        }
    }

    public MigrationTask(String str, Set<Integer> set, int i, int i2) {
        this.cacheName = str;
        this.segments = set;
        this.readBatch = i;
        this.threads = i2;
    }

    @Override // java.util.function.Function
    public Integer apply(EmbeddedCacheManager embeddedCacheManager) {
        AtomicInteger atomicInteger = new AtomicInteger(0);
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(this.threads, new DefaultThreadFactory((ThreadGroup) null, 1, "RollingUpgrade-MigrationTask-%t", (String) null, (String) null));
        RemoveListener removeListener = null;
        Cache<Object, Object> cache = embeddedCacheManager.getCache(this.cacheName);
        try {
            Set stores = ((PersistenceManager) cache.getAdvancedCache().getComponentRegistry().getComponent(PersistenceManager.class)).getStores(RemoteStore.class);
            MigrationMarshaller migrationMarshaller = new MigrationMarshaller(cache.getCacheManager().getClassWhiteList());
            removeListener = new RemoveListener();
            cache.addFilteredListener(removeListener, new RemovedFilter(), (CacheEventConverter) null, Util.asSet(new Class[]{CacheEntryRemoved.class}));
            try {
                byte[] objectToByteBuffer = migrationMarshaller.objectToByteBuffer("___MigrationManager_HotRod_KnownKeys___");
                Iterator it = stores.iterator();
                if (it.hasNext()) {
                    RemoteStore remoteStore = (RemoteStore) it.next();
                    RemoteCache<Object, Object> remoteCache = remoteStore.getRemoteCache();
                    if (!remoteStore.getConfiguration().hotRodWrapping()) {
                        throw log.remoteStoreNoHotRodWrapping(cache.getName());
                    }
                    migrateEntriesWithMetadata(remoteCache, atomicInteger, objectToByteBuffer, newFixedThreadPool, cache);
                    HotRodMigratorHelper.awaitTermination(newFixedThreadPool);
                }
                if (removeListener != null) {
                    cache.removeListener(removeListener);
                }
                newFixedThreadPool.shutdown();
                return Integer.valueOf(atomicInteger.get());
            } catch (Exception e) {
                throw new CacheException(e);
            }
        } catch (Throwable th) {
            if (removeListener != null) {
                cache.removeListener(removeListener);
            }
            newFixedThreadPool.shutdown();
            throw th;
        }
    }

    private void migrateEntriesWithMetadata(RemoteCache<Object, Object> remoteCache, AtomicInteger atomicInteger, byte[] bArr, ExecutorService executorService, Cache<Object, Object> cache) {
        CloseableIterator retrieveEntriesWithMetadata = remoteCache.retrieveEntriesWithMetadata(this.segments, this.readBatch);
        while (retrieveEntriesWithMetadata.hasNext() && !Thread.currentThread().isInterrupted()) {
            try {
                Map.Entry entry = (Map.Entry) retrieveEntriesWithMetadata.next();
                if (!Arrays.equals((byte[]) entry.getKey(), bArr)) {
                    MetadataValue metadataValue = (MetadataValue) entry.getValue();
                    int lifespan = metadataValue.getLifespan();
                    Metadata build = new EmbeddedMetadata.Builder().version(new NumericVersion(metadataValue.getVersion())).lifespan(lifespan, TimeUnit.SECONDS).maxIdle(metadataValue.getMaxIdle(), TimeUnit.SECONDS).build();
                    executorService.submit(() -> {
                        if (!this.deletedKeys.contains(new WrappedByteArray((byte[]) entry.getKey()))) {
                            cache.getAdvancedCache().withFlags(new Flag[]{Flag.SKIP_CACHE_LOAD, Flag.ROLLING_UPGRADE}).putIfAbsent(entry.getKey(), ((MetadataValue) entry.getValue()).getValue(), new InternalMetadataImpl(build, metadataValue.getCreated(), metadataValue.getLastUsed()));
                        }
                        int incrementAndGet = atomicInteger.incrementAndGet();
                        if (log.isDebugEnabled() && incrementAndGet % 100 == 0) {
                            log.debugf(">>    Migrated %s entries\n", incrementAndGet);
                        }
                    });
                }
            } catch (Throwable th) {
                if (retrieveEntriesWithMetadata != null) {
                    try {
                        retrieveEntriesWithMetadata.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }
        if (retrieveEntriesWithMetadata != null) {
            retrieveEntriesWithMetadata.close();
        }
    }
}
