package org.infinispan.persistence.remote;

import io.reactivex.Flowable;
import java.io.IOException;
import java.util.function.Predicate;
import java.util.function.ToIntBiFunction;
import org.infinispan.Cache;
import org.infinispan.client.hotrod.test.HotRodClientTestingUtil;
import org.infinispan.commons.marshall.StreamingMarshaller;
import org.infinispan.commons.time.TimeService;
import org.infinispan.commons.util.CloseableIterator;
import org.infinispan.commons.util.IntSet;
import org.infinispan.commons.util.IntSets;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.global.GlobalConfigurationBuilder;
import org.infinispan.distribution.ch.KeyPartitioner;
import org.infinispan.eviction.EvictionType;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.persistence.BaseStoreTest;
import org.infinispan.persistence.remote.configuration.RemoteStoreConfigurationBuilder;
import org.infinispan.persistence.spi.AdvancedLoadWriteStore;
import org.infinispan.persistence.spi.SegmentedAdvancedLoadWriteStore;
import org.infinispan.server.hotrod.HotRodServer;
import org.infinispan.server.hotrod.test.HotRodTestingUtil;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.TestCacheManagerFactory;
import org.testng.AssertJUnit;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;

/**
 * Runs the {@link BaseStoreTest} suite against a {@link RemoteStore} that talks to a Hot Rod
 * server started inside the test, and adds segment-aware publisher tests.
 *
 * <p>Each call to {@link #createStore()} starts a clustered cache manager and a Hot Rod server;
 * {@link #tearDown()} stops both after every test method.
 */
@Test(testName = "persistence.remote.RemoteStoreTest", groups = {"functional"})
public class RemoteStoreTest extends BaseStoreTest {
    /** Name of the server-side cache; also configured as the default cache of the cache manager. */
    private static final String REMOTE_CACHE = "remote-cache";

    /** Cache manager that hosts {@link #REMOTE_CACHE} behind the Hot Rod server. */
    private EmbeddedCacheManager localCacheManager;

    /** Hot Rod server the store under test connects to. */
    private HotRodServer hrServer;

    /**
     * Starts the server side (cache manager plus Hot Rod server) and returns an initialised
     * {@link RemoteStore} configured to connect to it.
     *
     * @return the initialised remote store
     * @throws Exception if any part of the setup fails
     */
    protected AdvancedLoadWriteStore createStore() throws Exception {
        // Server-side cache: count-based eviction, frequent expiration wake-ups, distributed mode.
        ConfigurationBuilder serverCacheConfig = TestCacheManagerFactory.getDefaultCacheConfiguration(false);
        serverCacheConfig.memory().evictionType(EvictionType.COUNT).size(120L).expiration().wakeUpInterval(10L);
        serverCacheConfig.clustering().cacheMode(CacheMode.DIST_SYNC);

        GlobalConfigurationBuilder globalConfig = new GlobalConfigurationBuilder().clusteredDefault();
        globalConfig.globalJmxStatistics().defaultCacheName(REMOTE_CACHE);
        this.localCacheManager = TestCacheManagerFactory.createClusteredCacheManager(globalConfig, HotRodTestingUtil.hotRodCacheConfiguration(serverCacheConfig));

        // Start the cache first, then swap in the test's TimeService and rewire the cache's
        // components so they use the replacement.
        this.localCacheManager.getCache(REMOTE_CACHE);
        TestingUtil.replaceComponent(this.localCacheManager, TimeService.class, this.timeService, true);
        this.localCacheManager.getCache(REMOTE_CACHE).getAdvancedCache().getComponentRegistry().rewire();

        this.hrServer = HotRodClientTestingUtil.startHotRodServer(this.localCacheManager);
        // The server uses the same marshaller as the one this test exposes via getMarshaller().
        this.hrServer.setMarshaller(getMarshaller());

        // Client side: a remote store pointing at the server that was just started.
        ConfigurationBuilder storeConfig = TestCacheManagerFactory.getDefaultCacheConfiguration(false);
        storeConfig.persistence().addStore(RemoteStoreConfigurationBuilder.class).remoteCacheName(REMOTE_CACHE).addServer().host(this.hrServer.getHost()).port(this.hrServer.getPort().intValue());
        RemoteStore remoteStore = new RemoteStore();
        remoteStore.init(createContext(storeConfig.build()));
        return remoteStore;
    }

    /**
     * Returns the cache marshaller of the server-side {@link #REMOTE_CACHE}.
     *
     * @return the marshaller taken from the remote cache's component registry
     */
    protected StreamingMarshaller getMarshaller() {
        return this.localCacheManager.getCache(REMOTE_CACHE).getAdvancedCache().getComponentRegistry().getCacheMarshaller();
    }

    /**
     * Stops the Hot Rod server and then the cache manager created by {@link #createStore()}.
     */
    // NOTE(review): this does not delegate to a superclass tearDown; confirm that is intended.
    @AfterMethod
    public void tearDown() {
        HotRodClientTestingUtil.killServers(new HotRodServer[]{this.hrServer});
        TestingUtil.killCacheManagers(new EmbeddedCacheManager[]{this.localCacheManager});
    }

    /**
     * @return always {@code false} for this store
     */
    protected boolean storePurgesAllExpired() {
        return false;
    }

    /**
     * Writes an entry with a 100 ms lifespan, lets it expire, then writes a replacement under the
     * same key and checks that the replacement is readable (or that its lifespan has already
     * elapsed in wall-clock time).
     *
     * @throws Exception if the store operations fail
     */
    public void testReplaceExpiredEntry() throws Exception {
        this.cl.write(marshalledEntry(internalCacheEntry("k1", "v1", 100L)));
        // NOTE(review): the clock is advanced by 1101 ms although the lifespan is 100 ms -
        // presumably the remote side tracks lifespans at second granularity; confirm.
        this.timeService.advance(1101L);
        AssertJUnit.assertNull(this.cl.loadEntry("k1"));

        long writeStart = System.currentTimeMillis();
        this.cl.write(marshalledEntry(internalCacheEntry("k1", "v2", 100L)));
        // Constant-first equals keeps the check null-safe with respect to the loaded value.
        boolean replacementVisible = "v2".equals(this.cl.loadEntry("k1").getValue());
        AssertJUnit.assertTrue(replacementVisible || TestingUtil.moreThanDurationElapsed(writeStart, 100L));
    }

    /**
     * Writes a single entry through the store and verifies that {@code toIntBiFunction} counts
     * exactly one element for the segment owning that entry's key and zero elements for the set
     * of all other segments.
     *
     * @param toIntBiFunction counts the elements the given store publishes for the given segments
     */
    void countWithSegments(ToIntBiFunction<SegmentedAdvancedLoadWriteStore<?, ?>, IntSet> toIntBiFunction) {
        Cache<byte[], byte[]> cache = this.localCacheManager.getCache(REMOTE_CACHE);
        // Explicit downcast: createStore() is declared to return the wider AdvancedLoadWriteStore.
        RemoteStore remoteStore = (RemoteStore) this.cl;
        remoteStore.write(marshalledEntry(internalCacheEntry("k1", "v1", 100L)));

        // The server-side cache must now hold exactly one key; close the iterator when done.
        byte[] storedKey;
        try (CloseableIterator<byte[]> keys = cache.keySet().iterator()) {
            AssertJUnit.assertTrue(keys.hasNext());
            storedKey = keys.next();
            AssertJUnit.assertFalse(keys.hasNext());
        }

        int segment = ((KeyPartitioner) TestingUtil.extractComponent(cache, KeyPartitioner.class)).getSegment(storedKey);
        AssertJUnit.assertEquals(1, toIntBiFunction.applyAsInt(remoteStore, IntSets.immutableSet(segment)));

        // Every segment except the owning one must yield nothing.
        int numSegments = cache.getCacheConfiguration().clustering().hash().numSegments();
        IntSet otherSegments = IntSets.mutableEmptySet(numSegments);
        for (int i = 0; i < numSegments; i++) {
            if (i != segment) {
                otherSegments.set(i);
            }
        }
        AssertJUnit.assertEquals(0, toIntBiFunction.applyAsInt(remoteStore, otherSegments));
    }

    /**
     * Checks the per-segment counts of keys published by {@code publishKeys} with no filter.
     */
    public void testPublishKeysWithSegments() throws IOException, InterruptedException {
        countWithSegments((store, segments) ->
                Flowable.fromPublisher(store.publishKeys(segments, null)).count().blockingGet().intValue());
    }

    /**
     * Checks the per-segment counts of entries published by {@code entryPublisher} with no filter
     * and both boolean arguments set to {@code true}.
     */
    public void testPublishEntriesWithSegments() throws IOException, InterruptedException {
        countWithSegments((store, segments) ->
                Flowable.fromPublisher(store.entryPublisher(segments, null, true, true)).count().blockingGet().intValue());
    }
}
