package io.smallrye.reactive.messaging.extension;

import io.smallrye.reactive.messaging.AbstractMediator;
import io.smallrye.reactive.messaging.ChannelRegistar;
import io.smallrye.reactive.messaging.ChannelRegistry;
import io.smallrye.reactive.messaging.MediatorConfiguration;
import io.smallrye.reactive.messaging.MediatorFactory;
import io.smallrye.reactive.messaging.Shape;
import io.smallrye.reactive.messaging.WeavingException;
import io.smallrye.reactive.messaging.annotations.Merge;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;
import javax.annotation.PreDestroy;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.inject.Any;
import javax.enterprise.inject.Instance;
import javax.enterprise.inject.spi.AnnotatedType;
import javax.enterprise.inject.spi.Bean;
import javax.enterprise.inject.spi.BeanManager;
import javax.enterprise.inject.spi.DeploymentException;
import javax.inject.Inject;
import org.drools.core.RuleBaseConfiguration;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ApplicationScoped
/* loaded from: input_file:io/smallrye/reactive/messaging/extension/MediatorManager.class */
public class MediatorManager {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) MediatorManager.class);
    public static final String STRICT_MODE_PROPERTY = "smallrye-messaging-strict-binding";

    @Inject
    @Any
    Instance<ChannelRegistar> streamRegistars;

    @Inject
    MediatorFactory mediatorFactory;

    @Inject
    ChannelRegistry channelRegistry;

    @Inject
    BeanManager beanManager;
    private boolean initialized;
    private final CollectedMediatorMetadata collected = new CollectedMediatorMetadata();
    private final List<Subscription> subscriptions = new CopyOnWriteArrayList();
    private final List<AbstractMediator> mediators = new ArrayList();
    private final boolean strictMode = Boolean.parseBoolean(System.getProperty(STRICT_MODE_PROPERTY, RuleBaseConfiguration.DEFAULT_SIGN_ON_SERIALIZATION));

    public MediatorManager() {
        if (this.strictMode) {
            LOGGER.debug("Strict mode enabled");
        }
    }

    public boolean isInitialized() {
        return this.initialized;
    }

    public <T> void analyze(AnnotatedType<T> annotatedType, Bean<T> bean) {
        LOGGER.info("Scanning Type: {}", annotatedType.getJavaClass());
        annotatedType.getMethods().stream().filter(annotatedMethod -> {
            return annotatedMethod.isAnnotationPresent(Incoming.class) || annotatedMethod.isAnnotationPresent(Outgoing.class);
        }).forEach(annotatedMethod2 -> {
            this.collected.add(annotatedMethod2.getJavaMember(), bean);
        });
    }

    public <T> void analyze(Class<?> cls, Bean<T> bean) {
        Class<?> cls2 = cls;
        while (true) {
            Class<?> cls3 = cls2;
            if (cls3 == Object.class) {
                return;
            }
            Arrays.stream(cls3.getDeclaredMethods()).filter(method -> {
                return method.isAnnotationPresent(Incoming.class) || method.isAnnotationPresent(Outgoing.class);
            }).forEach(method2 -> {
                this.collected.add(method2, bean);
            });
            cls2 = cls3.getSuperclass();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @PreDestroy
    public void shutdown() {
        LOGGER.info("Cancel subscriptions");
        this.subscriptions.forEach((v0) -> {
            v0.cancel();
        });
        this.subscriptions.clear();
    }

    public void initializeAndRun() {
        if (this.initialized) {
            throw new IllegalStateException("MediatorManager was already initialized!");
        }
        LOGGER.info("Deployment done... start processing");
        this.streamRegistars.stream().forEach((v0) -> {
            v0.initialize();
        });
        Set<String> outgoingNames = this.channelRegistry.getOutgoingNames();
        LOGGER.info("Initializing mediators");
        this.collected.mediators().forEach(mediatorConfiguration -> {
            AbstractMediator createMediator = createMediator(mediatorConfiguration);
            LOGGER.debug("Initializing {}", createMediator.getMethodAsString());
            if (mediatorConfiguration.getInvokerClass() != null) {
                try {
                    createMediator.setInvoker(mediatorConfiguration.getInvokerClass().newInstance());
                } catch (IllegalAccessException | InstantiationException e) {
                    LOGGER.error("Unable to create invoker instance of " + mediatorConfiguration.getInvokerClass(), e);
                    return;
                }
            }
            try {
                createMediator.initialize(this.beanManager.getReference(mediatorConfiguration.getBean(), Object.class, this.beanManager.createCreationalContext(mediatorConfiguration.getBean())));
                if (createMediator.getConfiguration().shape() == Shape.PUBLISHER) {
                    LOGGER.debug("Registering {} as publisher {}", createMediator.getConfiguration().methodAsString(), createMediator.getConfiguration().getOutgoing());
                    this.channelRegistry.register(createMediator.getConfiguration().getOutgoing(), createMediator.getStream());
                }
                if (createMediator.getConfiguration().shape() == Shape.SUBSCRIBER) {
                    LOGGER.debug("Registering {} as subscriber {}", createMediator.getConfiguration().methodAsString(), createMediator.getConfiguration().getIncoming());
                    this.channelRegistry.register(createMediator.getConfiguration().getIncoming(), createMediator.getComputedSubscriber());
                }
            } catch (Throwable th) {
                LOGGER.error("Unable to initialize mediator: " + createMediator.getMethodAsString(), th);
            }
        });
        try {
            weaving(outgoingNames);
        } catch (WeavingException e) {
            throw new DeploymentException(e);
        }
    }

    private void weaving(Set<String> set) {
        LOGGER.info("Connecting mediators");
        List<AbstractMediator> allNonSatisfiedMediators = getAllNonSatisfiedMediators();
        ArrayList arrayList = new ArrayList();
        while (true) {
            if (allNonSatisfiedMediators.isEmpty()) {
                break;
            }
            int size = allNonSatisfiedMediators.size();
            allNonSatisfiedMediators.forEach(abstractMediator -> {
                LOGGER.info("Attempt to resolve {}", abstractMediator.getMethodAsString());
                getAggregatedSource(this.channelRegistry.getPublishers(abstractMediator.configuration().getIncoming()), abstractMediator, arrayList).ifPresent(publisherBuilder -> {
                    abstractMediator.connectToUpstream(publisherBuilder);
                    LOGGER.info("Connecting {} to `{}` ({})", abstractMediator.getMethodAsString(), abstractMediator.configuration().getIncoming(), publisherBuilder);
                    if (abstractMediator.configuration().getOutgoing() != null) {
                        this.channelRegistry.register(abstractMediator.getConfiguration().getOutgoing(), abstractMediator.getStream());
                    }
                });
            });
            allNonSatisfiedMediators = getAllNonSatisfiedMediators();
            if (allNonSatisfiedMediators.size() == size) {
                if (this.strictMode) {
                    throw new WeavingException("Impossible to bind mediators, some mediators are not connected: " + allNonSatisfiedMediators.stream().map(abstractMediator2 -> {
                        return abstractMediator2.configuration().methodAsString();
                    }).collect(Collectors.toList()) + ", available publishers:" + this.channelRegistry.getIncomingNames() + ", available emitters: " + this.channelRegistry.getEmitterNames());
                }
                LOGGER.warn("Impossible to bind mediators, some mediators are not connected: {}", allNonSatisfiedMediators.stream().map(abstractMediator3 -> {
                    return abstractMediator3.configuration().methodAsString();
                }).collect(Collectors.toList()));
                LOGGER.warn("Available publishers: {}", this.channelRegistry.getIncomingNames());
                LOGGER.warn("Available emitters: {}", this.channelRegistry.getEmitterNames());
            }
        }
        arrayList.forEach(lazySource -> {
            lazySource.configure(this.channelRegistry, LOGGER);
        });
        this.mediators.stream().filter(abstractMediator4 -> {
            return abstractMediator4.configuration().shape() == Shape.SUBSCRIBER;
        }).filter((v0) -> {
            return v0.isConnected();
        }).forEach((v0) -> {
            v0.run();
        });
        for (String str : set) {
            List<AbstractMediator> lookupForMediatorsWithMatchingDownstream = lookupForMediatorsWithMatchingDownstream(str);
            EmitterImpl emitterImpl = (EmitterImpl) this.channelRegistry.getEmitter(str);
            List<SubscriberBuilder<? extends Message, Void>> subscribers = this.channelRegistry.getSubscribers(str);
            for (AbstractMediator abstractMediator5 : lookupForMediatorsWithMatchingDownstream) {
                if (subscribers.size() == 1) {
                    LOGGER.info("Connecting method {} to sink {}", abstractMediator5.getMethodAsString(), str);
                    abstractMediator5.getStream().to((SubscriberBuilder<? super Object, ? extends R>) subscribers.get(0)).run();
                } else if (subscribers.size() > 2) {
                    LOGGER.warn("{} subscribers consuming the stream {}", Integer.valueOf(subscribers.size()), str);
                    subscribers.forEach(subscriberBuilder -> {
                        LOGGER.info("Connecting method {} to sink {}", abstractMediator5.getMethodAsString(), str);
                        abstractMediator5.getStream().to((SubscriberBuilder<? super Object, ? extends R>) subscriberBuilder).run();
                    });
                }
            }
            if (lookupForMediatorsWithMatchingDownstream.isEmpty() && emitterImpl != null) {
                if (subscribers.size() == 1) {
                    LOGGER.info("Connecting emitter to sink {}", str);
                    ReactiveStreams.fromPublisher(emitterImpl.getPublisher()).to((SubscriberBuilder) subscribers.get(0)).run();
                } else if (subscribers.size() > 2) {
                    LOGGER.warn("{} subscribers consuming the stream {}", Integer.valueOf(subscribers.size()), str);
                    subscribers.forEach(subscriberBuilder2 -> {
                        LOGGER.info("Connecting emitter to sink {}", str);
                        ReactiveStreams.fromPublisher(emitterImpl.getPublisher()).to(subscriberBuilder2).run();
                    });
                }
            }
        }
        this.initialized = true;
    }

    private List<AbstractMediator> lookupForMediatorsWithMatchingDownstream(String str) {
        return (List) this.mediators.stream().filter(abstractMediator -> {
            return abstractMediator.configuration().getOutgoing() != null;
        }).filter(abstractMediator2 -> {
            return abstractMediator2.configuration().getOutgoing().equalsIgnoreCase(str);
        }).collect(Collectors.toList());
    }

    private List<AbstractMediator> getAllNonSatisfiedMediators() {
        return (List) this.mediators.stream().filter(abstractMediator -> {
            return !abstractMediator.isConnected();
        }).collect(Collectors.toList());
    }

    private AbstractMediator createMediator(MediatorConfiguration mediatorConfiguration) {
        AbstractMediator create = this.mediatorFactory.create(mediatorConfiguration);
        LOGGER.debug("Mediator created for {}", mediatorConfiguration.methodAsString());
        this.mediators.add(create);
        return create;
    }

    private Optional<PublisherBuilder<? extends Message>> getAggregatedSource(List<PublisherBuilder<? extends Message>> list, AbstractMediator abstractMediator, List<LazySource> list2) {
        if (list.isEmpty()) {
            return Optional.empty();
        }
        Merge.Mode merge = abstractMediator.getConfiguration().getMerge();
        if (merge != null) {
            LazySource lazySource = new LazySource(abstractMediator.configuration().getIncoming(), merge);
            list2.add(lazySource);
            return Optional.of(ReactiveStreams.fromPublisher(lazySource));
        }
        if (list.size() > 1) {
            throw new WeavingException(abstractMediator.configuration().getIncoming(), abstractMediator.getMethodAsString(), list.size());
        }
        return Optional.of(list.get(0));
    }

    public void initializeEmitters(List<String> list) {
        for (String str : list) {
            EmitterImpl emitterImpl = new EmitterImpl(str);
            this.channelRegistry.register(str, ReactiveStreams.fromPublisher(emitterImpl.getPublisher()));
            this.channelRegistry.register(str, emitterImpl);
        }
    }
}
