blob: 7d934815811056e3e314d9e0e11b1eee3b88a8be [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.dag.utils;
import java.util.BitSet;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.client.TezClientUtils;
import org.apache.tez.dag.api.TezConfiguration;
import com.google.common.base.Splitter;
import com.google.common.base.Strings;
/**
* Placeholder to store additional launch command options to be specified for specific tasks in
* different vertices.
*/
public class TaskSpecificLaunchCmdOption {
private static final Logger LOG = LoggerFactory.getLogger(TaskSpecificLaunchCmdOption.class);
//To check any characters apart from "a-zA-Z_0-9 : ; , [] - space" anywhere in input.
final static Pattern INVALID_TASK_NAME_REGEX = Pattern
.compile("[^(\\w\\s;:,\\[\\]\\-)]");
/**
* Regex to validate the task ranges. Vertex name can only have [a-zA-Z_0-9], -, and
* space. Task id is expected to be a number. : is used for specifying task id range. , is used
* as task id separator.
*/
final static Pattern TASKS_REGEX = Pattern
.compile("([\\w\\s\\-]+)\\[([\\d:,\\s]*)\\];?");
//Range regex where ':' should always be prepended and appended with digit.
final static Pattern RANGE_REGEX = Pattern.compile("(\\d+):(\\d+)");
private final Map<String, BitSet> tasksMap;
//Task specific launch-cmd options
private final String tsLaunchCmdOpts;
//Task specific log options
private final String[] tsLogParams;
public TaskSpecificLaunchCmdOption(Configuration conf) {
this.tsLaunchCmdOpts =
conf.getTrimmed(TezConfiguration.TEZ_TASK_SPECIFIC_LAUNCH_CMD_OPTS);
this.tsLogParams = TezClientUtils
.parseLogParams(conf.getTrimmed(TezConfiguration.TEZ_TASK_SPECIFIC_LOG_LEVEL));
if (shouldParseSpecificTaskList()) {
this.tasksMap = getSpecificTasks(conf);
} else {
this.tasksMap = null;
}
}
/**
* Get task specific command options in a vertex
*
* @param launchCmdOpts
* vertex specific command option
* @param vertexName
* @param taskIdx
* @return tsLaunchCmdOpts
*/
public String getTaskSpecificOption(String launchCmdOpts, String vertexName,
int taskIdx) {
if (this.tsLaunchCmdOpts != null) {
launchCmdOpts = (launchCmdOpts == null) ? "" : launchCmdOpts;
vertexName = vertexName.replaceAll(" ", "");
String result =
this.tsLaunchCmdOpts.replaceAll("__VERTEX_NAME__", vertexName)
.replaceAll("__TASK_INDEX__", Integer.toString(taskIdx));
result = (launchCmdOpts + " " + result);
LOG.info("Launch-cmd options added to vertexName=" + vertexName
+ ", taskIdx=" + taskIdx + ", tsLaunchCmdOpts=" + result.trim());
return result.trim();
}
return launchCmdOpts;
}
public boolean hasModifiedTaskLaunchOpts() {
return !Strings.isNullOrEmpty(tsLaunchCmdOpts);
}
/**
* Retrieve a parsed form of the log string specified for per-task usage. </p>
* The first element of the array is the general log level. </p>
* The second level, if it exists, is the additional per logger configuration.
*
* @return parsed form of the log string specified. null if none specified
*/
public String[] getTaskSpecificLogParams() {
return this.tsLogParams;
}
public boolean hasModifiedLogProperties() {
return this.tsLogParams != null;
}
/**
* Find if task specific launch command options have to be added.
*
* @param vertexName
* @param taskId
* @return boolean
*/
public boolean addTaskSpecificLaunchCmdOption(String vertexName, int taskId) {
if (tasksMap == null || taskId < 0) {
return false;
}
BitSet taskSet = tasksMap.get(vertexName);
// profile all tasks in the vertex, if taskSet is empty
return (taskSet == null) ? false : ((taskSet.isEmpty()) ? true : taskSet.get(taskId));
}
private boolean shouldParseSpecificTaskList() {
return !(Strings.isNullOrEmpty(tsLaunchCmdOpts) && tsLogParams == null);
}
/**
* <pre>
* Get the set of tasks in the job, which needs task specific launch command arguments to be
* added. Example formats are
* v[0,1,2] - Add additional options to subset of tasks in a vertex
* v[1,2,3];v2[5,6,7] - Add additional options in tasks in multiple vertices
* v[1:5,20,30];v2[2:5,60,7] - To support range of tasks in vertices. Partial
* ranges are not supported (e.g v[:5],v2[2:]).
* v[] - For all tasks in a vertex
* </pre>
*
* @param conf
* @return a map from the vertex name to a BitSet representing tasks to be instruemented. null if
* the provided configuration is empty or invalid
*/
private Map<String, BitSet> getSpecificTasks(Configuration conf) {
String specificTaskList =
conf.getTrimmed(TezConfiguration.TEZ_TASK_SPECIFIC_LAUNCH_CMD_OPTS_LIST);
if (!Strings.isNullOrEmpty(specificTaskList) && isValid(specificTaskList)) {
final Map<String, BitSet> resultSet = new HashMap<String, BitSet>();
Matcher matcher = TASKS_REGEX.matcher(specificTaskList);
while (matcher.find()) {
String vertexName = matcher.group(1).trim();
BitSet taskSet = parseTasks(matcher.group(2).trim());
resultSet.put(vertexName, taskSet);
}
LOG.info("Specific tasks with additional launch-cmd options=" + resultSet);
return resultSet;
} else {
return null;
}
}
private boolean isValid(String specificTaskList) {
if (INVALID_TASK_NAME_REGEX.matcher(specificTaskList).find()) {
LOG.warn("Invalid option specified, "
+ TezConfiguration.TEZ_TASK_SPECIFIC_LAUNCH_CMD_OPTS_LIST + "=" + specificTaskList);
return false;
}
return true;
}
/**
* Get the set of tasks that need additional launch command options within a vertex
*
* @param tasksInVertex
* @return Set<Integer> containing the task indexes to be profiled
*/
private BitSet parseTasks(String tasksInVertex) {
BitSet taskSet = new BitSet();
if (Strings.isNullOrEmpty(tasksInVertex)) {
return taskSet;
}
Iterable<String> tasks =
Splitter.on(",").omitEmptyStrings().trimResults().split(tasksInVertex);
for (String task : tasks) {
/**
* TODO: this is horrible way to check the ranges.
* Should use RangeSet when guava is upgraded. Also, need to support partial
* ranges like "1:", ":50". With current implementation partial ranges are not
* allowed.
*/
if (task.endsWith(":") || task.startsWith(":")) {
//invalid range. e.g :20, 6: are not supported.
LOG.warn("Partial range is considered as an invalid option");
return null;
}
Matcher taskMatcher = RANGE_REGEX.matcher(task);
if (taskMatcher.find()) {
int start = Integer.parseInt((taskMatcher.group(1).trim()));
int end = Integer.parseInt((taskMatcher.group(2).trim()));
for (int i = Math.min(start, end); i <= Math.max(start, end); i++) {
taskSet.set(i);
}
} else {
taskSet.set(Integer.parseInt(task.trim()));
}
}
return taskSet;
}
}