package org.kie.kogito.serverless.workflow.executor;

import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.cloudevents.jackson.JsonCloudEventData;
import io.serverlessworkflow.api.Workflow;
import java.net.URI;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.kie.kogito.serverless.workflow.fluent.ActionBuilder;
import org.kie.kogito.serverless.workflow.fluent.EventDefBuilder;
import org.kie.kogito.serverless.workflow.fluent.FunctionBuilder;
import org.kie.kogito.serverless.workflow.fluent.StateBuilder;
import org.kie.kogito.serverless.workflow.fluent.WorkflowBuilder;
import org.kie.kogito.serverless.workflow.models.JsonNodeModel;

/* loaded from: input_file:org/kie/kogito/serverless/workflow/executor/WorkflowEventSubscriberTest.class */
/**
 * Tests that workflows run through {@link StaticWorkflowApplication} react to cloud events
 * pushed into the mock Kafka consumer exposed by {@code MockKafkaEventReceiverFactory}.
 *
 * <p>Every test owns its application instance and releases it with try-with-resources, so the
 * application is closed exactly once whether the test body succeeds or fails.
 */
public class WorkflowEventSubscriberTest {

    /**
     * A callback state runs its action, then finishes once the awaited event is published;
     * the resulting workflow data holds both the action output and the event payload.
     */
    @Test
    void testCallbackSubscriber() throws InterruptedException, TimeoutException {
        final String eventType = "testSubscribe";
        final String additionalData = "This has been injected by the event";
        Workflow workflow = ((WorkflowBuilder) WorkflowBuilder.workflow("testCallback")
                .start(StateBuilder.callback(
                        ActionBuilder.call(FunctionBuilder.expr("concat", "{slogan:.slogan+\"er Beti\"}")),
                        EventDefBuilder.eventDef(eventType)))
                .end())
                .build();
        try (StaticWorkflowApplication application = StaticWorkflowApplication.create()) {
            String id = application.execute(workflow, WorkflowBuilder.jsonObject().put("slogan", "Viva ")).getId();
            CloudEvent event = buildCloudEvent(eventType, id)
                    .withData(JsonCloudEventData.wrap(WorkflowBuilder.jsonObject().put("additionalData", additionalData)))
                    .build();
            publish(eventType, event);
            Assertions.assertThat(((JsonNodeModel) application.waitForFinish(id, Duration.ofSeconds(3L)).orElseThrow()).getWorkflowdata())
                    .isEqualTo(WorkflowBuilder.jsonObject().put("additionalData", additionalData).put("slogan", "Viva er Beti"));
        }
    }

    /**
     * An event state receives its input from the published event and applies the action to it.
     */
    @Test
    void testEventSubscriber() throws InterruptedException, TimeoutException {
        final String eventType = "testSubscribe";
        Workflow workflow = ((WorkflowBuilder) WorkflowBuilder.workflow("testEvent")
                .start(StateBuilder.operation())
                .next(StateBuilder.event().events()
                        .event(EventDefBuilder.eventDef(eventType))
                        .action(ActionBuilder.call(FunctionBuilder.expr("concat", "{slogan:.slogan+\"er Beti\"}")))
                        .endBranch())
                .end())
                .build();
        try (StaticWorkflowApplication application = StaticWorkflowApplication.create()) {
            String id = application.execute(workflow, Collections.emptyMap()).getId();
            CloudEvent event = buildCloudEvent(eventType, id)
                    .withData(JsonCloudEventData.wrap(WorkflowBuilder.jsonObject().put("slogan", "Viva ")))
                    .build();
            publish(eventType, event);
            Assertions.assertThat(((JsonNodeModel) application.waitForFinish(id, Duration.ofSeconds(3L)).orElseThrow()).getWorkflowdata())
                    .isEqualTo(WorkflowBuilder.jsonObject().put("slogan", "Viva er Beti"));
        }
    }

    /**
     * When no event is ever published, waiting for the callback workflow to finish must fail
     * with a {@link TimeoutException}.
     */
    @Test
    void testCallbackSubscriberWithoutEvent() throws InterruptedException {
        Workflow workflow = ((WorkflowBuilder) WorkflowBuilder.workflow("testFailure")
                .start(StateBuilder.callback(
                        ActionBuilder.call(FunctionBuilder.expr("concat", ".slogan+\" er Beti\"")),
                        EventDefBuilder.eventDef("testSubscribeEvent")))
                .end())
                .build();
        try (StaticWorkflowApplication application = StaticWorkflowApplication.create()) {
            String id = application.execute(workflow, WorkflowBuilder.jsonObject().put("slogan", "Viva")).getId();
            // Nothing is published on purpose: the wait is expected to time out.
            Assertions.assertThatExceptionOfType(TimeoutException.class)
                    .isThrownBy(() -> application.waitForFinish(id, Duration.ofSeconds(2L)));
        }
    }

    /**
     * Creates a v1 cloud event builder pre-populated with a random id, an empty source, the
     * current time and the {@code kogitoprocrefid} extension.
     *
     * @param eventType value used as the cloud event type
     * @param processInstanceId value stored in the {@code kogitoprocrefid} extension
     * @return the builder, so that callers can still attach data before building
     */
    private CloudEventBuilder buildCloudEvent(String eventType, String processInstanceId) {
        return CloudEventBuilder.v1()
                .withId(UUID.randomUUID().toString())
                .withSource(URI.create(""))
                .withType(eventType)
                .withTime(OffsetDateTime.now())
                .withExtension("kogitoprocrefid", processInstanceId);
    }

    /**
     * Adds the event as a record on the shared mock consumer, after asserting that the
     * consumer is subscribed to the given topic.
     *
     * @param topic topic the record is added to (partition 0, offset 0, empty key)
     * @param cloudEvent record value
     */
    private void publish(String topic, CloudEvent cloudEvent) {
        MockConsumer<byte[], CloudEvent> consumer = MockKafkaEventReceiverFactory.consumer;
        Assertions.assertThat(consumer.subscription()).contains(topic);
        consumer.addRecord(new ConsumerRecord<>(topic, 0, 0L, new byte[0], cloudEvent));
    }
}
