package org.jgroups.tests;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.jgroups.Address;
import org.jgroups.BytesMessage;
import org.jgroups.Event;
import org.jgroups.Global;
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.Receiver;
import org.jgroups.View;
import org.jgroups.protocols.DISCARD;
import org.jgroups.protocols.FD_ALL3;
import org.jgroups.protocols.FRAG2;
import org.jgroups.protocols.SHARED_LOOPBACK;
import org.jgroups.protocols.SHARED_LOOPBACK_PING;
import org.jgroups.protocols.UNICAST3;
import org.jgroups.protocols.pbcast.FLUSH;
import org.jgroups.protocols.pbcast.GMS;
import org.jgroups.protocols.pbcast.NAKACK2;
import org.jgroups.protocols.pbcast.STABLE;
import org.jgroups.stack.ProtocolStack;
import org.jgroups.util.Util;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;

@Test(groups = {Global.FLUSH, Global.EAP_EXCLUDED}, singleThreaded = true)
/* loaded from: input_file:org/jgroups/tests/ReconciliationTest.class */
public class ReconciliationTest {
    /** Channels registered by the reconciliation tests; closed in {@code tearDown()} when non-null. */
    protected List<JChannel> channels;
    /** Receivers created by {@code reconciliationHelper}, added in the same order as the channels. */
    protected List<MyReceiver> receivers;
    /**
     * Assigned once in the static initializer to the negation of
     * {@code ReconciliationTest.class.desiredAssertionStatus()}, i.e. {@code true} when
     * assertions are not enabled for this class.
     */
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:org/jgroups/tests/ReconciliationTest$Cache.class */
    /**
     * Small map-backed receiver used by {@code testVirtualSynchrony}. {@link #put} sends a
     * key/value pair over the channel, and {@link #receive} stores every delivered pair in
     * {@link #data}. Every access to {@link #data} is made while holding the map's monitor.
     */
    protected static class Cache implements Receiver {
        protected final Map<Object, Object> data = new HashMap<>();
        protected JChannel ch;
        protected String name;

        /** Remembers the channel and label, then installs this cache as the channel's receiver. */
        public Cache(JChannel jChannel, String str) {
            ch = jChannel;
            name = str;
            jChannel.setReceiver(this);
        }

        /** Returns the value currently stored locally under {@code obj}, or {@code null}. */
        protected Object get(Object obj) {
            synchronized (data) {
                return data.get(obj);
            }
        }

        /**
         * Sends the pair {@code [obj, obj2]} with a null destination. The local map is not
         * touched here; it only changes when {@link #receive} is invoked.
         */
        protected void put(Object obj, Object obj2) throws Exception {
            Object[] pair = {obj, obj2};
            ch.send(new BytesMessage((Address) null, pair));
        }

        /** Returns the number of entries currently held locally. */
        protected int size() {
            synchronized (data) {
                return data.size();
            }
        }

        /** Unpacks the two-element payload and stores it as key (index 0) and value (index 1). */
        @Override // org.jgroups.Receiver
        public void receive(Message message) {
            Object[] pair = (Object[]) message.getObject();
            Object key = pair[0];
            Object value = pair[1];
            synchronized (data) {
                data.put(key, value);
            }
        }

        /** Writes the whole map to {@code outputStream} while holding the map's monitor. */
        @Override // org.jgroups.Receiver
        public void getState(OutputStream outputStream) throws Exception {
            DataOutputStream out = new DataOutputStream(outputStream);
            synchronized (data) {
                Util.objectToStream(data, out);
            }
        }

        /** Replaces the local contents with the map read from {@code inputStream}. */
        @Override // org.jgroups.Receiver
        public void setState(InputStream inputStream) throws Exception {
            DataInputStream in = new DataInputStream(inputStream);
            Map<?, ?> snapshot = (Map<?, ?>) Util.objectFromStream(in);
            synchronized (data) {
                data.clear();
                data.putAll(snapshot);
            }
        }

        /** Renders the keys in sorted order (via a {@link TreeMap} copy of the data). */
        @Override
        public String toString() {
            synchronized (data) {
                return new TreeMap<>(data).keySet().toString();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/jgroups/tests/ReconciliationTest$FlushTrigger.class */
    /**
     * Callback handed to {@code reconciliationHelper}; it is invoked once, after the DISCARD
     * protocol has been removed again, to perform whatever action the individual test uses
     * to provoke a flush (member join, manual flush, member crash).
     */
    @FunctionalInterface
    public interface FlushTrigger {
        /** Runs the test-specific action that is expected to start a flush. */
        void triggerFlush();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/jgroups/tests/ReconciliationTest$MyReceiver.class */
    /**
     * Receiver that records, per sender address, the Integer payloads it has been handed,
     * and logs each delivery to stdout prefixed with its own name.
     */
    public static class MyReceiver implements Receiver {
        protected final Map<Address, List<Integer>> msgs = new HashMap<>(10);
        protected final JChannel channel;
        protected final String name;

        public MyReceiver(JChannel jChannel, String str) {
            channel = jChannel;
            name = str;
        }

        /** Returns the live (not copied) map of sender address to received payloads. */
        public Map<Address, List<Integer>> getMsgs() {
            return msgs;
        }

        /** Forgets everything received so far. */
        public void reset() {
            msgs.clear();
        }

        /** Appends the message's Integer payload to the list kept for its sender, then logs it. */
        @Override // org.jgroups.Receiver
        public void receive(Message message) {
            // The per-sender list is created on first use, before the payload is read.
            List<Integer> fromSender = msgs.computeIfAbsent(message.getSrc(), sender -> new ArrayList<>());
            fromSender.add((Integer) message.getObject());
            System.out.println(name + ": <-- " + message.getObject() + " from " + message.getSrc());
        }
    }

    /** Closes every channel registered in {@code channels}; does nothing if no list was created. */
    @AfterMethod
    void tearDown() throws Exception {
        if (channels == null) {
            return;
        }
        for (JChannel channel : channels) {
            Util.close(channel);
        }
    }

    /**
     * Runs the reconciliation scenario with members A, B and C, using the join of an
     * additional member as the flush trigger.
     * <p>
     * A failure to create or connect the new member now aborts the test with the original
     * exception as cause instead of being printed and ignored.
     */
    public void testReconciliationFlushTriggeredByNewMemberJoin() throws Exception {
        reconciliationHelper(new String[]{"A", "B", "C"}, () -> {
            // NOTE(review): the log text talks about "D" while the channel is named "X".
            System.out.println("Joining D, this will trigger FLUSH and a subsequent view change to {A,B,C,D}");
            try {
                JChannel joiner = createChannel("X");
                // Register before connecting so tearDown() closes the channel even if connect() fails.
                this.channels.add(joiner);
                joiner.connect("ReconciliationTest");
            } catch (Exception e) {
                throw new IllegalStateException("new member failed to join cluster ReconciliationTest", e);
            }
        });
    }

    /**
     * Runs the reconciliation scenario with members A, B and C, using an explicit
     * start-flush / stop-flush on the first channel as the trigger.
     */
    public void testReconciliationFlushTriggeredByManualFlush() throws Exception {
        String[] members = {"A", "B", "C"};
        FlushTrigger manualFlush = () -> {
            JChannel first = this.channels.get(0);
            boolean success = Util.startFlush(first);
            System.out.println("manual flush success=" + success);
            first.stopFlush();
        };
        reconciliationHelper(members, manualFlush);
    }

    /**
     * Runs the reconciliation scenario with members A, B and C, using the shutdown of the
     * last member (via {@code Util.shutdown}) as the trigger.
     * <p>
     * A failing shutdown now aborts the test with the original exception as cause instead of
     * being printed and ignored.
     */
    public void testReconciliationFlushTriggeredByMemberCrashing() throws Exception {
        reconciliationHelper(new String[]{"A", "B", "C"}, () -> {
            // The member is taken out of the list first so the helper's view-size check
            // compares against the remaining members.
            JChannel victim = this.channels.remove(this.channels.size() - 1);
            try {
                Util.shutdown(victim);
            } catch (Exception e) {
                // The channel is no longer in the list, so tearDown() would not close it.
                Util.close(victim);
                throw new IllegalStateException("failed to shut down the last member", e);
            }
        });
    }

    /**
     * Shared scenario for the reconciliation tests.
     * <ol>
     *   <li>Creates and connects one channel per name in {@code strArr}, each with a
     *       {@link MyReceiver}.</li>
     *   <li>Inserts DISCARD into the second-to-last member so that it ignores the last
     *       member, then lets the last member send the integers 1..5.</li>
     *   <li>Verifies that the second-to-last member received nothing while the other members
     *       received all 5 messages.</li>
     *   <li>Removes DISCARD, runs {@code flushTrigger}, waits (up to 20 polls, 1s apart) for
     *       the first channel's view size to match the number of registered channels, and
     *       verifies that the second-to-last member now has the 5 messages.</li>
     * </ol>
     * All checks use TestNG assertions, so they run regardless of whether JVM assertions
     * are enabled.
     *
     * @param strArr       member names in join order; at least 2 are required
     * @param flushTrigger action expected to cause a flush
     * @throws IllegalArgumentException if fewer than 2 names are given
     */
    protected void reconciliationHelper(String[] strArr, FlushTrigger flushTrigger) throws Exception {
        if (strArr == null || strArr.length < 2) {
            throw new IllegalArgumentException("at least 2 member names are required");
        }
        this.channels = new ArrayList<>(strArr.length);
        this.receivers = new ArrayList<>(strArr.length);

        for (int i = 0; i < strArr.length; i++) {
            JChannel ch = createChannel(strArr[i]);
            modifyNAKACK(ch);
            MyReceiver receiver = new MyReceiver(ch, strArr[i]);
            this.receivers.add(receiver);
            this.channels.add(ch);
            ch.setReceiver(receiver);
            ch.connect("ReconciliationTest");
            // The first member gets a longer pause than the subsequent joiners.
            Util.sleep(i == 0 ? 1000L : 250L);
        }

        JChannel sender = this.channels.get(this.channels.size() - 1);
        JChannel blocked = this.channels.get(this.channels.size() - 2);
        String senderName = strArr[strArr.length - 1];
        String blockedName = strArr[strArr.length - 2];

        View view = sender.getView();
        System.out.println("view: " + view);
        Assert.assertNotNull(view, "the last member has no view");
        Assert.assertEquals(view.size(), this.channels.size(), "initial view: " + view);

        // Captured up front: the trigger may shut the sender down before the final check.
        Address senderAddr = sender.getAddress();
        System.out.println(blocked.getAddress() + " is now discarding messages from " + senderAddr);
        insertDISCARD(blocked, senderAddr);

        printDigests(this.channels, "\nDigests before " + senderName + " sends any messages:");
        System.out.println("\n" + senderName + " sending 5 messages; " + blockedName + " will ignore them, but others will receive them");
        for (int i = 1; i <= 5; i++) {
            sender.send((Address) null, Integer.valueOf(i));
        }
        Util.sleep(1000L);
        printDigests(this.channels, "\nDigests after " + senderName + " sent messages:");

        MyReceiver senderReceiver = this.receivers.get(this.receivers.size() - 1);
        MyReceiver blockedReceiver = this.receivers.get(this.receivers.size() - 2);

        // The sender itself must have seen its own 5 messages.
        Map<Address, List<Integer>> senderMsgs = senderReceiver.getMsgs();
        Assert.assertEquals(senderMsgs.size(), 1, "we should have only 1 sender, namely C at this time");
        List<Integer> ownMsgs = senderMsgs.get(senderAddr);
        System.out.println("\n" + senderName + ": messages received from " + senderName + ": " + ownMsgs);
        Assert.assertNotNull(ownMsgs, senderName + " has no messages from itself");
        Assert.assertEquals(ownMsgs.size(), 5, "correct msgs: " + ownMsgs);

        // The discarding member must not have seen anything yet.
        Map<Address, List<Integer>> blockedMsgs = blockedReceiver.getMsgs();
        Assert.assertEquals(blockedMsgs.size(), 0, "we should have no sender at this time");
        List<Integer> leaked = blockedMsgs.get(senderAddr);
        System.out.println(blockedName + ": messages received from " + senderName + ": " + leaked);
        Assert.assertNull(leaked, blockedName + " should not have received anything from " + senderName);

        // Every remaining member must have all 5 messages.
        for (MyReceiver other : this.receivers.subList(0, this.receivers.size() - 2)) {
            Map<Address, List<Integer>> otherMsgs = other.getMsgs();
            Assert.assertEquals(otherMsgs.size(), 1, "we should have only 1 sender");
            List<Integer> fromSender = otherMsgs.get(senderAddr);
            System.out.println(other.name + ": messages received from " + senderName + ": " + fromSender);
            Assert.assertNotNull(fromSender, other.name + " has no messages from " + senderName);
            Assert.assertEquals(fromSender.size(), 5, "correct msgs" + fromSender);
        }

        removeDISCARD(blocked);
        flushTrigger.triggerFlush();

        // Poll until the first channel's view matches the registered channels (the trigger
        // may have added or removed one); a null view ends the wait immediately.
        for (int attempt = 0; attempt < 20; attempt++) {
            View current = this.channels.get(0).getView();
            if (current == null || current.size() == this.channels.size()) {
                break;
            }
            Util.sleep(1000L);
        }
        View finalView = this.channels.get(0).getView();
        Assert.assertNotNull(finalView, "the first member has no view after the flush trigger");
        Assert.assertEquals(finalView.size(), this.channels.size(), "view after the flush trigger: " + finalView);

        printDigests(this.channels, "\nDigests after reconciliation (" + blockedName
                + " should have received the 5 messages from " + senderName + " now):");
        Map<Address, List<Integer>> reconciled = blockedReceiver.getMsgs();
        Assert.assertEquals(reconciled.size(), 1, "we should have 1 sender at this time");
        List<Integer> recovered = reconciled.get(senderAddr);
        System.out.println("\n" + blockedName + ": messages received from " + senderName + " : " + recovered);
        Assert.assertNotNull(recovered, blockedName + " has no messages from " + senderName + " after reconciliation");
        Assert.assertEquals(recovered.size(), 5, "msgs after reconciliation: " + recovered);
    }

    /**
     * Builds a channel named {@code str} on a programmatically assembled stack, listed
     * here in constructor-argument order. The channel is returned unconnected.
     */
    protected static JChannel createChannel(String str) throws Exception {
        // NOTE(review): the FD_ALL3 timeout reuses Global.THREADPOOL_SHUTDOWN_WAIT_TIME; this
        // looks like a decompiler constant substitution rather than an intentional link - confirm.
        JChannel channel = new JChannel(
                new SHARED_LOOPBACK(),
                new SHARED_LOOPBACK_PING(),
                new FD_ALL3().setTimeout(Global.THREADPOOL_SHUTDOWN_WAIT_TIME).setInterval(1000L),
                new NAKACK2(),
                new UNICAST3(),
                new STABLE(),
                new GMS(),
                new FRAG2().setFragSize(8000),
                new FLUSH());
        return channel.name(str);
    }

    /**
     * Turns off {@code discardDeliveredMsgs} on the channel's NAKACK2 protocol. Does nothing
     * when the channel is null or its stack contains no NAKACK2.
     */
    protected static void modifyNAKACK(JChannel jChannel) {
        if (jChannel == null) {
            return;
        }
        NAKACK2 nakack = (NAKACK2) jChannel.getProtocolStack().findProtocol(NAKACK2.class);
        if (nakack != null) {
            nakack.setDiscardDeliveredMsgs(false);
        }
    }

    /**
     * Prints {@code str} followed by one line per channel of the form
     * {@code [address] digest}, where the digest is whatever the channel returns for
     * {@code Event.GET_DIGEST_EVT}.
     */
    private static void printDigests(List<JChannel> list, String str) {
        System.out.println(str);
        list.forEach(channel -> {
            Address address = channel.getAddress();
            Object digest = channel.down(Event.GET_DIGEST_EVT);
            System.out.println("[" + address + "] " + digest.toString());
        });
    }

    /**
     * Inserts a DISCARD protocol directly below NAKACK2 in {@code jChannel}'s stack,
     * configured with the channel's own address, {@code excludeItself(true)}, and
     * {@code address} as an ignored member.
     */
    private static void insertDISCARD(JChannel jChannel, Address address) throws Exception {
        DISCARD discard = new DISCARD().setAddress(jChannel.getAddress());
        discard.excludeItself(true);
        discard.addIgnoreMember(address);

        ProtocolStack stack = jChannel.getProtocolStack();
        stack.insertProtocol(discard, ProtocolStack.Position.BELOW, NAKACK2.class);
    }

    /** Removes the DISCARD protocol from the stack of each given channel, in argument order. */
    private static void removeDISCARD(JChannel... jChannelArr) throws Exception {
        for (int i = 0; i < jChannelArr.length; i++) {
            ProtocolStack stack = jChannelArr[i].getProtocolStack();
            stack.removeProtocol(DISCARD.class);
        }
    }

    /**
     * Connects two channels A and B, each backed by a {@link Cache}, sends 20 puts
     * alternating between the two caches, flushes from both sides and then expects both
     * caches to hold all 20 entries.
     * <p>
     * The channels are not registered in {@code channels}, so they are closed here in
     * {@code finally} blocks; otherwise a failing assertion or exception would leak them.
     */
    public void testVirtualSynchrony() throws Exception {
        JChannel a = createChannel("A");
        try {
            Cache cache1 = new Cache(a, "cache-1");
            a.connect("testVirtualSynchrony");

            JChannel b = createChannel("B");
            try {
                Cache cache2 = new Cache(b, "cache-2");
                b.connect("testVirtualSynchrony");
                Util.waitUntilAllChannelsHaveSameView(10000L, 500L, a, b);
                flush(a);

                // Even keys are sent through cache 1, odd keys through cache 2.
                for (int i = 1; i <= 20; i++) {
                    Cache target = i % 2 == 0 ? cache1 : cache2;
                    target.put(Integer.valueOf(i), true);
                }

                System.out.println("Starting flush on C1");
                flush(a);
                System.out.println("Starting flush on C2");
                flush(b);
                System.out.println("flush done");
                System.out.println("cache_1 (" + cache1.size() + " elements): " + cache1 + "\ncache_2 (" + cache2.size() + " elements): " + cache2);
                Assert.assertEquals(cache1.size(), 20, "cache 1: " + cache1);
                Assert.assertEquals(cache2.size(), 20, "cache 2: " + cache2);
            } finally {
                Util.close(b);
            }
        } finally {
            Util.close(a);
        }
    }

    /**
     * Starts a flush on {@code jChannel}, fails the test if it did not succeed, and always
     * stops the flush afterwards.
     * <p>
     * The flush is started unconditionally. Previously the call sat inside an
     * assertion-only branch, so with JVM assertions disabled no flush was started at all
     * while {@code stopFlush()} was still invoked.
     */
    protected static void flush(JChannel jChannel) {
        try {
            boolean started = Util.startFlush(jChannel);
            Assert.assertTrue(started, "flush started on " + jChannel.getAddress() + " did not succeed");
        } finally {
            jChannel.stopFlush();
        }
    }

    static {
        // Caches whether assertions are off for this class; presumably the decompiled form
        // of the flag the compiler emits for `assert` statements - confirm.
        boolean assertionsEnabled = ReconciliationTest.class.desiredAssertionStatus();
        $assertionsDisabled = !assertionsEnabled;
    }
}
