package org.infinispan.distexec.mapreduce;

import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import org.infinispan.Cache;
import org.infinispan.commons.CacheException;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.distribution.DistributionTestHelper;
import org.infinispan.test.AbstractCacheTest;
import org.infinispan.test.MultipleCacheManagersTest;
import org.infinispan.util.concurrent.TimeoutException;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;

@Test(groups = {"functional"}, testName = "distexec.mapreduce.SimpleMapReduceTaskTimeoutTest")
/* loaded from: input_file:org/infinispan/distexec/mapreduce/SimpleMapReduceTaskTimeoutTest.class */
public class SimpleMapReduceTaskTimeoutTest extends MultipleCacheManagersTest {
    private static final int REPLICATION_TIMEOUT = 5000;

    /* loaded from: input_file:org/infinispan/distexec/mapreduce/SimpleMapReduceTaskTimeoutTest$DummyReducer.class */
    public static class DummyReducer implements Reducer<String, Integer> {
        public Integer reduce(String str, Iterator<Integer> it) {
            int i = 0;
            while (true) {
                int i2 = i;
                if (!it.hasNext()) {
                    return Integer.valueOf(i2);
                }
                i = i2 + it.next().intValue();
            }
        }

        public /* bridge */ /* synthetic */ Object reduce(Object obj, Iterator it) {
            return reduce((String) obj, (Iterator<Integer>) it);
        }
    }

    /* loaded from: input_file:org/infinispan/distexec/mapreduce/SimpleMapReduceTaskTimeoutTest$SleepMapper.class */
    public static class SleepMapper implements Mapper<String, String, String, Integer> {
        private final long sleepTime;
        private final String sleepOnKey;
        private boolean alreadySlept = false;

        public SleepMapper(long j, String str) {
            this.sleepTime = j;
            this.sleepOnKey = str;
        }

        public void map(String str, String str2, Collector<String, Integer> collector) {
            if (!this.alreadySlept && str.equals(this.sleepOnKey)) {
                try {
                    Thread.sleep(this.sleepTime);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                this.alreadySlept = true;
            }
            collector.emit(str2, 1);
        }

        public /* bridge */ /* synthetic */ void map(Object obj, Object obj2, Collector collector) {
            map((String) obj, (String) obj2, (Collector<String, Integer>) collector);
        }
    }

    /**
     * Selects the {@code AFTER_METHOD} cleanup phase for this test class.
     */
    public SimpleMapReduceTaskTimeoutTest() {
        cleanup = AbstractCacheTest.CleanupPhase.AFTER_METHOD;
    }

    /**
     * Raises the task timeout to 20 s, runs a mapper that sleeps 10 s on one key, and
     * expects the task to finish normally after at least those 10 s.
     */
    public void testTimeout() {
        final String sleepKey = init();
        final MapReduceTask<String, String, String, Integer> task = createMapReduceTask(cache(0));

        // Before any override, both the task and the cache configuration must report 5000 ms.
        AssertJUnit.assertEquals("Wrong task timeout.", 5000L, task.timeout(TimeUnit.MILLISECONDS));
        AssertJUnit.assertEquals("Wrong replication timeout.", 5000L,
                cache(0).getCacheConfiguration().clustering().sync().replTimeout());

        task.timeout(20000L, TimeUnit.MILLISECONDS);
        AssertJUnit.assertEquals("Wrong new task timeout.", 20000L, task.timeout(TimeUnit.MILLISECONDS));

        task.mappedWith(new SleepMapper(10000L, sleepKey)).reducedWith(new DummyReducer());

        final long startNanos = System.nanoTime();
        task.execute();
        final long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
        AssertJUnit.assertTrue(elapsedMillis >= 10000);
    }

    /**
     * Lowers the task timeout to 1250 ms, runs a mapper that sleeps 2500 ms on one key, and
     * expects {@code execute()} to fail with a {@link CacheException} whose cause chain
     * contains a {@link TimeoutException}, no sooner than 1250 ms after it started.
     */
    public void testTimeout2() {
        String sleepKey = init();
        MapReduceTask<String, String, String, Integer> task = createMapReduceTask(cache(0));

        // Before any override, both the task and the cache configuration must report 5000 ms.
        AssertJUnit.assertEquals("Wrong task timeout.", 5000L, task.timeout(TimeUnit.MILLISECONDS));
        AssertJUnit.assertEquals("Wrong replication timeout.", 5000L,
                cache(0).getCacheConfiguration().clustering().sync().replTimeout());

        task.timeout(1250L, TimeUnit.MILLISECONDS);
        AssertJUnit.assertEquals("Wrong new task timeout.", 1250L, task.timeout(TimeUnit.MILLISECONDS));

        task.mappedWith(new SleepMapper(2500L, sleepKey)).reducedWith(new DummyReducer());

        long startNanos = System.nanoTime();
        try {
            task.execute();
            // Without this, a run that never times out would still pass: the 2500 ms sleep
            // alone satisfies the elapsed-time check below.
            AssertJUnit.fail("Expected the task to time out, but execute() completed normally.");
        } catch (CacheException e) {
            AssertJUnit.assertTrue(hasTimeoutException(e));
        }
        long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
        AssertJUnit.assertTrue(elapsedMillis >= 1250);
    }

    /**
     * Sets the task timeout to -1 ms, runs a mapper that sleeps 10 s on one key, and
     * expects the task to finish normally after at least those 10 s.
     */
    public void testNegativeTimeout() {
        final String sleepKey = init();
        final MapReduceTask<String, String, String, Integer> task = createMapReduceTask(cache(0));

        // Before any override, both the task and the cache configuration must report 5000 ms.
        AssertJUnit.assertEquals("Wrong task timeout.", 5000L, task.timeout(TimeUnit.MILLISECONDS));
        AssertJUnit.assertEquals("Wrong replication timeout.", 5000L,
                cache(0).getCacheConfiguration().clustering().sync().replTimeout());

        task.timeout(-1L, TimeUnit.MILLISECONDS);
        AssertJUnit.assertEquals("Wrong new task timeout.", -1L, task.timeout(TimeUnit.MILLISECONDS));

        task.mappedWith(new SleepMapper(10000L, sleepKey)).reducedWith(new DummyReducer());

        final long startNanos = System.nanoTime();
        task.execute();
        final long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
        AssertJUnit.assertTrue(elapsedMillis >= 10000);
    }

    /**
     * Sets the task timeout to 0 ms, runs a mapper that sleeps 10 s on one key, and
     * expects the task to finish normally after at least those 10 s.
     */
    public void testZeroTimeout() {
        final String sleepKey = init();
        final MapReduceTask<String, String, String, Integer> task = createMapReduceTask(cache(0));

        // Before any override, both the task and the cache configuration must report 5000 ms.
        AssertJUnit.assertEquals("Wrong task timeout.", 5000L, task.timeout(TimeUnit.MILLISECONDS));
        AssertJUnit.assertEquals("Wrong replication timeout.", 5000L,
                cache(0).getCacheConfiguration().clustering().sync().replTimeout());

        task.timeout(0L, TimeUnit.MILLISECONDS);
        AssertJUnit.assertEquals("Wrong new task timeout.", 0L, task.timeout(TimeUnit.MILLISECONDS));

        task.mappedWith(new SleepMapper(10000L, sleepKey)).reducedWith(new DummyReducer());

        final long startNanos = System.nanoTime();
        task.execute();
        final long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
        AssertJUnit.assertTrue(elapsedMillis >= 10000);
    }

    /**
     * Creates the clustered caches for the tests from a {@code DIST_SYNC} configuration
     * whose synchronous replication timeout is set to 5000 ms.
     */
    @Override // org.infinispan.test.MultipleCacheManagersTest
    protected void createCacheManagers() throws Throwable {
        final ConfigurationBuilder builder = getDefaultClusteredCacheConfig(CacheMode.DIST_SYNC, true);
        // The timeout tests assert this exact value before overriding it per task.
        builder.clustering().sync().replTimeout(5000L);
        createClusteredCaches(4, builder);
    }

    /**
     * Creates the task under test for the given cache.
     *
     * @param cache the cache the task is constructed with
     * @return a new {@code MapReduceTask} built from {@code cache}
     */
    protected MapReduceTask<String, String, String, Integer> createMapReduceTask(Cache<String, String> cache) {
        final MapReduceTask<String, String, String, Integer> task =
                new MapReduceTask<String, String, String, Integer>(cache);
        return task;
    }

    /**
     * Reports whether the given exception or any exception in its cause chain is a
     * {@link TimeoutException}.
     *
     * @param cacheException the exception to inspect; {@code null} yields {@code false}
     * @return {@code true} if a {@code TimeoutException} is found, {@code false} otherwise
     */
    private boolean hasTimeoutException(CacheException cacheException) {
        // getCause() returns Throwable, so the walk must be typed as Throwable:
        // causes are not guaranteed to be CacheException instances.
        for (Throwable current = cacheException; current != null; current = current.getCause()) {
            if (current instanceof TimeoutException) {
                return true;
            }
        }
        return false;
    }

    /**
     * Stores 100 entries through {@code cache(0)}, with keys {@code "0"}..{@code "99"} and
     * values {@code "v0"}..{@code "v3"}, and picks the key the mapper should sleep on.
     *
     * @return the first inserted key for which {@code DistributionTestHelper.isFirstOwner}
     *         reports {@code cache(1)} as first owner, or {@code null} if there is none
     */
    private String init() {
        String firstOwnedKey = null;
        for (int n = 0; n < 100; n++) {
            final String key = String.valueOf(n);
            final String value = "v" + (n % 4);
            cache(0).put(key, value);
            if (firstOwnedKey != null) {
                // Already found a key; ownership is not queried again.
                continue;
            }
            if (DistributionTestHelper.isFirstOwner(cache(1), key)) {
                firstOwnedKey = key;
            }
        }
        return firstOwnedKey;
    }
}
