blob: a03c81267284f3a6b0025c214022820be240629e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.slive;
import java.text.NumberFormat;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.slive.Constants.OperationType;
import org.apache.hadoop.util.StringUtils;
/**
* Simple access layer onto of a configuration object that extracts the slive
* specific configuration values needed for slive running
*/
class ConfigExtractor {
private static final Log LOG = LogFactory.getLog(ConfigExtractor.class);
private Configuration config;
ConfigExtractor(Configuration cfg) {
this.config = cfg;
}
/**
* @return the wrapped configuration that this extractor will use
*/
Configuration getConfig() {
return this.config;
}
/**
* @return the location of where data should be written to
*/
Path getDataPath() {
Path base = getBaseDirectory();
if (base == null) {
return null;
}
return new Path(base, Constants.DATA_DIR);
}
/**
* @return the location of where the reducer should write its data to
*/
Path getOutputPath() {
Path base = getBaseDirectory();
if (base == null) {
return null;
}
return new Path(base, Constants.OUTPUT_DIR);
}
/**
* @param primary
* primary the initial string to be used for the value of this
* configuration option (if not provided then config and then the
* default are used)
*
* @return the base directory where output & data should be stored using
* primary,config,default (in that order)
*/
Path getBaseDirectory(String primary) {
String path = primary;
if (path == null) {
path = config.get(ConfigOption.BASE_DIR.getCfgOption());
}
if (path == null) {
path = ConfigOption.BASE_DIR.getDefault();
}
if (path == null) {
return null;
}
return new Path(path);
}
/**
* @return the base directory using only config and default values
*/
Path getBaseDirectory() {
return getBaseDirectory(null);
}
/**
* @return whether the mapper or reducer should exit when they get there first
* error using only config and default values
*/
boolean shouldExitOnFirstError() {
return shouldExitOnFirstError(null);
}
/**
* @param primary
* primary the initial string to be used for the value of this
* configuration option (if not provided then config and then the
* default are used)
*
* @return the boolean of whether the mapper/reducer should exit when they
* first error from primary,config,default (in that order)
*/
boolean shouldExitOnFirstError(String primary) {
String val = primary;
if (val == null) {
val = config.get(ConfigOption.EXIT_ON_ERROR.getCfgOption());
}
if (val == null) {
return ConfigOption.EXIT_ON_ERROR.getDefault();
}
return Boolean.parseBoolean(val);
}
/**
* @return the number of reducers to use
*/
Integer getReducerAmount() {
// should be slive.reduces
return getInteger(null, ConfigOption.REDUCES);
}
/**
* @return the number of mappers to use using config and default values for
* lookup
*/
Integer getMapAmount() {
return getMapAmount(null);
}
/**
* @param primary
* primary the initial string to be used for the value of this
* configuration option (if not provided then config and then the
* default are used)
* @return the reducer amount to use
*/
Integer getMapAmount(String primary) {
return getInteger(primary, ConfigOption.MAPS);
}
/**
* @return the duration in seconds (or null or Integer.MAX for no limit) using
* the configuration and default as lookup
*/
Integer getDuration() {
return getDuration(null);
}
/**
* @return the duration in milliseconds or null if no limit using config and
* default as lookup
*/
Integer getDurationMilliseconds() {
Integer seconds = getDuration();
if (seconds == null || seconds == Integer.MAX_VALUE) {
return Integer.MAX_VALUE;
}
int milliseconds = (seconds * 1000);
if (milliseconds < 0) {
milliseconds = 0;
}
return milliseconds;
}
/**
* @param primary
* primary the initial string to be used for the value of this
* configuration option (if not provided then config and then the
* default are used)
* @return the duration in seconds (or null or Integer.MAX for no limit)
*/
Integer getDuration(String primary) {
return getInteger(primary, ConfigOption.DURATION);
}
/**
* @return the total number of operations to run using config and default as
* lookup
*/
Integer getOpCount() {
return getOpCount(null);
}
/**
* @param primary
* primary the initial string to be used for the value of this
* configuration option (if not provided then config and then the
* default are used)
* @return the total number of operations to run
*/
Integer getOpCount(String primary) {
return getInteger(primary, ConfigOption.OPS);
}
/**
* @return the total number of files per directory using config and default as
* lookup
*/
Integer getDirSize() {
return getDirSize(null);
}
/**
* @param primary
* primary the initial string to be used for the value of this
* configuration option (if not provided then config and then the
* default are used)
* @return the total number of files per directory
*/
Integer getDirSize(String primary) {
return getInteger(primary, ConfigOption.DIR_SIZE);
}
/**
* @param primary
* the primary string to attempt to convert into a integer
* @param opt
* the option to use as secondary + default if no primary given
* @return a parsed integer
*/
private Integer getInteger(String primary, ConfigOption<Integer> opt) {
String value = primary;
if (value == null) {
value = config.get(opt.getCfgOption());
}
if (value == null) {
return opt.getDefault();
}
return Integer.parseInt(value);
}
/**
* @return the total number of files allowed using configuration and default
* for lookup
*/
Integer getTotalFiles() {
return getTotalFiles(null);
}
/**
* @param primary
* primary the initial string to be used for the value of this
* configuration option (if not provided then config and then the
* default are used)
* @return the total number of files allowed
*/
Integer getTotalFiles(String primary) {
return getInteger(primary, ConfigOption.FILES);
}
/**
* @param primary
* primary the initial string to be used for the value of this
* configuration option (if not provided then config and then the
* default are used)
* @return the random seed start point or null if none
*/
Long getRandomSeed(String primary) {
String seed = primary;
if (seed == null) {
seed = config.get(ConfigOption.RANDOM_SEED.getCfgOption());
}
if (seed == null) {
return null;
}
return Long.parseLong(seed);
}
/**
* @return the random seed start point or null if none using config and then
* default as lookup
*/
Long getRandomSeed() {
return getRandomSeed(null);
}
/**
* @return the result file location or null if none using config and then
* default as lookup
*/
String getResultFile() {
return getResultFile(null);
}
/**
* Gets the grid queue name to run on using config and default only
*
* @return String
*/
String getQueueName() {
return getQueueName(null);
}
/**
* Gets the grid queue name to run on using the primary string or config or
* default
*
* @param primary
*
* @return String
*/
String getQueueName(String primary) {
String q = primary;
if (q == null) {
q = config.get(ConfigOption.QUEUE_NAME.getCfgOption());
}
if (q == null) {
q = ConfigOption.QUEUE_NAME.getDefault();
}
return q;
}
/**
* @param primary
* primary the initial string to be used for the value of this
* configuration option (if not provided then config and then the
* default are used)
* @return the result file location
*/
String getResultFile(String primary) {
String fn = primary;
if (fn == null) {
fn = config.get(ConfigOption.RESULT_FILE.getCfgOption());
}
if (fn == null) {
fn = ConfigOption.RESULT_FILE.getDefault();
}
return fn;
}
/**
* @param primary
* primary the initial string to be used for the value of this
* configuration option (if not provided then config and then the
* default are used)
* @return the integer range allowed for the block size
*/
Range<Long> getBlockSize(String primary) {
return getMinMaxBytes(ConfigOption.BLOCK_SIZE, primary);
}
/**
* @return the integer range allowed for the block size using config and
* default for lookup
*/
Range<Long> getBlockSize() {
return getBlockSize(null);
}
/**
* @param cfgopt
* the configuration option to use for config and default lookup
* @param primary
* the initial string to be used for the value of this configuration
* option (if not provided then config and then the default are used)
* @return the parsed short range from primary, config, default
*/
private Range<Short> getMinMaxShort(ConfigOption<Short> cfgopt, String primary) {
String sval = primary;
if (sval == null) {
sval = config.get(cfgopt.getCfgOption());
}
Range<Short> range = null;
if (sval != null) {
String pieces[] = Helper.getTrimmedStrings(sval);
if (pieces.length == 2) {
String min = pieces[0];
String max = pieces[1];
short minVal = Short.parseShort(min);
short maxVal = Short.parseShort(max);
if (minVal > maxVal) {
short tmp = minVal;
minVal = maxVal;
maxVal = tmp;
}
range = new Range<Short>(minVal, maxVal);
}
}
if (range == null) {
Short def = cfgopt.getDefault();
if (def != null) {
range = new Range<Short>(def, def);
}
}
return range;
}
/**
* @param cfgopt
* the configuration option to use for config and default lookup
* @param primary
* the initial string to be used for the value of this configuration
* option (if not provided then config and then the default are used)
* @return the parsed long range from primary, config, default
*/
private Range<Long> getMinMaxLong(ConfigOption<Long> cfgopt, String primary) {
String sval = primary;
if (sval == null) {
sval = config.get(cfgopt.getCfgOption());
}
Range<Long> range = null;
if (sval != null) {
String pieces[] = Helper.getTrimmedStrings(sval);
if (pieces.length == 2) {
String min = pieces[0];
String max = pieces[1];
long minVal = Long.parseLong(min);
long maxVal = Long.parseLong(max);
if (minVal > maxVal) {
long tmp = minVal;
minVal = maxVal;
maxVal = tmp;
}
range = new Range<Long>(minVal, maxVal);
}
}
if (range == null) {
Long def = cfgopt.getDefault();
if (def != null) {
range = new Range<Long>(def, def);
}
}
return range;
}
/**
* @param cfgopt
* the configuration option to use for config and default lookup
* @param primary
* the initial string to be used for the value of this configuration
* option (if not provided then config and then the default are used)
* @return the parsed integer byte range from primary, config, default
*/
private Range<Long> getMinMaxBytes(ConfigOption<Long> cfgopt, String primary) {
String sval = primary;
if (sval == null) {
sval = config.get(cfgopt.getCfgOption());
}
Range<Long> range = null;
if (sval != null) {
String pieces[] = Helper.getTrimmedStrings(sval);
if (pieces.length == 2) {
String min = pieces[0];
String max = pieces[1];
long tMin = StringUtils.TraditionalBinaryPrefix.string2long(min);
long tMax = StringUtils.TraditionalBinaryPrefix.string2long(max);
if (tMin > tMax) {
long tmp = tMin;
tMin = tMax;
tMax = tmp;
}
range = new Range<Long>(tMin, tMax);
}
}
if (range == null) {
Long def = cfgopt.getDefault();
if (def != null) {
range = new Range<Long>(def, def);
}
}
return range;
}
/**
* @param primary
* the initial string to be used for the value of this configuration
* option (if not provided then config and then the default are used)
* @return the replication range
*/
Range<Short> getReplication(String primary) {
return getMinMaxShort(ConfigOption.REPLICATION_AM, primary);
}
/**
* @return the replication range using config and default for lookup
*/
Range<Short> getReplication() {
return getReplication(null);
}
/**
* @return the map of operations to perform using config (percent may be null
* if unspecified)
*/
Map<OperationType, OperationData> getOperations() {
Map<OperationType, OperationData> operations = new HashMap<OperationType, OperationData>();
for (OperationType type : OperationType.values()) {
String opname = type.lowerName();
String keyname = String.format(Constants.OP, opname);
String kval = config.get(keyname);
if (kval == null) {
continue;
}
operations.put(type, new OperationData(kval));
}
return operations;
}
/**
* @param primary
* the initial string to be used for the value of this configuration
* option (if not provided then config and then the default are used)
* @return the append byte size range (or null if none)
*/
Range<Long> getAppendSize(String primary) {
return getMinMaxBytes(ConfigOption.APPEND_SIZE, primary);
}
/**
* @return the append byte size range (or null if none) using config and
* default for lookup
*/
Range<Long> getAppendSize() {
return getAppendSize(null);
}
/**
* @param primary
* the initial string to be used for the value of this configuration
* option (if not provided then config and then the default are used)
* @return the sleep range (or null if none)
*/
Range<Long> getSleepRange(String primary) {
return getMinMaxLong(ConfigOption.SLEEP_TIME, primary);
}
/**
* @return the sleep range (or null if none) using config and default for
* lookup
*/
Range<Long> getSleepRange() {
return getSleepRange(null);
}
/**
* @param primary
* the initial string to be used for the value of this configuration
* option (if not provided then config and then the default are used)
* @return the write byte size range (or null if none)
*/
Range<Long> getWriteSize(String primary) {
return getMinMaxBytes(ConfigOption.WRITE_SIZE, primary);
}
/**
* @return the write byte size range (or null if none) using config and
* default for lookup
*/
Range<Long> getWriteSize() {
return getWriteSize(null);
}
/**
* Returns whether the write range should use the block size range
*
* @return true|false
*/
boolean shouldWriteUseBlockSize() {
Range<Long> writeRange = getWriteSize();
if (writeRange == null
|| (writeRange.getLower() == writeRange.getUpper() && (writeRange
.getUpper() == Long.MAX_VALUE))) {
return true;
}
return false;
}
/**
* Returns whether the append range should use the block size range
*
* @return true|false
*/
boolean shouldAppendUseBlockSize() {
Range<Long> appendRange = getAppendSize();
if (appendRange == null
|| (appendRange.getLower() == appendRange.getUpper() && (appendRange
.getUpper() == Long.MAX_VALUE))) {
return true;
}
return false;
}
/**
* Returns whether the read range should use the entire file
*
* @return true|false
*/
boolean shouldReadFullFile() {
Range<Long> readRange = getReadSize();
if (readRange == null
|| (readRange.getLower() == readRange.getUpper() && (readRange
.getUpper() == Long.MAX_VALUE))) {
return true;
}
return false;
}
/**
* @param primary
* the initial string to be used for the value of this configuration
* option (if not provided then config and then the default are used)
* @return the read byte size range (or null if none)
*/
Range<Long> getReadSize(String primary) {
return getMinMaxBytes(ConfigOption.READ_SIZE, primary);
}
/**
* Gets the bytes per checksum (if it exists or null if not)
*
* @return Long
*/
Long getByteCheckSum() {
String val = config.get(Constants.BYTES_PER_CHECKSUM);
if(val == null) {
return null;
}
return Long.parseLong(val);
}
/**
* @return the read byte size range (or null if none) using config and default
* for lookup
*/
Range<Long> getReadSize() {
return getReadSize(null);
}
/**
* Dumps out the given options for the given config extractor
*
* @param cfg
* the config to write to the log
*/
static void dumpOptions(ConfigExtractor cfg) {
if (cfg == null) {
return;
}
LOG.info("Base directory = " + cfg.getBaseDirectory());
LOG.info("Data directory = " + cfg.getDataPath());
LOG.info("Output directory = " + cfg.getOutputPath());
LOG.info("Result file = " + cfg.getResultFile());
LOG.info("Grid queue = " + cfg.getQueueName());
LOG.info("Should exit on first error = " + cfg.shouldExitOnFirstError());
{
String duration = "Duration = ";
if (cfg.getDurationMilliseconds() == Integer.MAX_VALUE) {
duration += "unlimited";
} else {
duration += cfg.getDurationMilliseconds() + " milliseconds";
}
LOG.info(duration);
}
LOG.info("Map amount = " + cfg.getMapAmount());
LOG.info("Reducer amount = " + cfg.getReducerAmount());
LOG.info("Operation amount = " + cfg.getOpCount());
LOG.info("Total file limit = " + cfg.getTotalFiles());
LOG.info("Total dir file limit = " + cfg.getDirSize());
{
String read = "Read size = ";
if (cfg.shouldReadFullFile()) {
read += "entire file";
} else {
read += cfg.getReadSize() + " bytes";
}
LOG.info(read);
}
{
String write = "Write size = ";
if (cfg.shouldWriteUseBlockSize()) {
write += "blocksize";
} else {
write += cfg.getWriteSize() + " bytes";
}
LOG.info(write);
}
{
String append = "Append size = ";
if (cfg.shouldAppendUseBlockSize()) {
append += "blocksize";
} else {
append += cfg.getAppendSize() + " bytes";
}
LOG.info(append);
}
{
String bsize = "Block size = ";
bsize += cfg.getBlockSize() + " bytes";
LOG.info(bsize);
}
if (cfg.getRandomSeed() != null) {
LOG.info("Random seed = " + cfg.getRandomSeed());
}
if (cfg.getSleepRange() != null) {
LOG.info("Sleep range = " + cfg.getSleepRange() + " milliseconds");
}
LOG.info("Replication amount = " + cfg.getReplication());
LOG.info("Operations are:");
NumberFormat percFormatter = Formatter.getPercentFormatter();
Map<OperationType, OperationData> operations = cfg.getOperations();
for (OperationType type : operations.keySet()) {
String name = type.name();
LOG.info(name);
OperationData opInfo = operations.get(type);
LOG.info(" " + opInfo.getDistribution().name());
if (opInfo.getPercent() != null) {
LOG.info(" " + percFormatter.format(opInfo.getPercent()));
} else {
LOG.info(" ???");
}
}
}
}