/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.record;

import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.util.Iterator;
import java.util.Random;
import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.record.RecInt;
import org.apache.hadoop.record.RecString;

/**
 * Map/reduce round-trip test that uses the record types {@code RecInt} and
 * {@code RecString} as keys and values.
 *
 * <p>{@link #launch()} builds a random distribution of occurrence counts over
 * the keys {@code 0 .. range-1}, writes it out as an "answer key", runs three
 * chained jobs (generate, count, merge) and finally checks that the merged
 * output reproduces the original distribution.
 */
public class TestRecordMR
extends TestCase {
    /** Number of distinct keys ({@code 0 .. range-1}) in the generated distribution. */
    private static int range = 10;
    /** Number of occurrences the distribution loop in {@link #launch()} starts from. */
    private static int counts = 100;
    private static final Random r = new Random();
    private static final Configuration conf = new Configuration();

    /**
     * JUnit entry point; runs the full pipeline with the current
     * {@code range}/{@code counts} settings.
     */
    public void testMapred() throws Exception {
        TestRecordMR.launch();
    }

    /**
     * Runs the generate / check / merge job chain and verifies the result.
     *
     * <p>All intermediate data lives under the directory
     * {@code mapred.loadtest}, which is deleted once the comparison is done.
     *
     * @throws Exception if a directory cannot be created or a job / file
     *         operation fails
     * @throws junit.framework.AssertionFailedError if the recomputed key does
     *         not match the generated distribution
     */
    public static void launch() throws Exception {
        int[] dist = makeDistribution();

        FileSystem fs = FileSystem.get(conf);
        Path testdir = new Path("mapred.loadtest");
        mkdirsOrFail(fs, testdir);
        Path randomIns = new Path(testdir, "genins");
        mkdirsOrFail(fs, randomIns);

        // Write the answer key: one (key, expected count) record per key.
        Path answerkey = new Path(randomIns, "answer.key");
        SequenceFile.Writer out = SequenceFile.createWriter(fs, conf, answerkey, RecInt.class, RecInt.class, SequenceFile.CompressionType.NONE);
        try {
            for (int i = 0; i < range; ++i) {
                RecInt k = new RecInt();
                RecInt v = new RecInt();
                k.setData(i);
                v.setData(dist[i]);
                out.append(k, v);
            }
        }
        finally {
            out.close();
        }

        // Job 1: expand each (key, count) record into `count` records in random order.
        Path randomOuts = new Path(testdir, "genouts");
        fs.delete(randomOuts, true);
        JobConf genJob = new JobConf(conf, TestRecordMR.class);
        FileInputFormat.setInputPaths(genJob, new Path[]{randomIns});
        genJob.setInputFormat(SequenceFileInputFormat.class);
        genJob.setMapperClass(RandomGenMapper.class);
        FileOutputFormat.setOutputPath(genJob, randomOuts);
        genJob.setOutputKeyClass(RecInt.class);
        genJob.setOutputValueClass(RecString.class);
        genJob.setOutputFormat(SequenceFileOutputFormat.class);
        genJob.setReducerClass(RandomGenReducer.class);
        genJob.setNumReduceTasks(1);
        JobClient.runJob(genJob);

        // Job 2: count the occurrences of each key, spread over several reducers.
        int intermediateReduces = 10;
        Path intermediateOuts = new Path(testdir, "intermediateouts");
        fs.delete(intermediateOuts, true);
        JobConf checkJob = new JobConf(conf, TestRecordMR.class);
        FileInputFormat.setInputPaths(checkJob, new Path[]{randomOuts});
        checkJob.setInputFormat(SequenceFileInputFormat.class);
        checkJob.setMapperClass(RandomCheckMapper.class);
        FileOutputFormat.setOutputPath(checkJob, intermediateOuts);
        checkJob.setOutputKeyClass(RecInt.class);
        checkJob.setOutputValueClass(RecString.class);
        checkJob.setOutputFormat(SequenceFileOutputFormat.class);
        checkJob.setReducerClass(RandomCheckReducer.class);
        checkJob.setNumReduceTasks(intermediateReduces);
        JobClient.runJob(checkJob);

        // Job 3: merge the per-reducer counts into a single output file.
        Path finalOuts = new Path(testdir, "finalouts");
        fs.delete(finalOuts, true);
        JobConf mergeJob = new JobConf(conf, TestRecordMR.class);
        FileInputFormat.setInputPaths(mergeJob, new Path[]{intermediateOuts});
        mergeJob.setInputFormat(SequenceFileInputFormat.class);
        mergeJob.setMapperClass(MergeMapper.class);
        FileOutputFormat.setOutputPath(mergeJob, finalOuts);
        mergeJob.setOutputKeyClass(RecInt.class);
        mergeJob.setOutputValueClass(RecInt.class);
        mergeJob.setOutputFormat(SequenceFileOutputFormat.class);
        mergeJob.setReducerClass(MergeReducer.class);
        mergeJob.setNumReduceTasks(1);
        JobClient.runJob(mergeJob);

        // Compare the merged output against the distribution generated above.
        boolean success = true;
        int totalseen = 0;
        Path recomputedkey = new Path(finalOuts, "part-00000");
        SequenceFile.Reader in = new SequenceFile.Reader(fs, recomputedkey, conf);
        try {
            RecInt key = new RecInt();
            RecInt val = new RecInt();
            for (int i = 0; i < range; ++i) {
                // A key with a zero count generates no records, so no output line is expected.
                if (dist[i] == 0) {
                    continue;
                }
                if (!in.next(key, val)) {
                    System.err.println("Cannot read entry " + i);
                    success = false;
                    break;
                }
                if (key.getData() != i || val.getData() != dist[i]) {
                    System.err.println("Mismatch!  Pos=" + key.getData() + ", i=" + i + ", val=" + val.getData() + ", dist[i]=" + dist[i]);
                    success = false;
                }
                totalseen += val.getData();
            }
            if (success && in.next(key, val)) {
                System.err.println("Unnecessary lines in recomputed key!");
                success = false;
            }
        }
        finally {
            in.close();
        }

        int originalTotal = 0;
        for (int i = 0; i < dist.length; ++i) {
            originalTotal += dist[i];
        }
        System.out.println("Original sum: " + originalTotal);
        System.out.println("Recomputed sum: " + totalseen);

        // Record the outcome in a "results" file inside the test directory.
        Path resultFile = new Path(testdir, "results");
        BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(fs.create(resultFile)));
        try {
            bw.write("Success=" + success + "\n");
            System.out.println("Success=" + success);
        }
        finally {
            bw.close();
        }
        fs.delete(testdir, true);

        // Without this the outcome was only printed and the test could never fail.
        assertTrue("Recomputed key does not match the generated distribution", success);
    }

    /**
     * Builds the per-key occurrence counts: each key draws a rounded Gaussian
     * sample around the average of what is still left to distribute, clamped
     * at zero; any positive remainder is added to the last key.
     */
    private static int[] makeDistribution() {
        int countsToGo = counts;
        int[] dist = new int[range];
        for (int i = 0; i < range; ++i) {
            double avgInts = (double) countsToGo / (range - i);
            dist[i] = (int) Math.max(0L, Math.round(avgInts + Math.sqrt(avgInts) * r.nextGaussian()));
            countsToGo -= dist[i];
        }
        if (countsToGo > 0) {
            dist[dist.length - 1] += countsToGo;
        }
        return dist;
    }

    /** Creates {@code dir} on {@code fs}, throwing if {@code mkdirs} reports failure. */
    private static void mkdirsOrFail(FileSystem fs, Path dir) throws IOException {
        if (!fs.mkdirs(dir)) {
            throw new IOException("Mkdirs failed to create directory " + dir.toString());
        }
    }

    /**
     * Command-line entry point: {@code TestRecordMR <range> <counts>}.
     *
     * @param argv {@code argv[0]} is the number of keys, {@code argv[1]} the
     *        number of occurrences to distribute
     * @throws NumberFormatException if either argument is not an integer
     */
    public static void main(String[] argv) throws Exception {
        if (argv.length < 2) {
            System.err.println("Usage: TestRecordMR <range> <counts>");
            System.err.println();
            System.err.println("Note: a good test will have a <counts> value that is substantially larger than the <range>");
            return;
        }
        int parsedRange = Integer.parseInt(argv[0]);
        int parsedCounts = Integer.parseInt(argv[1]);
        // launch() allocates an array of size <range> and indexes its last element.
        if (parsedRange <= 0 || parsedCounts < 0) {
            System.err.println("<range> must be positive and <counts> must not be negative");
            return;
        }
        // Store into the static fields read by launch(); previously the parsed
        // values went into shadowing locals and were silently ignored.
        TestRecordMR.range = parsedRange;
        TestRecordMR.counts = parsedCounts;
        TestRecordMR.launch();
    }

    /** Sums all integer values seen for a key and emits {@code (key, sum)}. */
    public static class MergeReducer
    implements Reducer<RecInt, RecInt, RecInt, RecInt> {
        public void configure(JobConf job) {
        }

        public void reduce(RecInt key, Iterator<RecInt> it, OutputCollector<RecInt, RecInt> out, Reporter reporter) throws IOException {
            // Key is read before the values are consumed, as in the original code.
            int keyint = key.getData();
            int total = 0;
            while (it.hasNext()) {
                total += it.next().getData();
            }
            out.collect(new RecInt(keyint), new RecInt(total));
        }

        public void close() {
        }
    }

    /** Parses the string value as a decimal int and emits {@code (key, parsed value)}. */
    public static class MergeMapper
    implements Mapper<RecInt, RecString, RecInt, RecInt> {
        public void configure(JobConf job) {
        }

        public void map(RecInt key, RecString val, OutputCollector<RecInt, RecInt> out, Reporter reporter) throws IOException {
            int keyint = key.getData();
            int parsed = Integer.parseInt(val.getData());
            out.collect(new RecInt(keyint), new RecInt(parsed));
        }

        public void close() {
        }
    }

    /** Counts the values seen for a key and emits the count as a decimal string. */
    public static class RandomCheckReducer
    implements Reducer<RecInt, RecString, RecInt, RecString> {
        public void configure(JobConf job) {
        }

        public void reduce(RecInt key, Iterator<RecString> it, OutputCollector<RecInt, RecString> out, Reporter reporter) throws IOException {
            // Key is read before the values are consumed, as in the original code.
            int keyint = key.getData();
            int count = 0;
            while (it.hasNext()) {
                it.next();
                ++count;
            }
            out.collect(new RecInt(keyint), new RecString(Integer.toString(count)));
        }

        public void close() {
        }
    }

    /** Emits {@code (key, "1")} for every input record; the input value is not used. */
    public static class RandomCheckMapper
    implements Mapper<RecInt, RecString, RecInt, RecString> {
        public void configure(JobConf job) {
        }

        public void map(RecInt key, RecString val, OutputCollector<RecInt, RecString> out, Reporter reporter) throws IOException {
            int pos = key.getData();
            out.collect(new RecInt(pos), new RecString("1"));
        }

        public void close() {
        }
    }

    /**
     * Turns each string value back into an int key and emits it with an empty
     * string value; the incoming key is not used.
     */
    public static class RandomGenReducer
    implements Reducer<RecInt, RecString, RecInt, RecString> {
        public void configure(JobConf job) {
        }

        public void reduce(RecInt key, Iterator<RecString> it, OutputCollector<RecInt, RecString> out, Reporter reporter) throws IOException {
            while (it.hasNext()) {
                String val = it.next().getData();
                out.collect(new RecInt(Integer.parseInt(val)), new RecString(""));
            }
        }

        public void close() {
        }
    }

    /**
     * For an input record {@code (value, count)} emits {@code count} records,
     * each keyed by a random non-negative int and carrying {@code value} as a
     * decimal string.
     */
    public static class RandomGenMapper
    implements Mapper<RecInt, RecInt, RecInt, RecString> {
        Random r = new Random();

        public void configure(JobConf job) {
        }

        public void map(RecInt key, RecInt val, OutputCollector<RecInt, RecString> out, Reporter reporter) throws IOException {
            int randomVal = key.getData();
            int randomCount = val.getData();
            for (int i = 0; i < randomCount; ++i) {
                // Mask off the sign bit rather than using Math.abs, which
                // returns a negative number for Integer.MIN_VALUE.
                int randomKey = this.r.nextInt() & Integer.MAX_VALUE;
                out.collect(new RecInt(randomKey), new RecString(Integer.toString(randomVal)));
            }
        }

        public void close() {
        }
    }
}

