package org.kie.kogito.addons.quarkus.jobs.service.embedded.stream;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.smallrye.reactive.messaging.annotations.Blocking;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.inject.Instance;
import javax.inject.Inject;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.kie.kogito.event.AbstractDataEvent;
import org.kie.kogito.event.EventPublisher;
import org.kie.kogito.jobs.JobsServiceException;
import org.kie.kogito.jobs.service.adapter.ScheduledJobAdapter;
import org.kie.kogito.jobs.service.model.JobDetails;
import org.kie.kogito.jobs.service.model.ScheduledJob;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ApplicationScoped
/* loaded from: input_file:org/kie/kogito/addons/quarkus/jobs/service/embedded/stream/EventPublisherJobStreams.class */
public class EventPublisherJobStreams {
    public static final String DATA_INDEX_EVENT_PUBLISHER = "org.kie.kogito.index.addon.DataIndexEventPublisher";
    private static final Logger LOGGER = LoggerFactory.getLogger(EventPublisherJobStreams.class);
    private final String url;
    private final EventPublisher eventPublisher;
    private final ObjectMapper objectMapper;

    /* loaded from: input_file:org/kie/kogito/addons/quarkus/jobs/service/embedded/stream/EventPublisherJobStreams$EventPublisherJobDataEvent.class */
    public static class EventPublisherJobDataEvent extends AbstractDataEvent<byte[]> {
        public EventPublisherJobDataEvent(String str, String str2, byte[] bArr, String str3, String str4, String str5, String str6, String str7) {
            super(str, str2, bArr, str3, str4, str5, str6, (String) null, str7);
        }
    }

    @Inject
    public EventPublisherJobStreams(@ConfigProperty(name = "kogito.service.url", defaultValue = "http://localhost:8080") String str, Instance<EventPublisher> instance, ObjectMapper objectMapper) {
        this.url = str;
        this.eventPublisher = (EventPublisher) instance.stream().filter(eventPublisher -> {
            return eventPublisher.getClass().getName().startsWith(DATA_INDEX_EVENT_PUBLISHER);
        }).findFirst().orElse(null);
        this.objectMapper = objectMapper;
    }

    @Acknowledgment(Acknowledgment.Strategy.PRE_PROCESSING)
    @Blocking
    @Incoming("job-status-change-events")
    public void onJobStatusChange(JobDetails jobDetails) {
        if (this.eventPublisher != null) {
            ScheduledJob of = ScheduledJobAdapter.of(jobDetails);
            try {
                try {
                    this.eventPublisher.publish(new EventPublisherJobDataEvent("JobEvent", this.url + "/jobs", this.objectMapper.writeValueAsBytes(of), of.getProcessInstanceId(), of.getRootProcessInstanceId(), of.getProcessId(), of.getRootProcessId(), null));
                } catch (Exception e) {
                    LOGGER.error("Job status change propagation has failed at eventPublisher: " + this.eventPublisher.getClass() + " execution.", e);
                }
            } catch (Exception e2) {
                throw new JobsServiceException("It was not possible to serialize scheduledJob to json: " + of, e2);
            }
        }
    }
}
