blob: 6ac0dbb3a8b0b105d06d859af61427643ba12610 [file] [log] [blame]
/*
* *
* * Licensed to the Apache Software Foundation (ASF) under one
* * or more contributor license agreements. See the NOTICE file
* * distributed with this work for additional information
* * regarding copyright ownership. The ASF licenses this file
* * to you under the Apache License, Version 2.0 (the
* * "License"); you may not use this file except in compliance
* * with the License. You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
*
*/
package org.apache.tez.runtime.library.conf;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import com.google.common.annotations.VisibleForTesting;
import org.apache.tez.common.Preconditions;
import com.google.common.collect.Lists;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.common.ConfigUtils;
import org.apache.tez.runtime.library.input.UnorderedKVInput;
@InterfaceAudience.Public
@InterfaceStability.Evolving
/**
* Configure {@link org.apache.tez.runtime.library.input.UnorderedKVInput}.
* <p>
* Values will be picked up from tez-site if not specified, otherwise defaults from
* {@link org.apache.tez.runtime.library.api.TezRuntimeConfiguration} will be used.
*/
public class UnorderedKVInputConfig {
/**
 * Builder methods for the parameters which are specific to the Input.
 *
 * @param <T> the type returned by every setter, so that calls can be chained
 */
@InterfaceAudience.Private
public interface SpecificConfigBuilder<T> extends BaseConfigBuilder<T> {

  /**
   * Sets the buffer fraction, as a fraction of container size, to be used while fetching
   * remote data.
   *
   * @param shuffleBufferFraction fraction of container size
   * @return instance of the current builder
   */
  T setShuffleBufferFraction(float shuffleBufferFraction);

  /**
   * Sets a size limit on the maximum segment size to be shuffled to disk, expressed as a
   * fraction of the shuffle buffer.
   *
   * @param maxSingleSegmentFraction fraction of memory determined by ShuffleBufferFraction
   * @return instance of the current builder
   */
  T setMaxSingleMemorySegmentFraction(float maxSingleSegmentFraction);

  /**
   * Configures the point at which in-memory segments will be merged and written out to a
   * single large disk segment, expressed as a fraction of the shuffle buffer.
   * <p>
   * Has no effect at the moment.
   *
   * @param mergeFraction fraction of memory determined by ShuffleBufferFraction which, when
   *                      filled, will trigger a merge
   * @return instance of the current builder
   */
  T setMergeFraction(float mergeFraction);
}
/**
 * Input-specific builder that pairs an edge builder with an
 * {@link UnorderedKVInputConfig.Builder}.
 * <p>
 * Every setter forwards to the wrapped {@link UnorderedKVInputConfig.Builder} and returns this
 * instance; {@link #done()} hands back the edge builder supplied at construction.
 *
 * @param <E> the type of the edge builder returned by {@link #done()}
 */
@SuppressWarnings("rawtypes")
@InterfaceAudience.Public
@InterfaceStability.Evolving
public static class SpecificBuilder<E extends HadoopKeyValuesBasedBaseEdgeConfig.Builder> implements
    SpecificConfigBuilder<SpecificBuilder> {

  private final E edgeBuilder;
  private final UnorderedKVInputConfig.Builder builder;

  /**
   * @param edgeBuilder the edge builder to return from {@link #done()}
   * @param builder     the input config builder that receives every setter call
   */
  @InterfaceAudience.Private
  SpecificBuilder(E edgeBuilder, UnorderedKVInputConfig.Builder builder) {
    this.edgeBuilder = edgeBuilder;
    this.builder = builder;
  }

  /** Forwards to {@link UnorderedKVInputConfig.Builder#setShuffleBufferFraction(float)}. */
  @Override
  public SpecificBuilder<E> setShuffleBufferFraction(float shuffleBufferFraction) {
    builder.setShuffleBufferFraction(shuffleBufferFraction);
    return this;
  }

  /**
   * Forwards to {@link UnorderedKVInputConfig.Builder#setMaxSingleMemorySegmentFraction(float)}.
   */
  @Override
  public SpecificBuilder<E> setMaxSingleMemorySegmentFraction(float maxSingleSegmentFraction) {
    builder.setMaxSingleMemorySegmentFraction(maxSingleSegmentFraction);
    return this;
  }

  /** Forwards to {@link UnorderedKVInputConfig.Builder#setMergeFraction(float)}. */
  @Override
  public SpecificBuilder<E> setMergeFraction(float mergeFraction) {
    builder.setMergeFraction(mergeFraction);
    return this;
  }

  /**
   * Forwards to
   * {@link UnorderedKVInputConfig.Builder#setAdditionalConfiguration(String, String)}.
   */
  @Override
  public SpecificBuilder<E> setAdditionalConfiguration(String key, String value) {
    builder.setAdditionalConfiguration(key, value);
    return this;
  }

  /** Forwards to {@link UnorderedKVInputConfig.Builder#setAdditionalConfiguration(Map)}. */
  @Override
  public SpecificBuilder<E> setAdditionalConfiguration(Map<String, String> confMap) {
    builder.setAdditionalConfiguration(confMap);
    return this;
  }

  /**
   * Forwards to {@link UnorderedKVInputConfig.Builder#setFromConfiguration(Configuration)}.
   */
  @Override
  public SpecificBuilder<E> setFromConfiguration(Configuration conf) {
    builder.setFromConfiguration(conf);
    return this;
  }

  /**
   * Forwards to
   * {@link UnorderedKVInputConfig.Builder#setFromConfigurationUnfiltered(Configuration)}.
   */
  // Returns SpecificBuilder<E> like every other setter, so that a chained done() call keeps
  // the edge builder's static type instead of degrading to the erased bound.
  @Override
  public SpecificBuilder<E> setFromConfigurationUnfiltered(Configuration conf) {
    builder.setFromConfigurationUnfiltered(conf);
    return this;
  }

  /**
   * Finishes the input-specific configuration.
   *
   * @return the edge builder supplied at construction
   */
  public E done() {
    return edgeBuilder;
  }
}
// Configuration backing this instance. Package-private rather than private and marked
// @VisibleForTesting. It is assigned either by the private constructor or by
// fromUserPayload(); after the no-arg constructor it is still unset (null).
@InterfaceAudience.Private
@VisibleForTesting
Configuration conf;
/**
 * Creates an instance without a Configuration. {@code conf} stays unset until
 * {@link #fromUserPayload(UserPayload)} assigns it.
 */
@InterfaceAudience.Private
@VisibleForTesting
UnorderedKVInputConfig() {
}
/**
 * Creates an instance backed by the given Configuration. The reference is stored as-is;
 * no copy is made.
 *
 * @param conf the configuration to hold
 */
private UnorderedKVInputConfig(Configuration conf) {
this.conf = conf;
}
/**
 * Builds a UserPayload from the held Configuration via
 * {@code TezUtils.createUserPayloadFromConf}.
 *
 * @return a {@link org.apache.tez.dag.api.UserPayload} instance
 * @throws RuntimeException wrapping any IOException raised while creating the payload
 */
public UserPayload toUserPayload() {
  final UserPayload payload;
  try {
    payload = TezUtils.createUserPayloadFromConf(conf);
  } catch (IOException ioe) {
    throw new RuntimeException(ioe);
  }
  return payload;
}
/**
 * Replaces the held Configuration with the one obtained from the given payload via
 * {@code TezUtils.createConfFromUserPayload}. The held Configuration is left untouched if
 * that call fails.
 *
 * @param payload the payload to read the configuration from
 * @throws RuntimeException wrapping any IOException raised while reading the payload
 */
@InterfaceAudience.Private
public void fromUserPayload(UserPayload payload) {
  final Configuration restored;
  try {
    restored = TezUtils.createConfFromUserPayload(payload);
  } catch (IOException ioe) {
    throw new RuntimeException(ioe);
  }
  this.conf = restored;
}
/**
 * Returns the history text for the held Configuration, if conversion is enabled.
 *
 * @return the result of {@code TezUtils.convertToHistoryText} when
 *         {@code TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT} evaluates to true in the
 *         held Configuration, otherwise {@code null}
 */
@InterfaceAudience.Private
String toHistoryText() {
  final boolean convertPayload = conf.getBoolean(
      TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT,
      TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT_DEFAULT);
  return convertPayload ? TezUtils.convertToHistoryText(conf) : null;
}
/**
 * Creates a {@link Builder} for the given key and value class names.
 *
 * @param keyClass the key class name
 * @param valueClass the value class name
 * @return a new builder
 * @throws NullPointerException if either class name is null
 */
public static Builder newBuilder(String keyClass, String valueClass) {
return new Builder(keyClass, valueClass);
}
/**
 * Fluent builder which accumulates settings in a private {@link Configuration} and produces
 * an {@link UnorderedKVInputConfig} from it.
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public static class Builder implements SpecificConfigBuilder<Builder> {

  // Created with loadDefaults == false; the no-arg constructor seeds it explicitly.
  private final Configuration conf = new Configuration(false);

  /**
   * Create a configuration builder for
   * {@link org.apache.tez.runtime.library.input.UnorderedKVInput}
   *
   * @param keyClassName   the key class name
   * @param valueClassName the value class name
   * @throws NullPointerException if either class name is null
   */
  @InterfaceAudience.Private
  Builder(String keyClassName, String valueClassName) {
    this();
    // Validate both arguments up front so nothing is set when one of them is missing.
    Objects.requireNonNull(keyClassName, "Key class name cannot be null");
    Objects.requireNonNull(valueClassName, "Value class name cannot be null");
    setKeyClassName(keyClassName);
    setValueClassName(valueClassName);
  }

  /**
   * Creates a builder seeded with two sets of defaults: the map extracted from
   * {@code TezRuntimeConfiguration.getTezRuntimeConfigDefaults()} using
   * {@code UnorderedKVInput.getConfigurationKeySet()}, followed by
   * {@code TezRuntimeConfiguration.getOtherConfigDefaults()}.
   */
  @InterfaceAudience.Private
  Builder() {
    Map<String, String> tezDefaults = ConfigUtils
        .extractConfigurationMap(TezRuntimeConfiguration.getTezRuntimeConfigDefaults(),
            UnorderedKVInput.getConfigurationKeySet());
    ConfigUtils.addConfigMapToConfiguration(this.conf, tezDefaults);
    ConfigUtils.addConfigMapToConfiguration(this.conf,
        TezRuntimeConfiguration.getOtherConfigDefaults());
  }

  /**
   * Stores the key class name under {@code TEZ_RUNTIME_KEY_CLASS}.
   *
   * @param keyClassName the key class name
   * @return this builder
   * @throws NullPointerException if {@code keyClassName} is null
   */
  @InterfaceAudience.Private
  Builder setKeyClassName(String keyClassName) {
    Objects.requireNonNull(keyClassName, "Key class name cannot be null");
    this.conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, keyClassName);
    return this;
  }

  /**
   * Stores the value class name under {@code TEZ_RUNTIME_VALUE_CLASS}.
   *
   * @param valueClassName the value class name
   * @return this builder
   * @throws NullPointerException if {@code valueClassName} is null
   */
  @InterfaceAudience.Private
  Builder setValueClassName(String valueClassName) {
    Objects.requireNonNull(valueClassName, "Value class name cannot be null");
    this.conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, valueClassName);
    return this;
  }

  /** Stores the value under {@code TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT}. */
  @Override
  public Builder setShuffleBufferFraction(float shuffleBufferFraction) {
    this.conf.setFloat(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT,
        shuffleBufferFraction);
    return this;
  }

  /** Stores the value under {@code TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT}. */
  @Override
  public Builder setMaxSingleMemorySegmentFraction(float maxSingleSegmentFraction) {
    this.conf.setFloat(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT,
        maxSingleSegmentFraction);
    return this;
  }

  /** Stores the value under {@code TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT}. */
  @Override
  public Builder setMergeFraction(float mergeFraction) {
    this.conf.setFloat(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT, mergeFraction);
    return this;
  }

  /**
   * Sets or removes a single property, provided the key passes
   * {@code ConfigUtils.doesKeyQualify} against the UnorderedKVInput key set, the runtime
   * additional key set and the allowed prefixes. Keys which do not qualify are silently
   * ignored.
   *
   * @param key   the property name
   * @param value the property value; {@code null} removes the property
   * @return this builder
   * @throws NullPointerException if {@code key} is null
   */
  @SuppressWarnings("unchecked")
  @Override
  public Builder setAdditionalConfiguration(String key, String value) {
    Objects.requireNonNull(key, "Key cannot be null");
    if (ConfigUtils.doesKeyQualify(key,
        Lists.newArrayList(UnorderedKVInput.getConfigurationKeySet(),
            TezRuntimeConfiguration.getRuntimeAdditionalConfigKeySet()),
        TezRuntimeConfiguration.getAllowedPrefixes())) {
      if (value == null) {
        this.conf.unset(key);
      } else {
        this.conf.set(key, value);
      }
    }
    return this;
  }

  /**
   * Adds the entries which {@code ConfigUtils.extractConfigurationMap} returns for the given
   * map, using the UnorderedKVInput key set, the runtime additional key set and the allowed
   * prefixes.
   *
   * @param confMap the properties to add
   * @return this builder
   * @throws NullPointerException if {@code confMap} is null
   */
  @SuppressWarnings("unchecked")
  @Override
  public Builder setAdditionalConfiguration(Map<String, String> confMap) {
    Objects.requireNonNull(confMap, "ConfMap cannot be null");
    Map<String, String> map = ConfigUtils.extractConfigurationMap(confMap,
        Lists.newArrayList(UnorderedKVInput.getConfigurationKeySet(),
            TezRuntimeConfiguration.getRuntimeAdditionalConfigKeySet()),
        TezRuntimeConfiguration.getAllowedPrefixes());
    ConfigUtils.addConfigMapToConfiguration(this.conf, map);
    return this;
  }

  /**
   * Adds the entries which {@code ConfigUtils.extractConfigurationMap} returns for the given
   * Configuration, using the UnorderedKVInput key set, the runtime additional key set and
   * the allowed prefixes.
   *
   * @param conf the configuration to read from
   * @return this builder
   * @throws IllegalArgumentException if {@code conf} is null
   */
  @SuppressWarnings("unchecked")
  @Override
  public Builder setFromConfiguration(Configuration conf) {
    // Maybe ensure this is the first call ? Otherwise this can end up overriding other parameters
    Preconditions.checkArgument(conf != null, "Configuration cannot be null");
    Map<String, String> map = ConfigUtils.extractConfigurationMap(conf,
        Lists.newArrayList(UnorderedKVInput.getConfigurationKeySet(),
            TezRuntimeConfiguration.getRuntimeAdditionalConfigKeySet()),
        TezRuntimeConfiguration.getAllowedPrefixes());
    ConfigUtils.addConfigMapToConfiguration(this.conf, map);
    return this;
  }

  /**
   * Merges the given Configuration into this builder via {@code ConfigUtils.mergeConfs},
   * without the key filtering applied by {@link #setFromConfiguration(Configuration)}.
   *
   * @param conf the configuration to merge
   * @return this builder
   * @throws IllegalArgumentException if {@code conf} is null
   */
  @SuppressWarnings("unchecked")
  @Override
  public Builder setFromConfigurationUnfiltered(Configuration conf) {
    // Maybe ensure this is the first call ? Otherwise this can end up overriding other parameters
    Preconditions.checkArgument(conf != null, "Configuration cannot be null");
    ConfigUtils.mergeConfs(this.conf, conf);
    return this;
  }

  /**
   * Enables or disables compression and optionally sets the codec and its configuration.
   *
   * @param enabled          value stored under {@code TEZ_RUNTIME_COMPRESS}
   * @param compressionCodec codec stored under {@code TEZ_RUNTIME_COMPRESS_CODEC}; only
   *                         applied when {@code enabled} is true and the codec is not null
   * @param codecConf        optional codec properties, merged via
   *                         {@code ConfigUtils.mergeConfsWithExclusions} with the Tez runtime
   *                         config key set; merged whenever non-null, regardless of
   *                         {@code enabled}
   * @return this object for further chained method calls
   */
  public Builder setCompression(boolean enabled, @Nullable String compressionCodec,
                                @Nullable Map<String, String> codecConf) {
    this.conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS, enabled);
    if (enabled && compressionCodec != null) {
      this.conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC, compressionCodec);
    }
    if (codecConf != null) {
      // Merging the confs for now. Change to be specific in the future.
      ConfigUtils.mergeConfsWithExclusions(this.conf, codecConf,
          TezRuntimeConfiguration.getRuntimeConfigKeySet());
    }
    return this;
  }

  /**
   * Set serialization class responsible for providing serializer/deserializer for keys.
   * The class name is placed at the front of the list held under
   * {@code CommonConfigurationKeys.IO_SERIALIZATIONS_KEY}, the same list used by
   * {@link #setValueSerializationClass(String, Map)}.
   *
   * @param serializationClassName the serialization class name
   * @param serializerConf         the serializer configuration. This can be null, and is a
   *                               {@link java.util.Map} of key-value pairs. The keys should be
   *                               limited to the ones required by the serialization.
   * @return this object for further chained method calls
   * @throws IllegalArgumentException if {@code serializationClassName} is null
   */
  public Builder setKeySerializationClass(String serializationClassName,
                                          @Nullable Map<String, String> serializerConf) {
    addSerialization(serializationClassName, serializerConf);
    return this;
  }

  /**
   * Set serialization class responsible for providing serializer/deserializer for values.
   * The class name is placed at the front of the list held under
   * {@code CommonConfigurationKeys.IO_SERIALIZATIONS_KEY}, the same list used by
   * {@link #setKeySerializationClass(String, Map)}.
   *
   * @param serializationClassName the serialization class name
   * @param serializerConf         the serializer configuration. This can be null, and is a
   *                               {@link java.util.Map} of key-value pairs. The keys should be
   *                               limited to the ones required by the serialization.
   * @return this object for further chained method calls
   * @throws IllegalArgumentException if {@code serializationClassName} is null
   */
  public Builder setValueSerializationClass(String serializationClassName,
                                            @Nullable Map<String, String> serializerConf) {
    addSerialization(serializationClassName, serializerConf);
    return this;
  }

  /**
   * Create the actual configuration instance.
   *
   * @return an {@link UnorderedKVInputConfig} holding this builder's Configuration
   */
  public UnorderedKVInputConfig build() {
    return new UnorderedKVInputConfig(this.conf);
  }

  /**
   * Puts a serialization class at the front of the configured serialization list and merges
   * its optional configuration. Shared by the key and value variants.
   */
  private void addSerialization(String serializationClassName,
                                @Nullable Map<String, String> serializerConf) {
    Preconditions.checkArgument(serializationClassName != null,
        "serializationClassName cannot be null");
    String existing = this.conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY);
    // Configuration.get returns null for an unset property; concatenating it blindly would
    // write the literal text "null" into the serialization list.
    String updated = (existing == null || existing.isEmpty())
        ? serializationClassName
        : serializationClassName + "," + existing;
    this.conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, updated);
    if (serializerConf != null) {
      // Merging the confs for now. Change to be specific in the future.
      ConfigUtils.mergeConfsWithExclusions(this.conf, serializerConf,
          TezRuntimeConfiguration.getRuntimeConfigKeySet());
    }
  }
}
}