/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.persistence.jdbc.stringbased;

import io.reactivex.rxjava3.core.Flowable;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
import java.util.Map;
import java.util.PrimitiveIterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.function.Function;
import java.util.function.Predicate;
import javax.transaction.Transaction;
import org.infinispan.commons.CacheException;
import org.infinispan.commons.configuration.ConfiguredBy;
import org.infinispan.commons.io.ByteBuffer;
import org.infinispan.commons.marshall.Marshaller;
import org.infinispan.commons.marshall.StreamAwareMarshaller;
import org.infinispan.commons.persistence.Store;
import org.infinispan.commons.reactive.RxJavaInterop;
import org.infinispan.commons.time.TimeService;
import org.infinispan.commons.util.AbstractIterator;
import org.infinispan.commons.util.IntSet;
import org.infinispan.commons.util.Util;
import org.infinispan.distribution.ch.KeyPartitioner;
import org.infinispan.marshall.persistence.PersistenceMarshaller;
import org.infinispan.metadata.Metadata;
import org.infinispan.metadata.impl.PrivateMetadata;
import org.infinispan.persistence.jdbc.JdbcUtil;
import org.infinispan.persistence.jdbc.configuration.JdbcStringBasedStoreConfiguration;
import org.infinispan.persistence.jdbc.connectionfactory.ConnectionFactory;
import org.infinispan.persistence.jdbc.impl.table.TableManager;
import org.infinispan.persistence.jdbc.impl.table.TableManagerFactory;
import org.infinispan.persistence.keymappers.Key2StringMapper;
import org.infinispan.persistence.keymappers.TwoWayKey2StringMapper;
import org.infinispan.persistence.keymappers.UnsupportedKeyTypeException;
import org.infinispan.persistence.spi.AdvancedCacheWriter;
import org.infinispan.persistence.spi.InitializationContext;
import org.infinispan.persistence.spi.MarshallableEntry;
import org.infinispan.persistence.spi.MarshallableEntryFactory;
import org.infinispan.persistence.spi.MarshalledValue;
import org.infinispan.persistence.spi.PersistenceException;
import org.infinispan.persistence.spi.SegmentedAdvancedLoadWriteStore;
import org.infinispan.persistence.spi.TransactionalCacheWriter;
import org.infinispan.persistence.support.BatchModification;
import org.infinispan.util.KeyValuePair;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.reactivestreams.Publisher;

@Store(shared=true)
@ConfiguredBy(value=JdbcStringBasedStoreConfiguration.class)
public class JdbcStringBasedStore<K, V>
implements SegmentedAdvancedLoadWriteStore<K, V>,
TransactionalCacheWriter<K, V> {
    // Logger for this store, obtained through the JDBC persistence logging interface.
    private static final org.infinispan.persistence.jdbc.logging.Log log = (org.infinispan.persistence.jdbc.logging.Log)LogFactory.getLog(JdbcStringBasedStore.class, org.infinispan.persistence.jdbc.logging.Log.class);
    // Connection held for each transaction; filled by getTxConnection and emptied by destroyTxConnection.
    private final Map<Transaction, Connection> transactionConnectionMap = new ConcurrentHashMap<Transaction, Connection>();
    // Store configuration, read from the initialization context in init().
    private JdbcStringBasedStoreConfiguration configuration;
    // Initialization context handed to init(); used later to reach the cache and global configuration.
    private InitializationContext ctx;
    // Key-to-string mapper instantiated in start() from the configured class name.
    private Key2StringMapper key2StringMapper;
    // Source of JDBC connections; created in start() when managed, or injected via setConnectionFactory.
    private ConnectionFactory connectionFactory;
    // Factory used to build MarshallableEntry instances from loaded rows.
    private MarshallableEntryFactory<K, V> marshalledEntryFactory;
    // Persistence marshaller taken from the initialization context.
    private PersistenceMarshaller marshaller;
    // Supplies the SQL statements used by this store; set to null again by stop().
    private TableManager tableManager;
    // Clock used for expiration comparisons (wallClockTime()).
    private TimeService timeService;
    // Null when the store is not configured as segmented (see init()).
    private KeyPartitioner keyPartitioner;

    /**
     * Captures the configuration and collaborators exposed by the initialization context.
     * <p>
     * The key partitioner is only retained when the store is configured as segmented;
     * otherwise it is left {@code null}.
     *
     * @param ctx the initialization context supplied to this store
     */
    public void init(InitializationContext ctx) {
        this.ctx = ctx;
        this.configuration = (JdbcStringBasedStoreConfiguration)ctx.getConfiguration();
        this.marshalledEntryFactory = ctx.getMarshallableEntryFactory();
        this.marshaller = ctx.getPersistenceMarshaller();
        this.timeService = ctx.getTimeService();
        if (this.configuration.segmented()) {
            this.keyPartitioner = ctx.getKeyPartitioner();
        } else {
            this.keyPartitioner = null;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Starts the store.
     * <ol>
     *   <li>When the connection factory is managed by this store, creates and starts it,
     *       then creates and starts the table manager.</li>
     *   <li>When tables are not created on start, checks the meta table: if it exists, the
     *       stored segment count is validated against the current configuration and the meta
     *       table is updated; if it does not exist, {@code migrateFromV11()} is run and the
     *       meta table is created.</li>
     *   <li>Instantiates the configured key-to-string mapper and enforces a two-way mapper
     *       for preload and for distributed cache modes.</li>
     * </ol>
     *
     * @throws IllegalStateException if the configured mapper class cannot be loaded or instantiated
     */
    public void start() {
        String cacheName = this.ctx.getCache().getName();
        if (this.configuration.manageConnectionFactory()) {
            ConnectionFactory managedFactory = ConnectionFactory.getConnectionFactory(this.configuration.connectionFactory().connectionFactoryClass());
            managedFactory.start(this.configuration.connectionFactory(), managedFactory.getClass().getClassLoader());
            this.connectionFactory = managedFactory;
            this.tableManager = this.getTableManager(cacheName);
            this.tableManager.start();
        }

        if (!this.configuration.table().createOnStart()) {
            Connection connection = null;
            try {
                connection = this.connectionFactory.getConnection();
                if (this.tableManager.metaTableExists(connection)) {
                    // Existing meta table: the persisted segment count must agree with the current configuration.
                    TableManager.Metadata storedMeta = this.tableManager.getMetadata(connection);
                    int storedSegments = storedMeta.getSegments();
                    if (!this.configuration.segmented() && storedSegments != -1) {
                        throw log.existingStoreNoSegmentation();
                    }
                    int configuredSegments = this.ctx.getCache().getCacheConfiguration().clustering().hash().numSegments();
                    if (this.configuration.segmented() && storedSegments != configuredSegments) {
                        throw log.existingStoreSegmentMismatch(storedSegments, configuredSegments);
                    }
                    this.tableManager.updateMetaTable(connection);
                } else {
                    // No meta table: migrate the persisted rows, then create the meta table.
                    Log.PERSISTENCE.startMigratingPersistenceData(cacheName);
                    try {
                        this.migrateFromV11();
                    } catch (SQLException e) {
                        throw Log.PERSISTENCE.persistedDataMigrationFailed(cacheName, (Throwable)e);
                    }
                    this.tableManager.createMetaTable(connection);
                    Log.PERSISTENCE.persistedDataSuccessfulMigrated(cacheName);
                }
            } finally {
                this.connectionFactory.releaseConnection(connection);
            }
        }

        try {
            Object mapper = Util.loadClassStrict((String)this.configuration.key2StringMapper(), (ClassLoader)this.ctx.getGlobalConfiguration().classLoader()).newInstance();
            // Only accepted when it implements Key2StringMapper; otherwise the field is left unset.
            if (mapper instanceof Key2StringMapper) {
                this.key2StringMapper = (Key2StringMapper)mapper;
            }
        } catch (Exception e) {
            log.errorf("Trying to instantiate %s, however it failed due to %s", this.configuration.key2StringMapper(), e.getClass().getName());
            throw new IllegalStateException("This should not happen.", e);
        }
        if (log.isTraceEnabled()) {
            log.tracef("Using key2StringMapper: %s", this.key2StringMapper.getClass().getName());
        }

        if (this.configuration.preload()) {
            this.enforceTwoWayMapper("preload");
        }
        boolean distributed = this.ctx.getCache().getCacheConfiguration() != null
                && this.ctx.getCache().getCacheConfiguration().clustering().cacheMode().isDistributed();
        if (distributed) {
            this.enforceTwoWayMapper("distribution/rehashing");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Rewrites every non-expired row so that its value is re-marshalled through the current
     * entry factory and persistence marshaller.
     * <p>
     * Nothing is done when a marshaller is explicitly configured in the global serialization
     * configuration. Otherwise all rows are read inside a single transaction, each value is
     * unmarshalled with the user marshaller (metadata falls back to the persistence marshaller
     * when the user marshaller rejects it with {@link IllegalArgumentException}), and the
     * re-marshalled row is written back in batches of {@code configuration.maxBatchSize()}.
     *
     * @throws SQLException if any JDBC operation fails
     */
    private void migrateFromV11() throws SQLException {
        if (this.ctx.getGlobalConfiguration().serialization().marshaller() != null) {
            return;
        }
        Connection conn = null;
        PreparedStatement selectAll = null;
        ResultSet rows = null;
        try {
            conn = this.connectionFactory.getConnection();
            conn.setAutoCommit(false);
            selectAll = conn.prepareStatement(this.tableManager.getLoadNonExpiredAllRowsSql());
            selectAll.setLong(1, this.timeService.wallClockTime());
            rows = selectAll.executeQuery();
            Marshaller userMarshaller = this.marshaller.getUserMarshaller();
            try (PreparedStatement upsertBatch = conn.prepareStatement(this.tableManager.getUpdateRowSql())) {
                int pending = 0;
                while (rows.next()) {
                    ++pending;
                    InputStream rawValue = rows.getBinaryStream(1);
                    String keyStr = rows.getString(2);
                    long timestamp = rows.getLong(3);
                    int segment = -1;
                    if (this.keyPartitioner != null) {
                        segment = rows.getInt(4);
                    }
                    MarshalledValue mv = (MarshalledValue)JdbcUtil.unmarshall(rawValue, (StreamAwareMarshaller)this.marshaller);
                    Object value = JdbcUtil.unmarshall(mv.getValueBytes(), userMarshaller);
                    Metadata meta;
                    try {
                        meta = (Metadata)JdbcUtil.unmarshall(mv.getMetadataBytes(), userMarshaller);
                    } catch (IllegalArgumentException e) {
                        // The user marshaller rejected the bytes; retry with the persistence marshaller.
                        meta = (Metadata)JdbcUtil.unmarshall(mv.getMetadataBytes(), (Marshaller)this.marshaller);
                    }
                    PrivateMetadata internalMeta = (PrivateMetadata)JdbcUtil.unmarshall(mv.getInternalMetadataBytes(), (Marshaller)this.marshaller);
                    MarshallableEntry entry = this.marshalledEntryFactory.create(null, value, meta, internalMeta, mv.getCreated(), mv.getLastUsed());
                    ByteBuffer byteBuffer = JdbcUtil.marshall(entry.getMarshalledValue(), (Marshaller)this.marshaller);
                    this.tableManager.prepareUpdateStatement(upsertBatch, keyStr, timestamp, segment, byteBuffer);
                    upsertBatch.addBatch();
                    if (pending == this.configuration.maxBatchSize()) {
                        pending = 0;
                        upsertBatch.executeBatch();
                        upsertBatch.clearBatch();
                    }
                }
                // Flush whatever is left over from the last, partially filled batch.
                if (pending != 0) {
                    upsertBatch.executeBatch();
                }
                conn.commit();
            }
        } finally {
            JdbcUtil.safeClose(rows);
            JdbcUtil.safeClose(selectAll);
            this.connectionFactory.releaseConnection(conn);
        }
    }

    /**
     * Stops the table manager (and forgets it) and then the connection factory.
     * <p>
     * Each step is attempted independently; anything thrown is logged at debug level and
     * not propagated.
     */
    public void stop() {
        try {
            TableManager manager = this.tableManager;
            if (manager != null) {
                manager.stop();
                this.tableManager = null;
            }
        } catch (Throwable t) {
            log.debug("Exception while stopping", t);
        }

        try {
            log.tracef("Stopping connection factory: %s", this.connectionFactory);
            ConnectionFactory factory = this.connectionFactory;
            if (factory != null) {
                factory.stop();
            }
        } catch (Throwable t) {
            log.debug("Exception while stopping", t);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Reports whether the store can currently reach the database.
     *
     * @return {@code false} when the table manager or connection factory is missing, when no
     *         connection is returned, when the connection is not valid within 10 seconds, or
     *         when a {@link SQLException} is raised; {@code true} otherwise
     */
    public boolean isAvailable() {
        if (!(this.tableManager != null && this.connectionFactory != null)) {
            return false;
        }
        Connection probe = null;
        try {
            probe = this.connectionFactory.getConnection();
            if (probe == null) {
                return false;
            }
            return probe.isValid(10);
        } catch (SQLException e) {
            return false;
        } finally {
            this.connectionFactory.releaseConnection(probe);
        }
    }

    /**
     * Replaces the connection factory used by this store.
     *
     * @param connectionFactory the factory to use from now on
     */
    void setConnectionFactory(ConnectionFactory connectionFactory) {
        this.connectionFactory = connectionFactory;
    }

    /**
     * Replaces the table manager used by this store.
     *
     * @param tableManager the table manager to use from now on
     */
    void setTableManager(TableManager tableManager) {
        this.tableManager = tableManager;
    }

    /**
     * @return the connection factory currently used by this store; may be {@code null} if
     *         none has been created or injected yet
     */
    public ConnectionFactory getConnectionFactory() {
        return this.connectionFactory;
    }

    /**
     * Resolves the segment of an entry's key.
     *
     * @param entry the entry whose key is inspected
     * @return the segment reported by the key partitioner, or {@code -1} when no key
     *         partitioner is set
     */
    private int getSegment(MarshallableEntry entry) {
        return this.keyPartitioner != null ? this.keyPartitioner.getSegment(entry.getKey()) : -1;
    }

    /**
     * Segment-aware write; delegates to {@code write(entry)}.
     *
     * @param segment ignored by this implementation
     * @param entry   the entry to persist
     */
    public void write(int segment, MarshallableEntry<? extends K, ? extends V> entry) {
        write(entry);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Persists a single entry using a freshly obtained connection, which is always released
     * afterwards.
     * <p>
     * An {@link InterruptedException} is not propagated: the thread's interrupt flag is
     * restored and the method returns.
     *
     * @param entry the entry to persist
     * @throws PersistenceException if the database write fails with a {@link SQLException}
     */
    public void write(MarshallableEntry<? extends K, ? extends V> entry) {
        String keyStr = this.key2Str(entry.getKey());
        Connection connection = null;
        try {
            connection = this.connectionFactory.getConnection();
            int segment = this.getSegment(entry);
            this.write(entry, connection, keyStr, segment);
        } catch (SQLException ex) {
            org.infinispan.persistence.jdbc.logging.Log.PERSISTENCE.sqlFailureStoringKey(keyStr, ex);
            String message = String.format("Error while storing string key to database; key: '%s'", keyStr);
            throw new PersistenceException(message, (Throwable)ex);
        } catch (InterruptedException e) {
            if (log.isTraceEnabled()) {
                log.trace("Interrupted while marshalling to store");
            }
            Thread.currentThread().interrupt();
        } finally {
            this.connectionFactory.releaseConnection(connection);
        }
    }

    /**
     * Writes an entry on the given connection, deriving the string key from the entry's key.
     *
     * @param entry      the entry to persist
     * @param connection the connection to write on; not released here
     * @param segment    the segment to store with the row
     */
    private void write(MarshallableEntry entry, Connection connection, int segment) throws SQLException, InterruptedException {
        String keyStr = this.key2Str(entry.getKey());
        this.write(entry, connection, keyStr, segment);
    }

    /**
     * Writes an entry on the given connection, using an upsert statement when the table
     * manager supports it and the select-then-insert/update path otherwise.
     *
     * @param entry      the entry to persist
     * @param connection the connection to write on; not released here
     * @param keyStr     the string form of the entry's key
     * @param segment    the segment to store with the row
     */
    private void write(MarshallableEntry entry, Connection connection, String keyStr, int segment) throws SQLException, InterruptedException {
        if (!this.tableManager.isUpsertSupported()) {
            this.executeLegacyUpdate(connection, entry, keyStr, segment);
            return;
        }
        this.executeUpsert(connection, entry, keyStr, segment);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Executes the table manager's upsert statement for one entry.
     *
     * @param connection the connection to run the statement on; not released here
     * @param entry      the entry to persist
     * @param keyStr     the string form of the entry's key
     * @param segment    the segment to store with the row
     */
    private void executeUpsert(Connection connection, MarshallableEntry entry, String keyStr, int segment) throws InterruptedException, SQLException {
        String upsertSql = this.tableManager.getUpsertRowSql();
        if (log.isTraceEnabled()) {
            log.tracef("Running sql '%s'. Key string is '%s'", upsertSql, keyStr);
        }
        PreparedStatement statement = null;
        try {
            statement = connection.prepareStatement(upsertSql);
            statement.setQueryTimeout(this.configuration.writeQueryTimeout());
            this.prepareUpsertStatement(entry, keyStr, segment, statement);
            statement.executeUpdate();
        } finally {
            JdbcUtil.safeClose(statement);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Persists one entry without upsert support: first looks the key up, then runs either
     * the update statement (row found) or the insert statement (row missing).
     *
     * @param connection the connection to run the statements on; not released here
     * @param entry      the entry to persist
     * @param keyStr     the string form of the entry's key
     * @param segment    the segment to store with the row
     * @throws SQLException if either statement fails
     */
    private void executeLegacyUpdate(Connection connection, MarshallableEntry entry, String keyStr, int segment) throws SQLException {
        String lookupSql = this.tableManager.getSelectIdRowSql();
        if (log.isTraceEnabled()) {
            log.tracef("Running sql '%s'. Key string is '%s'", lookupSql, keyStr);
        }
        PreparedStatement statement = null;
        try {
            statement = connection.prepareStatement(lookupSql);
            statement.setQueryTimeout(this.configuration.writeQueryTimeout());
            statement.setString(1, keyStr);
            ResultSet existing = statement.executeQuery();
            boolean rowExists = existing.next();
            String writeSql;
            if (rowExists) {
                writeSql = this.tableManager.getUpdateRowSql();
            } else {
                writeSql = this.tableManager.getInsertRowSql();
            }
            // The lookup is finished; release it before preparing the write statement.
            JdbcUtil.safeClose(existing);
            JdbcUtil.safeClose(statement);
            if (log.isTraceEnabled()) {
                log.tracef("Running sql '%s'. Key string is '%s'", writeSql, keyStr);
            }
            statement = connection.prepareStatement(writeSql);
            statement.setQueryTimeout(this.configuration.writeQueryTimeout());
            this.prepareStatement(entry, keyStr, segment, statement, !rowExists);
            statement.executeUpdate();
        } finally {
            JdbcUtil.safeClose(statement);
        }
    }

    /**
     * Persists every entry emitted by {@code publisher}.
     * <p>
     * Without upsert support each entry is written individually through {@code write(entry)}.
     * With upsert support a single connection and prepared upsert statement are opened, the
     * entries are written through {@code createBatchFlowable}, and the statement and
     * connection are released when the pipeline terminates.
     *
     * @param publisher source of the entries to persist
     * @return a stage that completes with {@code null} when the publisher completes, or
     *         exceptionally when the pipeline signals an error
     */
    public CompletionStage<Void> bulkUpdate(Publisher<MarshallableEntry<? extends K, ? extends V>> publisher) {
        CompletableFuture<Void> done = new CompletableFuture<Void>();
        if (this.tableManager.isUpsertSupported()) {
            Flowable.using(() -> {
                Connection connection = this.connectionFactory.getConnection();
                PreparedStatement upsertBatch = connection.prepareStatement(this.tableManager.getUpsertRowSql());
                upsertBatch.setQueryTimeout(this.configuration.writeQueryTimeout());
                return new KeyValuePair((Object)connection, (Object)upsertBatch);
            }, kvp -> this.createBatchFlowable((PreparedStatement)kvp.getValue(), publisher), kvp -> {
                JdbcUtil.safeClose((Statement)kvp.getValue());
                this.connectionFactory.releaseConnection((Connection)kvp.getKey());
            }).subscribe(RxJavaInterop.emptyConsumer(), done::completeExceptionally, () -> done.complete(null));
        } else {
            Flowable.fromPublisher(publisher).doOnNext(this::write).subscribe(RxJavaInterop.emptyConsumer(), done::completeExceptionally, () -> done.complete(null));
        }
        return done;
    }

    /**
     * Builds the pipeline that writes published entries through a shared upsert statement.
     * <p>
     * Entries from {@code publisher} are buffered into lists sized by
     * {@code configuration.maxBatchSize()}. For each list, every entry is bound to
     * {@code upsertBatch} and added to its JDBC batch; the batch is then executed and cleared.
     * This method does not close {@code upsertBatch}.
     *
     * @param upsertBatch prepared upsert statement reused for every buffered list
     * @param publisher   source of the entries to write
     * @return a flowable emitting each buffered list after its side effects have run
     */
    private Flowable<List<MarshallableEntry<? extends K, ? extends V>>> createBatchFlowable(PreparedStatement upsertBatch, Publisher<MarshallableEntry<? extends K, ? extends V>> publisher) {
        return Flowable.fromPublisher(publisher).buffer(this.configuration.maxBatchSize()).doOnNext(entries -> {
            for (MarshallableEntry entry : entries) {
                String keyStr = this.key2Str(entry.getKey());
                this.prepareUpsertStatement(entry, keyStr, this.getSegment(entry), upsertBatch);
                upsertBatch.addBatch();
            }
            // One database round trip per buffered list; clear so the next list starts empty.
            upsertBatch.executeBatch();
            upsertBatch.clearBatch();
        }).doOnError(e -> {
            // The error callback throws the logging interface's batch-write failure, built from the original error.
            throw org.infinispan.persistence.jdbc.logging.Log.PERSISTENCE.sqlFailureWritingBatch((Throwable)e);
        });
    }

    /**
     * Deletes the rows for the given keys using JDBC batching, flushing every
     * {@code configuration.maxBatchSize()} keys and once more for any remainder.
     *
     * @param keys the keys whose rows should be removed
     */
    public void deleteBatch(Iterable<Object> keys) {
        Connection connection = null;
        try {
            connection = this.connectionFactory.getConnection();
            try (PreparedStatement deleteBatch = connection.prepareStatement(this.tableManager.getDeleteRowSql())) {
                int pending = 0;
                for (Object key : keys) {
                    deleteBatch.setString(1, this.key2Str(key));
                    deleteBatch.addBatch();
                    if (++pending == this.configuration.maxBatchSize()) {
                        pending = 0;
                        deleteBatch.executeBatch();
                        deleteBatch.clearBatch();
                    }
                }
                // Flush the trailing, partially filled batch.
                if (pending != 0) {
                    deleteBatch.executeBatch();
                }
            }
        } catch (SQLException e) {
            throw org.infinispan.persistence.jdbc.logging.Log.PERSISTENCE.sqlFailureDeletingBatch(keys, e);
        } finally {
            this.connectionFactory.releaseConnection(connection);
        }
    }

    /**
     * Segment-aware lookup; delegates to {@code loadEntry(key)}.
     *
     * @param segment ignored by this implementation
     * @param key     the key to look up
     * @return the stored entry, or {@code null} when {@code loadEntry} returns none
     */
    public MarshallableEntry<K, V> get(int segment, Object key) {
        MarshallableEntry<K, V> loaded = this.loadEntry(key);
        return loaded;
    }

    /**
     * Loads the entry stored under {@code key}.
     * <p>
     * The result set, statement and connection are released in a {@code finally} block so
     * they are returned on every exit path, including unchecked exceptions raised while
     * unmarshalling the row.
     *
     * @param key the key to look up
     * @return the stored entry, or {@code null} when no row exists or the entry has metadata
     *         and is expired at the current wall-clock time
     * @throws PersistenceException if the query fails with a {@link SQLException}
     */
    public MarshallableEntry<K, V> loadEntry(Object key) {
        String lockingKey = this.key2Str(key);
        Connection conn = null;
        PreparedStatement ps = null;
        ResultSet rs = null;
        MarshallableEntry entry = null;
        try {
            String sql = this.tableManager.getSelectRowSql();
            conn = this.connectionFactory.getConnection();
            ps = conn.prepareStatement(sql);
            ps.setQueryTimeout(this.configuration.readQueryTimeout());
            ps.setString(1, lockingKey);
            rs = ps.executeQuery();
            if (rs.next()) {
                InputStream inputStream = rs.getBinaryStream(2);
                MarshalledValue value = (MarshalledValue)JdbcUtil.unmarshall(inputStream, (StreamAwareMarshaller)this.marshaller);
                entry = this.marshalledEntryFactory.create(key, value);
            }
        } catch (SQLException e) {
            org.infinispan.persistence.jdbc.logging.Log.PERSISTENCE.sqlFailureReadingKey(key, lockingKey, e);
            throw new PersistenceException(String.format("SQL error while fetching stored entry with key: %s, lockingKey: %s", key, lockingKey), (Throwable)e);
        } finally {
            JdbcUtil.safeClose(rs);
            JdbcUtil.safeClose(ps);
            this.connectionFactory.releaseConnection(conn);
        }
        // Expired entries are reported as absent.
        if (entry != null && entry.getMetadata() != null && entry.isExpired(this.timeService.wallClockTime())) {
            return null;
        }
        return entry;
    }

    /**
     * Removes every row from the store's table.
     * <p>
     * The statement and connection are released in a {@code finally} block so they are
     * returned on every exit path, not only on success or {@link SQLException}.
     *
     * @throws PersistenceException if the delete fails with a {@link SQLException}
     */
    public void clear() {
        Connection conn = null;
        Statement statement = null;
        try {
            String sql = this.tableManager.getDeleteAllRowsSql();
            conn = this.connectionFactory.getConnection();
            statement = conn.createStatement();
            int result = statement.executeUpdate(sql);
            if (log.isTraceEnabled()) {
                log.tracef("Successfully removed %d rows.", result);
            }
        } catch (SQLException ex) {
            org.infinispan.persistence.jdbc.logging.Log.PERSISTENCE.failedClearingJdbcCacheStore(ex);
            throw new PersistenceException("Failed clearing cache store", (Throwable)ex);
        } finally {
            JdbcUtil.safeClose(statement);
            this.connectionFactory.releaseConnection(conn);
        }
    }

    /**
     * Removes every row belonging to the given segments.
     * <p>
     * The statement and connection are released in a {@code finally} block so they are
     * returned on every exit path, not only on success or {@link SQLException}.
     *
     * @param segments the segments whose rows should be removed; each is bound as one
     *                 positional parameter of the delete statement
     * @throws PersistenceException if the delete fails with a {@link SQLException}
     */
    public void clear(IntSet segments) {
        Connection conn = null;
        PreparedStatement ps = null;
        try {
            String sql = this.tableManager.getDeleteRowsSqlForSegments(segments.size());
            conn = this.connectionFactory.getConnection();
            ps = conn.prepareStatement(sql);
            int offset = 0;
            PrimitiveIterator.OfInt segIter = segments.iterator();
            while (segIter.hasNext()) {
                // JDBC parameter indexes are 1-based, hence the pre-increment.
                ps.setInt(++offset, segIter.nextInt());
            }
            int result = ps.executeUpdate();
            if (log.isTraceEnabled()) {
                log.tracef("Successfully removed %d rows.", result);
            }
        } catch (SQLException ex) {
            org.infinispan.persistence.jdbc.logging.Log.PERSISTENCE.failedClearingJdbcCacheStore(ex);
            throw new PersistenceException("Failed clearing cache store when using segments " + segments, (Throwable)ex);
        } finally {
            JdbcUtil.safeClose(ps);
            this.connectionFactory.releaseConnection(conn);
        }
    }

    /**
     * Segment-aware delete; delegates to {@code delete(key)}.
     *
     * @param segment ignored by this implementation
     * @param key     the key whose row should be removed
     * @return the result of {@code delete(key)}
     */
    public boolean delete(int segment, Object key) {
        boolean removed = this.delete(key);
        return removed;
    }

    /**
     * Deletes the row stored under {@code key}.
     * <p>
     * The statement and connection are released in a {@code finally} block so they are
     * returned on every exit path, not only on success or {@link SQLException}.
     *
     * @param key the key whose row should be removed
     * @return {@code true} when exactly one row was deleted
     * @throws PersistenceException if the delete fails with a {@link SQLException}
     */
    public boolean delete(Object key) {
        Connection connection = null;
        PreparedStatement ps = null;
        String keyStr = this.key2Str(key);
        try {
            String sql = this.tableManager.getDeleteRowSql();
            if (log.isTraceEnabled()) {
                log.tracef("Running sql '%s' on %s", sql, keyStr);
            }
            connection = this.connectionFactory.getConnection();
            ps = connection.prepareStatement(sql);
            ps.setQueryTimeout(this.configuration.writeQueryTimeout());
            ps.setString(1, keyStr);
            return ps.executeUpdate() == 1;
        } catch (SQLException ex) {
            org.infinispan.persistence.jdbc.logging.Log.PERSISTENCE.sqlFailureRemovingKeys(ex);
            throw new PersistenceException("Error while removing string keys from database", (Throwable)ex);
        } finally {
            JdbcUtil.safeClose(ps);
            this.connectionFactory.releaseConnection(connection);
        }
    }

    /**
     * Deletes all rows that are expired at the current wall-clock time, inside one transaction.
     * <p>
     * Expired keys are selected, queued on a batched delete and, when a two-way key mapper
     * and a listener are both available, reported to {@code purgeListener}. A message is
     * logged when the mapper is not a {@link TwoWayKey2StringMapper}. On {@link SQLException}
     * the transaction is rolled back. The result set, statement and connection are released
     * in a {@code finally} block so they are returned on every exit path, including unchecked
     * exceptions thrown by the listener or the key mapper.
     *
     * @param executor      not used by this implementation
     * @param purgeListener notified of each purged key; may be {@code null}
     * @throws PersistenceException if the purge fails with a {@link SQLException}
     */
    public void purge(Executor executor, AdvancedCacheWriter.PurgeListener purgeListener) {
        Connection conn = null;
        PreparedStatement ps = null;
        ResultSet rs = null;
        try {
            String sql = this.tableManager.getSelectOnlyExpiredRowsSql();
            conn = this.connectionFactory.getConnection();
            conn.setAutoCommit(false);
            ps = conn.prepareStatement(sql);
            ps.setLong(1, this.timeService.wallClockTime());
            rs = ps.executeQuery();
            try (PreparedStatement batchDelete = conn.prepareStatement(this.tableManager.getDeleteRowSql())) {
                int affectedRows = 0;
                boolean twoWayMapperExists = this.key2StringMapper instanceof TwoWayKey2StringMapper;
                while (rs.next()) {
                    ++affectedRows;
                    String keyStr = rs.getString(2);
                    batchDelete.setString(1, keyStr);
                    batchDelete.addBatch();
                    // The original key can only be rebuilt for the listener with a two-way mapper.
                    if (twoWayMapperExists && purgeListener != null) {
                        Object key = ((TwoWayKey2StringMapper)this.key2StringMapper).getKeyMapping(keyStr);
                        purgeListener.entryPurged(key);
                    }
                }
                if (!twoWayMapperExists) {
                    org.infinispan.persistence.jdbc.logging.Log.PERSISTENCE.twoWayKey2StringMapperIsMissing(TwoWayKey2StringMapper.class.getSimpleName());
                }
                if (affectedRows > 0) {
                    int[] result = batchDelete.executeBatch();
                    if (log.isTraceEnabled()) {
                        log.tracef("Successfully purged %d rows.", result.length);
                    }
                }
                conn.commit();
            }
        } catch (SQLException e) {
            log.failedClearingJdbcCacheStore(e);
            try {
                conn.rollback();
            } catch (SQLException ex) {
                log.sqlFailureTxRollback(ex);
            }
            throw new PersistenceException("Failed clearing string based JDBC store", (Throwable)e);
        } finally {
            JdbcUtil.safeClose(rs);
            JdbcUtil.safeClose(ps);
            this.connectionFactory.releaseConnection(conn);
        }
    }

    /**
     * Segment-aware existence check; delegates to {@code contains(key)}.
     *
     * @param segment ignored by this implementation
     * @param key     the key to look up
     * @return the result of {@code contains(key)}
     */
    public boolean contains(int segment, Object key) {
        boolean present = this.contains(key);
        return present;
    }

    /**
     * Checks whether an entry can be loaded for {@code key}.
     *
     * @param key the key to look up
     * @return {@code true} when {@code loadEntry(key)} returns a non-null entry
     */
    public boolean contains(Object key) {
        MarshallableEntry<K, V> loaded = this.loadEntry(key);
        return loaded != null;
    }

    /**
     * Builds a flowable over the non-expired rows, optionally restricted to a set of segments.
     * <p>
     * A {@code FlowableConnection} is created for the chosen SQL as the resource of
     * {@code Flowable.using}; it is closed through {@code FlowableConnection::close}. The
     * query's first parameter is the current wall-clock time, followed by one parameter per
     * segment when {@code segments} is given. The result set is handed to {@code function}
     * and closed when the flowable it returns terminates.
     *
     * @param segments segments to restrict the query to, or {@code null} for all rows
     * @param function turns the open result set into the flowable of published items
     * @return the flowable produced by {@code function}, tied to the lifetime of the query resources
     */
    private <P> Flowable<P> publish(IntSet segments, Function<ResultSet, Flowable<P>> function) {
        return Flowable.using(() -> {
            // Choose the segment-restricted query only when a segment set was supplied.
            String sql = segments != null ? this.tableManager.getLoadNonExpiredRowsSqlForSegments(segments.size()) : this.tableManager.getLoadNonExpiredAllRowsSql();
            if (log.isTraceEnabled()) {
                log.tracef("Running sql %s", sql);
            }
            return new FlowableConnection(this.connectionFactory, sql);
        }, fc -> {
            PreparedStatement ps = fc.statement;
            int offset = 1;
            // Parameter 1 is the timestamp used to filter out expired rows.
            ps.setLong(offset, this.timeService.wallClockTime());
            if (segments != null) {
                PrimitiveIterator.OfInt segIter = segments.iterator();
                while (segIter.hasNext()) {
                    // Segment parameters follow the timestamp, starting at index 2.
                    ps.setInt(++offset, segIter.nextInt());
                }
            }
            ps.setFetchSize(this.tableManager.getFetchSize());
            ResultSet rs = ps.executeQuery();
            // Close the result set as soon as the downstream flowable terminates.
            return ((Flowable)function.apply(rs)).doFinally(() -> JdbcUtil.safeClose(rs));
        }, FlowableConnection::close);
    }

    /**
     * Publishes the keys of all non-expired rows, iterated through a
     * {@code ResultSetKeyIterator} built with {@code filter}.
     *
     * @param filter passed to the key iterator
     * @return a flowable of keys
     */
    public Flowable<K> publishKeys(Predicate<? super K> filter) {
        return this.publish(null, rs -> {
            return Flowable.fromIterable(() -> new ResultSetKeyIterator((ResultSet)rs, filter));
        });
    }

    /**
     * Publishes the keys of the non-expired rows in the given segments, iterated through a
     * {@code ResultSetKeyIterator} built with {@code filter}.
     *
     * @param segments segments to restrict the query to
     * @param filter   passed to the key iterator
     * @return a publisher of keys
     */
    public Publisher<K> publishKeys(IntSet segments, Predicate<? super K> filter) {
        return this.publish(segments, rs -> {
            return Flowable.fromIterable(() -> new ResultSetKeyIterator((ResultSet)rs, filter));
        });
    }

    /**
     * Publishes the entries of all non-expired rows, iterated through a
     * {@code ResultSetEntryIterator}.
     *
     * @param filter        passed to the entry iterator
     * @param fetchValue    passed to the entry iterator
     * @param fetchMetadata passed to the entry iterator
     * @return a flowable of entries
     */
    public Flowable<MarshallableEntry<K, V>> entryPublisher(Predicate<? super K> filter, boolean fetchValue, boolean fetchMetadata) {
        return this.publish(null, rs -> {
            return Flowable.fromIterable(() -> new ResultSetEntryIterator((ResultSet)rs, filter, fetchValue, fetchMetadata));
        });
    }

    /**
     * Publishes the entries of the non-expired rows in the given segments, iterated through
     * a {@code ResultSetEntryIterator}.
     *
     * @param segments      segments to restrict the query to
     * @param filter        passed to the entry iterator
     * @param fetchValue    passed to the entry iterator
     * @param fetchMetadata passed to the entry iterator
     * @return a publisher of entries
     */
    public Publisher<MarshallableEntry<K, V>> entryPublisher(IntSet segments, Predicate<? super K> filter, boolean fetchValue, boolean fetchMetadata) {
        return this.publish(segments, rs -> {
            return Flowable.fromIterable(() -> new ResultSetEntryIterator((ResultSet)rs, filter, fetchValue, fetchMetadata));
        });
    }

    /**
     * Applies a transaction's modifications on the connection bound to that transaction,
     * without committing.
     * <p>
     * Writes are batched through the upsert statement when upsert is supported, otherwise
     * each entry is written individually. Removals are always queued on a batched delete
     * statement. Auto-commit is disabled on the connection; the connection stays associated
     * with the transaction until {@code commit} or {@code rollback} is called.
     *
     * @param transaction       the transaction the modifications belong to
     * @param batchModification the entries to write and the keys to remove
     * @throws PersistenceException if preparing or executing the statements fails
     */
    public void prepareWithModifications(Transaction transaction, BatchModification batchModification) throws PersistenceException {
        try {
            Connection connection = this.getTxConnection(transaction);
            connection.setAutoCommit(false);
            boolean upsertSupported = this.tableManager.isUpsertSupported();
            try (PreparedStatement upsertBatch = upsertSupported ? connection.prepareStatement(this.tableManager.getUpsertRowSql()) : null;
                 PreparedStatement deleteBatch = connection.prepareStatement(this.tableManager.getDeleteRowSql());){
                for (MarshallableEntry entry : batchModification.getMarshallableEntries()) {
                    int segment = this.getSegment(entry);
                    if (upsertSupported) {
                        String keyStr = this.key2Str(entry.getKey());
                        this.prepareUpsertStatement(entry, keyStr, segment, upsertBatch);
                        upsertBatch.addBatch();
                        continue;
                    }
                    this.write(entry, connection, segment);
                }
                for (Object key : batchModification.getKeysToRemove()) {
                    String keyStr = this.key2Str(key);
                    deleteBatch.setString(1, keyStr);
                    deleteBatch.addBatch();
                }
                if (upsertSupported && !batchModification.getMarshallableEntries().isEmpty()) {
                    upsertBatch.executeBatch();
                }
                if (!batchModification.getKeysToRemove().isEmpty()) {
                    // Removals were queued with addBatch(); only executeBatch() submits every queued key.
                    deleteBatch.executeBatch();
                }
            }
        }
        catch (InterruptedException | SQLException e) {
            throw org.infinispan.persistence.jdbc.logging.Log.PERSISTENCE.prepareTxFailure(e);
        }
    }

    /**
     * Commits the JDBC connection bound to {@code tx}, then always releases that connection
     * and removes its association with the transaction.
     *
     * @param tx the transaction to commit
     * @throws PersistenceException if the JDBC commit fails
     */
    public void commit(Transaction tx) {
        try {
            this.getTxConnection(tx).commit();
        } catch (SQLException e) {
            org.infinispan.persistence.jdbc.logging.Log.PERSISTENCE.sqlFailureTxCommit(e);
            String message = String.format("Error during commit of JDBC transaction (%s)", tx);
            throw new PersistenceException(message, (Throwable)e);
        } finally {
            this.destroyTxConnection(tx);
        }
    }

    /**
     * Rolls back the JDBC connection bound to {@code tx}. Whatever the outcome, the connection is
     * unbound from the transaction and handed back to the connection factory.
     *
     * @param tx the transaction to roll back
     * @throws PersistenceException if the JDBC rollback fails
     */
    public void rollback(Transaction tx) {
        try {
            this.getTxConnection(tx).rollback();
        }
        catch (SQLException sqlFailure) {
            org.infinispan.persistence.jdbc.logging.Log.PERSISTENCE.sqlFailureTxRollback(sqlFailure);
            String message = String.format("Error during rollback of JDBC transaction (%s)", tx);
            throw new PersistenceException(message, (Throwable)sqlFailure);
        }
        finally {
            this.destroyTxConnection(tx);
        }
    }

    /**
     * Returns the connection registered for {@code tx}, obtaining one from the connection factory
     * and registering it when the transaction has none yet.
     *
     * @param tx the transaction used as the lookup key
     * @return the connection associated with the transaction
     */
    private Connection getTxConnection(Transaction tx) {
        Connection existing = this.transactionConnectionMap.get(tx);
        if (existing != null) {
            return existing;
        }
        Connection created = this.connectionFactory.getConnection();
        this.transactionConnectionMap.put(tx, created);
        return created;
    }

    /**
     * Unregisters the connection bound to {@code tx} and, if there was one, releases it through
     * the connection factory.
     *
     * @param tx the transaction whose connection should be released
     */
    private void destroyTxConnection(Transaction tx) {
        Connection released = this.transactionConnectionMap.remove(tx);
        if (released == null) {
            return;
        }
        this.connectionFactory.releaseConnection(released);
    }

    /**
     * Runs the table manager's "count non-expired rows" query, using the current wall-clock time
     * as its single parameter, and returns the first column of the result.
     *
     * @return the count reported by the query
     * @throws PersistenceException if the query fails with an {@link SQLException}
     */
    public int size() {
        Connection conn = null;
        PreparedStatement ps = null;
        ResultSet rs = null;
        try {
            conn = this.connectionFactory.getConnection();
            String sql = this.tableManager.getCountNonExpiredRowsSql();
            ps = conn.prepareStatement(sql);
            ps.setLong(1, this.timeService.wallClockTime());
            rs = ps.executeQuery();
            // Position on the first row; if there is none, getInt fails with an SQLException
            // that is wrapped below.
            rs.next();
            return rs.getInt(1);
        }
        catch (SQLException e) {
            org.infinispan.persistence.jdbc.logging.Log.PERSISTENCE.sqlFailureIntegratingState(e);
            throw new PersistenceException("SQL failure while integrating state into store", (Throwable)e);
        }
        finally {
            // Runs on every exit path, including unchecked exceptions, so nothing is leaked.
            JdbcUtil.safeClose(rs);
            JdbcUtil.safeClose(ps);
            if (conn != null) {
                this.connectionFactory.releaseConnection(conn);
            }
        }
    }

    /**
     * Runs the table manager's "count non-expired rows" query restricted to the given segments and
     * returns the first column of the result. The first statement parameter is the current
     * wall-clock time; the following parameters are the segment ids in iteration order.
     *
     * @param segments the segments to count rows for
     * @return the count reported by the query
     * @throws PersistenceException if the query fails with an {@link SQLException}
     */
    public int size(IntSet segments) {
        Connection conn = null;
        PreparedStatement ps = null;
        ResultSet rs = null;
        try {
            conn = this.connectionFactory.getConnection();
            String sql = this.tableManager.getCountNonExpiredRowsSqlForSegments(segments.size());
            ps = conn.prepareStatement(sql);
            int offset = 1;
            ps.setLong(offset, this.timeService.wallClockTime());
            PrimitiveIterator.OfInt segIter = segments.iterator();
            while (segIter.hasNext()) {
                ps.setInt(++offset, segIter.nextInt());
            }
            rs = ps.executeQuery();
            // Position on the first row; if there is none, getInt fails with an SQLException
            // that is wrapped below.
            rs.next();
            return rs.getInt(1);
        }
        catch (SQLException e) {
            org.infinispan.persistence.jdbc.logging.Log.PERSISTENCE.sqlFailureIntegratingState(e);
            throw new PersistenceException("SQL failure while integrating state into store", (Throwable)e);
        }
        finally {
            // Runs on every exit path, including unchecked exceptions, so nothing is leaked.
            JdbcUtil.safeClose(rs);
            JdbcUtil.safeClose(ps);
            if (conn != null) {
                this.connectionFactory.releaseConnection(conn);
            }
        }
    }

    /**
     * Binds {@code entry} to {@code ps} as an upsert by delegating to
     * {@link #prepareStatement(MarshallableEntry, String, int, PreparedStatement, boolean)} with
     * the upsert flag set.
     *
     * @param entry   the entry to bind
     * @param key     the string form of the entry's key
     * @param segment the segment the entry belongs to
     * @param ps      the statement to populate
     * @throws SQLException if binding the parameters fails
     */
    private void prepareUpsertStatement(MarshallableEntry entry, String key, int segment, PreparedStatement ps) throws SQLException {
        final boolean asUpsert = true;
        this.prepareStatement(entry, key, segment, ps, asUpsert);
    }

    /**
     * Marshalls the entry's value and lets the table manager bind it, together with the key,
     * expiry time and segment, to {@code ps}.
     *
     * @param entry   the entry to bind
     * @param key     the string form of the entry's key
     * @param segment the segment the entry belongs to
     * @param ps      the statement to populate
     * @param upsert  {@code true} to bind as an upsert, {@code false} to bind as an update
     * @throws SQLException if binding the parameters fails
     */
    private void prepareStatement(MarshallableEntry entry, String key, int segment, PreparedStatement ps, boolean upsert) throws SQLException {
        ByteBuffer payload = JdbcUtil.marshall(entry.getMarshalledValue(), (Marshaller)this.marshaller);
        if (!upsert) {
            this.tableManager.prepareUpdateStatement(ps, key, entry.expiryTime(), segment, payload);
            return;
        }
        this.tableManager.prepareUpsertStatement(ps, key, entry.expiryTime(), segment, payload);
    }

    /**
     * Converts {@code key} to its string form using the configured key-to-string mapper, and
     * additionally passes it through the table manager's encoder when the table manager reports
     * that string encoding is required.
     *
     * @param key the cache key to convert
     * @return the (possibly encoded) string form of the key
     * @throws UnsupportedKeyTypeException if the mapper does not support the key's class
     */
    private String key2Str(Object key) throws PersistenceException {
        boolean supported = this.key2StringMapper.isSupportedType(key.getClass());
        if (!supported) {
            throw new UnsupportedKeyTypeException(key);
        }
        String mapped = this.key2StringMapper.getStringMapping(key);
        if (this.tableManager.isStringEncodingRequired()) {
            return this.tableManager.encodeString(mapped);
        }
        return mapped;
    }

    /**
     * Returns the table manager, creating it through {@link TableManagerFactory} on first use.
     * {@code cacheName} is only consulted when the manager has not been created yet.
     *
     * @param cacheName the cache name passed to the factory on first creation
     * @return the table manager held by this store
     */
    public TableManager getTableManager(String cacheName) {
        if (this.tableManager != null) {
            return this.tableManager;
        }
        this.tableManager = TableManagerFactory.getManager(this.ctx, this.connectionFactory, this.configuration, cacheName);
        return this.tableManager;
    }

    /**
     * Verifies that the configured key-to-string mapper is a {@link TwoWayKey2StringMapper};
     * otherwise logs the problem and fails.
     *
     * @param where a description of the calling operation, passed to the log message
     * @throws PersistenceException if the mapper is not a two-way mapper
     */
    private void enforceTwoWayMapper(String where) throws PersistenceException {
        if (this.key2StringMapper instanceof TwoWayKey2StringMapper) {
            return;
        }
        String mapperClassName = this.key2StringMapper.getClass().getName();
        org.infinispan.persistence.jdbc.logging.Log.PERSISTENCE.invalidKey2StringMapper(where, mapperClassName);
        throw new PersistenceException(String.format("Invalid key to string mapper : %s", mapperClassName));
    }

    /**
     * Iterator that walks a {@link ResultSet}, maps column 2 of each row back to a key through the
     * store's two-way key mapper, and yields the keys accepted by an optional filter.
     */
    private class ResultSetKeyIterator
    extends AbstractIterator<K> {
        private final ResultSet rs;
        private final Predicate<? super K> filter;

        /**
         * @param rs     the result set to read keys from
         * @param filter predicate a key must satisfy to be returned, or {@code null} to accept all
         */
        public ResultSetKeyIterator(ResultSet rs, Predicate<? super K> filter) {
            this.rs = rs;
            this.filter = filter;
        }

        /**
         * Advances the result set until a non-null key passing the filter is found.
         *
         * @return the next accepted key, or {@code null} once the result set is exhausted
         * @throws CacheException if reading from the result set fails
         */
        @SuppressWarnings("unchecked")
        protected K getNext() {
            try {
                while (this.rs.next()) {
                    String encodedKey = this.rs.getString(2);
                    // Unchecked: the mapper hands back a plain Object.
                    K candidate = (K)((TwoWayKey2StringMapper)JdbcStringBasedStore.this.key2StringMapper).getKeyMapping(encodedKey);
                    boolean rejected = this.filter != null && !this.filter.test(candidate);
                    // Rows that map to a null key are skipped rather than ending the iteration.
                    if (!rejected && candidate != null) {
                        return candidate;
                    }
                }
            }
            catch (SQLException e) {
                throw new CacheException((Throwable)e);
            }
            return null;
        }
    }

    /**
     * Iterator that walks a {@link ResultSet} and turns each row whose key passes an optional
     * filter into a {@code MarshallableEntry}. The key is read from column 2; the marshalled value
     * is read from column 1 only when values or metadata were requested.
     */
    private class ResultSetEntryIterator
    extends AbstractIterator<MarshallableEntry<K, V>> {
        private final ResultSet rs;
        private final Predicate<? super K> filter;
        private final boolean fetchValue;
        private final boolean fetchMetadata;

        /**
         * @param rs            the result set to read rows from
         * @param filter        predicate a key must satisfy, or {@code null} to accept all keys
         * @param fetchValue    whether the value bytes are included in the produced entries
         * @param fetchMetadata whether the metadata bytes are included in the produced entries
         */
        ResultSetEntryIterator(ResultSet rs, Predicate<? super K> filter, boolean fetchValue, boolean fetchMetadata) {
            this.rs = rs;
            this.filter = filter;
            this.fetchValue = fetchValue;
            this.fetchMetadata = fetchMetadata;
        }

        /**
         * Advances the result set to the next row whose key passes the filter.
         *
         * @return the entry built from that row, or {@code null} once the result set is exhausted
         * @throws CacheException if reading from the result set fails
         */
        @SuppressWarnings("unchecked")
        protected MarshallableEntry<K, V> getNext() {
            try {
                while (this.rs.next()) {
                    String encodedKey = this.rs.getString(2);
                    Object key = ((TwoWayKey2StringMapper)JdbcStringBasedStore.this.key2StringMapper).getKeyMapping(encodedKey);
                    // Unchecked: the mapper hands back a plain Object.
                    if (this.filter == null || this.filter.test((K)key)) {
                        return this.entryForCurrentRow(key);
                    }
                }
                return null;
            }
            catch (SQLException e) {
                throw new CacheException((Throwable)e);
            }
        }

        /**
         * Builds the entry for the row the result set is currently positioned on.
         *
         * @param key the already-decoded key of the current row
         * @return a key-only entry when neither value nor metadata was requested, otherwise an
         *         entry populated from the unmarshalled column 1
         * @throws SQLException if column 1 cannot be read
         */
        private MarshallableEntry<K, V> entryForCurrentRow(Object key) throws SQLException {
            if (!this.fetchValue && !this.fetchMetadata) {
                return JdbcStringBasedStore.this.marshalledEntryFactory.create(key);
            }
            InputStream inputStream = this.rs.getBinaryStream(1);
            MarshalledValue value = (MarshalledValue)JdbcUtil.unmarshall(inputStream, (StreamAwareMarshaller)JdbcStringBasedStore.this.marshaller);
            return JdbcStringBasedStore.this.marshalledEntryFactory.create(key, this.fetchValue ? value.getValueBytes() : null, this.fetchMetadata ? value.getMetadataBytes() : null, value.getInternalMetadataBytes(), value.getCreated(), value.getLastUsed());
        }
    }

    /**
     * Bundles a connection obtained from a {@link ConnectionFactory} with a statement prepared on
     * it. If the connection was in auto-commit mode it is switched to manual commit for the
     * lifetime of this object, and {@link #close()} rolls it back before releasing it.
     */
    static class FlowableConnection {
        /** Auto-commit mode the connection reported when it was obtained. */
        final boolean autoCommit;
        /** Factory the connection came from and is released to. */
        final ConnectionFactory factory;
        final Connection connection;
        /** Statement prepared from the SQL passed to the constructor. */
        final PreparedStatement statement;

        /**
         * Obtains a connection from {@code factory} and prepares {@code sql} on it.
         *
         * @param factory the factory to obtain the connection from
         * @param sql     the SQL to prepare
         * @throws SQLException if querying or changing auto-commit, or preparing the statement,
         *                      fails; the connection is released back to the factory first
         */
        FlowableConnection(ConnectionFactory factory, String sql) throws SQLException {
            this.factory = factory;
            this.connection = factory.getConnection();
            boolean initialAutoCommit;
            PreparedStatement prepared = null;
            try {
                initialAutoCommit = this.connection.getAutoCommit();
                prepared = this.connection.prepareStatement(sql);
                if (initialAutoCommit) {
                    this.connection.setAutoCommit(false);
                }
            }
            catch (SQLException | RuntimeException e) {
                // Nobody can call close() on a half-built instance, so release what was
                // acquired here before propagating the failure.
                JdbcUtil.safeClose(prepared);
                factory.releaseConnection(this.connection);
                throw e;
            }
            this.autoCommit = initialAutoCommit;
            this.statement = prepared;
        }

        /**
         * Closes the statement, rolls the connection back if auto-commit was disabled by the
         * constructor, and releases the connection to the factory. A rollback failure is logged
         * and does not prevent the release.
         */
        void close() {
            JdbcUtil.safeClose(this.statement);
            if (this.autoCommit) {
                try {
                    this.connection.rollback();
                }
                catch (SQLException e) {
                    org.infinispan.persistence.jdbc.logging.Log.PERSISTENCE.sqlFailureTxRollback(e);
                }
            }
            this.factory.releaseConnection(this.connection);
        }
    }
}

