/*
 * Decompiled with CFR 0.152.
 */
package org.kie.server.springboot.samples;

import eu.rekawek.toxiproxy.model.Toxic;
import eu.rekawek.toxiproxy.model.ToxicDirection;
import org.jbpm.services.api.DeploymentService;
import org.jbpm.services.api.ProcessService;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.kie.api.runtime.process.ProcessInstance;
import org.kie.server.springboot.samples.KafkaFixture;
import org.kie.server.springboot.samples.ProxyAwareKafkaContainer;
import org.kie.server.springboot.samples.TestAutoConfiguration;
import org.kie.server.springboot.samples.kafka.KieServerApplication;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.ToxiproxyContainer;

@RunWith(value=SpringJUnit4ClassRunner.class)
@SpringBootTest(classes={KieServerApplication.class, TestAutoConfiguration.class}, webEnvironment=SpringBootTest.WebEnvironment.RANDOM_PORT)
@TestPropertySource(locations={"classpath:application-test.properties"})
@DirtiesContext(classMode=DirtiesContext.ClassMode.BEFORE_CLASS)
public class ProxyAwareKafkaEmitterTest
extends KafkaFixture {
    protected static final int TOXY_PROXY_PORT = Integer.parseInt(System.getProperty("toxiproxy.port"));
    protected static Network network = Network.newNetwork();
    protected static ProxyAwareKafkaContainer kafka = (ProxyAwareKafkaContainer)((KafkaContainer)new ProxyAwareKafkaContainer().withExposedPorts(new Integer[]{TOXY_PROXY_PORT})).withNetwork(network);
    protected static ToxiproxyContainer toxiproxy = (ToxiproxyContainer)new ToxiproxyContainer().withNetwork(network);
    protected static ToxiproxyContainer.ContainerProxy kafkaProxy;
    @Autowired
    protected DeploymentService deploymentService;
    @Autowired
    protected ProcessService processService;
    protected String deploymentId;

    @BeforeClass
    public static void beforeClass() {
        System.setProperty("org.kie.jbpm.event.emitters.kafka.topic.processes", "custom-processes");
        toxiproxy.start();
        kafkaProxy = toxiproxy.getProxy((GenericContainer)kafka, TOXY_PROXY_PORT);
        String proxyBootstrapServer = kafkaProxy.getContainerIpAddress() + ":" + kafkaProxy.getProxyPort();
        kafka.setHost(proxyBootstrapServer);
        kafka.start();
        bootstrapServers = proxyBootstrapServer;
        ProxyAwareKafkaEmitterTest.generalSetup(true);
    }

    @After
    public void cleanup() {
        this.cleanup(this.deploymentService);
    }

    @AfterClass
    public static void teardown() {
        kafka.stop();
        toxiproxy.stop();
        System.clearProperty("org.kie.jbpm.event.emitters.kafka.topic.processes");
        System.clearProperty("org.kie.jbpm.event.emitters.kafka.boopstrap.servers");
        System.clearProperty("org.kie.jbpm.event.emitters.kafka.client.id");
    }

    @Test(timeout=30000L)
    public void testKafkaEmitterProcessStartAndAbortWithToxics() throws Exception {
        this.deploymentId = this.setup(this.deploymentService, "evaluation");
        Long processInstanceId = this.processService.startProcess(this.deploymentId, "evaluation", this.initParameters());
        Assert.assertNotNull((Object)processInstanceId);
        Assert.assertTrue((processInstanceId > 0L ? 1 : 0) != 0);
        this.consumeAndAssertRecords("custom-processes", "process", 1, 1);
        kafkaProxy.toxics().timeout("timing", ToxicDirection.DOWNSTREAM, 0L);
        this.processService.abortProcessInstance(processInstanceId);
        ProcessInstance pi = this.processService.getProcessInstance(processInstanceId);
        Assert.assertNull((Object)pi);
        for (Toxic t : kafkaProxy.toxics().getAll()) {
            t.remove();
        }
        this.consumeAndAssertRecords("custom-processes", "process", 3, 1);
    }
}

