package org.jbpm.springboot.samples;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.serialization.StringSerializer;
import org.jbpm.services.api.DeploymentService;
import org.jbpm.services.api.ProcessService;
import org.jbpm.springboot.samples.events.listeners.CountDownLatchEventListener;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit4.rules.SpringClassRule;
import org.springframework.test.context.junit4.rules.SpringMethodRule;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.ToxiproxyContainer;

/**
 * Integration test for the Kafka process sample in which the Kafka bootstrap address
 * handed to the application points at a Toxiproxy proxy instead of the broker itself,
 * so that individual tests can cut and restore the connection.
 *
 * <p>NOTE(review): the network and both containers are declared as JUnit {@code @Rule}s
 * and are additionally started/stopped explicitly in {@link #setup()} and
 * {@link #cleanup()} — confirm whether the explicit calls are redundant with the rule
 * lifecycle.
 */
@SpringBootTest(classes = {JBPMApplication.class, TestAutoConfiguration.class}, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS)
@TestPropertySource(locations = {"classpath:application-test.properties"})
@RunWith(Parameterized.class)
public class KafkaProxySampleTest extends KafkaBaseTest {

    private static final Logger logger = LoggerFactory.getLogger(KafkaProxySampleTest.class);

    /**
     * Port exposed on the Kafka container and proxied by Toxiproxy. Read from the
     * {@code toxiproxy.port} system property; class initialization fails with a
     * {@link NumberFormatException} if the property is missing or not an integer.
     */
    private static final int TOXY_PROXY_PORT = Integer.parseInt(System.getProperty("toxiproxy.port"));

    /** Identifier of the process started by every test in this class. */
    private static final String PROCESS_ID = "kafka-process.kafka-process";

    @ClassRule
    public static final SpringClassRule scr = new SpringClassRule();

    protected static KafkaFixture kafkaFixture = new KafkaFixture();

    @Rule
    public final SpringMethodRule smr = new SpringMethodRule();

    @Rule
    public Network network = Network.newNetwork();

    @Rule
    public KafkaContainer kafka = new KafkaContainer().withExposedPorts(TOXY_PROXY_PORT).withNetwork(this.network);

    @Rule
    public ToxiproxyContainer toxiproxy = new ToxiproxyContainer().withNetwork(this.network);

    @Autowired
    private DeploymentService deploymentService;

    @Autowired
    private ProcessService processService;

    @Autowired
    CountDownLatchEventListener countDownLatchEventListener;

    protected String deploymentId;

    /** {@code host:port} of the Toxiproxy endpoint, published as {@code bootstrap.servers}. */
    protected String proxyBootstrap;

    /** Proxy in front of the Kafka container; used to cut and restore the connection. */
    protected ToxiproxyContainer.ContainerProxy kafkaProxy;

    @BeforeClass
    public static void generalSetup() {
        kafkaFixture.generalSetup();
    }

    /**
     * Starts Toxiproxy and Kafka, creates the topic, publishes the producer settings as
     * system properties (with the proxy address as bootstrap server) and deploys the
     * sample through the fixture.
     */
    @Before
    public void setup() throws IOException, InterruptedException {
        this.toxiproxy.start();
        this.kafkaProxy = this.toxiproxy.getProxy(this.kafka, TOXY_PROXY_PORT);
        this.kafka.start();
        kafkaFixture.createTopic(this.kafka);

        this.proxyBootstrap = this.kafkaProxy.getContainerIpAddress() + ":" + this.kafkaProxy.getProxyPort();
        System.setProperty("bootstrap.servers", this.proxyBootstrap);
        System.setProperty("client.id", "test_jbpm");
        System.setProperty("key.serializer", StringSerializer.class.getName());
        System.setProperty("value.serializer", StringSerializer.class.getName());

        this.deploymentId = kafkaFixture.setup(this.deploymentService, this.strategy);
        this.countDownLatchEventListener.setVariable("kafka-result");
    }

    /**
     * Stops both containers and runs the fixture cleanup. The steps keep their original
     * order but are chained with {@code finally} so that a failure in one of them does
     * not prevent the remaining ones from running.
     */
    @After
    public void cleanup() {
        try {
            this.kafka.stop();
        } finally {
            try {
                this.toxiproxy.stop();
            } finally {
                kafkaFixture.cleanup(this.deploymentService);
            }
        }
    }

    /**
     * Starts the process while the proxy connection is cut and never restored, and
     * expects the listener to report {@code "failure"}.
     */
    @Test(timeout = 240000)
    public void testKafkaWIHNoConnection() throws Exception {
        this.countDownLatchEventListener.configureNode(PROCESS_ID, "TaskErrorAfterKafkaMessageSent", 2);
        this.kafkaProxy.setConnectionCut(true);

        long processInstanceId = this.processService.startProcess(this.deploymentId, PROCESS_ID, kafkaFixture.getProcessVariables()).longValue();
        Assert.assertTrue(processInstanceId > 0);

        // Unbounded wait; the @Test timeout is what limits it.
        this.countDownLatchEventListener.getCountDown().await();
        Assert.assertEquals("failure", this.countDownLatchEventListener.getResult());
    }

    /**
     * Starts the process while the proxy connection is cut, restores the connection
     * 10 seconds later from a background thread, and expects the message to be
     * consumed and the listener to report {@code "success"}.
     */
    @Test(timeout = 60000)
    public void testKafkaWIHReconnect() throws Exception {
        this.countDownLatchEventListener.configure(PROCESS_ID, 1);
        this.kafkaProxy.setConnectionCut(true);
        reconnectProxyLater(10);

        long processInstanceId = this.processService.startProcess(this.deploymentId, PROCESS_ID, kafkaFixture.getProcessVariables()).longValue();
        Assert.assertTrue(processInstanceId > 0);

        kafkaFixture.assertConsumerMessages(this.proxyBootstrap);
        // Unbounded wait; the @Test timeout is what limits it.
        this.countDownLatchEventListener.getCountDown().await();
        Assert.assertEquals("success", this.countDownLatchEventListener.getResult());
    }

    /**
     * Restores the proxy connection from a background thread after the given delay.
     *
     * @param delaySeconds number of seconds to wait before re-enabling the connection
     */
    private void reconnectProxyLater(int delaySeconds) {
        Thread reconnector = new Thread(() -> {
            try {
                TimeUnit.SECONDS.sleep(delaySeconds);
            } catch (InterruptedException e) {
                // Keep the interrupt visible to whoever inspects this thread afterwards.
                Thread.currentThread().interrupt();
                logger.error("Interrupted while waiting to restore the Kafka proxy connection", e);
            }
            // As before, the connection is restored even when the wait was interrupted.
            this.kafkaProxy.setConnectionCut(false);
        }, "kafka-proxy-reconnect");
        reconnector.start();
    }
}
