package org.infinispan.statetransfer;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.transaction.TransactionManager;
import org.infinispan.Cache;
import org.infinispan.commands.CommandsFactory;
import org.infinispan.commands.ReplicableCommand;
import org.infinispan.commons.hash.MurmurHash3;
import org.infinispan.commons.util.CollectionFactory;
import org.infinispan.commons.util.SmallIntSet;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.conflict.impl.InternalConflictManager;
import org.infinispan.container.DataContainer;
import org.infinispan.container.entries.ImmortalCacheEntry;
import org.infinispan.context.InvocationContextFactory;
import org.infinispan.distribution.DistributionManager;
import org.infinispan.distribution.TestAddress;
import org.infinispan.distribution.TriangleOrderManager;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.distribution.ch.impl.DefaultConsistentHash;
import org.infinispan.distribution.ch.impl.DefaultConsistentHashFactory;
import org.infinispan.distribution.ch.impl.HashFunctionPartitioner;
import org.infinispan.interceptors.AsyncInterceptorChain;
import org.infinispan.lifecycle.ComponentStatus;
import org.infinispan.notifications.cachelistener.CacheNotifier;
import org.infinispan.persistence.manager.PersistenceManager;
import org.infinispan.remoting.inboundhandler.DeliverOrder;
import org.infinispan.remoting.responses.SuccessfulResponse;
import org.infinispan.remoting.rpc.ResponseMode;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.rpc.RpcOptions;
import org.infinispan.remoting.rpc.RpcOptionsBuilder;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.Transport;
import org.infinispan.statetransfer.StateRequestCommand;
import org.infinispan.test.AbstractInfinispanTest;
import org.infinispan.test.TestingUtil;
import org.infinispan.topology.CacheTopology;
import org.infinispan.topology.LocalTopologyManager;
import org.infinispan.topology.PersistentUUID;
import org.infinispan.topology.PersistentUUIDManagerImpl;
import org.infinispan.transaction.impl.TransactionTable;
import org.infinispan.transaction.totalorder.TotalOrderManager;
import org.infinispan.util.ByteString;
import org.infinispan.util.concurrent.BlockingTaskAwareExecutorService;
import org.infinispan.util.concurrent.CommandAckCollector;
import org.infinispan.util.concurrent.IsolationLevel;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.AssertJUnit;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;

/**
 * Drives a {@link StateConsumerImpl} wired to mocked collaborators through a sequence of topology
 * updates and checks, after each step, whether it reports active inbound transfers and which
 * segments it asked the (mocked) {@link RpcManager} for.
 */
@Test(groups = {"functional"}, testName = "statetransfer.StateConsumerTest")
public class StateConsumerTest extends AbstractInfinispanTest {
    private static final Log log = LogFactory.getLog(StateConsumerTest.class);

    /** Upper bound for waiting on the forked topology update, so a deadlock fails the test instead of hanging it. */
    private static final long FORK_TIMEOUT_SECONDS = 30L;

    /** Worker pool handed to the consumer under test; created in {@link #test1()} and released in {@link #tearDown()}. */
    private ExecutorService pooledExecutorService;

    /**
     * Shuts down the worker pool created by the test, if any.
     * <p>
     * {@code alwaysRun = true} keeps this cleanup from being skipped when a previously invoked
     * method failed or was skipped, which would otherwise leave the pool's threads behind.
     */
    @AfterMethod(alwaysRun = true)
    public void tearDown() {
        if (this.pooledExecutorService != null) {
            this.pooledExecutorService.shutdownNow();
            this.pooledExecutorService = null;
        }
    }

    /**
     * Scenario, in order:
     * <ol>
     *   <li>install a stable topology on {@code ch2}: no transfers expected;</li>
     *   <li>start a rebalance towards {@code ch3}: transfers become active and the segments requested
     *       via {@code GET_TRANSACTIONS} equal the segments node 0 gains in {@code ch3};</li>
     *   <li>deliver the same stable topology from two threads at once: transfers are gone;</li>
     *   <li>start the rebalance again: the same segments are requested again;</li>
     *   <li>apply one {@link StateChunk} per gained segment (each built with an empty entry list and
     *       {@code true} as last argument), stop the consumer, and verify no transfer is left.</li>
     * </ol>
     *
     * @throws Exception if the forked topology update fails or does not finish within
     *                   {@value #FORK_TIMEOUT_SECONDS} seconds
     */
    public void test1() throws Exception {
        ConfigurationBuilder cb = new ConfigurationBuilder();
        cb.clustering().invocationBatching().enable()
                .clustering().cacheMode(CacheMode.DIST_SYNC)
                .clustering().stateTransfer().timeout(10000L)
                .locking().lockAcquisitionTimeout(TestingUtil.shortTimeoutMillis())
                .locking().isolationLevel(IsolationLevel.REPEATABLE_READ);
        Configuration configuration = cb.build();

        // Four test addresses, each registered with a random persistent UUID.
        PersistentUUIDManagerImpl persistentUUIDManager = new PersistentUUIDManagerImpl();
        Address[] addresses = new Address[4];
        for (int i = 0; i < addresses.length; i++) {
            addresses[i] = new TestAddress(i);
            persistentUUIDManager.addPersistentAddressMapping(addresses[i], PersistentUUID.randomUUID());
        }

        // ch1: all four members; ch2: ch1 with the last member dropped;
        // ch3: ch2 after rebalance(); ch23: union of ch2 and ch3.
        List<Address> members1 = Arrays.asList(addresses[0], addresses[1], addresses[2], addresses[3]);
        List<Address> members2 = Arrays.asList(addresses[0], addresses[1], addresses[2]);
        DefaultConsistentHashFactory chf = new DefaultConsistentHashFactory();
        DefaultConsistentHash ch1 = chf.create(MurmurHash3.getInstance(), 2, 40, members1, (Map) null);
        DefaultConsistentHash ch2 = chf.updateMembers(ch1, members2, (Map) null);
        DefaultConsistentHash ch3 = chf.rebalance(ch2);
        DefaultConsistentHash ch23 = chf.union(ch2, ch3);
        log.debug(ch1);
        log.debug(ch2);

        // Collaborators: everything except the executor is a Mockito mock.
        Cache cache = (Cache) Mockito.mock(Cache.class);
        Mockito.when(cache.getName()).thenReturn("testCache");
        Mockito.when(cache.getStatus()).thenReturn(ComponentStatus.RUNNING);
        this.pooledExecutorService = new ThreadPoolExecutor(0, 20, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<>(), getTestThreadFactory("Worker"), new ThreadPoolExecutor.CallerRunsPolicy());
        StateTransferManager stateTransferManager = (StateTransferManager) Mockito.mock(StateTransferManager.class);
        LocalTopologyManager localTopologyManager = (LocalTopologyManager) Mockito.mock(LocalTopologyManager.class);
        CacheNotifier cacheNotifier = (CacheNotifier) Mockito.mock(CacheNotifier.class);
        RpcManager rpcManager = (RpcManager) Mockito.mock(RpcManager.class);
        Transport transport = (Transport) Mockito.mock(Transport.class);
        CommandsFactory commandsFactory = (CommandsFactory) Mockito.mock(CommandsFactory.class);
        PersistenceManager persistenceManager = (PersistenceManager) Mockito.mock(PersistenceManager.class);
        DataContainer dataContainer = (DataContainer) Mockito.mock(DataContainer.class);
        TransactionTable transactionTable = (TransactionTable) Mockito.mock(TransactionTable.class);
        StateTransferLock stateTransferLock = (StateTransferLock) Mockito.mock(StateTransferLock.class);
        AsyncInterceptorChain interceptorChain = (AsyncInterceptorChain) Mockito.mock(AsyncInterceptorChain.class);
        InvocationContextFactory icf = (InvocationContextFactory) Mockito.mock(InvocationContextFactory.class);
        TotalOrderManager totalOrderManager = (TotalOrderManager) Mockito.mock(TotalOrderManager.class);
        BlockingTaskAwareExecutorService remoteCommandsExecutor = (BlockingTaskAwareExecutorService) Mockito.mock(BlockingTaskAwareExecutorService.class);
        InternalConflictManager conflictManager = (InternalConflictManager) Mockito.mock(InternalConflictManager.class);

        // The factory mock builds real StateRequestCommand instances from whatever arguments it receives.
        Mockito.when(commandsFactory.buildStateRequestCommand((StateRequestCommand.Type) Matchers.any(StateRequestCommand.Type.class), (Address) Matchers.any(Address.class), Matchers.anyInt(), (Set) Matchers.any(SmallIntSet.class))).thenAnswer(invocation -> {
            Object[] args = invocation.getArguments();
            return new StateRequestCommand(ByteString.fromString("cache1"), (StateRequestCommand.Type) args[0], (Address) args[1], ((Integer) args[2]).intValue(), (Set) args[3]);
        });
        Mockito.when(transport.getViewId()).thenReturn(1);
        Mockito.when(rpcManager.getAddress()).thenReturn(addresses[0]);
        Mockito.when(rpcManager.getTransport()).thenReturn(transport);

        // Segments seen in GET_TRANSACTIONS requests: per recipient, and flattened into one set.
        ConcurrentMap<Address, Set<Integer>> requestedSegments = CollectionFactory.makeConcurrentMap();
        Set<Integer> flatRequestedSegments = new ConcurrentSkipListSet<>();
        Mockito.when(rpcManager.invokeRemotely((Collection) Matchers.any(Collection.class), (ReplicableCommand) Matchers.any(StateRequestCommand.class), (RpcOptions) Matchers.any(RpcOptions.class))).thenAnswer(invocation -> {
            Object[] args = invocation.getArguments();
            // Only the first recipient is answered for.
            Address recipient = (Address) ((Collection) args[0]).iterator().next();
            StateRequestCommand command = (StateRequestCommand) args[1];
            StateRequestCommand.Type type = command.getType();
            Map<Address, Object> responses = new HashMap<>(1);
            if (type.equals(StateRequestCommand.Type.GET_TRANSACTIONS)) {
                responses.put(recipient, SuccessfulResponse.create(new ArrayList<>()));
                Set<Integer> segments = command.getSegments();
                requestedSegments.put(recipient, segments);
                flatRequestedSegments.addAll(segments);
            } else if (type.equals(StateRequestCommand.Type.START_STATE_TRANSFER) || type.equals(StateRequestCommand.Type.CANCEL_STATE_TRANSFER)) {
                responses.put(recipient, SuccessfulResponse.SUCCESSFUL_EMPTY_RESPONSE);
            }
            // Any other command type gets an empty response map.
            return responses;
        });
        Mockito.when(rpcManager.getRpcOptionsBuilder((ResponseMode) Matchers.any(ResponseMode.class))).thenAnswer(invocation -> {
            return new RpcOptionsBuilder(10000L, TimeUnit.MILLISECONDS, (ResponseMode) invocation.getArguments()[0], DeliverOrder.PER_SENDER);
        });

        // Create and start the component under test.
        StateConsumerImpl stateConsumer = new StateConsumerImpl();
        stateConsumer.init(cache, this.pooledExecutorService, stateTransferManager, localTopologyManager, interceptorChain, icf, configuration, rpcManager, (TransactionManager) null, commandsFactory, persistenceManager, dataContainer, transactionTable, stateTransferLock, cacheNotifier, totalOrderManager, remoteCommandsExecutor, new CommitManager(), new CommandAckCollector(), new TriangleOrderManager(0), (DistributionManager) null, new HashFunctionPartitioner(), conflictManager);
        stateConsumer.start();

        // The data container mock iterates over two immortal entries; the transaction table is empty.
        List<ImmortalCacheEntry> cacheEntries = new ArrayList<>();
        cacheEntries.add(new ImmortalCacheEntry(new TestKey("key1", 0, ch1), "value1"));
        cacheEntries.add(new ImmortalCacheEntry(new TestKey("key2", 0, ch1), "value2"));
        Mockito.when(dataContainer.iterator()).thenAnswer(invocation -> {
            return cacheEntries.iterator();
        });
        Mockito.when(transactionTable.getLocalTransactions()).thenReturn(Collections.emptyList());
        Mockito.when(transactionTable.getRemoteTransactions()).thenReturn(Collections.emptyList());

        // 1. Stable topology on ch2.
        AssertJUnit.assertFalse(stateConsumer.hasActiveTransfers());
        stateConsumer.onTopologyUpdate(stableTopology(1, 1, ch2, persistentUUIDManager), false);
        AssertJUnit.assertFalse(stateConsumer.hasActiveTransfers());

        // 2. Rebalance ch2 -> ch3.
        stateConsumer.onTopologyUpdate(rebalanceTopology(2, 2, ch2, ch3, ch23, persistentUUIDManager), true);
        AssertJUnit.assertTrue(stateConsumer.hasActiveTransfers());

        // Segments node 0 owns in ch3 but not in ch2.
        Set<Integer> oldSegments = ch2.getSegmentsForOwner(addresses[0]);
        Set<Integer> newSegments = ch3.getSegmentsForOwner(addresses[0]);
        newSegments.removeAll(oldSegments);
        log.debugf("Rebalancing. Added segments=%s, old segments=%s", newSegments, oldSegments);
        Assert.assertEquals(flatRequestedSegments, newSegments);

        // 3. Deliver the same stable topology from a forked thread and from this thread concurrently.
        Future<?> forkedUpdate = fork(() -> {
            stateConsumer.onTopologyUpdate(stableTopology(3, 2, ch2, persistentUUIDManager), false);
            return null;
        });
        stateConsumer.onTopologyUpdate(stableTopology(3, 2, ch2, persistentUUIDManager), false);
        // Bounded wait: fail with a TimeoutException rather than block the whole suite.
        forkedUpdate.get(FORK_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        AssertJUnit.assertFalse(stateConsumer.hasActiveTransfers());

        // 4. Restart the rebalance. Both trackers are reset so that the assertion below only
        // passes if the segments are really requested again; leaving the flat set populated
        // from step 2 would make the check vacuous.
        requestedSegments.clear();
        flatRequestedSegments.clear();
        stateConsumer.onTopologyUpdate(rebalanceTopology(4, 4, ch2, ch3, ch23, persistentUUIDManager), true);
        AssertJUnit.assertTrue(stateConsumer.hasActiveTransfers());
        Assert.assertEquals(flatRequestedSegments, newSegments);

        // 5. Apply one chunk per gained segment, then stop.
        List<StateChunk> stateChunks = new ArrayList<>();
        for (Integer segment : newSegments) {
            stateChunks.add(new StateChunk(segment, Collections.emptyList(), true));
        }
        stateConsumer.applyState(addresses[1], 2, stateChunks);
        stateConsumer.stop();
        AssertJUnit.assertFalse(stateConsumer.hasActiveTransfers());
    }

    /** Builds a {@code NO_REBALANCE} topology whose only consistent hash is {@code ch}. */
    private static CacheTopology stableTopology(int topologyId, int rebalanceId, DefaultConsistentHash ch, PersistentUUIDManagerImpl persistentUUIDManager) {
        return new CacheTopology(topologyId, rebalanceId, ch, (ConsistentHash) null, CacheTopology.Phase.NO_REBALANCE, ch.getMembers(), persistentUUIDManager.mapAddresses(ch.getMembers()));
    }

    /** Builds a {@code READ_OLD_WRITE_ALL} topology from the given hashes, with members taken from {@code union}. */
    private static CacheTopology rebalanceTopology(int topologyId, int rebalanceId, DefaultConsistentHash current, DefaultConsistentHash pending, DefaultConsistentHash union, PersistentUUIDManagerImpl persistentUUIDManager) {
        return new CacheTopology(topologyId, rebalanceId, current, pending, union, CacheTopology.Phase.READ_OLD_WRITE_ALL, union.getMembers(), persistentUUIDManager.mapAddresses(union.getMembers()));
    }
}
