package org.infinispan.client.hotrod.counter.impl;

import java.net.SocketAddress;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.infinispan.client.hotrod.counter.operation.AddListenerOperation;
import org.infinispan.client.hotrod.event.impl.ClientListenerNotifier;
import org.infinispan.client.hotrod.event.impl.CounterEventDispatcher;
import org.infinispan.client.hotrod.impl.Util;
import org.infinispan.client.hotrod.logging.Log;
import org.infinispan.client.hotrod.logging.LogFactory;
import org.infinispan.commons.util.concurrent.NonReentrantLock;
import org.infinispan.counter.api.CounterListener;
import org.infinispan.counter.api.Handle;

/* loaded from: input_file:BOOT-INF/lib/infinispan-client-hotrod-9.4.17.Final.jar:org/infinispan/client/hotrod/counter/impl/NotificationManager.class */
/**
 * Keeps track of the counter listeners registered by this client and of the single
 * {@link CounterEventDispatcher} that delivers their events.
 *
 * <p>Listeners are stored per counter name in {@link #clientListeners}. The first listener added for a
 * counter sends an add-listener operation to the server; removing the last one sends the matching
 * remove-listener operation. When the dispatcher asks for a failover, every known counter name is
 * registered again (see {@link #failover()}).
 */
public class NotificationManager {
    private static final Log log = (Log) LogFactory.getLog(NotificationManager.class, Log.class);
    private static final boolean trace = log.isTraceEnabled();
    /** Shared, already completed result for the failover paths that have nothing to do. */
    private static final CompletableFuture<Short> NO_ERROR_FUTURE = CompletableFuture.completedFuture((short) 0);
    private final ClientListenerNotifier notifier;
    private final CounterOperationFactory factory;
    /** Current dispatcher; {@code null} until one is created and again once a failover starts. */
    private volatile CounterEventDispatcher dispatcher;
    /** Registered listeners, keyed by counter name. */
    private final ConcurrentMap<String, List<Consumer<HotRodCounterEvent>>> clientListeners = new ConcurrentHashMap<>();
    /** Guards the creation/replacement of {@link #dispatcher}. */
    private final Lock lock = new NonReentrantLock();
    /** Identifier sent with every add/remove listener operation; filled with random bytes in the constructor. */
    private final byte[] listenerId = new byte[16];

    /**
     * Handle returned to the user for a registered listener. It is also the {@link Consumer} stored in
     * {@link NotificationManager#clientListeners}, forwarding each event to the user listener.
     *
     * <p>The class is public only because the decompiler widened its access modifier from private.
     *
     * @param <T> the concrete listener type
     */
    public class HandleImpl<T extends CounterListener> implements Handle<T>, Consumer<HotRodCounterEvent> {
        private final T listener;
        private final String counterName;

        private HandleImpl(String counterName, T listener) {
            this.counterName = counterName;
            this.listener = listener;
        }

        /** @return the listener this handle was created for */
        @Override // org.infinispan.counter.api.Handle
        public T getCounterListener() {
            return this.listener;
        }

        /** Unregisters this handle from the enclosing manager. */
        @Override // org.infinispan.counter.api.Handle
        public void remove() {
            NotificationManager.this.removeListener(this.counterName, this);
        }

        /**
         * Forwards the event to the user listener.
         *
         * @param hotRodCounterEvent the event to deliver
         */
        @Override // java.util.function.Consumer
        public void accept(HotRodCounterEvent hotRodCounterEvent) {
            try {
                this.listener.onUpdate(hotRodCounterEvent);
            } catch (Throwable th) {
                // Deliberate best effort: a failing user listener is only logged, never propagated.
                NotificationManager.log.debug("Exception in user listener", th);
            }
        }
    }

    /**
     * Creates a manager with a freshly generated random listener id.
     *
     * <p>The constructor is public only because the decompiler widened it from package-private.
     *
     * @param clientListenerNotifier  notifier that the created dispatchers are added to
     * @param counterOperationFactory factory for the add/remove listener operations
     */
    public NotificationManager(ClientListenerNotifier clientListenerNotifier, CounterOperationFactory counterOperationFactory) {
        this.notifier = clientListenerNotifier;
        this.factory = counterOperationFactory;
        ThreadLocalRandom.current().nextBytes(this.listenerId);
    }

    /**
     * Registers a listener for the given counter.
     *
     * @param str the counter name
     * @param t   the listener to notify
     * @param <T> the concrete listener type
     * @return a handle that can be used to remove the listener again
     */
    public <T extends CounterListener> Handle<T> addListener(String str, T t) {
        if (trace) {
            log.tracef("Add listener for counter '%s'", str);
        }
        CounterEventDispatcher current = this.dispatcher;
        if (current != null) {
            // A dispatcher already exists: register against its address without taking the lock.
            return registerListener(str, t, current.address());
        }
        log.debugf("ALock %s", this.lock);
        this.lock.lock();
        try {
            // Re-read under the lock: another thread may have created the dispatcher in the meantime.
            current = this.dispatcher;
            return registerListener(str, t, current == null ? null : current.address());
        } finally {
            // Single release point so the lock can never be unlocked twice.
            this.lock.unlock();
            log.debugf("AUnLock %s", this.lock);
        }
    }

    /**
     * Adds the listener to the per-counter list. When the counter has no list yet, the add-listener
     * operation is executed first (blocking until it completes) and, if no address was supplied and the
     * operation returns {@code true}, a new dispatcher is created for the operation's channel.
     */
    private <T extends CounterListener> Handle<T> registerListener(String counterName, T listener, SocketAddress address) {
        HandleImpl<T> handle = new HandleImpl<>(counterName, listener);
        this.clientListeners.computeIfAbsent(counterName, name -> {
            AddListenerOperation op = this.factory.newAddListenerOperation(counterName, this.listenerId, address);
            boolean useChannel = Util.await(op.execute());
            if (useChannel && address == null) {
                installDispatcher(op);
            }
            return new CopyOnWriteArrayList<>();
        }).add(handle);
        return handle;
    }

    /**
     * Creates a dispatcher for the channel used by {@code op}, publishes it in {@link #dispatcher} and
     * hands it to the notifier.
     */
    private void installDispatcher(AddListenerOperation op) {
        CounterEventDispatcher created = new CounterEventDispatcher(this.listenerId, this.clientListeners,
                op.getChannel().remoteAddress(), this::failover, op::cleanup);
        this.dispatcher = created;
        // Pass the local instance: the volatile field could be reset by a concurrent failover.
        this.notifier.addDispatcher(created);
        this.notifier.startClientListener(this.listenerId);
    }

    /**
     * Removes a handle from the given counter. When the last handle of a counter is removed, the map
     * entry is dropped and, if a dispatcher exists, a remove-listener operation is executed (blocking).
     *
     * <p>The method is public only because the decompiler widened it from private.
     *
     * @param str        the counter name
     * @param handleImpl the handle to remove
     */
    public void removeListener(String str, HandleImpl<?> handleImpl) {
        if (trace) {
            log.tracef("Remove listener for counter '%s'", str);
        }
        this.clientListeners.computeIfPresent(str, (name, listeners) -> {
            listeners.remove(handleImpl);
            if (!listeners.isEmpty()) {
                return listeners;
            }
            // Read the volatile field once so a concurrent failover cannot null it between the
            // check and the use.
            CounterEventDispatcher current = this.dispatcher;
            if (current != null) {
                boolean removed = Util.await(
                        this.factory.newRemoveListenerOperation(str, this.listenerId, current.address()).execute());
                if (!removed) {
                    log.debugf("Failed to remove counter listener %s on server side", str);
                }
            }
            // Returning null removes the (now empty) entry from the map.
            return null;
        });
    }

    /**
     * Failover callback handed to every dispatcher. Discards the current dispatcher and registers all
     * known counter names again: the first one without an address, the others against the address of
     * the resulting dispatcher.
     *
     * @return a future completed with {@code 0} once every registration has completed, or completed
     *         exceptionally if one of them fails
     */
    private CompletableFuture<Short> failover() {
        this.dispatcher = null;
        Iterator<String> names = this.clientListeners.keySet().iterator();
        if (!names.hasNext()) {
            return NO_ERROR_FUTURE;
        }
        CompletableFuture<Short> result = new CompletableFuture<>();
        String firstName = names.next();
        AddListenerOperation firstOp = this.factory.newAddListenerOperation(firstName, this.listenerId, null);
        log.debugf("Lock %s", this.lock);
        this.lock.lock();
        if (this.dispatcher != null) {
            // Somebody else installed a dispatcher while we were waiting for the lock.
            this.lock.unlock();
            log.debugf("UnLock %s", this.lock);
            return NO_ERROR_FUTURE;
        }
        try {
            // On this path the lock stays held and is released by the completion callback.
            firstOp.execute().whenComplete((useChannel, failure) ->
                    completeFailover(firstOp, firstName, names, result, useChannel, failure));
        } catch (Throwable t) {
            // The callback was never registered, so nobody else would release the lock.
            this.lock.unlock();
            log.debugf("UnLock %s", this.lock);
            throw t;
        }
        return result;
    }

    /**
     * Completion callback of the first failover registration. Installs the new dispatcher if requested,
     * releases the lock taken in {@link #failover()} exactly once, then registers the remaining names.
     */
    private void completeFailover(AddListenerOperation firstOp, String firstName, Iterator<String> remaining,
                                  CompletableFuture<Short> result, Boolean useChannel, Throwable failure) {
        if (failure != null) {
            this.lock.unlock();
            log.debugf(failure, "Failed to failover counter listener %s", firstName);
            result.completeExceptionally(failure);
            return;
        }
        SocketAddress address;
        try {
            if (useChannel) {
                log.debugf("Creating new counter event dispatcher on %s", firstOp.getChannel());
                installDispatcher(firstOp);
            }
            // Same NullPointerException as before when no dispatcher is available, now with a message.
            address = Objects.requireNonNull(this.dispatcher, "No counter event dispatcher available after failover").address();
        } catch (Throwable t) {
            result.completeExceptionally(t);
            return;
        } finally {
            this.lock.unlock();
            log.debugf("UnLock %s", this.lock);
        }
        // The lock is no longer held here; nothing below may unlock it again.
        reattachRemaining(remaining, address, result);
    }

    /**
     * Registers the remaining counter names against {@code address} and completes {@code result} with
     * {@code 0} when the last registration finishes.
     */
    private void reattachRemaining(Iterator<String> remaining, SocketAddress address, CompletableFuture<Short> result) {
        // Starts at 1 for this method itself so the future cannot complete while operations are
        // still being launched; each launched operation adds one more.
        AtomicInteger pending = new AtomicInteger(1);
        try {
            while (remaining.hasNext()) {
                String name = remaining.next();
                pending.incrementAndGet();
                this.factory.newAddListenerOperation(name, this.listenerId, address).execute().whenComplete((useChannel, failure) -> {
                    if (failure != null) {
                        log.debugf(failure, "Failed to failover counter listener %s", name);
                        result.completeExceptionally(failure);
                        return;
                    }
                    if (useChannel) {
                        result.completeExceptionally(new IllegalStateException("Unexpected to use another channel for the same counter"));
                    }
                    if (pending.decrementAndGet() == 0) {
                        result.complete((short) 0);
                    }
                });
            }
            if (pending.decrementAndGet() == 0) {
                result.complete((short) 0);
            }
        } catch (Throwable t) {
            result.completeExceptionally(t);
        }
    }

    /**
     * Removes every registered counter listener and clears the local registry. The server-side removal
     * is skipped when there is no dispatcher, mirroring {@link #removeListener(String, HandleImpl)}.
     */
    public void stop() {
        log.debugf("Stopping %s (%s)", this, this.lock);
        this.lock.lock();
        try {
            CounterEventDispatcher current = this.dispatcher;
            if (current != null) {
                SocketAddress address = current.address();
                CompletableFuture<?>[] removals = this.clientListeners.keySet().stream()
                        .map(name -> this.factory.newRemoveListenerOperation(name, this.listenerId, address).execute())
                        .toArray(CompletableFuture[]::new);
                Util.await(CompletableFuture.allOf(removals));
            }
            this.clientListeners.clear();
        } finally {
            this.lock.unlock();
        }
    }
}
