blob: 90a5859e9927dfe6efc38077260443a9c10a38d1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.giraph.utils;
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableList;
import com.google.common.io.Files;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.io.formats.GiraphFileInputFormat;
import org.apache.giraph.io.formats.InMemoryVertexOutputFormat;
import org.apache.giraph.job.GiraphJob;
import org.apache.giraph.zk.InProcessZooKeeperRunner;
import org.apache.giraph.zk.ZookeeperConfig;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.Logger;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
/**
 * Utility for running a Giraph computation internally, in the current JVM,
 * typically from tests.
 *
 * <p>The class is not meant to be instantiated or extended; callers use the
 * static {@code run} / {@code runWithInMemoryOutput} methods. Input, output,
 * ZooKeeper and checkpoint data are written under a local temporary
 * directory. The overloads that do not take a {@code tmpDir} create that
 * directory themselves and delete it when the run finishes; the overloads
 * that do take a {@code tmpDir} leave cleanup to the caller. An in-process
 * ZooKeeper server is started for every run and stopped at the end.
 *
 * <p>Heavily inspired by Apache Mahout's MahoutTestCase.
 */
@SuppressWarnings("unchecked")
public class InternalVertexRunner {
  /** Logger */
  private static final Logger LOG =
      Logger.getLogger(InternalVertexRunner.class);

  /** Don't construct */
  private InternalVertexRunner() { }

  /**
   * Attempts to run the vertex internally in the current JVM, reading from and
   * writing to a temporary folder on local disk. Will start its own zookeeper
   * instance. No edge input data is supplied (it is passed as {@code null}),
   * so this presumably only suits configurations without an edge input
   * format.
   *
   * @param conf Giraph configuration describing the job to run
   * @param vertexInputData linewise vertex input data
   * @return linewise output data, or null if job fails
   * @throws Exception if anything goes wrong
   */
  public static Iterable<String> run(
      GiraphConfiguration conf,
      String[] vertexInputData) throws Exception {
    return run(conf, vertexInputData, null);
  }

  /**
   * Run the ZooKeeper in-process and the job.
   *
   * <p>The ZooKeeper server is always stopped once the job has been
   * attempted, including when configuring the job fails. Exceptions thrown by
   * the job itself ({@code InterruptedException}, {@code
   * ClassNotFoundException}, {@code IOException}) are logged, not rethrown.
   * A job that reports failure is logged as well.
   *
   * @param zookeeperConfig Quorum peer configuration
   * @param giraphJob Giraph job to run
   * @return True if the job ran and reported success, false otherwise
   * @throws IOException propagated from starting the ZooKeeper server
   */
  private static boolean runZooKeeperAndJob(
      final ZookeeperConfig zookeeperConfig,
      GiraphJob giraphJob) throws IOException {
    final InProcessZooKeeperRunner.ZooKeeperServerRunner zookeeper =
        new InProcessZooKeeperRunner.ZooKeeperServerRunner();
    int port = zookeeper.start(zookeeperConfig);
    // Everything after a successful start is inside the try, so the server is
    // stopped even if configuring the job throws.
    try {
      LOG.info("Started test zookeeper on port " + port);
      // The actual port is only known after start (the local config
      // requests port 0), so the job's ZooKeeper list is filled in here.
      GiraphConstants.ZOOKEEPER_LIST.set(giraphJob.getConfiguration(),
          "localhost:" + port);
      boolean succeeded = giraphJob.run(true);
      if (!succeeded) {
        LOG.error("runZooKeeperAndJob: Job reported failure");
      }
      return succeeded;
    } catch (InterruptedException e) {
      // Restore the interrupt flag so callers can still observe it.
      Thread.currentThread().interrupt();
      LOG.error("runZooKeeperAndJob: Got exception on running", e);
    } catch (ClassNotFoundException | IOException e) {
      LOG.error("runZooKeeperAndJob: Got exception on running", e);
    } finally {
      zookeeper.stop();
    }
    return false;
  }

  /**
   * Applies the configuration shared by every local test run: the worker
   * configuration (1, 1, 100.0f), no split between master and worker, local
   * test mode, and the ZooKeeper, ZooKeeper-manager and checkpoint
   * directories.
   *
   * @param conf configuration to modify
   * @param zkDir directory for ZooKeeper data
   * @param zkMgrDir directory for the ZooKeeper manager
   * @param checkpointsDir directory for storing checkpoints
   */
  private static void configureLocalRun(
      GiraphConfiguration conf,
      File zkDir,
      File zkMgrDir,
      String checkpointsDir) {
    // NOTE(review): presumably (min workers, max workers, min percent
    // responded) -- confirm against GiraphConfiguration.
    conf.setWorkerConfiguration(1, 1, 100.0f);
    GiraphConstants.SPLIT_MASTER_WORKER.set(conf, false);
    GiraphConstants.LOCAL_TEST_MODE.set(conf, true);
    conf.set(GiraphConstants.ZOOKEEPER_DIR, zkDir.toString());
    GiraphConstants.ZOOKEEPER_MANAGER_DIRECTORY.set(conf,
        zkMgrDir.toString());
    GiraphConstants.CHECKPOINT_DIRECTORY.set(conf, checkpointsDir);
  }

  /**
   * Attempts to run the vertex internally in the current JVM, reading from and
   * writing to a temporary folder on local disk. Will start its own zookeeper
   * instance. The temporary folder is deleted afterwards.
   *
   * @param conf Giraph configuration describing the job to run
   * @param vertexInputData linewise vertex input data
   * @param edgeInputData linewise edge input data
   * @return linewise output data, or null if job fails
   * @throws Exception if anything goes wrong
   */
  public static Iterable<String> run(
      GiraphConfiguration conf,
      String[] vertexInputData,
      String[] edgeInputData) throws Exception {
    // Prepare input file, output folder and temporary folders
    File tmpDir = FileUtils.createTestDir(conf.getComputationName());
    try {
      return run(conf, vertexInputData, edgeInputData, null, tmpDir);
    } finally {
      FileUtils.delete(tmpDir);
    }
  }

  /**
   * Attempts to run the vertex internally in the current JVM, reading from and
   * writing to a temporary folder on local disk. Will start its own zookeeper
   * instance. The caller owns {@code tmpDir} and is responsible for deleting
   * it.
   *
   * @param conf Giraph configuration describing the job to run (modified in
   *             place)
   * @param vertexInputData linewise vertex input data, used only if
   *                        {@code conf} has a vertex input format
   * @param edgeInputData linewise edge input data, used only if {@code conf}
   *                      has an edge input format
   * @param checkpointsDir if set, will use this folder
   *                       for storing checkpoints; otherwise a folder is
   *                       created under {@code tmpDir}.
   * @param tmpDir file path for storing temporary files.
   * @return linewise output data (empty if there is no vertex output format
   *         or no readable output file), or null if job fails
   * @throws Exception if anything goes wrong
   */
  public static Iterable<String> run(
      GiraphConfiguration conf,
      String[] vertexInputData,
      String[] edgeInputData,
      String checkpointsDir,
      File tmpDir) throws Exception {
    // Create the input files and write the input data to disk
    File vertexInputFile = null;
    if (conf.hasVertexInputFormat()) {
      vertexInputFile = FileUtils.createTempFile(tmpDir, "vertices.txt");
      FileUtils.writeLines(vertexInputFile, vertexInputData);
    }
    File edgeInputFile = null;
    if (conf.hasEdgeInputFormat()) {
      edgeInputFile = FileUtils.createTempFile(tmpDir, "edges.txt");
      FileUtils.writeLines(edgeInputFile, edgeInputData);
    }

    // Output, ZooKeeper and checkpoint folders
    File outputDir = FileUtils.createTempDir(tmpDir, "output");
    File zkDir = FileUtils.createTempDir(tmpDir, "_bspZooKeeper");
    File zkMgrDir = FileUtils.createTempDir(tmpDir, "_defaultZkManagerDir");
    if (checkpointsDir == null) {
      checkpointsDir = FileUtils.createTempDir(
          tmpDir, "_checkpoints").toString();
    }
    configureLocalRun(conf, zkDir, zkMgrDir, checkpointsDir);

    // Create and configure the job to run the vertex
    GiraphJob job = new GiraphJob(conf, conf.getComputationName());
    Job internalJob = job.getInternalJob();
    if (conf.hasVertexInputFormat()) {
      GiraphFileInputFormat.setVertexInputPath(internalJob.getConfiguration(),
          new Path(vertexInputFile.toString()));
    }
    if (conf.hasEdgeInputFormat()) {
      GiraphFileInputFormat.setEdgeInputPath(internalJob.getConfiguration(),
          new Path(edgeInputFile.toString()));
    }
    FileOutputFormat.setOutputPath(internalJob,
        new Path(outputDir.toString()));

    // Configure a local zookeeper instance and run the job against it
    ZookeeperConfig qpConfig = configLocalZooKeeper(zkDir);
    boolean success = runZooKeeperAndJob(qpConfig, job);
    if (!success) {
      return null;
    }

    // Only the first map task's output file is read back.
    File outFile = new File(outputDir, "part-m-00000");
    if (conf.hasVertexOutputFormat() && outFile.canRead()) {
      return Files.readLines(outFile, Charsets.UTF_8);
    } else {
      return ImmutableList.of();
    }
  }

  /**
   * Attempts to run the vertex internally in the current JVM,
   * reading from an in-memory graph. Will start its own zookeeper
   * instance. The temporary folder is deleted afterwards.
   *
   * @param <I> Vertex ID
   * @param <V> Vertex Value
   * @param <E> Edge Value
   * @param conf Giraph configuration describing the job to run
   * @param graph input graph
   * @throws Exception if anything goes wrong
   */
  public static <I extends WritableComparable,
      V extends Writable,
      E extends Writable> void run(
      GiraphConfiguration conf,
      TestGraph<I, V, E> graph) throws Exception {
    // Prepare temporary folders
    File tmpDir = FileUtils.createTestDir(conf.getComputationName());
    try {
      run(conf, graph, tmpDir, null);
    } finally {
      FileUtils.delete(tmpDir);
    }
  }

  /**
   * Attempts to run the vertex internally in the current JVM,
   * reading from an in-memory graph. Will start its own zookeeper
   * instance. The caller owns {@code tmpDir} and is responsible for deleting
   * it.
   *
   * <p>A job failure is logged but not reported to the caller; this method
   * returns normally in that case.
   *
   * @param <I> Vertex ID
   * @param <V> Vertex Value
   * @param <E> Edge Value
   * @param conf Giraph configuration describing the job to run (modified in
   *             place; its vertex input format is replaced)
   * @param graph input graph
   * @param tmpDir file path for storing temporary files.
   * @param checkpointsDir if set, will use this folder
   *                       for storing checkpoints; otherwise a folder is
   *                       created under {@code tmpDir}.
   * @throws Exception if anything goes wrong while preparing the run
   */
  public static <I extends WritableComparable,
      V extends Writable,
      E extends Writable> void run(
      GiraphConfiguration conf,
      TestGraph<I, V, E> graph,
      File tmpDir,
      String checkpointsDir) throws Exception {
    File zkDir = FileUtils.createTempDir(tmpDir, "_bspZooKeeper");
    File zkMgrDir = FileUtils.createTempDir(tmpDir, "_defaultZkManagerDir");
    if (checkpointsDir == null) {
      checkpointsDir = FileUtils.
          createTempDir(tmpDir, "_checkpoints").toString();
    }

    conf.setVertexInputFormatClass(InMemoryVertexInputFormat.class);

    // Create and configure the job to run the vertex
    GiraphJob job = new GiraphJob(conf, conf.getComputationName());

    // NOTE(review): setGraph is a static call, so the input graph is
    // presumably held in static state; concurrent runs in one JVM may
    // interfere -- confirm.
    InMemoryVertexInputFormat.setGraph(graph);

    configureLocalRun(conf, zkDir, zkMgrDir, checkpointsDir);
    GiraphConstants.ZOOKEEPER_SERVER_PORT.set(conf, 0);

    // The boolean result is intentionally not propagated (callers rely on a
    // void return); runZooKeeperAndJob logs any failure.
    runZooKeeperAndJob(configLocalZooKeeper(zkDir), job);
  }

  /**
   * Attempts to run the vertex internally in the current JVM, reading and
   * writing to an in-memory graph. Will start its own zookeeper
   * instance. The temporary folder is deleted afterwards.
   *
   * @param <I> Vertex ID
   * @param <V> Vertex Value
   * @param <E> Edge Value
   * @param conf Giraph configuration describing the job to run
   * @param graph input graph
   * @return Output graph
   * @throws Exception if anything goes wrong
   */
  public static <I extends WritableComparable,
      V extends Writable,
      E extends Writable> TestGraph<I, V, E> runWithInMemoryOutput(
      GiraphConfiguration conf,
      TestGraph<I, V, E> graph) throws Exception {
    // Prepare temporary folders
    File tmpDir = FileUtils.createTestDir(conf.getComputationName());
    try {
      return runWithInMemoryOutput(conf, graph, tmpDir, null);
    } finally {
      FileUtils.delete(tmpDir);
    }
  }

  /**
   * Attempts to run the vertex internally in the current JVM, reading and
   * writing to an in-memory graph. Will start its own zookeeper
   * instance. The caller owns {@code tmpDir} and is responsible for deleting
   * it.
   *
   * <p>A job failure is only logged, so the returned graph may be incomplete
   * if the job failed.
   *
   * @param <I> Vertex ID
   * @param <V> Vertex Value
   * @param <E> Edge Value
   * @param conf Giraph configuration describing the job to run (modified in
   *             place; its vertex output format is replaced)
   * @param graph input graph
   * @param tmpDir file path for storing temporary files.
   * @param checkpointsDir if set, will use this folder
   *                       for storing checkpoints.
   * @return Output graph
   * @throws Exception if anything goes wrong while preparing the run
   */
  public static <I extends WritableComparable,
      V extends Writable,
      E extends Writable> TestGraph<I, V, E> runWithInMemoryOutput(
      GiraphConfiguration conf,
      TestGraph<I, V, E> graph,
      File tmpDir,
      String checkpointsDir) throws Exception {
    conf.setVertexOutputFormatClass(InMemoryVertexOutputFormat.class);
    InMemoryVertexOutputFormat.initializeOutputGraph(conf);
    InternalVertexRunner.run(conf, graph, tmpDir, checkpointsDir);
    return InMemoryVertexOutputFormat.getOutputGraph();
  }

  /**
   * Configuration options for running local ZK: session timeouts between
   * 10000 and 100000 (presumably milliseconds), a client address of
   * localhost with port 0 (so the server picks the port), and the given data
   * directory.
   *
   * @param zkDir directory for ZK to hold files in.
   * @return zookeeper configuration object
   */
  private static ZookeeperConfig configLocalZooKeeper(File zkDir) {
    ZookeeperConfig config = new ZookeeperConfig();
    config.setMaxSessionTimeout(100000);
    config.setMinSessionTimeout(10000);
    config.setClientPortAddress(new InetSocketAddress("localhost", 0));
    config.setDataDir(zkDir.getAbsolutePath());
    return config;
  }
}