blob: 0bcd45e27f44f5bcc4434405f6afce7e791321c1 [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.tez.mapreduce.hadoop;
import java.util.Iterator;
import java.util.Map.Entry;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
public class MultiStageMRConfigUtil {
// Methods based on Stage Num //
// Returns config settings specific to stage
public static Configuration getBasicIntermediateStageConf(
Configuration baseConf, int i) {
return getBasicIntermediateStageConfInternal(baseConf,
getPropertyNameForIntermediateStage(i, ""), false, true);
// Returns and removes config settings specific to stage
public static Configuration getAndRemoveBasicIntermediateStageConf(
Configuration baseConf, int i) {
return getBasicIntermediateStageConfInternal(baseConf,
getPropertyNameForIntermediateStage(i, ""), true, true);
// TODO Get rid of this once YARNRunner starts using VertexNames.
public static Configuration getIntermediateStageConf(Configuration baseConf,
int i) {
return getBasicIntermediateStageConfInternal(baseConf,
getPropertyNameForIntermediateStage(i, ""), false, false);
// FIXME small perf hit. Change this to parse through all keys once and
// generate objects per
// stage instead of scanning through conf multiple times.
public static Configuration getAndRemoveBasicNonIntermediateStageConf(
Configuration baseConf) {
Configuration newConf = new Configuration(false);
for (String key : DeprecatedKeys.getMRToEngineParamMap().keySet()) {
if (baseConf.get(key) != null) {
newConf.set(key, baseConf.get(key));
for (String key : DeprecatedKeys.getMultiStageParamMap().keySet()) {
if (baseConf.get(key) != null) {
newConf.set(key, baseConf.get(key));
return newConf;
// TODO MRR FIXME based on conf format.
public static int getNumIntermediateStages(Configuration conf) {
return conf.getInt(MRJobConfig.MRR_INTERMEDIATE_STAGES, 0);
// TODO MRR FIXME based on conf format.
// Intermediate stage numbers should start from 1.
public static String getPropertyNameForIntermediateStage(
int intermediateStage, String originalPropertyName) {
return MRJobConfig.MRR_INTERMEDIATE_STAGE_PREFIX + intermediateStage + "."
+ originalPropertyName;
// Methods based on Vertex Name //
// Vertex name returned by getInitialMapVertexName().
private static final String INITIAL_MAP_VERTEX_NAME = "initialmap";
// Vertex name returned by getFinalReduceVertexName().
private static final String FINAL_REDUCE_VERTEX_NAME = "finalreduce";
// Intermediate-stage vertex names are this prefix immediately followed by the
// stage number's digits (that is the shape getIntermediateStageNum matches).
private static final String INTERMEDIATE_TASK_VERTEX_NAME_PREFIX = "ivertex";
public static String getInitialMapVertexName() {
public boolean isInitialMapVertex(String vertexName) {
return vertexName.equals(INITIAL_MAP_VERTEX_NAME);
public static String getFinalReduceVertexName() {
public boolean isFinalReduceVertex(String vertexName) {
return vertexName.equals(FINAL_REDUCE_VERTEX_NAME);
public static String getIntermediateStageVertexName(int stageNum) {
public static int getIntermediateStageNum(String vertexName) {
if (vertexName.matches(INTERMEDIATE_TASK_VERTEX_NAME_PREFIX + "\\d+")) {
return Integer.parseInt(vertexName
} else {
return -1;
// Returns config settings specific to named vertex
public static Configuration getBasicConfForVertex(Configuration baseConf,
String vertexName) {
return getBasicIntermediateStageConfInternal(baseConf,
getPropertyNameForVertex(vertexName, ""), false, true);
// Returns and removes config settings specific to named vertex
public static Configuration getAndRemoveBasicConfForVertex(
Configuration baseConf, String vertexName) {
return getBasicIntermediateStageConfInternal(baseConf,
getPropertyNameForVertex(vertexName, ""), true, true);
// Returns a config with all parameters, and vertex specific params moved to
// the top level.
public static Configuration getConfForVertex(Configuration baseConf,
String vertexName) {
return getBasicIntermediateStageConfInternal(baseConf,
getPropertyNameForVertex(vertexName, ""), false, false);
public static void addConfigurationForVertex(Configuration baseConf,
String vertexName, Configuration vertexConf) {
Iterator<Entry<String, String>> confEntries = vertexConf.iterator();
while (confEntries.hasNext()) {
Entry<String, String> entry =;
baseConf.set(getPropertyNameForVertex(vertexName, entry.getKey()),
// TODO This is TezEngineLand
public static String getPropertyNameForVertex(String vertexName,
String originalPropertyName) {
return MRJobConfig.MRR_VERTEX_PREFIX + vertexName + "."
+ originalPropertyName;
// TODO Get rid of this. Temporary for testing.
public static void printConf(Configuration conf) {
Iterator<Entry<String, String>> confEntries = conf.iterator();
while (confEntries.hasNext()) {
Entry<String, String> entry =;
String key = entry.getKey();
String value = entry.getValue();
System.err.println("Key: " + key + ", Value: " + value);
static Configuration extractStageConf(Configuration baseConf,
String prefix) {
Configuration strippedConf = new Configuration(false);
Configuration conf = new Configuration(false);
Iterator<Entry<String, String>> confEntries = baseConf.iterator();
while (confEntries.hasNext()) {
Entry<String, String> entry =;
String key = entry.getKey();
if (key.startsWith(prefix)) {
// Ignore keys for other intermediate stages in case of an initial or
// final stage.
if (prefix.equals("")) {
if (key.startsWith(MRJobConfig.MRR_INTERMEDIATE_STAGE_PREFIX)) {
String newKey = key.replace(prefix, "");
strippedConf.set(newKey, entry.getValue());
} else {
// Ignore keys for other intermediate stages.
if (key.startsWith(MRJobConfig.MRR_INTERMEDIATE_STAGE_PREFIX)) {
// Set all base keys in the new conf
conf.set(key, entry.getValue());
// Replace values from strippedConf into the finalConf. Override values
// which may have been copied over from the baseConf root level.
Iterator<Entry<String, String>> entries = strippedConf.iterator();
while (entries.hasNext()) {
Entry<String, String> entry =;
conf.set(entry.getKey(), entry.getValue());
return conf;
// TODO MRR FIXME based on conf format.
private static Configuration getBasicIntermediateStageConfInternal(
Configuration baseConf, String prefix, boolean remove, boolean stageOnly) {
Configuration strippedConf = new Configuration(false);
Configuration conf = new Configuration(false);
Iterator<Entry<String, String>> confEntries = baseConf.iterator();
while (confEntries.hasNext()) {
Entry<String, String> entry =;
String key = entry.getKey();
if (key.startsWith(prefix)) {
if (remove) {
String newKey = key.replace(prefix, "");
strippedConf.set(newKey, entry.getValue());
} else if (!stageOnly) {
conf.set(key, entry.getValue());
// Replace values from strippedConf into the finalConf. Override values
// which may have been copied over from the baseConf root level.
if (stageOnly) {
conf = strippedConf;
} else {
Iterator<Entry<String, String>> entries = strippedConf.iterator();
while (entries.hasNext()) {
Entry<String, String> entry =;
conf.set(entry.getKey(), entry.getValue());
return conf;