blob: b7be8d85e204fff0bb248dd679dee1c54d4e658a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.slive;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.slive.ArgumentParser.ParsedOutput;
import org.apache.hadoop.fs.slive.Constants.Distribution;
import org.apache.hadoop.fs.slive.Constants.OperationType;
import org.apache.hadoop.util.StringUtils;
/**
 * Merges options supplied by a configuration file with those given on the
 * command line, performs basic verification of the resulting values, and
 * writes the verified values back into the configuration object that is
 * returned.
 */
class ConfigMerger {
/**
 * Signals that a configuration value could not be extracted, merged or
 * verified.
 */
static class ConfigException extends IOException {

  private static final long serialVersionUID = 2047129184917444550L;

  /**
   * @param message
   *          description of the configuration problem
   */
  ConfigException(String message) {
    super(message);
  }

  /**
   * @param message
   *          description of the configuration problem
   * @param cause
   *          the underlying failure that triggered this exception
   */
  ConfigException(String message, Throwable cause) {
    super(message, cause);
  }
}
/**
 * Entry point for merging: overlays the parsed command line output onto the
 * given configuration, verifying each value along the way. The work is
 * delegated to {@code handleOptions}, which updates and returns the
 * configuration it was handed.
 *
 * @param opts
 *          the parsed command line option output
 * @param base
 *          the base configuration to merge with
 * @return the merged configuration object
 * @throws ConfigException
 *           when a value can not be extracted or fails verification
 */
Configuration getMerged(ParsedOutput opts, Configuration base)
    throws ConfigException {
  Configuration merged = handleOptions(opts, base);
  return merged;
}
/**
 * Builds the starting operation table: one entry per {@code OperationType},
 * each using a uniform distribution and no percentage assigned yet.
 *
 * @return a new mutable map containing every operation type
 */
private Map<OperationType, OperationData> getBaseOperations() {
  Map<OperationType, OperationData> defaults = new HashMap<OperationType, OperationData>();
  // every operation is enabled unless later overridden or removed
  for (OperationType opType : OperationType.values()) {
    OperationData unassigned = new OperationData(Distribution.UNIFORM, null);
    defaults.put(opType, unassigned);
  }
  return defaults;
}
/**
 * Works out the final operation mix and stores it in the base configuration.
 *
 * The mix starts from every operation type (uniform distribution, percentage
 * unset), is overlaid with the operations returned by the extractor and then
 * with any operation values present on the command line. Operations whose
 * percentage is set but not greater than zero are dropped. Afterwards:
 * <ul>
 * <li>operations with an unset percentage share whatever is left of 100%
 * equally;</li>
 * <li>if every percentage is set but the total is below 100%, the shortfall
 * is spread evenly over all remaining operations.</li>
 * </ul>
 *
 * @param opts
 *          the parsed command line option output
 * @param base
 *          the base configuration to merge with (updated in place)
 * @param extractor
 *          the access object to fetch operations from if none from the
 *          command line
 * @return the base configuration that was passed in, now holding the
 *         operation settings
 * @throws ConfigException
 *           when no operations remain, when the percentages add up to more
 *           than 100%, or when operations still need a percentage but none
 *           is left to hand out
 */
private Configuration handleOperations(ParsedOutput opts, Configuration base,
    ConfigExtractor extractor) throws ConfigException {
  // get the base set to start off with
  Map<OperationType, OperationData> operations = getBaseOperations();
  // merge with what is coming from config
  operations.putAll(extractor.getOperations());
  // see if any coming in from the command line (these win over config)
  for (OperationType opType : OperationType.values()) {
    String opVal = opts.getValue(opType.lowerName());
    if (opVal != null) {
      operations.put(opType, new OperationData(opVal));
    }
  }
  // remove those with <= zero percent, an unset percent is kept so it can
  // be filled in below
  Map<OperationType, OperationData> enabled = new HashMap<OperationType, OperationData>();
  for (Map.Entry<OperationType, OperationData> entry : operations.entrySet()) {
    Double pct = entry.getValue().getPercent();
    if (pct == null || pct > 0.0d) {
      enabled.put(entry.getKey(), entry.getValue());
    }
  }
  operations = enabled;
  if (operations.isEmpty()) {
    throw new ConfigException("No operations provided!");
  }
  // total up what has been assigned and count what still needs a share
  double currPct = 0;
  int needFill = 0;
  for (OperationData op : operations.values()) {
    Double pct = op.getPercent();
    if (pct != null) {
      currPct += pct;
    } else {
      needFill++;
    }
  }
  if (currPct > 1) {
    throw new ConfigException(
        "Unable to have accumlative percent greater than 100%");
  }
  if (needFill > 0 && currPct >= 1) {
    throw new ConfigException(needFill
        + " unfilled operations but no percentage left to fill with");
  }
  if (currPct < 1) {
    double leftOver = 1.0 - currPct;
    Map<OperationType, OperationData> adjusted = new HashMap<OperationType, OperationData>();
    if (needFill > 0) {
      // hand the remainder out equally to the operations without a percent
      double share = leftOver / needFill;
      for (Map.Entry<OperationType, OperationData> entry : operations
          .entrySet()) {
        OperationData op = entry.getValue();
        if (op.getPercent() == null) {
          op = new OperationData(op.getDistribution(), share);
        }
        adjusted.put(entry.getKey(), op);
      }
    } else {
      // everything has a percent already, redistribute the shortfall evenly
      double each = leftOver / operations.size();
      for (Map.Entry<OperationType, OperationData> entry : operations
          .entrySet()) {
        OperationData op = entry.getValue();
        adjusted.put(entry.getKey(), new OperationData(op.getDistribution(),
            (op.getPercent() + each)));
      }
    }
    operations = adjusted;
  }
  // save into base
  for (Map.Entry<OperationType, OperationData> entry : operations.entrySet()) {
    String opName = entry.getKey().lowerName();
    OperationData opData = entry.getValue();
    String distr = opData.getDistribution().lowerName();
    // percentages are held as fractions, the stored ratio is scaled by 100
    String ratio = Double.toString(opData.getPercent() * 100.0d);
    base.set(String.format(Constants.OP, opName), opData.toString());
    base.set(String.format(Constants.OP_DISTR, opName), distr);
    base.set(String.format(Constants.OP_PERCENT, opName), ratio);
  }
  return base;
}
/**
 * Merges every supported option from the command line output into the base
 * configuration, verifying values as it goes. Each option is read through a
 * {@code ConfigExtractor} built on the base configuration, which is handed
 * the command line value for that option; values that come back as null are
 * left untouched in the configuration.
 *
 * @param opts
 *          the parsed command line option output
 * @param base
 *          the base configuration to merge with (updated in place)
 * @return the merged configuration
 * @throws ConfigException
 *           when a value can not be extracted or fails its verification
 */
private Configuration handleOptions(ParsedOutput opts, Configuration base)
    throws ConfigException {
  // ensure variables are overwritten and verified
  ConfigExtractor extractor = new ConfigExtractor(base);

  // overwrite the map amount and check to ensure > 0
  Integer mapAmount;
  try {
    mapAmount = extractor.getMapAmount(opts.getValue(ConfigOption.MAPS
        .getOpt()));
  } catch (Exception e) {
    throw new ConfigException("Error extracting & merging map amount", e);
  }
  setIfPositive(base, ConfigOption.MAPS.getCfgOption(), mapAmount,
      "Map amount");

  // overwrite the reducer amount and check to ensure > 0
  // NOTE: this must go through the reducer accessor; calling the map
  // accessor here would (presumably) resolve a missing command line value
  // against the map settings instead of the reducer settings
  Integer reduceAmount;
  try {
    reduceAmount = extractor.getReducerAmount(opts
        .getValue(ConfigOption.REDUCES.getOpt()));
  } catch (Exception e) {
    throw new ConfigException(
        "Error extracting & merging reducer amount", e);
  }
  setIfPositive(base, ConfigOption.REDUCES.getCfgOption(), reduceAmount,
      "Reducer amount");

  // overwrite the duration amount and ensure > 0
  Integer duration;
  try {
    duration = extractor.getDuration(opts.getValue(ConfigOption.DURATION
        .getOpt()));
  } catch (Exception e) {
    throw new ConfigException("Error extracting & merging duration", e);
  }
  setIfPositive(base, ConfigOption.DURATION.getCfgOption(), duration,
      "Duration");

  // overwrite the operation amount and ensure > 0
  Integer operationAmount;
  try {
    operationAmount = extractor.getOpCount(opts.getValue(ConfigOption.OPS
        .getOpt()));
  } catch (Exception e) {
    throw new ConfigException(
        "Error extracting & merging operation amount", e);
  }
  setIfPositive(base, ConfigOption.OPS.getCfgOption(), operationAmount,
      "Operation amount");

  // overwrite the exit on error setting
  try {
    boolean exitOnError = extractor.shouldExitOnFirstError(opts
        .getValue(ConfigOption.EXIT_ON_ERROR.getOpt()));
    base.setBoolean(ConfigOption.EXIT_ON_ERROR.getCfgOption(), exitOnError);
  } catch (Exception e) {
    throw new ConfigException(
        "Error extracting & merging exit on error value", e);
  }

  // verify and set file limit and ensure > 0
  Integer fileAm;
  try {
    fileAm = extractor.getTotalFiles(opts.getValue(ConfigOption.FILES
        .getOpt()));
  } catch (Exception e) {
    throw new ConfigException(
        "Error extracting & merging total file limit amount", e);
  }
  setIfPositive(base, ConfigOption.FILES.getCfgOption(), fileAm,
      "File amount");

  // set the grid queue to run on
  try {
    String qname = extractor.getQueueName(opts
        .getValue(ConfigOption.QUEUE_NAME.getOpt()));
    if (qname != null) {
      base.set(ConfigOption.QUEUE_NAME.getCfgOption(), qname);
    }
  } catch (Exception e) {
    throw new ConfigException("Error extracting & merging queue name", e);
  }

  // verify and set the directory limit and ensure > 0
  Integer directoryLimit;
  try {
    directoryLimit = extractor.getDirSize(opts
        .getValue(ConfigOption.DIR_SIZE.getOpt()));
  } catch (Exception e) {
    throw new ConfigException(
        "Error extracting & merging directory file limit", e);
  }
  setIfPositive(base, ConfigOption.DIR_SIZE.getCfgOption(), directoryLimit,
      "Directory file limit");

  // set the base directory
  Path basedir;
  try {
    basedir = extractor.getBaseDirectory(opts
        .getValue(ConfigOption.BASE_DIR.getOpt()));
  } catch (Exception e) {
    throw new ConfigException("Error extracting & merging base directory",
        e);
  }
  if (basedir != null) {
    // always ensure in slive dir
    basedir = new Path(basedir, Constants.BASE_DIR);
    base.set(ConfigOption.BASE_DIR.getCfgOption(), basedir.toString());
  }

  // set the result file
  String fn;
  try {
    fn = extractor.getResultFile(opts.getValue(ConfigOption.RESULT_FILE
        .getOpt()));
  } catch (Exception e) {
    throw new ConfigException("Error extracting & merging result file", e);
  }
  if (fn != null) {
    base.set(ConfigOption.RESULT_FILE.getCfgOption(), fn);
  }

  // set the operations
  try {
    base = handleOperations(opts, base, extractor);
  } catch (Exception e) {
    throw new ConfigException("Error extracting & merging operations", e);
  }

  // set the replication amount range
  Range<Short> replicationAm;
  try {
    replicationAm = extractor.getReplication(opts
        .getValue(ConfigOption.REPLICATION_AM.getOpt()));
  } catch (Exception e) {
    throw new ConfigException(
        "Error extracting & merging replication amount range", e);
  }
  if (replicationAm != null) {
    // the lower bound may not undercut the minimum found in the config
    int minRepl = base.getInt(Constants.MIN_REPLICATION, 1);
    if (replicationAm.getLower() < minRepl) {
      throw new ConfigException(
          "Replication amount minimum is less than property configured minimum "
              + minRepl);
    }
    if (replicationAm.getLower() > replicationAm.getUpper()) {
      throw new ConfigException(
          "Replication amount minimum is greater than its maximum");
    }
    if (replicationAm.getLower() <= 0) {
      throw new ConfigException(
          "Replication amount minimum must be greater than zero");
    }
    base.set(ConfigOption.REPLICATION_AM.getCfgOption(), replicationAm
        .toString());
  }

  // set the sleep range
  Range<Long> sleepRange;
  try {
    sleepRange = extractor.getSleepRange(opts
        .getValue(ConfigOption.SLEEP_TIME.getOpt()));
  } catch (Exception e) {
    throw new ConfigException(
        "Error extracting & merging sleep size range", e);
  }
  if (sleepRange != null) {
    verifyRange(sleepRange, "Sleep range", false);
    base.set(ConfigOption.SLEEP_TIME.getCfgOption(), sleepRange.toString());
  }

  // set the packet size if given, otherwise fall back to the option default
  String pSize = opts.getValue(ConfigOption.PACKET_SIZE.getOpt());
  if (pSize == null) {
    pSize = ConfigOption.PACKET_SIZE.getDefault();
  }
  if (pSize != null) {
    try {
      Long packetSize = StringUtils.TraditionalBinaryPrefix
          .string2long(pSize);
      base.set(ConfigOption.PACKET_SIZE.getCfgOption(), packetSize
          .toString());
    } catch (Exception e) {
      throw new ConfigException(
          "Error extracting & merging write packet size", e);
    }
  }

  // set the block size range
  Range<Long> blockSize;
  try {
    blockSize = extractor.getBlockSize(opts
        .getValue(ConfigOption.BLOCK_SIZE.getOpt()));
  } catch (Exception e) {
    throw new ConfigException(
        "Error extracting & merging block size range", e);
  }
  if (blockSize != null) {
    verifyRange(blockSize, "Block size", false);
    // ensure block size is a multiple of BYTES_PER_CHECKSUM
    // if a value is set in the configuration
    Long bytesPerChecksum = extractor.getByteCheckSum();
    if (bytesPerChecksum != null) {
      if ((blockSize.getLower() % bytesPerChecksum) != 0) {
        throw new ConfigException(
            "Blocksize lower bound must be a multiple of "
                + bytesPerChecksum);
      }
      if ((blockSize.getUpper() % bytesPerChecksum) != 0) {
        throw new ConfigException(
            "Blocksize upper bound must be a multiple of "
                + bytesPerChecksum);
      }
    }
    base.set(ConfigOption.BLOCK_SIZE.getCfgOption(), blockSize.toString());
  }

  // set the read size range
  Range<Long> readSize;
  try {
    readSize = extractor.getReadSize(opts.getValue(ConfigOption.READ_SIZE
        .getOpt()));
  } catch (Exception e) {
    throw new ConfigException("Error extracting & merging read size range",
        e);
  }
  if (readSize != null) {
    verifyRange(readSize, "Read size", true);
    base.set(ConfigOption.READ_SIZE.getCfgOption(), readSize.toString());
  }

  // set the write size range
  Range<Long> writeSize;
  try {
    writeSize = extractor.getWriteSize(opts
        .getValue(ConfigOption.WRITE_SIZE.getOpt()));
  } catch (Exception e) {
    throw new ConfigException(
        "Error extracting & merging write size range", e);
  }
  if (writeSize != null) {
    verifyRange(writeSize, "Write size", true);
    base.set(ConfigOption.WRITE_SIZE.getCfgOption(), writeSize.toString());
  }

  // set the append size range
  Range<Long> appendSize;
  try {
    appendSize = extractor.getAppendSize(opts
        .getValue(ConfigOption.APPEND_SIZE.getOpt()));
  } catch (Exception e) {
    throw new ConfigException(
        "Error extracting & merging append size range", e);
  }
  if (appendSize != null) {
    verifyRange(appendSize, "Append size", true);
    base.set(ConfigOption.APPEND_SIZE.getCfgOption(), appendSize.toString());
  }

  // set the seed
  Long seed;
  try {
    seed = extractor.getRandomSeed(opts.getValue(ConfigOption.RANDOM_SEED
        .getOpt()));
  } catch (Exception e) {
    throw new ConfigException(
        "Error extracting & merging random number seed", e);
  }
  if (seed != null) {
    base.set(ConfigOption.RANDOM_SEED.getCfgOption(), seed.toString());
  }
  return base;
}

/**
 * Stores an integer option after checking that it is strictly positive. A
 * null value means there is nothing to store and is silently skipped.
 *
 * @param base
 *          the configuration to write into
 * @param cfgKey
 *          the configuration key to store the value under
 * @param value
 *          the extracted value, may be null
 * @param label
 *          human readable option name used as the start of the error message
 * @throws ConfigException
 *           when the value is zero or negative
 */
private static void setIfPositive(Configuration base, String cfgKey,
    Integer value, String label) throws ConfigException {
  if (value == null) {
    return;
  }
  if (value <= 0) {
    throw new ConfigException(label
        + " can not be less than or equal to zero");
  }
  base.set(cfgKey, value.toString());
}

/**
 * Checks that a range is ordered and that its lower bound is acceptable.
 *
 * @param range
 *          the non-null range to check
 * @param label
 *          human readable range name used as the start of the error message
 * @param zeroAllowed
 *          true when a lower bound of zero is valid, false when the lower
 *          bound has to be strictly positive
 * @throws ConfigException
 *           when the lower bound exceeds the upper bound or is too small
 */
private static void verifyRange(Range<Long> range, String label,
    boolean zeroAllowed) throws ConfigException {
  if (range.getLower() > range.getUpper()) {
    throw new ConfigException(label + " minimum is greater than its maximum");
  }
  if (zeroAllowed) {
    if (range.getLower() < 0) {
      throw new ConfigException(label
          + " minimum must be greater than or equal to zero");
    }
  } else if (range.getLower() <= 0) {
    throw new ConfigException(label + " minimum must be greater than zero");
  }
}
}