package org.apache.cassandra.streaming;

import com.google.common.base.Throwables;
import com.ning.compress.lzf.LZFInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.util.Collection;
import java.util.UUID;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.format.SSTableFormat;
import org.apache.cassandra.io.sstable.format.SSTableWriter;
import org.apache.cassandra.io.sstable.format.Version;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.streaming.ProgressInfo;
import org.apache.cassandra.streaming.messages.FileMessageHeader;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.BytesReadTracker;
import org.apache.cassandra.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/cassandra-all-2.2.0.jar:org/apache/cassandra/streaming/StreamReader.class */
/**
 * Reads one streamed, LZF-compressed file from a channel and writes its rows into a new
 * temporary SSTable via an {@link SSTableWriter}.
 *
 * <p>All configuration is copied from the {@link FileMessageHeader} passed to the constructor.
 * The number of bytes expected on the wire is the sum of the lengths of {@link #sections}
 * (see {@link #totalSize()}).
 */
public class StreamReader {
    private static final Logger logger = LoggerFactory.getLogger(StreamReader.class);

    /** Column family id taken from the header; resolved through {@code Schema.instance.getCF}. */
    protected final UUID cfId;
    /** Estimated key count taken from the header; handed to {@code SSTableWriter.create}. */
    protected final long estimatedKeys;
    /** (left, right) pairs from the header; {@code right - left} of each pair is summed into the total size. */
    protected final Collection<Pair<Long, Long>> sections;
    /** Session this reader belongs to; used for the peer in logging and for progress reporting. */
    protected final StreamSession session;
    /** Version resolved from the header's format and version string; passed to {@code appendFromStream}. */
    protected final Version inputVersion;
    /** {@code repairedAt} value taken from the header; passed on to the created writer. */
    protected final long repairedAt;
    /** SSTable format type taken from the header. */
    protected final SSTableFormat.Type format;
    /** SSTable level taken from the header; passed on to the created writer. */
    protected final int sstableLevel;
    /** Descriptor of the temporary SSTable; assigned by {@link #createWriter}. */
    protected Descriptor desc;

    /**
     * Creates a reader configured from the given file message header.
     *
     * @param fileMessageHeader header describing the incoming file
     * @param streamSession     session the incoming file belongs to
     */
    public StreamReader(FileMessageHeader fileMessageHeader, StreamSession streamSession) {
        this.session = streamSession;
        this.cfId = fileMessageHeader.cfId;
        this.estimatedKeys = fileMessageHeader.estimatedKeys;
        this.sections = fileMessageHeader.sections;
        this.inputVersion = fileMessageHeader.format.f205info.getVersion(fileMessageHeader.version);
        this.repairedAt = fileMessageHeader.repairedAt;
        this.format = fileMessageHeader.format;
        this.sstableLevel = fileMessageHeader.sstableLevel;
    }

    /**
     * Reads rows from the channel until {@link #totalSize()} bytes have been consumed, appending
     * each row to a freshly created writer and reporting progress to the session after each row.
     *
     * <p>On any failure after the column family has been resolved (including failure to create
     * the writer) the writer, if one was created, is aborted and the unread remainder of the
     * stream is skipped via {@link #drain} before the failure is rethrown.
     *
     * @param readableByteChannel channel to read the LZF-compressed data from
     * @return the writer holding the received rows
     * @throws IOException if the column family cannot be found, or if reading or writing fails
     */
    public SSTableWriter read(ReadableByteChannel readableByteChannel) throws IOException {
        logger.debug("reading file from {}, repairedAt = {}, level = {}", this.session.peer, Long.valueOf(this.repairedAt), Integer.valueOf(this.sstableLevel));
        long totalSize = totalSize();
        Pair<String, String> cf = Schema.instance.getCF(this.cfId);
        if (cf == null) {
            throw new IOException("CF " + this.cfId + " was dropped during streaming");
        }
        ColumnFamilyStore columnFamilyStore = Keyspace.open(cf.left).getColumnFamilyStore(cf.right);
        DataInputStream dataInputStream = new DataInputStream(new LZFInputStream(Channels.newInputStream(readableByteChannel)));
        BytesReadTracker bytesReadTracker = new BytesReadTracker(dataInputStream);
        SSTableWriter writer = null;
        try {
            // Created inside the try so that a creation failure (e.g. no disk space) still
            // drains the incoming bytes instead of leaving them unread on the channel.
            writer = createWriter(columnFamilyStore, totalSize, this.repairedAt, this.format);
            while (bytesReadTracker.getBytesRead() < totalSize) {
                writeRow(writer, bytesReadTracker, columnFamilyStore);
                this.session.progress(this.desc, ProgressInfo.Direction.IN, bytesReadTracker.getBytesRead(), totalSize);
            }
            return writer;
        } catch (Throwable failure) {
            if (writer != null) {
                try {
                    writer.abort();
                } catch (Throwable abortFailure) {
                    // Keep the original failure as the primary error and keep going so the
                    // stream is still drained; addSuppressed rejects self-suppression.
                    if (abortFailure != failure) {
                        failure.addSuppressed(abortFailure);
                    }
                }
            }
            drain(dataInputStream, bytesReadTracker.getBytesRead());
            if (failure instanceof IOException) {
                throw (IOException) failure;
            }
            throw Throwables.propagate(failure);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Picks a writeable data directory for the given size, stores the descriptor of a new
     * temporary SSTable path in {@link #desc}, and creates a writer for it.
     *
     * @param columnFamilyStore store whose directories are used for the new SSTable
     * @param j                 number of bytes the chosen location must be able to hold
     * @param j2                {@code repairedAt} value passed to the writer
     * @param type              SSTable format type used to build the temporary path
     * @return the newly created writer
     * @throws IOException if no writeable location is available for {@code j} bytes
     */
    public SSTableWriter createWriter(ColumnFamilyStore columnFamilyStore, long j, long j2, SSTableFormat.Type type) throws IOException {
        Directories.DataDirectory writeableLocation = columnFamilyStore.directories.getWriteableLocation(j);
        if (writeableLocation == null) {
            throw new IOException("Insufficient disk space to store " + j + " bytes");
        }
        this.desc = Descriptor.fromFilename(columnFamilyStore.getTempSSTablePath(columnFamilyStore.directories.getLocationForDisk(writeableLocation), type));
        return SSTableWriter.create(this.desc, this.estimatedKeys, j2, this.sstableLevel);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Skips whatever part of the expected {@link #totalSize()} bytes has not been read yet.
     *
     * <p>Stops early when the stream reports end of data. A negative return from
     * {@code skip} is treated as end of stream. A zero return is ambiguous under the
     * {@link InputStream#skip(long)} contract, so a single-byte {@code read()} is used to tell
     * "no progress yet" from end of stream; this prevents spinning forever on streams whose
     * {@code skip} returns 0 at EOF.
     *
     * @param inputStream stream to skip bytes on
     * @param j           number of bytes already consumed from the stream
     * @throws IOException if skipping or reading from the stream fails
     */
    public void drain(InputStream inputStream, long j) throws IOException {
        long remaining = totalSize() - j;
        while (remaining > 0) {
            long skipped = inputStream.skip(remaining);
            if (skipped < 0) {
                return;
            }
            if (skipped == 0) {
                if (inputStream.read() == -1) {
                    return;
                }
                skipped = 1;
            }
            remaining -= skipped;
        }
    }

    /**
     * Sums {@code right - left} over all pairs in {@link #sections}.
     *
     * @return the total number of bytes described by the sections
     */
    protected long totalSize() {
        long total = 0;
        for (Pair<Long, Long> section : this.sections) {
            total += section.right.longValue() - section.left.longValue();
        }
        return total;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Reads one short-length-prefixed key from the input, appends the row that follows it to the
     * writer, and then invalidates the cached row for that key on the store.
     *
     * @param sSTableWriter     writer receiving the row
     * @param dataInput         input positioned at the start of a row key
     * @param columnFamilyStore store providing the metadata and owning the row cache entry
     * @throws IOException if reading the key or appending the row fails
     */
    public void writeRow(SSTableWriter sSTableWriter, DataInput dataInput, ColumnFamilyStore columnFamilyStore) throws IOException {
        DecoratedKey decorateKey = StorageService.getPartitioner().decorateKey(ByteBufferUtil.readWithShortLength(dataInput));
        sSTableWriter.appendFromStream(decorateKey, columnFamilyStore.metadata, dataInput, this.inputVersion);
        columnFamilyStore.invalidateCachedRow(decorateKey);
    }
}
