blob: 0d8a5aef758da4f630a8578dc9e45471542ac16c [file] [log] [blame]
/*
* *
* * Licensed to the Apache Software Foundation (ASF) under one
* * or more contributor license agreements. See the NOTICE file
* * distributed with this work for additional information
* * regarding copyright ownership. The ASF licenses this file
* * to you under the Apache License, Version 2.0 (the
* * "License"); you may not use this file except in compliance
* * with the License. You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
*
*/
package org.apache.tez.runtime.library.conf;
import javax.annotation.Nullable;
import java.util.Map;
import java.util.Objects;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.runtime.library.input.UnorderedKVInput;
import org.apache.tez.runtime.library.output.UnorderedPartitionedKVOutput;
/**
 * Configures payloads for the {@link UnorderedPartitionedKVOutput} and {@link UnorderedKVInput}
 * pair.
 * <p>
 * Values will be picked up from tez-site if not specified, otherwise defaults from
 * {@link org.apache.tez.runtime.library.api.TezRuntimeConfiguration} will be used.
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class UnorderedPartitionedKVEdgeConfig
    extends HadoopKeyValuesBasedBaseEdgeConfig {

  /** Built configuration for the producing side ({@link UnorderedPartitionedKVOutput}). */
  private final UnorderedPartitionedKVOutputConfig outputConf;
  /** Built configuration for the consuming side ({@link UnorderedKVInput}). */
  private final UnorderedKVInputConfig inputConf;

  private UnorderedPartitionedKVEdgeConfig(
      UnorderedPartitionedKVOutputConfig outputConfiguration,
      UnorderedKVInputConfig inputConfiguration) {
    this.outputConf = outputConfiguration;
    this.inputConf = inputConfiguration;
  }

  /**
   * Create a builder to configure the relevant Input and Output.
   * <p>
   * This method should only be used when using a custom Partitioner which requires specific
   * Configuration. {@link #newBuilder(String, String, String)} is the preferred method to create
   * an instance of the Builder.
   *
   * @param keyClassName         the key class name
   * @param valueClassName       the value class name
   * @param partitionerClassName the partitioner class name
   * @param partitionerConf      the partitioner configuration. This can be null, and is a
   *                             {@link java.util.Map} of key-value pairs. The keys should be
   *                             limited to the ones required by the partitioner.
   * @return a builder to configure the edge
   */
  public static Builder newBuilder(String keyClassName, String valueClassName,
                                   String partitionerClassName,
                                   @Nullable Map<String, String> partitionerConf) {
    return new Builder(keyClassName, valueClassName, partitionerClassName, partitionerConf);
  }

  /**
   * Create a builder to configure the relevant Input and Output, without any partitioner-specific
   * configuration.
   *
   * @param keyClassName         the key class name
   * @param valueClassName       the value class name
   * @param partitionerClassName the partitioner class name
   * @return a builder to configure the edge
   */
  public static Builder newBuilder(String keyClassName, String valueClassName,
                                   String partitionerClassName) {
    return newBuilder(keyClassName, valueClassName, partitionerClassName, null);
  }

  @Override
  public UserPayload getOutputPayload() {
    return outputConf.toUserPayload();
  }

  @Override
  public String getOutputClassName() {
    return UnorderedPartitionedKVOutput.class.getName();
  }

  @Override
  public UserPayload getInputPayload() {
    return inputConf.toUserPayload();
  }

  @Override
  public String getOutputHistoryText() {
    return outputConf.toHistoryText();
  }

  @Override
  public String getInputHistoryText() {
    return inputConf.toHistoryText();
  }

  @Override
  public String getInputClassName() {
    return UnorderedKVInput.class.getName();
  }

  /**
   * This is a convenience method for the typical usage of this edge, and creates an instance of
   * {@link org.apache.tez.dag.api.EdgeProperty} which is likely to be used.
   * <p>
   * If custom edge properties are required, the methods to get the relevant payloads should be
   * used.
   * <p>
   * In this case - DataMovementType.SCATTER_GATHER, EdgeProperty.DataSourceType.PERSISTED,
   * EdgeProperty.SchedulingType.SEQUENTIAL
   *
   * @return an {@link org.apache.tez.dag.api.EdgeProperty} instance
   */
  public EdgeProperty createDefaultEdgeProperty() {
    EdgeProperty edgeProperty = EdgeProperty.create(EdgeProperty.DataMovementType.SCATTER_GATHER,
        EdgeProperty.DataSourceType.PERSISTED, EdgeProperty.SchedulingType.SEQUENTIAL,
        createOutputDescriptor(), createInputDescriptor());
    Utils.setEdgePropertyHistoryText(this, edgeProperty);
    return edgeProperty;
  }

  /**
   * This is a convenience method for creating an Edge descriptor based on the specified
   * EdgeManagerDescriptor. The data source type is PERSISTED and the scheduling type is
   * SEQUENTIAL.
   *
   * @param edgeManagerDescriptor the custom edge specification; must not be null
   * @return an {@link org.apache.tez.dag.api.EdgeProperty} instance
   * @throws NullPointerException if {@code edgeManagerDescriptor} is null
   */
  public EdgeProperty createDefaultCustomEdgeProperty(EdgeManagerPluginDescriptor edgeManagerDescriptor) {
    Objects.requireNonNull(edgeManagerDescriptor, "EdgeManagerDescriptor cannot be null");
    EdgeProperty edgeProperty =
        EdgeProperty.create(edgeManagerDescriptor, EdgeProperty.DataSourceType.PERSISTED,
            EdgeProperty.SchedulingType.SEQUENTIAL,
            createOutputDescriptor(), createInputDescriptor());
    Utils.setEdgePropertyHistoryText(this, edgeProperty);
    return edgeProperty;
  }

  /** Creates the output descriptor (class name + payload) used by the edge-property factories. */
  private OutputDescriptor createOutputDescriptor() {
    return OutputDescriptor.create(getOutputClassName()).setUserPayload(getOutputPayload());
  }

  /** Creates the input descriptor (class name + payload) used by the edge-property factories. */
  private InputDescriptor createInputDescriptor() {
    return InputDescriptor.create(getInputClassName()).setUserPayload(getInputPayload());
  }

  /**
   * Builder for {@link UnorderedPartitionedKVEdgeConfig}. Settings applied through the shared
   * methods of this builder are forwarded to both the output builder and the input builder.
   * Side-specific settings are reachable via {@link #configureOutput()} and
   * {@link #configureInput()}.
   */
  @InterfaceAudience.Public
  @InterfaceStability.Evolving
  public static class Builder extends HadoopKeyValuesBasedBaseEdgeConfig.Builder<Builder> {

    private final UnorderedPartitionedKVOutputConfig.Builder outputBuilder =
        new UnorderedPartitionedKVOutputConfig.Builder();
    private final UnorderedPartitionedKVOutputConfig.SpecificBuilder<UnorderedPartitionedKVEdgeConfig.Builder>
        specificOutputBuilder =
        new UnorderedPartitionedKVOutputConfig.SpecificBuilder<UnorderedPartitionedKVEdgeConfig.Builder>(
            this, outputBuilder);

    private final UnorderedKVInputConfig.Builder inputBuilder =
        new UnorderedKVInputConfig.Builder();
    private final UnorderedKVInputConfig.SpecificBuilder<UnorderedPartitionedKVEdgeConfig.Builder>
        specificInputBuilder =
        new UnorderedKVInputConfig.SpecificBuilder<UnorderedPartitionedKVEdgeConfig.Builder>(
            this, inputBuilder);

    @InterfaceAudience.Private
    Builder(String keyClassName, String valueClassName, String partitionerClassName,
            @Nullable Map<String, String> partitionerConf) {
      // Key/value classes are set on both sides; only the output side knows about partitioning.
      outputBuilder.setKeyClassName(keyClassName);
      outputBuilder.setValueClassName(valueClassName);
      outputBuilder.setPartitioner(partitionerClassName, partitionerConf);
      inputBuilder.setKeyClassName(keyClassName);
      inputBuilder.setValueClassName(valueClassName);
    }

    /**
     * Applies the same compression settings to both the output and the input, so the two sides
     * are configured consistently.
     *
     * @param enabled          whether compression is enabled
     * @param compressionCodec the codec class name; may be null
     * @param codecConf        additional codec configuration; may be null
     * @return this object for further chained method calls
     */
    @Override
    public Builder setCompression(boolean enabled, @Nullable String compressionCodec, @Nullable Map<String, String> codecConf) {
      outputBuilder.setCompression(enabled, compressionCodec, codecConf);
      inputBuilder.setCompression(enabled, compressionCodec, codecConf);
      return this;
    }

    /**
     * Sets an additional configuration entry on both the output and the input.
     *
     * @param key   the configuration key
     * @param value the configuration value
     * @return this object for further chained method calls
     */
    @Override
    public Builder setAdditionalConfiguration(String key, String value) {
      outputBuilder.setAdditionalConfiguration(key, value);
      inputBuilder.setAdditionalConfiguration(key, value);
      return this;
    }

    /**
     * Sets additional configuration entries on both the output and the input.
     *
     * @param confMap the configuration entries to apply
     * @return this object for further chained method calls
     */
    @Override
    public Builder setAdditionalConfiguration(Map<String, String> confMap) {
      outputBuilder.setAdditionalConfiguration(confMap);
      inputBuilder.setAdditionalConfiguration(confMap);
      return this;
    }

    /**
     * Edge config options are derived from client-side tez-site.xml (recommended).
     * Optionally invoke setFromConfiguration to override these config options via commandline
     * arguments.
     *
     * @param conf the configuration to apply to both the output and the input builders
     * @return this object for further chained method calls
     */
    @Override
    public Builder setFromConfiguration(Configuration conf) {
      outputBuilder.setFromConfiguration(conf);
      inputBuilder.setFromConfiguration(conf);
      return this;
    }

    /**
     * Applies {@code conf} to both the output and the input builders via their
     * {@code setFromConfigurationUnfiltered} methods.
     *
     * @param conf the configuration to apply
     * @return this object for further chained method calls
     */
    @Override
    public Builder setFromConfigurationUnfiltered(
        Configuration conf) {
      outputBuilder.setFromConfigurationUnfiltered(conf);
      inputBuilder.setFromConfigurationUnfiltered(conf);
      return this;
    }

    /**
     * Set serialization class responsible for providing serializer/deserializer for keys.
     *
     * @param serializationClassName the serialization class name, applied to both output and input
     * @param serializerConf         the serializer configuration. This can be null, and is a
     *                               {@link java.util.Map} of key-value pairs. The keys should be
     *                               limited to the ones required by the serializer.
     * @return this object for further chained method calls
     */
    public Builder setKeySerializationClass(String serializationClassName,
        @Nullable Map<String, String> serializerConf) {
      outputBuilder.setKeySerializationClass(serializationClassName, serializerConf);
      inputBuilder.setKeySerializationClass(serializationClassName, serializerConf);
      return this;
    }

    /**
     * Set serialization class responsible for providing serializer/deserializer for values.
     *
     * @param serializationClassName the serialization class name, applied to both output and input
     * @param serializerConf         the serializer configuration. This can be null, and is a
     *                               {@link java.util.Map} of key-value pairs. The keys should be
     *                               limited to the ones required by the serializer.
     * @return this object for further chained method calls
     */
    public Builder setValueSerializationClass(String serializationClassName, @Nullable Map<String, String> serializerConf) {
      outputBuilder.setValueSerializationClass(serializationClassName, serializerConf);
      inputBuilder.setValueSerializationClass(serializationClassName, serializerConf);
      return this;
    }

    /**
     * Configure the specific output.
     *
     * @return a builder to configure the output
     */
    public UnorderedPartitionedKVOutputConfig.SpecificBuilder<Builder> configureOutput() {
      return specificOutputBuilder;
    }

    /**
     * Configure the specific input.
     *
     * @return a builder to configure the input
     */
    public UnorderedKVInputConfig.SpecificBuilder<Builder> configureInput() {
      return specificInputBuilder;
    }

    /**
     * Build and return an instance of the configuration.
     *
     * @return an instance of the actual configuration
     */
    public UnorderedPartitionedKVEdgeConfig build() {
      return new UnorderedPartitionedKVEdgeConfig(outputBuilder.build(), inputBuilder.build());
    }
  }
}