package org.infinispan.client.hotrod.stress;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.RemoteCacheManager;
import org.infinispan.client.hotrod.annotation.ClientCacheEntryCreated;
import org.infinispan.client.hotrod.annotation.ClientCacheEntryModified;
import org.infinispan.client.hotrod.annotation.ClientListener;
import org.infinispan.client.hotrod.configuration.ConfigurationBuilder;
import org.infinispan.client.hotrod.event.ClientEvent;
import org.infinispan.client.hotrod.logging.Log;
import org.infinispan.client.hotrod.logging.LogFactory;
import org.infinispan.client.hotrod.test.HotRodClientTestingUtil;
import org.infinispan.client.hotrod.test.InternalRemoteCacheManager;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.server.hotrod.HotRodServer;
import org.infinispan.test.SingleCacheManagerTest;
import org.infinispan.test.fwk.TestCacheManagerFactory;
import org.testng.annotations.Test;

@Test(groups = {"stress"}, testName = "client.hotrod.event.ClientEventStressTest", timeOut = 900000)
/* loaded from: input_file:org/infinispan/client/hotrod/stress/ClientEventStressTest.class */
public class ClientEventStressTest extends SingleCacheManagerTest {
    private static final Log log = LogFactory.getLog(ClientEventStressTest.class);
    static int NUM_CLIENTS = 3;
    static int NUM_THREADS_PER_CLIENT = 10;
    static final int NUM_OPERATIONS = 10000;
    static final int NUM_EVENTS = ((NUM_OPERATIONS * NUM_THREADS_PER_CLIENT) * NUM_CLIENTS) * NUM_CLIENTS;
    static ExecutorService EXEC = Executors.newCachedThreadPool();
    HotRodServer hotrodServer;

    /* JADX INFO: Access modifiers changed from: package-private */
    @ClientListener
    /* loaded from: input_file:org/infinispan/client/hotrod/stress/ClientEventStressTest$ClientEntryListener.class */
    public static class ClientEntryListener {
        final AtomicInteger count = new AtomicInteger();

        ClientEntryListener() {
        }

        @ClientCacheEntryCreated
        @ClientCacheEntryModified
        public void handleClientEvent(ClientEvent clientEvent) {
            int incrementAndGet = this.count.incrementAndGet();
            if (incrementAndGet % 100 == 0) {
                ClientEventStressTest.log.debugf("Reached %s", incrementAndGet);
            }
        }
    }

    /* loaded from: input_file:org/infinispan/client/hotrod/stress/ClientEventStressTest$Put.class */
    static class Put implements Callable<Void> {
        static final ThreadLocalRandom R = ThreadLocalRandom.current();
        final CyclicBarrier barrier;
        final RemoteCache<Integer, Integer> remote;

        public Put(CyclicBarrier cyclicBarrier, RemoteCache<Integer, Integer> remoteCache) {
            this.barrier = cyclicBarrier;
            this.remote = remoteCache;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Callable
        public Void call() throws Exception {
            ClientEventStressTest.barrierAwait(this.barrier);
            for (int i = 0; i < ClientEventStressTest.NUM_OPERATIONS; i++) {
                try {
                    int nextInt = R.nextInt(Integer.MAX_VALUE);
                    this.remote.put(Integer.valueOf(nextInt), Integer.valueOf(nextInt));
                } finally {
                    ClientEventStressTest.barrierAwait(this.barrier);
                }
            }
            return null;
        }
    }

    protected EmbeddedCacheManager createCacheManager() throws Exception {
        return TestCacheManagerFactory.createCacheManager();
    }

    protected void setup() throws Exception {
        super.setup();
        this.hotrodServer = HotRodClientTestingUtil.startHotRodServer(this.cacheManager);
    }

    RemoteCacheManager getRemoteCacheManager(int i) {
        ConfigurationBuilder configurationBuilder = new ConfigurationBuilder();
        configurationBuilder.addServer().host("127.0.0.1").port(i);
        return new InternalRemoteCacheManager(configurationBuilder.build());
    }

    public void testStressEvents() {
        CyclicBarrier cyclicBarrier = new CyclicBarrier((NUM_CLIENTS * NUM_THREADS_PER_CLIENT) + 1);
        ArrayList arrayList = new ArrayList(NUM_CLIENTS * NUM_THREADS_PER_CLIENT);
        ArrayList arrayList2 = new ArrayList(NUM_CLIENTS);
        RemoteCacheManager[] remoteCacheManagerArr = new RemoteCacheManager[NUM_CLIENTS];
        for (int i = 0; i < NUM_CLIENTS; i++) {
            remoteCacheManagerArr[i] = getRemoteCacheManager(this.hotrodServer.getPort().intValue());
        }
        for (RemoteCacheManager remoteCacheManager : remoteCacheManagerArr) {
            RemoteCache cache = remoteCacheManager.getCache();
            ClientEntryListener clientEntryListener = new ClientEntryListener();
            arrayList2.add(clientEntryListener);
            cache.addClientListener(clientEntryListener);
            for (int i2 = 0; i2 < NUM_THREADS_PER_CLIENT; i2++) {
                arrayList.add(EXEC.submit(new Put(cyclicBarrier, cache)));
            }
        }
        barrierAwait(cyclicBarrier);
        barrierAwait(cyclicBarrier);
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            futureGet((Future) it.next());
        }
        log.debugf("Put operations completed, wait for events...", new Object[0]);
        eventuallyEquals(Integer.valueOf(NUM_EVENTS), () -> {
            return Integer.valueOf(countEvents(arrayList2));
        });
    }

    int countEvents(List<ClientEntryListener> list) {
        Integer num = (Integer) list.stream().reduce(0, (num2, clientEntryListener) -> {
            return Integer.valueOf(num2.intValue() + clientEntryListener.count.get());
        }, (num3, num4) -> {
            return Integer.valueOf(num3.intValue() + num4.intValue());
        });
        log.debugf("Event count is %d, target %d%n", num.intValue(), NUM_EVENTS);
        return num.intValue();
    }

    static int barrierAwait(CyclicBarrier cyclicBarrier) {
        try {
            return cyclicBarrier.await();
        } catch (InterruptedException | BrokenBarrierException e) {
            throw new AssertionError(e);
        }
    }

    <T> T futureGet(Future<T> future) {
        try {
            return future.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new AssertionError(e);
        }
    }
}
