Merge branch 'master' of github.com:Parquet/parquet-mr into pages_refactoring
diff --git a/README.md b/README.md
index e8081f2..359cfc7 100644
--- a/README.md
+++ b/README.md
@@ -28,7 +28,7 @@
<tr><td>Bit Packing</td><td>YES</td><td></td></td><td></td><td>1.0</td></tr>
<tr><td>Adaptive dictionary encoding</td><td>YES</td><td></td></td><td></td><td>1.0</td></tr>
<tr><td>Complex structure support</td><td>YES</td><td></td></td><td></td><td>1.0</td></tr>
- <tr><td>Predicate pushdown</td><td></td><td>YES (<a href ="https://github.com/Parquet/parquet-mr/pull/68">68</a>)</td></td><td></td><td>2.0</td></tr>
+ <tr><td>Predicate pushdown</td><td>YES (<a href ="https://github.com/Parquet/parquet-mr/pull/68">68</a>)</td><td></td></td><td></td><td>1.0</td></tr>
<tr><td>Column stats</td><td></td><td></td></td><td>YES</td><td>2.0</td></tr> <tr><td>Delta encoding</td><td></td><td></td></td><td>YES</td><td>2.0</td></tr>
<tr><td>Native Protocol Buffers support</td><td></td><td></td></td><td>YES</td><td>2.0</td></tr>
<tr><td>Index pages</td><td></td><td></td></td><td>YES</td><td>2.0</td></tr>
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputFormat.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputFormat.java
index b6cb762..9e55185 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputFormat.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputFormat.java
@@ -29,6 +29,7 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
@@ -201,17 +202,17 @@
}
}
splits.add(new ParquetInputSplit(
- fileStatus.getPath(),
- hdfsBlock.getOffset(),
- length,
- hdfsBlock.getHosts(),
- blocksForCurrentSplit,
- fileMetaData.getSchema().toString(),
- requestedSchema,
- fileMetaData.getSchema().toString(),
- fileMetaData.getKeyValueMetaData(),
- readSupportMetadata
- ));
+ fileStatus.getPath(),
+ hdfsBlock.getOffset(),
+ length,
+ hdfsBlock.getHosts(),
+ blocksForCurrentSplit,
+ fileMetaData.getSchema().toString(),
+ requestedSchema,
+ fileMetaData.getSchema().toString(),
+ fileMetaData.getKeyValueMetaData(),
+ readSupportMetadata
+ ));
}
}
return splits;
@@ -256,6 +257,53 @@
return splits;
}
+ /*
+ * This is to support multi-level/recursive directory listing until
+ * MAPREDUCE-1577 is fixed.
+ */
+ @Override
+ protected List<FileStatus> listStatus(JobContext jobContext) throws IOException {
+ return getAllFileRecursively(super.listStatus(jobContext),
+ ContextUtil.getConfiguration(jobContext));
+ }
+
+ private static List<FileStatus> getAllFileRecursively(
+ List<FileStatus> files, Configuration conf) throws IOException {
+ List<FileStatus> result = new ArrayList<FileStatus>();
+ int len = files.size();
+ for (int i = 0; i < len; ++i) {
+ FileStatus file = files.get(i);
+ if (file.isDir()) {
+ Path p = file.getPath();
+ FileSystem fs = p.getFileSystem(conf);
+ addInputPathRecursively(result, fs, p, hiddenFileFilter);
+ } else {
+ result.add(file);
+ }
+ }
+ LOG.info("Total input paths to process : " + result.size());
+ return result;
+ }
+
+ private static void addInputPathRecursively(List<FileStatus> result,
+ FileSystem fs, Path path, PathFilter inputFilter)
+ throws IOException {
+ for (FileStatus stat: fs.listStatus(path, inputFilter)) {
+ if (stat.isDir()) {
+ addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
+ } else {
+ result.add(stat);
+ }
+ }
+ }
+
+ private static final PathFilter hiddenFileFilter = new PathFilter(){
+ public boolean accept(Path p){
+ String name = p.getName();
+ return !name.startsWith("_") && !name.startsWith(".");
+ }
+ };
+
/**
* @param jobContext the current job context
* @return the footers for the files
@@ -263,15 +311,15 @@
*/
public List<Footer> getFooters(JobContext jobContext) throws IOException {
if (footers == null) {
- footers = getFooters(ContextUtil.getConfiguration(jobContext), super.listStatus(jobContext));
+ footers = getFooters(ContextUtil.getConfiguration(jobContext), listStatus(jobContext));
}
return footers;
}
public List<Footer> getFooters(Configuration configuration, List<FileStatus> statuses) throws IOException {
- LOG.debug("reading " + statuses.size() + " files");
- return ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(configuration, statuses);
+ LOG.debug("reading " + statuses.size() + " files");
+ return ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(configuration, statuses);
}
/**
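
For context, the overridden listStatus() above lets a job point ParquetInputFormat at the root of a partitioned directory tree: nested part files are discovered by addInputPathRecursively, and names starting with "_" or "." are skipped by hiddenFileFilter. A minimal driver sketch, assuming an illustrative /data/events layout and example class name (read-support configuration is omitted):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    import parquet.hadoop.ParquetInputFormat;

    public class NestedInputExample {
      public static void main(String[] args) throws Exception {
        // Assumed layout (illustrative paths only):
        //   /data/events/_SUCCESS                  -> skipped by hiddenFileFilter ("_" prefix)
        //   /data/events/day=1/part-00000.parquet  -> found by addInputPathRecursively
        //   /data/events/day=2/part-00000.parquet  -> found by addInputPathRecursively
        Job job = new Job(new Configuration());
        job.setInputFormatClass(ParquetInputFormat.class);
        // Only the top-level directory is registered; the overridden listStatus()
        // walks into day=1 and day=2 so each nested part file becomes an input.
        FileInputFormat.addInputPath(job, new Path("/data/events"));
      }
    }
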
diff --git a/parquet-pig/src/main/java/parquet/pig/ParquetLoader.java b/parquet-pig/src/main/java/parquet/pig/ParquetLoader.java
index a193472..7fb1ab5 100644
--- a/parquet-pig/src/main/java/parquet/pig/ParquetLoader.java
+++ b/parquet-pig/src/main/java/parquet/pig/ParquetLoader.java
@@ -18,6 +18,8 @@
import static parquet.pig.TupleReadSupport.PARQUET_PIG_REQUESTED_SCHEMA;
import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
@@ -26,11 +28,15 @@
import org.apache.pig.Expression;
import org.apache.pig.LoadFunc;
import org.apache.pig.LoadMetadata;
+import org.apache.pig.LoadPushDown;
import org.apache.pig.ResourceSchema;
import org.apache.pig.ResourceStatistics;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
+import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.Utils;
import org.apache.pig.parser.ParserException;
@@ -48,10 +54,11 @@
* @author Julien Le Dem
*
*/
-public class ParquetLoader extends LoadFunc implements LoadMetadata {
+public class ParquetLoader extends LoadFunc implements LoadMetadata, LoadPushDown {
private static final Log LOG = Log.getLog(ParquetLoader.class);
- private final String requestedSchema;
+ private final String requestedSchemaStr;
+ private Schema requestedSchema;
private boolean setLocationHasBeenCalled = false;
private RecordReader<Void, Tuple> reader;
@@ -62,15 +69,15 @@
* To read the content in its original schema
*/
public ParquetLoader() {
- this.requestedSchema = null;
+ this(null);
}
/**
* To read only a subset of the columns in the file
- * @param requestedSchema a subset of the original pig schema in the file
+ * @param requestedSchemaStr a subset of the original pig schema in the file
*/
- public ParquetLoader(String requestedSchema) {
- this.requestedSchema = requestedSchema;
+ public ParquetLoader(String requestedSchemaStr) {
+ this.requestedSchemaStr = requestedSchemaStr;
}
@Override
@@ -78,7 +85,18 @@
LOG.debug("LoadFunc.setLocation(" + location + ", " + job + ")");
setInput(location, job);
if (requestedSchema != null) {
- ContextUtil.getConfiguration(job).set(PARQUET_PIG_REQUESTED_SCHEMA, requestedSchema);
+ ContextUtil.getConfiguration(job).set(PARQUET_PIG_REQUESTED_SCHEMA, ObjectSerializer.serialize(requestedSchema));
+    } else if (requestedSchemaStr != null) {
+      // no projection pushed: request the full schema parsed from the constructor argument
+ ContextUtil.getConfiguration(job).set(PARQUET_PIG_REQUESTED_SCHEMA, ObjectSerializer.serialize(schema));
+ }
+ }
+
+ static Schema parsePigSchema(String pigSchemaString) {
+ try {
+ return pigSchemaString == null ? null : Utils.getSchemaFromString(pigSchemaString);
+ } catch (ParserException e) {
+ throw new SchemaConversionException("could not parse Pig schema: " + pigSchemaString, e);
}
}
@@ -141,14 +159,14 @@
LOG.debug("LoadMetadata.getSchema(" + location + ", " + job + ")");
setInput(location, job);
if (schema == null) {
- if (requestedSchema == null) {
+ if (requestedSchemaStr == null) {
// no requested schema => use the schema from the file
final FileMetaData globalMetaData = getParquetInputFormat().getGlobalMetaData(job);
// TODO: if no Pig schema in file: generate one from the Parquet schema
schema = TupleReadSupport.getPigSchemaFromFile(globalMetaData.getSchema(), globalMetaData.getKeyValueMetaData());
} else {
// there was a schema requested => use that
- schema = Utils.getSchemaFromString(requestedSchema);
+ schema = Utils.getSchemaFromString(requestedSchemaStr);
}
}
return new ResourceSchema(schema);
@@ -166,5 +184,44 @@
public void setPartitionFilter(Expression expression) throws IOException {
LOG.debug("LoadMetadata.setPartitionFilter(" + expression + ")");
}
+
+ @Override
+ public List<OperatorSet> getFeatures() {
+ return Arrays.asList(LoadPushDown.OperatorSet.PROJECTION);
+ }
+ @Override
+ public RequiredFieldResponse pushProjection(RequiredFieldList requiredFieldList)
+ throws FrontendException {
+ if (requiredFieldList == null)
+ return null;
+ requestedSchema = getSchemaFromRequiredFieldList(schema, requiredFieldList.getFields());
+ return new RequiredFieldResponse(true);
+ }
+
+ private Schema getSchemaFromRequiredFieldList(Schema schema, List<RequiredField> fieldList)
+ throws FrontendException {
+ Schema s = new Schema();
+ for (RequiredField rf : fieldList) {
+ FieldSchema f;
+ try {
+ f = schema.getField(rf.getAlias()).clone();
+ } catch (CloneNotSupportedException e) {
+        throw new FrontendException("Clone not supported for the field schema", e);
+ }
+ if (rf.getSubFields() == null) {
+ s.add(f);
+ } else {
+ Schema innerSchema = getSchemaFromRequiredFieldList(f.schema, rf.getSubFields());
+ if (innerSchema == null) {
+ return null;
+ } else {
+ f.schema = innerSchema;
+ s.add(f);
+ }
+ }
+ }
+ return s;
+ }
+
}
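
To spell out how the two schema fields interact: pushProjection() stores the pruned Schema in requestedSchema, the constructor argument stays in requestedSchemaStr, and setLocation() prefers the pushed projection. A minimal sketch of that fall-back order, mirroring the new test further below (the class name and the target/out location are illustrative):

    import org.apache.hadoop.mapreduce.Job;

    import parquet.pig.ParquetLoader;

    public class LoaderSchemaPrecedence {
      public static void main(String[] args) throws Exception {
        // Constructor argument: requestedSchemaStr (a Pig schema string).
        ParquetLoader loader =
            new ParquetLoader("a:chararray, b:{t:(c:chararray, d:chararray)}");
        Job job = new Job();

        // getSchema() parses requestedSchemaStr into the loader's cached schema.
        loader.getSchema("target/out", job);

        // setLocation() then serializes, in order of preference:
        //   1. requestedSchema - the Schema pruned by pushProjection(), if a projection was pushed;
        //   2. schema          - parsed from requestedSchemaStr, as in this call;
        //   3. nothing         - no key is written and the read side falls back to the schema in the file.
        loader.setLocation("target/out", job);
      }
    }
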
diff --git a/parquet-pig/src/main/java/parquet/pig/TupleReadSupport.java b/parquet-pig/src/main/java/parquet/pig/TupleReadSupport.java
index cbc7508..19c3d1a 100644
--- a/parquet-pig/src/main/java/parquet/pig/TupleReadSupport.java
+++ b/parquet-pig/src/main/java/parquet/pig/TupleReadSupport.java
@@ -15,11 +15,13 @@
*/
package parquet.pig;
+import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.Utils;
import org.apache.pig.parser.ParserException;
@@ -50,8 +52,14 @@
* @return the pig schema requested by the user or null if none.
*/
static Schema getRequestedPigSchema(Configuration configuration) {
- String pigSchemaString = configuration.get(PARQUET_PIG_REQUESTED_SCHEMA);
- return parsePigSchema(pigSchemaString);
+ Schema pigSchema = null;
+ try {
+ String schemaStr = configuration.get(PARQUET_PIG_REQUESTED_SCHEMA);
+ pigSchema = (Schema)ObjectSerializer.deserialize(schemaStr);
+ } catch (IOException ioe) {
+      throw new SchemaConversionException("could not get Pig schema from configuration", ioe);
+ }
+ return pigSchema;
}
static Schema parsePigSchema(String pigSchemaString) {
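
Because the requested schema now travels through the configuration as an ObjectSerializer-serialized Schema object rather than a schema string, the loader and the read support have to agree on both directions of the round trip. A minimal sketch, assuming it lives in the parquet.pig package so the PARQUET_PIG_REQUESTED_SCHEMA constant is visible (the class name is illustrative):

    package parquet.pig;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.pig.impl.logicalLayer.schema.Schema;
    import org.apache.pig.impl.util.ObjectSerializer;
    import org.apache.pig.impl.util.Utils;

    public class RequestedSchemaRoundTrip {
      public static void main(String[] args) throws Exception {
        // Loader side: serialize the (possibly pruned) Schema object into the conf.
        Schema pruned = Utils.getSchemaFromString("a:chararray, b:{t:(d:chararray)}");
        Configuration conf = new Configuration();
        conf.set(TupleReadSupport.PARQUET_PIG_REQUESTED_SCHEMA, ObjectSerializer.serialize(pruned));

        // Task side: getRequestedPigSchema() deserializes a Schema object
        // instead of re-parsing a schema string.
        Schema roundTripped = (Schema) ObjectSerializer.deserialize(
            conf.get(TupleReadSupport.PARQUET_PIG_REQUESTED_SCHEMA));
        System.out.println(roundTripped); // e.g. {a: chararray,b: {t: (d: chararray)}}
      }
    }
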
diff --git a/parquet-pig/src/test/java/parquet/pig/TestParquetLoader.java b/parquet-pig/src/test/java/parquet/pig/TestParquetLoader.java
new file mode 100644
index 0000000..e5fde6c
--- /dev/null
+++ b/parquet-pig/src/test/java/parquet/pig/TestParquetLoader.java
@@ -0,0 +1,35 @@
+package parquet.pig;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.pig.LoadPushDown.RequiredField;
+import org.apache.pig.LoadPushDown.RequiredFieldList;
+import org.apache.pig.data.DataType;
+import org.junit.Test;
+
+public class TestParquetLoader {
+ @Test
+ public void testSchema() throws Exception {
+ String location = "target/out";
+ ParquetLoader pLoader = new ParquetLoader("a:chararray, b:{t:(c:chararray, d:chararray)}, p:[(q:chararray, r:chararray)]");
+ Job job = new Job();
+ pLoader.getSchema(location, job);
+ RequiredFieldList list = new RequiredFieldList();
+ RequiredField field = new RequiredField("a", 0, null, DataType.CHARARRAY);
+ list.add(field);
+ field = new RequiredField("b", 0,
+ Arrays.asList(new RequiredField("t", 0,
+ Arrays.asList(new RequiredField("d", 1, null, DataType.CHARARRAY)),
+ DataType.TUPLE)),
+ DataType.BAG);
+ list.add(field);
+ pLoader.pushProjection(list);
+ pLoader.setLocation(location, job);
+
+ assertEquals("{a: chararray,b: {t: (d: chararray)}}",
+ TupleReadSupport.getRequestedPigSchema(job.getConfiguration()).toString());
+ }
+}