blob: e02c5d9b07d7f00dc241dc2515af54da87ee4916 [file] [log] [blame]
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.fs.slive;
import java.io.IOException;
import java.util.List;
import java.util.Random;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.slive.OperationOutput.OutputType;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.util.StringUtils;
* The slive class which sets up the mapper to be used which itself will receive
* a single dummy key and value and then in a loop run the various operations
* that have been selected and upon operation completion output the collected
* output from that operation (and repeat until finished).
public class SliveMapper extends MapReduceBase implements
Mapper<Object, Object, Text, Text> {
private static final Log LOG = LogFactory.getLog(SliveMapper.class);
private static final String OP_TYPE = SliveMapper.class.getSimpleName();
private FileSystem filesystem;
private ConfigExtractor config;
private int taskId;
* (non-Javadoc)
* @see
* org.apache.hadoop.mapred.MapReduceBase#configure(org.apache.hadoop.mapred
* .JobConf)
@Override // MapReduceBase
public void configure(JobConf conf) {
try {
config = new ConfigExtractor(conf);
filesystem = config.getBaseDirectory().getFileSystem(conf);
} catch (Exception e) {
LOG.error("Unable to setup slive " + StringUtils.stringifyException(e));
throw new RuntimeException("Unable to setup slive configuration", e);
if(conf.get(MRJobConfig.TASK_ATTEMPT_ID) != null ) {
this.taskId = TaskAttemptID.forName(conf.get(MRJobConfig.TASK_ATTEMPT_ID))
} else {
// So that branch-1/0.20 can run this same code as well
this.taskId = TaskAttemptID.forName(conf.get(""))
* Fetches the config this object uses
* @return ConfigExtractor
private ConfigExtractor getConfig() {
return config;
* Logs to the given reporter and logs to the internal logger at info level
* @param r
* the reporter to set status on
* @param msg
* the message to log
private void logAndSetStatus(Reporter r, String msg) {
* Runs the given operation and reports on its results
* @param op
* the operation to run
* @param reporter
* the status reporter to notify
* @param output
* the output to write to
* @throws IOException
private void runOperation(Operation op, Reporter reporter,
OutputCollector<Text, Text> output, long opNum) throws IOException {
if (op == null) {
logAndSetStatus(reporter, "Running operation #" + opNum + " (" + op + ")");
List<OperationOutput> opOut =;
logAndSetStatus(reporter, "Finished operation #" + opNum + " (" + op + ")");
if (opOut != null && !opOut.isEmpty()) {
for (OperationOutput outData : opOut) {
output.collect(outData.getKey(), outData.getOutputValue());
* (non-Javadoc)
* @see org.apache.hadoop.mapred.Mapper#map(java.lang.Object,
* java.lang.Object, org.apache.hadoop.mapred.OutputCollector,
* org.apache.hadoop.mapred.Reporter)
@Override // Mapper
public void map(Object key, Object value, OutputCollector<Text, Text> output,
Reporter reporter) throws IOException {
logAndSetStatus(reporter, "Running slive mapper for dummy key " + key
+ " and dummy value " + value);
//Add taskID to randomSeed to deterministically seed rnd.
Random rnd = config.getRandomSeed() != null ?
new Random(this.taskId + config.getRandomSeed()) : new Random();
WeightSelector selector = new WeightSelector(config, rnd);
long startTime =;
long opAm = 0;
long sleepOps = 0;
int duration = getConfig().getDurationMilliseconds();
Range<Long> sleepRange = getConfig().getSleepRange();
Operation sleeper = null;
if (sleepRange != null) {
sleeper = new SleepOp(getConfig(), rnd);
while (Timer.elapsed(startTime) < duration) {
try {
logAndSetStatus(reporter, "Attempting to select operation #"
+ (opAm + 1));
int currElapsed = (int) (Timer.elapsed(startTime));
Operation op =, duration);
if (op == null) {
// no ops left
} else {
// got a good op
runOperation(op, reporter, output, opAm);
// do a sleep??
if (sleeper != null) {
// these don't count against the number of operations
runOperation(sleeper, reporter, output, sleepOps);
} catch (Exception e) {
logAndSetStatus(reporter, "Failed at running due to "
+ StringUtils.stringifyException(e));
if (getConfig().shouldExitOnFirstError()) {
// write out any accumulated mapper stats
long timeTaken = Timer.elapsed(startTime);
OperationOutput opCount = new OperationOutput(OutputType.LONG, OP_TYPE,
ReportWriter.OP_COUNT, opAm);
output.collect(opCount.getKey(), opCount.getOutputValue());
OperationOutput overallTime = new OperationOutput(OutputType.LONG,
OP_TYPE, ReportWriter.OK_TIME_TAKEN, timeTaken);
output.collect(overallTime.getKey(), overallTime.getOutputValue());
logAndSetStatus(reporter, "Finished " + opAm + " operations in "
+ timeTaken + " milliseconds");