package org.springframework.kafka.listener;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.support.TopicPartitionOffset;
import org.springframework.util.Assert;

/* loaded from: input_file:BOOT-INF/lib/spring-kafka-2.5.2.RELEASE.jar:org/springframework/kafka/listener/ConcurrentMessageListenerContainer.class */
public class ConcurrentMessageListenerContainer<K, V> extends AbstractMessageListenerContainer<K, V> {
    private final List<KafkaMessageListenerContainer<K, V>> containers;
    private int concurrency;
    private boolean alwaysClientIdSuffix;

    public ConcurrentMessageListenerContainer(ConsumerFactory<? super K, ? super V> consumerFactory, ContainerProperties containerProperties) {
        super(consumerFactory, containerProperties);
        this.containers = new ArrayList();
        this.concurrency = 1;
        this.alwaysClientIdSuffix = true;
        Assert.notNull(consumerFactory, "A ConsumerFactory must be provided");
    }

    public int getConcurrency() {
        return this.concurrency;
    }

    public void setConcurrency(int i) {
        Assert.isTrue(i > 0, "concurrency must be greater than 0");
        this.concurrency = i;
    }

    public void setAlwaysClientIdSuffix(boolean z) {
        this.alwaysClientIdSuffix = z;
    }

    public List<KafkaMessageListenerContainer<K, V>> getContainers() {
        List<KafkaMessageListenerContainer<K, V>> unmodifiableList;
        synchronized (this.lifecycleMonitor) {
            unmodifiableList = Collections.unmodifiableList(new ArrayList(this.containers));
        }
        return unmodifiableList;
    }

    @Override // org.springframework.kafka.listener.MessageListenerContainer
    public Collection<TopicPartition> getAssignedPartitions() {
        Collection<TopicPartition> collection;
        synchronized (this.lifecycleMonitor) {
            collection = (Collection) this.containers.stream().map((v0) -> {
                return v0.getAssignedPartitions();
            }).filter((v0) -> {
                return Objects.nonNull(v0);
            }).flatMap((v0) -> {
                return v0.stream();
            }).collect(Collectors.toList());
        }
        return collection;
    }

    @Override // org.springframework.kafka.listener.MessageListenerContainer
    public Map<String, Collection<TopicPartition>> getAssignmentsByClientId() {
        HashMap hashMap;
        synchronized (this.lifecycleMonitor) {
            hashMap = new HashMap();
            this.containers.forEach(kafkaMessageListenerContainer -> {
                Map<String, Collection<TopicPartition>> assignmentsByClientId = kafkaMessageListenerContainer.getAssignmentsByClientId();
                if (assignmentsByClientId != null) {
                    hashMap.putAll(assignmentsByClientId);
                }
            });
        }
        return hashMap;
    }

    @Override // org.springframework.kafka.listener.MessageListenerContainer
    public boolean isContainerPaused() {
        synchronized (this.lifecycleMonitor) {
            boolean isPaused = isPaused();
            if (isPaused) {
                Iterator<KafkaMessageListenerContainer<K, V>> it = this.containers.iterator();
                while (it.hasNext()) {
                    if (!it.next().isContainerPaused()) {
                        return false;
                    }
                }
            }
            return isPaused;
        }
    }

    @Override // org.springframework.kafka.listener.MessageListenerContainer
    public Map<String, Map<MetricName, ? extends Metric>> metrics() {
        Map<String, Map<MetricName, ? extends Metric>> unmodifiableMap;
        synchronized (this.lifecycleMonitor) {
            HashMap hashMap = new HashMap();
            Iterator<KafkaMessageListenerContainer<K, V>> it = this.containers.iterator();
            while (it.hasNext()) {
                hashMap.putAll(it.next().metrics());
            }
            unmodifiableMap = Collections.unmodifiableMap(hashMap);
        }
        return unmodifiableMap;
    }

    @Override // org.springframework.kafka.listener.AbstractMessageListenerContainer
    protected void doStart() {
        if (isRunning()) {
            return;
        }
        checkTopics();
        ContainerProperties containerProperties = getContainerProperties();
        TopicPartitionOffset[] topicPartitions = containerProperties.getTopicPartitions();
        if (topicPartitions != null && this.concurrency > topicPartitions.length) {
            this.logger.warn(() -> {
                return "When specific partitions are provided, the concurrency must be less than or equal to the number of partitions; reduced from " + this.concurrency + " to " + topicPartitions.length;
            });
            this.concurrency = topicPartitions.length;
        }
        setRunning(true);
        for (int i = 0; i < this.concurrency; i++) {
            KafkaMessageListenerContainer<K, V> constructContainer = constructContainer(containerProperties, topicPartitions, i);
            String beanName = getBeanName();
            constructContainer.setBeanName((beanName != null ? beanName : "consumer") + "-" + i);
            constructContainer.setApplicationContext(getApplicationContext());
            if (getApplicationEventPublisher() != null) {
                constructContainer.setApplicationEventPublisher(getApplicationEventPublisher());
            }
            constructContainer.setClientIdSuffix((this.concurrency > 1 || this.alwaysClientIdSuffix) ? "-" + i : "");
            constructContainer.setGenericErrorHandler(getGenericErrorHandler());
            constructContainer.setAfterRollbackProcessor(getAfterRollbackProcessor());
            constructContainer.setRecordInterceptor(getRecordInterceptor());
            constructContainer.setInterceptBeforeTx(isInterceptBeforeTx());
            constructContainer.setEmergencyStop(() -> {
                stop(() -> {
                });
                publishContainerStoppedEvent();
            });
            if (isPaused()) {
                constructContainer.pause();
            }
            constructContainer.start();
            this.containers.add(constructContainer);
        }
    }

    private KafkaMessageListenerContainer<K, V> constructContainer(ContainerProperties containerProperties, TopicPartitionOffset[] topicPartitionOffsetArr, int i) {
        return topicPartitionOffsetArr == null ? new KafkaMessageListenerContainer<>(this, this.consumerFactory, containerProperties) : new KafkaMessageListenerContainer<>(this, this.consumerFactory, containerProperties, partitionSubset(containerProperties, i));
    }

    private TopicPartitionOffset[] partitionSubset(ContainerProperties containerProperties, int i) {
        TopicPartitionOffset[] topicPartitions = containerProperties.getTopicPartitions();
        if (this.concurrency == 1) {
            return topicPartitions;
        }
        int length = topicPartitions.length;
        if (length == this.concurrency) {
            return new TopicPartitionOffset[]{topicPartitions[i]};
        }
        int i2 = length / this.concurrency;
        return i == this.concurrency - 1 ? (TopicPartitionOffset[]) Arrays.copyOfRange(topicPartitions, i * i2, topicPartitions.length) : (TopicPartitionOffset[]) Arrays.copyOfRange(topicPartitions, i * i2, (i + 1) * i2);
    }

    @Override // org.springframework.kafka.listener.AbstractMessageListenerContainer
    protected void doStop(Runnable runnable) {
        AtomicInteger atomicInteger = new AtomicInteger();
        if (isRunning()) {
            setRunning(false);
            Iterator<KafkaMessageListenerContainer<K, V>> it = this.containers.iterator();
            while (it.hasNext()) {
                if (it.next().isRunning()) {
                    atomicInteger.incrementAndGet();
                }
            }
            for (KafkaMessageListenerContainer<K, V> kafkaMessageListenerContainer : this.containers) {
                if (kafkaMessageListenerContainer.isRunning()) {
                    kafkaMessageListenerContainer.stop(() -> {
                        if (atomicInteger.decrementAndGet() <= 0) {
                            runnable.run();
                        }
                    });
                }
            }
            this.containers.clear();
        }
    }

    @Override // org.springframework.kafka.listener.AbstractMessageListenerContainer, org.springframework.kafka.listener.MessageListenerContainer
    public void pause() {
        synchronized (this.lifecycleMonitor) {
            super.pause();
            this.containers.forEach((v0) -> {
                v0.pause();
            });
        }
    }

    @Override // org.springframework.kafka.listener.AbstractMessageListenerContainer, org.springframework.kafka.listener.MessageListenerContainer
    public void resume() {
        synchronized (this.lifecycleMonitor) {
            super.resume();
            this.containers.forEach((v0) -> {
                v0.resume();
            });
        }
    }

    public String toString() {
        return "ConcurrentMessageListenerContainer [concurrency=" + this.concurrency + ", beanName=" + getBeanName() + ", running=" + isRunning() + "]";
    }
}
