/*
 * Decompiled with CFR 0.152.
 */
package org.jberet.vertx.cluster;

import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.eventbus.Message;
import java.io.Serializable;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import org.jberet.job.model.Step;
import org.jberet.runtime.JobExecutionImpl;
import org.jberet.runtime.JobStopNotificationListener;
import org.jberet.runtime.PartitionExecutionImpl;
import org.jberet.runtime.context.StepContextImpl;
import org.jberet.spi.PartitionHandler;
import org.jberet.spi.PartitionInfo;
import org.jberet.util.BatchUtil;
import org.jberet.vertx.cluster._private.VertxClusterLogger;
import org.jberet.vertx.cluster._private.VertxClusterMessages;

public class VertxPartitionHandler
implements PartitionHandler,
JobStopNotificationListener {
    private BlockingQueue<Boolean> completedPartitionThreads;
    private BlockingQueue<Serializable> collectorDataQueue;
    private final Vertx vertx;
    private final EventBus eventBus;

    public VertxPartitionHandler(final StepContextImpl stepContext, Vertx vertx) {
        this.vertx = vertx;
        this.eventBus = vertx.eventBus();
        Handler<Message<Buffer>> receivingResultHandler = new Handler<Message<Buffer>>(){

            public void handle(Message<Buffer> message) {
                Serializable partitionCollectorData;
                Buffer body = (Buffer)message.body();
                try {
                    partitionCollectorData = BatchUtil.bytesToSerializableObject((byte[])body.getBytes(), (ClassLoader)stepContext.getJobContext().getClassLoader());
                }
                catch (Exception e) {
                    throw VertxClusterMessages.MESSAGES.failedToReceivePartitionCollectorData(e);
                }
                if (partitionCollectorData instanceof PartitionExecutionImpl) {
                    if (VertxPartitionHandler.this.completedPartitionThreads != null) {
                        VertxPartitionHandler.this.completedPartitionThreads.offer(Boolean.TRUE);
                    }
                    PartitionExecutionImpl partitionExecution = (PartitionExecutionImpl)partitionCollectorData;
                    int partitionId = partitionExecution.getPartitionId();
                    VertxClusterLogger.LOGGER.receivedPartitionResult(stepContext.getJobContext().getExecutionId(), stepContext.getStepExecutionId(), partitionId, partitionExecution.getBatchStatus());
                    List partitionExecutions = stepContext.getStepExecution().getPartitionExecutions();
                    for (int i = 0; i < partitionExecutions.size(); ++i) {
                        if (((PartitionExecutionImpl)partitionExecutions.get(i)).getPartitionId() != partitionId) continue;
                        partitionExecutions.remove(i);
                        partitionExecutions.add(partitionExecution);
                    }
                }
                try {
                    VertxPartitionHandler.this.collectorDataQueue.put(partitionCollectorData);
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        };
        long stepExecutionId = stepContext.getStepExecutionId();
        this.eventBus.consumer(PartitionInfo.getCollectorQueueName((long)stepExecutionId), (Handler)receivingResultHandler);
    }

    public void setResourceTracker(BlockingQueue<Boolean> completedPartitionThreads) {
        this.completedPartitionThreads = completedPartitionThreads;
    }

    public void setCollectorDataQueue(BlockingQueue<Serializable> collectorDataQueue) {
        this.collectorDataQueue = collectorDataQueue;
    }

    public void submitPartitionTask(StepContextImpl partitionStepContext, int currentIndex, int numOfPartitions) throws Exception {
        Step step1 = partitionStepContext.getStep();
        PartitionExecutionImpl partitionExecution = (PartitionExecutionImpl)partitionStepContext.getStepExecution();
        JobExecutionImpl jobExecution = partitionStepContext.getJobContext().getJobExecution();
        if (!jobExecution.isStopRequested()) {
            PartitionInfo partitionInfo = new PartitionInfo(partitionExecution, step1, jobExecution);
            byte[] bytes = BatchUtil.objectToBytes((Object)partitionInfo);
            this.eventBus.send("jberet.partition", (Object)Buffer.buffer((byte[])bytes));
        }
    }

    public void stopRequested(long jobExecutionId) {
        this.eventBus.publish(PartitionInfo.getStopRequestTopicName((long)jobExecutionId), (Object)Boolean.TRUE);
    }
}

