package org.modeshape.jcr.bus;

import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.modeshape.common.annotation.ThreadSafe;
import org.modeshape.jcr.cache.change.ChangeSet;
import org.modeshape.jcr.cache.change.ChangeSetListener;

/**
 * A {@link ChangeBus} that fans each {@link ChangeSet} out to all registered {@link ChangeSetListener}s.
 * <p>
 * Change sets whose workspace name matches the configured system workspace name are delivered inline, on the
 * calling thread. All other change sets are placed on a per-workspace, per-listener queue; each queue is drained
 * by a dedicated {@link ChangeSetDispatcher} task submitted to the supplied {@link ExecutorService}.
 * </p>
 * <p>
 * The set of listeners is guarded by a fair read/write lock: registration, unregistration and shutdown take the
 * write lock, while notification takes the read lock.
 * </p>
 */
@ThreadSafe
public final class RepositoryChangeBus implements ChangeBus {
    /** Map key used for change sets that report a {@code null} workspace name. */
    private static final String NULL_WORKSPACE_NAME = "null_workspace_name";

    private final ExecutorService executor;
    /** Workspace name -> (listener -> queue of change sets waiting to be dispatched to that listener). */
    private final ConcurrentHashMap<String, ConcurrentHashMap<ChangeSetListener, BlockingQueue<ChangeSet>>> workspaceListenerQueues;
    /** Futures of the submitted dispatchers; added to while holding only the read lock, so it must be concurrent. */
    private final Set<Future<?>> workers;
    private final Set<ChangeSetListener> listeners;
    private final ReadWriteLock listenersLock;
    protected volatile boolean shutdown;
    private final String systemWorkspaceName;

    /**
     * Task that forwards the change sets of a single queue to a single listener until the bus is shut down, and
     * then delivers whatever is still queued.
     */
    private class ChangeSetDispatcher implements Callable<Void> {
        /** Seconds to wait for a change set before re-checking the shutdown flag. */
        private static final int DEFAULT_POLL_TIMEOUT = 3;
        private ChangeSetListener listener;
        private BlockingQueue<ChangeSet> changeSetQueue;

        ChangeSetDispatcher(ChangeSetListener changeSetListener, BlockingQueue<ChangeSet> blockingQueue) {
            this.listener = changeSetListener;
            this.changeSetQueue = blockingQueue;
        }

        /**
         * Polls the queue and notifies the listener until the enclosing bus is flagged as shut down, then drains
         * the remaining change sets.
         *
         * @return always {@code null}
         */
        @Override
        public Void call() {
            while (!RepositoryChangeBus.this.shutdown) {
                try {
                    ChangeSet changeSet = this.changeSetQueue.poll(DEFAULT_POLL_TIMEOUT, TimeUnit.SECONDS);
                    if (changeSet != null) {
                        this.listener.notify(changeSet);
                    }
                } catch (InterruptedException e) {
                    // stopWork() cancels workers with an interrupt after the shutdown flag has been set. The
                    // interrupt status is deliberately left cleared so the loop re-checks the flag and the
                    // remaining change sets can still be drained below.
                    Thread.interrupted();
                }
            }
            shutdown();
            return null;
        }

        /** Delivers every change set still in the queue and releases the references held by this task. */
        private void shutdown() {
            // Poll until null rather than isEmpty()+poll(), so the listener can never be handed a null change set.
            ChangeSet pending;
            while ((pending = this.changeSetQueue.poll()) != null) {
                this.listener.notify(pending);
            }
            this.listener = null;
            this.changeSetQueue = null;
        }
    }

    /**
     * Creates a new bus.
     *
     * @param executorService the executor on which the per-listener dispatchers are run
     * @param str the name of the system workspace, whose change sets are delivered inline; may be {@code null}
     * @param z not used by this implementation
     */
    public RepositoryChangeBus(ExecutorService executorService, String str, boolean z) {
        this.listenersLock = new ReentrantReadWriteLock(true);
        this.systemWorkspaceName = str;
        this.workers = Collections.newSetFromMap(new ConcurrentHashMap<Future<?>, Boolean>());
        this.workspaceListenerQueues = new ConcurrentHashMap<>();
        this.executor = executorService;
        this.listeners = Collections.synchronizedSet(new LinkedHashSet<ChangeSetListener>());
        this.shutdown = false;
    }

    /**
     * Creates a new bus without a system workspace name.
     *
     * @param executorService the executor on which the per-listener dispatchers are run
     */
    RepositoryChangeBus(ExecutorService executorService) {
        this(executorService, null, false);
    }

    /** Does nothing; dispatchers are started lazily by {@link #notify(ChangeSet)}. */
    @Override
    public void start() {
    }

    /**
     * Flags the bus as shut down, removes all listeners and queues, shuts down the executor and cancels the
     * dispatchers that are still running.
     */
    @Override
    public void shutdown() {
        this.shutdown = true;
        // Acquire outside the try block so that unlock() only runs when the lock is actually held.
        this.listenersLock.writeLock().lock();
        try {
            this.listeners.clear();
            this.workspaceListenerQueues.clear();
            stopWork();
        } finally {
            this.listenersLock.writeLock().unlock();
        }
    }

    /** Shuts down the executor and interrupts every dispatcher that has not completed yet. */
    private void stopWork() {
        this.executor.shutdown();
        for (Future<?> worker : this.workers) {
            if (!worker.isDone()) {
                worker.cancel(true);
            }
        }
        this.workers.clear();
    }

    /**
     * Adds a listener to the bus.
     *
     * @param changeSetListener the listener to add; may be {@code null}
     * @return {@code true} if the listener was added, {@code false} if it was {@code null} or already registered
     */
    @Override
    public boolean register(ChangeSetListener changeSetListener) {
        if (changeSetListener == null) {
            return false;
        }
        this.listenersLock.writeLock().lock();
        try {
            return this.listeners.add(changeSetListener);
        } finally {
            this.listenersLock.writeLock().unlock();
        }
    }

    /**
     * Removes a listener from the bus.
     *
     * @param changeSetListener the listener to remove; may be {@code null}
     * @return {@code true} if the listener was removed, {@code false} if it was {@code null} or not registered
     */
    @Override
    public boolean unregister(ChangeSetListener changeSetListener) {
        if (changeSetListener == null) {
            return false;
        }
        this.listenersLock.writeLock().lock();
        try {
            return this.listeners.remove(changeSetListener);
        } finally {
            this.listenersLock.writeLock().unlock();
        }
    }

    /**
     * Publishes a change set to all registered listeners. Change sets of the system workspace are delivered
     * inline; all others are queued per listener and delivered by a dispatcher task.
     *
     * @param changeSet the change set to publish; ignored when {@code null} or when there are no listeners
     * @throws IllegalStateException if listeners are registered but the bus has already been shut down
     */
    @Override
    public void notify(ChangeSet changeSet) {
        if (changeSet == null || !hasObservers()) {
            return;
        }
        if (this.shutdown) {
            throw new IllegalStateException("Change bus has been already shut down, should not be receiving events");
        }
        String workspaceName = changeSet.getWorkspaceName() != null ? changeSet.getWorkspaceName() : NULL_WORKSPACE_NAME;
        if (notifiedSystemWorkspaceListenersInline(changeSet, workspaceName)) {
            return;
        }
        ConcurrentHashMap<ChangeSetListener, BlockingQueue<ChangeSet>> queuesByListener = this.workspaceListenerQueues.get(workspaceName);
        if (queuesByListener == null) {
            queuesByListener = new ConcurrentHashMap<>();
            ConcurrentHashMap<ChangeSetListener, BlockingQueue<ChangeSet>> existingQueues = this.workspaceListenerQueues.putIfAbsent(workspaceName, queuesByListener);
            if (existingQueues != null) {
                queuesByListener = existingQueues;
            }
        }
        this.listenersLock.readLock().lock();
        try {
            for (ChangeSetListener changeSetListener : this.listeners) {
                BlockingQueue<ChangeSet> queue = queuesByListener.get(changeSetListener);
                if (queue == null) {
                    // Fill the new queue before publishing it, so this change set precedes anything another
                    // thread enqueues once the queue becomes visible.
                    BlockingQueue<ChangeSet> newQueue = new LinkedBlockingQueue<>();
                    newQueue.add(changeSet);
                    BlockingQueue<ChangeSet> existingQueue = queuesByListener.putIfAbsent(changeSetListener, newQueue);
                    if (existingQueue == null) {
                        // This thread created the queue, so it alone starts the single dispatcher for it.
                        this.workers.add(this.executor.submit(new ChangeSetDispatcher(changeSetListener, newQueue)));
                        continue;
                    }
                    // Another thread won the race: its queue already has a dispatcher. Discard ours and fall
                    // through so the change set lands on the queue that is actually being drained.
                    queue = existingQueue;
                }
                queue.add(changeSet);
            }
        } finally {
            this.listenersLock.readLock().unlock();
        }
    }

    /**
     * Delivers the change set to every listener on the calling thread if it belongs to the system workspace.
     *
     * @param changeSet the change set to deliver
     * @param str the (non-null) workspace name of the change set
     * @return {@code true} if the name matched the system workspace name (ignoring case) and the listeners were
     *         notified, {@code false} otherwise
     */
    private boolean notifiedSystemWorkspaceListenersInline(ChangeSet changeSet, String str) {
        if (!str.equalsIgnoreCase(this.systemWorkspaceName)) {
            return false;
        }
        this.listenersLock.readLock().lock();
        try {
            for (ChangeSetListener changeSetListener : this.listeners) {
                changeSetListener.notify(changeSet);
            }
            return true;
        } finally {
            this.listenersLock.readLock().unlock();
        }
    }

    /**
     * Reports whether any listener is currently registered.
     *
     * @return {@code true} if at least one listener is registered
     */
    @Override
    public boolean hasObservers() {
        this.listenersLock.readLock().lock();
        try {
            return !this.listeners.isEmpty();
        } finally {
            this.listenersLock.readLock().unlock();
        }
    }
}
