PIG-5263: Using wildcard doesn't work with OrcStorage (satishsaley via rohini)

git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1800339 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index f56a414..b272a1a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -32,6 +32,8 @@
  
 BUG FIXES
 
+PIG-5263: Using wildcard doesn't work with OrcStorage (satishsaley via rohini)
+
 PIG-4548: Records Lost With Specific Combination of Commands and Streaming Function (knoguchi)
 
 PIG-5262: Fix jdiff related issues: fail build upon error, correct xml character escaping (szita)
diff --git a/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java b/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java
index 0d3f8d9..dd881ae 100644
--- a/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java
+++ b/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java
@@ -61,6 +61,7 @@
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.impl.util.Utils;
 import org.json.simple.JSONObject;
 import org.json.simple.parser.JSONParser;
 import org.json.simple.parser.ParseException;
@@ -190,7 +191,7 @@
 
         if (inputAvroSchema == null || UDFContext.getUDFContext().isFrontend()) {
             Configuration conf = job.getConfiguration();
-            Set<Path> paths = AvroStorageUtils.getPaths(location, conf, true);
+            Set<Path> paths = getGlobPaths(location, conf, true);
             if (!paths.isEmpty()) {
                 // Set top level directories in input format. Adding all files will
                 // bloat configuration size
@@ -273,7 +274,7 @@
      */
     @SuppressWarnings("deprecation")
     protected Schema getAvroSchema(Path path, FileSystem fs) throws IOException {
-        if (!fs.exists(path) || !AvroStorageUtils.PATH_FILTER.accept(path))
+        if (!fs.exists(path) || !Utils.VISIBLE_FILES.accept(path))
             return null;
 
         /* if path is first level directory or is a file */
@@ -281,7 +282,7 @@
             return getSchema(path, fs);
         }
 
-        FileStatus[] ss = fs.listStatus(path, AvroStorageUtils.PATH_FILTER);
+        FileStatus[] ss = fs.listStatus(path, Utils.VISIBLE_FILES);
         Schema schema = null;
         if (ss.length > 0) {
             if (AvroStorageUtils.noDir(ss))
@@ -433,7 +434,7 @@
             // getSchema is called during script parsing we don't want to fail
             // here if path not found
 
-            Set<Path> paths = AvroStorageUtils.getPaths(location, conf, false);
+            Set<Path> paths = getGlobPaths(location, conf, false);
             if (!paths.isEmpty()) {
                 setInputAvroSchema(paths, conf);
             }
diff --git a/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageUtils.java b/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageUtils.java
index 887cb84..aa45a8c 100644
--- a/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageUtils.java
+++ b/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageUtils.java
@@ -41,6 +41,7 @@
 import org.apache.pig.ResourceSchema;
 import org.apache.pig.ResourceSchema.ResourceFieldSchema;
 import org.apache.pig.data.DataType;
+import org.apache.pig.impl.util.Utils;
 import org.codehaus.jackson.JsonNode;
 /**
  * This is utility class for this package
@@ -59,15 +60,6 @@
     private static final String NONAME = "NONAME";
     private static final String PIG_TUPLE_WRAPPER = "PIG_WRAPPER";
 
-    /** ignore hdfs files with prefix "_" and "." */
-    public static PathFilter PATH_FILTER = new PathFilter() {
-        @Override
-        public boolean accept(Path path) {
-            return !path.getName().startsWith("_")
-                        && !path.getName().startsWith(".");
-        }
-    };
-
     static String getDummyFieldName(int index) {
         return NONAME + "_" + index;
     }
@@ -93,33 +85,6 @@
     }
 
     /**
-     * Gets the list of paths from the pathString specified which may contain
-     * comma-separated paths and glob style path
-     *
-     * @throws IOException
-     */
-    public static Set<Path> getPaths(String pathString, Configuration conf, boolean failIfNotFound)
-            throws IOException {
-        Set<Path> paths = new HashSet<Path>();
-        String[] pathStrs = LoadFunc.getPathStrings(pathString);
-        for (String pathStr : pathStrs) {
-            FileSystem fs = FileSystem.get(new Path(pathStr).toUri(), conf);
-            FileStatus[] matchedFiles = fs.globStatus(new Path(pathStr), PATH_FILTER);
-            if (matchedFiles == null || matchedFiles.length == 0) {
-                if (failIfNotFound) {
-                    throw new IOException("Input Pattern " + pathStr + " matches 0 files");
-                } else {
-                    continue;
-                }
-            }
-            for (FileStatus file : matchedFiles) {
-                paths.add(file.getPath());
-            }
-        }
-        return paths;
-    }
-
-    /**
      * Returns all non-hidden files recursively inside the base paths given
      *
      * @throws IOException
@@ -140,7 +105,7 @@
 
     private static void getAllFilesInternal(FileStatus file, Configuration conf,
             Set<Path> paths, FileSystem fs) throws IOException {
-        for (FileStatus f : fs.listStatus(file.getPath(), PATH_FILTER)) {
+        for (FileStatus f : fs.listStatus(file.getPath(), Utils.VISIBLE_FILES)) {
             if (f.isDir()) {
                 getAllFilesInternal(f, conf, paths, fs);
             } else {
@@ -167,7 +132,7 @@
         if (!status.isDir()) {
             return path;
         }
-        FileStatus[] statuses = fs.listStatus(path, PATH_FILTER);
+        FileStatus[] statuses = fs.listStatus(path, Utils.VISIBLE_FILES);
 
         if (statuses.length == 0) {
             return null;
diff --git a/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorageUtils.java b/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorageUtils.java
index b58fc16..7d7d2b1 100644
--- a/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorageUtils.java
+++ b/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorageUtils.java
@@ -89,54 +89,6 @@
         assertNull(realSchema);
     }
 
-    @Test
-    public void testGetPaths() throws IOException {
-        final String basedir = "file://" + System.getProperty("user.dir");
-        final String tempdir = Long.toString(System.currentTimeMillis());
-        final String nonexistentpath = basedir + "/" + tempdir + "/this_path_does_not_exist";
-
-        String locationStr = null;
-        Set<Path> paths;
-        Configuration conf = new Configuration();
-
-        // existent path
-        locationStr = basedir;
-        paths = AvroStorageUtils.getPaths(locationStr, conf, true);
-        assertFalse(paths.isEmpty());
-
-        // non-existent path
-        locationStr = nonexistentpath;
-        try {
-            paths = AvroStorageUtils.getPaths(locationStr, conf, true);
-            fail();
-        } catch (IOException e) {
-            assertTrue(e.getMessage().contains("matches 0 files"));
-        }
-
-        // empty glob pattern
-        locationStr = basedir + "/{}";
-        try {
-            paths = AvroStorageUtils.getPaths(locationStr, conf, true);
-            fail();
-        } catch (IOException e) {
-            assertTrue(e.getMessage().contains("matches 0 files"));
-        }
-
-        paths = AvroStorageUtils.getPaths(locationStr, conf, false);
-        assertTrue(paths.isEmpty());
-
-        // bad glob pattern
-        locationStr = basedir + "/{1,";
-        try {
-            AvroStorageUtils.getPaths(locationStr, conf, true);
-            Assert.fail("Negative test to test illegal file pattern. Should not be succeeding!");
-        } catch (IOException e) {
-            // The message of the exception for illegal file pattern is rather long,
-            // so we simply confirm if it contains 'illegal file pattern'.
-            assertTrue(e.getMessage().contains("Illegal file pattern"));
-        }
-    }
-
     // test merging null and non-null
     @Test
     public void testMergeSchema1() throws IOException {
diff --git a/src/org/apache/pig/LoadFunc.java b/src/org/apache/pig/LoadFunc.java
index c262bad..83e89a3 100644
--- a/src/org/apache/pig/LoadFunc.java
+++ b/src/org/apache/pig/LoadFunc.java
@@ -21,9 +21,14 @@
 import java.net.URI;
 import java.util.AbstractCollection;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Set;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.InputFormat;
@@ -38,6 +43,7 @@
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.impl.util.Utils;
 import org.apache.pig.tools.pigstats.PigStatusReporter;
 
 
@@ -210,7 +216,38 @@
 
         return pathStrings.toArray(new String[0]);
     }
-    
+
+    /**
+     * Return all the file paths in commaSeparatedPaths matching patterns if any
+     *
+     * @param commaSeparatedPaths
+     * @param conf
+     * @param failIfNotFound
+     * @return a set of paths
+     * @throws IOException
+     */
+    public static Set<Path> getGlobPaths(String commaSeparatedPaths, Configuration conf, boolean failIfNotFound)
+            throws IOException {
+        Set<Path> paths = new HashSet<Path>();
+        String[] pathStrs = LoadFunc.getPathStrings(commaSeparatedPaths);
+        for (String pathStr : pathStrs) {
+            FileSystem fs = FileSystem.get(new Path(pathStr).toUri(), conf);
+            FileStatus[] matchedFiles = fs.globStatus(new Path(pathStr), Utils.VISIBLE_FILES);
+            if (matchedFiles == null || matchedFiles.length == 0) {
+                if (failIfNotFound) {
+                    throw new IOException("Input Pattern " + pathStr + " matches 0 files");
+                }
+                else {
+                    continue;
+                }
+            }
+            for (FileStatus file : matchedFiles) {
+                paths.add(file.getPath());
+            }
+        }
+        return paths;
+    }
+
     /**
      * Construct the absolute path from the file location and the current
      * directory. The current directory is either of the form 
@@ -324,4 +361,5 @@
     public List<String> getShipFiles() {
         return null;
     }
+
 }
diff --git a/src/org/apache/pig/builtin/OrcStorage.java b/src/org/apache/pig/builtin/OrcStorage.java
index e449c71..5f89706 100644
--- a/src/org/apache/pig/builtin/OrcStorage.java
+++ b/src/org/apache/pig/builtin/OrcStorage.java
@@ -25,6 +25,7 @@
 import java.util.Arrays;
 import java.util.List;
 import java.util.Properties;
+import java.util.Set;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
@@ -315,9 +316,14 @@
             if (p.getProperty(signature + SearchArgsSuffix) != null) {
                 job.getConfiguration().set(SARG_PUSHDOWN, p.getProperty(signature + SearchArgsSuffix));
             }
-
         }
-        FileInputFormat.setInputPaths(job, location);
+        Set<Path> paths = getGlobPaths(location, job.getConfiguration(), true);
+        if (!paths.isEmpty()) {
+            FileInputFormat.setInputPaths(job, paths.toArray(new Path[paths.size()]));
+        }
+        else {
+            throw new IOException("Input path \'" + location + "\' is not found");
+        }
     }
 
     private String getReqiredColumnIdString(boolean[] requiredColumns) {
diff --git a/test/org/apache/pig/builtin/TestOrcStorage.java b/test/org/apache/pig/builtin/TestOrcStorage.java
index b92561b..e881481 100644
--- a/test/org/apache/pig/builtin/TestOrcStorage.java
+++ b/test/org/apache/pig/builtin/TestOrcStorage.java
@@ -125,7 +125,7 @@
 
     @Test
     public void testSimpleLoad() throws Exception {
-        pigServer.registerQuery("A = load '" + basedir + "orc-file-11-format.orc'" + " using OrcStorage();" );
+        pigServer.registerQuery("A = load '" + basedir + "*-file-11-format.orc'" + " using OrcStorage();");
         Schema s = pigServer.dumpSchema("A");
         assertEquals(s.toString(), "{boolean1: boolean,byte1: int,short1: int,int1: int,long1: long," +
                 "float1: float,double1: double,bytes1: bytearray,string1: chararray," +
diff --git a/test/org/apache/pig/test/TestLoadFunc.java b/test/org/apache/pig/test/TestLoadFunc.java
index 01f3db7..a78b82a 100644
--- a/test/org/apache/pig/test/TestLoadFunc.java
+++ b/test/org/apache/pig/test/TestLoadFunc.java
@@ -19,10 +19,15 @@
 package org.apache.pig.test;
 
 
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 import java.io.IOException;
+import java.util.Set;
 
 import org.junit.Assert;
-
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.pig.LoadFunc;
 import org.apache.pig.impl.logicalLayer.FrontendException;
@@ -150,6 +155,57 @@
         Assert.assertEquals("har:///user/pig/harfile",
                 LoadFunc.getAbsolutePath("har:///user/pig/harfile",
                         curHdfsDir));
-    }   
-    
+    }
+
+    @Test
+    public void testGlobPaths() throws IOException {
+        final String basedir = "file://" + System.getProperty("user.dir");
+        final String tempdir = Long.toString(System.currentTimeMillis());
+        final String nonexistentpath = basedir + "/" + tempdir + "/this_path_does_not_exist";
+
+        String locationStr = null;
+        Set<Path> paths;
+        Configuration conf = new Configuration();
+
+        // existent path
+        locationStr = basedir;
+        paths = LoadFunc.getGlobPaths(locationStr, conf, true);
+        assertFalse(paths.isEmpty());
+
+        // non-existent path
+        locationStr = nonexistentpath;
+        try {
+            paths = LoadFunc.getGlobPaths(locationStr, conf, true);
+            fail("Paths with pattern are not readable");
+        }
+        catch (IOException e) {
+            assertTrue(e.getMessage().contains("matches 0 files"));
+        }
+
+        // empty glob pattern
+        locationStr = basedir + "/{}";
+        try {
+            paths = LoadFunc.getGlobPaths(locationStr, conf, true);
+            fail();
+        }
+        catch (IOException e) {
+            assertTrue(e.getMessage().contains("matches 0 files"));
+        }
+
+        paths = LoadFunc.getGlobPaths(locationStr, conf, false);
+        assertTrue(paths.isEmpty());
+
+        // bad glob pattern
+        locationStr = basedir + "/{1,";
+        try {
+            LoadFunc.getGlobPaths(locationStr, conf, true);
+            Assert.fail("Negative test to test illegal file pattern. Should not be succeeding!");
+        }
+        catch (IOException e) {
+            // The message of the exception for illegal file pattern is rather
+            // long, so we simply confirm if it contains 'illegal file pattern'.
+            assertTrue(e.getMessage().contains("Illegal file pattern"));
+        }
+    }
+
 }