DRILL-7723: Add Excel Metadata as Implicit Fields
diff --git a/contrib/format-excel/README.md b/contrib/format-excel/README.md
index ac08c66..1a59428 100644
--- a/contrib/format-excel/README.md
+++ b/contrib/format-excel/README.md
@@ -48,6 +48,27 @@
 ON t1.id = t2.id
 ```
 
+### Implicit Columns
+Drill includes several columns of file metadata in the query results. These fields are **not** included in star queries and thus must be explicitly listed in a query. 
+
+The fields are:
+
+    _category
+    _content_status
+    _content_type
+    _creator
+    _description
+    _identifier
+    _keywords
+    _last_modified_by_user
+    _revision
+    _subject
+    _title
+    _created
+    _last_printed
+    _modified
+
+
 ### Known Limitations:
 At present, Drill requires that all columns be of the same data type. If they are not, Drill will throw an exception upon trying to read a column of mixed data type. If you are
  trying to query data with heterogenoeus columns, it will be necessary to set `allTextMode` to `true`. 
diff --git a/contrib/format-excel/pom.xml b/contrib/format-excel/pom.xml
index c8be2fe..cbafbc7 100644
--- a/contrib/format-excel/pom.xml
+++ b/contrib/format-excel/pom.xml
@@ -67,7 +67,7 @@
     <dependency>
       <groupId>com.github.pjfanning</groupId>
       <artifactId>excel-streaming-reader</artifactId>
-      <version>2.3.2</version>
+      <version>2.3.3</version>
     </dependency>
   </dependencies>
   <build>
diff --git a/contrib/format-excel/src/main/java/org/apache/drill/exec/store/excel/ExcelBatchReader.java b/contrib/format-excel/src/main/java/org/apache/drill/exec/store/excel/ExcelBatchReader.java
index 88d124b..22ac590 100644
--- a/contrib/format-excel/src/main/java/org/apache/drill/exec/store/excel/ExcelBatchReader.java
+++ b/contrib/format-excel/src/main/java/org/apache/drill/exec/store/excel/ExcelBatchReader.java
@@ -19,10 +19,13 @@
 package org.apache.drill.exec.store.excel;
 
 import com.github.pjfanning.xlsx.StreamingReader;
+import com.github.pjfanning.xlsx.impl.StreamingWorkbook;
+import org.apache.drill.common.exceptions.CustomErrorContext;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
 import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
 import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
 import org.apache.drill.exec.physical.resultSet.RowSetLoader;
@@ -32,84 +35,117 @@
 import org.apache.drill.exec.vector.accessor.ScalarWriter;
 import org.apache.drill.exec.vector.accessor.TupleWriter;
 import org.apache.hadoop.mapred.FileSplit;
+import org.apache.poi.ooxml.POIXMLProperties.CoreProperties;
 import org.apache.poi.ss.usermodel.Cell;
 import org.apache.poi.ss.usermodel.CellType;
 import org.apache.poi.ss.usermodel.DateUtil;
 import org.apache.poi.ss.usermodel.Row;
 import org.apache.poi.ss.usermodel.Sheet;
 import org.apache.poi.ss.usermodel.Workbook;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.io.InputStream;
-import java.util.Date;
-import java.util.Iterator;
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.TimeZone;
 
 public class ExcelBatchReader implements ManagedReader<FileSchemaNegotiator> {
 
-  private static final Logger logger = org.slf4j.LoggerFactory.getLogger(ExcelBatchReader.class);
+  private static final Logger logger = LoggerFactory.getLogger(ExcelBatchReader.class);
 
   private static final String SAFE_WILDCARD = "_$";
-
   private static final String SAFE_SEPARATOR = "_";
-
   private static final String PARSER_WILDCARD = ".*";
-
   private static final String HEADER_NEW_LINE_REPLACEMENT = "__";
-
   private static final String MISSING_FIELD_NAME_HEADER = "field_";
 
-  private static final int ROW_CACHE_SIZE = 100;
+  private enum IMPLICIT_STRING_COLUMN {
+    CATEGORY("_category"),
+    CONTENT_STATUS("_content_status"),
+    CONTENT_TYPE("_content_type"),
+    CREATOR("_creator"),
+    DESCRIPTION("_description"),
+    IDENTIFIER("_identifier"),
+    KEYWORDS("_keywords"),
+    LAST_MODIFIED_BY_USER("_last_modified_by_user"),
+    REVISION("_revision"),
+    SUBJECT("_subject"),
+    TITLE("_title");
 
+    private final String fieldName;
+
+    IMPLICIT_STRING_COLUMN(String fieldName) {
+      this.fieldName = fieldName;
+    }
+
+    public String getFieldName() {
+      return fieldName;
+    }
+  }
+
+  private enum IMPLICIT_TIMESTAMP_COLUMN {
+    /**
+     * The file created date
+     */
+    CREATED("_created"),
+    /**
+     * Date the file was last printed, null if never printed.
+     */
+    LAST_PRINTED("_last_printed"),
+    /**
+     * Date of last modification
+     */
+    MODIFIED("_modified");
+
+    private final String fieldName;
+
+    IMPLICIT_TIMESTAMP_COLUMN(String fieldName) {
+      this.fieldName = fieldName;
+    }
+
+    public String getFieldName() {
+      return fieldName;
+    }
+  }
+
+  private static final int ROW_CACHE_SIZE = 100;
   private static final int BUFFER_SIZE = 4096;
 
   private final ExcelReaderConfig readerConfig;
-
   private Sheet sheet;
-
   private Row currentRow;
-
-  private Workbook workbook;
-
+  private StreamingWorkbook streamingWorkbook;
   private InputStream fsStream;
-
   private List<String> excelFieldNames;
-
   private List<ScalarWriter> columnWriters;
-
   private List<CellWriter> cellWriterArray;
-
+  private List<ScalarWriter> metadataColumnWriters;
   private Iterator<Row> rowIterator;
-
   private RowSetLoader rowWriter;
-
   private int totalColumnCount;
-
   private boolean firstLine;
-
   private FileSplit split;
-
   private int recordCount;
+  private Map<String, String> stringMetadata;
+  private Map<String, Date> dateMetadata;
+  private CustomErrorContext errorContext;
+
 
   static class ExcelReaderConfig {
     final ExcelFormatPlugin plugin;
-
     final int headerRow;
-
     final int lastRow;
-
     final int firstColumn;
-
     final int lastColumn;
-
     final boolean allTextMode;
-
     final String sheetName;
 
     ExcelReaderConfig(ExcelFormatPlugin plugin) {
@@ -131,6 +167,7 @@
   @Override
   public boolean open(FileSchemaNegotiator negotiator) {
     split = negotiator.split();
+    errorContext = negotiator.parentErrorContext();
     ResultSetLoader loader = negotiator.build();
     rowWriter = loader.writer();
     openFile(negotiator);
@@ -147,18 +184,26 @@
       fsStream = negotiator.fileSystem().openPossiblyCompressedStream(split.getPath());
 
       // Open streaming reader
-      workbook = StreamingReader.builder()
+      Workbook workbook = StreamingReader.builder()
         .rowCacheSize(ROW_CACHE_SIZE)
         .bufferSize(BUFFER_SIZE)
+        .setReadCoreProperties(true)
         .open(fsStream);
+
+      streamingWorkbook = (StreamingWorkbook) workbook;
+
     } catch (Exception e) {
       throw UserException
         .dataReadError(e)
         .message("Failed to open open input file: %s", split.getPath().toString())
         .addContext(e.getMessage())
+        .addContext(errorContext)
         .build(logger);
     }
     sheet = getSheet();
+
+    // Populate Metadata Hashmap
+    populateMetadata(streamingWorkbook);
   }
 
   /**
@@ -179,9 +224,13 @@
       return;
     }
 
+    columnWriters = new ArrayList<>();
+    metadataColumnWriters = new ArrayList<>();
+
     rowIterator = sheet.iterator();
 
     // Get the number of columns.
+    // This method also advances the row reader to the location of the first row of data
     columnCount = getColumnCount();
 
     excelFieldNames = new ArrayList<>();
@@ -198,8 +247,6 @@
         excelFieldNames.add(i, missingFieldName);
         i++;
       }
-      columnWriters = new ArrayList<>(columnCount);
-
       builder.buildSchema();
     } else if (rowIterator.hasNext()) {
       //Get the header row and column count
@@ -247,7 +294,7 @@
         colPosition++;
       }
     }
-    columnWriters = new ArrayList<>();
+    addMetadataToSchema(builder);
     builder.buildSchema();
   }
 
@@ -258,7 +305,7 @@
   private Sheet getSheet() {
     int sheetIndex = 0;
     if (!readerConfig.sheetName.isEmpty()) {
-      sheetIndex = workbook.getSheetIndex(readerConfig.sheetName);
+      sheetIndex = streamingWorkbook.getSheetIndex(readerConfig.sheetName);
     }
 
     //If the sheet name is not valid, throw user exception
@@ -266,9 +313,10 @@
       throw UserException
         .validationError()
         .message("Could not open sheet " + readerConfig.sheetName)
+        .addContext(errorContext)
         .build(logger);
     } else {
-      return workbook.getSheetAt(sheetIndex);
+      return streamingWorkbook.getSheetAt(sheetIndex);
     }
   }
 
@@ -335,8 +383,14 @@
     }
 
     if (firstLine) {
+      // Add metadata to column array
+      addMetadataWriters();
       firstLine = false;
     }
+
+    // Write the metadata
+    writeMetadata();
+
     rowWriter.save();
     recordCount++;
 
@@ -348,6 +402,32 @@
     }
   }
 
+  private void populateMetadata(StreamingWorkbook streamingWorkbook) {
+
+    CoreProperties fileMetadata = streamingWorkbook.getCoreProperties();
+
+    stringMetadata = new HashMap<>();
+    dateMetadata = new HashMap<>();
+
+    // Populate String metadata columns
+    stringMetadata.put(IMPLICIT_STRING_COLUMN.CATEGORY.getFieldName(), fileMetadata.getCategory());
+    stringMetadata.put(IMPLICIT_STRING_COLUMN.CONTENT_STATUS.getFieldName(), fileMetadata.getContentStatus());
+    stringMetadata.put(IMPLICIT_STRING_COLUMN.CONTENT_TYPE.getFieldName(), fileMetadata.getContentType());
+    stringMetadata.put(IMPLICIT_STRING_COLUMN.CREATOR.getFieldName(), fileMetadata.getCreator());
+    stringMetadata.put(IMPLICIT_STRING_COLUMN.DESCRIPTION.getFieldName(), fileMetadata.getDescription());
+    stringMetadata.put(IMPLICIT_STRING_COLUMN.IDENTIFIER.getFieldName(), fileMetadata.getIdentifier());
+    stringMetadata.put(IMPLICIT_STRING_COLUMN.KEYWORDS.getFieldName(), fileMetadata.getKeywords());
+    stringMetadata.put(IMPLICIT_STRING_COLUMN.LAST_MODIFIED_BY_USER.getFieldName(), fileMetadata.getLastModifiedByUser());
+    stringMetadata.put(IMPLICIT_STRING_COLUMN.REVISION.getFieldName(), fileMetadata.getRevision());
+    stringMetadata.put(IMPLICIT_STRING_COLUMN.SUBJECT.getFieldName(), fileMetadata.getSubject());
+    stringMetadata.put(IMPLICIT_STRING_COLUMN.TITLE.getFieldName(), fileMetadata.getTitle());
+
+    // Populate Timestamp columns
+    dateMetadata.put(IMPLICIT_TIMESTAMP_COLUMN.CREATED.getFieldName(), fileMetadata.getCreated());
+    dateMetadata.put(IMPLICIT_TIMESTAMP_COLUMN.LAST_PRINTED.getFieldName(), fileMetadata.getLastPrinted());
+    dateMetadata.put(IMPLICIT_TIMESTAMP_COLUMN.MODIFIED.getFieldName(), fileMetadata.getModified());
+  }
+
   /**
    * Function to populate the column array
    * @param cell The input cell object
@@ -360,24 +440,36 @@
 
     // Case for empty data cell in first row.  In this case, fall back to string.
     if (cell == null) {
-      addColumnToArray(rowWriter, excelFieldNames.get(colPosition), MinorType.VARCHAR);
+      addColumnToArray(rowWriter, excelFieldNames.get(colPosition), MinorType.VARCHAR, false);
       return;
     }
 
     CellType cellType = cell.getCellType();
     if (cellType == CellType.STRING || readerConfig.allTextMode) {
-      addColumnToArray(rowWriter, excelFieldNames.get(colPosition), MinorType.VARCHAR);
+      addColumnToArray(rowWriter, excelFieldNames.get(colPosition), MinorType.VARCHAR, false);
     } else if (cellType == CellType.NUMERIC && DateUtil.isCellDateFormatted(cell)) {
       // Case if the column is a date or time
-      addColumnToArray(rowWriter, excelFieldNames.get(colPosition), MinorType.TIMESTAMP);
+      addColumnToArray(rowWriter, excelFieldNames.get(colPosition), MinorType.TIMESTAMP, false);
     } else if (cellType == CellType.NUMERIC || cellType == CellType.FORMULA) {
       // Case if the column is numeric
-      addColumnToArray(rowWriter, excelFieldNames.get(colPosition), MinorType.FLOAT8);
+      addColumnToArray(rowWriter, excelFieldNames.get(colPosition), MinorType.FLOAT8, false);
     } else {
       logger.warn("Unknown data type. Drill only supports reading NUMERIC and STRING.");
     }
   }
 
+  private void addMetadataToSchema(SchemaBuilder builder) {
+    // Add String Metadata columns
+    for (IMPLICIT_STRING_COLUMN name : IMPLICIT_STRING_COLUMN.values()) {
+      makeColumn(builder, name.getFieldName(), MinorType.VARCHAR);
+    }
+
+    // Add Date Column Names
+    for (IMPLICIT_TIMESTAMP_COLUMN name : IMPLICIT_TIMESTAMP_COLUMN.values()) {
+      makeColumn(builder, name.getFieldName(), MinorType.TIMESTAMP);
+    }
+  }
+
   private void makeColumn(SchemaBuilder builder, String name, TypeProtos.MinorType type) {
     // Verify supported types
     switch (type) {
@@ -395,40 +487,79 @@
           .message("Undefined column types")
           .addContext("Field name", name)
           .addContext("Type", type.toString())
+          .addContext(errorContext)
           .build(logger);
     }
   }
 
-  private void addColumnToArray(TupleWriter rowWriter, String name, TypeProtos.MinorType type) {
+  private void addColumnToArray(TupleWriter rowWriter, String name, TypeProtos.MinorType type, boolean isMetadata) {
     int index = rowWriter.tupleSchema().index(name);
     if (index == -1) {
       ColumnMetadata colSchema = MetadataUtils.newScalar(name, type, TypeProtos.DataMode.OPTIONAL);
+      if (isMetadata) {
+        colSchema.setBooleanProperty(ColumnMetadata.EXCLUDE_FROM_WILDCARD, true);
+      }
       index = rowWriter.addColumn(colSchema);
     } else {
       return;
     }
 
-    columnWriters.add(rowWriter.scalar(index));
-    if (readerConfig.allTextMode && type == MinorType.FLOAT8) {
-      cellWriterArray.add(new NumericStringWriter(columnWriters.get(index)));
-    } else if (type == MinorType.VARCHAR) {
-      cellWriterArray.add(new StringCellWriter(columnWriters.get(index)));
-    } else if (type == MinorType.FLOAT8) {
-      cellWriterArray.add(new NumericCellWriter(columnWriters.get(index)));
-    } else if (type == MinorType.TIMESTAMP) {
-      cellWriterArray.add(new TimestampCellWriter(columnWriters.get(index)));
+    if (isMetadata) {
+      metadataColumnWriters.add(rowWriter.scalar(index));
+    } else {
+      columnWriters.add(rowWriter.scalar(index));
+      if (readerConfig.allTextMode && type == MinorType.FLOAT8) {
+        cellWriterArray.add(new NumericStringWriter(columnWriters.get(index)));
+      } else if (type == MinorType.VARCHAR) {
+        cellWriterArray.add(new StringCellWriter(columnWriters.get(index)));
+      } else if (type == MinorType.FLOAT8) {
+        cellWriterArray.add(new NumericCellWriter(columnWriters.get(index)));
+      } else if (type == MinorType.TIMESTAMP) {
+        cellWriterArray.add(new TimestampCellWriter(columnWriters.get(index)));
+      }
+    }
+  }
+
+  private void addMetadataWriters() {
+    for (IMPLICIT_STRING_COLUMN colName : IMPLICIT_STRING_COLUMN.values()) {
+      addColumnToArray(rowWriter, colName.getFieldName(), MinorType.VARCHAR, true);
+    }
+    for (IMPLICIT_TIMESTAMP_COLUMN colName : IMPLICIT_TIMESTAMP_COLUMN.values()) {
+      addColumnToArray(rowWriter, colName.getFieldName(), MinorType.TIMESTAMP, true);
+    }
+  }
+
+  private void writeMetadata() {
+    for (IMPLICIT_STRING_COLUMN column : IMPLICIT_STRING_COLUMN.values()) {
+      String value = stringMetadata.get(column.getFieldName());
+      int index = column.ordinal();
+      if (value == null) {
+        metadataColumnWriters.get(index).setNull();
+      } else {
+        metadataColumnWriters.get(index).setString(value);
+      }
+    }
+
+    for (IMPLICIT_TIMESTAMP_COLUMN column : IMPLICIT_TIMESTAMP_COLUMN.values()) {
+      Date timeValue = dateMetadata.get(column.getFieldName());
+      int index = column.ordinal() + IMPLICIT_STRING_COLUMN.values().length;
+      if (timeValue == null) {
+        metadataColumnWriters.get(index).setNull();
+      } else {
+        metadataColumnWriters.get(index).setTimestamp(new Instant(timeValue));
+      }
     }
   }
 
   @Override
   public void close() {
-    if (workbook != null) {
+    if (streamingWorkbook != null) {
       try {
-        workbook.close();
+        streamingWorkbook.close();
       } catch (IOException e) {
         logger.warn("Error when closing Excel Workbook resource: {}", e.getMessage());
       }
-      workbook = null;
+      streamingWorkbook = null;
     }
 
     if (fsStream != null) {
diff --git a/contrib/format-excel/src/test/java/org/apache/drill/exec/store/excel/TestExcelFormat.java b/contrib/format-excel/src/test/java/org/apache/drill/exec/store/excel/TestExcelFormat.java
index 3eed776..faecce8 100644
--- a/contrib/format-excel/src/test/java/org/apache/drill/exec/store/excel/TestExcelFormat.java
+++ b/contrib/format-excel/src/test/java/org/apache/drill/exec/store/excel/TestExcelFormat.java
@@ -18,29 +18,29 @@
 
 package org.apache.drill.exec.store.excel;
 
+import org.apache.drill.categories.RowSetTests;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.types.TypeProtos;
-import org.apache.drill.exec.record.metadata.TupleMetadata;
-import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.physical.rowSet.RowSet;
 import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.test.ClusterFixture;
 import org.apache.drill.test.ClusterTest;
 import org.apache.drill.test.QueryBuilder;
 import org.apache.drill.test.rowSet.RowSetComparison;
-import org.apache.drill.exec.record.metadata.SchemaBuilder;
-
-import java.nio.file.Paths;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.apache.drill.categories.RowSetTests;
 
+import java.nio.file.Paths;
+
+import static org.apache.drill.test.QueryTestUtil.generateCompressedFile;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.apache.drill.test.QueryTestUtil.generateCompressedFile;
 
 @Category(RowSetTests.class)
 public class TestExcelFormat extends ClusterTest {
@@ -98,6 +98,41 @@
     new RowSetComparison(expected).verifyAndClearAll(results);
   }
 
+
+  @Test
+  public void testExplicitMetadataQuery() throws RpcException {
+    String sql =
+      "SELECT _category, _content_status, _content_type, _creator, _description, _identifier, _keywords, _last_modified_by_user, _revision, _subject, _title, _created," +
+        "_last_printed, _modified FROM cp.`excel/test_data.xlsx` LIMIT 1";
+
+    QueryBuilder q = client.queryBuilder().sql(sql);
+    RowSet results = q.rowSet();
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .addNullable("_category", TypeProtos.MinorType.VARCHAR)
+      .addNullable("_content_status", TypeProtos.MinorType.VARCHAR)
+      .addNullable("_content_type", TypeProtos.MinorType.VARCHAR)
+      .addNullable("_creator", TypeProtos.MinorType.VARCHAR)
+      .addNullable("_description", TypeProtos.MinorType.VARCHAR)
+      .addNullable("_identifier", TypeProtos.MinorType.VARCHAR)
+      .addNullable("_keywords", TypeProtos.MinorType.VARCHAR)
+      .addNullable("_last_modified_by_user", TypeProtos.MinorType.VARCHAR)
+      .addNullable("_revision", TypeProtos.MinorType.VARCHAR)
+      .addNullable("_subject", TypeProtos.MinorType.VARCHAR)
+      .addNullable("_title", TypeProtos.MinorType.VARCHAR)
+      .addNullable("_created", TypeProtos.MinorType.TIMESTAMP)
+      .addNullable("_last_printed", TypeProtos.MinorType.TIMESTAMP)
+      .addNullable("_modified", TypeProtos.MinorType.TIMESTAMP)
+      .buildSchema();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow("test_category", null, null, "test_author", null, null, "test_keywords", "Microsoft Office User", null, "test_subject", "test_title",
+        1571602578000L, null,1588212319000L)
+      .build();
+
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+
   @Test
   public void testExplicitSomeQuery() throws RpcException {
     String sql = "SELECT id, first_name, order_count FROM cp.`excel/test_data.xlsx`";
diff --git a/contrib/format-excel/src/test/resources/excel/test_data.xlsx b/contrib/format-excel/src/test/resources/excel/test_data.xlsx
index a04a6bd..9841bad 100644
--- a/contrib/format-excel/src/test/resources/excel/test_data.xlsx
+++ b/contrib/format-excel/src/test/resources/excel/test_data.xlsx
Binary files differ