| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.mapred.gridmix; |
| |
| import static org.junit.Assert.*; |
| |
| import java.io.IOException; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.CommonConfigurationKeys; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.fs.permission.FsPermission; |
| import org.apache.hadoop.mapred.JobClient; |
| import org.apache.hadoop.mapred.UtilsForTests; |
| import org.apache.hadoop.mapred.gridmix.GenerateData.DataStatistics; |
| import org.apache.hadoop.mapred.gridmix.Statistics.ClusterStats; |
| import org.apache.hadoop.mapred.gridmix.Statistics.JobStats; |
| import org.apache.hadoop.mapreduce.Job; |
| import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig; |
| import org.apache.hadoop.tools.rumen.JobStory; |
| import org.apache.hadoop.tools.rumen.JobStoryProducer; |
| import org.junit.Test; |
| |
| /** |
| * Test {@link ExecutionSummarizer} and {@link ClusterSummarizer}. |
| */ |
| public class TestGridmixSummary { |
| |
| /** |
| * Test {@link DataStatistics}. |
| */ |
| @Test |
| public void testDataStatistics() throws Exception { |
| // test data-statistics getters with compression enabled |
| DataStatistics stats = new DataStatistics(10, 2, true); |
| assertEquals("Data size mismatch", 10, stats.getDataSize()); |
| assertEquals("Num files mismatch", 2, stats.getNumFiles()); |
| assertTrue("Compression configuration mismatch", stats.isDataCompressed()); |
| |
| // test data-statistics getters with compression disabled |
| stats = new DataStatistics(100, 5, false); |
| assertEquals("Data size mismatch", 100, stats.getDataSize()); |
| assertEquals("Num files mismatch", 5, stats.getNumFiles()); |
| assertFalse("Compression configuration mismatch", stats.isDataCompressed()); |
| |
| // test publish data stats |
| Configuration conf = new Configuration(); |
| Path rootTempDir = new Path(System.getProperty("test.build.data", "/tmp")); |
| Path testDir = new Path(rootTempDir, "testDataStatistics"); |
| FileSystem fs = testDir.getFileSystem(conf); |
| fs.delete(testDir, true); |
| Path testInputDir = new Path(testDir, "test"); |
| fs.mkdirs(testInputDir); |
| |
| // test empty folder (compression = true) |
| CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true); |
| Boolean failed = null; |
| try { |
| GenerateData.publishDataStatistics(testInputDir, 1024L, conf); |
| failed = false; |
| } catch (RuntimeException e) { |
| failed = true; |
| } |
| assertNotNull("Expected failure!", failed); |
| assertTrue("Compression data publishing error", failed); |
| |
| // test with empty folder (compression = off) |
| CompressionEmulationUtil.setCompressionEmulationEnabled(conf, false); |
| stats = GenerateData.publishDataStatistics(testInputDir, 1024L, conf); |
| assertEquals("Data size mismatch", 0, stats.getDataSize()); |
| assertEquals("Num files mismatch", 0, stats.getNumFiles()); |
| assertFalse("Compression configuration mismatch", stats.isDataCompressed()); |
| |
| // test with some plain input data (compression = off) |
| CompressionEmulationUtil.setCompressionEmulationEnabled(conf, false); |
| Path inputDataFile = new Path(testInputDir, "test"); |
| long size = |
| UtilsForTests.createTmpFileDFS(fs, inputDataFile, |
| FsPermission.createImmutable((short)777), "hi hello bye").size(); |
| stats = GenerateData.publishDataStatistics(testInputDir, -1, conf); |
| assertEquals("Data size mismatch", size, stats.getDataSize()); |
| assertEquals("Num files mismatch", 1, stats.getNumFiles()); |
| assertFalse("Compression configuration mismatch", stats.isDataCompressed()); |
| |
| // test with some plain input data (compression = on) |
| CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true); |
| failed = null; |
| try { |
| GenerateData.publishDataStatistics(testInputDir, 1234L, conf); |
| failed = false; |
| } catch (RuntimeException e) { |
| failed = true; |
| } |
| assertNotNull("Expected failure!", failed); |
| assertTrue("Compression data publishing error", failed); |
| |
| // test with some compressed input data (compression = off) |
| CompressionEmulationUtil.setCompressionEmulationEnabled(conf, false); |
| fs.delete(inputDataFile, false); |
| inputDataFile = new Path(testInputDir, "test.gz"); |
| size = |
| UtilsForTests.createTmpFileDFS(fs, inputDataFile, |
| FsPermission.createImmutable((short)777), "hi hello").size(); |
| stats = GenerateData.publishDataStatistics(testInputDir, 1234L, conf); |
| assertEquals("Data size mismatch", size, stats.getDataSize()); |
| assertEquals("Num files mismatch", 1, stats.getNumFiles()); |
| assertFalse("Compression configuration mismatch", stats.isDataCompressed()); |
| |
| // test with some compressed input data (compression = on) |
| CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true); |
| stats = GenerateData.publishDataStatistics(testInputDir, 1234L, conf); |
| assertEquals("Data size mismatch", size, stats.getDataSize()); |
| assertEquals("Num files mismatch", 1, stats.getNumFiles()); |
| assertTrue("Compression configuration mismatch", stats.isDataCompressed()); |
| } |
| |
| /** |
| * A fake {@link JobFactory}. |
| */ |
| @SuppressWarnings("rawtypes") |
| private static class FakeJobFactory extends JobFactory { |
| /** |
| * A fake {@link JobStoryProducer} for {@link FakeJobFactory}. |
| */ |
| private static class FakeJobStoryProducer implements JobStoryProducer { |
| @Override |
| public void close() throws IOException { |
| } |
| |
| @Override |
| public JobStory getNextJob() throws IOException { |
| return null; |
| } |
| } |
| |
| FakeJobFactory(Configuration conf) { |
| super(null, new FakeJobStoryProducer(), null, conf, null, null); |
| } |
| |
| @Override |
| public void update(Object item) { |
| } |
| |
| @Override |
| protected Thread createReaderThread() { |
| return new Thread(); |
| } |
| } |
| |
| /** |
| * Test {@link ExecutionSummarizer}. |
| */ |
| @Test |
| @SuppressWarnings({ "unchecked", "rawtypes" }) |
| public void testExecutionSummarizer() throws IOException { |
| Configuration conf = new Configuration(); |
| |
| ExecutionSummarizer es = new ExecutionSummarizer(); |
| assertEquals("ExecutionSummarizer init failed", |
| Summarizer.NA, es.getCommandLineArgsString()); |
| |
| long startTime = System.currentTimeMillis(); |
| // test configuration parameters |
| String[] initArgs = new String[] {"-Xmx20m", "-Dtest.args='test'"}; |
| es = new ExecutionSummarizer(initArgs); |
| |
| assertEquals("ExecutionSummarizer init failed", |
| "-Xmx20m -Dtest.args='test'", |
| es.getCommandLineArgsString()); |
| |
| // test start time |
| assertTrue("Start time mismatch", es.getStartTime() >= startTime); |
| assertTrue("Start time mismatch", |
| es.getStartTime() <= System.currentTimeMillis()); |
| |
| // test start() of ExecutionSummarizer |
| es.update(null); |
| assertEquals("ExecutionSummarizer init failed", 0, |
| es.getSimulationStartTime()); |
| testExecutionSummarizer(0, 0, 0, 0, 0, 0, 0, es); |
| |
| long simStartTime = System.currentTimeMillis(); |
| es.start(null); |
| assertTrue("Simulation start time mismatch", |
| es.getSimulationStartTime() >= simStartTime); |
| assertTrue("Simulation start time mismatch", |
| es.getSimulationStartTime() <= System.currentTimeMillis()); |
| |
| // test with job stats |
| JobStats stats = generateFakeJobStats(1, 10, true, false); |
| es.update(stats); |
| testExecutionSummarizer(1, 10, 0, 1, 1, 0, 0, es); |
| |
| // test with failed job |
| stats = generateFakeJobStats(5, 1, false, false); |
| es.update(stats); |
| testExecutionSummarizer(6, 11, 0, 2, 1, 1, 0, es); |
| |
| // test with successful but lost job |
| stats = generateFakeJobStats(1, 1, true, true); |
| es.update(stats); |
| testExecutionSummarizer(7, 12, 0, 3, 1, 1, 1, es); |
| |
| // test with failed but lost job |
| stats = generateFakeJobStats(2, 2, false, true); |
| es.update(stats); |
| testExecutionSummarizer(9, 14, 0, 4, 1, 1, 2, es); |
| |
| // test finalize |
| // define a fake job factory |
| JobFactory factory = new FakeJobFactory(conf); |
| |
| // fake the num jobs in trace |
| factory.numJobsInTrace = 3; |
| |
| Path rootTempDir = new Path(System.getProperty("test.build.data", "/tmp")); |
| Path testDir = new Path(rootTempDir, "testGridmixSummary"); |
| Path testTraceFile = new Path(testDir, "test-trace.json"); |
| FileSystem fs = FileSystem.getLocal(conf); |
| fs.create(testTraceFile).close(); |
| |
| // finalize the summarizer |
| UserResolver resolver = new RoundRobinUserResolver(); |
| DataStatistics dataStats = new DataStatistics(100, 2, true); |
| String policy = GridmixJobSubmissionPolicy.REPLAY.name(); |
| conf.set(GridmixJobSubmissionPolicy.JOB_SUBMISSION_POLICY, policy); |
| es.finalize(factory, testTraceFile.toString(), 1024L, resolver, dataStats, |
| conf); |
| |
| // test num jobs in trace |
| assertEquals("Mismtach in num jobs in trace", 3, es.getNumJobsInTrace()); |
| |
| // test trace signature |
| String tid = |
| ExecutionSummarizer.getTraceSignature(testTraceFile.toString()); |
| assertEquals("Mismatch in trace signature", |
| tid, es.getInputTraceSignature()); |
| // test trace location |
| Path qPath = fs.makeQualified(testTraceFile); |
| assertEquals("Mismatch in trace filename", |
| qPath.toString(), es.getInputTraceLocation()); |
| // test expected data size |
| assertEquals("Mismatch in expected data size", |
| "1 K", es.getExpectedDataSize()); |
| // test input data statistics |
| assertEquals("Mismatch in input data statistics", |
| ExecutionSummarizer.stringifyDataStatistics(dataStats), |
| es.getInputDataStatistics()); |
| // test user resolver |
| assertEquals("Mismatch in user resolver", |
| resolver.getClass().getName(), es.getUserResolver()); |
| // test policy |
| assertEquals("Mismatch in policy", policy, es.getJobSubmissionPolicy()); |
| |
| // test data stringification using large data |
| es.finalize(factory, testTraceFile.toString(), 1024*1024*1024*10L, resolver, |
| dataStats, conf); |
| assertEquals("Mismatch in expected data size", |
| "10 G", es.getExpectedDataSize()); |
| |
| // test trace signature uniqueness |
| // touch the trace file |
| fs.delete(testTraceFile, false); |
| // sleep for 1 sec |
| try { |
| Thread.sleep(1000); |
| } catch (InterruptedException ie) {} |
| fs.create(testTraceFile).close(); |
| es.finalize(factory, testTraceFile.toString(), 0L, resolver, dataStats, |
| conf); |
| // test missing expected data size |
| assertEquals("Mismatch in trace data size", |
| Summarizer.NA, es.getExpectedDataSize()); |
| assertFalse("Mismatch in trace signature", |
| tid.equals(es.getInputTraceSignature())); |
| // get the new identifier |
| tid = ExecutionSummarizer.getTraceSignature(testTraceFile.toString()); |
| assertEquals("Mismatch in trace signature", |
| tid, es.getInputTraceSignature()); |
| |
| testTraceFile = new Path(testDir, "test-trace2.json"); |
| fs.create(testTraceFile).close(); |
| es.finalize(factory, testTraceFile.toString(), 0L, resolver, dataStats, |
| conf); |
| assertFalse("Mismatch in trace signature", |
| tid.equals(es.getInputTraceSignature())); |
| // get the new identifier |
| tid = ExecutionSummarizer.getTraceSignature(testTraceFile.toString()); |
| assertEquals("Mismatch in trace signature", |
| tid, es.getInputTraceSignature()); |
| |
| // finalize trace identifier '-' input |
| es.finalize(factory, "-", 0L, resolver, dataStats, conf); |
| assertEquals("Mismatch in trace signature", |
| Summarizer.NA, es.getInputTraceSignature()); |
| assertEquals("Mismatch in trace file location", |
| Summarizer.NA, es.getInputTraceLocation()); |
| } |
| |
| // test the ExecutionSummarizer |
| private static void testExecutionSummarizer(int numMaps, int numReds, |
| int totalJobsInTrace, int totalJobSubmitted, int numSuccessfulJob, |
| int numFailedJobs, int numLostJobs, ExecutionSummarizer es) { |
| assertEquals("ExecutionSummarizer test failed [num-maps]", |
| numMaps, es.getNumMapTasksLaunched()); |
| assertEquals("ExecutionSummarizer test failed [num-reducers]", |
| numReds, es.getNumReduceTasksLaunched()); |
| assertEquals("ExecutionSummarizer test failed [num-jobs-in-trace]", |
| totalJobsInTrace, es.getNumJobsInTrace()); |
| assertEquals("ExecutionSummarizer test failed [num-submitted jobs]", |
| totalJobSubmitted, es.getNumSubmittedJobs()); |
| assertEquals("ExecutionSummarizer test failed [num-successful-jobs]", |
| numSuccessfulJob, es.getNumSuccessfulJobs()); |
| assertEquals("ExecutionSummarizer test failed [num-failed jobs]", |
| numFailedJobs, es.getNumFailedJobs()); |
| assertEquals("ExecutionSummarizer test failed [num-lost jobs]", |
| numLostJobs, es.getNumLostJobs()); |
| } |
| |
| // generate fake job stats |
| @SuppressWarnings("deprecation") |
| private static JobStats generateFakeJobStats(final int numMaps, |
| final int numReds, final boolean isSuccessful, final boolean lost) |
| throws IOException { |
| // A fake job |
| Job fakeJob = new Job() { |
| @Override |
| public int getNumReduceTasks() { |
| return numReds; |
| }; |
| |
| @Override |
| public boolean isSuccessful() throws IOException { |
| if (lost) { |
| throw new IOException("Test failure!"); |
| } |
| return isSuccessful; |
| }; |
| }; |
| return new JobStats(numMaps, numReds, fakeJob); |
| } |
| |
| /** |
| * Test {@link ClusterSummarizer}. |
| */ |
| @Test |
| public void testClusterSummarizer() throws IOException { |
| ClusterSummarizer cs = new ClusterSummarizer(); |
| Configuration conf = new Configuration(); |
| |
| String jt = "test-jt:1234"; |
| String nn = "test-nn:5678"; |
| conf.set(JTConfig.JT_IPC_ADDRESS, jt); |
| conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, nn); |
| cs.start(conf); |
| |
| assertEquals("JT name mismatch", jt, cs.getJobTrackerInfo()); |
| assertEquals("NN name mismatch", nn, cs.getNamenodeInfo()); |
| |
| ClusterStats cStats = ClusterStats.getClusterStats(); |
| conf.set(JTConfig.JT_IPC_ADDRESS, "local"); |
| conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, "local"); |
| JobClient jc = new JobClient(conf); |
| cStats.setClusterMetric(jc.getClusterStatus()); |
| |
| cs.update(cStats); |
| |
| // test |
| assertEquals("Cluster summary test failed!", 1, cs.getMaxMapTasks()); |
| assertEquals("Cluster summary test failed!", 1, cs.getMaxReduceTasks()); |
| assertEquals("Cluster summary test failed!", 1, cs.getNumActiveTrackers()); |
| assertEquals("Cluster summary test failed!", 0, |
| cs.getNumBlacklistedTrackers()); |
| } |
| } |