package io.smallrye.stork;

import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.groups.UniCreate;
import io.smallrye.mutiny.groups.UniSubscribe;
import io.smallrye.stork.config.ServiceDiscoveryConfig;
import io.smallrye.stork.utils.DurationUtils;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeParseException;
import java.time.temporal.TemporalAmount;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/smallrye/stork/CachingServiceDiscovery.class */
/**
 * Base class for {@link ServiceDiscovery} implementations that cache the list of service instances
 * and only re-fetch it once the cached data is older than {@link #refreshPeriod}.
 *
 * <p>At most one fetch is meant to be in flight at a time: the in-flight fetch is published through
 * the {@link #refresh} reference, and callers that arrive while it is running receive the result of
 * that same fetch instead of starting another one.
 *
 * <p>Subclasses implement {@link #fetchNewServiceInstances(List)} to perform the actual lookup.
 */
public abstract class CachingServiceDiscovery implements ServiceDiscovery {
    private static final Logger log = LoggerFactory.getLogger(CachingServiceDiscovery.class);

    /** Refresh period used when the configuration has no {@code refresh-period} parameter. */
    public static final Duration DEFAULT_REFRESH_INTERVAL = Duration.ofMinutes(5);

    /** Maximum age of the cached instances before a new fetch is started. */
    public final Duration refreshPeriod;

    /** Last successfully fetched instances with their fetch time; {@code null} until the first success. */
    private volatile ServiceInstancesCache cacheData;

    /** The fetch currently in flight, or {@code null} when none is running. */
    final AtomicReference<Refresh> refresh = new AtomicReference<>(null);

    /**
     * A single fetch of service instances whose outcome can be shared by several callers.
     */
    public static class Refresh {
        /** Completed (normally or exceptionally) with the outcome of the fetch started by {@link #trigger}. */
        final CompletableFuture<List<ServiceInstance>> result = new CompletableFuture<>();

        /** Memoized {@link Uni} view of {@link #result}. */
        final Uni<List<ServiceInstance>> uniResult = Uni.createFrom().completionStage(this.result).memoize().indefinitely();

        /**
         * Starts the fetch: obtains the {@link Uni} from the supplier, subscribes to it and forwards
         * its item or failure to {@link #result}.
         *
         * @param supplier provides the {@link Uni} performing the fetch
         */
        protected void trigger(Supplier<Uni<List<ServiceInstance>>> supplier) {
            try {
                supplier.get().subscribe().with(this.result::complete, this.result::completeExceptionally);
            } catch (Throwable th) {
                // Anything thrown while creating or subscribing to the Uni (including a null Uni)
                // must still complete the future, otherwise waiters would never be notified.
                this.result.completeExceptionally(th);
            }
        }

        /**
         * @return the memoized {@link Uni} that emits the outcome of this fetch
         */
        public Uni<List<ServiceInstance>> result() {
            return this.uniResult;
        }
    }

    /**
     * Reads the {@code refresh-period} parameter from the given configuration, falling back to
     * {@link #DEFAULT_REFRESH_INTERVAL} when the parameter is absent.
     *
     * @param serviceDiscoveryConfig the service discovery configuration
     * @throws IllegalArgumentException if parsing the {@code refresh-period} value fails with a
     *         {@link DateTimeParseException}
     */
    public CachingServiceDiscovery(ServiceDiscoveryConfig serviceDiscoveryConfig) {
        String period = serviceDiscoveryConfig.parameters().get("refresh-period");
        try {
            this.refreshPeriod = period != null ? DurationUtils.parseDuration(period) : DEFAULT_REFRESH_INTERVAL;
        } catch (DateTimeParseException e) {
            throw new IllegalArgumentException("refresh-period for service discovery should be a number, got: " + period, e);
        }
    }

    /**
     * Returns the cached service instances while they are fresh; otherwise joins the fetch that is
     * already in flight or starts a new one.
     *
     * <p>NOTE(review): the cache update and the clearing of the in-flight marker are attached to the
     * {@link Uni} returned to the caller that started the fetch, so they only run when that
     * {@link Uni} is subscribed to.
     *
     * @return a {@link Uni} emitting the service instances
     */
    @Override // io.smallrye.stork.ServiceDiscovery
    public Uni<List<ServiceInstance>> getServiceInstances() {
        // Read the volatile field once so the null check and the reads below use the same snapshot.
        final ServiceInstancesCache cache = this.cacheData;
        if (cache != null && refreshNotNeed()) {
            return Uni.createFrom().item(cache.getServiceInstances());
        }

        final List<ServiceInstance> previousInstances =
                cache == null ? Collections.emptyList() : cache.getServiceInstances();

        // Keep the new Refresh in a local instead of re-reading the AtomicReference after the
        // exchange: a concurrent reset could otherwise make that second read return null.
        final Refresh newRefresh = new Refresh();
        final Refresh inFlight = this.refresh.compareAndExchange(null, newRefresh);
        if (inFlight != null) {
            // Another caller already started a fetch; share its outcome.
            return inFlight.result();
        }

        newRefresh.trigger(() -> fetchNewServiceInstances(previousInstances));
        return newRefresh.result()
                .onItem().invoke(instances -> replaceCacheData(newRefresh, instances))
                .onFailure().invoke(failure -> handleFetchError(newRefresh, failure));
    }

    /**
     * Logs a failed fetch and clears the in-flight marker so that a later call can retry.
     *
     * @param completed the fetch that failed
     * @param th the failure
     */
    private void handleFetchError(Refresh completed, Throwable th) {
        log.error("Failed to fetch service instances", th);
        // Only clear our own marker; a late callback must not wipe out a newer in-flight fetch.
        this.refresh.compareAndSet(completed, null);
    }

    /**
     * Stores freshly fetched instances together with the current time, then clears the in-flight marker.
     *
     * @param completed the fetch that produced the instances
     * @param list the fetched instances
     */
    private void replaceCacheData(Refresh completed, List<ServiceInstance> list) {
        // Publish the cache before clearing the marker so a concurrent caller sees the fresh data
        // rather than starting a redundant fetch.
        this.cacheData = new ServiceInstancesCache(list, LocalDateTime.now());
        // Only clear our own marker; a late callback must not wipe out a newer in-flight fetch.
        this.refresh.compareAndSet(completed, null);
    }

    /**
     * Tells whether the cached instances are still fresh.
     *
     * @return {@code true} if instances are cached and were fetched less than {@link #refreshPeriod}
     *         ago; {@code false} if they are older or nothing has been cached yet
     */
    public boolean refreshNotNeed() {
        final ServiceInstancesCache cache = this.cacheData;
        if (cache == null) {
            // Nothing cached yet, so a fetch is required.
            return false;
        }
        return cache.getLastFetchDateTime().isAfter(LocalDateTime.now().minus(this.refreshPeriod));
    }

    /**
     * Fetches the current list of service instances.
     *
     * @param list the previously cached instances, or an empty list when nothing has been cached yet
     * @return a {@link Uni} emitting the newly fetched instances
     */
    public abstract Uni<List<ServiceInstance>> fetchNewServiceInstances(List<ServiceInstance> list);
}
