package org.apache.cassandra.streaming;

import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.sql.Date;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.cassandra.OrderedJUnit4ClassRunner;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.CleanupTest;
import org.apache.cassandra.db.Column;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.CounterColumn;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.DeletionInfo;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.Row;
import org.apache.cassandra.db.RowMutation;
import org.apache.cassandra.db.TreeMapBackedSortedColumns;
import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.context.CounterContext;
import org.apache.cassandra.db.filter.QueryFilter;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.io.sstable.SSTableUtils;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.streaming.StreamSession;
import org.apache.cassandra.thrift.IndexExpression;
import org.apache.cassandra.thrift.IndexOperator;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.CounterId;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.HeapAllocator;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(OrderedJUnit4ClassRunner.class)
/* loaded from: input_file:org/apache/cassandra/streaming/StreamingTransferTest.class */
public class StreamingTransferTest extends SchemaLoader {
    private static final Logger logger;
    public static final InetAddress LOCAL;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:org/apache/cassandra/streaming/StreamingTransferTest$Mutator.class */
    public interface Mutator {
        void mutate(String str, String str2, long j) throws Exception;
    }

    @BeforeClass
    public static void setup() throws Exception {
        StorageService.instance.initServer();
    }

    @Test
    public void testEmptyStreamPlan() throws Exception {
        StreamResultFuture execute = new StreamPlan("StreamingTransferTest").execute();
        final UUID uuid = execute.planId;
        Futures.addCallback(execute, new FutureCallback<StreamState>() { // from class: org.apache.cassandra.streaming.StreamingTransferTest.1
            static final /* synthetic */ boolean $assertionsDisabled;

            public void onSuccess(StreamState streamState) {
                if (!$assertionsDisabled && !uuid.equals(streamState.planId)) {
                    throw new AssertionError();
                }
                if (!$assertionsDisabled && !streamState.description.equals("StreamingTransferTest")) {
                    throw new AssertionError();
                }
                if (!$assertionsDisabled && !streamState.sessions.isEmpty()) {
                    throw new AssertionError();
                }
            }

            public void onFailure(Throwable th) {
                Assert.fail();
            }

            static {
                $assertionsDisabled = !StreamingTransferTest.class.desiredAssertionStatus();
            }
        });
        execute.get(100L, TimeUnit.MILLISECONDS);
    }

    @Test
    public void testRequestEmpty() throws Exception {
        IPartitioner partitioner = StorageService.getPartitioner();
        ArrayList arrayList = new ArrayList();
        arrayList.add(new Range(partitioner.getMinimumToken(), partitioner.getToken(ByteBufferUtil.bytes("key1"))));
        arrayList.add(new Range(partitioner.getToken(ByteBufferUtil.bytes("key2")), partitioner.getMinimumToken()));
        StreamResultFuture execute = new StreamPlan("StreamingTransferTest").requestRanges(LOCAL, "Keyspace2", arrayList).execute();
        UUID uuid = execute.planId;
        StreamState streamState = (StreamState) execute.get();
        if (!$assertionsDisabled && !uuid.equals(streamState.planId)) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && !streamState.description.equals("StreamingTransferTest")) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && streamState.sessions.size() != 1) {
            throw new AssertionError();
        }
        SessionInfo sessionInfo = (SessionInfo) Iterables.get(streamState.sessions, 0);
        if (!$assertionsDisabled && !sessionInfo.peer.equals(LOCAL)) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && sessionInfo.getTotalFilesReceived() != 0) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && sessionInfo.getTotalFilesSent() != 0) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && sessionInfo.getTotalSizeReceived() != 0) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && sessionInfo.getTotalSizeSent() != 0) {
            throw new AssertionError();
        }
    }

    private List<String> createAndTransfer(ColumnFamilyStore columnFamilyStore, Mutator mutator, boolean z) throws Exception {
        int[] iArr;
        logger.debug("Mutating " + columnFamilyStore.name);
        for (int i = 1; i <= 3; i++) {
            mutator.mutate(CFMetaData.DEFAULT_KEY_ALIAS + i, "col" + i, 1234L);
        }
        columnFamilyStore.forceBlockingFlush();
        Util.compactAll(columnFamilyStore, CompactionManager.GC_ALL).get();
        Assert.assertEquals(1L, columnFamilyStore.getSSTables().size());
        logger.debug("Transferring " + columnFamilyStore.name);
        if (z) {
            SSTableReader next = columnFamilyStore.getSSTables().iterator().next();
            columnFamilyStore.clearUnsafe();
            transferSSTables(next);
            iArr = new int[]{1, 3};
        } else {
            long currentTimeMillis = System.currentTimeMillis();
            transferRanges(columnFamilyStore);
            columnFamilyStore.discardSSTables(currentTimeMillis);
            iArr = new int[]{2, 3};
        }
        Assert.assertEquals(1L, columnFamilyStore.getSSTables().size());
        List<Row> rangeSlice = Util.getRangeSlice(columnFamilyStore);
        Assert.assertEquals(iArr.length, rangeSlice.size());
        for (int i2 = 0; i2 < iArr.length; i2++) {
            String str = CFMetaData.DEFAULT_KEY_ALIAS + iArr[i2];
            String str2 = "col" + iArr[i2];
            if (!$assertionsDisabled && columnFamilyStore.getColumnFamily(QueryFilter.getIdentityFilter(Util.dk(str), columnFamilyStore.name, System.currentTimeMillis())) == null) {
                throw new AssertionError();
            }
            if (!$assertionsDisabled && !rangeSlice.get(i2).key.key.equals(ByteBufferUtil.bytes(str))) {
                throw new AssertionError();
            }
            if (!$assertionsDisabled && rangeSlice.get(i2).cf.getColumn(ByteBufferUtil.bytes(str2)) == null) {
                throw new AssertionError();
            }
        }
        Assert.assertEquals(1234L, columnFamilyStore.getSSTables().iterator().next().getMaxTimestamp());
        ArrayList arrayList = new ArrayList();
        for (int i3 : iArr) {
            arrayList.add(CFMetaData.DEFAULT_KEY_ALIAS + i3);
        }
        logger.debug("... everything looks good for " + columnFamilyStore.name);
        return arrayList;
    }

    private void transferSSTables(SSTableReader sSTableReader) throws Exception {
        IPartitioner partitioner = StorageService.getPartitioner();
        ArrayList arrayList = new ArrayList();
        arrayList.add(new Range<>(partitioner.getMinimumToken(), partitioner.getToken(ByteBufferUtil.bytes("key1"))));
        arrayList.add(new Range<>(partitioner.getToken(ByteBufferUtil.bytes("key2")), partitioner.getMinimumToken()));
        transfer(sSTableReader, arrayList);
    }

    private void transferRanges(ColumnFamilyStore columnFamilyStore) throws Exception {
        IPartitioner partitioner = StorageService.getPartitioner();
        ArrayList arrayList = new ArrayList();
        arrayList.add(new Range(partitioner.getToken(ByteBufferUtil.bytes("key1")), partitioner.getToken(ByteBufferUtil.bytes("key0"))));
        new StreamPlan("StreamingTransferTest").transferRanges(LOCAL, columnFamilyStore.keyspace.getName(), arrayList, columnFamilyStore.getColumnFamilyName()).execute().get();
    }

    private void transfer(SSTableReader sSTableReader, List<Range<Token>> list) throws Exception {
        new StreamPlan("StreamingTransferTest").transferFiles(LOCAL, makeStreamingDetails(list, Arrays.asList(sSTableReader))).execute().get();
    }

    private Collection<StreamSession.SSTableStreamingSections> makeStreamingDetails(List<Range<Token>> list, Collection<SSTableReader> collection) {
        ArrayList arrayList = new ArrayList();
        for (SSTableReader sSTableReader : collection) {
            arrayList.add(new StreamSession.SSTableStreamingSections(sSTableReader, sSTableReader.getPositionsForRanges(list), sSTableReader.estimatedKeysForRanges(list)));
        }
        return arrayList;
    }

    private void doTransferTable(boolean z) throws Exception {
        final Keyspace open = Keyspace.open("Keyspace1");
        final ColumnFamilyStore columnFamilyStore = open.getColumnFamilyStore(CleanupTest.CF1);
        for (String str : createAndTransfer(columnFamilyStore, new Mutator() { // from class: org.apache.cassandra.streaming.StreamingTransferTest.2
            @Override // org.apache.cassandra.streaming.StreamingTransferTest.Mutator
            public void mutate(String str2, String str3, long j) throws Exception {
                long hashCode = str2.hashCode();
                TreeMapBackedSortedColumns create = TreeMapBackedSortedColumns.factory.create(open.getName(), columnFamilyStore.name);
                create.addColumn(Util.column(str3, "v", j));
                create.addColumn(new Column(ByteBufferUtil.bytes("birthdate"), ByteBufferUtil.bytes(hashCode), j));
                RowMutation rowMutation = new RowMutation("Keyspace1", ByteBufferUtil.bytes(str2), create);
                StreamingTransferTest.logger.debug("Applying row to transfer " + rowMutation);
                rowMutation.apply();
            }
        }, z)) {
            List<Row> search = columnFamilyStore.search(Util.range("", ""), Arrays.asList(new IndexExpression(ByteBufferUtil.bytes("birthdate"), IndexOperator.EQ, ByteBufferUtil.bytes(str.hashCode()))), new IdentityQueryFilter(), 100);
            Assert.assertEquals(1L, search.size());
            if (!$assertionsDisabled && !search.get(0).key.key.equals(ByteBufferUtil.bytes(str))) {
                throw new AssertionError();
            }
        }
    }

    @Test
    public void testTransferRangeTombstones() throws Exception {
        ColumnFamilyStore columnFamilyStore = Keyspace.open("Keyspace1").getColumnFamilyStore("StandardInteger1");
        RowMutation rowMutation = new RowMutation("Keyspace1", ByteBufferUtil.bytes("key1"));
        rowMutation.add("StandardInteger1", ByteBufferUtil.bytes(1), ByteBuffer.wrap(new byte[DatabaseDescriptor.getColumnIndexSize() - 64]), 2L);
        rowMutation.add("StandardInteger1", ByteBufferUtil.bytes(6), ByteBuffer.wrap(new byte[DatabaseDescriptor.getColumnIndexSize()]), 2L);
        ColumnFamily addOrGet = rowMutation.addOrGet("StandardInteger1");
        addOrGet.delete(new DeletionInfo(ByteBufferUtil.bytes(2), ByteBufferUtil.bytes(3), addOrGet.getComparator(), 1L, (int) (System.currentTimeMillis() / 1000)));
        addOrGet.delete(new DeletionInfo(ByteBufferUtil.bytes(5), ByteBufferUtil.bytes(7), addOrGet.getComparator(), 1L, (int) (System.currentTimeMillis() / 1000)));
        rowMutation.apply();
        columnFamilyStore.forceBlockingFlush();
        SSTableReader next = columnFamilyStore.getSSTables().iterator().next();
        columnFamilyStore.clearUnsafe();
        transferSSTables(next);
        Assert.assertEquals(1L, columnFamilyStore.getSSTables().size());
        Assert.assertEquals(1L, Util.getRangeSlice(columnFamilyStore).size());
    }

    @Test
    public void testTransferTableViaRanges() throws Exception {
        doTransferTable(false);
    }

    @Test
    public void testTransferTableViaSSTables() throws Exception {
        doTransferTable(true);
    }

    @Test
    public void testTransferTableCounter() throws Exception {
        final Keyspace open = Keyspace.open("Keyspace1");
        final ColumnFamilyStore columnFamilyStore = open.getColumnFamilyStore("Counter1");
        final CounterContext counterContext = new CounterContext();
        final HashMap hashMap = new HashMap();
        hashMap.keySet().retainAll(createAndTransfer(columnFamilyStore, new Mutator() { // from class: org.apache.cassandra.streaming.StreamingTransferTest.3
            @Override // org.apache.cassandra.streaming.StreamingTransferTest.Mutator
            public void mutate(String str, String str2, long j) throws Exception {
                HashMap hashMap2 = new HashMap();
                TreeMapBackedSortedColumns create = TreeMapBackedSortedColumns.factory.create(columnFamilyStore.metadata);
                TreeMapBackedSortedColumns create2 = TreeMapBackedSortedColumns.factory.create(columnFamilyStore.metadata);
                CounterContext.ContextState allocate = CounterContext.ContextState.allocate(0, 1, 3, HeapAllocator.instance);
                allocate.writeLocal(CounterId.fromInt(2), 9L, 3L);
                allocate.writeRemote(CounterId.fromInt(4), 4L, 2L);
                allocate.writeRemote(CounterId.fromInt(6), 3L, 3L);
                allocate.writeRemote(CounterId.fromInt(8), 2L, 4L);
                create.addColumn(new CounterColumn(ByteBufferUtil.bytes(str2), allocate.context, j));
                create2.addColumn(new CounterColumn(ByteBufferUtil.bytes(str2), counterContext.clearAllLocal(allocate.context), j));
                hashMap2.put(str, create);
                hashMap.put(str, create2);
                columnFamilyStore.addSSTable(SSTableUtils.prepare().ks(open.getName()).cf(columnFamilyStore.name).generation(0).write(hashMap2));
            }
        }, true));
        SSTableReader write = SSTableUtils.prepare().ks(open.getName()).cf(columnFamilyStore.name).generation(0).write(hashMap);
        SSTableReader next = columnFamilyStore.getSSTables().iterator().next();
        SSTableUtils.assertContentEquals(write, next);
        columnFamilyStore.clearUnsafe();
        transferSSTables(next);
        SSTableUtils.assertContentEquals(next, columnFamilyStore.getSSTables().iterator().next());
    }

    @Test
    public void testTransferTableMultiple() throws Exception {
        HashSet hashSet = new HashSet();
        hashSet.add("test");
        hashSet.add("test2");
        hashSet.add("test3");
        SSTableReader write = SSTableUtils.prepare().write(hashSet);
        String keyspaceName = write.getKeyspaceName();
        String columnFamilyName = write.getColumnFamilyName();
        HashSet hashSet2 = new HashSet();
        hashSet2.add("transfer1");
        hashSet2.add("transfer2");
        hashSet2.add("transfer3");
        SSTableReader write2 = SSTableUtils.prepare().write(hashSet2);
        IPartitioner partitioner = StorageService.getPartitioner();
        ArrayList arrayList = new ArrayList();
        arrayList.add(new Range<>(partitioner.getMinimumToken(), partitioner.getToken(ByteBufferUtil.bytes("test"))));
        arrayList.add(new Range<>(partitioner.getToken(ByteBufferUtil.bytes("transfer2")), partitioner.getMinimumToken()));
        write.acquireReference();
        write2.acquireReference();
        new StreamPlan("StreamingTransferTest").transferFiles(LOCAL, makeStreamingDetails(arrayList, Arrays.asList(write, write2))).execute().get();
        ColumnFamilyStore columnFamilyStore = Keyspace.open(keyspaceName).getColumnFamilyStore(columnFamilyName);
        List<Row> rangeSlice = Util.getRangeSlice(columnFamilyStore);
        Assert.assertEquals(2L, rangeSlice.size());
        if (!$assertionsDisabled && !rangeSlice.get(0).key.key.equals(ByteBufferUtil.bytes("test"))) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && !rangeSlice.get(1).key.key.equals(ByteBufferUtil.bytes("transfer3"))) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && rangeSlice.get(0).cf.getColumnCount() != 1) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && rangeSlice.get(1).cf.getColumnCount() != 1) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && columnFamilyStore.getColumnFamily(QueryFilter.getIdentityFilter(Util.dk("transfer1"), "Standard1", System.currentTimeMillis())) != null) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && columnFamilyStore.getColumnFamily(QueryFilter.getIdentityFilter(Util.dk("transfer2"), "Standard1", System.currentTimeMillis())) != null) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && columnFamilyStore.getColumnFamily(QueryFilter.getIdentityFilter(Util.dk("test2"), "Standard1", System.currentTimeMillis())) != null) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && columnFamilyStore.getColumnFamily(QueryFilter.getIdentityFilter(Util.dk("test3"), "Standard1", System.currentTimeMillis())) != null) {
            throw new AssertionError();
        }
    }

    @Test
    public void testTransferOfMultipleColumnFamilies() throws Exception {
        IPartitioner partitioner = StorageService.getPartitioner();
        ArrayList arrayList = new ArrayList();
        TreeMap treeMap = new TreeMap();
        for (String str : new String[]{"Standard1", "Standard2", "Standard3"}) {
            HashSet hashSet = new HashSet();
            hashSet.add("data-" + str + "-1");
            hashSet.add("data-" + str + "-2");
            hashSet.add("data-" + str + "-3");
            arrayList.add(SSTableUtils.prepare().ks("KeyCacheSpace").cf(str).write(hashSet));
            Iterator it = hashSet.iterator();
            while (it.hasNext()) {
                treeMap.put(Util.dk((String) it.next()), str);
            }
        }
        Map.Entry firstEntry = treeMap.firstEntry();
        Map.Entry lastEntry = treeMap.lastEntry();
        Map.Entry lowerEntry = treeMap.lowerEntry(lastEntry.getKey());
        List<Range<Token>> arrayList2 = new ArrayList<>();
        arrayList2.add(new Range<>(partitioner.getMinimumToken(), ((DecoratedKey) firstEntry.getKey()).token));
        arrayList2.add(new Range<>(((DecoratedKey) lowerEntry.getKey()).token, partitioner.getMinimumToken()));
        if (!SSTableReader.acquireReferences(arrayList)) {
            throw new AssertionError();
        }
        new StreamPlan("StreamingTransferTest").transferFiles(LOCAL, makeStreamingDetails(arrayList2, arrayList)).execute().get();
        for (Map.Entry entry : Arrays.asList(firstEntry, lastEntry)) {
            List<Row> rangeSlice = Util.getRangeSlice(Keyspace.open("KeyCacheSpace").getColumnFamilyStore((String) entry.getValue()));
            Assert.assertEquals(rangeSlice.toString(), 1L, rangeSlice.size());
            Assert.assertEquals(entry.getKey(), rangeSlice.get(0).key);
        }
    }

    @Test
    public void testRandomSSTableTransfer() throws Exception {
        final Keyspace open = Keyspace.open("Keyspace1");
        final ColumnFamilyStore columnFamilyStore = open.getColumnFamilyStore("Standard1");
        Mutator mutator = new Mutator() { // from class: org.apache.cassandra.streaming.StreamingTransferTest.4
            @Override // org.apache.cassandra.streaming.StreamingTransferTest.Mutator
            public void mutate(String str, String str2, long j) throws Exception {
                TreeMapBackedSortedColumns create = TreeMapBackedSortedColumns.factory.create(open.getName(), columnFamilyStore.name);
                create.addColumn(Util.column(str2, CFMetaData.DEFAULT_VALUE_ALIAS, j));
                create.addColumn(new Column(ByteBufferUtil.bytes("birthdate"), ByteBufferUtil.bytes(new Date(j).toString()), j));
                RowMutation rowMutation = new RowMutation("Keyspace1", ByteBufferUtil.bytes(str), create);
                StreamingTransferTest.logger.debug("Applying row to transfer " + rowMutation);
                rowMutation.apply();
            }
        };
        for (int i = 1; i <= 6000; i++) {
            mutator.mutate(CFMetaData.DEFAULT_KEY_ALIAS + i, "col" + i, System.currentTimeMillis());
        }
        columnFamilyStore.forceBlockingFlush();
        Util.compactAll(columnFamilyStore, CompactionManager.GC_ALL).get();
        SSTableReader next = columnFamilyStore.getSSTables().iterator().next();
        columnFamilyStore.clearUnsafe();
        IPartitioner partitioner = StorageService.getPartitioner();
        ArrayList arrayList = new ArrayList();
        arrayList.add(new Range<>(partitioner.getToken(ByteBufferUtil.bytes("key1")), partitioner.getToken(ByteBufferUtil.bytes("key1000"))));
        arrayList.add(new Range<>(partitioner.getToken(ByteBufferUtil.bytes("key5")), partitioner.getToken(ByteBufferUtil.bytes("key500"))));
        arrayList.add(new Range<>(partitioner.getToken(ByteBufferUtil.bytes("key9")), partitioner.getToken(ByteBufferUtil.bytes("key900"))));
        transfer(next, arrayList);
        Assert.assertEquals(1L, columnFamilyStore.getSSTables().size());
        Assert.assertEquals(7L, Util.getRangeSlice(columnFamilyStore).size());
    }

    static {
        $assertionsDisabled = !StreamingTransferTest.class.desiredAssertionStatus();
        logger = LoggerFactory.getLogger(StreamingTransferTest.class);
        LOCAL = FBUtilities.getBroadcastAddress();
    }
}
