package io.druid.segment.realtime.plumber;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import com.metamx.common.Granularity;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
import io.druid.common.guava.ThreadRenamingCallable;
import io.druid.concurrent.Execs;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.server.coordination.DataSegmentAnnouncer;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import org.joda.time.DateTime;
import org.joda.time.Duration;

/* loaded from: input_file:io/druid/segment/realtime/plumber/FlushingPlumber.class */
public class FlushingPlumber extends RealtimePlumber {
    private static final EmittingLogger log = new EmittingLogger(FlushingPlumber.class);
    private final DataSchema schema;
    private final RealtimeTuningConfig config;
    private final Duration flushDuration;
    private volatile ScheduledExecutorService flushScheduledExec;
    private volatile boolean stopped;

    public FlushingPlumber(Duration duration, DataSchema dataSchema, RealtimeTuningConfig realtimeTuningConfig, FireDepartmentMetrics fireDepartmentMetrics, ServiceEmitter serviceEmitter, QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate, DataSegmentAnnouncer dataSegmentAnnouncer, ExecutorService executorService, IndexMerger indexMerger, IndexIO indexIO, Cache cache, CacheConfig cacheConfig, ObjectMapper objectMapper) {
        super(dataSchema, realtimeTuningConfig, fireDepartmentMetrics, serviceEmitter, queryRunnerFactoryConglomerate, dataSegmentAnnouncer, executorService, null, null, null, indexMerger, indexIO, cache, cacheConfig, objectMapper);
        this.flushScheduledExec = null;
        this.stopped = false;
        this.flushDuration = duration;
        this.schema = dataSchema;
        this.config = realtimeTuningConfig;
    }

    @Override // io.druid.segment.realtime.plumber.RealtimePlumber, io.druid.segment.realtime.plumber.Plumber
    public Object startJob() {
        log.info("Starting job for %s", new Object[]{getSchema().getDataSource()});
        computeBaseDir(getSchema()).mkdirs();
        initializeExecutors();
        if (this.flushScheduledExec == null) {
            this.flushScheduledExec = Execs.scheduledSingleThreaded("flushing_scheduled_%d");
        }
        Object bootstrapSinksFromDisk = bootstrapSinksFromDisk();
        startFlushThread();
        return bootstrapSinksFromDisk;
    }

    protected void flushAfterDuration(final long j, final Sink sink) {
        log.info("Abandoning segment %s at %s", new Object[]{sink.getSegment().getIdentifier(), new DateTime().plusMillis((int) this.flushDuration.getMillis())});
        ScheduledExecutors.scheduleWithFixedDelay(this.flushScheduledExec, this.flushDuration, new Callable<ScheduledExecutors.Signal>() { // from class: io.druid.segment.realtime.plumber.FlushingPlumber.1
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public ScheduledExecutors.Signal call() throws Exception {
                FlushingPlumber.log.info("Abandoning segment %s", new Object[]{sink.getSegment().getIdentifier()});
                FlushingPlumber.this.abandonSegment(j, sink);
                return ScheduledExecutors.Signal.STOP;
            }
        });
    }

    private void startFlushThread() {
        final Granularity segmentGranularity = this.schema.getGranularitySpec().getSegmentGranularity();
        DateTime truncate = segmentGranularity.truncate(new DateTime());
        final long millis = this.config.getWindowPeriod().toStandardDuration().getMillis();
        log.info("Expect to run at [%s]", new Object[]{new DateTime().plus(new Duration(System.currentTimeMillis(), this.schema.getGranularitySpec().getSegmentGranularity().increment(truncate).getMillis() + millis))});
        ScheduledExecutors.scheduleAtFixedRate(this.flushScheduledExec, new Duration(System.currentTimeMillis(), this.schema.getGranularitySpec().getSegmentGranularity().increment(truncate).getMillis() + millis), new Duration(truncate, segmentGranularity.increment(truncate)), new ThreadRenamingCallable<ScheduledExecutors.Signal>(String.format("%s-flusher-%d", getSchema().getDataSource(), Integer.valueOf(getConfig().getShardSpec().getPartitionNum()))) { // from class: io.druid.segment.realtime.plumber.FlushingPlumber.2
            /* renamed from: doCall, reason: merged with bridge method [inline-methods] */
            public ScheduledExecutors.Signal m124doCall() {
                if (FlushingPlumber.this.stopped) {
                    FlushingPlumber.log.info("Stopping flusher thread", new Object[0]);
                    return ScheduledExecutors.Signal.STOP;
                }
                long millis2 = segmentGranularity.truncate(FlushingPlumber.this.getRejectionPolicy().getCurrMaxTime().minus(millis)).getMillis();
                ArrayList<Map.Entry> newArrayList = Lists.newArrayList();
                for (Map.Entry<Long, Sink> entry : FlushingPlumber.this.getSinks().entrySet()) {
                    if (entry.getKey().longValue() < millis2) {
                        FlushingPlumber.log.info("Adding entry[%s] to flush.", new Object[]{entry});
                        newArrayList.add(entry);
                    }
                }
                for (Map.Entry entry2 : newArrayList) {
                    FlushingPlumber.this.flushAfterDuration(((Long) entry2.getKey()).longValue(), (Sink) entry2.getValue());
                }
                if (!FlushingPlumber.this.stopped) {
                    return ScheduledExecutors.Signal.REPEAT;
                }
                FlushingPlumber.log.info("Stopping flusher thread", new Object[0]);
                return ScheduledExecutors.Signal.STOP;
            }
        });
    }

    @Override // io.druid.segment.realtime.plumber.RealtimePlumber, io.druid.segment.realtime.plumber.Plumber
    public void finishJob() {
        log.info("Stopping job", new Object[0]);
        for (Map.Entry<Long, Sink> entry : getSinks().entrySet()) {
            abandonSegment(entry.getKey().longValue(), entry.getValue());
        }
        shutdownExecutors();
        if (this.flushScheduledExec != null) {
            this.flushScheduledExec.shutdown();
        }
        this.stopped = true;
    }
}
