package software.amazon.kinesis.coordinator;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.utils.Validate;
import software.amazon.kinesis.checkpoint.CheckpointConfig;
import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
import software.amazon.kinesis.common.ArnUtil;
import software.amazon.kinesis.common.StreamConfig;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.coordinator.SchedulerCoordinatorFactory;
import software.amazon.kinesis.coordinator.WorkerStateChangeListener;
import software.amazon.kinesis.leases.HierarchicalShardSyncer;
import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseCleanupManager;
import software.amazon.kinesis.leases.LeaseCoordinator;
import software.amazon.kinesis.leases.LeaseManagementConfig;
import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.leases.LeaseSerializer;
import software.amazon.kinesis.leases.MultiStreamLease;
import software.amazon.kinesis.leases.ShardDetector;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.leases.ShardPrioritization;
import software.amazon.kinesis.leases.ShardSyncTaskManager;
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseCoordinator;
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseSerializer;
import software.amazon.kinesis.leases.dynamodb.DynamoDBMultiStreamLeaseSerializer;
import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
import software.amazon.kinesis.lifecycle.LifecycleConfig;
import software.amazon.kinesis.lifecycle.ShardConsumer;
import software.amazon.kinesis.lifecycle.ShardConsumerArgument;
import software.amazon.kinesis.lifecycle.ShardConsumerShutdownNotification;
import software.amazon.kinesis.lifecycle.ShutdownReason;
import software.amazon.kinesis.metrics.CloudWatchMetricsFactory;
import software.amazon.kinesis.metrics.MetricsConfig;
import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.metrics.MetricsLevel;
import software.amazon.kinesis.metrics.MetricsScope;
import software.amazon.kinesis.metrics.MetricsUtil;
import software.amazon.kinesis.processor.Checkpointer;
import software.amazon.kinesis.processor.FormerStreamsLeasesDeletionStrategy;
import software.amazon.kinesis.processor.ProcessorConfig;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.processor.StreamTracker;
import software.amazon.kinesis.retrieval.AggregatorUtil;
import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.RetrievalConfig;
import software.amazon.kinesis.schemaregistry.SchemaRegistryDecoder;

/* loaded from: input_file:software/amazon/kinesis/coordinator/Scheduler.class */
public class Scheduler implements Runnable {
    // Class-wide SLF4J logger; also used by the nested SchedulerLog via Scheduler.log.
    private static final Logger log = LoggerFactory.getLogger(Scheduler.class);
    // NOTE(review): not referenced by name in the visible code; the constructor passes a literal 1 to the
    // leader decider, which is presumably this constant inlined by the compiler - confirm.
    private static final int PERIODIC_SHARD_SYNC_MAX_WORKERS_DEFAULT = 1;
    // Sleep between two consecutive lease-table emptiness checks in shouldInitiateLeaseSync().
    private static final long LEASE_TABLE_CHECK_FREQUENCY_MILLIS = 3000;
    // Lower (inclusive) and upper (exclusive) bound of the random total wait picked in shouldInitiateLeaseSync().
    private static final long MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 1000;
    private static final long MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 30000;
    // streamSyncWatch must have run longer than this before shouldSyncStreamsNow() returns true.
    private static final long NEW_STREAM_CHECK_INTERVAL_MILLIS = 60000;
    // NOTE(review): not referenced in the visible part of this class.
    private static final boolean SHOULD_DO_LEASE_SYNC_FOR_OLD_STREAMS = false;
    // Metrics operation name used by checkAndSyncStreamShardsAndLeases().
    private static final String MULTI_STREAM_TRACKER = "MultiStreamTracker";
    // Metric names emitted at the end of checkAndSyncStreamShardsAndLeases().
    private static final String ACTIVE_STREAMS_COUNT = "ActiveStreams.Count";
    private static final String PENDING_STREAMS_DELETION_COUNT = "StreamsPendingDeletion.Count";
    private static final String DELETED_STREAMS_COUNT = "DeletedStreams.Count";
    private static final String NON_EXISTING_STREAM_DELETE_COUNT = "NonExistingStreamDelete.Count";
    // Rate-limited INFO logger used by the worker loop (see SchedulerLog).
    private final SchedulerLog slog;
    // Configuration objects handed to the constructor and kept for later lookups.
    private final CheckpointConfig checkpointConfig;
    private final CoordinatorConfig coordinatorConfig;
    private final LeaseManagementConfig leaseManagementConfig;
    private final LifecycleConfig lifecycleConfig;
    private final MetricsConfig metricsConfig;
    private final ProcessorConfig processorConfig;
    private final RetrievalConfig retrievalConfig;
    // Values below are read once from the configuration objects in the constructor.
    private final String applicationName;
    private final int maxInitializationAttempts;
    private final Checkpointer checkpoint;
    // Sleep applied at the end of every runProcessLoop() pass, including the error path.
    private final long shardConsumerDispatchPollIntervalMillis;
    private final long parentShardPollIntervalMillis;
    private final ExecutorService executorService;
    private final DiagnosticEventFactory diagnosticEventFactory;
    private final DiagnosticEventHandler diagnosticEventHandler;
    private final LeaseCoordinator leaseCoordinator;
    // Creates a ShardSyncTaskManager for a stream via the lease management factory.
    private final Function<StreamConfig, ShardSyncTaskManager> shardSyncTaskManagerProvider;
    private final Map<StreamConfig, ShardSyncTaskManager> streamToShardSyncTaskManagerMap;
    private final PeriodicShardSyncManager leaderElectedPeriodicShardSyncManager;
    private final ShardPrioritization shardPrioritization;
    private final boolean cleanupLeasesUponShardCompletion;
    // When true and the lease table is non-empty, initialize() skips the initial shard sync.
    private final boolean skipShardSyncAtWorkerInitializationIfLeasesExist;
    private final GracefulShutdownCoordinator gracefulShutdownCoordinator;
    private final WorkerStateChangeListener workerStateChangeListener;
    private final MetricsFactory metricsFactory;
    private final long failoverTimeMillis;
    private final long taskBackoffTimeMillis;
    private final boolean isMultiStreamMode;
    // Streams currently tracked by this scheduler; backed by a StreamConfigMap instance.
    private final Map<StreamIdentifier, StreamConfig> currentStreamConfigMap;
    private final StreamTracker streamTracker;
    private final FormerStreamsLeasesDeletionStrategy formerStreamsLeasesDeletionStrategy;
    private final long listShardsBackoffTimeMillis;
    private final int maxListShardsRetryAttempts;
    private final LeaseRefresher leaseRefresher;
    private final Function<StreamConfig, ShardDetector> shardDetectorProvider;
    // NOTE: "Unexpeted" is a misspelling in the field name; left untouched because other members may reference it.
    private final boolean ignoreUnexpetedChildShards;
    private final AggregatorUtil aggregatorUtil;
    private final Function<StreamConfig, HierarchicalShardSyncer> hierarchicalShardSyncerProvider;
    // Sleep between failed initialization attempts in initialize().
    private final long schedulerInitializationBackoffTimeMillis;
    private final LeaderDecider leaderDecider;
    // Stream -> instant at which it was first enqueued for deferred lease deletion.
    private final Map<StreamIdentifier, Instant> staleStreamDeletionMap;
    private final LeaseCleanupManager leaseCleanupManager;
    // Null when the retrieval config provides no Glue schema registry deserializer.
    private final SchemaRegistryDecoder schemaRegistryDecoder;
    private final DeletedStreamListProvider deletedStreamListProvider;
    private final ConcurrentMap<ShardInfo, ShardConsumer> shardInfoShardConsumerMap;
    // Checked at the top of run(); when already set, run() returns without doing any work.
    private volatile boolean shutdown;
    private volatile long shutdownStartTimeMillis;
    private volatile boolean shutdownComplete;
    // Monitor held for the whole of initialize().
    private final Object lock;
    // Started at the end of a successful initialize(); restarted after every stream sync pass.
    private final Stopwatch streamSyncWatch;
    // Set to true by checkAndSyncStreamShardsAndLeases() once its leader catch-up branch has run.
    private boolean leasesSyncedOnAppInit;
    // Set by runProcessLoop(): true after a pass in which this worker was leader, false otherwise.
    private final AtomicBoolean leaderSynced;
    private CompletableFuture<Boolean> gracefulShutdownFuture;
    private final CountDownLatch finalShutdownLatch;

    // NOTE: "gracefule" is a misspelling in the field name; left untouched because the field is protected.
    @VisibleForTesting
    protected boolean gracefuleShutdownStarted;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:software/amazon/kinesis/coordinator/Scheduler$SchedulerLog.class */
    public static class SchedulerLog {
        /** Length of the quiet period between two reporting passes (one minute). */
        private final long reportIntervalMillis = TimeUnit.MINUTES.toMillis(1L);
        /** Wall-clock time (epoch millis) at which reporting is switched back on. */
        private long nextReportTime = System.currentTimeMillis() + this.reportIntervalMillis;
        /** Whether {@link #info(Object)} currently forwards messages; starts out {@code false}. */
        private boolean infoReporting;

        private SchedulerLog() {
        }

        /**
         * Logs {@code obj} at INFO level, but only while a reporting pass is active.
         */
        void info(Object obj) {
            if (!isInfoEnabled()) {
                return;
            }
            Scheduler.log.info("{}", obj);
        }

        /**
         * Logs {@code obj} at INFO level regardless of the reporting state.
         */
        void infoForce(Object obj) {
            Scheduler.log.info("{}", obj);
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * @return {@code true} while a reporting pass is active
         */
        public boolean isInfoEnabled() {
            return this.infoReporting;
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Advances the reporting state: ends an active reporting pass (scheduling the next one) if the
         * underlying logger has INFO enabled, or starts a new pass once the scheduled time has arrived.
         */
        public void resetInfoLogging() {
            if (this.infoReporting) {
                // Reporting stays on when INFO is disabled on the underlying logger.
                if (Scheduler.log.isInfoEnabled()) {
                    this.infoReporting = false;
                    this.nextReportTime = System.currentTimeMillis() + this.reportIntervalMillis;
                }
                return;
            }
            if (this.nextReportTime <= System.currentTimeMillis()) {
                this.infoReporting = true;
            }
        }
    }

    /* loaded from: input_file:software/amazon/kinesis/coordinator/Scheduler$StreamConfigMap.class */
    private class StreamConfigMap extends ConcurrentHashMap<StreamIdentifier, StreamConfig> {
        public StreamConfigMap() {
        }

        /**
         * Stores {@code streamConfig} under {@code streamIdentifier} after reconciling it with the
         * Kinesis region returned by the enclosing scheduler.
         *
         * <p>If the config's stream identifier carries an ARN, the ARN's region must equal the
         * scheduler's region id and the config is stored unchanged. If it carries no ARN, the config
         * is passed through {@code Scheduler.withStreamArn} in multi-stream mode and stored unchanged
         * otherwise.
         *
         * @param streamIdentifier key to store under; must not be null
         * @param streamConfig config to validate and store; must not be null
         * @return the value previously mapped to {@code streamIdentifier}, or {@code null}
         * @throws NullPointerException if either argument is null
         * @throws IllegalArgumentException presumably raised by {@code Validate.isTrue} when the ARN
         *         region does not match - confirm against the AWS SDK utility
         */
        @Override // java.util.concurrent.ConcurrentHashMap, java.util.AbstractMap, java.util.Map
        public StreamConfig put(@NonNull StreamIdentifier streamIdentifier, @NonNull StreamConfig streamConfig) {
            if (streamIdentifier == null) {
                throw new NullPointerException("streamIdentifier is marked non-null but is null");
            }
            if (streamConfig == null) {
                throw new NullPointerException("streamConfig is marked non-null but is null");
            }
            final Region kinesisRegion = Scheduler.this.getKinesisRegion();
            // The ARN validation runs before the fallback expression is evaluated, as in the original.
            final StreamConfig configToStore = streamConfig.streamIdentifier().streamArnOptional().map(arn -> {
                Validate.isTrue(kinesisRegion.id().equals(arn.region().get()), "The provided streamARN " + arn + " does not match the Kinesis client's configured region - " + kinesisRegion);
                return streamConfig;
            }).orElse(Scheduler.this.isMultiStreamMode ? Scheduler.withStreamArn(streamConfig, kinesisRegion) : streamConfig);
            // Key and value are passed with their declared types; the decompiler-generated casts were invalid.
            return super.put(streamIdentifier, configToStore);
        }
    }

    /**
     * Creates a scheduler from the seven configuration objects, using a freshly created
     * {@link DiagnosticEventFactory}.
     *
     * <p>Null validation of every argument is performed by the delegated constructor, which throws
     * {@link NullPointerException} with a "... is marked non-null but is null" message for the first
     * null argument it finds. Repeating the checks here would be unreachable as a failure path.
     *
     * @throws NullPointerException if any argument is null
     */
    public Scheduler(@NonNull CheckpointConfig checkpointConfig, @NonNull CoordinatorConfig coordinatorConfig, @NonNull LeaseManagementConfig leaseManagementConfig, @NonNull LifecycleConfig lifecycleConfig, @NonNull MetricsConfig metricsConfig, @NonNull ProcessorConfig processorConfig, @NonNull RetrievalConfig retrievalConfig) {
        this(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, metricsConfig, processorConfig, retrievalConfig, new DiagnosticEventFactory());
    }

    /**
     * Wires up the scheduler from its configuration objects.
     *
     * <p>Order of work: initialise internal collections and flags, null-check every argument, copy the
     * configuration objects, seed the tracked-stream map from the stream tracker, then build the lease
     * coordinator, checkpointer, executor, shard-sync providers, leader decider, periodic shard sync
     * manager, lease cleanup manager and (optionally) the schema registry decoder.
     *
     * @throws NullPointerException if any argument is null
     */
    @VisibleForTesting
    protected Scheduler(@NonNull CheckpointConfig checkpointConfig, @NonNull CoordinatorConfig coordinatorConfig, @NonNull LeaseManagementConfig leaseManagementConfig, @NonNull LifecycleConfig lifecycleConfig, @NonNull MetricsConfig metricsConfig, @NonNull ProcessorConfig processorConfig, @NonNull RetrievalConfig retrievalConfig, @NonNull DiagnosticEventFactory diagnosticEventFactory) {
        // Internal state that does not depend on the arguments.
        this.slog = new SchedulerLog();
        this.streamToShardSyncTaskManagerMap = new ConcurrentHashMap();
        this.currentStreamConfigMap = new StreamConfigMap();
        this.staleStreamDeletionMap = new HashMap();
        this.shardInfoShardConsumerMap = new ConcurrentHashMap();
        this.shutdownComplete = false;
        this.lock = new Object();
        this.streamSyncWatch = Stopwatch.createUnstarted();
        this.leasesSyncedOnAppInit = false;
        this.leaderSynced = new AtomicBoolean(false);
        this.finalShutdownLatch = new CountDownLatch(1);
        this.gracefuleShutdownStarted = false;
        // Argument validation; the first null argument in declaration order is reported.
        if (checkpointConfig == null) {
            throw new NullPointerException("checkpointConfig is marked non-null but is null");
        }
        if (coordinatorConfig == null) {
            throw new NullPointerException("coordinatorConfig is marked non-null but is null");
        }
        if (leaseManagementConfig == null) {
            throw new NullPointerException("leaseManagementConfig is marked non-null but is null");
        }
        if (lifecycleConfig == null) {
            throw new NullPointerException("lifecycleConfig is marked non-null but is null");
        }
        if (metricsConfig == null) {
            throw new NullPointerException("metricsConfig is marked non-null but is null");
        }
        if (processorConfig == null) {
            throw new NullPointerException("processorConfig is marked non-null but is null");
        }
        if (retrievalConfig == null) {
            throw new NullPointerException("retrievalConfig is marked non-null but is null");
        }
        if (diagnosticEventFactory == null) {
            throw new NullPointerException("diagnosticEventFactory is marked non-null but is null");
        }
        this.checkpointConfig = checkpointConfig;
        this.coordinatorConfig = coordinatorConfig;
        this.leaseManagementConfig = leaseManagementConfig;
        this.lifecycleConfig = lifecycleConfig;
        this.metricsConfig = metricsConfig;
        this.processorConfig = processorConfig;
        this.retrievalConfig = retrievalConfig;
        this.applicationName = this.coordinatorConfig.applicationName();
        this.streamTracker = retrievalConfig.streamTracker();
        this.isMultiStreamMode = this.streamTracker.isMultiStream();
        this.formerStreamsLeasesDeletionStrategy = this.streamTracker.formerStreamsLeasesDeletionStrategy();
        // Seed the tracked streams. Each put goes through StreamConfigMap.put, which reads isMultiStreamMode
        // and calls getKinesisRegion().
        // NOTE(review): getKinesisRegion() is defined outside this view; if it reads retrievalConfig, the
        // assignments above must stay ahead of this call - confirm before reordering.
        this.streamTracker.streamConfigList().forEach(streamConfig -> {
            this.currentStreamConfigMap.put(streamConfig.streamIdentifier(), streamConfig);
        });
        log.info("Initial state: {}", this.currentStreamConfigMap.values());
        this.maxInitializationAttempts = this.coordinatorConfig.maxInitializationAttempts();
        this.metricsFactory = this.metricsConfig.metricsFactory();
        // The lease serializer flavour follows the stream mode; the same instance is captured by the
        // shard-sync provider lambda and reused for the lease cleanup manager below.
        LeaseSerializer dynamoDBMultiStreamLeaseSerializer = this.isMultiStreamMode ? new DynamoDBMultiStreamLeaseSerializer() : new DynamoDBLeaseSerializer();
        this.leaseCoordinator = this.leaseManagementConfig.leaseManagementFactory(dynamoDBMultiStreamLeaseSerializer, this.isMultiStreamMode).createLeaseCoordinator(this.metricsFactory);
        this.leaseRefresher = this.leaseCoordinator.leaseRefresher();
        this.checkpoint = this.checkpointConfig.checkpointFactory().createCheckpointer(this.leaseCoordinator, this.leaseRefresher);
        this.shardConsumerDispatchPollIntervalMillis = this.coordinatorConfig.shardConsumerDispatchPollIntervalMillis();
        this.parentShardPollIntervalMillis = this.coordinatorConfig.parentShardPollIntervalMillis();
        this.executorService = this.coordinatorConfig.coordinatorFactory().createExecutorService();
        this.diagnosticEventFactory = diagnosticEventFactory;
        this.diagnosticEventHandler = new DiagnosticEventLogger();
        this.deletedStreamListProvider = new DeletedStreamListProvider();
        this.shardSyncTaskManagerProvider = streamConfig2 -> {
            return this.leaseManagementConfig.leaseManagementFactory(dynamoDBMultiStreamLeaseSerializer, this.isMultiStreamMode).createShardSyncTaskManager(this.metricsFactory, streamConfig2, this.deletedStreamListProvider);
        };
        this.shardPrioritization = this.coordinatorConfig.shardPrioritization();
        this.cleanupLeasesUponShardCompletion = this.leaseManagementConfig.cleanupLeasesUponShardCompletion();
        this.skipShardSyncAtWorkerInitializationIfLeasesExist = this.coordinatorConfig.skipShardSyncAtWorkerInitializationIfLeasesExist();
        // A coordinator/listener supplied directly on the config wins over the factory default.
        if (coordinatorConfig.gracefulShutdownCoordinator() != null) {
            this.gracefulShutdownCoordinator = coordinatorConfig.gracefulShutdownCoordinator();
        } else {
            this.gracefulShutdownCoordinator = this.coordinatorConfig.coordinatorFactory().createGracefulShutdownCoordinator();
        }
        if (coordinatorConfig.workerStateChangeListener() != null) {
            this.workerStateChangeListener = coordinatorConfig.workerStateChangeListener();
        } else {
            this.workerStateChangeListener = this.coordinatorConfig.coordinatorFactory().createWorkerStateChangeListener();
        }
        // NOTE(review): the literal 1 is presumably PERIODIC_SHARD_SYNC_MAX_WORKERS_DEFAULT inlined - confirm.
        // The scheduled executor created here is owned by the leader decider; no shutdown is visible in this view.
        this.leaderDecider = new DeterministicShuffleShardSyncLeaderDecider(this.leaseRefresher, Executors.newSingleThreadScheduledExecutor(), 1);
        this.failoverTimeMillis = this.leaseManagementConfig.failoverTimeMillis();
        this.taskBackoffTimeMillis = this.lifecycleConfig.taskBackoffTimeMillis();
        this.listShardsBackoffTimeMillis = this.retrievalConfig.listShardsBackoffTimeInMillis();
        this.maxListShardsRetryAttempts = this.retrievalConfig.maxListShardsRetryAttempts();
        // Both providers below resolve through the per-stream ShardSyncTaskManager.
        this.shardDetectorProvider = streamConfig3 -> {
            return createOrGetShardSyncTaskManager(streamConfig3).shardDetector();
        };
        this.ignoreUnexpetedChildShards = this.leaseManagementConfig.ignoreUnexpectedChildShards();
        this.aggregatorUtil = this.lifecycleConfig.aggregatorUtil();
        this.hierarchicalShardSyncerProvider = streamConfig4 -> {
            return createOrGetShardSyncTaskManager(streamConfig4).hierarchicalShardSyncer();
        };
        this.schedulerInitializationBackoffTimeMillis = this.coordinatorConfig.schedulerInitializationBackoffTimeMillis();
        // The periodic shard sync manager shares currentStreamConfigMap, the task-manager map and leaderSynced
        // with this scheduler (same object references).
        this.leaderElectedPeriodicShardSyncManager = new PeriodicShardSyncManager(leaseManagementConfig.workerIdentifier(), this.leaderDecider, this.leaseRefresher, this.currentStreamConfigMap, this.shardSyncTaskManagerProvider, this.streamToShardSyncTaskManagerMap, this.isMultiStreamMode, this.metricsFactory, leaseManagementConfig.leasesRecoveryAuditorExecutionFrequencyMillis(), leaseManagementConfig.leasesRecoveryAuditorInconsistencyConfidenceThreshold(), this.leaderSynced);
        this.leaseCleanupManager = this.leaseManagementConfig.leaseManagementFactory(dynamoDBMultiStreamLeaseSerializer, this.isMultiStreamMode).createLeaseCleanupManager(this.metricsFactory);
        // The decoder is optional: left null when no Glue schema registry deserializer is configured.
        this.schemaRegistryDecoder = this.retrievalConfig.glueSchemaRegistryDeserializer() == null ? null : new SchemaRegistryDecoder(this.retrievalConfig.glueSchemaRegistryDeserializer());
    }

    /**
     * Entry point of the worker: initialises the scheduler, then repeats {@link #runProcessLoop()}
     * until {@code shouldShutdown()} reports true, and finally performs the final shutdown.
     *
     * <p>Does nothing when shutdown has already been requested. If initialization fails with a
     * {@link RuntimeException}, the state-change listener is notified and {@code shutdown()} is
     * invoked before the loop condition is evaluated.
     */
    @Override // java.lang.Runnable
    public void run() {
        if (!this.shutdown) {
            try {
                initialize();
                log.info("Initialization complete. Starting worker loop.");
            } catch (RuntimeException initFailure) {
                log.error("Unable to initialize after {} attempts. Shutting down.", this.maxInitializationAttempts, initFailure);
                this.workerStateChangeListener.onAllInitializationAttemptsFailed(initFailure);
                shutdown();
            }

            while (!shouldShutdown()) {
                runProcessLoop();
            }

            finalShutdown();
            log.info("Worker loop is complete. Exiting from worker.");
        }
    }

    /**
     * Brings up the lease coordinator, lease cleanup manager and periodic shard sync, retrying up to
     * {@code maxInitializationAttempts} times with a backoff sleep between failed attempts.
     *
     * <p>Runs entirely under {@code lock}. The listener is told about the INITIALIZING state up front
     * and about the STARTED state once an attempt succeeds.
     *
     * @throws RuntimeException wrapping the last caught exception when no attempt succeeded
     */
    @VisibleForTesting
    void initialize() {
        synchronized (this.lock) {
            registerErrorHandlerForUndeliverableAsyncTaskExceptions();
            this.workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.INITIALIZING);

            boolean initialized = false;
            Exception lastFailure = null;
            int attempt = 0;
            while (!initialized && attempt < this.maxInitializationAttempts) {
                try {
                    log.info("Initializing LeaseCoordinator attempt {}", attempt + 1);
                    this.leaseCoordinator.initialize();

                    if (this.skipShardSyncAtWorkerInitializationIfLeasesExist && !this.leaseRefresher.isLeaseTableEmpty()) {
                        log.info("Skipping shard sync per configuration setting (and lease table is not empty)");
                    } else if (shouldInitiateLeaseSync()) {
                        log.info("Worker {} is initiating the lease sync.", this.leaseManagementConfig.workerIdentifier());
                        this.leaderElectedPeriodicShardSyncManager.syncShardsOnce();
                    }

                    this.leaseCleanupManager.start();

                    if (!this.leaseCoordinator.isRunning()) {
                        log.info("Starting LeaseCoordinator");
                        this.leaseCoordinator.start();
                    } else {
                        log.info("LeaseCoordinator is already running. No need to start it.");
                    }

                    log.info("Scheduling periodicShardSync");
                    this.leaderElectedPeriodicShardSyncManager.start();
                    this.streamSyncWatch.start();
                    initialized = true;
                } catch (Exception failure) {
                    log.error("Caught exception when initializing LeaseCoordinator", failure);
                    lastFailure = failure;
                }

                if (!initialized) {
                    // Back off, then stop the periodic shard sync before the next attempt.
                    // An interrupt during the sleep skips the stop() call and is only logged.
                    try {
                        Thread.sleep(this.schedulerInitializationBackoffTimeMillis);
                        this.leaderElectedPeriodicShardSyncManager.stop();
                    } catch (InterruptedException interrupted) {
                        log.debug("Sleep interrupted while initializing worker.");
                    }
                }
                attempt++;
            }

            if (!initialized) {
                throw new RuntimeException(lastFailure);
            }
            this.workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.STARTED);
        }
    }

    /**
     * Polls the lease table for a random period between the minimum and maximum wait constants and
     * reports whether this worker should start the lease sync.
     *
     * @return {@code false} as soon as the lease table is seen to be non-empty; {@code true} if the
     *         table stayed empty until the wait period elapsed
     * @throws InterruptedException if the thread is interrupted while sleeping between checks
     */
    @VisibleForTesting
    boolean shouldInitiateLeaseSync() throws InterruptedException, DependencyException, ProvisionedThroughputException, InvalidStateException {
        final long deadlineMillis = System.currentTimeMillis() + ThreadLocalRandom.current().nextLong(MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS, MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS);
        while (System.currentTimeMillis() < deadlineMillis) {
            if (!this.leaseRefresher.isLeaseTableEmpty()) {
                // Leases are already in the table, so this worker does not need to sync.
                return false;
            }
            log.info("Lease table is still empty. Checking again in {} ms", LEASE_TABLE_CHECK_FREQUENCY_MILLIS);
            Thread.sleep(LEASE_TABLE_CHECK_FREQUENCY_MILLIS);
        }
        return true;
    }

    /**
     * Performs one pass of the worker loop: drives the lifecycle of a consumer for every assigned
     * shard, cleans up consumers for shards no longer assigned, runs the stream/lease sync when this
     * worker is the leader, then sleeps for the dispatch poll interval.
     *
     * <p>Any exception is logged and followed by the same sleep; the rate-limited logger state is
     * advanced at the end of every pass.
     */
    @VisibleForTesting
    void runProcessLoop() {
        final long pollIntervalMillis = this.shardConsumerDispatchPollIntervalMillis;
        try {
            final HashSet<ShardInfo> assignedShards = new HashSet<>();
            for (ShardInfo shardInfo : getShardInfoForAssignments()) {
                createOrGetShardConsumer(shardInfo, this.processorConfig.shardRecordProcessorFactory(), this.leaseCleanupManager).executeLifecycle();
                assignedShards.add(shardInfo);
            }
            cleanupShardConsumers(assignedShards);

            // leaderSynced is only updated when the sync (if any) finished without throwing.
            final boolean leader = isLeader();
            if (leader) {
                checkAndSyncStreamShardsAndLeases();
            }
            this.leaderSynced.set(leader);

            logExecutorState();
            this.slog.info("Sleeping ...");
            Thread.sleep(pollIntervalMillis);
        } catch (Exception failure) {
            log.error("Worker.run caught exception, sleeping for {} milli seconds!", pollIntervalMillis, failure);
            try {
                Thread.sleep(pollIntervalMillis);
            } catch (InterruptedException interrupted) {
                log.info("Worker: sleep interrupted after catching exception ", interrupted);
            }
        }
        this.slog.resetInfoLogging();
    }

    /**
     * Asks the leader decider whether this worker (by its configured identifier) is the leader.
     */
    private boolean isLeader() {
        final Boolean leader = this.leaderDecider.isLeader(this.leaseManagementConfig.workerIdentifier());
        return leader.booleanValue();
    }

    /**
     * Reconciles the streams reported by the stream tracker with the streams this scheduler tracks.
     *
     * <p>Does nothing unless {@link #shouldSyncStreamsNow()} returns true. Otherwise, in order:
     * <ol>
     *   <li>on the first pass as leader (or the first pass overall), if the tracker's stream set
     *       differs from the tracked set, merges streams found in the lease table and drops tracked
     *       streams that are neither in the tracker nor in the lease table;</li>
     *   <li>submits a shard sync for every tracker stream not yet tracked and starts tracking it;</li>
     *   <li>depending on the former-streams deletion strategy, enqueues streams for deferred lease
     *       deletion or simply stops tracking streams the tracker no longer reports;</li>
     *   <li>deletes leases of enqueued streams whose wait period has elapsed, plus streams reported
     *       by the deleted-stream provider that the tracker no longer lists;</li>
     *   <li>restarts the sync stopwatch and emits the stream-count metrics.</li>
     * </ol>
     *
     * @return identifiers of all streams that were added, removed or had leases deleted in this pass;
     *         empty when no sync was due
     */
    @VisibleForTesting
    Set<StreamIdentifier> checkAndSyncStreamShardsAndLeases() throws DependencyException, ProvisionedThroughputException, InvalidStateException {
        final Set<StreamIdentifier> streamsSynced = new HashSet<>();
        if (!shouldSyncStreamsNow()) {
            return streamsSynced;
        }

        final MetricsScope metricsScope = MetricsUtil.createMetricsWithOperation(this.metricsFactory, MULTI_STREAM_TRACKER);
        try {
            final Map<StreamIdentifier, StreamConfig> newStreamConfigMap = this.streamTracker.streamConfigList().stream()
                    .collect(Collectors.toMap(StreamConfig::streamIdentifier, Function.identity()));

            // Leader catch-up: runs until both the leader flag and the app-init flag are set.
            if (!this.leaderSynced.get() || !this.leasesSyncedOnAppInit) {
                if (!newStreamConfigMap.keySet().equals(this.currentStreamConfigMap.keySet())) {
                    log.info("Syncing leases for leader to catch up");
                    final List<MultiStreamLease> leaseTableLeases = fetchMultiStreamLeases();
                    syncStreamsFromLeaseTableOnAppInit(leaseTableLeases);
                    final Set<StreamIdentifier> streamsFromLeaseTable = leaseTableLeases.stream()
                            .map(lease -> StreamIdentifier.multiStreamInstance(lease.streamIdentifier()))
                            .collect(Collectors.toSet());
                    // Drop tracked streams known neither to the tracker nor to the lease table.
                    this.currentStreamConfigMap.keySet().stream()
                            .filter(id -> !newStreamConfigMap.containsKey(id) && !streamsFromLeaseTable.contains(id))
                            .forEach(id -> {
                                log.info("Removing stream {} from currentStreamConfigMap due to not being active", id);
                                this.currentStreamConfigMap.remove(id);
                                this.staleStreamDeletionMap.remove(id);
                                streamsSynced.add(id);
                            });
                }
                this.leasesSyncedOnAppInit = true;
            }

            // Start tracking (and shard-syncing) streams that the tracker reports for the first time.
            for (Map.Entry<StreamIdentifier, StreamConfig> entry : newStreamConfigMap.entrySet()) {
                final StreamIdentifier streamIdentifier = entry.getKey();
                if (this.currentStreamConfigMap.containsKey(streamIdentifier)) {
                    log.debug("{} is already being processed - skipping shard sync.", streamIdentifier);
                    continue;
                }
                final StreamConfig streamConfig = entry.getValue();
                log.info("Found new stream to process: {}. Syncing shards of that stream.", streamConfig);
                createOrGetShardSyncTaskManager(streamConfig).submitShardSyncTask();
                this.currentStreamConfigMap.put(streamIdentifier, streamConfig);
                streamsSynced.add(streamIdentifier);
            }

            // Records the first time a stream is seen as stale; streams still reported by the tracker are ignored.
            final Consumer<StreamIdentifier> enqueueForDeferredDeletion = streamIdentifier -> {
                if (!newStreamConfigMap.containsKey(streamIdentifier)) {
                    this.staleStreamDeletionMap.putIfAbsent(streamIdentifier, Instant.now());
                }
            };

            final FormerStreamsLeasesDeletionStrategy.StreamsLeasesDeletionType deletionType = this.formerStreamsLeasesDeletionStrategy.leaseDeletionType();
            if (deletionType == FormerStreamsLeasesDeletionStrategy.StreamsLeasesDeletionType.FORMER_STREAMS_AUTO_DETECTION_DEFERRED_DELETION) {
                this.currentStreamConfigMap.keySet().forEach(enqueueForDeferredDeletion);
            } else if (deletionType == FormerStreamsLeasesDeletionStrategy.StreamsLeasesDeletionType.PROVIDED_STREAMS_DEFERRED_DELETION) {
                Optional.ofNullable(this.formerStreamsLeasesDeletionStrategy.streamIdentifiersForLeaseCleanup())
                        .ifPresent(streamIdentifiers -> streamIdentifiers.forEach(enqueueForDeferredDeletion));
            } else {
                // No lease cleanup configured: only stop tracking streams the tracker no longer reports.
                final Iterator<StreamIdentifier> trackedStreams = this.currentStreamConfigMap.keySet().iterator();
                while (trackedStreams.hasNext()) {
                    final StreamIdentifier streamIdentifier = trackedStreams.next();
                    if (!newStreamConfigMap.containsKey(streamIdentifier)) {
                        log.info("Found old/deleted stream : {}. Removing from tracked active streams, but not cleaning up leases, as part of this workflow", streamIdentifier);
                        trackedStreams.remove();
                        streamsSynced.add(streamIdentifier);
                    }
                }
            }

            final Duration waitPeriodToDeleteOldStreams = this.formerStreamsLeasesDeletionStrategy.waitPeriodToDeleteFormerStreams();
            // true  -> enqueued streams that the tracker reports again (to be dequeued)
            // false -> enqueued streams that are still absent from the tracker (deletion candidates)
            final Map<Boolean, Set<StreamIdentifier>> enqueuedStreamsByActive = this.staleStreamDeletionMap.keySet().stream()
                    .collect(Collectors.partitioningBy(newStreamConfigMap::containsKey, Collectors.toSet()));
            final Set<StreamIdentifier> staleStreamIdsToBeDeleted = enqueuedStreamsByActive.get(false).stream()
                    .filter(id -> Duration.between(this.staleStreamDeletionMap.get(id), Instant.now()).toMillis() >= waitPeriodToDeleteOldStreams.toMillis())
                    .collect(Collectors.toSet());

            // Streams reported by the deleted-stream provider skip the wait period unless the tracker still lists them.
            final Set<StreamIdentifier> deletedStreamSet = this.deletedStreamListProvider.purgeAllDeletedStream().stream()
                    .filter(id -> !newStreamConfigMap.containsKey(id))
                    .collect(Collectors.toSet());
            if (!deletedStreamSet.isEmpty()) {
                log.info("Stale streams to delete: {}", deletedStreamSet);
                staleStreamIdsToBeDeleted.addAll(deletedStreamSet);
            }

            final Collection<StreamIdentifier> deletedStreamsLeases = deleteMultiStreamLeases(staleStreamIdsToBeDeleted);
            streamsSynced.addAll(deletedStreamsLeases);
            removeStreamsFromStaleStreamsList(enqueuedStreamsByActive.get(true));

            if (!this.staleStreamDeletionMap.isEmpty()) {
                final Map<StreamIdentifier, Instant> scheduledDeletionTimes = this.staleStreamDeletionMap.entrySet().stream()
                        .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().plus(waitPeriodToDeleteOldStreams)));
                log.warn("Streams enqueued for deletion for lease table cleanup along with their scheduled time for deletion: {} ", scheduledDeletionTimes);
            }

            this.streamSyncWatch.reset().start();

            MetricsUtil.addCount(metricsScope, ACTIVE_STREAMS_COUNT, newStreamConfigMap.size(), MetricsLevel.SUMMARY);
            MetricsUtil.addCount(metricsScope, PENDING_STREAMS_DELETION_COUNT, this.staleStreamDeletionMap.size(), MetricsLevel.SUMMARY);
            MetricsUtil.addCount(metricsScope, NON_EXISTING_STREAM_DELETE_COUNT, deletedStreamSet.size(), MetricsLevel.SUMMARY);
            // Number of streams whose leases were deleted in this pass.
            MetricsUtil.addCount(metricsScope, DELETED_STREAMS_COUNT, deletedStreamsLeases.size(), MetricsLevel.SUMMARY);
        } finally {
            // The metrics scope is closed on both the normal and the exceptional path.
            MetricsUtil.endScope(metricsScope);
        }
        return streamsSynced;
    }

    /**
     * Decides whether a stream sync is due.
     *
     * @return {@code true} only in multi-stream mode and once {@code streamSyncWatch} has measured more than
     *         {@code NEW_STREAM_CHECK_INTERVAL_MILLIS} milliseconds; {@code false} otherwise
     */
    @VisibleForTesting
    boolean shouldSyncStreamsNow() {
        if (!isMultiStreamMode) {
            return false;
        }
        return streamSyncWatch.elapsed(TimeUnit.MILLISECONDS) > NEW_STREAM_CHECK_INTERVAL_MILLIS;
    }

    /**
     * Caches a {@link StreamConfig} for every stream referenced by the given leases that is not yet present in
     * {@code currentStreamConfigMap}.
     *
     * @param list leases whose stream identifiers should be reflected in {@code currentStreamConfigMap}
     */
    @VisibleForTesting
    void syncStreamsFromLeaseTableOnAppInit(List<MultiStreamLease> list) {
        for (MultiStreamLease lease : list) {
            final StreamIdentifier leaseStream = StreamIdentifier.multiStreamInstance(lease.streamIdentifier());
            // Streams that are already tracked keep their existing config.
            if (currentStreamConfigMap.containsKey(leaseStream)) {
                continue;
            }
            final StreamConfig config = streamTracker.createStreamConfig(leaseStream);
            currentStreamConfigMap.put(leaseStream, config);
            log.info("Cached {}", config);
        }
    }

    /**
     * Lists the leases through the lease coordinator's lease refresher.
     *
     * @return the leases reported by {@code listLeases()}
     * @throws DependencyException declared by this method; presumably propagated from the lease refresher
     * @throws ProvisionedThroughputException declared by this method; presumably propagated from the lease refresher
     * @throws InvalidStateException declared by this method; presumably propagated from the lease refresher
     */
    private List<MultiStreamLease> fetchMultiStreamLeases()
            throws DependencyException, ProvisionedThroughputException, InvalidStateException {
        return leaseCoordinator
                .leaseRefresher()
                .listLeases();
    }

    /**
     * Removes each of the given streams from {@code staleStreamDeletionMap}.
     *
     * @param set identifiers to drop from the map
     */
    private void removeStreamsFromStaleStreamsList(Set<StreamIdentifier> set) {
        for (StreamIdentifier noLongerStale : set) {
            staleStreamDeletionMap.remove(noLongerStale);
        }
    }

    /**
     * Deletes the leases of every given stream and reports the streams whose leases were all removed.
     *
     * <p>Each stream is dropped from {@code currentStreamConfigMap} before its leases are deleted. It is removed
     * from {@code staleStreamDeletionMap} only when the per-stream deletion reports success; a stream whose
     * deletion failed is not removed from that map.
     *
     * @param set identifiers of the streams whose leases should be deleted
     * @return the streams for which deletion succeeded (including streams that had no leases); an empty set when
     *         {@code set} is empty
     * @throws DependencyException propagated from {@code fetchMultiStreamLeases()}
     * @throws ProvisionedThroughputException propagated from {@code fetchMultiStreamLeases()}
     * @throws InvalidStateException propagated from {@code fetchMultiStreamLeases()}
     */
    private Set<StreamIdentifier> deleteMultiStreamLeases(Set<StreamIdentifier> set) throws DependencyException, ProvisionedThroughputException, InvalidStateException {
        if (set.isEmpty()) {
            return Collections.emptySet();
        }
        log.info("Deleting streams: {}", set);
        final Set<StreamIdentifier> deletedStreams = new HashSet<>();
        // Group the leases once by their serialized stream identifier so each stream is a single lookup below.
        final Map<String, List<MultiStreamLease>> leasesBySerializedStreamId = fetchMultiStreamLeases().stream()
                .collect(Collectors.groupingBy(
                        (MultiStreamLease lease) -> lease.streamIdentifier(),
                        Collectors.toCollection(ArrayList::new)));
        for (StreamIdentifier streamIdentifier : set) {
            log.warn("Found old/deleted stream: {}. Directly deleting leases of this stream.", streamIdentifier);
            // The stream stops being tracked as active regardless of whether the deletion below succeeds.
            currentStreamConfigMap.remove(streamIdentifier);
            // A stream without leases yields null here, which the list overload treats as success.
            if (deleteMultiStreamLeases(leasesBySerializedStreamId.get(streamIdentifier.serialize()))) {
                staleStreamDeletionMap.remove(streamIdentifier);
                deletedStreams.add(streamIdentifier);
            }
        }
        if (!deletedStreams.isEmpty()) {
            log.info("Streams retained post-deletion: {}", currentStreamConfigMap.values());
        }
        return deletedStreams;
    }

    /**
     * Deletes the given leases one by one through {@code leaseRefresher}, stopping at the first failure.
     *
     * @param list leases to delete; {@code null} is treated as "nothing to delete"
     * @return {@code true} if {@code list} is {@code null} or every lease was deleted; {@code false} as soon as
     *         one deletion throws (the failure is logged, the remaining leases are left untouched)
     */
    private boolean deleteMultiStreamLeases(List<MultiStreamLease> list) {
        if (list != null) {
            for (MultiStreamLease staleLease : list) {
                try {
                    leaseRefresher.deleteLease(staleLease);
                } catch (DependencyException | InvalidStateException | ProvisionedThroughputException e) {
                    log.error("Unable to delete stale stream lease {}. Skipping further deletions for this stream. Will retry later.", staleLease.leaseKey(), e);
                    return false;
                }
            }
        }
        return true;
    }

    /**
     * Tells whether the worker should stop now.
     *
     * @return {@code true} if the executor service is already shut down, or if shutdown was requested and either
     *         no shard consumers remain or at least {@code failoverTimeMillis} have passed since the request;
     *         {@code false} otherwise
     */
    @VisibleForTesting
    boolean shouldShutdown() {
        if (executorService.isShutdown()) {
            log.error("Worker executor service has been shutdown, so record processors cannot be shutdown.");
            return true;
        }
        if (shutdown) {
            if (shardInfoShardConsumerMap.isEmpty()) {
                log.info("All record processors have been shutdown successfully.");
                return true;
            }
            // Consumers are still registered: only give up once the failover window has elapsed.
            final long elapsedSinceShutdownRequest = System.currentTimeMillis() - shutdownStartTimeMillis;
            if (elapsedSinceShutdownRequest >= failoverTimeMillis) {
                log.info("Lease failover time is reached, so forcing shutdown.");
                return true;
            }
        }
        return false;
    }

    /**
     * Starts a graceful shutdown the first time it is called and hands back the same future on later calls.
     *
     * @return the future stored in {@code gracefulShutdownFuture}, created through
     *         {@code gracefulShutdownCoordinator} if it did not exist yet
     */
    public CompletableFuture<Boolean> startGracefulShutdown() {
        synchronized (this) {
            if (gracefulShutdownFuture == null) {
                gracefulShutdownFuture = gracefulShutdownCoordinator.startGracefulShutdown(createGracefulShutdownCallable());
            }
            return gracefulShutdownFuture;
        }
    }

    /**
     * Builds the callable that performs a graceful shutdown.
     *
     * @return a callable that simply yields {@code true} when {@link #shutdownComplete()} already holds;
     *         otherwise the callable produced by {@code gracefulShutdownCoordinator} around
     *         {@link #createWorkerShutdownCallable()}
     */
    public Callable<Boolean> createGracefulShutdownCallable() {
        if (!shutdownComplete()) {
            return gracefulShutdownCoordinator.createGracefulShutdownCallable(createWorkerShutdownCallable());
        }
        return () -> true;
    }

    /**
     * Reports whether the worker shutdown callable has already begun running.
     *
     * @return the current value of the {@code gracefuleShutdownStarted} flag
     */
    public boolean hasGracefulShutdownStarted() {
        return gracefuleShutdownStarted;
    }

    /**
     * Creates the callable that begins a graceful worker shutdown.
     *
     * <p>When invoked, the callable marks the graceful shutdown as started (throwing
     * {@link IllegalStateException} if it was already started), stops the lease taker, and then either calls
     * {@link #shutdown()} directly when there are no assignments, or asks every assigned shard consumer to shut
     * down gracefully.
     *
     * @return a callable yielding the {@code GracefulShutdownContext} describing the latches to wait on
     */
    @VisibleForTesting
    Callable<GracefulShutdownContext> createWorkerShutdownCallable() {
        return () -> {
            synchronized (this) {
                if (gracefuleShutdownStarted) {
                    throw new IllegalStateException("Requested shutdown has already been started");
                }
                gracefuleShutdownStarted = true;
            }
            leaseCoordinator.stopLeaseTaker();
            final Collection<Lease> leases = leaseCoordinator.getAssignments();
            final boolean nothingAssigned = leases == null || leases.isEmpty();
            if (nothingAssigned) {
                shutdown();
                return GracefulShutdownContext.builder().finalShutdownLatch(finalShutdownLatch).build();
            }
            // One count per assigned lease on each latch.
            final CountDownLatch shutdownCompleteLatch = new CountDownLatch(leases.size());
            final CountDownLatch notificationCompleteLatch = new CountDownLatch(leases.size());
            for (Lease lease : leases) {
                final ShardConsumerShutdownNotification notification = new ShardConsumerShutdownNotification(leaseCoordinator, lease, notificationCompleteLatch, shutdownCompleteLatch);
                final ShardConsumer consumer = shardInfoShardConsumerMap.get(DynamoDBLeaseCoordinator.convertLeaseToAssignment(lease));
                if (consumer == null) {
                    // No consumer is registered for this lease, so release its share of both latches right away.
                    notificationCompleteLatch.countDown();
                    shutdownCompleteLatch.countDown();
                    continue;
                }
                consumer.gracefulShutdown(notification);
            }
            return GracefulShutdownContext.builder()
                    .shutdownCompleteLatch(shutdownCompleteLatch)
                    .notificationCompleteLatch(notificationCompleteLatch)
                    .finalShutdownLatch(finalShutdownLatch)
                    .scheduler(this)
                    .build();
        };
    }

    /**
     * Requests worker shutdown.
     *
     * <p>Under {@code lock}: the first call notifies the state listener with {@code SHUT_DOWN_STARTED}, sets the
     * shutdown flag, records the start time, stops the lease coordinator, the lease cleanup manager and the
     * periodic shard sync manager, and finally notifies the listener with {@code SHUT_DOWN}. Any later call only
     * logs a warning.
     */
    public void shutdown() {
        synchronized (lock) {
            if (!shutdown) {
                workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.SHUT_DOWN_STARTED);
                log.info("Worker shutdown requested.");
                shutdown = true;
                shutdownStartTimeMillis = System.currentTimeMillis();
                leaseCoordinator.stop();
                leaseCleanupManager.shutdown();
                leaderElectedPeriodicShardSyncManager.stop();
                workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.SHUT_DOWN);
            } else {
                log.warn("Shutdown requested a second time.");
            }
        }
    }

    /**
     * Performs the last shutdown steps: stops the executor and the metrics factory when they are of the specific
     * types checked below, marks shutdown as complete and releases {@code finalShutdownLatch}.
     */
    private void finalShutdown() {
        log.info("Starting worker's final shutdown.");
        // Only an executor of the coordinator factory's own thread-pool type is stopped here.
        final boolean isSchedulerThreadPool = executorService instanceof SchedulerCoordinatorFactory.SchedulerThreadPoolExecutor;
        if (isSchedulerThreadPool) {
            executorService.shutdownNow();
        }
        if (metricsFactory instanceof CloudWatchMetricsFactory) {
            final CloudWatchMetricsFactory cloudWatchFactory = (CloudWatchMetricsFactory) metricsFactory;
            cloudWatchFactory.shutdown();
        }
        shutdownComplete = true;
        finalShutdownLatch.countDown();
    }

    /**
     * Fetches the current assignments from the lease coordinator, runs them through {@code shardPrioritization}
     * and logs the resulting lease keys.
     *
     * @return the prioritized assignments exactly as returned by {@code shardPrioritization} (may be
     *         {@code null} or empty)
     */
    private List<ShardInfo> getShardInfoForAssignments() {
        final List<ShardInfo> prioritized = shardPrioritization.prioritize(leaseCoordinator.getCurrentAssignments());
        if (prioritized != null && !prioritized.isEmpty()) {
            // Skip building the message entirely when info logging is off.
            if (slog.isInfoEnabled()) {
                final StringBuilder leaseKeys = new StringBuilder();
                String separator = "";
                for (ShardInfo assigned : prioritized) {
                    leaseKeys.append(separator);
                    leaseKeys.append(ShardInfo.getLeaseKey(assigned));
                    separator = ", ";
                }
                slog.info("Current stream shard assignments: " + leaseKeys.toString());
            }
        } else {
            slog.info("No activities assigned");
        }
        return prioritized;
    }

    /**
     * Returns the consumer registered for {@code shardInfo}, building and registering a new one when none exists
     * or when the existing one is shut down with reason {@code LEASE_LOST}.
     *
     * @param shardInfo shard to look up; must not be {@code null}
     * @param shardRecordProcessorFactory factory handed to {@code buildConsumer}; must not be {@code null}
     * @param leaseCleanupManager cleanup manager handed to {@code buildConsumer}; must not be {@code null}
     * @return the existing or newly created consumer
     * @throws NullPointerException if any argument is {@code null}
     */
    ShardConsumer createOrGetShardConsumer(@NonNull ShardInfo shardInfo, @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory, @NonNull LeaseCleanupManager leaseCleanupManager) {
        if (shardInfo == null) {
            throw new NullPointerException("shardInfo is marked non-null but is null");
        }
        if (shardRecordProcessorFactory == null) {
            throw new NullPointerException("shardRecordProcessorFactory is marked non-null but is null");
        }
        if (leaseCleanupManager == null) {
            throw new NullPointerException("leaseCleanupManager is marked non-null but is null");
        }
        final ShardConsumer existing = shardInfoShardConsumerMap.get(shardInfo);
        // An existing consumer is reused unless it has shut down because its lease was lost.
        final boolean reusable = existing != null
                && !(existing.isShutdown() && existing.shutdownReason().equals(ShutdownReason.LEASE_LOST));
        if (reusable) {
            return existing;
        }
        final ShardConsumer created = buildConsumer(shardInfo, shardRecordProcessorFactory, leaseCleanupManager);
        shardInfoShardConsumerMap.put(shardInfo, created);
        slog.infoForce("Created new shardConsumer for : " + shardInfo);
        return created;
    }

    /**
     * Looks up the shard sync task manager for a stream, creating it through
     * {@code shardSyncTaskManagerProvider} on first use.
     *
     * @param streamConfig key into {@code streamToShardSyncTaskManagerMap}
     * @return the cached or newly created manager
     */
    private ShardSyncTaskManager createOrGetShardSyncTaskManager(StreamConfig streamConfig) {
        return streamToShardSyncTaskManagerMap.computeIfAbsent(
                streamConfig,
                missingConfig -> shardSyncTaskManagerProvider.apply(missingConfig));
    }

    /**
     * Assembles a new {@code ShardConsumer} for the given shard.
     *
     * <p>The stream config is taken from {@code currentStreamConfigMap}; when the stream is not tracked there, a
     * config is created through {@code streamTracker} and rewritten by {@code withStreamArn} using the Kinesis
     * client's region.
     *
     * @param shardInfo shard the consumer will process; must not be {@code null}
     * @param shardRecordProcessorFactory source of the record processor; must not be {@code null}
     * @param leaseCleanupManager cleanup manager passed into the consumer argument; must not be {@code null}
     * @return the newly constructed consumer (not registered in any map by this method)
     * @throws NullPointerException if any argument is {@code null}
     */
    protected ShardConsumer buildConsumer(@NonNull ShardInfo shardInfo, @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory, @NonNull LeaseCleanupManager leaseCleanupManager) {
        if (shardInfo == null) {
            throw new NullPointerException("shardInfo is marked non-null but is null");
        }
        if (shardRecordProcessorFactory == null) {
            throw new NullPointerException("shardRecordProcessorFactory is marked non-null but is null");
        }
        if (leaseCleanupManager == null) {
            throw new NullPointerException("leaseCleanupManager is marked non-null but is null");
        }
        final ShardRecordProcessorCheckpointer processorCheckpointer =
                coordinatorConfig.coordinatorFactory().createRecordProcessorCheckpointer(shardInfo, checkpoint);
        final StreamIdentifier streamIdentifier = getStreamIdentifier(shardInfo.streamIdentifierSerOpt());
        StreamConfig streamConfig = currentStreamConfigMap.get(streamIdentifier);
        if (streamConfig == null) {
            // The stream is not tracked: fall back to a freshly created config carrying a constructed ARN.
            streamConfig = withStreamArn(streamTracker.createStreamConfig(streamIdentifier), getKinesisRegion());
            log.info("Created orphan {}", streamConfig);
        }
        Validate.notNull(streamConfig, "StreamConfig should not be null", new Object[0]);
        final RecordsPublisher recordsPublisher =
                retrievalConfig.retrievalFactory().createGetRecordsCache(shardInfo, streamConfig, metricsFactory);
        // Argument order below mirrors the constructors' parameter order; do not reorder.
        return new ShardConsumer(
                recordsPublisher,
                executorService,
                shardInfo,
                lifecycleConfig.logWarningForTaskAfterMillis(),
                new ShardConsumerArgument(
                        shardInfo,
                        streamConfig.streamIdentifier(),
                        leaseCoordinator,
                        executorService,
                        recordsPublisher,
                        shardRecordProcessorFactory.shardRecordProcessor(streamIdentifier),
                        checkpoint,
                        processorCheckpointer,
                        parentShardPollIntervalMillis,
                        taskBackoffTimeMillis,
                        skipShardSyncAtWorkerInitializationIfLeasesExist,
                        listShardsBackoffTimeMillis,
                        maxListShardsRetryAttempts,
                        processorConfig.callProcessRecordsEvenForEmptyRecordList(),
                        shardConsumerDispatchPollIntervalMillis,
                        streamConfig.initialPositionInStreamExtended(),
                        cleanupLeasesUponShardCompletion,
                        ignoreUnexpetedChildShards,
                        shardDetectorProvider.apply(streamConfig),
                        aggregatorUtil,
                        hierarchicalShardSyncerProvider.apply(streamConfig),
                        metricsFactory,
                        leaseCleanupManager,
                        schemaRegistryDecoder),
                lifecycleConfig.taskExecutionListener(),
                lifecycleConfig.readTimeoutsToIgnoreBeforeWarning());
    }

    /**
     * Handles the consumers of shards that are no longer in the given set: a consumer whose lease is lost is
     * removed from {@code shardInfoShardConsumerMap}; any other consumer gets its lifecycle executed once more.
     *
     * @param set shards whose consumers must be left alone
     */
    void cleanupShardConsumers(Set<ShardInfo> set) {
        for (ShardInfo trackedShard : shardInfoShardConsumerMap.keySet()) {
            if (set.contains(trackedShard)) {
                continue;
            }
            final ShardConsumer consumer = shardInfoShardConsumerMap.get(trackedShard);
            if (!consumer.leaseLost()) {
                consumer.executeLifecycle();
                continue;
            }
            shardInfoShardConsumerMap.remove(trackedShard);
            log.debug("Removed consumer for {} as lease has been lost", ShardInfo.getLeaseKey(trackedShard));
        }
    }

    /**
     * Installs an RxJava global error handler that turns each reported throwable into a rejected-task diagnostic
     * event (carrying an executor-state event) and passes it to {@code diagnosticEventHandler}.
     */
    private void registerErrorHandlerForUndeliverableAsyncTaskExceptions() {
        RxJavaPlugins.setErrorHandler(undeliverable -> diagnosticEventFactory
                .rejectedTaskEvent(diagnosticEventFactory.executorStateEvent(executorService, leaseCoordinator), undeliverable)
                .accept(diagnosticEventHandler));
    }

    /**
     * Creates an executor-state diagnostic event for the current executor and lease coordinator and hands it to
     * {@code diagnosticEventHandler}.
     */
    private void logExecutorState() {
        diagnosticEventFactory
                .executorStateEvent(executorService, leaseCoordinator)
                .accept(diagnosticEventHandler);
    }

    /**
     * Resolves the stream identifier for a shard.
     *
     * @param optional serialized stream identifier, if the shard carries one
     * @return the identifier parsed from {@code optional} when present; otherwise the identifier of the first
     *         entry in {@code currentStreamConfigMap}, which is only permitted outside multi-stream mode
     */
    private StreamIdentifier getStreamIdentifier(Optional<String> optional) {
        final StreamIdentifier resolved;
        if (!optional.isPresent()) {
            // Without a serialized identifier the scheduler must be tracking exactly the single-stream case.
            Validate.isTrue(!isMultiStreamMode, "Should not be in MultiStream Mode", new Object[0]);
            resolved = currentStreamConfigMap.values().iterator().next().streamIdentifier();
        } else {
            resolved = StreamIdentifier.multiStreamInstance(optional.get());
        }
        Validate.notNull(resolved, "Stream identifier should not be empty", new Object[0]);
        return resolved;
    }

    /**
     * Reads the region from the Kinesis client's service client configuration.
     *
     * <p>Decompiler note (JADX): access modifier changed from {@code private}.
     *
     * @return the region configured on the client held by {@code retrievalConfig}
     */
    public Region getKinesisRegion() {
        return retrievalConfig
                .kinesisClient()
                .serviceClientConfiguration()
                .region();
    }

    /**
     * Builds a copy of the given stream config whose stream identifier is created from a constructed stream ARN.
     *
     * <p>The ARN is built from {@code region}, the identifier's account id and its stream name; the initial
     * position and consumer ARN are carried over unchanged.
     *
     * <p>Decompiler note (JADX): access modifier changed from {@code private}.
     *
     * @param streamConfig config to copy; its identifier must carry an account id and a stream creation epoch
     * @param region region used when constructing the stream ARN
     * @return a new {@code StreamConfig} with the ARN-based identifier
     * @throws NullPointerException if either argument is {@code null}
     */
    public static StreamConfig withStreamArn(@NonNull StreamConfig streamConfig, @NonNull Region region) {
        if (streamConfig == null) {
            throw new NullPointerException("streamConfig is marked non-null but is null");
        }
        if (region == null) {
            throw new NullPointerException("kinesisRegion is marked non-null but is null");
        }
        final StreamIdentifier sourceId = streamConfig.streamIdentifier();
        Validate.isTrue(sourceId.accountIdOptional().isPresent(), "accountId should not be empty", new Object[0]);
        Validate.isTrue(sourceId.streamCreationEpochOptional().isPresent(), "streamCreationEpoch should not be empty", new Object[0]);
        log.info("Constructing stream ARN for {} using the Kinesis client's configured region - {}.", sourceId, region);
        return new StreamConfig(
                StreamIdentifier.multiStreamInstance(
                        ArnUtil.constructStreamArn(region, sourceId.accountIdOptional().get(), sourceId.streamName()),
                        sourceId.streamCreationEpochOptional().get().longValue()),
                streamConfig.initialPositionInStreamExtended(),
                streamConfig.consumerArn());
    }

    /**
     * Deprecated method that performs no work and always returns {@code null}.
     *
     * @return always {@code null}; callers must not dereference the result
     * @deprecated NOTE(review): presumably superseded by {@link #startGracefulShutdown()} — confirm before
     *             directing callers there.
     */
    @Deprecated
    public Future<Void> requestShutdown() {
        return null;
    }

    /**
     * Accessor for the scheduler's logging helper.
     *
     * @return the {@code slog} field
     */
    public SchedulerLog slog() {
        return slog;
    }

    /**
     * Accessor for the checkpoint configuration.
     *
     * @return the {@code checkpointConfig} field
     */
    public CheckpointConfig checkpointConfig() {
        return checkpointConfig;
    }

    /**
     * Accessor for the coordinator configuration.
     *
     * @return the {@code coordinatorConfig} field
     */
    public CoordinatorConfig coordinatorConfig() {
        return coordinatorConfig;
    }

    /**
     * Accessor for the lease management configuration.
     *
     * @return the {@code leaseManagementConfig} field
     */
    public LeaseManagementConfig leaseManagementConfig() {
        return leaseManagementConfig;
    }

    /**
     * Accessor for the lifecycle configuration.
     *
     * @return the {@code lifecycleConfig} field
     */
    public LifecycleConfig lifecycleConfig() {
        return lifecycleConfig;
    }

    /**
     * Accessor for the metrics configuration.
     *
     * @return the {@code metricsConfig} field
     */
    public MetricsConfig metricsConfig() {
        return metricsConfig;
    }

    /**
     * Accessor for the processor configuration.
     *
     * @return the {@code processorConfig} field
     */
    public ProcessorConfig processorConfig() {
        return processorConfig;
    }

    /**
     * Accessor for the retrieval configuration.
     *
     * @return the {@code retrievalConfig} field
     */
    public RetrievalConfig retrievalConfig() {
        return retrievalConfig;
    }

    /**
     * Accessor for the application name.
     *
     * @return the {@code applicationName} field
     */
    public String applicationName() {
        return applicationName;
    }

    /**
     * Accessor for the configured maximum number of initialization attempts.
     *
     * @return the {@code maxInitializationAttempts} field
     */
    public int maxInitializationAttempts() {
        return maxInitializationAttempts;
    }

    /**
     * Accessor for the checkpointer handed to newly built shard consumers.
     *
     * @return the {@code checkpoint} field
     */
    public Checkpointer checkpoint() {
        return checkpoint;
    }

    /**
     * Accessor for the shard consumer dispatch poll interval.
     *
     * @return the {@code shardConsumerDispatchPollIntervalMillis} field
     */
    public long shardConsumerDispatchPollIntervalMillis() {
        return shardConsumerDispatchPollIntervalMillis;
    }

    /**
     * Accessor for the parent shard poll interval.
     *
     * @return the {@code parentShardPollIntervalMillis} field
     */
    public long parentShardPollIntervalMillis() {
        return parentShardPollIntervalMillis;
    }

    /**
     * Accessor for the executor service used by this scheduler.
     *
     * @return the {@code executorService} field
     */
    public ExecutorService executorService() {
        return executorService;
    }

    /**
     * Accessor for the factory that creates diagnostic events.
     *
     * @return the {@code diagnosticEventFactory} field
     */
    public DiagnosticEventFactory diagnosticEventFactory() {
        return diagnosticEventFactory;
    }

    /**
     * Accessor for the handler that receives diagnostic events.
     *
     * @return the {@code diagnosticEventHandler} field
     */
    public DiagnosticEventHandler diagnosticEventHandler() {
        return diagnosticEventHandler;
    }

    /**
     * Accessor for the lease coordinator.
     *
     * @return the {@code leaseCoordinator} field
     */
    public LeaseCoordinator leaseCoordinator() {
        return leaseCoordinator;
    }

    /**
     * Accessor for the function that creates a shard sync task manager for a stream config.
     *
     * @return the {@code shardSyncTaskManagerProvider} field
     */
    public Function<StreamConfig, ShardSyncTaskManager> shardSyncTaskManagerProvider() {
        return shardSyncTaskManagerProvider;
    }

    /**
     * Accessor for the per-stream cache of shard sync task managers.
     *
     * @return the {@code streamToShardSyncTaskManagerMap} field itself, not a copy
     */
    public Map<StreamConfig, ShardSyncTaskManager> streamToShardSyncTaskManagerMap() {
        return streamToShardSyncTaskManagerMap;
    }

    /**
     * Accessor for the periodic shard sync manager that {@link #shutdown()} stops.
     *
     * @return the {@code leaderElectedPeriodicShardSyncManager} field
     */
    public PeriodicShardSyncManager leaderElectedPeriodicShardSyncManager() {
        return leaderElectedPeriodicShardSyncManager;
    }

    /**
     * Accessor for the strategy used to prioritize assigned shards.
     *
     * @return the {@code shardPrioritization} field
     */
    public ShardPrioritization shardPrioritization() {
        return shardPrioritization;
    }

    /**
     * Accessor for the lease-cleanup-on-shard-completion flag.
     *
     * @return the {@code cleanupLeasesUponShardCompletion} field
     */
    public boolean cleanupLeasesUponShardCompletion() {
        return cleanupLeasesUponShardCompletion;
    }

    /**
     * Accessor for the skip-shard-sync-at-initialization flag.
     *
     * @return the {@code skipShardSyncAtWorkerInitializationIfLeasesExist} field
     */
    public boolean skipShardSyncAtWorkerInitializationIfLeasesExist() {
        return skipShardSyncAtWorkerInitializationIfLeasesExist;
    }

    /**
     * Accessor for the coordinator used to start graceful shutdowns.
     *
     * @return the {@code gracefulShutdownCoordinator} field
     */
    public GracefulShutdownCoordinator gracefulShutdownCoordinator() {
        return gracefulShutdownCoordinator;
    }

    /**
     * Accessor for the listener notified of worker state changes.
     *
     * @return the {@code workerStateChangeListener} field
     */
    public WorkerStateChangeListener workerStateChangeListener() {
        return workerStateChangeListener;
    }

    /**
     * Accessor for the metrics factory.
     *
     * @return the {@code metricsFactory} field
     */
    public MetricsFactory metricsFactory() {
        return metricsFactory;
    }

    /**
     * Accessor for the failover time that {@link #shouldShutdown()} compares against.
     *
     * @return the {@code failoverTimeMillis} field
     */
    public long failoverTimeMillis() {
        return failoverTimeMillis;
    }

    /**
     * Accessor for the task backoff time.
     *
     * @return the {@code taskBackoffTimeMillis} field
     */
    public long taskBackoffTimeMillis() {
        return taskBackoffTimeMillis;
    }

    /**
     * Reports whether the scheduler runs in multi-stream mode.
     *
     * @return the {@code isMultiStreamMode} field
     */
    public boolean isMultiStreamMode() {
        return isMultiStreamMode;
    }

    /**
     * Accessor for the map of currently tracked stream configs keyed by stream identifier.
     *
     * @return the {@code currentStreamConfigMap} field itself, not a copy
     */
    public Map<StreamIdentifier, StreamConfig> currentStreamConfigMap() {
        return currentStreamConfigMap;
    }

    /**
     * Accessor for the stream tracker used to create stream configs.
     *
     * @return the {@code streamTracker} field
     */
    public StreamTracker streamTracker() {
        return streamTracker;
    }

    /**
     * Accessor for the strategy governing deletion of leases of former streams.
     *
     * @return the {@code formerStreamsLeasesDeletionStrategy} field
     */
    public FormerStreamsLeasesDeletionStrategy formerStreamsLeasesDeletionStrategy() {
        return formerStreamsLeasesDeletionStrategy;
    }

    /**
     * Accessor for the list-shards backoff time.
     *
     * @return the {@code listShardsBackoffTimeMillis} field
     */
    public long listShardsBackoffTimeMillis() {
        return listShardsBackoffTimeMillis;
    }

    /**
     * Accessor for the maximum number of list-shards retry attempts.
     *
     * @return the {@code maxListShardsRetryAttempts} field
     */
    public int maxListShardsRetryAttempts() {
        return maxListShardsRetryAttempts;
    }

    /**
     * Accessor for the lease refresher used to delete stale stream leases.
     *
     * @return the {@code leaseRefresher} field
     */
    public LeaseRefresher leaseRefresher() {
        return leaseRefresher;
    }

    /**
     * Accessor for the function that supplies a shard detector for a stream config.
     *
     * @return the {@code shardDetectorProvider} field
     */
    public Function<StreamConfig, ShardDetector> shardDetectorProvider() {
        return shardDetectorProvider;
    }

    /**
     * Accessor for the ignore-unexpected-child-shards flag (the misspelled name is part of the public API).
     *
     * @return the {@code ignoreUnexpetedChildShards} field
     */
    public boolean ignoreUnexpetedChildShards() {
        return ignoreUnexpetedChildShards;
    }

    /**
     * Accessor for the aggregator utility passed to shard consumers.
     *
     * @return the {@code aggregatorUtil} field
     */
    public AggregatorUtil aggregatorUtil() {
        return aggregatorUtil;
    }

    /**
     * Accessor for the function that supplies a hierarchical shard syncer for a stream config.
     *
     * @return the {@code hierarchicalShardSyncerProvider} field
     */
    public Function<StreamConfig, HierarchicalShardSyncer> hierarchicalShardSyncerProvider() {
        return hierarchicalShardSyncerProvider;
    }

    /**
     * Accessor for the scheduler initialization backoff time.
     *
     * @return the {@code schedulerInitializationBackoffTimeMillis} field
     */
    public long schedulerInitializationBackoffTimeMillis() {
        return schedulerInitializationBackoffTimeMillis;
    }

    /**
     * Accessor for the leader decider.
     *
     * @return the {@code leaderDecider} field
     */
    public LeaderDecider leaderDecider() {
        return leaderDecider;
    }

    /**
     * Accessor for the map of streams enqueued for lease deletion and the instant recorded for each.
     *
     * @return the {@code staleStreamDeletionMap} field itself, not a copy
     */
    public Map<StreamIdentifier, Instant> staleStreamDeletionMap() {
        return staleStreamDeletionMap;
    }

    /**
     * Accessor for the lease cleanup manager that {@link #shutdown()} shuts down.
     *
     * @return the {@code leaseCleanupManager} field
     */
    public LeaseCleanupManager leaseCleanupManager() {
        return leaseCleanupManager;
    }

    /**
     * Accessor for the schema registry decoder passed to shard consumers.
     *
     * @return the {@code schemaRegistryDecoder} field
     */
    public SchemaRegistryDecoder schemaRegistryDecoder() {
        return schemaRegistryDecoder;
    }

    /**
     * Accessor for the provider of deleted streams.
     *
     * @return the {@code deletedStreamListProvider} field
     */
    public DeletedStreamListProvider deletedStreamListProvider() {
        return deletedStreamListProvider;
    }

    /**
     * Accessor for the registry of shard consumers keyed by shard info.
     *
     * @return the {@code shardInfoShardConsumerMap} field itself, not a copy
     */
    public ConcurrentMap<ShardInfo, ShardConsumer> shardInfoShardConsumerMap() {
        return shardInfoShardConsumerMap;
    }

    /**
     * Accessor for the timestamp recorded by {@link #shutdown()} when shutdown was first requested.
     *
     * @return the {@code shutdownStartTimeMillis} field
     */
    public long shutdownStartTimeMillis() {
        return shutdownStartTimeMillis;
    }

    /**
     * Reports whether the final shutdown step has marked shutdown as complete.
     *
     * @return the {@code shutdownComplete} field
     */
    public boolean shutdownComplete() {
        return shutdownComplete;
    }

    /**
     * Accessor for the monitor object that {@link #shutdown()} synchronizes on.
     *
     * @return the {@code lock} field
     */
    public Object lock() {
        return lock;
    }

    /**
     * Accessor for the stopwatch that {@code shouldSyncStreamsNow()} reads.
     *
     * @return the {@code streamSyncWatch} field
     */
    public Stopwatch streamSyncWatch() {
        return streamSyncWatch;
    }

    /**
     * Accessor for the leases-synced-on-app-init flag.
     *
     * @return the {@code leasesSyncedOnAppInit} field
     */
    public boolean leasesSyncedOnAppInit() {
        return leasesSyncedOnAppInit;
    }

    /**
     * Accessor for the future created by {@link #startGracefulShutdown()}.
     *
     * @return the {@code gracefulShutdownFuture} field; {@code null} while no graceful shutdown has been started
     */
    public CompletableFuture<Boolean> gracefulShutdownFuture() {
        return gracefulShutdownFuture;
    }

    /**
     * Accessor for the graceful-shutdown-started flag (the misspelled name is part of the public API).
     *
     * @return the {@code gracefuleShutdownStarted} field
     */
    public boolean gracefuleShutdownStarted() {
        return gracefuleShutdownStarted;
    }
}
