package com.google.appengine.tools.mapreduce.impl.shardedjob;

import com.google.appengine.api.backends.BackendServiceFactory;
import com.google.appengine.api.datastore.DatastoreService;
import com.google.appengine.api.datastore.DatastoreServiceFactory;
import com.google.appengine.api.datastore.Entity;
import com.google.appengine.api.datastore.EntityNotFoundException;
import com.google.appengine.api.datastore.Key;
import com.google.appengine.api.datastore.Transaction;
import com.google.appengine.api.taskqueue.QueueFactory;
import com.google.appengine.api.taskqueue.TaskOptions;
import com.google.appengine.tools.mapreduce.impl.shardedjob.IncrementalTask;
import com.google.appengine.tools.mapreduce.impl.shardedjob.IncrementalTaskState;
import com.google.appengine.tools.mapreduce.impl.shardedjob.ShardedJobStateImpl;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import java.io.Serializable;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.logging.Logger;

/* loaded from: input_file:com/google/appengine/tools/mapreduce/impl/shardedjob/ShardedJobRunner.class */
class ShardedJobRunner<T extends IncrementalTask<T, R>, R extends Serializable> {
    static final String JOB_ID_PARAM = "job";
    static final String TASK_ID_PARAM = "task";
    static final String SEQUENCE_NUMBER_PARAM = "seq";
    private static final Logger log = Logger.getLogger(ShardedJobRunner.class.getName());
    private static final DatastoreService DATASTORE = DatastoreServiceFactory.getDatastoreService();

    private ShardedJobStateImpl<T, R> lookupJobState(Transaction transaction, String str) {
        try {
            return ShardedJobStateImpl.ShardedJobSerializer.fromEntity(DATASTORE.get(transaction, ShardedJobStateImpl.ShardedJobSerializer.makeKey(str)));
        } catch (EntityNotFoundException e) {
            return null;
        }
    }

    private IncrementalTaskState<T, R> lookupTaskState(Transaction transaction, String str) {
        try {
            return IncrementalTaskState.Serializer.fromEntity(DATASTORE.get(transaction, IncrementalTaskState.Serializer.makeKey(str)));
        } catch (EntityNotFoundException e) {
            return null;
        }
    }

    private List<IncrementalTaskState<T, R>> lookupTasks(ShardedJobState shardedJobState) {
        ImmutableList.Builder builder = ImmutableList.builder();
        for (int i = 0; i < shardedJobState.getTotalTaskCount(); i++) {
            builder.add(IncrementalTaskState.Serializer.makeKey(getTaskId(shardedJobState.getJobId(), i)));
        }
        ImmutableList<Key> build = builder.build();
        Map map = DATASTORE.get(build);
        ImmutableList.Builder builder2 = ImmutableList.builder();
        for (Key key : build) {
            Entity entity = (Entity) map.get(key);
            Preconditions.checkState(entity != null, "%s: Missing task: %s", new Object[]{this, key});
            builder2.add(IncrementalTaskState.Serializer.fromEntity(entity));
        }
        return builder2.build();
    }

    private int countActiveTasks(List<IncrementalTaskState<T, R>> list) {
        int i = 0;
        Iterator<IncrementalTaskState<T, R>> it = list.iterator();
        while (it.hasNext()) {
            if (it.next().getNextTask() != null) {
                i++;
            }
        }
        return i;
    }

    private R aggregateState(ShardedJobController<T, R> shardedJobController, List<IncrementalTaskState<T, R>> list) {
        ImmutableList.Builder builder = ImmutableList.builder();
        Iterator<IncrementalTaskState<T, R>> it = list.iterator();
        while (it.hasNext()) {
            builder.add(it.next().getPartialResult());
        }
        return shardedJobController.combineResults(builder.build());
    }

    private void scheduleControllerTask(Transaction transaction, ShardedJobStateImpl<T, R> shardedJobStateImpl) {
        ShardedJobSettings settings = shardedJobStateImpl.getSettings();
        TaskOptions countdownMillis = TaskOptions.Builder.withMethod(TaskOptions.Method.POST).url(settings.getControllerPath()).param(JOB_ID_PARAM, shardedJobStateImpl.getJobId()).param(SEQUENCE_NUMBER_PARAM, "" + shardedJobStateImpl.getNextSequenceNumber()).countdownMillis(settings.getMillisBetweenPolls());
        if (settings.getControllerBackend() != null) {
            countdownMillis.header("Host", BackendServiceFactory.getBackendService().getBackendAddress(settings.getControllerBackend()));
        }
        QueueFactory.getQueue(settings.getControllerQueueName()).add(transaction, countdownMillis);
    }

    private void scheduleWorkerTask(Transaction transaction, ShardedJobSettings shardedJobSettings, IncrementalTaskState<T, R> incrementalTaskState) {
        TaskOptions param = TaskOptions.Builder.withMethod(TaskOptions.Method.POST).url(shardedJobSettings.getWorkerPath()).param(TASK_ID_PARAM, incrementalTaskState.getTaskId()).param(JOB_ID_PARAM, incrementalTaskState.getJobId()).param(SEQUENCE_NUMBER_PARAM, "" + incrementalTaskState.getNextSequenceNumber());
        if (shardedJobSettings.getWorkerBackend() != null) {
            param.header("Host", BackendServiceFactory.getBackendService().getBackendAddress(shardedJobSettings.getWorkerBackend()));
        }
        QueueFactory.getQueue(shardedJobSettings.getWorkerQueueName()).add(transaction, param);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void pollTaskStates(String str, int i) {
        ShardedJobStateImpl<T, R> lookupJobState = lookupJobState(null, str);
        if (lookupJobState == null) {
            log.info(str + ": Job gone");
            return;
        }
        log.info("Polling task states for job " + str + ", sequence number " + i);
        Preconditions.checkState(lookupJobState.getStatus() != Status.INITIALIZING, "Should be done initializing: %s", new Object[]{lookupJobState});
        if (!lookupJobState.getStatus().isActive()) {
            log.info(str + ": Job no longer active: " + lookupJobState);
            return;
        }
        if (lookupJobState.getNextSequenceNumber() != i) {
            Preconditions.checkState(lookupJobState.getNextSequenceNumber() > ((long) i), "%s: Job state is from the past: %s", new Object[]{str, lookupJobState});
            log.info(str + ": Poll sequence number " + i + " already completed: " + lookupJobState);
            return;
        }
        long currentTimeMillis = System.currentTimeMillis();
        List<IncrementalTaskState<T, R>> lookupTasks = lookupTasks(lookupJobState);
        int countActiveTasks = countActiveTasks(lookupTasks);
        lookupJobState.setMostRecentUpdateTimeMillis(currentTimeMillis);
        lookupJobState.setActiveTaskCount(countActiveTasks);
        if (countActiveTasks == 0) {
            lookupJobState.setStatus(Status.DONE);
            lookupJobState.getController().completed(aggregateState(lookupJobState.getController(), lookupTasks));
        } else {
            lookupJobState.setNextSequenceNumber(i + 1);
        }
        log.fine(str + ": Writing " + lookupJobState);
        Transaction beginTransaction = DATASTORE.beginTransaction();
        try {
            ShardedJobStateImpl<T, R> lookupJobState2 = lookupJobState(beginTransaction, str);
            if (lookupJobState2 == null) {
                log.info(str + ": Job gone after poll");
                if (beginTransaction.isActive()) {
                    beginTransaction.rollback();
                    return;
                }
                return;
            }
            if (lookupJobState2.getNextSequenceNumber() != i) {
                log.info(str + ": Job processed concurrently; was sequence number " + i + ", now " + lookupJobState2.getNextSequenceNumber());
                if (beginTransaction.isActive()) {
                    beginTransaction.rollback();
                    return;
                }
                return;
            }
            DATASTORE.put(beginTransaction, ShardedJobStateImpl.ShardedJobSerializer.toEntity(lookupJobState));
            if (lookupJobState.getStatus().isActive()) {
                scheduleControllerTask(beginTransaction, lookupJobState);
            }
            beginTransaction.commit();
            if (beginTransaction.isActive()) {
                beginTransaction.rollback();
            }
        } catch (Throwable th) {
            if (beginTransaction.isActive()) {
                beginTransaction.rollback();
            }
            throw th;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void runTask(String str, String str2, int i) {
        ShardedJobStateImpl<T, R> lookupJobState = lookupJobState(null, str2);
        if (lookupJobState == null) {
            log.info(str + ": Job gone");
            return;
        }
        if (!lookupJobState.getStatus().isActive()) {
            log.info(str + ": Job no longer active: " + lookupJobState);
            return;
        }
        IncrementalTaskState<T, R> lookupTaskState = lookupTaskState(null, str);
        if (lookupTaskState == null) {
            log.info(str + ": Task gone");
            return;
        }
        log.info("Running task " + str + " (job " + str2 + "), sequence number " + i);
        if (lookupTaskState.getNextSequenceNumber() != i) {
            Preconditions.checkState(lookupTaskState.getNextSequenceNumber() > i, "%s: Task state is from the past: %s", new Object[]{str, lookupTaskState});
            log.info(str + ": Task sequence number " + i + " already completed: " + lookupTaskState);
            return;
        }
        Preconditions.checkState(lookupTaskState.getNextTask() != null, "%s: Next task is null", new Object[]{lookupTaskState});
        log.fine("About to run task: " + lookupTaskState);
        IncrementalTask.RunResult<T, R> run = lookupTaskState.getNextTask().run();
        lookupTaskState.setPartialResult(lookupJobState.getController().combineResults(ImmutableList.of(lookupTaskState.getPartialResult(), run.getPartialResult())));
        lookupTaskState.setNextTask(run.getFollowupTask());
        lookupTaskState.setMostRecentUpdateMillis(System.currentTimeMillis());
        lookupTaskState.setNextSequenceNumber(i + 1);
        Transaction beginTransaction = DATASTORE.beginTransaction();
        try {
            IncrementalTaskState<T, R> lookupTaskState2 = lookupTaskState(beginTransaction, str);
            if (lookupTaskState2 == null) {
                log.info(str + ": Task disappeared while processing");
                if (beginTransaction.isActive()) {
                    beginTransaction.rollback();
                    return;
                }
                return;
            }
            if (lookupTaskState2.getNextSequenceNumber() != i) {
                log.info(str + ": Task processed concurrently; was sequence number " + i + ", now " + lookupTaskState2.getNextSequenceNumber());
                if (beginTransaction.isActive()) {
                    beginTransaction.rollback();
                    return;
                }
                return;
            }
            DATASTORE.put(beginTransaction, IncrementalTaskState.Serializer.toEntity(lookupTaskState));
            if (run.getFollowupTask() != null) {
                scheduleWorkerTask(beginTransaction, lookupJobState.getSettings(), lookupTaskState);
            }
            beginTransaction.commit();
            if (beginTransaction.isActive()) {
                beginTransaction.rollback();
            }
        } catch (Throwable th) {
            if (beginTransaction.isActive()) {
                beginTransaction.rollback();
            }
            throw th;
        }
    }

    private String getTaskId(String str, int i) {
        return str + "-task" + i;
    }

    private void createTasks(ShardedJobController<T, R> shardedJobController, ShardedJobSettings shardedJobSettings, String str, List<? extends T> list, long j) {
        log.info(str + ": Creating " + list.size() + " tasks");
        for (int i = 0; i < list.size(); i++) {
            String taskId = getTaskId(str, i);
            Transaction beginTransaction = DATASTORE.beginTransaction();
            try {
                IncrementalTaskState<T, R> lookupTaskState = lookupTaskState(beginTransaction, taskId);
                if (lookupTaskState != null) {
                    log.info(str + ": Task already exists: " + lookupTaskState);
                    if (beginTransaction.isActive()) {
                        beginTransaction.rollback();
                    }
                } else {
                    IncrementalTaskState<T, R> incrementalTaskState = new IncrementalTaskState<>(taskId, str, j, list.get(i), shardedJobController.combineResults(ImmutableList.of()));
                    DATASTORE.put(beginTransaction, IncrementalTaskState.Serializer.toEntity(incrementalTaskState));
                    scheduleWorkerTask(beginTransaction, shardedJobSettings, incrementalTaskState);
                    beginTransaction.commit();
                    if (beginTransaction.isActive()) {
                        beginTransaction.rollback();
                    }
                }
            } catch (Throwable th) {
                if (beginTransaction.isActive()) {
                    beginTransaction.rollback();
                }
                throw th;
            }
        }
    }

    private boolean writeInitialJobState(ShardedJobStateImpl<T, R> shardedJobStateImpl) {
        String jobId = shardedJobStateImpl.getJobId();
        log.fine(jobId + ": Writing initial job state");
        Transaction beginTransaction = DATASTORE.beginTransaction();
        try {
            ShardedJobStateImpl<T, R> lookupJobState = lookupJobState(beginTransaction, jobId);
            if (lookupJobState == null) {
                DATASTORE.put(beginTransaction, ShardedJobStateImpl.ShardedJobSerializer.toEntity(shardedJobStateImpl));
                beginTransaction.commit();
            } else {
                if (!lookupJobState.getStatus().isActive()) {
                    log.info(jobId + ": Attempt to reinitialize inactive job: " + lookupJobState);
                    if (beginTransaction.isActive()) {
                        beginTransaction.rollback();
                    }
                    return false;
                }
                log.info(jobId + ": Reinitializing job: " + lookupJobState);
            }
            return true;
        } finally {
            if (beginTransaction.isActive()) {
                beginTransaction.rollback();
            }
        }
    }

    private void scheduleControllerAndMarkActive(ShardedJobStateImpl<T, R> shardedJobStateImpl) {
        String jobId = shardedJobStateImpl.getJobId();
        log.fine(jobId + ": Scheduling controller and marking active");
        Transaction beginTransaction = DATASTORE.beginTransaction();
        try {
            ShardedJobStateImpl<T, R> lookupJobState = lookupJobState(beginTransaction, jobId);
            if (lookupJobState == null) {
                log.warning(jobId + ": Job disappeared while initializing");
                if (beginTransaction.isActive()) {
                    beginTransaction.rollback();
                    return;
                }
                return;
            }
            if (lookupJobState.getStatus() != Status.INITIALIZING) {
                log.info(jobId + ": Job changed status while initializing: " + shardedJobStateImpl);
                if (beginTransaction.isActive()) {
                    beginTransaction.rollback();
                    return;
                }
                return;
            }
            DATASTORE.put(beginTransaction, ShardedJobStateImpl.ShardedJobSerializer.toEntity(shardedJobStateImpl));
            scheduleControllerTask(beginTransaction, shardedJobStateImpl);
            beginTransaction.commit();
            if (beginTransaction.isActive()) {
                beginTransaction.rollback();
            }
            log.info(jobId + ": Started");
        } catch (Throwable th) {
            if (beginTransaction.isActive()) {
                beginTransaction.rollback();
            }
            throw th;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void startJob(String str, List<? extends T> list, ShardedJobController<T, R> shardedJobController, ShardedJobSettings shardedJobSettings) {
        long currentTimeMillis = System.currentTimeMillis();
        ImmutableList copyOf = ImmutableList.copyOf(list);
        ShardedJobStateImpl<T, R> shardedJobStateImpl = new ShardedJobStateImpl<>(str, shardedJobController, shardedJobSettings, copyOf.size(), currentTimeMillis, Status.INITIALIZING, null);
        if (copyOf.isEmpty()) {
            log.info(str + ": No tasks, immediately complete: " + shardedJobController);
            shardedJobStateImpl.setStatus(Status.DONE);
            DATASTORE.put(ShardedJobStateImpl.ShardedJobSerializer.toEntity(shardedJobStateImpl));
            shardedJobController.completed(shardedJobController.combineResults(ImmutableList.of()));
            return;
        }
        if (writeInitialJobState(shardedJobStateImpl)) {
            createTasks(shardedJobController, shardedJobSettings, str, copyOf, currentTimeMillis);
            shardedJobStateImpl.setStatus(Status.RUNNING);
            scheduleControllerAndMarkActive(shardedJobStateImpl);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ShardedJobState<T, R> getJobState(String str) {
        ShardedJobStateImpl<T, R> lookupJobState = lookupJobState(null, str);
        if (lookupJobState == null) {
            return null;
        }
        Preconditions.checkState(lookupJobState.getAggregateResult() == null, "%s: Non-null aggregate result: %s", new Object[]{lookupJobState, lookupJobState.getAggregateResult()});
        lookupJobState.setAggregateResult(aggregateState(lookupJobState.getController(), lookupTasks(lookupJobState)));
        return lookupJobState;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void abortJob(String str) {
        log.info(str + ": Aborting");
        Transaction beginTransaction = DATASTORE.beginTransaction();
        try {
            ShardedJobStateImpl<T, R> lookupJobState = lookupJobState(beginTransaction, str);
            if (lookupJobState == null || !lookupJobState.getStatus().isActive()) {
                log.info(str + ": Job not active, not aborting: " + lookupJobState);
                if (beginTransaction.isActive()) {
                    beginTransaction.rollback();
                    return;
                }
                return;
            }
            lookupJobState.setStatus(Status.ABORTED);
            DATASTORE.put(beginTransaction, ShardedJobStateImpl.ShardedJobSerializer.toEntity(lookupJobState));
            beginTransaction.commit();
            if (beginTransaction.isActive()) {
                beginTransaction.rollback();
            }
        } catch (Throwable th) {
            if (beginTransaction.isActive()) {
                beginTransaction.rollback();
            }
            throw th;
        }
    }
}
