package io.smallrye.reactive.messaging.kafka.impl;

import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.helpers.Subscriptions;
import io.smallrye.mutiny.operators.AbstractMulti;
import io.smallrye.mutiny.subscription.MultiSubscriber;
import io.smallrye.reactive.messaging.kafka.KafkaConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.kafka.i18n.KafkaLogging;
import io.vertx.core.Context;
import java.time.Duration;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.reactivestreams.Subscription;

/* loaded from: input_file:io/smallrye/reactive/messaging/kafka/impl/KafkaRecordStream.class */
public class KafkaRecordStream<K, V> extends AbstractMulti<ConsumerRecord<K, V>> {
    private final ReactiveKafkaConsumer<K, V> client;
    private final KafkaConnectorIncomingConfiguration config;
    private final Context context;

    /* loaded from: input_file:io/smallrye/reactive/messaging/kafka/impl/KafkaRecordStream$KafkaRecordStreamSubscription.class */
    private class KafkaRecordStreamSubscription implements Subscription {
        private final ReactiveKafkaConsumer<K, V> client;
        private final MultiSubscriber<? super ConsumerRecord<K, V>> downstream;
        private final boolean pauseResumeEnabled;
        private final Uni<ConsumerRecords<K, V>> pollUni;
        private final long retries;
        private volatile boolean cancelled;
        private final AtomicInteger wip = new AtomicInteger();
        private final AtomicLong requested = new AtomicLong();
        private final AtomicBoolean started = new AtomicBoolean();
        private final AtomicBoolean paused = new AtomicBoolean();
        private final Queue<ConsumerRecord<K, V>> queue = new ConcurrentLinkedDeque();

        public KafkaRecordStreamSubscription(ReactiveKafkaConsumer<K, V> reactiveKafkaConsumer, KafkaConnectorIncomingConfiguration kafkaConnectorIncomingConfiguration, MultiSubscriber<? super ConsumerRecord<K, V>> multiSubscriber) {
            this.client = reactiveKafkaConsumer;
            this.pauseResumeEnabled = kafkaConnectorIncomingConfiguration.getPauseIfNoRequests().booleanValue();
            this.downstream = multiSubscriber;
            this.retries = kafkaConnectorIncomingConfiguration.getRetryAttempts().intValue() == -1 ? Long.MAX_VALUE : kafkaConnectorIncomingConfiguration.getRetryAttempts().intValue();
            this.pollUni = reactiveKafkaConsumer.poll().onItem().transform(consumerRecords -> {
                if (consumerRecords.isEmpty()) {
                    return null;
                }
                Queue<ConsumerRecord<K, V>> queue = this.queue;
                queue.getClass();
                consumerRecords.forEach((v1) -> {
                    r1.offer(v1);
                });
                return consumerRecords;
            }).plug(uni -> {
                if (!kafkaConnectorIncomingConfiguration.getRetry().booleanValue()) {
                    return uni;
                }
                return uni.onFailure().retry().withBackOff(Duration.ofSeconds(1L), Duration.ofSeconds(kafkaConnectorIncomingConfiguration.getRetryMaxWait().intValue())).atMost(this.retries);
            });
        }

        @Override // org.reactivestreams.Subscription
        public void request(long j) {
            if (j <= 0) {
                throw new IllegalArgumentException("Invalid request");
            }
            if (this.cancelled) {
                return;
            }
            Subscriptions.add(this.requested, j);
            if (this.started.compareAndSet(false, true) && !this.cancelled) {
                poll();
            } else {
                if (this.cancelled) {
                    return;
                }
                dispatch();
            }
        }

        private void poll() {
            if (this.cancelled || this.client.isClosed()) {
                return;
            }
            this.pollUni.subscribe().with(consumerRecords -> {
                if (consumerRecords == null) {
                    this.client.executeWithDelay(this::poll, Duration.ofMillis(2L)).subscribe().with(r1 -> {
                    }, this::report);
                } else {
                    dispatch();
                    this.client.runOnPollingThread(consumer -> {
                        poll();
                    }).subscribe().with(r12 -> {
                    }, this::report);
                }
            }, this::report);
        }

        private void report(Throwable th) {
            if (this.cancelled) {
                return;
            }
            this.cancelled = true;
            this.downstream.onFailure(th);
        }

        void dispatch() {
            if (this.wip.getAndIncrement() != 0) {
                return;
            }
            KafkaRecordStream.this.context.runOnContext(r3 -> {
                run();
            });
        }

        private void run() {
            ConsumerRecord<K, V> poll;
            int i = 1;
            Queue<ConsumerRecord<K, V>> queue = this.queue;
            long j = 0;
            long j2 = this.requested.get();
            while (!isCancelled()) {
                while (j != j2 && (poll = queue.poll()) != null && !isCancelled()) {
                    this.downstream.onItem(poll);
                    j++;
                }
                j2 = this.requested.addAndGet(-j);
                j = 0;
                if (this.pauseResumeEnabled) {
                    if (j2 <= this.queue.size() && this.paused.compareAndSet(false, true)) {
                        KafkaLogging.log.pausingChannel(KafkaRecordStream.this.config.getChannel());
                        this.client.pause().subscribe().with(set -> {
                        }, this::report);
                    } else if (j2 > this.queue.size() && this.paused.compareAndSet(true, false)) {
                        KafkaLogging.log.resumingChannel(KafkaRecordStream.this.config.getChannel());
                        this.client.resume().subscribe().with(r1 -> {
                        }, this::report);
                    }
                }
                int i2 = this.wip.get();
                if (i == i2) {
                    i = this.wip.addAndGet(-i);
                    if (i == 0) {
                        return;
                    }
                } else {
                    i = i2;
                }
            }
        }

        @Override // org.reactivestreams.Subscription
        public void cancel() {
            if (this.cancelled) {
                return;
            }
            this.cancelled = true;
            if (this.wip.getAndIncrement() == 0) {
                this.client.close();
                this.queue.clear();
            }
        }

        boolean isCancelled() {
            if (!this.cancelled) {
                return false;
            }
            this.queue.clear();
            this.client.close();
            return true;
        }
    }

    public KafkaRecordStream(ReactiveKafkaConsumer<K, V> reactiveKafkaConsumer, KafkaConnectorIncomingConfiguration kafkaConnectorIncomingConfiguration, Context context) {
        this.config = kafkaConnectorIncomingConfiguration;
        this.client = reactiveKafkaConsumer;
        this.context = context;
    }

    @Override // io.smallrye.mutiny.operators.AbstractMulti
    public void subscribe(MultiSubscriber<? super ConsumerRecord<K, V>> multiSubscriber) {
        multiSubscriber.onSubscribe(new KafkaRecordStreamSubscription(this.client, this.config, multiSubscriber));
    }
}
