package org.infinispan.client.hotrod.counter.impl;

import java.io.IOException;
import java.net.SocketAddress;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Consumer;
import net.jcip.annotations.GuardedBy;
import org.infinispan.client.hotrod.counter.operation.AddListenerOperation;
import org.infinispan.client.hotrod.event.impl.ClientListenerNotifier;
import org.infinispan.client.hotrod.exceptions.TransportException;
import org.infinispan.client.hotrod.impl.protocol.Codec;
import org.infinispan.client.hotrod.impl.transport.Transport;
import org.infinispan.client.hotrod.logging.Log;
import org.infinispan.client.hotrod.logging.LogFactory;
import org.infinispan.commons.util.Util;

/* loaded from: input_file:WEB-INF/lib/infinispan-embedded-8.5.3.Final-redhat-00002.jar:org/infinispan/client/hotrod/counter/impl/ConnectionManager.class */
/**
 * Owns the single dedicated listener connection used to receive counter events from a server.
 *
 * <p>At most one {@link EventDispatcher} is active at a time. It reads events from the dedicated
 * transport on a thread taken from {@link #executor} and hands them to {@link #eventConsumer}.
 * The names of the counters that have listeners are remembered in {@link #registeredCounters} so
 * the listeners can be re-registered when the connection has to be re-established.
 *
 * <p>All access to {@link #dispatcher} and {@link #registeredCounters} happens while holding the
 * monitor of this instance (public/package entry points are {@code synchronized}).
 */
public class ConnectionManager {
    private static final Log log = (Log) LogFactory.getLog(ConnectionManager.class, Log.class);
    // Log levels are sampled once, when the class is initialized.
    private static final boolean trace = log.isTraceEnabled();
    private static final boolean debug = log.isDebugEnabled();

    /** Random 16-byte identifier sent with every add/remove listener operation. */
    private final byte[] listenerId = new byte[16];
    private final CounterOperationFactory factory;
    private final Consumer<HotRodCounterEvent> eventConsumer;
    private final Codec codec;
    private final ExecutorService executor;

    @GuardedBy("this")
    private final Set<String> registeredCounters;

    @GuardedBy("this")
    private EventDispatcher dispatcher;

    /**
     * Event-reading loop bound to one dedicated transport.
     *
     * <p>The loop keeps running until {@link #run} is cleared (by the owner or by one of the
     * error handlers below) or the executing thread is interrupted.
     */
    /* JADX INFO: Access modifiers changed from: private */
    public class EventDispatcher implements Runnable {
        private final Transport transport;
        /** Loop flag; cleared to make the reader thread exit after the current iteration. */
        public volatile boolean run;

        private EventDispatcher(Transport transport) {
            this.run = true;
            this.transport = transport;
        }

        /**
         * Reads events from the transport and passes each one to the event consumer.
         */
        @Override // java.lang.Runnable
        public void run() {
            Thread.currentThread().setName(getThreadName());
            while (this.run && !Thread.currentThread().isInterrupted()) {
                // Stays null while reading; non-null once an event has been read. The handlers
                // use it to tell "failed to read" apart from "failed to consume".
                HotRodCounterEvent event = null;
                try {
                    event = codec.readCounterEvent(this.transport, listenerId);
                    eventConsumer.accept(event);
                } catch (CancelledKeyException e) {
                    debugMessage("Key cancelled, most likely channel closed, exiting event reader thread");
                    this.run = false;
                } catch (TransportException e) {
                    handleTransportException(e, event);
                } catch (Throwable t) {
                    handleException(t, event);
                }
            }
            if (trace) {
                log.trace("Dispatcher thread stopped!");
            }
        }

        /**
         * Logs a non-transport failure and stops the loop when the transport is no longer valid.
         *
         * @param th    the failure
         * @param event the event that was being consumed, or {@code null} if the failure happened
         *              before an event was read
         */
        private void handleException(Throwable th, HotRodCounterEvent event) {
            if (event != null) {
                log.unexpectedErrorConsumingEvent(event, th);
            } else {
                log.unableToReadEventFromServer(th, this.transport.getRemoteSocketAddress());
            }
            if (!this.transport.isValid()) {
                this.run = false;
            }
        }

        /**
         * Decides, from the cause of a {@link TransportException}, whether to stop, retry or
         * fail over to another server.
         *
         * @param transportException the failure
         * @param event              the event that was being consumed, or {@code null} if the
         *                           failure happened before an event was read
         */
        private void handleTransportException(TransportException transportException, HotRodCounterEvent event) {
            Throwable cause = transportException.getCause();
            boolean channelClosed = cause instanceof ClosedChannelException
                    || (cause instanceof SocketException && !this.transport.isValid());
            if (channelClosed) {
                debugMessage("Channel closed, exiting event reader thread");
                this.run = false;
                return;
            }
            if (cause instanceof SocketTimeoutException) {
                debugMessage("Timed out reading event, retry");
                return;
            }
            if (event != null) {
                // The consumer failed, not the connection: log and keep reading.
                log.unexpectedErrorConsumingEvent(event, transportException);
                return;
            }
            // getMessage() may legitimately be null, so it must be checked before matching on it.
            String message = cause instanceof IOException ? cause.getMessage() : null;
            if (message != null && message.contains("Connection reset by peer")) {
                findAnotherServer();
            } else {
                log.unrecoverableErrorReadingEvent(transportException, this.transport.getRemoteSocketAddress());
            }
            this.run = false;
        }
    }

    /**
     * Creates a manager with a freshly generated random listener id and a cached thread pool for
     * the event reader.
     *
     * @param counterOperationFactory factory for the add/remove listener operations; also
     *                                supplies the codec used to decode events
     * @param consumer                receives every event read from the server
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public ConnectionManager(CounterOperationFactory counterOperationFactory, Consumer<HotRodCounterEvent> consumer) {
        this.factory = counterOperationFactory;
        this.codec = counterOperationFactory.getCodec();
        this.eventConsumer = consumer;
        ThreadLocalRandom.current().nextBytes(this.listenerId);
        this.executor = Executors.newCachedThreadPool(ClientListenerNotifier.getRestoreThreadNameThreadFactory());
        this.registeredCounters = new HashSet<>();
    }

    /** Logs {@code str} at debug level when debug logging was enabled at class initialization. */
    /* JADX INFO: Access modifiers changed from: private */
    public static void debugMessage(String str) {
        if (debug) {
            log.debug(str);
        }
    }

    /**
     * Re-establishes the listener connection if the server currently in use is among the failed
     * ones. Does nothing when there is no active connection.
     *
     * @param collection addresses of the servers reported as failed
     */
    public synchronized void failedServers(Collection<SocketAddress> collection) {
        SocketAddress serverInUse = getServerInUse();
        // No dispatcher means no server in use; also avoids contains(null) on null-hostile collections.
        if (serverInUse != null && collection.contains(serverInUse)) {
            findAnotherServer();
        }
    }

    /**
     * Removes the listener registration, releases the dispatcher and shuts the executor down.
     *
     * <p>The local cleanup runs even when the remove operation throws; the exception is still
     * propagated to the caller.
     */
    public synchronized void stop() {
        try {
            // NOTE(review): the empty counter name presumably addresses every listener of this
            // listener id - confirm against the remove-listener operation.
            removeConnection("");
        } finally {
            try {
                cleanupDispatcher();
            } finally {
                this.executor.shutdownNow();
            }
        }
    }

    /**
     * Registers a listener for the counter on the server and remembers the counter name.
     *
     * @param str the counter name
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized void addConnection(String str) {
        createConnection(str);
        this.registeredCounters.add(str);
    }

    /**
     * Executes a remove-listener operation for the counter and forgets the counter name. When the
     * operation returns {@code true} the dispatcher is stopped and its transport released.
     *
     * @param str the counter name
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized void removeConnection(String str) {
        if (trace) {
            log.tracef("Remove listener connection for counter '%s'", str);
        }
        if (this.factory.newRemoveListenerOperation(str, this.listenerId, getServerInUse()).execute().booleanValue()) {
            cleanupDispatcher();
        }
        this.registeredCounters.remove(str);
    }

    /**
     * Executes an add-listener operation against the server in use (if any). When the operation
     * returns {@code true}, the previous dispatcher is discarded and a new one is started on the
     * operation's dedicated transport.
     */
    private void createConnection(String str) {
        AddListenerOperation operation = this.factory.newAddListenerOperation(str, this.listenerId, getServerInUse());
        if (!operation.execute().booleanValue()) {
            return;
        }
        cleanupDispatcher();
        this.dispatcher = new EventDispatcher(operation.getDedicatedTransport());
        if (trace) {
            log.tracef("Add listener connection for counter '%s'. Server used=%s", str, getServerInUse());
        }
        this.executor.execute(this.dispatcher);
    }

    /** Returns the remote address of the active dispatcher's transport, or {@code null} if none. */
    private SocketAddress getServerInUse() {
        return this.dispatcher == null ? null : this.dispatcher.transport.getRemoteSocketAddress();
    }

    /** Stops the active dispatcher, releases its transport and clears the reference. */
    private void cleanupDispatcher() {
        if (this.dispatcher != null) {
            this.dispatcher.run = false;
            this.dispatcher.transport.release();
            this.dispatcher = null;
        }
    }

    /** Builds the reader thread name from a hex rendering of the listener id. */
    /* JADX INFO: Access modifiers changed from: private */
    public String getThreadName() {
        return "Counter-Listener-" + Util.toHexString(this.listenerId, 8);
    }

    /**
     * Drops the current dispatcher and re-registers every remembered counter. If that fails with
     * a {@link TransportException}, this method is registered as a disconnected-listener on the
     * transport factory instead.
     */
    /* JADX INFO: Access modifiers changed from: private */
    public synchronized void findAnotherServer() {
        cleanupDispatcher();
        try {
            if (debug) {
                log.debug("Connection reset by peer, finding another server for counter listeners");
            }
            this.registeredCounters.forEach(this::createConnection);
        } catch (TransportException e) {
            if (debug) {
                log.debug("Unable to find another server, so ignore connection reset");
            }
            try {
                this.factory.geTransportFactory().addDisconnectedListener(this::findAnotherServer);
            } catch (InterruptedException e2) {
                // Preserve the interrupt status for whoever owns this thread.
                Thread.currentThread().interrupt();
            }
        }
    }
}
