blob: 3bd7ec8e819ed15615f32038ae2cc5ba6d1f1b83 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hama.bsp;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.text.NumberFormat;
import java.util.Locale;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hama.ipc.BSPPeerProtocol;
/**
 * Base class for tasks.
 *
 * <p>
 * A task carries the job id, the job file, the task attempt id and the
 * partition number, and serializes exactly those four values through the
 * {@link Writable} contract. The counters and the local directory allocator
 * are not part of the serialized form.
 */
public abstract class Task implements Writable {
  public static final Log LOG = LogFactory.getLog(Task.class);

  // //////////////////////////////////////////
  // Fields
  // //////////////////////////////////////////
  protected BSPJobID jobId;
  protected String jobFile;
  protected TaskAttemptID taskId;
  protected int partition;
  protected LocalDirAllocator lDirAlloc;

  // Current counters; never written by write() nor restored by readFields().
  private final transient Counters counters = new Counters();

  /**
   * Formatter for the numeric suffix of output file names. It is pinned to
   * {@link Locale#ROOT} so that the generated names do not change with the
   * JVM's default locale (digit shapes, minus sign). Access is guarded by the
   * class lock in {@link #getOutputName(int)} because {@link NumberFormat}
   * instances are not synchronized.
   */
  private static final NumberFormat NUMBER_FORMAT = NumberFormat
      .getInstance(Locale.ROOT);
  static {
    NUMBER_FORMAT.setMinimumIntegerDigits(5);
    NUMBER_FORMAT.setGroupingUsed(false);
  }

  /**
   * Creates a task with empty job and task attempt ids, so that
   * {@link #readFields(DataInput)} has instances to populate. The job file is
   * left unset.
   */
  public Task() {
    jobId = new BSPJobID();
    taskId = new TaskAttemptID();
  }

  /**
   * Creates a fully described task.
   *
   * @param jobId the id of the job this task belongs to
   * @param jobFile the job file of the job
   * @param taskId the id of this task attempt
   * @param partition the partition number of this task
   */
  public Task(BSPJobID jobId, String jobFile, TaskAttemptID taskId,
      int partition) {
    this.jobId = jobId;
    this.jobFile = jobFile;
    this.taskId = taskId;
    this.partition = partition;
  }

  // //////////////////////////////////////////
  // Accessors
  // //////////////////////////////////////////

  /**
   * Set the job file for this task.
   *
   * @param jobFile the job file
   */
  public void setJobFile(String jobFile) {
    this.jobFile = jobFile;
  }

  /**
   * Get the job file for this task.
   *
   * @return the job file, or <code>null</code> if it has not been set
   */
  public String getJobFile() {
    return jobFile;
  }

  /**
   * Get the attempt id of this task. Returns the same value as
   * {@link #getTaskID()}.
   *
   * @return the task attempt id
   */
  public TaskAttemptID getTaskAttemptId() {
    return this.taskId;
  }

  /**
   * Get the attempt id of this task. Returns the same value as
   * {@link #getTaskAttemptId()}.
   *
   * @return the task attempt id
   */
  public TaskAttemptID getTaskID() {
    return taskId;
  }

  /**
   * Get the id of the job this task belongs to.
   *
   * @return the job id
   */
  public BSPJobID getJobID() {
    return jobId;
  }

  /**
   * Get the index of this task within the job.
   *
   * @return the partition number of this task
   */
  public int getPartition() {
    return partition;
  }

  /**
   * Construct output file names so that, when an output directory listing is
   * sorted lexicographically, positions correspond to output partitions.
   *
   * <p>
   * The name is <code>part-</code> followed by the partition number padded
   * with zeros to at least five digits and without grouping separators, e.g.
   * <code>part-00007</code>.
   *
   * @param partition the partition number
   * @return the output file name for the given partition
   */
  static synchronized String getOutputName(int partition) {
    return "part-" + NUMBER_FORMAT.format(partition);
  }

  /**
   * @return the string form of the task attempt id
   */
  @Override
  public String toString() {
    return taskId.toString();
  }

  // //////////////////////////////////////////
  // Writable
  // //////////////////////////////////////////

  /**
   * Serializes the job id, job file, task attempt id and partition, in that
   * order.
   *
   * @param out the sink to write to
   * @throws IOException if writing to <code>out</code> fails
   */
  @Override
  public void write(DataOutput out) throws IOException {
    jobId.write(out);
    // NOTE(review): presumably Text.writeString does not accept null, so the
    // job file must be set before a task is serialized -- confirm.
    Text.writeString(out, jobFile);
    taskId.write(out);
    out.writeInt(partition);
  }

  /**
   * Restores the job id, job file, task attempt id and partition, in the
   * order written by {@link #write(DataOutput)}.
   *
   * @param in the source to read from
   * @throws IOException if reading from <code>in</code> fails
   */
  @Override
  public void readFields(DataInput in) throws IOException {
    jobId.readFields(in);
    jobFile = Text.readString(in);
    taskId.readFields(in);
    partition = in.readInt();
  }

  /**
   * Run this task as a part of the named job. This method is executed in the
   * child process.
   *
   * @param job the job this task is a part of
   * @param bspPeer for communications
   * @param umbilical for communications with GroomServer
   * @throws Exception if the task fails
   */
  public abstract void run(BSPJob job, BSPPeerImpl<?, ?, ?, ?, ?> bspPeer,
      BSPPeerProtocol umbilical) throws Exception;

  /**
   * Create the runner for this task.
   *
   * @param groom the groom server the runner is created for
   * @return the runner for this task
   */
  public abstract BSPTaskRunner createRunner(GroomServer groom);

  /**
   * Reports over the umbilical that this task attempt is done.
   *
   * @param umbilical the protocol used to report completion
   * @throws IOException if the call over the umbilical fails
   */
  public void done(BSPPeerProtocol umbilical) throws IOException {
    umbilical.done(getTaskID());
  }

  /**
   * @return the job configuration of this task
   */
  public abstract BSPJob getConf();

  /**
   * @param localJobConf the job configuration to use for this task
   */
  public abstract void setConf(BSPJob localJobConf);

  /**
   * @return the counters of this task
   */
  Counters getCounters() {
    return counters;
  }
}