| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.giraph.conf; |
| |
| import org.apache.giraph.aggregators.AggregatorWriter; |
| import org.apache.giraph.aggregators.TextAggregatorWriter; |
| import org.apache.giraph.bsp.BspOutputFormat; |
| import org.apache.giraph.bsp.checkpoints.CheckpointSupportedChecker; |
| import org.apache.giraph.bsp.checkpoints.DefaultCheckpointSupportedChecker; |
| import org.apache.giraph.combiner.MessageCombiner; |
| import org.apache.giraph.comm.messages.InMemoryMessageStoreFactory; |
| import org.apache.giraph.comm.messages.MessageEncodeAndStoreType; |
| import org.apache.giraph.comm.messages.MessageStoreFactory; |
| import org.apache.giraph.edge.ByteArrayEdges; |
| import org.apache.giraph.edge.EdgeStoreFactory; |
| import org.apache.giraph.edge.InMemoryEdgeStoreFactory; |
| import org.apache.giraph.edge.OutEdges; |
| import org.apache.giraph.factories.ComputationFactory; |
| import org.apache.giraph.factories.DefaultComputationFactory; |
| import org.apache.giraph.factories.DefaultEdgeValueFactory; |
| import org.apache.giraph.factories.DefaultMessageValueFactory; |
| import org.apache.giraph.factories.DefaultVertexIdFactory; |
| import org.apache.giraph.factories.DefaultVertexValueFactory; |
| import org.apache.giraph.factories.EdgeValueFactory; |
| import org.apache.giraph.factories.MessageValueFactory; |
| import org.apache.giraph.factories.VertexIdFactory; |
| import org.apache.giraph.factories.VertexValueFactory; |
| import org.apache.giraph.graph.Computation; |
| import org.apache.giraph.graph.DefaultVertex; |
| import org.apache.giraph.graph.DefaultVertexResolver; |
| import org.apache.giraph.graph.DefaultVertexValueCombiner; |
| import org.apache.giraph.graph.Language; |
| import org.apache.giraph.graph.MapperObserver; |
| import org.apache.giraph.graph.Vertex; |
| import org.apache.giraph.graph.VertexResolver; |
| import org.apache.giraph.graph.VertexValueCombiner; |
| import org.apache.giraph.io.EdgeInputFormat; |
| import org.apache.giraph.io.EdgeOutputFormat; |
| import org.apache.giraph.io.MappingInputFormat; |
| import org.apache.giraph.io.VertexInputFormat; |
| import org.apache.giraph.io.VertexOutputFormat; |
| import org.apache.giraph.io.filters.DefaultEdgeInputFilter; |
| import org.apache.giraph.io.filters.DefaultVertexInputFilter; |
| import org.apache.giraph.io.filters.EdgeInputFilter; |
| import org.apache.giraph.io.filters.VertexInputFilter; |
| import org.apache.giraph.job.DefaultGiraphJobRetryChecker; |
| import org.apache.giraph.job.DefaultJobObserver; |
| import org.apache.giraph.job.DefaultJobProgressTrackerService; |
| import org.apache.giraph.job.GiraphJobObserver; |
| import org.apache.giraph.job.GiraphJobRetryChecker; |
| import org.apache.giraph.job.HaltApplicationUtils; |
| import org.apache.giraph.job.JobProgressTrackerService; |
| import org.apache.giraph.mapping.MappingStore; |
| import org.apache.giraph.mapping.MappingStoreOps; |
| import org.apache.giraph.mapping.translate.TranslateEdge; |
| import org.apache.giraph.master.DefaultMasterCompute; |
| import org.apache.giraph.master.MasterCompute; |
| import org.apache.giraph.master.MasterObserver; |
| import org.apache.giraph.ooc.persistence.OutOfCoreDataAccessor; |
| import org.apache.giraph.ooc.persistence.LocalDiskDataAccessor; |
| import org.apache.giraph.ooc.policy.OutOfCoreOracle; |
| import org.apache.giraph.ooc.policy.ThresholdBasedOracle; |
| import org.apache.giraph.partition.GraphPartitionerFactory; |
| import org.apache.giraph.partition.HashPartitionerFactory; |
| import org.apache.giraph.partition.Partition; |
| import org.apache.giraph.partition.SimplePartition; |
| import org.apache.giraph.worker.DefaultWorkerContext; |
| import org.apache.giraph.worker.WorkerContext; |
| import org.apache.giraph.worker.WorkerObserver; |
| import org.apache.hadoop.io.Writable; |
| import org.apache.hadoop.io.WritableComparable; |
| import org.apache.hadoop.mapreduce.OutputFormat; |
| |
| import static java.util.concurrent.TimeUnit.MINUTES; |
| import static java.util.concurrent.TimeUnit.SECONDS; |
| |
| /** |
| * Constants used all over Giraph for configuration. |
| */ |
| // CHECKSTYLE: stop InterfaceIsTypeCheck |
| public interface GiraphConstants { |
| /** Number of bytes in 1KB */ |
| int ONE_KB = 1024; |
| /** Number of bytes in 1MB */ |
| int ONE_MB = ONE_KB * ONE_KB; |
| |
| /** MappingStore class */ |
| ClassConfOption<MappingStore> MAPPING_STORE_CLASS = ClassConfOption.create( |
| "giraph.mappingStoreClass", null, MappingStore.class, |
| "MappingStore Class"); |
| |
| /** Class that performs the read operations on the mapping store */ |
| ClassConfOption<MappingStoreOps> MAPPING_STORE_OPS_CLASS = |
| ClassConfOption.create( |
| "giraph.mappingStoreOpsClass", null, MappingStoreOps.class, |
| "MappingStoreOps class"); |
| |
| /** The 'upper' value for LongByteMappingStore */ |
| IntConfOption LB_MAPPINGSTORE_UPPER = new IntConfOption( |
| "giraph.lbMappingStoreUpper", -1, |
| "'upper' value used by lbmappingstore"); |
| /** The 'lower' value for LongByteMappingStore */ |
| IntConfOption LB_MAPPINGSTORE_LOWER = new IntConfOption( |
| "giraph.lbMappingStoreLower", -1, |
| "'lower' value used by lbMappingstore"); |
| /** |
| * Class used to conduct expensive edge translation during vertex input. |
| * Declared as ClassConfOption&lt;TranslateEdge&gt; rather than as a raw |
| * ClassConfOption so that it matches the other class options in this |
| * interface and lookups through it are type-checked. |
| */ |
| ClassConfOption<TranslateEdge> EDGE_TRANSLATION_CLASS = |
| ClassConfOption.create("giraph.edgeTranslationClass", null, |
| TranslateEdge.class, "Class used to conduct expensive edge " + |
| "translation during vertex input phase"); |
| |
| /** The Computation class; this one is required */ |
| ClassConfOption<Computation> COMPUTATION_CLASS = ClassConfOption.create( |
| "giraph.computationClass", null, Computation.class, |
| "Computation class - required"); |
| /** The Computation factory class; this one is optional */ |
| ClassConfOption<ComputationFactory> COMPUTATION_FACTORY_CLASS = |
| ClassConfOption.create( |
| "giraph.computation.factory.class", |
| DefaultComputationFactory.class, ComputationFactory.class, |
| "Computation factory class - optional"); |
| |
| /** Optional TypesHolder, used when the Computation is not set */ |
| ClassConfOption<TypesHolder> TYPES_HOLDER_CLASS = ClassConfOption.create( |
| "giraph.typesHolder", null, TypesHolder.class, |
| "TypesHolder, used if Computation not set - optional"); |
| |
| /** Factory class used for creating the edge store */ |
| ClassConfOption<EdgeStoreFactory> EDGE_STORE_FACTORY_CLASS = |
| ClassConfOption.create( |
| "giraph.edgeStoreFactoryClass", |
| InMemoryEdgeStoreFactory.class, EdgeStoreFactory.class, |
| "Edge Store Factory class to use for creating edgeStore"); |
| |
| /** Factory class used for the message store */ |
| ClassConfOption<MessageStoreFactory> MESSAGE_STORE_FACTORY_CLASS = |
| ClassConfOption.create( |
| "giraph.messageStoreFactoryClass", |
| InMemoryMessageStoreFactory.class, MessageStoreFactory.class, |
| "Message Store Factory Class that is to be used"); |
| |
| /** Language in which the user's graph types (IVEMM) are implemented */ |
| PerGraphTypeEnumConfOption<Language> GRAPH_TYPE_LANGUAGES = |
| PerGraphTypeEnumConfOption.create( |
| "giraph.types.language", Language.class, Language.JAVA, |
| "Language user graph types (IVEMM) are implemented in"); |
| |
| /** Whether the user's graph types (IVEMM) need Jython wrappers */ |
| PerGraphTypeBooleanConfOption GRAPH_TYPES_NEEDS_WRAPPERS = |
| new PerGraphTypeBooleanConfOption( |
| "giraph.jython.type.wrappers", false, |
| "Whether user graph types (IVEMM) need Jython wrappers"); |
| |
| /** Optional factory class for vertex IDs */ |
| ClassConfOption<VertexIdFactory> VERTEX_ID_FACTORY_CLASS = |
| ClassConfOption.create( |
| "giraph.vertexIdFactoryClass", |
| DefaultVertexIdFactory.class, VertexIdFactory.class, |
| "Vertex ID factory class - optional"); |
| /** Optional factory class for vertex values */ |
| ClassConfOption<VertexValueFactory> VERTEX_VALUE_FACTORY_CLASS = |
| ClassConfOption.create( |
| "giraph.vertexValueFactoryClass", |
| DefaultVertexValueFactory.class, VertexValueFactory.class, |
| "Vertex value factory class - optional"); |
| /** Optional factory class for edge values */ |
| ClassConfOption<EdgeValueFactory> EDGE_VALUE_FACTORY_CLASS = |
| ClassConfOption.create( |
| "giraph.edgeValueFactoryClass", |
| DefaultEdgeValueFactory.class, EdgeValueFactory.class, |
| "Edge value factory class - optional"); |
| /** Optional factory class for outgoing message values */ |
| ClassConfOption<MessageValueFactory> OUTGOING_MESSAGE_VALUE_FACTORY_CLASS = |
| ClassConfOption.create( |
| "giraph.outgoingMessageValueFactoryClass", |
| DefaultMessageValueFactory.class, MessageValueFactory.class, |
| "Outgoing message value factory class - optional"); |
| |
| /** Optional class used for the edges of a vertex */ |
| ClassConfOption<OutEdges> VERTEX_EDGES_CLASS = ClassConfOption.create( |
| "giraph.outEdgesClass", ByteArrayEdges.class, OutEdges.class, |
| "Vertex edges class - optional"); |
| /** Optional vertex edges class that applies during edge input only */ |
| ClassConfOption<OutEdges> INPUT_VERTEX_EDGES_CLASS = |
| ClassConfOption.create( |
| "giraph.inputOutEdgesClass", ByteArrayEdges.class, OutEdges.class, |
| "Vertex edges class to be used during edge input only - optional"); |
| |
| /** Optional class for the master */ |
| ClassConfOption<MasterCompute> MASTER_COMPUTE_CLASS = |
| ClassConfOption.create( |
| "giraph.masterComputeClass", |
| DefaultMasterCompute.class, MasterCompute.class, |
| "Class for Master - optional"); |
| /** Optional master observer classes */ |
| ClassConfOption<MasterObserver> MASTER_OBSERVER_CLASSES = |
| ClassConfOption.create( |
| "giraph.master.observers", null, MasterObserver.class, |
| "Classes for Master Observer - optional"); |
| /** Optional worker observer classes */ |
| ClassConfOption<WorkerObserver> WORKER_OBSERVER_CLASSES = |
| ClassConfOption.create( |
| "giraph.worker.observers", null, WorkerObserver.class, |
| "Classes for Worker Observer - optional"); |
| /** Optional mapper observer classes */ |
| ClassConfOption<MapperObserver> MAPPER_OBSERVER_CLASSES = |
| ClassConfOption.create( |
| "giraph.mapper.observers", null, MapperObserver.class, |
| "Classes for Mapper Observer - optional"); |
| /** Optional message combiner class */ |
| ClassConfOption<MessageCombiner> MESSAGE_COMBINER_CLASS = |
| ClassConfOption.create( |
| "giraph.messageCombinerClass", null, MessageCombiner.class, |
| "Message combiner class - optional"); |
| /** Optional vertex resolver class */ |
| ClassConfOption<VertexResolver> VERTEX_RESOLVER_CLASS = |
| ClassConfOption.create( |
| "giraph.vertexResolverClass", |
| DefaultVertexResolver.class, VertexResolver.class, |
| "Vertex resolver class - optional"); |
| /** Optional vertex value combiner class */ |
| ClassConfOption<VertexValueCombiner> VERTEX_VALUE_COMBINER_CLASS = |
| ClassConfOption.create( |
| "giraph.vertexValueCombinerClass", |
| DefaultVertexValueCombiner.class, VertexValueCombiner.class, |
| "Vertex value combiner class - optional"); |
| |
| /** Language the computation is implemented in */ |
| EnumConfOption<Language> COMPUTATION_LANGUAGE = EnumConfOption.create( |
| "giraph.computation.language", Language.class, Language.JAVA, |
| "Which language computation is implemented in"); |
| |
| /** |
| * Whether a vertex that did not exist before should be created once it |
| * has received messages |
| */ |
| BooleanConfOption RESOLVER_CREATE_VERTEX_ON_MSGS = new BooleanConfOption( |
| "giraph.vertex.resolver.create.on.msgs", true, |
| "Option of whether to create vertexes that were not existent " + |
| "before but received messages"); |
| /** Optional graph partitioner factory class */ |
| ClassConfOption<GraphPartitionerFactory> GRAPH_PARTITIONER_FACTORY_CLASS = |
| ClassConfOption.create( |
| "giraph.graphPartitionerFactoryClass", |
| HashPartitionerFactory.class, GraphPartitionerFactory.class, |
| "Graph partitioner factory class - optional"); |
| |
| /** Optional observer class that watches over the job status */ |
| ClassConfOption<GiraphJobObserver> JOB_OBSERVER_CLASS = |
| ClassConfOption.create( |
| "giraph.jobObserverClass", |
| DefaultJobObserver.class, GiraphJobObserver.class, |
| "Observer class to watch over job status - optional"); |
| |
| /** Class which decides whether a failed job should be retried - optional */ |
| ClassConfOption<GiraphJobRetryChecker> JOB_RETRY_CHECKER_CLASS = |
| ClassConfOption.create("giraph.jobRetryCheckerClass", |
| DefaultGiraphJobRetryChecker.class, GiraphJobRetryChecker.class, |
| "Class which decides whether a failed job should be retried - " + |
| "optional"); |
| |
| /** |
| * Longest time, in milliseconds, the job is allowed to run after it has |
| * got all of its resources; after that it will be killed. |
| * Use -1 for no limit. |
| */ |
| LongConfOption MAX_ALLOWED_JOB_TIME_MS = new LongConfOption( |
| "giraph.maxAllowedJobTimeMilliseconds", -1, |
| "Maximum allowed time for job to run after getting all resources " + |
| "before it will be killed, in milliseconds " + |
| "(-1 if it has no limit)"); |
| |
| // One or more of the input format classes below must be configured. |
| /** Class of the VertexInputFormat */ |
| ClassConfOption<VertexInputFormat> VERTEX_INPUT_FORMAT_CLASS = |
| ClassConfOption.create( |
| "giraph.vertexInputFormatClass", null, VertexInputFormat.class, |
| "VertexInputFormat class " + |
| "(at least one of the input format classes is required)"); |
| /** Class of the EdgeInputFormat */ |
| ClassConfOption<EdgeInputFormat> EDGE_INPUT_FORMAT_CLASS = |
| ClassConfOption.create( |
| "giraph.edgeInputFormatClass", null, EdgeInputFormat.class, |
| "EdgeInputFormat class"); |
| /** Class of the MappingInputFormat */ |
| ClassConfOption<MappingInputFormat> MAPPING_INPUT_FORMAT_CLASS = |
| ClassConfOption.create( |
| "giraph.mappingInputFormatClass", null, MappingInputFormat.class, |
| "MappingInputFormat class"); |
| |
| /** Class of the EdgeInputFilter */ |
| ClassConfOption<EdgeInputFilter> EDGE_INPUT_FILTER_CLASS = |
| ClassConfOption.create( |
| "giraph.edgeInputFilterClass", |
| DefaultEdgeInputFilter.class, EdgeInputFilter.class, |
| "EdgeInputFilter class"); |
| /** Class of the VertexInputFilter */ |
| ClassConfOption<VertexInputFilter> VERTEX_INPUT_FILTER_CLASS = |
| ClassConfOption.create( |
| "giraph.vertexInputFilterClass", |
| DefaultVertexInputFilter.class, VertexInputFilter.class, |
| "VertexInputFilter class"); |
| /** Class of the Vertex */ |
| ClassConfOption<Vertex> VERTEX_CLASS = ClassConfOption.create( |
| "giraph.vertexClass", DefaultVertex.class, Vertex.class, |
| "Vertex class"); |
| /** Class of the VertexOutputFormat */ |
| ClassConfOption<VertexOutputFormat> VERTEX_OUTPUT_FORMAT_CLASS = |
| ClassConfOption.create( |
| "giraph.vertexOutputFormatClass", null, VertexOutputFormat.class, |
| "VertexOutputFormat class"); |
| /** VertexOutputFormat sub-directory */ |
| StrConfOption VERTEX_OUTPUT_FORMAT_SUBDIR = |
| new StrConfOption("giraph.vertex.output.subdir", "", |
| "VertexOutputFormat sub-directory"); |
| /** Class of the EdgeOutputFormat */ |
| ClassConfOption<EdgeOutputFormat> EDGE_OUTPUT_FORMAT_CLASS = |
| ClassConfOption.create( |
| "giraph.edgeOutputFormatClass", null, EdgeOutputFormat.class, |
| "EdgeOutputFormat class"); |
| /** Sub-directory of the EdgeOutputFormat */ |
| StrConfOption EDGE_OUTPUT_FORMAT_SUBDIR = new StrConfOption( |
| "giraph.edge.output.subdir", "", |
| "EdgeOutputFormat sub-directory"); |
| |
| /** GiraphTextOutputFormat separator (a tab unless configured otherwise) */ |
| StrConfOption GIRAPH_TEXT_OUTPUT_FORMAT_SEPARATOR = |
| new StrConfOption("giraph.textoutputformat.separator", "\t", |
| "GiraphTextOutputFormat Separator"); |
| /** Reverse values in the output */ |
| BooleanConfOption GIRAPH_TEXT_OUTPUT_FORMAT_REVERSE = |
| new BooleanConfOption("giraph.textoutputformat.reverse", false, |
| "Reverse values in the output"); |
| |
| /** |
| * If you use this option, instead of saving vertices at the end of the |
| * application, saveVertex will be called right after each vertex.compute() |
| * is called. |
| * NOTE: This feature doesn't work well with checkpointing - if you restart |
| * from a checkpoint you won't have any output from previous supersteps. |
| */ |
| BooleanConfOption DO_OUTPUT_DURING_COMPUTATION = |
| new BooleanConfOption("giraph.doOutputDuringComputation", false, |
| "If you use this option, instead of having saving vertices in the " + |
| "end of application, saveVertex will be called right after each " + |
| // Trailing space keeps the NOTE from running into the previous sentence. |
| "vertex.compute() is called. " + |
| "NOTE: This feature doesn't work well with checkpointing - if you " + |
| "restart from a checkpoint you won't have any output from previous " + |
| "supersteps."); |
| /** |
| * Should be set to true when the VertexOutputFormat is thread-safe, i.e. |
| * when it allows several vertexWriters to be created and written to in |
| * parallel. |
| */ |
| BooleanConfOption VERTEX_OUTPUT_FORMAT_THREAD_SAFE = new BooleanConfOption( |
| "giraph.vertexOutputFormatThreadSafe", false, |
| "Vertex output format thread-safe - if your VertexOutputFormat " + |
| "allows several vertexWriters to be created and written to in " + |
| "parallel, you should set this to true."); |
| /** How many threads write the output in the end of the application */ |
| IntConfOption NUM_OUTPUT_THREADS = new IntConfOption( |
| "giraph.numOutputThreads", 1, |
| "Number of threads for writing output in the end of the application"); |
| |
| /** Comma-separated list of jars to export to the YARN workers */ |
| StrConfOption GIRAPH_YARN_LIBJARS = new StrConfOption( |
| "giraph.yarn.libjars", "", |
| "conf key for comma-separated list of jars to export to YARN workers"); |
| /** XML file name used to export our Configuration to the YARN workers */ |
| String GIRAPH_YARN_CONF_FILE = "giraph-conf.xml"; |
| /** Default heap size for all tasks when running on the YARN profile */ |
| int GIRAPH_YARN_TASK_HEAP_MB_DEFAULT = 1024; |
| /** Giraph property for the user-configurable heap memory per worker */ |
| IntConfOption GIRAPH_YARN_TASK_HEAP_MB = |
| new IntConfOption( |
| "giraph.yarn.task.heap.mb", |
| GIRAPH_YARN_TASK_HEAP_MB_DEFAULT, |
| "Name of Giraph property for user-configurable heap memory per worker"); |
| /** Default YARN priority level for our task containers */ |
| int GIRAPH_YARN_PRIORITY = 10; |
| /** |
| * Whether this is a pure YARN job (i.e. one with no MapReduce layer |
| * managing the Giraph tasks) |
| */ |
| BooleanConfOption IS_PURE_YARN_JOB = new BooleanConfOption( |
| "giraph.pure.yarn.job", false, |
| "Is this a pure YARN job " + |
| "(i.e. no MapReduce layer managing Giraph tasks)"); |
| |
| /** Class of the vertex index */ |
| ClassConfOption<WritableComparable> VERTEX_ID_CLASS = |
| ClassConfOption.create( |
| "giraph.vertexIdClass", null, WritableComparable.class, |
| "Vertex index class"); |
| /** Class of the vertex value */ |
| ClassConfOption<Writable> VERTEX_VALUE_CLASS = ClassConfOption.create( |
| "giraph.vertexValueClass", null, Writable.class, |
| "Vertex value class"); |
| /** Class of the edge value */ |
| ClassConfOption<Writable> EDGE_VALUE_CLASS = ClassConfOption.create( |
| "giraph.edgeValueClass", null, Writable.class, |
| "Edge value class"); |
| /** Class of the outgoing message value */ |
| ClassConfOption<Writable> OUTGOING_MESSAGE_VALUE_CLASS = |
| ClassConfOption.create( |
| "giraph.outgoingMessageValueClass", null, Writable.class, |
| "Outgoing message value class"); |
| /** Worker context class */ |
| ClassConfOption<WorkerContext> WORKER_CONTEXT_CLASS = |
| ClassConfOption.create("giraph.workerContextClass", |
| DefaultWorkerContext.class, WorkerContext.class, |
| "Worker context class"); |
| /** Optional class of the AggregatorWriter */ |
| ClassConfOption<AggregatorWriter> AGGREGATOR_WRITER_CLASS = |
| ClassConfOption.create( |
| "giraph.aggregatorWriterClass", |
| TextAggregatorWriter.class, AggregatorWriter.class, |
| "AggregatorWriter class - optional"); |
| |
| /** Optional class of the Partition */ |
| ClassConfOption<Partition> PARTITION_CLASS = ClassConfOption.create( |
| "giraph.partitionClass", SimplePartition.class, Partition.class, |
| "Partition class - optional"); |
| |
| /** Fewest simultaneous workers needed before this job can run (int) */ |
| String MIN_WORKERS = "giraph.minWorkers"; |
| /** Most simultaneous worker tasks that this job will start (int). */ |
| String MAX_WORKERS = "giraph.maxWorkers"; |
| |
| /** |
| * Keep the master task apart from the worker tasks. Supporting dynamic |
| * recovery requires this. (boolean) |
| */ |
| BooleanConfOption SPLIT_MASTER_WORKER = new BooleanConfOption( |
| "giraph.SplitMasterWorker", true, |
| "Separate the workers and the master tasks. This is required to " + |
| "support dynamic recovery. (boolean)"); |
| |
| /** Tells whether the job is being run in an internal unit test */ |
| BooleanConfOption LOCAL_TEST_MODE = new BooleanConfOption( |
| "giraph.localTestMode", false, |
| "Indicates whether this job is run in an internal unit test"); |
| |
| /** Desired log level, overriding the Hadoop log level */ |
| StrConfOption LOG_LEVEL = |
| new StrConfOption( |
| "giraph.logLevel", "info", |
| "Override the Hadoop log level and set the desired log level."); |
| |
| /** Whether to use thread level debugging */ |
| BooleanConfOption LOG_THREAD_LAYOUT = new BooleanConfOption( |
| "giraph.logThreadLayout", false, "Use thread level debugging?"); |
| |
| /** Enables jmap printing */ |
| BooleanConfOption JMAP_ENABLE = new BooleanConfOption( |
| "giraph.jmap.histo.enable", false, |
| "Configuration key to enable jmap printing"); |
| |
| /** Msec to sleep between calls */ |
| IntConfOption JMAP_SLEEP_MILLIS = new IntConfOption( |
| "giraph.jmap.histo.msec", SECONDS.toMillis(30), |
| "Configuration key for msec to sleep between calls"); |
| |
| /** Number of lines to print */ |
| IntConfOption JMAP_PRINT_LINES = new IntConfOption( |
| "giraph.jmap.histo.print_lines", 30, |
| "Configuration key for how many lines to print"); |
| |
| /** |
| * Restricts jmap printing to live objects. |
| * With this option every jmap dump triggers a Full GC, |
| * which can significantly hinder performance. |
| */ |
| BooleanConfOption JMAP_LIVE_ONLY = new BooleanConfOption( |
| "giraph.jmap.histo.live", false, |
| "Only print live objects in jmap?"); |
| |
| /** |
| * Used by ReactiveJMapHistoDumper when checking for an imminent |
| * OOM in the worker or master process |
| */ |
| IntConfOption MIN_FREE_MBS_ON_HEAP = new IntConfOption( |
| "giraph.heap.minFreeMb", 128, |
| "Option used by worker and master observers to check for " + |
| "imminent OOM exception"); |
| /** |
| * Can be used to turn on reactive dumping of the jmap histo when an |
| * OOM is imminent |
| */ |
| BooleanConfOption REACTIVE_JMAP_ENABLE = new BooleanConfOption( |
| "giraph.heap.enableReactiveJmapDumping", false, |
| "Option to enable dumping jmap histogram reactively based on " + |
| "free memory on heap"); |
| |
| /** |
| * Smallest percent of the maximum number of workers that must have |
| * responded for progress to continue. (float) |
| */ |
| FloatConfOption MIN_PERCENT_RESPONDED = new FloatConfOption( |
| "giraph.minPercentResponded", 100.0f, |
| "Minimum percent of the maximum number of workers that have " + |
| "responded in order to continue progressing. (float)"); |
| |
| /** Turns the Metrics system on */ |
| BooleanConfOption METRICS_ENABLE = new BooleanConfOption( |
| "giraph.metrics.enable", false, "Enable the Metrics system"); |
| |
| /** HDFS directory that master metrics are written to instead of stderr */ |
| StrConfOption METRICS_DIRECTORY = new StrConfOption( |
| "giraph.metrics.directory", "", |
| "Directory in HDFS to write master metrics to, instead of stderr"); |
| |
| /** |
| * Comma-separated ZooKeeper list. When it is not set, ZooKeeper is |
| * started up locally; note that after such a local start the |
| * configuration holds a value for this parameter as well. |
| */ |
| StrConfOption ZOOKEEPER_LIST = new StrConfOption( |
| "giraph.zkList", "", |
| "ZooKeeper comma-separated list (if not set, will start up " + |
| "ZooKeeper locally). Consider that after locally-starting " + |
| "zookeeper, this parameter will updated the configuration with " + |
| "the corrent configuration value."); |
| |
| /** |
| * Because the Zookeeper List always holds a value during the |
| * computation, this option is what tells whether the zookeeper was |
| * started internally or provided externally. |
| */ |
| BooleanConfOption ZOOKEEPER_IS_EXTERNAL = new BooleanConfOption( |
| "giraph.zkIsExternal", true, |
| "Zookeeper List will always hold a value during the computation " + |
| "while this option provides information regarding whether the " + |
| "zookeeper was internally started or externally provided."); |
| |
| /** Timeout of the ZooKeeper session, in milliseconds */ |
| IntConfOption ZOOKEEPER_SESSION_TIMEOUT = new IntConfOption( |
| "giraph.zkSessionMsecTimeout", MINUTES.toMillis(1), |
| "ZooKeeper session millisecond timeout"); |
| |
| /** Interval at which the ZooKeeper server data is polled */ |
| IntConfOption ZOOKEEPER_SERVERLIST_POLL_MSECS = new IntConfOption( |
| "giraph.zkServerlistPollMsecs", SECONDS.toMillis(3), |
| "Polling interval to check for the ZooKeeper server data"); |
| |
| /** Port that ZooKeeper should use */ |
| IntConfOption ZOOKEEPER_SERVER_PORT = new IntConfOption( |
| "giraph.zkServerPort", 22181, "ZooKeeper port to use"); |
| |
| /** Local directory that ZooKeeper should use */ |
| String ZOOKEEPER_DIR = "giraph.zkDir"; |
| |
| /** Maximum attempts when handling a ZooKeeper connection loss */ |
| IntConfOption ZOOKEEPER_OPS_MAX_ATTEMPTS = new IntConfOption( |
| "giraph.zkOpsMaxAttempts", 3, |
| "Max attempts for handling ZooKeeper connection loss"); |
| |
| /** |
| * Msecs to wait before a ZooKeeper op that failed due to connection loss |
| * is retried. |
| */ |
| IntConfOption ZOOKEEPER_OPS_RETRY_WAIT_MSECS = new IntConfOption( |
| "giraph.zkOpsRetryWaitMsecs", SECONDS.toMillis(5), |
| "Msecs to wait before retrying a failed ZooKeeper op due to " + |
| "connection loss."); |
| |
| /** Backlog for TCP (defaults to number of workers) */ |
| IntConfOption TCP_BACKLOG = |
| new IntConfOption( |
| "giraph.tcpBacklog", 1, |
| "TCP backlog (defaults to number of workers)"); |
| |
| /** Whether netty uses the pooled memory buffer allocator */ |
| BooleanConfOption NETTY_USE_POOLED_ALLOCATOR = |
| new BooleanConfOption( |
| "giraph.useNettyPooledAllocator", false, |
| "Should netty use pooled memory allocator?"); |
| |
| /** Whether netty uses direct memory buffers */ |
| BooleanConfOption NETTY_USE_DIRECT_MEMORY = |
| new BooleanConfOption( |
| "giraph.useNettyDirectMemory", false, |
| "Should netty use direct memory buffers"); |
| |
| /** Size of the encoder buffer */ |
| IntConfOption NETTY_REQUEST_ENCODER_BUFFER_SIZE = new IntConfOption( |
| "giraph.nettyRequestEncoderBufferSize", 32 * ONE_KB, |
| "How big to make the encoder buffer?"); |
| |
| /** Number of netty client threads */ |
| IntConfOption NETTY_CLIENT_THREADS = new IntConfOption( |
| "giraph.nettyClientThreads", 4, "Netty client threads"); |
| |
| /** Number of netty server threads */ |
| IntConfOption NETTY_SERVER_THREADS = new IntConfOption( |
| "giraph.nettyServerThreads", 16, "Netty server threads"); |
| |
| /** Whether the client uses the execution handler in netty */ |
| BooleanConfOption NETTY_CLIENT_USE_EXECUTION_HANDLER = |
| new BooleanConfOption( |
| "giraph.nettyClientUseExecutionHandler", true, |
| "Use the execution handler in netty on the client?"); |
| |
| /** Number of netty client execution threads (execution handler) */ |
| IntConfOption NETTY_CLIENT_EXECUTION_THREADS = new IntConfOption( |
| "giraph.nettyClientExecutionThreads", 8, |
| "Netty client execution threads (execution handler)"); |
| |
| /** Handler after which the netty client execution handler is placed */ |
| StrConfOption NETTY_CLIENT_EXECUTION_AFTER_HANDLER = new StrConfOption( |
| "giraph.nettyClientExecutionAfterHandler", "request-encoder", |
| "Where to place the netty client execution handle?"); |
| |
| /** Whether the server uses the execution handler in netty */ |
| BooleanConfOption NETTY_SERVER_USE_EXECUTION_HANDLER = |
| new BooleanConfOption( |
| "giraph.nettyServerUseExecutionHandler", true, |
| "Use the execution handler in netty on the server?"); |
| |
| /** Number of netty server execution threads (execution handler) */ |
| IntConfOption NETTY_SERVER_EXECUTION_THREADS = new IntConfOption( |
| "giraph.nettyServerExecutionThreads", 8, |
| "Netty server execution threads (execution handler)"); |
| |
| /** Handler after which the netty server execution handler is placed */ |
| StrConfOption NETTY_SERVER_EXECUTION_AFTER_HANDLER = new StrConfOption( |
| "giraph.nettyServerExecutionAfterHandler", "requestFrameDecoder", |
| "Where to place the netty server execution handle?"); |
| |
| /** Have netty simulate a first request closed */ |
| BooleanConfOption NETTY_SIMULATE_FIRST_REQUEST_CLOSED = |
| new BooleanConfOption( |
| "giraph.nettySimulateFirstRequestClosed", false, |
| "Netty simulate a first request closed"); |
| |
| /** Have netty simulate a first response failed */ |
| BooleanConfOption NETTY_SIMULATE_FIRST_RESPONSE_FAILED = |
| new BooleanConfOption( |
| "giraph.nettySimulateFirstResponseFailed", false, |
| "Netty simulate a first response failed"); |
| |
| /** Compression algorithm that netty should use */ |
| StrConfOption NETTY_COMPRESSION_ALGORITHM = new StrConfOption( |
| "giraph.nettyCompressionAlgorithm", "", |
| "Which compression algorithm to use in netty"); |
| |
| /** |
| * Whether requests are pro-actively read by netty and fed to its |
| * processing pipeline |
| */ |
| BooleanConfOption NETTY_AUTO_READ = new BooleanConfOption( |
| "giraph.nettyAutoRead", true, |
| "Whether netty should pro-actively read requests and feed them to " + |
| "its processing pipeline"); |
| |
| /** Maximum number of attempts to resolve an address */ |
| IntConfOption MAX_RESOLVE_ADDRESS_ATTEMPTS = new IntConfOption( |
| "giraph.maxResolveAddressAttempts", 5, |
| "Max resolve address attempts"); |
| |
| /** Msecs to wait between waits for all requests to finish */ |
| IntConfOption WAITING_REQUEST_MSECS = new IntConfOption( |
| "giraph.waitingRequestMsecs", SECONDS.toMillis(15), |
| "Msecs to wait between waiting for all requests to finish"); |
| |
| /** Milliseconds to wait for an event before continuing */ |
| IntConfOption EVENT_WAIT_MSECS = |
| new IntConfOption("giraph.eventWaitMsecs", SECONDS.toMillis(30), |
| "Milliseconds to wait for an event before continuing"); |
| |
| /** |
| * Longest time, in milliseconds, to wait for the minimum number of |
| * workers before a superstep before giving up (int). |
| */ |
| IntConfOption MAX_MASTER_SUPERSTEP_WAIT_MSECS = new IntConfOption( |
| "giraph.maxMasterSuperstepWaitMsecs", MINUTES.toMillis(10), |
| "Maximum milliseconds to wait before giving up trying to get the " + |
| "minimum number of workers before a superstep (int)."); |
| |
| /** Milliseconds within which a request must complete (or else resend) */ |
| IntConfOption MAX_REQUEST_MILLISECONDS = new IntConfOption( |
| "giraph.maxRequestMilliseconds", MINUTES.toMillis(10), |
| "Milliseconds for a request to complete (or else resend)"); |
| |
| /** Maximum number of netty connection failures */ |
| IntConfOption NETTY_MAX_CONNECTION_FAILURES = new IntConfOption( |
| "giraph.nettyMaxConnectionFailures", 1000, |
| "Netty max connection failures"); |
| |
| /** |
| * How long to wait before trying to reconnect failed connections. |
| * The description argument used to be an empty string; it now carries |
| * this same text so the option has a description like the others. |
| */ |
| IntConfOption WAIT_TIME_BETWEEN_CONNECTION_RETRIES_MS = |
| new IntConfOption("giraph.waitTimeBetweenConnectionRetriesMs", 500, |
| "How long to wait before trying to reconnect failed connections"); |
| |
| /** First port to try for the IPC communication (default 30000) */ |
| IntConfOption IPC_INITIAL_PORT = new IntConfOption( |
| "giraph.ipcInitialPort", 30000, |
| "Initial port to start using for the IPC communication"); |
| |
| /** Maximum number of bind attempts for different IPC ports (default 20) */ |
| IntConfOption MAX_IPC_PORT_BIND_ATTEMPTS = new IntConfOption( |
| "giraph.maxIpcPortBindAttempts", 20, |
| "Maximum bind attempts for different IPC ports"); |
| |
| /** |
| * When enabled, the first IPC port binding attempt is failed in order to |
| * simulate a binding failure during real grid testing. Off by default. |
| */ |
| BooleanConfOption FAIL_FIRST_IPC_PORT_BIND_ATTEMPT = new BooleanConfOption( |
| "giraph.failFirstIpcPortBindAttempt", false, |
| "Fail first IPC port binding attempt, simulate binding failure " + |
| "on real grid testing"); |
| |
| /** Size of the client send buffer (default 512 * ONE_KB) */ |
| IntConfOption CLIENT_SEND_BUFFER_SIZE = new IntConfOption( |
| "giraph.clientSendBufferSize", 512 * ONE_KB, |
| "Client send buffer size"); |
| |
| /** Size of the client receive buffer (default 32 * ONE_KB) */ |
| IntConfOption CLIENT_RECEIVE_BUFFER_SIZE = new IntConfOption( |
| "giraph.clientReceiveBufferSize", 32 * ONE_KB, |
| "Client receive buffer size"); |
| |
| /** Size of the server send buffer (default 32 * ONE_KB) */ |
| IntConfOption SERVER_SEND_BUFFER_SIZE = new IntConfOption( |
| "giraph.serverSendBufferSize", 32 * ONE_KB, |
| "Server send buffer size"); |
| |
| /** Size of the server receive buffer (default 512 * ONE_KB) */ |
| IntConfOption SERVER_RECEIVE_BUFFER_SIZE = new IntConfOption( |
| "giraph.serverReceiveBufferSize", 512 * ONE_KB, |
| "Server receive buffer size"); |
| |
| /** Maximum size, in bytes, of messages per peer before a flush */ |
| IntConfOption MAX_MSG_REQUEST_SIZE = new IntConfOption( |
| "giraph.msgRequestSize", 512 * ONE_KB, |
| "Maximum size of messages (in bytes) per peer before flush"); |
| |
| /** |
| * How much bigger than the average per partition size the initial per |
| * partition buffers are made. With this value A, message request size M |
| * and a worker that has P partitions, then its initial partition buffer |
| * size will be (M / P) * (1 + A). |
| */ |
| FloatConfOption ADDITIONAL_MSG_REQUEST_SIZE = new FloatConfOption( |
| "giraph.additionalMsgRequestSize", 0.2f, |
| "How much bigger than the average per partition size to make " + |
| "initial per partition buffers. If this value is A, message " + |
| "request size is M, and a worker has P partitions, than its " + |
| "initial partition buffer size will be (M / P) * (1 + A)."); |
| |
| /** |
| * Factor over the buffer size beyond which request sizes cause warnings |
| * to be printed to the log and to the command line (default 2.0). |
| */ |
| FloatConfOption REQUEST_SIZE_WARNING_THRESHOLD = |
| new FloatConfOption("giraph.msgRequestWarningThreshold", 2.0f, |
| "If request sizes are bigger than the buffer size by this factor " + |
| "warnings are printed to the log and to the command line"); |
| |
| /** Maximum size, in bytes, of vertices per peer before a flush */ |
| IntConfOption MAX_VERTEX_REQUEST_SIZE = new IntConfOption( |
| "giraph.vertexRequestSize", 512 * ONE_KB, |
| "Maximum size of vertices (in bytes) per peer before flush"); |
| |
| /** |
| * Extra size, expressed as a ratio on top of the average size, of each |
| * per-partition buffer for vertices (default 0.2). |
| */ |
| FloatConfOption ADDITIONAL_VERTEX_REQUEST_SIZE = new FloatConfOption( |
| "giraph.additionalVertexRequestSize", 0.2f, |
| "Additional size (expressed as a ratio) of each per-partition " + |
| "buffer on top of the average size."); |
| |
| /** Maximum size, in bytes, of edges per peer before a flush */ |
| IntConfOption MAX_EDGE_REQUEST_SIZE = new IntConfOption( |
| "giraph.edgeRequestSize", 512 * ONE_KB, |
| "Maximum size of edges (in bytes) per peer before flush"); |
| |
| /** |
| * Extra size, expressed as a ratio on top of the average size, of each |
| * per-partition buffer for edges (default 0.2). |
| */ |
| FloatConfOption ADDITIONAL_EDGE_REQUEST_SIZE = new FloatConfOption( |
| "giraph.additionalEdgeRequestSize", 0.2f, |
| "Additional size (expressed as a ratio) of each per-partition " + |
| "buffer on top of the average size."); |
| |
| /** Maximum number of mutations per partition before a flush */ |
| IntConfOption MAX_MUTATIONS_PER_REQUEST = new IntConfOption( |
| "giraph.maxMutationsPerRequest", 100, |
| "Maximum number of mutations per partition before flush"); |
| |
| /** |
| * Whether to use message size encoding. Typically better for complex |
| * objects; not meant for primitive wrapped messages. Off by default. |
| */ |
| BooleanConfOption USE_MESSAGE_SIZE_ENCODING = new BooleanConfOption( |
| "giraph.useMessageSizeEncoding", false, |
| "Use message size encoding (typically better for complex objects, " + |
| "not meant for primitive wrapped messages)"); |
| |
| /** Number of channels used per server (default 1) */ |
| IntConfOption CHANNELS_PER_SERVER = new IntConfOption( |
| "giraph.channelsPerServer", 1, |
| "Number of channels used per server"); |
| |
| /** Configuration key for the number of flush threads per peer */ |
| String MSG_NUM_FLUSH_THREADS = "giraph.msgNumFlushThreads"; |
| |
| /** Number of threads used for vertex computation (default 1) */ |
| IntConfOption NUM_COMPUTE_THREADS = new IntConfOption( |
| "giraph.numComputeThreads", 1, |
| "Number of threads for vertex computation"); |
| |
| /** Number of threads used for input split loading (default 1) */ |
| IntConfOption NUM_INPUT_THREADS = new IntConfOption( |
| "giraph.numInputThreads", 1, |
| "Number of threads for input split loading"); |
| |
| /** Minimum stragglers of the superstep before they are printed out */ |
| IntConfOption PARTITION_LONG_TAIL_MIN_PRINT = new IntConfOption( |
| "giraph.partitionLongTailMinPrint", 1, |
| "Minimum stragglers of the superstep before printing them out"); |
| |
| /** Whether superstep counters are used (boolean). On by default. */ |
| BooleanConfOption USE_SUPERSTEP_COUNTERS = new BooleanConfOption( |
| "giraph.useSuperstepCounters", true, |
| "Use superstep counters? (boolean)"); |
| |
| /** |
| * Input split sample percent, meant for sampling and testing rather than |
| * for an actual job: when testing, only a fraction of the actual input |
| * splits from the VertexInputFormat may be wanted. Values should be in |
| * [0, 100]; the default is 100. |
| */ |
| FloatConfOption INPUT_SPLIT_SAMPLE_PERCENT = new FloatConfOption( |
| "giraph.inputSplitSamplePercent", 100f, |
| "Input split sample percent - Used only for sampling and testing, " + |
| "rather than an actual job. The idea is that to test, you might " + |
| "only want a fraction of the actual input splits from your " + |
| "VertexInputFormat to load (values should be [0, 100])."); |
| |
| /** |
| * Limit on the number of vertices loaded from an input split, to keep |
| * outlier vertex input splits from producing too many vertices or to help |
| * with testing. By default (-1) everything is loaded. |
| */ |
| LongConfOption INPUT_SPLIT_MAX_VERTICES = new LongConfOption( |
| "giraph.InputSplitMaxVertices", -1, |
| "To limit outlier vertex input splits from producing too many " + |
| "vertices or to help with testing, the number of vertices " + |
| "loaded from an input split can be limited. By default, " + |
| "everything is loaded."); |
| |
| /** |
| * Limit on the number of edges loaded from an input split, to keep |
| * outlier input splits in check or to help with testing. By default (-1) |
| * everything is loaded. |
| */ |
| LongConfOption INPUT_SPLIT_MAX_EDGES = new LongConfOption( |
| "giraph.InputSplitMaxEdges", -1, |
| "To limit outlier vertex input splits from producing too many " + |
| "vertices or to help with testing, the number of edges loaded " + |
| "from an input split can be limited. By default, everything is " + |
| "loaded."); |
| |
| /** |
| * Whether each worker prioritizes input splits that reside on its host, |
| * which minimizes network usage when reading input splits at the cost of |
| * increased load on ZooKeeper. Users with a lot of splits and input |
| * threads (or with configurations that can't exploit locality) may want |
| * to disable it. On by default. |
| */ |
| BooleanConfOption USE_INPUT_SPLIT_LOCALITY = new BooleanConfOption( |
| "giraph.useInputSplitLocality", true, |
| "To minimize network usage when reading input splits, each worker " + |
| "can prioritize splits that reside on its host. " + |
| "This, however, comes at the cost of increased load on ZooKeeper. " + |
| "Hence, users with a lot of splits and input threads (or with " + |
| "configurations that can't exploit locality) may want to disable " + |
| "it."); |
| |
| /** Multiplier applied to the current workers squared (default 1.0) */ |
| FloatConfOption PARTITION_COUNT_MULTIPLIER = new FloatConfOption( |
| "giraph.masterPartitionCountMultiplier", 1.0f, |
| "Multiplier for the current workers squared"); |
| |
| /** Minimum number of partitions each compute thread should have */ |
| IntConfOption MIN_PARTITIONS_PER_COMPUTE_THREAD = new IntConfOption( |
| "giraph.minPartitionsPerComputeThread", 1, |
| "Minimum number of partitions to have per compute thread"); |
| |
| /** When not -1, overrides the default partition count calculation */ |
| IntConfOption USER_PARTITION_COUNT = new IntConfOption( |
| "giraph.userPartitionCount", -1, |
| "Overrides default partition count calculation if not -1"); |
| |
| /** |
| * Configuration key for the vertex key space size for |
| * {@link org.apache.giraph.partition.WorkerGraphPartitionerImpl} |
| */ |
| String PARTITION_VERTEX_KEY_SPACE_SIZE = "giraph.vertexKeySpaceSize"; |
| |
| /** |
| * Checkpoint frequency: 0 means no checkpoint, 1 means every superstep, |
| * 2 means every two supersteps, and so on. Default is 0. |
| */ |
| IntConfOption CHECKPOINT_FREQUENCY = new IntConfOption( |
| "giraph.checkpointFrequency", 0, |
| "How often to checkpoint (i.e. 0, means no checkpoint, 1 means " + |
| "every superstep, 2 is every two supersteps, etc.)."); |
| |
| /** Whether checkpoints are deleted after a successful job run */ |
| BooleanConfOption CLEANUP_CHECKPOINTS_AFTER_SUCCESS = new BooleanConfOption( |
| "giraph.cleanupCheckpointsAfterSuccess", true, |
| "Delete checkpoints after a successful job run?"); |
| |
| /** |
| * Configuration key for manually restarting an application from a chosen |
| * superstep. The corresponding checkpoint must exist for this to work, |
| * and the user should set a long value. Default is to start from scratch. |
| */ |
| String RESTART_SUPERSTEP = "giraph.restartSuperstep"; |
| |
| /** |
| * Job ID to restart from; needs to be specified when the application is |
| * restarted manually. No default (null). |
| */ |
| StrConfOption RESTART_JOB_ID = |
| new StrConfOption("giraph.restart.jobId", null, |
| "Which job ID should I try to restart?"); |
| |
| /** |
| * Configuration key for the base ZNode of Giraph's state in the ZooKeeper |
| * cluster. Must be a root znode on the cluster beginning with "/" |
| */ |
| String BASE_ZNODE_KEY = "giraph.zkBaseZNode"; |
| |
| /** Directory used to manage ZooKeeper when ZOOKEEPER_LIST is not set */ |
| StrConfOption ZOOKEEPER_MANAGER_DIRECTORY = new StrConfOption( |
| "giraph.zkManagerDirectory", "_bsp/_defaultZkManagerDir", |
| "If ZOOKEEPER_LIST is not set, then use this directory to manage " + |
| "ZooKeeper"); |
| |
| /** ZooKeeper client connection attempts made before giving up */ |
| IntConfOption ZOOKEEPER_CONNECTION_ATTEMPTS = new IntConfOption( |
| "giraph.zkConnectionAttempts", 10, |
| "Number of ZooKeeper client connection attempts before giving up."); |
| |
| /** HDFS directory that has/stores the available checkpoint files */ |
| StrConfOption CHECKPOINT_DIRECTORY = new StrConfOption( |
| "giraph.checkpointDirectory", "_bsp/_checkpoints/", |
| "This directory has/stores the available checkpoint files in HDFS."); |
| |
| /** |
| * Comma-separated list of local filesystem directories used for |
| * out-of-core partitions. |
| */ |
| StrConfOption PARTITIONS_DIRECTORY = new StrConfOption( |
| "giraph.partitionsDirectory", "_bsp/_partitions", |
| "Comma-separated list of directories in the local filesystem for " + |
| "out-of-core partitions."); |
| |
| /** |
| * Number of IO threads used in the out-of-core mechanism. When local disk |
| * is used for spilling data to and reading data from, this should equal |
| * the number of available disks on each machine; in that case |
| * giraph.partitionsDirectory should list directories mounted on different |
| * disks. Default is 1. |
| */ |
| IntConfOption NUM_OUT_OF_CORE_THREADS = new IntConfOption( |
| "giraph.numOutOfCoreThreads", 1, "Number of IO " + |
| "threads used in out-of-core mechanism. If using local disk to " + |
| "spill data, this should be equal to the number of available " + |
| "disks. In such case, use giraph.partitionsDirectory to specify " + |
| "mount points on different disks."); |
| |
| /** Whether the out-of-core graph is enabled. Off by default. */ |
| BooleanConfOption USE_OUT_OF_CORE_GRAPH = new BooleanConfOption( |
| "giraph.useOutOfCoreGraph", false, |
| "Enable out-of-core graph."); |
| |
| /** |
| * Data accessor resource/object used in out-of-core computation. |
| * Defaults to LocalDiskDataAccessor. |
| */ |
| ClassConfOption<OutOfCoreDataAccessor> OUT_OF_CORE_DATA_ACCESSOR = |
| ClassConfOption.create( |
| "giraph.outOfCoreDataAccessor", |
| LocalDiskDataAccessor.class, |
| OutOfCoreDataAccessor.class, |
| "Data accessor used in out-of-core computation (local-disk, " + |
| "in-memory, HDFS, etc.)"); |
| |
| /** |
| * Out-of-core oracle to be used for the adaptive out-of-core engine; |
| * defaults to ThresholdBasedOracle. If `MAX_PARTITIONS_IN_MEMORY` is |
| * already set, this will be over-written to be `FixedPartitionsOracle`. |
| */ |
| ClassConfOption<OutOfCoreOracle> OUT_OF_CORE_ORACLE = |
| ClassConfOption.create( |
| "giraph.outOfCoreOracle", |
| ThresholdBasedOracle.class, |
| OutOfCoreOracle.class, |
| "Out-of-core oracle that is to be used for adaptive out-of-core " + |
| "engine"); |
| |
| /** |
| * Maximum number of partitions to hold in memory for each worker. By |
| * default it is set to 0 (for adaptive out-of-core mechanism). |
| */ |
| IntConfOption MAX_PARTITIONS_IN_MEMORY = |
| new IntConfOption("giraph.maxPartitionsInMemory", 0, |
| "Maximum number of partitions to hold in memory for each worker. By" + |
| " default it is set to 0 (for adaptive out-of-core mechanism)"); |
| |
| /** Configuration key for the directory to write YourKit snapshots to */ |
| String YOURKIT_OUTPUT_DIR = "giraph.yourkit.outputDir"; |
| |
| /** Default directory to write YourKit snapshots to */ |
| String YOURKIT_OUTPUT_DIR_DEFAULT = "/tmp/giraph/%JOB_ID%/%TASK_ID%"; |
| |
| /** |
| * Whether to keep the zookeeper output for debugging. The default is to |
| * remove it. |
| */ |
| BooleanConfOption KEEP_ZOOKEEPER_DATA = new BooleanConfOption( |
| "giraph.keepZooKeeperData", false, |
| "Keep the zookeeper output for debugging? Default is to remove it."); |
| |
| /** Default ZooKeeper tick time. */ |
| int DEFAULT_ZOOKEEPER_TICK_TIME = 6000; |
| |
| /** Default ZooKeeper maximum client connections. */ |
| int DEFAULT_ZOOKEEPER_MAX_CLIENT_CNXNS = 10000; |
| |
| /** Number of snapshots to be retained after purge */ |
| int ZOOKEEPER_SNAP_RETAIN_COUNT = 3; |
| |
| /** Zookeeper purge interval in hours */ |
| int ZOOKEEPER_PURGE_INTERVAL = 1; |
| |
| /** |
| * ZooKeeper minimum session timeout; default is MINUTES.toMillis(10). |
| * NOTE(review): the key is spelled "zKMin..." with a capital K, unlike |
| * "giraph.zkMaxSessionTimeout"; it is kept as is because renaming it |
| * would change which user setting is read. |
| */ |
| IntConfOption ZOOKEEPER_MIN_SESSION_TIMEOUT = new IntConfOption( |
| "giraph.zKMinSessionTimeout", MINUTES.toMillis(10), |
| "ZooKeeper minimum session timeout"); |
| |
| /** ZooKeeper maximum session timeout; default is MINUTES.toMillis(15) */ |
| IntConfOption ZOOKEEPER_MAX_SESSION_TIMEOUT = new IntConfOption( |
| "giraph.zkMaxSessionTimeout", MINUTES.toMillis(15), |
| "ZooKeeper maximum session timeout"); |
| |
| /** |
| * Whether SASL with DIGEST and Hadoop Job Tokens is used to authenticate |
| * and authorize Netty BSP Clients to Servers. Off by default. |
| */ |
| BooleanConfOption AUTHENTICATE = new BooleanConfOption( |
| "giraph.authenticate", false, |
| "Whether to use SASL with DIGEST and Hadoop Job Tokens to " + |
| "authenticate and authorize Netty BSP Clients to Servers."); |
| |
| /** Whether unsafe serialization is used. On by default. */ |
| BooleanConfOption USE_UNSAFE_SERIALIZATION = new BooleanConfOption( |
| "giraph.useUnsafeSerialization", true, |
| "Use unsafe serialization?"); |
| |
| /** |
| * Whether BigDataIO is used for messages. If the graph contains |
| * super-vertices which receive a lot of messages (total serialized size |
| * of messages goes beyond the maximum size of a byte array), setting this |
| * option to true will remove that limit. The maximum memory available for |
| * a single vertex will be limited to the maximum heap size available. |
| * Off by default. |
| */ |
| BooleanConfOption USE_BIG_DATA_IO_FOR_MESSAGES = new BooleanConfOption( |
| "giraph.useBigDataIOForMessages", false, |
| "Use BigDataIO for messages?"); |
| |
| /** |
| * Maximum number of attempts a master/worker will retry before killing |
| * the job; maps directly to the number of map task attempts in Hadoop. |
| */ |
| IntConfOption MAX_TASK_ATTEMPTS = new IntConfOption( |
| "mapred.map.max.attempts", -1, |
| "Maximum number of attempts a master/worker will retry before " + |
| "killing the job. This directly maps to the number of map task " + |
| "attempts in Hadoop."); |
| |
| /** Interface to use for hostname resolution */ |
| StrConfOption DNS_INTERFACE = new StrConfOption( |
| "giraph.dns.interface", "default", |
| "Interface to use for hostname resolution"); |
| |
| /** Server to use for hostname resolution */ |
| StrConfOption DNS_NAMESERVER = new StrConfOption( |
| "giraph.dns.nameserver", "default", |
| "Server for hostname resolution"); |
| |
| /** |
| * Number of completed supersteps after which the application halts. For |
| * instance, with a value of 3 the application will run at most |
| * supersteps 0, 1, and 2 and then go into the shutdown superstep. |
| */ |
| IntConfOption MAX_NUMBER_OF_SUPERSTEPS = new IntConfOption( |
| "giraph.maxNumberOfSupersteps", 1, |
| "The application will halt after this many supersteps is " + |
| "completed. For instance, if it is set to 3, the application will " + |
| "run at most 0, 1, and 2 supersteps and then go into the shutdown " + |
| "superstep."); |
| |
| /** |
| * Declares that the application will not mutate the graph topology (the |
| * edges). Used to optimise the out-of-core graph by not writing back |
| * edges every time. Off by default. |
| */ |
| BooleanConfOption STATIC_GRAPH = new BooleanConfOption( |
| "giraph.isStaticGraph", false, |
| "The application will not mutate the graph topology (the edges). " + |
| "It is used to optimise out-of-core graph, by not writing back " + |
| "edges every time."); |
| |
| /** |
| * Selects which message encode & store enum to use when combining is not |
| * enabled. Defaults to BYTEARRAY_PER_PARTITION. |
| */ |
| EnumConfOption<MessageEncodeAndStoreType> MESSAGE_ENCODE_AND_STORE_TYPE = |
| EnumConfOption.create( |
| "giraph.messageEncodeAndStoreType", |
| MessageEncodeAndStoreType.class, |
| MessageEncodeAndStoreType.BYTEARRAY_PER_PARTITION, |
| "Select the message_encode_and_store_type to use"); |
| |
| /** |
| * Whether a source vertex that is present in edge input but not in |
| * vertex input can be created. On by default. |
| */ |
| BooleanConfOption CREATE_EDGE_SOURCE_VERTICES = new BooleanConfOption( |
| "giraph.createEdgeSourceVertices", true, |
| "Create a source vertex if present in edge input but not " + |
| "necessarily in vertex input"); |
| |
| /** |
| * Name of the counter group holding one counter whose name is the |
| * ZooKeeper server:port which this job is using |
| */ |
| String ZOOKEEPER_SERVER_PORT_COUNTER_GROUP = "Zookeeper server:port"; |
| |
| /** |
| * Name of the counter group holding one counter whose name is the |
| * ZooKeeper node path which should be created to trigger computation halt |
| */ |
| String ZOOKEEPER_HALT_NODE_COUNTER_GROUP = "Zookeeper halt node"; |
| |
| /** |
| * Name of the counter group holding one counter whose name is the |
| * ZooKeeper node path which contains all data about this job |
| */ |
| String ZOOKEEPER_BASE_PATH_COUNTER_GROUP = "Zookeeper base path"; |
| |
| /** |
| * Class used to write instructions on how to halt the application; |
| * defaults to HaltApplicationUtils.DefaultHaltInstructionsWriter. |
| */ |
| ClassConfOption<HaltApplicationUtils.HaltInstructionsWriter> |
| HALT_INSTRUCTIONS_WRITER_CLASS = |
| ClassConfOption.create("giraph.haltInstructionsWriter", |
| HaltApplicationUtils.DefaultHaltInstructionsWriter.class, |
| HaltApplicationUtils.HaltInstructionsWriter.class, |
| "Class used to write instructions on how to halt the application"); |
| |
| /** |
| * Maximum timeout (in milliseconds) for waiting for all tasks |
| * to complete after the job is done. Defaults to 15 minutes. |
| */ |
| IntConfOption WAIT_TASK_DONE_TIMEOUT_MS = |
| new IntConfOption("giraph.waitTaskDoneTimeoutMs", MINUTES.toMillis(15), |
| "Maximum timeout (in ms) for waiting for all tasks to " + |
| "complete"); |
| |
| /** Whether job progress is tracked on the client. Off by default. */ |
| BooleanConfOption TRACK_JOB_PROGRESS_ON_CLIENT = new BooleanConfOption( |
| "giraph.trackJobProgressOnClient", false, |
| "Whether to track job progress on client or not"); |
| |
| /** |
| * Class used to track job progress on the client; defaults to |
| * DefaultJobProgressTrackerService. |
| */ |
| ClassConfOption<JobProgressTrackerService> JOB_PROGRESS_TRACKER_CLASS = |
| ClassConfOption.create( |
| "giraph.jobProgressTrackerClass", |
| DefaultJobProgressTrackerService.class, |
| JobProgressTrackerService.class, |
| "Class to use to track job progress on client"); |
| |
| /** Number of retries for creating an HDFS file before failing */ |
| IntConfOption HDFS_FILE_CREATION_RETRIES = new IntConfOption( |
| "giraph.hdfs.file.creation.retries", 10, |
| "Retries to create an HDFS file before failing"); |
| |
| /** Milliseconds to wait before retrying HDFS file creation */ |
| IntConfOption HDFS_FILE_CREATION_RETRY_WAIT_MS = new IntConfOption( |
| "giraph.hdfs.file.creation.retry.wait.ms", 30_000, |
| "Milliseconds to wait prior to retrying creation of an HDFS file"); |
| |
| /** Number of threads for writing and reading checkpoints (default 8) */ |
| IntConfOption NUM_CHECKPOINT_IO_THREADS = new IntConfOption( |
| "giraph.checkpoint.io.threads", 8, |
| "Number of threads for writing and reading checkpoints"); |
| |
| /** |
| * Compression algorithm used for checkpointing, identified by its file |
| * extension for hadoop compatibility reasons. Defaults to ".deflate". |
| */ |
| StrConfOption CHECKPOINT_COMPRESSION_CODEC = new StrConfOption( |
| "giraph.checkpoint.compression.codec", ".deflate", |
| "Defines compression algorithm we will be using for " + |
| "storing checkpoint. Available options include but " + |
| "not restricted to: .deflate, .gz, .bz2, .lzo"); |
| |
| /** |
| * Defines if and when checkpointing is supported by this job. By default |
| * checkpointing is always supported unless output during the computation |
| * is enabled. |
| */ |
| ClassConfOption<CheckpointSupportedChecker> CHECKPOINT_SUPPORTED_CHECKER = |
| ClassConfOption.create( |
| "giraph.checkpoint.supported.checker", |
| DefaultCheckpointSupportedChecker.class, |
| CheckpointSupportedChecker.class, |
| "This is the way to specify if checkpointing is " + |
| "supported by the job"); |
| |
| /** |
| * Number of threads to use in the async message store; 0 (the default) |
| * means async message processing should not be used. |
| */ |
| IntConfOption ASYNC_MESSAGE_STORE_THREADS_COUNT = new IntConfOption( |
| "giraph.async.message.store.threads", 0, |
| "Number of threads to be used in async message store."); |
| |
| /** |
| * Output format class for hadoop to use (for committing); defaults to |
| * BspOutputFormat. |
| */ |
| ClassConfOption<OutputFormat> HADOOP_OUTPUT_FORMAT_CLASS = |
| ClassConfOption.create( |
| "giraph.hadoopOutputFormatClass", |
| BspOutputFormat.class, |
| OutputFormat.class, |
| "Output format class for hadoop to use (for committing)"); |
| |
| /** |
| * For worker to worker communication we can use IPs or host names. This |
| * option defaults to false, i.e. IP addresses are not preferred over |
| * host names unless it is explicitly enabled. |
| */ |
| BooleanConfOption PREFER_IP_ADDRESSES = |
| new BooleanConfOption("giraph.preferIP", false, |
| "Prefer IP addresses instead of host names"); |
| } |
| // CHECKSTYLE: resume InterfaceIsTypeCheck |