blob: 5db84215e90d47ac2fca19f8c16171f6d66212c8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.giraph.utils;
import com.google.common.base.Charsets;
import com.google.common.io.Closeables;
import com.google.common.io.Files;
import org.apache.giraph.graph.GiraphJob;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.zookeeper.server.ServerConfig;
import org.apache.zookeeper.server.ZooKeeperServerMain;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
import java.io.Writer;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* A base class for running internal tests on a vertex
*
* Extending classes only have to invoke the run() method to test their vertex. All data
* is written to a local tmp directory that is removed afterwards. A local zookeeper
* instance is started in an extra thread and shutdown at the end.
*
* Heavily inspired from Apache Mahout's MahoutTestCase
*/
public class InternalVertexRunner {
public static final int LOCAL_ZOOKEEPER_PORT = 22182;
private InternalVertexRunner() {
}
/**
* Attempts to run the vertex internally in the current JVM, reading from and writing to a
* temporary folder on local disk. Will start an own zookeeper instance.
*
* @param vertexClass the vertex class to instantiate
* @param vertexInputFormatClass the inputformat to use
* @param vertexOutputFormatClass the outputformat to use
* @param params a map of parameters to add to the hadoop configuration
* @param data linewise input data
* @return linewise output data
* @throws Exception
*/
public static Iterable<String> run(Class<?> vertexClass,
Class<?> vertexInputFormatClass, Class<?> vertexOutputFormatClass,
Map<String, String> params, String... data) throws Exception {
return run(vertexClass, null, vertexInputFormatClass,
vertexOutputFormatClass, params, data);
}
/**
* Attempts to run the vertex internally in the current JVM, reading from and writing to a
* temporary folder on local disk. Will start an own zookeeper instance.
*
* @param vertexClass the vertex class to instantiate
* @param vertexCombinerClass the vertex combiner to use (or null)
* @param vertexInputFormatClass the inputformat to use
* @param vertexOutputFormatClass the outputformat to use
* @param params a map of parameters to add to the hadoop configuration
* @param data linewise input data
* @return linewise output data
* @throws Exception
*/
public static Iterable<String> run(Class<?> vertexClass,
Class<?> vertexCombinerClass, Class<?> vertexInputFormatClass,
Class<?> vertexOutputFormatClass, Map<String, String> params,
String... data) throws Exception {
File tmpDir = null;
try {
// prepare input file, output folder and zookeeper folder
tmpDir = createTestDir(vertexClass);
File inputFile = createTempFile(tmpDir, "graph.txt");
File outputDir = createTempDir(tmpDir, "output");
File zkDir = createTempDir(tmpDir, "zooKeeper");
// write input data to disk
writeLines(inputFile, data);
// create and configure the job to run the vertex
GiraphJob job = new GiraphJob(vertexClass.getName());
job.setVertexClass(vertexClass);
job.setVertexInputFormatClass(vertexInputFormatClass);
job.setVertexOutputFormatClass(vertexOutputFormatClass);
if (vertexCombinerClass != null) {
job.setVertexCombinerClass(vertexCombinerClass);
}
job.setWorkerConfiguration(1, 1, 100.0f);
Configuration conf = job.getConfiguration();
conf.setBoolean(GiraphJob.SPLIT_MASTER_WORKER, false);
conf.setBoolean(GiraphJob.LOCAL_TEST_MODE, true);
conf.set(GiraphJob.ZOOKEEPER_LIST, "localhost:" +
String.valueOf(LOCAL_ZOOKEEPER_PORT));
for (Map.Entry<String,String> param : params.entrySet()) {
conf.set(param.getKey(), param.getValue());
}
FileInputFormat.addInputPath(job, new Path(inputFile.toString()));
FileOutputFormat.setOutputPath(job, new Path(outputDir.toString()));
// configure a local zookeeper instance
Properties zkProperties = new Properties();
zkProperties.setProperty("tickTime", "2000");
zkProperties.setProperty("dataDir", zkDir.getAbsolutePath());
zkProperties.setProperty("clientPort",
String.valueOf(LOCAL_ZOOKEEPER_PORT));
zkProperties.setProperty("maxClientCnxns", "10000");
zkProperties.setProperty("minSessionTimeout", "10000");
zkProperties.setProperty("maxSessionTimeout", "100000");
zkProperties.setProperty("initLimit", "10");
zkProperties.setProperty("syncLimit", "5");
zkProperties.setProperty("snapCount", "50000");
QuorumPeerConfig qpConfig = new QuorumPeerConfig();
qpConfig.parseProperties(zkProperties);
// create and run the zookeeper instance
final InternalZooKeeper zookeeper = new InternalZooKeeper();
final ServerConfig zkConfig = new ServerConfig();
zkConfig.readFrom(qpConfig);
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.execute(new Runnable() {
@Override
public void run() {
try {
zookeeper.runFromConfig(zkConfig);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
});
try {
job.run(true);
} finally {
executorService.shutdown();
zookeeper.end();
}
return Files.readLines(new File(outputDir, "part-m-00000"),
Charsets.UTF_8);
} finally {
if (tmpDir != null) {
new DeletingVisitor().accept(tmpDir);
}
}
}
/**
* Create a temporary folder that will be removed after the test
*/
private static final File createTestDir(Class<?> vertexClass)
throws IOException {
String systemTmpDir = System.getProperty("java.io.tmpdir");
long simpleRandomLong = (long) (Long.MAX_VALUE * Math.random());
File testTempDir = new File(systemTmpDir, "giraph-" +
vertexClass.getSimpleName() + '-' + simpleRandomLong);
if (!testTempDir.mkdir()) {
throw new IOException("Could not create " + testTempDir);
}
testTempDir.deleteOnExit();
return testTempDir;
}
private static final File createTempFile(File parent, String name)
throws IOException {
return createTestTempFileOrDir(parent, name, false);
}
private static final File createTempDir(File parent, String name)
throws IOException {
File dir = createTestTempFileOrDir(parent, name, true);
dir.delete();
return dir;
}
private static File createTestTempFileOrDir(File parent, String name,
boolean dir) throws IOException {
File f = new File(parent, name);
f.deleteOnExit();
if (dir && !f.mkdirs()) {
throw new IOException("Could not make directory " + f);
}
return f;
}
private static void writeLines(File file, String... lines)
throws IOException {
Writer writer = Files.newWriter(file, Charsets.UTF_8);
try {
for (String line : lines) {
writer.write(line);
writer.write('\n');
}
} finally {
Closeables.closeQuietly(writer);
}
}
private static class DeletingVisitor implements FileFilter {
@Override
public boolean accept(File f) {
if (!f.isFile()) {
f.listFiles(this);
}
f.delete();
return false;
}
}
/**
* Extension of {@link ZooKeeperServerMain} that allows programmatic shutdown
*/
private static class InternalZooKeeper extends ZooKeeperServerMain {
void end() {
shutdown();
}
}
}