package io.smallrye.reactive.messaging.extension;

import io.reactivex.BackpressureOverflowStrategy;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import io.reactivex.FlowableOnSubscribe;
import io.smallrye.reactive.messaging.annotations.Emitter;
import io.smallrye.reactive.messaging.annotations.OnOverflow;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/smallrye/reactive/messaging/extension/EmitterImpl.class */
/**
 * {@link Emitter} implementation backed by an RxJava {@link Flowable}.
 *
 * <p>The publisher returned by {@link #getPublisher()} captures the {@link FlowableEmitter}
 * of its first subscriber; every later subscriber is immediately failed with
 * "Emitter already created". Values passed to {@link #send(Object)} are pushed to that
 * captured emitter, wrapped in a {@link Message} unless they already are one.
 *
 * <p>The mutating operations ({@link #send(Object)}, {@link #complete()},
 * {@link #error(Exception)}) and {@link #isCancelled()} synchronize on this instance.
 *
 * @param <T> type of the payloads accepted by {@link #send(Object)}
 */
public class EmitterImpl<T> implements Emitter<T> {
    private static final Logger LOGGER = LoggerFactory.getLogger(EmitterImpl.class);

    /** Downstream emitter; stays {@code null} until the publisher is subscribed, then never changes. */
    private final AtomicReference<FlowableEmitter<Message<? extends T>>> internal = new AtomicReference<>();
    private final Flowable<Message<? extends T>> publisher;

    /**
     * Builds the emitter and the publisher it feeds, applying the requested overflow strategy.
     *
     * <p>NOTE(review): the decompiler reports this constructor was originally package-private.
     *
     * @param str  name of the emitter; only used in the "Buffer full" log message
     * @param str2 name of an {@link OnOverflow.Strategy} constant, or {@code null} to use a
     *             buffer bounded by {@code j2}
     * @param j    buffer capacity for the {@code BUFFER} strategy; when not positive, no
     *             additional bounded buffer is applied on top of {@code Flowable.create}
     * @param j2   buffer capacity applied when {@code str2} is {@code null}
     * @throws IllegalArgumentException if {@code str2} is not {@code null} and does not name a
     *                                  supported strategy
     */
    public EmitterImpl(String str, String str2, long j, long j2) {
        // Only the first subscriber gets wired to this emitter; the others are rejected.
        FlowableOnSubscribe<Message<? extends T>> onSubscribe = downstream -> {
            if (!this.internal.compareAndSet(null, downstream)) {
                downstream.onError(new Exception("Emitter already created"));
            }
        };
        this.publisher = createPublisher(onSubscribe, str, str2, j, j2);
    }

    /** Creates the flowable matching the requested overflow strategy. */
    private static <M> Flowable<M> createPublisher(FlowableOnSubscribe<M> source, String emitterName,
            String strategyName, long bufferSize, long defaultBufferSize) {
        if (strategyName == null) {
            return withBoundedBuffer(Flowable.create(source, BackpressureStrategy.BUFFER),
                    defaultBufferSize, emitterName);
        }
        switch (parseStrategy(strategyName)) {
            case BUFFER:
                Flowable<M> buffered = Flowable.create(source, BackpressureStrategy.BUFFER);
                return bufferSize > 0 ? withBoundedBuffer(buffered, bufferSize, emitterName) : buffered;
            case DROP:
                return Flowable.create(source, BackpressureStrategy.DROP);
            case FAIL:
                return Flowable.create(source, BackpressureStrategy.ERROR);
            case LATEST:
                return Flowable.create(source, BackpressureStrategy.LATEST);
            case NONE:
                return Flowable.create(source, BackpressureStrategy.MISSING);
            default:
                // Guards against enum constants this class does not handle.
                throw new IllegalArgumentException("Invalid back pressure strategy: " + strategyName);
        }
    }

    /**
     * Resolves a strategy name to its enum constant, reporting unknown names with the same
     * message as an unsupported constant instead of the generic {@code valueOf} failure.
     */
    private static OnOverflow.Strategy parseStrategy(String strategyName) {
        try {
            return OnOverflow.Strategy.valueOf(strategyName);
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException("Invalid back pressure strategy: " + strategyName, e);
        }
    }

    /** Adds a bounded buffer that logs and signals an error downstream when it overflows. */
    private static <M> Flowable<M> withBoundedBuffer(Flowable<M> upstream, long capacity, String emitterName) {
        return upstream.onBackpressureBuffer(capacity,
                () -> LOGGER.error("Buffer full for emitter {}", emitterName),
                BackpressureOverflowStrategy.ERROR);
    }

    /**
     * @return the publisher fed by this emitter
     */
    public Publisher<Message<? extends T>> getPublisher() {
        return this.publisher;
    }

    /**
     * NOTE(review): the decompiler reports this method was originally package-private.
     *
     * @return {@code true} once a subscriber's emitter has been captured
     */
    public boolean isConnected() {
        return this.internal.get() != null;
    }

    /**
     * Pushes a value downstream. A value that is already a {@link Message} is forwarded as is;
     * anything else is wrapped with {@link Message#of(Object)}.
     *
     * @param t the value to emit, must not be {@code null}
     * @return this emitter
     * @throws IllegalArgumentException if {@code t} is {@code null}
     * @throws IllegalStateException    if the stream is not yet connected or has been terminated
     */
    @Override
    public synchronized Emitter<T> send(T t) {
        if (t == null) {
            throw new IllegalArgumentException("`null` is not a valid value");
        }
        FlowableEmitter<Message<? extends T>> emitter = verify();
        if (t instanceof Message) {
            emitter.onNext(asMessage(t));
        } else {
            emitter.onNext(Message.of(t));
        }
        return this;
    }

    /**
     * Views a payload already known to be a {@link Message} as the emitter's element type.
     * The cast is unchecked; it is only reached after an {@code instanceof Message} test.
     */
    @SuppressWarnings("unchecked")
    private Message<? extends T> asMessage(T payload) {
        return (Message<? extends T>) (Message<?>) payload;
    }

    /**
     * Returns the downstream emitter, failing if it cannot accept signals.
     * Every caller already holds this instance's monitor.
     *
     * @throws IllegalStateException if the stream is not yet connected or has been terminated
     */
    private FlowableEmitter<Message<? extends T>> verify() {
        FlowableEmitter<Message<? extends T>> emitter = this.internal.get();
        if (emitter == null) {
            throw new IllegalStateException("Stream not yet connected");
        }
        if (emitter.isCancelled()) {
            throw new IllegalStateException("Stream has been terminated");
        }
        return emitter;
    }

    /**
     * Signals completion downstream.
     *
     * @throws IllegalStateException if the stream is not yet connected or has been terminated
     */
    @Override
    public synchronized void complete() {
        verify().onComplete();
    }

    /**
     * Signals a failure downstream.
     *
     * @param exc the failure to propagate, must not be {@code null}
     * @throws IllegalArgumentException if {@code exc} is {@code null}
     * @throws IllegalStateException    if the stream is not yet connected or has been terminated
     */
    @Override
    public synchronized void error(Exception exc) {
        if (exc == null) {
            throw new IllegalArgumentException("`null` is not a valid exception");
        }
        verify().onError(exc);
    }

    /**
     * @return {@code true} if no subscriber is connected yet or the downstream emitter reports
     *         itself cancelled
     */
    @Override
    public synchronized boolean isCancelled() {
        FlowableEmitter<Message<? extends T>> emitter = this.internal.get();
        return emitter == null || emitter.isCancelled();
    }

    /**
     * @return {@code true} if the emitter is not cancelled and the downstream has outstanding
     *         requests
     */
    @Override
    public boolean isRequested() {
        // Safe dereference: `internal` only ever moves from null to non-null, and
        // isCancelled() returned false only if it was already non-null.
        return !isCancelled() && this.internal.get().requested() > 0;
    }
}
