/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.persistence.rocksdb;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletionService;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import org.infinispan.commons.CacheConfigurationException;
import org.infinispan.commons.configuration.ConfiguredBy;
import org.infinispan.commons.persistence.Store;
import org.infinispan.commons.util.Util;
import org.infinispan.executors.ExecutorAllCompletionService;
import org.infinispan.filter.KeyFilter;
import org.infinispan.marshall.core.MarshalledEntry;
import org.infinispan.metadata.InternalMetadata;
import org.infinispan.persistence.PersistenceUtil;
import org.infinispan.persistence.TaskContextImpl;
import org.infinispan.persistence.rocksdb.configuration.RocksDBStoreConfiguration;
import org.infinispan.persistence.rocksdb.logging.Log;
import org.infinispan.persistence.spi.AdvancedCacheLoader;
import org.infinispan.persistence.spi.AdvancedCacheWriter;
import org.infinispan.persistence.spi.AdvancedLoadWriteStore;
import org.infinispan.persistence.spi.InitializationContext;
import org.infinispan.persistence.spi.PersistenceException;
import org.infinispan.util.logging.LogFactory;
import org.rocksdb.CompressionType;
import org.rocksdb.Options;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;

@Store
@ConfiguredBy(value=RocksDBStoreConfiguration.class)
public class RocksDBStore<K, V>
implements AdvancedLoadWriteStore<K, V> {
    private static final Log log = LogFactory.getLog(RocksDBStore.class, Log.class);
    private RocksDBStoreConfiguration configuration;
    private BlockingQueue<ExpiryEntry> expiryEntryQueue;
    private RocksDB db;
    private RocksDB expiredDb;
    private InitializationContext ctx;
    private Semaphore semaphore;
    private WriteOptions dataWriteOptions;
    private volatile boolean stopped = true;

    @Override
    public void init(InitializationContext ctx) {
        this.configuration = (RocksDBStoreConfiguration)ctx.getConfiguration();
        this.ctx = ctx;
        this.semaphore = new Semaphore(Integer.MAX_VALUE, true);
    }

    @Override
    public void start() {
        this.expiryEntryQueue = new LinkedBlockingQueue<ExpiryEntry>(this.configuration.expiryQueueSize());
        try {
            this.db = this.openDatabase(this.getQualifiedLocation(), this.dataDbOptions());
            this.expiredDb = this.openDatabase(this.getQualifiedExpiredLocation(), this.expiredDbOptions());
            this.stopped = false;
        }
        catch (Exception e) {
            throw new CacheConfigurationException("Unable to open database", e);
        }
    }

    private String sanitizedCacheName() {
        return this.ctx.getCache().getName().replaceAll("[^a-zA-Z0-9-_\\.]", "_");
    }

    private String getQualifiedLocation() {
        return this.configuration.location() + this.sanitizedCacheName();
    }

    private String getQualifiedExpiredLocation() {
        return this.configuration.expiredLocation() + this.sanitizedCacheName();
    }

    private WriteOptions dataWriteOptions() {
        if (this.dataWriteOptions == null) {
            this.dataWriteOptions = new WriteOptions().setDisableWAL(false);
        }
        return this.dataWriteOptions;
    }

    private Options dataDbOptions() {
        Options options = new Options().setCreateIfMissing(true);
        options.setCompressionType(CompressionType.getCompressionType(this.configuration.compressionType().toString()));
        return options;
    }

    private Options expiredDbOptions() {
        return new Options().setCreateIfMissing(true);
    }

    protected RocksDB openDatabase(String location, Options options) throws IOException, RocksDBException {
        File dir = new File(location);
        dir.mkdirs();
        return RocksDB.open(options, location);
    }

    protected void destroyDatabase(String location) throws IOException {
        System.gc();
        Util.recursiveFileRemove(new File(location));
    }

    protected RocksDB reinitDatabase(String location, Options options) throws IOException, RocksDBException {
        this.destroyDatabase(location);
        return this.openDatabase(location, options);
    }

    protected void reinitAllDatabases() throws IOException, RocksDBException {
        try {
            this.semaphore.acquire(Integer.MAX_VALUE);
        }
        catch (InterruptedException e) {
            throw new PersistenceException("Cannot acquire semaphore", e);
        }
        try {
            if (this.stopped) {
                throw new PersistenceException("RocksDB is stopped");
            }
            this.db.close();
            this.expiredDb.close();
            this.db = this.reinitDatabase(this.getQualifiedLocation(), this.dataDbOptions());
            this.expiredDb = this.reinitDatabase(this.getQualifiedExpiredLocation(), this.expiredDbOptions());
        }
        finally {
            this.semaphore.release(Integer.MAX_VALUE);
        }
    }

    @Override
    public void stop() {
        try {
            this.semaphore.acquire(Integer.MAX_VALUE);
        }
        catch (InterruptedException e) {
            throw new PersistenceException("Cannot acquire semaphore", e);
        }
        try {
            this.db.close();
            this.expiredDb.close();
        }
        finally {
            this.stopped = true;
            this.semaphore.release(Integer.MAX_VALUE);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void clear() {
        boolean destroyDatabase;
        block26: {
            long count = 0L;
            destroyDatabase = false;
            try {
                this.semaphore.acquire();
            }
            catch (InterruptedException e) {
                throw new PersistenceException("Cannot acquire semaphore", e);
            }
            try {
                if (this.stopped) {
                    throw new PersistenceException("RocksDB is stopped");
                }
                Optional<RocksIterator> optionalIterator = RocksDBStore.wrapIterator(this.db);
                if (optionalIterator.isPresent() && this.configuration.clearThreshold() <= 0) {
                    try (RocksIterator it = optionalIterator.get();){
                        it.seekToFirst();
                        while (it.isValid()) {
                            this.db.delete(it.key());
                            if (++count > (long)this.configuration.clearThreshold()) {
                                destroyDatabase = true;
                                break block26;
                            }
                            it.next();
                        }
                        break block26;
                    }
                    catch (RocksDBException e) {
                        destroyDatabase = true;
                    }
                    break block26;
                }
                destroyDatabase = true;
            }
            finally {
                this.semaphore.release();
            }
        }
        if (destroyDatabase) {
            try {
                this.reinitAllDatabases();
            }
            catch (Exception e) {
                throw new PersistenceException(e);
            }
        }
    }

    private static Optional<RocksIterator> wrapIterator(RocksDB db) {
        return Optional.of(db.newIterator(new ReadOptions().setFillCache(false)));
    }

    @Override
    public int size() {
        return PersistenceUtil.count(this, null);
    }

    @Override
    public boolean contains(Object key) {
        try {
            return this.load(key) != null;
        }
        catch (Exception e) {
            throw new PersistenceException(e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void process(KeyFilter keyFilter, AdvancedCacheLoader.CacheLoaderTask cacheLoaderTask, Executor executor, boolean loadValues, boolean loadMetadata) {
        block24: {
            int batchSize = 100;
            ExecutorAllCompletionService eacs = new ExecutorAllCompletionService(executor);
            TaskContextImpl taskContext = new TaskContextImpl();
            ArrayList<Entry> entries = new ArrayList<Entry>(batchSize);
            try {
                this.semaphore.acquire();
            }
            catch (InterruptedException e) {
                throw new PersistenceException("Cannot acquire semaphore: CacheStore is likely stopped.", e);
            }
            try {
                if (this.stopped) {
                    throw new PersistenceException("RocksDB is stopped");
                }
                Optional<RocksIterator> optionalIterator = RocksDBStore.wrapIterator(this.db);
                if (!optionalIterator.isPresent()) break block24;
                try (RocksIterator it = optionalIterator.get();){
                    it.seekToFirst();
                    while (it.isValid()) {
                        Entry entry = new Entry(it.key(), it.value());
                        entries.add(entry);
                        if (entries.size() == batchSize) {
                            ArrayList<Entry> batch = entries;
                            entries = new ArrayList(batchSize);
                            this.submitProcessTask(cacheLoaderTask, keyFilter, eacs, taskContext, batch, loadValues, loadMetadata);
                        }
                        it.next();
                    }
                    if (!entries.isEmpty()) {
                        this.submitProcessTask(cacheLoaderTask, keyFilter, eacs, taskContext, entries, loadValues, loadMetadata);
                    }
                    eacs.waitUntilAllCompleted();
                    if (eacs.isExceptionThrown()) {
                        throw new PersistenceException("Execution exception!", eacs.getFirstException());
                    }
                }
                catch (Exception e) {
                    throw new PersistenceException(e);
                }
            }
            finally {
                this.semaphore.release();
            }
        }
    }

    private void submitProcessTask(AdvancedCacheLoader.CacheLoaderTask cacheLoaderTask, KeyFilter filter, CompletionService ecs, AdvancedCacheLoader.TaskContext taskContext, List<Entry> batch, boolean loadValues, boolean loadMetadata) {
        ecs.submit(() -> {
            try {
                long now = this.ctx.getTimeService().wallClockTime();
                for (Entry pair : batch) {
                    MarshalledEntry entry;
                    boolean isExpired;
                    if (taskContext.isStopped()) break;
                    Object key = this.unmarshall(pair.key);
                    if (filter != null && !filter.accept(key) || (isExpired = (entry = loadValues || loadMetadata ? (MarshalledEntry)this.unmarshall(pair.value) : null) != null && entry.getMetadata() != null && entry.getMetadata().isExpired(now))) continue;
                    if (!loadValues || !loadMetadata) {
                        entry = this.ctx.getMarshalledEntryFactory().newMarshalledEntry(key, loadValues ? entry.getValue() : null, loadMetadata ? entry.getMetadata() : null);
                    }
                    cacheLoaderTask.processEntry(entry, taskContext);
                }
                return null;
            }
            catch (Exception e) {
                log.errorExecutingParallelStoreTask(e);
                throw e;
            }
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    @Override
    public boolean delete(Object key) {
        try {
            byte[] keyBytes = this.marshall(key);
            this.semaphore.acquire();
            try {
                if (this.stopped) {
                    throw new PersistenceException("RocksDB is stopped");
                }
                if (this.db.get(keyBytes) == null) {
                    boolean bl = false;
                    return bl;
                }
                this.db.delete(keyBytes);
                return true;
            }
            finally {
                this.semaphore.release();
            }
        }
        catch (Exception e) {
            throw new PersistenceException(e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void write(MarshalledEntry me) {
        try {
            byte[] marshelledKey = this.marshall(me.getKey());
            byte[] marshalledEntry = this.marshall(me);
            this.semaphore.acquire();
            try {
                if (this.stopped) {
                    throw new PersistenceException("RocksDB is stopped");
                }
                this.db.put(marshelledKey, marshalledEntry);
            }
            finally {
                this.semaphore.release();
            }
            InternalMetadata meta = me.getMetadata();
            if (meta != null && meta.expiryTime() > -1L) {
                this.addNewExpiry(me);
            }
        }
        catch (Exception e) {
            throw new PersistenceException(e);
        }
    }

    @Override
    public MarshalledEntry load(Object key) {
        try {
            byte[] marshalledEntry;
            this.semaphore.acquire();
            try {
                if (this.stopped) {
                    throw new PersistenceException("RocksDB is stopped");
                }
                marshalledEntry = this.db.get(this.marshall(key));
            }
            finally {
                this.semaphore.release();
            }
            MarshalledEntry me = (MarshalledEntry)this.unmarshall(marshalledEntry);
            if (me == null) {
                return null;
            }
            InternalMetadata meta = me.getMetadata();
            if (meta != null && meta.isExpired(this.ctx.getTimeService().wallClockTime())) {
                return null;
            }
            return me;
        }
        catch (Exception e) {
            throw new PersistenceException(e);
        }
    }

    @Override
    public void writeBatch(Iterable<MarshalledEntry<? extends K, ? extends V>> marshalledEntries) {
        try {
            int batchSize = 0;
            WriteBatch batch = new WriteBatch();
            for (MarshalledEntry<Object, Object> entry : marshalledEntries) {
                batch.put(this.marshall(entry.getKey()), this.marshall(entry));
                if (++batchSize != this.configuration.maxBatchSize()) continue;
                batchSize = 0;
                this.writeBatch(batch);
                batch = new WriteBatch();
            }
            if (batchSize != 0) {
                this.writeBatch(batch);
            }
            for (MarshalledEntry<Object, Object> entry : marshalledEntries) {
                InternalMetadata meta = entry.getMetadata();
                if (meta == null || meta.expiryTime() <= -1L) continue;
                this.addNewExpiry(entry);
            }
        }
        catch (Exception e) {
            throw new PersistenceException(e);
        }
    }

    private void writeBatch(WriteBatch batch) throws InterruptedException, RocksDBException {
        this.semaphore.acquire();
        try {
            if (this.stopped) {
                throw new PersistenceException("RocksDB is stopped");
            }
            this.db.write(this.dataWriteOptions(), batch);
        }
        finally {
            this.semaphore.release();
        }
    }

    @Override
    public void purge(Executor executor, AdvancedCacheWriter.PurgeListener purgeListener) {
        block33: {
            try {
                this.semaphore.acquire();
            }
            catch (InterruptedException e) {
                throw new PersistenceException("Cannot acquire semaphore: CacheStore is likely stopped.", e);
            }
            try {
                if (this.stopped) {
                    throw new PersistenceException("RocksDB is stopped");
                }
                ArrayList entries = new ArrayList();
                this.expiryEntryQueue.drainTo(entries);
                for (ExpiryEntry entry : entries) {
                    byte[] expiryBytes = this.marshall(entry.expiry);
                    byte[] keyBytes = this.marshall(entry.key);
                    byte[] existingBytes = this.expiredDb.get(expiryBytes);
                    if (existingBytes != null) {
                        Object existing = this.unmarshall(existingBytes);
                        if (existing instanceof List) {
                            ((List)existing).add(entry.key);
                            this.expiredDb.put(expiryBytes, this.marshall(existing));
                            continue;
                        }
                        ArrayList<Object> al = new ArrayList<Object>(2);
                        al.add(existing);
                        al.add(entry.key);
                        this.expiredDb.put(expiryBytes, this.marshall(al));
                        continue;
                    }
                    this.expiredDb.put(expiryBytes, keyBytes);
                }
                ArrayList<Object> times = new ArrayList<Object>();
                ArrayList<Object> keys = new ArrayList<Object>();
                long now = this.ctx.getTimeService().wallClockTime();
                Optional<RocksIterator> optionalIterator = RocksDBStore.wrapIterator(this.expiredDb);
                if (!optionalIterator.isPresent()) break block33;
                try (RocksIterator it = optionalIterator.get();){
                    Object time;
                    it.seekToFirst();
                    while (it.isValid() && (Long)(time = (Long)this.unmarshall(it.key())) <= now) {
                        times.add(time);
                        Object object = this.unmarshall(it.value());
                        if (object instanceof List) {
                            keys.addAll((List)object);
                        } else {
                            keys.add(object);
                        }
                        it.next();
                    }
                    for (Long l : times) {
                        this.expiredDb.delete(this.marshall(l));
                    }
                    if (!keys.isEmpty()) {
                        log.debugf("purge (up to) %d entries", keys.size());
                    }
                    int count = 0;
                    for (Object e : keys) {
                        MarshalledEntry me;
                        byte[] keyBytes = this.marshall(e);
                        byte[] b = this.db.get(keyBytes);
                        if (b == null || (me = (MarshalledEntry)this.ctx.getMarshaller().objectFromByteBuffer(b)).getMetadata() == null || !me.getMetadata().isExpired(now)) continue;
                        this.db.delete(keyBytes);
                        purgeListener.entryPurged(e);
                        ++count;
                    }
                    if (count != 0) {
                        log.debugf("purged %d entries", count);
                    }
                }
                catch (Exception e) {
                    throw new PersistenceException(e);
                }
            }
            catch (PersistenceException e) {
                throw e;
            }
            catch (Exception e) {
                throw new PersistenceException(e);
            }
            finally {
                this.semaphore.release();
            }
        }
    }

    private byte[] marshall(Object entry) throws IOException, InterruptedException {
        return this.ctx.getMarshaller().objectToByteBuffer(entry);
    }

    private Object unmarshall(byte[] bytes) throws IOException, ClassNotFoundException {
        if (bytes == null) {
            return null;
        }
        return this.ctx.getMarshaller().objectFromByteBuffer(bytes);
    }

    private void addNewExpiry(MarshalledEntry entry) throws IOException {
        long expiry = entry.getMetadata().expiryTime();
        long maxIdle = entry.getMetadata().maxIdle();
        if (maxIdle > 0L) {
            expiry = maxIdle + this.ctx.getTimeService().wallClockTime();
        }
        Long at = expiry;
        Object key = entry.getKey();
        try {
            this.expiryEntryQueue.put(new ExpiryEntry(at, key));
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private static final class Entry {
        final byte[] key;
        final byte[] value;

        Entry(byte[] key, byte[] value) {
            this.key = key;
            this.value = value;
        }
    }

    private static final class ExpiryEntry {
        private final Long expiry;
        private final Object key;

        private ExpiryEntry(long expiry, Object key) {
            this.expiry = expiry;
            this.key = key;
        }

        public int hashCode() {
            int prime = 31;
            int result = 1;
            result = 31 * result + (this.key == null ? 0 : this.key.hashCode());
            return result;
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null) {
                return false;
            }
            if (this.getClass() != obj.getClass()) {
                return false;
            }
            ExpiryEntry other = (ExpiryEntry)obj;
            return !(this.key == null ? other.key != null : !this.key.equals(other.key));
        }
    }
}

