PIG-3891: FileBasedOutputSizeReader does not calculate size of files in sub-directories (nkollar via rohini)

git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1772380 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index ecf0541..0651f16 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -65,6 +65,8 @@
  
 BUG FIXES
 
+PIG-3891: FileBasedOutputSizeReader does not calculate size of files in sub-directories (nkollar via rohini)
+
 PIG-5070: Allow Grunt e2e tests to run in parallel (knoguchi)
 
 PIG-5061: ant test -Dtestcase=TestBoolean failing (knoguchi)
diff --git a/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestMultiStorage.java b/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestMultiStorage.java
index efc4279..c76e9b8 100644
--- a/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestMultiStorage.java
+++ b/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestMultiStorage.java
@@ -18,34 +18,41 @@
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.PrintWriter;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
-import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
-import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.test.MiniCluster;
+import org.apache.pig.backend.executionengine.ExecJob;
+import org.apache.pig.test.MiniGenericCluster;
 import org.apache.pig.test.Util;
+import org.apache.pig.tools.pigstats.JobStats;
+import org.apache.pig.tools.pigstats.OutputStats;
+import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.pig.tools.pigstats.mapreduce.SimplePigStats;
 import org.junit.After;
+import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.Test;
 
-import junit.framework.Assert;
-import junit.framework.TestCase;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
-public class TestMultiStorage extends TestCase {
+public class TestMultiStorage {
   private static final String INPUT_FILE = "MultiStorageInput.txt";
 
   private PigServer pigServer;
   private PigServer pigServerLocal;
 
-  private MiniCluster cluster = MiniCluster.buildCluster();
+  private static MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
 
-  public TestMultiStorage() throws ExecException, IOException {
-    pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
-    pigServerLocal = new PigServer(ExecType.LOCAL);
+  public TestMultiStorage() throws Exception {
+    pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
+    pigServerLocal = new PigServer(Util.getLocalTestMode());
   }
 
   public static final PathFilter hiddenPathFilter = new PathFilter() {
@@ -74,59 +81,83 @@
     Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
   }
 
-  @Override
   @Before
   public void setUp() throws Exception {
     createFile();
     FileSystem fs = FileSystem.getLocal(new Configuration());
     Path localOut = new Path("local-out");
-    Path dummy = new Path("dummy");
     if (fs.exists(localOut)) {
       fs.delete(localOut, true);
     }
-    if (fs.exists(dummy)) {
-      fs.delete(dummy, true);
-    }
   }
 
-  @Override
   @After
   public void tearDown() throws Exception {
     new File(INPUT_FILE).delete();
     Util.deleteFile(cluster, INPUT_FILE);
+  }
+
+  @AfterClass
+  public static void shutdown() {
     cluster.shutDown();
   }
 
   enum Mode {
     local, cluster
-  };
+  }
 
   @Test
   public void testMultiStorage() throws IOException {
     final String LOAD = "A = LOAD '" + INPUT_FILE + "' as (id, name, n);";
     final String MULTI_STORE_CLUSTER = "STORE A INTO 'mr-out' USING "
         + "org.apache.pig.piggybank.storage.MultiStorage('mr-out', '1');";
-    final String MULTI_STORE_LOCAL = "STORE A INTO 'dummy' USING "
+    final String MULTI_STORE_LOCAL = "STORE A INTO 'local-out' USING "
         + "org.apache.pig.piggybank.storage.MultiStorage('local-out', '1');";
 
     System.out.print("Testing in LOCAL mode: ...");
-    //testMultiStorage(Mode.local, "local-out", LOAD, MULTI_STORE_LOCAL);
+    testMultiStorage(Mode.local, "local-out", LOAD, MULTI_STORE_LOCAL);
     System.out.println("Succeeded!");
-    
+
     System.out.print("Testing in CLUSTER mode: ...");
     testMultiStorage( Mode.cluster, "mr-out", LOAD, MULTI_STORE_CLUSTER);
     System.out.println("Succeeded!");
-    
-    
   }
 
-  /**
-   * The actual method that run the test in local or cluster mode. 
-   * 
-   * @param pigServer
-   * @param mode
-   * @param queries
-   * @throws IOException
+  @Test
+  public void testOutputStats() throws IOException {
+    FileSystem fs = cluster.getFileSystem();
+
+    pigServer.setBatchOn();
+    pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (id, name, n);");
+    pigServer.registerQuery("B = FILTER A BY name == 'apple';");
+    pigServer.registerQuery("STORE A INTO 'out1' USING org.apache.pig.piggybank.storage.MultiStorage('out1', '1');"); //153 bytes
+    pigServer.registerQuery("STORE B INTO 'out2' USING org.apache.pig.piggybank.storage.MultiStorage('out2', '1');"); // 45 bytes
+
+    ExecJob job = pigServer.executeBatch().get(0);
+
+    PigStats stats = job.getStatistics();
+    PigStats.JobGraph jobGraph = stats.getJobGraph();
+    JobStats jobStats = (JobStats) jobGraph.getSinks().get(0);
+    Map<String, Long> multiStoreCounters = jobStats.getMultiStoreCounters();
+    List<OutputStats> outputStats = SimplePigStats.get().getOutputStats();
+    OutputStats outputStats1 = "out1".equals(outputStats.get(0).getName()) ? outputStats.get(0) : outputStats.get(1);
+    OutputStats outputStats2 = "out2".equals(outputStats.get(0).getName()) ? outputStats.get(0) : outputStats.get(1);
+
+    assertEquals(153 + 45, stats.getBytesWritten());
+    assertEquals(2, outputStats.size()); // 2 split conditions
+    assertEquals(153, outputStats1.getBytes());
+    assertEquals(45, outputStats2.getBytes());
+    assertEquals(9, outputStats1.getRecords());
+    assertEquals(3, outputStats2.getRecords());
+    assertEquals(3L, multiStoreCounters.get("Output records in _1_out2").longValue());
+    assertEquals(9L, multiStoreCounters.get("Output records in _0_out1").longValue());
+
+    fs.delete(new Path("out1"), true);
+    fs.delete(new Path("out2"), true);
+  }
+
+    /**
+   * The actual method that run the test in local or cluster mode.
    */
   private void testMultiStorage( Mode mode, String outPath,
       String... queries) throws IOException {
@@ -142,42 +173,38 @@
   /**
    * Test if records are split into directories corresponding to split field
    * values
-   * 
-   * @param mode
-   * @throws IOException
    */
   private void verifyResults(Mode mode, String outPath) throws IOException {
     FileSystem fs = (Mode.local == mode ? FileSystem
         .getLocal(new Configuration()) : cluster.getFileSystem());
     Path output = new Path(outPath);
-    Assert.assertTrue("Output dir does not exists!", fs.exists(output)
+    assertTrue("Output dir does not exists!", fs.exists(output)
         && fs.getFileStatus(output).isDir());
 
     Path[] paths = FileUtil.stat2Paths(fs.listStatus(output, hiddenPathFilter));
-    Assert.assertTrue("Split field dirs not found!", paths != null);
+    assertTrue("Split field dirs not found!", paths != null);
 
     for (Path path : paths) {
       String splitField = path.getName();
       Path[] files = FileUtil.stat2Paths(fs.listStatus(path, hiddenPathFilter));
-      Assert.assertTrue("No files found for path: " + path.toUri().getPath(),
+      assertTrue("No files found for path: " + path.toUri().getPath(),
           files != null);
       for (Path filePath : files) {
-        Assert.assertTrue("This shouldn't be a directory", fs.isFile(filePath));
-        
+        assertTrue("This shouldn't be a directory", fs.isFile(filePath));
         BufferedReader reader = new BufferedReader(new InputStreamReader(fs
                 .open(filePath)));
         String line = "";
         int count = 0;
         while ((line = reader.readLine()) != null) {
           String[] fields = line.split("\\t");
-          Assert.assertEquals(fields.length, 3);
-          Assert.assertEquals("Unexpected field value in the output record",
+          assertEquals(fields.length, 3);
+          assertEquals("Unexpected field value in the output record",
                 splitField, fields[1]);
           count++;
           System.out.println("field: " + fields[1]);
-        }        
+        }
         reader.close();
-        Assert.assertEquals(count, 3);
+        assertEquals(count, 3);
       }
     }
   }
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/FileBasedOutputSizeReader.java b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/FileBasedOutputSizeReader.java
index 2197437..2fe6751 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/FileBasedOutputSizeReader.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/FileBasedOutputSizeReader.java
@@ -75,16 +75,24 @@
             return -1;
         }
 
-        long bytes = 0;
         Path p = new Path(getLocationUri(sto));
-        FileSystem fs = p.getFileSystem(conf);
-        FileStatus[] lst = fs.listStatus(p);
+        return getPathSize(p, p.getFileSystem(conf));
+    }
+
+    private long getPathSize(Path storePath, FileSystem fs) throws IOException {
+        long bytes = 0;
+        FileStatus[] lst = fs.listStatus(storePath);
         if (lst != null) {
             for (FileStatus status : lst) {
-                bytes += status.getLen();
+                if (status.isFile()) {
+                    if (status.getLen() > 0)
+                        bytes += status.getLen();
+                }
+                else { // recursively count nested leaves' (files) sizes
+                    bytes += getPathSize(status.getPath(), fs);
+                }
             }
         }
-
         return bytes;
     }
 
diff --git a/test/org/apache/pig/test/TestMRJobStats.java b/test/org/apache/pig/test/TestMRJobStats.java
index 5be4cb9..5d5a281 100644
--- a/test/org/apache/pig/test/TestMRJobStats.java
+++ b/test/org/apache/pig/test/TestMRJobStats.java
@@ -102,7 +102,7 @@
         try {
             Constructor<MRJobStats> con = MRJobStats.class.getDeclaredConstructor(String.class, JobGraph.class);
             con.setAccessible(true);
-            MRJobStats jobStats = (MRJobStats) con.newInstance(name, plan);
+            MRJobStats jobStats = con.newInstance(name, plan);
             return jobStats;
         } catch (Exception e) {
             return null;
@@ -202,14 +202,49 @@
         }
     }
 
-    private static POStore createPOStoreForFileBasedSystem(long size, StoreFuncInterface storeFunc,
-            Configuration conf) throws Exception {
+    private POStore createPOStoreForFileBasedSystemWithSubDirectories(long size, StoreFuncInterface storeFunc, Configuration conf) throws Exception {
+        File root = createTmpDirectory("outputRoot", null);
+        File dir1 = createTmpDirectory("dir1", root);
+        File dir2 = createTmpDirectory("dir2", root);
+        createTmpFile("tempFile1", size, dir1);
+        createTmpFile("tempFile2", size, dir2);
 
-        File file = File.createTempFile("tempFile", ".tmp");
+        storeFunc.setStoreLocation(root.getAbsolutePath(), new Job(conf));
+        FuncSpec funcSpec = new FuncSpec(storeFunc.getClass().getCanonicalName());
+        POStore poStore = new POStore(new OperatorKey());
+        poStore.setSFile(new FileSpec(root.getAbsolutePath(), funcSpec));
+        poStore.setStoreFunc(storeFunc);
+        poStore.setUp();
+
+        return poStore;
+    }
+
+    private static File createTmpDirectory(String name, File root) throws Exception {
+        File directory = File.createTempFile(name, "", root);
+
+        if (!(directory.delete())) {
+            throw new IOException("Could not delete temp file: " + directory.getAbsolutePath());
+        }
+
+        if (!(directory.mkdir())) {
+            throw new IOException("Could not create temp directory: " + directory.getAbsolutePath());
+        }
+
+        return directory;
+    }
+
+    private static File createTmpFile(String name, long size, File directory) throws Exception {
+        File file = directory == null ? File.createTempFile(name, ".tmp") : File.createTempFile(name, ".tmp", directory);
         file.deleteOnExit();
         RandomAccessFile f = new RandomAccessFile(file, "rw");
         f.setLength(size);
         f.close();
+        return file;
+    }
+
+    private static POStore createPOStoreForFileBasedSystem(long size, StoreFuncInterface storeFunc,
+            Configuration conf) throws Exception {
+        File file = createTmpFile("tempFile", size, null);
 
         storeFunc.setStoreLocation(file.getAbsolutePath(), new Job(conf));
         FuncSpec funcSpec = new FuncSpec(storeFunc.getClass().getCanonicalName());
@@ -236,7 +271,7 @@
     }
 
     @Test
-    public void testGetOuputSizeUsingFileBasedStorage() throws Exception {
+    public void testGetOutputSizeUsingFileBasedStorage() throws Exception {
         // By default, FileBasedOutputSizeReader is used to compute the size of output.
         Configuration conf = new Configuration();
 
@@ -249,7 +284,20 @@
     }
 
     @Test
-    public void testGetOuputSizeUsingNonFileBasedStorage1() throws Exception {
+    public void testGetOutputSizeUsingFileBasedStorageWithSubDirectories() throws Exception {
+        // By default, FileBasedOutputSizeReader is used to compute the size of output.
+        Configuration conf = new Configuration();
+
+        long size = 2L * 1024 * 1024 * 1024;
+        long outputSize = JobStats.getOutputSize(
+                createPOStoreForFileBasedSystemWithSubDirectories(size, new PigStorageWithStatistics(), conf), conf);
+
+        assertEquals("The returned output size is expected to be sum of file sizes in the sub-directories",
+                2 * size, outputSize);
+    }
+
+    @Test
+    public void testGetOutputSizeUsingNonFileBasedStorage1() throws Exception {
         // By default, FileBasedOutputSizeReader is used to compute the size of output.
         Configuration conf = new Configuration();
 
@@ -263,7 +311,7 @@
     }
 
     @Test
-    public void testGetOuputSizeUsingNonFileBasedStorage2() throws Exception {
+    public void testGetOutputSizeUsingNonFileBasedStorage2() throws Exception {
         // Register a custom output size reader in configuration
         Configuration conf = new Configuration();
         conf.set(PigStatsOutputSizeReader.OUTPUT_SIZE_READER_KEY,
@@ -279,7 +327,7 @@
     }
 
     @Test(expected = RuntimeException.class)
-    public void testGetOuputSizeUsingNonFileBasedStorage3() throws Exception {
+    public void testGetOutputSizeUsingNonFileBasedStorage3() throws Exception {
         // Register an invalid output size reader in configuration, and verify
         // that an exception is thrown at run-time.
         Configuration conf = new Configuration();
@@ -292,7 +340,7 @@
     }
 
     @Test
-    public void testGetOuputSizeUsingNonFileBasedStorage4() throws Exception {
+    public void testGetOutputSizeUsingNonFileBasedStorage4() throws Exception {
         // Register a comma-separated list of readers in configuration, and
         // verify that the one that supports a non-file-based uri is used.
         Configuration conf = new Configuration();
@@ -310,7 +358,7 @@
     }
 
     @Test
-    public void testGetOuputSizeUsingNonFileBasedStorage5() throws Exception {
+    public void testGetOutputSizeUsingNonFileBasedStorage5() throws Exception {
         Configuration conf = new Configuration();
 
         long size = 2L * 1024 * 1024 * 1024;