package org.kie.server.springboot.samples;

import eu.rekawek.toxiproxy.model.Toxic;
import eu.rekawek.toxiproxy.model.ToxicDirection;
import java.util.Iterator;
import org.jbpm.services.api.DeploymentService;
import org.jbpm.services.api.ProcessService;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.kie.server.springboot.samples.kafka.KieServerApplication;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.ToxiproxyContainer;

/**
 * Integration test that starts and aborts a process while the connection to
 * Kafka is routed through a Toxiproxy container, so that a network fault can be
 * injected between the two operations.
 *
 * <p>The test requires Docker (it is skipped via {@link Assume} otherwise) and
 * the {@code toxiproxy.port} system property, which must hold an integer;
 * class initialization fails with a {@link NumberFormatException} if the
 * property is missing or not numeric.
 */
@SpringBootTest(classes = {KieServerApplication.class, TestAutoConfiguration.class}, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS)
@TestPropertySource(locations = {"classpath:application-test.properties"})
@RunWith(SpringJUnit4ClassRunner.class)
public class ProxyAwareKafkaEmitterTest extends KafkaFixture {

    /** System property that selects the topic process events are emitted to. */
    private static final String PROCESSES_TOPIC_PROPERTY = "org.kie.jbpm.event.emitters.kafka.topic.processes";

    /** Topic name configured for, and consumed by, this test. */
    private static final String CUSTOM_PROCESSES_TOPIC = "custom-processes";

    /** Port read from the {@code toxiproxy.port} system property; exposed on the Kafka container and proxied by Toxiproxy. */
    protected static final int TOXY_PROXY_PORT = Integer.parseInt(System.getProperty("toxiproxy.port"));

    /** Docker network shared by the Kafka and Toxiproxy containers. */
    protected static Network network = Network.newNetwork();

    /** Kafka container exposing {@link #TOXY_PROXY_PORT} on the shared network. */
    protected static ProxyAwareKafkaContainer kafka = (ProxyAwareKafkaContainer) new ProxyAwareKafkaContainer()
            .withExposedPorts(TOXY_PROXY_PORT)
            .withNetwork(network);

    /** Toxiproxy container sitting on the same network as {@link #kafka}. */
    protected static ToxiproxyContainer toxiproxy = new ToxiproxyContainer().withNetwork(network);

    /** Proxy in front of {@link #kafka}; assigned in {@link #beforeClass()}. */
    protected static ToxiproxyContainer.ContainerProxy kafkaProxy;

    @Autowired
    protected DeploymentService deploymentService;

    @Autowired
    protected ProcessService processService;

    /** Identifier returned by {@code setup(..)} for the unit deployed by the running test. */
    protected String deploymentId;

    /**
     * Starts Toxiproxy and Kafka and points the fixture's bootstrap servers at
     * the proxy address instead of at Kafka directly.
     */
    @BeforeClass
    public static void beforeClass() {
        Assume.assumeTrue(isDockerAvailable());
        System.setProperty(PROCESSES_TOPIC_PROPERTY, CUSTOM_PROCESSES_TOPIC);

        // Toxiproxy must be running before a proxy for Kafka can be requested from it.
        toxiproxy.start();
        kafkaProxy = toxiproxy.getProxy(kafka, TOXY_PROXY_PORT);
        String proxyAddress = kafkaProxy.getContainerIpAddress() + ":" + kafkaProxy.getProxyPort();

        // The proxy address is handed to the Kafka container before it starts
        // (presumably so the broker advertises it - confirm in ProxyAwareKafkaContainer).
        kafka.setHost(proxyAddress);
        kafka.start();

        bootstrapServers = proxyAddress;
        generalSetup(true);
    }

    /** Delegates to the fixture's {@code cleanup(DeploymentService)} after every test. */
    @After
    public void cleanup() {
        cleanup(deploymentService);
    }

    /**
     * Stops both containers, releases the shared network and clears the Kafka
     * emitter system properties.
     *
     * <p>The steps are chained with {@code finally} so that a failure while
     * stopping one container neither leaks the other one nor leaves the
     * JVM-wide system properties set for subsequent test classes.
     */
    @AfterClass
    public static void teardown() {
        try {
            kafka.stop();
        } finally {
            try {
                toxiproxy.stop();
            } finally {
                System.clearProperty(PROCESSES_TOPIC_PROPERTY);
                // NOTE(review): "boopstrap" is kept byte-for-byte on purpose; it looks like a typo
                // but appears to be the key the emitter really reads - confirm before renaming.
                System.clearProperty("org.kie.jbpm.event.emitters.kafka.boopstrap.servers");
                System.clearProperty("org.kie.jbpm.event.emitters.kafka.client.id");
                // Release the Docker network only once both containers attached to it are stopped.
                network.close();
            }
        }
    }

    /**
     * Starts the {@code evaluation} process, injects a timeout toxic on the
     * Kafka proxy, aborts the process instance while the toxic is active, then
     * removes the toxic and checks the records on the custom topic again.
     */
    @Test(timeout = 30000)
    public void testKafkaEmitterProcessStartAndAbortWithToxics() throws Exception {
        deploymentId = setup(deploymentService, "evaluation");

        Long processInstanceId = processService.startProcess(deploymentId, "evaluation", initParameters());
        Assert.assertNotNull(processInstanceId);
        Assert.assertTrue(processInstanceId > 0);
        consumeAndAssertRecords(CUSTOM_PROCESSES_TOPIC, "process", 1, 1);

        // A timeout toxic with a value of 0 keeps connections open while dropping data (per the
        // Toxiproxy documentation), i.e. Kafka is unreachable through the proxy from here on.
        kafkaProxy.toxics().timeout("timing", ToxicDirection.DOWNSTREAM, 0L);
        try {
            processService.abortProcessInstance(processInstanceId);
            Assert.assertNull(processService.getProcessInstance(processInstanceId));
        } finally {
            // Always lift the fault, even when the abort or the assertion fails, so the
            // @After cleanup does not have to run against a black-holed Kafka connection.
            for (Toxic toxic : kafkaProxy.toxics().getAll()) {
                toxic.remove();
            }
        }

        consumeAndAssertRecords(CUSTOM_PROCESSES_TOPIC, "process", 3, 1);
    }
}
