package org.kie.kogito.rules.units;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.drools.core.common.EventFactHandle;
import org.kie.api.time.SessionPseudoClock;
import org.kie.kogito.rules.DataProcessor;
import org.kie.kogito.rules.DataStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/lib/kogito-ruleunits-1.14.0-SNAPSHOT.jar:org/kie/kogito/rules/units/EventListDataStream.class */
/**
 * A {@link DataStream} backed by an in-memory list.
 *
 * <p>Every appended value is retained, pushed to all processors subscribed at that moment, and
 * replayed in insertion order to any processor that subscribes later. After each insertion the
 * session clock reachable from the resulting event handle is moved forward to the event's start
 * timestamp (see {@link #insertAndAdvanceClock(Object, DataProcessor)}).
 *
 * <p>This class performs no synchronization of its own.
 *
 * @param <T> type of the values carried by the stream
 */
public class EventListDataStream<T> implements DataStream<T> {
    private static final Logger LOGGER = LoggerFactory.getLogger(EventListDataStream.class);

    /** Every value appended so far, in insertion order; replayed to late subscribers. */
    private final List<T> values = new ArrayList<>();

    /** Processors that receive each newly appended value. */
    private final List<DataProcessor> subscribers = new ArrayList<>();

    /**
     * Creates a stream pre-populated with the given values, appended in array order.
     *
     * @param tArr initial values; may be empty
     * @param <T> type of the values carried by the stream
     * @return a new stream holding the given values and no subscribers
     */
    @SafeVarargs
    public static <T> EventListDataStream<T> create(T... tArr) {
        EventListDataStream<T> stream = new EventListDataStream<>();
        for (T value : tArr) {
            stream.append(value);
        }
        return stream;
    }

    /**
     * Stores the value and forwards it to every current subscriber, in subscription order.
     *
     * @param t value to append
     */
    @Override
    public void append(T t) {
        values.add(t);
        for (DataProcessor subscriber : subscribers) {
            insertAndAdvanceClock(t, subscriber);
        }
    }

    /**
     * Registers the processor and immediately replays all previously appended values to it,
     * in insertion order.
     *
     * @param dataProcessor processor to register
     */
    @Override
    public void subscribe(DataProcessor dataProcessor) {
        subscribers.add(dataProcessor);
        for (T value : values) {
            insertAndAdvanceClock(value, dataProcessor);
        }
    }

    /**
     * Inserts the value into the processor and aligns the session clock with the timestamp of
     * the resulting event.
     *
     * <p>The casts require the processor to return an {@link EventFactHandle} and the session
     * clock to be a {@link SessionPseudoClock}; otherwise a {@link ClassCastException} is thrown.
     *
     * @param value value to insert
     * @param subscriber processor receiving the value
     */
    private void insertAndAdvanceClock(T value, DataProcessor subscriber) {
        EventFactHandle handle = (EventFactHandle) subscriber.insert(null, value);
        long eventTimestamp = handle.getStartTimestamp();
        SessionPseudoClock clock =
                (SessionPseudoClock) handle.getEntryPoint(null).getReteEvaluator().getSessionClock();

        // Positive: the event is ahead of the clock; negative: the event lies in the clock's past.
        long delta = eventTimestamp - clock.getCurrentTime();
        if (delta > 0) {
            clock.advanceTime(delta, TimeUnit.MILLISECONDS);
        } else if (delta < 0) {
            // The clock is never moved backwards; the out-of-order event is only reported.
            LOGGER.warn("Received an event with a timestamp that is {} milliseconds in the past. Evaluation of out of order events could lead to unpredictable results.", -delta);
        }
    }
}
