package org.kie.server.springboot.samples;

import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.read.ListAppender;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.awaitility.Awaitility;
import org.jbpm.kie.services.impl.KModuleDeploymentUnit;
import org.jbpm.runtime.manager.impl.jpa.EntityManagerFactoryManager;
import org.jbpm.services.api.DeploymentService;
import org.jbpm.services.api.ProcessService;
import org.jbpm.services.api.RuntimeDataService;
import org.jbpm.services.api.model.ProcessInstanceDesc;
import org.junit.Assert;
import org.junit.Assume;
import org.kie.api.runtime.query.QueryContext;
import org.kie.server.services.jbpm.kafka.KafkaServerExtension;
import org.kie.server.springboot.samples.utils.KieJarBuildHelper;
import org.slf4j.LoggerFactory;
import org.testcontainers.DockerClientFactory;
import org.testcontainers.containers.KafkaContainer;

/* loaded from: input_file:org/kie/server/springboot/samples/KafkaFixture.class */
public abstract class KafkaFixture {
    // Group id and version handed to KModuleDeploymentUnit in setup(); the artifact id is supplied by the caller.
    protected static final String GROUP_ID = "org.kie.server.testing";
    protected static final String VERSION = "1.0.0.Final";

    // System property names of the Kafka server extension. setup() sets the signal mapping,
    // cleanup() clears both mapping properties.
    protected static final String KAFKA_EXTENSION_PREFIX = "org.kie.server.jbpm-kafka.ext.";
    protected static final String SIGNAL_MAPPING_PROPERTY = "org.kie.server.jbpm-kafka.ext.signals.mapping";
    protected static final String MESSAGE_MAPPING_PROPERTY = "org.kie.server.jbpm-kafka.ext.message.mapping";

    // Mapping property values; setup() uses AUTO for the signal mapping.
    protected static final String AUTO = "AUTO";
    protected static final String NONE = "NONE";

    // NOTE(review): presumably a project name accepted by setup() followed by the ids of the
    // processes that project contains - not referenced in this class, confirm against subclasses.
    protected static final String SIGNALLING_PROJECT = "signalling-project";
    protected static final String START_SIGNAL_PROCESS_ID = "StartSignalProcess";
    protected static final String START_MESSAGE_PROCESS_ID = "StartMessageProcess";
    protected static final String START_MESSAGE_POJO_PROCESS_ID = "StartMessagePojoProcess";
    protected static final String START_MESSAGE_COMPLEX_POJO_PROCESS_ID = "StartMessageComplexPojoProcess";
    protected static final String START_MESSAGE_POJO_CLASS_NOT_FOUND_PROCESS_ID = "StartMessagePojoClassNotFoundProcess";
    protected static final String BOUNDARY_SIGNAL_PROCESS_ID = "BoundarySignalProcess";
    protected static final String BOUNDARY_MESSAGE_PROCESS_ID = "BoundaryMessageProcess";
    protected static final String INTERMEDIATE_CATCH_EVENT_SIGNAL_PROCESS_ID = "IntermediateCatchEventSignal";
    protected static final String INTERMEDIATE_CATCH_EVENT_MESSAGE_PROCESS_ID = "IntermediateCatchEventMessage";
    protected static final String SUBPROCESS_SIGNAL_PROCESS_ID = "SubprocessSignalProcess";
    protected static final String SUBPROCESS_MESSAGE_PROCESS_ID = "SubprocessMessageProcess";

    // NOTE(review): presumably a second project name and its process ids - not referenced in this class.
    protected static final String SEND_PROJECT = "send-project";
    protected static final String INTERMEDIATE_THROW_EVENT_MESSAGE_PROCESS_ID = "IntermediateThrowEventMessage";
    protected static final String INTERMEDIATE_THROW_EVENT_SIGNAL_PROCESS_ID = "IntermediateThrowEventSignal";
    protected static final String END_SIGNAL_PROCESS_ID = "EndSignalProcess";
    protected static final String END_MESSAGE_PROCESS_ID = "EndMessageProcess";
    protected static final String END_MESSAGE_OUTPUT_POJO_PROCESS_ID = "EndMessageOutputPojo";
    protected static final String PARALLEL_INTERMEDIATE_THROW_EVENT_MESSAGE_PROCESS_ID = "ParallelIntermediateThrowEventMessages";
    protected static final String PARALLEL_INTERMEDIATE_THROW_EVENT_SIGNAL_PROCESS_ID = "ParallelIntermediateThrowEventSignals";

    // NOTE(review): presumably a third project name - not referenced in this class.
    protected static final String ALT_PROJECT = "alt-project";

    // Kafka topic names; createTopics() creates exactly one topic for each of the following constants.
    protected static final String BOUNDARY_SIGNAL = "boundarySignal";
    protected static final String BOUNDARY_MESSAGE = "boundaryMessage";
    protected static final String START_SIGNAL = "startSignal";
    protected static final String START_MESSAGE = "startMessage";
    protected static final String START_MESSAGE_POJO = "startMessagePojo";
    protected static final String START_MESSAGE_COMPLEX_POJO = "startMessageComplexPojo";
    protected static final String START_MESSAGE_POJO_CLASS_NOT_FOUND = "startMessagePojoClassNotFound";
    protected static final String INTERMEDIATE_SIGNAL = "intermediateSignal";
    protected static final String INTERMEDIATE_MESSAGE = "intermediateMessage";
    protected static final String SUBPROCESS_SIGNAL = "subprocessSignal";
    protected static final String SUBPROCESS_MESSAGE = "subprocessMessage";
    protected static final String END_SIGNAL = "endSignal";
    protected static final String END_MESSAGE = "endMessage";
    protected static final String END_MESSAGE_OUTPUT_POJO = "endMessageOutputPojo";

    // Not referenced in this class; presumably consumed by subclasses - TODO confirm.
    protected static final String SUBPROCESS_SCRIPT_NODE = "sub-script";

    // Prefix that setup() prepends to the project name before calling KieJarBuildHelper.createKieJar.
    protected static final String PATH = "src/test/resources/kjars/";

    // Not referenced in this class; presumably consumed by subclasses - TODO confirm.
    protected static final String VARIABLES = "variables";
    protected static final String JOHN = "john";

    // Broker address; assigned in generalSetup() from kafka.getBootstrapServers().
    protected static String bootstrapServers;
    // Kafka test container; configured and started in generalSetup().
    protected static KafkaContainer kafka = new KafkaContainer();
    // Configuration passed to AdminClient.create; generalSetup() stores "bootstrap.servers" in it.
    protected static Properties props = new Properties();

    /**
     * Starts the shared Kafka container and publishes its address to everything that needs it.
     *
     * <p>The calling test is skipped through a JUnit assumption when {@code isDockerAvailable()}
     * reports {@code false}. Otherwise the container is started, its bootstrap address is stored in
     * {@code bootstrapServers}, in a system property and in {@code props}, and the test topics are
     * created.
     */
    public static void generalSetup() {
        Assume.assumeTrue(isDockerAvailable());

        // Both transaction-state-log settings are set to "1" before the container starts.
        kafka.addEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1");
        kafka.addEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1");
        kafka.start();

        final String brokerAddress = kafka.getBootstrapServers();
        bootstrapServers = brokerAddress;
        System.setProperty("org.kie.server.jbpm-kafka.ext.bootstrap.servers", brokerAddress);

        EntityManagerFactoryManager.get().clear();

        props.put("bootstrap.servers", brokerAddress);
        createTopics();
    }

    /**
     * Creates one Kafka topic per topic-name constant of this fixture, each with a single partition
     * and a replication factor of one, and waits up to one minute for the broker to confirm.
     *
     * <p>The admin client is always closed, including when the creation fails.
     *
     * @throws AssertionError if the creation is interrupted, fails or times out; the original
     *         exception is attached as the cause
     */
    public static void createTopics() {
        List<NewTopic> topics = Arrays.asList(
                BOUNDARY_SIGNAL,
                BOUNDARY_MESSAGE,
                START_SIGNAL,
                START_MESSAGE,
                START_MESSAGE_POJO,
                START_MESSAGE_COMPLEX_POJO,
                START_MESSAGE_POJO_CLASS_NOT_FOUND,
                INTERMEDIATE_SIGNAL,
                INTERMEDIATE_MESSAGE,
                SUBPROCESS_SIGNAL,
                SUBPROCESS_MESSAGE,
                END_SIGNAL,
                END_MESSAGE,
                END_MESSAGE_OUTPUT_POJO)
                .stream()
                .map(topicName -> new NewTopic(topicName, 1, (short) 1))
                .collect(Collectors.toList());

        // try-with-resources guarantees the client is released even when get(...) throws.
        try (AdminClient admin = AdminClient.create(props)) {
            admin.createTopics(topics).all().get(1L, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new AssertionError("Exception when creating topics: " + e.getMessage(), e);
        } catch (ExecutionException | TimeoutException e) {
            throw new AssertionError("Exception when creating topics: " + e.getMessage(), e);
        }
    }

    /**
     * Prepares and deploys the project named {@code str}.
     *
     * <p>Sets the signal mapping system property to {@code AUTO}, passes {@code PATH + str} to
     * {@code KieJarBuildHelper.createKieJar}, and deploys a unit built from {@code GROUP_ID},
     * {@code str} and {@code VERSION}.
     *
     * @param deploymentService service used to deploy the unit
     * @param str project name, used both as the path suffix and as the artifact id
     * @return the deployment unit that was deployed
     */
    public KModuleDeploymentUnit setup(DeploymentService deploymentService, String str) {
        System.setProperty(SIGNAL_MAPPING_PROPERTY, AUTO);

        final String projectLocation = PATH + str;
        KieJarBuildHelper.createKieJar(projectLocation);

        final KModuleDeploymentUnit deployedUnit = new KModuleDeploymentUnit(GROUP_ID, str, VERSION);
        deploymentService.deploy(deployedUnit);
        return deployedUnit;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Undeploys the given unit (when both arguments are non-null) and clears the message and
     * signal mapping system properties.
     *
     * <p>The properties are cleared in a {@code finally} block so that a failing undeploy cannot
     * leave them set for tests that run afterwards; the undeploy failure still propagates.
     *
     * @param deploymentService service used to undeploy; may be {@code null}
     * @param kModuleDeploymentUnit unit to undeploy; may be {@code null}
     */
    public void cleanup(DeploymentService deploymentService, KModuleDeploymentUnit kModuleDeploymentUnit) {
        try {
            if (deploymentService != null && kModuleDeploymentUnit != null) {
                deploymentService.undeploy(kModuleDeploymentUnit);
            }
        } finally {
            System.clearProperty(MESSAGE_MAPPING_PROPERTY);
            System.clearProperty(SIGNAL_MAPPING_PROPERTY);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Aborts every process instance returned by the runtime data service for state {@code 1}.
     *
     * <p>Does nothing when either service is {@code null} or when the query returns {@code null}.
     *
     * @param runtimeDataService service queried for process instances; may be {@code null}
     * @param processService service used to abort each instance; may be {@code null}
     */
    public void abortAllProcesses(RuntimeDataService runtimeDataService, ProcessService processService) {
        if (runtimeDataService == null || processService == null) {
            return;
        }

        // NOTE(review): state 1 presumably means "active" - confirm against the jBPM state constants.
        final List<Integer> states = Collections.singletonList(1);
        final Collection<ProcessInstanceDesc> instances =
                runtimeDataService.getProcessInstances(states, (String) null, (QueryContext) null);
        if (instances == null) {
            return;
        }

        for (ProcessInstanceDesc instance : instances) {
            processService.abortProcessInstance(instance.getDeploymentId(), instance.getId());
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Blocks until {@code listConsumerGroups()} returns a non-empty list.
     *
     * <p>Configured with a poll delay of one second and an upper bound of three seconds.
     */
    public void waitForConsumerGroupToBeReady() {
        Awaitility.await()
                .atMost(3L, TimeUnit.SECONDS)
                .pollDelay(1L, TimeUnit.SECONDS)
                .until(() -> !listConsumerGroups().isEmpty());
    }

    /**
     * Returns the group ids of all consumer groups reported by the broker.
     *
     * <p>The admin client is always closed, including when the lookup fails.
     *
     * @return the consumer group ids; empty when the broker reports none
     * @throws AssertionError if the lookup is interrupted or fails; the original exception is
     *         attached as the cause
     */
    protected List<String> listConsumerGroups() {
        // try-with-resources guarantees the client is released even when get() throws.
        try (AdminClient admin = AdminClient.create(props)) {
            return admin.listConsumerGroups().all().get().stream()
                    .map(listing -> listing.groupId())
                    .collect(Collectors.toList());
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new AssertionError("Exception when listConsumerGroups: " + e.getMessage(), e);
        } catch (ExecutionException e) {
            throw new AssertionError("Exception when listConsumerGroups: " + e.getMessage(), e);
        }
    }

    /**
     * Builds the configuration for a producer with {@code String} keys and {@code byte[]} values.
     *
     * @param str value stored under {@code bootstrap.servers}
     * @param z when {@code true}, a fixed {@code transactional.id} is added
     * @return a new {@link Properties} instance holding the producer configuration
     */
    protected Properties producerProps(String str, boolean z) {
        final Properties config = new Properties();
        config.setProperty("bootstrap.servers", str);
        config.setProperty("key.serializer", StringSerializer.class.getName());
        config.setProperty("value.serializer", ByteArraySerializer.class.getName());
        if (!z) {
            return config;
        }
        config.setProperty("transactional.id", "my-transactional-id");
        return config;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Attaches a started in-memory appender to the logger named after the package of
     * {@code KafkaServerExtension} and returns it, so that a test can inspect what was logged.
     *
     * @return the appender that was attached
     */
    public ListAppender<ILoggingEvent> addLogAppender() {
        // LoggerFactory returns the SLF4J interface; addAppender is only available on the logback
        // implementation, so the explicit cast is required.
        Logger logger = (Logger) LoggerFactory.getLogger(KafkaServerExtension.class.getPackage().getName());
        ListAppender<ILoggingEvent> listAppender = new ListAppender<>();
        listAppender.start();
        logger.addAppender(listAppender);
        return listAppender;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Looks for an event logged at {@code ERROR} level in the given appender and asserts that one
     * exists.
     *
     * @param listAppender appender whose captured events are searched
     * @return an {@link Optional} holding one of the {@code ERROR} events
     * @throws AssertionError if no {@code ERROR} event was captured
     */
    public Optional<ILoggingEvent> getErrorLog(ListAppender<ILoggingEvent> listAppender) {
        final Optional<ILoggingEvent> errorEvent = listAppender.list.stream()
                .filter(event -> event.getLevel() == Level.ERROR)
                .findAny();
        final boolean found = errorEvent.isPresent();
        Assert.assertTrue("no trace printed when failed", found);
        return errorEvent;
    }

    /**
     * Sends a single record through a short-lived, non-transactional producer.
     *
     * <p>The producer is closed exactly once, whether or not sending succeeds.
     *
     * @param str topic the record is sent to
     * @param str2 record payload
     */
    public void sendRecord(String str, String str2) {
        try (KafkaProducer<String, byte[]> producer = new KafkaProducer<>(producerProps(bootstrapServers, false))) {
            send(producer, str, str2);
        }
    }

    /**
     * Sends {@code str2}, encoded as UTF-8, to topic {@code str} and waits for the send to finish.
     *
     * @param kafkaProducer producer used to send the record
     * @param str topic the record is sent to
     * @param str2 record payload; must not be {@code null}
     * @throws AssertionError if waiting for the send is interrupted or the send fails; the
     *         original exception is attached as the cause
     */
    public void send(KafkaProducer<String, byte[]> kafkaProducer, String str, String str2) {
        // An explicit charset keeps the bytes on the wire independent of the JVM's default encoding.
        ProducerRecord<String, byte[]> outgoing = new ProducerRecord<>(str, str2.getBytes(StandardCharsets.UTF_8));
        try {
            kafkaProducer.send(outgoing).get();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new AssertionError("Not expected exception: " + e.getMessage(), e);
        } catch (ExecutionException e) {
            throw new AssertionError("Not expected exception: " + e.getMessage(), e);
        }
    }

    /**
     * Sends all given payloads inside a single producer transaction.
     *
     * <p>The transaction is committed only after every payload was sent. The producer is closed
     * exactly once, whether or not the transaction was committed.
     *
     * @param map payloads to send, keyed by the topic they are sent to
     */
    public void sendTransactionalRecords(Map<String, List<String>> map) {
        try (KafkaProducer<String, byte[]> producer = new KafkaProducer<>(producerProps(bootstrapServers, true))) {
            producer.initTransactions();
            producer.beginTransaction();
            for (Map.Entry<String, List<String>> topicPayloads : map.entrySet()) {
                for (String payload : topicPayloads.getValue()) {
                    send(producer, topicPayloads.getKey(), payload);
                }
            }
            producer.commitTransaction();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Consumes the records available on topic {@code str}, asserts that exactly {@code i} were
     * received, and validates each of them with {@code assertRecord}.
     *
     * @param str topic to consume from
     * @param i expected number of records
     */
    public void consumeAndAssertRecords(String str, int i) {
        final ConsumerRecords<String, byte[]> received = consumeMessages(str);
        Assert.assertEquals(i, received.count());
        for (ConsumerRecord<String, byte[]> message : received) {
            assertRecord(message, str);
        }
    }

    /**
     * Asserts that a consumed record has no key, was read from the expected topic, and carries a
     * JSON object whose {@code specversion} is {@code "1.0"} and whose {@code data} entry contains
     * the text {@code my-value}.
     *
     * @param consumerRecord record to validate
     * @param str topic the record is expected to come from
     */
    protected void assertRecord(ConsumerRecord<String, byte[]> consumerRecord, String str) {
        final Map<String, Object> payload = getJsonObject(consumerRecord);

        Assert.assertNull(consumerRecord.key());
        Assert.assertEquals(str, consumerRecord.topic());

        final Object specVersion = payload.get("specversion");
        Assert.assertEquals("1.0", specVersion);

        final String data = payload.get("data").toString();
        Assert.assertTrue(data.contains("my-value"));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Parses the value of a consumed record as a JSON object.
     *
     * @param consumerRecord record whose value holds the JSON bytes
     * @return the parsed JSON object as a map
     * @throws AssertionError if the value cannot be read as JSON
     */
    public Map<String, Object> getJsonObject(ConsumerRecord<String, byte[]> consumerRecord) {
        final ObjectMapper mapper = new ObjectMapper();
        mapper.setDateFormat(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ"));
        mapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);

        Map<String, Object> parsed = null;
        try {
            parsed = mapper.readValue(consumerRecord.value(), Map.class);
        } catch (IOException e) {
            // Assert.fail always throws, so the null initial value never reaches the caller.
            Assert.fail("Exception when reading value: " + e.getMessage());
        }
        return parsed;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Polls topic {@code str} once, for at most five seconds, using a short-lived consumer, and
     * synchronously commits the offsets of what was read.
     *
     * <p>The consumer is closed exactly once, whether or not polling succeeds.
     *
     * @param str topic to consume from
     * @param <T> value type of the returned records
     * @return the records returned by the single poll; possibly empty
     */
    public <T> ConsumerRecords<String, T> consumeMessages(String str) {
        try (KafkaConsumer<String, T> consumer = createConsumer(str)) {
            ConsumerRecords<String, T> polled = consumer.poll(Duration.ofSeconds(5L));
            consumer.commitSync();
            return polled;
        }
    }

    /**
     * Creates a consumer configured by {@code consumerProperties()} and subscribes it to a single
     * topic. The caller is responsible for closing the returned consumer.
     *
     * @param str topic to subscribe to
     * @param <T> value type of the consumer
     * @return the subscribed consumer
     */
    protected <T> KafkaConsumer<String, T> createConsumer(String str) {
        final Properties settings = consumerProperties();
        final KafkaConsumer<String, T> consumer = new KafkaConsumer<>(settings);
        final List<String> topics = Collections.singletonList(str);
        consumer.subscribe(topics);
        return consumer;
    }

    /**
     * Builds the configuration for the test consumer: it connects to {@code bootstrapServers},
     * joins the group {@code jbpm_test_consumer}, reads {@code String} keys and {@code byte[]}
     * values, and uses {@code earliest} as its offset reset policy.
     *
     * @return a new {@link Properties} instance holding the consumer configuration
     */
    protected Properties consumerProperties() {
        final String keyDeserializer = StringDeserializer.class.getName();
        final String valueDeserializer = ByteArrayDeserializer.class.getName();

        final Properties config = new Properties();
        config.setProperty("bootstrap.servers", bootstrapServers);
        config.setProperty("group.id", "jbpm_test_consumer");
        config.setProperty("key.deserializer", keyDeserializer);
        config.setProperty("value.deserializer", valueDeserializer);
        config.setProperty("auto.offset.reset", "earliest");
        return config;
    }

    /**
     * Reports whether a Docker client can be obtained.
     *
     * @return {@code true} when {@code DockerClientFactory.instance().client()} completes without
     *         throwing, {@code false} when it throws anything
     */
    private static boolean isDockerAvailable() {
        boolean available;
        try {
            DockerClientFactory.instance().client();
            available = true;
        } catch (Throwable ignored) {
            // Any failure is deliberately treated as "Docker is not available".
            available = false;
        }
        return available;
    }
}
