package org.kie.kogito.jobs.service.management;

import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.KafkaConnector;
import io.vertx.mutiny.core.TimeoutStream;
import io.vertx.mutiny.core.Vertx;
import java.time.OffsetDateTime;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Event;
import javax.enterprise.event.Observes;
import javax.inject.Inject;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.eclipse.microprofile.reactive.messaging.spi.Connector;
import org.kie.kogito.jobs.service.model.JobServiceManagementInfo;
import org.kie.kogito.jobs.service.repository.JobServiceManagementRepository;
import org.kie.kogito.jobs.service.utils.DateUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Decides whether this Job Service instance acts as the leader, based on a management record kept
 * in the {@link JobServiceManagementRepository}.
 *
 * <p>On startup the instance generates its own token and tries to claim the management record.
 * Two periodic Vert.x streams are used, at most one of which is expected to run at a time:
 * <ul>
 * <li>{@code checkLeader} - retries {@link #tryBecomeLeader} while this instance is not leader;</li>
 * <li>{@code heartbeat} - refreshes the record through the repository while this instance is leader.</li>
 * </ul>
 * Whenever leadership is gained or lost, the Kafka consumers are resumed/paused and a
 * {@link MessagingChangeEvent} is fired.
 */
@ApplicationScoped
public class JobServiceInstanceManager {
    private static final Logger LOGGER = LoggerFactory.getLogger(JobServiceInstanceManager.class);

    /** Period, in seconds, of the heartbeat stream. (Field name kept as-is for compatibility.) */
    @ConfigProperty(name = "kogito.jobs-service.management.heartbeat.interval-in-seconds", defaultValue = "1")
    long heardBeatIntervalInSeconds;

    /** Period, in seconds, of the leader-check stream. */
    @ConfigProperty(name = "kogito.jobs-service.management.leader-check.interval-in-seconds", defaultValue = "1")
    long leaderCheckIntervalInSeconds;

    /** Age, in seconds, after which the stored heartbeat is considered expired and can be taken over. */
    @ConfigProperty(name = "kogito.jobs-service.management.heartbeat.expiration-in-seconds", defaultValue = "10")
    long heartbeatExpirationInSeconds;

    /** Identifier of the management record used for the leadership claim. */
    @ConfigProperty(name = "kogito.jobs-service.management.heartbeat.management-id", defaultValue = "kogito-jobs-service-leader")
    String leaderManagementId;

    @Inject
    @Connector("smallrye-kafka")
    KafkaConnector kafkaConnector;

    @Inject
    Event<MessagingChangeEvent> messagingChangeEventEvent;

    @Inject
    Vertx vertx;

    @Inject
    JobServiceManagementRepository repository;

    /** Periodic leader-check stream; assigned in {@link #startup}, {@code null} before that. */
    private TimeoutStream checkLeader;

    /** Periodic heartbeat stream; assigned in {@link #startup}, {@code null} before that. */
    private TimeoutStream heartbeat;

    /** Identity (management id, token, last heartbeat) of this instance; set in {@link #startup}. */
    private final AtomicReference<JobServiceManagementInfo> currentInfo = new AtomicReference<>();

    /** Whether this instance currently considers itself the leader. */
    private final AtomicBoolean leader = new AtomicBoolean(false);

    /**
     * Builds this instance's identity, creates both periodic streams in paused state and runs the
     * initial leadership attempt, which resumes the appropriate stream.
     *
     * @param startupEvent the Quarkus startup event (unused)
     */
    void startup(@Observes StartupEvent startupEvent) {
        buildAndSetInstanceInfo();

        // The handlers read the stream fields when they fire, not when they are created, so it is
        // fine that 'heartbeat' is still unassigned while the first handler is being registered.
        this.checkLeader = this.vertx.periodicStream(TimeUnit.SECONDS.toMillis(this.leaderCheckIntervalInSeconds))
                .handler(timerId -> tryBecomeLeader(this.currentInfo.get(), this.checkLeader, this.heartbeat)
                        .subscribe().with(
                                info -> LOGGER.info("Checking Leader"),
                                failure -> LOGGER.error("Error checking Leader", failure)))
                .pause();

        this.heartbeat = this.vertx.periodicStream(TimeUnit.SECONDS.toMillis(this.heardBeatIntervalInSeconds))
                .handler(timerId -> heartbeat(this.currentInfo.get())
                        .subscribe().with(
                                info -> LOGGER.debug("Heartbeat {}", this.currentInfo.get()),
                                failure -> LOGGER.error("Error on heartbeat {}", this.currentInfo.get(), failure)))
                .pause();

        tryBecomeLeader(this.currentInfo.get(), this.checkLeader, this.heartbeat)
                .subscribe().with(
                        info -> LOGGER.info("Initial check leader execution"),
                        failure -> LOGGER.error("Error on initial check leader", failure));
    }

    /**
     * Pauses every Kafka consumer channel and notifies observers that messaging is disabled.
     */
    private void disableCommunication() {
        // NOTE(review): the value returned by pause() is ignored here; if it is a lazy Uni in the
        // connector version in use, it may need to be subscribed to take effect - confirm.
        this.kafkaConnector.getConsumerChannels()
                .forEach(channel -> this.kafkaConnector.getConsumer(channel).pause());
        this.messagingChangeEventEvent.fire(new MessagingChangeEvent(false));
        LOGGER.warn("Disabled communication not leader instance");
    }

    /**
     * Resumes every Kafka consumer channel and notifies observers that messaging is enabled.
     */
    private void enableCommunication() {
        // NOTE(review): same remark as in disableCommunication() about the ignored return value.
        this.kafkaConnector.getConsumerChannels()
                .forEach(channel -> this.kafkaConnector.getConsumer(channel).resume());
        this.messagingChangeEventEvent.fire(new MessagingChangeEvent(true));
        LOGGER.warn("Enabled communication for leader instance");
    }

    /**
     * Releases leadership and stops the periodic streams when the application shuts down.
     *
     * @param shutdownEvent the Quarkus shutdown event (unused)
     */
    void onShutdown(@Observes ShutdownEvent shutdownEvent) {
        shutdown();
    }

    /**
     * Releases leadership and stops the periodic streams on an explicit release request.
     *
     * @param releaseLeaderEvent the release request (unused)
     */
    void onReleaseLeader(@Observes ReleaseLeaderEvent releaseLeaderEvent) {
        shutdown();
    }

    /**
     * Releases the management record and, once that succeeded, cancels both periodic streams.
     * Does nothing when {@link #startup} has not yet initialised this instance.
     */
    private void shutdown() {
        JobServiceManagementInfo info = this.currentInfo.get();
        if (info == null) {
            // startup() never ran, so there is neither a record to release nor a stream to cancel.
            return;
        }
        release(info)
                .onItem().invoke(ignored -> cancelIfCreated(this.checkLeader))
                .onItem().invoke(ignored -> cancelIfCreated(this.heartbeat))
                .subscribe().with(
                        ignored -> LOGGER.info("Shutting down leader instance check"),
                        failure -> LOGGER.error("Shutdown error", failure));
    }

    /** Cancels the given stream unless it was never created. */
    private static void cancelIfCreated(TimeoutStream stream) {
        if (stream != null) {
            stream.cancel();
        }
    }

    /**
     * @return {@code true} if this instance currently considers itself the leader
     */
    protected boolean isLeader() {
        return this.leader.get();
    }

    /**
     * Attempts to claim (or keep) leadership through {@code repository.getAndUpdate}.
     *
     * <p>When the claim is possible, the heartbeat timestamp of {@code jobServiceManagementInfo}
     * is refreshed, the leader flag is set, communication is enabled, the heartbeat stream is
     * resumed and the leader-check stream is paused. Otherwise, leadership is dropped if it was
     * held, the heartbeat stream is paused and the leader-check stream is resumed.
     *
     * @param jobServiceManagementInfo the identity of this instance
     * @param timeoutStream the leader-check stream
     * @param timeoutStream2 the heartbeat stream
     * @return the Uni produced by the repository; the update function yields
     *         {@code jobServiceManagementInfo} when leadership is claimed and {@code null} otherwise
     */
    protected Uni<JobServiceManagementInfo> tryBecomeLeader(JobServiceManagementInfo jobServiceManagementInfo, TimeoutStream timeoutStream, TimeoutStream timeoutStream2) {
        LOGGER.debug("Try to become Leader");
        return this.repository.getAndUpdate(jobServiceManagementInfo.getId(), stored -> {
            OffsetDateTime now = DateUtil.now().toOffsetDateTime();
            if (canClaimLeadership(stored, jobServiceManagementInfo, now)) {
                jobServiceManagementInfo.setLastHeartbeat(now);
                LOGGER.info("SET Leader {}", jobServiceManagementInfo);
                this.leader.set(true);
                enableCommunication();
                timeoutStream2.resume();
                timeoutStream.pause();
                return jobServiceManagementInfo;
            }
            if (isLeader()) {
                LOGGER.info("Not Leader");
                this.leader.set(false);
                disableCommunication();
            }
            // Not leader: no heartbeats, but keep checking whether leadership becomes available.
            timeoutStream2.pause();
            timeoutStream.resume();
            return null;
        });
    }

    /**
     * Tells whether the stored record can be claimed by {@code candidate}: there is no record, it
     * has no token, it already carries the candidate's token, it has no heartbeat, or its last
     * heartbeat is older than {@code heartbeatExpirationInSeconds} relative to {@code now}.
     */
    private boolean canClaimLeadership(JobServiceManagementInfo stored, JobServiceManagementInfo candidate, OffsetDateTime now) {
        return Objects.isNull(stored)
                || Objects.isNull(stored.getToken())
                || Objects.equals(stored.getToken(), candidate.getToken())
                || Objects.isNull(stored.getLastHeartbeat())
                || stored.getLastHeartbeat().isBefore(now.minusSeconds(this.heartbeatExpirationInSeconds));
    }

    /**
     * Overwrites the management record with one that has no token and no heartbeat, then disables
     * communication and clears the leader flag.
     *
     * @param jobServiceManagementInfo the identity whose management id is released
     * @return a Uni completing when the release finished; failures are logged and propagated
     */
    protected Uni<Void> release(JobServiceManagementInfo jobServiceManagementInfo) {
        return this.repository.set(new JobServiceManagementInfo(jobServiceManagementInfo.getId(), null, null))
                .onItem().invoke(this::disableCommunication)
                .onItem().invoke(ignored -> this.leader.set(false))
                .onItem().invoke(ignored -> LOGGER.info("Leader instance released"))
                // Pass the throwable so the cause of the failed release is not lost.
                .onFailure().invoke(failure -> LOGGER.error("Error releasing leader", failure))
                .replaceWithVoid();
    }

    /**
     * Sends a heartbeat through the repository when this instance is leader.
     *
     * @param jobServiceManagementInfo the identity of this instance
     * @return the repository's Uni when leader, otherwise a Uni emitting {@code null}
     */
    protected Uni<JobServiceManagementInfo> heartbeat(JobServiceManagementInfo jobServiceManagementInfo) {
        LOGGER.debug("Heartbeat Leader");
        if (isLeader()) {
            return this.repository.heartbeat(jobServiceManagementInfo);
        }
        return Uni.createFrom().nullItem();
    }

    /**
     * Creates this instance's identity from the configured management id, a fresh random token
     * and the current time, and stores it in {@code currentInfo}.
     */
    private void buildAndSetInstanceInfo() {
        this.currentInfo.set(new JobServiceManagementInfo(this.leaderManagementId, generateToken(), DateUtil.now().toOffsetDateTime()));
        LOGGER.info("Current Job Service Instance {}", this.currentInfo.get());
    }

    /**
     * @return a random UUID string used as this instance's token
     */
    private String generateToken() {
        return UUID.randomUUID().toString();
    }

    /**
     * @return the identity of this instance, or {@code null} before startup
     */
    protected JobServiceManagementInfo getCurrentInfo() {
        return this.currentInfo.get();
    }

    /**
     * @return the leader-check stream, or {@code null} before startup
     */
    protected TimeoutStream getCheckLeader() {
        return this.checkLeader;
    }

    /**
     * @return the heartbeat stream, or {@code null} before startup
     */
    protected TimeoutStream getHeartbeat() {
        return this.heartbeat;
    }
}
