package org.infinispan.client.hotrod.counter;

import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import org.infinispan.client.hotrod.counter.impl.NotificationManager;
import org.infinispan.client.hotrod.counter.impl.RemoteCounterManager;
import org.infinispan.commons.test.ExceptionRunnable;
import org.infinispan.counter.api.CounterEvent;
import org.infinispan.counter.api.CounterListener;
import org.infinispan.counter.api.Handle;
import org.infinispan.server.hotrod.counter.impl.BaseCounterImplTest;
import org.infinispan.test.TestingUtil;
import org.testng.annotations.Test;

@Test(groups = {"functional"})
/* loaded from: input_file:org/infinispan/client/hotrod/counter/BaseCounterAPITest.class */
public abstract class BaseCounterAPITest<T> extends AbstractCounterTest {
    private static final CounterListener EXCEPTION_LISTENER;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:org/infinispan/client/hotrod/counter/BaseCounterAPITest$IncrementTask.class */
    private class IncrementTask implements ExceptionRunnable {
        private final T counter;
        private volatile boolean run;

        private IncrementTask(T t) {
            this.counter = t;
            this.run = true;
        }

        public void run() {
            while (this.run) {
                BaseCounterAPITest.this.increment(this.counter);
            }
        }

        void stop() {
            this.run = false;
        }
    }

    public void testExceptionInListener(Method method) throws InterruptedException {
        T defineAndCreateCounter = defineAndCreateCounter(method.getName(), 0L);
        Handle<L> addListenerTo = addListenerTo(defineAndCreateCounter, new BaseCounterImplTest.EventLogger());
        Handle<L> addListenerTo2 = addListenerTo(defineAndCreateCounter, EXCEPTION_LISTENER);
        Handle<L> addListenerTo3 = addListenerTo(defineAndCreateCounter, new BaseCounterImplTest.EventLogger());
        add(defineAndCreateCounter, 1L, 1L);
        add(defineAndCreateCounter, -1L, 0L);
        add(defineAndCreateCounter, 10L, 10L);
        add(defineAndCreateCounter, 1L, 11L);
        add(defineAndCreateCounter, 2L, 13L);
        BaseCounterImplTest.assertNextValidEvent(addListenerTo, 0L, 1L);
        BaseCounterImplTest.assertNextValidEvent(addListenerTo, 1L, 0L);
        BaseCounterImplTest.assertNextValidEvent(addListenerTo, 0L, 10L);
        BaseCounterImplTest.assertNextValidEvent(addListenerTo, 10L, 11L);
        BaseCounterImplTest.assertNextValidEvent(addListenerTo, 11L, 13L);
        BaseCounterImplTest.assertNextValidEvent(addListenerTo3, 0L, 1L);
        BaseCounterImplTest.assertNextValidEvent(addListenerTo3, 1L, 0L);
        BaseCounterImplTest.assertNextValidEvent(addListenerTo3, 0L, 10L);
        BaseCounterImplTest.assertNextValidEvent(addListenerTo3, 10L, 11L);
        BaseCounterImplTest.assertNextValidEvent(addListenerTo3, 11L, 13L);
        BaseCounterImplTest.assertNoEvents(addListenerTo);
        BaseCounterImplTest.assertNoEvents(addListenerTo3);
        addListenerTo.remove();
        addListenerTo3.remove();
        addListenerTo2.remove();
    }

    public void testConcurrentListenerAddAndRemove(Method method) throws InterruptedException {
        String name = method.getName();
        defineAndCreateCounter(name, 1L);
        List<T> counters = getCounters(name);
        List list = (List) counters.stream().map(obj -> {
            return new IncrementTask(obj);
        }).collect(Collectors.toList());
        List list2 = (List) list.stream().map((v1) -> {
            return fork(v1);
        }).collect(Collectors.toList());
        T t = counters.get(0);
        Handle<BaseCounterImplTest.EventLogger> addListenerTo = addListenerTo(t, new BaseCounterImplTest.EventLogger());
        eventually(() -> {
            return addListenerTo.getCounterListener().size() > 5;
        });
        addListenerTo.remove();
        list.forEach((v0) -> {
            v0.stop();
        });
        list2.forEach(this::awaitFuture);
        drainAndCheckEvents(addListenerTo);
        BaseCounterImplTest.assertNoEvents(addListenerTo);
        increment(t);
        BaseCounterImplTest.assertNoEvents(addListenerTo);
    }

    public void testListenerFailover(Method method) throws Exception {
        T defineAndCreateCounter = defineAndCreateCounter(method.getName(), 2L);
        Handle<L> addListenerTo = addListenerTo(defineAndCreateCounter, new BaseCounterImplTest.EventLogger());
        add(defineAndCreateCounter, 1L, 3L);
        BaseCounterImplTest.assertNextValidEvent(addListenerTo, 2L, 3L);
        InetSocketAddress findEventServer = findEventServer();
        int i = -1;
        int i2 = 0;
        while (true) {
            if (i2 >= this.servers.size()) {
                break;
            }
            if (this.servers.get(i2).getAddress().getPort() == findEventServer.getPort()) {
                i = i2;
                break;
            }
            i2++;
        }
        if (!$assertionsDisabled && i == -1) {
            throw new AssertionError();
        }
        try {
            killServer(i);
            add(defineAndCreateCounter, 1L, 4L);
            add(defineAndCreateCounter, 1L, 5L);
            CounterEvent waitingPoll = addListenerTo.getCounterListener().waitingPoll();
            if (waitingPoll.getOldValue() == 3) {
                BaseCounterImplTest.assertValidEvent(waitingPoll, 3L, 4L);
                BaseCounterImplTest.assertNextValidEvent(addListenerTo, 4L, 5L);
            } else {
                BaseCounterImplTest.assertValidEvent(waitingPoll, 4L, 5L);
            }
            addListenerTo.remove();
            TestingUtil.waitForNoRebalance(caches("org.infinispan.COUNTER"));
        } catch (Throwable th) {
            TestingUtil.waitForNoRebalance(caches("org.infinispan.COUNTER"));
            throw th;
        }
    }

    abstract void increment(T t);

    abstract void add(T t, long j, long j2);

    abstract T defineAndCreateCounter(String str, long j);

    abstract <L extends CounterListener> Handle<L> addListenerTo(T t, L l);

    abstract List<T> getCounters(String str);

    private InetSocketAddress findEventServer() {
        return (InetSocketAddress) ((SocketAddress) TestingUtil.extractField(TestingUtil.extractField(NotificationManager.class, TestingUtil.extractField(RemoteCounterManager.class, counterManager(), "notificationManager"), "dispatcher"), "address"));
    }

    private void awaitFuture(Future<?> future) {
        try {
            future.get();
        } catch (InterruptedException e) {
        } catch (ExecutionException e2) {
            throw new RuntimeException(e2);
        }
    }

    /* JADX WARN: Type inference failed for: r0v3, types: [java.lang.Object, org.infinispan.counter.api.CounterEvent] */
    private void drainAndCheckEvents(Handle<BaseCounterImplTest.EventLogger> handle) throws InterruptedException {
        ?? waitingPoll = handle.getCounterListener().waitingPoll();
        log.tracef("First Event=%s", (Object) waitingPoll);
        long oldValue = waitingPoll.getOldValue();
        long j = waitingPoll;
        BaseCounterImplTest.assertValidEvent((CounterEvent) waitingPoll, oldValue, oldValue + 1);
        while (true) {
            CounterEvent poll = handle.getCounterListener().poll();
            if (poll == null) {
                return;
            }
            log.tracef("Next Event=%s", poll);
            long j2 = j;
            long j3 = j + 1;
            j = j3;
            BaseCounterImplTest.assertValidEvent(poll, j2, j3);
        }
    }

    static {
        $assertionsDisabled = !BaseCounterAPITest.class.desiredAssertionStatus();
        EXCEPTION_LISTENER = counterEvent -> {
            throw new RuntimeException("induced");
        };
    }
}
