package org.infinispan.kafka;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.infinispan.client.hotrod.ProtocolVersion;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.RemoteCacheManager;
import org.infinispan.client.hotrod.configuration.ConfigurationBuilder;
import org.infinispan.client.hotrod.marshall.ProtoStreamMarshaller;
import org.infinispan.protostream.annotations.ProtoSchemaBuilder;
import org.infinispan.protostream.annotations.ProtoSchemaBuilderException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/infinispan/kafka/InfinispanSinkTask.class */
public class InfinispanSinkTask extends SinkTask {
    private static Logger log = LoggerFactory.getLogger(InfinispanSinkTask.class);
    private RemoteCacheManager cacheManager;
    private RemoteCache<Object, Object> cache;
    private InfinispanSinkConnectorConfig config;

    public String version() {
        return VersionUtil.getVersion();
    }

    public void start(Map<String, String> map) {
        this.config = new InfinispanSinkConnectorConfig(map);
        setupRemoteCache();
    }

    public void put(Collection<SinkRecord> collection) {
        if (collection.isEmpty()) {
            return;
        }
        boolean booleanValue = this.config.getBoolean(InfinispanSinkConnectorConfig.INFINISPAN_USE_PROTO_CONF).booleanValue();
        log.info("Received {} records", Integer.valueOf(collection.size()));
        for (SinkRecord sinkRecord : collection) {
            log.info("Record kafka coordinates:({}-{}-{}). Writing it to Infinispan...", new Object[]{sinkRecord.topic(), sinkRecord.key(), sinkRecord.value()});
            storeEntry(booleanValue, sinkRecord);
        }
    }

    public void flush(Map<TopicPartition, OffsetAndMetadata> map) {
    }

    public void stop() {
        this.cacheManager.stop();
    }

    private void setupRemoteCache() {
        String string = this.config.getString(InfinispanSinkConnectorConfig.INFINISPAN_HOTROD_PROTOCOL_VERSION_CONF);
        ConfigurationBuilder configurationBuilder = new ConfigurationBuilder();
        configurationBuilder.addServer().host(this.config.getString(InfinispanSinkConnectorConfig.INFINISPAN_CONNECTION_HOSTS_CONF)).port(this.config.getInt(InfinispanSinkConnectorConfig.INFINISPAN_CONNECTION_HOTROD_PORT_CONF).intValue()).socketTimeout(this.config.getInt(InfinispanSinkConnectorConfig.INFINISPAN_HOTROD_SOCKET_TIMEOUT_CONF).intValue()).connectionTimeout(this.config.getInt(InfinispanSinkConnectorConfig.INFINISPAN_HOTROD_CONNECT_TIMEOUT_CONF).intValue()).maxRetries(this.config.getInt(InfinispanSinkConnectorConfig.INFINISPAN_HOTROD_MAX_RETRIES_CONF).intValue()).version(ProtocolVersion.valueOf(string));
        boolean booleanValue = this.config.getBoolean(InfinispanSinkConnectorConfig.INFINISPAN_USE_PROTO_CONF).booleanValue();
        if (booleanValue) {
            log.info("Adding protostream");
            configurationBuilder.marshaller(new ProtoStreamMarshaller());
        }
        configurationBuilder.forceReturnValues(this.config.getBoolean(InfinispanSinkConnectorConfig.INFINISPAN_CACHE_FORCE_RETURN_VALUES_CONF).booleanValue());
        this.cacheManager = new RemoteCacheManager(configurationBuilder.build());
        if (booleanValue) {
            String str = null;
            try {
                str = new ProtoSchemaBuilder().fileName("file.proto").packageName("test").addClass(this.config.getClass(InfinispanSinkConnectorConfig.INFINISPAN_PROTO_MARSHALLER_CLASS_CONF)).build(ProtoStreamMarshaller.getSerializationContext(this.cacheManager));
            } catch (ProtoSchemaBuilderException | IOException e) {
                log.error("Error during building of Protostream Schema {}", e.getMessage());
                e.printStackTrace();
            }
            this.cacheManager.getCache("___protobuf_metadata").put("file.proto", str);
        }
        this.cache = this.cacheManager.getCache(this.config.getString(InfinispanSinkConnectorConfig.INFINISPAN_CONNECTION_CACHE_NAME_CONF));
    }

    private void storeEntry(boolean z, SinkRecord sinkRecord) {
        ObjectMapper objectMapper = new ObjectMapper();
        Class cls = this.config.getClass(InfinispanSinkConnectorConfig.INFINISPAN_PROTO_MARSHALLER_CLASS_CONF);
        boolean booleanValue = this.config.getBoolean(InfinispanSinkConnectorConfig.INFINISPAN_USE_LIFESPAN_CONF).booleanValue();
        boolean booleanValue2 = this.config.getBoolean(InfinispanSinkConnectorConfig.INFINISPAN_USE_MAX_IDLE_CONF).booleanValue();
        long longValue = this.config.getLong(InfinispanSinkConnectorConfig.INFINISPAN_LIFESPAN_ENTRY_CONF).longValue();
        long longValue2 = this.config.getLong(InfinispanSinkConnectorConfig.INFINISPAN_MAX_IDLE_ENTRY_CONF).longValue();
        Object obj = null;
        if (!z) {
            Object put = (booleanValue || booleanValue2) ? (!booleanValue || booleanValue2) ? this.cache.put(sinkRecord.key(), sinkRecord.value(), longValue, TimeUnit.SECONDS, longValue2, TimeUnit.SECONDS) : this.cache.put(sinkRecord.key(), sinkRecord.value(), longValue, TimeUnit.SECONDS) : this.cache.put(sinkRecord.key(), sinkRecord.value());
            if (put != null) {
                log.info("The put operation returned the following result: {}", put);
                return;
            }
            return;
        }
        try {
            obj = objectMapper.readValue((String) sinkRecord.value(), cls);
        } catch (IOException e) {
            log.error("Error during Deserialization of value {}", e.getMessage());
            e.printStackTrace();
        }
        Object put2 = (booleanValue || booleanValue2) ? (!booleanValue || booleanValue2) ? this.cache.put(sinkRecord.key(), obj, longValue, TimeUnit.SECONDS, longValue2, TimeUnit.SECONDS) : this.cache.put(sinkRecord.key(), obj, longValue, TimeUnit.SECONDS) : this.cache.put(sinkRecord.key(), obj);
        if (put2 != null) {
            log.info("The put operation returned the following result: {}", put2);
        }
    }
}
