blob: bf09bb02b7f8e435f6e3409dde9198d130b7a8b2 [file] [log] [blame]
package edu.uci.ics.hyracks.hadoop.compat.driver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.kohsuke.args4j.CmdLineParser;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.hadoop.compat.client.HyracksClient;
import edu.uci.ics.hyracks.hadoop.compat.client.HyracksRunningJob;
import edu.uci.ics.hyracks.hadoop.compat.util.CompatibilityConfig;
import edu.uci.ics.hyracks.hadoop.compat.util.ConfigurationConstants;
import edu.uci.ics.hyracks.hadoop.compat.util.DCacheHandler;
import edu.uci.ics.hyracks.hadoop.compat.util.HadoopAdapter;
import edu.uci.ics.hyracks.hadoop.compat.util.Utilities;
/**
 * Bridges Hadoop MapReduce jobs onto Hyracks: reads Hadoop {@link JobConf}s
 * (from objects or from job property files), translates them into a Hyracks
 * {@link JobSpecification} via {@link HadoopAdapter}, stages any declared
 * distributed-cache files, and submits the result through {@link HyracksClient}.
 */
public class CompatibilityLayer {

    HyracksClient hyracksClient;
    // NOTE(review): identifier keeps the original (misspelled) name because the
    // field is package-visible and may be referenced elsewhere in the package.
    DCacheHandler dCacheHander = null;
    Properties clusterConf;
    HadoopAdapter hadoopAdapter;

    /** Separator between keys and values in the cluster configuration file. */
    private static final char CONFIGURATION_FILE_DELIMITER = '=';

    /** Property-name prefix marking distributed-cache entries in a job file. */
    private static final String DCACHE_KEY_PREFIX = "dcache.key";

    public CompatibilityLayer(CompatibilityConfig clConfig) throws Exception {
        initialize(clConfig);
    }

    /**
     * Loads the cluster configuration file and wires up the Hadoop adapter
     * (namenode URL), the Hyracks client, and the distributed-cache handler.
     */
    private void initialize(CompatibilityConfig clConfig) throws Exception {
        clusterConf = Utilities.getProperties(clConfig.clusterConf, CONFIGURATION_FILE_DELIMITER);
        hadoopAdapter = new HadoopAdapter(clusterConf.getProperty(ConfigurationConstants.namenodeURL));
        hyracksClient = new HyracksClient(clusterConf);
        dCacheHander = new DCacheHandler(clusterConf.getProperty(ConfigurationConstants.dcacheServerConfiguration));
    }

    /**
     * Translates a single Hadoop job into a Hyracks job specification and
     * submits it with the given user libraries. The application name is the
     * job name plus a timestamp so repeated submissions do not collide.
     *
     * @param conf     the Hadoop job configuration to run
     * @param userLibs user library identifiers shipped with the application
     * @return handle to the running Hyracks job
     */
    public HyracksRunningJob submitJob(JobConf conf, Set<String> userLibs) throws Exception {
        List<JobConf> jobConfs = new ArrayList<JobConf>();
        jobConfs.add(conf);
        String applicationName = conf.getJobName() + System.currentTimeMillis();
        JobSpecification spec = hadoopAdapter.getJobSpecification(jobConfs);
        return hyracksClient.submitJob(applicationName, spec, userLibs);
    }

    /**
     * Builds one JobConf per job file, stages the distributed-cache entries
     * declared in the first job file, and submits the combined specification
     * together with the user libraries.
     *
     * @param applicationName name under which the Hyracks application is deployed
     * @param jobFiles        job property files; the first one may declare dcache entries
     * @param userLibs        user library identifiers shipped with the application
     * @return handle to the running Hyracks job
     */
    public HyracksRunningJob submitJobs(String applicationName, String[] jobFiles, Set<String> userLibs)
            throws Exception {
        List<JobConf> jobConfs = constructHadoopJobConfs(jobFiles);
        // Distributed-cache declarations are only read from the first job file.
        populateDCache(jobFiles[0]);
        JobSpecification spec = hadoopAdapter.getJobSpecification(jobConfs);
        return hyracksClient.submitJob(applicationName, spec, userLibs);
    }

    /**
     * Builds one JobConf per job file, stages the distributed-cache entries
     * declared in the first job file, and submits the combined specification
     * without any user libraries.
     *
     * @param applicationName name under which the Hyracks application is deployed
     * @param jobFiles        job property files; the first one may declare dcache entries
     * @return handle to the running Hyracks job
     */
    public HyracksRunningJob submitJobs(String applicationName, String[] jobFiles) throws Exception {
        List<JobConf> jobConfs = constructHadoopJobConfs(jobFiles);
        // Distributed-cache declarations are only read from the first job file.
        populateDCache(jobFiles[0]);
        JobSpecification spec = hadoopAdapter.getJobSpecification(jobConfs);
        return hyracksClient.submitJob(applicationName, spec);
    }

    /**
     * Copies every dcache source declared in the job file from HDFS to a local
     * temp path and registers the local path with the DCache handler.
     *
     * @param jobFile job properties file possibly declaring dcache.key.* entries
     * @throws IOException if copying a file out of HDFS fails
     */
    private void populateDCache(String jobFile) throws IOException {
        Map<String, String> dcacheTasks = preparePreLaunchDCacheTasks(jobFile);
        String tempDir = "/tmp";
        for (Entry<String, String> task : dcacheTasks.entrySet()) {
            String key = task.getKey();
            String sourcePath = task.getValue();
            // Timestamp suffix keeps repeated launches from clobbering each other's copies.
            String destPath = tempDir + "/" + key + System.currentTimeMillis();
            hadoopAdapter.getHDFSClient().copyToLocalFile(new Path(sourcePath), new Path(destPath));
            System.out.println(" source :" + sourcePath);
            System.out.println(" dest :" + destPath);
            System.out.println(" key :" + key);
            System.out.println(" value :" + destPath);
            dCacheHander.put(key, destPath);
        }
    }

    /**
     * Derives an application name from the job's jar file name (path stripped),
     * falling back to a timestamp when no jar is configured.
     * NOTE(review): not called from within this file — presumably kept for
     * future use; confirm before removing.
     */
    private String getApplicationNameForHadoopJob(JobConf jobConf) {
        String jar = jobConf.getJar();
        if (jar == null) {
            return "" + System.currentTimeMillis();
        }
        // lastIndexOf returns -1 when there is no '/', so +1 yields the whole string.
        return jar.substring(jar.lastIndexOf("/") + 1);
    }

    /**
     * Returns every property whose key starts with {@code prefix}, with
     * "{@code <prefix>.}" stripped from the front of the key.
     */
    private Map<String, String> initializeCustomProperties(Properties properties, String prefix) {
        Map<String, String> foundProperties = new HashMap<String, String>();
        for (Entry<Object, Object> entry : properties.entrySet()) {
            String key = (String) entry.getKey();
            if (key.startsWith(prefix)) {
                // Cut off "<prefix>." from the beginning of the key.
                String actualKey = key.substring(prefix.length() + 1);
                foundProperties.put(actualKey, (String) entry.getValue());
            }
        }
        return foundProperties;
    }

    /**
     * Reads the (comma-delimited) job file and returns a map from dcache key to
     * HDFS source path for every dcache.key.* property with a non-null value.
     *
     * @param jobFile job properties file to scan
     * @return dcache key to source path map; empty when nothing is declared
     */
    public Map<String, String> preparePreLaunchDCacheTasks(String jobFile) {
        Properties jobProperties = Utilities.getProperties(jobFile, ',');
        Map<String, String> dcacheTasks = new HashMap<String, String>();
        Map<String, String> dcacheKeys = initializeCustomProperties(jobProperties, DCACHE_KEY_PREFIX);
        for (Entry<String, String> entry : dcacheKeys.entrySet()) {
            if (entry.getValue() != null) {
                dcacheTasks.put(entry.getKey(), entry.getValue());
            }
        }
        return dcacheTasks;
    }

    /** Blocks until the given Hyracks job finishes. */
    public void waitForCompletion(JobId jobId) throws Exception {
        // "waitForCompleton" (sic) is the method name exposed by HyracksClient.
        hyracksClient.waitForCompleton(jobId);
    }

    /** Builds one JobConf per job properties file, preserving input order. */
    private List<JobConf> constructHadoopJobConfs(String[] jobFiles) throws Exception {
        List<JobConf> jobConfs = new ArrayList<JobConf>();
        for (String jobFile : jobFiles) {
            jobConfs.add(constructHadoopJobConf(jobFile));
        }
        return jobConfs;
    }

    /**
     * Creates a JobConf seeded with the adapter's base configuration and
     * overlaid with every '='-delimited property from the job file. Each
     * applied property is echoed to stdout.
     */
    private JobConf constructHadoopJobConf(String jobFile) {
        Properties jobProperties = Utilities.getProperties(jobFile, '=');
        JobConf conf = new JobConf(hadoopAdapter.getConf());
        for (Entry<Object, Object> entry : jobProperties.entrySet()) {
            String key = (String) entry.getKey();
            String value = (String) entry.getValue();
            conf.set(key, value);
            System.out.println(key + " : " + value);
        }
        return conf;
    }

    /** Splits the comma-separated job file list; never returns null. */
    private String[] getJobs(CompatibilityConfig clConfig) {
        return clConfig.jobFiles == null ? new String[0] : clConfig.jobFiles.split(",");
    }

    /**
     * Command-line entry point: parses arguments, submits the configured jobs
     * (with user libraries when supplied), waits for completion, and prints
     * the total wall-clock time. Prints usage and exits on bad arguments.
     */
    public static void main(String[] args) throws Exception {
        long startTime = System.nanoTime();
        CompatibilityConfig clConfig = new CompatibilityConfig();
        CmdLineParser cp = new CmdLineParser(clConfig);
        try {
            cp.parseArgument(args);
        } catch (Exception e) {
            System.err.println(e.getMessage());
            cp.printUsage(System.err);
            return;
        }
        CompatibilityLayer compatLayer = new CompatibilityLayer(clConfig);
        String applicationName = clConfig.applicationName;
        String[] jobFiles = compatLayer.getJobs(clConfig);
        String[] userLibraries = null;
        if (clConfig.userLibs != null) {
            userLibraries = clConfig.userLibs.split(",");
        }
        try {
            HyracksRunningJob hyraxRunningJob;
            if (userLibraries != null) {
                Set<String> userLibs = new HashSet<String>();
                for (String userLib : userLibraries) {
                    userLibs.add(userLib);
                }
                hyraxRunningJob = compatLayer.submitJobs(applicationName, jobFiles, userLibs);
            } else {
                hyraxRunningJob = compatLayer.submitJobs(applicationName, jobFiles);
            }
            compatLayer.waitForCompletion(hyraxRunningJob.getJobId());
            long endTime = System.nanoTime();
            System.out.println("TOTAL TIME (from Launch to Completion):"
                    + ((endTime - startTime) / (float) 1000000000.0) + " seconds.");
        } catch (Exception e) {
            // Print the trace for operators, then rethrow so the JVM exits non-zero.
            e.printStackTrace();
            throw e;
        }
    }
}