blob: 9c2f7c3dcf1f210d8479f7996a5da22b3a85b2c1 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.runtime.library.api;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.common.annotation.ConfigurationClass;
import org.apache.tez.common.annotation.ConfigurationProperty;
import org.apache.tez.runtime.library.conf.OrderedPartitionedKVOutputConfig.SorterImpl;
/**
* Meant for user configurable job properties.
* <p/>
* Note for developers: Whenever a new key is added to this file, it must also be added to the set of
* known tezRuntimeKeys.
* @see <a href="../../../../../../configs/TezRuntimeConfiguration.html">Detailed Configuration Information</a>
* @see <a href="../../../../../configs/tez-runtime-default-template.xml">XML-based Config Template</a>
*/
// TODO EVENTUALLY A description for each property.
@Public
@Evolving
@ConfigurationClass(templateFileName = "tez-runtime-default-template.xml")
public class TezRuntimeConfiguration {
private static final String TEZ_RUNTIME_PREFIX = "tez.runtime.";
private static final Set<String> tezRuntimeKeys = new HashSet<String>();
private static Set<String> umnodifiableTezRuntimeKeySet;
private static final Set<String> otherKeys = new HashSet<String>();
private static Set<String> unmodifiableOtherKeySet;
private static Configuration defaultConf = new Configuration(false);
private static final Map<String, String> tezRuntimeConfMap = new HashMap<String, String>();
private static final Map<String, String> otherConfMap = new HashMap<String, String>();
/**
* Prefixes from Hadoop configuration which are allowed.
*/
private static final List<String> allowedPrefixes = new ArrayList<String>();
private static List<String> unmodifiableAllowedPrefixes;
static {
Configuration.addDeprecation("tez.runtime.sort.threads",
TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SORTER_SORT_THREADS);
}
/**
* Configuration key to enable/disable IFile readahead.
*/
@ConfigurationProperty(type = "boolean")
public static final String TEZ_RUNTIME_IFILE_READAHEAD = TEZ_RUNTIME_PREFIX +
"ifile.readahead";
public static final boolean TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT = true;
/**
* Configuration key to set the IFile readahead length in bytes.
*/
@ConfigurationProperty(type = "integer")
public static final String TEZ_RUNTIME_IFILE_READAHEAD_BYTES = TEZ_RUNTIME_PREFIX +
"ifile.readahead.bytes";
public static final int TEZ_RUNTIME_IFILE_READAHEAD_BYTES_DEFAULT =
4 * 1024 * 1024;
public static final int TEZ_RUNTIME_IFILE_BUFFER_SIZE_DEFAULT = -1;
/**
* This is copy of io.file.buffer.size from Hadoop, which is used in several places such
* as compression codecs, buffer sizes in IFile, while fetching etc.
* Variable exists so that it can be referenced, instead of using the string name directly.
*/
public static final String TEZ_RUNTIME_IO_FILE_BUFFER_SIZE = "io.file.buffer.size";
@ConfigurationProperty(type = "integer")
public static final String TEZ_RUNTIME_IO_SORT_FACTOR = TEZ_RUNTIME_PREFIX +
"io.sort.factor";
public static final int TEZ_RUNTIME_IO_SORT_FACTOR_DEFAULT = 100;
@ConfigurationProperty(type = "float")
public static final String TEZ_RUNTIME_SORT_SPILL_PERCENT = TEZ_RUNTIME_PREFIX +
"sort.spill.percent";
public static final float TEZ_RUNTIME_SORT_SPILL_PERCENT_DEFAULT = 0.8f;
@ConfigurationProperty(type = "integer")
public static final String TEZ_RUNTIME_IO_SORT_MB = TEZ_RUNTIME_PREFIX + "io.sort.mb";
public static final int TEZ_RUNTIME_IO_SORT_MB_DEFAULT = 100;
@ConfigurationProperty(type = "integer")
public static final String TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES = TEZ_RUNTIME_PREFIX +
"index.cache.memory.limit.bytes";
public static final int TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES_DEFAULT =
1024 * 1024;
// TODO Use the default value
@ConfigurationProperty(type = "integer")
public static final String TEZ_RUNTIME_COMBINE_MIN_SPILLS = TEZ_RUNTIME_PREFIX +
"combine.min.spills";
public static final int TEZ_RUNTIME_COMBINE_MIN_SPILLS_DEFAULT = 3;
/**
* Tries to allocate @link{#TEZ_RUNTIME_IO_SORT_MB} in chunks specified in
* this parameter.
*/
@ConfigurationProperty(type = "integer")
public static final String
TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB = TEZ_RUNTIME_PREFIX +
"pipelined.sorter.min-block.size.in.mb";
public static final int
TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB_DEFAULT = 2000;
/**
* Setting this to true would enable sorter
* to auto-allocate memory on need basis in progressive fashion.
*
* Setting to false would allocate all available memory during
* initialization of sorter. In such cases,@link{#TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB}
* would be honored and memory specified in @link{#TEZ_RUNTIME_IO_SORT_MB}
* would be initialized upfront.
*/
@ConfigurationProperty(type = "boolean")
public static final String TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY = TEZ_RUNTIME_PREFIX +
"pipelined.sorter.lazy-allocate.memory";
public static final boolean
TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY_DEFAULT = false;
/**
* String value.
* Which sorter implementation to use.
* Valid values:
* - LEGACY
* - PIPELINED ( default )
* {@link org.apache.tez.runtime.library.conf.OrderedPartitionedKVOutputConfig.SorterImpl}
*/
@ConfigurationProperty
public static final String TEZ_RUNTIME_SORTER_CLASS = TEZ_RUNTIME_PREFIX +
"sorter.class";
public static final String TEZ_RUNTIME_SORTER_CLASS_DEFAULT = SorterImpl.PIPELINED.name();
@ConfigurationProperty(type = "integer")
public static final String TEZ_RUNTIME_PIPELINED_SORTER_SORT_THREADS = TEZ_RUNTIME_PREFIX +
"pipelined.sorter.sort.threads";
public static final int TEZ_RUNTIME_PIPELINED_SORTER_SORT_THREADS_DEFAULT = 2;
/**
* Integer value. Percentage of buffer to be filled before we spill to disk. Default value is 0,
* which will spill for every buffer.
*/
@ConfigurationProperty(type="int")
public static final String TEZ_RUNTIME_UNORDERED_PARTITIONED_KVWRITER_BUFFER_MERGE_PERCENT =
TEZ_RUNTIME_PREFIX + "unordered-partitioned-kvwriter.buffer-merge-percent";
public static final int TEZ_RUNTIME_UNORDERED_PARTITIONED_KVWRITER_BUFFER_MERGE_PERCENT_DEFAULT =
0;
/**
* Report partition statistics (e.g better scheduling in ShuffleVertexManager). TEZ-2496
* This can be enabled/disabled at vertex level.
* {@link org.apache.tez.runtime.library.api.TezRuntimeConfiguration.ReportPartitionStats}
* defines the list of values that can be specified.
* TODO TEZ-3303 Given ShuffleVertexManager doesn't consume precise stats
* yet. So do not set the value to "precise" yet when ShuffleVertexManager is used.
*/
@ConfigurationProperty
public static final String TEZ_RUNTIME_REPORT_PARTITION_STATS =
TEZ_RUNTIME_PREFIX + "report.partition.stats";
public static final String TEZ_RUNTIME_REPORT_PARTITION_STATS_DEFAULT =
ReportPartitionStats.MEMORY_OPTIMIZED.getType();
/**
* Size of the buffer to use if not writing directly to disk.
*/
@ConfigurationProperty(type = "integer")
public static final String TEZ_RUNTIME_UNORDERED_OUTPUT_BUFFER_SIZE_MB = TEZ_RUNTIME_PREFIX +
"unordered.output.buffer.size-mb";
public static final int TEZ_RUNTIME_UNORDERED_OUTPUT_BUFFER_SIZE_MB_DEFAULT = 100;
/**
* Maximum size for individual buffers used in the UnsortedPartitionedOutput.
* This is only meant to be used by unit tests for now.
*/
@Private
@ConfigurationProperty(type = "integer")
public static final String TEZ_RUNTIME_UNORDERED_OUTPUT_MAX_PER_BUFFER_SIZE_BYTES =
TEZ_RUNTIME_PREFIX +
"unordered.output.max-per-buffer.size-bytes";
/**
* Specifies a partitioner class, which is used in Tez Runtime components
* like OnFileSortedOutput
*/
@ConfigurationProperty
public static final String TEZ_RUNTIME_PARTITIONER_CLASS =
TEZ_RUNTIME_PREFIX + "partitioner.class";
/**
* Specifies a combiner class (primarily for Shuffle)
*/
@ConfigurationProperty
public static final String TEZ_RUNTIME_COMBINER_CLASS = TEZ_RUNTIME_PREFIX + "combiner.class";
@ConfigurationProperty(type = "integer")
public static final String TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES = TEZ_RUNTIME_PREFIX +
"shuffle.parallel.copies";
public static final int TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES_DEFAULT = 20;
@ConfigurationProperty(type = "integer")
public static final String TEZ_RUNTIME_SHUFFLE_FETCH_FAILURES_LIMIT = TEZ_RUNTIME_PREFIX +
"shuffle.fetch.failures.limit";
public static final int TEZ_RUNTIME_SHUFFLE_FETCH_FAILURES_LIMIT_DEFAULT = 5;
/**
* Specifies in milliseconds the maximum delay a penalized host can have before being retried,
* defaults to 10 minutes.
*/
@ConfigurationProperty(type = "integer")
public static final String TEZ_RUNTIME_SHUFFLE_HOST_PENALTY_TIME_LIMIT_MS = TEZ_RUNTIME_PREFIX +
"shuffle.host.penalty.time.limit";
public static final int TEZ_RUNTIME_SHUFFLE_HOST_PENALTY_TIME_LIMIT_MS_DEFAULT = 600000;
@Private
@Unstable
@ConfigurationProperty(type = "integer")
/**
* Expert setting made available only for debugging. Do not change it. Sets
* the number of retries before giving up on downloading from source
* attempt by consumer. Code internally handles the threshold if set to -1.
*/
public static final String
TEZ_RUNTIME_SHUFFLE_SOURCE_ATTEMPT_ABORT_LIMIT =
TEZ_RUNTIME_PREFIX + "shuffle.src-attempt.abort.limit";
public static final int
TEZ_RUNTIME_SHUFFLE_SOURCE_ATTEMPT_ABORT_LIMIT_DEFAULT = -1;
@Private
@Unstable
@ConfigurationProperty(type = "float")
/**
* Expert setting made available only for debugging. Do not change it. Setting
* to determine if failures happened across a percentage of nodes. This
* helps in determining if the consumer has to be restarted on continuous
* failures. Setting it to lower value can make consumer restarts more
* aggressive on failures.
*/
public static final String
TEZ_RUNTIME_SHUFFLE_ACCEPTABLE_HOST_FETCH_FAILURE_FRACTION =
TEZ_RUNTIME_PREFIX + "shuffle.acceptable.host-fetch.failure.fraction";
public static final float
TEZ_RUNTIME_SHUFFLE_ACCEPTABLE_HOST_FETCH_FAILURE_FRACTION_DEFAULT = 0.2f;
@Private
@Unstable
@ConfigurationProperty(type = "integer")
/**
* Expert setting made available only for debugging. Do not change it. Setting
* to determine if the consumer has to be restarted on continuous
* failures across nodes. Used along with {@link
* TEZ_RUNTIME_SHUFFLE_ACCEPTABLE_HOST_FETCH_FAILURE_FRACTION}.
*/
public static final String
TEZ_RUNTIME_SHUFFLE_MIN_FAILURES_PER_HOST =
TEZ_RUNTIME_PREFIX + "shuffle.min.failures.per.host";
public static final int TEZ_RUNTIME_SHUFFLE_MIN_FAILURES_PER_HOST_DEFAULT = 4;
@Private
@Unstable
@ConfigurationProperty(type = "float")
/**
* Expert setting made available only for debugging. Do not change it.
* Maximum percentage of time (compared to overall progress), the fetcher is
* allowed before concluding that it is stalled.
*/
public static final String TEZ_RUNTIME_SHUFFLE_MAX_STALL_TIME_FRACTION =
TEZ_RUNTIME_PREFIX + "shuffle.max.stall.time.fraction";
public static final float
TEZ_RUNTIME_SHUFFLE_MAX_STALL_TIME_FRACTION_DEFAULT = 0.5f;
@Private
@Unstable
@ConfigurationProperty(type = "float")
/**
* Expert setting made available only for debugging. Do not change it.
* Fraction to determine whether the shuffle has progressed enough or not
* If it has not progressed enough, it could be qualified for the consumer.
*/
public static final String
TEZ_RUNTIME_SHUFFLE_MIN_REQUIRED_PROGRESS_FRACTION =
TEZ_RUNTIME_PREFIX + "shuffle.min.required.progress.fraction";
public static final float
TEZ_RUNTIME_SHUFFLE_MIN_REQUIRED_PROGRESS_FRACTION_DEFAULT = 0.5f;
@Private
@Unstable
@ConfigurationProperty(type = "float")
/**
* Expert setting made available only for debugging. Do not change it.
* Provides threshold for determining whether fetching has to be marked
* unhealthy based on the ratio of (failures/(failures+completed))
*/
public static final String
TEZ_RUNTIME_SHUFFLE_MAX_ALLOWED_FAILED_FETCH_ATTEMPT_FRACTION =
TEZ_RUNTIME_PREFIX + "shuffle.max.allowed.failed.fetch.fraction";
public static final float
TEZ_RUNTIME_SHUFFLE_MAX_ALLOWED_FAILED_FETCH_ATTEMPT_FRACTION_DEFAULT = 0.5f;
@Private
@Unstable
@ConfigurationProperty(type = "boolean")
/**
* Expert setting made available only for debugging. Do not change it.
* Provides threshold for determining whether fetching has to be marked
* unhealthy based on the ratio of (failures/(failures+completed))
*/
public static final String
TEZ_RUNTIME_SHUFFLE_FAILED_CHECK_SINCE_LAST_COMPLETION =
TEZ_RUNTIME_PREFIX + "shuffle.failed.check.since-last.completion";
public static final boolean
TEZ_RUNTIME_SHUFFLE_FAILED_CHECK_SINCE_LAST_COMPLETION_DEFAULT = true;
@ConfigurationProperty(type = "integer")
public static final String TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE =
TEZ_RUNTIME_PREFIX +
"shuffle.fetch.max.task.output.at.once";
public final static int TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE_DEFAULT
= 20;
@ConfigurationProperty(type = "boolean")
public static final String TEZ_RUNTIME_SHUFFLE_NOTIFY_READERROR = TEZ_RUNTIME_PREFIX +
"shuffle.notify.readerror";
public static final boolean TEZ_RUNTIME_SHUFFLE_NOTIFY_READERROR_DEFAULT = true;
@ConfigurationProperty(type = "integer")
public static final String TEZ_RUNTIME_SHUFFLE_CONNECT_TIMEOUT = TEZ_RUNTIME_PREFIX +
"shuffle.connect.timeout";
public static final int TEZ_RUNTIME_SHUFFLE_STALLED_COPY_TIMEOUT_DEFAULT =
3 * 60 * 1000;
@ConfigurationProperty(type = "boolean")
public static final String TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_ENABLED = TEZ_RUNTIME_PREFIX +
"shuffle.keep-alive.enabled";
public static final boolean TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_ENABLED_DEFAULT = false;
@ConfigurationProperty(type = "integer")
public static final String TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_MAX_CONNECTIONS = TEZ_RUNTIME_PREFIX +
"shuffle.keep-alive.max.connections";
public static final int TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_MAX_CONNECTIONS_DEFAULT = 20;
@ConfigurationProperty(type = "integer")
public static final String TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT =
TEZ_RUNTIME_PREFIX + "shuffle.read.timeout";
public final static int TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT_DEFAULT =
3 * 60 * 1000;
@ConfigurationProperty(type = "integer")
public static final String TEZ_RUNTIME_SHUFFLE_BUFFER_SIZE =
TEZ_RUNTIME_PREFIX + "shuffle.buffersize";
public final static int TEZ_RUNTIME_SHUFFLE_BUFFER_SIZE_DEFAULT =
8 * 1024;
@ConfigurationProperty(type = "boolean")
public static final String TEZ_RUNTIME_SHUFFLE_USE_ASYNC_HTTP = TEZ_RUNTIME_PREFIX +
"shuffle.use.async.http";
public static final boolean TEZ_RUNTIME_SHUFFLE_USE_ASYNC_HTTP_DEFAULT = false;
@ConfigurationProperty(type = "boolean")
public static final String TEZ_RUNTIME_SHUFFLE_ENABLE_SSL = TEZ_RUNTIME_PREFIX +
"shuffle.ssl.enable";
public static final boolean TEZ_RUNTIME_SHUFFLE_ENABLE_SSL_DEFAULT = false;
/**
* Controls verification of data checksums when fetching data directly to
* disk. Enabling verification allows the fetcher to detect corrupted data
* and report the failure against the upstream task before the data reaches
* the Processor and causes the fetching task to fail.
*/
@ConfigurationProperty(type = "boolean")
public static final String TEZ_RUNTIME_SHUFFLE_FETCH_VERIFY_DISK_CHECKSUM =
TEZ_RUNTIME_PREFIX + "shuffle.fetch.verify-disk-checksum";
public static final boolean TEZ_RUNTIME_SHUFFLE_FETCH_VERIFY_DISK_CHECKSUM_DEFAULT = true;
@ConfigurationProperty(type = "float")
public static final String TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT = TEZ_RUNTIME_PREFIX +
"shuffle.fetch.buffer.percent";
public static final float TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT_DEFAULT =
0.90f;
@ConfigurationProperty(type = "float")
public static final String TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT = TEZ_RUNTIME_PREFIX +
"shuffle.memory.limit.percent";
public static final float TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT_DEFAULT =
0.25f;
// Rename to fraction
@ConfigurationProperty(type = "float")
public static final String TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT = TEZ_RUNTIME_PREFIX +
"shuffle.merge.percent";
public static final float TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT_DEFAULT = 0.90f;
@ConfigurationProperty(type = "integer")
public static final String TEZ_RUNTIME_SHUFFLE_MEMTOMEM_SEGMENTS = TEZ_RUNTIME_PREFIX +
"shuffle.memory-to-memory.segments";
@ConfigurationProperty(type = "boolean")
public static final String TEZ_RUNTIME_SHUFFLE_ENABLE_MEMTOMEM = TEZ_RUNTIME_PREFIX +
"shuffle.memory-to-memory.enable";
public static final boolean TEZ_RUNTIME_SHUFFLE_ENABLE_MEMTOMEM_DEFAULT =
false;
@Private
@Unstable
@ConfigurationProperty(type = "boolean")
public static final String TEZ_RUNTIME_SHUFFLE_FETCHER_USE_SHARED_POOL = TEZ_RUNTIME_PREFIX +
"shuffle.fetcher.use-shared-pool";
public static final boolean TEZ_RUNTIME_SHUFFLE_FETCHER_USE_SHARED_POOL_DEFAULT = false;
@ConfigurationProperty(type = "float")
public static final String TEZ_RUNTIME_INPUT_POST_MERGE_BUFFER_PERCENT = TEZ_RUNTIME_PREFIX +
"task.input.post-merge.buffer.percent";
public static final float TEZ_RUNTIME_INPUT_BUFFER_PERCENT_DEFAULT = 0.0f;
@ConfigurationProperty
public static final String TEZ_RUNTIME_GROUP_COMPARATOR_CLASS = TEZ_RUNTIME_PREFIX +
"group.comparator.class";
@ConfigurationProperty
public static final String TEZ_RUNTIME_INTERNAL_SORTER_CLASS = TEZ_RUNTIME_PREFIX +
"internal.sorter.class";
@ConfigurationProperty
public static final String TEZ_RUNTIME_KEY_COMPARATOR_CLASS =
TEZ_RUNTIME_PREFIX + "key.comparator.class";
@ConfigurationProperty
public static final String TEZ_RUNTIME_KEY_CLASS = TEZ_RUNTIME_PREFIX + "key.class";
@ConfigurationProperty
public static final String TEZ_RUNTIME_VALUE_CLASS = TEZ_RUNTIME_PREFIX + "value.class";
@ConfigurationProperty(type = "boolean")
public static final String TEZ_RUNTIME_COMPRESS = TEZ_RUNTIME_PREFIX + "compress";
@ConfigurationProperty
public static final String TEZ_RUNTIME_COMPRESS_CODEC = TEZ_RUNTIME_PREFIX + "compress.codec";
// TODO Move this key to MapReduce
@ConfigurationProperty
public static final String TEZ_RUNTIME_KEY_SECONDARY_COMPARATOR_CLASS =
TEZ_RUNTIME_PREFIX + "key.secondary.comparator.class";
@ConfigurationProperty(type = "boolean")
public static final String TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED =
TEZ_RUNTIME_PREFIX +
"empty.partitions.info-via-events.enabled";
public static final boolean TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED_DEFAULT = true;
@Private
public static final String TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_ENABLED =
TEZ_RUNTIME_PREFIX + "transfer.data-via-events.enabled";
@Private
public static final boolean TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_ENABLED_DEFAULT = false;
@Private
public static final String TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_MAX_SIZE =
TEZ_RUNTIME_PREFIX + "transfer.data-via-events.max-size";
@Private
public static final int TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_MAX_SIZE_DEFAULT = 512;
@Private
public static final String TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_SUPPORT_IN_MEM_FILE =
TEZ_RUNTIME_PREFIX + "transfer.data-via-events.support.in-mem.file";
@Private
public static final boolean TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_SUPPORT_IN_MEM_FILE_DEFAULT
= true;
/**
* If the shuffle input is on the local host bypass the http fetch and access the files directly
*/
@ConfigurationProperty(type = "boolean")
public static final String TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH = TEZ_RUNTIME_PREFIX + "optimize.local.fetch";
public static final boolean TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH_DEFAULT = true;
/**
* Expert level setting. Enable pipelined shuffle in ordered outputs and in unordered
* partitioned outputs. In ordered cases, it works with PipelinedSorter.
* set tez.runtime.sort.threads to greater than 1 to enable pipelinedsorter.
* Ensure to set tez.runtime.enable.final-merge.in.output=false.
* Speculative execution needs to be turned off when using this parameter. //TODO: TEZ-2132
*/
@ConfigurationProperty(type = "boolean")
public static final String TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED =
TEZ_RUNTIME_PREFIX + "pipelined-shuffle.enabled";
public static final boolean TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED_DEFAULT = false;
/**
* Expert level setting. Enable final merge in ordered (defaultsorter/pipelinedsorter) outputs.
* Speculative execution needs to be turned off when disabling this parameter. //TODO: TEZ-2132
*/
@ConfigurationProperty(type = "boolean")
public static final String TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT =
TEZ_RUNTIME_PREFIX + "enable.final-merge.in.output";
public static final boolean TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT_DEFAULT = true;
/**
* Expert level setting. How long should @link{ShuffleManager} wait for batching
* before sending the events in milliseconds. Set to -1 to not wait.
*/
@ConfigurationProperty(type = "integer")
public static final String TEZ_RUNTIME_SHUFFLE_BATCH_WAIT =
TEZ_RUNTIME_PREFIX + "shuffle.batch.wait";
public static final int TEZ_RUNTIME_SHUFFLE_BATCH_WAIT_DEFAULT = -1;
/**
* Share data fetched between tasks running on the same host if applicable
*/
@ConfigurationProperty(type = "boolean")
public static final String TEZ_RUNTIME_OPTIMIZE_SHARED_FETCH = TEZ_RUNTIME_PREFIX
+ "optimize.shared.fetch";
/**
* shared mode bypassing the http fetch is not enabled by default till we have unit tests in.
*/
public static final boolean TEZ_RUNTIME_OPTIMIZE_SHARED_FETCH_DEFAULT = false;
/**
* Used only for internal testing. Strictly not recommended to be used elsewhere. This
* parameter could be changed/dropped later.
*/
@Unstable
@Private
@ConfigurationProperty(type = "boolean")
public static final String TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT = TEZ_RUNTIME_PREFIX
+ "cleanup.files.on.interrupt";
public static final boolean TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT_DEFAULT = false;
// TODO TEZ-1233 - allow this property to be set per vertex
// TODO TEZ-1231 - move these properties out since they are not relevant for Inputs / Outputs
/**
* Value: Boolean
* Whether to publish configuration information to History logger. Default false.
*/
@ConfigurationProperty
public static final String TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT =
TEZ_RUNTIME_PREFIX + "convert.user-payload.to.history-text";
public static final boolean TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT_DEFAULT = false;
@Unstable
@Private
@ConfigurationProperty(type = "integer")
public static final String TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS = TEZ_RUNTIME_PREFIX +
"merge.progress.records";
public static final long TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT = 10000;
static {
tezRuntimeKeys.add(TEZ_RUNTIME_IFILE_READAHEAD);
tezRuntimeKeys.add(TEZ_RUNTIME_IFILE_READAHEAD_BYTES);
tezRuntimeKeys.add(TEZ_RUNTIME_IO_FILE_BUFFER_SIZE);
tezRuntimeKeys.add(TEZ_RUNTIME_IO_SORT_FACTOR);
tezRuntimeKeys.add(TEZ_RUNTIME_SORT_SPILL_PERCENT);
tezRuntimeKeys.add(TEZ_RUNTIME_IO_SORT_MB);
tezRuntimeKeys.add(TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES);
tezRuntimeKeys.add(TEZ_RUNTIME_COMBINE_MIN_SPILLS);
tezRuntimeKeys.add(TEZ_RUNTIME_PIPELINED_SORTER_SORT_THREADS);
tezRuntimeKeys.add(
TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB);
tezRuntimeKeys.add(TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY);
tezRuntimeKeys.add(TEZ_RUNTIME_UNORDERED_OUTPUT_BUFFER_SIZE_MB);
tezRuntimeKeys.add(TEZ_RUNTIME_UNORDERED_OUTPUT_MAX_PER_BUFFER_SIZE_BYTES);
tezRuntimeKeys.add(TEZ_RUNTIME_PARTITIONER_CLASS);
tezRuntimeKeys.add(TEZ_RUNTIME_COMBINER_CLASS);
tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_USE_ASYNC_HTTP);
tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES);
tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_FETCH_FAILURES_LIMIT);
tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE);
tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_NOTIFY_READERROR);
tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_CONNECT_TIMEOUT);
tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_ENABLED);
tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_MAX_CONNECTIONS);
tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT);
tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_BUFFER_SIZE);
tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_ENABLE_SSL);
tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_FETCH_VERIFY_DISK_CHECKSUM);
tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT);
tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT);
tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT);
tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_MEMTOMEM_SEGMENTS);
tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_ENABLE_MEMTOMEM);
tezRuntimeKeys.add
(TEZ_RUNTIME_SHUFFLE_ACCEPTABLE_HOST_FETCH_FAILURE_FRACTION);
tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_MIN_FAILURES_PER_HOST);
tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_MAX_STALL_TIME_FRACTION);
tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_SOURCE_ATTEMPT_ABORT_LIMIT);
tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_MAX_ALLOWED_FAILED_FETCH_ATTEMPT_FRACTION);
tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_MIN_REQUIRED_PROGRESS_FRACTION);
tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_FAILED_CHECK_SINCE_LAST_COMPLETION);
tezRuntimeKeys.add(TEZ_RUNTIME_REPORT_PARTITION_STATS);
tezRuntimeKeys.add(TEZ_RUNTIME_INPUT_POST_MERGE_BUFFER_PERCENT);
tezRuntimeKeys.add(TEZ_RUNTIME_GROUP_COMPARATOR_CLASS);
tezRuntimeKeys.add(TEZ_RUNTIME_INTERNAL_SORTER_CLASS);
tezRuntimeKeys.add(TEZ_RUNTIME_KEY_COMPARATOR_CLASS);
tezRuntimeKeys.add(TEZ_RUNTIME_KEY_CLASS);
tezRuntimeKeys.add(TEZ_RUNTIME_VALUE_CLASS);
tezRuntimeKeys.add(TEZ_RUNTIME_COMPRESS);
tezRuntimeKeys.add(TEZ_RUNTIME_COMPRESS_CODEC);
tezRuntimeKeys.add(TEZ_RUNTIME_KEY_SECONDARY_COMPARATOR_CLASS);
tezRuntimeKeys.add(TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED);
tezRuntimeKeys.add(TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED);
tezRuntimeKeys.add(TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT);
tezRuntimeKeys.add(TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_ENABLED);
tezRuntimeKeys.add(TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_MAX_SIZE);
tezRuntimeKeys.add(TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_SUPPORT_IN_MEM_FILE);
tezRuntimeKeys.add(TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS);
tezRuntimeKeys.add(TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH);
tezRuntimeKeys.add(TEZ_RUNTIME_OPTIMIZE_SHARED_FETCH);
tezRuntimeKeys.add(TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT);
tezRuntimeKeys.add(TEZ_RUNTIME_SORTER_CLASS);
tezRuntimeKeys.add(TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT);
tezRuntimeKeys.add(TEZ_RUNTIME_UNORDERED_PARTITIONED_KVWRITER_BUFFER_MERGE_PERCENT);
tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_FETCHER_USE_SHARED_POOL);
tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_HOST_PENALTY_TIME_LIMIT_MS);
tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_BATCH_WAIT);
defaultConf.addResource("core-default.xml");
defaultConf.addResource("core-site.xml");
defaultConf.addResource("tez-site.xml");
for (Map.Entry<String, String> confEntry : defaultConf) {
if (tezRuntimeKeys.contains(confEntry.getKey())) {
tezRuntimeConfMap.put(confEntry.getKey(), confEntry.getValue());
} else {
// TODO TEZ-1232 Filter out parameters from TezConfiguration, and Task specific confs
otherConfMap.put(confEntry.getKey(), confEntry.getValue());
otherKeys.add(confEntry.getKey());
}
}
// Do NOT need all prefixes from the following list. Only specific ones are allowed
// "hadoop.", "hadoop.security", "io.", "fs.", "ipc.", "net.", "file.", "dfs.", "ha.", "s3.", "nfs3.", "rpc.", "ssl."
allowedPrefixes.add("io.");
allowedPrefixes.add("file.");
allowedPrefixes.add("fs.");
allowedPrefixes.add("ssl.");
umnodifiableTezRuntimeKeySet = Collections.unmodifiableSet(tezRuntimeKeys);
unmodifiableOtherKeySet = Collections.unmodifiableSet(otherKeys);
unmodifiableAllowedPrefixes = Collections.unmodifiableList(allowedPrefixes);
}
@Private
public static Set<String> getRuntimeConfigKeySet() {
return umnodifiableTezRuntimeKeySet;
}
@Private
public static Set<String> getRuntimeAdditionalConfigKeySet() {
return unmodifiableOtherKeySet;
}
@Private
public static List<String> getAllowedPrefixes() {
return allowedPrefixes;
}
@Private
public static Map<String, String> getTezRuntimeConfigDefaults() {
return Collections.unmodifiableMap(tezRuntimeConfMap);
}
@Private
public static Map<String, String> getOtherConfigDefaults() {
return Collections.unmodifiableMap(otherConfMap);
}
public enum ReportPartitionStats {
@Deprecated
/**
* Don't report partition stats. It is the same as NONE.
* It is defined to maintain backward compatibility given
* Configuration @link{#TEZ_RUNTIME_REPORT_PARTITION_STATS} used
* to be boolean type.
*/
DISABLED("false"),
@Deprecated
/**
* Report partition stats. It is the same as MEMORY_OPTIMIZED.
* It is defined to maintain backward compatibility given
* Configuration @link{#TEZ_RUNTIME_REPORT_PARTITION_STATS} used
* to be boolean type.
*/
ENABLED("true"),
/**
* Don't report partition stats.
*/
NONE("none"),
/**
* Report partition stats with less precision to reduce
* memory and CPU overhead
*/
MEMORY_OPTIMIZED("memory_optimized"),
/**
* Report precise partition stats in MB.
*/
PRECISE("precise");
private final String type;
private ReportPartitionStats(String type) {
this.type = type;
}
public final String getType() {
return type;
}
public boolean isEnabled() {
return !equals(ReportPartitionStats.DISABLED) &&
!equals(ReportPartitionStats.NONE);
}
public boolean isPrecise() {
return equals(ReportPartitionStats.PRECISE);
}
public static ReportPartitionStats fromString(String type) {
if (type != null) {
for (ReportPartitionStats b : ReportPartitionStats.values()) {
if (type.equalsIgnoreCase(b.type)) {
return b;
}
}
}
throw new IllegalArgumentException("Invalid type " + type);
}
}
}