PIG-3891: FileBasedOutputSizeReader does not calculate size of files in sub-directories (nkollar via rohini)
git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1772380 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index ecf0541..0651f16 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -65,6 +65,8 @@
BUG FIXES
+PIG-3891: FileBasedOutputSizeReader does not calculate size of files in sub-directories (nkollar via rohini)
+
PIG-5070: Allow Grunt e2e tests to run in parallel (knoguchi)
PIG-5061: ant test -Dtestcase=TestBoolean failing (knoguchi)
diff --git a/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestMultiStorage.java b/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestMultiStorage.java
index efc4279..c76e9b8 100644
--- a/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestMultiStorage.java
+++ b/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestMultiStorage.java
@@ -18,34 +18,41 @@
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
+import java.util.List;
+import java.util.Map;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
-import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
-import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.test.MiniCluster;
+import org.apache.pig.backend.executionengine.ExecJob;
+import org.apache.pig.test.MiniGenericCluster;
import org.apache.pig.test.Util;
+import org.apache.pig.tools.pigstats.JobStats;
+import org.apache.pig.tools.pigstats.OutputStats;
+import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.pig.tools.pigstats.mapreduce.SimplePigStats;
import org.junit.After;
+import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Test;
-import junit.framework.Assert;
-import junit.framework.TestCase;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
-public class TestMultiStorage extends TestCase {
+public class TestMultiStorage {
private static final String INPUT_FILE = "MultiStorageInput.txt";
private PigServer pigServer;
private PigServer pigServerLocal;
- private MiniCluster cluster = MiniCluster.buildCluster();
+ private static MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
- public TestMultiStorage() throws ExecException, IOException {
- pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
- pigServerLocal = new PigServer(ExecType.LOCAL);
+ public TestMultiStorage() throws Exception {
+ pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
+ pigServerLocal = new PigServer(Util.getLocalTestMode());
}
public static final PathFilter hiddenPathFilter = new PathFilter() {
@@ -74,59 +81,83 @@
Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
}
- @Override
@Before
public void setUp() throws Exception {
createFile();
FileSystem fs = FileSystem.getLocal(new Configuration());
Path localOut = new Path("local-out");
- Path dummy = new Path("dummy");
if (fs.exists(localOut)) {
fs.delete(localOut, true);
}
- if (fs.exists(dummy)) {
- fs.delete(dummy, true);
- }
}
- @Override
@After
public void tearDown() throws Exception {
new File(INPUT_FILE).delete();
Util.deleteFile(cluster, INPUT_FILE);
+ }
+
+ @AfterClass
+ public static void shutdown() {
cluster.shutDown();
}
enum Mode {
local, cluster
- };
+ }
@Test
public void testMultiStorage() throws IOException {
final String LOAD = "A = LOAD '" + INPUT_FILE + "' as (id, name, n);";
final String MULTI_STORE_CLUSTER = "STORE A INTO 'mr-out' USING "
+ "org.apache.pig.piggybank.storage.MultiStorage('mr-out', '1');";
- final String MULTI_STORE_LOCAL = "STORE A INTO 'dummy' USING "
+ final String MULTI_STORE_LOCAL = "STORE A INTO 'local-out' USING "
+ "org.apache.pig.piggybank.storage.MultiStorage('local-out', '1');";
System.out.print("Testing in LOCAL mode: ...");
- //testMultiStorage(Mode.local, "local-out", LOAD, MULTI_STORE_LOCAL);
+ testMultiStorage(Mode.local, "local-out", LOAD, MULTI_STORE_LOCAL);
System.out.println("Succeeded!");
-
+
System.out.print("Testing in CLUSTER mode: ...");
testMultiStorage( Mode.cluster, "mr-out", LOAD, MULTI_STORE_CLUSTER);
System.out.println("Succeeded!");
-
-
}
- /**
- * The actual method that run the test in local or cluster mode.
- *
- * @param pigServer
- * @param mode
- * @param queries
- * @throws IOException
+ @Test
+ public void testOutputStats() throws IOException {
+ FileSystem fs = cluster.getFileSystem();
+
+ pigServer.setBatchOn();
+ pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (id, name, n);");
+ pigServer.registerQuery("B = FILTER A BY name == 'apple';");
+ pigServer.registerQuery("STORE A INTO 'out1' USING org.apache.pig.piggybank.storage.MultiStorage('out1', '1');"); //153 bytes
+ pigServer.registerQuery("STORE B INTO 'out2' USING org.apache.pig.piggybank.storage.MultiStorage('out2', '1');"); // 45 bytes
+
+ ExecJob job = pigServer.executeBatch().get(0);
+
+ PigStats stats = job.getStatistics();
+ PigStats.JobGraph jobGraph = stats.getJobGraph();
+ JobStats jobStats = (JobStats) jobGraph.getSinks().get(0);
+ Map<String, Long> multiStoreCounters = jobStats.getMultiStoreCounters();
+ List<OutputStats> outputStats = SimplePigStats.get().getOutputStats();
+ OutputStats outputStats1 = "out1".equals(outputStats.get(0).getName()) ? outputStats.get(0) : outputStats.get(1);
+ OutputStats outputStats2 = "out2".equals(outputStats.get(0).getName()) ? outputStats.get(0) : outputStats.get(1);
+
+ assertEquals(153 + 45, stats.getBytesWritten());
+ assertEquals(2, outputStats.size()); // 2 split conditions
+ assertEquals(153, outputStats1.getBytes());
+ assertEquals(45, outputStats2.getBytes());
+ assertEquals(9, outputStats1.getRecords());
+ assertEquals(3, outputStats2.getRecords());
+ assertEquals(3L, multiStoreCounters.get("Output records in _1_out2").longValue());
+ assertEquals(9L, multiStoreCounters.get("Output records in _0_out1").longValue());
+
+ fs.delete(new Path("out1"), true);
+ fs.delete(new Path("out2"), true);
+ }
+
+ /**
+ * The actual method that runs the test in local or cluster mode.
*/
private void testMultiStorage( Mode mode, String outPath,
String... queries) throws IOException {
@@ -142,42 +173,38 @@
/**
* Test if records are split into directories corresponding to split field
* values
- *
- * @param mode
- * @throws IOException
*/
private void verifyResults(Mode mode, String outPath) throws IOException {
FileSystem fs = (Mode.local == mode ? FileSystem
.getLocal(new Configuration()) : cluster.getFileSystem());
Path output = new Path(outPath);
- Assert.assertTrue("Output dir does not exists!", fs.exists(output)
+ assertTrue("Output dir does not exists!", fs.exists(output)
&& fs.getFileStatus(output).isDir());
Path[] paths = FileUtil.stat2Paths(fs.listStatus(output, hiddenPathFilter));
- Assert.assertTrue("Split field dirs not found!", paths != null);
+ assertTrue("Split field dirs not found!", paths != null);
for (Path path : paths) {
String splitField = path.getName();
Path[] files = FileUtil.stat2Paths(fs.listStatus(path, hiddenPathFilter));
- Assert.assertTrue("No files found for path: " + path.toUri().getPath(),
+ assertTrue("No files found for path: " + path.toUri().getPath(),
files != null);
for (Path filePath : files) {
- Assert.assertTrue("This shouldn't be a directory", fs.isFile(filePath));
-
+ assertTrue("This shouldn't be a directory", fs.isFile(filePath));
BufferedReader reader = new BufferedReader(new InputStreamReader(fs
.open(filePath)));
String line = "";
int count = 0;
while ((line = reader.readLine()) != null) {
String[] fields = line.split("\\t");
- Assert.assertEquals(fields.length, 3);
- Assert.assertEquals("Unexpected field value in the output record",
+ assertEquals(fields.length, 3);
+ assertEquals("Unexpected field value in the output record",
splitField, fields[1]);
count++;
System.out.println("field: " + fields[1]);
- }
+ }
reader.close();
- Assert.assertEquals(count, 3);
+ assertEquals(count, 3);
}
}
}
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/FileBasedOutputSizeReader.java b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/FileBasedOutputSizeReader.java
index 2197437..2fe6751 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/FileBasedOutputSizeReader.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/FileBasedOutputSizeReader.java
@@ -75,16 +75,24 @@
return -1;
}
- long bytes = 0;
Path p = new Path(getLocationUri(sto));
- FileSystem fs = p.getFileSystem(conf);
- FileStatus[] lst = fs.listStatus(p);
+ return getPathSize(p, p.getFileSystem(conf));
+ }
+
+ private long getPathSize(Path storePath, FileSystem fs) throws IOException {
+ long bytes = 0;
+ FileStatus[] lst = fs.listStatus(storePath);
if (lst != null) {
for (FileStatus status : lst) {
- bytes += status.getLen();
+ if (status.isFile()) {
+ if (status.getLen() > 0)
+ bytes += status.getLen();
+ }
+ else { // directory: recursively sum the sizes of the files nested under it
+ bytes += getPathSize(status.getPath(), fs);
+ }
}
}
-
return bytes;
}
diff --git a/test/org/apache/pig/test/TestMRJobStats.java b/test/org/apache/pig/test/TestMRJobStats.java
index 5be4cb9..5d5a281 100644
--- a/test/org/apache/pig/test/TestMRJobStats.java
+++ b/test/org/apache/pig/test/TestMRJobStats.java
@@ -102,7 +102,7 @@
try {
Constructor<MRJobStats> con = MRJobStats.class.getDeclaredConstructor(String.class, JobGraph.class);
con.setAccessible(true);
- MRJobStats jobStats = (MRJobStats) con.newInstance(name, plan);
+ MRJobStats jobStats = con.newInstance(name, plan);
return jobStats;
} catch (Exception e) {
return null;
@@ -202,14 +202,49 @@
}
}
- private static POStore createPOStoreForFileBasedSystem(long size, StoreFuncInterface storeFunc,
- Configuration conf) throws Exception {
+ private POStore createPOStoreForFileBasedSystemWithSubDirectories(long size, StoreFuncInterface storeFunc, Configuration conf) throws Exception {
+ File root = createTmpDirectory("outputRoot", null);
+ File dir1 = createTmpDirectory("dir1", root);
+ File dir2 = createTmpDirectory("dir2", root);
+ createTmpFile("tempFile1", size, dir1);
+ createTmpFile("tempFile2", size, dir2);
- File file = File.createTempFile("tempFile", ".tmp");
+ storeFunc.setStoreLocation(root.getAbsolutePath(), new Job(conf));
+ FuncSpec funcSpec = new FuncSpec(storeFunc.getClass().getCanonicalName());
+ POStore poStore = new POStore(new OperatorKey());
+ poStore.setSFile(new FileSpec(root.getAbsolutePath(), funcSpec));
+ poStore.setStoreFunc(storeFunc);
+ poStore.setUp();
+
+ return poStore;
+ }
+
+ private static File createTmpDirectory(String name, File root) throws Exception {
+ File directory = File.createTempFile(name, "", root);
+
+ if (!(directory.delete())) {
+ throw new IOException("Could not delete temp file: " + directory.getAbsolutePath());
+ }
+
+ if (!(directory.mkdir())) {
+ throw new IOException("Could not create temp directory: " + directory.getAbsolutePath());
+ }
+
+ return directory;
+ }
+
+ private static File createTmpFile(String name, long size, File directory) throws Exception {
+ File file = directory == null ? File.createTempFile(name, ".tmp") : File.createTempFile(name, ".tmp", directory);
file.deleteOnExit();
RandomAccessFile f = new RandomAccessFile(file, "rw");
f.setLength(size);
f.close();
+ return file;
+ }
+
+ private static POStore createPOStoreForFileBasedSystem(long size, StoreFuncInterface storeFunc,
+ Configuration conf) throws Exception {
+ File file = createTmpFile("tempFile", size, null);
storeFunc.setStoreLocation(file.getAbsolutePath(), new Job(conf));
FuncSpec funcSpec = new FuncSpec(storeFunc.getClass().getCanonicalName());
@@ -236,7 +271,7 @@
}
@Test
- public void testGetOuputSizeUsingFileBasedStorage() throws Exception {
+ public void testGetOutputSizeUsingFileBasedStorage() throws Exception {
// By default, FileBasedOutputSizeReader is used to compute the size of output.
Configuration conf = new Configuration();
@@ -249,7 +284,20 @@
}
@Test
- public void testGetOuputSizeUsingNonFileBasedStorage1() throws Exception {
+ public void testGetOutputSizeUsingFileBasedStorageWithSubDirectories() throws Exception {
+ // By default, FileBasedOutputSizeReader is used to compute the size of output.
+ Configuration conf = new Configuration();
+
+ long size = 2L * 1024 * 1024 * 1024;
+ long outputSize = JobStats.getOutputSize(
+ createPOStoreForFileBasedSystemWithSubDirectories(size, new PigStorageWithStatistics(), conf), conf);
+
+ assertEquals("The returned output size is expected to be sum of file sizes in the sub-directories",
+ 2 * size, outputSize);
+ }
+
+ @Test
+ public void testGetOutputSizeUsingNonFileBasedStorage1() throws Exception {
// By default, FileBasedOutputSizeReader is used to compute the size of output.
Configuration conf = new Configuration();
@@ -263,7 +311,7 @@
}
@Test
- public void testGetOuputSizeUsingNonFileBasedStorage2() throws Exception {
+ public void testGetOutputSizeUsingNonFileBasedStorage2() throws Exception {
// Register a custom output size reader in configuration
Configuration conf = new Configuration();
conf.set(PigStatsOutputSizeReader.OUTPUT_SIZE_READER_KEY,
@@ -279,7 +327,7 @@
}
@Test(expected = RuntimeException.class)
- public void testGetOuputSizeUsingNonFileBasedStorage3() throws Exception {
+ public void testGetOutputSizeUsingNonFileBasedStorage3() throws Exception {
// Register an invalid output size reader in configuration, and verify
// that an exception is thrown at run-time.
Configuration conf = new Configuration();
@@ -292,7 +340,7 @@
}
@Test
- public void testGetOuputSizeUsingNonFileBasedStorage4() throws Exception {
+ public void testGetOutputSizeUsingNonFileBasedStorage4() throws Exception {
// Register a comma-separated list of readers in configuration, and
// verify that the one that supports a non-file-based uri is used.
Configuration conf = new Configuration();
@@ -310,7 +358,7 @@
}
@Test
- public void testGetOuputSizeUsingNonFileBasedStorage5() throws Exception {
+ public void testGetOutputSizeUsingNonFileBasedStorage5() throws Exception {
Configuration conf = new Configuration();
long size = 2L * 1024 * 1024 * 1024;