/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce;

import java.io.IOException;
import java.util.Date;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.mapreduce.SleepJob.SleepInputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
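
/**
 * A mapper-only MapReduce job that writes entities to the timeline service
 * (v1 or v2) and reports the measured write throughput.
 */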
public class TimelineServicePerformance extends Configured implements Tool {
  static final int NUM_MAPS_DEFAULT = 1;

  // supported mapper types
  static final int SIMPLE_ENTITY_WRITER = 1;
  static final int JOB_HISTORY_FILE_REPLAY_MAPPER = 2;
  static int mapperType = SIMPLE_ENTITY_WRITER;

  // supported timeline service versions
  static final int TIMELINE_SERVICE_VERSION_1 = 1;
  static final int TIMELINE_SERVICE_VERSION_2 = 2;
  static int timelineServiceVersion = TIMELINE_SERVICE_VERSION_1;

protected static int printUsage() {
System.err.println(
"Usage: [-m <maps>] number of mappers (default: " + NUM_MAPS_DEFAULT +
")\n" +
" [-v] timeline service version (default: " +
TIMELINE_SERVICE_VERSION_1 + ")\n" +
" 1. version 1.x\n" +
" 2. version 2.x\n" +
" [-mtype <mapper type in integer>] (default: " +
SIMPLE_ENTITY_WRITER + ")\n" +
" 1. simple entity write mapper\n" +
" 2. jobhistory files replay mapper\n" +
" [-s <(KBs)test>] number of KB per put (mtype=1, default: " +
SimpleEntityWriterConstants.KBS_SENT_DEFAULT + " KB)\n" +
" [-t] package sending iterations per mapper (mtype=1, default: " +
SimpleEntityWriterConstants.TEST_TIMES_DEFAULT + ")\n" +
" [-d <path>] hdfs root path of job history files (mtype=2)\n" +
" [-r <replay mode>] (mtype=2)\n" +
" 1. write all entities for a job in one put (default)\n" +
" 2. write one entity at a time\n");
GenericOptionsParser.printGenericCommandUsage(System.err);
return -1;
}

  /**
   * Configures the job from the given command-line arguments.
   *
   * @return true if the job was configured successfully, false otherwise
   */
public static boolean parseArgs(String[] args, Job job) throws IOException {
// set the common defaults
Configuration conf = job.getConfiguration();
conf.setInt(MRJobConfig.NUM_MAPS, NUM_MAPS_DEFAULT);
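    // walk the arguments; every option consumes the following token as its
    // value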
for (int i = 0; i < args.length; i++) {
if (args.length == i + 1) {
System.out.println("ERROR: Required parameter missing from " + args[i]);
return printUsage() == 0;
}
try {
if ("-v".equals(args[i])) {
timeline_service_version = Integer.parseInt(args[++i]);
} else if ("-m".equals(args[i])) {
if (Integer.parseInt(args[++i]) > 0) {
job.getConfiguration()
.setInt(MRJobConfig.NUM_MAPS, Integer.parseInt(args[i]));
}
} else if ("-mtype".equals(args[i])) {
mapperType = Integer.parseInt(args[++i]);
} else if ("-s".equals(args[i])) {
if (Integer.parseInt(args[++i]) > 0) {
conf.setInt(SimpleEntityWriterConstants.KBS_SENT,
Integer.parseInt(args[i]));
}
} else if ("-t".equals(args[i])) {
if (Integer.parseInt(args[++i]) > 0) {
conf.setInt(SimpleEntityWriterConstants.TEST_TIMES,
Integer.parseInt(args[i]));
}
} else if ("-d".equals(args[i])) {
conf.set(JobHistoryFileReplayHelper.PROCESSING_PATH, args[++i]);
} else if ("-r".equals(args[i])) {
conf.setInt(JobHistoryFileReplayHelper.REPLAY_MODE,
Integer.parseInt(args[++i]));
} else {
System.out.println("Unexpected argument: " + args[i]);
return printUsage() == 0;
}
} catch (NumberFormatException except) {
System.out.println("ERROR: Integer expected instead of " + args[i]);
return printUsage() == 0;
      } catch (Exception e) {
        throw new IOException(e);
}
}
// handle mapper-specific settings
switch (mapperType) {
case JOB_HISTORY_FILE_REPLAY_MAPPER:
String processingPath =
conf.get(JobHistoryFileReplayHelper.PROCESSING_PATH);
      if (processingPath == null || processingPath.isEmpty()) {
        System.out.println(
            "ERROR: processing path (-d) is required when mtype = 2");
        return printUsage() == 0;
}
      switch (timelineServiceVersion) {
case TIMELINE_SERVICE_VERSION_2:
job.setMapperClass(JobHistoryFileReplayMapperV2.class);
break;
case TIMELINE_SERVICE_VERSION_1:
default:
job.setMapperClass(JobHistoryFileReplayMapperV1.class);
break;
}
break;
case SIMPLE_ENTITY_WRITER:
default:
      // use the current timestamp as the "run id" of the test; it simulates
      // the cluster timestamp for apps
conf.setLong(
SimpleEntityWriterConstants.TIMELINE_SERVICE_PERFORMANCE_RUN_ID,
System.currentTimeMillis());
      switch (timelineServiceVersion) {
case TIMELINE_SERVICE_VERSION_2:
job.setMapperClass(SimpleEntityWriterV2.class);
break;
case TIMELINE_SERVICE_VERSION_1:
default:
job.setMapperClass(SimpleEntityWriterV1.class);
break;
}
break;
}
return true;
}

  /**
   * Timeline service performance counters.
   */
  enum PerfCounters {
    TIMELINE_SERVICE_WRITE_TIME, // total write time in ms
    TIMELINE_SERVICE_WRITE_COUNTER, // total number of write operations
    TIMELINE_SERVICE_WRITE_FAILURES, // total number of failed writes
    TIMELINE_SERVICE_WRITE_KBS, // total KB written
  }

  @Override
  public int run(String[] args) throws Exception {
Job job = Job.getInstance(getConf());
job.setJarByClass(TimelineServicePerformance.class);
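    // defaults below; parseArgs() may replace the mapper class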
job.setMapperClass(SimpleEntityWriterV1.class);
job.setInputFormatClass(SleepInputFormat.class);
job.setOutputFormatClass(NullOutputFormat.class);
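    // mapper-only job: all load generation happens in the mappers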
job.setNumReduceTasks(0);
if (!parseArgs(args, job)) {
return -1;
}
Date startTime = new Date();
System.out.println("Job started: " + startTime);
int ret = job.waitForCompletion(true) ? 0 : 1;
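    // report throughput based on the counters aggregated across all mappers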
if (job.isSuccessful()) {
      Counters counters = job.getCounters();
long writecounts =
counters.findCounter(
PerfCounters.TIMELINE_SERVICE_WRITE_COUNTER).getValue();
long writefailures =
counters.findCounter(
PerfCounters.TIMELINE_SERVICE_WRITE_FAILURES).getValue();
      // check whether we have a complete failure to write
      if (writefailures > 0 && writefailures == writecounts) {
System.out.println("Job failed: all writes failed!");
} else {
long writetime =
counters.findCounter(
PerfCounters.TIMELINE_SERVICE_WRITE_TIME).getValue();
long writesize =
counters.findCounter(
PerfCounters.TIMELINE_SERVICE_WRITE_KBS).getValue();
        // a write time of zero normally shouldn't happen
        if (writetime == 0L) {
System.out.println("Job failed: write time is 0!");
} else {
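          // write time is tracked in ms, so scale by 1000 to get per-second
          // rates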
          double transactionRate = writecounts * 1000 / (double) writetime;
          double ioRate = writesize * 1000 / (double) writetime;
          int numMaps = job.getConfiguration().getInt(
              MRJobConfig.NUM_MAPS, NUM_MAPS_DEFAULT);
          System.out.println("TRANSACTION RATE (per mapper): " +
              transactionRate + " ops/s");
          System.out.println("IO RATE (per mapper): " + ioRate + " KB/s");
          System.out.println("TRANSACTION RATE (total): " +
              transactionRate * numMaps + " ops/s");
          System.out.println("IO RATE (total): " + ioRate * numMaps +
              " KB/s");
}
}
} else {
System.out.println("Job failed: " + job.getStatus().getFailureInfo());
}
return ret;
}

  public static void main(String[] args) throws Exception {
int res =
ToolRunner.run(new Configuration(), new TimelineServicePerformance(),
args);
System.exit(res);
}
}