blob: d8acc39cd3df6d7443d6a33906b574dc5d211a38 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.slive;
import java.text.NumberFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.slive.Constants.Distribution;
import org.apache.hadoop.fs.slive.Constants.OperationType;
import org.apache.hadoop.fs.slive.Weights.UniformWeight;
import org.apache.hadoop.fs.slive.ObserveableOp.Observer;
/**
* This class is the main handler that selects operations to run using the
* currently held selection object. It configures and weights each operation and
* then hands the operations + weights off to the selector object to determine
* which one should be ran. If no operations are left to be ran then it will
* return null.
*/
class WeightSelector {
// what a weight calculation means
interface Weightable {
Double weight(int elapsed, int duration);
}
private static final Log LOG = LogFactory.getLog(WeightSelector.class);
private static class OperationInfo {
Integer amountLeft;
Operation operation;
Distribution distribution;
}
private Map<OperationType, OperationInfo> operations;
private Map<Distribution, Weightable> weights;
private RouletteSelector selector;
private OperationFactory factory;
WeightSelector(ConfigExtractor cfg, Random rnd) {
selector = new RouletteSelector(rnd);
factory = new OperationFactory(cfg, rnd);
configureOperations(cfg);
configureWeights(cfg);
}
protected RouletteSelector getSelector() {
return selector;
}
private void configureWeights(ConfigExtractor e) {
weights = new HashMap<Distribution, Weightable>();
weights.put(Distribution.UNIFORM, new UniformWeight());
// weights.put(Distribution.BEG, new BeginWeight());
// weights.put(Distribution.END, new EndWeight());
// weights.put(Distribution.MID, new MidWeight());
}
/**
* Determines how many initial operations a given operation data should have
*
* @param totalAm
* the total amount of operations allowed
*
* @param opData
* the given operation information (with a valid percentage >= 0)
*
* @return the number of items to allow to run
*
* @throws IllegalArgumentException
* if negative operations are determined
*/
static int determineHowMany(int totalAm, OperationData opData,
OperationType type) {
if (totalAm <= 0) {
return 0;
}
int amLeft = (int) Math.floor(opData.getPercent() * totalAm);
if (amLeft < 0) {
throw new IllegalArgumentException("Invalid amount " + amLeft
+ " determined for operation type " + type.name());
}
return amLeft;
}
/**
* Sets up the operation using the given configuration by setting up the
* number of operations to perform (and how many are left) and setting up the
* operation objects to be used throughout selection.
*
* @param cfg
* ConfigExtractor.
*/
private void configureOperations(ConfigExtractor cfg) {
operations = new TreeMap<OperationType, OperationInfo>();
Map<OperationType, OperationData> opinfo = cfg.getOperations();
int totalAm = cfg.getOpCount();
int opsLeft = totalAm;
NumberFormat formatter = Formatter.getPercentFormatter();
for (final OperationType type : opinfo.keySet()) {
OperationData opData = opinfo.get(type);
OperationInfo info = new OperationInfo();
info.distribution = opData.getDistribution();
int amLeft = determineHowMany(totalAm, opData, type);
opsLeft -= amLeft;
LOG
.info(type.name() + " has " + amLeft + " initial operations out of "
+ totalAm + " for its ratio "
+ formatter.format(opData.getPercent()));
info.amountLeft = amLeft;
Operation op = factory.getOperation(type);
// wrap operation in finalizer so that amount left gets decrements when
// its done
if (op != null) {
Observer fn = new Observer() {
public void notifyFinished(Operation op) {
OperationInfo opInfo = operations.get(type);
if (opInfo != null) {
--opInfo.amountLeft;
}
}
public void notifyStarting(Operation op) {
}
};
info.operation = new ObserveableOp(op, fn);
operations.put(type, info);
}
}
if (opsLeft > 0) {
LOG
.info(opsLeft
+ " left over operations found (due to inability to support partial operations)");
}
}
/**
* Selects an operation from the known operation set or returns null if none
* are available by applying the weighting algorithms and then handing off the
* weight operations to the selection object.
*
* @param elapsed
* the currently elapsed time (milliseconds) of the running program
* @param duration
* the maximum amount of milliseconds of the running program
*
* @return operation or null if none left
*/
Operation select(int elapsed, int duration) {
List<OperationWeight> validOps = new ArrayList<OperationWeight>(operations
.size());
for (OperationType type : operations.keySet()) {
OperationInfo opinfo = operations.get(type);
if (opinfo == null || opinfo.amountLeft <= 0) {
continue;
}
Weightable weighter = weights.get(opinfo.distribution);
if (weighter != null) {
OperationWeight weightOp = new OperationWeight(opinfo.operation,
weighter.weight(elapsed, duration));
validOps.add(weightOp);
} else {
throw new RuntimeException("Unable to get weight for distribution "
+ opinfo.distribution);
}
}
if (validOps.isEmpty()) {
return null;
}
return getSelector().select(validOps);
}
}