/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.persistence.rocksdb;

import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Maybe;
import io.reactivex.rxjava3.processors.UnicastProcessor;
import java.io.File;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.nio.file.Path;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.infinispan.AdvancedCache;
import org.infinispan.commons.CacheConfigurationException;
import org.infinispan.commons.configuration.ConfiguredBy;
import org.infinispan.commons.io.ByteBuffer;
import org.infinispan.commons.marshall.MarshallUtil;
import org.infinispan.commons.marshall.Marshaller;
import org.infinispan.commons.reactive.RxJavaInterop;
import org.infinispan.commons.time.TimeService;
import org.infinispan.commons.util.AbstractIterator;
import org.infinispan.commons.util.IntSet;
import org.infinispan.commons.util.IntSets;
import org.infinispan.commons.util.Util;
import org.infinispan.configuration.global.GlobalConfiguration;
import org.infinispan.distribution.ch.KeyPartitioner;
import org.infinispan.marshall.persistence.impl.MarshallableEntryImpl;
import org.infinispan.metadata.Metadata;
import org.infinispan.persistence.PersistenceUtil;
import org.infinispan.persistence.rocksdb.PersistenceContextInitializerImpl;
import org.infinispan.persistence.rocksdb.configuration.RocksDBStoreConfiguration;
import org.infinispan.persistence.rocksdb.logging.Log;
import org.infinispan.persistence.spi.InitializationContext;
import org.infinispan.persistence.spi.MarshallableEntry;
import org.infinispan.persistence.spi.MarshallableEntryFactory;
import org.infinispan.persistence.spi.MarshalledValue;
import org.infinispan.persistence.spi.NonBlockingStore;
import org.infinispan.persistence.spi.PersistenceException;
import org.infinispan.protostream.SerializationContextInitializer;
import org.infinispan.protostream.annotations.ProtoField;
import org.infinispan.protostream.annotations.ProtoTypeId;
import org.infinispan.util.concurrent.BlockingManager;
import org.infinispan.util.concurrent.CompletableFutures;
import org.infinispan.util.logging.LogFactory;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.rocksdb.BuiltinComparator;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.CompressionType;
import org.rocksdb.DBOptions;
import org.rocksdb.Options;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;

@ConfiguredBy(value=RocksDBStoreConfiguration.class)
public class RocksDBStore<K, V>
implements NonBlockingStore<K, V> {
    private static final Log log = (Log)LogFactory.getLog(MethodHandles.lookup().lookupClass(), Log.class);
    private static final boolean trace = log.isTraceEnabled();
    static final String DATABASE_PROPERTY_NAME_WITH_SUFFIX = "database.";
    static final String COLUMN_FAMILY_PROPERTY_NAME_WITH_SUFFIX = "data.";
    protected RocksDBStoreConfiguration configuration;
    private RocksDB db;
    private RocksDB expiredDb;
    private InitializationContext ctx;
    private TimeService timeService;
    private WriteOptions dataWriteOptions;
    private RocksDBHandler handler;
    private Properties databaseProperties;
    private Properties columnFamilyProperties;
    private Marshaller marshaller;
    private KeyPartitioner keyPartitioner;
    private MarshallableEntryFactory<K, V> entryFactory;
    private BlockingManager blockingManager;

    public CompletionStage<Void> start(InitializationContext ctx) {
        this.configuration = (RocksDBStoreConfiguration)ctx.getConfiguration();
        this.ctx = ctx;
        this.timeService = ctx.getTimeService();
        this.marshaller = ctx.getPersistenceMarshaller();
        this.entryFactory = ctx.getMarshallableEntryFactory();
        this.blockingManager = ctx.getBlockingManager();
        this.keyPartitioner = ctx.getKeyPartitioner();
        ctx.getPersistenceMarshaller().register((SerializationContextInitializer)new PersistenceContextInitializerImpl());
        AdvancedCache cache = ctx.getCache().getAdvancedCache();
        this.handler = this.configuration.segmented() ? new SegmentedRocksDBHandler(cache.getCacheConfiguration().clustering().hash().numSegments()) : new NonSegmentedRocksDBHandler(this.keyPartitioner);
        Properties allProperties = this.configuration.properties();
        for (Map.Entry<Object, Object> entry : allProperties.entrySet()) {
            String key = entry.getKey().toString();
            if (key.startsWith(DATABASE_PROPERTY_NAME_WITH_SUFFIX)) {
                if (this.databaseProperties == null) {
                    this.databaseProperties = new Properties();
                }
                this.databaseProperties.setProperty(key.substring(DATABASE_PROPERTY_NAME_WITH_SUFFIX.length()), entry.getValue().toString());
                continue;
            }
            if (!key.startsWith(COLUMN_FAMILY_PROPERTY_NAME_WITH_SUFFIX)) continue;
            if (this.columnFamilyProperties == null) {
                this.columnFamilyProperties = new Properties();
            }
            this.columnFamilyProperties.setProperty(key.substring(COLUMN_FAMILY_PROPERTY_NAME_WITH_SUFFIX.length()), entry.getValue().toString());
        }
        return this.blockingManager.runBlocking(() -> {
            try {
                this.db = this.handler.open(this.getLocation(), this.dataDbOptions());
                this.expiredDb = this.openDatabase(this.getExpirationLocation(), this.expiredDbOptions());
            }
            catch (Exception e) {
                throw new CacheConfigurationException("Unable to open database", (Throwable)e);
            }
        }, (Object)"rocksdb-open");
    }

    private Path getLocation() {
        return PersistenceUtil.getQualifiedLocation((GlobalConfiguration)this.ctx.getGlobalConfiguration(), (String)this.configuration.location(), (String)this.ctx.getCache().getName(), (String)"data");
    }

    private Path getExpirationLocation() {
        return PersistenceUtil.getQualifiedLocation((GlobalConfiguration)this.ctx.getGlobalConfiguration(), (String)this.configuration.expiredLocation(), (String)this.ctx.getCache().getName(), (String)"expired");
    }

    private WriteOptions dataWriteOptions() {
        if (this.dataWriteOptions == null) {
            this.dataWriteOptions = new WriteOptions().setDisableWAL(false);
        }
        return this.dataWriteOptions;
    }

    protected DBOptions dataDbOptions() {
        DBOptions dbOptions;
        if (this.databaseProperties != null) {
            dbOptions = DBOptions.getDBOptionsFromProps((Properties)this.databaseProperties);
            if (dbOptions == null) {
                throw log.rocksDBUnknownPropertiesSupplied(this.databaseProperties.toString());
            }
        } else {
            dbOptions = new DBOptions();
        }
        return dbOptions.setCreateIfMissing(true).setCreateMissingColumnFamilies(true);
    }

    protected Options expiredDbOptions() {
        return new Options().setCreateIfMissing(true).setComparator(BuiltinComparator.BYTEWISE_COMPARATOR);
    }

    protected RocksDB openDatabase(Path location, Options options) throws RocksDBException {
        File dir = location.toFile();
        dir.mkdirs();
        return RocksDB.open((Options)options, (String)location.toString());
    }

    public CompletionStage<Void> stop() {
        return this.blockingManager.runBlocking(() -> {
            this.handler.close();
            this.expiredDb.close();
        }, (Object)"rocksdb-stop");
    }

    public Set<NonBlockingStore.Characteristic> characteristics() {
        return EnumSet.of(NonBlockingStore.Characteristic.BULK_READ, NonBlockingStore.Characteristic.EXPIRATION, NonBlockingStore.Characteristic.SEGMENTABLE);
    }

    public CompletionStage<Boolean> isAvailable() {
        return this.blockingManager.supplyBlocking(() -> this.getLocation().toFile().exists() && this.getExpirationLocation().toFile().exists(), (Object)"rocksdb-available");
    }

    public CompletionStage<Void> clear() {
        return this.handler.clear();
    }

    public CompletionStage<Long> size(IntSet segments) {
        return this.handler.size(segments);
    }

    public CompletionStage<Long> approximateSize(IntSet segments) {
        return this.handler.approximateSize(segments);
    }

    public CompletionStage<Boolean> containsKey(int segment, Object key) {
        return this.load(segment, key).thenApply(Objects::nonNull);
    }

    public Publisher<K> publishKeys(IntSet segments, Predicate<? super K> filter) {
        return Flowable.fromPublisher(this.handler.publishEntries(segments, filter, false)).map(MarshallableEntry::getKey);
    }

    public Publisher<MarshallableEntry<K, V>> publishEntries(IntSet segments, Predicate<? super K> filter, boolean includeValues) {
        return this.handler.publishEntries(segments, filter, includeValues);
    }

    public CompletionStage<Boolean> delete(int segment, Object key) {
        return this.handler.delete(segment, key);
    }

    public CompletionStage<Void> write(int segment, MarshallableEntry<? extends K, ? extends V> entry) {
        return this.handler.write(segment, entry);
    }

    public CompletionStage<MarshallableEntry<K, V>> load(int segment, Object key) {
        return this.handler.load(segment, key);
    }

    public CompletionStage<Void> batch(int publisherCount, Publisher<NonBlockingStore.SegmentedPublisher<Object>> removePublisher, Publisher<NonBlockingStore.SegmentedPublisher<MarshallableEntry<K, V>>> writePublisher) {
        WriteBatch batch = new WriteBatch();
        HashSet expirableEntries = new HashSet();
        Flowable.fromPublisher(removePublisher).subscribe(sp -> {
            ColumnFamilyHandle handle = this.handler.getHandle(sp.getSegment());
            Flowable.fromPublisher((Publisher)sp).subscribe(removed -> batch.delete(handle, this.marshall(removed)));
        });
        Flowable.fromPublisher(writePublisher).subscribe(sp -> {
            ColumnFamilyHandle handle = this.handler.getHandle(sp.getSegment());
            Flowable.fromPublisher((Publisher)sp).subscribe(me -> {
                batch.put(handle, this.marshall(me.getKey()), this.marshall(me.getMarshalledValue()));
                if (me.expiryTime() > -1L) {
                    expirableEntries.add(me);
                }
            });
        });
        if (batch.count() <= 0) {
            batch.close();
            return CompletableFutures.completedNull();
        }
        return this.blockingManager.runBlocking(() -> {
            try {
                this.db.write(this.dataWriteOptions(), batch);
                for (MarshallableEntry me : expirableEntries) {
                    this.addNewExpiry(me);
                }
            }
            catch (RocksDBException e) {
                throw new PersistenceException((Throwable)e);
            }
        }, (Object)"rocksdb-batch").whenComplete((ignore, t) -> batch.close());
    }

    public Publisher<MarshallableEntry<K, V>> purgeExpired() {
        Publisher purgedBatches = this.blockingManager.blockingPublisher((Publisher)Flowable.defer(() -> {
            long now = this.timeService.wallClockTime();
            return this.actualPurgeExpired(now).buffer(16);
        }));
        return Flowable.fromPublisher((Publisher)purgedBatches).concatMap(Flowable::fromIterable);
    }

    private Flowable<MarshallableEntry<K, V>> actualPurgeExpired(final long now) {
        Flowable expiredFlowable = Flowable.using(() -> {
            ReadOptions readOptions = new ReadOptions().setFillCache(false);
            return new AbstractMap.SimpleImmutableEntry<ReadOptions, RocksIterator>(readOptions, this.expiredDb.newIterator(readOptions));
        }, entry -> {
            if (entry.getValue() == null) {
                return Flowable.empty();
            }
            final RocksIterator iterator = (RocksIterator)entry.getValue();
            iterator.seekToFirst();
            return Flowable.fromIterable(() -> new AbstractIterator<byte[]>(){

                protected byte[] getNext() {
                    if (!iterator.isValid()) {
                        return null;
                    }
                    byte[] keyBytes = iterator.key();
                    Long time = (Long)RocksDBStore.this.unmarshall(keyBytes);
                    if (time > now) {
                        return null;
                    }
                    try {
                        RocksDBStore.this.expiredDb.delete(keyBytes);
                    }
                    catch (RocksDBException e) {
                        throw new PersistenceException((Throwable)e);
                    }
                    byte[] value = iterator.value();
                    iterator.next();
                    return value;
                }
            });
        }, entry -> {
            ((ReadOptions)entry.getKey()).close();
            RocksIterator rocksIterator = (RocksIterator)entry.getValue();
            if (rocksIterator != null) {
                rocksIterator.close();
            }
        });
        Flowable expiredEntryFlowable = expiredFlowable.flatMap(expiredBytes -> {
            Object bucketKey = this.unmarshall((byte[])expiredBytes);
            if (bucketKey instanceof ExpiryBucket) {
                return Flowable.fromIterable(((ExpiryBucket)bucketKey).entries).flatMapMaybe(marshalledKey -> {
                    ColumnFamilyHandle columnFamilyHandle = this.handler.getHandleForMarshalledKey((byte[])marshalledKey);
                    MarshalledValue mv = this.handlePossiblyExpiredKey(columnFamilyHandle, (byte[])marshalledKey, now);
                    return mv == null ? Maybe.empty() : Maybe.just((Object)this.entryFactory.create(this.unmarshall((byte[])marshalledKey), mv));
                });
            }
            ColumnFamilyHandle columnFamilyHandle = this.handler.getHandle(bucketKey);
            MarshalledValue mv = this.handlePossiblyExpiredKey(columnFamilyHandle, this.marshall(bucketKey), now);
            return mv == null ? Flowable.empty() : Flowable.just((Object)this.entryFactory.create(bucketKey, mv));
        });
        if (trace) {
            UnicastProcessor mirrorEntries = UnicastProcessor.create();
            expiredEntryFlowable = expiredEntryFlowable.doOnEach((Subscriber)mirrorEntries).doOnSubscribe(subscription -> log.tracef("Purging entries from RocksDBStore", new Object[0]));
            mirrorEntries.count().subscribe(count -> log.tracef("Purged %d entries from RocksDBStore", new Object[0]));
        }
        return expiredEntryFlowable;
    }

    private MarshalledValue handlePossiblyExpiredKey(ColumnFamilyHandle columnFamilyHandle, byte[] marshalledKey, long now) throws RocksDBException {
        Metadata metadata;
        byte[] valueBytes = this.db.get(columnFamilyHandle, marshalledKey);
        if (valueBytes == null) {
            return null;
        }
        MarshalledValue mv = (MarshalledValue)this.unmarshall(valueBytes);
        if (mv != null && MarshallableEntryImpl.isExpired((Metadata)(metadata = (Metadata)this.unmarshall(MarshallUtil.toByteArray((ByteBuffer)mv.getMetadataBytes()))), (long)now, (long)mv.getCreated(), (long)mv.getLastUsed())) {
            this.db.delete(columnFamilyHandle, marshalledKey);
            return mv;
        }
        return null;
    }

    public CompletionStage<Void> addSegments(IntSet segments) {
        return this.handler.addSegments(segments);
    }

    public CompletionStage<Void> removeSegments(IntSet segments) {
        return this.handler.removeSegments(segments);
    }

    private byte[] marshall(Object entry) {
        try {
            return this.marshaller.objectToByteBuffer(entry);
        }
        catch (IOException e) {
            throw new PersistenceException((Throwable)e);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new PersistenceException((Throwable)e);
        }
    }

    private <E> E unmarshall(byte[] bytes) {
        if (bytes == null) {
            return null;
        }
        try {
            return (E)this.marshaller.objectFromByteBuffer(bytes);
        }
        catch (IOException | ClassNotFoundException e) {
            throw new PersistenceException((Throwable)e);
        }
    }

    private MarshallableEntry<K, V> unmarshallEntry(Object key, byte[] valueBytes) {
        MarshalledValue value = (MarshalledValue)this.unmarshall(valueBytes);
        if (value == null) {
            return null;
        }
        return this.entryFactory.create(key, value.getValueBytes(), value.getMetadataBytes(), value.getInternalMetadataBytes(), value.getCreated(), value.getLastUsed());
    }

    private void addNewExpiry(MarshallableEntry entry) throws RocksDBException {
        long expiry = entry.expiryTime();
        long maxIdle = entry.getMetadata().maxIdle();
        if (maxIdle > 0L) {
            expiry = maxIdle + this.ctx.getTimeService().wallClockTime();
        }
        byte[] keyBytes = entry.getKeyBytes().copy().getBuf();
        this.putExpireDbData(new ExpiryEntry(expiry, keyBytes));
    }

    private void putExpireDbData(ExpiryEntry entry) throws RocksDBException {
        byte[] expiryBytes = this.marshall(entry.expiry);
        byte[] existingBytes = this.expiredDb.get(expiryBytes);
        if (existingBytes != null) {
            Object existing = this.unmarshall(existingBytes);
            if (existing instanceof ExpiryBucket) {
                ((ExpiryBucket)existing).entries.add(entry.keyBytes);
                this.expiredDb.put(expiryBytes, this.marshall(existing));
            } else {
                ExpiryBucket bucket = new ExpiryBucket(existingBytes, entry.keyBytes);
                this.expiredDb.put(expiryBytes, this.marshall(bucket));
            }
        } else {
            this.expiredDb.put(expiryBytes, entry.keyBytes);
        }
    }

    private class SegmentedRocksDBHandler
    extends RocksDBHandler {
        private final AtomicReferenceArray<ColumnFamilyHandle> handles;

        private SegmentedRocksDBHandler(int segmentCount) {
            this.handles = new AtomicReferenceArray(segmentCount);
        }

        byte[] byteArrayFromInt(int val) {
            return new byte[]{(byte)(val >>> 24), (byte)(val >>> 16), (byte)(val >>> 8), (byte)val};
        }

        @Override
        ColumnFamilyHandle getHandle(int segment) {
            return this.handles.get(segment);
        }

        @Override
        ColumnFamilyHandle getHandle(Object key) {
            return this.handles.get(RocksDBStore.this.keyPartitioner.getSegment(key));
        }

        @Override
        ColumnFamilyHandle getHandleForMarshalledKey(byte[] marshalledKey) {
            return this.getHandle(RocksDBStore.this.unmarshall(marshalledKey));
        }

        @Override
        RocksDB open(Path location, DBOptions options) throws RocksDBException {
            File dir = location.toFile();
            dir.mkdirs();
            int segmentCount = this.handles.length();
            ArrayList<ColumnFamilyDescriptor> descriptors = new ArrayList<ColumnFamilyDescriptor>(segmentCount + 1);
            ArrayList outHandles = new ArrayList(segmentCount + 1);
            descriptors.add(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, new ColumnFamilyOptions()));
            for (int i = 0; i < segmentCount; ++i) {
                descriptors.add(this.newDescriptor(this.byteArrayFromInt(i)));
            }
            RocksDB rocksDB = RocksDB.open((DBOptions)options, (String)location.toString(), descriptors, outHandles);
            for (int i = 0; i < segmentCount; ++i) {
                this.handles.set(i, (ColumnFamilyHandle)outHandles.get(i + 1));
            }
            return rocksDB;
        }

        @Override
        CompletionStage<Void> clear() {
            return RocksDBStore.this.blockingManager.runBlocking(() -> {
                for (int i = 0; i < this.handles.length(); ++i) {
                    if (this.clearForSegment(i)) continue;
                    this.recreateColumnFamily(i);
                }
            }, (Object)"rocksdb-clear");
        }

        /*
         * Exception decompiling
         */
        private boolean clearForSegment(int segment) {
            /*
             * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
             * 
             * org.benf.cfr.reader.util.ConfusedCFRException: Tried to end blocks [3[TRYBLOCK]], but top level block is 19[WHILELOOP]
             *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.processEndingBlocks(Op04StructuredStatement.java:435)
             *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.buildNestedBlocks(Op04StructuredStatement.java:484)
             *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.createInitialStructuredBlock(Op03SimpleStatement.java:736)
             *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:850)
             *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
             *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
             *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
             *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
             *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
             *     at org.benf.cfr.reader.entities.ClassFile.analyseInnerClassesPass1(ClassFile.java:923)
             *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1035)
             *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
             *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
             *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
             *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
             *     at org.benf.cfr.reader.Main.main(Main.java:54)
             */
            throw new IllegalStateException("Decompilation failed");
        }

        @Override
        void close() {
            for (int i = 0; i < this.handles.length(); ++i) {
                ColumnFamilyHandle handle = this.handles.getAndSet(i, null);
                if (handle == null) continue;
                handle.close();
            }
            RocksDBStore.this.db.close();
        }

        private void recreateColumnFamily(int segment) {
            ColumnFamilyHandle handle = this.handles.get(segment);
            if (handle != null) {
                try {
                    RocksDBStore.this.db.dropColumnFamily(handle);
                    handle = RocksDBStore.this.db.createColumnFamily(this.newDescriptor(this.byteArrayFromInt(segment)));
                    this.handles.set(segment, handle);
                }
                catch (RocksDBException e) {
                    throw new PersistenceException((Throwable)e);
                }
            }
        }

        @Override
        Publisher<MarshallableEntry<K, V>> publishEntries(IntSet segments, Predicate<? super K> filter, boolean fetchValue) {
            Function function = it -> Flowable.fromIterable(() -> {
                long now = RocksDBStore.this.timeService.wallClockTime();
                return new RocksEntryIterator((RocksIterator)it, filter, now);
            });
            return this.handleIteratorFunction(function, segments);
        }

        @Override
        CompletionStage<Long> approximateSize(IntSet segments) {
            return RocksDBStore.this.blockingManager.subscribeBlockingCollector((Publisher)Flowable.fromIterable((Iterable)segments), Collectors.summingLong(segment -> {
                ColumnFamilyHandle handle = this.getHandle(segment);
                try {
                    return Long.parseLong(RocksDBStore.this.db.getProperty(handle, "rocksdb.estimate-num-keys"));
                }
                catch (RocksDBException e) {
                    throw new PersistenceException((Throwable)e);
                }
            }), (Object)"rocksdb-approximateSize");
        }

        <R> Publisher<R> handleIteratorFunction(Function<RocksIterator, Flowable<R>> function, IntSet segments) {
            if (segments != null && segments.size() == 1) {
                return this.publish(segments.iterator().nextInt(), function);
            }
            IntSet segmentsToUse = segments == null ? IntSets.immutableRangeSet((int)this.handles.length()) : segments;
            return Flowable.fromStream(segmentsToUse.intStream().mapToObj(i -> this.publish(i, function))).concatMap(RxJavaInterop.identityFunction());
        }

        @Override
        RocksIterator wrapIterator(RocksDB db, ReadOptions readOptions, int segment) {
            ColumnFamilyHandle handle = this.handles.get(segment);
            if (handle != null) {
                return db.newIterator(handle, readOptions);
            }
            return null;
        }

        @Override
        CompletionStage<Void> addSegments(IntSet segments) {
            Flowable segmentFlowable = Flowable.fromIterable((Iterable)segments).filter(segment -> this.handles.get((int)segment) == null);
            return RocksDBStore.this.blockingManager.subscribeBlockingConsumer((Publisher)segmentFlowable, segment -> {
                if (trace) {
                    log.tracef("Creating column family for segment %d", segment);
                }
                byte[] cfName = this.byteArrayFromInt((int)segment);
                try {
                    ColumnFamilyHandle handle = RocksDBStore.this.db.createColumnFamily(this.newDescriptor(cfName));
                    this.handles.set((int)segment, handle);
                }
                catch (RocksDBException e) {
                    throw new PersistenceException((Throwable)e);
                }
            }, (Object)"testng-addSegments");
        }

        @Override
        CompletionStage<Void> removeSegments(IntSet segments) {
            Flowable handleFlowable = Flowable.fromIterable((Iterable)segments).map(segment -> {
                ColumnFamilyHandle cf = this.handles.getAndSet((int)segment, (ColumnFamilyHandle)null);
                return cf != null ? cf : this;
            }).ofType(ColumnFamilyHandle.class);
            return RocksDBStore.this.blockingManager.subscribeBlockingConsumer((Publisher)handleFlowable, handle -> {
                if (trace) {
                    log.tracef("Dropping column family %s", handle);
                }
                try {
                    RocksDBStore.this.db.dropColumnFamily(handle);
                }
                catch (RocksDBException e) {
                    throw new PersistenceException((Throwable)e);
                }
                handle.close();
            }, (Object)"testng-removeSegments");
        }
    }

    private final class NonSegmentedRocksDBHandler
    extends RocksDBHandler {
        private final KeyPartitioner keyPartitioner;
        private ColumnFamilyHandle defaultColumnFamilyHandle;

        private NonSegmentedRocksDBHandler(KeyPartitioner keyPartitioner) {
            this.keyPartitioner = keyPartitioner;
        }

        @Override
        ColumnFamilyHandle getHandle(int segment) {
            return this.defaultColumnFamilyHandle;
        }

        @Override
        ColumnFamilyHandle getHandle(Object key) {
            return this.defaultColumnFamilyHandle;
        }

        @Override
        ColumnFamilyHandle getHandleForMarshalledKey(byte[] marshalledKey) {
            return this.defaultColumnFamilyHandle;
        }

        @Override
        RocksDB open(Path location, DBOptions options) throws RocksDBException {
            File dir = location.toFile();
            dir.mkdirs();
            ArrayList handles = new ArrayList(1);
            RocksDB rocksDB = RocksDB.open((DBOptions)options, (String)location.toString(), Collections.singletonList(this.newDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY)), handles);
            this.defaultColumnFamilyHandle = (ColumnFamilyHandle)handles.get(0);
            return rocksDB;
        }

        @Override
        CompletionStage<Void> clear() {
            return this.clear(null);
        }

        CompletionStage<Void> clear(IntSet segments) {
            return RocksDBStore.this.blockingManager.runBlocking(() -> {
                boolean destroyDatabase;
                block26: {
                    long count = 0L;
                    destroyDatabase = false;
                    try (ReadOptions readOptions = new ReadOptions().setFillCache(false);){
                        RocksIterator optionalIterator = this.wrapIterator(RocksDBStore.this.db, readOptions, -1);
                        if (optionalIterator != null && (RocksDBStore.this.configuration.clearThreshold() > 0 || segments == null)) {
                            try (RocksIterator it = optionalIterator;){
                                it.seekToFirst();
                                while (it.isValid()) {
                                    byte[] keyBytes = it.key();
                                    if (segments != null) {
                                        Object key = RocksDBStore.this.unmarshall(keyBytes);
                                        int segment = this.keyPartitioner.getSegment(key);
                                        if (segments.contains(segment)) {
                                            RocksDBStore.this.db.delete(this.defaultColumnFamilyHandle, keyBytes);
                                        }
                                    } else {
                                        RocksDBStore.this.db.delete(this.defaultColumnFamilyHandle, keyBytes);
                                        if (++count > (long)RocksDBStore.this.configuration.clearThreshold()) {
                                            destroyDatabase = true;
                                            break block26;
                                        }
                                    }
                                    it.next();
                                }
                                break block26;
                            }
                            catch (RocksDBException e) {
                                if (segments != null) {
                                    throw e;
                                }
                                destroyDatabase = true;
                            }
                            break block26;
                        }
                        destroyDatabase = true;
                    }
                    catch (Exception e) {
                        throw new PersistenceException((Throwable)e);
                    }
                }
                if (destroyDatabase) {
                    try {
                        this.reinitAllDatabases();
                    }
                    catch (Exception e) {
                        throw new PersistenceException((Throwable)e);
                    }
                }
            }, (Object)"rocksdb-clear");
        }

        @Override
        void close() {
            this.defaultColumnFamilyHandle.close();
            RocksDBStore.this.db.close();
        }

        protected void reinitAllDatabases() throws RocksDBException {
            RocksDBStore.this.db.close();
            RocksDBStore.this.expiredDb.close();
            if (System.getProperty("os.name").startsWith("Windows")) {
                System.gc();
            }
            Path dataLocation = RocksDBStore.this.getLocation();
            Util.recursiveFileRemove((File)dataLocation.toFile());
            RocksDBStore.this.db = this.open(RocksDBStore.this.getLocation(), RocksDBStore.this.dataDbOptions());
            Path expirationLocation = RocksDBStore.this.getExpirationLocation();
            Util.recursiveFileRemove((File)expirationLocation.toFile());
            RocksDBStore.this.expiredDb = RocksDBStore.this.openDatabase(expirationLocation, RocksDBStore.this.expiredDbOptions());
        }

        @Override
        protected RocksIterator wrapIterator(RocksDB db, ReadOptions readOptions, int segment) {
            return db.newIterator(this.defaultColumnFamilyHandle, readOptions);
        }

        @Override
        Publisher<MarshallableEntry<K, V>> publishEntries(IntSet segments, Predicate<? super K> filter, boolean fetchValue) {
            Predicate combinedFilter = org.infinispan.persistence.internal.PersistenceUtil.combinePredicate((IntSet)segments, (KeyPartitioner)this.keyPartitioner, filter);
            return this.publish(-1, it -> Flowable.fromIterable(() -> {
                long now = RocksDBStore.this.timeService.wallClockTime();
                return new RocksEntryIterator((RocksIterator)it, combinedFilter, now);
            }));
        }

        @Override
        CompletionStage<Long> approximateSize(IntSet segments) {
            return this.size(segments);
        }

        @Override
        CompletionStage<Void> addSegments(IntSet segments) {
            return CompletableFutures.completedNull();
        }

        @Override
        CompletionStage<Void> removeSegments(IntSet segments) {
            return this.clear(segments);
        }
    }

    private abstract class RocksDBHandler {
        private RocksDBHandler() {
        }

        abstract RocksDB open(Path var1, DBOptions var2) throws RocksDBException;

        abstract void close();

        abstract ColumnFamilyHandle getHandle(int var1);

        abstract ColumnFamilyHandle getHandle(Object var1);

        abstract ColumnFamilyHandle getHandleForMarshalledKey(byte[] var1);

        ColumnFamilyDescriptor newDescriptor(byte[] name) {
            ColumnFamilyOptions columnFamilyOptions;
            if (RocksDBStore.this.columnFamilyProperties != null) {
                columnFamilyOptions = ColumnFamilyOptions.getColumnFamilyOptionsFromProps((Properties)RocksDBStore.this.columnFamilyProperties);
                if (columnFamilyOptions == null) {
                    throw log.rocksDBUnknownPropertiesSupplied(RocksDBStore.this.columnFamilyProperties.toString());
                }
            } else {
                columnFamilyOptions = new ColumnFamilyOptions();
            }
            return new ColumnFamilyDescriptor(name, columnFamilyOptions.setCompressionType(CompressionType.getCompressionType((String)RocksDBStore.this.configuration.compressionType().toString())));
        }

        CompletionStage<MarshallableEntry<K, V>> load(int segment, Object key) {
            ColumnFamilyHandle handle = this.getHandle(segment);
            if (handle == null) {
                log.trace("Ignoring load as handle is not currently configured");
                return CompletableFutures.completedNull();
            }
            try {
                CompletionStage entryByteStage = RocksDBStore.this.blockingManager.supplyBlocking(() -> {
                    try {
                        return RocksDBStore.this.db.get(handle, RocksDBStore.this.marshall(key));
                    }
                    catch (RocksDBException e) {
                        throw new CompletionException(e);
                    }
                }, (Object)"rocksdb-load");
                return entryByteStage.thenApply(entryBytes -> {
                    MarshallableEntry me = RocksDBStore.this.unmarshallEntry(key, entryBytes);
                    if (me == null || me.isExpired(RocksDBStore.this.timeService.wallClockTime())) {
                        return null;
                    }
                    return me;
                });
            }
            catch (Exception e) {
                throw new PersistenceException((Throwable)e);
            }
        }

        CompletionStage<Void> write(int segment, MarshallableEntry<? extends K, ? extends V> me) {
            ColumnFamilyHandle handle = this.getHandle(segment);
            if (handle == null) {
                log.trace("Ignoring write as handle is not currently configured");
                return CompletableFutures.completedNull();
            }
            try {
                byte[] marshalledKey = MarshallUtil.toByteArray((ByteBuffer)me.getKeyBytes());
                byte[] marshalledValue = RocksDBStore.this.marshall(me.getMarshalledValue());
                return RocksDBStore.this.blockingManager.runBlocking(() -> {
                    try {
                        RocksDBStore.this.db.put(handle, marshalledKey, marshalledValue);
                        if (me.expiryTime() > -1L) {
                            RocksDBStore.this.addNewExpiry(me);
                        }
                    }
                    catch (RocksDBException e) {
                        throw new PersistenceException((Throwable)e);
                    }
                }, (Object)"rocksdb-write");
            }
            catch (Exception e) {
                throw new PersistenceException((Throwable)e);
            }
        }

        CompletionStage<Boolean> delete(int segment, Object key) {
            try {
                byte[] keyBytes = RocksDBStore.this.marshall(key);
                ColumnFamilyHandle handle = this.getHandle(segment);
                return RocksDBStore.this.blockingManager.supplyBlocking(() -> {
                    try {
                        if (RocksDBStore.this.db.get(handle, keyBytes) == null) {
                            return Boolean.FALSE;
                        }
                        RocksDBStore.this.db.delete(handle, keyBytes);
                        return Boolean.TRUE;
                    }
                    catch (RocksDBException e) {
                        throw new PersistenceException((Throwable)e);
                    }
                }, (Object)"rocksdb-delete");
            }
            catch (Exception e) {
                throw new PersistenceException((Throwable)e);
            }
        }

        abstract CompletionStage<Void> clear();

        abstract Publisher<MarshallableEntry<K, V>> publishEntries(IntSet var1, Predicate<? super K> var2, boolean var3);

        CompletionStage<Long> size(IntSet segments) {
            return Flowable.fromPublisher(RocksDBStore.this.publishKeys(segments, null)).count().toCompletionStage();
        }

        abstract CompletionStage<Long> approximateSize(IntSet var1);

        <P> Publisher<P> publish(int segment, Function<RocksIterator, Flowable<P>> function) {
            ReadOptions readOptions = new ReadOptions().setFillCache(false);
            return RocksDBStore.this.blockingManager.blockingPublisher((Publisher)Flowable.using(() -> this.wrapIterator(RocksDBStore.this.db, readOptions, segment), iterator -> {
                if (iterator == null) {
                    return Flowable.empty();
                }
                iterator.seekToFirst();
                return (Publisher)function.apply((RocksIterator)iterator);
            }, iterator -> {
                if (iterator != null) {
                    iterator.close();
                }
                readOptions.close();
            }));
        }

        abstract RocksIterator wrapIterator(RocksDB var1, ReadOptions var2, int var3);

        abstract CompletionStage<Void> addSegments(IntSet var1);

        abstract CompletionStage<Void> removeSegments(IntSet var1);
    }

    private class RocksEntryIterator
    extends AbstractIterator<MarshallableEntry<K, V>> {
        private final RocksIterator it;
        private final Predicate<? super K> filter;
        private final long now;

        RocksEntryIterator(RocksIterator it, Predicate<? super K> filter, long now) {
            this.it = it;
            this.filter = filter;
            this.now = now;
        }

        protected MarshallableEntry<K, V> getNext() {
            MarshallableEntry entry = null;
            while (entry == null && this.it.isValid()) {
                MarshallableEntry me;
                Object key = RocksDBStore.this.unmarshall(this.it.key());
                if ((this.filter == null || this.filter.test(key)) && (me = RocksDBStore.this.unmarshallEntry(key, this.it.value())) != null && !me.isExpired(this.now)) {
                    entry = me;
                }
                this.it.next();
            }
            return entry;
        }
    }

    private static final class ExpiryEntry {
        final long expiry;
        final byte[] keyBytes;

        ExpiryEntry(long expiry, byte[] keyBytes) {
            this.expiry = expiry;
            this.keyBytes = keyBytes;
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            ExpiryEntry that = (ExpiryEntry)o;
            return this.expiry == that.expiry && Arrays.equals(this.keyBytes, that.keyBytes);
        }

        public int hashCode() {
            int result = Objects.hash(this.expiry);
            result = 31 * result + Arrays.hashCode(this.keyBytes);
            return result;
        }
    }

    @ProtoTypeId(value=5100)
    static final class ExpiryBucket {
        @ProtoField(number=1, collectionImplementation=ArrayList.class)
        List<byte[]> entries;

        ExpiryBucket() {
        }

        ExpiryBucket(byte[] existingKey, byte[] newKey) {
            this.entries = new ArrayList<byte[]>(2);
            this.entries.add(existingKey);
            this.entries.add(newKey);
        }
    }
}

