package org.kie.kogito.jobs.service.messaging;

import java.util.concurrent.CompletableFuture;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.kie.kogito.jobs.api.Job;
import org.kie.kogito.jobs.api.event.CancelJobRequestEvent;
import org.kie.kogito.jobs.api.event.CreateProcessInstanceJobRequestEvent;
import org.kie.kogito.jobs.api.event.JobCloudEvent;
import org.kie.kogito.jobs.service.exception.JobServiceException;
import org.kie.kogito.jobs.service.model.JobStatus;
import org.kie.kogito.jobs.service.model.job.JobDetails;
import org.kie.kogito.jobs.service.repository.ReactiveJobRepository;
import org.kie.kogito.jobs.service.scheduler.impl.TimerDelegateJobScheduler;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;

@ExtendWith({MockitoExtension.class})
/* loaded from: input_file:org/kie/kogito/jobs/service/messaging/ReactiveMessagingEventConsumerTest.class */
class ReactiveMessagingEventConsumerTest {
    private static final String JOB_ID = "JOB_ID";
    private static final String INTERNAL_ERROR = "Internal error";
    private static final String JOB_QUERY_ERROR = "Job query error";

    @Mock
    private TimerDelegateJobScheduler scheduler;

    @Mock
    private ReactiveJobRepository jobRepository;

    @Mock
    private Message<JobCloudEvent<?>> message;
    private ReactiveMessagingEventConsumer eventConsumer;

    @Captor
    private ArgumentCaptor<Throwable> errorCaptor;

    ReactiveMessagingEventConsumerTest() {
    }

    @BeforeEach
    void setUp() {
        ((Message) Mockito.lenient().doReturn(CompletableFuture.completedFuture(null)).when(this.message)).ack();
        ((Message) Mockito.lenient().doReturn(CompletableFuture.completedFuture(null)).when(this.message)).nack((Throwable) ArgumentMatchers.any());
        this.eventConsumer = new ReactiveMessagingEventConsumer();
        this.eventConsumer.scheduler = this.scheduler;
        this.eventConsumer.jobRepository = this.jobRepository;
    }

    @Test
    void onCreateProcessInstanceJobWithNonExistingJobSuccessful() {
        prepareCreateProcessInstanceJobWithExistingJobResult(null);
        executeSuccessfulScheduledJobExecution();
    }

    @Test
    void onCreateProcessInstanceJobWithExistingScheduledJobSuccessful() {
        prepareCreateProcessInstanceJobWithExistingJobResult(JobDetails.builder().id(JOB_ID).status(JobStatus.SCHEDULED).build());
        executeSuccessfulScheduledJobExecution();
    }

    @Test
    void onCreateProcessInstanceJobWithExistingRetryJobSuccessful() {
        onCreateProcessInstanceJobExistingNonScheduledSuccessful(JobStatus.RETRY);
    }

    @Test
    void onCreateProcessInstanceJobWithExistingCanceledJobSuccessful() {
        onCreateProcessInstanceJobExistingNonScheduledSuccessful(JobStatus.CANCELED);
    }

    @Test
    void onCreateProcessInstanceJobWithExistingErrorJobSuccessful() {
        onCreateProcessInstanceJobExistingNonScheduledSuccessful(JobStatus.ERROR);
    }

    @Test
    void onCreateProcessInstanceJobWithExistingExecutedJobSuccessful() {
        onCreateProcessInstanceJobExistingNonScheduledSuccessful(JobStatus.EXECUTED);
    }

    @Test
    void onCreateProcessInstanceJobWithJobQueryError() {
        Job job = new Job();
        job.setId(JOB_ID);
        ((Message) Mockito.doReturn(CreateProcessInstanceJobRequestEvent.builder().job(job).build()).when(this.message)).getPayload();
        ((ReactiveJobRepository) Mockito.doReturn(CompletableFuture.failedFuture(new Exception(JOB_QUERY_ERROR))).when(this.jobRepository)).get(JOB_ID);
        executeFailedExecution(JOB_QUERY_ERROR);
        ((TimerDelegateJobScheduler) Mockito.verify(this.scheduler, Mockito.never())).schedule((JobDetails) ArgumentMatchers.any());
    }

    @Test
    void onCreateProcessInstanceJobWithJobScheduleError() {
        Job job = new Job();
        job.setId(JOB_ID);
        ((Message) Mockito.doReturn(CreateProcessInstanceJobRequestEvent.builder().job(job).build()).when(this.message)).getPayload();
        ((ReactiveJobRepository) Mockito.doReturn(CompletableFuture.completedStage(JobDetails.builder().id(JOB_ID).status(JobStatus.SCHEDULED).build())).when(this.jobRepository)).get(JOB_ID);
        ((TimerDelegateJobScheduler) Mockito.doReturn(ReactiveStreams.fromCompletionStage(CompletableFuture.failedStage(new Exception(INTERNAL_ERROR))).buildRs()).when(this.scheduler)).schedule((JobDetails) ArgumentMatchers.any());
        executeFailedExecution(INTERNAL_ERROR);
        ((TimerDelegateJobScheduler) Mockito.verify(this.scheduler)).schedule((JobDetails) ArgumentMatchers.any());
    }

    private void onCreateProcessInstanceJobExistingNonScheduledSuccessful(JobStatus jobStatus) {
        prepareCreateProcessInstanceJobWithExistingJobResult(JobDetails.builder().id(JOB_ID).status(jobStatus).build());
        executeSuccessfulNonScheduledJobExecution();
    }

    private void prepareCreateProcessInstanceJobWithExistingJobResult(JobDetails jobDetails) {
        Job job = new Job();
        job.setId(JOB_ID);
        ((Message) Mockito.doReturn(CreateProcessInstanceJobRequestEvent.builder().job(job).build()).when(this.message)).getPayload();
        ((ReactiveJobRepository) Mockito.doReturn(CompletableFuture.completedFuture(jobDetails)).when(this.jobRepository)).get(JOB_ID);
        ((TimerDelegateJobScheduler) Mockito.lenient().doReturn(ReactiveStreams.of(JobDetails.builder().build()).buildRs()).when(this.scheduler)).schedule((JobDetails) ArgumentMatchers.any());
    }

    private void executeSuccessfulScheduledJobExecution() {
        executeSuccessfulExecution();
        ((TimerDelegateJobScheduler) Mockito.verify(this.scheduler)).schedule((JobDetails) ArgumentMatchers.any());
    }

    private void executeSuccessfulNonScheduledJobExecution() {
        executeSuccessfulExecution();
        ((TimerDelegateJobScheduler) Mockito.verify(this.scheduler, Mockito.never())).schedule((JobDetails) ArgumentMatchers.any());
    }

    @Test
    void onCancelJobWithNonExistingJobSuccessful() {
        prepareCancelJobWithExistingJob(null);
        executeSuccessfulCancelJob();
    }

    @Test
    void onCancelJobWithExistingJobSuccessful() {
        prepareCancelJobWithExistingJob(JobDetails.builder().id(JOB_ID).build());
        executeSuccessfulCancelJob();
    }

    private void prepareCancelJobWithExistingJob(JobDetails jobDetails) {
        ((Message) Mockito.doReturn(CancelJobRequestEvent.builder().jobId(JOB_ID).build()).when(this.message)).getPayload();
        ((TimerDelegateJobScheduler) Mockito.doReturn(CompletableFuture.completedFuture(jobDetails)).when(this.scheduler)).cancel(JOB_ID);
    }

    private void executeSuccessfulCancelJob() {
        executeSuccessfulExecution();
        ((TimerDelegateJobScheduler) Mockito.verify(this.scheduler)).cancel(JOB_ID);
    }

    @Test
    void onCancelJobWithError() {
        ((Message) Mockito.doReturn(CancelJobRequestEvent.builder().jobId(JOB_ID).build()).when(this.message)).getPayload();
        ((TimerDelegateJobScheduler) Mockito.doReturn(CompletableFuture.failedFuture(new Exception(INTERNAL_ERROR))).when(this.scheduler)).cancel(JOB_ID);
        executeFailedExecution(INTERNAL_ERROR);
        ((TimerDelegateJobScheduler) Mockito.verify(this.scheduler)).cancel(JOB_ID);
    }

    private void executeSuccessfulExecution() {
        this.eventConsumer.onKogitoServiceRequest(this.message).subscribe().with(r1 -> {
        }, Assertions::fail);
        ((Message) Mockito.verify(this.message)).ack();
        ((Message) Mockito.verify(this.message, Mockito.never())).nack((Throwable) ArgumentMatchers.any());
    }

    private void executeFailedExecution(String str) {
        this.eventConsumer.onKogitoServiceRequest(this.message).subscribe().with(r1 -> {
        }, Assertions::fail);
        ((Message) Mockito.verify(this.message, Mockito.never())).ack();
        ((Message) Mockito.verify(this.message)).nack((Throwable) this.errorCaptor.capture());
        org.assertj.core.api.Assertions.assertThat((Throwable) this.errorCaptor.getValue()).isNotNull().isInstanceOf(JobServiceException.class).hasMessageContaining(str);
    }
}
