package org.apache.cxf.jaxrs.sse;

import jakarta.servlet.AsyncContext;
import jakarta.servlet.AsyncEvent;
import jakarta.servlet.AsyncListener;
import jakarta.ws.rs.container.AsyncResponse;
import jakarta.ws.rs.core.MultivaluedMap;
import jakarta.ws.rs.ext.MessageBodyWriter;
import jakarta.ws.rs.sse.OutboundSseEvent;
import jakarta.ws.rs.sse.SseEventSink;
import java.io.IOException;
import java.lang.annotation.Annotation;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
import java.util.logging.Logger;
import org.apache.cxf.common.logging.LogUtils;

/* loaded from: input_file:org/apache/cxf/jaxrs/sse/SseEventSinkImpl.class */
public class SseEventSinkImpl implements SseEventSink {
    public static final String BUFFER_SIZE_PROPERTY = "org.apache.cxf.sse.sink.buffer.size";
    private static final Annotation[] EMPTY_ANNOTATIONS = new Annotation[0];
    private static final Logger LOG = LogUtils.getL7dLogger(SseEventSinkImpl.class);
    private static final int DEFAULT_BUFFER_SIZE = 10000;
    private final AsyncContext ctx;
    private final MessageBodyWriter<OutboundSseEvent> writer;
    private final Queue<QueuedEvent> buffer;
    private final AtomicBoolean closed;
    private final AtomicBoolean dispatching;
    private final AtomicReference<Throwable> throwable;
    private final AtomicBoolean completed;
    private final int bufferSize;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/cxf/jaxrs/sse/SseEventSinkImpl$QueuedEvent.class */
    public static class QueuedEvent {
        private final OutboundSseEvent event;
        private final CompletableFuture<?> completion;

        QueuedEvent(OutboundSseEvent outboundSseEvent, CompletableFuture<?> completableFuture) {
            this.event = outboundSseEvent;
            this.completion = completableFuture;
        }
    }

    public SseEventSinkImpl(MessageBodyWriter<OutboundSseEvent> messageBodyWriter, AsyncResponse asyncResponse, AsyncContext asyncContext) {
        this(messageBodyWriter, asyncResponse, asyncContext, DEFAULT_BUFFER_SIZE);
    }

    public SseEventSinkImpl(MessageBodyWriter<OutboundSseEvent> messageBodyWriter, AsyncResponse asyncResponse, AsyncContext asyncContext, int i) {
        this.closed = new AtomicBoolean(false);
        this.dispatching = new AtomicBoolean(false);
        this.throwable = new AtomicReference<>();
        this.completed = new AtomicBoolean(false);
        this.writer = messageBodyWriter;
        this.buffer = new ArrayBlockingQueue(i);
        this.ctx = asyncContext;
        this.bufferSize = i;
        if (asyncContext == null) {
            throw new IllegalStateException("Unable to retrieve the AsyncContext for this request. Is the Servlet configured properly?");
        }
        asyncContext.getResponse().setContentType("text/event-stream");
        asyncContext.addListener(new AsyncListener() { // from class: org.apache.cxf.jaxrs.sse.SseEventSinkImpl.1
            public void onComplete(AsyncEvent asyncEvent) throws IOException {
                SseEventSinkImpl.this.close();
            }

            public void onTimeout(AsyncEvent asyncEvent) throws IOException {
            }

            public void onError(AsyncEvent asyncEvent) throws IOException {
                if (SseEventSinkImpl.this.throwable.get() != null || SseEventSinkImpl.this.throwable.compareAndSet(null, asyncEvent.getThrowable())) {
                    SseEventSinkImpl.this.close();
                }
            }

            public void onStartAsync(AsyncEvent asyncEvent) throws IOException {
            }
        });
    }

    public AsyncContext getAsyncContext() {
        return this.ctx;
    }

    public void close() {
        if (!this.closed.compareAndSet(false, true)) {
            return;
        }
        LOG.fine("Closing SSE sink now");
        if (!awaitQueueToDrain(5, TimeUnit.SECONDS)) {
            LOG.warning("There are still SSE events the queue which may not be delivered (closing now)");
        }
        if (this.completed.compareAndSet(false, true)) {
            try {
                if (this.ctx.getRequest() != null) {
                    LOG.fine("Completing the AsyncContext");
                    this.ctx.complete();
                }
            } catch (IllegalStateException e) {
                LOG.fine("Failed to close the AsyncContext cleanly: " + e.getMessage());
            }
        }
        Throwable th = this.throwable.get();
        if (th == null) {
            th = new IllegalStateException("The sink has been already closed");
        }
        QueuedEvent poll = this.buffer.poll();
        while (true) {
            QueuedEvent queuedEvent = poll;
            if (queuedEvent == null) {
                return;
            }
            queuedEvent.completion.completeExceptionally(th);
            poll = this.buffer.poll();
        }
    }

    private boolean awaitQueueToDrain(int i, TimeUnit timeUnit) {
        long nanos = timeUnit.toNanos(i) / 20;
        int i2 = 0;
        while (this.dispatching.get()) {
            i2++;
            if (i2 >= 20) {
                break;
            }
            LockSupport.parkNanos(nanos);
        }
        return this.buffer.isEmpty();
    }

    public boolean isClosed() {
        return this.closed.get();
    }

    public CompletionStage<?> send(OutboundSseEvent outboundSseEvent) {
        CompletableFuture completableFuture = new CompletableFuture();
        if (this.closed.get() || this.writer == null) {
            completableFuture.completeExceptionally(new IllegalStateException("The sink is already closed, unable to queue SSE event for send"));
        } else {
            Throwable th = this.throwable.get();
            if (th != null) {
                completableFuture.completeExceptionally(th);
            } else if (!this.buffer.offer(new QueuedEvent(outboundSseEvent, completableFuture))) {
                completableFuture.completeExceptionally(new IllegalStateException("The buffer is full (" + this.bufferSize + "), unable to queue SSE event for send. Please use 'org.apache.cxf.sse.sink.buffer.size' property to increase the limit."));
            } else if (this.dispatching.compareAndSet(false, true)) {
                this.ctx.start(this::dequeue);
            }
        }
        return completableFuture;
    }

    /* JADX WARN: Finally extract failed */
    private void dequeue() {
        Throwable th = this.throwable.get();
        while (true) {
            try {
                QueuedEvent poll = this.buffer.poll();
                if (poll == null) {
                    break;
                }
                OutboundSseEvent outboundSseEvent = poll.event;
                CompletableFuture<?> completableFuture = poll.completion;
                if (th == null) {
                    try {
                        LOG.fine("Dispatching SSE event over the wire");
                        this.writer.writeTo(outboundSseEvent, outboundSseEvent.getClass(), outboundSseEvent.getGenericType(), EMPTY_ANNOTATIONS, outboundSseEvent.getMediaType(), (MultivaluedMap) null, this.ctx.getResponse().getOutputStream());
                        this.ctx.getResponse().flushBuffer();
                        LOG.fine("Completing the future successfully");
                        completableFuture.complete(null);
                    } catch (Exception e) {
                        if (e instanceof IOException) {
                            th = (IOException) e;
                        }
                        LOG.fine("Completing the future unsuccessfully (error enountered)");
                        completableFuture.completeExceptionally(e);
                    }
                } else {
                    LOG.fine("Completing the future unsuccessfully (error enountered previously)");
                    completableFuture.completeExceptionally(th);
                }
            } catch (Throwable th2) {
                boolean z = th != null && this.throwable.compareAndSet(null, th);
                this.dispatching.set(false);
                if (z && this.completed.compareAndSet(false, true)) {
                    LOG.fine("Prematurely completing the AsyncContext due to error encountered: " + th);
                    try {
                        LOG.fine("Completing the AsyncContext");
                        if (this.ctx.getRequest() != null) {
                            this.ctx.complete();
                        }
                    } catch (IllegalStateException e2) {
                        LOG.fine("Failed to close the AsyncContext cleanly: " + e2.getMessage());
                    }
                }
                throw th2;
            }
        }
        boolean z2 = th != null && this.throwable.compareAndSet(null, th);
        this.dispatching.set(false);
        if (z2 && this.completed.compareAndSet(false, true)) {
            LOG.fine("Prematurely completing the AsyncContext due to error encountered: " + th);
            try {
                LOG.fine("Completing the AsyncContext");
                if (this.ctx.getRequest() != null) {
                    this.ctx.complete();
                }
            } catch (IllegalStateException e3) {
                LOG.fine("Failed to close the AsyncContext cleanly: " + e3.getMessage());
            }
        }
    }
}
