| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.mapred; |
| |
| import java.io.DataInput; |
| import java.io.DataOutput; |
| import java.io.IOException; |
| import java.util.List; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.io.Text; |
| import org.apache.hadoop.io.Writable; |
| import org.apache.hadoop.io.WritableUtils; |
| import org.apache.hadoop.util.StringUtils; |
| /************************************************** |
| * Describes the current status of a task. This is |
| * not intended to be a comprehensive piece of data. |
| * |
| **************************************************/ |
| public abstract class TaskStatus implements Writable, Cloneable { |
| static final Log LOG = |
| LogFactory.getLog(TaskStatus.class.getName()); |
| |
| //enumeration for reporting current phase of a task. |
| public static enum Phase{STARTING, MAP, SHUFFLE, SORT, REDUCE, CLEANUP} |
| |
| // what state is the task in? |
| public static enum State {RUNNING, SUCCEEDED, FAILED, UNASSIGNED, KILLED, |
| COMMIT_PENDING, FAILED_UNCLEAN, KILLED_UNCLEAN} |
| |
| private final TaskAttemptID taskid; |
| private float progress; |
| private volatile State runState; |
| private String diagnosticInfo; |
| private String stateString; |
| private String taskTracker; |
| private int numSlots; |
| |
| private long startTime; //in ms |
| private long finishTime; |
| private long outputSize; |
| |
| private volatile Phase phase = Phase.STARTING; |
| private Counters counters; |
| private boolean includeCounters; |
| private SortedRanges.Range nextRecordRange = new SortedRanges.Range(); |
| |
| public TaskStatus() { |
| taskid = new TaskAttemptID(); |
| numSlots = 0; |
| } |
| |
| public TaskStatus(TaskAttemptID taskid, float progress, int numSlots, |
| State runState, String diagnosticInfo, |
| String stateString, String taskTracker, |
| Phase phase, Counters counters) { |
| this.taskid = taskid; |
| this.progress = progress; |
| this.numSlots = numSlots; |
| this.runState = runState; |
| this.diagnosticInfo = diagnosticInfo; |
| this.stateString = stateString; |
| this.taskTracker = taskTracker; |
| this.phase = phase; |
| this.counters = counters; |
| this.includeCounters = true; |
| } |
| |
| public TaskAttemptID getTaskID() { return taskid; } |
| public abstract boolean getIsMap(); |
| public int getNumSlots() { |
| return numSlots; |
| } |
| |
| public float getProgress() { return progress; } |
| public void setProgress(float progress) { |
| this.progress = progress; |
| } |
| public State getRunState() { return runState; } |
| public String getTaskTracker() {return taskTracker;} |
| public void setTaskTracker(String tracker) { this.taskTracker = tracker;} |
| public void setRunState(State runState) { this.runState = runState; } |
| public String getDiagnosticInfo() { return diagnosticInfo; } |
| public void setDiagnosticInfo(String info) { |
| diagnosticInfo = |
| ((diagnosticInfo == null) ? info : diagnosticInfo.concat(info)); |
| } |
| public String getStateString() { return stateString; } |
| public void setStateString(String stateString) { this.stateString = stateString; } |
| |
| /** |
| * Get the next record range which is going to be processed by Task. |
| * @return nextRecordRange |
| */ |
| public SortedRanges.Range getNextRecordRange() { |
| return nextRecordRange; |
| } |
| |
| /** |
| * Set the next record range which is going to be processed by Task. |
| * @param nextRecordRange |
| */ |
| public void setNextRecordRange(SortedRanges.Range nextRecordRange) { |
| this.nextRecordRange = nextRecordRange; |
| } |
| |
| /** |
| * Get task finish time. if shuffleFinishTime and sortFinishTime |
| * are not set before, these are set to finishTime. It takes care of |
| * the case when shuffle, sort and finish are completed with in the |
| * heartbeat interval and are not reported separately. if task state is |
| * TaskStatus.FAILED then finish time represents when the task failed. |
| * @return finish time of the task. |
| */ |
| public long getFinishTime() { |
| return finishTime; |
| } |
| |
| /** |
| * Sets finishTime for the task status if and only if the |
| * start time is set and passed finish time is greater than |
| * zero. |
| * |
| * @param finishTime finish time of task. |
| */ |
| void setFinishTime(long finishTime) { |
| if(this.getStartTime() > 0 && finishTime > 0) { |
| this.finishTime = finishTime; |
| } else { |
| //Using String utils to get the stack trace. |
| LOG.error("Trying to set finish time for task " + taskid + |
| " when no start time is set, stackTrace is : " + |
| StringUtils.stringifyException(new Exception())); |
| } |
| } |
| /** |
| * Get shuffle finish time for the task. If shuffle finish time was |
| * not set due to shuffle/sort/finish phases ending within same |
| * heartbeat interval, it is set to finish time of next phase i.e. sort |
| * or task finish when these are set. |
| * @return 0 if shuffleFinishTime, sortFinishTime and finish time are not set. else |
| * it returns approximate shuffle finish time. |
| */ |
| public long getShuffleFinishTime() { |
| return 0; |
| } |
| |
| /** |
| * Set shuffle finish time. |
| * @param shuffleFinishTime |
| */ |
| void setShuffleFinishTime(long shuffleFinishTime) {} |
| |
| /** |
| * Get map phase finish time for the task. If map finsh time was |
| * not set due to sort phase ending within same heartbeat interval, |
| * it is set to finish time of next phase i.e. sort phase |
| * when it is set. |
| * @return 0 if mapFinishTime, sortFinishTime are not set. else |
| * it returns approximate map finish time. |
| */ |
| public long getMapFinishTime() { |
| return 0; |
| } |
| |
| /** |
| * Set map phase finish time. |
| * @param mapFinishTime |
| */ |
| void setMapFinishTime(long mapFinishTime) {} |
| |
| /** |
| * Get sort finish time for the task,. If sort finish time was not set |
| * due to sort and reduce phase finishing in same heartebat interval, it is |
| * set to finish time, when finish time is set. |
| * @return 0 if sort finish time and finish time are not set, else returns sort |
| * finish time if that is set, else it returns finish time. |
| */ |
| public long getSortFinishTime() { |
| return 0; |
| } |
| |
| /** |
| * Sets sortFinishTime, if shuffleFinishTime is not set before |
| * then its set to sortFinishTime. |
| * @param sortFinishTime |
| */ |
| void setSortFinishTime(long sortFinishTime) {} |
| |
| /** |
| * Get start time of the task. |
| * @return 0 is start time is not set, else returns start time. |
| */ |
| public long getStartTime() { |
| return startTime; |
| } |
| |
| /** |
| * Set startTime of the task if start time is greater than zero. |
| * @param startTime start time |
| */ |
| void setStartTime(long startTime) { |
| //Making the assumption of passed startTime to be a positive |
| //long value explicit. |
| if (startTime > 0) { |
| this.startTime = startTime; |
| } else { |
| //Using String utils to get the stack trace. |
| LOG.error("Trying to set illegal startTime for task : " + taskid + |
| ".Stack trace is : " + |
| StringUtils.stringifyException(new Exception())); |
| } |
| } |
| /** |
| * Get current phase of this task. Phase.Map in case of map tasks, |
| * for reduce one of Phase.SHUFFLE, Phase.SORT or Phase.REDUCE. |
| * @return . |
| */ |
| public Phase getPhase(){ |
| return this.phase; |
| } |
| /** |
| * Set current phase of this task. |
| * @param phase phase of this task |
| */ |
| public void setPhase(Phase phase){ |
| TaskStatus.Phase oldPhase = getPhase(); |
| if (oldPhase != phase){ |
| // sort phase started |
| if (phase == TaskStatus.Phase.SORT){ |
| if (oldPhase == TaskStatus.Phase.MAP) { |
| setMapFinishTime(System.currentTimeMillis()); |
| } |
| else { |
| setShuffleFinishTime(System.currentTimeMillis()); |
| } |
| }else if (phase == TaskStatus.Phase.REDUCE){ |
| setSortFinishTime(System.currentTimeMillis()); |
| } |
| this.phase = phase; |
| } |
| } |
| |
| boolean inTaskCleanupPhase() { |
| return (this.phase == TaskStatus.Phase.CLEANUP && |
| (this.runState == TaskStatus.State.FAILED_UNCLEAN || |
| this.runState == TaskStatus.State.KILLED_UNCLEAN)); |
| } |
| |
| public boolean getIncludeCounters() { |
| return includeCounters; |
| } |
| |
| public void setIncludeCounters(boolean send) { |
| includeCounters = send; |
| } |
| |
| /** |
| * Get task's counters. |
| */ |
| public Counters getCounters() { |
| return counters; |
| } |
| /** |
| * Set the task's counters. |
| * @param counters |
| */ |
| public void setCounters(Counters counters) { |
| this.counters = counters; |
| } |
| |
| /** |
| * Returns the number of bytes of output from this map. |
| */ |
| public long getOutputSize() { |
| return outputSize; |
| } |
| |
| /** |
| * Set the size on disk of this task's output. |
| * @param l the number of map output bytes |
| */ |
| void setOutputSize(long l) { |
| outputSize = l; |
| } |
| |
| /** |
| * Get the list of maps from which output-fetches failed. |
| * |
| * @return the list of maps from which output-fetches failed. |
| */ |
| public List<TaskAttemptID> getFetchFailedMaps() { |
| return null; |
| } |
| |
| /** |
| * Add to the list of maps from which output-fetches failed. |
| * |
| * @param mapTaskId map from which fetch failed |
| */ |
| public abstract void addFetchFailedMap(TaskAttemptID mapTaskId); |
| |
| /** |
| * Update the status of the task. |
| * |
| * This update is done by ping thread before sending the status. |
| * |
| * @param progress |
| * @param state |
| * @param counters |
| */ |
| synchronized void statusUpdate(float progress, |
| String state, |
| Counters counters) { |
| setProgress(progress); |
| setStateString(state); |
| setCounters(counters); |
| } |
| |
| /** |
| * Update the status of the task. |
| * |
| * @param status updated status |
| */ |
| synchronized void statusUpdate(TaskStatus status) { |
| setProgress (status.getProgress()); |
| this.runState = status.getRunState(); |
| this.stateString = status.getStateString(); |
| this.nextRecordRange = status.getNextRecordRange(); |
| |
| setDiagnosticInfo(status.getDiagnosticInfo()); |
| |
| if (status.getStartTime() > 0) { |
| this.setStartTime(status.getStartTime()); |
| } |
| if (status.getFinishTime() > 0) { |
| this.setFinishTime(status.getFinishTime()); |
| } |
| |
| this.phase = status.getPhase(); |
| this.counters = status.getCounters(); |
| this.outputSize = status.outputSize; |
| } |
| |
| /** |
| * Update specific fields of task status |
| * |
| * This update is done in JobTracker when a cleanup attempt of task |
| * reports its status. Then update only specific fields, not all. |
| * |
| * @param runState |
| * @param progress |
| * @param state |
| * @param phase |
| * @param finishTime |
| */ |
| synchronized void statusUpdate(State runState, |
| float progress, |
| String state, |
| Phase phase, |
| long finishTime) { |
| setRunState(runState); |
| setProgress(progress); |
| setStateString(state); |
| setPhase(phase); |
| if (finishTime > 0) { |
| setFinishTime(finishTime); |
| } |
| } |
| |
| /** |
| * Clear out transient information after sending out a status-update |
| * from either the {@link Task} to the {@link TaskTracker} or from the |
| * {@link TaskTracker} to the {@link JobTracker}. |
| */ |
| synchronized void clearStatus() { |
| // Clear diagnosticInfo |
| diagnosticInfo = ""; |
| } |
| |
| @Override |
| public Object clone() { |
| try { |
| return super.clone(); |
| } catch (CloneNotSupportedException cnse) { |
| // Shouldn't happen since we do implement Clonable |
| throw new InternalError(cnse.toString()); |
| } |
| } |
| |
| ////////////////////////////////////////////// |
| // Writable |
| ////////////////////////////////////////////// |
| public void write(DataOutput out) throws IOException { |
| taskid.write(out); |
| out.writeFloat(progress); |
| out.writeInt(numSlots); |
| WritableUtils.writeEnum(out, runState); |
| Text.writeString(out, diagnosticInfo); |
| Text.writeString(out, stateString); |
| WritableUtils.writeEnum(out, phase); |
| out.writeLong(startTime); |
| out.writeLong(finishTime); |
| out.writeBoolean(includeCounters); |
| out.writeLong(outputSize); |
| if (includeCounters) { |
| counters.write(out); |
| } |
| nextRecordRange.write(out); |
| } |
| |
| public void readFields(DataInput in) throws IOException { |
| this.taskid.readFields(in); |
| setProgress(in.readFloat()); |
| this.numSlots = in.readInt(); |
| this.runState = WritableUtils.readEnum(in, State.class); |
| this.diagnosticInfo = Text.readString(in); |
| this.stateString = Text.readString(in); |
| this.phase = WritableUtils.readEnum(in, Phase.class); |
| this.startTime = in.readLong(); |
| this.finishTime = in.readLong(); |
| counters = new Counters(); |
| this.includeCounters = in.readBoolean(); |
| this.outputSize = in.readLong(); |
| if (includeCounters) { |
| counters.readFields(in); |
| } |
| nextRecordRange.readFields(in); |
| } |
| |
| ////////////////////////////////////////////////////////////////////////////// |
| // Factory-like methods to create/read/write appropriate TaskStatus objects |
| ////////////////////////////////////////////////////////////////////////////// |
| |
| static TaskStatus createTaskStatus(DataInput in, TaskAttemptID taskId, |
| float progress, int numSlots, |
| State runState, String diagnosticInfo, |
| String stateString, String taskTracker, |
| Phase phase, Counters counters) |
| throws IOException { |
| boolean isMap = in.readBoolean(); |
| return createTaskStatus(isMap, taskId, progress, numSlots, runState, |
| diagnosticInfo, stateString, taskTracker, phase, |
| counters); |
| } |
| |
| static TaskStatus createTaskStatus(boolean isMap, TaskAttemptID taskId, |
| float progress, int numSlots, |
| State runState, String diagnosticInfo, |
| String stateString, String taskTracker, |
| Phase phase, Counters counters) { |
| return (isMap) ? new MapTaskStatus(taskId, progress, numSlots, runState, |
| diagnosticInfo, stateString, taskTracker, |
| phase, counters) : |
| new ReduceTaskStatus(taskId, progress, numSlots, runState, |
| diagnosticInfo, stateString, |
| taskTracker, phase, counters); |
| } |
| |
| static TaskStatus createTaskStatus(boolean isMap) { |
| return (isMap) ? new MapTaskStatus() : new ReduceTaskStatus(); |
| } |
| |
| static TaskStatus readTaskStatus(DataInput in) throws IOException { |
| boolean isMap = in.readBoolean(); |
| TaskStatus taskStatus = createTaskStatus(isMap); |
| taskStatus.readFields(in); |
| return taskStatus; |
| } |
| |
| static void writeTaskStatus(DataOutput out, TaskStatus taskStatus) |
| throws IOException { |
| out.writeBoolean(taskStatus.getIsMap()); |
| taskStatus.write(out); |
| } |
| } |
| |