package org.kie.kogito.jobs.service.repository.impl;

import io.vertx.core.Vertx;
import java.util.Arrays;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Supplier;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.kie.kogito.jobs.service.model.JobStatus;
import org.kie.kogito.jobs.service.model.job.JobDetails;
import org.kie.kogito.jobs.service.repository.ReactiveJobRepository;
import org.kie.kogito.jobs.service.stream.JobStreams;

/* loaded from: input_file:org/kie/kogito/jobs/service/repository/impl/BaseReactiveJobRepository.class */
public abstract class BaseReactiveJobRepository implements ReactiveJobRepository {
    private Vertx vertx;
    private JobStreams jobStreams;

    /* JADX INFO: Access modifiers changed from: protected */
    public BaseReactiveJobRepository(Vertx vertx, JobStreams jobStreams) {
        this.vertx = vertx;
        this.jobStreams = jobStreams;
    }

    public <T> CompletionStage<T> runAsync(Supplier<T> supplier) {
        CompletableFuture completableFuture = new CompletableFuture();
        this.vertx.executeBlocking(promise -> {
            completableFuture.complete(supplier.get());
        }, asyncResult -> {
        });
        return completableFuture;
    }

    @Override // org.kie.kogito.jobs.service.repository.ReactiveJobRepository
    public PublisherBuilder<JobDetails> findByStatus(JobStatus... jobStatusArr) {
        return findAll().filter(jobDetails -> {
            return Objects.nonNull(jobDetails.getStatus());
        }).filter(jobDetails2 -> {
            Stream stream = Arrays.stream(jobStatusArr);
            JobStatus status = jobDetails2.getStatus();
            Objects.requireNonNull(status);
            return stream.anyMatch((v1) -> {
                return r1.equals(v1);
            });
        });
    }

    @Override // org.kie.kogito.jobs.service.repository.ReactiveJobRepository
    public CompletionStage<JobDetails> save(JobDetails jobDetails) {
        CompletionStage<JobDetails> doSave = doSave(jobDetails);
        JobStreams jobStreams = this.jobStreams;
        Objects.requireNonNull(jobStreams);
        return doSave.thenApply(jobStreams::publishJobStatusChange);
    }

    public abstract CompletionStage<JobDetails> doSave(JobDetails jobDetails);

    @Override // org.kie.kogito.jobs.service.repository.ReactiveJobRepository
    public CompletionStage<JobDetails> delete(JobDetails jobDetails) {
        return delete(jobDetails.getId()).thenApply(jobDetails2 -> {
            return this.jobStreams.publishJobStatusChange(jobDetails);
        });
    }

    @Override // org.kie.kogito.jobs.service.repository.ReactiveJobRepository
    public CompletionStage<JobDetails> merge(String str, JobDetails jobDetails) {
        return (CompletionStage) Optional.ofNullable(str).filter((v0) -> {
            return StringUtils.isNotBlank(v0);
        }).filter(str2 -> {
            return StringUtils.isBlank(jobDetails.getId()) || str2.equals(jobDetails.getId());
        }).map(str3 -> {
            return get(str3).thenApply((v0) -> {
                return Optional.ofNullable(v0);
            }).thenApply(optional -> {
                return optional.map(jobDetails2 -> {
                    return doMerge(jobDetails, jobDetails2);
                });
            }).thenCompose(optional2 -> {
                return (CompletionStage) optional2.map(this::save).orElse(CompletableFuture.completedFuture(null));
            });
        }).orElseThrow(() -> {
            return new IllegalArgumentException("Id is empty or not equals to Job.id : " + str);
        });
    }

    private JobDetails doMerge(JobDetails jobDetails, JobDetails jobDetails2) {
        return JobDetails.builder().of(jobDetails2).merge(jobDetails).build();
    }
}
