/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapred;

import java.io.IOException;
import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputCommitter;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.mapred.JobInProgress;
import org.apache.hadoop.mapred.JobTracker;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapred.OutputCommitter;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapred.TaskInProgress;
import org.apache.hadoop.mapred.TaskStatus;
import org.apache.hadoop.mapred.UtilsForTests;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.mapreduce.TaskType;

public class TestSetupAndCleanupFailure
extends TestCase {
    final Path inDir = new Path("./input");
    final Path outDir = new Path("./output");
    static Path setupSignalFile = new Path("/setup-signal");
    static Path cleanupSignalFile = new Path("/cleanup-signal");
    String input = "The quick brown fox\nhas many silly\nred fox sox\n";

    public RunningJob launchJob(JobConf conf) throws IOException {
        FileSystem inFs = this.inDir.getFileSystem((Configuration)conf);
        FileSystem outFs = this.outDir.getFileSystem((Configuration)conf);
        outFs.delete(this.outDir, true);
        if (!inFs.mkdirs(this.inDir)) {
            throw new IOException("Mkdirs failed to create " + this.inDir.toString());
        }
        FSDataOutputStream file = inFs.create(new Path(this.inDir, "part-0"));
        file.writeBytes(this.input);
        file.close();
        conf.setMapperClass(IdentityMapper.class);
        conf.setReducerClass(IdentityReducer.class);
        FileInputFormat.setInputPaths((JobConf)conf, (Path[])new Path[]{this.inDir});
        FileOutputFormat.setOutputPath((JobConf)conf, (Path)this.outDir);
        String TEST_ROOT_DIR = new Path(System.getProperty("test.build.data", "/tmp")).toString().replace(' ', '+');
        conf.set("test.build.data", TEST_ROOT_DIR);
        return new JobClient(conf).submitJob(conf);
    }

    private TaskAttemptID getRunningTaskID(TaskInProgress[] tips) {
        TaskAttemptID taskid = null;
        while (taskid == null) {
            for (TaskInProgress tip : tips) {
                TaskStatus[] statuses;
                for (TaskStatus status : statuses = tip.getTaskStatuses()) {
                    if (status.getRunState() != TaskStatus.State.RUNNING) continue;
                    taskid = status.getTaskID();
                    break;
                }
                if (taskid != null) break;
            }
            try {
                Thread.sleep(10L);
            }
            catch (InterruptedException interruptedException) {}
        }
        return taskid;
    }

    private void testFailCommitter(Class<? extends OutputCommitter> theClass, JobConf jobConf) throws IOException {
        jobConf.setOutputCommitter(theClass);
        RunningJob job = this.launchJob(jobConf);
        job.waitForCompletion();
        TestSetupAndCleanupFailure.assertEquals((int)3, (int)job.getJobState());
    }

    private RunningJob launchJobWithWaitingSetupAndCleanup(MiniMRCluster mr) throws IOException {
        JobConf jobConf = mr.createJobConf();
        jobConf.setOutputCommitter(CommitterWithLongSetupAndCleanup.class);
        RunningJob job = this.launchJob(jobConf);
        JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
        JobInProgress jip = jt.getJob(job.getID());
        while (!jip.inited()) {
            try {
                Thread.sleep(10L);
            }
            catch (InterruptedException ie) {}
        }
        return job;
    }

    private void testSetupAndCleanupKill(MiniMRCluster mr, MiniDFSCluster dfs, boolean commandLineKill) throws IOException {
        RunningJob job = this.launchJobWithWaitingSetupAndCleanup(mr);
        JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
        JobInProgress jip = jt.getJob(job.getID());
        TaskAttemptID setupID = this.getRunningTaskID(jip.getTasks(TaskType.JOB_SETUP));
        if (commandLineKill) {
            this.killTaskFromCommandLine(job, setupID, jt);
        } else {
            this.killTaskWithLostTracker(mr, setupID);
        }
        UtilsForTests.writeFile(dfs.getNameNode(), dfs.getFileSystem().getConf(), setupSignalFile, (short)3);
        while (job.reduceProgress() != 1.0f) {
            try {
                Thread.sleep(100L);
            }
            catch (InterruptedException ie) {}
        }
        TaskAttemptID cleanupID = this.getRunningTaskID(jip.getTasks(TaskType.JOB_CLEANUP));
        if (commandLineKill) {
            this.killTaskFromCommandLine(job, cleanupID, jt);
        } else {
            this.killTaskWithLostTracker(mr, cleanupID);
        }
        UtilsForTests.writeFile(dfs.getNameNode(), dfs.getFileSystem().getConf(), cleanupSignalFile, (short)3);
        job.waitForCompletion();
        TestSetupAndCleanupFailure.assertEquals((int)2, (int)job.getJobState());
        TestSetupAndCleanupFailure.assertEquals((Object)TaskStatus.State.KILLED, (Object)jt.getTaskStatus(setupID).getRunState());
        TestSetupAndCleanupFailure.assertEquals((Object)TaskStatus.State.KILLED, (Object)jt.getTaskStatus(cleanupID).getRunState());
    }

    private void killTaskFromCommandLine(RunningJob job, TaskAttemptID taskid, JobTracker jt) throws IOException {
        job.killTask(taskid, false);
        while (jt.getTaskStatus(taskid).getRunState() != TaskStatus.State.KILLED) {
            try {
                Thread.sleep(10L);
            }
            catch (InterruptedException ie) {}
        }
    }

    private void killTaskWithLostTracker(MiniMRCluster mr, TaskAttemptID taskid) {
        JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
        String trackerName = jt.getTaskStatus(taskid).getTaskTracker();
        int trackerID = mr.getTaskTrackerID(trackerName);
        TestSetupAndCleanupFailure.assertTrue((trackerID != -1 ? 1 : 0) != 0);
        mr.stopTaskTracker(trackerID);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void testWithDFS() throws IOException {
        MiniDFSCluster dfs = null;
        MiniMRCluster mr = null;
        FileSystem fileSys = null;
        try {
            int taskTrackers = 4;
            Configuration conf = new Configuration();
            dfs = new MiniDFSCluster(conf, 4, true, null);
            fileSys = dfs.getFileSystem();
            JobConf jtConf = new JobConf();
            jtConf.setInt("mapred.tasktracker.map.tasks.maximum", 1);
            jtConf.setInt("mapred.tasktracker.reduce.tasks.maximum", 1);
            jtConf.setLong("mapred.tasktracker.expiry.interval", 10000L);
            jtConf.setInt("mapred.reduce.copy.backoff", 4);
            mr = new MiniMRCluster(4, fileSys.getUri().toString(), 1, null, null, jtConf);
            this.testFailCommitter(CommitterWithFailSetup.class, mr.createJobConf());
            this.testFailCommitter(CommitterWithFailCleanup.class, mr.createJobConf());
            this.testSetupAndCleanupKill(mr, dfs, true);
            fileSys.delete(setupSignalFile, true);
            fileSys.delete(cleanupSignalFile, true);
            this.testSetupAndCleanupKill(mr, dfs, false);
        }
        finally {
            if (dfs != null) {
                dfs.shutdown();
            }
            if (mr != null) {
                mr.shutdown();
            }
        }
    }

    public static void main(String[] argv) throws Exception {
        TestSetupAndCleanupFailure td = new TestSetupAndCleanupFailure();
        td.testWithDFS();
    }

    static class CommitterWithLongSetupAndCleanup
    extends FileOutputCommitter {
        CommitterWithLongSetupAndCleanup() {
        }

        private void waitForSignalFile(FileSystem fs, Path signalFile) throws IOException {
            while (!fs.exists(signalFile)) {
                try {
                    Thread.sleep(100L);
                }
                catch (InterruptedException ie) {
                    break;
                }
            }
        }

        public void setupJob(JobContext context) throws IOException {
            this.waitForSignalFile(FileSystem.get((Configuration)context.getJobConf()), setupSignalFile);
            super.setupJob(context);
        }

        public void commitJob(JobContext context) throws IOException {
            this.waitForSignalFile(FileSystem.get((Configuration)context.getJobConf()), cleanupSignalFile);
            super.commitJob(context);
        }
    }

    static class CommitterWithFailCleanup
    extends FileOutputCommitter {
        CommitterWithFailCleanup() {
        }

        public void commitJob(JobContext context) throws IOException {
            throw new IOException();
        }
    }

    static class CommitterWithFailSetup
    extends FileOutputCommitter {
        CommitterWithFailSetup() {
        }

        public void setupJob(JobContext context) throws IOException {
            throw new IOException();
        }
    }
}

