blob: 1dfc8970e617f6e5b6e1cea75f7b36359a0f643d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred.gridmix;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobStatus;
import org.apache.hadoop.mapred.gridmix.Gridmix;
import org.apache.hadoop.mapred.gridmix.test.system.GridMixConfig;
import org.apache.hadoop.mapred.gridmix.test.system.GridMixRunMode;
import org.apache.hadoop.mapred.gridmix.test.system.UtilsForGridmix;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.junit.Assert;
import org.junit.Test;
/**
* Verify the input data generated by Gridmix with compression emulation
* turned on and with it turned off.
*/
public class TestGridmixCompressedInputGeneration
    extends GridmixSystemTestCase {

  private static final Log LOG =
      LogFactory.getLog(TestGridmixCompressedInputGeneration.class);

  /** Divisor used to convert a byte count into whole megabytes. */
  private static final long BYTES_PER_MB = 1024L * 1024L;

  /**
   * Generate input data with compression emulation enabled and verify
   * that the generated input files are compressed.
   * @throws Exception - if an error occurs.
   */
  @Test
  public void testGridmixCompressionInputGeneration() throws Exception {
    final long inputSizeInMB = 1024 * 7;
    final String[] runtimeValues = buildRuntimeValues(inputSizeInMB);
    final String[] otherArgs = buildOtherArgs(true);
    LOG.info("Verify the generated compressed input data.");
    runAndVerify(true, inputSizeInMB, runtimeValues, otherArgs);
  }

  /**
   * Disable compression emulation and verify that the generated input
   * files are not compressed.
   * @throws Exception - if an error occurs.
   */
  @Test
  public void testGridmixInputGenerationWithoutCompressionEnable()
      throws Exception {
    // Clean the Gridmix directory before generating a fresh data set.
    UtilsForGridmix.cleanup(gridmixDir, rtClient.getDaemonConf());
    final long inputSizeInMB = 1024 * 6;
    final String[] runtimeValues = buildRuntimeValues(inputSizeInMB);
    final String[] otherArgs = buildOtherArgs(false);
    LOG.info("Verify the generated uncompressed input data.");
    runAndVerify(false, inputSizeInMB, runtimeValues, otherArgs);
  }

  /**
   * Build the positional runtime values handed to
   * {@link UtilsForGridmix#runGridmixJob}.
   * @param inputSizeInMB - requested input size, passed as
   *        <code>&lt;size&gt;m</code>.
   * @return the runtime values array shared by both tests.
   */
  private String[] buildRuntimeValues(long inputSizeInMB) {
    return new String[] {"LOADJOB",
                         SubmitterUserResolver.class.getName(),
                         "STRESS",
                         inputSizeInMB + "m",
                         "file:///dev/null"};
  }

  /**
   * Build the <code>-D</code> options for a run: the distributed cache
   * option is always set to false and the compression option is set to
   * the given value.
   * @param compressionEnabled - value for the compression option.
   * @return the additional command line arguments.
   */
  private String[] buildOtherArgs(boolean compressionEnabled) {
    return new String[] {
        "-D", GridMixConfig.GRIDMIX_DISTCACHE_ENABLE + "=false",
        "-D", GridMixConfig.GRIDMIX_COMPRESSION_ENABLE + "="
            + compressionEnabled
    };
  }

  /**
   * Run Gridmix in the DATA_GENERATION run mode and verify the exit
   * code, the job status, the generated data size and the generated
   * file names.
   * @param isCompressed - whether compressed input files are expected.
   * @param inputSizeInMB - requested input size in megabytes.
   * @param runtimeValues - positional runtime values for the run.
   * @param otherArgs - additional command line arguments.
   * @throws Exception - if an error occurs.
   */
  private void runAndVerify(boolean isCompressed, long inputSizeInMB,
      String[] runtimeValues, String[] otherArgs) throws Exception {
    int exitCode =
        UtilsForGridmix.runGridmixJob(gridmixDir, conf,
                                      GridMixRunMode.DATA_GENERATION.getValue(),
                                      runtimeValues, otherArgs);
    Assert.assertEquals("Data generation has failed.", 0, exitCode);
    verifyJobStatus();
    verifyInputDataSize(inputSizeInMB);
    verifyInputFiles(isCompressed);
  }

  /**
   * Check the names of the sampled input files: when compression is
   * expected every name must contain ".gz" or ".tgz", otherwise none may.
   * @param isCompressed - whether compressed input files are expected.
   * @throws IOException - if the input directory cannot be listed.
   */
  private void verifyInputFiles(boolean isCompressed) throws IOException {
    List<String> inputFiles =
        getInputFiles(conf, Gridmix.getGridmixInputDataPath(gridmixDir));
    // Without this check an empty listing would pass vacuously.
    Assert.assertFalse("No generated input files were found.",
                       inputFiles.isEmpty());
    for (String inputFile : inputFiles) {
      boolean hasCompressedName =
          inputFile.contains(".gz") || inputFile.contains(".tgz");
      if (isCompressed) {
        Assert.assertTrue("Compressed input split file was not found: "
                          + inputFile, hasCompressedName);
      } else {
        Assert.assertFalse("Uncompressed input split file was not found: "
                           + inputFile, hasCompressedName);
      }
    }
  }

  /**
   * Verify that input data was actually generated.
   *
   * The previous check derived the expected size from the measured size
   * (INPUT_SIZE * (actual / INPUT_SIZE)), so it could not detect a
   * mismatch and could fail spuriously through floating point
   * truncation. No exact size is asserted here.
   * NOTE(review): the expected on-disk size, especially with compression
   * enabled, is not visible from this class - confirm and tighten.
   * @param inputSizeInMB - requested input size in megabytes.
   * @throws IOException - if the input directory cannot be summarized.
   */
  private void verifyInputDataSize(long inputSizeInMB) throws IOException {
    long actDataSize =
        getInputDataSizeInMB(conf, Gridmix.getGridmixInputDataPath(gridmixDir));
    LOG.info("Requested input size: " + inputSizeInMB
             + " MB, generated input size: " + actDataSize + " MB");
    Assert.assertTrue("No input data was generated for the requested size of "
                      + inputSizeInMB + " MB.", actDataSize > 0);
  }

  /**
   * Verify that the last job returned by the job client has succeeded.
   * @throws IOException - if the job list cannot be fetched.
   */
  private void verifyJobStatus() throws IOException {
    JobClient jobClient = jtClient.getClient();
    // Fetch the list once so the length and the indexed element come
    // from the same snapshot.
    JobStatus[] allJobs = jobClient.getAllJobs();
    LOG.info("Verify the job status after completion of job...");
    Assert.assertTrue("No jobs were found.",
                      allJobs != null && allJobs.length > 0);
    Assert.assertEquals("Job has not succeeded.", JobStatus.SUCCEEDED,
                        allJobs[allJobs.length - 1].getRunState());
  }

  /**
   * Get the total length of the content under a directory, in whole
   * megabytes (integer division, so the remainder is dropped).
   * @param conf - configuration used to resolve the file system.
   * @param inputDir - directory to summarize.
   * @return content length in megabytes.
   * @throws IOException - if the content summary cannot be fetched.
   */
  private long getInputDataSizeInMB(Configuration conf, Path inputDir)
      throws IOException {
    FileSystem fs = inputDir.getFileSystem(conf);
    ContentSummary summary = fs.getContentSummary(inputDir);
    return summary.getLength() / BYTES_PER_MB;
  }

  /**
   * Collect one file name for every entry directly under the given
   * directory: the entry's own name if it is a file, otherwise the first
   * file found beneath it. Entries with no file beneath them are skipped.
   * @param conf - configuration used to resolve the file system.
   * @param inputDir - directory to list.
   * @return the sampled file names, never containing null.
   * @throws IOException - if a directory cannot be listed.
   */
  private List<String> getInputFiles(Configuration conf, Path inputDir)
      throws IOException {
    FileSystem fs = inputDir.getFileSystem(conf);
    FileStatus[] listStatus = fs.listStatus(inputDir);
    List<String> files = new ArrayList<String>();
    // Defensive: tolerate a null listing instead of failing with an NPE.
    if (listStatus == null) {
      return files;
    }
    for (FileStatus fileStat : listStatus) {
      String fileName = getInputFile(fileStat, conf);
      if (fileName != null) {
        files.add(fileName);
      }
    }
    return files;
  }

  /**
   * Resolve a status entry to a file name. A file yields its own name; a
   * directory yields the name of the first file found by a depth-first
   * search of its children.
   * @param fstatus - file or directory status to resolve.
   * @param conf - configuration used to resolve the file system.
   * @return a file name, or null if no file exists under the entry.
   * @throws IOException - if a directory cannot be listed.
   */
  private String getInputFile(FileStatus fstatus, Configuration conf)
      throws IOException {
    if (!fstatus.isDirectory()) {
      return fstatus.getPath().getName();
    }
    FileSystem fs = fstatus.getPath().getFileSystem(conf);
    FileStatus[] children = fs.listStatus(fstatus.getPath());
    if (children == null) {
      return null;
    }
    for (FileStatus child : children) {
      // Keep looking when a child holds no file, rather than giving up
      // after the first child.
      String fileName = getInputFile(child, conf);
      if (fileName != null) {
        return fileName;
      }
    }
    return null;
  }
}