/*
* Copyright 2009-2013 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.uci.ics.pregelix.core.driver;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.lang.reflect.Type;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import edu.uci.ics.hyracks.api.client.HyracksConnection;
import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
import edu.uci.ics.hyracks.api.deployment.DeploymentId;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.client.stats.Counters;
import edu.uci.ics.hyracks.client.stats.impl.ClientCounterContext;
import edu.uci.ics.pregelix.api.graph.GlobalAggregator;
import edu.uci.ics.pregelix.api.graph.MessageCombiner;
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.job.ICheckpointHook;
import edu.uci.ics.pregelix.api.job.IIterationCompleteReporterHook;
import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.pregelix.api.util.BspUtils;
import edu.uci.ics.pregelix.api.util.GlobalEdgeCountAggregator;
import edu.uci.ics.pregelix.api.util.GlobalVertexCountAggregator;
import edu.uci.ics.pregelix.api.util.ReflectionUtils;
import edu.uci.ics.pregelix.core.base.IDriver;
import edu.uci.ics.pregelix.core.jobgen.JobGen;
import edu.uci.ics.pregelix.core.jobgen.JobGenFactory;
import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
import edu.uci.ics.pregelix.core.optimizer.DynamicOptimizer;
import edu.uci.ics.pregelix.core.optimizer.IOptimizer;
import edu.uci.ics.pregelix.core.optimizer.NoOpOptimizer;
import edu.uci.ics.pregelix.core.util.ExceptionUtilities;
import edu.uci.ics.pregelix.dataflow.util.IterationUtils;
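/**
 * The client-side driver for Pregelix. It deploys the user's jars to the
 * Hyracks cluster, generates the per-iteration dataflow jobs, and runs the
 * iterative loop with optional checkpointing and recovery from recoverable
 * disk or node failures.
 *
 * A minimal usage sketch (the vertex class, paths, and address below are
 * illustrative, not part of this file):
 *
 * <pre>
 * PregelixJob job = new PregelixJob("example");
 * job.setVertexClass(MyVertex.class); // hypothetical Vertex subclass
 * FileInputFormat.addInputPath(job, new Path("/graph/input"));
 * FileOutputFormat.setOutputPath(job, new Path("/graph/output"));
 * new Driver(MyVertex.class).runJob(job, "127.0.0.1", 3099);
 * </pre>
 */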
@SuppressWarnings("rawtypes")
public class Driver implements IDriver {
public static final String[] COUNTERS = { Counters.NUM_PROCESSOR, Counters.SYSTEM_LOAD, Counters.MEMORY_USAGE,
Counters.DISK_READ, Counters.DISK_WRITE, Counters.NETWORK_IO_READ, Counters.NETWORK_IO_WRITE };
private static final Log LOG = LogFactory.getLog(Driver.class);
private IHyracksClientConnection hcc;
private final Class exampleClass;
private boolean profiling = false;
private final StringBuffer counterBuffer = new StringBuffer();
public Driver(Class exampleClass) {
this.exampleClass = exampleClass;
}
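    /**
     * Runs a single job with the default {@link Plan#OUTER_JOIN} plan and
     * profiling disabled.
     */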
@Override
public void runJob(PregelixJob job, String ipAddress, int port) throws HyracksException {
runJob(job, Plan.OUTER_JOIN, ipAddress, port, false);
}
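    /**
     * Runs a pipeline of jobs with the default {@link Plan#OUTER_JOIN} plan
     * and profiling disabled.
     */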
@Override
public void runJobs(List<PregelixJob> jobs, String ipAddress, int port) throws HyracksException {
runJobs(jobs, Plan.OUTER_JOIN, ipAddress, port, false);
}
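    /**
     * Runs a single job with the given plan choice, delegating to
     * {@link #runJobs(List, Plan, String, int, boolean)}.
     */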
@Override
public void runJob(PregelixJob job, Plan planChoice, String ipAddress, int port, boolean profiling)
throws HyracksException {
runJobs(Collections.singletonList(job), planChoice, ipAddress, port, profiling);
}
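    /**
     * Runs a pipeline of jobs with the given plan choice. Jobs execute in
     * order; when consecutive jobs are compatible (see
     * {@link #compatible(PregelixJob, PregelixJob)}), the loaded graph is
     * reused instead of being reloaded from HDFS. Recoverable disk or node
     * failures are retried up to three times, resuming from the last
     * checkpointed job and superstep.
     */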
@Override
public void runJobs(List<PregelixJob> jobs, Plan planChoice, String ipAddress, int port, boolean profiling)
throws HyracksException {
try {
counterBuffer.delete(0, counterBuffer.length());
counterBuffer.append("performance counters\n");
            if (jobs.isEmpty()) {
throw new HyracksException("Please submit at least one job for execution!");
}
for (PregelixJob job : jobs) {
initJobConfiguration(job);
}
this.profiling = profiling;
PregelixJob currentJob = jobs.get(0);
PregelixJob lastJob = currentJob;
addHadoopConfiguration(currentJob, ipAddress, port, true);
ClientCounterContext counterContext = new ClientCounterContext(ipAddress, ClusterConfig.getCCHTTPort(),
Arrays.asList(ClusterConfig.getNCNames()));
JobGen jobGen = null;
/** prepare job -- deploy jars */
DeploymentId deploymentId = prepareJobs(ipAddress, port);
LOG.info("job started");
IntWritable lastSnapshotJobIndex = new IntWritable(0);
IntWritable lastSnapshotSuperstep = new IntWritable(0);
boolean failed = false;
int retryCount = 0;
int maxRetryCount = 3;
            IOptimizer dynamicOptimizer = !BspUtils.getEnableDynamicOptimization(currentJob.getConfiguration())
                    ? new NoOpOptimizer() : new DynamicOptimizer(counterContext);
jobGen = selectJobGen(planChoice, currentJob, dynamicOptimizer);
jobGen = dynamicOptimizer.optimize(jobGen, 0);
do {
try {
for (int i = lastSnapshotJobIndex.get(); i < jobs.size(); i++) {
lastJob = currentJob;
currentJob = jobs.get(i);
currentJob.setRecoveryCount(retryCount);
/** add hadoop configurations */
addHadoopConfiguration(currentJob, ipAddress, port, failed);
ICheckpointHook ckpHook = BspUtils.createCheckpointHook(currentJob.getConfiguration());
                        boolean compatible = i > 0 && compatible(lastJob, currentJob);
/** load the data */
if (!failed) {
IterationUtils.makeTempDirectory(currentJob.getConfiguration());
if (i == 0) {
jobGen.reset(currentJob);
loadData(currentJob, jobGen, deploymentId);
} else if (!compatible) {
finishJobs(jobGen, deploymentId);
/** invalidate/clear checkpoint */
lastSnapshotJobIndex.set(0);
lastSnapshotSuperstep.set(0);
jobGen.reset(currentJob);
loadData(currentJob, jobGen, deploymentId);
} else {
jobGen.reset(currentJob);
}
} else {
jobGen.reset(currentJob);
}
/** run loop-body jobs with dynamic optimizer if it is enabled */
jobGen = dynamicOptimizer.optimize(jobGen, i);
runLoopBody(deploymentId, currentJob, jobGen, i, lastSnapshotJobIndex, lastSnapshotSuperstep,
ckpHook, failed);
failed = false;
}
/** finish the jobs */
finishJobs(jobGen, deploymentId);
/** clear state */
runClearState(deploymentId, jobGen, true);
/** undeploy the binary */
hcc.unDeployBinary(deploymentId);
} catch (Exception e1) {
Set<String> blackListNodes = new HashSet<String>();
/** disk failures or node failures */
if (ExceptionUtilities.recoverable(e1, blackListNodes)) {
ClusterConfig.addToBlackListNodes(blackListNodes);
failed = true;
retryCount++;
} else {
throw e1;
}
}
} while (failed && retryCount < maxRetryCount);
LOG.info("job finished");
for (String counter : COUNTERS) {
counterBuffer.append("\t" + counter + ": " + counterContext.getCounter(counter, false).get() + "\n");
}
LOG.info(counterBuffer.toString());
counterContext.stop();
} catch (Exception e) {
throw new HyracksException(e);
} finally {
/** clear temporary directories */
try {
for (PregelixJob job : jobs) {
IterationUtils.removeTempDirectory(job.getConfiguration());
}
} catch (Exception e) {
throw new HyracksException(e);
}
}
}
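    /**
     * Two consecutive jobs are compatible when they share the same vertex
     * index, vertex value, and edge value classes, and the current job either
     * declares no input path or reads exactly the output path of the last
     * job. Compatible jobs can reuse the already-loaded graph.
     */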
private boolean compatible(PregelixJob lastJob, PregelixJob currentJob) {
Class lastVertexIdClass = BspUtils.getVertexIndexClass(lastJob.getConfiguration());
Class lastVertexValueClass = BspUtils.getVertexValueClass(lastJob.getConfiguration());
Class lastEdgeValueClass = BspUtils.getEdgeValueClass(lastJob.getConfiguration());
Path lastOutputPath = FileOutputFormat.getOutputPath(lastJob);
Class currentVertexIdClass = BspUtils.getVertexIndexClass(currentJob.getConfiguration());
Class currentVertexValueClass = BspUtils.getVertexValueClass(currentJob.getConfiguration());
        Class currentEdgeValueClass = BspUtils.getEdgeValueClass(currentJob.getConfiguration());
        Path[] currentInputPaths = FileInputFormat.getInputPaths(currentJob);
        return lastVertexIdClass.equals(currentVertexIdClass)
                && lastVertexValueClass.equals(currentVertexValueClass)
                && lastEdgeValueClass.equals(currentEdgeValueClass)
                && (currentInputPaths.length == 0 || (currentInputPaths.length == 1
                        && lastOutputPath.equals(currentInputPaths[0])));
}
private JobGen selectJobGen(Plan planChoice, PregelixJob currentJob, IOptimizer optimizer) {
return JobGenFactory.createJobGen(planChoice, currentJob, optimizer);
}
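    /**
     * Deletes any stale output path, then runs the index-creation and bulk
     * data-load jobs. Returns the elapsed loading time in milliseconds.
     */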
    private long loadData(PregelixJob currentJob, JobGen jobGen, DeploymentId deploymentId) throws Exception {
        long start = System.currentTimeMillis();
        FileSystem dfs = FileSystem.get(currentJob.getConfiguration());
        Path outputPath = FileOutputFormat.getOutputPath(currentJob);
        if (outputPath != null) {
            dfs.delete(outputPath, true);
        }
        runCreate(deploymentId, jobGen);
        runDataLoad(deploymentId, jobGen);
        long time = System.currentTimeMillis() - start;
        LOG.info("data loading finished in " + time + " ms");
        return time;
}
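    /**
     * Writes the final graph back to HDFS and runs the cleanup jobs, logging
     * the elapsed time.
     */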
    private void finishJobs(JobGen jobGen, DeploymentId deploymentId) throws Exception {
        long start = System.currentTimeMillis();
        runHDFSWrite(deploymentId, jobGen);
        runCleanup(deploymentId, jobGen);
        long time = System.currentTimeMillis() - start;
        LOG.info("result writing finished in " + time + " ms");
}
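    /**
     * Connects to the Hyracks cluster controller (if not yet connected),
     * collects the jar files on the example class's classpath, and deploys
     * them to the cluster.
     */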
private DeploymentId prepareJobs(String ipAddress, int port) throws Exception {
if (hcc == null) {
hcc = new HyracksConnection(ipAddress, port);
}
URLClassLoader classLoader = (URLClassLoader) exampleClass.getClassLoader();
List<File> jars = new ArrayList<File>();
URL[] urls = classLoader.getURLs();
for (URL url : urls) {
if (url.toString().endsWith(".jar")) {
jars.add(new File(url.getPath()));
}
}
DeploymentId deploymentId = installApplication(jars);
return deploymentId;
}
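    /**
     * Adds core-site.xml, mapred-site.xml, and hdfs-site.xml from the
     * classpath (when present) to the job configuration, and optionally loads
     * the cluster configuration from the cluster controller.
     */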
private void addHadoopConfiguration(PregelixJob job, String ipAddress, int port, boolean loadClusterConfig)
throws HyracksException {
URL hadoopCore = job.getClass().getClassLoader().getResource("core-site.xml");
if (hadoopCore != null) {
job.getConfiguration().addResource(hadoopCore);
}
URL hadoopMapRed = job.getClass().getClassLoader().getResource("mapred-site.xml");
if (hadoopMapRed != null) {
job.getConfiguration().addResource(hadoopMapRed);
}
URL hadoopHdfs = job.getClass().getClassLoader().getResource("hdfs-site.xml");
if (hadoopHdfs != null) {
job.getConfiguration().addResource(hadoopHdfs);
}
if (loadClusterConfig) {
ClusterConfig.loadClusterConfig(ipAddress, port);
}
}
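    /**
     * Runs the superstep loop for one job. On recovery, it either reloads the
     * last checkpoint or clears state and reloads the input data. After each
     * iteration it evaluates the (forced) termination state and the maximum
     * iteration bound, takes a checkpoint when the hook or the configured
     * interval requests one, and fires the iteration-complete hook.
     */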
private void runLoopBody(DeploymentId deploymentId, PregelixJob job, JobGen jobGen, int currentJobIndex,
IntWritable snapshotJobIndex, IntWritable snapshotSuperstep, ICheckpointHook ckpHook, boolean doRecovery)
throws Exception {
if (doRecovery) {
/** reload the checkpoint */
if (snapshotSuperstep.get() > 0) {
runLoadCheckpoint(deploymentId, jobGen, snapshotSuperstep.get());
} else {
runClearState(deploymentId, jobGen, true);
loadData(job, jobGen, deploymentId);
}
}
// TODO how long should the hook persist? One per job? Or one per recovery attempt?
// since the hook shouldn't be stateful, we do one per recovery attempt
IIterationCompleteReporterHook itCompleteHook = BspUtils.createIterationCompleteHook(job.getConfiguration());
int i = doRecovery ? snapshotSuperstep.get() + 1 : 1;
int ckpInterval = BspUtils.getCheckpointingInterval(job.getConfiguration());
boolean terminate = false;
long start, end, time;
do {
start = System.currentTimeMillis();
runLoopBodyIteration(deploymentId, jobGen, i);
end = System.currentTimeMillis();
time = end - start;
LOG.info(job + ": iteration " + i + " finished " + time + "ms");
if (i == 1) {
counterBuffer.append("\t"
+ "total vertice: "
+ IterationUtils.readGlobalAggregateValue(job.getConfiguration(),
BspUtils.getJobId(job.getConfiguration()), GlobalVertexCountAggregator.class.getName())
+ "\n");
counterBuffer.append("\t"
+ "total edges: "
+ IterationUtils.readGlobalAggregateValue(job.getConfiguration(),
BspUtils.getJobId(job.getConfiguration()), GlobalEdgeCountAggregator.class.getName())
+ "\n");
}
terminate = IterationUtils.readTerminationState(job.getConfiguration(), jobGen.getJobId())
|| IterationUtils.readForceTerminationState(job.getConfiguration(), jobGen.getJobId())
|| i >= BspUtils.getMaxIteration(job.getConfiguration());
            if (ckpHook.checkpoint(i) || (ckpInterval > 0 && i % ckpInterval == 0)) {
runCheckpoint(deploymentId, jobGen, i);
snapshotJobIndex.set(currentJobIndex);
snapshotSuperstep.set(i);
}
itCompleteHook.completeIteration(i, job);
i++;
} while (!terminate);
}
    private void runCheckpoint(DeploymentId deploymentId, JobGen jobGen, int lastSuccessfulIteration) throws Exception {
        JobSpecification[] ckpJobs = jobGen.generateCheckpointing(lastSuccessfulIteration);
        runJobArray(deploymentId, ckpJobs);
    }
    private void runLoadCheckpoint(DeploymentId deploymentId, JobGen jobGen, int checkPointedIteration)
            throws Exception {
        JobSpecification[] ckpJobs = jobGen.generateLoadingCheckpoint(checkPointedIteration);
        runJobArray(deploymentId, ckpJobs);
    }
    private void runCreate(DeploymentId deploymentId, JobGen jobGen) throws Exception {
        JobSpecification treeCreateSpec = jobGen.generateCreatingJob();
        execute(deploymentId, treeCreateSpec);
    }
    private void runDataLoad(DeploymentId deploymentId, JobGen jobGen) throws Exception {
        JobSpecification bulkLoadJobSpec = jobGen.generateLoadingJob();
        execute(deploymentId, bulkLoadJobSpec);
    }
    private void runLoopBodyIteration(DeploymentId deploymentId, JobGen jobGen, int iteration) throws Exception {
        JobSpecification loopBody = jobGen.generateJob(iteration);
        execute(deploymentId, loopBody);
    }
    private void runHDFSWrite(DeploymentId deploymentId, JobGen jobGen) throws Exception {
        JobSpecification scanSortPrintJobSpec = jobGen.scanIndexWriteGraph();
        execute(deploymentId, scanSortPrintJobSpec);
    }
    private void runCleanup(DeploymentId deploymentId, JobGen jobGen) throws Exception {
        JobSpecification[] cleanups = jobGen.generateCleanup();
        runJobArray(deploymentId, cleanups);
    }
    private void runClearState(DeploymentId deploymentId, JobGen jobGen, boolean allStates) throws Exception {
        JobSpecification clear = jobGen.generateClearState(allStates);
        execute(deploymentId, clear);
    }
private void runJobArray(DeploymentId deploymentId, JobSpecification[] jobs) throws Exception {
for (JobSpecification job : jobs) {
execute(deploymentId, job);
}
}
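    /**
     * Starts a single Hyracks job with task reattempts disabled and blocks
     * until it completes, enabling runtime profiling when requested.
     */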
private void execute(DeploymentId deploymentId, JobSpecification job) throws Exception {
job.setUseConnectorPolicyForScheduling(false);
job.setReportTaskDetails(false);
job.setMaxReattempts(0);
JobId jobId = hcc.startJob(deploymentId, job,
profiling ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet.noneOf(JobFlag.class));
hcc.waitForCompletion(jobId);
}
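    /**
     * Deploys the given jars as a Hyracks binary and logs the deployment
     * time.
     */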
public DeploymentId installApplication(List<File> jars) throws Exception {
List<String> allJars = new ArrayList<String>();
for (File jar : jars) {
allJars.add(jar.getAbsolutePath());
}
long start = System.currentTimeMillis();
DeploymentId deploymentId = hcc.deployBinary(allJars);
long end = System.currentTimeMillis();
LOG.info("jar deployment finished " + (end - start) + "ms");
return deploymentId;
}
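    /**
     * Resolves the concrete type arguments of the user's {@link Vertex}
     * subclass via reflection and records them in the job configuration,
     * together with the partial/final aggregate value classes of each
     * registered global aggregator and the partial combine value class of the
     * message combiner. For a hypothetical vertex class such as
     *
     * <pre>
     * class MyVertex extends Vertex&lt;LongWritable, DoubleWritable, FloatWritable, DoubleWritable&gt; { ... }
     * </pre>
     *
     * the vertex index, vertex value, edge value, and message value classes
     * are set to LongWritable, DoubleWritable, FloatWritable, and
     * DoubleWritable, respectively.
     */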
@SuppressWarnings({ "unchecked" })
private void initJobConfiguration(PregelixJob job) {
Configuration conf = job.getConfiguration();
Class vertexClass = conf.getClass(PregelixJob.VERTEX_CLASS, Vertex.class);
List<Type> parameterTypes = ReflectionUtils.getTypeArguments(Vertex.class, vertexClass);
Type vertexIndexType = parameterTypes.get(0);
Type vertexValueType = parameterTypes.get(1);
Type edgeValueType = parameterTypes.get(2);
Type messageValueType = parameterTypes.get(3);
conf.setClass(PregelixJob.VERTEX_INDEX_CLASS, (Class<?>) vertexIndexType, WritableComparable.class);
conf.setClass(PregelixJob.VERTEX_VALUE_CLASS, (Class<?>) vertexValueType, Writable.class);
conf.setClass(PregelixJob.EDGE_VALUE_CLASS, (Class<?>) edgeValueType, Writable.class);
conf.setClass(PregelixJob.MESSAGE_VALUE_CLASS, (Class<?>) messageValueType, Writable.class);
List aggregatorClasses = BspUtils.getGlobalAggregatorClasses(conf);
for (int i = 0; i < aggregatorClasses.size(); i++) {
Class aggregatorClass = (Class) aggregatorClasses.get(i);
if (!aggregatorClass.equals(GlobalAggregator.class)) {
List<Type> argTypes = ReflectionUtils.getTypeArguments(GlobalAggregator.class, aggregatorClass);
Type partialAggregateValueType = argTypes.get(4);
conf.setClass(PregelixJob.PARTIAL_AGGREGATE_VALUE_CLASS + "$" + aggregatorClass.getName(),
(Class<?>) partialAggregateValueType, Writable.class);
Type finalAggregateValueType = argTypes.get(5);
conf.setClass(PregelixJob.FINAL_AGGREGATE_VALUE_CLASS + "$" + aggregatorClass.getName(),
(Class<?>) finalAggregateValueType, Writable.class);
}
}
Class combinerClass = BspUtils.getMessageCombinerClass(conf);
if (!combinerClass.equals(MessageCombiner.class)) {
List<Type> argTypes = ReflectionUtils.getTypeArguments(MessageCombiner.class, combinerClass);
Type partialCombineValueType = argTypes.get(2);
conf.setClass(PregelixJob.PARTIAL_COMBINE_VALUE_CLASS, (Class<?>) partialCombineValueType, Writable.class);
}
}
}
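/**
 * A {@link FilenameFilter} that accepts file names ending with the given
 * extension.
 */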
class FileFilter implements FilenameFilter {
private final String ext;
public FileFilter(String ext) {
this.ext = "." + ext;
}
@Override
public boolean accept(File dir, String name) {
return name.endsWith(ext);
}
}