/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.test.core;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.Context;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Future;
import io.vertx.core.Verticle;
import io.vertx.core.Vertx;
import io.vertx.core.WorkerExecutor;
import io.vertx.test.core.BlockedThreadWarning;
import io.vertx.test.core.TestUtils;
import io.vertx.test.core.VertxTestBase;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Rule;
import org.junit.Test;

public class NamedWorkerPoolTest
extends VertxTestBase {
    @Rule
    public BlockedThreadWarning blockedThreadWarning = new BlockedThreadWarning();

    @Test
    public void testMaxExecuteWorkerTime() throws Exception {
        String poolName = TestUtils.randomAlphaString(10);
        long maxWorkerExecuteTime = TimeUnit.NANOSECONDS.convert(3L, TimeUnit.SECONDS);
        DeploymentOptions deploymentOptions = new DeploymentOptions().setWorkerPoolName(poolName).setMaxWorkerExecuteTime(maxWorkerExecuteTime);
        this.vertx.deployVerticle((Verticle)new AbstractVerticle(){

            public void start(Future<Void> startFuture) throws Exception {
                this.vertx.executeBlocking(fut -> {
                    try {
                        TimeUnit.SECONDS.sleep(5L);
                        fut.complete();
                    }
                    catch (InterruptedException e) {
                        fut.fail((Throwable)e);
                    }
                }, startFuture);
            }
        }, deploymentOptions, this.onSuccess(did -> this.testComplete()));
        this.await();
        this.blockedThreadWarning.expectMessage(poolName, maxWorkerExecuteTime);
    }

    @Test
    public void testThread() {
        String poolName = TestUtils.randomAlphaString(10);
        WorkerExecutor worker = this.vertx.createSharedWorkerExecutor(poolName);
        AtomicBoolean onVertxThread = new AtomicBoolean();
        AtomicBoolean onWorkerThread = new AtomicBoolean();
        AtomicBoolean onEventLoopThread = new AtomicBoolean();
        AtomicReference threadName = new AtomicReference();
        worker.executeBlocking(fut -> {
            onVertxThread.set(Context.isOnVertxThread());
            onWorkerThread.set(Context.isOnWorkerThread());
            onEventLoopThread.set(Context.isOnEventLoopThread());
            threadName.set(Thread.currentThread().getName());
            fut.complete(null);
        }, ar -> this.testComplete());
        NamedWorkerPoolTest.assertWaitUntil(() -> threadName.get() != null);
        this.assertTrue(onVertxThread.get());
        this.assertTrue(onWorkerThread.get());
        this.assertFalse(onEventLoopThread.get());
        this.assertTrue(((String)threadName.get()).startsWith(poolName + "-"));
    }

    @Test
    public void testOrdered() {
        String poolName = "vert.x-" + TestUtils.randomAlphaString(10);
        WorkerExecutor worker = this.vertx.createSharedWorkerExecutor(poolName);
        int num = 1000;
        AtomicReference t = new AtomicReference();
        CountDownLatch submitted = new CountDownLatch(1);
        Context ctx = this.vertx.getOrCreateContext();
        ctx.runOnContext(v -> {
            for (int i = 0; i < num; ++i) {
                boolean first = i == 0;
                boolean last = i == num - 1;
                worker.executeBlocking(fut -> {
                    if (first) {
                        try {
                            this.awaitLatch(submitted);
                        }
                        catch (InterruptedException e) {
                            this.fail(e);
                            return;
                        }
                        this.assertNull(t.get());
                        t.set(Thread.currentThread());
                    } else {
                        this.assertEquals(t.get(), Thread.currentThread());
                    }
                    this.assertTrue(Thread.currentThread().getName().startsWith(poolName + "-"));
                    fut.complete(null);
                }, ar -> {
                    if (last) {
                        this.testComplete();
                    }
                });
            }
            submitted.countDown();
        });
        this.await();
    }

    @Test
    public void testUnordered() throws Exception {
        String poolName = "vert.x-" + TestUtils.randomAlphaString(10);
        int num = 5;
        this.waitFor(num);
        WorkerExecutor worker = this.vertx.createSharedWorkerExecutor(poolName);
        CountDownLatch latch1 = new CountDownLatch(num);
        CountDownLatch latch2 = new CountDownLatch(1);
        Context ctx = this.vertx.getOrCreateContext();
        ctx.runOnContext(v -> {
            for (int i = 0; i < num; ++i) {
                worker.executeBlocking(fut -> {
                    latch1.countDown();
                    try {
                        this.awaitLatch(latch2);
                    }
                    catch (InterruptedException e) {
                        this.fail(e);
                        return;
                    }
                    this.assertTrue(Thread.currentThread().getName().startsWith(poolName + "-"));
                    fut.complete(null);
                }, false, ar -> this.complete());
            }
        });
        this.awaitLatch(latch1);
        latch2.countDown();
        this.await();
    }

    @Test
    public void testUseDifferentExecutorWithSameTaskQueue() throws Exception {
        final int count = 10;
        this.waitFor(count);
        this.vertx.deployVerticle((Verticle)new AbstractVerticle(){

            public void start() throws Exception {
                WorkerExecutor exec = this.vertx.createSharedWorkerExecutor("vert.x-the-executor");
                Thread startThread = Thread.currentThread();
                AtomicReference currentThread = new AtomicReference();
                int i = 0;
                while (i < count) {
                    int val = i++;
                    exec.executeBlocking(fut -> {
                        Thread current = Thread.currentThread();
                        NamedWorkerPoolTest.this.assertNotSame(startThread, current);
                        if (val == 0) {
                            NamedWorkerPoolTest.this.assertNull(currentThread.getAndSet(current));
                        } else {
                            NamedWorkerPoolTest.this.assertSame(current, currentThread.get());
                        }
                        fut.complete();
                    }, true, NamedWorkerPoolTest.this.onSuccess(v -> NamedWorkerPoolTest.this.complete()));
                }
            }
        }, new DeploymentOptions().setWorker(true), this.onSuccess(id -> {}));
        this.await();
    }

    @Test
    public void testPoolSize() throws Exception {
        String poolName = "vert.x-" + TestUtils.randomAlphaString(10);
        int poolSize = 5;
        this.waitFor(poolSize);
        WorkerExecutor worker = this.vertx.createSharedWorkerExecutor(poolName, poolSize);
        CountDownLatch latch1 = new CountDownLatch(poolSize * 100);
        Set names = Collections.synchronizedSet(new HashSet());
        for (int i = 0; i < poolSize * 100; ++i) {
            worker.executeBlocking(fut -> {
                names.add(Thread.currentThread().getName());
                latch1.countDown();
            }, false, ar -> {});
        }
        this.awaitLatch(latch1);
        this.assertEquals(5L, names.size());
    }

    @Test
    public void testCloseWorkerPool() throws Exception {
        String poolName = "vert.x-" + TestUtils.randomAlphaString(10);
        AtomicReference thread = new AtomicReference();
        WorkerExecutor worker1 = this.vertx.createSharedWorkerExecutor(poolName);
        WorkerExecutor worker2 = this.vertx.createSharedWorkerExecutor(poolName);
        worker1.executeBlocking(fut -> thread.set(Thread.currentThread()), ar -> {});
        NamedWorkerPoolTest.assertWaitUntil(() -> thread.get() != null);
        worker1.close();
        this.assertNotSame((Object)((Thread)thread.get()).getState(), (Object)Thread.State.TERMINATED);
        worker2.close();
        NamedWorkerPoolTest.assertWaitUntil(() -> ((Thread)thread.get()).getState() == Thread.State.TERMINATED);
    }

    @Test
    public void testDestroyWorkerPoolWhenVerticleUndeploys() throws Exception {
        final String poolName = "vert.x-" + TestUtils.randomAlphaString(10);
        CompletableFuture deploymentIdRef = new CompletableFuture();
        final AtomicReference pool = new AtomicReference();
        this.vertx.deployVerticle((Verticle)new AbstractVerticle(){

            public void start() throws Exception {
                pool.set(this.vertx.createSharedWorkerExecutor(poolName));
            }
        }, this.onSuccess(deploymentIdRef::complete));
        String deploymentId = (String)deploymentIdRef.get(20L, TimeUnit.SECONDS);
        this.vertx.undeploy(deploymentId, this.onSuccess(v -> {
            try {
                ((WorkerExecutor)pool.get()).executeBlocking(fut -> this.fail(), null);
                this.fail();
            }
            catch (IllegalStateException ignore) {
                this.testComplete();
            }
        }));
        this.await();
    }

    @Test
    public void testDeployUsingNamedPool() throws Exception {
        final AtomicReference thread = new AtomicReference();
        final String poolName = "vert.x-" + TestUtils.randomAlphaString(10);
        this.vertx.deployVerticle((Verticle)new AbstractVerticle(){

            public void start() throws Exception {
                this.vertx.executeBlocking(fut -> {
                    thread.set(Thread.currentThread());
                    NamedWorkerPoolTest.this.assertTrue(Context.isOnVertxThread());
                    NamedWorkerPoolTest.this.assertTrue(Context.isOnWorkerThread());
                    NamedWorkerPoolTest.this.assertFalse(Context.isOnEventLoopThread());
                    NamedWorkerPoolTest.this.assertTrue(Thread.currentThread().getName().startsWith(poolName + "-"));
                    fut.complete();
                }, NamedWorkerPoolTest.this.onSuccess(v -> this.vertx.undeploy(this.context.deploymentID())));
            }
        }, new DeploymentOptions().setWorkerPoolName(poolName), this.onSuccess(v -> {}));
        NamedWorkerPoolTest.assertWaitUntil(() -> thread.get() != null && ((Thread)thread.get()).getState() == Thread.State.TERMINATED);
    }

    @Test
    public void testDeployWorkerUsingNamedPool() throws Exception {
        final AtomicReference thread = new AtomicReference();
        AtomicReference deployment = new AtomicReference();
        final String poolName = "vert.x-" + TestUtils.randomAlphaString(10);
        this.vertx.deployVerticle((Verticle)new AbstractVerticle(){

            public void start() throws Exception {
                thread.set(Thread.currentThread());
                NamedWorkerPoolTest.this.assertTrue(Context.isOnVertxThread());
                NamedWorkerPoolTest.this.assertTrue(Context.isOnWorkerThread());
                NamedWorkerPoolTest.this.assertFalse(Context.isOnEventLoopThread());
                NamedWorkerPoolTest.this.assertTrue(Thread.currentThread().getName().startsWith(poolName + "-"));
                this.context.runOnContext(v -> this.vertx.undeploy(this.context.deploymentID()));
            }
        }, new DeploymentOptions().setWorker(true).setWorkerPoolName(poolName), this.onSuccess(deployment::set));
        NamedWorkerPoolTest.assertWaitUntil(() -> thread.get() != null && ((Thread)thread.get()).getState() == Thread.State.TERMINATED);
    }

    @Test
    public void testCloseWorkerPoolsWhenVertxCloses() {
        Vertx vertx = Vertx.vertx();
        WorkerExecutor exec = vertx.createSharedWorkerExecutor("vert.x-123");
        vertx.close(v -> {
            try {
                vertx.executeBlocking(fut -> this.fail(), ar -> this.fail());
                this.fail();
            }
            catch (RejectedExecutionException rejectedExecutionException) {
                // empty catch block
            }
            try {
                exec.executeBlocking(fut -> this.fail(), ar -> this.fail());
                this.fail();
            }
            catch (RejectedExecutionException rejectedExecutionException) {
                // empty catch block
            }
            exec.close();
            this.testComplete();
        });
        this.await();
    }
}

