/*
 * Decompiled with CFR 0.152.
 */
package io.quarkus.smallrye.reactivemessaging.kafka;

import io.quarkus.smallrye.reactivemessaging.kafka.CheckpointEntity;
import io.quarkus.smallrye.reactivemessaging.kafka.CheckpointEntityId;
import io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle;
import io.smallrye.common.annotation.Identifier;
import io.smallrye.common.vertx.VertxContext;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.KafkaConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.kafka.KafkaConsumer;
import io.smallrye.reactive.messaging.kafka.commit.CheckpointStateStore;
import io.smallrye.reactive.messaging.kafka.commit.ProcessingState;
import io.vertx.core.Context;
import io.vertx.mutiny.core.Vertx;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.kafka.common.TopicPartition;
import org.hibernate.reactive.mutiny.Mutiny;

public class HibernateReactiveStateStore
implements CheckpointStateStore {
    public static final String HIBERNATE_REACTIVE_STATE_STORE = "quarkus-hibernate-reactive";
    private final String consumerGroupId;
    private final Mutiny.SessionFactory sf;
    private final Class<? extends CheckpointEntity> stateType;

    public HibernateReactiveStateStore(String consumerGroupId, Mutiny.SessionFactory sf, Class<? extends CheckpointEntity> stateType) {
        this.consumerGroupId = consumerGroupId;
        this.sf = sf;
        this.stateType = stateType;
    }

    public Uni<Map<TopicPartition, ProcessingState<?>>> fetchProcessingState(Collection<TopicPartition> partitions) {
        return Uni.createFrom().deferred(() -> {
            Object[] ids = partitions.stream().map(tp -> new CheckpointEntityId(this.consumerGroupId, (TopicPartition)tp)).toArray(Object[]::new);
            return this.sf.withTransaction(s -> s.find(this.stateType, ids)).map(fetched -> {
                if (fetched == null) {
                    return Collections.emptyMap();
                }
                return fetched.stream().filter(e -> e != null && CheckpointEntity.topicPartition(e) != null).collect(Collectors.toMap(CheckpointEntity::topicPartition, e -> new ProcessingState(e, e.offset.longValue())));
            });
        }).runSubscriptionOn(HibernateReactiveStateStore::runOnSafeContext);
    }

    public Uni<Void> persistProcessingState(Map<TopicPartition, ProcessingState<?>> state) {
        return Uni.createFrom().deferred(() -> {
            Object[] entities = state.entrySet().stream().filter(e -> !ProcessingState.isEmptyOrNull((ProcessingState)((ProcessingState)e.getValue()))).map(e -> CheckpointEntity.from((ProcessingState)e.getValue(), new CheckpointEntityId(this.consumerGroupId, (TopicPartition)e.getKey()))).toArray();
            return this.sf.withTransaction(s -> s.mergeAll(entities));
        }).runSubscriptionOn(HibernateReactiveStateStore::runOnSafeContext);
    }

    private static void runOnSafeContext(Runnable r) {
        if (VertxContext.isOnDuplicatedContext()) {
            VertxContextSafetyToggle.setCurrentContextSafe((boolean)true);
            r.run();
        } else {
            Context duplicatedContext = VertxContext.createNewDuplicatedContext();
            VertxContextSafetyToggle.setContextSafe((Context)duplicatedContext, (boolean)true);
            duplicatedContext.runOnContext(x -> r.run());
        }
    }

    @ApplicationScoped
    @Identifier(value="quarkus-hibernate-reactive")
    public static class Factory
    implements CheckpointStateStore.Factory {
        @Inject
        Mutiny.SessionFactory sf;

        public CheckpointStateStore create(KafkaConnectorIncomingConfiguration config, Vertx vertx, KafkaConsumer<?, ?> consumer, Class<?> stateType) {
            String consumerGroupId = (String)consumer.configuration().get("group.id");
            if (!CheckpointEntity.class.isAssignableFrom(stateType)) {
                throw new IllegalArgumentException("State type needs to extend `CheckpointEntity`");
            }
            return new HibernateReactiveStateStore(consumerGroupId, this.sf, stateType);
        }
    }
}

