blob: f8884625cbdaa88f97e90dade3db27fd83b8fbee [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred.gridmix;
import static org.junit.Assert.*;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.UtilsForTests;
import org.apache.hadoop.mapred.gridmix.GenerateData.DataStatistics;
import org.apache.hadoop.mapred.gridmix.Statistics.ClusterStats;
import org.apache.hadoop.mapred.gridmix.Statistics.JobStats;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.tools.rumen.JobStory;
import org.apache.hadoop.tools.rumen.JobStoryProducer;
import org.junit.Test;
/**
 * Tests the Gridmix summary support: {@link DataStatistics},
 * {@link ExecutionSummarizer} and {@link ClusterSummarizer}.
 */
public class TestGridmixSummary {
  /**
   * Permission for the temporary input files created by the tests. This must
   * be an octal literal: a decimal {@code 777} equals octal {@code 01411},
   * which is not rwxrwxrwx.
   */
  private static final FsPermission TEST_FILE_PERMISSION =
      FsPermission.createImmutable((short) 0777);

  /**
   * Test {@link DataStatistics} getters and
   * {@code GenerateData.publishDataStatistics} for empty, plain and compressed
   * input directories, with compression emulation both on and off.
   */
  @Test
  public void testDataStatistics() throws Exception {
    // test data-statistics getters with compression enabled
    DataStatistics stats = new DataStatistics(10, 2, true);
    assertEquals("Data size mismatch", 10, stats.getDataSize());
    assertEquals("Num files mismatch", 2, stats.getNumFiles());
    assertTrue("Compression configuration mismatch", stats.isDataCompressed());

    // test data-statistics getters with compression disabled
    stats = new DataStatistics(100, 5, false);
    assertEquals("Data size mismatch", 100, stats.getDataSize());
    assertEquals("Num files mismatch", 5, stats.getNumFiles());
    assertFalse("Compression configuration mismatch", stats.isDataCompressed());

    // test publish data stats
    Configuration conf = new Configuration();
    Path rootTempDir = new Path(System.getProperty("test.build.data", "/tmp"));
    Path testDir = new Path(rootTempDir, "testDataStatistics");
    FileSystem fs = testDir.getFileSystem(conf);
    fs.delete(testDir, true);
    Path testInputDir = new Path(testDir, "test");
    fs.mkdirs(testInputDir);

    // test empty folder (compression = true): publishing must fail
    CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
    assertPublishDataStatisticsFails(testInputDir, 1024L, conf);

    // test with empty folder (compression = off)
    CompressionEmulationUtil.setCompressionEmulationEnabled(conf, false);
    stats = GenerateData.publishDataStatistics(testInputDir, 1024L, conf);
    assertEquals("Data size mismatch", 0, stats.getDataSize());
    assertEquals("Num files mismatch", 0, stats.getNumFiles());
    assertFalse("Compression configuration mismatch", stats.isDataCompressed());

    // test with some plain input data (compression = off)
    CompressionEmulationUtil.setCompressionEmulationEnabled(conf, false);
    Path inputDataFile = new Path(testInputDir, "test");
    long size =
        UtilsForTests.createTmpFileDFS(fs, inputDataFile,
            TEST_FILE_PERMISSION, "hi hello bye").size();
    stats = GenerateData.publishDataStatistics(testInputDir, -1, conf);
    assertEquals("Data size mismatch", size, stats.getDataSize());
    assertEquals("Num files mismatch", 1, stats.getNumFiles());
    assertFalse("Compression configuration mismatch", stats.isDataCompressed());

    // test with some plain input data (compression = on): publishing must fail
    CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
    assertPublishDataStatisticsFails(testInputDir, 1234L, conf);

    // test with some compressed input data (compression = off)
    CompressionEmulationUtil.setCompressionEmulationEnabled(conf, false);
    fs.delete(inputDataFile, false);
    inputDataFile = new Path(testInputDir, "test.gz");
    size =
        UtilsForTests.createTmpFileDFS(fs, inputDataFile,
            TEST_FILE_PERMISSION, "hi hello").size();
    stats = GenerateData.publishDataStatistics(testInputDir, 1234L, conf);
    assertEquals("Data size mismatch", size, stats.getDataSize());
    assertEquals("Num files mismatch", 1, stats.getNumFiles());
    assertFalse("Compression configuration mismatch", stats.isDataCompressed());

    // test with some compressed input data (compression = on)
    CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
    stats = GenerateData.publishDataStatistics(testInputDir, 1234L, conf);
    assertEquals("Data size mismatch", size, stats.getDataSize());
    assertEquals("Num files mismatch", 1, stats.getNumFiles());
    assertTrue("Compression configuration mismatch", stats.isDataCompressed());
  }

  /**
   * Asserts that publishing data statistics for {@code inputDir} throws a
   * {@link RuntimeException} under the given configuration.
   *
   * @param inputDir directory whose data statistics are published
   * @param genBytes the generated-data size passed through to the publisher
   * @param conf configuration (compression emulation settings are read here)
   * @throws IOException if publishing fails with an I/O error instead
   */
  private static void assertPublishDataStatisticsFails(Path inputDir,
      long genBytes, Configuration conf) throws IOException {
    try {
      GenerateData.publishDataStatistics(inputDir, genBytes, conf);
      // fail() throws AssertionError, which is not caught below
      fail("Expected failure! Compression data publishing error");
    } catch (RuntimeException expected) {
      // expected: the callers use this only for inputs where publishing with
      // compression emulation enabled must be rejected
    }
  }

  /**
   * A fake {@link JobFactory} whose producer never yields any jobs and whose
   * reader thread does nothing.
   */
  @SuppressWarnings("rawtypes")
  private static class FakeJobFactory extends JobFactory {
    /**
     * A fake {@link JobStoryProducer} for {@link FakeJobFactory} that always
     * returns {@code null} from {@link #getNextJob()}.
     */
    private static class FakeJobStoryProducer implements JobStoryProducer {
      @Override
      public void close() throws IOException {
      }

      @Override
      public JobStory getNextJob() throws IOException {
        return null;
      }
    }

    FakeJobFactory(Configuration conf) {
      super(null, new FakeJobStoryProducer(), null, conf, null, null);
    }

    @Override
    public void update(Object item) {
    }

    @Override
    protected Thread createReaderThread() {
      return new Thread();
    }
  }

  /**
   * Test {@link ExecutionSummarizer}: command-line capture, start times, job
   * counters and the values recorded by {@code finalize}.
   */
  @Test
  @SuppressWarnings({ "unchecked", "rawtypes" })
  public void testExecutionSummarizer() throws IOException {
    Configuration conf = new Configuration();

    ExecutionSummarizer es = new ExecutionSummarizer();
    assertEquals("ExecutionSummarizer init failed",
        Summarizer.NA, es.getCommandLineArgsString());

    long startTime = System.currentTimeMillis();
    // test configuration parameters
    String[] initArgs = new String[] {"-Xmx20m", "-Dtest.args='test'"};
    es = new ExecutionSummarizer(initArgs);

    assertEquals("ExecutionSummarizer init failed",
        "-Xmx20m -Dtest.args='test'",
        es.getCommandLineArgsString());

    // test start time
    assertTrue("Start time mismatch", es.getStartTime() >= startTime);
    assertTrue("Start time mismatch",
        es.getStartTime() <= System.currentTimeMillis());

    // test start() of ExecutionSummarizer
    es.update(null);
    assertEquals("ExecutionSummarizer init failed", 0,
        es.getSimulationStartTime());
    verifySummarizerCounters(0, 0, 0, 0, 0, 0, 0, es);

    long simStartTime = System.currentTimeMillis();
    es.start(null);
    assertTrue("Simulation start time mismatch",
        es.getSimulationStartTime() >= simStartTime);
    assertTrue("Simulation start time mismatch",
        es.getSimulationStartTime() <= System.currentTimeMillis());

    // test with job stats
    JobStats stats = generateFakeJobStats(1, 10, true, false);
    es.update(stats);
    verifySummarizerCounters(1, 10, 0, 1, 1, 0, 0, es);

    // test with failed job
    stats = generateFakeJobStats(5, 1, false, false);
    es.update(stats);
    verifySummarizerCounters(6, 11, 0, 2, 1, 1, 0, es);

    // test with successful but lost job
    stats = generateFakeJobStats(1, 1, true, true);
    es.update(stats);
    verifySummarizerCounters(7, 12, 0, 3, 1, 1, 1, es);

    // test with failed but lost job
    stats = generateFakeJobStats(2, 2, false, true);
    es.update(stats);
    verifySummarizerCounters(9, 14, 0, 4, 1, 1, 2, es);

    // test finalize
    // define a fake job factory
    JobFactory factory = new FakeJobFactory(conf);
    // fake the num jobs in trace
    factory.numJobsInTrace = 3;

    Path rootTempDir = new Path(System.getProperty("test.build.data", "/tmp"));
    Path testDir = new Path(rootTempDir, "testGridmixSummary");
    Path testTraceFile = new Path(testDir, "test-trace.json");
    FileSystem fs = FileSystem.getLocal(conf);
    fs.create(testTraceFile).close();

    // finalize the summarizer
    UserResolver resolver = new RoundRobinUserResolver();
    DataStatistics dataStats = new DataStatistics(100, 2, true);
    String policy = GridmixJobSubmissionPolicy.REPLAY.name();
    conf.set(GridmixJobSubmissionPolicy.JOB_SUBMISSION_POLICY, policy);
    es.finalize(factory, testTraceFile.toString(), 1024L, resolver, dataStats,
        conf);

    // test num jobs in trace
    assertEquals("Mismatch in num jobs in trace", 3, es.getNumJobsInTrace());

    // test trace signature
    String tid =
        ExecutionSummarizer.getTraceSignature(testTraceFile.toString());
    assertEquals("Mismatch in trace signature",
        tid, es.getInputTraceSignature());
    // test trace location
    Path qPath = fs.makeQualified(testTraceFile);
    assertEquals("Mismatch in trace filename",
        qPath.toString(), es.getInputTraceLocation());
    // test expected data size
    assertEquals("Mismatch in expected data size",
        "1 K", es.getExpectedDataSize());
    // test input data statistics
    assertEquals("Mismatch in input data statistics",
        ExecutionSummarizer.stringifyDataStatistics(dataStats),
        es.getInputDataStatistics());
    // test user resolver
    assertEquals("Mismatch in user resolver",
        resolver.getClass().getName(), es.getUserResolver());
    // test policy
    assertEquals("Mismatch in policy", policy, es.getJobSubmissionPolicy());

    // test data stringification using large data; the long factor comes
    // first so the whole product is computed in 64-bit arithmetic
    es.finalize(factory, testTraceFile.toString(), 10L * 1024 * 1024 * 1024,
        resolver, dataStats, conf);
    assertEquals("Mismatch in expected data size",
        "10 G", es.getExpectedDataSize());

    // test trace signature uniqueness
    // touch the trace file
    fs.delete(testTraceFile, false);
    // Sleep for 1 sec so the re-created file differs from the old one;
    // presumably the signature is derived from file metadata such as the
    // modification time (NOTE(review): confirm against getTraceSignature).
    try {
      Thread.sleep(1000);
    } catch (InterruptedException ie) {
      // preserve the interrupt status for the test runner
      Thread.currentThread().interrupt();
    }
    fs.create(testTraceFile).close();
    es.finalize(factory, testTraceFile.toString(), 0L, resolver, dataStats,
        conf);
    // test missing expected data size
    assertEquals("Mismatch in trace data size",
        Summarizer.NA, es.getExpectedDataSize());
    assertFalse("Mismatch in trace signature",
        tid.equals(es.getInputTraceSignature()));
    // get the new identifier
    tid = ExecutionSummarizer.getTraceSignature(testTraceFile.toString());
    assertEquals("Mismatch in trace signature",
        tid, es.getInputTraceSignature());

    testTraceFile = new Path(testDir, "test-trace2.json");
    fs.create(testTraceFile).close();
    es.finalize(factory, testTraceFile.toString(), 0L, resolver, dataStats,
        conf);
    assertFalse("Mismatch in trace signature",
        tid.equals(es.getInputTraceSignature()));
    // get the new identifier
    tid = ExecutionSummarizer.getTraceSignature(testTraceFile.toString());
    assertEquals("Mismatch in trace signature",
        tid, es.getInputTraceSignature());

    // finalize trace identifier '-' input
    es.finalize(factory, "-", 0L, resolver, dataStats, conf);
    assertEquals("Mismatch in trace signature",
        Summarizer.NA, es.getInputTraceSignature());
    assertEquals("Mismatch in trace file location",
        Summarizer.NA, es.getInputTraceLocation());
  }

  /**
   * Asserts the task and job counters reported by an
   * {@link ExecutionSummarizer}.
   *
   * @param numMaps expected number of map tasks launched
   * @param numReds expected number of reduce tasks launched
   * @param totalJobsInTrace expected number of jobs in the trace
   * @param totalJobSubmitted expected number of submitted jobs
   * @param numSuccessfulJob expected number of successful jobs
   * @param numFailedJobs expected number of failed jobs
   * @param numLostJobs expected number of lost jobs
   * @param es the summarizer under test
   */
  private static void verifySummarizerCounters(int numMaps, int numReds,
      int totalJobsInTrace, int totalJobSubmitted, int numSuccessfulJob,
      int numFailedJobs, int numLostJobs, ExecutionSummarizer es) {
    assertEquals("ExecutionSummarizer test failed [num-maps]",
        numMaps, es.getNumMapTasksLaunched());
    assertEquals("ExecutionSummarizer test failed [num-reducers]",
        numReds, es.getNumReduceTasksLaunched());
    assertEquals("ExecutionSummarizer test failed [num-jobs-in-trace]",
        totalJobsInTrace, es.getNumJobsInTrace());
    assertEquals("ExecutionSummarizer test failed [num-submitted jobs]",
        totalJobSubmitted, es.getNumSubmittedJobs());
    assertEquals("ExecutionSummarizer test failed [num-successful-jobs]",
        numSuccessfulJob, es.getNumSuccessfulJobs());
    assertEquals("ExecutionSummarizer test failed [num-failed jobs]",
        numFailedJobs, es.getNumFailedJobs());
    assertEquals("ExecutionSummarizer test failed [num-lost jobs]",
        numLostJobs, es.getNumLostJobs());
  }

  /**
   * Builds {@link JobStats} around a fake {@link Job}.
   *
   * @param numMaps number of maps recorded in the stats
   * @param numReds number of reduces recorded in the stats and reported by the
   *        fake job
   * @param isSuccessful value the fake job reports from {@code isSuccessful()}
   * @param lost if {@code true}, {@code isSuccessful()} throws an
   *        {@link IOException} instead of returning
   * @return the fake job statistics
   * @throws IOException if the fake job cannot be created
   */
  @SuppressWarnings("deprecation")
  private static JobStats generateFakeJobStats(final int numMaps,
      final int numReds, final boolean isSuccessful, final boolean lost)
      throws IOException {
    // A fake job
    Job fakeJob = new Job() {
      @Override
      public int getNumReduceTasks() {
        return numReds;
      }

      @Override
      public boolean isSuccessful() throws IOException {
        if (lost) {
          throw new IOException("Test failure!");
        }
        return isSuccessful;
      }
    };
    return new JobStats(numMaps, numReds, fakeJob);
  }

  /**
   * Test {@link ClusterSummarizer}: JobTracker/NameNode info captured at start
   * and the cluster metrics taken from a local {@link JobClient}.
   */
  @Test
  public void testClusterSummarizer() throws IOException {
    ClusterSummarizer cs = new ClusterSummarizer();
    Configuration conf = new Configuration();

    String jt = "test-jt:1234";
    String nn = "test-nn:5678";
    conf.set(JTConfig.JT_IPC_ADDRESS, jt);
    conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, nn);
    cs.start(conf);

    assertEquals("JT name mismatch", jt, cs.getJobTrackerInfo());
    assertEquals("NN name mismatch", nn, cs.getNamenodeInfo());

    ClusterStats cStats = ClusterStats.getClusterStats();
    conf.set(JTConfig.JT_IPC_ADDRESS, "local");
    conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, "local");
    JobClient jc = new JobClient(conf);
    try {
      cStats.setClusterMetric(jc.getClusterStatus());
    } finally {
      // release the client's resources even if fetching the status fails
      jc.close();
    }

    cs.update(cStats);

    // test: the expected values assume the local runner reports a single
    // tracker with one map and one reduce slot (NOTE(review): confirm)
    assertEquals("Cluster summary test failed!", 1, cs.getMaxMapTasks());
    assertEquals("Cluster summary test failed!", 1, cs.getMaxReduceTasks());
    assertEquals("Cluster summary test failed!", 1, cs.getNumActiveTrackers());
    assertEquals("Cluster summary test failed!", 0,
        cs.getNumBlacklistedTrackers());
  }
}