/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.astyanax.shaded.org.apache.cassandra.streaming;

import com.google.common.base.Throwables;
import com.netflix.astyanax.shaded.org.apache.cassandra.config.Schema;
import com.netflix.astyanax.shaded.org.apache.cassandra.db.ColumnFamilyStore;
import com.netflix.astyanax.shaded.org.apache.cassandra.db.DecoratedKey;
import com.netflix.astyanax.shaded.org.apache.cassandra.db.Directories;
import com.netflix.astyanax.shaded.org.apache.cassandra.db.Keyspace;
import com.netflix.astyanax.shaded.org.apache.cassandra.io.sstable.Component;
import com.netflix.astyanax.shaded.org.apache.cassandra.io.sstable.Descriptor;
import com.netflix.astyanax.shaded.org.apache.cassandra.io.sstable.SSTableWriter;
import com.netflix.astyanax.shaded.org.apache.cassandra.service.StorageService;
import com.netflix.astyanax.shaded.org.apache.cassandra.streaming.ProgressInfo;
import com.netflix.astyanax.shaded.org.apache.cassandra.streaming.StreamSession;
import com.netflix.astyanax.shaded.org.apache.cassandra.streaming.messages.FileMessageHeader;
import com.netflix.astyanax.shaded.org.apache.cassandra.utils.ByteBufferUtil;
import com.netflix.astyanax.shaded.org.apache.cassandra.utils.BytesReadTracker;
import com.netflix.astyanax.shaded.org.apache.cassandra.utils.Pair;
import com.ning.compress.lzf.LZFInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.util.Collection;
import java.util.UUID;

public class StreamReader {
    protected final UUID cfId;
    protected final long estimatedKeys;
    protected final Collection<Pair<Long, Long>> sections;
    protected final StreamSession session;
    protected final Descriptor.Version inputVersion;
    protected Descriptor desc;

    public StreamReader(FileMessageHeader header, StreamSession session) {
        this.session = session;
        this.cfId = header.cfId;
        this.estimatedKeys = header.estimatedKeys;
        this.sections = header.sections;
        this.inputVersion = new Descriptor.Version(header.version);
    }

    public SSTableWriter read(ReadableByteChannel channel) throws IOException {
        long totalSize = this.totalSize();
        Pair<String, String> kscf = Schema.instance.getCF(this.cfId);
        if (kscf == null) {
            throw new IOException("CF " + this.cfId + " was dropped during streaming");
        }
        ColumnFamilyStore cfs = Keyspace.open((String)kscf.left).getColumnFamilyStore((String)kscf.right);
        SSTableWriter writer = this.createWriter(cfs, totalSize);
        DataInputStream dis = new DataInputStream((InputStream)new LZFInputStream(Channels.newInputStream(channel)));
        BytesReadTracker in = new BytesReadTracker(dis);
        try {
            while (in.getBytesRead() < totalSize) {
                this.writeRow(writer, in, cfs);
                this.session.progress(this.desc, ProgressInfo.Direction.IN, in.getBytesRead(), totalSize);
            }
            return writer;
        }
        catch (Throwable e) {
            writer.abort();
            this.drain(dis, in.getBytesRead());
            if (e instanceof IOException) {
                throw (IOException)e;
            }
            throw Throwables.propagate((Throwable)e);
        }
    }

    protected SSTableWriter createWriter(ColumnFamilyStore cfs, long totalSize) throws IOException {
        Directories.DataDirectory localDir = cfs.directories.getWriteableLocation(totalSize);
        if (localDir == null) {
            throw new IOException("Insufficient disk space to store " + totalSize + " bytes");
        }
        this.desc = Descriptor.fromFilename(cfs.getTempSSTablePath(cfs.directories.getLocationForDisk(localDir)));
        return new SSTableWriter(this.desc.filenameFor(Component.DATA), this.estimatedKeys);
    }

    protected void drain(InputStream dis, long bytesRead) throws IOException {
        long toSkip = this.totalSize() - bytesRead;
        long skipped = dis.skip(toSkip);
        if (skipped == -1L) {
            return;
        }
        toSkip -= skipped;
        while (toSkip > 0L && (skipped = dis.skip(toSkip)) != -1L) {
            toSkip -= skipped;
        }
    }

    protected long totalSize() {
        long size = 0L;
        for (Pair<Long, Long> section : this.sections) {
            size += (Long)section.right - (Long)section.left;
        }
        return size;
    }

    protected void writeRow(SSTableWriter writer, DataInput in, ColumnFamilyStore cfs) throws IOException {
        DecoratedKey key = StorageService.getPartitioner().decorateKey(ByteBufferUtil.readWithShortLength(in));
        writer.appendFromStream(key, cfs.metadata, in, this.inputVersion);
        cfs.invalidateCachedRow(key);
    }
}

