/*
 * Decompiled with CFR 0.152.
 */
package org.modeshape.jcr.bus;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.modeshape.common.annotation.ThreadSafe;
import org.modeshape.common.i18n.I18nResource;
import org.modeshape.common.logging.Logger;
import org.modeshape.common.util.CheckArg;
import org.modeshape.jcr.bus.BusI18n;
import org.modeshape.jcr.bus.ChangeBus;
import org.modeshape.jcr.cache.change.ChangeSet;
import org.modeshape.jcr.cache.change.ChangeSetListener;
import org.modeshape.jcr.clustering.ClusteringService;
import org.modeshape.jcr.clustering.MessageConsumer;

/**
 * A {@link ChangeBus} that wraps another bus and routes change sets through a
 * {@link ClusteringService}.
 * <p>
 * When the clustering service reports more than one member, {@link #notify(ChangeSet)} sends the
 * change set to the cluster and then blocks until the same change set (matched by its UUID) has
 * been handed back to {@link #consume(ChangeSet)}, or until a timeout elapses. When there is only
 * one member, the change set is consumed directly. Listener registration is forwarded to the
 * wrapped bus unchanged.
 * <p>
 * The pending-loopback table is a concurrent map because {@code notify}, {@code consume} and
 * {@code shutdown} touch it without holding a common lock.
 */
@ThreadSafe
public final class ClusteredChangeBus
extends MessageConsumer<ChangeSet>
implements ChangeBus {
    private static final Logger LOGGER = Logger.getLogger(ClusteredChangeBus.class);

    /** Maximum time, in seconds, that {@link #notify(ChangeSet)} waits for its own change set to loop back. */
    private static final long LOOPBACK_TIMEOUT_SECONDS = 15L;

    /** The bus that actually dispatches change sets to the registered listeners. */
    private final ChangeBus delegate;

    /** The service used to exchange change sets with the cluster. */
    private final ClusteringService clusteringService;

    /**
     * Latches of the senders currently blocked in {@link #notify(ChangeSet)}, keyed by change set
     * UUID. NOTE(review): a concurrent map rejects null keys, so this assumes
     * {@code ChangeSet.getUUID()} never returns null - confirm.
     */
    private final Map<String, CountDownLatch> loopbackLatches = new ConcurrentHashMap<String, CountDownLatch>();

    /**
     * Creates a clustered bus on top of an existing bus.
     *
     * @param delegate the bus to which consumed change sets and listener registrations are
     *        forwarded; may not be null
     * @param clusteringService the service used to send change sets to the cluster; may not be null
     */
    public ClusteredChangeBus(ChangeBus delegate, ClusteringService clusteringService) {
        super(ChangeSet.class);
        CheckArg.isNotNull(delegate, "delegate");
        CheckArg.isNotNull(clusteringService, "clusteringService");
        this.delegate = delegate;
        this.clusteringService = clusteringService;
    }

    /**
     * Forwards a change set to the wrapped bus (when it has observers) and then releases the
     * sender, if any, that is blocked in {@link #notify(ChangeSet)} waiting for this change set.
     * <p>
     * The release happens in a {@code finally} block so that a waiting sender is unblocked even if
     * the wrapped bus throws, and even if there are no observers to deliver to.
     *
     * @param changes the change set to deliver
     */
    @Override
    public void consume(ChangeSet changes) {
        try {
            if (this.hasObservers()) {
                this.delegate.notify(changes);
                this.logReceivedOperation(changes);
            }
        } finally {
            this.releaseLoopbackLatch(changes);
        }
    }

    /**
     * Counts down and discards the latch registered for the given change set, if there is one.
     *
     * @param changes the change set that has just been consumed; may be null, in which case
     *        nothing happens
     */
    private void releaseLoopbackLatch(ChangeSet changes) {
        if (changes == null || this.loopbackLatches.isEmpty()) {
            return;
        }
        String uuid = changes.getUUID();
        CountDownLatch latch = this.loopbackLatches.remove(uuid);
        if (latch == null) {
            // Nobody on this bus is waiting for this change set.
            return;
        }
        latch.countDown();
        if (LOGGER.isTraceEnabled()) {
            LOGGER.trace("Loopback changeset {0} received back on {1}", new Object[]{uuid, this.clusteringService.toString()});
        }
    }

    /**
     * Starts the wrapped bus and registers this bus as a consumer of the clustering service.
     *
     * @throws IllegalStateException if the clustering service is not open
     * @throws Exception if the wrapped bus fails to start
     */
    @Override
    public synchronized void start() throws Exception {
        if (!this.clusteringService.isOpen()) {
            throw new IllegalStateException("The clustering service has not been started");
        }
        this.delegate.start();
        this.clusteringService.addConsumer(this);
    }

    /**
     * @return whether the wrapped bus has observers
     */
    @Override
    public boolean hasObservers() {
        return this.delegate.hasObservers();
    }

    /**
     * Shuts down the wrapped bus and forgets all pending loopback latches.
     */
    @Override
    public synchronized void shutdown() {
        this.delegate.shutdown();
        this.loopbackLatches.clear();
    }

    /**
     * Publishes a change set.
     * <p>
     * With a single cluster member the change set is consumed directly. Otherwise it is sent to
     * the cluster and this method blocks until the change set has been consumed by this bus again,
     * the wait times out (an error is logged), or the calling thread is interrupted (an error is
     * logged and the thread's interrupt status is restored).
     *
     * @param changeSet the change set to publish; a null value is ignored
     */
    @Override
    public void notify(ChangeSet changeSet) {
        if (changeSet == null) {
            return;
        }
        if (!this.clusteringService.multipleMembersInCluster()) {
            this.consume(changeSet);
            return;
        }

        // Register the latch BEFORE sending: the loopback may be consumed on another thread
        // before sendMessage returns, and it must find the latch to release this sender.
        String uuid = changeSet.getUUID();
        CountDownLatch waitForLoopback = new CountDownLatch(1);
        this.loopbackLatches.put(uuid, waitForLoopback);

        boolean sent = false;
        try {
            this.logSendOperation(changeSet);
            this.clusteringService.sendMessage(changeSet);
            sent = true;
        } finally {
            if (!sent) {
                // The send threw, so no loopback will arrive; do not leak the latch.
                this.loopbackLatches.remove(uuid);
            }
        }

        try {
            if (LOGGER.isTraceEnabled()) {
                LOGGER.trace("Waiting for loopback changeset {0} to be received back on {1}", new Object[]{uuid, this.clusteringService.toString()});
            }
            if (!waitForLoopback.await(LOOPBACK_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                this.loopbackLatches.remove(uuid);
                LOGGER.error((I18nResource)BusI18n.loopbackMessageNotReceived, new Object[]{uuid, this.clusteringService.toString()});
            }
        }
        catch (InterruptedException e) {
            this.loopbackLatches.remove(uuid);
            // Restore the interrupt status so callers further up the stack can observe it.
            Thread.currentThread().interrupt();
            LOGGER.error((Throwable)e, (I18nResource)BusI18n.loopbackMessageNotReceived, new Object[]{uuid, this.clusteringService.toString()});
        }
    }

    /**
     * Writes a trace-level log entry describing a change set that is about to be sent to the cluster.
     *
     * @param changeSet the change set being sent
     */
    protected final void logSendOperation(ChangeSet changeSet) {
        if (LOGGER.isTraceEnabled()) {
            LOGGER.trace("Sending to cluster '{0}' {1} changes on workspace {2} made by {3} from process '{4}' at {5}", new Object[]{this.clusteringService.toString(), changeSet.size(), changeSet.getWorkspaceName(), changeSet.getUserId(), changeSet.getProcessKey(), changeSet.getTimestamp()});
        }
    }

    /**
     * Writes a trace-level log entry describing a change set that has been consumed.
     *
     * @param changeSet the change set that was received
     */
    protected final void logReceivedOperation(ChangeSet changeSet) {
        if (LOGGER.isTraceEnabled()) {
            LOGGER.trace("Received from cluster '{0}' {1} changes on workspace {2} made by {3} from process '{4}' at {5}", new Object[]{this.clusteringService.toString(), changeSet.size(), changeSet.getWorkspaceName(), changeSet.getUserId(), changeSet.getProcessKey(), changeSet.getTimestamp()});
        }
    }

    /**
     * Registers a listener with the wrapped bus.
     *
     * @param listener the listener to register
     * @return the result reported by the wrapped bus
     */
    @Override
    public boolean register(ChangeSetListener listener) {
        return this.delegate.register(listener);
    }

    /**
     * Registers an in-thread listener with the wrapped bus.
     *
     * @param listener the listener to register
     * @return the result reported by the wrapped bus
     */
    @Override
    public boolean registerInThread(ChangeSetListener listener) {
        return this.delegate.registerInThread(listener);
    }

    /**
     * Unregisters a listener from the wrapped bus.
     *
     * @param listener the listener to remove
     * @return the result reported by the wrapped bus
     */
    @Override
    public boolean unregister(ChangeSetListener listener) {
        return this.delegate.unregister(listener);
    }
}

