package org.infinispan.client.hotrod.retry;

import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.net.SocketAddress;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.IntStream;
import org.infinispan.client.hotrod.DataFormat;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.annotation.ClientCacheEntryCreated;
import org.infinispan.client.hotrod.annotation.ClientListener;
import org.infinispan.client.hotrod.event.ClientCacheEntryCreatedEvent;
import org.infinispan.client.hotrod.event.impl.AbstractClientEvent;
import org.infinispan.client.hotrod.event.impl.ClientListenerNotifier;
import org.infinispan.client.hotrod.exceptions.TransportException;
import org.infinispan.client.hotrod.impl.protocol.Codec25;
import org.infinispan.client.hotrod.test.MultiHotRodServersTest;
import org.infinispan.commons.configuration.ClassWhiteList;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.server.hotrod.test.HotRodTestingUtil;
import org.infinispan.test.TestingUtil;
import org.testng.annotations.Test;

/**
 * Functional test checking that a remote client listener keeps receiving
 * entry-created events after client-side decoding of cache events has been
 * forced to fail with a {@link TransportException}.
 *
 * <p>The failure is injected by swapping the codec used by each client's
 * listener notifier for a {@link FailureInducingCodec}.
 */
@Test(groups = {"functional"}, testName = "client.hotrod.retry.ClientListenerRetryTest")
public class ClientListenerRetryTest extends MultiHotRodServersTest {
    /** Source of a fresh key for every entry written by this test. */
    private final AtomicInteger counter = new AtomicInteger(0);

    /** Single codec instance installed into every client's listener notifier. */
    private final FailureInducingCodec failureInducingCodec = new FailureInducingCodec();

    /**
     * {@link Codec25} variant whose event decoding can be switched to throw a
     * {@link TransportException} on demand.
     */
    private static class FailureInducingCodec extends Codec25 {
        /** When {@code true}, {@link #readCacheEvent} throws instead of decoding. */
        private volatile boolean failure;

        /** Cause attached to every induced {@link TransportException}. */
        private final IOException failWith;

        private FailureInducingCodec() {
            this.failWith = new IOException("Connection reset by peer");
        }

        /**
         * Delegates to the parent codec unless a failure is currently induced.
         *
         * @throws TransportException wrapping {@link #failWith} for {@code socketAddress}
         *         while the failure flag is set
         */
        @Override
        public AbstractClientEvent readCacheEvent(ByteBuf byteBuf, Function<byte[], DataFormat> function, short s, ClassWhiteList classWhiteList, SocketAddress socketAddress) {
            if (this.failure) {
                throw new TransportException(this.failWith, socketAddress);
            }
            return super.readCacheEvent(byteBuf, function, s, classWhiteList, socketAddress);
        }

        /** Makes every subsequent {@link #readCacheEvent} call throw. */
        public void induceFailure() {
            this.failure = true;
        }

        /** Restores normal decoding through the parent codec. */
        public void resetFailure() {
            this.failure = false;
        }
    }

    /** Client listener that only counts the entry-created events it is handed. */
    @ClientListener
    public static class Listener {
        private final AtomicInteger count;

        private Listener() {
            this.count = new AtomicInteger(0);
        }

        /** Records one received entry-created event; the event content is ignored. */
        @ClientCacheEntryCreated
        public void handleCreatedEvent(ClientCacheEntryCreatedEvent<?> clientCacheEntryCreatedEvent) {
            this.count.incrementAndGet();
        }

        /** @return number of entry-created events handled so far */
        int getReceived() {
            return this.count.get();
        }
    }

    /**
     * Creates two Hot Rod servers and then, for every client, replaces the
     * {@code codec} field of the object held in the client's
     * {@code listenerNotifier} field with {@link #failureInducingCodec}.
     */
    @Override
    protected void createCacheManagers() throws Throwable {
        createHotRodServers(2, getCacheConfiguration());
        this.clients.forEach(remoteCacheManager -> {
            Object listenerNotifier = TestingUtil.extractField(remoteCacheManager, "listenerNotifier");
            TestingUtil.replaceField(this.failureInducingCodec, "codec", listenerNotifier, ClientListenerNotifier.class);
        });
    }

    /** Builds the cache configuration used by the servers (DIST_SYNC based). */
    private ConfigurationBuilder getCacheConfiguration() {
        return HotRodTestingUtil.hotRodCacheConfiguration(getDefaultClusteredCacheConfig(CacheMode.DIST_SYNC, false));
    }

    /**
     * Registers a listener, confirms it receives events, writes ten entries while
     * event decoding is failing, then confirms the listener receives events again
     * once decoding has been restored.
     */
    @Test
    public void testConnectionDrop() throws Exception {
        RemoteCache<Integer, String> cache = client(0).getCache();
        Listener listener = new Listener();
        cache.addClientListener(listener);
        assertListenerActive(cache, listener);

        this.failureInducingCodec.induceFailure();
        try {
            addItems(cache, 10);
        } finally {
            // Always restore normal decoding: the codec is shared by all clients of
            // this test instance, so a leaked failure flag would keep breaking every
            // later event read if the writes above throw.
            this.failureInducingCodec.resetFailure();
        }

        assertListenerActive(cache, listener);
    }

    /** Writes {@code itemCount} entries, each under a new key taken from {@link #counter}. */
    private void addItems(RemoteCache<Integer, String> remoteCache, int itemCount) {
        for (int written = 0; written < itemCount; written++) {
            remoteCache.put(this.counter.incrementAndGet(), "value");
        }
    }

    /**
     * Waits until the listener's received count rises above its value at the time
     * of the call. A new entry is written on every evaluation of the condition so
     * that there is always a fresh created event for the listener to observe.
     */
    private void assertListenerActive(RemoteCache<Integer, String> remoteCache, Listener listener) {
        int receivedBefore = listener.getReceived();
        eventually(() -> {
            remoteCache.put(this.counter.incrementAndGet(), "value");
            return listener.getReceived() > receivedBefore;
        });
    }

    /**
     * @return the fixed retry count of 10
     *         (NOTE(review): presumably consumed by the base class when it
     *         configures the clients - confirm against MultiHotRodServersTest)
     */
    @Override // org.infinispan.client.hotrod.test.MultiHotRodServersTest
    public int maxRetries() {
        return 10;
    }
}
