/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hdt.hadoop.release;

import java.io.File;
import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.JobStatus;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hdt.core.launch.AbstractHadoopCluster;
import org.apache.hdt.core.launch.IHadoopJob;
import org.eclipse.core.runtime.internal.adaptor.ContextFinder;

/**
 * Representation of a Map/Reduce running job on a given location
 */

public class HadoopJob implements IHadoopJob {

	/**
	 * Enum representation of a Job state
	 */
	public enum JobState {
		PREPARE(JobStatus.PREP), RUNNING(JobStatus.RUNNING), FAILED(JobStatus.FAILED), SUCCEEDED(JobStatus.SUCCEEDED);

		final int state;

		JobState(int state) {
			this.state = state;
		}

		static JobState ofInt(int state) {
			if (state == JobStatus.PREP) {
				return PREPARE;
			} else if (state == JobStatus.RUNNING) {
				return RUNNING;
			} else if (state == JobStatus.FAILED) {
				return FAILED;
			} else if (state == JobStatus.SUCCEEDED) {
				return SUCCEEDED;
			} else {
				return null;
			}
		}
	}

	/**
	 * Location this Job runs on
	 */
	private final HadoopCluster location;

	/**
	 * Unique identifier of this Job
	 */
	final JobID jobId;

	/**
	 * Status representation of a running job. This actually contains a
	 * reference to a JobClient. Its methods might block.
	 */
	RunningJob running;

	/**
	 * Last polled status
	 * 
	 * @deprecated should apparently not be used
	 */
	JobStatus status;

	/**
	 * Last polled counters
	 */
	Counters counters;

	/**
	 * Job Configuration
	 */
	JobConf jobConf = null;

	boolean completed = false;

	boolean successful = false;

	boolean killed = false;

	int totalMaps;

	int totalReduces;

	int completedMaps;

	int completedReduces;

	float mapProgress;

	float reduceProgress;

	/**
	 * Constructor for a Hadoop job representation
	 * 
	 * @param location
	 * @param id
	 * @param running
	 * @param status
	 */
	public HadoopJob(HadoopCluster location, JobID id, RunningJob running, JobStatus status) {
		//HadoopCluster.updateCurrentClassLoader();

		this.location = location;
		this.jobId = id;
		this.running = running;

		loadJobFile();

		update(status);
	}

	/**
	 * Try to locate and load the JobConf file for this job so to get more
	 * details on the job (number of maps and of reduces)
	 */
	private void loadJobFile() {
		try {
			String jobFile = getJobFile();
			FileSystem fs = location.getDFS();
			File tmp = File.createTempFile(getJobID().toString(), ".xml");
			if (FileUtil.copy(fs, new Path(jobFile), tmp, false, location.getConf())) {
				this.jobConf = new JobConf(tmp.toString());

				this.totalMaps = jobConf.getNumMapTasks();
				this.totalReduces = jobConf.getNumReduceTasks();
			}

		} catch (IOException ioe) {
			ioe.printStackTrace();
		}
	}

	/* @inheritDoc */
	@Override
	public int hashCode() {
		final int prime = 31;
		int result = 1;
		result = prime * result + ((jobId == null) ? 0 : jobId.hashCode());
		result = prime * result + ((location == null) ? 0 : location.hashCode());
		return result;
	}

	/* @inheritDoc */
	@Override
	public boolean equals(Object obj) {
		if (this == obj)
			return true;
		if (obj == null)
			return false;
		if (!(obj instanceof HadoopJob))
			return false;
		final HadoopJob other = (HadoopJob) obj;
		if (jobId == null) {
			if (other.jobId != null)
				return false;
		} else if (!jobId.equals(other.jobId))
			return false;
		if (location == null) {
			if (other.location != null)
				return false;
		} else if (!location.equals(other.location))
			return false;
		return true;
	}

	/**
	 * Get the running status of the Job (@see {@link JobStatus}).
	 * 
	 * @return
	 */
	public String getState() {
		if (this.completed) {
			if (this.successful) {
				return JobState.SUCCEEDED.toString();
			} else {
				return JobState.FAILED.toString();
			}
		} else {
			return JobState.RUNNING.toString();
		}
		// return JobState.ofInt(this.status.getRunState());
	}

	/**
	 * @return
	 */
	public String getJobID() {
		return this.jobId.toString();
	}

	/**
	 * @return
	 */
	public AbstractHadoopCluster getLocation() {
		return this.location;
	}

	/**
	 * @return
	 */
	public boolean isCompleted() {
		return this.completed;
	}

	/**
	 * @return
	 */
	public String getJobName() {
		return this.running.getJobName();
	}

	/**
	 * @return
	 */
	public String getJobFile() {
		return this.running.getJobFile();
	}

	/**
	 * Return the tracking URL for this Job.
	 * 
	 * @return string representation of the tracking URL for this Job
	 */
	public String getTrackingURL() {
		return this.running.getTrackingURL();
	}

	/**
	 * Returns a string representation of this job status
	 * 
	 * @return string representation of this job status
	 */
	public String getStatus() {

		StringBuffer s = new StringBuffer();

		s.append("Maps : " + completedMaps + "/" + totalMaps);
		s.append(" (" + mapProgress + ")");
		s.append("  Reduces : " + completedReduces + "/" + totalReduces);
		s.append(" (" + reduceProgress + ")");

		return s.toString();
	}

	/**
	 * Update this job status according to the given JobStatus
	 * 
	 * @param status
	 */
	void update(JobStatus status) {
		this.status = status;
		try {
			this.counters = running.getCounters();
			this.completed = running.isComplete();
			this.successful = running.isSuccessful();
			this.mapProgress = running.mapProgress();
			this.reduceProgress = running.reduceProgress();
			// running.getTaskCompletionEvents(fromEvent);

		} catch (IOException ioe) {
			ioe.printStackTrace();
		}

		this.completedMaps = (int) (this.totalMaps * this.mapProgress);
		this.completedReduces = (int) (this.totalReduces * this.reduceProgress);
	}

	/**
	 * Print this job counters (for debugging purpose)
	 */
	void printCounters() {
		System.out.printf("New Job:\n", counters);
		for (String groupName : counters.getGroupNames()) {
			Counters.Group group = counters.getGroup(groupName);
			System.out.printf("\t%s[%s]\n", groupName, group.getDisplayName());

			for (Counters.Counter counter : group) {
				System.out.printf("\t\t%s: %s\n", counter.getDisplayName(), counter.getCounter());
			}
		}
		System.out.printf("\n");
	}

	/**
	 * Kill this job
	 */
	public void kill() {
		try {
			this.running.killJob();
			this.killed = true;

		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	/**
	 * Print this job status (for debugging purpose)
	 */
	public void display() {
		System.out.printf("Job id=%s, name=%s\n", getJobID(), getJobName());
		System.out.printf("Configuration file: %s\n", getJobID());
		System.out.printf("Tracking URL: %s\n", getTrackingURL());

		System.out.printf("Completion: map: %f reduce %f\n", 100.0 * this.mapProgress, 100.0 * this.reduceProgress);

		System.out.println("Job total maps = " + totalMaps);
		System.out.println("Job completed maps = " + completedMaps);
		System.out.println("Map percentage complete = " + mapProgress);
		System.out.println("Job total reduces = " + totalReduces);
		System.out.println("Job completed reduces = " + completedReduces);
		System.out.println("Reduce percentage complete = " + reduceProgress);
		System.out.flush();
	}

}
