/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.StripedFileTestUtil;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.io.erasurecode.ErasureCodeNative;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;

/**
 * Tests reconstruction of striped (erasure-coded) internal blocks on a
 * {@link MiniDFSCluster}.
 *
 * <p>Most tests write a file under the default erasure coding policy, stop or
 * corrupt some of the DataNodes that hold the last block group, wait for
 * reconstruction to finish, and then compare the reconstructed replicas
 * byte-for-byte with the replicas captured before the failure.
 */
public class TestReconstructStripedFile {
    public static final Log LOG = LogFactory.getLog(TestReconstructStripedFile.class);

    private final ErasureCodingPolicy ecPolicy = StripedFileTestUtil.getDefaultECPolicy();
    private final int dataBlkNum = this.ecPolicy.getNumDataUnits();
    private final int parityBlkNum = this.ecPolicy.getNumParityUnits();
    private final int cellSize = this.ecPolicy.getCellSize();
    /** Block size used for every file written by this test: three cells. */
    private final int blockSize = this.cellSize * 3;
    private final int groupSize = this.dataBlkNum + this.parityBlkNum;
    /** One full block group plus {@code parityBlkNum} spare nodes to reconstruct onto. */
    private final int dnNum = this.groupSize + this.parityBlkNum;
    private Configuration conf;
    private MiniDFSCluster cluster;
    private DistributedFileSystem fs;
    /** Maps each DataNode id to its position in {@code cluster.getDataNodes()} as recorded in {@link #setup()}. */
    private final Map<DatanodeID, Integer> dnMap = new HashMap<>();
    private final Random random = new Random();

    /**
     * Builds the configuration, starts a cluster of {@code dnNum} DataNodes with the
     * default EC policy applied to "/", and records every DataNode's index in
     * {@code dnMap}.
     */
    @Before
    public void setup() throws IOException {
        conf = new Configuration();
        conf.setLong("dfs.blocksize", blockSize);
        conf.setInt("dfs.datanode.ec.reconstruction.stripedread.buffer.size", cellSize - 1);
        conf.setInt("dfs.namenode.redundancy.interval.seconds", 1);
        conf.setBoolean("dfs.namenode.redundancy.considerLoad", false);
        if (ErasureCodeNative.isNativeCodeLoaded()) {
            conf.set("io.erasurecode.codec.rs.rawcoders", "rs_native");
        }
        startCluster(dnNum);
        ArrayList<DataNode> datanodes = cluster.getDataNodes();
        for (int i = 0; i < dnNum; ++i) {
            dnMap.put(datanodes.get(i).getDatanodeId(), i);
        }
    }

    /** Shuts the cluster down, if one is running. */
    @After
    public void tearDown() {
        if (cluster != null) {
            cluster.shutdown();
            cluster = null;
        }
    }

    /**
     * Starts a new cluster from the current {@code conf}, waits for it to become
     * active, and enables and sets the default EC policy on the root directory.
     *
     * @param numDataNodes number of DataNodes to start
     */
    private void startCluster(int numDataNodes) throws IOException {
        cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build();
        cluster.waitActive();
        fs = cluster.getFileSystem();
        String policyName = StripedFileTestUtil.getDefaultECPolicy().getName();
        fs.enableErasureCodingPolicy(policyName);
        fs.getClient().setErasureCodingPolicy("/", policyName);
    }

    /**
     * Shuts down the running cluster and starts a fresh one from the current
     * {@code conf}. {@code dnMap} is not refreshed by this method.
     *
     * @param numDataNodes number of DataNodes in the new cluster
     */
    private void restartCluster(int numDataNodes) throws IOException {
        cluster.shutdown();
        startCluster(numDataNodes);
    }

    @Test(timeout=120000L)
    public void testRecoverOneParityBlock() throws Exception {
        int fileLen = (dataBlkNum + 1) * blockSize + blockSize / 10;
        assertFileBlocksReconstruction("/testRecoverOneParityBlock", fileLen, ReconstructionType.ParityOnly, 1);
    }

    @Test(timeout=120000L)
    public void testRecoverOneParityBlock1() throws Exception {
        int fileLen = cellSize + cellSize / 10;
        assertFileBlocksReconstruction("/testRecoverOneParityBlock1", fileLen, ReconstructionType.ParityOnly, 1);
    }

    @Test(timeout=120000L)
    public void testRecoverOneParityBlock2() throws Exception {
        int fileLen = 1;
        assertFileBlocksReconstruction("/testRecoverOneParityBlock2", fileLen, ReconstructionType.ParityOnly, 1);
    }

    @Test(timeout=120000L)
    public void testRecoverOneParityBlock3() throws Exception {
        int fileLen = (dataBlkNum - 1) * blockSize + blockSize / 10;
        assertFileBlocksReconstruction("/testRecoverOneParityBlock3", fileLen, ReconstructionType.ParityOnly, 1);
    }

    @Test(timeout=120000L)
    public void testRecoverAllParityBlocks() throws Exception {
        int fileLen = dataBlkNum * blockSize + blockSize / 10;
        assertFileBlocksReconstruction("/testRecoverAllParityBlocks", fileLen, ReconstructionType.ParityOnly, parityBlkNum);
    }

    @Test(timeout=120000L)
    public void testRecoverAllDataBlocks() throws Exception {
        int fileLen = (dataBlkNum + parityBlkNum) * blockSize + blockSize / 10;
        assertFileBlocksReconstruction("/testRecoverAllDataBlocks", fileLen, ReconstructionType.DataOnly, parityBlkNum);
    }

    @Test(timeout=120000L)
    public void testRecoverAllDataBlocks1() throws Exception {
        int fileLen = parityBlkNum * blockSize + blockSize / 10;
        assertFileBlocksReconstruction("/testRecoverAllDataBlocks1", fileLen, ReconstructionType.DataOnly, parityBlkNum);
    }

    @Test(timeout=120000L)
    public void testRecoverOneDataBlock() throws Exception {
        int fileLen = (dataBlkNum + 1) * blockSize + blockSize / 10;
        assertFileBlocksReconstruction("/testRecoverOneDataBlock", fileLen, ReconstructionType.DataOnly, 1);
    }

    @Test(timeout=120000L)
    public void testRecoverOneDataBlock1() throws Exception {
        int fileLen = cellSize + cellSize / 10;
        assertFileBlocksReconstruction("/testRecoverOneDataBlock1", fileLen, ReconstructionType.DataOnly, 1);
    }

    @Test(timeout=120000L)
    public void testRecoverOneDataBlock2() throws Exception {
        int fileLen = 1;
        assertFileBlocksReconstruction("/testRecoverOneDataBlock2", fileLen, ReconstructionType.DataOnly, 1);
    }

    @Test(timeout=120000L)
    public void testRecoverAnyBlocks() throws Exception {
        int fileLen = parityBlkNum * blockSize + blockSize / 10;
        assertFileBlocksReconstruction("/testRecoverAnyBlocks", fileLen, ReconstructionType.Any, random.nextInt(parityBlkNum) + 1);
    }

    @Test(timeout=120000L)
    public void testRecoverAnyBlocks1() throws Exception {
        int fileLen = (dataBlkNum + parityBlkNum) * blockSize + blockSize / 10;
        assertFileBlocksReconstruction("/testRecoverAnyBlocks1", fileLen, ReconstructionType.Any, random.nextInt(parityBlkNum) + 1);
    }

    /**
     * Tells whether an internal block index belongs to the requested category.
     *
     * @param type       category to test against
     * @param blockIndex index of the internal block inside its block group
     * @return {@code true} for {@code Any}; for {@code DataOnly} when the index is below
     *         {@code dataBlkNum}; for {@code ParityOnly} when it is at or above it
     */
    private boolean isOfType(ReconstructionType type, byte blockIndex) {
        if (type == ReconstructionType.DataOnly) {
            return blockIndex < dataBlkNum;
        }
        if (type == ReconstructionType.ParityOnly) {
            return blockIndex >= dataBlkNum;
        }
        return true;
    }

    /**
     * Randomly picks {@code deadNum} distinct positions in {@code indices} whose block
     * index matches {@code type}.
     *
     * @param type    category of internal block that may be selected
     * @param deadNum number of positions to select
     * @param indices internal block indices of the block group's locations
     * @return the selected positions (offsets into {@code indices}), in selection order
     */
    private int[] generateDeadDnIndices(ReconstructionType type, int deadNum, byte[] indices) {
        // Fail fast instead of spinning forever when the request cannot be satisfied.
        int eligible = 0;
        for (byte blockIndex : indices) {
            if (isOfType(type, blockIndex)) {
                ++eligible;
            }
        }
        if (eligible < deadNum) {
            Assert.fail("Cannot pick " + deadNum + " blocks of type " + type + ": only " + eligible
                + " candidates in " + Arrays.toString(indices));
        }

        ArrayList<Integer> deadList = new ArrayList<>(deadNum);
        while (deadList.size() < deadNum) {
            int dead = random.nextInt(indices.length);
            if (isOfType(type, indices[dead]) && !deadList.contains(dead)) {
                deadList.add(dead);
            }
        }
        int[] d = new int[deadNum];
        for (int i = 0; i < deadNum; ++i) {
            d[i] = deadList.get(i);
        }
        return d;
    }

    /** Shuts the given DataNode down and marks it dead on the cluster. */
    private void shutdownDataNode(DataNode dn) throws IOException {
        dn.shutdown();
        cluster.setDataNodeDead(dn.getDatanodeId());
    }

    /**
     * Injects a failure for every entry of {@code corruptTargets}. A DataNode is stopped
     * when none has been stopped yet, when {@code type} is not {@code DataOnly}, or on a
     * random coin flip; otherwise its replica of the block is corrupted instead.
     *
     * @param corruptTargets internal block mapped to the DataNode holding it
     * @param type           reconstruction category of the current test
     * @return the number of DataNodes that were stopped
     */
    private int generateErrors(Map<ExtendedBlock, DataNode> corruptTargets, ReconstructionType type) throws IOException {
        int stoppedDNs = 0;
        for (Map.Entry<ExtendedBlock, DataNode> target : corruptTargets.entrySet()) {
            DataNode dn = target.getValue();
            ExtendedBlock block = target.getKey();
            if (stoppedDNs == 0 || type != ReconstructionType.DataOnly || random.nextBoolean()) {
                LOG.info("Note: stop DataNode " + dn.getDisplayName() + " with internal block " + block);
                shutdownDataNode(dn);
                ++stoppedDNs;
            } else {
                LOG.info("Note: corrupt data on " + dn.getDisplayName() + " with internal block " + block);
                cluster.corruptReplica(dn, block);
            }
        }
        return stoppedDNs;
    }

    /**
     * Writes {@code fileLen} bytes, each with value 1, to {@code fileName} and waits
     * until the file's block groups have been reported.
     */
    private static void writeFile(DistributedFileSystem fs, String fileName, int fileLen) throws Exception {
        byte[] data = new byte[fileLen];
        Arrays.fill(data, (byte)1);
        DFSTestUtil.writeFile((FileSystem)fs, new Path(fileName), data);
        StripedFileTestUtil.waitBlockGroupsReported(fs, fileName);
    }

    /**
     * Writes a file, breaks {@code toRecoverBlockNum} internal blocks of its last block
     * group, waits for reconstruction, and verifies that each reconstructed replica has
     * the same length, generation-stamped metadata file name, and content as before.
     *
     * @param fileName          path of the file to create
     * @param fileLen           file length in bytes; must be positive
     * @param type              which kind of internal block to break
     * @param toRecoverBlockNum number of internal blocks to break, between 1 and
     *                          {@code parityBlkNum}
     */
    private void assertFileBlocksReconstruction(String fileName, int fileLen, ReconstructionType type, int toRecoverBlockNum) throws Exception {
        if (toRecoverBlockNum < 1 || toRecoverBlockNum > parityBlkNum) {
            Assert.fail("toRecoverBlockNum should be between 1 ~ " + parityBlkNum);
        }
        Assert.assertTrue("File length must be positive.", fileLen > 0);

        Path file = new Path(fileName);
        writeFile(fs, fileName, fileLen);
        LocatedBlocks locatedBlocks = StripedFileTestUtil.getLocatedBlocks(file, fs);
        Assert.assertEquals(fileLen, locatedBlocks.getFileLength());
        LocatedStripedBlock lastBlock = (LocatedStripedBlock)locatedBlocks.getLastLocatedBlock();
        DatanodeInfo[] storageInfos = lastBlock.getLocations();
        byte[] indices = lastBlock.getBlockIndices();

        // Remember which DataNodes already hold a block of the last group.
        BitSet bitset = new BitSet(dnNum);
        for (DatanodeInfo storageInfo : storageInfos) {
            bitset.set(dnMap.get(storageInfo));
        }

        int[] dead = generateDeadDnIndices(type, toRecoverBlockNum, indices);
        LOG.info("Note: indices == " + Arrays.toString(indices) + ". Generate errors on datanodes: " + Arrays.toString(dead));

        DatanodeInfo[] dataDNs = new DatanodeInfo[toRecoverBlockNum];
        int[] deadDnIndices = new int[toRecoverBlockNum];
        ExtendedBlock[] blocks = new ExtendedBlock[toRecoverBlockNum];
        File[] replicas = new File[toRecoverBlockNum];
        long[] replicaLengths = new long[toRecoverBlockNum];
        File[] metadatas = new File[toRecoverBlockNum];
        byte[][] replicaContents = new byte[toRecoverBlockNum][];
        Map<ExtendedBlock, DataNode> errorMap = new HashMap<>(dead.length);

        // Capture the state of every replica that is about to be broken.
        for (int i = 0; i < toRecoverBlockNum; ++i) {
            dataDNs[i] = storageInfos[dead[i]];
            deadDnIndices[i] = dnMap.get(dataDNs[i]);
            blocks[i] = StripedBlockUtil.constructInternalBlock(lastBlock.getBlock(), cellSize, dataBlkNum, indices[dead[i]]);
            errorMap.put(blocks[i], cluster.getDataNodes().get(deadDnIndices[i]));
            replicas[i] = cluster.getBlockFile(deadDnIndices[i], blocks[i]);
            replicaLengths[i] = replicas[i].length();
            metadatas[i] = cluster.getBlockMetadataFile(deadDnIndices[i], blocks[i]);
            Assert.assertEquals(
                StripedBlockUtil.getInternalBlockLength(lastBlock.getBlockSize(), cellSize, dataBlkNum, indices[dead[i]]),
                replicaLengths[i]);
            Assert.assertTrue(metadatas[i].getName().endsWith(blocks[i].getGenerationStamp() + ".meta"));
            LOG.info("replica " + i + " locates in file: " + replicas[i]);
            replicaContents[i] = DFSTestUtil.readFileAsBytes(replicas[i]);
        }

        // Number of internal blocks (data + parity) in the last, possibly partial, group.
        int lastGroupDataLen = fileLen % (dataBlkNum * blockSize);
        int lastGroupNumBlk = lastGroupDataLen == 0
            ? dataBlkNum
            : Math.min(dataBlkNum, (lastGroupDataLen - 1) / cellSize + 1);
        int lastGroupSize = lastGroupNumBlk + parityBlkNum;

        int stoppedDN = generateErrors(errorMap, type);

        // Stopped DataNodes must no longer be listed as locations of the last group.
        locatedBlocks = StripedFileTestUtil.getLocatedBlocks(file, fs);
        lastBlock = (LocatedStripedBlock)locatedBlocks.getLastLocatedBlock();
        storageInfos = lastBlock.getLocations();
        Assert.assertEquals(lastGroupSize - stoppedDN, storageInfos.length);

        // Candidate targets are the DataNodes that held no block of the group beforehand.
        int[] targetDNs = new int[dnNum - lastGroupSize];
        int n = 0;
        for (int i = 0; i < dnNum; ++i) {
            if (!bitset.get(i)) {
                targetDNs[n++] = i;
            }
        }

        StripedFileTestUtil.waitForReconstructionFinished(file, fs, lastGroupSize);
        targetDNs = sortTargetsByReplicas(blocks, targetDNs);

        for (int i = 0; i < toRecoverBlockNum; ++i) {
            File replicaAfterReconstruction = cluster.getBlockFile(targetDNs[i], blocks[i]);
            LOG.info("replica after reconstruction " + replicaAfterReconstruction);
            File metadataAfterReconstruction = cluster.getBlockMetadataFile(targetDNs[i], blocks[i]);
            Assert.assertEquals(replicaLengths[i], replicaAfterReconstruction.length());
            LOG.info("replica before " + replicas[i]);
            Assert.assertTrue(metadataAfterReconstruction.getName().endsWith(blocks[i].getGenerationStamp() + ".meta"));
            byte[] replicaContentAfterReconstruction = DFSTestUtil.readFileAsBytes(replicaAfterReconstruction);
            Assert.assertArrayEquals(replicaContents[i], replicaContentAfterReconstruction);
        }
    }

    /**
     * For each block, finds the first not-yet-claimed target DataNode for which the
     * cluster returns a block file, and fails the test if there is none.
     *
     * <p>Claimed entries of {@code targetDNs} are overwritten with -1, so the argument
     * is modified.
     *
     * @param blocks    internal blocks that were expected to be reconstructed
     * @param targetDNs candidate DataNode indices
     * @return an array where element {@code i} is the DataNode index holding {@code blocks[i]}
     */
    private int[] sortTargetsByReplicas(ExtendedBlock[] blocks, int[] targetDNs) {
        int[] result = new int[blocks.length];
        for (int i = 0; i < blocks.length; ++i) {
            result[i] = -1;
            for (int j = 0; j < targetDNs.length; ++j) {
                if (targetDNs[j] == -1) {
                    continue;
                }
                File replica = cluster.getBlockFile(targetDNs[j], blocks[i]);
                if (replica != null) {
                    result[i] = targetDNs[j];
                    targetDNs[j] = -1;
                    break;
                }
            }
            if (result[i] == -1) {
                Assert.fail("Failed to reconstruct striped block: " + blocks[i].getBlockId());
            }
        }
        return result;
    }

    /**
     * Submits a reconstruction task built from placeholder data (a source array of
     * null entries that is one longer than the live-index array) to the first
     * DataNode's erasure coding worker. The test passes as long as
     * {@code processErasureCodingTasks} does not throw.
     */
    @Test
    public void testProcessErasureCodingTasksSubmitionShouldSucceed() throws Exception {
        DataNode dataNode = cluster.dataNodes.get(0).datanode;
        int size = cluster.dataNodes.size();
        byte[] liveIndices = new byte[size];
        DatanodeInfo[] dataDNs = new DatanodeInfo[size + 1];
        DatanodeStorageInfo targetDnInfos_1 = BlockManagerTestUtil.newDatanodeStorageInfo(DFSTestUtil.getLocalDatanodeDescriptor(), new DatanodeStorage("s01"));
        DatanodeStorageInfo[] dnStorageInfo = new DatanodeStorageInfo[]{targetDnInfos_1};
        BlockECReconstructionCommand.BlockECReconstructionInfo invalidECInfo = new BlockECReconstructionCommand.BlockECReconstructionInfo(new ExtendedBlock("bp-id", 123456L), dataDNs, dnStorageInfo, liveIndices, StripedFileTestUtil.getDefaultECPolicy());
        ArrayList<BlockECReconstructionCommand.BlockECReconstructionInfo> ecTasks = new ArrayList<>();
        ecTasks.add(invalidECInfo);
        dataNode.getErasureCodingWorker().processErasureCodingTasks(ecTasks);
    }

    @Test(timeout=120000L)
    public void testNNSendsErasureCodingTasks() throws Exception {
        testNNSendsErasureCodingTasks(1);
        testNNSendsErasureCodingTasks(2);
    }

    /**
     * Restarts the cluster with {@code dnNum + 1} DataNodes, writes 50 EC files, stops
     * {@code deadDN} randomly chosen DataNodes, and then waits for pending deletions,
     * pending reconstructions, and in-progress xmits to reach zero. Fails if any
     * pending reconstruction is reported as timed out while waiting.
     *
     * @param deadDN number of DataNodes to stop; must not exceed the policy's parity units
     */
    private void testNNSendsErasureCodingTasks(int deadDN) throws Exception {
        conf.setInt("dfs.namenode.reconstruction.pending.timeout-sec", 10);
        conf.setInt("dfs.namenode.replication.max-streams", 20);
        conf.setInt("dfs.datanode.ec.reconstruction.threads", 2);
        conf.setInt("dfs.client.socket-timeout", 5000);
        // Enables the policy before setting it, as the other cluster start-ups in this class do.
        restartCluster(dnNum + 1);

        ErasureCodingPolicy policy = StripedFileTestUtil.getDefaultECPolicy();
        int fileLen = cellSize * ecPolicy.getNumDataUnits();
        for (int i = 0; i < 50; ++i) {
            writeFile(fs, "/ec-file-" + i, fileLen);
        }

        Assert.assertTrue(policy.getNumParityUnits() >= deadDN);
        ArrayList<DataNode> dataNodes = new ArrayList<>(cluster.getDataNodes());
        Collections.shuffle(dataNodes);
        for (DataNode dn : dataNodes.subList(0, deadDN)) {
            shutdownDataNode(dn);
        }

        FSNamesystem ns = cluster.getNamesystem();
        GenericTestUtils.waitFor(() -> ns.getPendingDeletionBlocks() == 0L, 500, 30000);
        // Bounded by the @Test timeout of the calling test method.
        while (ns.getPendingReconstructionBlocks() > 0L) {
            long timeoutPending = ns.getNumTimedOutPendingReconstructions();
            Assert.assertTrue(String.format("Found %d timeout pending reconstruction tasks", timeoutPending), timeoutPending == 0L);
            Thread.sleep(1000L);
        }
        GenericTestUtils.waitFor(() -> cluster.getDataNodes().stream().mapToInt(DataNode::getXmitsInProgress).sum() == 0, 500, 30000);
    }

    @Test(timeout=180000L)
    public void testErasureCodingWorkerXmitsWeight() throws Exception {
        testErasureCodingWorkerXmitsWeight(1.0f, ecPolicy.getNumDataUnits());
        testErasureCodingWorkerXmitsWeight(0.0f, 1);
        testErasureCodingWorkerXmitsWeight(10.0f, 10 * ecPolicy.getNumDataUnits());
    }

    /**
     * Restarts the cluster with the given xmits weight, writes one file, stops the
     * first DataNode, and waits until the sum of {@code getXmitsInProgress()} over all
     * DataNodes equals (finalized blocks on the stopped node) x {@code expectedWeight}.
     * Reconstruction tasks are held at a barrier by a fault injector while the sum is
     * sampled.
     *
     * @param weight         value for {@code dfs.datanode.ec.reconstruction.xmits.weight}
     * @param expectedWeight expected xmits contributed by each reconstruction task
     */
    private void testErasureCodingWorkerXmitsWeight(float weight, int expectedWeight) throws Exception {
        conf.setFloat("dfs.datanode.ec.reconstruction.xmits.weight", weight);
        restartCluster(dnNum);

        int fileLen = cellSize * ecPolicy.getNumDataUnits() * 2;
        writeFile(fs, "/ec-xmits-weight", fileLen);

        DataNode dn = cluster.getDataNodes().get(0);
        int corruptBlocks = dn.getFSDataset().getFinalizedBlocks(cluster.getNameNode().getNamesystem().getBlockPoolId()).size();
        int expectedXmits = corruptBlocks * expectedWeight;

        // One party per expected reconstruction task, plus this test thread.
        final CyclicBarrier barrier = new CyclicBarrier(corruptBlocks + 1);
        DataNodeFaultInjector oldInjector = DataNodeFaultInjector.get();
        DataNodeFaultInjector delayInjector = new DataNodeFaultInjector(){

            public void stripedBlockReconstruction() throws IOException {
                try {
                    barrier.await();
                }
                catch (InterruptedException | BrokenBarrierException e) {
                    throw new IOException(e);
                }
            }
        };
        DataNodeFaultInjector.set(delayInjector);
        try {
            shutdownDataNode(dn);
            LambdaTestUtils.await(30000, 500, () -> {
                int totalXmits = cluster.getDataNodes().stream().mapToInt(DataNode::getXmitsInProgress).sum();
                return totalXmits == expectedXmits;
            });
        }
        finally {
            try {
                // Release the reconstruction tasks parked in the injector.
                barrier.await();
            }
            finally {
                // Always restore the previous injector, even if the await above throws,
                // so the blocking injector cannot leak into later tests.
                DataNodeFaultInjector.set(oldInjector);
            }
        }
    }

    static {
        GenericTestUtils.setLogLevel((Logger)DFSClient.LOG, (Level)Level.ALL);
        GenericTestUtils.setLogLevel((Logger)BlockManager.LOG, (Level)Level.ALL);
        GenericTestUtils.setLogLevel((Logger)BlockManager.blockLog, (Level)Level.ALL);
    }

    /** Category of internal blocks that a test is allowed to break. */
    enum ReconstructionType {
        DataOnly,
        ParityOnly,
        Any;

    }
}

