package org.kie.kogito.jobs.service.repository.impl;

import io.vertx.core.Vertx;
import java.util.Arrays;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Supplier;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.kie.kogito.jobs.service.model.JobStatus;
import org.kie.kogito.jobs.service.model.ScheduledJob;
import org.kie.kogito.jobs.service.repository.ReactiveJobRepository;
import org.kie.kogito.jobs.service.stream.JobStreams;

/* loaded from: input_file:test-resources/jobs-service.jar:org/kie/kogito/jobs/service/repository/impl/BaseReactiveJobRepository.class */
public abstract class BaseReactiveJobRepository implements ReactiveJobRepository {
    private Vertx vertx;
    private JobStreams jobStreams;

    public BaseReactiveJobRepository(Vertx vertx, JobStreams jobStreams) {
        this.vertx = vertx;
        this.jobStreams = jobStreams;
    }

    public <T> CompletionStage<T> runAsync(Supplier<T> supplier) {
        CompletableFuture completableFuture = new CompletableFuture();
        this.vertx.executeBlocking(promise -> {
            completableFuture.complete(supplier.get());
        }, asyncResult -> {
        });
        return completableFuture;
    }

    @Override // org.kie.kogito.jobs.service.repository.ReactiveJobRepository
    public PublisherBuilder<ScheduledJob> findByStatus(JobStatus... jobStatusArr) {
        return findAll().filter(scheduledJob -> {
            return Objects.nonNull(scheduledJob.getStatus());
        }).filter(scheduledJob2 -> {
            Stream stream = Arrays.stream(jobStatusArr);
            JobStatus status = scheduledJob2.getStatus();
            Objects.requireNonNull(status);
            return stream.anyMatch((v1) -> {
                return r1.equals(v1);
            });
        });
    }

    @Override // org.kie.kogito.jobs.service.repository.ReactiveJobRepository
    public CompletionStage<ScheduledJob> save(ScheduledJob scheduledJob) {
        CompletionStage<ScheduledJob> doSave = doSave(scheduledJob);
        JobStreams jobStreams = this.jobStreams;
        Objects.requireNonNull(jobStreams);
        return doSave.thenApply(jobStreams::publishJobStatusChange);
    }

    public abstract CompletionStage<ScheduledJob> doSave(ScheduledJob scheduledJob);

    @Override // org.kie.kogito.jobs.service.repository.ReactiveJobRepository
    public CompletionStage<ScheduledJob> delete(ScheduledJob scheduledJob) {
        return delete(scheduledJob.getId()).thenApply(scheduledJob2 -> {
            return this.jobStreams.publishJobStatusChange(scheduledJob);
        });
    }

    @Override // org.kie.kogito.jobs.service.repository.ReactiveJobRepository
    public CompletionStage<ScheduledJob> merge(String str, ScheduledJob scheduledJob) {
        return (CompletionStage) Optional.ofNullable(str).filter((v0) -> {
            return StringUtils.isNotBlank(v0);
        }).filter(str2 -> {
            return StringUtils.isBlank(scheduledJob.getId()) || str2.equals(scheduledJob.getId());
        }).map(str3 -> {
            return get(str3).thenApply((v0) -> {
                return Optional.ofNullable(v0);
            }).thenApply(optional -> {
                return optional.map(scheduledJob2 -> {
                    return doMerge(scheduledJob, scheduledJob2);
                });
            }).thenCompose(optional2 -> {
                return (CompletionStage) optional2.map(this::save).orElse(CompletableFuture.completedFuture(null));
            });
        }).orElseThrow(() -> {
            return new IllegalArgumentException("Id is empty or not equals to Job.id : " + str);
        });
    }

    private ScheduledJob doMerge(ScheduledJob scheduledJob, ScheduledJob scheduledJob2) {
        return ScheduledJob.builder().of(scheduledJob2).merge(scheduledJob).build();
    }
}
