package io.quarkus.smallrye.reactivemessaging.runtime;

import io.quarkus.arc.AlternativePriority;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.providers.connectors.ExecutionHolder;
import io.smallrye.reactive.messaging.providers.connectors.WorkerPoolRegistry;
import io.smallrye.reactive.messaging.providers.helpers.Validation;
import io.vertx.mutiny.core.WorkerExecutor;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Priority;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.context.BeforeDestroyed;
import javax.enterprise.event.Observes;
import javax.enterprise.event.Reception;
import javax.inject.Inject;
import org.eclipse.microprofile.config.ConfigProvider;
import org.slf4j.LoggerFactory;

@AlternativePriority(1)
@ApplicationScoped
/* loaded from: input_file:io/quarkus/smallrye/reactivemessaging/runtime/QuarkusWorkerPoolRegistry.class */
public class QuarkusWorkerPoolRegistry extends WorkerPoolRegistry {
    private static final String WORKER_CONFIG_PREFIX = "smallrye.messaging.worker";
    private static final String WORKER_CONCURRENCY = "max-concurrency";

    @Inject
    ExecutionHolder executionHolder;
    private final Map<String, Integer> workerConcurrency = new HashMap();
    private final Map<String, WorkerExecutor> workerExecutors = new ConcurrentHashMap();

    @Override // io.smallrye.reactive.messaging.providers.connectors.WorkerPoolRegistry
    public void terminate(@Observes(notifyObserver = Reception.IF_EXISTS) @BeforeDestroyed(ApplicationScoped.class) @Priority(100) Object obj) {
        if (this.workerExecutors.isEmpty()) {
            return;
        }
        Iterator<WorkerExecutor> it = this.workerExecutors.values().iterator();
        while (it.hasNext()) {
            it.next().close();
        }
    }

    @Override // io.smallrye.reactive.messaging.providers.connectors.WorkerPoolRegistry
    public <T> Uni<T> executeWork(Uni<T> uni, String str, boolean z) {
        Objects.requireNonNull(uni, "Action to execute not provided");
        return str == null ? this.executionHolder.vertx().executeBlocking(uni, z) : getWorker(str).executeBlocking(uni, z);
    }

    private WorkerExecutor getWorker(String str) {
        Objects.requireNonNull(str, "Worker Name not specified");
        if (this.workerExecutors.containsKey(str)) {
            return this.workerExecutors.get(str);
        }
        if (!this.workerConcurrency.containsKey(str)) {
            throw new IllegalArgumentException("@Blocking referred to invalid worker name.");
        }
        WorkerExecutor workerExecutor = this.workerExecutors.get(str);
        if (workerExecutor == null) {
            synchronized (this) {
                workerExecutor = this.workerExecutors.get(str);
                if (workerExecutor == null) {
                    workerExecutor = this.executionHolder.vertx().createSharedWorkerExecutor(str, this.workerConcurrency.get(str).intValue());
                    LoggerFactory.getLogger((Class<?>) WorkerPoolRegistry.class).info("Created worker pool named " + str + " with concurrency of " + this.workerConcurrency.get(str));
                    this.workerExecutors.put(str, workerExecutor);
                }
            }
        }
        if (workerExecutor != null) {
            return workerExecutor;
        }
        throw new RuntimeException("Failed to create Worker for " + str);
    }

    @Override // io.smallrye.reactive.messaging.providers.connectors.WorkerPoolRegistry
    public void defineWorker(String str, String str2, String str3) {
        Objects.requireNonNull(str, "className was empty");
        Objects.requireNonNull(str2, "Method was empty");
        if (str3.equals("<no-value>")) {
            return;
        }
        if (Validation.isBlank(str3)) {
            throw getBlockingError(str, str2, "value is blank or null");
        }
        String str4 = "smallrye.messaging.worker." + str3 + ".max-concurrency";
        Optional optionalValue = ConfigProvider.getConfig().getOptionalValue(str4, Integer.class);
        if (!optionalValue.isPresent()) {
            throw getBlockingError(str, str2, str4 + " was not defined");
        }
        this.workerConcurrency.put(str3, (Integer) optionalValue.get());
    }

    private IllegalArgumentException getBlockingError(String str, String str2, String str3) {
        return new IllegalArgumentException("Invalid method annotated with @Blocking: " + str + "#" + str2 + " - " + str3);
    }
}
