package edu.uci.ics.hyracks.hadoop.compat.client;

import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;

import org.apache.hadoop.mapred.JobConf;

import edu.uci.ics.hyracks.api.client.HyracksRMIConnection;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.api.job.JobStatus;
import edu.uci.ics.hyracks.hadoop.compat.util.ConfigurationConstants;
import edu.uci.ics.hyracks.hadoop.compat.util.HadoopAdapter;
import edu.uci.ics.hyracks.hadoop.compat.util.Utilities;
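
/**
 * Client used by the Hadoop compatibility layer to run Hadoop jobs on a
 * Hyracks cluster: it translates {@link JobConf} instances into Hyracks
 * {@link JobSpecification}s through a {@link HadoopAdapter} and submits
 * them to the cluster controller over RMI.
 */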
public class HyracksClient {

    private HadoopAdapter hadoopAdapter;
    private static HyracksRMIConnection connection;
    private static final String applicationName = "CompatibilityLayer";

    public HyracksClient(String clusterConf) throws Exception {
        Properties properties = Utilities.getProperties(clusterConf, '=');
        String clusterController = (String) properties.get(ConfigurationConstants.clusterControllerHost);
        String fileSystem = (String) properties.get(ConfigurationConstants.namenodeURL);
        initialize(clusterController, fileSystem);
    }

    public HyracksClient(String clusterControllerAddr, String fileSystem) throws Exception {
        initialize(clusterControllerAddr, fileSystem);
    }

    private void initialize(String clusterControllerAddr, String namenodeUrl) throws Exception {
        // Connect to the cluster controller on the default RMI registry port.
        connection = new HyracksRMIConnection(clusterControllerAddr, 1099);
        // Remove any stale deployment of the compatibility layer so that a
        // fresh application archive is installed on the next submission.
        connection.destroyApplication(applicationName);
        hadoopAdapter = new HadoopAdapter(namenodeUrl);
    }
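
    /**
     * Builds a single {@link JobSpecification} from a pipeline of Hadoop
     * {@link JobConf}s and submits it under an application name derived from
     * the first job's jar.
     */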
    public HyracksRunningJob submitJobs(List<JobConf> confs, Set<String> requiredLibs) throws Exception {
        JobSpecification spec = hadoopAdapter.getJobSpecification(confs);
        String appName = getApplicationNameHadoopJob(confs.get(0));
        return submitJob(appName, spec, requiredLibs);
    }

    private String getApplicationNameHadoopJob(JobConf jobConf) {
        String jar = jobConf.getJar();
        if (jar != null) {
            // Use the jar's base name as the application name; fall back to
            // the full string when the path contains no separator.
            int lastSlash = jar.lastIndexOf('/');
            return jar.substring(lastSlash >= 0 ? lastSlash + 1 : 0);
        } else {
            // No jar configured: generate a name from the current time.
            return "" + System.currentTimeMillis();
        }
    }

    public HyracksRunningJob submitJob(JobConf conf, Set<String> requiredLibs) throws Exception {
        JobSpecification spec = hadoopAdapter.getJobSpecification(conf);
        String appName = getApplicationNameHadoopJob(conf);
        return submitJob(appName, spec, requiredLibs);
    }

    public JobStatus getJobStatus(UUID jobId) throws Exception {
        return connection.getJobStatus(jobId);
    }
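
    /**
     * Submits a job, deploying the application on demand: if creating the job
     * fails, the client assumes the application is not yet installed on the
     * cluster controller, packages the required libraries into a Hyracks
     * archive, creates the application, and retries the job creation.
     */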
    public HyracksRunningJob submitJob(String applicationName, JobSpecification spec, Set<String> requiredLibs)
            throws Exception {
        UUID jobId;
        try {
            jobId = connection.createJob(applicationName, spec);
        } catch (Exception e) {
            System.out.println("Application not found; creating application " + applicationName);
            connection.createApplication(applicationName, Utilities.getHyracksArchive(applicationName, requiredLibs));
            jobId = connection.createJob(applicationName, spec);
        }
        connection.start(jobId);
        return new HyracksRunningJob(jobId, spec, this);
    }

    public HadoopAdapter getHadoopAdapter() {
        return hadoopAdapter;
    }

    public void setHadoopAdapter(HadoopAdapter hadoopAdapter) {
        this.hadoopAdapter = hadoopAdapter;
    }

    public void waitForCompletion(UUID jobId) throws Exception {
        connection.waitForCompletion(jobId);
    }
}