package org.modeshape.jcr.journal;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.infinispan.schematic.document.ThreadSafe;
import org.joda.time.DateTime;
import org.modeshape.common.SystemFailureException;
import org.modeshape.common.logging.Logger;
import org.modeshape.common.util.CheckArg;
import org.modeshape.jcr.JcrI18n;
import org.modeshape.jcr.cache.change.ChangeSet;
import org.modeshape.jcr.clustering.ClusteringService;
import org.modeshape.jcr.clustering.MessageConsumer;
import org.modeshape.jcr.journal.ChangeJournal;
import org.modeshape.jcr.journal.DeltaMessage;

/**
 * A {@link ChangeJournal} that stores everything in a wrapped {@link LocalJournal} and, when started as part of a
 * cluster with more than one member, asks the other members for the records it is missing.
 * <p>
 * Reconciliation works by broadcasting a delta request through the {@link ClusteringService} and then waiting on a latch
 * that is counted down once for every {@link DeltaMessage.DeltaResponse} or {@link DeltaMessage.DeltaStillReconciling}
 * message consumed by this instance.
 * </p>
 * <p>
 * {@link #consume(DeltaMessage)} may run on a different thread than {@link #start()} (presumably a clustering thread -
 * confirm against {@link ClusteringService}), so the latch reference is published through a {@code volatile} field.
 * </p>
 */
@ThreadSafe
public class ClusteredJournal extends MessageConsumer<DeltaMessage> implements ChangeJournal {
    private static final Logger LOGGER = Logger.getLogger(ClusteredJournal.class);

    /** Upper bound, in minutes, that {@link #start()} blocks while waiting for the other members to answer. */
    private static final int MAX_MINUTES_TO_WAIT_FOR_RECONCILIATION = 5;

    private final LocalJournal localJournal;
    private final ClusteringService clusteringService;

    /** Set in {@link #start()} to the number of other cluster members; not read anywhere else in this class. */
    private final AtomicInteger expectedNumberOfDeltaResponses;

    /**
     * Latch tracking outstanding reconciliation answers. It is {@code null} until {@link #start()} assigns it, which
     * happens only after this instance has been registered as a message consumer; hence it is volatile and every reader
     * must tolerate {@code null}.
     */
    private volatile CountDownLatch reconciliationLatch;

    /**
     * Creates a clustered journal on top of the given local journal.
     *
     * @param localJournal the journal that actually stores the records; may not be null
     * @param clusteringService the service used to exchange delta messages with other members; may not be null
     */
    public ClusteredJournal(LocalJournal localJournal, ClusteringService clusteringService) {
        super(DeltaMessage.class);
        CheckArg.isNotNull(localJournal, "localJournal");
        CheckArg.isNotNull(clusteringService, "clusteringService");
        this.clusteringService = clusteringService;
        this.localJournal = localJournal.withSearchTimeDelta(clusteringService.getMaxAllowedClockDelayMillis());
        this.expectedNumberOfDeltaResponses = new AtomicInteger(0);
    }

    /**
     * Forwards the change set to the local journal.
     *
     * @param changeSet the change set to record
     */
    @Override
    public void notify(ChangeSet changeSet) {
        this.localJournal.notify(changeSet);
    }

    /**
     * Starts the local journal, registers this instance with the clustering service and, if there are other members in
     * the cluster, sends a delta request and blocks until reconciliation completes or times out.
     *
     * @throws IllegalStateException if the clustering service is not open
     * @throws SystemFailureException if reconciliation did not complete within
     *         {@link #MAX_MINUTES_TO_WAIT_FOR_RECONCILIATION} minutes
     * @throws Exception declared for the {@link ChangeJournal} contract
     */
    @Override
    public void start() throws Exception {
        if (!this.clusteringService.isOpen()) {
            throw new IllegalStateException("The clustering service has not been started");
        }
        this.localJournal.start();
        this.clusteringService.addConsumer(this);
        if (!this.clusteringService.multipleMembersInCluster()) {
            // Nobody to reconcile with: an already-open latch marks reconciliation as complete.
            this.reconciliationLatch = new CountDownLatch(0);
            return;
        }
        // One answer is expected from every member except this one.
        int otherMembers = this.clusteringService.membersInCluster() - 1;
        this.reconciliationLatch = new CountDownLatch(otherMembers);
        this.expectedNumberOfDeltaResponses.compareAndSet(0, otherMembers);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Sending delta request from journal {0} as part of cluster {1}", journalId(), clusterName());
        }
        JournalRecord lastRecord = lastRecord();
        DateTime lastChangeTime = lastRecord != null ? new DateTime(lastRecord.getChangeTimeMillis()) : null;
        this.clusteringService.sendMessage(DeltaMessage.request(journalId(), lastChangeTime));
        waitForReconciliationToComplete();
    }

    /**
     * Blocks until the reconciliation latch reaches zero, the timeout elapses or the thread is interrupted.
     *
     * @throws SystemFailureException if the latch has not reached zero when waiting ends
     */
    private void waitForReconciliationToComplete() {
        try {
            this.reconciliationLatch.await(MAX_MINUTES_TO_WAIT_FOR_RECONCILIATION, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            // Restore the interrupt status so callers further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
        }
        if (!deltaReconciliationCompleted()) {
            throw new SystemFailureException(JcrI18n.journalHasNotCompletedReconciliation.text(journalId(), clusterName(),
                                                                                               MAX_MINUTES_TO_WAIT_FOR_RECONCILIATION));
        }
    }

    /** Shuts down the local journal. */
    @Override
    public void shutdown() {
        this.localJournal.shutdown();
    }

    /** Delegates to {@link LocalJournal#removeOldRecords()}. */
    @Override
    public void removeOldRecords() {
        this.localJournal.removeOldRecords();
    }

    /**
     * Delegates to the local journal.
     *
     * @param z passed through unchanged to {@code LocalJournal.allRecords}
     * @return the records reported by the local journal
     */
    @Override
    public ChangeJournal.Records allRecords(boolean z) {
        return this.localJournal.allRecords(z);
    }

    /**
     * Delegates to the local journal.
     *
     * @return the last record of the local journal; {@link #start()} treats {@code null} as "no record yet"
     */
    @Override
    public JournalRecord lastRecord() {
        return this.localJournal.lastRecord();
    }

    /**
     * Delegates to the local journal.
     *
     * @param dateTime the lower time bound, passed through unchanged
     * @param z passed through unchanged to {@code LocalJournal.recordsNewerThan}
     * @param z2 passed through unchanged to {@code LocalJournal.recordsNewerThan}
     * @return the records reported by the local journal
     */
    @Override
    public ChangeJournal.Records recordsNewerThan(DateTime dateTime, boolean z, boolean z2) {
        return this.localJournal.recordsNewerThan(dateTime, z, z2);
    }

    /**
     * Adds the given records to the local journal.
     *
     * @param journalRecordArr the records to add
     */
    @Override
    public void addRecords(JournalRecord... journalRecordArr) {
        this.localJournal.addRecords(journalRecordArr);
    }

    /**
     * @return the identifier of the local journal
     */
    @Override
    public String journalId() {
        return this.localJournal.journalId();
    }

    /**
     * Dispatches an incoming delta message to the handler matching its concrete type; other types are ignored.
     *
     * @param deltaMessage the message received from the clustering service
     */
    @Override
    public void consume(DeltaMessage deltaMessage) {
        if (deltaMessage instanceof DeltaMessage.DeltaRequest) {
            processDeltaRequest((DeltaMessage.DeltaRequest) deltaMessage);
        } else if (deltaMessage instanceof DeltaMessage.DeltaResponse) {
            processDeltaResponse((DeltaMessage.DeltaResponse) deltaMessage);
        } else if (deltaMessage instanceof DeltaMessage.DeltaStillReconciling) {
            processStillReconciling((DeltaMessage.DeltaStillReconciling) deltaMessage);
        }
    }

    /**
     * @return {@code true} only if the reconciliation latch exists and has reached zero; a latch that has not been
     *         created yet counts as "not completed"
     */
    private boolean deltaReconciliationCompleted() {
        CountDownLatch latch = this.reconciliationLatch;
        return latch != null && latch.getCount() == 0;
    }

    /** Counts the reconciliation latch down by one, doing nothing if {@link #start()} has not created it yet. */
    private void countDownReconciliationLatch() {
        CountDownLatch latch = this.reconciliationLatch;
        if (latch != null) {
            latch.countDown();
        }
    }

    /**
     * Handles a "still reconciling" answer: the sender cannot provide a delta, but it still counts as an answer.
     *
     * @param deltaStillReconciling the received message
     */
    private void processStillReconciling(DeltaMessage.DeltaStillReconciling deltaStillReconciling) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Journal {0} received response from journal {1}: still reconciling", journalId(), deltaStillReconciling.getJournalId());
        }
        countDownReconciliationLatch();
    }

    /**
     * Answers a delta request from another journal with the local records newer than the requester's last change time,
     * or with a "still reconciling" message if this journal has not finished its own reconciliation. Requests that
     * originate from this journal are discarded.
     *
     * @param deltaRequest the received request
     */
    private void processDeltaRequest(DeltaMessage.DeltaRequest deltaRequest) {
        if (deltaRequest.getJournalId().equals(journalId())) {
            LOGGER.debug("Journal {0} discarding delta request from itself", journalId());
            return;
        }
        if (!deltaReconciliationCompleted()) {
            LOGGER.debug("Journal {0} is still reconciling, cannot send delta to journal {1}", journalId(), deltaRequest.getJournalId());
            this.clusteringService.sendMessage(DeltaMessage.stillReconciling(journalId()));
            return;
        }
        ChangeJournal.Records newerRecords = recordsNewerThan(deltaRequest.getLastChangeSetTime(), false, false);
        // Copy into a plain list so the delta can be shipped inside the response message.
        List<JournalRecord> delta = new ArrayList<JournalRecord>(newerRecords.size());
        Iterator<JournalRecord> it = newerRecords.iterator();
        while (it.hasNext()) {
            delta.add(it.next());
        }
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Journal {0} sending delta response to journal {1} as part of cluster {2}. Delta size is {3}", journalId(), deltaRequest.getJournalId(), clusterName(), Integer.valueOf(newerRecords.size()));
        }
        this.clusteringService.sendMessage(DeltaMessage.response(journalId(), delta));
    }

    /**
     * Stores the records carried by a delta response in the local journal and counts the response as one answer.
     *
     * @param deltaResponse the received response
     */
    private void processDeltaResponse(DeltaMessage.DeltaResponse deltaResponse) {
        List<JournalRecord> records = deltaResponse.getRecords();
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Journal {0} received delta response from journal {1} as part of cluster {2}. Delta size: {3} records.", journalId(), deltaResponse.getJournalId(), clusterName(), Integer.valueOf(records.size()));
        }
        if (!records.isEmpty()) {
            this.localJournal.addRecords(records.toArray(new JournalRecord[0]));
        }
        countDownReconciliationLatch();
    }

    /**
     * @return the clustering service this journal communicates through
     */
    protected ClusteringService clusteringService() {
        return this.clusteringService;
    }

    /**
     * @return the local journal that stores the records
     */
    protected LocalJournal localJournal() {
        return this.localJournal;
    }

    /**
     * @return the cluster name reported by the clustering service
     */
    protected String clusterName() {
        return clusteringService().clusterName();
    }
}
