/*
 * Decompiled with CFR 0.152.
 */
package org.hawkular.metrics.scheduler.impl;

import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Statement;
import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.hawkular.metrics.scheduler.impl.Lock;
import org.hawkular.rx.cassandra.driver.RxSession;
import org.jboss.logging.Logger;
import rx.Observable;

class LockManager {
    private static Logger logger = Logger.getLogger(LockManager.class);
    public static final long LOCK_RENEWAL_RATE = 10L;
    private RxSession session;
    private PreparedStatement acquireLock;
    private PreparedStatement releaseLock;
    private PreparedStatement renewLock;
    private PreparedStatement getTTL;
    private ScheduledExecutorService locksExecutor;
    private Map<String, Lock> activeLocks;
    private ReentrantReadWriteLock activeLocksLock;

    public LockManager(RxSession session) {
        this.session = session;
        this.acquireLock = session.getSession().prepare("UPDATE locks USING TTL ? SET value = ? WHERE name = ? IF value IN (NULL, ?)");
        this.releaseLock = session.getSession().prepare("UPDATE locks SET value = NULL WHERE name = ? IF value = ?");
        this.renewLock = session.getSession().prepare("UPDATE locks USING TTL ? SET value = ? WHERE name = ? IF value = ?");
        this.getTTL = session.getSession().prepare("SELECT TTL(value) FROM locks WHERE name = ?");
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("locks-thread-pool-%d").build();
        this.locksExecutor = new ScheduledThreadPoolExecutor(1, threadFactory, new ThreadPoolExecutor.DiscardPolicy());
        this.activeLocks = new HashMap<String, Lock>();
        this.activeLocksLock = new ReentrantReadWriteLock();
        this.locksExecutor.scheduleAtFixedRate(this::renewLocks, 0L, 10L, TimeUnit.SECONDS);
    }

    public void shutdown() {
        try {
            this.locksExecutor.shutdown();
            this.locksExecutor.awaitTermination(5L, TimeUnit.SECONDS);
            CountDownLatch latch = new CountDownLatch(1);
            this.activeLocksLock.writeLock().lock();
            Observable.from(this.activeLocks.entrySet()).map(Map.Entry::getValue).flatMap(lock -> this.releaseLock(lock.getName(), lock.getValue()).map(released -> new Lock(lock.getName(), lock.getValue(), lock.getExpiration(), lock.getRenewalRate(), released == false))).subscribe(lock -> {
                if (lock.isLocked()) {
                    logger.infof("Failed to release lock %s", (Object)lock.getName());
                }
            }, t -> {
                logger.info((Object)"There was an error while releasing locks", t);
                latch.countDown();
            }, latch::countDown);
            latch.await();
            logger.info((Object)"Shutdown complete");
        }
        catch (InterruptedException e) {
            logger.debug((Object)"Shutdown was interrupted. Some locks may not have been released but they will still expire.");
        }
    }

    private void renewLocks() {
        Stopwatch stopwatch = Stopwatch.createStarted();
        try {
            logger.trace((Object)"Renewing locks");
            CountDownLatch latch = new CountDownLatch(1);
            this.activeLocksLock.writeLock().lock();
            Observable.from(this.activeLocks.entrySet()).filter(entry -> ((Lock)entry.getValue()).getExpiration() - System.currentTimeMillis() <= (long)((Lock)entry.getValue()).getRenewalRate()).map(Map.Entry::getValue).doOnNext(lock -> logger.debugf("Renewing %s", lock)).flatMap(this::renewLock).subscribe(lock -> {
                if (lock.isLocked()) {
                    logger.debugf("Renewed %s", lock);
                    this.activeLocks.put(lock.getName(), (Lock)lock);
                } else {
                    logger.warnf("Failed to renew %s", lock);
                    this.activeLocks.remove(lock.getName());
                }
            }, t -> {
                logger.warn((Object)"There was an error renewing locks", t);
                latch.countDown();
            }, latch::countDown);
            latch.await();
        }
        catch (Throwable t2) {
            logger.warn((Object)"There was an error trying to renew locks", t2);
        }
        finally {
            this.activeLocksLock.writeLock().unlock();
            stopwatch.stop();
            logger.tracef("Finished renewing locks in %d ms", stopwatch.elapsed(TimeUnit.MILLISECONDS));
        }
    }

    public Observable<Lock> acquireLock(String name, String value, int timeout, boolean autoRenew) {
        long expiration = System.currentTimeMillis() + TimeUnit.MILLISECONDS.convert(timeout, TimeUnit.SECONDS);
        int renewalRate = timeout / 2;
        long expirationMicro = TimeUnit.MICROSECONDS.convert(expiration, TimeUnit.MILLISECONDS);
        return this.session.execute(this.acquireLock.bind(timeout, value, name, value).setDefaultTimestamp(expirationMicro)).map(ResultSet::wasApplied).map(locked -> {
            Lock lock = new Lock(name, value, expiration, renewalRate, (boolean)locked);
            if (locked.booleanValue() && autoRenew) {
                try {
                    this.activeLocksLock.writeLock().lock();
                    this.activeLocks.put(name, lock);
                }
                finally {
                    this.activeLocksLock.writeLock().unlock();
                }
            }
            return lock;
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public Observable<Boolean> releaseLock(String name, String value) {
        Lock lock = null;
        try {
            this.activeLocksLock.writeLock().lock();
            lock = this.activeLocks.remove(name);
        }
        finally {
            this.activeLocksLock.writeLock().unlock();
        }
        return this.session.execute((Statement)this.releaseLock.bind(name, value)).map(ResultSet::wasApplied);
    }

    public Observable<Lock> renewLock(Lock lock) {
        long nextExpiration = System.currentTimeMillis() + (long)(lock.getRenewalRate() * 1000);
        long nextExpirationMicro = TimeUnit.MICROSECONDS.convert(lock.getExpiration(), TimeUnit.MILLISECONDS);
        logger.debugf("Renewing %s with TTL of %d", (Object)lock.getName(), (Object)lock.getRenewalRate());
        return this.session.execute(this.acquireLock.bind(lock.getRenewalRate(), lock.getValue(), lock.getName(), lock.getValue()).setDefaultTimestamp(nextExpirationMicro)).map(ResultSet::wasApplied).map(locked -> new Lock(lock.getName(), lock.getValue(), nextExpiration, lock.getRenewalRate(), (boolean)locked));
    }
}

