/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.kafka.transactions;

import io.smallrye.common.annotation.CheckReturnValue;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.EmitterConfiguration;
import io.smallrye.reactive.messaging.kafka.KafkaClientService;
import io.smallrye.reactive.messaging.kafka.KafkaConsumer;
import io.smallrye.reactive.messaging.kafka.KafkaProducer;
import io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordBatchMetadata;
import io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata;
import io.smallrye.reactive.messaging.kafka.i18n.KafkaExceptions;
import io.smallrye.reactive.messaging.kafka.i18n.KafkaLogging;
import io.smallrye.reactive.messaging.kafka.transactions.KafkaTransactions;
import io.smallrye.reactive.messaging.kafka.transactions.TransactionalEmitter;
import io.smallrye.reactive.messaging.providers.extension.MutinyEmitterImpl;
import io.vertx.core.Context;
import io.vertx.core.Vertx;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TransactionAbortedException;
import org.eclipse.microprofile.reactive.messaging.Message;

public class KafkaTransactionsImpl<T>
extends MutinyEmitterImpl<T>
implements KafkaTransactions<T> {
    private final KafkaClientService clientService;
    private final KafkaProducer<?, ?> producer;
    private volatile Transaction<?> currentTransaction;
    private static final Uni<Void> VOID_UNI = Uni.createFrom().voidItem();

    public KafkaTransactionsImpl(EmitterConfiguration config, long defaultBufferSize, KafkaClientService clientService) {
        super(config, defaultBufferSize);
        this.clientService = clientService;
        this.producer = clientService.getProducer(config.name());
    }

    @Override
    public synchronized boolean isTransactionInProgress() {
        return this.currentTransaction != null;
    }

    @Override
    @CheckReturnValue
    public synchronized <R> Uni<R> withTransaction(Function<TransactionalEmitter<T>, Uni<R>> work) {
        if (this.currentTransaction == null) {
            return new Transaction<R>().execute(work);
        }
        throw KafkaExceptions.ex.transactionInProgress(this.name);
    }

    @Override
    @CheckReturnValue
    public synchronized <R> Uni<R> withTransaction(Message<?> message, Function<TransactionalEmitter<T>, Uni<R>> work) {
        Map<TopicPartition, OffsetAndMetadata> offsets;
        String channel;
        IncomingKafkaRecordBatchMetadata metadata;
        Optional batchMetadata = message.getMetadata(IncomingKafkaRecordBatchMetadata.class);
        Optional recordMetadata = message.getMetadata(IncomingKafkaRecordMetadata.class);
        if (batchMetadata.isPresent()) {
            metadata = (IncomingKafkaRecordBatchMetadata)batchMetadata.get();
            channel = metadata.getChannel();
            offsets = metadata.getOffsets().entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(((OffsetAndMetadata)e.getValue()).offset() + 1L)));
        } else if (recordMetadata.isPresent()) {
            metadata = (IncomingKafkaRecordMetadata)recordMetadata.get();
            channel = metadata.getChannel();
            offsets = new HashMap<TopicPartition, OffsetAndMetadata>();
            offsets.put(new TopicPartition(metadata.getTopic(), metadata.getPartition()), new OffsetAndMetadata(metadata.getOffset() + 1L));
        } else {
            throw KafkaExceptions.ex.noKafkaMetadataFound(message);
        }
        KafkaConsumer consumer = this.clientService.getConsumer(channel);
        if (consumer == null) {
            throw KafkaExceptions.ex.unableToFindConsumerForChannel(channel);
        }
        if (this.currentTransaction == null) {
            return new Transaction<Object>((Uni<Void>)consumer.consumerGroupMetadata().chain(groupMetadata -> this.producer.sendOffsetsToTransaction(offsets, (ConsumerGroupMetadata)groupMetadata)), r -> Uni.createFrom().item(r), VOID_UNI, t -> consumer.resetToLastCommittedPositions().chain(() -> Uni.createFrom().failure(t))).execute(work);
        }
        throw KafkaExceptions.ex.transactionInProgress(this.name);
    }

    private static <R> Uni<R> defaultAfterCommit(R result) {
        return Uni.createFrom().item(result);
    }

    private static <R> Uni<R> defaultAfterAbort(Throwable throwable) {
        return Uni.createFrom().failure(throwable);
    }

    private class Transaction<R>
    implements TransactionalEmitter<T> {
        private final Uni<Void> beforeCommit;
        private final Function<R, Uni<R>> afterCommit;
        private final Uni<Void> beforeAbort;
        private final Function<Throwable, Uni<R>> afterAbort;
        private volatile boolean abort;

        public Transaction() {
            this((Uni<Void>)VOID_UNI, x$0 -> KafkaTransactionsImpl.defaultAfterCommit(x$0), (Uni<Void>)VOID_UNI, x$0 -> KafkaTransactionsImpl.defaultAfterAbort(x$0));
        }

        public Transaction(Uni<Void> beforeCommit, Function<R, Uni<R>> afterCommit, Uni<Void> beforeAbort, Function<Throwable, Uni<R>> afterAbort) {
            this.beforeCommit = beforeCommit;
            this.afterCommit = afterCommit;
            this.beforeAbort = beforeAbort;
            this.afterAbort = afterAbort;
        }

        Uni<R> execute(Function<TransactionalEmitter<T>, Uni<R>> work) {
            KafkaTransactionsImpl.this.currentTransaction = this;
            Context context = Vertx.currentContext();
            Uni beginTx = KafkaTransactionsImpl.this.producer.beginTransaction();
            if (context != null) {
                beginTx = beginTx.emitOn(runnable -> context.runOnContext(x -> runnable.run()));
            }
            return beginTx.chain(() -> this.executeInTransaction(work)).eventually(() -> KafkaTransactionsImpl.this.currentTransaction = null);
        }

        private Uni<R> executeInTransaction(Function<TransactionalEmitter<T>, Uni<R>> work) {
            return Uni.createFrom().nullItem().chain(() -> (Uni)work.apply(this)).call(() -> KafkaTransactionsImpl.this.producer.flush()).onFailure().call(throwable -> this.abort()).onCancellation().call(() -> this.abort()).call(() -> this.abort ? this.abort() : this.commit()).onFailure().recoverWithUni(throwable -> this.afterAbort.apply((Throwable)throwable)).onItem().transformToUni(result -> this.afterCommit.apply(result));
        }

        private Uni<Void> commit() {
            return this.beforeCommit.call(KafkaTransactionsImpl.this.producer::commitTransaction);
        }

        private Uni<Void> abort() {
            Uni uni = this.beforeAbort.call(KafkaTransactionsImpl.this.producer::abortTransaction);
            return this.abort ? uni.chain(() -> Uni.createFrom().failure((Throwable)new TransactionAbortedException())) : uni;
        }

        @Override
        public <M extends Message<? extends T>> void send(M msg) {
            KafkaTransactionsImpl.this.send(msg.withNack(throwable -> CompletableFuture.completedFuture(null)));
        }

        @Override
        public void send(T payload) {
            KafkaTransactionsImpl.this.send(payload).subscribe().with(unused -> {}, KafkaLogging.log::unableToSendRecord);
        }

        @Override
        public void markForAbort() {
            this.abort = true;
        }

        @Override
        public boolean isMarkedForAbort() {
            return this.abort;
        }
    }
}

