package org.springframework.kafka.core;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.InvalidPartitionsException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.KafkaException;
import org.springframework.kafka.support.TopicForRetryable;
import org.springframework.lang.Nullable;

/* loaded from: input_file:BOOT-INF/lib/spring-kafka-3.0.5.jar:org/springframework/kafka/core/KafkaAdmin.class */
public class KafkaAdmin extends KafkaResourceFactory implements ApplicationContextAware, SmartInitializingSingleton, KafkaAdminOperations {
    private static final int DEFAULT_OPERATION_TIMEOUT = 30;
    private final Map<String, Object> configs;
    private ApplicationContext applicationContext;
    private boolean fatalIfBrokerNotAvailable;
    private boolean initializingContext;
    private boolean modifyTopicConfigs;
    private String clusterId;
    public static final Duration DEFAULT_CLOSE_TIMEOUT = Duration.ofSeconds(10);
    private static final LogAccessor LOGGER = new LogAccessor(LogFactory.getLog(KafkaAdmin.class));
    private Duration closeTimeout = DEFAULT_CLOSE_TIMEOUT;
    private int operationTimeout = 30;
    private boolean autoCreate = true;

    /* loaded from: input_file:BOOT-INF/lib/spring-kafka-3.0.5.jar:org/springframework/kafka/core/KafkaAdmin$NewTopics.class */
    public static class NewTopics {
        private final Collection<NewTopic> newTopics = new ArrayList();

        public NewTopics(NewTopic... newTopicArr) {
            this.newTopics.addAll(Arrays.asList(newTopicArr));
        }

        Collection<NewTopic> getNewTopics() {
            return this.newTopics;
        }
    }

    public KafkaAdmin(Map<String, Object> map) {
        this.configs = new HashMap(map);
    }

    @Override // org.springframework.context.ApplicationContextAware
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    public void setCloseTimeout(int i) {
        this.closeTimeout = Duration.ofSeconds(i);
    }

    public void setOperationTimeout(int i) {
        this.operationTimeout = i;
    }

    public int getOperationTimeout() {
        return this.operationTimeout;
    }

    public void setFatalIfBrokerNotAvailable(boolean z) {
        this.fatalIfBrokerNotAvailable = z;
    }

    public void setAutoCreate(boolean z) {
        this.autoCreate = z;
    }

    public void setModifyTopicConfigs(boolean z) {
        this.modifyTopicConfigs = z;
    }

    @Override // org.springframework.kafka.core.KafkaAdminOperations
    public Map<String, Object> getConfigurationProperties() {
        HashMap hashMap = new HashMap(this.configs);
        checkBootstrap(hashMap);
        return Collections.unmodifiableMap(hashMap);
    }

    @Override // org.springframework.beans.factory.SmartInitializingSingleton
    public void afterSingletonsInstantiated() {
        this.initializingContext = true;
        if (this.autoCreate) {
            initialize();
        }
    }

    public final boolean initialize() {
        Collection<NewTopic> newTopics = newTopics();
        if (newTopics.size() > 0) {
            AdminClient adminClient = null;
            try {
                adminClient = createAdmin();
            } catch (Exception e) {
                if (!this.initializingContext || this.fatalIfBrokerNotAvailable) {
                    throw new IllegalStateException("Could not create admin", e);
                }
                LOGGER.error(e, "Could not create admin");
            }
            try {
                if (adminClient != null) {
                    try {
                        synchronized (this) {
                            this.clusterId = adminClient.describeCluster().clusterId().get(this.operationTimeout, TimeUnit.SECONDS);
                        }
                        addOrModifyTopicsIfNeeded(adminClient, newTopics);
                        this.initializingContext = false;
                        adminClient.close(this.closeTimeout);
                        return true;
                    } catch (InterruptedException e2) {
                        Thread.currentThread().interrupt();
                        this.initializingContext = false;
                        adminClient.close(this.closeTimeout);
                    } catch (Exception e3) {
                        if (!this.initializingContext || this.fatalIfBrokerNotAvailable) {
                            throw new IllegalStateException("Could not configure topics", e3);
                        }
                        LOGGER.error(e3, "Could not configure topics");
                        this.initializingContext = false;
                        adminClient.close(this.closeTimeout);
                    }
                }
            } catch (Throwable th) {
                this.initializingContext = false;
                adminClient.close(this.closeTimeout);
                throw th;
            }
        }
        this.initializingContext = false;
        return false;
    }

    private Collection<NewTopic> newTopics() {
        HashMap hashMap = new HashMap(this.applicationContext.getBeansOfType(NewTopic.class, false, false));
        Map beansOfType = this.applicationContext.getBeansOfType(NewTopics.class, false, false);
        AtomicInteger atomicInteger = new AtomicInteger();
        beansOfType.forEach((str, newTopics) -> {
            newTopics.getNewTopics().forEach(newTopic -> {
                hashMap.put(str + "#" + atomicInteger.getAndIncrement(), newTopic);
            });
        });
        for (Map.Entry entry : ((Map) hashMap.entrySet().stream().filter(entry2 -> {
            return entry2.getValue() instanceof TopicForRetryable;
        }).collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, (v0) -> {
            return v0.getValue();
        }))).entrySet()) {
            Iterator it = hashMap.entrySet().iterator();
            boolean z = false;
            while (true) {
                if (!it.hasNext()) {
                    break;
                }
                Map.Entry entry3 = (Map.Entry) it.next();
                if (((NewTopic) entry3.getValue()).name().equals(((NewTopic) entry.getValue()).name()) && !(entry3.getValue() instanceof TopicForRetryable)) {
                    z = true;
                    break;
                }
            }
            if (z) {
                hashMap.remove(entry.getKey());
            }
        }
        return new ArrayList(hashMap.values());
    }

    @Override // org.springframework.kafka.core.KafkaAdminOperations
    @Nullable
    public String clusterId() {
        if (this.clusterId == null) {
            try {
                AdminClient createAdmin = createAdmin();
                try {
                    this.clusterId = createAdmin.describeCluster().clusterId().get(this.operationTimeout, TimeUnit.SECONDS);
                    if (createAdmin != null) {
                        createAdmin.close();
                    }
                } catch (Throwable th) {
                    if (createAdmin != null) {
                        try {
                            createAdmin.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                    throw th;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (Exception e2) {
                LOGGER.error(e2, "Could not obtaine cluster info");
            }
        }
        return this.clusterId;
    }

    @Override // org.springframework.kafka.core.KafkaAdminOperations
    public void createOrModifyTopics(NewTopic... newTopicArr) {
        AdminClient createAdmin = createAdmin();
        try {
            addOrModifyTopicsIfNeeded(createAdmin, Arrays.asList(newTopicArr));
            if (createAdmin != null) {
                createAdmin.close();
            }
        } catch (Throwable th) {
            if (createAdmin != null) {
                try {
                    createAdmin.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Override // org.springframework.kafka.core.KafkaAdminOperations
    public Map<String, TopicDescription> describeTopics(String... strArr) {
        AdminClient createAdmin = createAdmin();
        try {
            HashMap hashMap = new HashMap();
            try {
                hashMap.putAll(createAdmin.describeTopics(Arrays.asList(strArr)).allTopicNames().get(this.operationTimeout, TimeUnit.SECONDS));
                if (createAdmin != null) {
                    createAdmin.close();
                }
                return hashMap;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new KafkaException("Interrupted while getting topic descriptions", e);
            } catch (ExecutionException | TimeoutException e2) {
                throw new KafkaException("Failed to obtain topic descriptions", e2);
            }
        } catch (Throwable th) {
            if (createAdmin != null) {
                try {
                    createAdmin.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private AdminClient createAdmin() {
        HashMap hashMap = new HashMap(this.configs);
        checkBootstrap(hashMap);
        return AdminClient.create((Map<String, Object>) hashMap);
    }

    private void addOrModifyTopicsIfNeeded(AdminClient adminClient, Collection<NewTopic> collection) {
        if (collection.size() > 0) {
            Map<String, NewTopic> hashMap = new HashMap<>();
            collection.forEach(newTopic -> {
                hashMap.compute(newTopic.name(), (str, newTopic) -> {
                    return newTopic;
                });
            });
            DescribeTopicsResult describeTopics = adminClient.describeTopics((Collection<String>) collection.stream().map((v0) -> {
                return v0.name();
            }).collect(Collectors.toList()));
            List<NewTopic> arrayList = new ArrayList<>();
            Map<String, NewPartitions> checkPartitions = checkPartitions(hashMap, describeTopics, arrayList);
            if (arrayList.size() > 0) {
                addTopics(adminClient, arrayList);
            }
            if (checkPartitions.size() > 0) {
                createMissingPartitions(adminClient, checkPartitions);
            }
            if (this.modifyTopicConfigs) {
                LinkedList linkedList = new LinkedList(collection);
                linkedList.removeAll(arrayList);
                Map<ConfigResource, List<ConfigEntry>> checkTopicsForConfigMismatches = checkTopicsForConfigMismatches(adminClient, linkedList);
                if (checkTopicsForConfigMismatches.isEmpty()) {
                    return;
                }
                adjustConfigMismatches(adminClient, collection, checkTopicsForConfigMismatches);
            }
        }
    }

    private Map<ConfigResource, List<ConfigEntry>> checkTopicsForConfigMismatches(AdminClient adminClient, Collection<NewTopic> collection) {
        try {
            Map<ConfigResource, Config> map = adminClient.describeConfigs((List) collection.stream().map(newTopic -> {
                return new ConfigResource(ConfigResource.Type.TOPIC, newTopic.name());
            }).collect(Collectors.toList())).all().get(this.operationTimeout, TimeUnit.SECONDS);
            HashMap hashMap = new HashMap();
            for (Map.Entry<ConfigResource, Config> entry : map.entrySet()) {
                Optional<NewTopic> findFirst = collection.stream().filter(newTopic2 -> {
                    return newTopic2.name().equals(((ConfigResource) entry.getKey()).name());
                }).findFirst();
                ArrayList arrayList = new ArrayList();
                if (findFirst.isPresent() && findFirst.get().configs() != null) {
                    for (Map.Entry<String, String> entry2 : findFirst.get().configs().entrySet()) {
                        ConfigEntry configEntry = entry.getValue().get(entry2.getKey());
                        if (!entry2.getValue().equals(configEntry.value())) {
                            arrayList.add(configEntry);
                        }
                    }
                    if (arrayList.size() > 0) {
                        hashMap.put(entry.getKey(), arrayList);
                    }
                }
            }
            return hashMap;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new KafkaException("Interrupted while getting topic descriptions:" + collection, e);
        } catch (ExecutionException | TimeoutException e2) {
            throw new KafkaException("Failed to obtain topic descriptions:" + collection, e2);
        }
    }

    private void adjustConfigMismatches(AdminClient adminClient, Collection<NewTopic> collection, Map<ConfigResource, List<ConfigEntry>> map) {
        for (Map.Entry<ConfigResource, List<ConfigEntry>> entry : map.entrySet()) {
            ConfigResource key = entry.getKey();
            Optional<NewTopic> findFirst = collection.stream().filter(newTopic -> {
                return newTopic.name().equals(key.name());
            }).findFirst();
            if (findFirst.isPresent()) {
                for (ConfigEntry configEntry : entry.getValue()) {
                    ArrayList arrayList = new ArrayList();
                    Map<String, String> configs = findFirst.get().configs();
                    if (configs.get(configEntry.name()) != null) {
                        arrayList.add(new AlterConfigOp(new ConfigEntry(configEntry.name(), configs.get(configEntry.name())), AlterConfigOp.OpType.SET));
                    }
                    if (arrayList.size() > 0) {
                        try {
                            adminClient.incrementalAlterConfigs(Map.of(key, arrayList)).all().get(this.operationTimeout, TimeUnit.SECONDS);
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            throw new KafkaException("Interrupted while getting topic descriptions", e);
                        } catch (ExecutionException | TimeoutException e2) {
                            throw new KafkaException("Failed to obtain topic descriptions", e2);
                        }
                    }
                }
            }
        }
    }

    private Map<String, NewPartitions> checkPartitions(Map<String, NewTopic> map, DescribeTopicsResult describeTopicsResult, List<NewTopic> list) {
        HashMap hashMap = new HashMap();
        describeTopicsResult.topicNameValues().forEach((str, kafkaFuture) -> {
            NewTopic newTopic = (NewTopic) map.get(str);
            try {
                TopicDescription topicDescription = (TopicDescription) kafkaFuture.get(this.operationTimeout, TimeUnit.SECONDS);
                if (newTopic.numPartitions() >= 0 && newTopic.numPartitions() < topicDescription.partitions().size()) {
                    LOGGER.info(() -> {
                        return String.format("Topic '%s' exists but has a different partition count: %d not %d", str, Integer.valueOf(topicDescription.partitions().size()), Integer.valueOf(newTopic.numPartitions()));
                    });
                } else if (newTopic.numPartitions() > topicDescription.partitions().size()) {
                    LOGGER.info(() -> {
                        return String.format("Topic '%s' exists but has a different partition count: %d not %d, increasing if the broker supports it", str, Integer.valueOf(topicDescription.partitions().size()), Integer.valueOf(newTopic.numPartitions()));
                    });
                    hashMap.put(str, NewPartitions.increaseTo(newTopic.numPartitions()));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (ExecutionException e2) {
                list.add(newTopic);
            } catch (TimeoutException e3) {
                throw new KafkaException("Timed out waiting to get existing topics", e3);
            }
        });
        return hashMap;
    }

    private void addTopics(AdminClient adminClient, List<NewTopic> list) {
        try {
            adminClient.createTopics(list).all().get(this.operationTimeout, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOGGER.error(e, "Interrupted while waiting for topic creation results");
        } catch (ExecutionException e2) {
            if (e2.getCause() instanceof TopicExistsException) {
                LOGGER.debug(e2.getCause(), "Failed to create topics");
            } else {
                LOGGER.error(e2.getCause(), "Failed to create topics");
                throw new KafkaException("Failed to create topics", e2.getCause());
            }
        } catch (TimeoutException e3) {
            throw new KafkaException("Timed out waiting for create topics results", e3);
        }
    }

    private void createMissingPartitions(AdminClient adminClient, Map<String, NewPartitions> map) {
        try {
            adminClient.createPartitions(map).all().get(this.operationTimeout, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOGGER.error(e, "Interrupted while waiting for partition creation results");
        } catch (ExecutionException e2) {
            if (e2.getCause() instanceof InvalidPartitionsException) {
                LOGGER.debug(e2.getCause(), "Failed to create partitions");
                return;
            }
            LOGGER.error(e2.getCause(), "Failed to create partitions");
            if (!(e2.getCause() instanceof UnsupportedVersionException)) {
                throw new KafkaException("Failed to create partitions", e2.getCause());
            }
        } catch (TimeoutException e3) {
            throw new KafkaException("Timed out waiting for create partitions results", e3);
        }
    }
}
