/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapreduce.lib.jobcontrol;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Public
@InterfaceStability.Evolving
public class JobControl
implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(JobControl.class);
    private ThreadState runnerState;
    private LinkedList<ControlledJob> jobsInProgress = new LinkedList();
    private LinkedList<ControlledJob> successfulJobs = new LinkedList();
    private LinkedList<ControlledJob> failedJobs = new LinkedList();
    private long nextJobID = -1L;
    private String groupName;

    public JobControl(String groupName) {
        this.groupName = groupName;
        this.runnerState = ThreadState.READY;
    }

    private static List<ControlledJob> toList(LinkedList<ControlledJob> jobs) {
        ArrayList<ControlledJob> retv = new ArrayList<ControlledJob>();
        for (ControlledJob job : jobs) {
            retv.add(job);
        }
        return retv;
    }

    private synchronized List<ControlledJob> getJobsIn(ControlledJob.State state) {
        LinkedList<ControlledJob> l = new LinkedList<ControlledJob>();
        for (ControlledJob j : this.jobsInProgress) {
            if (j.getJobState() != state) continue;
            l.add(j);
        }
        return l;
    }

    public List<ControlledJob> getWaitingJobList() {
        return this.getJobsIn(ControlledJob.State.WAITING);
    }

    public List<ControlledJob> getRunningJobList() {
        return this.getJobsIn(ControlledJob.State.RUNNING);
    }

    public List<ControlledJob> getReadyJobsList() {
        return this.getJobsIn(ControlledJob.State.READY);
    }

    public synchronized List<ControlledJob> getSuccessfulJobList() {
        return JobControl.toList(this.successfulJobs);
    }

    public synchronized List<ControlledJob> getFailedJobList() {
        return JobControl.toList(this.failedJobs);
    }

    private String getNextJobID() {
        ++this.nextJobID;
        return this.groupName + this.nextJobID;
    }

    public synchronized String addJob(ControlledJob aJob) {
        String id = this.getNextJobID();
        aJob.setJobID(id);
        aJob.setJobState(ControlledJob.State.WAITING);
        this.jobsInProgress.add(aJob);
        return id;
    }

    public synchronized String addJob(Job aJob) {
        return this.addJob((ControlledJob)aJob);
    }

    public void addJobCollection(Collection<ControlledJob> jobs) {
        for (ControlledJob job : jobs) {
            this.addJob(job);
        }
    }

    public ThreadState getThreadState() {
        return this.runnerState;
    }

    public void stop() {
        this.runnerState = ThreadState.STOPPING;
    }

    public void suspend() {
        if (this.runnerState == ThreadState.RUNNING) {
            this.runnerState = ThreadState.SUSPENDED;
        }
    }

    public void resume() {
        if (this.runnerState == ThreadState.SUSPENDED) {
            this.runnerState = ThreadState.RUNNING;
        }
    }

    public synchronized boolean allFinished() {
        return this.jobsInProgress.isEmpty();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        if (this.isCircular(this.jobsInProgress)) {
            throw new IllegalArgumentException("job control has circular dependency");
        }
        try {
            this.runnerState = ThreadState.RUNNING;
            while (true) {
                if (this.runnerState == ThreadState.SUSPENDED) {
                    try {
                        Thread.sleep(5000L);
                    }
                    catch (Exception exception) {}
                    continue;
                }
                JobControl jobControl = this;
                synchronized (jobControl) {
                    Iterator it = this.jobsInProgress.iterator();
                    while (it.hasNext()) {
                        ControlledJob j = (ControlledJob)it.next();
                        LOG.debug("Checking state of job " + j);
                        switch (j.checkState()) {
                            case SUCCESS: {
                                this.successfulJobs.add(j);
                                it.remove();
                                break;
                            }
                            case FAILED: 
                            case DEPENDENT_FAILED: {
                                this.failedJobs.add(j);
                                it.remove();
                                break;
                            }
                            case READY: {
                                j.submit();
                                break;
                            }
                        }
                    }
                }
                if (this.runnerState != ThreadState.RUNNING && this.runnerState != ThreadState.SUSPENDED) break;
                try {
                    Thread.sleep(5000L);
                }
                catch (Exception exception) {
                    // empty catch block
                }
                if (this.runnerState != ThreadState.RUNNING && this.runnerState != ThreadState.SUSPENDED) break;
            }
        }
        catch (Throwable t) {
            LOG.error("Error while trying to run jobs.", t);
            this.failAllJobs(t);
        }
        this.runnerState = ThreadState.STOPPED;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private synchronized void failAllJobs(Throwable t) {
        String message = "Unexpected System Error Occurred: " + StringUtils.stringifyException(t);
        Iterator it = this.jobsInProgress.iterator();
        while (it.hasNext()) {
            ControlledJob j = (ControlledJob)it.next();
            try {
                j.failJob(message);
            }
            catch (IOException e) {
                LOG.error("Error while tyring to clean up " + j.getJobName(), e);
            }
            catch (InterruptedException e) {
                LOG.error("Error while tyring to clean up " + j.getJobName(), e);
            }
            finally {
                this.failedJobs.add(j);
                it.remove();
            }
        }
    }

    private boolean isCircular(List<ControlledJob> jobList) {
        boolean cyclePresent = false;
        HashSet<ControlledJob> SourceSet = new HashSet<ControlledJob>();
        HashMap<ControlledJob, List<ControlledJob>> processedMap = new HashMap<ControlledJob, List<ControlledJob>>();
        for (ControlledJob n : jobList) {
            processedMap.put(n, new ArrayList());
        }
        for (ControlledJob n : jobList) {
            if (this.hasInComingEdge(n, jobList, processedMap)) continue;
            SourceSet.add(n);
        }
        while (!SourceSet.isEmpty()) {
            ControlledJob controlledJob = (ControlledJob)SourceSet.iterator().next();
            SourceSet.remove(controlledJob);
            if (controlledJob.getDependentJobs() == null) continue;
            for (int i = 0; i < controlledJob.getDependentJobs().size(); ++i) {
                ControlledJob depenControlledJob = controlledJob.getDependentJobs().get(i);
                processedMap.get(controlledJob).add(depenControlledJob);
                if (this.hasInComingEdge(controlledJob, jobList, processedMap)) continue;
                SourceSet.add(depenControlledJob);
            }
        }
        for (ControlledJob controlledJob : jobList) {
            if (controlledJob.getDependentJobs() == null || controlledJob.getDependentJobs().size() == processedMap.get(controlledJob).size()) continue;
            cyclePresent = true;
            LOG.error("Job control has circular dependency for the  job " + controlledJob.getJobName());
            break;
        }
        return cyclePresent;
    }

    private boolean hasInComingEdge(ControlledJob controlledJob, List<ControlledJob> controlledJobList, HashMap<ControlledJob, List<ControlledJob>> processedMap) {
        boolean hasIncomingEdge = false;
        for (ControlledJob k : controlledJobList) {
            if (k == controlledJob || k.getDependentJobs() == null || processedMap.get(k).contains(controlledJob) || !k.getDependentJobs().contains(controlledJob)) continue;
            hasIncomingEdge = true;
            break;
        }
        return hasIncomingEdge;
    }

    public static enum ThreadState {
        RUNNING,
        SUSPENDED,
        STOPPED,
        STOPPING,
        READY;

    }
}

