package org.kie.kogito.jobs.service.executor;

import io.vertx.axle.core.Vertx;
import io.vertx.axle.core.buffer.Buffer;
import io.vertx.axle.ext.web.client.HttpRequest;
import io.vertx.axle.ext.web.client.HttpResponse;
import io.vertx.axle.ext.web.client.WebClient;
import java.net.URI;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import javax.annotation.PostConstruct;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.kie.kogito.jobs.api.URIBuilder;
import org.kie.kogito.jobs.service.converters.HttpConverters;
import org.kie.kogito.jobs.service.model.HTTPRequestCallback;
import org.kie.kogito.jobs.service.model.JobExecutionResponse;
import org.kie.kogito.jobs.service.model.ScheduledJob;
import org.kie.kogito.jobs.service.stream.JobStreams;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ApplicationScoped
/* loaded from: input_file:org/kie/kogito/jobs/service/executor/HttpJobExecutor.class */
public class HttpJobExecutor implements JobExecutor {
    private static final Logger LOGGER = LoggerFactory.getLogger(HttpJobExecutor.class);

    @Inject
    Vertx vertx;
    private WebClient client;

    @Inject
    HttpConverters httpConverters;

    @Inject
    JobStreams jobStreams;

    @PostConstruct
    void initialize() {
        this.client = WebClient.create(this.vertx);
    }

    private CompletionStage<HttpResponse<Buffer>> executeCallback(HTTPRequestCallback hTTPRequestCallback) {
        LOGGER.debug("Executing callback {}", hTTPRequestCallback);
        URI uri = URIBuilder.toURI(hTTPRequestCallback.getUrl());
        HttpRequest request = this.client.request(this.httpConverters.convertHttpMethod(hTTPRequestCallback.getMethod()), uri.getPort(), uri.getHost(), uri.getPath());
        Optional.ofNullable(hTTPRequestCallback.getQueryParams()).ifPresent(map -> {
            request.queryParams().addAll(map);
        });
        return request.send();
    }

    private String getResponseCode(HttpResponse<Buffer> httpResponse) {
        return (String) Optional.ofNullable(Integer.valueOf(httpResponse.statusCode())).map((v0) -> {
            return String.valueOf(v0);
        }).orElse(null);
    }

    private <T extends JobExecutionResponse> PublisherBuilder<T> handleResponse(T t) {
        LOGGER.debug("handle response {}", t);
        return ReactiveStreams.of(t).map((v0) -> {
            return v0.getCode();
        }).flatMap(str -> {
            return str.equals("200") ? handleSuccess(t) : handleError(t);
        });
    }

    private <T extends JobExecutionResponse> PublisherBuilder<T> handleError(T t) {
        LOGGER.info("handle error {}", t);
        PublisherBuilder of = ReactiveStreams.of(t);
        JobStreams jobStreams = this.jobStreams;
        Objects.requireNonNull(jobStreams);
        return of.peek(jobStreams::publishJobError).peek(jobExecutionResponse -> {
            LOGGER.debug("Error executing job {}.", jobExecutionResponse);
        });
    }

    private <T extends JobExecutionResponse> PublisherBuilder<T> handleSuccess(T t) {
        LOGGER.info("handle success {}", t);
        PublisherBuilder of = ReactiveStreams.of(t);
        JobStreams jobStreams = this.jobStreams;
        Objects.requireNonNull(jobStreams);
        return of.peek(jobStreams::publishJobSuccess).peek(jobExecutionResponse -> {
            LOGGER.debug("Success executing job {}.", jobExecutionResponse);
        });
    }

    @Override // org.kie.kogito.jobs.service.executor.JobExecutor
    public CompletionStage<ScheduledJob> execute(CompletionStage<ScheduledJob> completionStage) {
        return completionStage.thenCompose(scheduledJob -> {
            return ReactiveStreams.fromCompletionStage(executeCallback(HTTPRequestCallback.builder().url(scheduledJob.getCallbackEndpoint()).method(HTTPRequestCallback.HTTPMethod.POST).addQueryParam("limit", (String) scheduledJob.hasInterval().map(l -> {
                return Integer.valueOf(getRepeatableJobCountDown(scheduledJob));
            }).map((v0) -> {
                return String.valueOf(v0);
            }).orElse(null)).build())).map(httpResponse -> {
                return JobExecutionResponse.builder().message(httpResponse.statusMessage()).code(getResponseCode(httpResponse)).now().jobId(scheduledJob.getId()).build();
            }).flatMap(this::handleResponse).findFirst().run().thenApply(optional -> {
                return (ScheduledJob) optional.map(jobExecutionResponse -> {
                    return scheduledJob;
                }).orElse(null);
            }).exceptionally(th -> {
                LOGGER.error("Generic error executing job {}", scheduledJob, th);
                this.jobStreams.publishJobError(JobExecutionResponse.builder().message(th.getMessage()).now().jobId(scheduledJob.getId()).build());
                return scheduledJob;
            });
        });
    }

    private int getRepeatableJobCountDown(ScheduledJob scheduledJob) {
        return scheduledJob.getRepeatLimit().intValue() - (scheduledJob.getExecutionCounter().intValue() + 1);
    }

    WebClient getClient() {
        return this.client;
    }
}
