package org.modeshape.jcr.bus;

import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.modeshape.common.collection.ring.RingBuffer;
import org.modeshape.common.logging.Logger;
import org.modeshape.jcr.RepositoryStatistics;
import org.modeshape.jcr.api.monitor.ValueMetric;
import org.modeshape.jcr.cache.change.ChangeSet;
import org.modeshape.jcr.cache.change.ChangeSetListener;

/* loaded from: input_file:org/modeshape/jcr/bus/RepositoryChangeBus.class */
public final class RepositoryChangeBus implements ChangeBus {
    public static final int DEFAULT_RING_BUFFER_SIZE = 1024;
    protected static final Logger LOGGER = Logger.getLogger((Class<?>) RepositoryChangeBus.class);
    private final AtomicBoolean shutdown;
    private final Lock registrationLock;
    private final Set<ChangeSetListener> inThreadListeners;
    private final RingBuffer<ChangeSet, ChangeSetListener> ringBuffer;
    private final RepositoryStatistics statistics;

    /* loaded from: input_file:org/modeshape/jcr/bus/RepositoryChangeBus$ChangeSetListenerConsumerAdapter.class */
    protected class ChangeSetListenerConsumerAdapter implements RingBuffer.ConsumerAdapter<ChangeSet, ChangeSetListener> {
        protected ChangeSetListenerConsumerAdapter() {
        }

        @Override // org.modeshape.common.collection.ring.RingBuffer.ConsumerAdapter
        public boolean consume(ChangeSetListener changeSetListener, ChangeSet changeSet, long j, long j2) {
            changeSetListener.notify(changeSet);
            return true;
        }

        @Override // org.modeshape.common.collection.ring.RingBuffer.ConsumerAdapter
        public void close(ChangeSetListener changeSetListener) {
        }

        @Override // org.modeshape.common.collection.ring.RingBuffer.ConsumerAdapter
        public void handleException(ChangeSetListener changeSetListener, Throwable th, ChangeSet changeSet, long j, long j2) {
            RepositoryChangeBus.LOGGER.error(th, BusI18n.errorProcessingEvent, changeSet.toString(), Long.valueOf(j));
        }
    }

    public RepositoryChangeBus(String str, ExecutorService executorService) {
        this(str, executorService, null, 1024);
    }

    public RepositoryChangeBus(String str, ExecutorService executorService, RepositoryStatistics repositoryStatistics, int i) {
        this.shutdown = new AtomicBoolean(true);
        this.registrationLock = new ReentrantLock();
        this.inThreadListeners = new CopyOnWriteArraySet();
        this.ringBuffer = RepositoryRingBufferBuilder.withMultipleProducers(executorService, new ChangeSetListenerConsumerAdapter(), repositoryStatistics).ofSize(i).named(str).garbageCollect(true).build();
        this.statistics = repositoryStatistics;
    }

    @Override // org.modeshape.jcr.bus.ChangeBus
    public boolean hasObservers() {
        if (this.shutdown.get()) {
            return false;
        }
        return !this.inThreadListeners.isEmpty() || this.ringBuffer.hasConsumers();
    }

    @Override // org.modeshape.jcr.cache.change.Observable
    public boolean register(ChangeSetListener changeSetListener) {
        if (changeSetListener == null || this.shutdown.get()) {
            return false;
        }
        try {
            this.registrationLock.lock();
            boolean addConsumer = this.ringBuffer.addConsumer(changeSetListener);
            if (addConsumer && this.statistics != null) {
                this.statistics.increment(ValueMetric.LISTENER_COUNT);
            }
            return addConsumer;
        } finally {
            this.registrationLock.unlock();
        }
    }

    @Override // org.modeshape.jcr.bus.ChangeBus
    public boolean registerInThread(ChangeSetListener changeSetListener) {
        if (changeSetListener == null || this.shutdown.get()) {
            return false;
        }
        try {
            this.registrationLock.lock();
            boolean add = this.inThreadListeners.add(changeSetListener);
            if (add && this.statistics != null) {
                this.statistics.increment(ValueMetric.LISTENER_COUNT);
            }
            return add;
        } finally {
            this.registrationLock.unlock();
        }
    }

    @Override // org.modeshape.jcr.cache.change.Observable
    public boolean unregister(ChangeSetListener changeSetListener) {
        if (changeSetListener == null || this.shutdown.get()) {
            return false;
        }
        try {
            this.registrationLock.lock();
            boolean z = this.ringBuffer.remove(changeSetListener) || this.inThreadListeners.remove(changeSetListener);
            if (z && this.statistics != null) {
                this.statistics.decrement(ValueMetric.LISTENER_COUNT);
            }
            return z;
        } finally {
            this.registrationLock.unlock();
        }
    }

    @Override // org.modeshape.jcr.bus.ChangeBus
    public synchronized void start() throws Exception {
        this.shutdown.set(false);
    }

    @Override // org.modeshape.jcr.bus.ChangeBus
    public synchronized void shutdown() {
        if (this.shutdown.getAndSet(true)) {
            return;
        }
        try {
            this.registrationLock.lock();
            this.inThreadListeners.clear();
            this.ringBuffer.shutdown();
            if (this.statistics != null) {
                this.statistics.set(ValueMetric.LISTENER_COUNT, 0L);
            }
        } finally {
            this.registrationLock.unlock();
        }
    }

    @Override // org.modeshape.jcr.cache.change.ChangeSetListener
    public void notify(ChangeSet changeSet) {
        if (changeSet == null || !hasObservers()) {
            return;
        }
        if (this.shutdown.get()) {
            throw new IllegalStateException("Change bus has been already shut down, should not have any more observers");
        }
        this.ringBuffer.add((RingBuffer<ChangeSet, ChangeSetListener>) changeSet);
        if (this.statistics != null) {
            this.statistics.increment(ValueMetric.EVENT_COUNT);
        }
        Iterator<ChangeSetListener> it = this.inThreadListeners.iterator();
        while (it.hasNext()) {
            try {
                it.next().notify(changeSet);
            } catch (RuntimeException e) {
                if (!this.shutdown.get()) {
                    throw e;
                }
            }
        }
    }
}
