/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapred;

import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import junit.framework.TestCase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.mapred.ClusterStatus;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobInProgress;
import org.apache.hadoop.mapred.JobPriority;
import org.apache.hadoop.mapred.JobStatus;
import org.apache.hadoop.mapred.JobTracker;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapred.QueueManager;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TaskScheduler;
import org.apache.hadoop.mapred.TestJobInProgressListener;
import org.apache.hadoop.mapred.UtilsForTests;
import org.apache.hadoop.security.UserGroupInformation;
import org.junit.Ignore;

/**
 * Tests for the JobTracker recovery manager, driven through
 * {@link MiniMRCluster} / {@link MiniDFSCluster} instances.
 *
 * <p>Each test starts its own mini cluster and shuts it down in a
 * {@code finally} block so that a failing assertion does not leak a running
 * cluster into subsequent tests.
 *
 * <p>The class is annotated with {@link Ignore}; the reason is not recorded
 * in this file.
 */
@Ignore
public class TestRecoveryManager extends TestCase {
    private static final Log LOG = LogFactory.getLog(TestRecoveryManager.class);
    private static final Path TEST_DIR = new Path(System.getProperty("test.build.data", "/tmp"), "test-recovery-manager");

    /**
     * Builds a {@link JobConf} with the job-history block and buffer sizes
     * both set to 1024, as every mini-MR based test in this class requires.
     *
     * @return a fresh configuration with the two history settings applied
     */
    private static JobConf newSmallHistoryConf() {
        JobConf conf = new JobConf();
        conf.set("mapred.jobtracker.job.history.block.size", "1024");
        conf.set("mapred.jobtracker.job.history.buffer.size", "1024");
        return conf;
    }

    /**
     * Polls every 100 ms until the given job reports at least 50% map
     * progress. There is no timeout: the loop only ends once the progress
     * threshold is reached.
     *
     * @param job the submitted job to poll
     * @throws Exception if querying the job's progress fails
     */
    private static void waitForHalfMapProgress(RunningJob job) throws Exception {
        while (job.mapProgress() < 0.5f) {
            LOG.info("Waiting for job " + job.getID() + " to be 50% done");
            UtilsForTests.waitFor(100L);
        }
    }

    /**
     * Polls every 100 ms until the given job-in-progress reports that it has
     * been inited. There is no timeout.
     *
     * @param jip the job-in-progress to poll
     * @throws Exception if polling fails
     */
    private static void waitForJobInit(JobInProgress jip) throws Exception {
        while (!jip.inited()) {
            LOG.info("Waiting for job " + jip.getJobID() + " to be inited");
            UtilsForTests.waitFor(100L);
        }
    }

    /**
     * Checks that the JobTracker comes back up in the RUNNING state when one
     * job's {@code job-info} file in the system directory has been replaced
     * with a single byte while the tracker was down.
     *
     * <p>Two waiting jobs are submitted and each is allowed to reach 50% map
     * progress before the tracker is stopped, the first job's file is
     * overwritten, recovery is enabled and the tracker is restarted.
     *
     * @throws Exception on any cluster, filesystem or job-submission failure
     */
    public void testJobTracker() throws Exception {
        LOG.info("Testing jobtracker restart with faulty job");
        String signalFile = new Path(TEST_DIR, "signal").toString();
        JobConf conf = newSmallHistoryConf();
        FileSystem fs = FileSystem.get(new Configuration());
        fs.delete(TEST_DIR, true);
        MiniMRCluster mr = new MiniMRCluster(1, "file:///", 1, null, null, conf);
        try {
            JobConf job1 = mr.createJobConf();
            UtilsForTests.configureWaitingJobConf(job1, new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output1"), 2, 0, "test-recovery-manager", signalFile, signalFile);
            RunningJob rJob1 = new JobClient(job1).submitJob(job1);
            LOG.info("Submitted job " + rJob1.getID());
            waitForHalfMapProgress(rJob1);

            JobConf job2 = mr.createJobConf();
            UtilsForTests.configureWaitingJobConf(job2, new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output2"), 30, 0, "test-recovery-manager", signalFile, signalFile);
            RunningJob rJob2 = new JobClient(job2).submitJob(job2);
            LOG.info("Submitted job " + rJob2.getID());
            waitForHalfMapProgress(rJob2);

            LOG.info("Stopping jobtracker");
            // The system dir must be read before the tracker is stopped.
            String sysDir = mr.getJobTrackerRunner().getJobTracker().getSystemDir();
            mr.stopJobTracker();

            // Replace the first job's job-info file with a one-byte file.
            Path jobFile = new Path(sysDir, rJob1.getID().toString() + "/" + "job-info");
            LOG.info("Deleting job token file : " + jobFile.toString());
            fs.delete(jobFile, false);
            FSDataOutputStream out = fs.create(jobFile);
            try {
                out.write(1);
            } finally {
                out.close();
            }

            mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover", true);
            LOG.info("Starting jobtracker");
            mr.startJobTracker();
            ClusterStatus status = mr.getJobTrackerRunner().getJobTracker().getClusterStatus(false);
            assertEquals("JobTracker crashed!", JobTracker.State.RUNNING, status.getJobTrackerState());
        } finally {
            mr.shutdown();
        }
    }

    /**
     * Checks how recovery treats three jobs after a JobTracker restart whose
     * configuration has been tightened while the tracker was down.
     *
     * <p>Jobs submitted before the restart: job1 (HIGH priority, 30 maps),
     * job2 (20 maps, separate signal file) and job3 (1 map, submitted as test
     * user "abc"). Before restarting, recovery is enabled, the per-job task
     * limit is set to 25, ACLs are enabled and the submit ACL of the
     * "default" queue is set to the login user.
     *
     * <p>Expected after restart: exactly two jobs are known to the tracker,
     * job1 is FAILED, job2 is not complete and job3 has no status.
     * NOTE(review): presumably job1 fails because of the 25-task limit and
     * job3 is dropped because of the submit ACL - confirm against the
     * recovery manager.
     *
     * @throws Exception on any cluster, filesystem or job-submission failure
     */
    public void testRecoveryManager() throws Exception {
        LOG.info("Testing recovery-manager");
        String signalFile = new Path(TEST_DIR, "signal").toString();
        FileSystem fs = FileSystem.get(new Configuration());
        fs.delete(TEST_DIR, true);
        JobConf conf = newSmallHistoryConf();
        MiniMRCluster mr = new MiniMRCluster(1, "file:///", 1, null, null, conf);
        try {
            JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker();

            JobConf job1 = mr.createJobConf();
            job1.setJobPriority(JobPriority.HIGH);
            UtilsForTests.configureWaitingJobConf(job1, new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output3"), 30, 0, "test-recovery-manager", signalFile, signalFile);
            JobClient jc = new JobClient(job1);
            RunningJob rJob1 = jc.submitJob(job1);
            LOG.info("Submitted first job " + rJob1.getID());
            waitForHalfMapProgress(rJob1);

            JobConf job2 = mr.createJobConf();
            String signalFile1 = new Path(TEST_DIR, "signal1").toString();
            UtilsForTests.configureWaitingJobConf(job2, new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output4"), 20, 0, "test-recovery-manager", signalFile1, signalFile1);
            RunningJob rJob2 = new JobClient(job2).submitJob(job2);
            LOG.info("Submitted job " + rJob2.getID());
            waitForJobInit(jobtracker.getJob(rJob2.getID()));

            // Third job is submitted under a different (test-only) user.
            final JobConf job3 = mr.createJobConf();
            UserGroupInformation ugi3 = UserGroupInformation.createUserForTesting("abc", new String[]{"users"});
            UtilsForTests.configureWaitingJobConf(job3, new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output5"), 1, 0, "test-recovery-manager", signalFile, signalFile);
            RunningJob rJob3 = ugi3.doAs(new PrivilegedExceptionAction<RunningJob>() {
                @Override
                public RunningJob run() throws IOException {
                    return new JobClient(job3).submitJob(job3);
                }
            });
            LOG.info("Submitted job " + rJob3.getID() + " with different user");
            waitForJobInit(jobtracker.getJob(rJob3.getID()));

            LOG.info("Stopping jobtracker");
            mr.stopJobTracker();

            // Tighten the configuration that the restarted tracker will see.
            JobConf jtConf = mr.getJobTrackerConf();
            jtConf.setBoolean("mapred.jobtracker.restart.recover", true);
            jtConf.setInt("mapred.jobtracker.maxtasks.per.job", 25);
            jtConf.setBoolean("mapred.acls.enabled", true);
            UserGroupInformation ugi = UserGroupInformation.getLoginUser();
            jtConf.set(QueueManager.toFullPropertyName("default", QueueManager.QueueACL.SUBMIT_JOB.getAclName()), ugi.getUserName());

            LOG.info("Starting jobtracker");
            mr.startJobTracker();
            UtilsForTests.waitForJobTracker(jc);

            // The restarted tracker is a new instance; re-fetch it.
            jobtracker = mr.getJobTrackerRunner().getJobTracker();
            assertEquals("Recovery manager failed to tolerate job failures", 2, jobtracker.getAllJobs().length);
            JobStatus status = jobtracker.getJobStatus(rJob1.getID());
            assertEquals("Faulty job not failed", JobStatus.FAILED, status.getRunState());
            JobInProgress jip = jobtracker.getJob(rJob2.getID());
            assertFalse("Job should be running", jip.isComplete());
            status = jobtracker.getJobStatus(rJob3.getID());
            assertNull("Job should be missing", status);
        } finally {
            mr.shutdown();
        }
    }

    /**
     * Exercises the restart-count file maintained by the recovery manager.
     *
     * <p>Steps:
     * <ol>
     *   <li>Verify the restart-count file exists after the first start.</li>
     *   <li>Delete it while the tracker is stopped, restart, and verify that
     *       recovery is reported as disabled, the file has been recreated
     *       and no temp file is left behind.</li>
     *   <li>Submit a job, restart the tracker five times and verify the
     *       job's restart count equals the number of restarts each time.</li>
     *   <li>Kill that job, submit a new one and verify its restart count is
     *       zero.</li>
     *   <li>Replace the restart-count file with a single boolean while the
     *       tracker is stopped and verify the tracker runner is not active
     *       after the next start attempt.</li>
     * </ol>
     *
     * @throws Exception on any cluster, filesystem or job-submission failure
     */
    public void testRestartCount() throws Exception {
        LOG.info("Testing restart-count");
        String signalFile = new Path(TEST_DIR, "signal").toString();
        FileSystem fs = FileSystem.get(new Configuration());
        fs.delete(TEST_DIR, true);
        JobConf conf = newSmallHistoryConf();
        conf.setBoolean("mapred.jobtracker.restart.recover", true);
        conf.setClass("mapred.jobtracker.taskScheduler", TestJobInProgressListener.MyScheduler.class, TaskScheduler.class);
        MiniMRCluster mr = new MiniMRCluster(1, "file:///", 1, null, null, conf);
        try {
            JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker();
            JobClient jc = new JobClient(mr.createJobConf());
            Path infoFile = jobtracker.recoveryManager.getRestartCountFile();
            assertTrue("Jobtracker infomation is missing", fs.exists(infoFile));

            LOG.info("Stopping jobtracker for testing with system files deleted");
            mr.stopJobTracker();
            Path rFile = jobtracker.recoveryManager.getRestartCountFile();
            fs.delete(rFile, false);

            LOG.info("Starting jobtracker with system files deleted");
            mr.startJobTracker();
            UtilsForTests.waitForJobTracker(jc);
            jobtracker = mr.getJobTrackerRunner().getJobTracker();
            assertFalse("Recovery is not disabled upon missing system files", jobtracker.recoveryManager.shouldRecover());
            assertTrue("Recovery file is missing upon restart", fs.exists(rFile));
            Path tFile = jobtracker.recoveryManager.getTempRestartCountFile();
            assertFalse("Temp recovery file exists upon restart", fs.exists(tFile));

            JobConf job = mr.createJobConf();
            UtilsForTests.configureWaitingJobConf(job, new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output6"), 2, 0, "test-recovery-manager", signalFile, signalFile);
            RunningJob rJob = jc.submitJob(job);
            LOG.info("Submitted first job " + rJob.getID());
            // Fixed one-minute pause before the restart cycle begins.
            UtilsForTests.waitFor(60000L);

            for (int i = 1; i <= 5; ++i) {
                LOG.info("Stopping jobtracker for " + i + " time");
                mr.stopJobTracker();
                LOG.info("Starting jobtracker for " + i + " time");
                mr.startJobTracker();
                UtilsForTests.waitForJobTracker(jc);
                assertTrue("Recovery file is missing upon restart", fs.exists(rFile));
                assertFalse("Temp recovery file exists upon restart", fs.exists(tFile));
                jobtracker = mr.getJobTrackerRunner().getJobTracker();
                JobInProgress restarted = jobtracker.getJob(rJob.getID());
                assertEquals("Recovery manager failed to recover restart count", i, restarted.getNumRestarts());
            }

            rJob.killJob();
            JobConf job1 = mr.createJobConf();
            UtilsForTests.configureWaitingJobConf(job1, new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output7"), 50, 0, "test-recovery-manager", signalFile, signalFile);
            rJob = jc.submitJob(job1);
            LOG.info("Submitted first job after restart" + rJob.getID());
            JobInProgress fresh = jobtracker.getJob(rJob.getID());
            assertEquals("Restart count for new job is incorrect", 0, fresh.getNumRestarts());

            LOG.info("Stopping jobtracker for testing the fs errors");
            mr.stopJobTracker();
            // Replace the restart-count file with a single boolean value.
            fs.delete(rFile, false);
            FSDataOutputStream out = fs.create(rFile);
            try {
                out.writeBoolean(true);
            } finally {
                out.close();
            }

            LOG.info("Starting jobtracker with fs errors");
            mr.startJobTracker();
            MiniMRCluster.JobTrackerRunner runner = mr.getJobTrackerRunner();
            assertFalse("JobTracker is still alive", runner.isActive());
        } finally {
            mr.shutdown();
        }
    }

    /**
     * Checks that updating the restart count fails cleanly while no datanode
     * is running and succeeds once a datanode is available.
     *
     * <p>With the datanodes shut down, {@code updateRestartCount()} must
     * throw an {@link IOException} and leave neither the restart-count file
     * nor its temporary counterpart behind. After one datanode has been
     * started, the same call must complete without an {@link IOException}.
     *
     * @throws Exception on any cluster or filesystem failure
     */
    public void testJobTrackerInfoCreation() throws Exception {
        LOG.info("Testing jobtracker.info file");
        MiniDFSCluster dfs = new MiniDFSCluster(new Configuration(), 1, true, null);
        try {
            String namenode = dfs.getFileSystem().getUri().getHost() + ":" + dfs.getFileSystem().getUri().getPort();
            dfs.shutdownDataNodes();

            JobConf conf = new JobConf();
            FileSystem.setDefaultUri(conf, namenode);
            conf.set("mapred.job.tracker", "localhost:0");
            conf.set("mapred.job.tracker.http.address", "127.0.0.1:0");
            // NOTE(review): this JobTracker instance is never stopped; no
            // stop method is visible in this file, so it is left as is.
            JobTracker jobtracker = new JobTracker(conf);

            boolean failed = false;
            try {
                jobtracker.recoveryManager.updateRestartCount();
            } catch (IOException expected) {
                // Expected: no datanode is running at this point.
                failed = true;
            }
            assertTrue("JobTracker created info files without datanodes!!!", failed);

            Path restartFile = jobtracker.recoveryManager.getRestartCountFile();
            Path tmpRestartFile = jobtracker.recoveryManager.getTempRestartCountFile();
            FileSystem fs = dfs.getFileSystem();
            assertFalse("Info file exists after update failure", fs.exists(restartFile));
            // Check the temporary file here, not the restart file again.
            assertFalse("Temporary restart-file exists after update failure", fs.exists(tmpRestartFile));

            dfs.startDataNodes(conf, 1, true, null, null, null, null);
            dfs.waitActive();

            failed = false;
            try {
                jobtracker.recoveryManager.updateRestartCount();
            } catch (IOException unexpected) {
                failed = true;
            }
            assertFalse("JobTracker failed to create info files with datanodes!!!", failed);
        } finally {
            dfs.shutdown();
        }
    }
}

