/*
 * Decompiled with CFR 0.152.
 */
package org.kie.kogito.addon.quarkus.messaging.common;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.operators.multi.AbstractMultiOperator;
import io.smallrye.mutiny.operators.multi.MultiOperatorProcessor;
import io.smallrye.mutiny.subscription.MultiSubscriber;
import io.smallrye.reactive.messaging.providers.PublisherDecorator;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.kie.kogito.addon.quarkus.messaging.common.BackpressureKogitoEmitter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ApplicationScoped
public class BackpressurePublisherDecorator
implements PublisherDecorator {
    private static final Logger logger = LoggerFactory.getLogger(BackpressurePublisherDecorator.class);
    @Inject
    BackpressureKogitoEmitter emitter;

    public Multi<? extends Message<?>> decorate(Multi<? extends Message<?>> publisher, String channelName) {
        return publisher.plug(upstream -> new BackpressureOperator(publisher, channelName));
    }

    private class BackpressureProcessor
    extends MultiOperatorProcessor<Message<?>, Message<?>> {
        private String channelName;

        public BackpressureProcessor(MultiSubscriber<? super Message<?>> downstream, String channelName) {
            super(downstream);
            this.channelName = channelName;
            BackpressurePublisherDecorator.this.emitter.registerHandler(channelName, () -> super.request(1L));
        }

        public void request(long n) {
            if (BackpressurePublisherDecorator.this.emitter.isEnabled(this.channelName)) {
                logger.trace("Requesting {} elements", (Object)n);
                super.request(n);
            } else {
                logger.trace("Blocking {} elements", (Object)n);
            }
        }
    }

    private class BackpressureOperator
    extends AbstractMultiOperator<Message<?>, Message<?>> {
        private String channelName;

        public BackpressureOperator(Multi<? extends Message<?>> upstream, String channelName) {
            super(upstream);
            this.channelName = channelName;
        }

        public void subscribe(MultiSubscriber<? super Message<?>> downstream) {
            this.upstream.subscribe().withSubscriber((MultiSubscriber)new BackpressureProcessor(downstream, this.channelName));
        }
    }
}

