blob: 4d35ca5a45483cce8a272919e6896a27e024ee15 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.giraph.block_app.framework;
import java.util.List;
import org.apache.giraph.comm.messages.MessageEncodeAndStoreType;
import org.apache.giraph.conf.BulkConfigurator;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.StrConfOption;
import org.apache.giraph.edge.IdAndNullArrayEdges;
import org.apache.giraph.edge.IdAndValueArrayEdges;
import org.apache.giraph.edge.LongDiffNullArrayEdges;
import org.apache.giraph.types.ops.PrimitiveIdTypeOps;
import org.apache.giraph.types.ops.TypeOps;
import org.apache.giraph.types.ops.TypeOpsUtils;
import org.apache.giraph.utils.ReflectionUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
/**
* Default block factory abstract class, providing default methods that need
* to be/can be overridden for specifying required/most common parameters,
* to simplify setting properties.
*
* @param <S> Execution stage type
*/
public abstract class AbstractBlockFactory<S> implements BlockFactory<S> {
/**
* Comma separated list of BulkConfigurators, that are going to be called
* to simplify specifying of large number of properties.
*/
public static final StrConfOption CONFIGURATORS = new StrConfOption(
"digraph.block_factory_configurators", null, "");
@Override
public List<String> getGcJavaOpts(Configuration conf) {
return null;
}
@Override
public final void initConfig(GiraphConfiguration conf) {
initConfigurators(conf);
GiraphConstants.VERTEX_ID_CLASS.setIfUnset(conf, getVertexIDClass(conf));
GiraphConstants.VERTEX_VALUE_CLASS.setIfUnset(
conf, getVertexValueClass(conf));
GiraphConstants.EDGE_VALUE_CLASS.setIfUnset(conf, getEdgeValueClass(conf));
GiraphConstants.RESOLVER_CREATE_VERTEX_ON_MSGS.setIfUnset(
conf, shouldCreateVertexOnMsgs(conf));
if (shouldSendOneMessageToAll(conf)) {
GiraphConstants.MESSAGE_ENCODE_AND_STORE_TYPE.setIfUnset(
conf, MessageEncodeAndStoreType.EXTRACT_BYTEARRAY_PER_PARTITION);
}
BlockUtils.BLOCK_WORKER_CONTEXT_VALUE_CLASS.setIfUnset(
conf, getWorkerContextValueClass(conf));
// optimize edge structure, if available and not set already
if (!GiraphConstants.VERTEX_EDGES_CLASS.contains(conf)) {
@SuppressWarnings("rawtypes")
Class<? extends WritableComparable> vertexIDClass =
GiraphConstants.VERTEX_ID_CLASS.get(conf);
Class<? extends Writable> edgeValueClass =
GiraphConstants.EDGE_VALUE_CLASS.get(conf);
@SuppressWarnings("rawtypes")
PrimitiveIdTypeOps<? extends WritableComparable> idTypeOps =
TypeOpsUtils.getPrimitiveIdTypeOpsOrNull(vertexIDClass);
if (edgeValueClass.equals(NullWritable.class)) {
if (vertexIDClass.equals(LongWritable.class)) {
GiraphConstants.VERTEX_EDGES_CLASS.set(
conf, LongDiffNullArrayEdges.class);
} else if (idTypeOps != null) {
GiraphConstants.VERTEX_EDGES_CLASS.set(
conf, IdAndNullArrayEdges.class);
}
} else {
TypeOps<?> edgeValueTypeOps =
TypeOpsUtils.getTypeOpsOrNull(edgeValueClass);
if (edgeValueTypeOps != null && idTypeOps != null) {
GiraphConstants.VERTEX_EDGES_CLASS.set(
conf, IdAndValueArrayEdges.class);
}
}
}
additionalInitConfig(conf);
}
@Override
public void registerOutputs(GiraphConfiguration conf) {
}
private void initConfigurators(GiraphConfiguration conf) {
String configurators = CONFIGURATORS.get(conf);
if (configurators != null) {
String[] split = configurators.split(",");
for (String configurator : split) {
runConfigurator(conf, configurator);
}
}
}
private void runConfigurator(GiraphConfiguration conf, String configurator) {
String[] packages = getConvenienceConfiguratorPackages();
String[] prefixes = new String[packages.length + 1];
prefixes[0] = "";
for (int i = 0; i < packages.length; i++) {
prefixes[i + 1] = packages[i] + ".";
}
for (String prefix : prefixes) {
try {
@SuppressWarnings({ "unchecked", "rawtypes" })
Class<BulkConfigurator> confClass =
(Class) Class.forName(prefix + configurator);
BulkConfigurator c = ReflectionUtils.newInstance(confClass);
c.configure(conf);
return;
// CHECKSTYLE: stop EmptyBlock
// ignore ClassNotFoundException, and continue the loop
} catch (ClassNotFoundException e) {
}
// CHECKSTYLE: resume EmptyBlock
}
throw new IllegalStateException(
"Configurator " + configurator + " not found");
}
/**
* Additional configuration initialization, other then overriding
* class specification.
*/
protected void additionalInitConfig(GiraphConfiguration conf) {
}
/**
* Concrete vertex id class application will use.
*/
@SuppressWarnings("rawtypes")
protected abstract Class<? extends WritableComparable> getVertexIDClass(
GiraphConfiguration conf);
/**
* Concrete vertex value class application will use.
*/
protected abstract Class<? extends Writable> getVertexValueClass(
GiraphConfiguration conf);
/**
* Concrete edge value class application will use.
*/
protected abstract Class<? extends Writable> getEdgeValueClass(
GiraphConfiguration conf);
/**
* Concrete worker context value class application will use, if overridden.
*/
protected Class<?> getWorkerContextValueClass(GiraphConfiguration conf) {
return Object.class;
}
/**
* Override if vertices shouldn't be created by default, if message is sent
* to a vertex that doesn't exist.
*/
protected boolean shouldCreateVertexOnMsgs(GiraphConfiguration conf) {
return true;
}
// TODO - see if it should be deprecated
protected boolean shouldSendOneMessageToAll(GiraphConfiguration conf) {
return false;
}
/**
* Provide list of strings representing packages where configurators will
* be searched for, allowing that full path is not required for
* CONFIGURATORS option.
*/
protected String[] getConvenienceConfiguratorPackages() {
return new String[] { };
}
}