DRILL-7471: DESCRIBE TABLE command fails with ClassCastException when Metastore is enabled
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/analyze/AnalyzeColumnUtils.java b/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/analyze/AnalyzeColumnUtils.java
index 296d43d..feeeab7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/analyze/AnalyzeColumnUtils.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/analyze/AnalyzeColumnUtils.java
@@ -21,6 +21,7 @@
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.metastore.statistics.BaseStatisticsKind;
 import org.apache.drill.metastore.statistics.ColumnStatisticsKind;
+import org.apache.drill.metastore.statistics.ExactStatisticsConstants;
 import org.apache.drill.metastore.statistics.StatisticsKind;
 import org.apache.drill.metastore.statistics.TableStatisticsKind;
 import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
@@ -30,19 +31,19 @@
 public class AnalyzeColumnUtils {
   private static final String COLUMN_SEPARATOR = "$";
 
-  public static final Map<StatisticsKind, SqlKind> COLUMN_STATISTICS_FUNCTIONS = ImmutableMap.<StatisticsKind, SqlKind>builder()
+  public static final Map<StatisticsKind<?>, SqlKind> COLUMN_STATISTICS_FUNCTIONS = ImmutableMap.<StatisticsKind<?>, SqlKind>builder()
       .put(ColumnStatisticsKind.MAX_VALUE, SqlKind.MAX)
       .put(ColumnStatisticsKind.MIN_VALUE, SqlKind.MIN)
-      .put(ColumnStatisticsKind.NON_NULL_COUNT, SqlKind.COUNT)
+      .put(ColumnStatisticsKind.NON_NULL_VALUES_COUNT, SqlKind.COUNT)
       .put(TableStatisticsKind.ROW_COUNT, SqlKind.COUNT)
       .build();
 
-  public static final Map<StatisticsKind, TypeProtos.MinorType> COLUMN_STATISTICS_TYPES = ImmutableMap.<StatisticsKind, TypeProtos.MinorType>builder()
-      .put(ColumnStatisticsKind.NON_NULL_COUNT, TypeProtos.MinorType.BIGINT)
+  public static final Map<StatisticsKind<?>, TypeProtos.MinorType> COLUMN_STATISTICS_TYPES = ImmutableMap.<StatisticsKind<?>, TypeProtos.MinorType>builder()
+      .put(ColumnStatisticsKind.NON_NULL_VALUES_COUNT, TypeProtos.MinorType.BIGINT)
       .put(TableStatisticsKind.ROW_COUNT, TypeProtos.MinorType.BIGINT)
       .build();
 
-  public static final Map<StatisticsKind, SqlKind> META_STATISTICS_FUNCTIONS = ImmutableMap.<StatisticsKind, SqlKind>builder()
+  public static final Map<StatisticsKind<?>, SqlKind> META_STATISTICS_FUNCTIONS = ImmutableMap.<StatisticsKind<?>, SqlKind>builder()
       .put(TableStatisticsKind.ROW_COUNT, SqlKind.COUNT)
       .build();
 
@@ -65,21 +66,21 @@
    * @param fullName the source of {@link StatisticsKind} to obtain
    * @return {@link StatisticsKind} instance
    */
-  public static StatisticsKind getStatisticsKind(String fullName) {
+  public static StatisticsKind<?> getStatisticsKind(String fullName) {
     String statisticsIdentifier = fullName.split("\\" + COLUMN_SEPARATOR)[1];
     switch (statisticsIdentifier) {
-      case "minValue":
+      case ExactStatisticsConstants.MIN_VALUE:
         return ColumnStatisticsKind.MIN_VALUE;
-      case "maxValue":
+      case ExactStatisticsConstants.MAX_VALUE:
         return ColumnStatisticsKind.MAX_VALUE;
-      case "nullsCount":
+      case ExactStatisticsConstants.NULLS_COUNT:
         return ColumnStatisticsKind.NULLS_COUNT;
-      case "nonnullrowcount":
-        return ColumnStatisticsKind.NON_NULL_COUNT;
-      case "rowCount":
+      case ExactStatisticsConstants.NON_NULL_VALUES_COUNT:
+        return ColumnStatisticsKind.NON_NULL_VALUES_COUNT;
+      case ExactStatisticsConstants.ROW_COUNT:
         return TableStatisticsKind.ROW_COUNT;
     }
-    return new BaseStatisticsKind(statisticsIdentifier, false);
+    return new BaseStatisticsKind<>(statisticsIdentifier, false);
   }
 
   /**
@@ -93,7 +94,7 @@
    * @param statisticsKind statistics kind
    * @return analyze-specific field name which includes actual column name and statistics kind information
    */
-  public static String getColumnStatisticsFieldName(String columnName, StatisticsKind statisticsKind) {
+  public static String getColumnStatisticsFieldName(String columnName, StatisticsKind<?> statisticsKind) {
     return String.format("column%1$s%2$s%1$s%3$s", COLUMN_SEPARATOR, statisticsKind.getName(), columnName);
   }
 
@@ -105,7 +106,7 @@
    * @param statisticsKind statistics kind
    * @return analyze-specific field name for metadata statistics
    */
-  public static String getMetadataStatisticsFieldName(StatisticsKind statisticsKind) {
+  public static String getMetadataStatisticsFieldName(StatisticsKind<?> statisticsKind) {
     return String.format("metadata%s%s", COLUMN_SEPARATOR, statisticsKind.getName());
   }
 
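Note on the getStatisticsKind change above: the switch is the inverse of the field-name builders in this class, so it must match the exact identifiers under which the statistics kinds register themselves. The old "nonnullrowcount" literal resolved to the estimated, Double-valued NON_NULL_COUNT kind even though ANALYZE collects exact Long counts — the sort of type mismatch that would surface as the ClassCastException in the issue title. A minimal sketch of the round trip (illustrative, not part of the patch; assumes the Drill classes from this change are on the classpath):

    // inside any test method:
    String field = AnalyzeColumnUtils.getColumnStatisticsFieldName(
        "o_orderkey", ColumnStatisticsKind.MIN_VALUE);   // "column$minValue$o_orderkey"
    StatisticsKind<?> kind = AnalyzeColumnUtils.getStatisticsKind(field);
    assert kind == ColumnStatisticsKind.MIN_VALUE;       // resolved via ExactStatisticsConstants.MIN_VALUE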
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/analyze/MetadataControllerContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/analyze/MetadataControllerContext.java
index 157d3a0..b6b6b80 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/analyze/MetadataControllerContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/analyze/MetadataControllerContext.java
@@ -44,6 +44,7 @@
   private final List<MetadataInfo> metadataToHandle;
   private final List<MetadataInfo> metadataToRemove;
   private final MetadataType analyzeMetadataLevel;
+  private final boolean multiValueSegments;
 
   private MetadataControllerContext(MetadataControllerContextBuilder builder) {
     this.tableInfo = builder.tableInfo;
@@ -54,6 +55,7 @@
     this.metadataToHandle = builder.metadataToHandle;
     this.metadataToRemove = builder.metadataToRemove;
     this.analyzeMetadataLevel = builder.analyzeMetadataLevel;
+    this.multiValueSegments = builder.multiValueSegments;
   }
 
   @JsonProperty
@@ -96,6 +98,18 @@
     return analyzeMetadataLevel;
   }
 
+  /**
+   * Specifies whether the metadata controller should create segments with multiple partition values.
+   * For example, Hive partitions contain multiple partition values within the same segment.
+   *
+   * @return {@code true} if the metadata controller should create segments with multiple partition values,
+   * {@code false} otherwise
+   */
+  @JsonProperty
+  public boolean multiValueSegments() {
+    return multiValueSegments;
+  }
+
   @Override
   public String toString() {
     return new StringJoiner(",\n", MetadataControllerContext.class.getSimpleName() + "[", "]")
@@ -123,6 +137,7 @@
     private List<MetadataInfo> metadataToHandle;
     private List<MetadataInfo> metadataToRemove;
     private MetadataType analyzeMetadataLevel;
+    private boolean multiValueSegments;
 
     public MetadataControllerContextBuilder tableInfo(TableInfo tableInfo) {
       this.tableInfo = tableInfo;
@@ -164,6 +179,11 @@
       return this;
     }
 
+    public MetadataControllerContextBuilder multiValueSegments(boolean multiValueSegments) {
+      this.multiValueSegments = multiValueSegments;
+      return this;
+    }
+
     public MetadataControllerContext build() {
       Objects.requireNonNull(tableInfo, "tableInfo was not set");
       Objects.requireNonNull(location, "location was not set");
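For context, a hypothetical call site for the new flag (the builder() factory name, the other setters, and MetadataType.ALL are assumed from the surrounding class, and the values are made up):

    MetadataControllerContext context = MetadataControllerContext.builder()
        .tableInfo(tableInfo)
        .location(location)
        .analyzeMetadataLevel(MetadataType.ALL)
        .multiValueSegments(true)  // e.g. Hive: one segment carries values for all partition columns
        .build();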
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataControllerBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataControllerBatch.java
index ab82769..7ef4741 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataControllerBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataControllerBatch.java
@@ -363,6 +363,7 @@
         .build();
   }
 
+  @SuppressWarnings("rawtypes")
   private List<TableMetadataUnit> getMetadataUnits(TupleReader reader, int nestingLevel) {
     List<TableMetadataUnit> metadataUnits = new ArrayList<>();
 
@@ -425,6 +426,7 @@
     return metadataUnits;
   }
 
+  @SuppressWarnings("rawtypes")
   private PartitionMetadata getPartitionMetadata(TupleReader reader, List<StatisticsHolder> metadataStatistics,
       Map<SchemaPath, ColumnStatistics> columnStatistics, int nestingLevel) {
     List<String> segmentColumns = popConfig.getContext().segmentColumns();
@@ -456,11 +458,11 @@
         .build();
   }
 
-  @SuppressWarnings("unchecked")
+  @SuppressWarnings("rawtypes")
   private BaseTableMetadata getTableMetadata(TupleReader reader, List<StatisticsHolder> metadataStatistics,
       Map<SchemaPath, ColumnStatistics> columnStatistics) {
     List<StatisticsHolder> updatedMetaStats = new ArrayList<>(metadataStatistics);
-    updatedMetaStats.add(new StatisticsHolder(popConfig.getContext().analyzeMetadataLevel(), TableStatisticsKind.ANALYZE_METADATA_LEVEL));
+    updatedMetaStats.add(new StatisticsHolder<>(popConfig.getContext().analyzeMetadataLevel(), TableStatisticsKind.ANALYZE_METADATA_LEVEL));
 
     MetadataInfo metadataInfo = MetadataInfo.builder()
         .type(MetadataType.TABLE)
@@ -489,6 +491,7 @@
     return tableMetadata;
   }
 
+  @SuppressWarnings("rawtypes")
   private SegmentMetadata getSegmentMetadata(TupleReader reader, List<StatisticsHolder> metadataStatistics,
       Map<SchemaPath, ColumnStatistics> columnStatistics, int nestingLevel) {
     List<String> segmentColumns = popConfig.getContext().segmentColumns();
@@ -497,11 +500,17 @@
         ? reader.column(segmentColumns.iterator().next()).scalar().getString()
         : MetadataInfo.DEFAULT_SEGMENT_KEY;
 
-    List<String> partitionValues = segmentColumns.stream()
+    // for the case of multi-value segments, there is no nesting,
+    // and therefore all values should be used when forming the metadata identifier
+    if (popConfig.getContext().multiValueSegments()) {
+      nestingLevel = segmentColumns.size();
+    }
+
+    List<String> allPartitionValues = segmentColumns.stream()
         .limit(nestingLevel)
         .map(columnName -> reader.column(columnName).scalar().getString())
         .collect(Collectors.toList());
-    String metadataIdentifier = MetadataIdentifierUtils.getMetadataIdentifierKey(partitionValues);
+    String metadataIdentifier = MetadataIdentifierUtils.getMetadataIdentifierKey(allPartitionValues);
 
     MetadataInfo metadataInfo = MetadataInfo.builder()
         .type(MetadataType.SEGMENT)
@@ -509,6 +518,14 @@
         .identifier(StringUtils.defaultIfEmpty(metadataIdentifier, null))
         .build();
 
+    int segmentLevel = nestingLevel - 1;
+
+    // for the case of multi-value segments, there is no nesting,
+    // so all partition column values should be used
+    List<String> partitionValues = popConfig.getContext().multiValueSegments()
+        ? allPartitionValues
+        : Collections.singletonList(allPartitionValues.get(segmentLevel));
+
     return SegmentMetadata.builder()
         .tableInfo(tableInfo)
         .metadataInfo(metadataInfo)
@@ -516,13 +533,14 @@
         .metadataStatistics(metadataStatistics)
         .path(new Path(reader.column(MetastoreAnalyzeConstants.LOCATION_FIELD).scalar().getString()))
         .locations(getIncomingLocations(reader))
-        .column(segmentColumns.size() > 0 ? SchemaPath.getSimplePath(segmentColumns.get(nestingLevel - 1)) : null)
+        .column(segmentColumns.size() > 0 ? SchemaPath.getSimplePath(segmentColumns.get(segmentLevel)) : null)
         .partitionValues(partitionValues)
         .lastModifiedTime(Long.parseLong(reader.column(columnNamesOptions.lastModifiedTime()).scalar().getString()))
         .schema(TupleMetadata.of(reader.column(MetastoreAnalyzeConstants.SCHEMA_FIELD).scalar().getString()))
         .build();
   }
 
+  @SuppressWarnings("rawtypes")
   private FileMetadata getFileMetadata(TupleReader reader, List<StatisticsHolder> metadataStatistics,
       Map<SchemaPath, ColumnStatistics> columnStatistics, int nestingLevel) {
     List<String> segmentColumns = popConfig.getContext().segmentColumns();
@@ -556,6 +574,7 @@
         .build();
   }
 
+  @SuppressWarnings("rawtypes")
   private RowGroupMetadata getRowGroupMetadata(TupleReader reader, List<StatisticsHolder> metadataStatistics,
       Map<SchemaPath, ColumnStatistics> columnStatistics, int nestingLevel) {
 
@@ -594,7 +613,7 @@
         .build();
   }
 
-  @SuppressWarnings("unchecked")
+  @SuppressWarnings("rawtypes")
   private Map<SchemaPath, ColumnStatistics> getColumnStatistics(TupleReader reader, TupleMetadata columnMetadata, Long rowCount) {
     Multimap<String, StatisticsHolder> columnStatistics = ArrayListMultimap.create();
     Map<String, TypeProtos.MinorType> columnTypes = new HashMap<>();
@@ -602,9 +621,9 @@
       String fieldName = AnalyzeColumnUtils.getColumnName(column.name());
 
       if (AnalyzeColumnUtils.isColumnStatisticsField(column.name())) {
-        StatisticsKind statisticsKind = AnalyzeColumnUtils.getStatisticsKind(column.name());
+        StatisticsKind<?> statisticsKind = AnalyzeColumnUtils.getStatisticsKind(column.name());
         columnStatistics.put(fieldName,
-            new StatisticsHolder(getConvertedColumnValue(reader.column(column.name())), statisticsKind));
+            new StatisticsHolder<>(getConvertedColumnValue(reader.column(column.name())), statisticsKind));
         if (statisticsKind.getName().equalsIgnoreCase(ColumnStatisticsKind.MIN_VALUE.getName())
             || statisticsKind.getName().equalsIgnoreCase(ColumnStatisticsKind.MAX_VALUE.getName())) {
           columnTypes.putIfAbsent(fieldName, column.type());
@@ -617,13 +636,13 @@
       Map<String, StatisticsHolder> nullsCountColumnStatistics = new HashMap<>();
       columnStatistics.asMap().forEach((key, value) ->
           value.stream()
-              .filter(statisticsHolder -> statisticsHolder.getStatisticsKind() == ColumnStatisticsKind.NON_NULL_COUNT)
+              .filter(statisticsHolder -> statisticsHolder.getStatisticsKind() == ColumnStatisticsKind.NON_NULL_VALUES_COUNT)
               .findAny()
               .map(statisticsHolder -> (Long) statisticsHolder.getStatisticsValue())
               .ifPresent(nonNullCount ->
                   nullsCountColumnStatistics.put(
                       key,
-                      new StatisticsHolder(rowCount - nonNullCount, ColumnStatisticsKind.NULLS_COUNT))));
+                      new StatisticsHolder<>(rowCount - nonNullCount, ColumnStatisticsKind.NULLS_COUNT))));
 
       nullsCountColumnStatistics.forEach(columnStatistics::put);
     }
@@ -631,11 +650,11 @@
     Map<SchemaPath, ColumnStatistics> resultingStats = new HashMap<>();
 
     columnStatistics.asMap().forEach((fieldName, statisticsHolders) ->
-        resultingStats.put(SchemaPath.parseFromString(fieldName), new ColumnStatistics(statisticsHolders, columnTypes.get(fieldName))));
+        resultingStats.put(SchemaPath.parseFromString(fieldName), new ColumnStatistics<>(statisticsHolders, columnTypes.get(fieldName))));
     return resultingStats;
   }
 
-  @SuppressWarnings("unchecked")
+  @SuppressWarnings("rawtypes")
   private List<StatisticsHolder> getMetadataStatistics(TupleReader reader, TupleMetadata columnMetadata) {
     List<StatisticsHolder> metadataStatistics = new ArrayList<>();
     String rgs = columnNamesOptions.rowGroupStart();
@@ -644,15 +663,15 @@
       String columnName = column.name();
       ObjectReader objectReader = reader.column(columnName);
       if (AnalyzeColumnUtils.isMetadataStatisticsField(columnName)) {
-        metadataStatistics.add(new StatisticsHolder(objectReader.getObject(),
+        metadataStatistics.add(new StatisticsHolder<>(objectReader.getObject(),
             AnalyzeColumnUtils.getStatisticsKind(columnName)));
       } else if (!objectReader.isNull()) {
         if (columnName.equals(rgs)) {
-          metadataStatistics.add(new StatisticsHolder(Long.parseLong(objectReader.scalar().getString()),
-              new BaseStatisticsKind(ExactStatisticsConstants.START, true)));
+          metadataStatistics.add(new StatisticsHolder<>(Long.parseLong(objectReader.scalar().getString()),
+              new BaseStatisticsKind<>(ExactStatisticsConstants.START, true)));
         } else if (columnName.equals(rgl)) {
-          metadataStatistics.add(new StatisticsHolder(Long.parseLong(objectReader.scalar().getString()),
-              new BaseStatisticsKind(ExactStatisticsConstants.LENGTH, true)));
+          metadataStatistics.add(new StatisticsHolder<>(Long.parseLong(objectReader.scalar().getString()),
+              new BaseStatisticsKind<>(ExactStatisticsConstants.LENGTH, true)));
         }
       }
     }
@@ -664,7 +683,7 @@
       List<FieldConverter> fieldConverters = new ArrayList<>();
       int fieldId = 0;
 
-      for (VectorWrapper wrapper : right) {
+      for (VectorWrapper<?> wrapper : right) {
         if (wrapper.getField().getName().equalsIgnoreCase(WriterPrel.PARTITION_COMPARATOR_FIELD)) {
           continue;
         }
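A worked example of the segment branch above, mirroring the two-level layout used in the tests (dir0 = "1994", dir1 = "Q1"); the "/" join stands in for MetadataIdentifierUtils.getMetadataIdentifierKey, matching the "1994/Q1" identifiers the tests expect (java.util imports omitted):

    List<String> segmentColumns = Arrays.asList("dir0", "dir1");
    List<String> columnValues = Arrays.asList("1994", "Q1");
    boolean multiValueSegments = false;                // directory-based segments
    int nestingLevel = multiValueSegments ? segmentColumns.size() : 2;
    int segmentLevel = nestingLevel - 1;

    List<String> allPartitionValues = columnValues.subList(0, nestingLevel);
    String metadataIdentifier = String.join("/", allPartitionValues); // "1994/Q1" in both modes

    // nested segments keep only this level's value: ["Q1"];
    // with multiValueSegments = true the full list survives: ["1994", "Q1"]
    List<String> partitionValues = multiValueSegments
        ? allPartitionValues
        : Collections.singletonList(allPartitionValues.get(segmentLevel));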
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataHandlerBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataHandlerBatch.java
index 22e90fa..044d140 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataHandlerBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataHandlerBatch.java
@@ -263,7 +263,7 @@
     baseMetadata.getColumnsStatistics().entrySet().stream()
         .sorted(Comparator.comparing(e -> e.getKey().getRootSegmentPath()))
         .forEach(entry -> {
-          for (StatisticsKind statisticsKind : AnalyzeColumnUtils.COLUMN_STATISTICS_FUNCTIONS.keySet()) {
+          for (StatisticsKind<?> statisticsKind : AnalyzeColumnUtils.COLUMN_STATISTICS_FUNCTIONS.keySet()) {
             MinorType type = AnalyzeColumnUtils.COLUMN_STATISTICS_TYPES.get(statisticsKind);
             type = type != null ? type : entry.getValue().getComparatorType();
             schemaBuilder.addNullable(
@@ -272,7 +272,7 @@
           }
         });
 
-    for (StatisticsKind statisticsKind : AnalyzeColumnUtils.META_STATISTICS_FUNCTIONS.keySet()) {
+    for (StatisticsKind<?> statisticsKind : AnalyzeColumnUtils.META_STATISTICS_FUNCTIONS.keySet()) {
       schemaBuilder.addNullable(
           AnalyzeColumnUtils.getMetadataStatisticsFieldName(statisticsKind),
           AnalyzeColumnUtils.COLUMN_STATISTICS_TYPES.get(statisticsKind));
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ConvertMetadataAggregateToDirectScanRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ConvertMetadataAggregateToDirectScanRule.java
index 43f6383..ed4038f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ConvertMetadataAggregateToDirectScanRule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ConvertMetadataAggregateToDirectScanRule.java
@@ -201,16 +201,16 @@
 
       // populates record list with row group column metadata
       for (SchemaPath schemaPath : interestingColumns) {
-        ColumnStatistics columnStatistics = rowGroupMetadata.getColumnsStatistics().get(schemaPath);
+        ColumnStatistics<?> columnStatistics = rowGroupMetadata.getColumnsStatistics().get(schemaPath);
         if (IsPredicate.isNullOrEmpty(columnStatistics)) {
           logger.debug("Statistics for {} column wasn't found within {} row group.", schemaPath, path);
           return null;
         }
-        for (StatisticsKind statisticsKind : AnalyzeColumnUtils.COLUMN_STATISTICS_FUNCTIONS.keySet()) {
+        for (StatisticsKind<?> statisticsKind : AnalyzeColumnUtils.COLUMN_STATISTICS_FUNCTIONS.keySet()) {
           Object statsValue;
           if (statisticsKind.getName().equalsIgnoreCase(TableStatisticsKind.ROW_COUNT.getName())) {
             statsValue = TableStatisticsKind.ROW_COUNT.getValue(rowGroupMetadata);
-          } else if (statisticsKind.getName().equalsIgnoreCase(ColumnStatisticsKind.NON_NULL_COUNT.getName())) {
+          } else if (statisticsKind.getName().equalsIgnoreCase(ColumnStatisticsKind.NON_NULL_VALUES_COUNT.getName())) {
             statsValue = TableStatisticsKind.ROW_COUNT.getValue(rowGroupMetadata) - ColumnStatisticsKind.NULLS_COUNT.getFrom(columnStatistics);
           } else {
             statsValue = columnStatistics.get(statisticsKind);
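The NON_NULL_VALUES_COUNT branch above derives the value rather than reading it, since row-group metadata typically carries exact row and null counts; with assumed numbers:

    long rowCount = 120L;    // TableStatisticsKind.ROW_COUNT.getValue(rowGroupMetadata)
    long nullsCount = 20L;   // ColumnStatisticsKind.NULLS_COUNT.getFrom(columnStatistics)
    long nonNullValuesCount = rowCount - nullsCount;  // 100L is what the rule records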
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaFilter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaFilter.java
index 25bbc7d..b1f1d25 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaFilter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaFilter.java
@@ -20,6 +20,7 @@
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 import org.apache.drill.exec.store.ischema.InfoSchemaFilter.ExprNode.Type;
 import org.apache.drill.shaded.guava.com.google.common.base.Joiner;
@@ -45,6 +46,8 @@
     return exprRoot;
   }
 
+  // embed class info into the serialized form so that ExprNode subclasses are deserialized into their concrete types
+  @JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, property = "@class")
   public static class ExprNode {
     @JsonProperty
     public Type type;
@@ -68,7 +71,8 @@
     public List<ExprNode> args;
 
     @JsonCreator
-    public FunctionExprNode(String function, List<ExprNode> args) {
+    public FunctionExprNode(@JsonProperty("function") String function,
+        @JsonProperty("args") List<ExprNode> args) {
       super(Type.FUNCTION);
       this.function = function;
       this.args = args;
@@ -90,7 +94,7 @@
     public String field;
 
     @JsonCreator
-    public FieldExprNode(String field) {
+    public FieldExprNode(@JsonProperty("field") String field) {
       super(Type.FIELD);
       this.field = field;
     }
@@ -106,7 +110,7 @@
     public String value;
 
     @JsonCreator
-    public ConstantExprNode(String value) {
+    public ConstantExprNode(@JsonProperty("value") String value) {
       super(Type.CONSTANT);
       this.value = value;
     }
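The @JsonTypeInfo addition is what lets a field declared as the base ExprNode come back as the concrete subclass after the plan is serialized and deserialized; without embedded type info Jackson targets the declared base type, and later downcasts to FunctionExprNode or FieldExprNode fail. A self-contained demonstration of the mechanism with stand-in types (not the Drill classes):

    import com.fasterxml.jackson.annotation.JsonTypeInfo;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class TypeInfoDemo {
      @JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, property = "@class")
      public static class Node {
        public String type;
      }

      public static class FieldNode extends Node {
        public String field;
      }

      public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        FieldNode original = new FieldNode();
        original.field = "TABLE_NAME";

        // the annotation embeds "@class":"...FieldNode" in the JSON, so
        // deserializing through the base type restores the concrete subclass
        String json = mapper.writeValueAsString(original);
        Node restored = mapper.readValue(json, Node.class);
        System.out.println(restored.getClass().getSimpleName()); // FieldNode
      }
    }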
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestMetastoreCommands.java b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestMetastoreCommands.java
index 6cf9cc7..ff2108b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestMetastoreCommands.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestMetastoreCommands.java
@@ -62,6 +62,9 @@
 
 import java.io.File;
 import java.nio.file.Paths;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -78,6 +81,7 @@
 import static org.junit.Assert.assertTrue;
 
 @Category({SlowTest.class, MetastoreTest.class})
+@SuppressWarnings({"rawtypes", "unchecked"})
 public class TestMetastoreCommands extends ClusterTest {
 
   private static final TupleMetadata SCHEMA = new SchemaBuilder()
@@ -360,8 +364,8 @@
         .columnsStatistics(DIR0_1994_Q1_SEGMENT_COLUMN_STATISTICS)
         .metadataStatistics(Arrays.asList(
             new StatisticsHolder<>(10L, TableStatisticsKind.ROW_COUNT),
-            new StatisticsHolder<>(1196L, new BaseStatisticsKind(ExactStatisticsConstants.LENGTH, true)),
-            new StatisticsHolder<>(4L, new BaseStatisticsKind(ExactStatisticsConstants.START, true))))
+            new StatisticsHolder<>(1196L, new BaseStatisticsKind<>(ExactStatisticsConstants.LENGTH, true)),
+            new StatisticsHolder<>(4L, new BaseStatisticsKind<>(ExactStatisticsConstants.START, true))))
         .path(new Path(tablePath, "1994/Q1/orders_94_q1.parquet"))
         .build();
 
@@ -438,7 +442,7 @@
           .columnsStatistics(DIR0_1994_Q1_SEGMENT_COLUMN_STATISTICS)
           .metadataStatistics(Collections.singletonList(new StatisticsHolder<>(10L, TableStatisticsKind.ROW_COUNT)))
           .locations(ImmutableSet.of(new Path(tablePath, "1994/Q1/orders_94_q1.parquet")))
-          .partitionValues(Arrays.asList("1994", "Q1"))
+          .partitionValues(Collections.singletonList("Q1"))
           .build();
 
       // verify segment for 1994
@@ -963,7 +967,6 @@
   }
 
   @Test
-  @SuppressWarnings("unchecked")
   public void testIncrementalAnalyzeNewParentSegment() throws Exception {
     String tableName = "multilevel/parquetNewParentSegment";
 
@@ -979,7 +982,7 @@
         columnStatistics.cloneWith(new ColumnStatistics<>(
             Arrays.asList(
                 new StatisticsHolder<>(160L, TableStatisticsKind.ROW_COUNT),
-                new StatisticsHolder<>(160L, ColumnStatisticsKind.NON_NULL_COUNT)))));
+                new StatisticsHolder<>(160L, ColumnStatisticsKind.NON_NULL_VALUES_COUNT)))));
 
     updatedStatistics.computeIfPresent(SchemaPath.getSimplePath("dir0"), (logicalExpressions, columnStatistics) ->
         columnStatistics.cloneWith(new ColumnStatistics<>(
@@ -1055,7 +1058,6 @@
   }
 
   @Test
-  @SuppressWarnings("unchecked")
   public void testIncrementalAnalyzeNewChildSegment() throws Exception {
     String tableName = "multilevel/parquetNewChildSegment";
 
@@ -1071,7 +1073,7 @@
         columnStatistics.cloneWith(new ColumnStatistics(
             Arrays.asList(
                 new StatisticsHolder<>(130L, TableStatisticsKind.ROW_COUNT),
-                new StatisticsHolder<>(130L, ColumnStatisticsKind.NON_NULL_COUNT)))));
+                new StatisticsHolder<>(130L, ColumnStatisticsKind.NON_NULL_VALUES_COUNT)))));
 
     updatedStatistics.computeIfPresent(SchemaPath.getSimplePath("dir1"), (logicalExpressions, columnStatistics) ->
         columnStatistics.cloneWith(new ColumnStatistics(
@@ -1140,7 +1142,6 @@
   }
 
   @Test
-  @SuppressWarnings("unchecked")
   public void testIncrementalAnalyzeNewFile() throws Exception {
     String tableName = "multilevel/parquetNewFile";
 
@@ -1156,7 +1157,7 @@
         columnStatistics.cloneWith(new ColumnStatistics(
             Arrays.asList(
                 new StatisticsHolder<>(130L, TableStatisticsKind.ROW_COUNT),
-                new StatisticsHolder<>(130L, ColumnStatisticsKind.NON_NULL_COUNT)))));
+                new StatisticsHolder<>(130L, ColumnStatisticsKind.NON_NULL_VALUES_COUNT)))));
 
     BaseTableMetadata expectedTableMetadata = BaseTableMetadata.builder()
         .tableInfo(tableInfo)
@@ -1556,7 +1557,6 @@
   }
 
   @Test
-  @SuppressWarnings("unchecked")
   public void testIncrementalAnalyzeUpdatedFile() throws Exception {
     String tableName = "multilevel/parquetUpdatedFile";
 
@@ -1772,7 +1772,6 @@
   }
 
   @Test
-  @SuppressWarnings("unchecked")
   public void testDefaultSegment() throws Exception {
     String tableName = "multilevel/parquet/1994/Q1";
     File table = dirTestWatcher.copyResourceToTestTmp(Paths.get(tableName), Paths.get(tableName));
@@ -1804,7 +1803,7 @@
         columnStatistics.cloneWith(new ColumnStatistics(
             Arrays.asList(
                 new StatisticsHolder<>(10L, TableStatisticsKind.ROW_COUNT),
-                new StatisticsHolder<>(10L, ColumnStatisticsKind.NON_NULL_COUNT)))));
+                new StatisticsHolder<>(10L, ColumnStatisticsKind.NON_NULL_VALUES_COUNT)))));
 
     BaseTableMetadata expectedTableMetadata = BaseTableMetadata.builder()
         .tableInfo(tableInfo)
@@ -2979,6 +2978,156 @@
     }
   }
 
+  @Test
+  public void testDescribeWithMetastore() throws Exception {
+    String tableName = "describeTable";
+
+    File table = dirTestWatcher.copyResourceToTestTmp(Paths.get("multilevel/parquet"), Paths.get(tableName));
+
+    try {
+      client.alterSession(PlannerSettings.STATISTICS_USE.getOptionName(), true);
+
+      testBuilder()
+          .sqlQuery("analyze table dfs.tmp.`%s` REFRESH METADATA", tableName)
+          .unOrdered()
+          .baselineColumns("ok", "summary")
+          .baselineValues(true, String.format("Collected / refreshed metadata for table [dfs.tmp.%s]", tableName))
+          .go();
+
+      testBuilder()
+          .sqlQuery("describe table dfs.tmp.`%s`", tableName)
+          .unOrdered()
+          .baselineColumns("COLUMN_NAME", "DATA_TYPE", "IS_NULLABLE")
+          .baselineValues("dir0", "CHARACTER VARYING", "YES")
+          .baselineValues("dir1", "CHARACTER VARYING", "YES")
+          .baselineValues("o_orderkey", "INTEGER", "NO")
+          .baselineValues("o_custkey", "INTEGER", "NO")
+          .baselineValues("o_orderstatus", "CHARACTER VARYING", "NO")
+          .baselineValues("o_totalprice", "DOUBLE", "NO")
+          .baselineValues("o_orderdate", "DATE", "NO")
+          .baselineValues("o_orderpriority", "CHARACTER VARYING", "NO")
+          .baselineValues("o_clerk", "CHARACTER VARYING", "NO")
+          .baselineValues("o_shippriority", "INTEGER", "NO")
+          .baselineValues("o_comment", "CHARACTER VARYING", "NO")
+          .go();
+    } finally {
+      run("analyze table dfs.tmp.`%s` drop metadata if exists", tableName);
+      client.resetSession(PlannerSettings.STATISTICS_USE.getOptionName());
+
+      FileUtils.deleteQuietly(table);
+    }
+  }
+
+  @Test
+  public void testSelectFromInfoSchemaTablesWithMetastore() throws Exception {
+    String tableName = "tableInInfoSchema";
+
+    File table = dirTestWatcher.copyResourceToTestTmp(Paths.get("multilevel/parquet"), Paths.get(tableName));
+
+    try {
+      testBuilder()
+          .sqlQuery("analyze table dfs.tmp.`%s` REFRESH METADATA", tableName)
+          .unOrdered()
+          .baselineColumns("ok", "summary")
+          .baselineValues(true, String.format("Collected / refreshed metadata for table [dfs.tmp.%s]", tableName))
+          .go();
+
+      LocalDateTime localDateTime = getLocalDateTime(getMaxLastModified(table));
+
+      String absolutePath = new Path(table.toURI().getPath()).toUri().getPath();
+      testBuilder()
+          .sqlQuery("select * from information_schema.`tables` where TABLE_NAME='%s'", tableName)
+          .unOrdered()
+          .baselineColumns("TABLE_CATALOG", "TABLE_SCHEMA", "TABLE_NAME", "TABLE_TYPE", "TABLE_SOURCE", "LOCATION", "NUM_ROWS", "LAST_MODIFIED_TIME")
+          .baselineValues("DRILL", "dfs.tmp", tableName, "TABLE", "PARQUET", absolutePath, 120L, localDateTime)
+          .go();
+    } finally {
+      run("analyze table dfs.tmp.`%s` drop metadata if exists", tableName);
+
+      FileUtils.deleteQuietly(table);
+    }
+  }
+
+  private LocalDateTime getLocalDateTime(long maxLastModified) {
+    return Instant.ofEpochMilli(maxLastModified)
+            .atZone(ZoneId.of("UTC"))
+            .withZoneSameLocal(ZoneId.systemDefault())
+            .toLocalDateTime();
+  }
+
+  @Test
+  public void testSelectFromInfoSchemaColumnsWithMetastore() throws Exception {
+    String tableName = "columnInInfoSchema";
+
+    File table = dirTestWatcher.copyResourceToTestTmp(Paths.get("multilevel/parquet"), Paths.get(tableName));
+
+    try {
+      client.alterSession(PlannerSettings.STATISTICS_USE.getOptionName(), true);
+
+      testBuilder()
+          .sqlQuery("analyze table dfs.tmp.`%s` REFRESH METADATA", tableName)
+          .unOrdered()
+          .baselineColumns("ok", "summary")
+          .baselineValues(true, String.format("Collected / refreshed metadata for table [dfs.tmp.%s]", tableName))
+          .go();
+
+      testBuilder()
+          .sqlQuery("select * from information_schema.`columns` where TABLE_NAME='%s' and COLUMN_NAME in ('dir0', 'o_orderkey', 'o_totalprice')", tableName)
+          .unOrdered()
+          .baselineColumns("TABLE_CATALOG", "TABLE_SCHEMA", "TABLE_NAME", "COLUMN_NAME", "ORDINAL_POSITION",
+              "COLUMN_DEFAULT", "IS_NULLABLE", "DATA_TYPE", "CHARACTER_MAXIMUM_LENGTH", "CHARACTER_OCTET_LENGTH",
+              "NUMERIC_PRECISION", "NUMERIC_PRECISION_RADIX", "NUMERIC_SCALE", "DATETIME_PRECISION", "INTERVAL_TYPE",
+              "INTERVAL_PRECISION", "COLUMN_SIZE", "COLUMN_FORMAT", "NUM_NULLS", "MIN_VAL", "MAX_VAL", "NDV", "EST_NUM_NON_NULLS", "IS_NESTED")
+          .baselineValues("DRILL", "dfs.tmp", tableName, "dir0", 1, null, "YES", "CHARACTER VARYING",
+              65535, 65535, null, null, null, null, null, null, 65535, null, 0L, "1994", "1996", null, null, false)
+          .baselineValues("DRILL", "dfs.tmp", tableName, "o_orderkey", 3, null, "NO", "INTEGER",
+              null, null, 0, 2, 0, null, null, null, 11, null, 0L, "1", "1319", 119.0, 120.0, false)
+          .baselineValues("DRILL", "dfs.tmp", tableName, "o_totalprice", 6, null, "NO", "DOUBLE",
+              null, null, 0, 2, 0, null, null, null, 24, null, 0L, "3266.69", "350110.21", 120.0, 120.0, false)
+          .go();
+    } finally {
+      run("analyze table dfs.tmp.`%s` drop metadata if exists", tableName);
+      client.resetSession(PlannerSettings.STATISTICS_USE.getOptionName());
+
+      FileUtils.deleteQuietly(table);
+    }
+  }
+
+  @Test
+  public void testSelectFromInfoSchemaPartitionsWithMetastore() throws Exception {
+    String tableName = "partitionInInfoSchema";
+
+    File table = dirTestWatcher.copyResourceToTestTmp(Paths.get("multilevel/parquet"), Paths.get(tableName));
+
+    try {
+      client.resetSession(ExecConstants.SLICE_TARGET);
+      testBuilder()
+          .sqlQuery("analyze table dfs.tmp.`%s` REFRESH METADATA", tableName)
+          .unOrdered()
+          .baselineColumns("ok", "summary")
+          .baselineValues(true, String.format("Collected / refreshed metadata for table [dfs.tmp.%s]", tableName))
+          .go();
+
+      File seg1994q1 = new File(table, "1994/Q1");
+      File seg1995q2 = new File(table, "1995/Q2");
+      testBuilder()
+          .sqlQuery("select * from information_schema.`partitions` where TABLE_NAME='%s' and METADATA_IDENTIFIER in ('1994/Q1', '1995/Q2') order by LOCATION", tableName)
+          .unOrdered()
+          .baselineColumns("TABLE_CATALOG", "TABLE_SCHEMA", "TABLE_NAME", "METADATA_KEY", "METADATA_TYPE",
+              "METADATA_IDENTIFIER", "PARTITION_COLUMN", "PARTITION_VALUE", "LOCATION", "LAST_MODIFIED_TIME")
+          .baselineValues("DRILL", "dfs.tmp", tableName, "1994", "SEGMENT", "1994/Q1", "`dir1`", "Q1",
+              new Path(seg1994q1.toURI().getPath()).toUri().getPath(), getLocalDateTime(getMaxLastModified(seg1994q1)))
+          .baselineValues("DRILL", "dfs.tmp", tableName, "1995", "SEGMENT", "1995/Q2", "`dir1`", "Q2",
+              new Path(seg1995q2.toURI().getPath()).toUri().getPath(), getLocalDateTime(getMaxLastModified(seg1995q2)))
+          .go();
+    } finally {
+      run("analyze table dfs.tmp.`%s` drop metadata if exists", tableName);
+      client.alterSession(ExecConstants.SLICE_TARGET, 1);
+
+      FileUtils.deleteQuietly(table);
+    }
+  }
+
   private static <T> ColumnStatistics<T> getColumnStatistics(T minValue, T maxValue,
       long rowCount, TypeProtos.MinorType minorType) {
     return new ColumnStatistics<>(
@@ -2986,7 +3135,7 @@
             new StatisticsHolder<>(minValue, ColumnStatisticsKind.MIN_VALUE),
             new StatisticsHolder<>(maxValue, ColumnStatisticsKind.MAX_VALUE),
             new StatisticsHolder<>(rowCount, TableStatisticsKind.ROW_COUNT),
-            new StatisticsHolder<>(rowCount, ColumnStatisticsKind.NON_NULL_COUNT),
+            new StatisticsHolder<>(rowCount, ColumnStatisticsKind.NON_NULL_VALUES_COUNT),
             new StatisticsHolder<>(0L, ColumnStatisticsKind.NULLS_COUNT)),
         minorType);
   }
diff --git a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/statistics/ColumnStatisticsKind.java b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/statistics/ColumnStatisticsKind.java
index 1b40b7e..613b602 100644
--- a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/statistics/ColumnStatisticsKind.java
+++ b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/statistics/ColumnStatisticsKind.java
@@ -92,7 +92,25 @@
       };
 
   /**
-   * Column statistics kind which represents number of non-null values for the specific column.
+   * Column statistics kind which represents the exact number of non-null values for the specific column.
+   */
+  public static final ColumnStatisticsKind<Long> NON_NULL_VALUES_COUNT =
+      new ColumnStatisticsKind<Long>(ExactStatisticsConstants.NON_NULL_VALUES_COUNT, true) {
+        @Override
+        public Long mergeStatistics(List<? extends ColumnStatistics> statisticsList) {
+          long nonNullRowCount = 0;
+          for (ColumnStatistics<?> statistics : statisticsList) {
+            Long nnRowCount = statistics.get(this);
+            if (nnRowCount != null) {
+              nonNullRowCount += nnRowCount;
+            }
+          }
+          return nonNullRowCount;
+        }
+      };
+
+  /**
+   * Column statistics kind which represents the estimated number of non-null values for the specific column.
    */
   public static final ColumnStatisticsKind<Double> NON_NULL_COUNT =
       new ColumnStatisticsKind<Double>(Statistic.NNROWCOUNT, false) {
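The new exact kind merges by summing per-unit counts; a minimal sketch using the single-argument ColumnStatistics constructor that the updated tests rely on (values assumed):

    ColumnStatistics<Long> fileA = new ColumnStatistics<>(Collections.singletonList(
        new StatisticsHolder<>(70L, ColumnStatisticsKind.NON_NULL_VALUES_COUNT)));
    ColumnStatistics<Long> fileB = new ColumnStatistics<>(Collections.singletonList(
        new StatisticsHolder<>(50L, ColumnStatisticsKind.NON_NULL_VALUES_COUNT)));

    Long merged = ColumnStatisticsKind.NON_NULL_VALUES_COUNT
        .mergeStatistics(Arrays.asList(fileA, fileB)); // 120L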
diff --git a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/statistics/ExactStatisticsConstants.java b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/statistics/ExactStatisticsConstants.java
index 38174a9..b96b322 100644
--- a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/statistics/ExactStatisticsConstants.java
+++ b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/statistics/ExactStatisticsConstants.java
@@ -22,6 +22,7 @@
   String MAX_VALUE = "maxValue";
   String ROW_COUNT = "rowCount";
   String NULLS_COUNT = "nullsCount";
+  String NON_NULL_VALUES_COUNT = "nonNullValuesCount";
 
   String START = "start";
   String LENGTH = "length";