/*
 * Decompiled with CFR 0.152.
 */
package org.jboss.resteasy.plugins.providers.sse;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import javax.ws.rs.sse.OutboundSseEvent;
import javax.ws.rs.sse.SseBroadcaster;
import javax.ws.rs.sse.SseEventSink;
import org.jboss.resteasy.plugins.providers.sse.SseEventOutputImpl;

/**
 * {@link SseBroadcaster} implementation that keeps the registered
 * {@link SseEventSink}s in a concurrent queue and fans every broadcast event
 * out to them.
 *
 * <p>Sinks, error listeners and close listeners are all stored in
 * {@code java.util.concurrent} collections, so registration may overlap with a
 * running broadcast; a broadcast iterates the queue with that collection's
 * weakly consistent iteration.
 */
public class SseBroadcasterImpl implements SseBroadcaster {
    /** Sinks that receive broadcast events, in registration order. */
    private final ConcurrentLinkedQueue<SseEventSink> outputQueue = new ConcurrentLinkedQueue<>();
    /** Listeners notified with the failing sink and the error when a send fails. */
    private final List<BiConsumer<SseEventSink, Throwable>> onErrorConsumers = new CopyOnWriteArrayList<>();
    /** Listeners notified with each sink that {@link #close()} has closed. */
    private final List<Consumer<SseEventSink>> closeConsumers = new CopyOnWriteArrayList<>();

    /**
     * Closes every registered sink, notifying all close listeners after each
     * one, and leaves the broadcaster with no registered sinks.
     *
     * <p>Sinks are removed from the queue one at a time before being closed, so
     * a sink registered while this method runs is either closed here or stays
     * registered; it is never discarded without being closed.
     */
    public void close() {
        SseEventSink eventSink;
        while ((eventSink = this.outputQueue.poll()) != null) {
            this.closeAndNotify(eventSink);
        }
    }

    /**
     * Registers a listener that is called with the sink and the error whenever
     * sending an event to a sink fails.
     *
     * @param onError the listener to add
     * @throws NullPointerException if {@code onError} is {@code null}
     */
    public void onError(BiConsumer<SseEventSink, Throwable> onError) {
        // Reject null here: a null entry would otherwise fail much later, in the
        // middle of error dispatch, and prevent the remaining listeners from running.
        if (onError == null) {
            throw new NullPointerException("onError listener must not be null");
        }
        this.onErrorConsumers.add(onError);
    }

    /**
     * Registers a listener that is called with each sink closed by
     * {@link #close()}.
     *
     * @param onClose the listener to add
     * @throws NullPointerException if {@code onClose} is {@code null}
     */
    public void onClose(Consumer<SseEventSink> onClose) {
        // Same reasoning as onError: fail at registration rather than during close().
        if (onClose == null) {
            throw new NullPointerException("onClose listener must not be null");
        }
        this.closeConsumers.add(onClose);
    }

    /**
     * Adds a sink to the set of broadcast targets.
     *
     * @param sseEventSink the sink to register
     * @throws NullPointerException if {@code sseEventSink} is {@code null}
     */
    public void register(SseEventSink sseEventSink) {
        if (sseEventSink == null) {
            throw new NullPointerException("sseEventSink must not be null");
        }
        this.outputQueue.add(sseEventSink);
    }

    /**
     * Asynchronously sends {@code event} to every registered sink that is still
     * open; sinks found closed are unregistered instead.
     *
     * <p>The work is submitted through {@link CompletableFuture#runAsync(Runnable)},
     * i.e. on that method's default executor.
     *
     * @param event the event to deliver
     * @return a stage that completes once the dispatch loop over the registered
     *         sinks has finished
     */
    public CompletionStage<?> broadcast(OutboundSseEvent event) {
        return CompletableFuture.runAsync(
                () -> this.outputQueue.forEach(eventSink -> this.dispatch(eventSink, event)));
    }

    /**
     * Builds a callback that forwards a failing sink and its error to every
     * registered error listener.
     *
     * @return a consumer that fans its arguments out to all error listeners
     */
    BiConsumer<SseEventSink, Throwable> callAllErrConsumers() {
        return (eventSink, err) -> this.onErrorConsumers.forEach(consumer -> consumer.accept(eventSink, err));
    }

    /** Closes one sink and then informs every close listener about it. */
    private void closeAndNotify(SseEventSink eventSink) {
        eventSink.close();
        this.closeConsumers.forEach(consumer -> consumer.accept(eventSink));
    }

    /** Sends the event to a single sink, or unregisters the sink if it is already closed. */
    private void dispatch(SseEventSink eventSink, OutboundSseEvent event) {
        // isClosed() is part of the SseEventSink interface, so no cast is needed to prune.
        if (eventSink.isClosed()) {
            this.outputQueue.remove(eventSink);
            return;
        }
        BiConsumer<SseEventSink, Throwable> errorNotifier = this.callAllErrConsumers();
        if (eventSink instanceof SseEventOutputImpl) {
            // Built-in sink: it takes the error callback directly.
            ((SseEventOutputImpl) eventSink).send(event, errorNotifier);
        } else {
            // Any other SseEventSink used to hit a ClassCastException that aborted the
            // whole broadcast; use the interface method and forward failures instead.
            eventSink.send(event).whenComplete((ignored, err) -> {
                if (err != null) {
                    errorNotifier.accept(eventSink, err);
                }
            });
        }
    }
}

