package org.jboss.resteasy.microprofile.client.publisher;

import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import javax.ws.rs.ext.Providers;
import javax.ws.rs.sse.InboundSseEvent;
import org.jboss.logging.Logger;
import org.jboss.resteasy.plugins.providers.sse.SseEventInputImpl;
import org.jboss.resteasy.spi.ResteasyProviderFactory;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/* loaded from: input_file:wildfly.zip:modules/system/layers/base/org/jboss/resteasy/resteasy-client-microprofile/main/resteasy-client-microprofile-base-3.15.1.Final.jar:org/jboss/resteasy/microprofile/client/publisher/SSEPublisher.class */
public class SSEPublisher<T> implements Publisher<T> {
    private final SseEventInputImpl input;
    private final Type genericType;
    private final Providers providers;
    private final ExecutorService executor;
    private static final Runnable CLEARED = () -> {
    };
    private static final Logger LOGGER = Logger.getLogger((Class<?>) SSEPublisher.class);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:wildfly.zip:modules/system/layers/base/org/jboss/resteasy/resteasy-client-microprofile/main/resteasy-client-microprofile-base-3.15.1.Final.jar:org/jboss/resteasy/microprofile/client/publisher/SSEPublisher$SSEProcessor.class */
    public static class SSEProcessor<T> implements Subscription {
        private final Subscriber<T> downstream;
        private final Queue<T> queue;
        private final int bufferSize;
        private Throwable failure;
        private volatile boolean done;
        private final AtomicLong requested = new AtomicLong();
        private final AtomicInteger wip = new AtomicInteger();
        private final AtomicReference<Runnable> onTermination = new AtomicReference<>();

        SSEProcessor(Subscriber<T> subscriber, int i) {
            this.downstream = subscriber;
            this.bufferSize = i;
            this.queue = new SpscLinkedArrayQueue(i);
        }

        public void emit(T t) {
            if (this.done || isCancelled()) {
                return;
            }
            if (t == null) {
                throw new NullPointerException("Reactive Streams Rule 2.13 violated: The received item is `null`");
            }
            if (this.queue.size() == this.bufferSize) {
                SSEPublisher.LOGGER.debugf("Dropping server-sent-event '%s' due to lack of downstream requests", this.queue.poll());
            }
            this.queue.offer(t);
            drain();
        }

        @Override // org.reactivestreams.Subscription
        public void request(long j) {
            if (j > 0) {
                Subscriptions.add(this.requested, j);
                drain();
            } else {
                cancel();
                this.downstream.onError(new IllegalArgumentException("Reactive Streams Rule 3.9 violated: request must be positive, but was " + j));
            }
        }

        @Override // org.reactivestreams.Subscription
        public final void cancel() {
            cleanup();
        }

        public boolean isCancelled() {
            return this.onTermination.get() == SSEPublisher.CLEARED;
        }

        /* JADX WARN: Code restructure failed: missing block: B:37:0x0096, code lost:
        
            if (r10 != r0) goto L48;
         */
        /* JADX WARN: Code restructure failed: missing block: B:39:0x009d, code lost:
        
            if (isCancelled() == false) goto L39;
         */
        /* JADX WARN: Code restructure failed: missing block: B:40:0x00a7, code lost:
        
            r0 = r5.done;
            r0 = r0.isEmpty();
         */
        /* JADX WARN: Code restructure failed: missing block: B:41:0x00b7, code lost:
        
            if (r0 == false) goto L48;
         */
        /* JADX WARN: Code restructure failed: missing block: B:43:0x00bc, code lost:
        
            if (r0 == false) goto L48;
         */
        /* JADX WARN: Code restructure failed: missing block: B:46:0x00c3, code lost:
        
            if (r5.failure == null) goto L46;
         */
        /* JADX WARN: Code restructure failed: missing block: B:47:0x00c6, code lost:
        
            sendErrorToDownstream(r5.failure);
         */
        /* JADX WARN: Code restructure failed: missing block: B:48:?, code lost:
        
            return;
         */
        /* JADX WARN: Code restructure failed: missing block: B:49:0x00d1, code lost:
        
            sendCompletionToDownstream();
         */
        /* JADX WARN: Code restructure failed: missing block: B:50:0x00d5, code lost:
        
            return;
         */
        /* JADX WARN: Code restructure failed: missing block: B:52:0x00a0, code lost:
        
            r0.clear();
         */
        /* JADX WARN: Code restructure failed: missing block: B:53:0x00a6, code lost:
        
            return;
         */
        /* JADX WARN: Code restructure failed: missing block: B:55:0x00da, code lost:
        
            if (r10 == 0) goto L51;
         */
        /* JADX WARN: Code restructure failed: missing block: B:56:0x00dd, code lost:
        
            org.jboss.resteasy.microprofile.client.publisher.Subscriptions.produced(r5.requested, r10);
         */
        /* JADX WARN: Code restructure failed: missing block: B:57:0x00e7, code lost:
        
            r6 = r5.wip.addAndGet(-r6);
         */
        /*
            Code decompiled incorrectly, please refer to instructions dump.
            To view partially-correct add '--show-bad-code' argument
        */
        void drain() {
            /*
                Method dump skipped, instructions count: 246
                To view this dump add '--comments-level debug' option
            */
            throw new UnsupportedOperationException("Method not decompiled: org.jboss.resteasy.microprofile.client.publisher.SSEPublisher.SSEProcessor.drain():void");
        }

        protected void onCompletion() {
            this.done = true;
            drain();
        }

        protected void onError(Throwable th) {
            if (this.done || isCancelled()) {
                return;
            }
            if (th == null) {
                throw new NullPointerException("Reactive Streams Rule 2.13 violated: The received error is `null`");
            }
            this.failure = th;
            this.done = true;
            drain();
        }

        private void cleanup() {
            Runnable andSet = this.onTermination.getAndSet(SSEPublisher.CLEARED);
            if (andSet == null || andSet == SSEPublisher.CLEARED) {
                return;
            }
            andSet.run();
        }

        private void sendCompletionToDownstream() {
            if (isCancelled()) {
                return;
            }
            try {
                this.downstream.onComplete();
            } finally {
                cleanup();
            }
        }

        private void sendErrorToDownstream(Throwable th) {
            if (th == null) {
                th = new NullPointerException("Reactive Streams Rule 2.13 violated: The received error is `null`");
            }
            if (isCancelled()) {
                return;
            }
            try {
                this.downstream.onError(th);
            } finally {
                cleanup();
            }
        }
    }

    public SSEPublisher(Type type, Providers providers, SseEventInputImpl sseEventInputImpl, ExecutorService executorService) {
        this.genericType = type;
        this.input = sseEventInputImpl;
        this.providers = providers;
        this.executor = executorService;
    }

    @Override // org.reactivestreams.Publisher
    public void subscribe(Subscriber<? super T> subscriber) {
        SSEProcessor sSEProcessor = new SSEProcessor(subscriber, Integer.getInteger("resteasy.microprofile.sseclient.buffersize", 512).intValue());
        subscriber.onSubscribe(sSEProcessor);
        pump(sSEProcessor, this.input);
    }

    private void pump(final SSEProcessor sSEProcessor, final SseEventInputImpl sseEventInputImpl) {
        final Map<Class<?>, Object> contextDataMap = ResteasyProviderFactory.getContextDataMap();
        Runnable runnable = new Runnable() { // from class: org.jboss.resteasy.microprofile.client.publisher.SSEPublisher.1
            /* JADX WARN: Multi-variable type inference failed */
            @Override // java.lang.Runnable
            public void run() {
                ResteasyProviderFactory.pushContextDataMap(contextDataMap);
                if (SSEPublisher.this.genericType instanceof ParameterizedType) {
                    Type type = ((ParameterizedType) SSEPublisher.this.genericType).getActualTypeArguments()[0];
                    if (type.equals(InboundSseEvent.class)) {
                        while (true) {
                            try {
                                InboundSseEvent read = sseEventInputImpl.read(SSEPublisher.this.providers);
                                if (read == null) {
                                    break;
                                } else {
                                    sSEProcessor.emit(read);
                                }
                            } catch (Exception e) {
                                sSEProcessor.onError(e);
                                return;
                            }
                        }
                        sSEProcessor.onCompletion();
                    } else {
                        while (true) {
                            try {
                                InboundSseEvent read2 = sseEventInputImpl.read(SSEPublisher.this.providers);
                                if (read2 == null) {
                                    break;
                                } else {
                                    sSEProcessor.emit(read2.readData((Class) type));
                                }
                            } catch (Exception e2) {
                                sSEProcessor.onError(e2);
                                return;
                            }
                        }
                    }
                    sSEProcessor.onCompletion();
                }
            }
        };
        try {
            this.executor.execute(runnable);
        } catch (RejectedExecutionException e) {
            LOGGER.warnf("Executor %s rejected emit event task", this.executor);
            new Thread(runnable, "SseClientPublisherNewThread").start();
        }
    }
}
