package org.infinispan.distexec;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.infinispan.AdvancedCache;
import org.infinispan.Cache;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.distexec.LocalDistributedExecutorTest;
import org.infinispan.remoting.transport.Address;
import org.infinispan.test.AbstractCacheTest;
import org.infinispan.test.AbstractInfinispanTest;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;

@Test(groups = {"functional"}, testName = "distexec.DistributedExecutorTest")
/* loaded from: input_file:org/infinispan/distexec/DistributedExecutorTest.class */
public class DistributedExecutorTest extends LocalDistributedExecutorTest {
    /**
     * Shared counter bumped by {@link LongRunningCallable}: once when the task starts and once more if its
     * wait is interrupted. It is static and only assigned in the static initializer, so its value accumulates
     * across the test methods of this class.
     */
    public static AtomicInteger counter;
    /** Value handed to {@code locking().supportsConcurrentUpdates(...)} when the cache configuration is built. */
    protected boolean supportsConcurrentUpdates = true;
    /**
     * Assertion-status flag marked synthetic by the decompiler; assigned in the static initializer from the
     * negation of {@code desiredAssertionStatus()}.
     */
    static final /* synthetic */ boolean $assertionsDisabled;

    /**
     * Serializable task used by the cancellation tests. It increments
     * {@link DistributedExecutorTest#counter} when it starts, then blocks for up to five seconds;
     * if the blocking wait is interrupted the counter is incremented a second time. The task
     * always returns {@code 1}.
     */
    static class LongRunningCallable implements Callable<Integer>, Serializable {
        private static final long serialVersionUID = -6110011263261397071L;

        LongRunningCallable() {
        }

        /**
         * Runs the blocking wait described on the class.
         *
         * @return always {@code 1}
         */
        @Override
        public Integer call() throws Exception {
            // First increment: lets observers see that the task has started.
            DistributedExecutorTest.counter.incrementAndGet();
            // Nothing ever counts this latch down, so await() returns only on timeout or interruption.
            final CountDownLatch neverReleased = new CountDownLatch(1);
            try {
                neverReleased.await(5000L, TimeUnit.MILLISECONDS);
            } catch (InterruptedException interrupted) {
                // Second increment: records that the wait was interrupted.
                DistributedExecutorTest.counter.incrementAndGet();
            }
            return Integer.valueOf(1);
        }
    }

    /**
     * Sets the inherited {@code cleanup} field to {@code CleanupPhase.AFTER_METHOD}.
     */
    public DistributedExecutorTest() {
        cleanup = AbstractCacheTest.CleanupPhase.AFTER_METHOD;
    }

    /**
     * Builds the default clustered configuration for {@link #getCacheMode()}, applies the
     * {@link #supportsConcurrentUpdates} locking setting, and creates two clustered caches named
     * {@link #cacheName()}.
     */
    @Override
    protected void createCacheManagers() throws Throwable {
        final CacheMode mode = getCacheMode();
        final ConfigurationBuilder config = getDefaultClusteredCacheConfig(mode, false);
        config.locking().supportsConcurrentUpdates(supportsConcurrentUpdates);

        final String name = cacheName();
        createClusteredCaches(2, name, config);
    }

    /**
     * Name of the cache this test creates and looks up.
     *
     * @return the fixed cache name used by this test class
     */
    @Override
    public String cacheName() {
        final String name = "DistributedExecutorTest-DIST_SYNC";
        return name;
    }

    /**
     * Cache mode used when building the clustered configuration.
     *
     * @return {@link CacheMode#DIST_SYNC}
     */
    @Override
    protected CacheMode getCacheMode() {
        final CacheMode mode = CacheMode.DIST_SYNC;
        return mode;
    }

    /**
     * Looks up the cache named {@link #cacheName()} at index 0.
     *
     * @return the cache at index 0
     */
    @Override
    public Cache<Object, Object> getCache() {
        final String name = cacheName();
        return cache(0, name);
    }

    /**
     * Submits a {@code SleepingSimpleCallable} to the submitting node's own address with a task
     * timeout of 1000 ms and waits without a future timeout; the test expects
     * {@link ExecutionException}.
     */
    @Test(expectedExceptions = {ExecutionException.class})
    public void testBasicTargetLocalDistributedCallableWithTimeout() throws Exception {
        final Cache<Object, Object> localCache = getCache();
        final DistributedExecutorService des = createDES(localCache);
        final Address localAddress = localCache.getAdvancedCache().getRpcManager().getAddress();

        final DistributedTaskBuilder<?> taskBuilder =
                des.createDistributedTaskBuilder(new LocalDistributedExecutorTest.SleepingSimpleCallable());
        taskBuilder.timeout(1000L, TimeUnit.MILLISECONDS);

        final Future<?> pending = des.submit(localAddress, taskBuilder.build());
        pending.get();
    }

    /**
     * Submits an {@code ExceptionThrowingCallable} from the cache at index 0 to the address of the
     * cache at index 1 and expects {@link ExecutionException} from {@code get()}.
     */
    @Test(expectedExceptions = {ExecutionException.class})
    public void testBasicTargetRemoteDistributedCallableWithException() throws Exception {
        final Cache<?, ?> originCache = cache(0, cacheName());
        final Cache<?, ?> targetCache = cache(1, cacheName());
        final DistributedExecutorService des = createDES(originCache);

        final Address targetAddress = targetCache.getAdvancedCache().getRpcManager().getAddress();
        final DistributedTask<?> failingTask =
                des.createDistributedTaskBuilder(new LocalDistributedExecutorTest.ExceptionThrowingCallable()).build();

        des.submit(targetAddress, failingTask).get();
    }

    /**
     * Local target, task timeout 1000 ms, future timeout 10000 ms: the test expects
     * {@link TimeoutException} from the timed {@code get}.
     */
    @Test(expectedExceptions = {TimeoutException.class})
    public void testBasicTargetLocalDistributedCallableWithHighFutureAndLowTaskTimeout() throws Exception {
        final Cache<?, ?> localCache = cache(0, cacheName());
        final DistributedExecutorService des = createDES(localCache);
        final Address localAddress = localCache.getAdvancedCache().getRpcManager().getAddress();

        final DistributedTaskBuilder<?> taskBuilder =
                des.createDistributedTaskBuilder(new LocalDistributedExecutorTest.SleepingSimpleCallable());
        taskBuilder.timeout(1000L, TimeUnit.MILLISECONDS);

        final Future<?> pending = des.submit(localAddress, taskBuilder.build());
        pending.get(10000L, TimeUnit.MILLISECONDS);
    }

    /**
     * Local target, task timeout 10000 ms, future timeout 1000 ms: the test expects
     * {@link TimeoutException} from the timed {@code get}.
     */
    @Test(expectedExceptions = {TimeoutException.class})
    public void testBasicTargetLocalDistributedCallableWithLowFutureAndHighTaskTimeout() throws Exception {
        final Cache<?, ?> localCache = cache(0, cacheName());
        final DistributedExecutorService des = createDES(localCache);
        final Address localAddress = localCache.getAdvancedCache().getRpcManager().getAddress();

        final DistributedTaskBuilder<?> taskBuilder =
                des.createDistributedTaskBuilder(new LocalDistributedExecutorTest.SleepingSimpleCallable());
        taskBuilder.timeout(10000L, TimeUnit.MILLISECONDS);

        final Future<?> pending = des.submit(localAddress, taskBuilder.build());
        pending.get(1000L, TimeUnit.MILLISECONDS);
    }

    /**
     * Remote target (address of the cache at index 1), task timeout 1000 ms, future timeout
     * 10000 ms: the test expects {@link TimeoutException} from the timed {@code get}.
     */
    @Test(expectedExceptions = {TimeoutException.class})
    public void testBasicTargetRemoteDistributedCallableWithHighFutureAndLowTaskTimeout() throws Exception {
        final Cache<?, ?> originCache = cache(0, cacheName());
        final Cache<?, ?> targetCache = cache(1, cacheName());
        final DistributedExecutorService des = createDES(originCache);
        final Address targetAddress = targetCache.getAdvancedCache().getRpcManager().getAddress();

        final DistributedTaskBuilder<?> taskBuilder =
                des.createDistributedTaskBuilder(new LocalDistributedExecutorTest.SleepingSimpleCallable());
        taskBuilder.timeout(1000L, TimeUnit.MILLISECONDS);

        final Future<?> pending = des.submit(targetAddress, taskBuilder.build());
        pending.get(10000L, TimeUnit.MILLISECONDS);
    }

    /**
     * Remote target (address of the cache at index 1), task timeout 10000 ms, future timeout
     * 1000 ms: the test expects {@link TimeoutException} from the timed {@code get}.
     */
    @Test(expectedExceptions = {TimeoutException.class})
    public void testBasicTargetRemoteDistributedCallableWithLowFutureAndHighTaskTimeout() throws Exception {
        final Cache<?, ?> originCache = cache(0, cacheName());
        final Cache<?, ?> targetCache = cache(1, cacheName());
        final DistributedExecutorService des = createDES(originCache);
        final Address targetAddress = targetCache.getAdvancedCache().getRpcManager().getAddress();

        final DistributedTaskBuilder<?> taskBuilder =
                des.createDistributedTaskBuilder(new LocalDistributedExecutorTest.SleepingSimpleCallable());
        taskBuilder.timeout(10000L, TimeUnit.MILLISECONDS);

        final Future<?> pending = des.submit(targetAddress, taskBuilder.build());
        pending.get(1000L, TimeUnit.MILLISECONDS);
    }

    /**
     * Submits a {@code SleepingSimpleCallable} to the local address without configuring any task
     * timeout and checks that the future yields {@code 1}.
     */
    public void testBasicTargetLocalDistributedCallableWithoutSpecTimeout() throws Exception {
        final Cache<?, ?> localCache = cache(0, cacheName());
        final DistributedExecutorService des = createDES(localCache);

        final Address localAddress = localCache.getAdvancedCache().getRpcManager().getAddress();
        final DistributedTask<?> task =
                des.createDistributedTaskBuilder(new LocalDistributedExecutorTest.SleepingSimpleCallable()).build();

        // Compared as objects so the assertEquals(Object, Object) overload is selected unambiguously.
        final Object result = des.submit(localAddress, task).get();
        AssertJUnit.assertEquals(Integer.valueOf(1), result);
    }

    /**
     * Submits a {@link LongRunningCallable} to a member other than the local node, waits until the
     * task has signalled that it started, cancels it and verifies that:
     * <ul>
     *   <li>{@code get()} fails with {@link CancellationException};</li>
     *   <li>the task recorded its interruption (second counter increment);</li>
     *   <li>the future reports cancelled and done;</li>
     *   <li>a second {@code cancel(true)} returns {@code false}.</li>
     * </ul>
     * Checks use {@link AssertJUnit} so they run even when JVM assertions are disabled.
     */
    public void testTaskCancellation() throws Exception {
        DistributedExecutorService des = createDES(getCache());
        List<Address> remoteMembers =
                new ArrayList<Address>(getCache().getAdvancedCache().getRpcManager().getMembers());
        AssertJUnit.assertEquals(caches(cacheName()).size(), remoteMembers.size());
        remoteMembers.remove(getCache().getAdvancedCache().getRpcManager().getAddress());

        // The counter is static and never reset, so earlier tests may already have incremented it.
        // Measure progress relative to the value seen before this task is submitted.
        final int baseline = DistributedExecutorTest.counter.get();

        final Future<?> future = des.submit(remoteMembers.get(0),
                des.createDistributedTaskBuilder(new LongRunningCallable()).build());

        // Wait for the first increment, i.e. the task has started.
        eventually(new AbstractInfinispanTest.Condition() {
            @Override
            public boolean isSatisfied() throws Exception {
                return DistributedExecutorTest.counter.get() >= baseline + 1;
            }
        });

        future.cancel(true);
        boolean taskCancelled = false;
        try {
            future.get();
        } catch (Exception e) {
            taskCancelled = e instanceof CancellationException;
        }
        AssertJUnit.assertTrue("Dist task not cancelled ", taskCancelled);

        // Second increment is made by the task's InterruptedException handler.
        AssertJUnit.assertTrue(DistributedExecutorTest.counter.get() >= baseline + 2);
        AssertJUnit.assertTrue(future.isCancelled());
        AssertJUnit.assertTrue(future.isDone());

        // Cancelling an already-cancelled task must report false.
        boolean cancelledAgain = future.cancel(true);
        AssertJUnit.assertFalse(cancelledAgain);
    }

    /**
     * Submits a {@link LongRunningCallable} to a member other than the local node, cancels the
     * future immediately and expects {@link CancellationException} from {@code get()}.
     */
    @Test(expectedExceptions = {CancellationException.class})
    public void testCancelAndGet() throws Exception {
        final DistributedExecutorService des = createDES(getCache());

        final List<Address> candidates =
                new ArrayList<Address>(getCache().getAdvancedCache().getRpcManager().getMembers());
        AssertJUnit.assertEquals(caches(cacheName()).size(), candidates.size());

        // Drop the local node so the first remaining member is a different node.
        final Address self = getCache().getAdvancedCache().getRpcManager().getAddress();
        candidates.remove(self);

        final Address target = candidates.get(0);
        final DistributedTask<?> longTask = des.createDistributedTaskBuilder(new LongRunningCallable()).build();
        final Future<?> pending = des.submit(target, longTask);

        pending.cancel(true);
        pending.get();
    }

    /**
     * Submits a {@code SleepingSimpleCallable} directly (no task builder) to the local address and
     * waits at most 2000 ms; the test expects {@link TimeoutException}.
     */
    @Test(expectedExceptions = {TimeoutException.class})
    public void testTimeoutOnLocalNode() throws Exception {
        final AdvancedCache<?, ?> localCache = getCache().getAdvancedCache();
        final DistributedExecutorService des = createDES(localCache);
        final Address localAddress = localCache.getRpcManager().getAddress();

        final Future<?> pending = des.submit(localAddress, new LocalDistributedExecutorTest.SleepingSimpleCallable());
        pending.get(2000L, TimeUnit.MILLISECONDS);
    }

    /**
     * Targets the submitting node's own address with a {@code SimpleDistributedCallable(false)},
     * once as a plain callable and once wrapped in a distributed task, and checks that both
     * futures yield {@code true}.
     * <p>
     * Checks use {@link AssertJUnit} so they run even when JVM assertions are disabled.
     */
    public void testBasicTargetDistributedCallableTargetSameNode() throws Exception {
        Cache<Object, Object> localCache = getCache();
        DistributedExecutorService des = createDES(localCache);
        Address localAddress = localCache.getAdvancedCache().getRpcManager().getAddress();

        // Variant 1: submit the callable directly.
        Future<?> direct = des.submit(localAddress, new LocalDistributedExecutorTest.SimpleDistributedCallable(false));
        Boolean directResult = (Boolean) direct.get();
        AssertJUnit.assertTrue(directResult.booleanValue());

        // Variant 2: submit the same kind of callable through a task builder.
        Future<?> viaTask = des.submit(localAddress,
                des.createDistributedTaskBuilder(new LocalDistributedExecutorTest.SimpleDistributedCallable(false)).build());
        Boolean taskResult = (Boolean) viaTask.get();
        AssertJUnit.assertTrue(taskResult.booleanValue());
    }

    /**
     * Targets the address of the cache at index 1 from an executor built on the cache at index 0
     * with a {@code SimpleDistributedCallable(false)}, once as a plain callable and once wrapped in
     * a distributed task, and checks that both futures yield {@code true}.
     * <p>
     * Checks use {@link AssertJUnit} so they run even when JVM assertions are disabled.
     */
    public void testBasicTargetDistributedCallable() throws Exception {
        Cache<?, ?> originCache = cache(0, cacheName());
        Cache<?, ?> targetCache = cache(1, cacheName());
        DistributedExecutorService des = createDES(originCache);
        Address targetAddress = targetCache.getAdvancedCache().getRpcManager().getAddress();

        // Variant 1: submit the callable directly.
        Future<?> direct = des.submit(targetAddress, new LocalDistributedExecutorTest.SimpleDistributedCallable(false));
        Boolean directResult = (Boolean) direct.get();
        AssertJUnit.assertTrue(directResult.booleanValue());

        // Variant 2: submit the same kind of callable through a task builder.
        Future<?> viaTask = des.submit(targetAddress,
                des.createDistributedTaskBuilder(new LocalDistributedExecutorTest.SimpleDistributedCallable(false)).build());
        Boolean taskResult = (Boolean) viaTask.get();
        AssertJUnit.assertTrue(taskResult.booleanValue());
    }

    /**
     * Submits a {@code SleepingSimpleCallable} to the local address with a task timeout of 10 ms
     * and waits without a future timeout; the test expects {@link ExecutionException}.
     */
    @Test(expectedExceptions = {ExecutionException.class})
    public void testBasicTargetDistributedCallableWithTimeout() throws Exception {
        final Cache<Object, Object> localCache = getCache();
        final DistributedExecutorService des = createDES(localCache);
        final Address localAddress = localCache.getAdvancedCache().getRpcManager().getAddress();

        final DistributedTaskBuilder<?> taskBuilder =
                des.createDistributedTaskBuilder(new LocalDistributedExecutorTest.SleepingSimpleCallable());
        taskBuilder.timeout(10L, TimeUnit.MILLISECONDS);

        final Future<?> pending = des.submit(localAddress, taskBuilder.build());
        pending.get();
    }

    /**
     * Passes a {@code null} {@link Callable} to the address-targeted {@code submit} and expects
     * {@link IllegalArgumentException}.
     */
    @Test(expectedExceptions = {IllegalArgumentException.class})
    public void testBasicTargetCallableWithNullTask() {
        final Cache<Object, Object> localCache = getCache();
        final DistributedExecutorService des = createDES(localCache);
        final Address localAddress = localCache.getAdvancedCache().getRpcManager().getAddress();

        // The static type selects the Callable overload of submit.
        final Callable<?> missingTask = null;
        des.submit(localAddress, missingTask);
    }

    /**
     * Passes a {@code null} {@code DistributedTask} to the address-targeted {@code submit} and
     * expects {@link NullPointerException}.
     */
    @Test(expectedExceptions = {NullPointerException.class})
    public void testBasicTargetDistributedTaskWithNullTask() {
        final Cache<Object, Object> localCache = getCache();
        final DistributedExecutorService des = createDES(localCache);
        final Address localAddress = localCache.getAdvancedCache().getRpcManager().getAddress();

        // The static type selects the DistributedTask overload of submit.
        final DistributedTask<?> missingTask = null;
        des.submit(localAddress, missingTask);
    }

    /**
     * Writes four entries through the cache at index 0 and two through the cache at index 1, then
     * runs {@code SimpleDistributedCallable(true)} via {@code submitEverywhere} twice: once as a
     * plain callable over keys 1, 2, 5 and 6, and once wrapped in a distributed task over keys 1
     * and 2. Each call must return a non-empty list of futures that all yield {@code true}.
     * <p>
     * Checks use {@link AssertJUnit} so they run even when JVM assertions are disabled.
     */
    public void testDistributedCallableEverywhereWithKeysOnBothNodes() throws Exception {
        Cache<Object, Object> firstCache = getCache();
        firstCache.put("key1", "Manik");
        firstCache.put("key2", "Mircea");
        firstCache.put("key3", "Galder");
        firstCache.put("key4", "Sanne");

        Cache<Object, Object> secondCache = cache(1, cacheName());
        secondCache.put("key5", "test");
        secondCache.put("key6", "test1");

        DistributedExecutorService des = createDES(getCache());

        // Variant 1: plain callable.
        List<? extends Future<?>> callableResults = des.submitEverywhere(
                new LocalDistributedExecutorTest.SimpleDistributedCallable(true),
                new String[]{"key1", "key2", "key5", "key6"});
        assertAllFuturesTrue(callableResults);

        // Variant 2: callable wrapped in a distributed task.
        List<? extends Future<?>> taskResults = des.submitEverywhere(
                des.createDistributedTaskBuilder(new LocalDistributedExecutorTest.SimpleDistributedCallable(true)).build(),
                new String[]{"key1", "key2"});
        assertAllFuturesTrue(taskResults);
    }

    /** Asserts that the list is non-null and non-empty and that every future yields {@code Boolean.TRUE}. */
    private static void assertAllFuturesTrue(List<? extends Future<?>> futures) throws Exception {
        AssertJUnit.assertNotNull(futures);
        AssertJUnit.assertFalse(futures.isEmpty());
        for (Future<?> future : futures) {
            Boolean outcome = (Boolean) future.get();
            AssertJUnit.assertTrue(outcome.booleanValue());
        }
    }

    static {
        $assertionsDisabled = !DistributedExecutorTest.class.desiredAssertionStatus();
        counter = new AtomicInteger();
    }
}
