/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.RandomAccessFile;
import java.lang.reflect.Constructor;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.TaskReport;
import org.apache.hadoop.mapreduce.Job;
import org.apache.pig.FuncSpec;
import org.apache.pig.PigConfiguration;
import org.apache.pig.PigServer;
import org.apache.pig.StoreFuncInterface;
import org.apache.pig.backend.executionengine.ExecJob;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.FileBasedOutputSizeReader;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigStatsOutputSizeReader;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.hbase.HBaseStorage;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.tools.pigstats.JobStats;
import org.apache.pig.tools.pigstats.PigStats;
import org.apache.pig.tools.pigstats.PigStats.JobGraph;
import org.apache.pig.tools.pigstats.mapreduce.MRJobStats;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;
/**
 * Unit tests for {@link MRJobStats}: verifies the map/reduce timing statistics
 * derived from mocked {@link TaskReport}s, the pluggable output-size readers
 * used by {@link JobStats#getOutputSize}, and the
 * {@link PigConfiguration#PIG_NO_TASK_REPORT} switch (PIG-4043).
 *
 * Several MRJobStats members are package-private or private, so this test
 * reaches them via reflection (see {@link #createJobStats} and
 * {@link #getJobStatsMethod}).
 */
public class TestMRJobStats {

    private static final TaskReport[] mapTaskReports = new TaskReport[5];
    private static final TaskReport[] reduceTaskReports = new TaskReport[5];
    private static final JobID jobID = new JobID("jobStat", Integer.MAX_VALUE);

    // Expected prefix of MRJobStats#getDisplayString() for the mocked reports;
    // built once in oneTimeSetup().
    private static String ASSERT_STRING;

    // The fixture times below are expressed in seconds; TaskReport start/finish
    // times are milliseconds, hence the scaling by this factor.
    private static final int ONE_THOUSAND = 1000;

    // {startTime, finishTime} pairs (seconds) for the five mocked map tasks.
    private static final long[][] MAP_START_FINISH_TIME_DATA = { { 0, 100},
            { 200, 400 }, { 500, 800 }, { 900, 1300 }, { 1400, 1900 } };

    // {startTime, finishTime} pairs (seconds) for the five mocked reduce tasks.
    private static final long[][] REDUCE_START_FINISH_TIME_DATA = { { 0, 100 },
            { 200, 400 }, { 500, 700 }, { 700, 900 }, { 1000, 1500 } };

    @BeforeClass
    public static void oneTimeSetup() throws Exception {
        // setting up TaskReport for map tasks
        for (int i = 0; i < mapTaskReports.length; i++) {
            mapTaskReports[i] = Mockito.mock(TaskReport.class);
            Mockito.when(mapTaskReports[i].getStartTime()).thenReturn(MAP_START_FINISH_TIME_DATA[i][0] * ONE_THOUSAND);
            Mockito.when(mapTaskReports[i].getFinishTime()).thenReturn(MAP_START_FINISH_TIME_DATA[i][1] * ONE_THOUSAND);
        }

        // setting up TaskReport for reduce tasks
        for (int i = 0; i < reduceTaskReports.length; i++) {
            reduceTaskReports[i] = Mockito.mock(TaskReport.class);
            Mockito.when(reduceTaskReports[i].getStartTime()).thenReturn(REDUCE_START_FINISH_TIME_DATA[i][0] * ONE_THOUSAND);
            Mockito.when(reduceTaskReports[i].getFinishTime()).thenReturn(REDUCE_START_FINISH_TIME_DATA[i][1] * ONE_THOUSAND);
        }

        // Expected leading columns of the display string: job id, #maps,
        // #reduces, then the timing statistics (seconds) computed from the
        // fixture data above.
        StringBuilder sb = new StringBuilder();
        sb.append(jobID.toString()).append("\t");
        sb.append(mapTaskReports.length).append("\t");
        sb.append(reduceTaskReports.length).append("\t");
        sb.append("500\t100\t300\t300\t500\t100\t240\t200");
        ASSERT_STRING = sb.toString();
    }

    /**
     * Instantiates MRJobStats through its non-public (String, JobGraph)
     * constructor. Fails fast with a descriptive exception instead of
     * returning null, which would only surface later as an opaque
     * NullPointerException inside the test body.
     */
    MRJobStats createJobStats(String name, JobGraph plan) {
        try {
            Constructor<MRJobStats> con = MRJobStats.class.getDeclaredConstructor(String.class, JobGraph.class);
            con.setAccessible(true);
            return con.newInstance(name, plan);
        } catch (Exception e) {
            throw new RuntimeException("Failed to instantiate MRJobStats via reflection", e);
        }
    }

    /**
     * Looks up a (possibly non-public) MRJobStats method and makes it callable.
     */
    Method getJobStatsMethod(String methodName, Class<?>... parameterTypes) throws Exception {
        Method m = MRJobStats.class.getDeclaredMethod(methodName, parameterTypes);
        m.setAccessible(true);
        return m;
    }

    @Test
    public void testMedianMapReduceTime() throws Exception {
        JobClient jobClient = Mockito.mock(JobClient.class);

        // mock methods to return the predefined map and reduce task reports
        Mockito.when(jobClient.getMapTaskReports(jobID)).thenReturn(mapTaskReports);
        Mockito.when(jobClient.getReduceTaskReports(jobID)).thenReturn(reduceTaskReports);

        PigStats.JobGraph jobGraph = new PigStats.JobGraph();
        MRJobStats jobStats = createJobStats("JobStatsTest", jobGraph);
        getJobStatsMethod("setId", JobID.class).invoke(jobStats, jobID);
        jobStats.setSuccessful(true);

        getJobStatsMethod("addMapReduceStatistics", Iterator.class, Iterator.class)
                .invoke(jobStats, Arrays.asList(mapTaskReports).iterator(), Arrays.asList(reduceTaskReports).iterator());
        String msg = (String)getJobStatsMethod("getDisplayString")
                .invoke(jobStats);

        System.out.println(JobStats.SUCCESS_HEADER);
        System.out.println(msg);

        assertTrue(msg.startsWith(ASSERT_STRING));
    }

    @Test
    public void testOneTaskReport() throws Exception {
        // With a single task, max == min == avg == median for both phases.

        // setting up one map task report
        TaskReport[] mapTaskReports = new TaskReport[1];
        mapTaskReports[0] = Mockito.mock(TaskReport.class);
        Mockito.when(mapTaskReports[0].getStartTime()).thenReturn(300L * ONE_THOUSAND);
        Mockito.when(mapTaskReports[0].getFinishTime()).thenReturn(400L * ONE_THOUSAND);

        // setting up one reduce task report
        TaskReport[] reduceTaskReports = new TaskReport[1];
        reduceTaskReports[0] = Mockito.mock(TaskReport.class);
        Mockito.when(reduceTaskReports[0].getStartTime()).thenReturn(500L * ONE_THOUSAND);
        Mockito.when(reduceTaskReports[0].getFinishTime()).thenReturn(700L * ONE_THOUSAND);

        PigStats.JobGraph jobGraph = new PigStats.JobGraph();
        MRJobStats jobStats = createJobStats("JobStatsTest", jobGraph);
        getJobStatsMethod("setId", JobID.class).invoke(jobStats, jobID);
        jobStats.setSuccessful(true);

        getJobStatsMethod("addMapReduceStatistics", Iterator.class, Iterator.class)
                .invoke(jobStats, Arrays.asList(mapTaskReports).iterator(), Arrays.asList(reduceTaskReports).iterator());
        String msg = (String)getJobStatsMethod("getDisplayString")
                .invoke(jobStats);

        System.out.println(JobStats.SUCCESS_HEADER);
        System.out.println(msg);

        StringBuilder sb = new StringBuilder();
        sb.append(jobID.toString()).append("\t");
        sb.append(mapTaskReports.length).append("\t");
        sb.append(reduceTaskReports.length).append("\t");
        sb.append("100\t100\t100\t100\t200\t200\t200\t200");

        System.out.println("assert msg: " + sb.toString());
        assertTrue(msg.startsWith(sb.toString()));
    }

    /**
     * Dummy output size reader class for testing JobStats.getOutputSize()
     */
    public static class DummyOutputSizeReader implements PigStatsOutputSizeReader {
        public static final long SIZE = 12345;

        /**
         * Returns true always
         * @param sto POStore
         */
        @Override
        public boolean supports(POStore sto, Configuration conf) {
            return true;
        }

        /**
         * Returns a dummy constant value
         * @param sto POStore
         * @param conf configuration
         */
        @Override
        public long getOutputSize(POStore sto, Configuration conf) throws IOException {
            return SIZE;
        }
    }

    /**
     * Builds a POStore over a temp directory containing two sub-directories,
     * each holding one sparse file of {@code size} bytes, so the file-based
     * size reader's recursive traversal can be verified (expected total is
     * 2 * size).
     */
    private POStore createPOStoreForFileBasedSystemWithSubDirectories(long size, StoreFuncInterface storeFunc, Configuration conf) throws Exception {
        File root = createTmpDirectory("outputRoot", null);
        File dir1 = createTmpDirectory("dir1", root);
        File dir2 = createTmpDirectory("dir2", root);
        createTmpFile("tempFile1", size, dir1);
        createTmpFile("tempFile2", size, dir2);

        storeFunc.setStoreLocation(root.getAbsolutePath(), new Job(conf));
        FuncSpec funcSpec = new FuncSpec(storeFunc.getClass().getCanonicalName());
        POStore poStore = new POStore(new OperatorKey());
        poStore.setSFile(new FileSpec(root.getAbsolutePath(), funcSpec));
        poStore.setStoreFunc(storeFunc);
        poStore.setUp();

        return poStore;
    }

    /**
     * Creates a uniquely named temp directory (by turning a temp file into a
     * directory, the standard pre-NIO trick for a unique name).
     */
    private static File createTmpDirectory(String name, File root) throws Exception {
        File directory = File.createTempFile(name, "", root);
        if (!(directory.delete())) {
            throw new IOException("Could not delete temp file: " + directory.getAbsolutePath());
        }
        if (!(directory.mkdir())) {
            throw new IOException("Could not create temp directory: " + directory.getAbsolutePath());
        }
        // Best-effort cleanup; only succeeds for directories that are empty at
        // JVM exit, mirroring the deleteOnExit() used for temp files below.
        directory.deleteOnExit();
        return directory;
    }

    /**
     * Creates a temp file of exactly {@code size} bytes (sparse where the OS
     * supports it, so multi-GB sizes are cheap).
     */
    private static File createTmpFile(String name, long size, File directory) throws Exception {
        File file = directory == null ? File.createTempFile(name, ".tmp") : File.createTempFile(name, ".tmp", directory);
        file.deleteOnExit();
        // try-with-resources so the handle is released even if setLength fails
        try (RandomAccessFile f = new RandomAccessFile(file, "rw")) {
            f.setLength(size);
        }
        return file;
    }

    /**
     * Builds a POStore over a single temp file of {@code size} bytes.
     */
    private static POStore createPOStoreForFileBasedSystem(long size, StoreFuncInterface storeFunc,
            Configuration conf) throws Exception {
        File file = createTmpFile("tempFile", size, null);

        storeFunc.setStoreLocation(file.getAbsolutePath(), new Job(conf));
        FuncSpec funcSpec = new FuncSpec(storeFunc.getClass().getCanonicalName());
        POStore poStore = new POStore(new OperatorKey());
        poStore.setSFile(new FileSpec(file.getAbsolutePath(), funcSpec));
        poStore.setStoreFunc(storeFunc);
        poStore.setUp();

        return poStore;
    }

    /**
     * Builds a POStore over a non-file-based URI (hbase://) so the
     * file-based reader cannot handle it.
     */
    private static POStore createPOStoreForNonFileBasedSystem(StoreFuncInterface storeFunc,
            Configuration conf) throws Exception {
        String nonFileBasedUri = "hbase://tableName";
        storeFunc.setStoreLocation(nonFileBasedUri, new Job(conf));
        FuncSpec funcSpec = new FuncSpec(storeFunc.getClass().getCanonicalName());
        POStore poStore = new POStore(new OperatorKey());
        poStore.setSFile(new FileSpec(nonFileBasedUri, funcSpec));
        poStore.setStoreFunc(storeFunc);
        poStore.setUp();

        return poStore;
    }

    @Test
    public void testGetOutputSizeUsingFileBasedStorage() throws Exception {
        // By default, FileBasedOutputSizeReader is used to compute the size of output.
        Configuration conf = new Configuration();
        long size = 2L * 1024 * 1024 * 1024; // deliberately > Integer.MAX_VALUE
        long outputSize = JobStats.getOutputSize(
                createPOStoreForFileBasedSystem(size, new PigStorageWithStatistics(), conf), conf);

        assertEquals("The returned output size is expected to be the same as the file size",
                size, outputSize);
    }

    @Test
    public void testGetOutputSizeUsingFileBasedStorageWithSubDirectories() throws Exception {
        // By default, FileBasedOutputSizeReader is used to compute the size of output.
        Configuration conf = new Configuration();
        long size = 2L * 1024 * 1024 * 1024;
        long outputSize = JobStats.getOutputSize(
                createPOStoreForFileBasedSystemWithSubDirectories(size, new PigStorageWithStatistics(), conf), conf);

        assertEquals("The returned output size is expected to be sum of file sizes in the sub-directories",
                2 * size, outputSize);
    }

    @Test
    public void testGetOutputSizeUsingNonFileBasedStorage1() throws Exception {
        // By default, FileBasedOutputSizeReader is used to compute the size of output.
        Configuration conf = new Configuration();

        // ClientSystemProps is needed to instantiate HBaseStorage
        UDFContext.getUDFContext().setClientSystemProps(new Properties());
        long outputSize = JobStats.getOutputSize(
                createPOStoreForNonFileBasedSystem(new HBaseStorage("colName"), conf), conf);

        assertEquals("The default output size reader returns -1 for a non-file-based uri",
                -1, outputSize);
    }

    @Test
    public void testGetOutputSizeUsingNonFileBasedStorage2() throws Exception {
        // Register a custom output size reader in configuration
        Configuration conf = new Configuration();
        conf.set(PigStatsOutputSizeReader.OUTPUT_SIZE_READER_KEY,
                DummyOutputSizeReader.class.getName());

        // ClientSystemProps is needed to instantiate HBaseStorage
        UDFContext.getUDFContext().setClientSystemProps(new Properties());
        long outputSize = JobStats.getOutputSize(
                createPOStoreForNonFileBasedSystem(new HBaseStorage("colName"), conf), conf);

        assertEquals("The dummy output size reader always returns " + DummyOutputSizeReader.SIZE,
                DummyOutputSizeReader.SIZE, outputSize);
    }

    @Test(expected = RuntimeException.class)
    public void testGetOutputSizeUsingNonFileBasedStorage3() throws Exception {
        // Register an invalid output size reader in configuration, and verify
        // that an exception is thrown at run-time.
        Configuration conf = new Configuration();
        conf.set(PigStatsOutputSizeReader.OUTPUT_SIZE_READER_KEY, "bad_output_size_reader");

        // ClientSystemProps is needed to instantiate HBaseStorage
        UDFContext.getUDFContext().setClientSystemProps(new Properties());
        JobStats.getOutputSize(
                createPOStoreForNonFileBasedSystem(new HBaseStorage("colName"), conf), conf);
    }

    @Test
    public void testGetOutputSizeUsingNonFileBasedStorage4() throws Exception {
        // Register a comma-separated list of readers in configuration, and
        // verify that the one that supports a non-file-based uri is used.
        Configuration conf = new Configuration();
        conf.set(PigStatsOutputSizeReader.OUTPUT_SIZE_READER_KEY,
                FileBasedOutputSizeReader.class.getName() + ","
                        + DummyOutputSizeReader.class.getName());

        // ClientSystemProps needs to be initialized to instantiate HBaseStorage
        UDFContext.getUDFContext().setClientSystemProps(new Properties());
        long outputSize = JobStats.getOutputSize(
                createPOStoreForNonFileBasedSystem(new HBaseStorage("colName"), conf), conf);

        assertEquals("The dummy output size reader always returns " + DummyOutputSizeReader.SIZE,
                DummyOutputSizeReader.SIZE, outputSize);
    }

    @Test
    public void testGetOutputSizeUsingNonFileBasedStorage5() throws Exception {
        Configuration conf = new Configuration();
        long size = 2L * 1024 * 1024 * 1024;
        long outputSize = JobStats.getOutputSize(
                createPOStoreForFileBasedSystem(size, new PigStorageWithStatistics(), conf), conf);

        // By default, FileBasedOutputSizeReader is used to compute the size of output.
        assertEquals("The returned output size is expected to be the same as the file size",
                size, outputSize);

        // Now add PigStorageWithStatistics to the unsupported store funcs list, and
        // verify that JobStats.getOutputSize() returns -1.
        conf.set(PigStatsOutputSizeReader.OUTPUT_SIZE_READER_UNSUPPORTED,
                PigStorageWithStatistics.class.getName());
        outputSize = JobStats.getOutputSize(
                createPOStoreForFileBasedSystem(size, new PigStorageWithStatistics(), conf), conf);

        assertEquals("The default output size reader returns -1 for unsupported store funcs",
                -1, outputSize);
    }

    // See PIG-4043
    @Test
    public void testNoTaskReportProperty() throws IOException{
        MiniGenericCluster cluster = MiniGenericCluster.buildCluster(MiniGenericCluster.EXECTYPE_MR);
        // try/finally ensures the mini cluster is torn down even when an
        // assertion or job submission fails mid-test.
        try {
            Properties properties = cluster.getProperties();

            String inputFile = "input";
            try (PrintWriter pw = new PrintWriter(Util.createInputFile(cluster, inputFile))) {
                pw.println("100\tapple");
                pw.println("200\torange");
            }

            // Enable task reports in job statistics
            properties.setProperty(PigConfiguration.PIG_NO_TASK_REPORT, "false");
            PigServer pigServer = new PigServer(cluster.getExecType(), properties);
            pigServer.setBatchOn();

            // Launch a map-only job
            pigServer.registerQuery("A = load '" + inputFile + "' as (id:int, fruit:chararray);");
            pigServer.registerQuery("store A into 'task_reports';");
            List<ExecJob> jobs = pigServer.executeBatch();
            PigStats pigStats = jobs.get(0).getStatistics();
            MRJobStats jobStats = (MRJobStats) pigStats.getJobGraph().getJobList().get(0);

            // Make sure JobStats includes TaskReports information
            long minMapTime = jobStats.getMinMapTime();
            long maxMapTime = jobStats.getMaxMapTime();
            long avgMapTime = jobStats.getAvgMapTime();
            assertTrue("TaskReports are enabled, so minMapTime shouldn't be -1", minMapTime != -1l);
            assertTrue("TaskReports are enabled, so maxMapTime shouldn't be -1", maxMapTime != -1l);
            assertTrue("TaskReports are enabled, so avgMapTime shouldn't be -1", avgMapTime != -1l);

            // Disable task reports in job statistics
            properties.setProperty(PigConfiguration.PIG_NO_TASK_REPORT, "true");

            // Launch another map-only job
            pigServer.registerQuery("B = load '" + inputFile + "' as (id:int, fruit:chararray);");
            pigServer.registerQuery("store B into 'no_task_reports';");
            jobs = pigServer.executeBatch();
            pigStats = jobs.get(0).getStatistics();
            jobStats = (MRJobStats) pigStats.getJobGraph().getJobList().get(0);

            // Make sure JobStats doesn't include any TaskReports information
            minMapTime = jobStats.getMinMapTime();
            maxMapTime = jobStats.getMaxMapTime();
            avgMapTime = jobStats.getAvgMapTime();
            assertEquals("TaskReports are disabled, so minMapTime should be -1", -1l, minMapTime);
            assertEquals("TaskReports are disabled, so maxMapTime should be -1", -1l, maxMapTime);
            assertEquals("TaskReports are disabled, so avgMapTime should be -1", -1l, avgMapTime);
        } finally {
            cluster.shutDown();
        }
    }
}