package org.kie.hacep;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.junit.Assert;
import org.junit.Test;
import org.kie.hacep.core.Bootstrap;
import org.kie.hacep.core.infra.election.State;
import org.kie.remote.RemoteKieSession;
import org.kie.remote.command.FireUntilHaltCommand;
import org.kie.remote.command.InsertCommand;
import org.kie.remote.command.RemoteCommand;
import org.kie.remote.message.ControlMessage;
import org.kie.remote.util.KafkaRemoteUtil;
import org.kie.remote.util.SerializationUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/kie/hacep/PodAsLeaderTest.class */
/**
 * Checks what a pod writes to the events and control topics after it has been
 * switched to {@link State#LEADER} and a single stock ticket event has been inserted.
 */
public class PodAsLeaderTest extends KafkaFullTopicsTests {

    /** Maximum number of polls issued against a topic before the test gives up. */
    private static final int MAX_POLL_ATTEMPTS = 10;

    /** Number of records the test waits for on each topic. */
    private static final int EXPECTED_MESSAGES = 2;

    /** How long a single poll waits for records. */
    private static final Duration POLL_TIMEOUT = Duration.ofSeconds(2L);

    private static final Logger logger = LoggerFactory.getLogger("org.hacep");

    /**
     * Starts the engine, marks the pod as leader, inserts one stock ticket event and then verifies:
     * <ul>
     *   <li>the events topic holds a {@link FireUntilHaltCommand} at offset 0 and an
     *       {@link InsertCommand} at offset 1;</li>
     *   <li>the control topic holds exactly two messages whose ids equal the keys of those two
     *       event records, the first with no side effects and the second with at least one.</li>
     * </ul>
     * Both consumers are closed whether the test passes or fails.
     */
    @Test(timeout = 30000)
    @SuppressWarnings("rawtypes") // the consumer is handled untyped, exactly as the test server hands it out
    public void processOneSentMessageAsLeaderTest() {
        Bootstrap.startEngine(this.envConfig);
        Bootstrap.getConsumerController().getCallback().updateStatus(State.LEADER);

        KafkaConsumer eventsConsumer = null;
        KafkaConsumer controlConsumer = null;
        try {
            // Everything that can fail after a consumer exists lives inside the try, so the
            // finally block below always gets a chance to release it.
            eventsConsumer = this.kafkaServerTest.getConsumer(this.envConfig.getEventsTopicName(), Config.getConsumerConfig("eventsConsumerProcessOneSentMessageAsLeaderTest"));
            controlConsumer = this.kafkaServerTest.getConsumer(this.envConfig.getControlTopicName(), Config.getConsumerConfig("controlConsumerProcessOneSentMessageAsLeaderTest"));

            Properties producerProps = (Properties) Config.getProducerConfig("InsertBatchStockTickets").clone();
            producerProps.put("skip.listener.autostart", true);
            logger.warn("Insert Stock Ticket event");
            this.kafkaServerTest.insertBatchStockTicketEvent(1, this.topicsConfig, RemoteKieSession.class, producerProps, KafkaRemoteUtil.getListener(producerProps, false));

            logger.warn("Checks on Events topic");
            List<ConsumerRecord> eventRecords = pollEventRecords(eventsConsumer);
            for (int index = 0; index < eventRecords.size(); index++) {
                ConsumerRecord eventRecord = eventRecords.get(index);
                Assert.assertNotNull(eventRecord);
                Assert.assertEquals(this.envConfig.getEventsTopicName(), eventRecord.topic());
                // Records are expected to arrive in offset order, starting at offset 0.
                Assert.assertEquals(index, eventRecord.offset());
                RemoteCommand command = (RemoteCommand) SerializationUtil.deserialize((byte[]) eventRecord.value());
                logger.warn("Event {}:{} offset:{}", index, command, eventRecord.offset());
                Assert.assertNotNull(command.getId());
                if (index == 0) {
                    Assert.assertTrue(command instanceof FireUntilHaltCommand);
                }
                if (index == 1) {
                    Assert.assertTrue(command instanceof InsertCommand);
                }
            }

            logger.warn("Checks on Control topic");
            List<ControlMessage> controlMessages = pollControlMessages(controlConsumer);
            Assert.assertEquals(2L, controlMessages.size());
            ControlMessage fireUntilHaltControl = controlMessages.get(0);
            ControlMessage insertControl = controlMessages.get(1);
            Assert.assertNotNull(fireUntilHaltControl);
            Assert.assertNotNull(insertControl);

            // Each control message must carry the id used as key of the matching event record.
            Assert.assertEquals(eventRecords.get(0).key(), fireUntilHaltControl.getId());
            Assert.assertTrue(fireUntilHaltControl.getSideEffects().isEmpty());
            Assert.assertEquals(eventRecords.get(1).key(), insertControl.getId());
            Assert.assertFalse(insertControl.getSideEffects().isEmpty());
            logger.warn("Test ended, going to stop kafka");
        } finally {
            closeConsumers(eventsConsumer, controlConsumer);
        }
    }

    /**
     * Polls the events topic until at least {@link #EXPECTED_MESSAGES} records have been received.
     *
     * @param eventsConsumer consumer subscribed to the events topic
     * @return the received records, in the order they were delivered
     * @throws RuntimeException if the records are still missing after {@link #MAX_POLL_ATTEMPTS} polls
     */
    @SuppressWarnings({"rawtypes", "unchecked"}) // raw consumer: polled elements are only visible as Object
    private List<ConsumerRecord> pollEventRecords(KafkaConsumer eventsConsumer) {
        List<ConsumerRecord> received = new ArrayList<>();
        int attempts = 0;
        while (received.size() < EXPECTED_MESSAGES) {
            // Budget is checked before polling so that a successful last poll is not reported as a failure.
            if (attempts == MAX_POLL_ATTEMPTS) {
                throw new RuntimeException("No Events message available after " + attempts + " attempts.");
            }
            eventsConsumer.poll(POLL_TIMEOUT).forEach(polled -> received.add((ConsumerRecord) polled));
            attempts++;
            logger.warn("Attempt number:{}", attempts);
        }
        return received;
    }

    /**
     * Polls the control topic until at least {@link #EXPECTED_MESSAGES} messages have been received.
     * Every message is deserialized from the record value and stamped with the record offset.
     *
     * @param controlConsumer consumer subscribed to the control topic
     * @return the received control messages, in the order they were delivered
     * @throws RuntimeException if the messages are still missing after {@link #MAX_POLL_ATTEMPTS} polls
     */
    @SuppressWarnings({"rawtypes", "unchecked"}) // raw consumer: polled elements are only visible as Object
    private List<ControlMessage> pollControlMessages(KafkaConsumer controlConsumer) {
        List<ControlMessage> received = new ArrayList<>();
        int attempts = 0;
        while (received.size() < EXPECTED_MESSAGES) {
            if (attempts == MAX_POLL_ATTEMPTS) {
                throw new RuntimeException("No control message available after " + attempts + " attempts.");
            }
            controlConsumer.poll(POLL_TIMEOUT).forEach(polled -> {
                ConsumerRecord controlRecord = (ConsumerRecord) polled;
                Assert.assertNotNull(controlRecord);
                ControlMessage controlMessage = (ControlMessage) SerializationUtil.deserialize((byte[]) controlRecord.value());
                controlMessage.setOffset(controlRecord.offset());
                logger.warn("Control message found:{}", controlMessage);
                received.add(controlMessage);
            });
            attempts++;
            logger.warn("Attempt number:{}", attempts);
        }
        return received;
    }

    /**
     * Closes whichever consumers were actually created. The control consumer is closed even if
     * closing the events consumer throws.
     *
     * @param eventsConsumer  events topic consumer, or {@code null} if it was never created
     * @param controlConsumer control topic consumer, or {@code null} if it was never created
     */
    @SuppressWarnings("rawtypes")
    private void closeConsumers(KafkaConsumer eventsConsumer, KafkaConsumer controlConsumer) {
        try {
            if (eventsConsumer != null) {
                eventsConsumer.close();
                logger.warn("Event consumer closed");
            }
        } finally {
            if (controlConsumer != null) {
                controlConsumer.close();
                logger.warn("Control consumer closed");
            }
        }
    }
}
