package org.infinispan.executors;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.infinispan.expiration.impl.ExpirationWithClusteredWriteSkewTest;
import org.infinispan.test.AbstractInfinispanTest;
import org.infinispan.util.concurrent.WithinThreadExecutor;
import org.testng.AssertJUnit;
import org.testng.annotations.AfterClass;
import org.testng.annotations.Test;

@Test(groups = {"functional"}, testName = "executors.LimitedExecutorTest")
/* loaded from: input_file:org/infinispan/executors/LimitedExecutorTest.class */
public class LimitedExecutorTest extends AbstractInfinispanTest {
    public static final String NAME = "Test";
    private final ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 0, TimeUnit.MILLISECONDS, new SynchronousQueue(), getTestThreadFactory(NAME));

    @AfterClass(alwaysRun = true)
    public void stopExecutors() {
        this.executor.shutdownNow();
    }

    public void testBasicWithinThread() throws Exception {
        LimitedExecutor limitedExecutor = new LimitedExecutor(NAME, new WithinThreadExecutor(), 1);
        CompletableFuture completableFuture = new CompletableFuture();
        limitedExecutor.execute(() -> {
            completableFuture.complete(ExpirationWithClusteredWriteSkewTest.VALUE);
        });
        AssertJUnit.assertEquals(ExpirationWithClusteredWriteSkewTest.VALUE, (String) completableFuture.getNow("task did not run synchronously"));
    }

    public void testConcurrencyLimit() throws Exception {
        ThreadPoolExecutor threadPoolExecutor = this.executor;
        threadPoolExecutor.getClass();
        eventuallyEquals(0, threadPoolExecutor::getActiveCount);
        LimitedExecutor limitedExecutor = new LimitedExecutor(NAME, this.executor, 1);
        CompletableFuture<String> completableFuture = new CompletableFuture<>();
        CompletableFuture<String> completableFuture2 = new CompletableFuture<>();
        limitedExecutor.execute(() -> {
            try {
                completableFuture2.complete(completableFuture.get(10L, TimeUnit.SECONDS));
            } catch (Exception e) {
                completableFuture2.completeExceptionally(e);
            }
        });
        verifyTaskIsBlocked(limitedExecutor, completableFuture, completableFuture2);
    }

    public void testConcurrencyLimitExecuteAsync() throws Exception {
        ThreadPoolExecutor threadPoolExecutor = this.executor;
        threadPoolExecutor.getClass();
        eventuallyEquals(0, threadPoolExecutor::getActiveCount);
        LimitedExecutor limitedExecutor = new LimitedExecutor(NAME, this.executor, 1);
        CompletableFuture<String> completableFuture = new CompletableFuture<>();
        CompletableFuture<String> completableFuture2 = new CompletableFuture<>();
        limitedExecutor.executeAsync(() -> {
            completableFuture2.getClass();
            return completableFuture.thenAccept((v1) -> {
                r1.complete(v1);
            });
        });
        verifyTaskIsBlocked(limitedExecutor, completableFuture, completableFuture2);
    }

    public void testConcurrencyLimitWithinThread() throws Exception {
        LimitedExecutor limitedExecutor = new LimitedExecutor(NAME, new WithinThreadExecutor(), 1);
        CompletableFuture<String> completableFuture = new CompletableFuture<>();
        CompletableFuture completableFuture2 = new CompletableFuture();
        CompletableFuture<String> completableFuture3 = new CompletableFuture<>();
        Future<?> fork = fork(() -> {
            limitedExecutor.execute(() -> {
                completableFuture2.complete("blocking");
                try {
                    completableFuture3.complete(completableFuture.get(10L, TimeUnit.SECONDS));
                } catch (Exception e) {
                    completableFuture3.completeExceptionally(e);
                }
            });
        });
        AssertJUnit.assertEquals("blocking", (String) completableFuture2.get(10L, TimeUnit.SECONDS));
        verifyTaskIsBlocked(limitedExecutor, completableFuture, completableFuture3);
        fork.get(10L, TimeUnit.SECONDS);
    }

    public void testConcurrencyLimitExecuteAsyncWithinThread() throws Exception {
        LimitedExecutor limitedExecutor = new LimitedExecutor(NAME, new WithinThreadExecutor(), 1);
        CompletableFuture<String> completableFuture = new CompletableFuture<>();
        CompletableFuture<String> completableFuture2 = new CompletableFuture<>();
        limitedExecutor.executeAsync(() -> {
            completableFuture2.getClass();
            return completableFuture.thenAccept((v1) -> {
                r1.complete(v1);
            });
        });
        verifyTaskIsBlocked(limitedExecutor, completableFuture, completableFuture2);
    }

    private void verifyTaskIsBlocked(LimitedExecutor limitedExecutor, CompletableFuture<String> completableFuture, CompletableFuture<String> completableFuture2) throws Exception {
        CompletableFuture completableFuture3 = new CompletableFuture();
        CompletableFuture completableFuture4 = new CompletableFuture();
        Future<?> fork = fork(() -> {
            limitedExecutor.execute(() -> {
                try {
                    completableFuture4.complete(((String) completableFuture2.getNow("task 2 ran too early")) + " " + ((String) completableFuture3.get(10L, TimeUnit.SECONDS)));
                } catch (Exception e) {
                    completableFuture4.completeExceptionally(e);
                }
            });
        });
        AssertJUnit.assertFalse(completableFuture2.isDone());
        AssertJUnit.assertFalse(completableFuture4.isDone());
        completableFuture.complete("value1");
        AssertJUnit.assertEquals("value1", completableFuture2.get(10L, TimeUnit.SECONDS));
        AssertJUnit.assertFalse(completableFuture4.isDone());
        completableFuture3.complete("value2");
        AssertJUnit.assertEquals("value1 value2", (String) completableFuture4.get(10L, TimeUnit.SECONDS));
        fork.get(10L, TimeUnit.SECONDS);
        ThreadPoolExecutor threadPoolExecutor = this.executor;
        threadPoolExecutor.getClass();
        eventuallyEquals(0, threadPoolExecutor::getActiveCount);
    }
}
