blob: 38f14d6f8984bd795df293f261a4107f4b2a871d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.giraph.job;
import static org.apache.giraph.conf.GiraphConstants.VERTEX_EDGES_CLASS;
import static org.apache.giraph.conf.GiraphConstants.VERTEX_RESOLVER_CLASS;
import static org.apache.giraph.utils.ReflectionUtils.getTypeArguments;
import org.apache.giraph.combiner.MessageCombiner;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.edge.OutEdges;
import org.apache.giraph.factories.DefaultVertexValueFactory;
import org.apache.giraph.factories.VertexValueFactory;
import org.apache.giraph.graph.DefaultVertexResolver;
import org.apache.giraph.graph.VertexResolver;
import org.apache.giraph.graph.VertexValueCombiner;
import org.apache.giraph.io.EdgeInputFormat;
import org.apache.giraph.io.EdgeOutputFormat;
import org.apache.giraph.io.VertexInputFormat;
import org.apache.giraph.io.VertexOutputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.log4j.Logger;
/**
* GiraphConfigurationValidator attempts to verify the consistency of
* user-chosen InputFormat, OutputFormat, and Vertex generic type
* parameters as well as the general Configuration settings
* before the job run actually begins.
*
* @param <I> the Vertex ID type
* @param <V> the Vertex Value type
* @param <E> the Edge Value type
* @param <M1> the incoming Message type
* @param <M2> the outgoing Message type
*/
public class GiraphConfigurationValidator<I extends WritableComparable,
V extends Writable, E extends Writable, M1 extends Writable,
M2 extends Writable> {
/**
* Class logger object.
*/
private static Logger LOG =
Logger.getLogger(GiraphConfigurationValidator.class);
/** I param vertex index in classList */
private static final int ID_PARAM_INDEX = 0;
/** V param vertex index in classList */
private static final int VALUE_PARAM_INDEX = 1;
/** E param vertex index in classList */
private static final int EDGE_PARAM_INDEX = 2;
/** M param vertex combiner index in classList */
private static final int MSG_COMBINER_PARAM_INDEX = 1;
/** E param edge input format index in classList */
private static final int EDGE_PARAM_EDGE_INPUT_FORMAT_INDEX = 1;
/** E param vertex edges index in classList */
private static final int EDGE_PARAM_OUT_EDGES_INDEX = 1;
/** V param vertex value factory index in classList */
private static final int VALUE_PARAM_VERTEX_VALUE_FACTORY_INDEX = 0;
/** V param vertex value combiner index in classList */
private static final int VALUE_PARAM_VERTEX_VALUE_COMBINER_INDEX = 0;
/**
* The Configuration object for use in the validation test.
*/
private final ImmutableClassesGiraphConfiguration conf;
/**
* Constructor to execute the validation test, throws
* unchecked exception to end job run on failure.
*
* @param conf the Configuration for this run.
*/
public GiraphConfigurationValidator(Configuration conf) {
this.conf = new ImmutableClassesGiraphConfiguration(conf);
}
/**
* Get vertex id type
*
* @return vertex id type
*/
private Class<? extends WritableComparable> vertexIndexType() {
return conf.getGiraphTypes().getVertexIdClass();
}
/**
* Get vertex value type
*
* @return vertex value type
*/
private Class<? extends Writable> vertexValueType() {
return conf.getGiraphTypes().getVertexValueClass();
}
/**
* Get edge value type
*
* @return edge value type
*/
private Class<? extends Writable> edgeValueType() {
return conf.getGiraphTypes().getEdgeValueClass();
}
/**
* Get outgoing message value type
*
* @return outgoing message value type
*/
private Class<? extends Writable> outgoingMessageValueType() {
return conf.getOutgoingMessageValueClass();
}
/**
* Make sure that all registered classes have matching types. This
* is a little tricky due to type erasure, cannot simply get them from
* the class type arguments. Also, set the vertex index, vertex value,
* edge value and message value classes.
*/
public void validateConfiguration() {
checkConfiguration();
verifyOutEdgesGenericTypes();
verifyVertexInputFormatGenericTypes();
verifyEdgeInputFormatGenericTypes();
verifyVertexOutputFormatGenericTypes();
verifyEdgeOutputFormatGenericTypes();
verifyVertexResolverGenericTypes();
verifyVertexValueCombinerGenericTypes();
verifyMessageCombinerGenericTypes();
verifyVertexValueFactoryGenericTypes();
}
/**
* Make sure the configuration is set properly by the user prior to
* submitting the job.
*/
private void checkConfiguration() {
if (conf.getMaxWorkers() < 0) {
throw new RuntimeException("checkConfiguration: No valid " +
GiraphConstants.MAX_WORKERS);
}
if (conf.getMinPercentResponded() <= 0.0f ||
conf.getMinPercentResponded() > 100.0f) {
throw new IllegalArgumentException(
"checkConfiguration: Invalid " + conf.getMinPercentResponded() +
" for " + GiraphConstants.MIN_PERCENT_RESPONDED.getKey());
}
if (conf.getMinWorkers() < 0) {
throw new IllegalArgumentException("checkConfiguration: No valid " +
GiraphConstants.MIN_WORKERS);
}
conf.createComputationFactory().checkConfiguration(conf);
if (conf.getVertexInputFormatClass() == null &&
conf.getEdgeInputFormatClass() == null) {
throw new IllegalArgumentException("checkConfiguration: One of " +
GiraphConstants.VERTEX_INPUT_FORMAT_CLASS.getKey() + " and " +
GiraphConstants.EDGE_INPUT_FORMAT_CLASS.getKey() +
" must be non-null");
}
if (conf.getVertexResolverClass() == null) {
if (LOG.isInfoEnabled()) {
LOG.info("checkConfiguration: No class found for " +
VERTEX_RESOLVER_CLASS.getKey() +
", defaulting to " +
VERTEX_RESOLVER_CLASS.getDefaultClass().getCanonicalName());
}
}
if (conf.getOutEdgesClass() == null) {
if (LOG.isInfoEnabled()) {
LOG.info("checkConfiguration: No class found for " +
VERTEX_EDGES_CLASS.getKey() + ", defaulting to " +
VERTEX_EDGES_CLASS.getDefaultClass().getCanonicalName());
}
}
}
/**
* Verify matching generic types for a specific OutEdges class.
*
* @param outEdgesClass {@link org.apache.giraph.edge.OutEdges} class to check
*/
private void verifyOutEdgesGenericTypesClass(
Class<? extends OutEdges<I, E>> outEdgesClass) {
Class<?>[] classList = getTypeArguments(OutEdges.class, outEdgesClass);
checkAssignable(classList, ID_PARAM_INDEX, vertexIndexType(),
OutEdges.class, "vertex index");
checkAssignable(classList, EDGE_PARAM_OUT_EDGES_INDEX, edgeValueType(),
OutEdges.class, "edge value");
}
/** Verify matching generic types in OutEdges. */
private void verifyOutEdgesGenericTypes() {
Class<? extends OutEdges<I, E>> outEdgesClass =
conf.getOutEdgesClass();
Class<? extends OutEdges<I, E>> inputOutEdgesClass =
conf.getInputOutEdgesClass();
verifyOutEdgesGenericTypesClass(outEdgesClass);
verifyOutEdgesGenericTypesClass(inputOutEdgesClass);
}
/** Verify matching generic types in VertexInputFormat. */
private void verifyVertexInputFormatGenericTypes() {
Class<? extends VertexInputFormat<I, V, E>> vertexInputFormatClass =
conf.getVertexInputFormatClass();
if (vertexInputFormatClass != null) {
Class<?>[] classList =
getTypeArguments(VertexInputFormat.class, vertexInputFormatClass);
checkAssignable(classList, ID_PARAM_INDEX, vertexIndexType(),
VertexInputFormat.class, "vertex index");
checkAssignable(classList, VALUE_PARAM_INDEX, vertexValueType(),
VertexInputFormat.class, "vertex value");
checkAssignable(classList, EDGE_PARAM_INDEX, edgeValueType(),
VertexInputFormat.class, "edge value");
}
}
/** Verify matching generic types in EdgeInputFormat. */
private void verifyEdgeInputFormatGenericTypes() {
Class<? extends EdgeInputFormat<I, E>> edgeInputFormatClass =
conf.getEdgeInputFormatClass();
if (edgeInputFormatClass != null) {
Class<?>[] classList =
getTypeArguments(EdgeInputFormat.class, edgeInputFormatClass);
checkAssignable(classList, ID_PARAM_INDEX, vertexIndexType(),
EdgeInputFormat.class, "vertex index");
checkAssignable(classList, EDGE_PARAM_EDGE_INPUT_FORMAT_INDEX,
edgeValueType(), EdgeInputFormat.class, "edge value");
}
}
/**
* If there is a vertex value combiner type, verify its
* generic params match the job.
*/
private void verifyVertexValueCombinerGenericTypes() {
Class<? extends VertexValueCombiner<V>> vertexValueCombiner =
conf.getVertexValueCombinerClass();
if (vertexValueCombiner != null) {
Class<?>[] classList =
getTypeArguments(VertexValueCombiner.class, vertexValueCombiner);
checkAssignable(classList, VALUE_PARAM_VERTEX_VALUE_COMBINER_INDEX,
vertexValueType(), VertexValueCombiner.class, "vertex value");
}
}
/**
* If there is a message combiner type, verify its
* generic params match the job.
*/
private void verifyMessageCombinerGenericTypes() {
MessageCombiner<I, M2> messageCombiner =
conf.createOutgoingMessageCombiner();
if (messageCombiner != null) {
Class<?>[] classList =
getTypeArguments(MessageCombiner.class, messageCombiner.getClass());
checkEquals(classList, ID_PARAM_INDEX, vertexIndexType(),
MessageCombiner.class, "vertex index");
checkEquals(classList, MSG_COMBINER_PARAM_INDEX,
outgoingMessageValueType(), MessageCombiner.class, "message value");
}
}
/** Verify that the vertex output format's generic params match the job. */
private void verifyVertexOutputFormatGenericTypes() {
Class<? extends EdgeOutputFormat<I, V, E>>
edgeOutputFormatClass = conf.getEdgeOutputFormatClass();
if (conf.hasEdgeOutputFormat()) {
Class<?>[] classList =
getTypeArguments(EdgeOutputFormat.class, edgeOutputFormatClass);
checkAssignable(classList, ID_PARAM_INDEX, vertexIndexType(),
VertexOutputFormat.class, "vertex index");
checkAssignable(classList, VALUE_PARAM_INDEX, vertexValueType(),
VertexOutputFormat.class, "vertex value");
checkAssignable(classList, EDGE_PARAM_INDEX, edgeValueType(),
VertexOutputFormat.class, "edge value");
}
}
/** Verify that the edge output format's generic params match the job. */
private void verifyEdgeOutputFormatGenericTypes() {
Class<? extends VertexOutputFormat<I, V, E>>
vertexOutputFormatClass = conf.getVertexOutputFormatClass();
if (conf.hasVertexOutputFormat()) {
Class<?>[] classList =
getTypeArguments(VertexOutputFormat.class, vertexOutputFormatClass);
checkAssignable(classList, ID_PARAM_INDEX, vertexIndexType(),
VertexOutputFormat.class, "vertex index");
checkAssignable(classList, VALUE_PARAM_INDEX, vertexValueType(),
VertexOutputFormat.class, "vertex value");
checkAssignable(classList, EDGE_PARAM_INDEX, edgeValueType(),
VertexOutputFormat.class, "edge value");
}
}
/** Verify that the vertex value factory's type matches the job */
private void verifyVertexValueFactoryGenericTypes() {
Class<? extends VertexValueFactory<V>>
vvfClass = conf.getVertexValueFactoryClass();
if (DefaultVertexValueFactory.class.equals(vvfClass)) {
return;
}
Class<?>[] classList = getTypeArguments(VertexValueFactory.class, vvfClass);
checkEquals(classList, VALUE_PARAM_VERTEX_VALUE_FACTORY_INDEX,
vertexValueType(), VertexValueFactory.class, "vertex value");
}
/**
* If there is a vertex resolver,
* validate the generic parameter types.
* */
private void verifyVertexResolverGenericTypes() {
Class<? extends VertexResolver<I, V, E>>
vrClass = conf.getVertexResolverClass();
if (DefaultVertexResolver.class.equals(vrClass)) {
return;
}
Class<?>[] classList =
getTypeArguments(VertexResolver.class, vrClass);
checkEquals(classList, ID_PARAM_INDEX, vertexIndexType(),
VertexResolver.class, "vertex index");
checkEquals(classList, VALUE_PARAM_INDEX, vertexValueType(),
VertexResolver.class, "vertex value");
checkEquals(classList, EDGE_PARAM_INDEX, edgeValueType(),
VertexResolver.class, "edge value");
}
/**
* Check that the type from computation equals the type from the class.
*
* @param classList classes from type
* @param index array index of class to check
* @param classFromComputation class from computation
* @param klass Class type we're checking, only used for printing name
* @param typeName Name of type we're checking
*/
private static void checkEquals(Class<?>[] classList, int index,
Class<?> classFromComputation, Class klass, String typeName) {
if (classList[index] == null) {
LOG.warn(klass.getSimpleName() + " " + typeName + " type is not known");
} else if (!classList[index].equals(classFromComputation)) {
throw new IllegalStateException(
"checkClassTypes: " + typeName + " types not equal, " +
"computation - " + classFromComputation +
", " + klass.getSimpleName() + " - " +
classList[index]);
}
}
/**
* Check that the type from computation is assignable to type from the class.
*
* @param classList classes from type
* @param index array index of class to check
* @param classFromComputation class from computation
* @param klass Class type we're checking, only used for printing name
* @param typeName Name of type we're checking
*/
private static void checkAssignable(Class<?>[] classList, int index,
Class<?> classFromComputation, Class klass, String typeName) {
if (classList[index] == null) {
LOG.warn(klass.getSimpleName() + " " + typeName + " type is not known");
} else if (!classList[index].isAssignableFrom(classFromComputation)) {
throw new IllegalStateException(
"checkClassTypes: " + typeName + " types not assignable, " +
"computation - " + classFromComputation +
", " + klass.getSimpleName() + " - " +
classList[EDGE_PARAM_EDGE_INPUT_FORMAT_INDEX]);
}
}
}