/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.kafka;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.infinispan.client.hotrod.ProtocolVersion;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.RemoteCacheManager;
import org.infinispan.client.hotrod.configuration.ConfigurationBuilder;
import org.infinispan.client.hotrod.marshall.ProtoStreamMarshaller;
import org.infinispan.commons.marshall.Marshaller;
import org.infinispan.kafka.InfinispanSinkConnectorConfig;
import org.infinispan.kafka.VersionUtil;
import org.infinispan.protostream.SerializationContext;
import org.infinispan.protostream.annotations.ProtoSchemaBuilder;
import org.infinispan.protostream.annotations.ProtoSchemaBuilderException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class InfinispanSinkTask
extends SinkTask {
    private static Logger log = LoggerFactory.getLogger(InfinispanSinkTask.class);
    private RemoteCacheManager cacheManager;
    private RemoteCache<Object, Object> cache;
    private InfinispanSinkConnectorConfig config;

    public String version() {
        return VersionUtil.getVersion();
    }

    public void start(Map<String, String> map) {
        this.config = new InfinispanSinkConnectorConfig(map);
        this.setupRemoteCache();
    }

    public void put(Collection<SinkRecord> collection) {
        if (collection.isEmpty()) {
            return;
        }
        boolean useProto = this.config.getBoolean("infinispan.use.proto");
        int recordsCount = collection.size();
        log.info("Received {} records", (Object)recordsCount);
        for (SinkRecord record : collection) {
            log.info("Record kafka coordinates:({}-{}-{}). Writing it to Infinispan...", new Object[]{record.topic(), record.key(), record.value()});
            this.storeEntry(useProto, record);
        }
    }

    public void flush(Map<TopicPartition, OffsetAndMetadata> map) {
    }

    public void stop() {
        this.cacheManager.stop();
    }

    private void setupRemoteCache() {
        String version = this.config.getString("infinispan.hotrod.protocol.version");
        ConfigurationBuilder builder = new ConfigurationBuilder();
        builder.addServer().host(this.config.getString("infinispan.connection.hosts")).port(this.config.getInt("infinispan.connection.hotrod.port").intValue()).socketTimeout(this.config.getInt("infinispan.hotrod.socket_timeout").intValue()).connectionTimeout(this.config.getInt("infinispan.hotrod.connect_timeout").intValue()).maxRetries(this.config.getInt("infinispan.hotrod.max_retries").intValue()).version(ProtocolVersion.valueOf((String)version));
        boolean useProto = this.config.getBoolean("infinispan.use.proto");
        if (useProto) {
            log.info("Adding protostream");
            builder.marshaller((Marshaller)new ProtoStreamMarshaller());
        }
        builder.forceReturnValues(this.config.getBoolean("infinispan.cache.force.return.values").booleanValue());
        this.cacheManager = new RemoteCacheManager(builder.build());
        if (useProto) {
            SerializationContext serCtx = ProtoStreamMarshaller.getSerializationContext((RemoteCacheManager)this.cacheManager);
            ProtoSchemaBuilder protoSchemaBuilder = new ProtoSchemaBuilder();
            Class marshaller = this.config.getClass("infinispan.proto.marshaller.class");
            String memoSchemaFile = null;
            try {
                memoSchemaFile = protoSchemaBuilder.fileName("file.proto").packageName("test").addClass(marshaller).build(serCtx);
            }
            catch (IOException | ProtoSchemaBuilderException e) {
                log.error("Error during building of Protostream Schema {}", (Object)e.getMessage());
                e.printStackTrace();
            }
            RemoteCache metadataCache = this.cacheManager.getCache("___protobuf_metadata");
            metadataCache.put((Object)"file.proto", (Object)memoSchemaFile);
        }
        this.cache = this.cacheManager.getCache(this.config.getString("infinispan.connection.cache.name"));
    }

    private void storeEntry(boolean useProto, SinkRecord record) {
        ObjectMapper objectMapper = new ObjectMapper();
        Class marshaller = this.config.getClass("infinispan.proto.marshaller.class");
        boolean useLifespan = this.config.getBoolean("infinispan.use.lifespan");
        boolean useMaxIdle = this.config.getBoolean("infinispan.use.maxidle");
        long lifespan = this.config.getLong("infinispan.cache.lifespan.entry");
        long maxIdle = this.config.getLong("infinispan.cache.maxidle.entry");
        Object p = null;
        if (useProto) {
            try {
                p = objectMapper.readValue((String)record.value(), marshaller);
            }
            catch (IOException e) {
                log.error("Error during Deserialization of value {}", (Object)e.getMessage());
                e.printStackTrace();
            }
            Object returnValue = !useLifespan && !useMaxIdle ? this.cache.put(record.key(), p) : (useLifespan && !useMaxIdle ? this.cache.put(record.key(), p, lifespan, TimeUnit.SECONDS) : this.cache.put(record.key(), p, lifespan, TimeUnit.SECONDS, maxIdle, TimeUnit.SECONDS));
            if (returnValue != null) {
                log.info("The put operation returned the following result: {}", returnValue);
            }
        } else {
            Object returnValue = !useLifespan && !useMaxIdle ? this.cache.put(record.key(), record.value()) : (useLifespan && !useMaxIdle ? this.cache.put(record.key(), record.value(), lifespan, TimeUnit.SECONDS) : this.cache.put(record.key(), record.value(), lifespan, TimeUnit.SECONDS, maxIdle, TimeUnit.SECONDS));
            if (returnValue != null) {
                log.info("The put operation returned the following result: {}", returnValue);
            }
        }
    }
}

