/*
 * Decompiled with CFR 0.152.
 */
package org.jboss.resteasy.plugins.providers.sse;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import javax.ws.rs.sse.OutboundSseEvent;
import javax.ws.rs.sse.SseBroadcaster;
import javax.ws.rs.sse.SseEventSink;
import org.jboss.resteasy.resteasy_jaxrs.i18n.LogMessages;
import org.jboss.resteasy.resteasy_jaxrs.i18n.Messages;

/**
 * {@link SseBroadcaster} implementation that keeps the registered
 * {@link SseEventSink}s in a concurrent queue and sends each broadcast event
 * to them one after another.
 *
 * <p>Registration and {@link #close()} are coordinated through a fair
 * {@link ReentrantReadWriteLock}: {@link #register(SseEventSink)} takes the
 * read lock, {@link #close()} takes the write lock. {@link #broadcast(OutboundSseEvent)}
 * does not take either lock and relies on the weakly consistent iteration of
 * {@link ConcurrentLinkedQueue}.
 */
public class SseBroadcasterImpl
implements SseBroadcaster {
    /** Sinks currently registered with this broadcaster. */
    private final ConcurrentLinkedQueue<SseEventSink> outputQueue = new ConcurrentLinkedQueue<>();
    /** Listeners invoked when sending to a sink fails. */
    private final List<BiConsumer<SseEventSink, Throwable>> onErrorConsumers = new CopyOnWriteArrayList<BiConsumer<SseEventSink, Throwable>>();
    /** Listeners invoked when a sink is removed from this broadcaster. */
    private final List<Consumer<SseEventSink>> closeConsumers = new CopyOnWriteArrayList<Consumer<SseEventSink>>();
    /** Set to {@code true} exactly once, by the first call to {@link #close()}. */
    private final AtomicBoolean closed = new AtomicBoolean();
    /** Held by {@link #register(SseEventSink)} while adding a sink. */
    private final Lock readLock;
    /** Held by {@link #close()} while closing every registered sink. */
    private final Lock writeLock;

    /**
     * Creates an open broadcaster with no registered sinks and no listeners.
     */
    public SseBroadcasterImpl() {
        // Fair mode: lock requests are granted in approximately arrival order.
        ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(true);
        this.readLock = readWriteLock.readLock();
        this.writeLock = readWriteLock.writeLock();
    }

    /**
     * Closes this broadcaster and every registered sink.
     *
     * <p>Only the first call has any effect; later calls return immediately.
     * Each sink is closed exactly once. A {@link RuntimeException} thrown by a
     * sink's {@code close()} is logged at debug level and does not prevent the
     * remaining sinks from being closed. The sink is removed from the queue and
     * the close listeners are notified whether or not closing it succeeded.
     */
    @Override
    public void close() {
        if (!this.closed.compareAndSet(false, true)) {
            return;
        }
        this.writeLock.lock();
        try {
            // notifyOnCloseListeners removes the sink from outputQueue while we
            // iterate; ConcurrentLinkedQueue iteration tolerates concurrent removal.
            this.outputQueue.forEach(eventSink -> {
                try {
                    eventSink.close();
                }
                catch (RuntimeException e) {
                    LogMessages.LOGGER.debug(e.getLocalizedMessage());
                }
                finally {
                    this.notifyOnCloseListeners(eventSink);
                }
            });
        }
        finally {
            this.writeLock.unlock();
        }
    }

    /**
     * Rejects the calling operation once this broadcaster has been closed.
     *
     * @throws IllegalStateException if {@link #close()} has already been called
     */
    private void checkClosed() {
        if (this.closed.get()) {
            throw new IllegalStateException(Messages.MESSAGES.sseBroadcasterIsClosed());
        }
    }

    /**
     * Removes the sink from the queue and, only if it was actually present,
     * passes it to every registered close listener. The removal check keeps
     * the listeners from being notified twice for the same sink.
     *
     * @param eventSink the sink that was closed or is considered unusable
     */
    private void notifyOnCloseListeners(SseEventSink eventSink) {
        if (this.outputQueue.remove(eventSink)) {
            this.closeConsumers.forEach(consumer -> consumer.accept(eventSink));
        }
    }

    /**
     * Reports a send failure to every registered error listener.
     *
     * <p>When the failure is an {@link IOException} or an
     * {@link IllegalStateException}, the sink is first removed from the queue
     * and the close listeners are notified.
     *
     * @param eventSink the sink the event could not be sent to
     * @param throwable the failure raised while sending
     */
    private void notifyOnErrorListeners(SseEventSink eventSink, Throwable throwable) {
        if (throwable instanceof IOException || throwable instanceof IllegalStateException) {
            this.notifyOnCloseListeners(eventSink);
        }
        this.onErrorConsumers.forEach(consumer -> consumer.accept(eventSink, throwable));
    }

    /**
     * Adds a listener that is invoked when sending an event to a sink fails.
     *
     * @param onError2 the listener, receiving the failing sink and the failure
     * @throws IllegalStateException if this broadcaster is closed
     */
    @Override
    public void onError(BiConsumer<SseEventSink, Throwable> onError2) {
        this.checkClosed();
        this.onErrorConsumers.add(onError2);
    }

    /**
     * Adds a listener that is invoked when a sink is removed from this broadcaster.
     *
     * @param onClose the listener, receiving the removed sink
     * @throws IllegalStateException if this broadcaster is closed
     */
    @Override
    public void onClose(Consumer<SseEventSink> onClose) {
        this.checkClosed();
        this.closeConsumers.add(onClose);
    }

    /**
     * Registers a sink so that it receives subsequently broadcast events.
     *
     * @param sseEventSink the sink to add
     * @throws IllegalStateException if this broadcaster is closed
     */
    @Override
    public void register(SseEventSink sseEventSink) {
        this.checkClosed();
        this.readLock.lock();
        try {
            // Re-check under the lock: close() may have run between the first
            // check and acquiring the read lock.
            this.checkClosed();
            this.outputQueue.add(sseEventSink);
        }
        finally {
            this.readLock.unlock();
        }
    }

    /**
     * Sends the event to every registered sink, one after another.
     *
     * <p>The send to each sink is chained onto the completion of the previous
     * one. A failure of an individual send, whether thrown synchronously or
     * delivered through the returned stage, is passed to the error listeners
     * and replaced by a {@code null} result so the chain continues with the
     * next sink.
     *
     * @param event the event to send
     * @return a stage that completes once the chain of sends has finished
     * @throws IllegalStateException if this broadcaster is closed
     */
    @Override
    public CompletionStage<?> broadcast(OutboundSseEvent event) {
        this.checkClosed();
        CompletionStage<Object> ret = CompletableFuture.completedFuture(null);
        for (SseEventSink eventSink : this.outputQueue) {
            ret = ret.thenCompose(v -> {
                try {
                    return eventSink.send(event).exceptionally(err -> {
                        this.notifyOnErrorListeners(eventSink, (Throwable)err);
                        return null;
                    });
                }
                catch (Exception e) {
                    this.notifyOnErrorListeners(eventSink, e);
                    return CompletableFuture.completedFuture(null);
                }
            });
        }
        return ret;
    }
}

