package com.thinkaurelius.titan.graphdb.fulgora;

import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.AbstractFuture;
import com.thinkaurelius.titan.core.TitanEdge;
import com.thinkaurelius.titan.core.TitanException;
import com.thinkaurelius.titan.core.TitanProperty;
import com.thinkaurelius.titan.core.olap.OLAPJob;
import com.thinkaurelius.titan.core.olap.OLAPResult;
import com.thinkaurelius.titan.core.olap.StateInitializer;
import com.thinkaurelius.titan.diskstorage.BackendTransaction;
import com.thinkaurelius.titan.diskstorage.Entry;
import com.thinkaurelius.titan.diskstorage.EntryList;
import com.thinkaurelius.titan.diskstorage.StaticBuffer;
import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KeyIterator;
import com.thinkaurelius.titan.diskstorage.keycolumnvalue.SliceQuery;
import com.thinkaurelius.titan.diskstorage.util.BufferUtil;
import com.thinkaurelius.titan.diskstorage.util.RecordIterator;
import com.thinkaurelius.titan.diskstorage.util.StaticArrayEntry;
import com.thinkaurelius.titan.diskstorage.util.StaticArrayEntryList;
import com.thinkaurelius.titan.graphdb.database.EdgeSerializer;
import com.thinkaurelius.titan.graphdb.idmanagement.IDManager;
import com.thinkaurelius.titan.graphdb.internal.InternalRelation;
import com.thinkaurelius.titan.graphdb.internal.InternalVertex;
import com.thinkaurelius.titan.graphdb.relations.RelationCache;
import com.thinkaurelius.titan.graphdb.transaction.RelationConstructor;
import com.thinkaurelius.titan.graphdb.transaction.StandardTitanTx;
import com.thinkaurelius.titan.graphdb.transaction.VertexFactory;
import com.thinkaurelius.titan.graphdb.types.system.BaseKey;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.tools.ant.taskdefs.WaitFor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:WEB-INF/lib/titan-core-0.5.4.jar:com/thinkaurelius/titan/graphdb/fulgora/FulgoraExecutor.class */
public class FulgoraExecutor<S> extends AbstractFuture<OLAPResult<S>> implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(FulgoraExecutor.class);
    private static final int QUEUE_SIZE = 1000;
    private static final int TIMEOUT_MS = 60000;
    private final Map<String, FulgoraRelationQuery> queryDefinitions;
    private final int numQueries;
    private final List<BlockingQueue<QueryResult>> dataQueues;
    private final DataPuller[] pullThreads;
    private final int numProcessors;
    private final StandardTitanTx tx;
    private final EdgeSerializer edgeSerializer;
    private final IDManager idManager;
    private final OLAPJob<S> job;
    private final ConcurrentMap<Long, FulgoraExecutor<S>.MessageAccumulator> partitionedVertexMsgs;
    final FulgoraResult<S> vertexStates;
    final String stateKey;
    final StateInitializer<S> initializer;
    private boolean processingException = false;
    private final VertexFactory neighborVertices = new VertexFactory() { // from class: com.thinkaurelius.titan.graphdb.fulgora.FulgoraExecutor.2
        @Override // com.thinkaurelius.titan.graphdb.transaction.VertexFactory
        public InternalVertex getInternalVertex(long j) {
            return new FulgoraNeighborVertex(j, FulgoraExecutor.this);
        }
    };

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/titan-core-0.5.4.jar:com/thinkaurelius/titan/graphdb/fulgora/FulgoraExecutor$DataPuller.class */
    public static class DataPuller extends Thread {
        private final BlockingQueue<QueryResult> queue;
        private final String queryName;
        private final KeyIterator keyIter;
        private final IDManager idManager;
        private volatile boolean finished;

        private DataPuller(IDManager iDManager, String str, BlockingQueue<QueryResult> blockingQueue, KeyIterator keyIterator) {
            this.queryName = str;
            this.queue = blockingQueue;
            this.keyIter = keyIterator;
            this.idManager = iDManager;
            this.finished = false;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            while (this.keyIter.hasNext()) {
                try {
                    try {
                        StaticBuffer staticBuffer = (StaticBuffer) this.keyIter.next();
                        RecordIterator<Entry> entries = this.keyIter.getEntries();
                        long keyID = this.idManager.getKeyID(staticBuffer);
                        if (!IDManager.VertexIDType.Hidden.is(keyID)) {
                            try {
                                this.queue.put(new QueryResult(this.queryName, keyID, StaticArrayEntryList.ofStaticBuffer(entries, StaticArrayEntry.ENTRY_GETTER)));
                            } catch (InterruptedException e) {
                                FulgoraExecutor.log.error("Data-pulling thread interrupted while waiting on queue", e);
                            }
                        }
                    } catch (Throwable th) {
                        FulgoraExecutor.log.error("Could not load data from storage: {}", th);
                        try {
                            this.keyIter.close();
                            return;
                        } catch (IOException e2) {
                            FulgoraExecutor.log.warn("Could not close storage iterator ", e2);
                            return;
                        }
                    }
                } finally {
                    try {
                        this.keyIter.close();
                    } catch (IOException e3) {
                        FulgoraExecutor.log.warn("Could not close storage iterator ", e3);
                    }
                }
            }
            this.finished = true;
        }

        public boolean isFinished() {
            return this.finished;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/titan-core-0.5.4.jar:com/thinkaurelius/titan/graphdb/fulgora/FulgoraExecutor$MessageAccumulator.class */
    public class MessageAccumulator extends HashMap<String, Object> {
        private boolean isDeleted;

        private MessageAccumulator() {
            super(FulgoraExecutor.this.queryDefinitions.size());
            this.isDeleted = false;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public synchronized void add(Map<String, Object> map) {
            for (Map.Entry<String, Object> entry : map.entrySet()) {
                if (entry.getValue() != null) {
                    String key = entry.getKey();
                    Object obj = super.get(key);
                    if (obj == null) {
                        super.put(key, entry.getValue());
                    } else {
                        super.put(key, ((FulgoraRelationQuery) FulgoraExecutor.this.queryDefinitions.get(key)).combiner.combine(obj, entry.getValue()));
                    }
                }
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void markDeleted() {
            this.isDeleted = true;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public boolean isDeleted() {
            return this.isDeleted;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/titan-core-0.5.4.jar:com/thinkaurelius/titan/graphdb/fulgora/FulgoraExecutor$QueryResult.class */
    public static class QueryResult {
        final EntryList entries;
        final long vertexId;
        final String queryName;

        private QueryResult(String str, long j, EntryList entryList) {
            this.entries = entryList;
            this.vertexId = j;
            this.queryName = str;
        }
    }

    /* loaded from: input_file:WEB-INF/lib/titan-core-0.5.4.jar:com/thinkaurelius/titan/graphdb/fulgora/FulgoraExecutor$VertexProcessor.class */
    private class VertexProcessor implements Runnable {
        final long vertexId;
        final List<QueryResult> queryResults;
        static final /* synthetic */ boolean $assertionsDisabled;

        private VertexProcessor(long j, List<QueryResult> list) {
            this.vertexId = j;
            this.queryResults = list;
        }

        /* JADX WARN: Multi-variable type inference failed */
        @Override // java.lang.Runnable
        public void run() {
            try {
                FulgoraVertex fulgoraVertex = new FulgoraVertex(FulgoraExecutor.this.tx, this.vertexId, FulgoraExecutor.this);
                HashMap hashMap = new HashMap(FulgoraExecutor.this.queryDefinitions.size());
                for (QueryResult queryResult : this.queryResults) {
                    String str = queryResult.queryName;
                    FulgoraRelationQuery fulgoraRelationQuery = (FulgoraRelationQuery) FulgoraExecutor.this.queryDefinitions.get(str);
                    Preconditions.checkState(fulgoraRelationQuery != null && queryResult.vertexId == this.vertexId);
                    Object obj = hashMap.get(str);
                    Iterator<Entry> reuseIterator = queryResult.entries.reuseIterator();
                    while (reuseIterator.hasNext()) {
                        Entry next = reuseIterator.next();
                        InternalRelation readRelation = RelationConstructor.readRelation(fulgoraVertex, next, FulgoraExecutor.this.edgeSerializer, FulgoraExecutor.this.tx, FulgoraExecutor.this.neighborVertices);
                        if (fulgoraRelationQuery instanceof FulgoraPropertyQuery) {
                            Preconditions.checkArgument(readRelation instanceof TitanProperty);
                            obj = ((FulgoraPropertyQuery) fulgoraRelationQuery).process((TitanProperty) readRelation, obj);
                        } else {
                            if (!$assertionsDisabled && !(fulgoraRelationQuery instanceof FulgoraEdgeQuery)) {
                                throw new AssertionError();
                            }
                            Preconditions.checkArgument(readRelation instanceof TitanEdge);
                            obj = ((FulgoraEdgeQuery) fulgoraRelationQuery).process((TitanEdge) readRelation, FulgoraExecutor.this.edgeSerializer.parseDirection(next), FulgoraExecutor.this.getVertexState(((TitanEdge) readRelation).getOtherVertex(fulgoraVertex).getLongId()), obj);
                        }
                    }
                    if (obj != null) {
                        hashMap.put(str, obj);
                    }
                }
                if (FulgoraExecutor.this.idManager.isPartitionedVertex(this.vertexId)) {
                    FulgoraExecutor.this.getMessageAccumulator(this.vertexId).add(hashMap);
                } else {
                    FulgoraExecutor.this.processVertex(fulgoraVertex, hashMap);
                }
            } catch (Throwable th) {
                FulgoraExecutor.log.error("Exception processing relations for [" + this.vertexId + "]: ", th);
                FulgoraExecutor.this.processingException = true;
            }
        }

        static {
            $assertionsDisabled = !FulgoraExecutor.class.desiredAssertionStatus();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public FulgoraExecutor(Map<String, FulgoraRelationQuery> map, StandardTitanTx standardTitanTx, IDManager iDManager, int i, String str, OLAPJob oLAPJob, StateInitializer<S> stateInitializer, FulgoraResult<S> fulgoraResult) {
        this.tx = standardTitanTx;
        this.edgeSerializer = standardTitanTx.getEdgeSerializer();
        this.stateKey = str;
        this.job = oLAPJob;
        this.initializer = stateInitializer;
        BackendTransaction txHandle = standardTitanTx.getTxHandle();
        this.idManager = iDManager;
        this.queryDefinitions = map;
        this.numProcessors = i;
        int i2 = 0;
        Iterator<FulgoraRelationQuery> it2 = map.values().iterator();
        while (it2.hasNext()) {
            i2 += it2.next().queries.size();
        }
        this.numQueries = i2 + 1;
        this.dataQueues = new ArrayList(this.numQueries);
        this.pullThreads = new DataPuller[this.numQueries];
        int i3 = 0 + 1;
        this.pullThreads[0] = addDataPuller(BaseKey.VertexExists.getName(), new SliceQuery(BufferUtil.zeroBuffer(4), BufferUtil.oneBuffer(4)).setLimit(1), txHandle);
        for (Map.Entry<String, FulgoraRelationQuery> entry : map.entrySet()) {
            String key = entry.getKey();
            Iterator<SliceQuery> it3 = entry.getValue().queries.iterator();
            while (it3.hasNext()) {
                int i4 = i3;
                i3++;
                this.pullThreads[i4] = addDataPuller(key, it3.next(), txHandle);
            }
        }
        this.partitionedVertexMsgs = new ConcurrentHashMap();
        this.vertexStates = fulgoraResult;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public StandardTitanTx tx() {
        return this.tx;
    }

    private final DataPuller addDataPuller(String str, SliceQuery sliceQuery, BackendTransaction backendTransaction) {
        LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue(1000);
        this.dataQueues.add(linkedBlockingQueue);
        DataPuller dataPuller = new DataPuller(this.idManager, str, linkedBlockingQueue, backendTransaction.edgeStoreKeys(sliceQuery));
        dataPuller.start();
        return dataPuller;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public S getVertexState(long j) {
        if (IDManager.VertexIDType.Hidden.is(j)) {
            return null;
        }
        if (this.idManager.isPartitionedVertex(j)) {
            j = this.idManager.getCanonicalVertexId(j);
        }
        S s = this.vertexStates.get(j);
        if (s == null) {
            s = this.initializer.initialState();
        }
        return s;
    }

    void setVertexState(long j, S s) {
        this.vertexStates.set(j, s);
    }

    @Override // java.lang.Runnable
    public void run() {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(this.numProcessors, this.numProcessors, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(1000));
        threadPoolExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        try {
            try {
                QueryResult[] queryResultArr = new QueryResult[this.numQueries];
                while (true) {
                    for (int i = 0; i < this.numQueries; i++) {
                        if (queryResultArr[i] == null) {
                            BlockingQueue<QueryResult> blockingQueue = this.dataQueues.get(i);
                            QueryResult poll = blockingQueue.poll(10L, TimeUnit.MILLISECONDS);
                            if (poll == null) {
                                if (!this.pullThreads[i].isFinished()) {
                                    poll = blockingQueue.poll(WaitFor.ONE_MINUTE, TimeUnit.MILLISECONDS);
                                    if (poll == null && !this.pullThreads[i].isFinished()) {
                                        throw new TitanException("Timed out waiting for next vertex data - storage error likely");
                                    }
                                }
                            }
                            queryResultArr[i] = poll;
                        }
                    }
                    QueryResult queryResult = queryResultArr[0];
                    queryResultArr[0] = null;
                    if (queryResult == null) {
                        threadPoolExecutor.shutdown();
                        threadPoolExecutor.awaitTermination(WaitFor.ONE_MINUTE, TimeUnit.MILLISECONDS);
                        if (!threadPoolExecutor.isTerminated()) {
                            throw new TitanException("Timed out waiting for vertex processors");
                        }
                        for (int i2 = 0; i2 < this.pullThreads.length; i2++) {
                            this.pullThreads[i2].join(10L);
                            if (this.pullThreads[i2].isAlive()) {
                                throw new TitanException("Could not join data pulling thread");
                            }
                        }
                        if (!this.partitionedVertexMsgs.isEmpty()) {
                            threadPoolExecutor = new ThreadPoolExecutor(this.numProcessors, this.numProcessors, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(1000));
                            threadPoolExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
                            for (Map.Entry<Long, FulgoraExecutor<S>.MessageAccumulator> entry : this.partitionedVertexMsgs.entrySet()) {
                                final FulgoraExecutor<S>.MessageAccumulator value = entry.getValue();
                                if (value.isDeleted()) {
                                    log.warn("Found deleted partitioned vertex with id: {}. Skipping", entry.getKey());
                                } else {
                                    final FulgoraVertex fulgoraVertex = new FulgoraVertex(this.tx, entry.getKey().longValue(), this);
                                    threadPoolExecutor.submit(new Runnable() { // from class: com.thinkaurelius.titan.graphdb.fulgora.FulgoraExecutor.1
                                        @Override // java.lang.Runnable
                                        public void run() {
                                            FulgoraExecutor.this.processVertex(fulgoraVertex, value);
                                        }
                                    });
                                }
                            }
                            threadPoolExecutor.shutdown();
                            threadPoolExecutor.awaitTermination(WaitFor.ONE_MINUTE, TimeUnit.MILLISECONDS);
                            if (!threadPoolExecutor.isTerminated()) {
                                throw new TitanException("Timed out waiting for partitioned-vertex processors");
                            }
                        }
                        this.tx.rollback();
                        set(this.vertexStates);
                        threadPoolExecutor.shutdownNow();
                        return;
                    }
                    RelationCache parseRelation = this.tx.getEdgeSerializer().parseRelation(queryResult.entries.get(0), true, this.tx);
                    long j = queryResult.vertexId;
                    if (parseRelation.typeId == BaseKey.VertexExists.getLongId() || (this.idManager.isPartitionedVertex(j) && !this.idManager.isCanonicalVertexId(j))) {
                        ArrayList arrayList = new ArrayList(this.numQueries - 1);
                        for (int i3 = 1; i3 < queryResultArr.length; i3++) {
                            if (queryResultArr[i3] != null && queryResultArr[i3].vertexId == j) {
                                arrayList.add(queryResultArr[i3]);
                                queryResultArr[i3] = null;
                            }
                        }
                        threadPoolExecutor.submit(new VertexProcessor(j, arrayList));
                    } else {
                        log.warn("Found deleted vertex with id: {}|{}|{}. Skipping", new Object[]{Long.valueOf(queryResult.vertexId), Boolean.valueOf(this.idManager.isPartitionedVertex(j)), parseRelation});
                        if (this.idManager.isPartitionedVertex(j)) {
                            getMessageAccumulator(j).markDeleted();
                        }
                        for (int i4 = 1; i4 < queryResultArr.length; i4++) {
                            if (queryResultArr[i4] != null && queryResultArr[i4].vertexId == queryResult.vertexId) {
                                queryResultArr[i4] = null;
                            }
                        }
                    }
                }
            } catch (Throwable th) {
                log.error("Exception occured during job execution: {}", th);
                setException(th);
                threadPoolExecutor.shutdownNow();
            }
        } catch (Throwable th2) {
            threadPoolExecutor.shutdownNow();
            throw th2;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public FulgoraExecutor<S>.MessageAccumulator getMessageAccumulator(long j) {
        Preconditions.checkArgument(this.idManager.isPartitionedVertex(j));
        long canonicalVertexId = this.idManager.getCanonicalVertexId(j);
        FulgoraExecutor<S>.MessageAccumulator messageAccumulator = this.partitionedVertexMsgs.get(Long.valueOf(canonicalVertexId));
        if (messageAccumulator == null) {
            this.partitionedVertexMsgs.putIfAbsent(Long.valueOf(canonicalVertexId), new MessageAccumulator());
            messageAccumulator = this.partitionedVertexMsgs.get(Long.valueOf(canonicalVertexId));
        }
        return messageAccumulator;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void processVertex(FulgoraVertex<S> fulgoraVertex, Map<String, Object> map) {
        long longId = fulgoraVertex.getLongId();
        try {
            fulgoraVertex.setProcessedProperties(map);
            setVertexState(longId, this.job.process(fulgoraVertex));
        } catch (Throwable th) {
            log.error("Exception processing vertex [" + longId + "]: ", th);
            this.processingException = true;
            setVertexState(longId, null);
        }
    }
}
