package org.jgroups.tests;

import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.jgroups.Address;
import org.jgroups.Event;
import org.jgroups.Global;
import org.jgroups.Header;
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.Receiver;
import org.jgroups.conf.ClassConfigurator;
import org.jgroups.protocols.pbcast.FLUSH;
import org.jgroups.protocols.pbcast.NAKACK2;
import org.jgroups.protocols.pbcast.STABLE;
import org.jgroups.protocols.pbcast.STATE;
import org.jgroups.protocols.pbcast.STATE_SOCK;
import org.jgroups.protocols.pbcast.STATE_TRANSFER;
import org.jgroups.protocols.pbcast.StreamingStateTransfer;
import org.jgroups.stack.Protocol;
import org.jgroups.stack.ProtocolStack;
import org.jgroups.util.ArrayIterator;
import org.jgroups.util.Util;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups = {Global.STACK_DEPENDENT, Global.EAP_EXCLUDED}, singleThreaded = true)
/* loaded from: input_file:org/jgroups/tests/StateTransferTest.class */
public class StateTransferTest extends ChannelTestBase {
    static final int MSG_SEND_COUNT = 1000;
    static final String[] names;
    static final int APP_COUNT;
    static final Class<?>[] NAK_PROTS;
    static final short[] ids;
    protected StateTransferApplication[] apps = new StateTransferApplication[APP_COUNT];
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/jgroups/tests/StateTransferTest$StateTransferApplication.class */
    public class StateTransferApplication implements Receiver, Runnable {
        protected final int from;
        protected final int to;
        protected final Semaphore semaphore;
        protected final JChannel channel;
        protected long start_time;
        protected final Map<String, List<Long>> map = new HashMap(1000 * StateTransferTest.APP_COUNT);
        protected ConcurrentMap<Address, AtomicInteger> count = new ConcurrentHashMap();

        public StateTransferApplication(Semaphore semaphore, String str, int i, int i2) throws Exception {
            this.from = i;
            this.to = i2;
            this.semaphore = semaphore;
            init();
            this.channel = StateTransferTest.this.createChannel().name(str);
            this.channel.setReceiver(this);
        }

        protected void init() {
            for (String str : StateTransferTest.names) {
                this.map.put(str, new ArrayList(1000 * StateTransferTest.APP_COUNT));
            }
        }

        public JChannel getChannel() {
            return this.channel;
        }

        public void cleanup() {
            Util.close(this.channel);
        }

        public Map<String, List<Long>> getMap() {
            Map<String, List<Long>> map;
            synchronized (this.map) {
                map = this.map;
            }
            return map;
        }

        public ConcurrentMap<Address, AtomicInteger> getCount() {
            return this.count;
        }

        @Override // org.jgroups.Receiver
        public void receive(Message message) {
            String str = (String) message.getObject();
            Address src = message.getSrc();
            AtomicInteger atomicInteger = this.count.get(src);
            if (atomicInteger == null) {
                atomicInteger = new AtomicInteger(0);
                AtomicInteger putIfAbsent = this.count.putIfAbsent(src, atomicInteger);
                if (putIfAbsent != null) {
                    atomicInteger = putIfAbsent;
                }
            }
            atomicInteger.incrementAndGet();
            long seqno = StateTransferTest.getSeqno(message);
            if (seqno == -1) {
                throw new IllegalArgumentException("NAKACK{2} seqno could not be fetched from message");
            }
            synchronized (this.map) {
                List<Long> list = this.map.get(str);
                if (!list.contains(Long.valueOf(seqno))) {
                    list.add(Long.valueOf(seqno));
                }
            }
        }

        @Override // org.jgroups.Receiver
        public void getState(OutputStream outputStream) throws Exception {
            BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(outputStream);
            synchronized (this.map) {
                Util.objectToStream(this.map, new DataOutputStream(bufferedOutputStream));
                bufferedOutputStream.flush();
            }
        }

        @Override // org.jgroups.Receiver
        public void setState(InputStream inputStream) throws Exception {
            Map<? extends String, ? extends List<Long>> map = (Map) Util.objectFromStream(new DataInputStream(inputStream));
            synchronized (this.map) {
                this.map.clear();
                this.map.putAll(map);
                this.count.clear();
                long currentTimeMillis = System.currentTimeMillis() - this.start_time;
                StringBuilder sb = new StringBuilder("\n++++++++++++++++++++++++++++++++++++++\n");
                Address address = this.channel.getAddress();
                StateTransferTest.getSize(this.map);
                sb.append(address + " <--- received state (in " + currentTimeMillis + " ms), map has " + sb + " elements:\n");
                for (Map.Entry<String, List<Long>> entry : this.map.entrySet()) {
                    sb.append(entry.getKey() + ": " + StateTransferTest.print(entry.getValue()) + "\n");
                }
                sb.append("++++++++++++++++++++++++++++++++++++++");
                System.out.println(sb);
            }
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                if (!this.semaphore.tryAcquire(10L, TimeUnit.SECONDS)) {
                    throw new Exception(this.channel.getAddress() + " cannot acquire semaphore");
                }
                useChannel();
            } catch (Exception e) {
                StateTransferTest.this.log.error(this.channel.getAddress() + ": ", e);
            }
        }

        protected void useChannel() throws Exception {
            this.start_time = System.currentTimeMillis();
            this.channel.connect("StateTransferTest", null, 20000L);
            int i = 0;
            for (int i2 = this.from; i2 < this.to; i2++) {
                try {
                    this.channel.send((Address) null, this.channel.getName());
                    i++;
                    if (i % 100 == 0) {
                        Util.sleep(50L);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    return;
                }
            }
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    @DataProvider(name = "createChannels")
    protected Iterator<Object[]> createChannels() {
        return new ArrayIterator(new Class[]{new Class[]{STATE_TRANSFER.class}, new Class[]{STATE.class}, new Class[]{STATE_SOCK.class}});
    }

    @AfterMethod
    protected void destroy() {
        for (StateTransferApplication stateTransferApplication : this.apps) {
            if (stateTransferApplication != null) {
                stateTransferApplication.getChannel().setReceiver(null);
                stateTransferApplication.cleanup();
            }
        }
    }

    @Test(dataProvider = "createChannels")
    public void testStateTransferFromSelfWithRegularChannel(Class<? extends Protocol> cls) throws Exception {
        JChannel createChannel = createChannel();
        replaceStateTransferProtocolWith(createChannel, cls);
        createChannel.connect("StateTransferTest");
        try {
            Address address = createChannel.getAddress();
            if (!$assertionsDisabled && address == null) {
                throw new AssertionError();
            }
            createChannel.getState(address, 20000L);
            Util.close(createChannel);
        } catch (Throwable th) {
            Util.close(createChannel);
            throw th;
        }
    }

    @Test(dataProvider = "createChannels")
    public void testStateTransferWhileSending(Class<? extends Protocol> cls) throws Exception {
        Semaphore semaphore = new Semaphore(APP_COUNT, true);
        semaphore.acquire(APP_COUNT);
        Thread[] threadArr = new Thread[APP_COUNT];
        int i = 0;
        int i2 = 1000;
        for (int i3 = 0; i3 < this.apps.length; i3++) {
            this.apps[i3] = new StateTransferApplication(semaphore, names[i3], 0, 1000);
            replaceStateTransferProtocolWith(this.apps[i3].getChannel(), cls);
        }
        makeUnique((List<JChannel>) Stream.of((Object[]) this.apps).map((v0) -> {
            return v0.getChannel();
        }).collect(Collectors.toList()));
        for (int i4 = 0; i4 < this.apps.length; i4++) {
            threadArr[i4] = new Thread(this.apps[i4], "thread-" + names[i4]);
            threadArr[i4].start();
            i += 1000;
            i2 += 1000;
        }
        int i5 = 0;
        while (i5 < threadArr.length) {
            semaphore.release();
            Util.sleep(i5 == 0 ? 4000L : 100L);
            i5++;
        }
        JChannel[] jChannelArr = new JChannel[this.apps.length];
        for (int i6 = 0; i6 < this.apps.length; i6++) {
            jChannelArr[i6] = this.apps[i6].getChannel();
        }
        Util.waitUntilAllChannelsHaveSameView(20000L, 1000L, jChannelArr);
        for (Thread thread : threadArr) {
            thread.join(20000L);
        }
        for (Thread thread2 : threadArr) {
            if (thread2.isAlive()) {
                throw new Exception("Thread " + thread2.getName() + " is still alive");
            }
        }
        System.out.println("Waiting for all channels to have " + (1000 * APP_COUNT) + " elements:");
        long currentTimeMillis = System.currentTimeMillis() + 20000;
        while (System.currentTimeMillis() < currentTimeMillis) {
            boolean z = true;
            StateTransferApplication[] stateTransferApplicationArr = this.apps;
            int length = stateTransferApplicationArr.length;
            int i7 = 0;
            while (true) {
                if (i7 >= length) {
                    break;
                }
                if (getSize(stateTransferApplicationArr[i7].getMap()) != 1000 * APP_COUNT) {
                    z = false;
                    break;
                }
                i7++;
            }
            if (z) {
                break;
            }
            resumeStableAndGC();
            Util.sleep(500L);
        }
        for (int i8 = 0; i8 < this.apps.length; i8++) {
            ConcurrentMap<Address, AtomicInteger> count = this.apps[i8].getCount();
            System.out.println("msgs for " + names[i8] + ":");
            for (Map.Entry<Address, AtomicInteger> entry : count.entrySet()) {
                System.out.println("from " + entry.getKey() + " --> " + entry.getValue() + " msgs");
            }
        }
        System.out.println("++++++++++++++++++++++++++++++++++++++");
        for (int i9 = 0; i9 < this.apps.length; i9++) {
            StateTransferApplication stateTransferApplication = this.apps[i9];
            Map<String, List<Long>> map = stateTransferApplication.getMap();
            System.out.println("\n" + names[i9] + " (" + getSize(map) + "): digest=" + stateTransferApplication.getChannel().down(Event.GET_DIGEST_EVT));
            for (String str : names) {
                System.out.println("map " + str + ": " + print(map.get(str)));
            }
        }
        System.out.println("++++++++++++++++++++++++++++++++++++++");
        for (int i10 = 0; i10 < this.apps.length; i10++) {
            Map<String, List<Long>> map2 = this.apps[i10].getMap();
            if (!$assertionsDisabled && getSize(map2) != 1000 * APP_COUNT) {
                throw new AssertionError("map " + names[i10] + " has " + getSize(map2) + " elements (expected: " + (1000 * APP_COUNT) + ")");
            }
        }
        for (String str2 : names) {
            List<Long> list = this.apps[0].getMap().get(str2);
            for (int i11 = 1; i11 < this.apps.length; i11++) {
                List<Long> list2 = this.apps[i11].getMap().get(str2);
                if (!$assertionsDisabled && !list.equals(list2)) {
                    throw new AssertionError();
                }
            }
        }
    }

    protected void resumeStableAndGC() {
        for (StateTransferApplication stateTransferApplication : this.apps) {
            STABLE stable = (STABLE) stateTransferApplication.getChannel().getProtocolStack().findProtocol(STABLE.class);
            stable.down(new Event(66));
            stable.gc();
        }
    }

    protected static String print(List<Long> list) {
        if (list.isEmpty()) {
            return "[] (0 elements)";
        }
        long longValue = list.get(0).longValue();
        return "[" + longValue + " .. " + longValue + "] (" + list.get(list.size() - 1).longValue() + " elements)";
    }

    protected static int getSize(Map<String, List<Long>> map) {
        int i = 0;
        Iterator<List<Long>> it = map.values().iterator();
        while (it.hasNext()) {
            i += it.next().size();
        }
        return i;
    }

    protected static long getSeqno(Message message) {
        for (short s : ids) {
            Header header = message.getHeader(s);
            if (header != null) {
                return getSeqnoFromHeader(header);
            }
        }
        return -1L;
    }

    protected static long getSeqnoFromHeader(Header header) {
        return ((Long) Util.getField(Util.getField(header.getClass(), "seqno"), header)).longValue();
    }

    protected static void replaceStateTransferProtocolWith(JChannel jChannel, Class<? extends Protocol> cls) throws Exception {
        ProtocolStack protocolStack = jChannel.getProtocolStack();
        if (protocolStack.findProtocol(cls) != null) {
            return;
        }
        Protocol findProtocol = protocolStack.findProtocol(STATE_TRANSFER.class, StreamingStateTransfer.class);
        Protocol newInstance = cls.getDeclaredConstructor(new Class[0]).newInstance(new Object[0]);
        if (findProtocol != null) {
            protocolStack.replaceProtocol(findProtocol, newInstance);
        } else if (protocolStack.findProtocol(FLUSH.class) != null) {
            protocolStack.insertProtocol(newInstance, ProtocolStack.Position.BELOW, FLUSH.class);
        } else {
            protocolStack.insertProtocolAtTop(newInstance);
        }
    }

    static {
        $assertionsDisabled = !StateTransferTest.class.desiredAssertionStatus();
        names = new String[]{"A", "B", "C", "D"};
        APP_COUNT = names.length;
        NAK_PROTS = new Class[]{NAKACK2.class};
        ids = new short[NAK_PROTS.length];
        for (int i = 0; i < NAK_PROTS.length; i++) {
            ids[i] = ClassConfigurator.getProtocolId(NAK_PROTS[i]);
        }
    }
}
