PIG-5263: Using wildcard doesn't work with OrcStorage (satishsaley via rohini)
git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1800339 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index f56a414..b272a1a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -32,6 +32,8 @@
BUG FIXES
+PIG-5263: Using wildcard doesn't work with OrcStorage (satishsaley via rohini)
+
PIG-4548: Records Lost With Specific Combination of Commands and Streaming Function (knoguchi)
PIG-5262: Fix jdiff related issues: fail build upon error, correct xml character escaping (szita)
diff --git a/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java b/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java
index 0d3f8d9..dd881ae 100644
--- a/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java
+++ b/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java
@@ -61,6 +61,7 @@
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.impl.util.Utils;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;
@@ -190,7 +191,7 @@
if (inputAvroSchema == null || UDFContext.getUDFContext().isFrontend()) {
Configuration conf = job.getConfiguration();
- Set<Path> paths = AvroStorageUtils.getPaths(location, conf, true);
+ Set<Path> paths = getGlobPaths(location, conf, true);
if (!paths.isEmpty()) {
// Set top level directories in input format. Adding all files will
// bloat configuration size
@@ -273,7 +274,7 @@
*/
@SuppressWarnings("deprecation")
protected Schema getAvroSchema(Path path, FileSystem fs) throws IOException {
- if (!fs.exists(path) || !AvroStorageUtils.PATH_FILTER.accept(path))
+ if (!fs.exists(path) || !Utils.VISIBLE_FILES.accept(path))
return null;
/* if path is first level directory or is a file */
@@ -281,7 +282,7 @@
return getSchema(path, fs);
}
- FileStatus[] ss = fs.listStatus(path, AvroStorageUtils.PATH_FILTER);
+ FileStatus[] ss = fs.listStatus(path, Utils.VISIBLE_FILES);
Schema schema = null;
if (ss.length > 0) {
if (AvroStorageUtils.noDir(ss))
@@ -433,7 +434,7 @@
// getSchema is called during script parsing we don't want to fail
// here if path not found
- Set<Path> paths = AvroStorageUtils.getPaths(location, conf, false);
+ Set<Path> paths = getGlobPaths(location, conf, false);
if (!paths.isEmpty()) {
setInputAvroSchema(paths, conf);
}
diff --git a/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageUtils.java b/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageUtils.java
index 887cb84..aa45a8c 100644
--- a/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageUtils.java
+++ b/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageUtils.java
@@ -41,6 +41,7 @@
import org.apache.pig.ResourceSchema;
import org.apache.pig.ResourceSchema.ResourceFieldSchema;
import org.apache.pig.data.DataType;
+import org.apache.pig.impl.util.Utils;
import org.codehaus.jackson.JsonNode;
/**
* This is utility class for this package
@@ -59,15 +60,6 @@
private static final String NONAME = "NONAME";
private static final String PIG_TUPLE_WRAPPER = "PIG_WRAPPER";
- /** ignore hdfs files with prefix "_" and "." */
- public static PathFilter PATH_FILTER = new PathFilter() {
- @Override
- public boolean accept(Path path) {
- return !path.getName().startsWith("_")
- && !path.getName().startsWith(".");
- }
- };
-
static String getDummyFieldName(int index) {
return NONAME + "_" + index;
}
@@ -93,33 +85,6 @@
}
/**
- * Gets the list of paths from the pathString specified which may contain
- * comma-separated paths and glob style path
- *
- * @throws IOException
- */
- public static Set<Path> getPaths(String pathString, Configuration conf, boolean failIfNotFound)
- throws IOException {
- Set<Path> paths = new HashSet<Path>();
- String[] pathStrs = LoadFunc.getPathStrings(pathString);
- for (String pathStr : pathStrs) {
- FileSystem fs = FileSystem.get(new Path(pathStr).toUri(), conf);
- FileStatus[] matchedFiles = fs.globStatus(new Path(pathStr), PATH_FILTER);
- if (matchedFiles == null || matchedFiles.length == 0) {
- if (failIfNotFound) {
- throw new IOException("Input Pattern " + pathStr + " matches 0 files");
- } else {
- continue;
- }
- }
- for (FileStatus file : matchedFiles) {
- paths.add(file.getPath());
- }
- }
- return paths;
- }
-
- /**
* Returns all non-hidden files recursively inside the base paths given
*
* @throws IOException
@@ -140,7 +105,7 @@
private static void getAllFilesInternal(FileStatus file, Configuration conf,
Set<Path> paths, FileSystem fs) throws IOException {
- for (FileStatus f : fs.listStatus(file.getPath(), PATH_FILTER)) {
+ for (FileStatus f : fs.listStatus(file.getPath(), Utils.VISIBLE_FILES)) {
if (f.isDir()) {
getAllFilesInternal(f, conf, paths, fs);
} else {
@@ -167,7 +132,7 @@
if (!status.isDir()) {
return path;
}
- FileStatus[] statuses = fs.listStatus(path, PATH_FILTER);
+ FileStatus[] statuses = fs.listStatus(path, Utils.VISIBLE_FILES);
if (statuses.length == 0) {
return null;
diff --git a/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorageUtils.java b/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorageUtils.java
index b58fc16..7d7d2b1 100644
--- a/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorageUtils.java
+++ b/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorageUtils.java
@@ -89,54 +89,6 @@
assertNull(realSchema);
}
- @Test
- public void testGetPaths() throws IOException {
- final String basedir = "file://" + System.getProperty("user.dir");
- final String tempdir = Long.toString(System.currentTimeMillis());
- final String nonexistentpath = basedir + "/" + tempdir + "/this_path_does_not_exist";
-
- String locationStr = null;
- Set<Path> paths;
- Configuration conf = new Configuration();
-
- // existent path
- locationStr = basedir;
- paths = AvroStorageUtils.getPaths(locationStr, conf, true);
- assertFalse(paths.isEmpty());
-
- // non-existent path
- locationStr = nonexistentpath;
- try {
- paths = AvroStorageUtils.getPaths(locationStr, conf, true);
- fail();
- } catch (IOException e) {
- assertTrue(e.getMessage().contains("matches 0 files"));
- }
-
- // empty glob pattern
- locationStr = basedir + "/{}";
- try {
- paths = AvroStorageUtils.getPaths(locationStr, conf, true);
- fail();
- } catch (IOException e) {
- assertTrue(e.getMessage().contains("matches 0 files"));
- }
-
- paths = AvroStorageUtils.getPaths(locationStr, conf, false);
- assertTrue(paths.isEmpty());
-
- // bad glob pattern
- locationStr = basedir + "/{1,";
- try {
- AvroStorageUtils.getPaths(locationStr, conf, true);
- Assert.fail("Negative test to test illegal file pattern. Should not be succeeding!");
- } catch (IOException e) {
- // The message of the exception for illegal file pattern is rather long,
- // so we simply confirm if it contains 'illegal file pattern'.
- assertTrue(e.getMessage().contains("Illegal file pattern"));
- }
- }
-
// test merging null and non-null
@Test
public void testMergeSchema1() throws IOException {
diff --git a/src/org/apache/pig/LoadFunc.java b/src/org/apache/pig/LoadFunc.java
index c262bad..83e89a3 100644
--- a/src/org/apache/pig/LoadFunc.java
+++ b/src/org/apache/pig/LoadFunc.java
@@ -21,9 +21,14 @@
import java.net.URI;
import java.util.AbstractCollection;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
+import java.util.Set;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.InputFormat;
@@ -38,6 +43,7 @@
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.impl.util.Utils;
import org.apache.pig.tools.pigstats.PigStatusReporter;
@@ -210,7 +216,38 @@
return pathStrings.toArray(new String[0]);
}
-
+
+ /**
+ * Expand a comma-separated list of locations, each optionally a glob pattern, into the set of matching paths.
+ *
+ * @param commaSeparatedPaths locations, split with LoadFunc.getPathStrings; each one is globbed separately
+ * @param conf configuration used to obtain the FileSystem for each location
+ * @param failIfNotFound if true, a location with no match raises IOException; if false, it is skipped
+ * @return paths accepted by Utils.VISIBLE_FILES over all locations; empty if nothing matched and failIfNotFound is false
+ * @throws IOException if failIfNotFound is true and a location matches 0 files, or the pattern/file system lookup fails
+ */
+ public static Set<Path> getGlobPaths(String commaSeparatedPaths, Configuration conf, boolean failIfNotFound)
+ throws IOException {
+ Set<Path> paths = new HashSet<Path>();
+ String[] pathStrs = LoadFunc.getPathStrings(commaSeparatedPaths);
+ for (String pathStr : pathStrs) {
+ FileSystem fs = FileSystem.get(new Path(pathStr).toUri(), conf);
+ FileStatus[] matchedFiles = fs.globStatus(new Path(pathStr), Utils.VISIBLE_FILES);
+ if (matchedFiles == null || matchedFiles.length == 0) {
+ if (failIfNotFound) {
+ throw new IOException("Input Pattern " + pathStr + " matches 0 files");
+ }
+ else {
+ continue;
+ }
+ }
+ for (FileStatus file : matchedFiles) {
+ paths.add(file.getPath());
+ }
+ }
+ return paths;
+ }
+
/**
* Construct the absolute path from the file location and the current
* directory. The current directory is either of the form
@@ -324,4 +361,5 @@
public List<String> getShipFiles() {
return null;
}
+
}
diff --git a/src/org/apache/pig/builtin/OrcStorage.java b/src/org/apache/pig/builtin/OrcStorage.java
index e449c71..5f89706 100644
--- a/src/org/apache/pig/builtin/OrcStorage.java
+++ b/src/org/apache/pig/builtin/OrcStorage.java
@@ -25,6 +25,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
+import java.util.Set;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
@@ -315,9 +316,14 @@
if (p.getProperty(signature + SearchArgsSuffix) != null) {
job.getConfiguration().set(SARG_PUSHDOWN, p.getProperty(signature + SearchArgsSuffix));
}
-
}
- FileInputFormat.setInputPaths(job, location);
+ Set<Path> paths = getGlobPaths(location, job.getConfiguration(), true);
+ if (!paths.isEmpty()) {
+ FileInputFormat.setInputPaths(job, paths.toArray(new Path[paths.size()]));
+ }
+ else {
+ throw new IOException("Input path \'" + location + "\' is not found");
+ }
}
private String getReqiredColumnIdString(boolean[] requiredColumns) {
diff --git a/test/org/apache/pig/builtin/TestOrcStorage.java b/test/org/apache/pig/builtin/TestOrcStorage.java
index b92561b..e881481 100644
--- a/test/org/apache/pig/builtin/TestOrcStorage.java
+++ b/test/org/apache/pig/builtin/TestOrcStorage.java
@@ -125,7 +125,7 @@
@Test
public void testSimpleLoad() throws Exception {
- pigServer.registerQuery("A = load '" + basedir + "orc-file-11-format.orc'" + " using OrcStorage();" );
+ pigServer.registerQuery("A = load '" + basedir + "*-file-11-format.orc'" + " using OrcStorage();");
Schema s = pigServer.dumpSchema("A");
assertEquals(s.toString(), "{boolean1: boolean,byte1: int,short1: int,int1: int,long1: long," +
"float1: float,double1: double,bytes1: bytearray,string1: chararray," +
diff --git a/test/org/apache/pig/test/TestLoadFunc.java b/test/org/apache/pig/test/TestLoadFunc.java
index 01f3db7..a78b82a 100644
--- a/test/org/apache/pig/test/TestLoadFunc.java
+++ b/test/org/apache/pig/test/TestLoadFunc.java
@@ -19,10 +19,15 @@
package org.apache.pig.test;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
import java.io.IOException;
+import java.util.Set;
import org.junit.Assert;
-
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.pig.LoadFunc;
import org.apache.pig.impl.logicalLayer.FrontendException;
@@ -150,6 +155,57 @@
Assert.assertEquals("har:///user/pig/harfile",
LoadFunc.getAbsolutePath("har:///user/pig/harfile",
curHdfsDir));
- }
-
+ }
+
+ @Test
+ public void testGlobPaths() throws IOException {
+ final String basedir = "file://" + System.getProperty("user.dir");
+ final String tempdir = Long.toString(System.currentTimeMillis());
+ final String nonexistentpath = basedir + "/" + tempdir + "/this_path_does_not_exist";
+
+ String locationStr = null;
+ Set<Path> paths;
+ Configuration conf = new Configuration();
+
+ // existent path: must resolve to at least one path
+ locationStr = basedir;
+ paths = LoadFunc.getGlobPaths(locationStr, conf, true);
+ assertFalse(paths.isEmpty());
+
+ // non-existent path: must be rejected when failIfNotFound is true
+ locationStr = nonexistentpath;
+ try {
+ LoadFunc.getGlobPaths(locationStr, conf, true);
+ fail("Expected IOException: non-existent path should match 0 files");
+ }
+ catch (IOException e) {
+ assertTrue(e.getMessage().contains("matches 0 files"));
+ }
+
+ // empty glob pattern: rejected when failIfNotFound is true, empty result when it is false
+ locationStr = basedir + "/{}";
+ try {
+ LoadFunc.getGlobPaths(locationStr, conf, true);
+ fail("Expected IOException: empty glob pattern should match 0 files");
+ }
+ catch (IOException e) {
+ assertTrue(e.getMessage().contains("matches 0 files"));
+ }
+
+ paths = LoadFunc.getGlobPaths(locationStr, conf, false);
+ assertTrue(paths.isEmpty());
+
+ // bad glob pattern: the unterminated brace group must surface as an IOException
+ locationStr = basedir + "/{1,";
+ try {
+ LoadFunc.getGlobPaths(locationStr, conf, true);
+ fail("Negative test to test illegal file pattern. Should not be succeeding!");
+ }
+ catch (IOException e) {
+ // The message of the exception for illegal file pattern is rather
+ // long, so we simply confirm that it contains 'Illegal file pattern'.
+ assertTrue(e.getMessage().contains("Illegal file pattern"));
+ }
+ }
+
}