/*
 * Decompiled with CFR 0.152.
 */
package org.jboss.elasticsearch.river.remote;

import java.io.IOException;
import java.net.MalformedURLException;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsException;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.index.query.FilterBuilder;
import org.elasticsearch.index.query.FilterBuilders;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.river.AbstractRiverComponent;
import org.elasticsearch.river.River;
import org.elasticsearch.river.RiverName;
import org.elasticsearch.river.RiverSettings;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.sort.SortOrder;
import org.jboss.elasticsearch.river.remote.DateTimeUtils;
import org.jboss.elasticsearch.river.remote.DocumentWithCommentsIndexStructureBuilder;
import org.jboss.elasticsearch.river.remote.GetJSONClient;
import org.jboss.elasticsearch.river.remote.IDocumentIndexStructureBuilder;
import org.jboss.elasticsearch.river.remote.IESIntegration;
import org.jboss.elasticsearch.river.remote.IPwdLoader;
import org.jboss.elasticsearch.river.remote.IRemoteSystemClient;
import org.jboss.elasticsearch.river.remote.IRiverMgm;
import org.jboss.elasticsearch.river.remote.ISpaceIndexerCoordinator;
import org.jboss.elasticsearch.river.remote.SpaceIndexerCoordinator;
import org.jboss.elasticsearch.river.remote.SpaceIndexingInfo;
import org.jboss.elasticsearch.river.remote.Utils;
import org.jboss.elasticsearch.tools.content.StructuredContentPreprocessorFactory;

/**
 * ElasticSearch river component. It wires together an {@link IRemoteSystemClient}, an
 * {@link IDocumentIndexStructureBuilder} and an {@link ISpaceIndexerCoordinator} which is run on its own
 * thread, and exposes management operations ({@link IRiverMgm}), ES access helpers ({@link IESIntegration})
 * and password loading ({@link IPwdLoader}) to them.
 */
public class RemoteRiver
extends AbstractRiverComponent
implements River,
IESIntegration,
IRiverMgm,
IPwdLoader {
    /** Registry of river instances keyed by river name. {@link #start()} and {@link #close()} lock on this map when modifying it. */
    protected static Map<String, IRiverMgm> riverInstances = new HashMap<String, IRiverMgm>();
    /** Name of the persistent store property holding the date when the river was stopped permanently. */
    protected static final String PERMSTOREPROP_RIVER_STOPPED_PERMANENTLY = "river_stopped_permanently";
    /** 1800000 ms (30 minutes); same value as the refresh interval literal used in {@link #getAllIndexedSpaceKeys()}. */
    protected static final long SPACES_REFRESH_TIME = 1800000L;
    /** Document type used for indexed documents when <code>index/type</code> is not configured. */
    public static final String INDEX_DOCUMENT_TYPE_NAME_DEFAULT = "remote_document";
    /** Document type used for activity log records when <code>activity_log/type</code> is not configured. */
    public static final String INDEX_ACTIVITY_TYPE_NAME_DEFAULT = "remote_river_indexupdate";
    /** ElasticSearch client used for all index, get, delete, search and refresh calls of this river. */
    protected Client client;
    /** Client for the remote system; instantiated in {@link #configure(Map)}. */
    protected IRemoteSystemClient remoteSystemClient;
    /** Index structure builder created in {@link #configure(Map)} and handed to the remote client and the coordinator. */
    protected IDocumentIndexStructureBuilder documentIndexStructureBuilder;
    /** Value of <code>remote/maxIndexingThreads</code>, default 1. */
    protected int maxIndexingThreads;
    /** Value of <code>remote/indexUpdatePeriod</code>, default 5 minutes (unit of the stored number is defined by Utils.parseTimeValue - TODO confirm). */
    protected long indexUpdatePeriod;
    /** Value of <code>remote/indexFullUpdatePeriod</code>, default 12 hours; -1 until configured. */
    protected long indexFullUpdatePeriod = -1L;
    /** Name of the search index documents are written to; defaults to the river name. */
    protected String indexName;
    /** Document type used in {@link #indexName}. */
    protected String typeName;
    /** Name of the activity log index; activity logging is skipped when this is <code>null</code>. */
    protected String activityLogIndexName;
    /** Document type of activity log records. */
    protected String activityLogTypeName;
    /** Thread running {@link #coordinatorInstance}; <code>null</code> when indexing is not running. */
    protected Thread coordinatorThread;
    /** Coordinator created by {@link #start()}; <code>null</code> when indexing is not running. */
    protected ISpaceIndexerCoordinator coordinatorInstance;
    /** <code>true</code> until {@link #start()} launches indexing; set back to <code>true</code> by {@link #stop(boolean)} and {@link #close()}. */
    protected volatile boolean closed = true;
    /** Space keys removed from the list obtained from the remote system; <code>null</code> if not configured. */
    protected List<String> spaceKeysExcluded = null;
    /** Space keys to index, either configured statically or cached from the remote system. */
    protected List<String> allIndexedSpacesKeys = null;
    /** Timestamp (ms) after which {@link #allIndexedSpacesKeys} is reloaded; {@link Long#MAX_VALUE} when the list is configured statically. */
    protected long allIndexedSpacesKeysNextRefresh = 0L;
    /** Last reported indexing info per space key, filled by {@link #reportIndexingFinished(SpaceIndexingInfo)}. */
    protected Map<String, SpaceIndexingInfo> lastSpaceIndexingInfo = new HashMap<String, SpaceIndexingInfo>();
    /** Date of the last successful {@link #start()} of the indexing process. */
    protected Date lastRestartDate;
    /** Date of permanent stop, or <code>null</code> if the river is not stopped permanently. */
    protected Date permanentStopDate;
    /** Name of the field holding the value in persistent store documents. */
    protected static final String STORE_FIELD_VALUE = "value";
    /** 60000 ms; same value as the scroll keepalive literal used by the scroll search methods of this class. */
    private static final long ES_SCROLL_KEEPALIVE = 60000L;

    @Inject
    public RemoteRiver(RiverName riverName, RiverSettings settings, Client client) throws MalformedURLException {
        super(riverName, settings);
        this.client = client;
        this.configure(settings.settings());
    }

    /**
     * Configures the river from the passed settings structure. Can be called only while the river is stopped.
     * <p>
     * Reads the mandatory <code>remote</code> element (indexing threads, update periods, indexed/excluded spaces,
     * remote client class), the optional <code>index</code> element (index name, type, preprocessors) and the
     * optional <code>activity_log</code> element. Values which depend on optional configuration elements are
     * reset first, so a reconfiguration never keeps values of elements removed from the configuration.
     *
     * @param settings river configuration structure
     * @throws IllegalStateException if the river is not stopped
     * @throws SettingsException if the <code>remote</code> element is missing, the remote client class can't be
     *           instantiated or <code>activity_log/index</code> is not defined
     */
    protected void configure(Map<String, Object> settings) {
        if (!this.closed) {
            throw new IllegalStateException("Remote River must be stopped to configure it!");
        }
        Map remoteSettings = (Map) settings.get("remote");
        if (remoteSettings == null) {
            throw new SettingsException("'remote' element of river configuration structure not found");
        }

        // drop values from the previous configuration so that removed optional elements do not survive a reconfigure
        this.allIndexedSpacesKeys = null;
        this.allIndexedSpacesKeysNextRefresh = 0L;
        this.spaceKeysExcluded = null;
        this.activityLogIndexName = null;
        this.activityLogTypeName = null;

        this.maxIndexingThreads = XContentMapValues.nodeIntegerValue(remoteSettings.get("maxIndexingThreads"), 1);
        this.indexUpdatePeriod = Utils.parseTimeValue(remoteSettings, "indexUpdatePeriod", 5L, TimeUnit.MINUTES);
        this.indexFullUpdatePeriod = Utils.parseTimeValue(remoteSettings, "indexFullUpdatePeriod", 12L, TimeUnit.HOURS);
        if (remoteSettings.containsKey("spacesIndexed")) {
            this.allIndexedSpacesKeys = Utils.parseCsvString(XContentMapValues.nodeStringValue(remoteSettings.get("spacesIndexed"), null));
            if (this.allIndexedSpacesKeys != null) {
                // statically configured list of spaces is never refreshed from the remote system
                this.allIndexedSpacesKeysNextRefresh = Long.MAX_VALUE;
            }
        }
        if (remoteSettings.containsKey("spaceKeysExcluded")) {
            this.spaceKeysExcluded = Utils.parseCsvString(XContentMapValues.nodeStringValue(remoteSettings.get("spaceKeysExcluded"), null));
        }
        String remoteClientClass = Utils.trimToNull(XContentMapValues.nodeStringValue(remoteSettings.get("remoteClientClass"), null));
        if (remoteClientClass != null) {
            try {
                this.remoteSystemClient = (IRemoteSystemClient) Class.forName(remoteClientClass).newInstance();
            }
            catch (Exception e) {
                // keep the cause so the real reason (class not found, no default constructor, ...) is not lost
                throw new SettingsException("Unable to instantiate class defined by 'remote/remoteClientClass': " + e.getMessage(), e);
            }
        } else {
            this.remoteSystemClient = new GetJSONClient();
        }
        // second argument is true only if the list of spaces is not configured statically
        this.remoteSystemClient.init(remoteSettings, this.allIndexedSpacesKeysNextRefresh != Long.MAX_VALUE, this);

        Map indexSettings = null;
        if (settings.containsKey("index")) {
            indexSettings = (Map) settings.get("index");
            this.indexName = XContentMapValues.nodeStringValue(indexSettings.get("index"), this.riverName.name());
            this.typeName = XContentMapValues.nodeStringValue(indexSettings.get("type"), INDEX_DOCUMENT_TYPE_NAME_DEFAULT);
        } else {
            this.indexName = this.riverName.name();
            this.typeName = INDEX_DOCUMENT_TYPE_NAME_DEFAULT;
        }

        if (settings.containsKey("activity_log")) {
            Map activityLogSettings = (Map) settings.get("activity_log");
            this.activityLogIndexName = Utils.trimToNull(XContentMapValues.nodeStringValue(activityLogSettings.get("index"), null));
            if (this.activityLogIndexName == null) {
                throw new SettingsException("'activity_log/index' element of river configuration structure must be defined with some string");
            }
            this.activityLogTypeName = Utils.trimToNull(XContentMapValues.nodeStringValue(activityLogSettings.get("type"), INDEX_ACTIVITY_TYPE_NAME_DEFAULT));
        }

        this.documentIndexStructureBuilder = new DocumentWithCommentsIndexStructureBuilder(this.riverName.getName(), this.indexName, this.typeName, indexSettings);
        this.preparePreprocessors(indexSettings, this.documentIndexStructureBuilder);
        this.remoteSystemClient.setIndexStructureBuilder(this.documentIndexStructureBuilder);
        this.logger.info("Configured Remote River '{}'. Search index name '{}', document type for issues '{}'.", new Object[]{this.riverName.getName(), this.indexName, this.typeName});
        if (this.activityLogIndexName != null) {
            this.logger.info("Activity log for Remote River '{}' is enabled. Search index name '{}', document type for index updates '{}'.", new Object[]{this.riverName.getName(), this.activityLogIndexName, this.activityLogTypeName});
        }
    }

    /**
     * Creates data preprocessors defined in the <code>preprocessors</code> list of index settings and adds them
     * to the index structure builder in the configured order.
     *
     * @param indexSettings <code>index</code> element of river configuration, may be <code>null</code>
     * @param indexStructureBuilder builder the created preprocessors are added to
     * @throws SettingsException if the preprocessor factory rejects some preprocessor configuration
     */
    @SuppressWarnings("unchecked")
    private void preparePreprocessors(Map<String, Object> indexSettings, IDocumentIndexStructureBuilder indexStructureBuilder) {
        if (indexSettings == null) {
            return;
        }
        // configuration is parsed JSON, so the list is expected to contain one map per preprocessor
        List<Map<String, Object>> preprocessorConfigs = (List<Map<String, Object>>) indexSettings.get("preprocessors");
        if (preprocessorConfigs == null || preprocessorConfigs.isEmpty()) {
            return;
        }
        for (Map<String, Object> preprocessorConfig : preprocessorConfigs) {
            try {
                indexStructureBuilder.addDataPreprocessor(StructuredContentPreprocessorFactory.createPreprocessor(preprocessorConfig, this.client));
            }
            catch (IllegalArgumentException e) {
                throw new SettingsException(e.getMessage(), e);
            }
        }
    }

    /**
     * Constructor which only initializes the superclass. Neither the ES client is set nor
     * {@link #configure(Map)} is called here, so the caller has to prepare the instance itself.
     *
     * @param riverName name of the river
     * @param settings river settings
     */
    protected RemoteRiver(RiverName riverName, RiverSettings settings) {
        super(riverName, settings);
    }

    /**
     * Starts the river. The instance is registered between running instances first. If the permanent stop flag
     * is found in the persistent store, the indexing process is not started and the river stays closed.
     * Otherwise a new coordinator is created and started on its own thread.
     *
     * @throws IllegalStateException if the river is running already
     */
    public synchronized void start() {
        if (!this.closed) {
            throw new IllegalStateException("Can't start already running river");
        }
        this.logger.info("starting Remote River", new Object[0]);
        synchronized (riverInstances) {
            RemoteRiver.addRunningInstance(this);
        }
        this.refreshSearchIndex(this.getRiverIndexName());
        try {
            this.permanentStopDate = this.readDatetimeValue(null, PERMSTOREPROP_RIVER_STOPPED_PERMANENTLY);
            if (this.permanentStopDate != null) {
                this.logger.info("Remote River indexing process not started because stopped permanently, you can restart it over management REST API", new Object[0]);
                return;
            }
        }
        catch (IOException e) {
            // best effort only: an unreadable flag must not prevent the river from starting, but leave a trace of it
            this.logger.warn("Permanent stopped value reading failed, indexing process is started anyway: {}", new Object[]{e.getMessage()});
        }
        this.logger.info("starting Remote River indexing process", new Object[0]);
        this.closed = false;
        this.lastRestartDate = new Date();
        this.coordinatorInstance = new SpaceIndexerCoordinator(this.remoteSystemClient, this, this.documentIndexStructureBuilder, this.indexUpdatePeriod, this.maxIndexingThreads, this.indexFullUpdatePeriod);
        this.coordinatorThread = this.acquireIndexingThread("remote_river_coordinator", this.coordinatorInstance);
        this.coordinatorThread.start();
    }

    /**
     * Closes the river on this node: marks it closed, interrupts the coordinator thread if there is one,
     * forgets the coordinator and removes this river from the registry of instances.
     */
    public synchronized void close() {
        logger.info("closing Remote River on this node");
        closed = true;

        Thread runningCoordinator = coordinatorThread;
        if (runningCoordinator != null) {
            runningCoordinator.interrupt();
        }
        coordinatorThread = null;
        coordinatorInstance = null;

        synchronized (riverInstances) {
            String myName = riverName().getName();
            riverInstances.remove(myName);
        }
    }

    /**
     * Stops the indexing process: marks the river closed, interrupts the coordinator thread if there is one
     * and forgets the coordinator. The river stays registered between instances.
     *
     * @param permanent if <code>true</code> then the current date is written into the persistent store as the
     *          permanent stop flag; a failure of this write is only logged
     */
    @Override
    public synchronized void stop(boolean permanent) {
        logger.info("stopping Remote River indexing process");
        closed = true;

        Thread runningCoordinator = coordinatorThread;
        if (runningCoordinator != null) {
            runningCoordinator.interrupt();
        }
        coordinatorThread = null;
        coordinatorInstance = null;

        if (!permanent) {
            return;
        }
        try {
            permanentStopDate = new Date();
            storeDatetimeValue(null, PERMSTOREPROP_RIVER_STOPPED_PERMANENTLY, permanentStopDate, null);
            refreshSearchIndex(getRiverIndexName());
            logger.info("Remote River indexing process stopped permanently, you can restart it over management REST API");
        }
        catch (IOException storeFailure) {
            logger.warn("Permanent stopped value storing failed {}", storeFailure.getMessage());
        }
    }

    /**
     * Loads the <code>_meta</code> configuration document of this river from the river index and applies it
     * over {@link #configure(Map)}. The river must be stopped.
     *
     * @throws IllegalStateException if the river is not stopped or the configuration document doesn't exist
     */
    public synchronized void reconfigure() {
        if (!closed) {
            throw new IllegalStateException("Remote River must be stopped to reconfigure it!");
        }
        logger.info("reconfiguring Remote River");

        String riverIndex = getRiverIndexName();
        refreshSearchIndex(riverIndex);
        GetResponse metaDocument = client.prepareGet(riverIndex, riverName().name(), "_meta").execute().actionGet();
        if (!metaDocument.isExists()) {
            throw new IllegalStateException("Configuration document not found to reconfigure remote river " + riverName().name());
        }
        if (logger.isDebugEnabled()) {
            logger.debug("Configuration document: {}", metaDocument.getSourceAsString());
        }
        Map<String, Object> newConfiguration = metaDocument.getSource();
        configure(newConfiguration);
    }

    /**
     * Restarts the river: stops it if it runs (and waits two seconds), reloads the configuration and starts
     * it again. If the river was stopped already when called, the permanent stop flag is deleted from the
     * persistent store before the start, so a permanently stopped river is started again.
     * <p>
     * If the calling thread is interrupted during the wait, the restart is abandoned with the river left
     * stopped, and the interrupt status of the thread is restored for the caller.
     */
    @Override
    public synchronized void restart() {
        this.logger.info("restarting Remote River", new Object[0]);
        boolean cleanPermanent = true;
        if (!this.closed) {
            cleanPermanent = false;
            this.stop(false);
            try {
                // give the interrupted coordinator some time to finish
                Thread.sleep(2000L);
            }
            catch (InterruptedException e) {
                // do not swallow the interruption, code up the stack may need to see it
                Thread.currentThread().interrupt();
                this.logger.warn("Remote River restart interrupted, river stays stopped", new Object[0]);
                return;
            }
        } else {
            this.logger.debug("stopped already", new Object[0]);
        }
        this.reconfigure();
        if (cleanPermanent) {
            this.deleteDatetimeValue(null, PERMSTOREPROP_RIVER_STOPPED_PERMANENTLY);
        }
        this.start();
        this.logger.info("Remote River restarted", new Object[0]);
    }

    /**
     * @return current value of the closed flag, <code>true</code> if the indexing process is not running
     */
    @Override
    public boolean isClosed() {
        return closed;
    }

    /**
     * Asks the coordinator to force full reindex of one space or of all indexed spaces.
     *
     * @param spaceKey key of the space to reindex; empty value means all indexed spaces
     * @return <code>null</code> if there is no coordinator or the named space is not between indexed spaces;
     *         the space key if one space was requested; CSV string with all space keys (or empty string if
     *         the list of spaces is not available) if all spaces were requested
     * @throws Exception if the list of indexed spaces can't be obtained or the coordinator fails
     */
    @Override
    public String forceFullReindex(String spaceKey) throws Exception {
        if (coordinatorInstance == null) {
            return null;
        }
        List<String> indexedSpaces = getAllIndexedSpaceKeys();
        boolean allSpacesRequested = Utils.isEmpty(spaceKey);

        if (indexedSpaces == null) {
            return allSpacesRequested ? "" : null;
        }
        if (allSpacesRequested) {
            for (String indexedSpace : indexedSpaces) {
                coordinatorInstance.forceFullReindex(indexedSpace);
            }
            return Utils.createCsvString(indexedSpaces);
        }
        if (!indexedSpaces.contains(spaceKey)) {
            return null;
        }
        coordinatorInstance.forceFullReindex(spaceKey);
        return spaceKey;
    }

    /**
     * Builds pretty printed JSON document describing the current state of the river: river name, info date,
     * indexing state, optionally the ES node, currently running indexing tasks and the list of indexed spaces
     * with info about their last indexing.
     *
     * @param esNode node written into the <code>node</code> object; skipped if <code>null</code>
     * @param currentDate date written into the <code>info_date</code> field
     * @return JSON string
     * @throws Exception if the document can't be built or the list of indexed spaces can't be obtained
     */
    @Override
    public String getRiverOperationInfo(DiscoveryNode esNode, Date currentDate) throws Exception {
        XContentBuilder json = XContentFactory.jsonBuilder().prettyPrint();
        json.startObject();
        json.field("river_name", riverName().getName());
        json.field("info_date", currentDate);

        // indexing process state
        json.startObject("indexing");
        json.field("state", closed ? "stopped" : "running");
        if (!closed) {
            json.field("last_restart", lastRestartDate);
        } else if (permanentStopDate != null) {
            json.field("stopped_permanently", permanentStopDate);
        }
        json.endObject();

        // node the info comes from
        if (esNode != null) {
            json.startObject("node");
            json.field("id", esNode.getId());
            json.field("name", esNode.getName());
            json.endObject();
        }

        // indexing tasks running just now
        if (coordinatorInstance != null) {
            List<SpaceIndexingInfo> runningNow = coordinatorInstance.getCurrentSpaceIndexingInfo();
            if (runningNow != null) {
                json.startArray("current_indexing");
                for (SpaceIndexingInfo running : runningNow) {
                    running.buildDocument(json, true, false);
                }
                json.endArray();
            }
        }

        // all indexed spaces with their last indexing
        List<String> indexedSpaces = getAllIndexedSpaceKeys();
        if (indexedSpaces != null) {
            json.startArray("indexed_spaces");
            for (String indexedSpace : indexedSpaces) {
                json.startObject();
                json.field("space_key", indexedSpace);
                SpaceIndexingInfo last = getLastSpaceIndexingInfo(indexedSpace);
                if (last != null) {
                    json.field("last_indexing");
                    last.buildDocument(json, false, true);
                }
                json.endObject();
            }
            json.endArray();
        }

        json.endObject();
        return json.string();
    }

    /**
     * Gets info about the last indexing of the space. The in-memory map is checked first; if nothing is there
     * and the activity log is enabled, the newest record (by <code>start_date</code>) for the space is read
     * from the activity log index. Errors of this read are logged and <code>null</code> is returned.
     *
     * @param spaceKey key of the space
     * @return last indexing info or <code>null</code> if not available
     */
    protected SpaceIndexingInfo getLastSpaceIndexingInfo(String spaceKey) {
        SpaceIndexingInfo remembered = lastSpaceIndexingInfo.get(spaceKey);
        if (remembered != null || activityLogIndexName == null) {
            return remembered;
        }

        SpaceIndexingInfo fromActivityLog = null;
        try {
            refreshSearchIndex(activityLogIndexName);
            FilterBuilder spaceFilter = FilterBuilders.termFilter("space_key", spaceKey);
            QueryBuilder everything = QueryBuilders.matchAllQuery();
            SearchResponse found = client.prepareSearch(activityLogIndexName)
                    .setTypes(activityLogTypeName)
                    .setFilter(spaceFilter)
                    .setQuery(everything)
                    .addSort("start_date", SortOrder.DESC)
                    .addField("_source")
                    .setSize(1)
                    .execute()
                    .actionGet();
            if (found.getHits().getTotalHits() > 0L) {
                SearchHit newest = found.getHits().getAt(0);
                fromActivityLog = SpaceIndexingInfo.readFromDocument(newest.sourceAsMap());
            } else {
                logger.debug("No last indexing info found in activity log for space {}", spaceKey);
            }
        }
        catch (Exception readFailure) {
            logger.warn("Error during LastSpaceIndexingInfo reading from activity log ES index: {} {}", readFailure.getClass().getName(), readFailure.getMessage());
        }
        return fromActivityLog;
    }

    /**
     * Gets the registered river instance of the given name. The registry is read under its own lock, the same
     * one {@link #start()} and {@link #close()} hold when they modify it.
     *
     * @param riverName name of the river, may be <code>null</code>
     * @return river instance or <code>null</code> if the name is <code>null</code> or no such river is registered
     */
    public static IRiverMgm getRunningInstance(String riverName) {
        if (riverName == null) {
            return null;
        }
        synchronized (riverInstances) {
            return riverInstances.get(riverName);
        }
    }

    /**
     * Registers the river instance under its river name, replacing an instance registered under the same name
     * before. The registry is modified under its own lock (reentrant, so callers holding it already are fine).
     *
     * @param remoteRiver river instance to register
     */
    public static void addRunningInstance(IRiverMgm remoteRiver) {
        synchronized (riverInstances) {
            riverInstances.put(remoteRiver.riverName().getName(), remoteRiver);
        }
    }

    /**
     * @return unmodifiable view of names of all registered river instances; it is a view backed by the
     *         registry, not a snapshot
     */
    public static Set<String> getRunningInstances() {
        Set<String> registeredNames = riverInstances.keySet();
        return Collections.unmodifiableSet(registeredNames);
    }

    /**
     * Removes rivers of the given names from the registry of instances. Unknown names are ignored. The
     * registry is modified under its own lock, the same one {@link #start()} and {@link #close()} use.
     *
     * @param riverNames names of rivers to remove; <code>null</code> array is treated as empty
     */
    public static void removeRunningInstances(String ... riverNames) {
        if (riverNames == null) {
            return;
        }
        synchronized (riverInstances) {
            for (String riverName : riverNames) {
                riverInstances.remove(riverName);
            }
        }
    }

    /**
     * Gets keys of all spaces to be indexed. A statically configured list is returned as is. Otherwise the
     * list is loaded from the remote system, excluded space keys are removed from it, and it is cached for
     * 30 minutes (1800000 ms, the value of SPACES_REFRESH_TIME).
     * <p>
     * The list is published into the cache only after the exclusions are applied, and a <code>null</code> list
     * from the remote system is passed through as <code>null</code> (callers in this class check for it).
     *
     * @return list of space keys, may be <code>null</code> if the remote system client returned none
     * @throws Exception if the remote system client fails
     */
    @Override
    public List<String> getAllIndexedSpaceKeys() throws Exception {
        if (this.allIndexedSpacesKeys == null || this.allIndexedSpacesKeysNextRefresh < System.currentTimeMillis()) {
            List<String> loaded = this.remoteSystemClient.getAllSpaces();
            if (loaded != null && this.spaceKeysExcluded != null) {
                loaded.removeAll(this.spaceKeysExcluded);
            }
            this.allIndexedSpacesKeys = loaded;
            this.allIndexedSpacesKeysNextRefresh = System.currentTimeMillis() + 1800000L;
        }
        return this.allIndexedSpacesKeys;
    }

    /**
     * Records the finished indexing of a space: remembers the info in memory, reports the result to the
     * coordinator if there is one (failure is only logged) and writes the activity log record.
     *
     * @param indexingInfo info about the finished indexing
     */
    @Override
    public void reportIndexingFinished(SpaceIndexingInfo indexingInfo) {
        lastSpaceIndexingInfo.put(indexingInfo.spaceKey, indexingInfo);

        if (coordinatorInstance != null) {
            try {
                coordinatorInstance.reportIndexingFinished(indexingInfo.spaceKey, indexingInfo.finishedOK, indexingInfo.fullUpdate);
            }
            catch (Exception reportFailure) {
                logger.warn("Indexing finished reporting to coordinator failed due {}", reportFailure.getMessage());
            }
        }

        writeActivityLogRecord(indexingInfo);
    }

    /**
     * Writes the indexing info as a new document into the activity log index. Does nothing if the activity
     * log is not configured; a failure of the write is only logged.
     *
     * @param indexingInfo info to write
     */
    protected void writeActivityLogRecord(SpaceIndexingInfo indexingInfo) {
        if (activityLogIndexName == null) {
            return;
        }
        try {
            client.prepareIndex(activityLogIndexName, activityLogTypeName)
                    .setSource(indexingInfo.buildDocument(XContentFactory.jsonBuilder(), true, true))
                    .execute()
                    .actionGet();
        }
        catch (Exception writeFailure) {
            logger.error("Error during index update result writing to the audit log {}", writeFailure.getMessage());
        }
    }

    /**
     * Stores the datetime value into the persistent store of the river (document in the river index).
     *
     * @param spaceKey key of the space the value is for, <code>null</code> for a river level value
     * @param propertyName name of the stored property
     * @param datetime value to store
     * @param esBulk if not <code>null</code> then the index request is only added to this bulk, otherwise the
     *          document is indexed directly
     * @throws IOException if the stored document can't be built
     */
    @Override
    public void storeDatetimeValue(String spaceKey, String propertyName, Date datetime, BulkRequestBuilder esBulk) throws IOException {
        String documentName = RemoteRiver.prepareValueStoreDocumentName(spaceKey, propertyName);
        boolean useBulk = esBulk != null;
        if (logger.isDebugEnabled()) {
            logger.debug("Going to write {} property with datetime value {} for space {} using {} update. Document name is {}.", propertyName, datetime, spaceKey, useBulk ? "bulk" : "direct", documentName);
        }

        XContentBuilder valueDocument = storeDatetimeValueBuildDocument(spaceKey, propertyName, datetime);
        if (useBulk) {
            esBulk.add(Requests.indexRequest(getRiverIndexName()).type(riverName.name()).id(documentName).source(valueDocument));
            return;
        }
        client.prepareIndex(getRiverIndexName(), riverName.name(), documentName).setSource(valueDocument).execute().actionGet();
    }

    /**
     * Builds the persistent store document for a datetime value. It contains <code>spaceKey</code> (only if
     * not <code>null</code>), <code>propertyName</code> and the value formatted by
     * <code>DateTimeUtils.formatISODateTime</code>.
     *
     * @param spaceKey key of the space, may be <code>null</code>
     * @param propertyName name of the property
     * @param datetime value to store
     * @return builder with the complete document
     * @throws IOException if the document can't be built
     */
    protected XContentBuilder storeDatetimeValueBuildDocument(String spaceKey, String propertyName, Date datetime) throws IOException {
        XContentBuilder document = XContentFactory.jsonBuilder();
        document.startObject();
        if (spaceKey != null) {
            document.field("spaceKey", spaceKey);
        }
        document.field("propertyName", propertyName);
        document.field(STORE_FIELD_VALUE, DateTimeUtils.formatISODateTime(datetime));
        document.endObject();
        return document;
    }

    /**
     * Reads the datetime value from the persistent store of the river (document in the river index).
     *
     * @param spaceKey key of the space the value is for, <code>null</code> for a river level value
     * @param propertyName name of the property to read
     * @return parsed value, or <code>null</code> if the document doesn't exist or contains no value
     * @throws IOException declared for callers; not thrown directly by the code visible here
     */
    @Override
    public Date readDatetimeValue(String spaceKey, String propertyName) throws IOException {
        Date lastDate = null;
        String documentName = RemoteRiver.prepareValueStoreDocumentName(spaceKey, propertyName);
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("Going to read datetime value from {} property for space {}. Document name is {}.", new Object[]{propertyName, spaceKey, documentName});
        }
        this.refreshSearchIndex(this.getRiverIndexName());
        GetResponse lastSeqGetResponse = (GetResponse)this.client.prepareGet(this.getRiverIndexName(), this.riverName.name(), documentName).execute().actionGet();
        if (lastSeqGetResponse.isExists()) {
            Object timestamp = lastSeqGetResponse.getSourceAsMap().get(STORE_FIELD_VALUE);
            if (timestamp != null) {
                lastDate = DateTimeUtils.parseISODateTime(timestamp.toString());
            }
        } else if (this.logger.isDebugEnabled()) {
            // message text kept identical to the one used by deleteDatetimeValue so both can be found by one search
            this.logger.debug("{} document doesn't exist in remote river persistent store", new Object[]{documentName});
        }
        return lastDate;
    }

    /**
     * Deletes the datetime value from the persistent store of the river (document in the river index).
     *
     * @param spaceKey key of the space the value is for, <code>null</code> for a river level value
     * @param propertyName name of the property to delete
     * @return <code>true</code> if the document was there, <code>false</code> if it was not found
     */
    @Override
    public boolean deleteDatetimeValue(String spaceKey, String propertyName) {
        String documentName = RemoteRiver.prepareValueStoreDocumentName(spaceKey, propertyName);
        if (logger.isDebugEnabled()) {
            logger.debug("Going to delete datetime value from {} property for space {}. Document name is {}.", propertyName, spaceKey, documentName);
        }
        refreshSearchIndex(getRiverIndexName());

        DeleteResponse deletion = client.prepareDelete(getRiverIndexName(), riverName.name(), documentName).execute().actionGet();
        boolean existed = !deletion.isNotFound();
        if (!existed && logger.isDebugEnabled()) {
            logger.debug("{} document doesn't exist in remote river persistent store", documentName);
        }
        return existed;
    }

    /**
     * Gets the name of the ES index where river documents (configuration, password, persistent store values)
     * are read from and written to by this class.
     *
     * @return always <code>_river</code>
     */
    protected String getRiverIndexName() {
        return "_river";
    }

    /**
     * Prepares the name (id) of the persistent store document for a property.
     *
     * @param spaceKey key of the space, <code>null</code> for a river level value
     * @param propertyName name of the property
     * @return <code>_propertyName_spaceKey</code>, or <code>_propertyName</code> if the space key is <code>null</code>
     */
    protected static String prepareValueStoreDocumentName(String spaceKey, String propertyName) {
        String spaceSuffix = spaceKey == null ? "" : "_" + spaceKey;
        return "_" + propertyName + spaceSuffix;
    }

    /**
     * @return new bulk request builder obtained from the ES client of this river
     */
    @Override
    public BulkRequestBuilder prepareESBulkRequestBuilder() {
        BulkRequestBuilder bulk = client.prepareBulk();
        return bulk;
    }

    /**
     * Executes the bulk request and waits for the result.
     *
     * @param esBulk bulk request to execute
     * @throws ElasticSearchException with the failure message of the response if some bulk item failed
     */
    @Override
    public void executeESBulkRequest(BulkRequestBuilder esBulk) throws Exception {
        BulkResponse bulkResult = esBulk.execute().actionGet();
        if (!bulkResult.hasFailures()) {
            return;
        }
        throw new ElasticSearchException("Failed to execute ES index bulk update: " + bulkResult.buildFailureMessage());
    }

    /**
     * Creates a new, not yet started, thread for the runnable using the ES daemon thread factory built from
     * global settings and the passed name.
     *
     * @param threadName name handed to the thread factory
     * @param runnable code to run in the thread
     * @return created thread
     */
    @Override
    public Thread acquireIndexingThread(String threadName, Runnable runnable) {
        Settings globalSettings = this.settings.globalSettings();
        return EsExecutors.daemonThreadFactory(globalSettings, threadName).newThread(runnable);
    }

    /**
     * Executes the refresh request for the index and waits for its result.
     *
     * @param indexName name of the index to refresh
     */
    @Override
    public void refreshSearchIndex(String indexName) {
        client.admin()
                .indices()
                .prepareRefresh(indexName)
                .execute()
                .actionGet();
    }

    /**
     * Prepares a search request builder for the index with search type SCAN, scroll keepalive of 60000 ms and
     * size of 100.
     *
     * @param indexName name of the index to search
     * @return prepared builder, not executed yet
     */
    @Override
    public SearchRequestBuilder prepareESScrollSearchRequestBuilder(String indexName) {
        TimeValue scrollKeepAlive = new TimeValue(60000L);
        return client.prepareSearch(indexName)
                .setScroll(scrollKeepAlive)
                .setSearchType(SearchType.SCAN)
                .setSize(100);
    }

    /**
     * Executes the search request and waits for its result.
     *
     * @param searchRequestBuilder request to execute
     * @return search response
     */
    @Override
    public SearchResponse executeESSearchRequest(SearchRequestBuilder searchRequestBuilder) {
        SearchResponse searchResult = searchRequestBuilder.execute().actionGet();
        return searchResult;
    }

    /**
     * Requests the next part of scroll search results, using the scroll id of the previous response and scroll
     * keepalive of 60000 ms.
     *
     * @param scrollResp previous response of the scroll search
     * @return next search response
     */
    @Override
    public SearchResponse executeESScrollSearchNextRequest(SearchResponse scrollResp) {
        String scrollId = scrollResp.getScrollId();
        TimeValue scrollKeepAlive = new TimeValue(60000L);
        return client.prepareSearchScroll(scrollId)
                .setScroll(scrollKeepAlive)
                .execute()
                .actionGet();
    }

    /**
     * Loads the password from the <code>_pwd</code> document of this river stored in the river index.
     * Content of the password document is never written into the log.
     *
     * @param username username the password is loaded for; used only in the log message here
     * @return value of the <code>pwd</code> field, or <code>null</code> if the document or the field doesn't exist
     */
    @Override
    public String loadPassword(String username) {
        this.logger.info("loading password for username {}", new Object[]{username});
        String ret = null;
        String riverIndexName = this.getRiverIndexName();
        this.refreshSearchIndex(riverIndexName);
        GetResponse resp = (GetResponse)this.client.prepareGet(riverIndexName, this.riverName().name(), "_pwd").execute().actionGet();
        if (resp.isExists()) {
            // only the fact that the document exists is logged, its source contains the password
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Password document found for river {}", new Object[]{this.riverName().name()});
            }
            Map<String, Object> pwdDocument = resp.getSource();
            if (pwdDocument != null) {
                ret = XContentMapValues.nodeStringValue(pwdDocument.get("pwd"), null);
            }
        }
        return ret;
    }
}

