package org.apache.flink.runtime.io.network.api.serialization;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Random;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.serialization.types.LargeObjectType;
import org.apache.flink.runtime.io.network.util.DeserializationUtils;
import org.apache.flink.testutils.serialization.types.IntType;
import org.apache.flink.testutils.serialization.types.SerializationTestType;
import org.apache.flink.testutils.serialization.types.SerializationTestTypeFactory;
import org.apache.flink.testutils.serialization.types.Util;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

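/**
 * Round-trip tests that push records through a {@link SpanningRecordSerializer} and read them back
 * with a {@link SpillingAdaptiveSpanningRecordDeserializer}.
 *
 * <p>Decompiled from
 * org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.class.
 */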
public class SpanningRecordSerializationTest extends TestLogger {
    // Shared generator with a fixed seed (42); setNextBufferForSerializer draws its random offsets from it.
    private static final Random RANDOM = new Random(42);

    // JUnit rule supplying a temporary folder; its root path is passed to every
    // SpillingAdaptiveSpanningRecordDeserializer created by these tests.
    @Rule
    public TemporaryFolder tempFolder = new TemporaryFolder();

    /**
     * Holder tying together the {@link BufferBuilder} a serializer writes into, the
     * {@link BufferConsumer} created from that builder, and the
     * {@link RecordSerializer.SerializationResult} obtained when the holder was created.
     */
    public static class BufferAndSerializerResult {
        private final BufferBuilder bufferBuilder;
        private final BufferConsumer bufferConsumer;
        private final RecordSerializer.SerializationResult serializationResult;

        public BufferAndSerializerResult(
                BufferBuilder bufferBuilder,
                BufferConsumer bufferConsumer,
                RecordSerializer.SerializationResult serializationResult) {
            this.bufferBuilder = bufferBuilder;
            this.bufferConsumer = bufferConsumer;
            this.serializationResult = serializationResult;
        }

        /** Returns whether the stored serialization result reports a full buffer. */
        public boolean isFullBuffer() {
            return serializationResult.isFullBuffer();
        }

        /** Returns whether the stored serialization result reports a full record. */
        public boolean isFullRecord() {
            return serializationResult.isFullRecord();
        }

        /** Returns the builder that further serializer output can be copied into. */
        public BufferBuilder getBufferBuilder() {
            return bufferBuilder;
        }

        /** Builds a single {@link Buffer} from the stored consumer via the test utilities. */
        public Buffer buildBuffer() {
            return BufferBuilderTestUtils.buildSingleBuffer(bufferConsumer);
        }
    }

    /** Round-trips 10 random int records using a base buffer size of 1. */
    @Test
    public void testIntRecordsSpanningMultipleSegments() throws Exception {
        final int numValues = 10;
        final int segmentSize = 1;

        final Iterable<SerializationTestType> records =
                Util.randomRecords(numValues, SerializationTestTypeFactory.INT);
        testSerializationRoundTrip(records, segmentSize);
    }

    /** Round-trips 64 random int records using a base buffer size of 64. */
    @Test
    public void testIntRecordsWithAlignedBuffers() throws Exception {
        final int numValues = 64;
        final int segmentSize = 64;

        final Iterable<SerializationTestType> records =
                Util.randomRecords(numValues, SerializationTestTypeFactory.INT);
        testSerializationRoundTrip(records, segmentSize);
    }

    /** Round-trips 248 random int records using a base buffer size of 31. */
    @Test
    public void testIntRecordsWithUnalignedBuffers() throws Exception {
        final int numValues = 248;
        final int segmentSize = 31;

        final Iterable<SerializationTestType> records =
                Util.randomRecords(numValues, SerializationTestTypeFactory.INT);
        testSerializationRoundTrip(records, segmentSize);
    }

    /** Round-trips 10000 records produced by {@code Util.randomRecords} using a base buffer size of 127. */
    @Test
    public void testRandomRecords() throws Exception {
        final int numValues = 10000;
        final int segmentSize = 127;

        final Iterable<SerializationTestType> records = Util.randomRecords(numValues);
        testSerializationRoundTrip(records, segmentSize);
    }

    /**
     * Round-trips 99 records that alternate between a fixed {@link IntType} (value 42) and a
     * random {@link LargeObjectType}, using a base buffer size of 32 KiB.
     */
    @Test
    public void testHandleMixedLargeRecords() throws Exception {
        final int numValues = 99;
        final int segmentSize = 32 * 1024;

        // Typed list sized for every record that is added below.
        ArrayList<SerializationTestType> originalRecords = new ArrayList<>(numValues);
        LargeObjectType largeGenerator = new LargeObjectType();
        Random rnd = new Random();

        for (int i = 0; i < numValues; i++) {
            if (i % 2 == 0) {
                originalRecords.add(new IntType(42));
            } else {
                // Call the method by its real name; "m153getRandom" is a decompiler-generated
                // alias and is not declared on LargeObjectType.
                originalRecords.add(largeGenerator.getRandom(rnd));
            }
        }

        testSerializationRoundTrip(originalRecords, segmentSize);
    }

    /**
     * Runs the round-trip check with a fresh {@link SpanningRecordSerializer} and a
     * {@link SpillingAdaptiveSpanningRecordDeserializer} that is given the temporary folder's root path.
     *
     * @param iterable records to serialize and read back
     * @param i base buffer size forwarded to the four-argument overload
     * @throws Exception if the round trip fails
     */
    private void testSerializationRoundTrip(Iterable<SerializationTestType> iterable, int i) throws Exception {
        // Diamond instantiation instead of raw types, so no unchecked conversion is needed.
        RecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<>();
        RecordDeserializer<SerializationTestType> deserializer =
                new SpillingAdaptiveSpanningRecordDeserializer<>(
                        new String[] {this.tempFolder.getRoot().getAbsolutePath()});

        testSerializationRoundTrip(iterable, i, serializer, deserializer);
    }

    /**
     * Serializes every record, feeds each filled buffer to the deserializer, and verifies that the
     * records come back equal and in order.
     *
     * @param iterable records to serialize
     * @param i base buffer size passed to {@code setNextBufferForSerializer}
     * @param recordSerializer serializer under test
     * @param recordDeserializer deserializer under test
     * @throws Exception if serialization, deserialization, or reflective instantiation fails
     */
    private static void testSerializationRoundTrip(Iterable<SerializationTestType> iterable, int i, RecordSerializer<SerializationTestType> recordSerializer, RecordDeserializer<SerializationTestType> recordDeserializer) throws Exception {
        final ArrayDeque<SerializationTestType> pendingRecords = new ArrayDeque<>();

        BufferAndSerializerResult current = setNextBufferForSerializer(recordSerializer, i);

        // Number of records serialized but not yet verified.
        int outstanding = 0;
        for (SerializationTestType record : iterable) {
            pendingRecords.add(record);
            outstanding++;

            recordSerializer.serializeRecord(record);
            if (recordSerializer.copyToBufferBuilder(current.getBufferBuilder()).isFullBuffer()) {
                // Buffer is full: hand it over and drain what can be deserialized so far.
                recordDeserializer.setNextBuffer(current.buildBuffer());
                outstanding -= DeserializationUtils.deserializeRecords(pendingRecords, recordDeserializer);

                // Keep moving buffers while the serializer fills each new one completely.
                // The loop must stop at the first buffer that is not full; that buffer stays
                // current for the next record.
                current = setNextBufferForSerializer(recordSerializer, i);
                while (current.isFullBuffer()) {
                    recordDeserializer.setNextBuffer(current.buildBuffer());
                    current = setNextBufferForSerializer(recordSerializer, i);
                }
            }
        }

        // Hand over the last, partially filled buffer and verify the remaining records.
        recordDeserializer.setNextBuffer(current.buildBuffer());
        while (!pendingRecords.isEmpty()) {
            SerializationTestType expected = pendingRecords.poll();
            SerializationTestType actual = expected.getClass().newInstance();

            Assert.assertTrue(recordDeserializer.getNextRecord(actual).isFullRecord());
            Assert.assertEquals(expected, actual);
            outstanding--;
        }

        // Everything serialized must have been read back, with nothing left on either side.
        Assert.assertEquals(0L, outstanding);
        Assert.assertFalse(recordSerializer.hasSerializedData());
        Assert.assertFalse(recordDeserializer.hasUnfinishedData());
    }

    /** Runs the unconsumed-buffer check for one random int record with a base buffer size of 1024. */
    @Test
    public void testSmallRecordUnconsumedBuffer() throws Exception {
        // Diamond instantiation instead of raw types, so no unchecked conversion is needed.
        RecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<>();
        RecordDeserializer<SerializationTestType> deserializer =
                new SpillingAdaptiveSpanningRecordDeserializer<>(
                        new String[] {this.tempFolder.getRoot().getAbsolutePath()});

        testUnconsumedBuffer(
                serializer,
                deserializer,
                Util.randomRecord(SerializationTestTypeFactory.INT),
                1024,
                new byte[0]);
    }

    /** Runs the unconsumed-buffer check for one random int record with a base buffer size of 1. */
    @Test
    public void testSpanningRecordUnconsumedBuffer() throws Exception {
        // Diamond instantiation instead of raw types, so no unchecked conversion is needed.
        RecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<>();
        RecordDeserializer<SerializationTestType> deserializer =
                new SpillingAdaptiveSpanningRecordDeserializer<>(
                        new String[] {this.tempFolder.getRoot().getAbsolutePath()});

        testUnconsumedBuffer(
                serializer,
                deserializer,
                Util.randomRecord(SerializationTestTypeFactory.INT),
                1,
                new byte[0]);
    }

    /** Runs the unconsumed-buffer check for one random byte-array record with a base buffer size of 1. */
    @Test
    public void testLargeSpanningRecordUnconsumedBuffer() throws Exception {
        // Diamond instantiation instead of raw types, so no unchecked conversion is needed.
        RecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<>();
        RecordDeserializer<SerializationTestType> deserializer =
                new SpillingAdaptiveSpanningRecordDeserializer<>(
                        new String[] {this.tempFolder.getRoot().getAbsolutePath()});

        testUnconsumedBuffer(
                serializer,
                deserializer,
                Util.randomRecord(SerializationTestTypeFactory.BYTE_ARRAY),
                1,
                new byte[0]);
    }

    /**
     * Runs the unconsumed-buffer check twice on the same serializer/deserializer pair, each time
     * with a random byte-array record, a base buffer size of 1 and the left-over bytes
     * {@code {42, 43, 44}}. The deserializer is cleared between the two runs.
     */
    @Test
    public void testLargeSpanningRecordUnconsumedBufferWithLeftOverBytes() throws Exception {
        RecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<>();
        SpillingAdaptiveSpanningRecordDeserializer<SerializationTestType> deserializer =
                new SpillingAdaptiveSpanningRecordDeserializer<>(
                        new String[] {this.tempFolder.getRoot().getAbsolutePath()});

        // The left-over bytes are passed as explicit byte arrays: bare int literals are not
        // implicitly narrowed to byte when used as method (varargs) arguments.
        testUnconsumedBuffer(
                serializer,
                deserializer,
                Util.randomRecord(SerializationTestTypeFactory.BYTE_ARRAY),
                1,
                new byte[] {42, 43, 44});

        deserializer.clear();

        testUnconsumedBuffer(
                serializer,
                deserializer,
                Util.randomRecord(SerializationTestTypeFactory.BYTE_ARRAY),
                1,
                new byte[] {42, 43, 44});
    }

    /**
     * Serializes one record and, for every full buffer handed to the deserializer, checks that the
     * deserializer's unconsumed buffer matches all bytes written so far.
     *
     * <p>Nothing is checked when the first buffer is not reported as full.
     *
     * @param recordSerializer serializer under test
     * @param recordDeserializer deserializer under test
     * @param serializationTestType record to serialize
     * @param i base buffer size passed to {@code setNextBufferForSerializer}
     * @param bArr bytes appended to a follow-up buffer whose serialization result reports a full record
     * @throws Exception if serialization, deserialization, or reflective instantiation fails
     */
    public void testUnconsumedBuffer(RecordSerializer<SerializationTestType> recordSerializer, RecordDeserializer<SerializationTestType> recordDeserializer, SerializationTestType serializationTestType, int i, byte... bArr) throws Exception {
        // try-with-resources closes the stream and keeps the primary exception if close() fails too.
        try (ByteArrayOutputStream unconsumedBytes = new ByteArrayOutputStream()) {
            recordSerializer.serializeRecord(serializationTestType);

            BufferAndSerializerResult current = setNextBufferForSerializer(recordSerializer, i);
            if (!recordSerializer.copyToBufferBuilder(current.getBufferBuilder()).isFullBuffer()) {
                return;
            }

            // First full buffer: record its bytes, hand it over, and compare.
            Buffer buffer = current.buildBuffer();
            writeBuffer(buffer.readOnlySlice().getNioBufferReadable(), unconsumedBytes);
            recordDeserializer.setNextBuffer(buffer);
            assertUnconsumedBuffer(unconsumedBytes, recordDeserializer.getUnconsumedBuffer());

            // getClass() already yields a SerializationTestType subtype, so no cast is needed.
            recordDeserializer.getNextRecord(serializationTestType.getClass().newInstance());

            // Keep moving buffers while the serializer fills each new one completely.
            current = setNextBufferForSerializer(recordSerializer, i);
            while (current.isFullBuffer()) {
                buffer = current.buildBuffer();
                if (current.isFullRecord()) {
                    buffer = appendLeftOverBytes(buffer, bArr);
                }

                writeBuffer(buffer.readOnlySlice().getNioBufferReadable(), unconsumedBytes);
                recordDeserializer.setNextBuffer(buffer);
                assertUnconsumedBuffer(unconsumedBytes, recordDeserializer.getUnconsumedBuffer());

                recordDeserializer.getNextRecord(serializationTestType.getClass().newInstance());

                current = setNextBufferForSerializer(recordSerializer, i);
            }
        }
    }

    /**
     * Builds a new buffer holding the readable bytes of {@code buffer} followed by {@code bArr}.
     *
     * @param buffer buffer whose readable bytes are copied first
     * @param bArr bytes appended after the copied content
     * @return the buffer built from a newly allocated unpooled segment
     */
    private static Buffer appendLeftOverBytes(Buffer buffer, byte[] bArr) {
        BufferBuilder bufferBuilder = new BufferBuilder(
                MemorySegmentFactory.allocateUnpooledSegment(buffer.readableBytes() + bArr.length),
                FreeingBufferRecycler.INSTANCE);

        // try-with-resources closes the consumer on every path and keeps the primary exception
        // if close() fails as well.
        try (BufferConsumer bufferConsumer = bufferBuilder.createBufferConsumer()) {
            bufferBuilder.append(buffer.getNioBufferReadable());
            bufferBuilder.appendAndCommit(ByteBuffer.wrap(bArr));
            return bufferConsumer.build();
        }
    }

    /**
     * Asserts that the first buffer from the iterator has the same content as the bytes collected
     * in {@code byteArrayOutputStream}, or that no bytes were collected when the iterator is empty.
     * The iterator is closed on every path.
     *
     * @param byteArrayOutputStream expected bytes
     * @param closeableIterator unconsumed buffers reported by the deserializer
     * @throws Exception if closing the iterator fails
     */
    private static void assertUnconsumedBuffer(ByteArrayOutputStream byteArrayOutputStream, CloseableIterator<Buffer> closeableIterator) throws Exception {
        try (CloseableIterator<Buffer> unconsumed = closeableIterator) {
            if (!unconsumed.hasNext()) {
                Assert.assertEquals(0, byteArrayOutputStream.size());
                // Nothing to compare; next() must not be called on an exhausted iterator.
                return;
            }

            ByteBuffer expected = ByteBuffer.wrap(byteArrayOutputStream.toByteArray());
            ByteBuffer actual = unconsumed.next().getNioBufferReadable();
            Assert.assertEquals(expected, actual);
        }
    }

    /**
     * Writes the given byte buffer to the stream through a channel wrapper.
     *
     * <p>The channel is not closed here; the stream stays owned by the caller.
     *
     * @param byteBuffer source buffer
     * @param outputStream destination stream
     * @throws IOException if the write fails
     */
    private static void writeBuffer(ByteBuffer byteBuffer, OutputStream outputStream) throws IOException {
        final java.nio.channels.WritableByteChannel sink = Channels.newChannel(outputStream);
        sink.write(byteBuffer);
    }

    /**
     * Creates a new buffer builder for the serializer and copies the serializer's pending data
     * into it.
     *
     * <p>For {@code i > 2} a random offset in {@code [0, i / 2)} is drawn from {@code RANDOM};
     * otherwise the offset is 0. The builder is created with {@code i + offset} and {@code offset}
     * as arguments (presumably total size and number of pre-filled bytes; confirm against
     * {@code BufferBuilderTestUtils}).
     *
     * @param recordSerializer serializer whose pending data is copied into the new builder
     * @param i base buffer size
     * @return the builder, its consumer, and the result of the copy
     * @throws IOException if copying into the builder fails
     */
    private static BufferAndSerializerResult setNextBufferForSerializer(RecordSerializer<SerializationTestType> recordSerializer, int i) throws IOException {
        final int startingOffset;
        if (i > 2) {
            startingOffset = RANDOM.nextInt(i / 2);
        } else {
            startingOffset = 0;
        }

        final BufferBuilder builder =
                BufferBuilderTestUtils.createFilledBufferBuilder(i + startingOffset, startingOffset);
        final BufferConsumer consumer = builder.createBufferConsumer();
        // Build and recycle once before the serializer writes; presumably this consumes the
        // pre-filled prefix (confirm against the buffer classes).
        consumer.build().recycleBuffer();

        final RecordSerializer.SerializationResult copyResult =
                recordSerializer.copyToBufferBuilder(builder);
        return new BufferAndSerializerResult(builder, consumer, copyResult);
    }
}
