DRILL-8305: Add Implicit Fields to Google Sheets Reader (#2648)

diff --git a/contrib/storage-googlesheets/src/main/java/org/apache/drill/exec/store/googlesheets/GoogleSheetsBatchReader.java b/contrib/storage-googlesheets/src/main/java/org/apache/drill/exec/store/googlesheets/GoogleSheetsBatchReader.java
index 7b6ae18..367c32e 100644
--- a/contrib/storage-googlesheets/src/main/java/org/apache/drill/exec/store/googlesheets/GoogleSheetsBatchReader.java
+++ b/contrib/storage-googlesheets/src/main/java/org/apache/drill/exec/store/googlesheets/GoogleSheetsBatchReader.java
@@ -23,12 +23,15 @@
 import org.apache.drill.common.exceptions.CustomErrorContext;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
 import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
 import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
 import org.apache.drill.exec.physical.resultSet.RowSetLoader;
 import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.MetadataUtils;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.store.googlesheets.columns.GoogleSheetsColumnWriter.GoogleSheetsBigIntegerColumnWriter;
 import org.apache.drill.exec.store.googlesheets.columns.GoogleSheetsColumnWriter.GoogleSheetsBooleanColumnWriter;
@@ -42,10 +45,14 @@
 import org.apache.drill.exec.store.googlesheets.utils.GoogleSheetsRangeBuilder;
 import org.apache.drill.exec.store.googlesheets.utils.GoogleSheetsUtils;
 import org.apache.drill.exec.util.Utilities;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
@@ -56,6 +63,9 @@
   // rows. There is conflicting information about this online, but during testing, ranges with more than
   // 1000 rows would throw invalid request errors.
   private static final int BATCH_SIZE = 1000;
+  private static final String SHEET_COLUMN_NAME = "_sheets";
+
+  private static final List<String> IMPLICIT_FIELDS =  Arrays.asList(SHEET_COLUMN_NAME);
 
   private final GoogleSheetsStoragePluginConfig config;
   private final GoogleSheetsSubScan subScan;
@@ -64,7 +74,11 @@
   private final Sheets service;
   private final GoogleSheetsRangeBuilder rangeBuilder;
   private final String sheetID;
+  private final List<String> sheetNames;
   private CustomErrorContext errorContext;
+  private ScalarWriter sheetNameWriter;
+
+  private TupleMetadata schema;
   private Map<String, GoogleSheetsColumn> columnMap;
   private RowSetLoader rowWriter;
 
@@ -74,6 +88,7 @@
     this.projectedColumns = subScan.getColumns();
     this.service = plugin.getSheetsService(subScan.getUserName());
     this.sheetID = subScan.getScanSpec().getSheetID();
+    this.sheetNames = new ArrayList<>();
     try {
       List<Sheet> sheetList = GoogleSheetsUtils.getSheetList(service, sheetID);
       this.sheet = sheetList.get(subScan.getScanSpec().getTabIndex());
@@ -104,8 +119,15 @@
     // Build Schema
     String tableName = subScan.getScanSpec().getTableName();
     String pluginName = subScan.getScanSpec().getSheetID();
+
     try {
       columnMap = GoogleSheetsUtils.getColumnMap(GoogleSheetsUtils.getFirstRows(service, pluginName, tableName), projectedColumns, config.allTextMode());
+
+      // Get sheet list for metadata.
+      List<Sheet> sheetList = GoogleSheetsUtils.getSheetList(service, pluginName);
+      for (Sheet sheet : sheetList) {
+        sheetNames.add(sheet.getProperties().getTitle());
+      }
     } catch (IOException e) {
       throw UserException.validationError(e)
         .message("Error building schema: " + e.getMessage())
@@ -148,12 +170,17 @@
     logger.debug(rangeBuilder.toString());
 
     // Add provided schema if present.
-    TupleMetadata schema;
     if (negotiator.hasProvidedSchema()) {
       schema = negotiator.providedSchema();
     } else {
       schema = GoogleSheetsUtils.buildSchema(columnMap);
     }
+
+    // Add implicit metadata to schema
+    ColumnMetadata sheetImplicitColumn = MetadataUtils.newScalar(SHEET_COLUMN_NAME, MinorType.VARCHAR, DataMode.REPEATED);
+    sheetImplicitColumn.setBooleanProperty(ColumnMetadata.EXCLUDE_FROM_WILDCARD, true);
+    schema.addColumn(sheetImplicitColumn);
+
     negotiator.tableSchema(schema, true);
     ResultSetLoader resultLoader = negotiator.build();
     // Create ScalarWriters
@@ -165,6 +192,10 @@
       // Build writers
       MinorType dataType;
       for (GoogleSheetsColumn column : columnMap.values()) {
+        // Ignore metadata columns.
+        if (column.isMetadata()) {
+          continue;
+        }
         dataType = column.getDrillDataType();
         if (dataType == MinorType.FLOAT8) {
           column.setWriter(new GoogleSheetsNumericColumnWriter(rowWriter, column.getColumnName()));
@@ -207,7 +238,11 @@
         data = GoogleSheetsUtils.getDataFromRange(service, sheetID, range);
       } else {
         List<String> batches = rangeBuilder.nextBatch();
-        data = GoogleSheetsUtils.getBatchData(service, sheetID, batches);
+        if (!batches.isEmpty()) {
+          data = GoogleSheetsUtils.getBatchData(service, sheetID, batches);
+        } else {
+          data = Collections.emptyList();
+        }
       }
     } catch (IOException e) {
       throw UserException.dataReadError(e)
@@ -223,6 +258,14 @@
     if (config.getExtractHeaders()) {
       startIndex = 1;
     }
+
+    // Edge Case:  If only metadata columns are projected, project one row and return
+    if (data.size() == 0 && onlyMetadata(schema)) {
+      rowWriter.start();
+      projectMetadata();
+      rowWriter.save();
+    }
+
     for (int rowIndex = startIndex; rowIndex < data.size(); rowIndex++) {
       rowWriter.start();
       row = data.get(rowIndex);
@@ -239,6 +282,7 @@
         }
         column.load(value);
       }
+      projectMetadata();
       rowWriter.save();
     }
 
@@ -250,6 +294,36 @@
     return true;
   }
 
+  private void projectMetadata() {
+    // Add metadata
+    if (sheetNameWriter == null) {
+      int sheetColumnIndex = rowWriter.tupleSchema().index(SHEET_COLUMN_NAME);
+      if (sheetColumnIndex == -1) {
+        ColumnMetadata colSchema = MetadataUtils.newScalar(SHEET_COLUMN_NAME, MinorType.VARCHAR, DataMode.REPEATED);
+        colSchema.setBooleanProperty(ColumnMetadata.EXCLUDE_FROM_WILDCARD, true);
+      }
+      sheetNameWriter = rowWriter.column(SHEET_COLUMN_NAME).array().scalar();
+    }
+
+    for (String sheetName : sheetNames) {
+      sheetNameWriter.setString(sheetName);
+    }
+  }
+
+  /**
+   * Returns true if the projected schema only contains implicit metadata columns.
+   * @param schema {@link TupleMetadata} The active schema
+   * @return True if the schema is only metadata, false otherwise.
+   */
+  private boolean onlyMetadata(TupleMetadata schema) {
+    for (MaterializedField field: schema.toFieldList()) {
+      if (!IMPLICIT_FIELDS.contains(field.getName())){
+        return false;
+      }
+    }
+    return true;
+  }
+
   private void setColumnWritersFromProvidedSchema(TupleMetadata schema) {
     List<MaterializedField> fieldList = schema.toFieldList();
 
@@ -259,6 +333,11 @@
       dataType = field.getType().getMinorType();
       column = columnMap.get(field.getName());
 
+      // Do not create a column writer object for metadata columns
+      if (column == null || column.isMetadata()) {
+        continue;
+      }
+
       // Get the field
       if (dataType == MinorType.FLOAT8) {
         column.setWriter(new GoogleSheetsNumericColumnWriter(rowWriter, column.getColumnName()));
diff --git a/contrib/storage-googlesheets/src/main/java/org/apache/drill/exec/store/googlesheets/GoogleSheetsColumn.java b/contrib/storage-googlesheets/src/main/java/org/apache/drill/exec/store/googlesheets/GoogleSheetsColumn.java
index d348a97..b9e726a 100644
--- a/contrib/storage-googlesheets/src/main/java/org/apache/drill/exec/store/googlesheets/GoogleSheetsColumn.java
+++ b/contrib/storage-googlesheets/src/main/java/org/apache/drill/exec/store/googlesheets/GoogleSheetsColumn.java
@@ -20,9 +20,11 @@
 
 
 import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.store.googlesheets.columns.GoogleSheetsColumnWriter;
 import org.apache.drill.exec.store.googlesheets.utils.GoogleSheetsUtils;
+import org.apache.drill.exec.store.googlesheets.utils.GoogleSheetsUtils.DATA_TYPES;
 
 import java.util.Objects;
 
@@ -42,9 +44,12 @@
   private final String columnName;
   private final GoogleSheetsUtils.DATA_TYPES dataType;
   private final MinorType drillDataType;
+  private final DataMode dataMode;
   private final int columnIndex;
   private final int drillColumnIndex;
   private final String columnLetter;
+  private final boolean isMetadata;
+
   private GoogleSheetsColumnWriter writer;
 
   public GoogleSheetsColumn(String columnName, GoogleSheetsUtils.DATA_TYPES dataType, int googleColumnIndex, int drillColumnIndex) {
@@ -54,6 +59,24 @@
     this.dataType = dataType;
     this.columnLetter = GoogleSheetsUtils.columnToLetter(googleColumnIndex + 1);
     this.drillDataType = getDrillDataType(dataType);
+    this.dataMode = DataMode.OPTIONAL;
+    this.isMetadata = false;
+  }
+
+  public GoogleSheetsColumn(String columnName, GoogleSheetsUtils.DATA_TYPES dataType, int drillColumnIndex, boolean isMetadata) {
+    // Constructor for metadata fields.
+    this.columnName = columnName;
+    this.columnIndex = -1;
+    this.drillColumnIndex = drillColumnIndex;
+    this.dataType = dataType;
+    this.columnLetter = null;
+    this.drillDataType = getDrillDataType(dataType);
+    if (dataType == DATA_TYPES.VARCHAR_REPEATED) {
+      dataMode = DataMode.REPEATED;
+    } else {
+      dataMode = DataMode.OPTIONAL;
+    }
+    this.isMetadata = isMetadata;
   }
 
   private MinorType getDrillDataType(GoogleSheetsUtils.DATA_TYPES dataType) {
@@ -89,12 +112,18 @@
 
   public String getColumnLetter() { return columnLetter; }
 
+  public boolean isMetadata() {
+    return this.isMetadata;
+  }
+
   public String getColumnName() {
     return columnName;
   }
 
   public void load(Object value) {
-    writer.load(value);
+    if (! isMetadata) {
+      writer.load(value);
+    }
   }
 
   @Override
@@ -104,6 +133,7 @@
       .field("columnIndex", columnIndex)
       .field("columnLetter", columnLetter)
       .field("data type", dataType)
+      .field("isMetadata", isMetadata)
       .toString();
   }
   @Override
@@ -117,11 +147,12 @@
     return Objects.equals(columnName, otherColumn.columnName) &&
       Objects.equals(columnIndex, otherColumn.columnIndex) &&
       Objects.equals(columnLetter, otherColumn.columnLetter) &&
-      Objects.equals(dataType, otherColumn.dataType);
+      Objects.equals(dataType, otherColumn.dataType) &&
+      Objects.equals(isMetadata, otherColumn.isMetadata);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(columnName, columnIndex, columnLetter, dataType);
+    return Objects.hash(columnName, columnIndex, columnLetter, dataType, isMetadata);
   }
 }
diff --git a/contrib/storage-googlesheets/src/main/java/org/apache/drill/exec/store/googlesheets/utils/GoogleSheetsRangeBuilder.java b/contrib/storage-googlesheets/src/main/java/org/apache/drill/exec/store/googlesheets/utils/GoogleSheetsRangeBuilder.java
index 18b0531..f5e8417 100644
--- a/contrib/storage-googlesheets/src/main/java/org/apache/drill/exec/store/googlesheets/utils/GoogleSheetsRangeBuilder.java
+++ b/contrib/storage-googlesheets/src/main/java/org/apache/drill/exec/store/googlesheets/utils/GoogleSheetsRangeBuilder.java
@@ -203,6 +203,11 @@
     StringBuilder batch = new StringBuilder();
 
     for (GoogleSheetsColumnRange columnRange : projectedRanges) {
+      // The start column index will be -1 for metadata columns. Since we don't want
+      // metadata columns included in the batch, we skip them.
+      if (columnRange.getStartColIndex() == null  && columnRange.getEndColIndex() == -1) {
+        continue;
+      }
       batch.append("'")
         .append(sheetName)
         .append("'!")
diff --git a/contrib/storage-googlesheets/src/main/java/org/apache/drill/exec/store/googlesheets/utils/GoogleSheetsUtils.java b/contrib/storage-googlesheets/src/main/java/org/apache/drill/exec/store/googlesheets/utils/GoogleSheetsUtils.java
index 1788fbd..a121568 100644
--- a/contrib/storage-googlesheets/src/main/java/org/apache/drill/exec/store/googlesheets/utils/GoogleSheetsUtils.java
+++ b/contrib/storage-googlesheets/src/main/java/org/apache/drill/exec/store/googlesheets/utils/GoogleSheetsUtils.java
@@ -102,7 +102,11 @@
     /**
      * A field containing timestamps.
      */
-    TIMESTAMP
+    TIMESTAMP,
+    /**
+     * A field containing a list of strings.  Only used for implicit columns.
+     */
+    VARCHAR_REPEATED
   }
 
   /**
@@ -379,6 +383,10 @@
     int currentIndex;
     GoogleSheetsColumnRange currentRange = new GoogleSheetsColumnRange(sheetName);
     for (GoogleSheetsColumn column : columnMap.values()) {
+      // Exclude metadata columns
+      if (column.isMetadata()) {
+        continue;
+      }
       currentIndex = column.getColumnIndex();
 
       // Edge case for first range
diff --git a/contrib/storage-googlesheets/src/test/java/org/apache/drill/exec/store/googlesheets/TestGoogleSheetsQueries.java b/contrib/storage-googlesheets/src/test/java/org/apache/drill/exec/store/googlesheets/TestGoogleSheetsQueries.java
index d1e6a61..bfc529a 100644
--- a/contrib/storage-googlesheets/src/test/java/org/apache/drill/exec/store/googlesheets/TestGoogleSheetsQueries.java
+++ b/contrib/storage-googlesheets/src/test/java/org/apache/drill/exec/store/googlesheets/TestGoogleSheetsQueries.java
@@ -44,6 +44,7 @@
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.drill.test.rowSet.RowSetUtilities.strArray;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
@@ -133,6 +134,81 @@
   }
 
   @Test
+  public void testImplicitFields() throws Exception {
+    // Tests special case of only implicit metadata fields being projected.
+    try {
+      initializeTokens("googlesheets");
+    } catch (PluginException e) {
+      fail(e.getMessage());
+    }
+
+    String sql = String.format("SELECT _sheets FROM googlesheets.`%s`.`MixedSheet` LIMIT 1", sheetID);
+    RowSet results = queryBuilder().sql(sql).rowSet();
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .addArray("_sheets", MinorType.VARCHAR)
+      .buildSchema();
+
+    RowSet expected = client.rowSetBuilder(expectedSchema)
+      .addRow((Object) strArray("TestSheet1", "MixedSheet"))
+      .build();
+
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+
+  @Ignore("Implicit columns have some projection issues. See DRILL-7080.  Once this is resolved, re-enable this test.")
+  @Test
+  public void testStarAndImplicitFields() throws Exception {
+    try {
+      initializeTokens("googlesheets");
+    } catch (PluginException e) {
+      fail(e.getMessage());
+    }
+
+    String sql = String.format("SELECT *, _sheets FROM googlesheets.`%s`.`MixedSheet` LIMIT 3", sheetID);
+    RowSet results = queryBuilder().sql(sql).rowSet();
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .addNullable("Col1", MinorType.VARCHAR)
+      .addNullable("Col2", MinorType.FLOAT8)
+      .addNullable("Col3", MinorType.DATE)
+      .addArray("_sheets", MinorType.VARCHAR)
+      .buildSchema();
+
+    RowSet expected = client.rowSetBuilder(expectedSchema)
+      .addRow("Rosaline  Thales", 1.0, null, strArray("TestSheet1", "MixedSheet"))
+      .addRow("Abdolhossein  Detlev", 2.0001, LocalDate.parse("2020-04-30"), strArray("TestSheet1", "MixedSheet"))
+      .build();
+
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+
+  @Test
+  public void testExplicitAndImplicitFields() throws Exception {
+    try {
+      initializeTokens("googlesheets");
+    } catch (PluginException e) {
+      fail(e.getMessage());
+    }
+
+    String sql = String.format("SELECT Col1, Col3, _sheets FROM googlesheets.`%s`.`MixedSheet` LIMIT 3", sheetID);
+    RowSet results = queryBuilder().sql(sql).rowSet();
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .addNullable("Col1", MinorType.VARCHAR)
+      .addNullable("Col3", MinorType.DATE)
+      .addArray("_sheets", MinorType.VARCHAR)
+      .buildSchema();
+
+    RowSet expected = client.rowSetBuilder(expectedSchema)
+      .addRow("Rosaline  Thales", null, strArray("TestSheet1", "MixedSheet"))
+      .addRow("Abdolhossein  Detlev", LocalDate.parse("2020-04-30"), strArray("TestSheet1", "MixedSheet"))
+      .build();
+
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+
+  @Test
   public void testProjectPushdown() throws Exception {
     try {
       initializeTokens("googlesheets");
@@ -180,6 +256,36 @@
   }
 
   @Test
+  public void testWithExplicitColumnsInDifferentOrder() throws Exception {
+    try {
+      initializeTokens("googlesheets");
+    } catch (PluginException e) {
+      fail(e.getMessage());
+    }
+
+    String sql = String.format("SELECT Col3, Col1 FROM googlesheets.`%s`.`MixedSheet` WHERE `Col2` < 6.0", sheetID);
+    RowSet results = queryBuilder().sql(sql).rowSet();
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .addNullable("Col3", MinorType.DATE)
+      .addNullable("Col1", MinorType.VARCHAR)
+      .buildSchema();
+
+    RowSet expected = client.rowSetBuilder(expectedSchema)
+      .addRow(null, "Rosaline  Thales")
+      .addRow(LocalDate.parse("2020-04-30"), "Abdolhossein  Detlev")
+      .addRow(LocalDate.parse("2020-06-30"), null)
+      .addRow(LocalDate.parse("2021-01-15"), "Yunus  Elena")
+      .addRow(LocalDate.parse("2021-04-08"), "Swaran  Ohiyesa")
+      .addRow(LocalDate.parse("2021-06-28"), "Kalani  Godabert")
+      .addRow(LocalDate.parse("2021-07-09"), "Caishen  Origenes")
+      .addRow(LocalDate.parse("2021-11-05"), "Toufik  Gurgen")
+      .build();
+
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+
+  @Test
   public void testAggregateQuery() throws Exception {
     try {
       initializeTokens("googlesheets");