package org.kie.kogito.jobs.service.stream;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.time.ZonedDateTime;
import org.assertj.core.api.Assertions;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Metadata;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.kie.kogito.jobs.service.api.recipient.http.HttpRecipient;
import org.kie.kogito.jobs.service.events.JobDataEvent;
import org.kie.kogito.jobs.service.model.JobDetails;
import org.kie.kogito.jobs.service.model.JobStatus;
import org.kie.kogito.jobs.service.model.Recipient;
import org.kie.kogito.jobs.service.model.RecipientInstance;
import org.kie.kogito.jobs.service.model.ScheduledJob;
import org.kie.kogito.jobs.service.stream.AbstractJobStreams;
import org.kie.kogito.timer.Trigger;
import org.kie.kogito.timer.impl.PointInTimeTrigger;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;

@ExtendWith({MockitoExtension.class})
/* loaded from: input_file:org/kie/kogito/jobs/service/stream/AbstractJobStreamsTest.class */
public abstract class AbstractJobStreamsTest<T extends AbstractJobStreams> {
    protected static final String URL = "http://localhost:8180";
    private static final String SERIALIZED_MESSAGE = "SERIALIZED_MESSAGE";
    private static final String JOB_ID = "JOB_ID";
    private static final String CORRELATION_ID = "CORRELATION_ID";
    private static final String SCHEDULE_ID = "SCHEDULE_ID";

    @Captor
    protected ArgumentCaptor<Message<String>> messageCaptor;

    @Mock
    protected ObjectMapper objectMapper;

    @Captor
    protected ArgumentCaptor<Object> eventCaptor;

    @Mock
    protected Emitter<String> emitter;
    T jobStreams;
    private static final JobStatus STATUS = JobStatus.SCHEDULED;
    private static final ZonedDateTime LAST_UPDATE = ZonedDateTime.parse("2022-08-03T18:00:15.001+01:00");
    private static final Integer RETRIES = 1;
    private static final Integer PRIORITY = 1;
    private static final Integer EXECUTION_COUNTER = 1;
    private static final Recipient RECIPIENT = new RecipientInstance(HttpRecipient.builder().forStringPayload().url("http://recipient").build());
    private static final Trigger TRIGGER = new PointInTimeTrigger();

    @BeforeEach
    void setUp() {
        this.jobStreams = (T) Mockito.spy(createJobStreams());
    }

    protected abstract T createJobStreams();

    @Test
    void jobStatusChangeWithAck() throws Exception {
        JobDetails mockJobDetails = mockJobDetails();
        ((ObjectMapper) Mockito.doReturn(SERIALIZED_MESSAGE).when(this.objectMapper)).writeValueAsString(ArgumentMatchers.any());
        executeStatusChange(mockJobDetails).ack();
        ((AbstractJobStreams) Mockito.verify(this.jobStreams)).onAck(mockJobDetails);
    }

    @Test
    void jobStatusChangeWithNack() throws Exception {
        JobDetails mockJobDetails = mockJobDetails();
        ((ObjectMapper) Mockito.doReturn(SERIALIZED_MESSAGE).when(this.objectMapper)).writeValueAsString(ArgumentMatchers.any());
        Message<String> executeStatusChange = executeStatusChange(mockJobDetails);
        Exception exc = new Exception("Nack error");
        executeStatusChange.nack(exc);
        ((AbstractJobStreams) Mockito.verify(this.jobStreams)).onNack(exc, mockJobDetails);
    }

    private Message<String> executeStatusChange(JobDetails jobDetails) throws Exception {
        this.jobStreams.jobStatusChange(jobDetails);
        ((ObjectMapper) Mockito.verify(this.objectMapper)).writeValueAsString(this.eventCaptor.capture());
        Assertions.assertThat(this.eventCaptor.getValue()).isInstanceOf(JobDataEvent.class);
        assertExpectedEvent((JobDataEvent) this.eventCaptor.getValue());
        ((Emitter) Mockito.verify(this.emitter)).send((Message) this.messageCaptor.capture());
        Message<String> message = (Message) this.messageCaptor.getValue();
        Assertions.assertThat(message).isNotNull();
        Assertions.assertThat((String) message.getPayload()).isEqualTo(SERIALIZED_MESSAGE);
        assertExpectedMetadata(message);
        return message;
    }

    @Test
    void jobStatusChangeWithUnexpectedError() throws Exception {
        executeStatusChangeWithUnexpectedError(mockJobDetails());
    }

    @Test
    void jobStatusChangeWithUnexpectedErrorAndContinue() throws Exception {
        JobDetails mockJobDetails = mockJobDetails();
        executeStatusChangeWithUnexpectedError(mockJobDetails);
        ((ObjectMapper) Mockito.doReturn(SERIALIZED_MESSAGE).when(this.objectMapper)).writeValueAsString(ArgumentMatchers.any());
        this.jobStreams.jobStatusChange(mockJobDetails);
        ((Emitter) Mockito.verify(this.emitter)).send((Message) this.messageCaptor.capture());
        Message<String> message = (Message) this.messageCaptor.getValue();
        Assertions.assertThat(message).isNotNull();
        Assertions.assertThat((String) message.getPayload()).isEqualTo(SERIALIZED_MESSAGE);
        assertExpectedMetadata(message);
        message.ack();
        ((AbstractJobStreams) Mockito.verify(this.jobStreams)).onAck(mockJobDetails);
    }

    private void executeStatusChangeWithUnexpectedError(JobDetails jobDetails) throws Exception {
        ((ObjectMapper) Mockito.doThrow(new Throwable[]{new RuntimeException("Unexpected error")}).when(this.objectMapper)).writeValueAsString(ArgumentMatchers.any());
        this.jobStreams.jobStatusChange(jobDetails);
        ((AbstractJobStreams) Mockito.verify(this.jobStreams, Mockito.never())).onAck((JobDetails) ArgumentMatchers.any());
        ((AbstractJobStreams) Mockito.verify(this.jobStreams, Mockito.never())).onNack((Throwable) ArgumentMatchers.any(), (JobDetails) ArgumentMatchers.any());
    }

    private JobDetails mockJobDetails() {
        return JobDetails.builder().id("JOB_ID").correlationId(CORRELATION_ID).status(STATUS).lastUpdate(LAST_UPDATE).retries(RETRIES).priority(PRIORITY).executionCounter(EXECUTION_COUNTER).scheduledId(SCHEDULE_ID).recipient(RECIPIENT).trigger(TRIGGER).build();
    }

    private void assertExpectedEvent(JobDataEvent jobDataEvent) {
        Assertions.assertThat(jobDataEvent.getId()).isNotNull();
        Assertions.assertThat(jobDataEvent.getType()).isEqualTo("JobEvent");
        Assertions.assertThat(jobDataEvent.getSource()).hasToString("http://localhost:8180/jobs");
        ScheduledJob scheduledJob = (ScheduledJob) jobDataEvent.getData();
        Assertions.assertThat(scheduledJob).isNotNull();
        Assertions.assertThat(scheduledJob.getId()).isEqualTo("JOB_ID");
        Assertions.assertThat(scheduledJob.getScheduledId()).isEqualTo(SCHEDULE_ID);
        Assertions.assertThat(scheduledJob.getStatus()).isEqualTo(STATUS);
        Assertions.assertThat(scheduledJob.getRetries()).isEqualTo(RETRIES);
        Assertions.assertThat(scheduledJob.getExecutionCounter()).isEqualTo(EXECUTION_COUNTER);
        Assertions.assertThat(scheduledJob.getLastUpdate()).isEqualTo(LAST_UPDATE);
    }

    protected void assertExpectedMetadata(Message<String> message) {
        Assertions.assertThat(message.getMetadata()).isEqualTo(Metadata.empty());
    }
}
