/*
 * Decompiled with CFR 0.152.
 */
package org.modeshape.jcr.journal;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.infinispan.schematic.document.ThreadSafe;
import org.joda.time.DateTime;
import org.modeshape.common.i18n.I18nResource;
import org.modeshape.common.logging.Logger;
import org.modeshape.common.util.CheckArg;
import org.modeshape.jcr.JcrI18n;
import org.modeshape.jcr.cache.NodeKey;
import org.modeshape.jcr.cache.change.ChangeSet;
import org.modeshape.jcr.clustering.ClusteringService;
import org.modeshape.jcr.clustering.MessageConsumer;
import org.modeshape.jcr.journal.ChangeJournal;
import org.modeshape.jcr.journal.JournalRecord;
import org.modeshape.jcr.journal.LocalJournal;

@ThreadSafe
public class ClusteredJournal
extends MessageConsumer<DeltaMessage>
implements ChangeJournal {
    private static final Logger LOGGER = Logger.getLogger(ClusteredJournal.class);
    private static final int MAX_MINUTES_TO_WAIT_FOR_RECONCILIATION = 2;
    private final LocalJournal localJournal;
    private final ClusteringService clusteringService;
    private CountDownLatch reconciliationLatch = null;

    public ClusteredJournal(LocalJournal localJournal, ClusteringService clusteringService) {
        super(DeltaMessage.class);
        CheckArg.isNotNull((Object)localJournal, (String)"localJournal");
        CheckArg.isNotNull((Object)clusteringService, (String)"clusteringService");
        this.clusteringService = clusteringService;
        this.localJournal = localJournal.withSearchTimeDelta(clusteringService.getMaxAllowedClockDelayMillis());
    }

    @Override
    public void notify(ChangeSet changeSet) {
        this.localJournal.notify(changeSet);
    }

    @Override
    public void start() throws Exception {
        if (!this.clusteringService.isOpen()) {
            throw new IllegalStateException("The clustering service has not been started");
        }
        this.localJournal.start();
        this.clusteringService.addConsumer(this);
        if (!this.clusteringService.multipleMembersInCluster()) {
            return;
        }
        int numberOfRequiredResponses = 1;
        this.reconciliationLatch = new CountDownLatch(numberOfRequiredResponses);
        JournalRecord lastRecord = this.lastRecord();
        Long lastChangeSetTimeMillis = lastRecord != null ? Long.valueOf(lastRecord.getChangeTimeMillis()) : null;
        DeltaMessage request = DeltaMessage.request(this.journalId(), lastChangeSetTimeMillis);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Sending delta request: {0}", new Object[]{request});
        }
        this.clusteringService.sendMessage(request);
        this.waitForReconciliationToComplete();
    }

    private void waitForReconciliationToComplete() throws InterruptedException {
        block6: {
            try {
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("{0} waiting until it receives {1} responses from cluster {2}", new Object[]{this.journalId(), this.reconciliationLatch.getCount(), this.clusterName()});
                }
                if (!this.reconciliationLatch.await(2L, TimeUnit.MINUTES)) {
                    LOGGER.warn((I18nResource)JcrI18n.journalHasNotCompletedReconciliation, new Object[]{this.journalId(), this.clusterName(), 2});
                } else if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("{0} successfully completed reconciliation", new Object[]{this.journalId()});
                }
            }
            catch (InterruptedException e) {
                LOGGER.warn((I18nResource)JcrI18n.journalHasNotCompletedReconciliation, new Object[]{this.journalId(), this.clusterName(), 2});
                if (!Thread.interrupted()) break block6;
                throw e;
            }
        }
    }

    @Override
    public void shutdown() {
        this.localJournal.shutdown();
    }

    @Override
    public void removeOldRecords() {
        this.localJournal.removeOldRecords();
    }

    @Override
    public ChangeJournal.Records allRecords(boolean descendingOrder) {
        return this.localJournal.allRecords(descendingOrder);
    }

    @Override
    public JournalRecord lastRecord() {
        return this.localJournal.lastRecord();
    }

    @Override
    public ChangeJournal.Records recordsNewerThan(DateTime changeSetTime, boolean inclusive, boolean descendingOrder) {
        return this.localJournal.recordsNewerThan(changeSetTime, inclusive, descendingOrder);
    }

    @Override
    public Iterator<NodeKey> changedNodesSince(long timestamp) {
        return this.localJournal.changedNodesSince(timestamp);
    }

    @Override
    public void addRecords(JournalRecord ... records) {
        this.localJournal.addRecords(records);
    }

    @Override
    public String journalId() {
        return this.localJournal.journalId();
    }

    @Override
    public boolean started() {
        return this.localJournal.started() && this.reconciliationCompleted();
    }

    @Override
    public void consume(DeltaMessage message) {
        if (!this.localJournal.started()) {
            return;
        }
        if (message.isResponse()) {
            this.processDeltaResponse(message);
        } else {
            this.processDeltaRequest(message);
        }
    }

    protected boolean reconciliationCompleted() {
        return this.reconciliationLatch == null || this.reconciliationLatch.getCount() == 0L;
    }

    private void processDeltaRequest(DeltaMessage request) {
        String journalId;
        String requestorId = request.getRequestorId();
        if (requestorId.equals(journalId = this.journalId())) {
            LOGGER.debug("{0} discarding delta request from itself", new Object[]{journalId});
            return;
        }
        if (!this.reconciliationCompleted()) {
            LOGGER.debug("{0} is still reconciling, cannot send delta to journal {1}", new Object[]{journalId, requestorId});
            return;
        }
        Long requestorLastChangeSetTime = request.getRequestorLastChangeSetTime();
        DateTime lastChangeSetTime = requestorLastChangeSetTime != null ? new DateTime((Object)requestorLastChangeSetTime) : null;
        ChangeJournal.Records delta = this.recordsNewerThan(lastChangeSetTime, false, false);
        ArrayList<JournalRecord> deltaList = new ArrayList<JournalRecord>(delta.size());
        for (JournalRecord record : delta) {
            deltaList.add(record);
        }
        DeltaMessage response = DeltaMessage.response(request, journalId, deltaList);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Sending delta response {0} to journal {1}", new Object[]{response, requestorId});
        }
        this.clusteringService.sendMessage(response);
    }

    private void processDeltaResponse(DeltaMessage message) {
        List<JournalRecord> records;
        String journalId = this.journalId();
        if (!journalId.equals(message.getRequestorId())) {
            return;
        }
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("{0} received delta response {1}", new Object[]{journalId, message});
        }
        if (!(records = message.getRespondentRecords()).isEmpty()) {
            this.localJournal.addRecords(records.toArray(new JournalRecord[0]));
        }
        this.reconciliationLatch.countDown();
    }

    protected ClusteringService clusteringService() {
        return this.clusteringService;
    }

    protected LocalJournal localJournal() {
        return this.localJournal;
    }

    protected String clusterName() {
        return this.clusteringService().clusterName();
    }

    protected static class DeltaMessage
    implements Serializable {
        private static final long serialVersionUID = 1L;
        private final String requestorId;
        private final Long requestorLastChangeSetTime;
        private final String respondentId;
        private final List<JournalRecord> respondentRecords;

        private DeltaMessage(String requestorId, Long requestorLastChangeSetTime, String respondentId, List<JournalRecord> respondentRecords) {
            this.requestorId = requestorId;
            this.requestorLastChangeSetTime = requestorLastChangeSetTime;
            this.respondentId = respondentId;
            this.respondentRecords = respondentRecords;
        }

        protected boolean isResponse() {
            return this.respondentId != null;
        }

        protected String getRequestorId() {
            return this.requestorId;
        }

        protected Long getRequestorLastChangeSetTime() {
            return this.requestorLastChangeSetTime;
        }

        protected String getRespondentId() {
            return this.respondentId;
        }

        protected List<JournalRecord> getRespondentRecords() {
            return this.respondentRecords;
        }

        protected static DeltaMessage request(String requestorId, Long requestorLastChangeSetTime) {
            return new DeltaMessage(requestorId, requestorLastChangeSetTime, null, null);
        }

        protected static DeltaMessage response(DeltaMessage request, String repondentId, List<JournalRecord> respondentRecords) {
            return new DeltaMessage(request.requestorId, request.requestorLastChangeSetTime, repondentId, respondentRecords);
        }

        public String toString() {
            StringBuilder sb = null;
            if (this.isResponse()) {
                sb = new StringBuilder("response[");
                sb.append("requestorId='").append(this.requestorId).append('\'');
                sb.append(", requestorLastChangeSetTime=").append(this.requestorLastChangeSetTime);
                sb.append(", repondentId='").append(this.respondentId).append('\'');
                sb.append(", respondentRecords=").append(this.respondentRecords);
            } else {
                sb = new StringBuilder("request[");
                sb.append("requestorId='").append(this.requestorId).append('\'');
                sb.append(", requestorLastChangeSetTime=").append(this.requestorLastChangeSetTime);
            }
            sb.append(']');
            return sb.toString();
        }
    }
}

