Merge branch 'master' of github.com:Parquet/parquet-mr into pages_refactoring
diff --git a/README.md b/README.md
index e8081f2..359cfc7 100644
--- a/README.md
+++ b/README.md
@@ -28,7 +28,7 @@
   <tr><td>Bit Packing</td><td>YES</td><td></td><td></td><td>1.0</td></tr>
   <tr><td>Adaptive dictionary encoding</td><td>YES</td><td></td><td></td><td>1.0</td></tr>
   <tr><td>Complex structure support</td><td>YES</td><td></td><td></td><td>1.0</td></tr>
-  <tr><td>Predicate pushdown</td><td></td><td>YES (<a href="https://github.com/Parquet/parquet-mr/pull/68">68</a>)</td><td></td><td>2.0</td></tr>
+  <tr><td>Predicate pushdown</td><td>YES (<a href="https://github.com/Parquet/parquet-mr/pull/68">68</a>)</td><td></td><td></td><td>1.0</td></tr>
   <tr><td>Column stats</td><td></td><td></td><td>YES</td><td>2.0</td></tr>  <tr><td>Delta encoding</td><td></td><td></td><td>YES</td><td>2.0</td></tr>
   <tr><td>Native Protocol Buffers support</td><td></td><td></td><td>YES</td><td>2.0</td></tr>
   <tr><td>Index pages</td><td></td><td></td><td>YES</td><td>2.0</td></tr>
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputFormat.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputFormat.java
index b6cb762..9e55185 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputFormat.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputFormat.java
@@ -29,6 +29,7 @@
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
@@ -201,17 +202,17 @@
           }
         }
         splits.add(new ParquetInputSplit(
-          fileStatus.getPath(),
-          hdfsBlock.getOffset(),
-          length,
-          hdfsBlock.getHosts(),
-          blocksForCurrentSplit,
-          fileMetaData.getSchema().toString(),
-          requestedSchema,
-          fileMetaData.getSchema().toString(),
-          fileMetaData.getKeyValueMetaData(),
-          readSupportMetadata
-          ));
+            fileStatus.getPath(),
+            hdfsBlock.getOffset(),
+            length,
+            hdfsBlock.getHosts(),
+            blocksForCurrentSplit,
+            fileMetaData.getSchema().toString(),
+            requestedSchema,
+            fileMetaData.getSchema().toString(),
+            fileMetaData.getKeyValueMetaData(),
+            readSupportMetadata
+            ));
       }
     }
     return splits;
@@ -256,6 +257,53 @@
     return splits;
   }
 
+  /*
+   * This is to support multi-level/recursive directory listing until
+   * MAPREDUCE-1577 is fixed.
+   */
+  @Override
+  protected List<FileStatus> listStatus(JobContext jobContext) throws IOException {
+    return getAllFileRecursively(super.listStatus(jobContext),
+       ContextUtil.getConfiguration(jobContext));
+  }
+
+  private static List<FileStatus> getAllFileRecursively(
+      List<FileStatus> files, Configuration conf) throws IOException {
+    List<FileStatus> result = new ArrayList<FileStatus>();
+    int len = files.size();
+    for (int i = 0; i < len; ++i) {
+      FileStatus file = files.get(i);
+      if (file.isDir()) {
+        Path p = file.getPath();
+        FileSystem fs = p.getFileSystem(conf);
+        addInputPathRecursively(result, fs, p, hiddenFileFilter);
+      } else {
+        result.add(file);
+      }
+    }
+    LOG.info("Total input paths to process : " + result.size());
+    return result;
+  }
+
+  private static void addInputPathRecursively(List<FileStatus> result,
+      FileSystem fs, Path path, PathFilter inputFilter)
+          throws IOException {
+    for (FileStatus stat: fs.listStatus(path, inputFilter)) {
+      if (stat.isDir()) {
+        addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
+      } else {
+        result.add(stat);
+      }
+    }
+  }
+
+  private static final PathFilter hiddenFileFilter = new PathFilter() {
+    public boolean accept(Path p) {
+      String name = p.getName();
+      return !name.startsWith("_") && !name.startsWith(".");
+    }
+  };
+
   /**
    * @param jobContext the current job context
    * @return the footers for the files
@@ -263,15 +311,15 @@
    */
   public List<Footer> getFooters(JobContext jobContext) throws IOException {
     if (footers == null) {
-      footers = getFooters(ContextUtil.getConfiguration(jobContext), super.listStatus(jobContext));
+      footers = getFooters(ContextUtil.getConfiguration(jobContext), listStatus(jobContext));
     }
 
     return footers;
   }
 
   public List<Footer> getFooters(Configuration configuration, List<FileStatus> statuses) throws IOException {
-      LOG.debug("reading " + statuses.size() + " files");
-      return ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(configuration, statuses);
+    LOG.debug("reading " + statuses.size() + " files");
+    return ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(configuration, statuses);
   }
 
   /**
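For illustration only (not part of the patch): a minimal, self-contained sketch of what the listStatus() override above does. Input directories are expanded depth-first and any entry whose name starts with "_" or "." is skipped, mirroring hiddenFileFilter. The class name, the local FileSystem, and the "target/out" path are assumptions made for this example.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;

public class RecursiveListingSketch {

  // same rule as hiddenFileFilter in the patch: skip _SUCCESS, _logs, .crc files, etc.
  private static final PathFilter HIDDEN = new PathFilter() {
    public boolean accept(Path p) {
      String name = p.getName();
      return !name.startsWith("_") && !name.startsWith(".");
    }
  };

  // walk directories depth-first, collecting only plain files (same shape as addInputPathRecursively)
  static List<FileStatus> listRecursively(FileSystem fs, Path path) throws IOException {
    List<FileStatus> result = new ArrayList<FileStatus>();
    for (FileStatus stat : fs.listStatus(path, HIDDEN)) {
      if (stat.isDir()) {
        result.addAll(listRecursively(fs, stat.getPath()));
      } else {
        result.add(stat);
      }
    }
    return result;
  }

  public static void main(String[] args) throws IOException {
    FileSystem fs = FileSystem.getLocal(new Configuration());
    // "target/out" is an arbitrary local directory chosen for the example
    for (FileStatus stat : listRecursively(fs, new Path("target/out"))) {
      System.out.println(stat.getPath());
    }
  }
}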
diff --git a/parquet-pig/src/main/java/parquet/pig/ParquetLoader.java b/parquet-pig/src/main/java/parquet/pig/ParquetLoader.java
index a193472..7fb1ab5 100644
--- a/parquet-pig/src/main/java/parquet/pig/ParquetLoader.java
+++ b/parquet-pig/src/main/java/parquet/pig/ParquetLoader.java
@@ -18,6 +18,8 @@
 import static parquet.pig.TupleReadSupport.PARQUET_PIG_REQUESTED_SCHEMA;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
 
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
@@ -26,11 +28,15 @@
 import org.apache.pig.Expression;
 import org.apache.pig.LoadFunc;
 import org.apache.pig.LoadMetadata;
+import org.apache.pig.LoadPushDown;
 import org.apache.pig.ResourceSchema;
 import org.apache.pig.ResourceStatistics;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
+import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.Utils;
 import org.apache.pig.parser.ParserException;
 
@@ -48,10 +54,11 @@
  * @author Julien Le Dem
  *
  */
-public class ParquetLoader extends LoadFunc implements LoadMetadata {
+public class ParquetLoader extends LoadFunc implements LoadMetadata, LoadPushDown {
   private static final Log LOG = Log.getLog(ParquetLoader.class);
 
-  private final String requestedSchema;
+  private final String requestedSchemaStr;
+  private Schema requestedSchema;
 
   private boolean setLocationHasBeenCalled = false;
   private RecordReader<Void, Tuple> reader;
@@ -62,15 +69,15 @@
    * To read the content in its original schema
    */
   public ParquetLoader() {
-    this.requestedSchema = null;
+    this(null);
   }
 
   /**
    * To read only a subset of the columns in the file
-   * @param requestedSchema a subset of the original pig schema in the file
+   * @param requestedSchemaStr a subset of the original pig schema in the file
    */
-  public ParquetLoader(String requestedSchema) {
-    this.requestedSchema = requestedSchema;
+  public ParquetLoader(String requestedSchemaStr) {
+    this.requestedSchemaStr = requestedSchemaStr;
   }
 
   @Override
@@ -78,7 +85,18 @@
     LOG.debug("LoadFunc.setLocation(" + location + ", " + job + ")");
     setInput(location, job);
     if (requestedSchema != null) {
-      ContextUtil.getConfiguration(job).set(PARQUET_PIG_REQUESTED_SCHEMA, requestedSchema);
+      ContextUtil.getConfiguration(job).set(PARQUET_PIG_REQUESTED_SCHEMA, ObjectSerializer.serialize(requestedSchema));
+    } else if (requestedSchemaStr != null) {
+      // no projection pushed: serialize the schema parsed from the constructor argument in getSchema()
+      ContextUtil.getConfiguration(job).set(PARQUET_PIG_REQUESTED_SCHEMA, ObjectSerializer.serialize(schema));
+    }
+  }
+
+  static Schema parsePigSchema(String pigSchemaString) {
+    try {
+      return pigSchemaString == null ? null : Utils.getSchemaFromString(pigSchemaString);
+    } catch (ParserException e) {
+      throw new SchemaConversionException("could not parse Pig schema: " + pigSchemaString, e);
     }
   }
 
@@ -141,14 +159,14 @@
     LOG.debug("LoadMetadata.getSchema(" + location + ", " + job + ")");
     setInput(location, job);
     if (schema == null) {
-      if (requestedSchema == null) {
+      if (requestedSchemaStr == null) {
         // no requested schema => use the schema from the file
         final FileMetaData globalMetaData = getParquetInputFormat().getGlobalMetaData(job);
         // TODO: if no Pig schema in file: generate one from the Parquet schema
         schema = TupleReadSupport.getPigSchemaFromFile(globalMetaData.getSchema(), globalMetaData.getKeyValueMetaData());
       } else {
         // there was a schema requested => use that
-        schema = Utils.getSchemaFromString(requestedSchema);
+        schema = Utils.getSchemaFromString(requestedSchemaStr);
       }
     }
     return new ResourceSchema(schema);
@@ -166,5 +184,44 @@
   public void setPartitionFilter(Expression expression) throws IOException {
     LOG.debug("LoadMetadata.setPartitionFilter(" + expression + ")");
   }
+
+  @Override
+  public List<OperatorSet> getFeatures() {
+    return Arrays.asList(LoadPushDown.OperatorSet.PROJECTION);
+  }
 
+  @Override
+  public RequiredFieldResponse pushProjection(RequiredFieldList requiredFieldList)
+      throws FrontendException {
+    if (requiredFieldList == null)
+      return null;
+    requestedSchema = getSchemaFromRequiredFieldList(schema, requiredFieldList.getFields());
+    return new RequiredFieldResponse(true);
+  }
+
+  private Schema getSchemaFromRequiredFieldList(Schema schema, List<RequiredField> fieldList)
+      throws FrontendException {
+    Schema s = new Schema();
+    for (RequiredField rf : fieldList) {
+      FieldSchema f;
+      try {
+        f = schema.getField(rf.getAlias()).clone();
+      } catch (CloneNotSupportedException e) {
+        throw new FrontendException("Clone not supported for the fieldschema", e);
+      }
+      if (rf.getSubFields() == null) {
+        s.add(f);
+      } else {
+        Schema innerSchema = getSchemaFromRequiredFieldList(f.schema, rf.getSubFields());
+        if (innerSchema == null) {
+          return null;
+        } else {
+          f.schema = innerSchema;
+          s.add(f);
+        }
+      }
+    }
+    return s;
+  }
+
 }
diff --git a/parquet-pig/src/main/java/parquet/pig/TupleReadSupport.java b/parquet-pig/src/main/java/parquet/pig/TupleReadSupport.java
index cbc7508..19c3d1a 100644
--- a/parquet-pig/src/main/java/parquet/pig/TupleReadSupport.java
+++ b/parquet-pig/src/main/java/parquet/pig/TupleReadSupport.java
@@ -15,11 +15,13 @@
  */
 package parquet.pig;
 
+import java.io.IOException;
 import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.Utils;
 import org.apache.pig.parser.ParserException;
 
@@ -50,8 +52,14 @@
    * @return the pig schema requested by the user or null if none.
    */
   static Schema getRequestedPigSchema(Configuration configuration) {
-    String pigSchemaString = configuration.get(PARQUET_PIG_REQUESTED_SCHEMA);
-    return parsePigSchema(pigSchemaString);
+    Schema pigSchema = null;
+    try {
+      String schemaStr = configuration.get(PARQUET_PIG_REQUESTED_SCHEMA);
+      pigSchema = (Schema)ObjectSerializer.deserialize(schemaStr);
+    } catch (IOException ioe) {
+      throw new SchemaConversionException("could not get pig schema from configuration", ioe);
+    }
+    return pigSchema;
   }
 
   static Schema parsePigSchema(String pigSchemaString) {
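For illustration only (not part of the patch): with this change the requested Pig schema travels through the job configuration as a serialized Schema object, written by ParquetLoader.setLocation() and read back by getRequestedPigSchema() above, instead of as a schema string. A minimal sketch of that round trip, using a placeholder configuration key (the real code uses the PARQUET_PIG_REQUESTED_SCHEMA constant) and an arbitrary example schema:

import org.apache.hadoop.conf.Configuration;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.Utils;

public class RequestedSchemaRoundTrip {
  public static void main(String[] args) throws Exception {
    // placeholder key for the example; the patch uses PARQUET_PIG_REQUESTED_SCHEMA instead
    String key = "example.requested.schema";
    Schema requested = Utils.getSchemaFromString("a: chararray, d: int");

    // writer side (as in ParquetLoader.setLocation): store the Schema in serialized string form
    Configuration conf = new Configuration();
    conf.set(key, ObjectSerializer.serialize(requested));

    // reader side (as in TupleReadSupport.getRequestedPigSchema): deserialize it back
    Schema roundTripped = (Schema) ObjectSerializer.deserialize(conf.get(key));
    System.out.println(roundTripped);  // prints {a: chararray,d: int}
  }
}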
diff --git a/parquet-pig/src/test/java/parquet/pig/TestParquetLoader.java b/parquet-pig/src/test/java/parquet/pig/TestParquetLoader.java
new file mode 100644
index 0000000..e5fde6c
--- /dev/null
+++ b/parquet-pig/src/test/java/parquet/pig/TestParquetLoader.java
@@ -0,0 +1,35 @@
+package parquet.pig;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.pig.LoadPushDown.RequiredField;
+import org.apache.pig.LoadPushDown.RequiredFieldList;
+import org.apache.pig.data.DataType;
+import org.junit.Test;
+
+public class TestParquetLoader {
+  @Test
+  public void testSchema() throws Exception {
+    String location = "target/out";
+    ParquetLoader pLoader = new ParquetLoader("a:chararray, b:{t:(c:chararray, d:chararray)}, p:[(q:chararray, r:chararray)]");
+    Job job = new Job();
+    pLoader.getSchema(location, job);
+    RequiredFieldList list = new RequiredFieldList();
+    RequiredField field = new RequiredField("a", 0, null, DataType.CHARARRAY);
+    list.add(field);
+    field = new RequiredField("b", 0,
+        Arrays.asList(new RequiredField("t", 0,
+            Arrays.asList(new RequiredField("d", 1, null, DataType.CHARARRAY)),
+            DataType.TUPLE)),
+        DataType.BAG);
+    list.add(field);
+    pLoader.pushProjection(list);
+    pLoader.setLocation(location, job);
+
+    assertEquals("{a: chararray,b: {t: (d: chararray)}}",
+        TupleReadSupport.getRequestedPigSchema(job.getConfiguration()).toString());
+  }
+}