package org.kie.kogito.jobs.service.stream;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.smallrye.reactive.messaging.annotations.Channel;
import io.smallrye.reactive.messaging.annotations.Emitter;
import io.smallrye.reactive.messaging.annotations.OnOverflow;
import java.util.Optional;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.kie.kogito.jobs.service.events.JobDataEvent;
import org.kie.kogito.jobs.service.model.ScheduledJob;
import org.kie.kogito.jobs.service.resource.JobResource;
import org.kie.kogito.jobs.service.utils.FunctionsUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Forwards job status changes to the {@code kogito-job-service-job-status-events}
 * channel as JSON-serialized {@link JobDataEvent}s.
 *
 * <p>Publishing only happens when the {@value #PUBLISH_EVENTS_CONFIG_KEY} configuration
 * property is present and parses to {@code true}; otherwise incoming jobs are consumed
 * and silently dropped.
 */
@ApplicationScoped
public class KafkaJobStreams {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaJobStreams.class);

    /** Configuration key that switches event publishing on. */
    private static final String PUBLISH_EVENTS_CONFIG_KEY = "kogito.jobs-service.events-support";

    /** Serializes the outgoing {@link JobDataEvent} to a JSON string. */
    private ObjectMapper objectMapper;

    /** Emitter bound to the outgoing job status events channel. */
    private Emitter<String> kafkaEmitter;

    /** Holds {@code true} when publishing is enabled; empty in every other case. */
    private Optional<Boolean> enabled;

    /** Base URL of this service, used as the prefix of the event source. */
    private String url;

    /**
     * No-arg constructor; leaves every field unset.
     * NOTE(review): presumably required by the CDI container for proxying this
     * application-scoped bean - confirm before removing.
     */
    public KafkaJobStreams() {
    }

    /**
     * Creates the publisher with its injected collaborators.
     *
     * @param objectMapper mapper used to serialize events to JSON
     * @param optional raw value of the {@value #PUBLISH_EVENTS_CONFIG_KEY} property, if configured;
     *                 publishing is enabled only when it parses to {@code true}
     * @param emitter emitter for the {@code kogito-job-service-job-status-events} channel
     * @param str base URL of this service ({@code kogito.service.url},
     *            defaulting to {@code http://localhost:8080})
     */
    @Inject
    public KafkaJobStreams(ObjectMapper objectMapper,
                           @ConfigProperty(name = PUBLISH_EVENTS_CONFIG_KEY) Optional<String> optional,
                           @OnOverflow(OnOverflow.Strategy.LATEST) @Channel("kogito-job-service-job-status-events") Emitter<String> emitter,
                           @ConfigProperty(name = "kogito.service.url", defaultValue = "http://localhost:8080") String str) {
        this.objectMapper = objectMapper;
        // Keep the value only when it is TRUE so that "enabled" doubles as the on/off guard:
        // a missing property, "false", or any non-boolean text all collapse to Optional.empty().
        this.enabled = optional.map(Boolean::valueOf).filter(Boolean.TRUE::equals);
        this.kafkaEmitter = emitter;
        this.url = str;
    }

    /**
     * Consumes a job status change and, when publishing is enabled, sends it to the
     * outgoing channel wrapped in a {@link JobDataEvent} whose source is the service
     * URL followed by {@link JobResource#JOBS_PATH}.
     *
     * @param scheduledJob the job whose status changed; used as the event payload
     */
    @Acknowledgment(Acknowledgment.Strategy.PRE_PROCESSING)
    @Incoming(AvailableStreams.JOB_STATUS_CHANGE_EVENTS)
    public void jobStatusChangeKafkaPublisher(ScheduledJob scheduledJob) {
        enabled
                // Optional.map also yields empty if the emitter itself is null.
                .map(flag -> kafkaEmitter)
                .map(emitter -> {
                    JobDataEvent event = JobDataEvent.builder()
                            .source(url + JobResource.JOBS_PATH)
                            .data(scheduledJob)
                            .build();
                    // writeValueAsString declares a checked exception; FunctionsUtil.unchecked
                    // adapts it so it can be applied here (exact wrapping is defined by that helper).
                    String payload = FunctionsUtil.unchecked(objectMapper::writeValueAsString).apply(event);
                    return emitter.send(payload);
                })
                .ifPresent(sent -> LOGGER.debug("Job Status change published to kafka {}", scheduledJob));
    }
}
