package org.infinispan.client.hotrod.stress;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import org.infinispan.Cache;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.RemoteCacheManager;
import org.infinispan.client.hotrod.annotation.ClientCacheEntryCreated;
import org.infinispan.client.hotrod.annotation.ClientCacheEntryModified;
import org.infinispan.client.hotrod.annotation.ClientListener;
import org.infinispan.client.hotrod.event.ClientEvent;
import org.infinispan.client.hotrod.logging.Log;
import org.infinispan.client.hotrod.logging.LogFactory;
import org.infinispan.client.hotrod.test.HotRodClientTestingUtil;
import org.infinispan.client.hotrod.test.InternalRemoteCacheManager;
import org.infinispan.client.hotrod.test.MultiHotRodServersTest;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.distribution.DistributionTestHelper;
import org.infinispan.server.hotrod.HotRodServer;
import org.infinispan.server.hotrod.test.HotRodTestingUtil;
import org.infinispan.test.fwk.TestResourceTracker;
import org.testng.annotations.Test;

@Test(groups = {"stress"}, testName = "client.hotrod.event.ClusterClientEventStressTest", timeOut = 900000)
/* loaded from: input_file:org/infinispan/client/hotrod/stress/ClusterClientEventStressTest.class */
public class ClusterClientEventStressTest extends MultiHotRodServersTest {
    /** Number of Hot Rod servers started by {@code createCacheManagers()}. */
    static final int NUM_SERVERS = 3;
    /** Number of owners per key, applied to the cache in {@code getCacheConfiguration()}. */
    static final int NUM_OWNERS = 2;
    /** Number of remote clients built by {@code createClients()}. */
    static final int NUM_CLIENTS = 1;
    /** Number of {@code Put} tasks the test submits for each client. */
    static final int NUM_THREADS_PER_CLIENT = 6;
    /** Number of keys each worker generates and puts. */
    static final int NUM_OPERATIONS = 1000;
    /** Event total the test waits for: NUM_CLIENTS * NUM_THREADS_PER_CLIENT * NUM_OPERATIONS. */
    static final int NUM_EVENTS = 6000;
    /** Extra listener that a {@code Put} task registers part-way through its run. */
    static ClientEntryListener listener;
    private static final Log log = LogFactory.getLog(ClusterClientEventStressTest.class);
    /** Every key handed out so far; {@code generateKeys} uses it to reject duplicates. */
    static Set<String> ALL_KEYS = ConcurrentHashMap.newKeySet();
    /** Pool running the {@code Put} tasks. NOTE(review): never shut down in this file. */
    static ExecutorService EXEC = Executors.newCachedThreadPool();

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Client listener that counts every created/modified event it receives and
     * logs a progress line on each hundredth event.
     */
    @ClientListener
    public static class ClientEntryListener {
        /** Running total of events delivered to this listener. */
        final AtomicInteger count = new AtomicInteger();

        ClientEntryListener() {
        }

        /**
         * Bumps the event counter; emits a debug message whenever the new total
         * is a multiple of 100.
         *
         * @param clientEvent the received event (its content is not inspected)
         */
        @ClientCacheEntryCreated
        @ClientCacheEntryModified
        public void handleClientEvent(ClientEvent clientEvent) {
            final int seen = count.incrementAndGet();
            if (seen % 100 != 0) {
                return;
            }
            ClusterClientEventStressTest.log.debugf("Reached %s", seen);
        }
    }

    /**
     * Worker task that puts {@code NUM_OPERATIONS} pre-generated keys into a
     * remote cache. The task synchronises with the test thread on a shared
     * barrier exactly twice: once before the first put and once after the last
     * one (also when a put fails), matching the two awaits performed by the
     * test thread.
     */
    static class Put implements Callable<Void> {
        final CyclicBarrier barrier;
        final RemoteCache<String, String> remote;
        final List<HotRodServer> servers;
        final List<String> keys;

        /**
         * @param str           key prefix unique to this worker
         * @param cyclicBarrier barrier shared with the test thread and the other workers
         * @param remoteCache   cache the keys are written to
         * @param list          Hot Rod servers used to pick key owners
         */
        public Put(String str, CyclicBarrier cyclicBarrier, RemoteCache<String, String> remoteCache, List<HotRodServer> list) {
            this.barrier = cyclicBarrier;
            this.remote = remoteCache;
            this.servers = list;
            // Obtain the generator from the calling thread instead of caching one
            // instance in a static field, which could leak it to other threads.
            this.keys = ClusterClientEventStressTest.generateKeys(str, list, ThreadLocalRandom.current());
            Collections.shuffle(this.keys);
        }

        /**
         * Waits for the start signal, performs all puts and, half-way through the
         * run of the first worker of the first client, registers an additional
         * client listener.
         *
         * @return always {@code null}
         */
        @Override // java.util.concurrent.Callable
        public Void call() throws Exception {
            ClusterClientEventStressTest.barrierAwait(this.barrier);
            try {
                for (int i = 0; i < ClusterClientEventStressTest.NUM_OPERATIONS; i++) {
                    String key = this.keys.get(i);
                    this.remote.put(key, key);
                    if (key.startsWith("c0-t0") && i == ClusterClientEventStressTest.NUM_OPERATIONS / 2) {
                        ClusterClientEventStressTest.listener = new ClientEntryListener();
                        this.remote.addClientListener(ClusterClientEventStressTest.listener);
                    }
                }
                return null;
            } finally {
                // Single end-of-run rendezvous; awaiting per iteration would need the
                // test thread to take part in every round and would otherwise hang.
                ClusterClientEventStressTest.barrierAwait(this.barrier);
            }
        }
    }

    /**
     * Starts {@code NUM_SERVERS} Hot Rod servers, all using the cache
     * configuration produced by {@code getCacheConfiguration()}.
     */
    protected void createCacheManagers() throws Throwable {
        ConfigurationBuilder cacheConfig = getCacheConfiguration();
        createHotRodServers(NUM_SERVERS, cacheConfig);
    }

    /**
     * Builds the server-side cache configuration: a synchronous distributed
     * cache with {@code NUM_OWNERS} owners, a max-idle of 1000, an expiration
     * wake-up interval of 5000 and JMX statistics enabled, adapted for Hot Rod.
     *
     * @return the configuration builder to create the Hot Rod servers with
     */
    private ConfigurationBuilder getCacheConfiguration() {
        ConfigurationBuilder cfg = getDefaultClusteredCacheConfig(CacheMode.DIST_SYNC, false);
        cfg.clustering().hash().numOwners(NUM_OWNERS);
        cfg.expiration().maxIdle(1000L).wakeUpInterval(5000L);
        cfg.jmxStatistics().enable();
        return HotRodTestingUtil.hotRodCacheConfiguration(cfg);
    }

    /**
     * Creates a remote cache manager pointing at {@code 127.0.0.1} on the given
     * port and fetches its default cache once before returning it.
     *
     * @param i port of the Hot Rod server to connect to
     * @return the newly created manager
     */
    RemoteCacheManager getRemoteCacheManager(int i) {
        org.infinispan.client.hotrod.configuration.ConfigurationBuilder clientConfig =
                HotRodClientTestingUtil.newRemoteConfigurationBuilder();
        clientConfig.addServer().host("127.0.0.1").port(i);
        RemoteCacheManager manager = new InternalRemoteCacheManager(clientConfig.build());
        manager.getCache();
        return manager;
    }

    /**
     * Creates {@code NUM_CLIENTS} remote cache managers, each connected to the
     * first server, keyed by the client name {@code "c" + index}.
     *
     * @return map from client name to its remote cache manager
     */
    Map<String, RemoteCacheManager> createClients() {
        Map<String, RemoteCacheManager> clients = new HashMap<>(NUM_CLIENTS);
        // Step by one: stepping by NUM_CLIENTS would create a single client
        // no matter how many are requested.
        for (int i = 0; i < NUM_CLIENTS; i++) {
            clients.put("c" + i, getRemoteCacheManager(server(0).getPort()));
        }
        return clients;
    }

    /**
     * Registers one listener per client, runs {@code NUM_THREADS_PER_CLIENT}
     * {@code Put} workers per client and waits until the listeners registered
     * up front have received {@code NUM_EVENTS} events in total. One of the
     * workers adds a further listener while the puts are in flight.
     */
    public void testAddClientListenerDuringOperations() {
        TestResourceTracker.testThreadStarted(this);
        int workerCount = NUM_CLIENTS * NUM_THREADS_PER_CLIENT;
        // All workers plus this thread meet at the barrier.
        CyclicBarrier barrier = new CyclicBarrier(workerCount + 1);
        List<Future<Void>> futures = new ArrayList<>(workerCount);
        List<ClientEntryListener> listeners = new ArrayList<>(NUM_CLIENTS);
        Map<String, RemoteCacheManager> clients = createClients();
        try {
            for (Map.Entry<String, RemoteCacheManager> client : clients.entrySet()) {
                RemoteCache<String, String> remote = client.getValue().getCache();
                ClientEntryListener clientListener = new ClientEntryListener();
                listeners.add(clientListener);
                remote.addClientListener(clientListener);
                for (int t = 0; t < NUM_THREADS_PER_CLIENT; t++) {
                    String prefix = String.format("%s-t%d-", client.getKey(), t);
                    futures.add(EXEC.submit(new Put(prefix, barrier, remote, this.servers)));
                }
            }
            barrierAwait(barrier); // release the workers
            barrierAwait(barrier); // wait until they are done
            for (Future<Void> future : futures) {
                futureGet(future);
            }
            log.debugf("Stats asserted, wait for events...");
            eventuallyEquals(NUM_EVENTS, () -> countEvents(listeners));
        } finally {
            // Release client connections even when the test fails.
            for (RemoteCacheManager manager : clients.values()) {
                manager.stop();
            }
        }
    }

    /**
     * Adds up the event counters of the given listeners and logs the total
     * next to the target value.
     *
     * @param list listeners whose counters are summed
     * @return total number of events seen by all listeners
     */
    int countEvents(List<ClientEntryListener> list) {
        int total = 0;
        for (ClientEntryListener each : list) {
            total += each.count.get();
        }
        log.infof("Event count is %d, target %d%n", Integer.valueOf(total), 6000);
        return total;
    }

    /**
     * Generates {@code NUM_OPERATIONS} unique keys with the given prefix,
     * cycling through all six ordered (primary, secondary) pairs of the first
     * three servers so that ownership is spread evenly.
     *
     * @param str               key prefix
     * @param list              Hot Rod servers; at least three are required
     * @param threadLocalRandom source of the random key suffixes
     * @return a mutable list of the generated keys
     * @throws AssertionError if a generated key has already been handed out
     */
    static List<String> generateKeys(String str, List<HotRodServer> list, ThreadLocalRandom threadLocalRandom) {
        HotRodServer first = list.get(0);
        HotRodServer second = list.get(1);
        HotRodServer third = list.get(2);
        List<HotRodServer[]> ownerPairs = Arrays.asList(
                new HotRodServer[]{first, second},
                new HotRodServer[]{second, first},
                new HotRodServer[]{second, third},
                new HotRodServer[]{third, second},
                new HotRodServer[]{third, first},
                new HotRodServer[]{first, third});
        List<String> keys = new ArrayList<>(NUM_OPERATIONS);
        for (int i = 0; i < NUM_OPERATIONS; i++) {
            String key = getStringKey(str, ownerPairs.get(i % ownerPairs.size()), threadLocalRandom);
            // add() reports duplicates atomically, unlike a separate contains() check.
            if (!ALL_KEYS.add(key)) {
                throw new AssertionError("Key already in use: " + key);
            }
            keys.add(key);
        }
        return keys;
    }

    /**
     * Searches for a random key, built as {@code str} followed by a random int,
     * for which the first server's cache is the first owner and the second
     * server's cache is an owner as well.
     *
     * @param str               key prefix
     * @param hotRodServerArr   two servers: intended primary owner at index 0,
     *                          intended secondary owner at index 1
     * @param threadLocalRandom source of the random key suffixes
     * @return a key with the requested ownership
     * @throws IllegalStateException if no suitable key is found within the attempt budget
     */
    static String getStringKey(String str, HotRodServer[] hotRodServerArr, ThreadLocalRandom threadLocalRandom) {
        Cache<?, ?> primaryCache = hotRodServerArr[0].getCacheManager().getCache();
        Cache<?, ?> secondaryCache = hotRodServerArr[1].getCacheManager().getCache();
        // Same attempt budget as before (1001 tries), but a hit on the final
        // attempt is now returned instead of being reported as a failure.
        for (int attemptsLeft = 1000; attemptsLeft >= 0; attemptsLeft--) {
            String candidate = str + threadLocalRandom.nextInt();
            byte[] bytes = HotRodClientTestingUtil.toBytes(candidate);
            if (DistributionTestHelper.isFirstOwner(primaryCache, bytes) && DistributionTestHelper.isOwner(secondaryCache, bytes)) {
                log.infof("Integer key %s hashes to primary [cluster=%s,hotrod=%s] and secondary [cluster=%s,hotrod=%s]", new Object[]{candidate, primaryCache.getCacheManager().getAddress(), hotRodServerArr[0].getAddress(), secondaryCache.getCacheManager().getAddress(), hotRodServerArr[1].getAddress()});
                return candidate;
            }
        }
        throw new IllegalStateException("Could not find any key owned by " + primaryCache + " as primary owner and " + secondaryCache + " as secondary owner");
    }

    /**
     * Waits on the barrier, converting the checked failures into an
     * {@link AssertionError} so callers need no exception handling.
     *
     * @param cyclicBarrier barrier to wait on
     * @return the arrival index reported by {@link CyclicBarrier#await()}
     * @throws AssertionError if the wait is interrupted or the barrier is broken;
     *                        the original exception is kept as the cause
     */
    static int barrierAwait(CyclicBarrier cyclicBarrier) {
        try {
            return cyclicBarrier.await();
        } catch (InterruptedException e) {
            // Keep the interrupt visible to code further up the stack.
            Thread.currentThread().interrupt();
            throw new AssertionError(e);
        } catch (BrokenBarrierException e) {
            throw new AssertionError(e);
        }
    }

    /**
     * Blocks for the future's result, converting the checked failures into an
     * {@link AssertionError} so callers need no exception handling.
     *
     * @param future future to wait for
     * @param <T>    result type
     * @return the value the future completed with
     * @throws AssertionError if the wait is interrupted or the task failed;
     *                        the original exception is kept as the cause
     */
    static <T> T futureGet(Future<T> future) {
        try {
            return future.get();
        } catch (InterruptedException e) {
            // Keep the interrupt visible to code further up the stack.
            Thread.currentThread().interrupt();
            throw new AssertionError(e);
        } catch (ExecutionException e) {
            throw new AssertionError(e);
        }
    }
}
