/*
 * Decompiled with CFR 0.152.
 */
package org.jboss.elasticsearch.river.remote;

import java.util.Date;
import java.util.Map;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.jboss.elasticsearch.river.remote.ChangedDocumentsResults;
import org.jboss.elasticsearch.river.remote.IDocumentIndexStructureBuilder;
import org.jboss.elasticsearch.river.remote.IESIntegration;
import org.jboss.elasticsearch.river.remote.IRemoteSystemClient;
import org.jboss.elasticsearch.river.remote.SpaceIndexerBase;
import org.jboss.elasticsearch.river.remote.SpaceIndexingInfo;

/**
 * Space indexer that repeatedly asks the remote system client for changed documents of one space,
 * writes them into ES bulk requests and stores the update timestamp of the last processed document
 * under the {@code lastIndexedDocumentUpdateDate} property, so a later run can continue from it.
 */
public class SpaceByLastUpdateTimestampIndexer
extends SpaceIndexerBase {
    /** Name of the stored property holding the update timestamp of the last indexed document. */
    protected static final String STORE_PROPERTYNAME_LAST_INDEXED_DOC_UPDATE_DATE = "lastIndexedDocumentUpdateDate";

    /**
     * Creates the indexer for one space.
     *
     * @param spaceKey key of the space to index
     * @param fullUpdate passed to the created {@link SpaceIndexingInfo}; when {@code true} the stored
     *        last-update timestamp is not read by {@link #processUpdate()}
     * @param remoteSystemClient client used to load changed documents
     * @param esIntegrationComponent component used to create the logger, build bulk requests and
     *        read/store the timestamp property
     * @param documentIndexStructureBuilder builder used to extract document fields and to add
     *        documents into the bulk request
     */
    public SpaceByLastUpdateTimestampIndexer(String spaceKey, boolean fullUpdate, IRemoteSystemClient remoteSystemClient, IESIntegration esIntegrationComponent, IDocumentIndexStructureBuilder documentIndexStructureBuilder) {
        super(spaceKey, remoteSystemClient, esIntegrationComponent, documentIndexStructureBuilder);
        logger = esIntegrationComponent.createLogger(SpaceByLastUpdateTimestampIndexer.class);
        indexingInfo = new SpaceIndexingInfo(spaceKey, fullUpdate);
    }

    /**
     * Runs one indexing pass over the space: loads changed documents page by page, indexes or deletes
     * each of them through a bulk request and remembers the newest processed update timestamp.
     *
     * @throws InterruptedException if the river is found closed during processing
     * @throws IllegalArgumentException if a loaded document carries no update timestamp
     * @throws Exception anything thrown by the remote client or the ES integration component
     */
    @Override
    protected void processUpdate() throws Exception {
        indexingInfo.documentsUpdated = 0;

        // An incremental run starts from the stored timestamp; with no stored value it becomes a full run.
        Date updatedAfter = indexingInfo.fullUpdate ? null : readLastDocumentUpdatedDate(spaceKey);
        Date initialUpdatedAfter = updatedAfter;
        if (updatedAfter == null) {
            indexingInfo.fullUpdate = true;
        }

        // Deliberately declared outside the loop: it keeps the value from earlier pages.
        Date lastDocumentUpdatedDate = null;
        int offset = 0;

        String updateKind = indexingInfo.fullUpdate ? "full" : "incremental";
        logger.info("Go to perform {} update for Space {}", new Object[]{updateKind, spaceKey});

        boolean moreToFetch = true;
        while (moreToFetch) {
            ensureNotClosed();

            if (logger.isDebugEnabled()) {
                String updatedDescription = updatedAfter == null ? "in whole history" : "after " + updatedAfter;
                logger.debug("Go to ask remote system for updated documents for space {} with startAt {} and updated {}", new Object[]{spaceKey, offset, updatedDescription});
            }

            ChangedDocumentsResults page = remoteSystemClient.getChangedDocuments(spaceKey, offset, indexingInfo.fullUpdate, updatedAfter);
            if (page.getDocumentsCount() == 0) {
                // Nothing returned - the pass is finished.
                break;
            }
            ensureNotClosed();

            Date firstUpdatedInPage = null;
            int indexedInPage = 0;
            boolean deletedInPage = false;
            BulkRequestBuilder bulk = esIntegrationComponent.prepareESBulkRequestBuilder();

            for (Map<String, Object> doc : page.getDocuments()) {
                String docId = getDocumentIdChecked(doc);
                if (getDocumentDetail(docId, doc)) {
                    lastDocumentUpdatedDate = documentIndexStructureBuilder.extractDocumentUpdated(doc);
                    logger.debug("Go to update index for document '{}' with updated {}", new Object[]{docId, lastDocumentUpdatedDate});
                    if (lastDocumentUpdatedDate == null) {
                        throw new IllegalArgumentException("Last update timestamp not found in data for document " + docId);
                    }
                    if (firstUpdatedInPage == null) {
                        firstUpdatedInPage = lastDocumentUpdatedDate;
                    }
                    if (!documentIndexStructureBuilder.extractDocumentDeleted(doc)) {
                        documentIndexStructureBuilder.indexDocument(bulk, spaceKey, doc);
                        indexedInPage++;
                    } else if (prepareDeleteByRemoteDocumentId(bulk, docId)) {
                        deletedInPage = true;
                    }
                }
                ensureNotClosed();
            }

            // The timestamp is added into the same bulk request as the documents of this page.
            if (lastDocumentUpdatedDate != null) {
                storeLastDocumentUpdatedDate(bulk, spaceKey, lastDocumentUpdatedDate);
            }
            if (indexedInPage > 0 || deletedInPage) {
                executeBulkUpdate(bulk);
                indexingInfo.documentsUpdated += indexedInPage;
            }

            // NOTE(review): the paging decisions below presumably rely on the remote system returning
            // documents ordered by update timestamp ascending - confirm against the client contract.
            boolean timestampsDiffer = lastDocumentUpdatedDate != null && firstUpdatedInPage != null
                    && !lastDocumentUpdatedDate.equals(firstUpdatedInPage);
            if (timestampsDiffer) {
                // Page spans several timestamps: continue by filtering on the newest one from offset 0.
                updatedAfter = lastDocumentUpdatedDate;
                if (page.getTotal() != null) {
                    moreToFetch = page.getTotal() > page.getStartAt() + page.getDocumentsCount();
                }
                offset = 0;
            } else if (page.getTotal() != null) {
                // Same timestamp across the page but the total is known: advance the offset instead.
                offset = page.getStartAt() + page.getDocumentsCount();
                moreToFetch = page.getTotal() > offset;
            } else {
                // No total available: the only way forward is to move the timestamp filter.
                long shiftBase = 0L;
                if (lastDocumentUpdatedDate != null) {
                    shiftBase = lastDocumentUpdatedDate.getTime();
                } else if (firstUpdatedInPage != null) {
                    shiftBase = firstUpdatedInPage.getTime();
                }
                if (shiftBase > 0L) {
                    updatedAfter = new Date(shiftBase + 1000L);
                    logger.warn("All documents loaded from remote system for space '{}' contain same update timestamp {}, but we have no total count from response, so we may miss some documents because we shift timestamp for new request by one second to {}!", new Object[]{spaceKey, lastDocumentUpdatedDate, updatedAfter});
                    offset = 0;
                } else {
                    logger.warn("All documents loaded from remote system for space '{}' are unreachable and we have no total count of records, so we have to finish indexing for now.", new Object[]{spaceKey});
                    moreToFetch = false;
                }
            }
        }

        // Documents were indexed but the newest timestamp equals the one this run started from:
        // store it shifted by one second (written without a bulk request).
        boolean finishedOnStartingTimestamp = lastDocumentUpdatedDate != null && initialUpdatedAfter != null
                && initialUpdatedAfter.equals(lastDocumentUpdatedDate);
        if (indexingInfo.documentsUpdated > 0 && finishedOnStartingTimestamp) {
            storeLastDocumentUpdatedDate(null, spaceKey, new Date(lastDocumentUpdatedDate.getTime() + 1000L));
        }
    }

    /**
     * Stops processing when the river has been closed.
     *
     * @throws InterruptedException if {@code isClosed()} reports {@code true}
     */
    private void ensureNotClosed() throws InterruptedException {
        if (isClosed()) {
            throw new InterruptedException("Interrupted because River is closed");
        }
    }

    /**
     * Reads the stored update timestamp of the last indexed document for the given space.
     *
     * @param spaceKey key of the space the value is stored for
     * @return value returned by the ES integration component for the stored property
     * @throws Exception if the ES integration component fails to read the value
     */
    protected Date readLastDocumentUpdatedDate(String spaceKey) throws Exception {
        return esIntegrationComponent.readDatetimeValue(spaceKey, STORE_PROPERTYNAME_LAST_INDEXED_DOC_UPDATE_DATE);
    }

    /**
     * Stores the update timestamp of the last indexed document for the given space.
     *
     * @param esBulk bulk request handed to the ES integration component; {@link #processUpdate()}
     *        also calls this with {@code null}
     * @param spaceKey key of the space the value is stored for
     * @param lastDocumentUpdatedDate timestamp to store
     * @throws Exception if the ES integration component fails to store the value
     */
    protected void storeLastDocumentUpdatedDate(BulkRequestBuilder esBulk, String spaceKey, Date lastDocumentUpdatedDate) throws Exception {
        esIntegrationComponent.storeDatetimeValue(spaceKey, STORE_PROPERTYNAME_LAST_INDEXED_DOC_UPDATE_DATE, lastDocumentUpdatedDate, esBulk);
    }
}

