package org.jgroups.blocks;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import org.jgroups.Address;
import org.jgroups.Global;
import org.jgroups.JChannel;
import org.jgroups.blocks.executor.ExecutionCompletionService;
import org.jgroups.blocks.executor.ExecutionRunner;
import org.jgroups.blocks.executor.ExecutionService;
import org.jgroups.blocks.executor.Executions;
import org.jgroups.conf.ClassConfigurator;
import org.jgroups.logging.Log;
import org.jgroups.logging.LogFactory;
import org.jgroups.protocols.CENTRAL_EXECUTOR;
import org.jgroups.protocols.Executing;
import org.jgroups.stack.ProtocolStack;
import org.jgroups.tests.ChannelTestBase;
import org.jgroups.util.NotifyingFuture;
import org.jgroups.util.Streamable;
import org.jgroups.util.Util;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups = {Global.STACK_DEPENDENT}, sequential = true)
/* loaded from: input_file:org/jgroups/blocks/ExecutingServiceTest.class */
public class ExecutingServiceTest extends ChannelTestBase {
    protected static Log log;
    protected static AtomicReference<CyclicBarrier> requestBlocker;
    protected JChannel c1;
    protected JChannel c2;
    protected JChannel c3;
    protected ExecutionService e1;
    protected ExecutionService e2;
    protected ExecutionService e3;
    protected ExecutionRunner er1;
    protected ExecutionRunner er2;
    protected ExecutionRunner er3;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:org/jgroups/blocks/ExecutingServiceTest$ExposedExecutingProtocol.class */
    public static class ExposedExecutingProtocol extends CENTRAL_EXECUTOR {
        static final /* synthetic */ boolean $assertionsDisabled;

        public ExposedExecutingProtocol() {
            this.id = ClassConfigurator.getProtocolId(CENTRAL_EXECUTOR.class);
        }

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // org.jgroups.protocols.Executing
        public void sendRequest(Address address, Executing.Type type, long j, Object obj) {
            CyclicBarrier cyclicBarrier = ExecutingServiceTest.requestBlocker.get();
            if (cyclicBarrier != null) {
                try {
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    if (!$assertionsDisabled) {
                        throw new AssertionError("Exception while waiting: " + e.toString());
                    }
                } catch (BrokenBarrierException e2) {
                    if (!$assertionsDisabled) {
                        throw new AssertionError("Exception while waiting: " + e2.toString());
                    }
                }
            }
            super.sendRequest(address, type, j, obj);
        }

        public Queue<Runnable> getAwaitingConsumerQueue() {
            return this._awaitingConsumer;
        }

        public Queue<Executing.Owner> getRequestsFromCoordinator() {
            return this._runRequests;
        }

        public Lock getLock() {
            return this._consumerLock;
        }

        static {
            $assertionsDisabled = !ExecutingServiceTest.class.desiredAssertionStatus();
        }
    }

    /* loaded from: input_file:org/jgroups/blocks/ExecutingServiceTest$SimpleCallable.class */
    protected static class SimpleCallable<V> implements Callable<V> {
        final V _object;

        public SimpleCallable(String str) {
            throw new UnsupportedOperationException();
        }

        public SimpleCallable(Integer num) {
            throw new UnsupportedOperationException();
        }

        public SimpleCallable(V v) {
            this._object = v;
        }

        public SimpleCallable() {
            throw new UnsupportedOperationException();
        }

        @Override // java.util.concurrent.Callable
        public V call() throws Exception {
            return this._object;
        }
    }

    /* loaded from: input_file:org/jgroups/blocks/ExecutingServiceTest$SimpleStreamableCallable.class */
    protected static class SimpleStreamableCallable<V> implements Callable<V>, Streamable {
        V _object;

        public SimpleStreamableCallable() {
        }

        public SimpleStreamableCallable(V v) {
            this._object = v;
        }

        @Override // java.util.concurrent.Callable
        public V call() throws Exception {
            return this._object;
        }

        public String toString() {
            return "SimpleSerializableCallable [value=" + this._object + "]";
        }

        @Override // org.jgroups.util.Streamable
        public void writeTo(DataOutputStream dataOutputStream) throws IOException {
            try {
                Util.writeObject(this._object, dataOutputStream);
            } catch (IOException e) {
                throw e;
            } catch (Exception e2) {
                throw new IOException(e2);
            }
        }

        @Override // org.jgroups.util.Streamable
        public void readFrom(DataInputStream dataInputStream) throws IOException, IllegalAccessException, InstantiationException {
            try {
                this._object = (V) Util.readObject(dataInputStream);
            } catch (IOException e) {
                throw e;
            } catch (Exception e2) {
                throw new IOException(e2);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/jgroups/blocks/ExecutingServiceTest$SleepingStreamableCallable.class */
    public static class SleepingStreamableCallable implements Callable<Void>, Streamable {
        long millis;
        public static BlockingQueue<Thread> canceledThreads = new LinkedBlockingQueue();
        public static CyclicBarrier barrier = new CyclicBarrier(2);

        public SleepingStreamableCallable() {
        }

        public SleepingStreamableCallable(long j) {
            this.millis = j;
        }

        @Override // org.jgroups.util.Streamable
        public void writeTo(DataOutputStream dataOutputStream) throws IOException {
            dataOutputStream.writeLong(this.millis);
        }

        @Override // org.jgroups.util.Streamable
        public void readFrom(DataInputStream dataInputStream) throws IOException, IllegalAccessException, InstantiationException {
            this.millis = dataInputStream.readLong();
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Callable
        public Void call() throws Exception {
            barrier.await();
            try {
                Thread.sleep(this.millis);
                return null;
            } catch (InterruptedException e) {
                Thread currentThread = Thread.currentThread();
                if (ExecutingServiceTest.log.isTraceEnabled()) {
                    ExecutingServiceTest.log.trace("Submitted cancelled thread - " + currentThread);
                }
                canceledThreads.offer(currentThread);
                return null;
            }
        }

        public String toString() {
            return "SleepingStreamableCallable [timeout=" + this.millis + "]";
        }
    }

    @BeforeClass
    protected void init() throws Exception {
        this.c1 = createChannel(true, 3, "A");
        addExecutingProtocol(this.c1);
        this.er1 = new ExecutionRunner(this.c1);
        this.c1.connect("ExecutionServiceTest");
        this.c2 = createChannel(this.c1, "B");
        this.er2 = new ExecutionRunner(this.c2);
        this.c2.connect("ExecutionServiceTest");
        this.c3 = createChannel(this.c1, "C");
        this.er3 = new ExecutionRunner(this.c3);
        this.c3.connect("ExecutionServiceTest");
        LogFactory.getLog(ExecutionRunner.class).setLevel("trace");
    }

    @AfterClass
    protected void cleanup() {
        Util.close(this.c3, this.c2, this.c1);
    }

    @BeforeMethod
    protected void createExecutors() {
        this.e1 = new ExecutionService(this.c1);
        this.e2 = new ExecutionService(this.c2);
        this.e3 = new ExecutionService(this.c3);
        SleepingStreamableCallable.canceledThreads.clear();
        SleepingStreamableCallable.barrier.reset();
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Test
    public void testSimpleSerializableCallableSubmit() throws InterruptedException, ExecutionException, TimeoutException {
        SimpleStreamableCallable simpleStreamableCallable = new SimpleStreamableCallable(100L);
        Thread thread = new Thread(this.er2);
        thread.start();
        Long l = (Long) this.e1.submit((Callable) simpleStreamableCallable).get(10L, TimeUnit.SECONDS);
        thread.interrupt();
        if (!$assertionsDisabled && 100L != l) {
            throw new AssertionError("The value returned doesn't match");
        }
        thread.join(2000L);
        if (!$assertionsDisabled && thread.isAlive()) {
            throw new AssertionError("Consumer did not stop correctly");
        }
    }

    @Test
    public void testSimpleSerializableCallableConcurrently() throws InterruptedException, ExecutionException, TimeoutException {
        Thread[] threadArr = {new Thread(this.er1), new Thread(this.er2), new Thread(this.er3)};
        for (Thread thread : threadArr) {
            thread.start();
        }
        Random random = new Random();
        Future[] futureArr = new Future[100];
        Future[] futureArr2 = new Future[100];
        Future[] futureArr3 = new Future[100];
        StringBuilder sb = new StringBuilder("base");
        for (int i = 0; i < 100; i++) {
            sb.append(random.nextInt(10));
            String sb2 = sb.toString();
            futureArr[i] = this.e1.submit((Callable) new SimpleStreamableCallable(sb2));
            futureArr2[i] = this.e2.submit((Callable) new SimpleStreamableCallable(sb2));
            futureArr3[i] = this.e3.submit((Callable) new SimpleStreamableCallable(sb2));
        }
        for (int i2 = 0; i2 < 100; i2++) {
            Object obj = futureArr[i2].get(10L, TimeUnit.SECONDS);
            if (!$assertionsDisabled && !obj.equals(futureArr2[i2].get(10L, TimeUnit.SECONDS))) {
                throw new AssertionError();
            }
            if (!$assertionsDisabled && !obj.equals(futureArr3[i2].get(10L, TimeUnit.SECONDS))) {
                throw new AssertionError();
            }
            CharSequence subSequence = sb.subSequence(0, 5 + i2);
            if (!$assertionsDisabled && !obj.equals(subSequence)) {
                throw new AssertionError();
            }
        }
        for (Thread thread2 : threadArr) {
            thread2.interrupt();
            thread2.join(2000L);
            if (!$assertionsDisabled && thread2.isAlive()) {
                throw new AssertionError("Consumer did not stop correctly");
            }
        }
    }

    @Test
    public void testInterruptWhileRunningAlot() throws InterruptedException, BrokenBarrierException, TimeoutException {
        for (int i = 0; i < 500; i++) {
            testInterruptTaskRequestWhileRunning();
        }
    }

    protected void testInterruptTaskRequestWhileRunning() throws InterruptedException, BrokenBarrierException, TimeoutException {
        SleepingStreamableCallable sleepingStreamableCallable = new SleepingStreamableCallable(10000L);
        Thread thread = new Thread(this.er2);
        thread.start();
        NotifyingFuture submit = this.e1.submit((Callable) sleepingStreamableCallable);
        SleepingStreamableCallable.barrier.await(5L, TimeUnit.SECONDS);
        if (log.isTraceEnabled()) {
            log.trace("Cancelling future by interrupting");
        }
        submit.cancel(true);
        Thread poll = SleepingStreamableCallable.canceledThreads.poll(2L, TimeUnit.SECONDS);
        if (log.isTraceEnabled()) {
            log.trace("Cancelling task by interrupting");
        }
        thread.interrupt();
        if (!$assertionsDisabled && poll == null) {
            throw new AssertionError("There was no cancelled thread");
        }
        thread.join(2000L);
        if (!$assertionsDisabled && thread.isAlive()) {
            throw new AssertionError("Consumer did not stop correctly");
        }
    }

    @Test
    public void testInterruptTaskRequestBeforeRunning() throws InterruptedException, TimeoutException {
        NotifyingFuture submit = this.e1.submit((Callable) new SleepingStreamableCallable(10000L));
        ExposedExecutingProtocol exposedExecutingProtocol = (ExposedExecutingProtocol) this.c1.getProtocolStack().findProtocol(ExposedExecutingProtocol.class);
        Queue<Runnable> awaitingConsumerQueue = exposedExecutingProtocol.getAwaitingConsumerQueue();
        Lock lock = exposedExecutingProtocol.getLock();
        lock.lock();
        try {
            if (!$assertionsDisabled && awaitingConsumerQueue.peek() == null) {
                throw new AssertionError("The object in queue doesn't match");
            }
            submit.cancel(false);
            lock.lock();
            try {
                if (!$assertionsDisabled && awaitingConsumerQueue.peek() != null) {
                    throw new AssertionError("There should be no more objects in the queue");
                }
                lock.unlock();
            } finally {
                lock.unlock();
            }
        } finally {
        }
    }

    @Test
    public void testExecutorAwaitTerminationNoInterrupt() throws InterruptedException, BrokenBarrierException, TimeoutException {
        testExecutorAwaitTermination(false);
    }

    @Test
    public void testExecutorAwaitTerminationInterrupt() throws InterruptedException, BrokenBarrierException, TimeoutException {
        testExecutorAwaitTermination(true);
    }

    protected void testExecutorAwaitTermination(boolean z) throws InterruptedException, BrokenBarrierException, TimeoutException {
        Thread thread = new Thread(this.er2);
        thread.start();
        SleepingStreamableCallable sleepingStreamableCallable = new SleepingStreamableCallable(101L);
        this.e1.submit((Callable) sleepingStreamableCallable);
        SleepingStreamableCallable.barrier.await(2L, TimeUnit.SECONDS);
        if (z) {
            if (log.isTraceEnabled()) {
                log.trace("Cancelling futures by interrupting");
            }
            this.e1.shutdownNow();
            if (!$assertionsDisabled && SleepingStreamableCallable.canceledThreads.poll(2L, TimeUnit.SECONDS) == null) {
                throw new AssertionError("Thread wasn't interrupted due to our request");
            }
        } else {
            this.e1.shutdown();
        }
        if (!$assertionsDisabled && !this.e1.awaitTermination(2L, TimeUnit.SECONDS)) {
            throw new AssertionError("Executor didn't terminate fast enough");
        }
        try {
            this.e1.submit((Callable) sleepingStreamableCallable);
        } catch (RejectedExecutionException e) {
        }
        if (!$assertionsDisabled) {
            throw new AssertionError("Task was submitted, where as it should have been rejected");
        }
        if (log.isTraceEnabled()) {
            log.trace("Cancelling task by interrupting");
        }
        thread.interrupt();
        thread.join(2000L);
        if (!$assertionsDisabled && thread.isAlive()) {
            throw new AssertionError("Consumer did not stop correctly");
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Test
    public void testNonSerializableCallable() throws SecurityException, NoSuchMethodException, InterruptedException, ExecutionException, TimeoutException {
        Thread thread = new Thread(this.er2);
        thread.start();
        Constructor constructor = SimpleCallable.class.getConstructor(Object.class);
        constructor.getGenericParameterTypes();
        Long l = (Long) this.e1.submit((Callable) Executions.serializableCallable(constructor, 100L)).get(10L, TimeUnit.SECONDS);
        thread.interrupt();
        if (!$assertionsDisabled && 100L != l) {
            throw new AssertionError("The value returned doesn't match");
        }
        thread.join(2000L);
        if (!$assertionsDisabled && thread.isAlive()) {
            throw new AssertionError("Consumer did not stop correctly");
        }
    }

    @Test
    public void testExecutionCompletionService() throws InterruptedException {
        Thread thread = new Thread(this.er2);
        thread.start();
        Thread thread2 = new Thread(this.er3);
        thread2.start();
        ExecutionCompletionService executionCompletionService = new ExecutionCompletionService(this.e1);
        Future submit = executionCompletionService.submit(new SleepingStreamableCallable(300L));
        Future submit2 = executionCompletionService.submit(new SleepingStreamableCallable(100L));
        if (!$assertionsDisabled && executionCompletionService.poll(2L, TimeUnit.SECONDS) != submit2) {
            throw new AssertionError("The task either didn't come back or was in wrong order");
        }
        if (!$assertionsDisabled && executionCompletionService.poll(2L, TimeUnit.SECONDS) != submit) {
            throw new AssertionError("The task either didn't come back or was in wrong order");
        }
        thread.interrupt();
        thread2.interrupt();
        thread.join(2000L);
        if (!$assertionsDisabled && thread.isAlive()) {
            throw new AssertionError("Consumer did not stop correctly");
        }
        thread2.join(2000L);
        if (!$assertionsDisabled && thread2.isAlive()) {
            throw new AssertionError("Consumer did not stop correctly");
        }
    }

    @Test
    public void testCoordinatorWentDownWhileSendingMessage() throws Exception {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
        requestBlocker.set(cyclicBarrier);
        final SimpleStreamableCallable simpleStreamableCallable = new SimpleStreamableCallable(23);
        ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
        newCachedThreadPool.submit(new Runnable() { // from class: org.jgroups.blocks.ExecutingServiceTest.1
            @Override // java.lang.Runnable
            public void run() {
                ExecutingServiceTest.this.e2.submit(simpleStreamableCallable);
            }
        });
        newCachedThreadPool.submit(new Runnable() { // from class: org.jgroups.blocks.ExecutingServiceTest.2
            @Override // java.lang.Runnable
            public void run() {
                Util.close(ExecutingServiceTest.this.c1);
            }
        });
        cyclicBarrier.await(2L, TimeUnit.SECONDS);
        requestBlocker.getAndSet(null).reset();
        this.c1 = createChannel(this.c2, "A");
        addExecutingProtocol(this.c1);
        this.er1 = new ExecutionRunner(this.c1);
        this.c1.connect("ExecutionServiceTest");
        newCachedThreadPool.shutdown();
        newCachedThreadPool.awaitTermination(2L, TimeUnit.SECONDS);
        ExposedExecutingProtocol exposedExecutingProtocol = (ExposedExecutingProtocol) this.c2.getProtocolStack().findProtocol(ExposedExecutingProtocol.class);
        Queue<Runnable> awaitingConsumerQueue = exposedExecutingProtocol.getAwaitingConsumerQueue();
        if (!$assertionsDisabled && awaitingConsumerQueue.size() != 1) {
            throw new AssertionError("There is no runnable in the queue");
        }
        Runnable next = awaitingConsumerQueue.iterator().next();
        if (!$assertionsDisabled && !(next instanceof ExecutionService.DistributedFuture)) {
            throw new AssertionError("The task wasn't a distributed future like we thought");
        }
        if (!$assertionsDisabled && simpleStreamableCallable != ((ExecutionService.DistributedFuture) next).getCallable()) {
            throw new AssertionError("The inner callable wasn't the same");
        }
        Queue<Executing.Owner> requestsFromCoordinator = exposedExecutingProtocol.getRequestsFromCoordinator();
        if (!$assertionsDisabled && requestsFromCoordinator.size() != 1) {
            throw new AssertionError("There is no request in the coordinator queue - " + requestsFromCoordinator.size());
        }
        Executing.Owner next2 = requestsFromCoordinator.iterator().next();
        if (!$assertionsDisabled && !next2.getAddress().equals(this.c2.getAddress())) {
            throw new AssertionError("The request Address doesn't match");
        }
        if (!$assertionsDisabled && next2.getRequestId() != 0) {
            throw new AssertionError("We only had 1 request so it should be zero still");
        }
    }

    @Test
    public void testInvokeAnyCalls() throws InterruptedException, ExecutionException {
        Thread thread = new Thread(this.er2);
        thread.start();
        Thread thread2 = new Thread(this.er3);
        thread2.start();
        ArrayList arrayList = new ArrayList();
        arrayList.add(new SimpleStreamableCallable(10L));
        arrayList.add(new SimpleStreamableCallable(100L));
        Long l = (Long) this.e1.invokeAny(arrayList);
        if (!$assertionsDisabled && l.longValue() != 10 && l.longValue() != 100) {
            throw new AssertionError("The task didn't return the right value");
        }
        thread.interrupt();
        thread2.interrupt();
        thread.join(2000L);
        if (!$assertionsDisabled && thread.isAlive()) {
            throw new AssertionError("Consumer did not stop correctly");
        }
        thread2.join(2000L);
        if (!$assertionsDisabled && thread2.isAlive()) {
            throw new AssertionError("Consumer did not stop correctly");
        }
    }

    protected void addExecutingProtocol(JChannel jChannel) {
        ProtocolStack protocolStack = jChannel.getProtocolStack();
        ExposedExecutingProtocol exposedExecutingProtocol = new ExposedExecutingProtocol();
        exposedExecutingProtocol.setLevel("trace");
        protocolStack.insertProtocolAtTop(exposedExecutingProtocol);
    }

    static {
        $assertionsDisabled = !ExecutingServiceTest.class.desiredAssertionStatus();
        log = LogFactory.getLog(ExecutingServiceTest.class);
        requestBlocker = new AtomicReference<>();
    }
}
