blob: 54b6bbdc44c7b6beedbe6a984ee731bd7a85fd2f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.giraph.job;
import org.apache.giraph.combiner.Combiner;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.edge.VertexEdges;
import org.apache.giraph.graph.DefaultVertexResolver;
import org.apache.giraph.graph.DefaultVertexValueFactory;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.VertexResolver;
import org.apache.giraph.graph.VertexValueFactory;
import org.apache.giraph.io.EdgeInputFormat;
import org.apache.giraph.io.VertexInputFormat;
import org.apache.giraph.io.VertexOutputFormat;
import org.apache.giraph.utils.ReflectionUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.log4j.Logger;
import java.lang.reflect.Type;
import java.util.List;
import static org.apache.giraph.conf.GiraphConstants.VERTEX_EDGES_CLASS;
import static org.apache.giraph.conf.GiraphConstants.VERTEX_RESOLVER_CLASS;
/**
 * GiraphConfigurationValidator attempts to verify the consistency of
 * user-chosen InputFormat, OutputFormat, and Vertex generic type
 * parameters as well as the general Configuration settings
 * before the job run actually begins.
 *
 * Construct it with the job {@link Configuration} and then call
 * {@link #validateConfiguration()}; every detected problem is reported by
 * throwing an unchecked exception.
 *
 * @param <I> the Vertex ID type
 * @param <V> the Vertex Value type
 * @param <E> the Edge Value type
 * @param <M> the Message type
 */
public class GiraphConfigurationValidator<I extends WritableComparable,
    V extends Writable, E extends Writable, M extends Writable> {
  /** Class logger object. */
  private static final Logger LOG =
      Logger.getLogger(GiraphConfigurationValidator.class);

  /** I param vertex index in classList */
  private static final int ID_PARAM_INDEX = 0;
  /** V param vertex index in classList */
  private static final int VALUE_PARAM_INDEX = 1;
  /** E param vertex index in classList */
  private static final int EDGE_PARAM_INDEX = 2;
  /** M param vertex index in classList */
  private static final int MSG_PARAM_INDEX = 3;
  /** M param vertex combiner index in classList */
  private static final int MSG_COMBINER_PARAM_INDEX = 1;
  /** E param edge input format index in classList */
  private static final int EDGE_PARAM_EDGE_INPUT_FORMAT_INDEX = 1;
  /** E param vertex edges index in classList */
  private static final int EDGE_PARAM_VERTEX_EDGES_INDEX = 1;
  /** V param vertex value factory index in classList */
  private static final int VALUE_PARAM_VERTEX_VALUE_FACTORY_INDEX = 0;

  /** Vertex index type, read from the configured Vertex class */
  private Type vertexIndexType;
  /** Vertex value type, read from the configured Vertex class */
  private Type vertexValueType;
  /** Edge value type, read from the configured Vertex class */
  private Type edgeValueType;
  /** Message value type, read from the configured Vertex class */
  private Type messageValueType;

  /** The Configuration object for use in the validation test. */
  private final ImmutableClassesGiraphConfiguration conf;

  /**
   * Constructor, wraps the job configuration for later validation. The
   * validation itself is executed by {@link #validateConfiguration()}.
   *
   * @param conf the Configuration for this run.
   */
  public GiraphConfigurationValidator(Configuration conf) {
    this.conf = new ImmutableClassesGiraphConfiguration(conf);
  }

  /**
   * Make sure that all registered classes have matching types. This
   * is a little tricky due to type erasure, cannot simply get them from
   * the class type arguments. Also, set the vertex index, vertex value,
   * edge value and message value classes.
   *
   * Throws an unchecked exception to end the job run on failure.
   */
  public void validateConfiguration() {
    checkConfiguration();
    Class<? extends Vertex<I, V, E, M>> vertexClass =
        conf.getVertexClass();
    List<Class<?>> classList = ReflectionUtils.getTypeArguments(
        Vertex.class, vertexClass);
    // The Vertex class is the reference every other class is compared to.
    vertexIndexType = classList.get(ID_PARAM_INDEX);
    vertexValueType = classList.get(VALUE_PARAM_INDEX);
    edgeValueType = classList.get(EDGE_PARAM_INDEX);
    messageValueType = classList.get(MSG_PARAM_INDEX);
    verifyVertexEdgesGenericTypes();
    verifyVertexInputFormatGenericTypes();
    verifyEdgeInputFormatGenericTypes();
    verifyVertexOutputFormatGenericTypes();
    verifyVertexResolverGenericTypes();
    verifyVertexCombinerGenericTypes();
    verifyVertexValueFactoryGenericTypes();
  }

  /**
   * Make sure the configuration is set properly by the user prior to
   * submitting the job.
   *
   * @throws IllegalArgumentException if a worker count or the minimum
   *         percent responded is out of range, if the vertex class is
   *         missing, or if neither a vertex nor an edge input format is set
   */
  private void checkConfiguration() {
    if (conf.getMaxWorkers() < 0) {
      // IllegalArgumentException (a RuntimeException) to match the other
      // checks in this method.
      throw new IllegalArgumentException("checkConfiguration: No valid " +
          GiraphConstants.MAX_WORKERS);
    }
    if (conf.getMinPercentResponded() <= 0.0f ||
        conf.getMinPercentResponded() > 100.0f) {
      throw new IllegalArgumentException(
          "checkConfiguration: Invalid " + conf.getMinPercentResponded() +
          " for " + GiraphConstants.MIN_PERCENT_RESPONDED.getKey());
    }
    if (conf.getMinWorkers() < 0) {
      throw new IllegalArgumentException("checkConfiguration: No valid " +
          GiraphConstants.MIN_WORKERS);
    }
    if (conf.getVertexClass() == null) {
      throw new IllegalArgumentException("checkConfiguration: Null " +
          GiraphConstants.VERTEX_CLASS.getKey());
    }
    if (conf.getVertexInputFormatClass() == null &&
        conf.getEdgeInputFormatClass() == null) {
      throw new IllegalArgumentException("checkConfiguration: One of " +
          GiraphConstants.VERTEX_INPUT_FORMAT_CLASS.getKey() + " and " +
          GiraphConstants.EDGE_INPUT_FORMAT_CLASS.getKey() +
          " must be non-null");
    }
    // A missing resolver or vertex edges class is not an error, it is
    // only reported.
    if (conf.getVertexResolverClass() == null) {
      if (LOG.isInfoEnabled()) {
        LOG.info("checkConfiguration: No class found for " +
            VERTEX_RESOLVER_CLASS.getKey() +
            ", defaulting to " +
            VERTEX_RESOLVER_CLASS.getDefaultClass().getCanonicalName());
      }
    }
    if (conf.getVertexEdgesClass() == null) {
      if (LOG.isInfoEnabled()) {
        LOG.info("checkConfiguration: No class found for " +
            VERTEX_EDGES_CLASS.getKey() + ", defaulting to " +
            VERTEX_EDGES_CLASS.getDefaultClass().getCanonicalName());
      }
    }
  }

  /**
   * Verify matching generic types for a specific VertexEdges class.
   *
   * @param vertexEdgesClass {@link VertexEdges} class to check, may be null
   *                         in which case nothing is verified
   */
  private void verifyVertexEdgesGenericTypesClass(
      Class<? extends VertexEdges<I, E>> vertexEdgesClass) {
    // checkConfiguration() tolerates a missing vertex edges class, so do
    // not hand a null class to the reflection helper.
    if (vertexEdgesClass == null) {
      return;
    }
    List<Class<?>> classList = ReflectionUtils.getTypeArguments(
        VertexEdges.class, vertexEdgesClass);
    // VertexEdges implementations can be generic, in which case there are no
    // types to check.
    if (classList.isEmpty()) {
      return;
    }
    checkTypeMatches(vertexIndexType, classList.get(ID_PARAM_INDEX),
        "Vertex index", "vertex edges", null);
    checkTypeMatches(edgeValueType,
        classList.get(EDGE_PARAM_VERTEX_EDGES_INDEX),
        "Edge value", "vertex edges", null);
  }

  /** Verify matching generic types in VertexEdges. */
  private void verifyVertexEdgesGenericTypes() {
    Class<? extends VertexEdges<I, E>> vertexEdgesClass =
        conf.getVertexEdgesClass();
    Class<? extends VertexEdges<I, E>> inputVertexEdgesClass =
        conf.getInputVertexEdgesClass();
    verifyVertexEdgesGenericTypesClass(vertexEdgesClass);
    verifyVertexEdgesGenericTypesClass(inputVertexEdgesClass);
  }

  /** Verify matching generic types in VertexInputFormat. */
  private void verifyVertexInputFormatGenericTypes() {
    Class<? extends VertexInputFormat<I, V, E>> vertexInputFormatClass =
        conf.getVertexInputFormatClass();
    if (vertexInputFormatClass == null) {
      return;
    }
    List<Class<?>> classList = ReflectionUtils.getTypeArguments(
        VertexInputFormat.class, vertexInputFormatClass);
    checkTypeMatches(vertexIndexType, classList.get(ID_PARAM_INDEX),
        "Vertex index", "vertex input format",
        "Input format vertex index type is not known");
    checkTypeMatches(vertexValueType, classList.get(VALUE_PARAM_INDEX),
        "Vertex value", "vertex input format",
        "Input format vertex value type is not known");
    checkTypeMatches(edgeValueType, classList.get(EDGE_PARAM_INDEX),
        "Edge value", "vertex input format",
        "Input format edge value type is not known");
  }

  /** Verify matching generic types in EdgeInputFormat. */
  private void verifyEdgeInputFormatGenericTypes() {
    Class<? extends EdgeInputFormat<I, E>> edgeInputFormatClass =
        conf.getEdgeInputFormatClass();
    if (edgeInputFormatClass == null) {
      return;
    }
    List<Class<?>> classList = ReflectionUtils.getTypeArguments(
        EdgeInputFormat.class, edgeInputFormatClass);
    checkTypeMatches(vertexIndexType, classList.get(ID_PARAM_INDEX),
        "Vertex index", "edge input format",
        "Input format vertex index type is not known");
    checkTypeMatches(edgeValueType,
        classList.get(EDGE_PARAM_EDGE_INPUT_FORMAT_INDEX),
        "Edge value", "edge input format",
        "Input format edge value type is not known");
  }

  /**
   * If there is a combiner type, verify its generic params match the job.
   * A type argument that cannot be determined is logged as a warning, like
   * for the input and output formats, instead of being reported as a
   * mismatch.
   */
  private void verifyVertexCombinerGenericTypes() {
    Class<? extends Combiner<I, M>> vertexCombinerClass =
        conf.getCombinerClass();
    if (vertexCombinerClass == null) {
      return;
    }
    List<Class<?>> classList = ReflectionUtils.getTypeArguments(
        Combiner.class, vertexCombinerClass);
    checkTypeMatches(vertexIndexType, classList.get(ID_PARAM_INDEX),
        "Vertex index", "vertex combiner",
        "Combiner vertex index type is not known");
    checkTypeMatches(messageValueType,
        classList.get(MSG_COMBINER_PARAM_INDEX),
        "Message value", "vertex combiner",
        "Combiner message value type is not known");
  }

  /** Verify that the output format's generic params match the job. */
  private void verifyVertexOutputFormatGenericTypes() {
    Class<? extends VertexOutputFormat<I, V, E>>
        vertexOutputFormatClass = conf.getVertexOutputFormatClass();
    if (vertexOutputFormatClass == null) {
      return;
    }
    List<Class<?>> classList = ReflectionUtils.getTypeArguments(
        VertexOutputFormat.class, vertexOutputFormatClass);
    checkTypeMatches(vertexIndexType, classList.get(ID_PARAM_INDEX),
        "Vertex index", "vertex output format",
        "Output format vertex index type is not known");
    checkTypeMatches(vertexValueType, classList.get(VALUE_PARAM_INDEX),
        "Vertex value", "vertex output format",
        "Output format vertex value type is not known");
    checkTypeMatches(edgeValueType, classList.get(EDGE_PARAM_INDEX),
        "Edge value", "vertex output format",
        "Output format edge value type is not known");
  }

  /**
   * Verify that the vertex value factory's type matches the job. The
   * default factory is not checked.
   */
  private void verifyVertexValueFactoryGenericTypes() {
    Class<? extends VertexValueFactory<V>>
        vvfClass = conf.getVertexValueFactoryClass();
    if (DefaultVertexValueFactory.class.equals(vvfClass)) {
      return;
    }
    List<Class<?>> classList = ReflectionUtils.getTypeArguments(
        VertexValueFactory.class, vvfClass);
    checkTypeMatches(vertexValueType,
        classList.get(VALUE_PARAM_VERTEX_VALUE_FACTORY_INDEX),
        "Vertex value", "vertex value factory", null);
  }

  /**
   * If there is a vertex resolver that is a {@link DefaultVertexResolver}
   * (or a subclass of it), validate the generic parameter types. Other
   * resolver implementations, and a missing resolver class, are not
   * checked.
   */
  private void verifyVertexResolverGenericTypes() {
    Class<? extends VertexResolver<I, V, E, M>>
        vrClass = conf.getVertexResolverClass();
    // Class.isAssignableFrom() throws NullPointerException for null, and
    // checkConfiguration() tolerates a missing resolver class.
    if (vrClass == null ||
        !DefaultVertexResolver.class.isAssignableFrom(vrClass)) {
      return;
    }
    // Safe: the isAssignableFrom() test above guarantees that vrClass is
    // DefaultVertexResolver or one of its subclasses.
    @SuppressWarnings("unchecked")
    Class<? extends DefaultVertexResolver<I, V, E, M>> dvrClass =
        (Class<? extends DefaultVertexResolver<I, V, E, M>>) vrClass;
    List<Class<?>> classList = ReflectionUtils.getTypeArguments(
        DefaultVertexResolver.class, dvrClass);
    checkTypeMatches(vertexIndexType, classList.get(ID_PARAM_INDEX),
        "Vertex index", "vertex resolver", null);
    checkTypeMatches(vertexValueType, classList.get(VALUE_PARAM_INDEX),
        "Vertex value", "vertex resolver", null);
    checkTypeMatches(edgeValueType, classList.get(EDGE_PARAM_INDEX),
        "Edge value", "vertex resolver", null);
    checkTypeMatches(messageValueType, classList.get(MSG_PARAM_INDEX),
        "Message value", "vertex resolver", null);
  }

  /**
   * Compare one generic type argument declared by a user-chosen class with
   * the corresponding type of the Vertex class.
   *
   * @param vertexType type declared by the Vertex class
   * @param declaredType type declared by the class under test, or null when
   *                     it could not be determined
   * @param typeName capitalized name of the type for the error message,
   *                 e.g. "Vertex index"
   * @param ownerName name of the class under test for the error message,
   *                  e.g. "vertex input format"
   * @param unknownTypeWarning message to log at WARN level when
   *                           declaredType is null, or null to skip the
   *                           check silently
   * @throws IllegalArgumentException if declaredType is known and differs
   *                                  from vertexType
   */
  private static void checkTypeMatches(Type vertexType,
      Class<?> declaredType, String typeName, String ownerName,
      String unknownTypeWarning) {
    if (declaredType == null) {
      if (unknownTypeWarning != null) {
        LOG.warn(unknownTypeWarning);
      }
      return;
    }
    if (!vertexType.equals(declaredType)) {
      throw new IllegalArgumentException(
          "checkClassTypes: " + typeName + " types don't match, " +
          "vertex - " + vertexType +
          ", " + ownerName + " - " + declaredType);
    }
  }
}