GIRAPH-595: InternalVertexRunner.run() should take GiraphConfiguration, not GiraphClasses (nitay via apresta)
diff --git a/CHANGELOG b/CHANGELOG
index 4f4b7b9..05245de 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,9 @@
Giraph Change Log
Release 0.2.0 - unreleased
+ GIRAPH-595: InternalVertexRunner.run() should take GiraphConfiguration, not GiraphClasses
+ (nitay via apresta)
+
GIRAPH-364: Clean up directories created by test suite (majakabiljo)
GIRAPH-614: SplitMasterWorker=false is broken (majakabiljo)
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
index 77fa83c..3a755c8 100644
--- a/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
+++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
@@ -46,7 +46,7 @@
CommandLine cmd) {
conf.setVertexClass(PageRankVertex.class);
conf.setVertexEdgesClass(IntNullArrayEdges.class);
- conf.setVertexCombinerClass(FloatSumCombiner.class);
+ conf.setCombinerClass(FloatSumCombiner.class);
conf.setVertexInputFormatClass(
PseudoRandomIntNullVertexInputFormat.class);
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java
index 58d3fee..df53ee6 100644
--- a/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java
+++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java
@@ -67,7 +67,7 @@
LOG.info("Using class " + GiraphConstants.VERTEX_CLASS.get(conf));
conf.setVertexInputFormatClass(PseudoRandomVertexInputFormat.class);
if (!NO_COMBINER.optionTurnedOn(cmd)) {
- conf.setVertexCombinerClass(MinimumDoubleCombiner.class);
+ conf.setCombinerClass(MinimumDoubleCombiner.class);
}
conf.setLong(PseudoRandomInputFormatConstants.AGGREGATE_VERTICES,
BenchmarkOption.VERTICES.getOptionLongValue(cmd));
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/WeightedPageRankBenchmark.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/WeightedPageRankBenchmark.java
index 4c76996..9de10b5 100644
--- a/giraph-core/src/main/java/org/apache/giraph/benchmark/WeightedPageRankBenchmark.java
+++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/WeightedPageRankBenchmark.java
@@ -116,7 +116,7 @@
LOG.info("Using edges class " +
GiraphConstants.VERTEX_EDGES_CLASS.get(configuration));
if (COMBINER_TYPE.getOptionIntValue(cmd, 1) == 1) {
- configuration.setVertexCombinerClass(DoubleSumCombiner.class);
+ configuration.setCombinerClass(DoubleSumCombiner.class);
}
if (EDGE_INPUT.optionTurnedOn(cmd)) {
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ClassConfOption.java b/giraph-core/src/main/java/org/apache/giraph/conf/ClassConfOption.java
index d67e0a5..41d120b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/ClassConfOption.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/ClassConfOption.java
@@ -146,6 +146,24 @@
}
/**
+ * Set classes for this key
+ * @param conf Configuration
+ * @param klasses Classes to set
+ */
+ public void setMany(Configuration conf, Class<? extends C> ... klasses) {
+ String[] klassNames = new String[klasses.length];
+ for (int i = 0; i < klasses.length; ++i) {
+ Class<?> klass = klasses[i];
+ if (!interfaceClass.isAssignableFrom(klass)) {
+ throw new RuntimeException(klass + " does not implement " +
+ interfaceClass.getName());
+ }
+ klassNames[i] = klasses[i].getName();
+ }
+ conf.setStrings(getKey(), klassNames);
+ }
+
+ /**
* Add class to list for key
* @param conf Configuration
* @param klass Class to add
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
index 0aeec40..c527595 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
@@ -67,6 +67,15 @@
}
/**
+ * Get the user's subclassed {@link org.apache.giraph.graph.Vertex}
+ *
+ * @return User's vertex class
+ */
+ public Class<? extends Vertex> getVertexClass() {
+ return VERTEX_CLASS.get(this);
+ }
+
+ /**
* Set the vertex class (required)
*
* @param vertexClass Runs vertex computation
@@ -154,6 +163,15 @@
}
/**
+ * Does the job have a {@link VertexInputFormat}?
+ *
+ * @return True iff a {@link VertexInputFormat} has been specified.
+ */
+ public boolean hasVertexInputFormat() {
+ return VERTEX_INPUT_FORMAT_CLASS.get(this) != null;
+ }
+
+ /**
* Set the vertex input format class (required)
*
* @param vertexInputFormatClass Determines how graph is input
@@ -164,6 +182,15 @@
}
/**
+ * Does the job have a {@link EdgeInputFormat}?
+ *
+ * @return True iff a {@link EdgeInputFormat} has been specified.
+ */
+ public boolean hasEdgeInputFormat() {
+ return EDGE_INPUT_FORMAT_CLASS.get(this) != null;
+ }
+
+ /**
* Set the edge input format class (required)
*
* @param edgeInputFormatClass Determines how graph is input
@@ -252,6 +279,15 @@
}
/**
+ * Does the job have a {@link VertexOutputFormat}?
+ *
+ * @return True iff a {@link VertexOutputFormat} has been specified.
+ */
+ public boolean hasVertexOutputFormat() {
+ return VERTEX_OUTPUT_FORMAT_CLASS.get(this) != null;
+ }
+
+ /**
* Set the vertex output format class (optional)
*
* @param vertexOutputFormatClass Determines how graph is output
@@ -326,11 +362,20 @@
}
/**
+ * Get the vertex combiner class (optional)
+ *
+ * @return vertexCombinerClass Determines how vertex messages are combined
+ */
+ public Class<? extends Combiner> getCombinerClass() {
+ return VERTEX_COMBINER_CLASS.get(this);
+ }
+
+ /**
* Set the vertex combiner class (optional)
*
* @param vertexCombinerClass Determines how vertex messages are combined
*/
- public final void setVertexCombinerClass(
+ public final void setCombinerClass(
Class<? extends Combiner> vertexCombinerClass) {
VERTEX_COMBINER_CLASS.set(this, vertexCombinerClass);
}
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
index f6619c1..dc3a26f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
@@ -57,8 +57,6 @@
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.Progressable;
-import static org.apache.giraph.conf.GiraphConstants.USE_UNSAFE_SERIALIZATION;
-
/**
* The classes set here are immutable, the remaining configuration is mutable.
* Classes are immutable and final to provide the best performance for
@@ -110,6 +108,23 @@
}
/**
+ * Create a new ImmutableClassesGiraphConfiguration. This is a convenience
+ * method to make it easier to deal with generics.
+ *
+ * @param conf Configuration to read
+ * @param <I> Vertex ID
+ * @param <V> Vertex Value
+ * @param <E> Edge Value
+ * @param <M> Message Value
+ * @return new ImmutableClassesGiraphConfiguration
+ */
+ public static <I extends WritableComparable,
+ V extends Writable, E extends Writable, M extends Writable>
+ ImmutableClassesGiraphConfiguration<I, V, E, M> create(Configuration conf) {
+ return new ImmutableClassesGiraphConfiguration<I, V, E, M>(conf);
+ }
+
+ /**
* Configure an object with this instance if the object is configurable.
* @param obj Object
*/
@@ -141,11 +156,7 @@
return ReflectionUtils.newInstance(klass, this);
}
- /**
- * Does the job have a {@link VertexInputFormat}?
- *
- * @return True iff a {@link VertexInputFormat} has been specified.
- */
+ @Override
public boolean hasVertexInputFormat() {
return classes.hasVertexInputFormat();
}
@@ -172,11 +183,7 @@
return ReflectionUtils.newInstance(klass, this);
}
- /**
- * Does the job have a {@link VertexOutputFormat}?
- *
- * @return True iff a {@link VertexOutputFormat} has been specified.
- */
+ @Override
public boolean hasVertexOutputFormat() {
return classes.hasVertexOutputFormat();
}
@@ -223,11 +230,7 @@
}
}
- /**
- * Does the job have an {@link EdgeInputFormat}?
- *
- * @return True iff an {@link EdgeInputFormat} has been specified.
- */
+ @Override
public boolean hasEdgeInputFormat() {
return classes.hasEdgeInputFormat();
}
@@ -382,11 +385,7 @@
return ReflectionUtils.newInstance(getMasterComputeClass(), this);
}
- /**
- * Get the user's subclassed {@link org.apache.giraph.graph.Vertex}
- *
- * @return User's vertex class
- */
+ @Override
public Class<? extends Vertex<I, V, E, M>> getVertexClass() {
return classes.getVertexClass();
}
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/GiraphFileInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/GiraphFileInputFormat.java
index fc4baa4..7474aa3 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/GiraphFileInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/GiraphFileInputFormat.java
@@ -86,13 +86,23 @@
*/
public static void addVertexInputPath(Configuration conf,
Path path) throws IOException {
- path = path.getFileSystem(conf).makeQualified(path);
- String dirStr = StringUtils.escapeString(path.toString());
+ String dirStr = pathToDirString(conf, path);
String dirs = conf.get(VERTEX_INPUT_DIR);
conf.set(VERTEX_INPUT_DIR, dirs == null ? dirStr : dirs + "," + dirStr);
}
/**
+ * Set the {@link Path} for vertex input.
+ * @param conf Configuration to store in
+ * @param path {@link Path} to set
+ * @throws IOException on I/O errors
+ */
+ public static void setVertexInputPath(Configuration conf,
+ Path path) throws IOException {
+ conf.set(VERTEX_INPUT_DIR, pathToDirString(conf, path));
+ }
+
+ /**
* Add a {@link org.apache.hadoop.fs.Path} to the list of edge inputs.
*
* @param conf the Configuration to store the input paths
@@ -101,13 +111,38 @@
*/
public static void addEdgeInputPath(Configuration conf,
Path path) throws IOException {
- path = path.getFileSystem(conf).makeQualified(path);
- String dirStr = StringUtils.escapeString(path.toString());
+ String dirStr = pathToDirString(conf, path);
String dirs = conf.get(EDGE_INPUT_DIR);
conf.set(EDGE_INPUT_DIR, dirs == null ? dirStr : dirs + "," + dirStr);
}
/**
+ * Set the {@link Path} for edge input.
+ * @param conf Configuration to store in
+ * @param path {@link Path} to set
+ * @throws IOException on I/O errors
+ */
+ public static void setEdgeInputPath(Configuration conf,
+ Path path) throws IOException {
+ conf.set(EDGE_INPUT_DIR, pathToDirString(conf, path));
+ }
+
+ /**
+ * Convert from a Path to a string.
+ * This makes the path fully qualified and does escaping.
+ *
+ * @param conf Configuration to use
+ * @param path Path to convert
+ * @return String of escaped dir
+ * @throws IOException on I/O errors
+ */
+ private static String pathToDirString(Configuration conf, Path path)
+ throws IOException {
+ path = path.getFileSystem(conf).makeQualified(path);
+ return StringUtils.escapeString(path.toString());
+ }
+
+ /**
* Get the list of vertex input {@link Path}s.
*
* @param context The job
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java
index de7ea97..c9a52d1 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java
@@ -210,9 +210,8 @@
giraphConfiguration.setVertexClass(
(Class<? extends Vertex>) Class.forName(vertexClassName));
if (cmd.hasOption("c")) {
- giraphConfiguration.setVertexCombinerClass(
- (Class<? extends Combiner>)
- Class.forName(cmd.getOptionValue("c")));
+ giraphConfiguration.setCombinerClass(
+ (Class<? extends Combiner>) Class.forName(cmd.getOptionValue("c")));
}
if (cmd.hasOption("ve")) {
giraphConfiguration.setVertexEdgesClass(
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java b/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
index e389e01..bb60436 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
@@ -18,16 +18,17 @@
package org.apache.giraph.utils;
-import org.apache.giraph.conf.GiraphClasses;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.GiraphConstants;
-import org.apache.giraph.job.GiraphJob;
import org.apache.giraph.io.formats.GiraphFileInputFormat;
+import org.apache.giraph.job.GiraphJob;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.server.NIOServerCnxn;
import org.apache.zookeeper.server.ServerConfig;
import org.apache.zookeeper.server.ZooKeeperServerMain;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
@@ -38,7 +39,7 @@
import java.io.File;
import java.io.IOException;
-import java.util.Map;
+import java.lang.reflect.Field;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -58,6 +59,10 @@
/** ZooKeeper port to use for tests */
public static final int LOCAL_ZOOKEEPER_PORT = 22182;
+ /** Logger */
+ private static final Logger LOG =
+ Logger.getLogger(InternalVertexRunner.class);
+
/** Don't construct */
private InternalVertexRunner() { }
@@ -66,17 +71,16 @@
* writing to a temporary folder on local disk. Will start its own zookeeper
* instance.
*
- * @param classes GiraphClasses specifying which types to use
- * @param params a map of parameters to add to the hadoop configuration
+ *
+ * @param conf GiraphClasses specifying which types to use
* @param vertexInputData linewise vertex input data
* @return linewise output data
* @throws Exception if anything goes wrong
*/
public static Iterable<String> run(
- GiraphClasses classes,
- Map<String, String> params,
+ GiraphConfiguration conf,
String[] vertexInputData) throws Exception {
- return run(classes, params, vertexInputData, null);
+ return run(conf, vertexInputData, null);
}
/**
@@ -84,29 +88,28 @@
* writing to a temporary folder on local disk. Will start its own zookeeper
* instance.
*
- * @param classes GiraphClasses specifying which types to use
- * @param params a map of parameters to add to the hadoop configuration
+ *
+ * @param conf GiraphClasses specifying which types to use
* @param vertexInputData linewise vertex input data
* @param edgeInputData linewise edge input data
* @return linewise output data
* @throws Exception if anything goes wrong
*/
public static Iterable<String> run(
- GiraphClasses classes,
- Map<String, String> params,
+ GiraphConfiguration conf,
String[] vertexInputData,
String[] edgeInputData) throws Exception {
File tmpDir = null;
try {
// Prepare input file, output folder and temporary folders
- tmpDir = FileUtils.createTestDir(classes.getVertexClass());
+ tmpDir = FileUtils.createTestDir(conf.getVertexClass());
File vertexInputFile = null;
File edgeInputFile = null;
- if (classes.hasVertexInputFormat()) {
+ if (conf.hasVertexInputFormat()) {
vertexInputFile = FileUtils.createTempFile(tmpDir, "vertices.txt");
}
- if (classes.hasEdgeInputFormat()) {
+ if (conf.hasEdgeInputFormat()) {
edgeInputFile = FileUtils.createTempFile(tmpDir, "edges.txt");
}
@@ -116,42 +119,13 @@
File checkpointsDir = FileUtils.createTempDir(tmpDir, "_checkpoints");
// Write input data to disk
- if (classes.hasVertexInputFormat()) {
+ if (conf.hasVertexInputFormat()) {
FileUtils.writeLines(vertexInputFile, vertexInputData);
}
- if (classes.hasEdgeInputFormat()) {
+ if (conf.hasEdgeInputFormat()) {
FileUtils.writeLines(edgeInputFile, edgeInputData);
}
- // Create and configure the job to run the vertex
- GiraphJob job = new GiraphJob(classes.getVertexClass().getName());
- GiraphConfiguration conf = job.getConfiguration();
- conf.setVertexClass(classes.getVertexClass());
- conf.setVertexEdgesClass(classes.getVertexEdgesClass());
- conf.setInputVertexEdgesClass(classes.getInputVertexEdgesClass());
- conf.setVertexValueFactoryClass(classes.getVertexValueFactoryClass());
- if (classes.hasVertexInputFormat()) {
- conf.setVertexInputFormatClass(classes.getVertexInputFormatClass());
- }
- if (classes.hasEdgeInputFormat()) {
- conf.setEdgeInputFormatClass(classes.getEdgeInputFormatClass());
- }
- if (classes.hasVertexOutputFormat()) {
- conf.setVertexOutputFormatClass(classes.getVertexOutputFormatClass());
- }
- if (classes.hasWorkerContextClass()) {
- conf.setWorkerContextClass(classes.getWorkerContextClass());
- }
- if (classes.hasPartitionContextClass()) {
- conf.setPartitionContextClass(classes.getPartitionContextClass());
- }
- if (classes.hasCombinerClass()) {
- conf.setVertexCombinerClass(classes.getCombinerClass());
- }
- if (classes.hasMasterComputeClass()) {
- conf.setMasterComputeClass(classes.getMasterComputeClass());
- }
-
conf.setWorkerConfiguration(1, 1, 100.0f);
GiraphConstants.SPLIT_MASTER_WORKER.set(conf, false);
GiraphConstants.LOCAL_TEST_MODE.set(conf, true);
@@ -163,17 +137,16 @@
zkMgrDir.toString());
GiraphConstants.CHECKPOINT_DIRECTORY.set(conf, checkpointsDir.toString());
- for (Map.Entry<String, String> param : params.entrySet()) {
- conf.set(param.getKey(), param.getValue());
- }
+ // Create and configure the job to run the vertex
+ GiraphJob job = new GiraphJob(conf, conf.getVertexClass().getName());
Job internalJob = job.getInternalJob();
- if (classes.hasVertexInputFormat()) {
- GiraphFileInputFormat.addVertexInputPath(internalJob.getConfiguration(),
+ if (conf.hasVertexInputFormat()) {
+ GiraphFileInputFormat.setVertexInputPath(internalJob.getConfiguration(),
new Path(vertexInputFile.toString()));
}
- if (classes.hasEdgeInputFormat()) {
- GiraphFileInputFormat.addEdgeInputPath(internalJob.getConfiguration(),
+ if (conf.hasEdgeInputFormat()) {
+ GiraphFileInputFormat.setEdgeInputPath(internalJob.getConfiguration(),
new Path(edgeInputFile.toString()));
}
FileOutputFormat.setOutputPath(job.getInternalJob(),
@@ -208,7 +181,7 @@
zookeeper.end();
}
- if (classes.hasVertexOutputFormat()) {
+ if (conf.hasVertexOutputFormat()) {
return Files.readLines(new File(outputDir, "part-m-00000"),
Charsets.UTF_8);
} else {
@@ -223,12 +196,12 @@
* Attempts to run the vertex internally in the current JVM, reading and
* writing to an in-memory graph. Will start its own zookeeper
* instance.
- * @param <I> The vertex index type
- * @param <V> The vertex type
- * @param <E> The edge type
- * @param <M> The message type
- * @param classes GiraphClasses specifying which types to use
- * @param params a map of parameters to add to the hadoop configuration
+ *
+ * @param <I> Vertex ID
+ * @param <V> Vertex Value
+ * @param <E> Edge Value
+ * @param <M> Message Value
+ * @param conf GiraphClasses specifying which types to use
* @param graph input graph
* @return iterable output data
* @throws Exception if anything goes wrong
@@ -237,48 +210,22 @@
V extends Writable,
E extends Writable,
M extends Writable> TestGraph<I, V, E, M> run(
- GiraphClasses<I, V, E, M> classes,
- Map<String, String> params,
+ GiraphConfiguration conf,
TestGraph<I, V, E, M> graph) throws Exception {
File tmpDir = null;
try {
// Prepare temporary folders
- tmpDir = FileUtils.createTestDir(classes.getVertexClass());
+ tmpDir = FileUtils.createTestDir(conf.getVertexClass());
File zkDir = FileUtils.createTempDir(tmpDir, "_bspZooKeeper");
File zkMgrDir = FileUtils.createTempDir(tmpDir, "_defaultZkManagerDir");
File checkpointsDir = FileUtils.createTempDir(tmpDir, "_checkpoints");
// Create and configure the job to run the vertex
- GiraphJob job = new GiraphJob(classes.getVertexClass().getName());
+ GiraphJob job = new GiraphJob(conf.getVertexClass().getName());
InMemoryVertexInputFormat.setGraph(graph);
- GiraphConfiguration conf = job.getConfiguration();
- conf.setVertexClass(classes.getVertexClass());
- conf.setVertexEdgesClass(classes.getVertexEdgesClass());
- conf.setVertexInputFormatClass(InMemoryVertexInputFormat.class);
- conf.setInputVertexEdgesClass(classes.getInputVertexEdgesClass());
- conf.setVertexValueFactoryClass(classes.getVertexValueFactoryClass());
- if (classes.hasWorkerContextClass()) {
- conf.setWorkerContextClass(classes.getWorkerContextClass());
- }
- if (classes.hasCombinerClass()) {
- conf.setVertexCombinerClass(classes.getCombinerClass());
- }
- if (classes.hasMasterComputeClass()) {
- conf.setMasterComputeClass(classes.getMasterComputeClass());
- }
- if (classes.hasVertexInputFormat()) {
- conf.setVertexInputFormatClass(classes.getVertexInputFormatClass());
- }
- if (classes.hasEdgeInputFormat()) {
- conf.setEdgeInputFormatClass(classes.getEdgeInputFormatClass());
- }
- if (classes.hasVertexOutputFormat()) {
- conf.setVertexOutputFormatClass(classes.getVertexOutputFormatClass());
- }
-
conf.setWorkerConfiguration(1, 1, 100.0f);
GiraphConstants.SPLIT_MASTER_WORKER.set(conf, false);
GiraphConstants.LOCAL_TEST_MODE.set(conf, true);
@@ -290,10 +237,6 @@
zkMgrDir.toString());
GiraphConstants.CHECKPOINT_DIRECTORY.set(conf, checkpointsDir.toString());
- for (Map.Entry<String, String> param : params.entrySet()) {
- conf.set(param.getKey(), param.getValue());
- }
-
// Configure a local zookeeper instance
Properties zkProperties = configLocalZooKeeper(zkDir);
@@ -357,7 +300,27 @@
* Shutdown the ZooKeeper instance.
*/
void end() {
- shutdown();
+ if (getCnxnFactory() != null) {
+ shutdown();
+ }
+ }
+
+ /**
+ * Get the ZooKeeper connection factory using reflection.
+ * @return {@link NIOServerCnxn.Factory} from ZooKeeper
+ */
+ private NIOServerCnxn.Factory getCnxnFactory() {
+ NIOServerCnxn.Factory factory = null;
+ try {
+ Field field = ZooKeeperServerMain.class.getDeclaredField("cnxnFactory");
+ field.setAccessible(true);
+ factory = (NIOServerCnxn.Factory) field.get(this);
+ // CHECKSTYLE: stop IllegalCatch
+ } catch (Exception e) {
+ // CHECKSTYLE: resume IllegalCatch
+ LOG.error("Couldn't get cnxn factory", e);
+ }
+ return factory;
}
}
}
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/TestGraph.java b/giraph-core/src/main/java/org/apache/giraph/utils/TestGraph.java
index e5c2fdd..3577a9e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/TestGraph.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/TestGraph.java
@@ -18,12 +18,6 @@
package org.apache.giraph.utils;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map.Entry;
-
-import org.apache.giraph.conf.GiraphClasses;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.edge.Edge;
@@ -36,6 +30,11 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+
/**
* TestGraph class for in-memory testing.
*
@@ -57,11 +56,10 @@
/**
* Constructor requiring classes
*
- * @param classes Should have vertex and edge classes set.
+ * @param conf Should have vertex and edge classes set.
*/
- public TestGraph(GiraphClasses<I, V, E, M> classes) {
- super();
- setConf(classes);
+ public TestGraph(GiraphConfiguration conf) {
+ this.conf = new ImmutableClassesGiraphConfiguration(conf);
}
public HashMap<I, Vertex<I, V, E, M>> getVertices() {
@@ -100,8 +98,8 @@
*/
public TestGraph<I, V, E, M> addEdge(I vertexId, Entry<I, E> edgePair) {
if (!vertices.containsKey(vertexId)) {
- Vertex<I, V, E, M> v = getConf().createVertex();
- v.initialize(vertexId, getConf().createVertexValue());
+ Vertex<I, V, E, M> v = conf.createVertex();
+ v.initialize(vertexId, conf.createVertexValue());
vertices.put(vertexId, v);
}
vertices.get(vertexId)
@@ -120,8 +118,8 @@
*/
public TestGraph<I, V, E, M> addEdge(I vertexId, I toVertex, E edgeValue) {
if (!vertices.containsKey(vertexId)) {
- Vertex<I, V, E, M> v = getConf().createVertex();
- v.initialize(vertexId, getConf().createVertexValue());
+ Vertex<I, V, E, M> v = conf.createVertex();
+ v.initialize(vertexId, conf.createVertexValue());
vertices.put(vertexId, v);
}
vertices.get(vertexId)
@@ -182,34 +180,11 @@
protected Vertex<I, V, E, M> makeVertex(I id, V value,
Entry<I, E>... edges) {
@SuppressWarnings("unchecked")
- Vertex<I, V, E, M> vertex = getConf().createVertex();
+ Vertex<I, V, E, M> vertex = conf.createVertex();
vertex.initialize(id, value, createEdges(edges));
return vertex;
}
- /**
- * Get the configuration
- *
- * @return the configuration
- */
- public ImmutableClassesGiraphConfiguration<I, V, E, M> getConf() {
- return conf;
- }
-
- /**
- * Create a configuration from giraph classes
- *
- * @param classes Should have vertex and edge class set
- */
- public void setConf(GiraphClasses<I, V, E, M> classes) {
- GiraphConfiguration giraphConf = new GiraphConfiguration();
- giraphConf.setVertexClass(classes.getVertexClass());
- giraphConf.setVertexEdgesClass(classes.getVertexEdgesClass());
- giraphConf.setVertexValueFactoryClass(classes.getVertexValueFactoryClass());
-
- conf = new ImmutableClassesGiraphConfiguration(giraphConf);
- }
-
@Override
public String toString() {
return Objects.toStringHelper(this).add("vertices", vertices).toString();
diff --git a/giraph-core/src/test/java/org/apache/giraph/BspCase.java b/giraph-core/src/test/java/org/apache/giraph/BspCase.java
index f3c3358..dbee7f0 100644
--- a/giraph-core/src/test/java/org/apache/giraph/BspCase.java
+++ b/giraph-core/src/test/java/org/apache/giraph/BspCase.java
@@ -18,7 +18,6 @@
package org.apache.giraph;
-import org.apache.giraph.conf.GiraphClasses;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.job.GiraphJob;
@@ -141,58 +140,29 @@
* Prepare a GiraphJob for test purposes
*
* @param name identifying name for job
- * @param classes GiraphClasses describing which classes to use
+ * @param conf GiraphConfiguration describing which classes to use
* @return GiraphJob configured for testing
* @throws IOException if anything goes wrong
*/
- protected GiraphJob prepareJob(String name, GiraphClasses classes)
+ protected GiraphJob prepareJob(String name, GiraphConfiguration conf)
throws IOException {
- return prepareJob(name, classes, null);
+ return prepareJob(name, conf, null);
}
/**
* Prepare a GiraphJob for test purposes
*
* @param name identifying name for job
- * @param classes GiraphClasses describing which classes to use
+ * @param conf GiraphConfiguration describing which classes to use
* @param outputPath Where to right output to
* @return GiraphJob configured for testing
* @throws IOException if anything goes wrong
*/
- protected GiraphJob prepareJob(String name, GiraphClasses classes,
+ protected GiraphJob prepareJob(String name, GiraphConfiguration conf,
Path outputPath)
throws IOException {
- GiraphJob job = new GiraphJob(name);
+ GiraphJob job = new GiraphJob(conf, name);
setupConfiguration(job);
- GiraphConfiguration conf = job.getConfiguration();
- conf.setVertexClass(classes.getVertexClass());
- conf.setVertexEdgesClass(classes.getVertexEdgesClass());
- conf.setInputVertexEdgesClass(classes.getInputVertexEdgesClass());
- conf.setVertexValueFactoryClass(classes.getVertexValueFactoryClass());
- if (classes.hasAggregatorWriterClass()) {
- conf.setAggregatorWriterClass(classes.getAggregatorWriterClass());
- }
- if (classes.hasCombinerClass()) {
- conf.setVertexCombinerClass(classes.getCombinerClass());
- }
- if (classes.hasEdgeInputFormat()) {
- conf.setEdgeInputFormatClass(classes.getEdgeInputFormatClass());
- }
- if (classes.hasMasterComputeClass()) {
- conf.setMasterComputeClass(classes.getMasterComputeClass());
- }
- if (classes.hasVertexInputFormat()) {
- conf.setVertexInputFormatClass(classes.getVertexInputFormatClass());
- }
- if (classes.hasVertexOutputFormat()) {
- conf.setVertexOutputFormatClass(classes.getVertexOutputFormatClass());
- }
- if (classes.hasPartitionContextClass()) {
- conf.setPartitionContextClass(classes.getPartitionContextClass());
- }
- if (classes.hasWorkerContextClass()) {
- conf.setWorkerContextClass(classes.getWorkerContextClass());
- }
if (outputPath != null) {
removeAndSetOutput(job, outputPath);
}
diff --git a/giraph-core/src/test/java/org/apache/giraph/io/TestEdgeInput.java b/giraph-core/src/test/java/org/apache/giraph/io/TestEdgeInput.java
index 0dcefd9..ede76f0 100644
--- a/giraph-core/src/test/java/org/apache/giraph/io/TestEdgeInput.java
+++ b/giraph-core/src/test/java/org/apache/giraph/io/TestEdgeInput.java
@@ -18,10 +18,8 @@
package org.apache.giraph.io;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
import org.apache.giraph.BspCase;
-import org.apache.giraph.conf.GiraphClasses;
+import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.edge.ByteArrayEdges;
import org.apache.giraph.edge.Edge;
@@ -36,6 +34,8 @@
import org.apache.hadoop.io.NullWritable;
import org.junit.Test;
+import com.google.common.collect.Maps;
+
import java.io.IOException;
import java.util.Map;
@@ -63,14 +63,12 @@
"4 1"
};
- GiraphClasses classes = new GiraphClasses();
- classes.setVertexClass(TestVertexWithNumEdges.class);
- classes.setVertexEdgesClass(ByteArrayEdges.class);
- classes.setEdgeInputFormatClass(IntNullTextEdgeInputFormat.class);
- classes.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
- Map<String, String> params = ImmutableMap.of();
- Iterable<String> results = InternalVertexRunner.run(classes, params,
- null, edges);
+ GiraphConfiguration conf = new GiraphConfiguration();
+ conf.setVertexClass(TestVertexWithNumEdges.class);
+ conf.setVertexEdgesClass(ByteArrayEdges.class);
+ conf.setEdgeInputFormatClass(IntNullTextEdgeInputFormat.class);
+ conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
+ Iterable<String> results = InternalVertexRunner.run(conf, null, edges);
Map<Integer, Integer> values = parseResults(results);
@@ -94,14 +92,12 @@
"4 1"
};
- GiraphClasses classes = new GiraphClasses();
- classes.setVertexClass(TestVertexWithNumEdges.class);
- classes.setVertexEdgesClass(ByteArrayEdges.class);
- classes.setEdgeInputFormatClass(IntNullReverseTextEdgeInputFormat.class);
- classes.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
- Map<String, String> params = ImmutableMap.of();
- Iterable<String> results = InternalVertexRunner.run(classes, params,
- null, edges);
+ GiraphConfiguration conf = new GiraphConfiguration();
+ conf.setVertexClass(TestVertexWithNumEdges.class);
+ conf.setVertexEdgesClass(ByteArrayEdges.class);
+ conf.setEdgeInputFormatClass(IntNullReverseTextEdgeInputFormat.class);
+ conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
+ Iterable<String> results = InternalVertexRunner.run(conf, null, edges);
Map<Integer, Integer> values = parseResults(results);
@@ -132,17 +128,15 @@
"5 3"
};
- GiraphClasses classes = new GiraphClasses();
- classes.setVertexClass(TestVertexDoNothing.class);
- classes.setVertexEdgesClass(ByteArrayEdges.class);
- classes.setVertexInputFormatClass(IntIntTextVertexValueInputFormat.class);
- classes.setEdgeInputFormatClass(IntNullTextEdgeInputFormat.class);
- classes.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
- Map<String, String> emptyParams = ImmutableMap.of();
+ GiraphConfiguration conf = new GiraphConfiguration();
+ conf.setVertexClass(TestVertexDoNothing.class);
+ conf.setVertexEdgesClass(ByteArrayEdges.class);
+ conf.setVertexInputFormatClass(IntIntTextVertexValueInputFormat.class);
+ conf.setEdgeInputFormatClass(IntNullTextEdgeInputFormat.class);
+ conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
// Run a job with a vertex that does nothing
- Iterable<String> results = InternalVertexRunner.run(classes, emptyParams,
- vertices, edges);
+ Iterable<String> results = InternalVertexRunner.run(conf, vertices, edges);
Map<Integer, Integer> values = parseResults(results);
@@ -158,23 +152,22 @@
assertEquals(0, (int) values.get(5));
// Run a job with a custom VertexValueFactory
- classes.setVertexValueFactoryClass(TestVertexValueFactory.class);
- results = InternalVertexRunner.run(classes, emptyParams,
- vertices, edges);
+ conf.setVertexValueFactoryClass(TestVertexValueFactory.class);
+ results = InternalVertexRunner.run(conf, vertices, edges);
values = parseResults(results);
// A vertex with edges but no initial value should have been constructed
// by the custom factory
assertEquals(3, (int) values.get(5));
- classes = new GiraphClasses();
- classes.setVertexClass(TestVertexWithNumEdges.class);
- classes.setVertexEdgesClass(ByteArrayEdges.class);
- classes.setVertexInputFormatClass(IntIntTextVertexValueInputFormat.class);
- classes.setEdgeInputFormatClass(IntNullTextEdgeInputFormat.class);
- classes.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
+ conf = new GiraphConfiguration();
+ conf.setVertexClass(TestVertexWithNumEdges.class);
+ conf.setVertexEdgesClass(ByteArrayEdges.class);
+ conf.setVertexInputFormatClass(IntIntTextVertexValueInputFormat.class);
+ conf.setEdgeInputFormatClass(IntNullTextEdgeInputFormat.class);
+ conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
// Run a job with a vertex that counts outgoing edges
- results = InternalVertexRunner.run(classes, emptyParams, vertices, edges);
+ results = InternalVertexRunner.run(conf, vertices, edges);
values = parseResults(results);
@@ -196,15 +189,13 @@
"4 1"
};
- GiraphClasses classes = new GiraphClasses();
- classes.setVertexClass(TestVertexCheckEdgesType.class);
- classes.setVertexEdgesClass(ByteArrayEdges.class);
- classes.setInputVertexEdgesClass(TestVertexEdgesFilterEven.class);
- classes.setEdgeInputFormatClass(IntNullTextEdgeInputFormat.class);
- classes.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
- Map<String, String> params = ImmutableMap.of();
- Iterable<String> results = InternalVertexRunner.run(classes, params,
- null, edges);
+ GiraphConfiguration conf = new GiraphConfiguration();
+ conf.setVertexClass(TestVertexCheckEdgesType.class);
+ conf.setVertexEdgesClass(ByteArrayEdges.class);
+ conf.setInputVertexEdgesClass(TestVertexEdgesFilterEven.class);
+ conf.setEdgeInputFormatClass(IntNullTextEdgeInputFormat.class);
+ conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
+ Iterable<String> results = InternalVertexRunner.run(conf, null, edges);
Map<Integer, Integer> values = parseResults(results);
diff --git a/giraph-core/src/test/java/org/apache/giraph/io/TestJsonBase64Format.java b/giraph-core/src/test/java/org/apache/giraph/io/TestJsonBase64Format.java
index ae9441e..a6aa78a 100644
--- a/giraph-core/src/test/java/org/apache/giraph/io/TestJsonBase64Format.java
+++ b/giraph-core/src/test/java/org/apache/giraph/io/TestJsonBase64Format.java
@@ -20,14 +20,13 @@
import org.apache.giraph.BspCase;
import org.apache.giraph.benchmark.WeightedPageRankVertex;
-import org.apache.giraph.conf.GiraphClasses;
+import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.io.formats.GiraphFileInputFormat;
import org.apache.giraph.io.formats.JsonBase64VertexInputFormat;
import org.apache.giraph.io.formats.JsonBase64VertexOutputFormat;
import org.apache.giraph.io.formats.PseudoRandomInputFormatConstants;
import org.apache.giraph.io.formats.PseudoRandomVertexInputFormat;
import org.apache.giraph.job.GiraphJob;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.junit.Test;
@@ -61,11 +60,11 @@
throws IOException, InterruptedException, ClassNotFoundException {
Path outputPath = getTempPath(getCallingMethodName());
- GiraphClasses classes = new GiraphClasses();
- classes.setVertexClass(WeightedPageRankVertex.class);
- classes.setVertexInputFormatClass(PseudoRandomVertexInputFormat.class);
- classes.setVertexOutputFormatClass(JsonBase64VertexOutputFormat.class);
- GiraphJob job = prepareJob(getCallingMethodName(), classes, outputPath);
+ GiraphConfiguration conf = new GiraphConfiguration();
+ conf.setVertexClass(WeightedPageRankVertex.class);
+ conf.setVertexInputFormatClass(PseudoRandomVertexInputFormat.class);
+ conf.setVertexOutputFormatClass(JsonBase64VertexOutputFormat.class);
+ GiraphJob job = prepareJob(getCallingMethodName(), conf, outputPath);
job.getConfiguration().setLong(
PseudoRandomInputFormatConstants.AGGREGATE_VERTICES, 101);
job.getConfiguration().setLong(
@@ -75,31 +74,28 @@
assertTrue(job.run(true));
Path outputPath2 = getTempPath(getCallingMethodName() + "2");
- classes = new GiraphClasses();
- classes.setVertexClass(WeightedPageRankVertex.class);
- classes.setVertexInputFormatClass(JsonBase64VertexInputFormat.class);
- classes.setVertexOutputFormatClass(JsonBase64VertexOutputFormat.class);
- job = prepareJob(getCallingMethodName(), classes, outputPath2);
+ conf = new GiraphConfiguration();
+ conf.setVertexClass(WeightedPageRankVertex.class);
+ conf.setVertexInputFormatClass(JsonBase64VertexInputFormat.class);
+ conf.setVertexOutputFormatClass(JsonBase64VertexOutputFormat.class);
+ job = prepareJob(getCallingMethodName(), conf, outputPath2);
job.getConfiguration().setInt(WeightedPageRankVertex.SUPERSTEP_COUNT, 3);
GiraphFileInputFormat.addVertexInputPath(
job.getInternalJob().getConfiguration(), outputPath);
assertTrue(job.run(true));
Path outputPath3 = getTempPath(getCallingMethodName() + "3");
- classes = new GiraphClasses();
- classes.setVertexClass(WeightedPageRankVertex.class);
- classes.setVertexInputFormatClass(PseudoRandomVertexInputFormat.class);
- classes.setVertexOutputFormatClass(JsonBase64VertexOutputFormat.class);
- job = prepareJob(getCallingMethodName(), classes, outputPath3);
- job.getConfiguration().setLong(
- PseudoRandomInputFormatConstants.AGGREGATE_VERTICES, 101);
- job.getConfiguration().setLong(
- PseudoRandomInputFormatConstants.EDGES_PER_VERTEX, 2);
- job.getConfiguration().setInt(WeightedPageRankVertex.SUPERSTEP_COUNT, 5);
+ conf = new GiraphConfiguration();
+ conf.setVertexClass(WeightedPageRankVertex.class);
+ conf.setVertexInputFormatClass(PseudoRandomVertexInputFormat.class);
+ conf.setVertexOutputFormatClass(JsonBase64VertexOutputFormat.class);
+ job = prepareJob(getCallingMethodName(), conf, outputPath3);
+ conf = job.getConfiguration();
+ conf.setLong(PseudoRandomInputFormatConstants.AGGREGATE_VERTICES, 101);
+ conf.setLong(PseudoRandomInputFormatConstants.EDGES_PER_VERTEX, 2);
+ conf.setInt(WeightedPageRankVertex.SUPERSTEP_COUNT, 5);
assertTrue(job.run(true));
- Configuration conf = job.getConfiguration();
-
assertEquals(101, getNumResults(conf, outputPath));
assertEquals(101, getNumResults(conf, outputPath2));
assertEquals(101, getNumResults(conf, outputPath3));
diff --git a/giraph-core/src/test/java/org/apache/giraph/master/TestMasterObserver.java b/giraph-core/src/test/java/org/apache/giraph/master/TestMasterObserver.java
index 8cd427e..1c34f78 100644
--- a/giraph-core/src/test/java/org/apache/giraph/master/TestMasterObserver.java
+++ b/giraph-core/src/test/java/org/apache/giraph/master/TestMasterObserver.java
@@ -18,7 +18,7 @@
package org.apache.giraph.master;
-import org.apache.giraph.conf.GiraphClasses;
+import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.edge.ByteArrayEdges;
import org.apache.giraph.graph.Vertex;
@@ -26,14 +26,11 @@
import org.apache.giraph.utils.InternalVertexRunner;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.util.StringUtils;
import org.junit.Test;
-import com.google.common.collect.Maps;
-
import java.io.IOException;
-import java.util.Map;
+import static org.apache.hadoop.util.StringUtils.arrayToString;
import static org.junit.Assert.assertEquals;
public class TestMasterObserver {
@@ -83,19 +80,18 @@
String[] graph = new String[] { "1", "2", "3" };
- Map<String, String> params = Maps.newHashMap();
String klasses[] = new String[] {
Obs.class.getName(),
Obs.class.getName()
};
- params.put(GiraphConstants.MASTER_OBSERVER_CLASSES.getKey(),
- StringUtils.arrayToString(klasses));
- GiraphClasses classes = new GiraphClasses();
- classes.setVertexClass(NoOpVertex.class);
- classes.setVertexEdgesClass(ByteArrayEdges.class);
- classes.setVertexInputFormatClass(IntNullNullTextInputFormat.class);
- InternalVertexRunner.run(classes, params, graph);
+ GiraphConfiguration conf = new GiraphConfiguration();
+ conf.set(GiraphConstants.MASTER_OBSERVER_CLASSES.getKey(),
+ arrayToString(klasses));
+ conf.setVertexClass(NoOpVertex.class);
+ conf.setVertexEdgesClass(ByteArrayEdges.class);
+ conf.setVertexInputFormatClass(IntNullNullTextInputFormat.class);
+ InternalVertexRunner.run(conf, graph);
assertEquals(2, Obs.preApp);
// 3 supersteps + 1 input superstep * 2 observers = 8 callbacks
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleShortestPathsVertex.java b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleShortestPathsVertex.java
index 8a98e75..13d1d7c 100644
--- a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleShortestPathsVertex.java
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleShortestPathsVertex.java
@@ -18,6 +18,7 @@
package org.apache.giraph.examples;
+import org.apache.giraph.conf.LongConfOption;
import org.apache.giraph.edge.Edge;
import org.apache.giraph.graph.Vertex;
import org.apache.hadoop.io.DoubleWritable;
@@ -36,9 +37,8 @@
Vertex<LongWritable, DoubleWritable,
FloatWritable, DoubleWritable> {
/** The shortest paths id */
- public static final String SOURCE_ID = "SimpleShortestPathsVertex.sourceId";
- /** Default shortest paths id */
- public static final long SOURCE_ID_DEFAULT = 1;
+ public static final LongConfOption SOURCE_ID =
+ new LongConfOption("SimpleShortestPathsVertex.sourceId", 1);
/** Class logger */
private static final Logger LOG =
Logger.getLogger(SimpleShortestPathsVertex.class);
@@ -49,7 +49,7 @@
* @return True if the source id
*/
private boolean isSource() {
- return getId().get() == getConf().getLong(SOURCE_ID, SOURCE_ID_DEFAULT);
+ return getId().get() == SOURCE_ID.get(getConf());
}
@Override
diff --git a/giraph-examples/src/test/java/org/apache/giraph/TestAutoCheckpoint.java b/giraph-examples/src/test/java/org/apache/giraph/TestAutoCheckpoint.java
index 386f67b..652913b 100644
--- a/giraph-examples/src/test/java/org/apache/giraph/TestAutoCheckpoint.java
+++ b/giraph-examples/src/test/java/org/apache/giraph/TestAutoCheckpoint.java
@@ -18,7 +18,6 @@
package org.apache.giraph;
-import org.apache.giraph.conf.GiraphClasses;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.examples.SimpleCheckpointVertex;
@@ -58,18 +57,15 @@
return;
}
Path outputPath = getTempPath(getCallingMethodName());
- GiraphClasses classes = new GiraphClasses();
- classes.setVertexClass(
+ GiraphConfiguration conf = new GiraphConfiguration();
+ conf.setVertexClass(
SimpleCheckpointVertex.SimpleCheckpointComputation.class);
- classes.setWorkerContextClass(
+ conf.setWorkerContextClass(
SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class);
- classes.setMasterComputeClass(
+ conf.setMasterComputeClass(
SimpleCheckpointVertex.SimpleCheckpointVertexMasterCompute.class);
- classes.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
- classes.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
- GiraphJob job = prepareJob(getCallingMethodName(), classes, outputPath);
-
- GiraphConfiguration conf = job.getConfiguration();
+ conf.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
+ conf.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
conf.setBoolean(SimpleCheckpointVertex.ENABLE_FAULT, true);
conf.setInt("mapred.map.max.attempts", 4);
// Trigger failure faster
@@ -82,7 +78,7 @@
GiraphConstants.CLEANUP_CHECKPOINTS_AFTER_SUCCESS.set(conf, false);
GiraphConstants.ZOOKEEPER_SESSION_TIMEOUT.set(conf, 10000);
GiraphConstants.ZOOKEEPER_MIN_SESSION_TIMEOUT.set(conf, 10000);
-
+ GiraphJob job = prepareJob(getCallingMethodName(), conf, outputPath);
assertTrue(job.run(true));
}
}
diff --git a/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java b/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java
index ab88b88..e034b2f 100644
--- a/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java
+++ b/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java
@@ -117,12 +117,11 @@
InvocationTargetException, SecurityException, NoSuchMethodException {
System.out.println("testInstantiateVertex: java.class.path=" +
System.getProperty("java.class.path"));
- GiraphClasses<LongWritable, IntWritable, FloatWritable, IntWritable>
- classes = new GiraphClasses();
- classes.setVertexClass(SimpleSuperstepVertex.class);
- classes.setVertexInputFormatClass(
+ GiraphConfiguration conf = new GiraphConfiguration();
+ conf.setVertexClass(SimpleSuperstepVertex.class);
+ conf.setVertexInputFormatClass(
SimpleSuperstepVertex.SimpleSuperstepVertexInputFormat.class);
- GiraphJob job = prepareJob(getCallingMethodName(), classes);
+ GiraphJob job = prepareJob(getCallingMethodName(), conf);
ImmutableClassesGiraphConfiguration configuration =
new ImmutableClassesGiraphConfiguration(job.getConfiguration());
Vertex<LongWritable, IntWritable, FloatWritable, IntWritable> vertex =
@@ -194,11 +193,11 @@
"non-local");
return;
}
- GiraphClasses classes = new GiraphClasses();
- classes.setVertexClass(SimpleSuperstepVertex.class);
- classes.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
- GiraphJob job = prepareJob(getCallingMethodName(), classes);
- GiraphConfiguration conf = job.getConfiguration();
+ GiraphConfiguration conf = new GiraphConfiguration();
+ conf.setVertexClass(SimpleSuperstepVertex.class);
+ conf.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
+ GiraphJob job = prepareJob(getCallingMethodName(), conf);
+ conf = job.getConfiguration();
conf.setWorkerConfiguration(5, 5, 100.0f);
GiraphConstants.SPLIT_MASTER_WORKER.set(conf, true);
@@ -214,7 +213,8 @@
fail();
} catch (IllegalArgumentException e) {
}
- job.getConfiguration().setWorkerConfiguration(1, 1, 100.0f);
+
+ conf.setWorkerConfiguration(1, 1, 100.0f);
job.run(true);
}
@@ -235,11 +235,10 @@
return;
}
- GiraphClasses<LongWritable, DoubleWritable, FloatWritable, DoubleWritable>
- classes = new GiraphClasses();
- classes.setVertexClass(SimpleFailVertex.class);
- classes.setVertexInputFormatClass(SimplePageRankVertexInputFormat.class);
- GiraphJob job = prepareJob(getCallingMethodName(), classes,
+ GiraphConfiguration conf = new GiraphConfiguration();
+ conf.setVertexClass(SimpleFailVertex.class);
+ conf.setVertexInputFormatClass(SimplePageRankVertexInputFormat.class);
+ GiraphJob job = prepareJob(getCallingMethodName(), conf,
getTempPath(getCallingMethodName()));
job.getConfiguration().setInt("mapred.map.max.attempts", 1);
assertTrue(!job.run(true));
@@ -257,18 +256,17 @@
throws IOException, InterruptedException, ClassNotFoundException {
String callingMethod = getCallingMethodName();
Path outputPath = getTempPath(callingMethod);
- GiraphClasses<LongWritable, IntWritable, FloatWritable, IntWritable>
- classes = new GiraphClasses();
- classes.setVertexClass(SimpleSuperstepVertex.class);
- classes.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
- classes.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
- GiraphJob job = prepareJob(callingMethod, classes, outputPath);
- Configuration conf = job.getConfiguration();
+ GiraphConfiguration conf = new GiraphConfiguration();
+ conf.setVertexClass(SimpleSuperstepVertex.class);
+ conf.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
+ conf.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
+ GiraphJob job = prepareJob(callingMethod, conf, outputPath);
+ Configuration configuration = job.getConfiguration();
// GeneratedInputSplit will generate 10 vertices
- conf.setLong(GeneratedVertexReader.READER_VERTICES, 10);
+ configuration.setLong(GeneratedVertexReader.READER_VERTICES, 10);
assertTrue(job.run(true));
if (!runningInDistributedMode()) {
- FileStatus fileStatus = getSinglePartFileStatus(conf, outputPath);
+ FileStatus fileStatus = getSinglePartFileStatus(configuration, outputPath);
assertEquals(49l, fileStatus.getLen());
}
}
@@ -283,11 +281,10 @@
@Test
public void testBspMsg()
throws IOException, InterruptedException, ClassNotFoundException {
- GiraphClasses<LongWritable, IntWritable, FloatWritable, IntWritable>
- classes = new GiraphClasses();
- classes.setVertexClass(SimpleMsgVertex.class);
- classes.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
- GiraphJob job = prepareJob(getCallingMethodName(), classes);
+ GiraphConfiguration conf = new GiraphConfiguration();
+ conf.setVertexClass(SimpleMsgVertex.class);
+ conf.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
+ GiraphJob job = prepareJob(getCallingMethodName(), conf);
assertTrue(job.run(true));
}
@@ -303,11 +300,10 @@
@Test
public void testEmptyVertexInputFormat()
throws IOException, InterruptedException, ClassNotFoundException {
- GiraphClasses<LongWritable, IntWritable, FloatWritable, IntWritable>
- classes = new GiraphClasses();
- classes.setVertexClass(SimpleMsgVertex.class);
- classes.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
- GiraphJob job = prepareJob(getCallingMethodName(), classes);
+ GiraphConfiguration conf = new GiraphConfiguration();
+ conf.setVertexClass(SimpleMsgVertex.class);
+ conf.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
+ GiraphJob job = prepareJob(getCallingMethodName(), conf);
job.getConfiguration().setLong(GeneratedVertexReader.READER_VERTICES, 0);
assertTrue(job.run(true));
}
@@ -322,13 +318,11 @@
@Test
public void testBspCombiner()
throws IOException, InterruptedException, ClassNotFoundException {
- GiraphClasses<LongWritable, IntWritable, FloatWritable, IntWritable>
- classes = new GiraphClasses();
- classes.setVertexClass(SimpleCombinerVertex.class);
- classes.setVertexInputFormatClass(
- SimpleSuperstepVertexInputFormat.class);
- classes.setCombinerClass(SimpleSumCombiner.class);
- GiraphJob job = prepareJob(getCallingMethodName(), classes);
+ GiraphConfiguration conf = new GiraphConfiguration();
+ conf.setVertexClass(SimpleCombinerVertex.class);
+ conf.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
+ conf.setCombinerClass(SimpleSumCombiner.class);
+ GiraphJob job = prepareJob(getCallingMethodName(), conf);
assertTrue(job.run(true));
}
@@ -385,15 +379,13 @@
public void testBspShortestPaths()
throws IOException, InterruptedException, ClassNotFoundException {
Path outputPath = getTempPath(getCallingMethodName());
- GiraphClasses<LongWritable, DoubleWritable, FloatWritable, DoubleWritable>
- classes = new GiraphClasses();
- classes.setVertexClass(SimpleShortestPathsVertex.class);
- classes.setVertexInputFormatClass(SimplePageRankVertexInputFormat.class);
- classes.setVertexOutputFormatClass(
+ GiraphConfiguration conf = new GiraphConfiguration();
+ conf.setVertexClass(SimpleShortestPathsVertex.class);
+ conf.setVertexInputFormatClass(SimplePageRankVertexInputFormat.class);
+ conf.setVertexOutputFormatClass(
JsonLongDoubleFloatDoubleVertexOutputFormat.class);
- GiraphJob job = prepareJob(getCallingMethodName(), classes, outputPath);
- Configuration conf = job.getConfiguration();
- conf.setLong(SimpleShortestPathsVertex.SOURCE_ID, 0);
+ SimpleShortestPathsVertex.SOURCE_ID.set(conf, 0);
+ GiraphJob job = prepareJob(getCallingMethodName(), conf, outputPath);
assertTrue(job.run(true));
@@ -415,18 +407,17 @@
throws IOException, InterruptedException, ClassNotFoundException {
Path outputPath = getTempPath(getCallingMethodName());
- GiraphClasses<LongWritable, DoubleWritable, FloatWritable, DoubleWritable>
- classes = new GiraphClasses();
- classes.setVertexClass(SimplePageRankVertex.class);
- classes.setAggregatorWriterClass(TextAggregatorWriter.class);
- classes.setMasterComputeClass(
+ GiraphConfiguration conf = new GiraphConfiguration();
+ conf.setVertexClass(SimplePageRankVertex.class);
+ conf.setAggregatorWriterClass(TextAggregatorWriter.class);
+ conf.setMasterComputeClass(
SimplePageRankVertex.SimplePageRankVertexMasterCompute.class);
- classes.setVertexInputFormatClass(SimplePageRankVertexInputFormat.class);
- classes.setVertexOutputFormatClass(
+ conf.setVertexInputFormatClass(SimplePageRankVertexInputFormat.class);
+ conf.setVertexOutputFormatClass(
SimplePageRankVertex.SimplePageRankVertexOutputFormat.class);
- classes.setWorkerContextClass(
+ conf.setWorkerContextClass(
SimplePageRankVertex.SimplePageRankVertexWorkerContext.class);
- GiraphJob job = prepareJob(getCallingMethodName(), classes, outputPath);
+ GiraphJob job = prepareJob(getCallingMethodName(), conf, outputPath);
GiraphConfiguration configuration = job.getConfiguration();
Path aggregatorValues = getTempPath("aggregatorValues");
configuration.setInt(TextAggregatorWriter.FREQUENCY,
@@ -507,15 +498,14 @@
@Test
public void testBspMasterCompute()
throws IOException, InterruptedException, ClassNotFoundException {
- GiraphClasses<LongWritable, DoubleWritable, FloatWritable, DoubleWritable>
- classes = new GiraphClasses();
- classes.setVertexClass(SimpleMasterComputeVertex.class);
- classes.setVertexInputFormatClass(SimplePageRankVertexInputFormat.class);
- classes.setMasterComputeClass(
+ GiraphConfiguration conf = new GiraphConfiguration();
+ conf.setVertexClass(SimpleMasterComputeVertex.class);
+ conf.setVertexInputFormatClass(SimplePageRankVertexInputFormat.class);
+ conf.setMasterComputeClass(
SimpleMasterComputeVertex.SimpleMasterCompute.class);
- classes.setWorkerContextClass(
+ conf.setWorkerContextClass(
SimpleMasterComputeVertex.SimpleMasterComputeWorkerContext.class);
- GiraphJob job = prepareJob(getCallingMethodName(), classes);
+ GiraphJob job = prepareJob(getCallingMethodName(), conf);
assertTrue(job.run(true));
if (!runningInDistributedMode()) {
double finalSum =
diff --git a/giraph-examples/src/test/java/org/apache/giraph/TestGraphPartitioner.java b/giraph-examples/src/test/java/org/apache/giraph/TestGraphPartitioner.java
index f7fa3f2..12f0d8d 100644
--- a/giraph-examples/src/test/java/org/apache/giraph/TestGraphPartitioner.java
+++ b/giraph-examples/src/test/java/org/apache/giraph/TestGraphPartitioner.java
@@ -18,7 +18,7 @@
package org.apache.giraph;
-import org.apache.giraph.conf.GiraphClasses;
+import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.examples.GeneratedVertexReader;
import org.apache.giraph.examples.SimpleCheckpointVertex;
@@ -75,16 +75,16 @@
public void testPartitioners()
throws IOException, InterruptedException, ClassNotFoundException {
Path outputPath = getTempPath("testVertexBalancer");
- GiraphClasses classes = new GiraphClasses();
- classes.setVertexClass(
+ GiraphConfiguration conf = new GiraphConfiguration();
+ conf.setVertexClass(
SimpleCheckpointVertex.SimpleCheckpointComputation.class);
- classes.setWorkerContextClass(
+ conf.setWorkerContextClass(
SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class);
- classes.setMasterComputeClass(
+ conf.setMasterComputeClass(
SimpleCheckpointVertex.SimpleCheckpointVertexMasterCompute.class);
- classes.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
- classes.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
- GiraphJob job = prepareJob("testVertexBalancer", classes, outputPath);
+ conf.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
+ conf.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
+ GiraphJob job = prepareJob("testVertexBalancer", conf, outputPath);
job.getConfiguration().set(
PartitionBalancer.PARTITION_BALANCE_ALGORITHM,
@@ -93,31 +93,31 @@
assertTrue(job.run(true));
FileSystem hdfs = FileSystem.get(job.getConfiguration());
- classes = new GiraphClasses();
- classes.setVertexClass(
+ conf = new GiraphConfiguration();
+ conf.setVertexClass(
SimpleCheckpointVertex.SimpleCheckpointComputation.class);
- classes.setWorkerContextClass(
+ conf.setWorkerContextClass(
SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class);
- classes.setMasterComputeClass(
+ conf.setMasterComputeClass(
SimpleCheckpointVertex.SimpleCheckpointVertexMasterCompute.class);
- classes.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
- classes.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
+ conf.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
+ conf.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
outputPath = getTempPath("testHashPartitioner");
- job = prepareJob("testHashPartitioner", classes, outputPath);
+ job = prepareJob("testHashPartitioner", conf, outputPath);
assertTrue(job.run(true));
verifyOutput(hdfs, outputPath);
outputPath = getTempPath("testSuperstepHashPartitioner");
- classes = new GiraphClasses();
- classes.setVertexClass(
+ conf = new GiraphConfiguration();
+ conf.setVertexClass(
SimpleCheckpointVertex.SimpleCheckpointComputation.class);
- classes.setWorkerContextClass(
+ conf.setWorkerContextClass(
SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class);
- classes.setMasterComputeClass(
+ conf.setMasterComputeClass(
SimpleCheckpointVertex.SimpleCheckpointVertexMasterCompute.class);
- classes.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
- classes.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
- job = prepareJob("testSuperstepHashPartitioner", classes, outputPath);
+ conf.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
+ conf.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
+ job = prepareJob("testSuperstepHashPartitioner", conf, outputPath);
job.getConfiguration().setGraphPartitionerFactoryClass(
SuperstepHashPartitionerFactory.class);
@@ -145,16 +145,16 @@
verifyOutput(hdfs, outputPath);
outputPath = getTempPath("testReverseIdSuperstepHashPartitioner");
- classes = new GiraphClasses();
- classes.setVertexClass(
+ conf = new GiraphConfiguration();
+ conf.setVertexClass(
SimpleCheckpointVertex.SimpleCheckpointComputation.class);
- classes.setWorkerContextClass(
+ conf.setWorkerContextClass(
SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class);
- classes.setMasterComputeClass(
+ conf.setMasterComputeClass(
SimpleCheckpointVertex.SimpleCheckpointVertexMasterCompute.class);
- classes.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
- classes.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
- job = prepareJob("testReverseIdSuperstepHashPartitioner", classes,
+ conf.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
+ conf.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
+ job = prepareJob("testReverseIdSuperstepHashPartitioner", conf,
outputPath);
job.getConfiguration().setGraphPartitionerFactoryClass(
SuperstepHashPartitionerFactory.class);
diff --git a/giraph-examples/src/test/java/org/apache/giraph/TestManualCheckpoint.java b/giraph-examples/src/test/java/org/apache/giraph/TestManualCheckpoint.java
index 210b78a..766e1af 100644
--- a/giraph-examples/src/test/java/org/apache/giraph/TestManualCheckpoint.java
+++ b/giraph-examples/src/test/java/org/apache/giraph/TestManualCheckpoint.java
@@ -18,7 +18,6 @@
package org.apache.giraph;
-import org.apache.giraph.conf.GiraphClasses;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.examples.SimpleCheckpointVertex;
@@ -54,21 +53,21 @@
throws IOException, InterruptedException, ClassNotFoundException {
Path checkpointsDir = getTempPath("checkPointsForTesting");
Path outputPath = getTempPath(getCallingMethodName());
- GiraphClasses classes = new GiraphClasses();
- classes.setVertexClass(
+ GiraphConfiguration conf = new GiraphConfiguration();
+ conf.setVertexClass(
SimpleCheckpointVertex.SimpleCheckpointComputation.class);
- classes.setWorkerContextClass(
+ conf.setWorkerContextClass(
SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class);
- classes.setMasterComputeClass(
+ conf.setMasterComputeClass(
SimpleCheckpointVertex.SimpleCheckpointVertexMasterCompute.class);
- classes.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
- classes.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
- GiraphJob job = prepareJob(getCallingMethodName(), classes, outputPath);
+ conf.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
+ conf.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
+ GiraphJob job = prepareJob(getCallingMethodName(), conf, outputPath);
- GiraphConfiguration conf = job.getConfiguration();
- GiraphConstants.CHECKPOINT_DIRECTORY.set(conf, checkpointsDir.toString());
- GiraphConstants.CLEANUP_CHECKPOINTS_AFTER_SUCCESS.set(conf, false);
- conf.setCheckpointFrequency(2);
+ GiraphConfiguration configuration = job.getConfiguration();
+ GiraphConstants.CHECKPOINT_DIRECTORY.set(configuration, checkpointsDir.toString());
+ GiraphConstants.CLEANUP_CHECKPOINTS_AFTER_SUCCESS.set(configuration, false);
+ configuration.setCheckpointFrequency(2);
assertTrue(job.run(true));
@@ -86,19 +85,19 @@
System.out.println("testBspCheckpoint: Restarting from superstep 2" +
" with checkpoint path = " + checkpointsDir);
outputPath = getTempPath(getCallingMethodName() + "Restarted");
- classes = new GiraphClasses();
- classes.setVertexClass(
+ conf = new GiraphConfiguration();
+ conf.setVertexClass(
SimpleCheckpointVertex.SimpleCheckpointComputation.class);
- classes.setWorkerContextClass(
+ conf.setWorkerContextClass(
SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class);
- classes.setMasterComputeClass(
- SimpleCheckpointVertex.SimpleCheckpointVertexMasterCompute.class);
- classes.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
- classes.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
- GiraphJob restartedJob = prepareJob(getCallingMethodName() + "Restarted",
- classes, outputPath);
conf.setMasterComputeClass(
SimpleCheckpointVertex.SimpleCheckpointVertexMasterCompute.class);
+ conf.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
+ conf.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
+ GiraphJob restartedJob = prepareJob(getCallingMethodName() + "Restarted",
+ conf, outputPath);
+ configuration.setMasterComputeClass(
+ SimpleCheckpointVertex.SimpleCheckpointVertexMasterCompute.class);
GiraphConstants.CHECKPOINT_DIRECTORY.set(restartedJob.getConfiguration(),
checkpointsDir.toString());
diff --git a/giraph-examples/src/test/java/org/apache/giraph/TestMaxSuperstep.java b/giraph-examples/src/test/java/org/apache/giraph/TestMaxSuperstep.java
index d7ac4e8..0989ac5 100644
--- a/giraph-examples/src/test/java/org/apache/giraph/TestMaxSuperstep.java
+++ b/giraph-examples/src/test/java/org/apache/giraph/TestMaxSuperstep.java
@@ -18,8 +18,7 @@
package org.apache.giraph;
-import java.io.IOException;
-import org.apache.giraph.conf.GiraphClasses;
+import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.counters.GiraphHadoopCounter;
import org.apache.giraph.counters.GiraphStats;
import org.apache.giraph.examples.SimplePageRankVertex.SimplePageRankVertexInputFormat;
@@ -31,6 +30,8 @@
import org.apache.hadoop.io.LongWritable;
import org.junit.Test;
+import java.io.IOException;
+
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -65,12 +66,11 @@
@Test
public void testMaxSuperstep()
throws IOException, InterruptedException, ClassNotFoundException {
- GiraphClasses<LongWritable, DoubleWritable, FloatWritable, DoubleWritable>
- classes = new GiraphClasses();
- classes.setVertexClass(InfiniteLoopVertex.class);
- classes.setVertexInputFormatClass(SimplePageRankVertexInputFormat.class);
- classes.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
- GiraphJob job = prepareJob(getCallingMethodName(), classes,
+ GiraphConfiguration conf = new GiraphConfiguration();
+ conf.setVertexClass(InfiniteLoopVertex.class);
+ conf.setVertexInputFormatClass(SimplePageRankVertexInputFormat.class);
+ conf.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
+ GiraphJob job = prepareJob(getCallingMethodName(), conf,
getTempPath(getCallingMethodName()));
job.getConfiguration().setMaxNumberOfSupersteps(3);
assertTrue(job.run(true));
diff --git a/giraph-examples/src/test/java/org/apache/giraph/TestMutateGraph.java b/giraph-examples/src/test/java/org/apache/giraph/TestMutateGraph.java
index 0427b85..da85fc4 100644
--- a/giraph-examples/src/test/java/org/apache/giraph/TestMutateGraph.java
+++ b/giraph-examples/src/test/java/org/apache/giraph/TestMutateGraph.java
@@ -18,14 +18,11 @@
package org.apache.giraph;
-import org.apache.giraph.conf.GiraphClasses;
+import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.examples.SimpleMutateGraphVertex;
import org.apache.giraph.examples.SimplePageRankVertex.SimplePageRankVertexInputFormat;
import org.apache.giraph.examples.SimplePageRankVertex.SimplePageRankVertexOutputFormat;
import org.apache.giraph.job.GiraphJob;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.LongWritable;
import org.junit.Test;
import java.io.IOException;
@@ -50,14 +47,13 @@
@Test
public void testMutateGraph()
throws IOException, InterruptedException, ClassNotFoundException {
- GiraphClasses<LongWritable, DoubleWritable, FloatWritable, DoubleWritable>
- classes = new GiraphClasses();
- classes.setVertexClass(SimpleMutateGraphVertex.class);
- classes.setVertexInputFormatClass(SimplePageRankVertexInputFormat.class);
- classes.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
- classes.setWorkerContextClass(
+ GiraphConfiguration conf = new GiraphConfiguration();
+ conf.setVertexClass(SimpleMutateGraphVertex.class);
+ conf.setVertexInputFormatClass(SimplePageRankVertexInputFormat.class);
+ conf.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
+ conf.setWorkerContextClass(
SimpleMutateGraphVertex.SimpleMutateGraphVertexWorkerContext.class);
- GiraphJob job = prepareJob(getCallingMethodName(), classes,
+ GiraphJob job = prepareJob(getCallingMethodName(), conf,
getTempPath(getCallingMethodName()));
assertTrue(job.run(true));
}
diff --git a/giraph-examples/src/test/java/org/apache/giraph/TestNotEnoughMapTasks.java b/giraph-examples/src/test/java/org/apache/giraph/TestNotEnoughMapTasks.java
index 759624b..6da9c50 100644
--- a/giraph-examples/src/test/java/org/apache/giraph/TestNotEnoughMapTasks.java
+++ b/giraph-examples/src/test/java/org/apache/giraph/TestNotEnoughMapTasks.java
@@ -18,7 +18,7 @@
package org.apache.giraph;
-import org.apache.giraph.conf.GiraphClasses;
+import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.examples.SimpleCheckpointVertex;
import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexInputFormat;
import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexOutputFormat;
@@ -55,12 +55,12 @@
return;
}
Path outputPath = getTempPath(getCallingMethodName());
- GiraphClasses classes = new GiraphClasses();
- classes.setVertexClass(
+ GiraphConfiguration conf = new GiraphConfiguration();
+ conf.setVertexClass(
SimpleCheckpointVertex.SimpleCheckpointComputation.class);
- classes.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
- classes.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
- GiraphJob job = prepareJob(getCallingMethodName(), classes, outputPath);
+ conf.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
+ conf.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
+ GiraphJob job = prepareJob(getCallingMethodName(), conf, outputPath);
// An unlikely impossible number of workers to achieve
final int unlikelyWorkers = Short.MAX_VALUE;
diff --git a/giraph-examples/src/test/java/org/apache/giraph/TestPartitionContext.java b/giraph-examples/src/test/java/org/apache/giraph/TestPartitionContext.java
index f236128..4b042df 100644
--- a/giraph-examples/src/test/java/org/apache/giraph/TestPartitionContext.java
+++ b/giraph-examples/src/test/java/org/apache/giraph/TestPartitionContext.java
@@ -18,15 +18,12 @@
package org.apache.giraph;
-import org.apache.giraph.conf.GiraphClasses;
+import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.examples.GeneratedVertexReader;
import org.apache.giraph.examples.PartitionContextTestVertex;
import org.apache.giraph.examples.SimplePageRankVertex;
import org.apache.giraph.job.GiraphJob;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.LongWritable;
import org.junit.Test;
import java.io.IOException;
@@ -46,16 +43,15 @@
"testComputeContext: Ignore this test in distributed mode.");
return;
}
- GiraphClasses<LongWritable, DoubleWritable, FloatWritable, DoubleWritable>
- classes = new GiraphClasses();
- classes.setVertexClass(PartitionContextTestVertex.class);
- classes.setVertexInputFormatClass(
+ GiraphConfiguration conf = new GiraphConfiguration();
+ conf.setVertexClass(PartitionContextTestVertex.class);
+ conf.setVertexInputFormatClass(
SimplePageRankVertex.SimplePageRankVertexInputFormat.class);
- classes.setWorkerContextClass(
+ conf.setWorkerContextClass(
PartitionContextTestVertex.TestPartitionContextWorkerContext.class);
- classes.setPartitionContextClass(
+ conf.setPartitionContextClass(
PartitionContextTestVertex.TestPartitionContextPartitionContext.class);
- GiraphJob job = prepareJob(getCallingMethodName(), classes);
+ GiraphJob job = prepareJob(getCallingMethodName(), conf);
// Use multithreading
job.getConfiguration().setNumComputeThreads(
PartitionContextTestVertex.NUM_COMPUTE_THREADS);
diff --git a/giraph-examples/src/test/java/org/apache/giraph/aggregators/TestAggregatorsHandling.java b/giraph-examples/src/test/java/org/apache/giraph/aggregators/TestAggregatorsHandling.java
index 372c32f..53cdeab 100644
--- a/giraph-examples/src/test/java/org/apache/giraph/aggregators/TestAggregatorsHandling.java
+++ b/giraph-examples/src/test/java/org/apache/giraph/aggregators/TestAggregatorsHandling.java
@@ -20,7 +20,6 @@
import org.apache.giraph.BspCase;
import org.apache.giraph.comm.aggregators.AggregatorUtils;
-import org.apache.giraph.conf.GiraphClasses;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
@@ -31,7 +30,6 @@
import org.apache.giraph.master.MasterAggregatorHandler;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.Progressable;
@@ -76,12 +74,11 @@
@Test
public void testAggregatorsHandling() throws IOException,
ClassNotFoundException, InterruptedException {
- GiraphClasses<LongWritable, DoubleWritable, FloatWritable, DoubleWritable>
- classes = new GiraphClasses();
- classes.setVertexClass(AggregatorsTestVertex.class);
- classes.setVertexInputFormatClass(
+ GiraphConfiguration conf = new GiraphConfiguration();
+ conf.setVertexClass(AggregatorsTestVertex.class);
+ conf.setVertexInputFormatClass(
SimplePageRankVertex.SimplePageRankVertexInputFormat.class);
- GiraphJob job = prepareJob(getCallingMethodName(), classes);
+ GiraphJob job = prepareJob(getCallingMethodName(), conf);
job.getConfiguration().setMasterComputeClass(
AggregatorsTestVertex.AggregatorsTestMasterCompute.class);
// test with aggregators split in a few requests
@@ -157,19 +154,18 @@
IOException, InterruptedException {
Path checkpointsDir = getTempPath("checkPointsForTesting");
Path outputPath = getTempPath(getCallingMethodName());
- GiraphClasses<LongWritable, DoubleWritable, FloatWritable, DoubleWritable>
- classes = new GiraphClasses();
- classes.setVertexClass(AggregatorsTestVertex.class);
- classes.setMasterComputeClass(
+ GiraphConfiguration conf = new GiraphConfiguration();
+ conf.setVertexClass(AggregatorsTestVertex.class);
+ conf.setMasterComputeClass(
AggregatorsTestVertex.AggregatorsTestMasterCompute.class);
- classes.setVertexInputFormatClass(
+ conf.setVertexInputFormatClass(
SimplePageRankVertex.SimplePageRankVertexInputFormat.class);
- GiraphJob job = prepareJob(getCallingMethodName(), classes, outputPath);
+ GiraphJob job = prepareJob(getCallingMethodName(), conf, outputPath);
- GiraphConfiguration conf = job.getConfiguration();
- GiraphConstants.CHECKPOINT_DIRECTORY.set(conf, checkpointsDir.toString());
- GiraphConstants.CLEANUP_CHECKPOINTS_AFTER_SUCCESS.set(conf, false);
- conf.setCheckpointFrequency(4);
+ GiraphConfiguration configuration = job.getConfiguration();
+ GiraphConstants.CHECKPOINT_DIRECTORY.set(configuration, checkpointsDir.toString());
+ GiraphConstants.CLEANUP_CHECKPOINTS_AFTER_SUCCESS.set(configuration, false);
+ configuration.setCheckpointFrequency(4);
assertTrue(job.run(true));
@@ -177,14 +173,14 @@
System.out.println("testAggregatorsCheckpointing: Restarting from " +
"superstep 4 with checkpoint path = " + checkpointsDir);
outputPath = getTempPath(getCallingMethodName() + "Restarted");
- classes = new GiraphClasses();
- classes.setVertexClass(AggregatorsTestVertex.class);
- classes.setMasterComputeClass(
+ conf = new GiraphConfiguration();
+ conf.setVertexClass(AggregatorsTestVertex.class);
+ conf.setMasterComputeClass(
AggregatorsTestVertex.AggregatorsTestMasterCompute.class);
- classes.setVertexInputFormatClass(
+ conf.setVertexInputFormatClass(
SimplePageRankVertex.SimplePageRankVertexInputFormat.class);
GiraphJob restartedJob = prepareJob(getCallingMethodName() + "Restarted",
- classes, outputPath);
+ conf, outputPath);
job.getConfiguration().setMasterComputeClass(
SimpleCheckpointVertex.SimpleCheckpointVertexMasterCompute.class);
GiraphConfiguration restartedJobConf = restartedJob.getConfiguration();
diff --git a/giraph-examples/src/test/java/org/apache/giraph/examples/ConnectedComponentsVertexTest.java b/giraph-examples/src/test/java/org/apache/giraph/examples/ConnectedComponentsVertexTest.java
index 55ca60c..5d3fae1 100644
--- a/giraph-examples/src/test/java/org/apache/giraph/examples/ConnectedComponentsVertexTest.java
+++ b/giraph-examples/src/test/java/org/apache/giraph/examples/ConnectedComponentsVertexTest.java
@@ -18,20 +18,19 @@
package org.apache.giraph.examples;
-import com.google.common.base.Splitter;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.SetMultimap;
import org.apache.giraph.combiner.MinimumIntCombiner;
-import org.apache.giraph.conf.GiraphClasses;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.edge.ByteArrayEdges;
import org.apache.giraph.io.formats.IdWithValueTextOutputFormat;
import org.apache.giraph.io.formats.IntIntNullTextInputFormat;
import org.apache.giraph.utils.InternalVertexRunner;
-import org.apache.giraph.edge.ByteArrayEdges;
import org.junit.Test;
-import java.util.Map;
+import com.google.common.base.Splitter;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.SetMultimap;
+
import java.util.Set;
import static org.junit.Assert.assertEquals;
@@ -66,17 +65,15 @@
"9" };
- GiraphClasses classes = new GiraphClasses();
- classes.setVertexClass(ConnectedComponentsVertex.class);
- classes.setVertexEdgesClass(ByteArrayEdges.class);
- classes.setCombinerClass(MinimumIntCombiner.class);
- classes.setVertexInputFormatClass(IntIntNullTextInputFormat.class);
- classes.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
- Map<String, String> emptyParams = ImmutableMap.of();
+ GiraphConfiguration conf = new GiraphConfiguration();
+ conf.setVertexClass(ConnectedComponentsVertex.class);
+ conf.setVertexEdgesClass(ByteArrayEdges.class);
+ conf.setCombinerClass(MinimumIntCombiner.class);
+ conf.setVertexInputFormatClass(IntIntNullTextInputFormat.class);
+ conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
// run internally
- Iterable<String> results = InternalVertexRunner.run(classes,
- emptyParams, graph);
+ Iterable<String> results = InternalVertexRunner.run(conf, graph);
SetMultimap<Integer,Integer> components = parseResults(results);
diff --git a/giraph-examples/src/test/java/org/apache/giraph/examples/ConnectedComponentsVertexTestInMemory.java b/giraph-examples/src/test/java/org/apache/giraph/examples/ConnectedComponentsVertexTestInMemory.java
index e42f172..f748cb0 100644
--- a/giraph-examples/src/test/java/org/apache/giraph/examples/ConnectedComponentsVertexTestInMemory.java
+++ b/giraph-examples/src/test/java/org/apache/giraph/examples/ConnectedComponentsVertexTestInMemory.java
@@ -18,22 +18,21 @@
package org.apache.giraph.examples;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.SetMultimap;
import org.apache.giraph.combiner.MinimumIntCombiner;
-import org.apache.giraph.conf.GiraphClasses;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.edge.ByteArrayEdges;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.utils.InternalVertexRunner;
import org.apache.giraph.utils.TestGraph;
-import org.apache.giraph.edge.ByteArrayEdges;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.junit.Test;
-import java.util.Map;
-import java.util.Map.Entry;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.SetMultimap;
+
import java.util.AbstractMap.SimpleEntry;
+import java.util.Map.Entry;
import java.util.Set;
import static org.junit.Assert.assertEquals;
@@ -57,19 +56,13 @@
*/
@Test
public void testToyData() throws Exception {
- GiraphClasses<IntWritable,
- IntWritable,
- NullWritable,
- IntWritable > classes = new GiraphClasses();
- classes.setVertexClass(ConnectedComponentsVertex.class);
- classes.setVertexEdgesClass(ByteArrayEdges.class);
- classes.setCombinerClass(MinimumIntCombiner.class);
+ GiraphConfiguration conf = new GiraphConfiguration();
+ conf.setVertexClass(ConnectedComponentsVertex.class);
+ conf.setVertexEdgesClass(ByteArrayEdges.class);
+ conf.setCombinerClass(MinimumIntCombiner.class);
TestGraph<IntWritable, IntWritable, NullWritable, IntWritable> graph =
- new TestGraph<IntWritable,
- IntWritable,
- NullWritable,
- IntWritable> (classes);
+ new TestGraph<IntWritable, IntWritable, NullWritable, IntWritable> (conf);
// a small graph with three components
graph.addVertex(new IntWritable(1), new IntWritable(1), makeEdges(2, 3))
.addVertex(new IntWritable(2), new IntWritable(2), makeEdges(1, 4, 5))
@@ -87,11 +80,9 @@
.addVertex(new IntWritable(11), new IntWritable(11), makeEdges(7, 10))
.addVertex(new IntWritable(9), new IntWritable(9));
- Map<String, String> emptyParams = ImmutableMap.of();
-
// run internally
TestGraph<IntWritable, IntWritable, NullWritable, IntWritable> results =
- InternalVertexRunner.run(classes, emptyParams, graph);
+ InternalVertexRunner.run(conf, graph);
SetMultimap<Integer,Integer> components = parseResults(results);
diff --git a/giraph-examples/src/test/java/org/apache/giraph/examples/PageRankVertexTest.java b/giraph-examples/src/test/java/org/apache/giraph/examples/PageRankVertexTest.java
index 042dbe4..2e4bbc4 100644
--- a/giraph-examples/src/test/java/org/apache/giraph/examples/PageRankVertexTest.java
+++ b/giraph-examples/src/test/java/org/apache/giraph/examples/PageRankVertexTest.java
@@ -18,13 +18,9 @@
package org.apache.giraph.examples;
-import com.google.common.collect.Maps;
-import org.apache.giraph.conf.GiraphClasses;
+import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.edge.ByteArrayEdges;
import org.apache.giraph.utils.InternalVertexRunner;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
import org.junit.Test;
import java.util.Map;
@@ -52,25 +48,19 @@
"5 2 4"
};
- Map<String, String> params = Maps.newHashMap();
- params.put(RandomWalkWithRestartVertex.MAX_SUPERSTEPS, String.valueOf(50));
- params.put(RandomWalkWithRestartVertex.TELEPORTATION_PROBABILITY,
- String.valueOf(0.15));
-
- GiraphClasses<LongWritable, DoubleWritable, NullWritable, DoubleWritable>
- classes = new GiraphClasses<LongWritable, DoubleWritable,
- NullWritable, DoubleWritable>();
- classes.setVertexClass(PageRankVertex.class);
- classes.setVertexEdgesClass(ByteArrayEdges.class);
- classes.setVertexInputFormatClass(
- LongDoubleNullTextInputFormat.class);
- classes.setVertexOutputFormatClass(
+ GiraphConfiguration conf = new GiraphConfiguration();
+ conf.setInt(RandomWalkWithRestartVertex.MAX_SUPERSTEPS, 50);
+ conf.setFloat(RandomWalkWithRestartVertex.TELEPORTATION_PROBABILITY, 0.15f);
+ conf.setVertexClass(PageRankVertex.class);
+ conf.setVertexEdgesClass(ByteArrayEdges.class);
+ conf.setVertexInputFormatClass(LongDoubleNullTextInputFormat.class);
+ conf.setVertexOutputFormatClass(
VertexWithDoubleValueNullEdgeTextOutputFormat.class);
- classes.setWorkerContextClass(RandomWalkWorkerContext.class);
- classes.setMasterComputeClass(
+ conf.setWorkerContextClass(RandomWalkWorkerContext.class);
+ conf.setMasterComputeClass(
RandomWalkVertex.RandomWalkVertexMasterCompute.class);
// Run internally
- Iterable<String> results = InternalVertexRunner.run(classes, params, graph);
+ Iterable<String> results = InternalVertexRunner.run(conf, graph);
Map<Long, Double> steadyStateProbabilities =
RandomWalkTestUtils.parseSteadyStateProbabilities(results);
diff --git a/giraph-examples/src/test/java/org/apache/giraph/examples/RandomWalkWithRestartVertexTest.java b/giraph-examples/src/test/java/org/apache/giraph/examples/RandomWalkWithRestartVertexTest.java
index f77ea3e..a3dbd45 100644
--- a/giraph-examples/src/test/java/org/apache/giraph/examples/RandomWalkWithRestartVertexTest.java
+++ b/giraph-examples/src/test/java/org/apache/giraph/examples/RandomWalkWithRestartVertexTest.java
@@ -18,16 +18,12 @@
package org.apache.giraph.examples;
-import org.apache.giraph.conf.GiraphClasses;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.edge.ByteArrayEdges;
import org.apache.giraph.examples.RandomWalkVertex.RandomWalkVertexMasterCompute;
import org.apache.giraph.utils.InternalVertexRunner;
-import org.apache.giraph.edge.ByteArrayEdges;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.LongWritable;
import org.junit.Test;
-import com.google.common.collect.Maps;
-
import java.util.Map;
import static org.junit.Assert.assertEquals;
@@ -42,28 +38,22 @@
*/
@Test
public void testToyData() throws Exception {
-
// A small graph
String[] graph = new String[] { "12 34 56", "34 78", "56 34 78", "78 34" };
- Map<String, String> params = Maps.newHashMap();
- params.put(RandomWalkWithRestartVertex.SOURCE_VERTEX, "12");
- params.put(RandomWalkWithRestartVertex.MAX_SUPERSTEPS, "30");
- params.put(RandomWalkWithRestartVertex.TELEPORTATION_PROBABILITY, "0.25");
-
- GiraphClasses<LongWritable, DoubleWritable, DoubleWritable, DoubleWritable>
- classes = new GiraphClasses<LongWritable, DoubleWritable,
- DoubleWritable, DoubleWritable>();
- classes.setVertexClass(RandomWalkWithRestartVertex.class);
- classes.setVertexEdgesClass(ByteArrayEdges.class);
- classes.setVertexInputFormatClass(
- LongDoubleDoubleTextInputFormat.class);
- classes.setVertexOutputFormatClass(
+ GiraphConfiguration conf = new GiraphConfiguration();
+ conf.setInt(RandomWalkWithRestartVertex.SOURCE_VERTEX, 12);
+ conf.setInt(RandomWalkWithRestartVertex.MAX_SUPERSTEPS, 30);
+ conf.setFloat(RandomWalkWithRestartVertex.TELEPORTATION_PROBABILITY, 0.25f);
+ conf.setVertexClass(RandomWalkWithRestartVertex.class);
+ conf.setVertexEdgesClass(ByteArrayEdges.class);
+ conf.setVertexInputFormatClass(LongDoubleDoubleTextInputFormat.class);
+ conf.setVertexOutputFormatClass(
VertexWithDoubleValueDoubleEdgeTextOutputFormat.class);
- classes.setWorkerContextClass(RandomWalkWorkerContext.class);
- classes.setMasterComputeClass(RandomWalkVertexMasterCompute.class);
+ conf.setWorkerContextClass(RandomWalkWorkerContext.class);
+ conf.setMasterComputeClass(RandomWalkVertexMasterCompute.class);
// Run internally
- Iterable<String> results = InternalVertexRunner.run(classes, params, graph);
+ Iterable<String> results = InternalVertexRunner.run(conf, graph);
Map<Long, Double> steadyStateProbabilities =
RandomWalkTestUtils.parseSteadyStateProbabilities(results);
@@ -87,24 +77,20 @@
new String[] { "12 34:0.1 56:0.9", "34 78:0.9 56:0.1",
"56 12:0.1 34:0.8 78:0.1", "78 34:1.0" };
- Map<String, String> params = Maps.newHashMap();
- params.put(RandomWalkWithRestartVertex.SOURCE_VERTEX, "12");
- params.put(RandomWalkWithRestartVertex.MAX_SUPERSTEPS, "30");
- params.put(RandomWalkWithRestartVertex.TELEPORTATION_PROBABILITY, "0.15");
-
- GiraphClasses<LongWritable, DoubleWritable, DoubleWritable,
- DoubleWritable> classes = new GiraphClasses<LongWritable,
- DoubleWritable, DoubleWritable, DoubleWritable>();
- classes.setVertexClass(RandomWalkWithRestartVertex.class);
- classes.setVertexEdgesClass(ByteArrayEdges.class);
- classes.setVertexInputFormatClass(
+ GiraphConfiguration conf = new GiraphConfiguration();
+ conf.setInt(RandomWalkWithRestartVertex.SOURCE_VERTEX, 12);
+ conf.setInt(RandomWalkWithRestartVertex.MAX_SUPERSTEPS, 30);
+ conf.setFloat(RandomWalkWithRestartVertex.TELEPORTATION_PROBABILITY, 0.15f);
+ conf.setVertexClass(RandomWalkWithRestartVertex.class);
+ conf.setVertexEdgesClass(ByteArrayEdges.class);
+ conf.setVertexInputFormatClass(
NormalizingLongDoubleDoubleTextInputFormat.class);
- classes.setVertexOutputFormatClass(
+ conf.setVertexOutputFormatClass(
VertexWithDoubleValueDoubleEdgeTextOutputFormat.class);
- classes.setWorkerContextClass(RandomWalkWorkerContext.class);
- classes.setMasterComputeClass(RandomWalkVertexMasterCompute.class);
+ conf.setWorkerContextClass(RandomWalkWorkerContext.class);
+ conf.setMasterComputeClass(RandomWalkVertexMasterCompute.class);
// Run internally
- Iterable<String> results = InternalVertexRunner.run(classes, params, graph);
+ Iterable<String> results = InternalVertexRunner.run(conf, graph);
Map<Long, Double> steadyStateProbabilities =
RandomWalkTestUtils.parseSteadyStateProbabilities(results);
diff --git a/giraph-examples/src/test/java/org/apache/giraph/examples/SimpleShortestPathsVertexTest.java b/giraph-examples/src/test/java/org/apache/giraph/examples/SimpleShortestPathsVertexTest.java
index e51b611..ee99dc5 100644
--- a/giraph-examples/src/test/java/org/apache/giraph/examples/SimpleShortestPathsVertexTest.java
+++ b/giraph-examples/src/test/java/org/apache/giraph/examples/SimpleShortestPathsVertexTest.java
@@ -18,16 +18,13 @@
package org.apache.giraph.examples;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import org.apache.giraph.conf.GiraphClasses;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.edge.ByteArrayEdges;
+import org.apache.giraph.edge.EdgeFactory;
import org.apache.giraph.io.formats.JsonLongDoubleFloatDoubleVertexInputFormat;
import org.apache.giraph.io.formats.JsonLongDoubleFloatDoubleVertexOutputFormat;
import org.apache.giraph.utils.InternalVertexRunner;
import org.apache.giraph.utils.MockUtils;
-import org.apache.giraph.edge.ByteArrayEdges;
-import org.apache.giraph.edge.EdgeFactory;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
@@ -36,8 +33,13 @@
import org.junit.Test;
import org.mockito.Mockito;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
import java.util.Map;
+import static org.apache.giraph.examples.SimpleShortestPathsVertex.SOURCE_ID;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@@ -58,9 +60,7 @@
DoubleWritable> env = MockUtils.prepareVertex(vertex, 1L,
new LongWritable(7L), new DoubleWritable(Double.MAX_VALUE), false);
- Mockito.when(env.getConfiguration().getLong(
- SimpleShortestPathsVertex.SOURCE_ID,
- SimpleShortestPathsVertex.SOURCE_ID_DEFAULT)).thenReturn(2L);
+ Mockito.when(SOURCE_ID.get(env.getConfiguration())).thenReturn(2L);
vertex.addEdge(EdgeFactory.create(
new LongWritable(10L), new FloatWritable(2.5f)));
@@ -90,9 +90,7 @@
DoubleWritable> env = MockUtils.prepareVertex(vertex, 1L,
new LongWritable(7L), new DoubleWritable(0.5), false);
- Mockito.when(env.getConfiguration().getLong(
- SimpleShortestPathsVertex.SOURCE_ID,
- SimpleShortestPathsVertex.SOURCE_ID_DEFAULT)).thenReturn(2L);
+ Mockito.when(SOURCE_ID.get(env.getConfiguration())).thenReturn(2L);
vertex.addEdge(EdgeFactory.create(new LongWritable(10L),
new FloatWritable(2.5f)));
@@ -122,21 +120,18 @@
"[4,0,[]]"
};
+ GiraphConfiguration conf = new GiraphConfiguration();
// start from vertex 1
- Map<String, String> params = Maps.newHashMap();
- params.put(SimpleShortestPathsVertex.SOURCE_ID, "1");
-
- GiraphClasses<LongWritable, DoubleWritable, FloatWritable, DoubleWritable>
- classes = new GiraphClasses();
- classes.setVertexClass(SimpleShortestPathsVertex.class);
- classes.setVertexEdgesClass(ByteArrayEdges.class);
- classes.setVertexInputFormatClass(
+ SOURCE_ID.set(conf, 1);
+ conf.setVertexClass(SimpleShortestPathsVertex.class);
+ conf.setVertexEdgesClass(ByteArrayEdges.class);
+ conf.setVertexInputFormatClass(
JsonLongDoubleFloatDoubleVertexInputFormat.class);
- classes.setVertexOutputFormatClass(
+ conf.setVertexOutputFormatClass(
JsonLongDoubleFloatDoubleVertexOutputFormat.class);
// run internally
- Iterable<String> results = InternalVertexRunner.run(classes, params, graph);
+ Iterable<String> results = InternalVertexRunner.run(conf, graph);
Map<Long, Double> distances = parseDistances(results);
diff --git a/giraph-examples/src/test/java/org/apache/giraph/examples/TestPageRank.java b/giraph-examples/src/test/java/org/apache/giraph/examples/TestPageRank.java
index 448afe6..922e736 100644
--- a/giraph-examples/src/test/java/org/apache/giraph/examples/TestPageRank.java
+++ b/giraph-examples/src/test/java/org/apache/giraph/examples/TestPageRank.java
@@ -18,13 +18,9 @@
package org.apache.giraph.examples;
import org.apache.giraph.BspCase;
-import org.apache.giraph.conf.GiraphClasses;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.job.GiraphJob;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.LongWritable;
import org.junit.Test;
import java.io.IOException;
@@ -67,22 +63,20 @@
*/
private void testPageRank(int numComputeThreads)
throws IOException, InterruptedException, ClassNotFoundException {
- GiraphClasses<LongWritable, DoubleWritable, FloatWritable, DoubleWritable>
- classes = new GiraphClasses();
- classes.setVertexClass(SimplePageRankVertex.class);
- classes.setVertexInputFormatClass(
+ GiraphConfiguration conf = new GiraphConfiguration();
+ conf.setVertexClass(SimplePageRankVertex.class);
+ conf.setVertexInputFormatClass(
SimplePageRankVertex.SimplePageRankVertexInputFormat.class);
- classes.setWorkerContextClass(
+ conf.setWorkerContextClass(
SimplePageRankVertex.SimplePageRankVertexWorkerContext.class);
- classes.setMasterComputeClass(
+ conf.setMasterComputeClass(
SimplePageRankVertex.SimplePageRankVertexMasterCompute.class);
- GiraphJob job = prepareJob(getCallingMethodName(), classes);
- GiraphConfiguration conf = job.getConfiguration();
conf.setNumComputeThreads(numComputeThreads);
// Set enough partitions to generate randomness on the compute side
if (numComputeThreads != 1) {
GiraphConstants.USER_PARTITION_COUNT.set(conf, numComputeThreads * 5);
}
+ GiraphJob job = prepareJob(getCallingMethodName(), conf);
assertTrue(job.run(true));
if (!runningInDistributedMode()) {
double maxPageRank =
@@ -97,7 +91,7 @@
" numComputeThreads=" + numComputeThreads);
assertEquals(34.03, maxPageRank, 0.001);
assertEquals(0.03, minPageRank, 0.00001);
- assertEquals(5l, numVertices);
+ assertEquals(5L, numVertices);
}
}
}
diff --git a/giraph-examples/src/test/java/org/apache/giraph/examples/TryMultiIpcBindingPortsTest.java b/giraph-examples/src/test/java/org/apache/giraph/examples/TryMultiIpcBindingPortsTest.java
index 48153ae..4945b52 100644
--- a/giraph-examples/src/test/java/org/apache/giraph/examples/TryMultiIpcBindingPortsTest.java
+++ b/giraph-examples/src/test/java/org/apache/giraph/examples/TryMultiIpcBindingPortsTest.java
@@ -19,21 +19,19 @@
package org.apache.giraph.examples;
import org.apache.giraph.combiner.MinimumIntCombiner;
-import org.apache.giraph.conf.GiraphClasses;
+import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.edge.ByteArrayEdges;
import org.apache.giraph.io.formats.IdWithValueTextOutputFormat;
import org.apache.giraph.io.formats.IntIntNullTextInputFormat;
import org.apache.giraph.utils.InternalVertexRunner;
-import org.apache.giraph.edge.ByteArrayEdges;
import org.junit.Test;
import com.google.common.base.Splitter;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
import com.google.common.collect.SetMultimap;
-import java.util.Map;
import java.util.Set;
import static org.junit.Assert.assertEquals;
@@ -70,19 +68,15 @@
// run internally
// fail the first port binding attempt
- Map<String, String> params = Maps.<String, String>newHashMap();
- params.put(GiraphConstants.FAIL_FIRST_IPC_PORT_BIND_ATTEMPT.getKey(),
- "true");
+ GiraphConfiguration conf = new GiraphConfiguration();
+ GiraphConstants.FAIL_FIRST_IPC_PORT_BIND_ATTEMPT.set(conf, true);
+ conf.setVertexClass(ConnectedComponentsVertex.class);
+ conf.setVertexEdgesClass(ByteArrayEdges.class);
+ conf.setCombinerClass(MinimumIntCombiner.class);
+ conf.setVertexInputFormatClass(IntIntNullTextInputFormat.class);
+ conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
- GiraphClasses classes = new GiraphClasses();
- classes.setVertexClass(ConnectedComponentsVertex.class);
- classes.setVertexEdgesClass(ByteArrayEdges.class);
- classes.setCombinerClass(MinimumIntCombiner.class);
- classes.setVertexInputFormatClass(IntIntNullTextInputFormat.class);
- classes.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
-
- Iterable<String> results = InternalVertexRunner.run(classes, params,
- graph);
+ Iterable<String> results = InternalVertexRunner.run(conf, graph);
SetMultimap<Integer,Integer> components = parseResults(results);