/*
 * Decompiled with CFR 0.152.
 */
package org.kie.kogito.jobs.service.executor;

import io.vertx.axle.core.Vertx;
import io.vertx.axle.core.buffer.Buffer;
import io.vertx.axle.ext.web.client.HttpRequest;
import io.vertx.axle.ext.web.client.HttpResponse;
import io.vertx.axle.ext.web.client.WebClient;
import java.net.URI;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import javax.annotation.PostConstruct;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.kie.kogito.jobs.api.URIBuilder;
import org.kie.kogito.jobs.service.converters.HttpConverters;
import org.kie.kogito.jobs.service.executor.JobExecutor;
import org.kie.kogito.jobs.service.model.HTTPRequestCallback;
import org.kie.kogito.jobs.service.model.JobExecutionResponse;
import org.kie.kogito.jobs.service.model.ScheduledJob;
import org.kie.kogito.jobs.service.stream.JobStreams;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ApplicationScoped
public class HttpJobExecutor
implements JobExecutor {
    private static final Logger LOGGER = LoggerFactory.getLogger(HttpJobExecutor.class);
    @Inject
    Vertx vertx;
    private WebClient client;
    @Inject
    HttpConverters httpConverters;
    @Inject
    JobStreams jobStreams;

    @PostConstruct
    void initialize() {
        this.client = WebClient.create((Vertx)this.vertx);
    }

    private CompletionStage<HttpResponse<Buffer>> executeCallback(HTTPRequestCallback request) {
        LOGGER.debug("Executing callback {}", (Object)request);
        URI uri = URIBuilder.toURI((String)request.getUrl());
        HttpRequest clientRequest = this.client.request(this.httpConverters.convertHttpMethod(request.getMethod()), uri.getPort(), uri.getHost(), uri.getPath());
        Optional.ofNullable(request.getQueryParams()).ifPresent(params -> clientRequest.queryParams().addAll(params));
        return clientRequest.send();
    }

    private String getResponseCode(HttpResponse<Buffer> response) {
        return Optional.ofNullable(response.statusCode()).map(String::valueOf).orElse(null);
    }

    private <T extends JobExecutionResponse> PublisherBuilder<T> handleResponse(T response) {
        LOGGER.debug("handle response {}", response);
        return ReactiveStreams.of(response).map(JobExecutionResponse::getCode).flatMap(code -> code.equals("200") ? this.handleSuccess(response) : this.handleError(response));
    }

    private <T extends JobExecutionResponse> PublisherBuilder<T> handleError(T response) {
        LOGGER.info("handle error {}", response);
        return ReactiveStreams.of(response).peek(this.jobStreams::publishJobError).peek(r -> LOGGER.debug("Error executing job {}.", r));
    }

    private <T extends JobExecutionResponse> PublisherBuilder<T> handleSuccess(T response) {
        LOGGER.info("handle success {}", response);
        return ReactiveStreams.of(response).peek(this.jobStreams::publishJobSuccess).peek(r -> LOGGER.debug("Success executing job {}.", r));
    }

    @Override
    public CompletionStage<ScheduledJob> execute(CompletionStage<ScheduledJob> futureJob) {
        return futureJob.thenCompose(job -> {
            HTTPRequestCallback callback = HTTPRequestCallback.builder().url(job.getCallbackEndpoint()).method(HTTPRequestCallback.HTTPMethod.POST).addQueryParam("limit", job.hasInterval().map(interval -> this.getRepeatableJobCountDown((ScheduledJob)((Object)job))).map(String::valueOf).orElse(null)).build();
            return ReactiveStreams.fromCompletionStage(this.executeCallback(callback)).map(response -> JobExecutionResponse.builder().message(response.statusMessage()).code(this.getResponseCode((HttpResponse<Buffer>)response)).now().jobId(job.getId()).build()).flatMap(this::handleResponse).findFirst().run().thenApply(response -> response.map(r -> job).orElse(null)).exceptionally(ex -> {
                LOGGER.error("Generic error executing job {}", (Object)job, ex);
                this.jobStreams.publishJobError(JobExecutionResponse.builder().message(ex.getMessage()).now().jobId(job.getId()).build());
                return job;
            });
        });
    }

    private int getRepeatableJobCountDown(ScheduledJob job) {
        return job.getRepeatLimit() - (job.getExecutionCounter() + 1);
    }

    WebClient getClient() {
        return this.client;
    }
}

