/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hama.bsp;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.net.InetSocketAddress;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hama.Constants;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.sync.MasterSyncClient;
import org.apache.hama.bsp.sync.ZKSyncBSPMasterClient;
import org.apache.hama.http.HttpServer;
import org.apache.hama.ipc.GroomProtocol;
import org.apache.hama.ipc.HamaRPCProtocolVersion;
import org.apache.hama.ipc.JobSubmissionProtocol;
import org.apache.hama.ipc.MasterProtocol;
import org.apache.hama.ipc.RPC;
import org.apache.hama.ipc.Server;
import org.apache.hama.monitor.fd.FDProvider;
import org.apache.hama.monitor.fd.Supervisor;
import org.apache.hama.monitor.fd.UDPSupervisor;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
/**
 * BSPMaster is responsible for controlling all the groom servers and for
 * managing BSP jobs. It has the following responsibilities:
 * <ol>
 * <li><b>Job submission</b>. BSPMaster accepts new job requests and hands each
 * job to the scheduler, which schedules the BSP tasks defined for the job.
 * <li><b>GroomServer monitoring</b>. BSPMaster keeps track of all the groom
 * servers in the cluster. It is responsible for adding new grooms to the
 * cluster, keeping tabs on all the grooms, and may blacklist a groom that
 * fails to meet the availability requirement.
 * <li><b>Job bookkeeping</b>. BSPMaster keeps track of the status of every
 * task in each job and handles task and job failures as requested by the jobs.
 * </ol>
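 * <p>
 * A minimal launch sketch (assuming a properly configured
 * {@code hama-site.xml}):
 *
 * <pre>
 * HamaConfiguration conf = new HamaConfiguration();
 * BSPMaster master = BSPMaster.startMaster(conf);
 * master.offerService(); // blocks until the RPC server is stopped
 * </pre>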
*/
public class BSPMaster implements JobSubmissionProtocol, MasterProtocol,
GroomServerManager, Watcher, MonitorManager {
public static final Log LOG = LogFactory.getLog(BSPMaster.class);
public static final String localModeMessage = "Local mode detected, no launch of the daemon needed.";
private static final int FS_ACCESS_RETRY_PERIOD = 10000;
private HamaConfiguration conf;
MasterSyncClient syncClient = null;
/**
* Constants for BSPMaster's status.
*/
public static enum State {
INITIALIZING, RUNNING
}
static long JOBINIT_SLEEP_INTERVAL = 2000;
// States
final AtomicReference<State> state = new AtomicReference<State>(
State.INITIALIZING);
// Attributes
String masterIdentifier;
// private Server interServer;
private Server masterServer;
// host and port
private String host;
private int port;
// startTime
private long startTime;
// HTTP server
private HttpServer infoServer;
private int infoPort;
// Filesystem
static final String SUBDIR = "bspMaster";
FileSystem fs = null;
Path systemDir = null;
  // the system directory is writable and executable by everyone, but readable
  // only by the owner
  final static FsPermission SYSTEM_DIR_PERMISSION = FsPermission
      .createImmutable((short) 0733); // rwx-wx-wx
// system files should have 700 permission
final static FsPermission SYSTEM_FILE_PERMISSION = FsPermission
.createImmutable((short) 0700); // rwx------
  // Jobs' meta data
  private final AtomicInteger nextJobId = new AtomicInteger(1);
  private int totalSubmissions = 0; // how many jobs have been submitted by
                                    // clients
  private int totalTasks = 0; // currently running tasks
  private int totalTaskCapacity; // max tasks that the groom servers can run
  private Map<BSPJobID, JobInProgress> jobs = new TreeMap<BSPJobID, JobInProgress>();
  private TaskScheduler taskScheduler;
  // GroomServer manager attributes
// GroomServers cache
protected ConcurrentMap<GroomServerStatus, GroomProtocol> groomServers = new ConcurrentHashMap<GroomServerStatus, GroomProtocol>();
private final List<GroomServerStatus> blackList = new CopyOnWriteArrayList<GroomServerStatus>();
private Instructor instructor;
private final List<JobInProgressListener> jobInProgressListeners = new CopyOnWriteArrayList<JobInProgressListener>();
private final List<GroomStatusListener> groomStatusListeners = new CopyOnWriteArrayList<GroomStatusListener>();
private final AtomicReference<Supervisor> supervisor = new AtomicReference<Supervisor>();
  /**
   * ReportGroomStatusHandler keeps track of the status reported by each
   * GroomServer for the tasks it is currently executing. Based on the reported
   * status, it is responsible for issuing task recovery requests, updating job
   * progress and other bookkeeping for the currently running jobs.
   */
private class ReportGroomStatusHandler implements DirectiveHandler {
@Override
public void handle(Directive directive) throws DirectiveException {
// update GroomServerStatus held in the groomServers cache.
GroomServerStatus groomStatus = ((ReportGroomStatusDirective) directive)
.getStatus();
// groomServers cache contains groom server status reported back
if (groomServers.containsKey(groomStatus)) {
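        // Replace the stale status key with the newly reported one, adjusting
        // the running-task count by the difference.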
GroomServerStatus tmpStatus = null;
for (GroomServerStatus old : groomServers.keySet()) {
if (old.equals(groomStatus)) {
totalTasks -= old.countTasks();
tmpStatus = groomStatus;
updateGroomServersKey(old, tmpStatus);
break;
}
}
if (null != tmpStatus) {
totalTasks += tmpStatus.countTasks();
List<TaskStatus> tlist = tmpStatus.getTaskReports();
for (TaskStatus ts : tlist) {
JobInProgress jip = taskScheduler.findJobById(ts.getJobId());
TaskInProgress tip = jip.findTaskInProgress(ts.getTaskId()
.getTaskID());
if (ts.getRunState() == TaskStatus.State.SUCCEEDED) {
jip.completedTask(tip, ts);
// increment counters only if successful
jip.getCounters().incrAllCounters(ts.getCounters());
for (GroomStatusListener listener : groomStatusListeners) {
listener.taskComplete(groomStatus, tip);
}
} else if (ts.getRunState() == TaskStatus.State.RUNNING) {
jip.getStatus().setProgress(ts.getSuperstepCount());
jip.getStatus().setSuperstepCount(ts.getSuperstepCount());
} else if (ts.getRunState() == TaskStatus.State.FAILED) {
if (jip.handleFailure(tip)) {
recoverTask(jip);
} else {
jip.status.setRunState(JobStatus.FAILED);
jip.failedTask(tip, ts);
for (JobInProgressListener listener : jobInProgressListeners) {
try {
listener.jobRemoved(jip);
} catch (IOException ioe) {
LOG.error("Fail to alter scheduler a job is moved.", ioe);
}
}
}
}
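            // After the per-task update, react to the job's overall run state.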
if (jip.getStatus().getRunState() == JobStatus.SUCCEEDED) {
for (JobInProgressListener listener : jobInProgressListeners) {
try {
listener.jobRemoved(jip);
} catch (IOException ioe) {
LOG.error("Fail to alter scheduler a job is moved.", ioe);
}
}
} else if (jip.getStatus().getRunState() == JobStatus.RUNNING) {
jip.getStatus().setProgress(ts.getSuperstepCount());
jip.getStatus().setSuperstepCount(ts.getSuperstepCount());
} else if (jip.getStatus().getRunState() == JobStatus.KILLED) {
GroomProtocol worker = findGroomServer(tmpStatus);
Directive d1 = new DispatchTasksDirective(
new GroomServerAction[] { new KillTaskAction(ts.getTaskId()) });
try {
worker.dispatch(d1);
} catch (IOException ioe) {
throw new DirectiveException("Error when dispatching kill task"
+ " action.", ioe);
}
for (JobInProgressListener listener : jobInProgressListeners) {
try {
listener.jobRemoved(jip);
} catch (IOException ioe) {
LOG.error("Fail to alter scheduler a job is moved.", ioe);
}
}
}
}
} else {
throw new RuntimeException("BSPMaster contains GroomServerSatus, "
+ "but fail to retrieve it.");
}
} else {
throw new RuntimeException("GroomServer not found."
+ groomStatus.getGroomName());
}
}
}
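  /**
   * Instructor serializes directive handling: directives reported by the groom
   * servers are queued and processed one at a time by this background thread,
   * using the handler bound to each directive class.
   */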
private class Instructor extends Thread {
private final BlockingQueue<Directive> buffer = new LinkedBlockingQueue<Directive>();
private final ConcurrentMap<Class<? extends Directive>, DirectiveHandler> handlers = new ConcurrentHashMap<Class<? extends Directive>, DirectiveHandler>();
public void bind(Class<? extends Directive> instruction,
DirectiveHandler handler) {
handlers.putIfAbsent(instruction, handler);
}
public void put(Directive directive) {
try {
buffer.put(directive);
} catch (InterruptedException ie) {
LOG.error("Fail to put directive into queue.", ie);
}
}
@Override
public void run() {
while (true) {
try {
Directive directive = this.buffer.take();
handlers.get(directive.getClass()).handle(directive);
} catch (InterruptedException ie) {
LOG.error("Unable to retrieve directive from the queue.", ie);
Thread.currentThread().interrupt();
} catch (Exception e) {
LOG.error("Fail to execute directive command.", e);
}
}
}
}
  /**
   * Starts the BSPMaster process, listening on the configured hostname/port.
   *
   * @param conf provides runtime parameters.
   */
public BSPMaster(HamaConfiguration conf) throws IOException,
InterruptedException {
this(conf, generateNewIdentifier());
}
BSPMaster(HamaConfiguration conf, String identifier) throws IOException,
InterruptedException {
this.conf = conf;
this.masterIdentifier = identifier;
// Create the scheduler and init scheduler services
Class<? extends TaskScheduler> schedulerClass = conf.getClass(
"bsp.master.taskscheduler", SimpleTaskScheduler.class,
TaskScheduler.class);
this.taskScheduler = ReflectionUtils.newInstance(schedulerClass, conf);
InetSocketAddress inetSocketAddress = getAddress(conf);
// inetSocketAddress is null if we are in local mode, then we should start
// nothing.
if (inetSocketAddress != null) {
host = inetSocketAddress.getHostName();
port = inetSocketAddress.getPort();
LOG.info("RPC BSPMaster: host " + host + " port " + port);
startTime = System.currentTimeMillis();
this.masterServer = RPC.getServer(this, host, port, conf);
infoPort = conf.getInt("bsp.http.infoserver.port", 40013);
infoServer = new HttpServer("bspmaster", host, infoPort, true, conf);
infoServer.setAttribute("bsp.master", this);
// starting webserver
infoServer.start();
if (conf.getBoolean("bsp.monitor.fd.enabled", false)) {
this.supervisor.set(FDProvider.createSupervisor(conf.getClass(
"bsp.monitor.fd.supervisor.class", UDPSupervisor.class,
Supervisor.class), conf));
}
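      // Retry until the filesystem is reachable and the system directory can
      // be recreated, e.g. while HDFS is still in safe mode.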
while (!Thread.currentThread().isInterrupted()) {
try {
if (fs == null) {
fs = FileSystem.get(conf);
}
} catch (IOException e) {
LOG.error("Can't get connection to Hadoop Namenode!", e);
}
try {
// clean up the system dir, which will only work if hdfs is out of
// safe mode
if (systemDir == null) {
systemDir = new Path(getSystemDir());
}
LOG.info("Cleaning up the system directory");
LOG.info(systemDir);
fs.delete(systemDir, true);
if (FileSystem.mkdirs(fs, systemDir, new FsPermission(
SYSTEM_DIR_PERMISSION))) {
break;
}
LOG.error("Mkdirs failed to create " + systemDir);
LOG.info(SUBDIR);
} catch (AccessControlException ace) {
LOG.warn("Failed to operate on bsp.system.dir (" + systemDir
+ ") because of permissions.");
LOG.warn("Manually delete the bsp.system.dir (" + systemDir
+ ") and then start the BSPMaster.");
LOG.warn("Bailing out ... ");
throw ace;
} catch (IOException ie) {
LOG.info("problem cleaning system directory: " + systemDir, ie);
}
Thread.sleep(FS_ACCESS_RETRY_PERIOD);
}
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedException();
}
deleteLocalFiles(SUBDIR);
} else {
System.out.println(localModeMessage);
LOG.info(localModeMessage);
}
}
  /**
   * A GroomServer registers its status with the BSPMaster at startup, which
   * updates the GroomServers cache.
   *
   * @param status the status to be updated in the cache.
   * @return true if registration succeeds; false otherwise.
   */
@Override
public boolean register(GroomServerStatus status) throws IOException {
if (null == status) {
LOG.error("No groom server status.");
throw new NullPointerException("No groom server status.");
}
Throwable e = null;
try {
GroomProtocol wc = (GroomProtocol) RPC.waitForProxy(GroomProtocol.class,
HamaRPCProtocolVersion.versionID,
resolveWorkerAddress(status.getRpcServer()), this.conf);
if (null == wc) {
LOG.warn("Fail to create Worker client at host");
return false;
}
// TODO: need to check if peer name has changed
groomServers.putIfAbsent(status, wc);
} catch (UnsupportedOperationException u) {
e = u;
} catch (ClassCastException c) {
e = c;
} catch (NullPointerException n) {
e = n;
} catch (IllegalArgumentException i) {
e = i;
} catch (Exception ex) {
e = ex;
}
if (null != e) {
LOG.error("Fail to register GroomServer " + status.getGroomName(), e);
return false;
}
for (GroomStatusListener listener : groomStatusListeners) {
listener.groomServerRegistered(status);
}
LOG.info(status.getGroomName() + " is added.");
return true;
}
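  /**
   * Parses a "host:port" RPC server address into an InetSocketAddress.
   */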
  public static InetSocketAddress resolveWorkerAddress(String data) {
    String[] hostAndPort = data.split(":");
    return new InetSocketAddress(hostAndPort[0], Integer.parseInt(hostAndPort[1]));
  }
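  /**
   * Replaces a groom server's status key in the cache while keeping the
   * associated RPC proxy.
   */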
private void updateGroomServersKey(GroomServerStatus old,
GroomServerStatus newKey) {
synchronized (groomServers) {
GroomProtocol worker = groomServers.remove(old);
groomServers.put(newKey, worker);
}
}
@Override
public boolean report(Directive directive) throws IOException {
instructor.put(directive);
return true;
}
// /////////////////////////////////////////////////////////////
// BSPMaster methods
// /////////////////////////////////////////////////////////////
// Get the job directory in system directory
Path getSystemDirectoryForJob(BSPJobID id) {
return new Path(getSystemDir(), id.toString());
}
String[] getLocalDirs() throws IOException {
return conf.getStrings("bsp.local.dir");
}
void deleteLocalFiles() throws IOException {
String[] localDirs = getLocalDirs();
for (String localDir : localDirs) {
FileSystem.getLocal(conf).delete(new Path(localDir), true);
}
}
void deleteLocalFiles(String subdir) throws IOException {
try {
String[] localDirs = getLocalDirs();
for (String localDir : localDirs) {
FileSystem.getLocal(conf).delete(new Path(localDir, subdir), true);
}
} catch (NullPointerException e) {
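      // bsp.local.dir may be unset, in which case getLocalDirs() returns null
      // and there is nothing to delete.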
LOG.info(e);
}
}
/**
* Constructs a local file name. Files are distributed among configured local
* directories.
*/
Path getLocalPath(String pathString) throws IOException {
return conf.getLocalPath("bsp.local.dir", pathString);
}
/**
* Starts the BSP Master process.
*
* @param conf The Hama configuration.
* @return an instance of BSPMaster
* @throws IOException
* @throws InterruptedException
*/
public static BSPMaster startMaster(HamaConfiguration conf)
throws IOException, InterruptedException {
return startMaster(conf, generateNewIdentifier());
}
/**
* Starts the BSP Master process
*
* @param conf The Hama configuration
* @param identifier Identifier for the job.
* @return an instance of BSPMaster
* @throws IOException
* @throws InterruptedException
*/
public static BSPMaster startMaster(HamaConfiguration conf, String identifier)
throws IOException, InterruptedException {
BSPMaster result = new BSPMaster(conf, identifier);
// init zk root and child nodes
// zk is required to be initialized before scheduler is started.
result.initZK(conf);
result.taskScheduler.setGroomServerManager(result);
result.taskScheduler.setMonitorManager(result);
if (conf.getBoolean("bsp.monitor.fd.enabled", false)) {
result.supervisor.get().start();
}
result.taskScheduler.start();
return result;
}
/**
* Initialize the global synchronization client.
*
* @param conf Hama configuration.
*/
private void initZK(HamaConfiguration conf) {
this.syncClient = new ZKSyncBSPMasterClient();
this.syncClient.init(conf);
}
/**
* Get a handle of the global synchronization client used by BSPMaster.
*
* @return The synchronization client.
*/
public MasterSyncClient getSyncClient() {
return this.syncClient;
}
  /**
   * Parses the configuration for the master address.
   *
   * @param conf the configuration containing the information the user
   *          configured.
   * @return an InetSocketAddress if everything went fine, or null if "local"
   *         was configured, which is not a valid hostname.
   */
public static InetSocketAddress getAddress(Configuration conf) {
String hamaMasterStr = conf.get("bsp.master.address", "localhost");
// we ensure that hamaMasterStr is non-null here because we provided
// "localhost" as default.
if (!hamaMasterStr.equals("local")) {
int defaultPort = conf.getInt("bsp.master.port", 40000);
return NetUtils.createSocketAddr(hamaMasterStr, defaultPort);
} else {
return null;
}
}
  /**
   * Generates a new BSPMaster identifier from the current date and time.
   *
   * @return the BSPMaster identifier as a String.
   */
private static String generateNewIdentifier() {
return new SimpleDateFormat("yyyyMMddHHmm").format(new Date());
}
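  /**
   * Brings the master into the RUNNING state: starts the RPC server and the
   * directive-processing instructor thread, then blocks until the RPC server
   * is stopped.
   */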
public void offerService() throws InterruptedException, IOException {
this.masterServer.start();
state.set(State.RUNNING);
instructor = new Instructor();
instructor.bind(ReportGroomStatusDirective.class,
new ReportGroomStatusHandler());
instructor.start();
LOG.info("Starting RUNNING");
this.masterServer.join();
LOG.info("Stopped RPC Master server.");
}
// //////////////////////////////////////////////////
// InterServerProtocol
// //////////////////////////////////////////////////
@Override
public long getProtocolVersion(String protocol, long clientVersion)
throws IOException {
if (protocol.equals(MasterProtocol.class.getName())) {
return HamaRPCProtocolVersion.versionID;
} else if (protocol.equals(JobSubmissionProtocol.class.getName())) {
return HamaRPCProtocolVersion.versionID;
} else {
throw new IOException("Unknown protocol to BSPMaster: " + protocol);
}
}
// //////////////////////////////////////////////////
// JobSubmissionProtocol
// //////////////////////////////////////////////////
  /**
   * Returns a new job id. Job ids increase sequentially.
   */
@Override
  public BSPJobID getNewJobId() throws IOException {
    // AtomicInteger makes the sequential increment thread-safe without
    // synchronizing on a boxed Integer field.
    return new BSPJobID(this.masterIdentifier, nextJobId.getAndIncrement());
  }
@Override
public JobStatus submitJob(BSPJobID jobID, String jobFile) throws IOException {
if (jobs.containsKey(jobID)) {
// job already running, don't start twice
LOG.info("The job (" + jobID + ") was already submitted");
return jobs.get(jobID).getStatus();
}
JobInProgress job = new JobInProgress(jobID, new Path(jobFile), this,
this.conf);
++totalSubmissions;
if (LOG.isDebugEnabled()) {
LOG.debug("Submitting job number = " + totalSubmissions + " id = "
+ job.getJobID());
}
return addJob(jobID, job);
}
// //////////////////////////////////////////////////
// GroomServerManager functions
// //////////////////////////////////////////////////
@Override
public ClusterStatus getClusterStatus(boolean detailed) {
Map<String, GroomServerStatus> groomsMap = null;
// give the caller a snapshot of the cluster status
int numGroomServers = groomServers.size();
if (detailed) {
groomsMap = new HashMap<String, GroomServerStatus>();
for (Map.Entry<GroomServerStatus, GroomProtocol> entry : groomServers
.entrySet()) {
GroomServerStatus s = entry.getKey();
groomsMap.put(s.getGroomHostName() + ":"
+ Constants.DEFAULT_GROOM_INFO_SERVER, s);
}
}
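    // The task capacity defaults to MAX_TASKS_PER_GROOM per registered groom
    // unless MAX_TASKS overrides it explicitly.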
int tasksPerGroom = conf.getInt(Constants.MAX_TASKS_PER_GROOM, 3);
this.totalTaskCapacity = conf.getInt(Constants.MAX_TASKS, tasksPerGroom * numGroomServers);
if (detailed) {
return new ClusterStatus(groomsMap, totalTasks, totalTaskCapacity,
state.get());
} else {
return new ClusterStatus(numGroomServers, totalTasks, totalTaskCapacity,
state.get());
}
}
@Override
public GroomProtocol findGroomServer(GroomServerStatus status) {
return groomServers.get(status);
}
@Override
public Collection<GroomProtocol> findGroomServers() {
return groomServers.values();
}
@Override
public Collection<GroomServerStatus> groomServerStatusKeySet() {
return groomServers.keySet();
}
@Override
public void addJobInProgressListener(JobInProgressListener listener) {
jobInProgressListeners.add(listener);
}
@Override
public void removeJobInProgressListener(JobInProgressListener listener) {
jobInProgressListeners.remove(listener);
}
@Override
public void addGroomStatusListener(GroomStatusListener listener) {
groomStatusListeners.add(listener);
}
@Override
public void removeGroomStatusListener(GroomStatusListener listener) {
groomStatusListeners.remove(listener);
}
@Override
public void moveToBlackList(String host) {
LOG.info("[moveToBlackList()]Host to be moved to black list: " + host);
for (GroomServerStatus groomStatus : groomServerStatusKeySet()) {
LOG.info("[moveToBlackList()]GroomServerStatus's host name:"
+ groomStatus.getGroomHostName() + " host:" + host);
if (groomStatus.getGroomHostName().equals(host)) {
boolean result = groomServers.remove(groomStatus,
findGroomServer(groomStatus));
if (!result) {
LOG.error("Fail to remove " + host + " out of groom server cache!");
}
blackList.add(groomStatus);
LOG.info("[moveToBlackList()] " + host
+ " is successfully moved to black list.");
}
}
}
@Override
public void removeOutOfBlackList(String host) {
for (GroomServerStatus groomStatus : blackList) {
if (groomStatus.getGroomHostName().equals(host)) {
boolean result = blackList.remove(groomStatus);
        if (result)
          LOG.info("Successfully removed " + host + " from the black list.");
        else
          LOG.error("Failed to remove " + host + " from the black list.");
}
}
}
@Override
public Collection<GroomServerStatus> alive() {
return groomServerStatusKeySet();
}
@Override
public Collection<GroomServerStatus> blackList() {
return blackList;
}
@Override
public GroomServerStatus findInBlackList(String host) {
for (GroomServerStatus status : blackList) {
if (host.equals(status.getGroomHostName())) {
return status;
}
}
return null;
}
public String getBSPMasterName() {
return host + ":" + port;
}
public long getStartTime() {
return startTime;
}
public String getBSPMasterIdentifier() {
return masterIdentifier;
}
public int getHttpPort() {
return infoPort;
}
  /**
   * Adds a job to the BSP master. Callers must make sure that the necessary
   * checks are in place before adding a job; this is the core job submission
   * logic.
   *
   * @param jobId The id of the submitted job that needs to be added.
   * @param job The in-progress representation of the job.
   */
private synchronized JobStatus addJob(BSPJobID jobId, JobInProgress job) {
synchronized (jobs) {
jobs.put(job.getProfile().getJobID(), job);
for (JobInProgressListener listener : jobInProgressListeners) {
try {
listener.jobAdded(job);
} catch (IOException ioe) {
LOG.error("Fail to alter Scheduler a job is added.", ioe);
}
}
}
return job.getStatus();
}
  /**
   * Recovers a task in a job. To be called when a particular task in a job has
   * failed and needs to be rescheduled on a machine.
   */
private synchronized void recoverTask(JobInProgress job) {
++totalSubmissions;
for (JobInProgressListener listener : jobInProgressListeners) {
try {
listener.recoverTaskInJob(job);
} catch (IOException ioe) {
LOG.error("Fail to alter Scheduler a job is added.", ioe);
}
}
}
@Override
public JobStatus[] jobsToComplete() throws IOException {
return getJobStatus(jobs.values(), true);
}
@Override
public JobStatus[] getAllJobs() throws IOException {
LOG.debug("returns all jobs: " + jobs.size());
return getJobStatus(jobs.values(), false);
}
private synchronized static JobStatus[] getJobStatus(
Collection<JobInProgress> jips, boolean toComplete) {
if (jips == null) {
return new JobStatus[] {};
}
List<JobStatus> jobStatusList = new ArrayList<JobStatus>();
for (JobInProgress jip : jips) {
JobStatus status = jip.getStatus();
status.setStartTime(jip.getStartTime());
status.setNumOfTasks(jip.getNumOfTasks());
// Sets the user name
status.setUsername(jip.getProfile().getUser());
status.setName(jip.getJobName());
if (toComplete) {
if (status.getRunState() == JobStatus.RUNNING
|| status.getRunState() == JobStatus.PREP) {
jobStatusList.add(status);
}
} else {
jobStatusList.add(status);
}
}
return jobStatusList.toArray(new JobStatus[jobStatusList.size()]);
}
@Override
public synchronized String getFilesystemName() throws IOException {
if (fs == null) {
throw new IllegalStateException("FileSystem object not available yet");
}
return fs.getUri().toString();
}
  /**
   * Returns the system directory in which BSP stores its control files.
   */
@Override
public String getSystemDir() {
Path sysDir = new Path(conf.get("bsp.system.dir", "/tmp/hadoop/bsp/system"));
return fs.makeQualified(sysDir).toString();
}
@Override
public JobProfile getJobProfile(BSPJobID jobid) throws IOException {
synchronized (this) {
JobInProgress job = jobs.get(jobid);
if (job != null) {
return job.getProfile();
}
}
return null;
}
@Override
public JobStatus getJobStatus(BSPJobID jobid) throws IOException {
synchronized (this) {
JobInProgress job = jobs.get(jobid);
if (job != null) {
return job.getStatus();
}
}
return null;
}
@Override
public void killJob(BSPJobID jobid) throws IOException {
JobInProgress job = jobs.get(jobid);
if (null == job) {
LOG.info("killJob(): JobId " + jobid.toString() + " is not a valid job");
return;
}
killJob(job);
}
private synchronized static void killJob(JobInProgress job) {
LOG.info("Killing job " + job.getJobID());
job.kill();
}
@Override
public boolean killTask(TaskAttemptID taskId, boolean shouldFail)
throws IOException {
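    // Killing individual task attempts is not implemented here; the request is
    // ignored and false is returned.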
return false;
}
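  /**
   * Reflectively constructs a BSPMaster (sub)class instance using its
   * single-argument {@code Configuration} constructor.
   */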
public static BSPMaster constructMaster(
Class<? extends BSPMaster> masterClass, final Configuration conf) {
try {
Constructor<? extends BSPMaster> c = masterClass
.getConstructor(Configuration.class);
return c.newInstance(conf);
} catch (Exception e) {
throw new RuntimeException("Failed construction of " + "Master: "
+ masterClass.toString()
+ ((e.getCause() != null) ? e.getCause().getMessage() : ""), e);
}
}
/**
* Shuts down the BSP Process and does the necessary clean up.
*/
public void shutdown() {
try {
this.syncClient.close();
} catch (IOException e) {
LOG.error("Error closing the sync client", e);
}
if (null != this.supervisor.get()) {
this.supervisor.get().stop();
}
this.masterServer.stop();
}
public BSPMaster.State currentState() {
return this.state.get();
}
@Override
public void process(WatchedEvent event) {
}
@Override
public Supervisor supervisor() {
return this.supervisor.get();
}
  private static final TaskCompletionEvent[] EMPTY_EVENTS = new TaskCompletionEvent[0];
@Override
public TaskCompletionEvent[] getTaskCompletionEvents(BSPJobID jobid,
int fromEventId, int maxEvents) {
synchronized (this) {
JobInProgress job = this.jobs.get(jobid);
if (null != job) {
if (job.areTasksInited()) {
return job.getTaskCompletionEvents(fromEventId, maxEvents);
} else {
return EMPTY_EVENTS;
}
}
}
return null;
}
}