blob: 02fd48a071b9f7e9843c697de296f4d00a0a952d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools.rumen.datatypes.util;
import java.lang.reflect.Field;
import java.text.DecimalFormat;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.tools.rumen.datatypes.*;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
/**
* A default parser for MapReduce job configuration properties.
* MapReduce job configuration properties are represented as key-value pairs.
* Each key represents a configuration knob which controls or affects the
* behavior of a MapReduce job or a job's task. The value associated with the
* configuration key represents its value. Some of the keys are deprecated. As a
* result of deprecation some keys change or are preferred over other keys,
* across versions. {@link MapReduceJobPropertiesParser} is a utility class that
* parses MapReduce job configuration properties and converts the value into a
* well defined {@link DataType}. Users can use the
* {@link #parseJobProperty(String, String)} API to process job
* configuration parameters. This API will parse a job property represented as a
* key-value pair and return the value wrapped inside a {@link DataType}.
* Callers can then use the returned {@link DataType} for further processing.
*
* {@link MapReduceJobPropertiesParser} thrives on the key name to decide which
* {@link DataType} to wrap the value with. Values for keys representing
* job-name, queue-name, user-name etc are wrapped inside {@link JobName},
* {@link QueueName}, {@link UserName} etc respectively. Keys ending with *dir*
* are considered as a directory and hence gets be wrapped inside
* {@link FileName}. Similarly key ending with *codec*, *log*, *class* etc are
* also handled accordingly. Values representing basic java data-types like
* integer, float, double, boolean etc are wrapped inside
* {@link DefaultDataType}. If the key represents some jvm-level settings then
* only standard settings are extracted and gets wrapped inside
* {@link DefaultDataType}. Currently only '-Xmx' and '-Xms' settings are
* considered while the rest are ignored.
*
* Note that the {@link #parseJobProperty(String, String)} API
* maps the keys to a configuration parameter listed in
* {@link MRJobConfig}. This not only filters non-framework specific keys thus
* ignoring user-specific and hard-to-parse keys but also provides a consistent
* view for all possible inputs. So if users invoke the
* {@link #parseJobProperty(String, String)} API with either
* <"mapreduce.job.user.name", "bob"> or <"user.name", "bob">,
* then the result would be a {@link UserName} {@link DataType} wrapping
* the user-name "bob".
*/
@SuppressWarnings("deprecation")
public class MapReduceJobPropertiesParser implements JobPropertyParser {
private Field[] mrFields = MRJobConfig.class.getFields();
private DecimalFormat format = new DecimalFormat();
private JobConf configuration = new JobConf(false);
private static final Pattern MAX_HEAP_PATTERN =
Pattern.compile("-Xmx[0-9]+[kKmMgGtT]?+");
private static final Pattern MIN_HEAP_PATTERN =
Pattern.compile("-Xms[0-9]+[kKmMgGtT]?+");
// turn off the warning w.r.t deprecated mapreduce keys
static {
Logger.getLogger(Configuration.class).setLevel(Level.OFF);
}
// Accepts a key if there is a corresponding key in the current mapreduce
// configuration
private boolean accept(String key) {
return getLatestKeyName(key) != null;
}
// Finds a corresponding key for the specified key in the current mapreduce
// setup.
// Note that this API uses a cached copy of the Configuration object. This is
// purely for performance reasons.
private String getLatestKeyName(String key) {
// set the specified key
configuration.set(key, key);
try {
// check if keys in MRConfig maps to the specified key.
for (Field f : mrFields) {
String mrKey = f.get(f.getName()).toString();
if (configuration.get(mrKey) != null) {
return mrKey;
}
}
// unset the key
return null;
} catch (IllegalAccessException iae) {
throw new RuntimeException(iae);
} finally {
// clean up!
configuration.clear();
}
}
@Override
public DataType<?> parseJobProperty(String key, String value) {
if (accept(key)) {
return fromString(key, value);
}
return null;
}
/**
* Extracts the -Xmx heap option from the specified string.
*/
public static void extractMaxHeapOpts(final String javaOptions,
List<String> heapOpts,
List<String> others) {
for (String opt : javaOptions.split(" ")) {
Matcher matcher = MAX_HEAP_PATTERN.matcher(opt);
if (matcher.find()) {
heapOpts.add(opt);
} else {
others.add(opt);
}
}
}
/**
* Extracts the -Xms heap option from the specified string.
*/
public static void extractMinHeapOpts(String javaOptions,
List<String> heapOpts, List<String> others) {
for (String opt : javaOptions.split(" ")) {
Matcher matcher = MIN_HEAP_PATTERN.matcher(opt);
if (matcher.find()) {
heapOpts.add(opt);
} else {
others.add(opt);
}
}
}
// Maps the value of the specified key.
private DataType<?> fromString(String key, String value) {
DefaultDataType defaultValue = new DefaultDataType(value);
if (value != null) {
// check known configs
// job-name
String latestKey = getLatestKeyName(key);
if (MRJobConfig.JOB_NAME.equals(latestKey)) {
return new JobName(value);
}
// user-name
if (MRJobConfig.USER_NAME.equals(latestKey)) {
return new UserName(value);
}
// queue-name
if (MRJobConfig.QUEUE_NAME.equals(latestKey)) {
return new QueueName(value);
}
if (MRJobConfig.MAP_JAVA_OPTS.equals(latestKey)
|| MRJobConfig.REDUCE_JAVA_OPTS.equals(latestKey)) {
List<String> heapOptions = new ArrayList<String>();
extractMaxHeapOpts(value, heapOptions, new ArrayList<String>());
extractMinHeapOpts(value, heapOptions, new ArrayList<String>());
return new DefaultDataType(StringUtils.join(heapOptions, ' '));
}
//TODO compression?
//TODO Other job configs like FileOutputFormat/FileInputFormat etc
// check if the config parameter represents a number
try {
format.parse(value);
return defaultValue;
} catch (ParseException pe) {}
// check if the config parameters represents a boolean
// avoiding exceptions
if ("true".equals(value) || "false".equals(value)) {
return defaultValue;
}
// check if the config parameter represents a class
if (latestKey.endsWith(".class") || latestKey.endsWith(".codec")) {
return new ClassName(value);
}
// handle distributed cache sizes and timestamps
if (latestKey.endsWith("sizes")
|| latestKey.endsWith(".timestamps")) {
return defaultValue;
}
// check if the config parameter represents a file-system path
//TODO: Make this concrete .location .path .dir .jar?
if (latestKey.endsWith(".dir") || latestKey.endsWith(".location")
|| latestKey.endsWith(".jar") || latestKey.endsWith(".path")
|| latestKey.endsWith(".logfile") || latestKey.endsWith(".file")
|| latestKey.endsWith(".files") || latestKey.endsWith(".archives")) {
try {
return new FileName(value);
} catch (Exception ioe) {}
}
}
return null;
}
}