/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.streaming.compress;

import com.google.common.collect.Iterators;
import com.google.common.primitives.Ints;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Supplier;
import java.util.zip.Checksum;
import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.io.compress.CompressionMetadata;
import org.apache.cassandra.streaming.compress.CompressionInfo;
import org.apache.cassandra.utils.ChecksumType;
import org.apache.cassandra.utils.WrappedRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link InputStream} that decompresses chunked, compressed data while it is being read
 * from an underlying source stream.
 *
 * <p>A background {@link Reader} thread reads every chunk listed in {@code info.chunks}
 * from the source (the compressed payload followed by {@value #CHECKSUM_LENGTH} checksum
 * bytes) and hands it over through a bounded queue. {@link #read()} takes chunks from that
 * queue, decompresses them into an internal buffer and, with the probability returned by
 * the CRC-check-chance supplier, verifies the payload against the trailing checksum.
 *
 * <p>The stream only moves forward; {@link #position(long)} may be used to skip ahead.
 * Calling {@link #read()} after every listed chunk has been consumed blocks, because the
 * reader thread only enqueues a terminating marker when reading from the source fails.
 */
public class CompressedInputStream extends InputStream {
    private static final Logger logger = LoggerFactory.getLogger(CompressedInputStream.class);

    /** Number of checksum bytes that trail every compressed chunk on the wire. */
    private static final int CHECKSUM_LENGTH = 4;

    /** Upper bound on the number of compressed chunks buffered between reader and consumer. */
    private static final int MAX_QUEUED_CHUNKS = 1024;

    /** Marker enqueued by the reader thread after it has recorded a failure in {@link #readException}. */
    private static final byte[] POISON_PILL = new byte[0];

    private final CompressionInfo info;
    private final BlockingQueue<byte[]> dataBuffer;
    private final Supplier<Double> crcCheckChanceSupplier;

    /** Holds the most recently decompressed chunk. */
    private final byte[] buffer;

    /** Uncompressed offset at which {@link #buffer} starts. */
    protected long bufferOffset = 0L;

    /** Current uncompressed read position. */
    private long current = 0L;

    /** Number of valid bytes in {@link #buffer}; {@code -1} until the first chunk is decompressed. */
    protected int validBufferBytes = -1;

    private final Checksum checksum;
    private final byte[] checksumBytes = new byte[CHECKSUM_LENGTH];

    /** Failure recorded by the reader thread; volatile because it is written there and read by the consumer. */
    protected volatile IOException readException = null;

    private long totalCompressedBytesRead;

    /**
     * Creates the stream and immediately starts the background thread that reads the
     * compressed chunks from {@code source}.
     *
     * @param source                 stream providing the compressed chunks, each followed by its checksum
     * @param info                   compression parameters and the chunks to read
     * @param checksumType           supplies the {@link Checksum} implementation used for verification
     * @param crcCheckChanceSupplier supplies the probability with which a chunk's checksum is verified
     */
    public CompressedInputStream(InputStream source, CompressionInfo info, ChecksumType checksumType, Supplier<Double> crcCheckChanceSupplier) {
        this.info = info;
        this.checksum = checksumType.newInstance();
        this.buffer = new byte[info.parameters.chunkLength()];
        // ArrayBlockingQueue rejects a capacity below 1, so an empty chunk list must not
        // translate into a zero-capacity queue.
        int capacity = Math.max(1, Math.min(info.chunks.length, MAX_QUEUED_CHUNKS));
        this.dataBuffer = new ArrayBlockingQueue<>(capacity);
        this.crcCheckChanceSupplier = crcCheckChanceSupplier;
        new Thread(NamedThreadFactory.threadLocalDeallocator(new Reader(source, info, this.dataBuffer))).start();
    }

    /**
     * Returns the next uncompressed byte, fetching and decompressing the next chunk first
     * when the current position lies beyond the buffered chunk (or nothing is buffered yet).
     *
     * @return the byte at the current position as a value in {@code 0..255}
     * @throws IOException  if the reader thread failed, or a verified checksum does not match
     * @throws EOFException if the source ended prematurely, or the calling thread was
     *                      interrupted while waiting for a chunk (the interrupt flag is restored)
     */
    @Override
    public int read() throws IOException {
        if (readException != null) {
            throw readException;
        }
        if (validBufferBytes == -1 || current >= bufferOffset + (long) buffer.length) {
            decompress(takeNextChunk());
        }
        assert current >= bufferOffset && current < bufferOffset + (long) validBufferBytes;
        return buffer[(int) (current++ - bufferOffset)] & 0xFF;
    }

    /**
     * Blocks until the reader thread provides the next compressed chunk.
     *
     * @return the compressed chunk including its trailing checksum bytes
     * @throws IOException the failure recorded by the reader thread, or an {@link EOFException}
     *                     if the calling thread is interrupted while waiting
     */
    private byte[] takeNextChunk() throws IOException {
        byte[] compressedWithCRC;
        try {
            compressedWithCRC = dataBuffer.take();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can still observe it,
            // and keep the interruption as the cause for diagnostics.
            Thread.currentThread().interrupt();
            EOFException eof = new EOFException("No chunk available");
            eof.initCause(e);
            throw eof;
        }
        if (compressedWithCRC == POISON_PILL) {
            // The reader thread assigns readException before enqueueing the marker.
            assert readException != null;
            throw readException;
        }
        return compressedWithCRC;
    }

    /**
     * Moves the read position forward.
     *
     * @param position new uncompressed position; must not be smaller than the current one
     */
    public void position(long position) {
        assert position >= current : "stream can only read forward.";
        current = position;
    }

    /**
     * Decompresses one chunk into {@link #buffer} and, with the configured probability,
     * verifies its checksum.
     *
     * @param compressed compressed payload followed by {@value #CHECKSUM_LENGTH} checksum bytes
     * @throws IOException if decompression fails or the verified checksum does not match
     */
    private void decompress(byte[] compressed) throws IOException {
        int payloadLength = compressed.length - CHECKSUM_LENGTH;
        validBufferBytes = info.parameters.getSstableCompressor().uncompress(compressed, 0, payloadLength, buffer, 0);
        totalCompressedBytesRead += (long) compressed.length;

        // Verify only a random sample of chunks, as dictated by the supplied check chance.
        if (crcCheckChanceSupplier.get() > ThreadLocalRandom.current().nextDouble()) {
            checksum.update(compressed, 0, payloadLength);
            int computed = (int) checksum.getValue();
            // Reset right away so a mismatch cannot leave stale state behind for a later verification.
            checksum.reset();
            System.arraycopy(compressed, payloadLength, checksumBytes, 0, CHECKSUM_LENGTH);
            if (Ints.fromByteArray(checksumBytes) != computed) {
                throw new IOException("CRC unmatched");
            }
        }

        // Align the buffer start to the chunk containing the current position.
        // NOTE(review): the mask only aligns correctly if the chunk length is a power of two - confirm.
        bufferOffset = current & (long) (~(buffer.length - 1));
    }

    /**
     * @return total number of compressed bytes (payload plus checksum bytes) decompressed so far
     */
    public long getTotalCompressedBytesRead() {
        return totalCompressedBytesRead;
    }

    /**
     * Background task that reads every listed chunk, together with its trailing checksum
     * bytes, from the source stream and enqueues it for the consumer. When the source ends
     * early or throws, the failure is stored in {@link CompressedInputStream#readException}
     * and {@link #POISON_PILL} is enqueued so that a blocked consumer wakes up.
     */
    class Reader extends WrappedRunnable {
        private final InputStream source;
        private final Iterator<CompressionMetadata.Chunk> chunks;
        private final BlockingQueue<byte[]> dataBuffer;

        Reader(InputStream source, CompressionInfo info, BlockingQueue<byte[]> dataBuffer) {
            this.source = source;
            this.chunks = Iterators.forArray(info.chunks);
            this.dataBuffer = dataBuffer;
        }

        @Override
        protected void runMayThrow() throws Exception {
            while (chunks.hasNext()) {
                CompressionMetadata.Chunk chunk = chunks.next();
                int readLength = chunk.length + CHECKSUM_LENGTH;
                byte[] compressedWithCRC = new byte[readLength];

                // A single read may return fewer bytes than requested; loop until the chunk is complete.
                int bufferRead = 0;
                while (bufferRead < readLength) {
                    int r;
                    try {
                        r = source.read(compressedWithCRC, bufferRead, readLength - bufferRead);
                    } catch (IOException e) {
                        logger.warn("Error while reading compressed input stream.", e);
                        fail(e);
                        return;
                    }
                    if (r < 0) {
                        // The source ended before the chunk was complete.
                        fail(new EOFException("No chunk available"));
                        return;
                    }
                    bufferRead += r;
                }
                dataBuffer.put(compressedWithCRC);
            }
        }

        /**
         * Publishes the failure to the consumer: the exception is stored first, then the
         * marker is enqueued, so the consumer sees the exception once it takes the marker.
         */
        private void fail(IOException cause) throws InterruptedException {
            readException = cause;
            dataBuffer.put(POISON_PILL);
        }
    }
}

