package org.kie.kogito.jobs.service.repository.postgresql;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.infrastructure.Infrastructure;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject;
import io.vertx.mutiny.pgclient.PgPool;
import io.vertx.mutiny.sqlclient.Row;
import io.vertx.mutiny.sqlclient.Tuple;
import java.time.ZonedDateTime;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.kie.kogito.jobs.service.model.JobStatus;
import org.kie.kogito.jobs.service.model.job.JobDetails;
import org.kie.kogito.jobs.service.repository.ReactiveJobRepository;
import org.kie.kogito.jobs.service.repository.impl.BaseReactiveJobRepository;
import org.kie.kogito.jobs.service.repository.marshaller.RecipientMarshaller;
import org.kie.kogito.jobs.service.repository.marshaller.TriggerMarshaller;
import org.kie.kogito.jobs.service.stream.JobStreams;
import org.kie.kogito.jobs.service.utils.DateUtil;

@ApplicationScoped
/* loaded from: input_file:org/kie/kogito/jobs/service/repository/postgresql/PostgreSqlJobRepository.class */
public class PostgreSqlJobRepository extends BaseReactiveJobRepository implements ReactiveJobRepository {
    private static final String JOB_DETAILS_TABLE = "job_details";
    private static final String JOB_DETAILS_COLUMNS = "id, correlation_id, status, last_update, retries, execution_counter, scheduled_id, payload, type, priority, recipient, trigger";
    private PgPool client;
    private TriggerMarshaller triggerMarshaller;
    private RecipientMarshaller recipientMarshaller;

    PostgreSqlJobRepository() {
        super((Vertx) null, (JobStreams) null);
    }

    @Inject
    public PostgreSqlJobRepository(Vertx vertx, JobStreams jobStreams, PgPool pgPool, TriggerMarshaller triggerMarshaller, RecipientMarshaller recipientMarshaller) {
        super(vertx, jobStreams);
        this.client = pgPool;
        this.triggerMarshaller = triggerMarshaller;
        this.recipientMarshaller = recipientMarshaller;
    }

    public CompletionStage<JobDetails> doSave(JobDetails jobDetails) {
        return this.client.preparedQuery("INSERT INTO job_details (id, correlation_id, status, last_update, retries, execution_counter, scheduled_id, payload, type, priority, recipient, trigger) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) ON CONFLICT (id) DO UPDATE SET correlation_id = $2, status = $3, last_update = $4, retries = $5, execution_counter = $6, scheduled_id = $7, payload = $8, type = $9, priority = $10, recipient = $11, trigger = $12 RETURNING id, correlation_id, status, last_update, retries, execution_counter, scheduled_id, payload, type, priority, recipient, trigger").execute(Tuple.tuple((List) Stream.of(jobDetails.getId(), jobDetails.getCorrelationId(), Optional.ofNullable(jobDetails.getStatus()).map((v0) -> {
            return v0.name();
        }).orElse(null), Optional.ofNullable(jobDetails.getLastUpdate()).map((v0) -> {
            return v0.toOffsetDateTime();
        }).orElse(null), jobDetails.getRetries(), jobDetails.getExecutionCounter(), jobDetails.getScheduledId(), Optional.ofNullable(jobDetails.getPayload()).map(obj -> {
            return new JsonObject(obj.toString());
        }).orElse(null), Optional.ofNullable(jobDetails.getType()).map((v0) -> {
            return v0.name();
        }).orElse(null), jobDetails.getPriority(), this.recipientMarshaller.marshall(jobDetails.getRecipient()), this.triggerMarshaller.marshall(jobDetails.getTrigger())).collect(Collectors.toList()))).onItem().transform((v0) -> {
            return v0.iterator();
        }).onItem().transform(rowIterator -> {
            if (rowIterator.hasNext()) {
                return from((Row) rowIterator.next());
            }
            return null;
        }).emitOn(Infrastructure.getDefaultExecutor()).subscribeAsCompletionStage();
    }

    public CompletionStage<JobDetails> get(String str) {
        return this.client.preparedQuery("SELECT id, correlation_id, status, last_update, retries, execution_counter, scheduled_id, payload, type, priority, recipient, trigger FROM job_details WHERE id = $1").execute(Tuple.of(str)).onItem().transform((v0) -> {
            return v0.iterator();
        }).onItem().transform(rowIterator -> {
            if (rowIterator.hasNext()) {
                return from((Row) rowIterator.next());
            }
            return null;
        }).emitOn(Infrastructure.getDefaultExecutor()).subscribeAsCompletionStage();
    }

    public CompletionStage<Boolean> exists(String str) {
        return this.client.preparedQuery("SELECT id FROM job_details WHERE id = $1").execute(Tuple.of(str)).onItem().transform(rowSet -> {
            return Boolean.valueOf(rowSet.rowCount() > 0);
        }).emitOn(Infrastructure.getDefaultExecutor()).subscribeAsCompletionStage();
    }

    public CompletionStage<JobDetails> delete(String str) {
        return this.client.preparedQuery("DELETE FROM job_details WHERE id = $1 RETURNING id, correlation_id, status, last_update, retries, execution_counter, scheduled_id, payload, type, priority, recipient, trigger").execute(Tuple.of(str)).onItem().transform((v0) -> {
            return v0.iterator();
        }).onItem().transform(rowIterator -> {
            if (rowIterator.hasNext()) {
                return from((Row) rowIterator.next());
            }
            return null;
        }).emitOn(Infrastructure.getDefaultExecutor()).subscribeAsCompletionStage();
    }

    public PublisherBuilder<JobDetails> findAll() {
        return ReactiveStreams.fromPublisher(this.client.query("SELECT id, correlation_id, status, last_update, retries, execution_counter, scheduled_id, payload, type, priority, recipient, trigger FROM job_details").execute().onItem().transformToMulti(rowSet -> {
            return Multi.createFrom().iterable(rowSet);
        }).onItem().transform(this::from).emitOn(Infrastructure.getDefaultExecutor()));
    }

    public PublisherBuilder<JobDetails> findByStatusBetweenDatesOrderByPriority(ZonedDateTime zonedDateTime, ZonedDateTime zonedDateTime2, JobStatus... jobStatusArr) {
        return ReactiveStreams.fromPublisher(this.client.query("SELECT id, correlation_id, status, last_update, retries, execution_counter, scheduled_id, payload, type, priority, recipient, trigger FROM job_details" + (" WHERE " + createStatusQuery(jobStatusArr) + " AND " + createTimeQuery(zonedDateTime, zonedDateTime2)) + " ORDER BY priority DESC").execute().onItem().transformToMulti(rowSet -> {
            return Multi.createFrom().iterable(rowSet);
        }).onItem().transform(this::from).emitOn(Infrastructure.getDefaultExecutor()));
    }

    static String createStatusQuery(JobStatus... jobStatusArr) {
        return (String) Arrays.stream(jobStatusArr).map((v0) -> {
            return v0.name();
        }).collect(Collectors.joining("', '", "status IN ('", "')"));
    }

    static String createTimeQuery(ZonedDateTime zonedDateTime, ZonedDateTime zonedDateTime2) {
        return ("(trigger->>'nextFireTime')::INT8 > " + zonedDateTime.toInstant().toEpochMilli()) + " AND " + ("(trigger->>'nextFireTime')::INT8 < " + zonedDateTime2.toInstant().toEpochMilli());
    }

    JobDetails from(Row row) {
        return JobDetails.builder().id(row.getString("id")).correlationId(row.getString("correlation_id")).status((JobStatus) Optional.ofNullable(row.getString("status")).map(JobStatus::valueOf).orElse(null)).lastUpdate((ZonedDateTime) Optional.ofNullable(row.getOffsetDateTime("last_update")).map(offsetDateTime -> {
            return offsetDateTime.atZoneSameInstant(DateUtil.DEFAULT_ZONE);
        }).orElse(null)).retries(row.getInteger("retries")).executionCounter(row.getInteger("execution_counter")).scheduledId(row.getString("scheduled_id")).payload(Optional.ofNullable((JsonObject) row.get(JsonObject.class, 7)).map((v0) -> {
            return v0.toString();
        }).orElse(null)).type((JobDetails.Type) Optional.ofNullable(row.getString("type")).map(JobDetails.Type::valueOf).orElse(null)).priority(row.getInteger("priority")).recipient(this.recipientMarshaller.unmarshall((JsonObject) row.get(JsonObject.class, 10))).trigger(this.triggerMarshaller.unmarshall((JsonObject) row.get(JsonObject.class, 11))).build();
    }
}
