package io.apicurio.registry.storage.impl.kafkasql.sql;

import io.apicurio.registry.logging.Logged;
import io.apicurio.registry.mt.TenantContext;
import io.apicurio.registry.storage.ArtifactAlreadyExistsException;
import io.apicurio.registry.storage.ArtifactNotFoundException;
import io.apicurio.registry.storage.RegistryStorageException;
import io.apicurio.registry.storage.dto.GroupMetaDataDto;
import io.apicurio.registry.storage.impl.kafkasql.KafkaSqlConfiguration;
import io.apicurio.registry.storage.impl.kafkasql.KafkaSqlCoordinator;
import io.apicurio.registry.storage.impl.kafkasql.KafkaSqlSubmitter;
import io.apicurio.registry.storage.impl.kafkasql.MessageType;
import io.apicurio.registry.storage.impl.kafkasql.keys.ArtifactKey;
import io.apicurio.registry.storage.impl.kafkasql.keys.ArtifactRuleKey;
import io.apicurio.registry.storage.impl.kafkasql.keys.ArtifactVersionKey;
import io.apicurio.registry.storage.impl.kafkasql.keys.ContentIdKey;
import io.apicurio.registry.storage.impl.kafkasql.keys.ContentKey;
import io.apicurio.registry.storage.impl.kafkasql.keys.GlobalIdKey;
import io.apicurio.registry.storage.impl.kafkasql.keys.GlobalRuleKey;
import io.apicurio.registry.storage.impl.kafkasql.keys.GroupKey;
import io.apicurio.registry.storage.impl.kafkasql.keys.LogConfigKey;
import io.apicurio.registry.storage.impl.kafkasql.keys.MessageKey;
import io.apicurio.registry.storage.impl.kafkasql.values.ArtifactRuleValue;
import io.apicurio.registry.storage.impl.kafkasql.values.ArtifactValue;
import io.apicurio.registry.storage.impl.kafkasql.values.ArtifactVersionValue;
import io.apicurio.registry.storage.impl.kafkasql.values.ContentIdValue;
import io.apicurio.registry.storage.impl.kafkasql.values.ContentValue;
import io.apicurio.registry.storage.impl.kafkasql.values.GlobalIdValue;
import io.apicurio.registry.storage.impl.kafkasql.values.GlobalRuleValue;
import io.apicurio.registry.storage.impl.kafkasql.values.GroupValue;
import io.apicurio.registry.storage.impl.kafkasql.values.LogConfigValue;
import io.apicurio.registry.storage.impl.kafkasql.values.MessageValue;
import io.apicurio.registry.storage.impl.sql.GlobalIdGenerator;
import io.apicurio.registry.types.RegistryException;
import io.apicurio.registry.utils.impexp.ArtifactRuleEntity;
import io.apicurio.registry.utils.impexp.ArtifactVersionEntity;
import io.apicurio.registry.utils.impexp.ContentEntity;
import io.apicurio.registry.utils.impexp.GlobalRuleEntity;
import io.apicurio.registry.utils.impexp.GroupEntity;
import java.util.Optional;
import java.util.UUID;
import java.util.function.Supplier;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ApplicationScoped
@Logged
/* loaded from: input_file:io/apicurio/registry/storage/impl/kafkasql/sql/KafkaSqlSink.class */
public class KafkaSqlSink {
    private final Logger log = LoggerFactory.getLogger(getClass().getName());

    @Inject
    KafkaSqlCoordinator coordinator;

    @Inject
    KafkaSqlStore sqlStore;

    @Inject
    KafkaSqlConfiguration configuration;

    @Inject
    KafkaSqlSubmitter submitter;

    @Inject
    TenantContext tenantContext;

    public void processMessage(ConsumerRecord<MessageKey, MessageValue> consumerRecord) {
        UUID extractUuid = extractUuid(consumerRecord);
        this.log.debug("Processing Kafka message with UUID: {}", extractUuid.toString());
        try {
            Object doProcessMessage = doProcessMessage(consumerRecord);
            this.log.debug("Kafka message successfully processed. Notifying listeners of response.");
            this.coordinator.notifyResponse(extractUuid, doProcessMessage);
        } catch (RegistryException e) {
            this.log.debug("Registry exception detected: {}", e.getMessage());
            this.coordinator.notifyResponse(extractUuid, e);
        } catch (Throwable th) {
            this.log.debug("Unexpected exception detected: {}", th.getMessage());
            this.coordinator.notifyResponse(extractUuid, new RegistryException(th));
        }
    }

    private UUID extractUuid(ConsumerRecord<MessageKey, MessageValue> consumerRecord) {
        return (UUID) Optional.ofNullable(consumerRecord.headers().headers("req")).map((v0) -> {
            return v0.iterator();
        }).map(it -> {
            if (it.hasNext()) {
                return (Header) it.next();
            }
            return null;
        }).map((v0) -> {
            return v0.value();
        }).map(String::new).map(UUID::fromString).orElse(null);
    }

    private Object doProcessMessage(ConsumerRecord<MessageKey, MessageValue> consumerRecord) {
        MessageKey messageKey = (MessageKey) consumerRecord.key();
        MessageValue messageValue = (MessageValue) consumerRecord.value();
        this.tenantContext.tenantId(messageKey.getTenantId());
        try {
            MessageType type = messageKey.getType();
            switch (type) {
                case Group:
                    Object processGroupMessage = processGroupMessage((GroupKey) messageKey, (GroupValue) messageValue);
                    this.tenantContext.clearTenantId();
                    return processGroupMessage;
                case Artifact:
                    Object processArtifactMessage = processArtifactMessage((ArtifactKey) messageKey, (ArtifactValue) messageValue);
                    this.tenantContext.clearTenantId();
                    return processArtifactMessage;
                case ArtifactRule:
                    Object processArtifactRuleMessage = processArtifactRuleMessage((ArtifactRuleKey) messageKey, (ArtifactRuleValue) messageValue);
                    this.tenantContext.clearTenantId();
                    return processArtifactRuleMessage;
                case ArtifactVersion:
                    Object processArtifactVersion = processArtifactVersion((ArtifactVersionKey) messageKey, (ArtifactVersionValue) messageValue);
                    this.tenantContext.clearTenantId();
                    return processArtifactVersion;
                case Content:
                    Object processContent = processContent((ContentKey) messageKey, (ContentValue) messageValue);
                    this.tenantContext.clearTenantId();
                    return processContent;
                case GlobalRule:
                    Object processGlobalRule = processGlobalRule((GlobalRuleKey) messageKey, (GlobalRuleValue) messageValue);
                    this.tenantContext.clearTenantId();
                    return processGlobalRule;
                case GlobalId:
                    Object processGlobalId = processGlobalId((GlobalIdKey) messageKey, (GlobalIdValue) messageValue);
                    this.tenantContext.clearTenantId();
                    return processGlobalId;
                case ContentId:
                    Object processContentId = processContentId((ContentIdKey) messageKey, (ContentIdValue) messageValue);
                    this.tenantContext.clearTenantId();
                    return processContentId;
                case LogConfig:
                    Object processLogConfig = processLogConfig((LogConfigKey) messageKey, (LogConfigValue) messageValue);
                    this.tenantContext.clearTenantId();
                    return processLogConfig;
                default:
                    this.log.warn("Unrecognized message type: %s", consumerRecord.key());
                    throw new RegistryStorageException("Unexpected message type: " + type.name());
            }
        } catch (Throwable th) {
            this.tenantContext.clearTenantId();
            throw th;
        }
    }

    private Object processGroupMessage(GroupKey groupKey, GroupValue groupValue) {
        Supplier supplier = () -> {
            return GroupMetaDataDto.builder().groupId(groupKey.getGroupId()).description(groupValue.getDescription()).artifactsType(groupValue.getArtifactsType()).createdBy(groupValue.getCreatedBy()).createdOn(groupValue.getCreatedOn()).modifiedBy(groupValue.getModifiedBy()).modifiedOn(groupValue.getModifiedOn()).properties(groupValue.getProperties()).build();
        };
        switch (groupValue.getAction()) {
            case Create:
                this.sqlStore.createGroup((GroupMetaDataDto) supplier.get());
                return null;
            case Update:
                this.sqlStore.updateGroupMetaData((GroupMetaDataDto) supplier.get());
                return null;
            case Delete:
                if (groupValue.isOnlyArtifacts()) {
                    this.sqlStore.deleteArtifacts(groupKey.getGroupId());
                    return null;
                }
                this.sqlStore.deleteGroup(groupKey.getGroupId());
                return null;
            case Import:
                GroupEntity groupEntity = new GroupEntity();
                groupEntity.artifactsType = groupValue.getArtifactsType();
                groupEntity.createdBy = groupValue.getCreatedBy();
                groupEntity.createdOn = groupValue.getCreatedOn();
                groupEntity.description = groupValue.getDescription();
                groupEntity.groupId = groupKey.getGroupId();
                groupEntity.modifiedBy = groupValue.getModifiedBy();
                groupEntity.modifiedOn = groupValue.getModifiedOn();
                groupEntity.properties = groupValue.getProperties();
                this.sqlStore.importGroup(groupEntity);
                break;
        }
        this.log.warn("Unsupported group message action: %s", groupKey.getType().name());
        throw new RegistryStorageException("Unsupported group message action: " + groupValue.getAction());
    }

    private Object processArtifactMessage(ArtifactKey artifactKey, final ArtifactValue artifactValue) throws RegistryStorageException {
        try {
            GlobalIdGenerator globalIdGenerator = new GlobalIdGenerator() { // from class: io.apicurio.registry.storage.impl.kafkasql.sql.KafkaSqlSink.1
                @Override // io.apicurio.registry.storage.impl.sql.GlobalIdGenerator
                public Long generate() {
                    return artifactValue.getGlobalId();
                }
            };
            switch (artifactValue.getAction()) {
                case Create:
                    return this.sqlStore.createArtifactWithMetadata(artifactKey.getGroupId(), artifactKey.getArtifactId(), artifactValue.getVersion(), artifactValue.getArtifactType(), artifactValue.getContentHash(), artifactValue.getCreatedBy(), artifactValue.getCreatedOn(), artifactValue.getMetaData(), globalIdGenerator);
                case Update:
                    return this.sqlStore.updateArtifactWithMetadata(artifactKey.getGroupId(), artifactKey.getArtifactId(), artifactValue.getVersion(), artifactValue.getArtifactType(), artifactValue.getContentHash(), artifactValue.getCreatedBy(), artifactValue.getCreatedOn(), artifactValue.getMetaData(), globalIdGenerator);
                case Delete:
                    return this.sqlStore.deleteArtifact(artifactKey.getGroupId(), artifactKey.getArtifactId());
                case Import:
                    ArtifactVersionEntity artifactVersionEntity = new ArtifactVersionEntity();
                    artifactVersionEntity.globalId = artifactValue.getGlobalId().longValue();
                    artifactVersionEntity.groupId = artifactKey.getGroupId();
                    artifactVersionEntity.artifactId = artifactKey.getArtifactId();
                    artifactVersionEntity.version = artifactValue.getVersion();
                    artifactVersionEntity.versionId = artifactValue.getVersionId().intValue();
                    artifactVersionEntity.artifactType = artifactValue.getArtifactType();
                    artifactVersionEntity.state = artifactValue.getState();
                    artifactVersionEntity.name = artifactValue.getMetaData().getName();
                    artifactVersionEntity.description = artifactValue.getMetaData().getDescription();
                    artifactVersionEntity.createdBy = artifactValue.getCreatedBy();
                    artifactVersionEntity.createdOn = artifactValue.getCreatedOn().getTime();
                    artifactVersionEntity.labels = artifactValue.getMetaData().getLabels();
                    artifactVersionEntity.properties = artifactValue.getMetaData().getProperties();
                    artifactVersionEntity.isLatest = artifactValue.getLatest().booleanValue();
                    artifactVersionEntity.contentId = artifactValue.getContentId().longValue();
                    this.sqlStore.importArtifactVersion(artifactVersionEntity);
                    return null;
                default:
                    this.log.warn("Unsupported artifact message action: %s", artifactKey.getType().name());
                    throw new RegistryStorageException("Unsupported artifact message action: " + artifactValue.getAction());
            }
        } catch (ArtifactAlreadyExistsException | ArtifactNotFoundException e) {
            this.submitter.send(artifactKey, null);
            throw e;
        }
    }

    private Object processArtifactRuleMessage(ArtifactRuleKey artifactRuleKey, ArtifactRuleValue artifactRuleValue) {
        switch (artifactRuleValue.getAction()) {
            case Create:
                return this.sqlStore.createArtifactRuleAsync(artifactRuleKey.getGroupId(), artifactRuleKey.getArtifactId(), artifactRuleKey.getRuleType(), artifactRuleValue.getConfig());
            case Update:
                this.sqlStore.updateArtifactRule(artifactRuleKey.getGroupId(), artifactRuleKey.getArtifactId(), artifactRuleKey.getRuleType(), artifactRuleValue.getConfig());
                return null;
            case Delete:
                this.sqlStore.deleteArtifactRule(artifactRuleKey.getGroupId(), artifactRuleKey.getArtifactId(), artifactRuleKey.getRuleType());
                return null;
            case Import:
                ArtifactRuleEntity artifactRuleEntity = new ArtifactRuleEntity();
                artifactRuleEntity.groupId = artifactRuleKey.getGroupId();
                artifactRuleEntity.artifactId = artifactRuleKey.getArtifactId();
                artifactRuleEntity.type = artifactRuleKey.getRuleType();
                artifactRuleEntity.configuration = artifactRuleValue.getConfig().getConfiguration();
                this.sqlStore.importArtifactRule(artifactRuleEntity);
                break;
        }
        this.log.warn("Unsupported artifact rule message action: %s", artifactRuleKey.getType().name());
        throw new RegistryStorageException("Unsupported artifact-rule message action: " + artifactRuleValue.getAction());
    }

    private Object processArtifactVersion(ArtifactVersionKey artifactVersionKey, ArtifactVersionValue artifactVersionValue) {
        switch (artifactVersionValue.getAction()) {
            case Create:
            case Import:
            default:
                this.log.warn("Unsupported artifact version message action: %s", artifactVersionKey.getType().name());
                throw new RegistryStorageException("Unsupported artifact-version message action: " + artifactVersionValue.getAction());
            case Update:
                this.sqlStore.updateArtifactVersionMetaDataAndState(artifactVersionKey.getGroupId(), artifactVersionKey.getArtifactId(), artifactVersionKey.getVersion(), artifactVersionValue.getMetaData(), artifactVersionValue.getState());
                return null;
            case Delete:
                this.sqlStore.deleteArtifactVersion(artifactVersionKey.getGroupId(), artifactVersionKey.getArtifactId(), artifactVersionKey.getVersion());
                return null;
            case Clear:
                this.sqlStore.deleteArtifactVersionMetaData(artifactVersionKey.getGroupId(), artifactVersionKey.getArtifactId(), artifactVersionKey.getVersion());
                return null;
        }
    }

    private Object processContent(ContentKey contentKey, ContentValue contentValue) {
        switch (contentValue.getAction()) {
            case Create:
                if (this.sqlStore.isContentExists(contentKey.getContentHash())) {
                    return null;
                }
                this.sqlStore.storeContent(contentKey.getContentId(), contentKey.getContentHash(), contentValue.getCanonicalHash(), contentValue.getContent());
                return null;
            case Import:
                if (this.sqlStore.isContentExists(contentKey.getContentId())) {
                    return null;
                }
                ContentEntity contentEntity = new ContentEntity();
                contentEntity.contentId = contentKey.getContentId();
                contentEntity.contentHash = contentKey.getContentHash();
                contentEntity.canonicalHash = contentValue.getCanonicalHash();
                contentEntity.contentBytes = contentValue.getContent().bytes();
                this.sqlStore.importContent(contentEntity);
                return null;
            default:
                this.log.warn("Unsupported content message action: %s", contentKey.getType().name());
                throw new RegistryStorageException("Unsupported content message action: " + contentValue.getAction());
        }
    }

    private Object processGlobalRule(GlobalRuleKey globalRuleKey, GlobalRuleValue globalRuleValue) {
        switch (globalRuleValue.getAction()) {
            case Create:
                this.sqlStore.createGlobalRule(globalRuleKey.getRuleType(), globalRuleValue.getConfig());
                return null;
            case Update:
                this.sqlStore.updateGlobalRule(globalRuleKey.getRuleType(), globalRuleValue.getConfig());
                return null;
            case Delete:
                this.sqlStore.deleteGlobalRule(globalRuleKey.getRuleType());
                return null;
            case Import:
                GlobalRuleEntity globalRuleEntity = new GlobalRuleEntity();
                globalRuleEntity.ruleType = globalRuleKey.getRuleType();
                globalRuleEntity.configuration = globalRuleValue.getConfig().getConfiguration();
                this.sqlStore.importGlobalRule(globalRuleEntity);
                return null;
            default:
                this.log.warn("Unsupported global rule message action: %s", globalRuleKey.getType().name());
                throw new RegistryStorageException("Unsupported global-rule message action: " + globalRuleValue.getAction());
        }
    }

    private Object processGlobalId(GlobalIdKey globalIdKey, GlobalIdValue globalIdValue) {
        switch (globalIdValue.getAction()) {
            case Create:
                return Long.valueOf(this.sqlStore.nextGlobalId());
            case Reset:
                this.sqlStore.resetGlobalId();
                return null;
            default:
                this.log.warn("Unsupported global id message action: %s", globalIdKey.getType().name());
                throw new RegistryStorageException("Unsupported global-id message action: " + globalIdValue.getAction());
        }
    }

    private Object processContentId(ContentIdKey contentIdKey, ContentIdValue contentIdValue) {
        switch (contentIdValue.getAction()) {
            case Create:
                return Long.valueOf(this.sqlStore.nextContentId());
            case Reset:
                this.sqlStore.resetContentId();
                return null;
            default:
                this.log.warn("Unsupported content id message action: %s", contentIdKey.getType().name());
                throw new RegistryStorageException("Unsupported content-id message action: " + contentIdValue.getAction());
        }
    }

    private Object processLogConfig(LogConfigKey logConfigKey, LogConfigValue logConfigValue) {
        switch (logConfigValue.getAction()) {
            case Update:
                this.sqlStore.setLogConfiguration(logConfigValue.getConfig());
                return null;
            case Delete:
                this.sqlStore.removeLogConfiguration(logConfigValue.getConfig().getLogger());
                return null;
            default:
                this.log.warn("Unsupported log config message action: %s", logConfigKey.getType().name());
                throw new RegistryStorageException("Unsupported log config message action: " + logConfigValue.getAction());
        }
    }
}
