DRILL-7479: Partial fixes for metadata parameterized type issues

See DRILL-7479 and DRILL-7480 for an explanation. Adds generic
type parameters where needed to avoid the need to supporess
warnings. However, type parameters are probably not needed
at all and should be removed in the future for reasons explained
in DRILL-7480.

closes #1923
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ComparisonPredicate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ComparisonPredicate.java
index fa51780..3594a6d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ComparisonPredicate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ComparisonPredicate.java
@@ -88,11 +88,11 @@
   @Override
   @SuppressWarnings("unchecked")
   public RowsMatch matches(StatisticsProvider<C> evaluator) {
-    ColumnStatistics leftStat = left.accept(evaluator, null);
+    ColumnStatistics<C> leftStat = (ColumnStatistics<C>) left.accept(evaluator, null);
     if (IsPredicate.isNullOrEmpty(leftStat)) {
       return RowsMatch.SOME;
     }
-    ColumnStatistics rightStat = right.accept(evaluator, null);
+    ColumnStatistics<C> rightStat = (ColumnStatistics<C>) right.accept(evaluator, null);
     if (IsPredicate.isNullOrEmpty(rightStat)) {
       return RowsMatch.SOME;
     }
@@ -112,9 +112,11 @@
       int leftScale = left.getMajorType().getScale();
       int rightScale = right.getMajorType().getScale();
       if (leftScale > rightScale) {
-        rightStat = adjustDecimalStatistics(rightStat, leftScale - rightScale);
+        rightStat = (ColumnStatistics<C>) adjustDecimalStatistics(
+            (ColumnStatistics<BigInteger>) rightStat, leftScale - rightScale);
       } else if (leftScale < rightScale) {
-        leftStat = adjustDecimalStatistics(leftStat, rightScale - leftScale);
+        leftStat = (ColumnStatistics<C>) adjustDecimalStatistics(
+            (ColumnStatistics<BigInteger>) leftStat, rightScale - leftScale);
       }
     }
     return predicate.apply(leftStat, rightStat);
@@ -127,7 +129,7 @@
    * @param scale adjustment scale
    * @return adjusted statistics
    */
-  private ColumnStatistics adjustDecimalStatistics(ColumnStatistics<BigInteger> statistics, int scale) {
+  private ColumnStatistics<BigInteger> adjustDecimalStatistics(ColumnStatistics<BigInteger> statistics, int scale) {
     BigInteger min = new BigDecimal(ColumnStatisticsKind.MIN_VALUE.getValueStatistic(statistics))
         .setScale(scale, RoundingMode.HALF_UP).unscaledValue();
     BigInteger max = new BigDecimal(ColumnStatisticsKind.MAX_VALUE.getValueStatistic(statistics))
@@ -139,7 +141,7 @@
   /**
    * If one rowgroup contains some null values, change the RowsMatch.ALL into RowsMatch.SOME (null values should be discarded by filter)
    */
-  private static RowsMatch checkNull(ColumnStatistics leftStat, ColumnStatistics rightStat) {
+  private static RowsMatch checkNull(ColumnStatistics<?> leftStat, ColumnStatistics<?> rightStat) {
     return !IsPredicate.hasNoNulls(leftStat) || !IsPredicate.hasNoNulls(rightStat) ? RowsMatch.SOME : RowsMatch.ALL;
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/IsPredicate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/IsPredicate.java
index aa0b9fd..def1d3c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/IsPredicate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/IsPredicate.java
@@ -63,7 +63,8 @@
    */
   @Override
   public RowsMatch matches(StatisticsProvider<C> evaluator) {
-    ColumnStatistics<C> exprStat = expr.accept(evaluator, null);
+    @SuppressWarnings("unchecked")
+    ColumnStatistics<C> exprStat = (ColumnStatistics<C>) expr.accept(evaluator, null);
     return isNullOrEmpty(exprStat) ? RowsMatch.SOME : predicate.apply(exprStat, evaluator);
   }
 
@@ -71,7 +72,7 @@
    * @param stat statistics object
    * @return <tt>true</tt> if the input stat object is null or has invalid statistics; false otherwise
    */
-  public static boolean isNullOrEmpty(ColumnStatistics stat) {
+  public static boolean isNullOrEmpty(ColumnStatistics<?> stat) {
     return stat == null
         || !stat.contains(ColumnStatisticsKind.MIN_VALUE)
         || !stat.contains(ColumnStatisticsKind.MAX_VALUE)
@@ -85,7 +86,7 @@
    * If it contains some null values, then we change the RowsMatch.ALL into RowsMatch.SOME, which sya that maybe
    * some values (the null ones) should be disgarded.
    */
-  private static RowsMatch checkNull(ColumnStatistics exprStat) {
+  private static RowsMatch checkNull(ColumnStatistics<?> exprStat) {
     return hasNoNulls(exprStat) ? RowsMatch.ALL : RowsMatch.SOME;
   }
 
@@ -95,7 +96,7 @@
    * @param stat column statistics
    * @return <tt>true</tt> if the statistics does not have nulls and <tt>false</tt> otherwise
    */
-  static boolean hasNoNulls(ColumnStatistics stat) {
+  static boolean hasNoNulls(ColumnStatistics<?> stat) {
     return ColumnStatisticsKind.NULLS_COUNT.getFrom(stat) == 0;
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/StatisticsProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/StatisticsProvider.java
index 5b3cfc8..1ab7579 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/StatisticsProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/StatisticsProvider.java
@@ -48,12 +48,12 @@
 import java.util.Map;
 import java.util.Set;
 
-public class StatisticsProvider<T extends Comparable<T>> extends AbstractExprVisitor<ColumnStatistics, Void, RuntimeException> {
+public class StatisticsProvider<T extends Comparable<T>> extends AbstractExprVisitor<ColumnStatistics<?>, Void, RuntimeException> {
 
-  private final Map<SchemaPath, ColumnStatistics> columnStatMap;
+  private final Map<SchemaPath, ColumnStatistics<?>> columnStatMap;
   private final long rowCount;
 
-  public StatisticsProvider(Map<SchemaPath, ColumnStatistics> columnStatMap, long rowCount) {
+  public StatisticsProvider(Map<SchemaPath, ColumnStatistics<?>> columnStatMap, long rowCount) {
     this.columnStatMap = columnStatMap;
     this.rowCount = rowCount;
   }
@@ -63,14 +63,14 @@
   }
 
   @Override
-  public ColumnStatistics visitUnknown(LogicalExpression e, Void value) {
+  public ColumnStatistics<?> visitUnknown(LogicalExpression e, Void value) {
     // do nothing for the unknown expression
     return null;
   }
 
   @Override
-  public ColumnStatistics visitTypedFieldExpr(TypedFieldExpr typedFieldExpr, Void value) {
-    ColumnStatistics columnStatistics = columnStatMap.get(typedFieldExpr.getPath().getUnIndexed());
+  public ColumnStatistics<?> visitTypedFieldExpr(TypedFieldExpr typedFieldExpr, Void value) {
+    ColumnStatistics<?> columnStatistics = columnStatMap.get(typedFieldExpr.getPath().getUnIndexed());
     if (columnStatistics != null) {
       return columnStatistics;
     } else if (typedFieldExpr.getMajorType().equals(Types.OPTIONAL_INT)) {
@@ -132,7 +132,7 @@
 
   @Override
   @SuppressWarnings("unchecked")
-  public ColumnStatistics visitFunctionHolderExpression(FunctionHolderExpression holderExpr, Void value) {
+  public ColumnStatistics<?> visitFunctionHolderExpression(FunctionHolderExpression holderExpr, Void value) {
     FuncHolder funcHolder = holderExpr.getHolder();
 
     if (!(funcHolder instanceof DrillSimpleFuncHolder)) {
@@ -143,7 +143,7 @@
     String funcName = ((DrillSimpleFuncHolder) funcHolder).getRegisteredNames()[0];
 
     if (FunctionReplacementUtils.isCastFunction(funcName)) {
-      ColumnStatistics<T> stat = holderExpr.args.get(0).accept(this, null);
+      ColumnStatistics<T> stat = (ColumnStatistics<T>) holderExpr.args.get(0).accept(this, null);
       if (!IsPredicate.isNullOrEmpty(stat)) {
         return evalCastFunc(holderExpr, stat);
       }
@@ -151,7 +151,7 @@
     return null;
   }
 
-  private ColumnStatistics evalCastFunc(FunctionHolderExpression holderExpr, ColumnStatistics<T> input) {
+  private ColumnStatistics<?> evalCastFunc(FunctionHolderExpression holderExpr, ColumnStatistics<T> input) {
     try {
       DrillSimpleFuncHolder funcHolder = (DrillSimpleFuncHolder) holderExpr.getHolder();
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/MetastoreParquetTableMetadataProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/MetastoreParquetTableMetadataProvider.java
index 24736bf..acad916 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/MetastoreParquetTableMetadataProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/MetastoreParquetTableMetadataProvider.java
@@ -85,7 +85,7 @@
   private Multimap<Path, RowGroupMetadata> rowGroups;
   private NonInterestingColumnsMetadata nonInterestingColumnsMetadata;
   // stores builder to provide lazy init for fallback ParquetTableMetadataProvider
-  private ParquetFileTableMetadataProviderBuilder fallbackBuilder;
+  private final ParquetFileTableMetadataProviderBuilder fallbackBuilder;
   private ParquetTableMetadataProvider fallback;
 
   private MetastoreParquetTableMetadataProvider(List<ReadEntryWithPath> entries,
@@ -259,12 +259,12 @@
     if (nonInterestingColumnsMetadata == null) {
       TupleMetadata schema = getTableMetadata().getSchema();
 
-      List<StatisticsHolder> statistics = Collections.singletonList(new StatisticsHolder<>(Statistic.NO_COLUMN_STATS, ColumnStatisticsKind.NULLS_COUNT));
+      List<StatisticsHolder<?>> statistics = Collections.singletonList(new StatisticsHolder<>(Statistic.NO_COLUMN_STATS, ColumnStatisticsKind.NULLS_COUNT));
 
       List<SchemaPath> columnPaths = SchemaUtil.getSchemaPaths(schema);
       List<SchemaPath> interestingColumns = getInterestingColumns(columnPaths);
       // populates statistics for non-interesting columns and columns for which statistics wasn't collected
-      Map<SchemaPath, ColumnStatistics> columnsStatistics = columnPaths.stream()
+      Map<SchemaPath, ColumnStatistics<?>> columnsStatistics = columnPaths.stream()
           .filter(schemaPath -> !interestingColumns.contains(schemaPath)
               || SchemaPathUtils.getColumnMetadata(schemaPath, schema).isArray())
           .collect(Collectors.toMap(
@@ -315,7 +315,7 @@
 
     // builder for fallback ParquetFileTableMetadataProvider
     // for the case when required metadata is absent in Metastore
-    private ParquetFileTableMetadataProviderBuilder fallback;
+    private final ParquetFileTableMetadataProviderBuilder fallback;
 
     public Builder(MetastoreMetadataProviderManager source) {
       this.metadataProviderManager = source;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/SimpleFileTableMetadataProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/SimpleFileTableMetadataProvider.java
index b3a34658..1ae99e4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/SimpleFileTableMetadataProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/SimpleFileTableMetadataProvider.java
@@ -49,7 +49,7 @@
 public class SimpleFileTableMetadataProvider implements TableMetadataProvider {
   private static final Logger logger = LoggerFactory.getLogger(SimpleFileTableMetadataProvider.class);
 
-  private TableMetadata tableMetadata;
+  private final TableMetadata tableMetadata;
 
   private SimpleFileTableMetadataProvider(TableMetadata tableMetadata) {
     this.tableMetadata = tableMetadata;
@@ -111,7 +111,7 @@
     private long lastModifiedTime = -1L;
     private TupleMetadata schema;
 
-    private MetadataProviderManager metadataProviderManager;
+    private final MetadataProviderManager metadataProviderManager;
 
     public Builder(MetadataProviderManager source) {
       this.metadataProviderManager = source;
@@ -147,7 +147,7 @@
       TableMetadataProvider source = metadataProviderManager.getTableMetadataProvider();
       if (source == null) {
         DrillStatsTable statsProvider = metadataProviderManager.getStatsProvider();
-        Map<SchemaPath, ColumnStatistics> columnsStatistics = new HashMap<>();
+        Map<SchemaPath, ColumnStatistics<?>> columnsStatistics = new HashMap<>();
 
         if (statsProvider != null) {
           if (!statsProvider.isMaterialized()) {
@@ -156,7 +156,7 @@
           if (statsProvider.isMaterialized()) {
             for (SchemaPath column : statsProvider.getColumns()) {
               columnsStatistics.put(column,
-                  new ColumnStatistics(DrillStatsTable.getEstimatedColumnStats(statsProvider, column)));
+                  new ColumnStatistics<>(DrillStatsTable.getEstimatedColumnStats(statsProvider, column)));
             }
           }
         }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScanWithMetadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScanWithMetadata.java
index 2371c6d..40ab594 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScanWithMetadata.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScanWithMetadata.java
@@ -261,7 +261,7 @@
    * @return group scan with applied filter expression
    */
   @Override
-  public AbstractGroupScanWithMetadata applyFilter(LogicalExpression filterExpr, UdfUtilities udfUtilities,
+  public AbstractGroupScanWithMetadata<?> applyFilter(LogicalExpression filterExpr, UdfUtilities udfUtilities,
       FunctionImplementationRegistry functionImplementationRegistry, OptionManager optionManager) {
 
     // Builds filter for pruning. If filter cannot be built, null should be returned.
@@ -531,7 +531,6 @@
     return columnMetadata != null ? columnMetadata.majorType() : null;
   }
 
-  @SuppressWarnings("unchecked")
   @JsonIgnore
   public <T> T getPartitionValue(Path path, SchemaPath column, Class<T> clazz) {
     return getPartitionsMetadata().stream()
@@ -642,7 +641,7 @@
     // and files which belongs to that partitions may be returned
     protected MetadataType overflowLevel = MetadataType.NONE;
 
-    public GroupScanWithMetadataFilterer(AbstractGroupScanWithMetadata source) {
+    public GroupScanWithMetadataFilterer(AbstractGroupScanWithMetadata<?> source) {
       this.source = source;
     }
 
@@ -651,7 +650,7 @@
      *
      * @return implementation of {@link AbstractGroupScanWithMetadata} with filtered metadata
      */
-    public abstract AbstractGroupScanWithMetadata build();
+    public abstract AbstractGroupScanWithMetadata<?> build();
 
     public B table(TableMetadata tableMetadata) {
       this.tableMetadata = tableMetadata;
@@ -968,8 +967,7 @@
           filterPredicate = getFilterPredicate(filterExpression, udfUtilities,
               context, optionManager, true, true, schema);
         }
-        @SuppressWarnings("rawtypes")
-        Map<SchemaPath, ColumnStatistics> columnsStatistics = metadata.getColumnsStatistics();
+        Map<SchemaPath, ColumnStatistics<?>> columnsStatistics = metadata.getColumnsStatistics();
 
         // adds partition (dir) column statistics if it may be used during filter evaluation
         if (metadata instanceof LocationProvider && optionManager != null) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataControllerBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataControllerBatch.java
index 7ef4741..73a55b0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataControllerBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataControllerBatch.java
@@ -363,7 +363,6 @@
         .build();
   }
 
-  @SuppressWarnings("rawtypes")
   private List<TableMetadataUnit> getMetadataUnits(TupleReader reader, int nestingLevel) {
     List<TableMetadataUnit> metadataUnits = new ArrayList<>();
 
@@ -383,7 +382,7 @@
       }
     }
 
-    List<StatisticsHolder> metadataStatistics = getMetadataStatistics(reader, columnMetadata);
+    List<StatisticsHolder<?>> metadataStatistics = getMetadataStatistics(reader, columnMetadata);
 
     Long rowCount = (Long) metadataStatistics.stream()
         .filter(statisticsHolder -> statisticsHolder.getStatisticsKind() == TableStatisticsKind.ROW_COUNT)
@@ -391,7 +390,7 @@
         .map(StatisticsHolder::getStatisticsValue)
         .orElse(null);
 
-    Map<SchemaPath, ColumnStatistics> columnStatistics = getColumnStatistics(reader, columnMetadata, rowCount);
+    Map<SchemaPath, ColumnStatistics<?>> columnStatistics = getColumnStatistics(reader, columnMetadata, rowCount);
 
     MetadataType metadataType = MetadataType.valueOf(metadataColumnReader.scalar().getString());
 
@@ -426,9 +425,8 @@
     return metadataUnits;
   }
 
-  @SuppressWarnings("rawtypes")
-  private PartitionMetadata getPartitionMetadata(TupleReader reader, List<StatisticsHolder> metadataStatistics,
-      Map<SchemaPath, ColumnStatistics> columnStatistics, int nestingLevel) {
+  private PartitionMetadata getPartitionMetadata(TupleReader reader, List<StatisticsHolder<?>> metadataStatistics,
+      Map<SchemaPath, ColumnStatistics<?>> columnStatistics, int nestingLevel) {
     List<String> segmentColumns = popConfig.getContext().segmentColumns();
 
     String segmentKey = segmentColumns.size() > 0
@@ -458,10 +456,9 @@
         .build();
   }
 
-  @SuppressWarnings("rawtypes")
-  private BaseTableMetadata getTableMetadata(TupleReader reader, List<StatisticsHolder> metadataStatistics,
-      Map<SchemaPath, ColumnStatistics> columnStatistics) {
-    List<StatisticsHolder> updatedMetaStats = new ArrayList<>(metadataStatistics);
+  private BaseTableMetadata getTableMetadata(TupleReader reader, List<StatisticsHolder<?>> metadataStatistics,
+      Map<SchemaPath, ColumnStatistics<?>> columnStatistics) {
+    List<StatisticsHolder<?>> updatedMetaStats = new ArrayList<>(metadataStatistics);
     updatedMetaStats.add(new StatisticsHolder<>(popConfig.getContext().analyzeMetadataLevel(), TableStatisticsKind.ANALYZE_METADATA_LEVEL));
 
     MetadataInfo metadataInfo = MetadataInfo.builder()
@@ -483,7 +480,7 @@
 
     if (context.getOptions().getOption(PlannerSettings.STATISTICS_USE)) {
       DrillStatsTable statistics = new DrillStatsTable(statisticsCollector.getStatistics());
-      Map<SchemaPath, ColumnStatistics> tableColumnStatistics =
+      Map<SchemaPath, ColumnStatistics<?>> tableColumnStatistics =
           ParquetTableMetadataUtils.getColumnStatistics(tableMetadata.getSchema(), statistics);
       tableMetadata = tableMetadata.cloneWithStats(tableColumnStatistics, DrillStatsTable.getEstimatedTableStats(statistics));
     }
@@ -491,9 +488,8 @@
     return tableMetadata;
   }
 
-  @SuppressWarnings("rawtypes")
-  private SegmentMetadata getSegmentMetadata(TupleReader reader, List<StatisticsHolder> metadataStatistics,
-      Map<SchemaPath, ColumnStatistics> columnStatistics, int nestingLevel) {
+  private SegmentMetadata getSegmentMetadata(TupleReader reader, List<StatisticsHolder<?>> metadataStatistics,
+      Map<SchemaPath, ColumnStatistics<?>> columnStatistics, int nestingLevel) {
     List<String> segmentColumns = popConfig.getContext().segmentColumns();
 
     String segmentKey = segmentColumns.size() > 0
@@ -540,9 +536,8 @@
         .build();
   }
 
-  @SuppressWarnings("rawtypes")
-  private FileMetadata getFileMetadata(TupleReader reader, List<StatisticsHolder> metadataStatistics,
-      Map<SchemaPath, ColumnStatistics> columnStatistics, int nestingLevel) {
+  private FileMetadata getFileMetadata(TupleReader reader, List<StatisticsHolder<?>> metadataStatistics,
+      Map<SchemaPath, ColumnStatistics<?>> columnStatistics, int nestingLevel) {
     List<String> segmentColumns = popConfig.getContext().segmentColumns();
 
     String segmentKey = segmentColumns.size() > 0
@@ -574,9 +569,8 @@
         .build();
   }
 
-  @SuppressWarnings("rawtypes")
-  private RowGroupMetadata getRowGroupMetadata(TupleReader reader,List<StatisticsHolder> metadataStatistics,
-      Map<SchemaPath, ColumnStatistics> columnStatistics, int nestingLevel) {
+  private RowGroupMetadata getRowGroupMetadata(TupleReader reader,List<StatisticsHolder<?>> metadataStatistics,
+      Map<SchemaPath, ColumnStatistics<?>> columnStatistics, int nestingLevel) {
 
     List<String> segmentColumns = popConfig.getContext().segmentColumns();
     String segmentKey = segmentColumns.size() > 0
@@ -613,9 +607,8 @@
         .build();
   }
 
-  @SuppressWarnings("rawtypes")
-  private Map<SchemaPath, ColumnStatistics> getColumnStatistics(TupleReader reader, TupleMetadata columnMetadata, Long rowCount) {
-    Multimap<String, StatisticsHolder> columnStatistics = ArrayListMultimap.create();
+  private Map<SchemaPath, ColumnStatistics<?>> getColumnStatistics(TupleReader reader, TupleMetadata columnMetadata, Long rowCount) {
+    Multimap<String, StatisticsHolder<?>> columnStatistics = ArrayListMultimap.create();
     Map<String, TypeProtos.MinorType> columnTypes = new HashMap<>();
     for (ColumnMetadata column : columnMetadata) {
       String fieldName = AnalyzeColumnUtils.getColumnName(column.name());
@@ -633,7 +626,7 @@
 
     // adds NON_NULL_COUNT to use it during filter pushdown
     if (rowCount != null) {
-      Map<String, StatisticsHolder> nullsCountColumnStatistics = new HashMap<>();
+      Map<String, StatisticsHolder<?>> nullsCountColumnStatistics = new HashMap<>();
       columnStatistics.asMap().forEach((key, value) ->
           value.stream()
               .filter(statisticsHolder -> statisticsHolder.getStatisticsKind() == ColumnStatisticsKind.NON_NULL_VALUES_COUNT)
@@ -647,16 +640,15 @@
       nullsCountColumnStatistics.forEach(columnStatistics::put);
     }
 
-    Map<SchemaPath, ColumnStatistics> resultingStats = new HashMap<>();
+    Map<SchemaPath, ColumnStatistics<?>> resultingStats = new HashMap<>();
 
     columnStatistics.asMap().forEach((fieldName, statisticsHolders) ->
         resultingStats.put(SchemaPath.parseFromString(fieldName), new ColumnStatistics<>(statisticsHolders, columnTypes.get(fieldName))));
     return resultingStats;
   }
 
-  @SuppressWarnings("rawtypes")
-  private List<StatisticsHolder> getMetadataStatistics(TupleReader reader, TupleMetadata columnMetadata) {
-    List<StatisticsHolder> metadataStatistics = new ArrayList<>();
+  private List<StatisticsHolder<?>> getMetadataStatistics(TupleReader reader, TupleMetadata columnMetadata) {
+    List<StatisticsHolder<?>> metadataStatistics = new ArrayList<>();
     String rgs = columnNamesOptions.rowGroupStart();
     String rgl = columnNamesOptions.rowGroupLength();
     for (ColumnMetadata column : columnMetadata) {
@@ -753,6 +745,8 @@
       case FILE: {
         childLocations.add(new Path(reader.column(MetastoreAnalyzeConstants.LOCATION_FIELD).scalar().getString()));
       }
+      default:
+        break;
     }
 
     return childLocations;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataHandlerBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataHandlerBatch.java
index 044d140..5c45150 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataHandlerBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataHandlerBatch.java
@@ -170,6 +170,8 @@
                   new ArrayList<>(metadataToHandle.values()));
           return populateContainer(segments);
         }
+        default:
+          break;
       }
     }
     return outcome;
@@ -194,7 +196,6 @@
     }
   }
 
-  @SuppressWarnings("unchecked")
   private <T extends BaseMetadata & LocationProvider> VectorContainer writeMetadata(List<T> metadataList) {
     BaseMetadata firstElement = metadataList.iterator().next();
 
@@ -304,7 +305,6 @@
     return new ResultSetLoaderImpl(container.getAllocator(), options);
   }
 
-  @SuppressWarnings("unchecked")
   private <T extends BaseMetadata & LocationProvider> VectorContainer writeMetadataUsingBatchSchema(List<T> metadataList) {
     Preconditions.checkArgument(!metadataList.isEmpty(), "Metadata list shouldn't be empty.");
 
@@ -413,6 +413,7 @@
     container.setEmpty();
   }
 
+  @Override
   protected boolean setupNewSchema() {
     setupSchemaFromContainer(incoming.getContainer());
     return true;
@@ -473,6 +474,8 @@
           }
           break;
         }
+        default:
+          break;
       }
     }
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillStatsTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillStatsTable.java
index 9e4e925..70f3e16 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillStatsTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillStatsTable.java
@@ -263,6 +263,7 @@
     @JsonProperty ("directories") List<DirectoryStatistics_v0> directoryStatistics;
     // Default constructor required for deserializer
     public Statistics_v0 () { }
+    @Override
     @JsonGetter ("directories")
     public List<DirectoryStatistics_v0> getDirectoryStatistics() {
       return directoryStatistics;
@@ -296,6 +297,7 @@
     List<DirectoryStatistics_v1> directoryStatistics;
     // Default constructor required for deserializer
     public Statistics_v1 () { }
+    @Override
     @JsonGetter ("directories")
     public List<DirectoryStatistics_v1> getDirectoryStatistics() {
       return directoryStatistics;
@@ -396,7 +398,7 @@
     }
     @JsonIgnore
     public void buildHistogram(byte[] tdigest_bytearray) {
-      int num_buckets = (int) Math.min(ndv, (long) DrillStatsTable.NUM_HISTOGRAM_BUCKETS);
+      int num_buckets = (int) Math.min(ndv, DrillStatsTable.NUM_HISTOGRAM_BUCKETS);
       this.histogram = HistogramUtils.buildHistogramFromTDigest(tdigest_bytearray, this.getType(),
               num_buckets, nonNullCount);
     }
@@ -479,9 +481,9 @@
    * @param statsProvider the source of statistics
    * @return list of {@link StatisticsKind} and statistics values
    */
-  public static List<StatisticsHolder> getEstimatedTableStats(DrillStatsTable statsProvider) {
+  public static List<StatisticsHolder<?>> getEstimatedTableStats(DrillStatsTable statsProvider) {
     if (statsProvider != null && statsProvider.isMaterialized()) {
-      List<StatisticsHolder> tableStatistics = Arrays.asList(
+      List<StatisticsHolder<?>> tableStatistics = Arrays.asList(
           new StatisticsHolder<>(statsProvider.getRowCount(), TableStatisticsKind.EST_ROW_COUNT),
           new StatisticsHolder<>(Boolean.TRUE, TableStatisticsKind.HAS_DESCRIPTIVE_STATISTICS));
       return tableStatistics;
@@ -496,9 +498,9 @@
    * @param fieldName     name of the columns whose statistics should be obtained
    * @return list of {@link StatisticsKind} and statistics values
    */
-  public static List<StatisticsHolder> getEstimatedColumnStats(DrillStatsTable statsProvider, SchemaPath fieldName) {
+  public static List<StatisticsHolder<?>> getEstimatedColumnStats(DrillStatsTable statsProvider, SchemaPath fieldName) {
     if (statsProvider != null && statsProvider.isMaterialized()) {
-      List<StatisticsHolder> statisticsValues = new ArrayList<>();
+      List<StatisticsHolder<?>> statisticsValues = new ArrayList<>();
       Double ndv = statsProvider.getNdv(fieldName);
       if (ndv != null) {
         statisticsValues.add(new StatisticsHolder<>(ndv, ColumnStatisticsKind.NDV));
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelMdDistinctRowCount.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelMdDistinctRowCount.java
index 3053297..9f2901b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelMdDistinctRowCount.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelMdDistinctRowCount.java
@@ -163,7 +163,7 @@
       if (!groupKey.get(i)) {
         continue;
       }
-      ColumnStatistics columnStatistics = tableMetadata != null ?
+      ColumnStatistics<?> columnStatistics = tableMetadata != null ?
           tableMetadata.getColumnStatistics(SchemaPath.getSimplePath(colName)) : null;
       Double ndv = columnStatistics != null ? ColumnStatisticsKind.NDV.getFrom(columnStatistics) : null;
       // Skip NDV, if not available
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelMdSelectivity.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelMdSelectivity.java
index 49d2129..9f3beca 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelMdSelectivity.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelMdSelectivity.java
@@ -187,6 +187,7 @@
             try {
               RexVisitor<Void> visitor =
                       new RexVisitorImpl<Void>(true) {
+                        @Override
                         public Void visitCall(RexCall call) {
                           if (call.getKind() != SqlKind.EQUALS) {
                             throw new Util.FoundOne(call);
@@ -294,7 +295,7 @@
   private double computeEqualsSelectivity(TableMetadata tableMetadata, RexNode orPred, List<SchemaPath> fieldNames) {
     SchemaPath col = getColumn(orPred, fieldNames);
     if (col != null) {
-      ColumnStatistics columnStatistics = tableMetadata != null ? tableMetadata.getColumnStatistics(col) : null;
+      ColumnStatistics<?> columnStatistics = tableMetadata != null ? tableMetadata.getColumnStatistics(col) : null;
       Double ndv = columnStatistics != null ? ColumnStatisticsKind.NDV.getFrom(columnStatistics) : null;
       if (ndv != null) {
         return 1.00 / ndv;
@@ -307,7 +308,7 @@
   private double computeRangeSelectivity(TableMetadata tableMetadata, RexNode orPred, List<SchemaPath> fieldNames) {
     SchemaPath col = getColumn(orPred, fieldNames);
     if (col != null) {
-      ColumnStatistics columnStatistics = tableMetadata != null ? tableMetadata.getColumnStatistics(col) : null;
+      ColumnStatistics<?> columnStatistics = tableMetadata != null ? tableMetadata.getColumnStatistics(col) : null;
       Histogram histogram = columnStatistics != null ? ColumnStatisticsKind.HISTOGRAM.getFrom(columnStatistics) : null;
       if (histogram != null) {
         Double totalCount = ColumnStatisticsKind.ROWCOUNT.getFrom(columnStatistics);
@@ -324,7 +325,7 @@
   private double computeIsNotNullSelectivity(TableMetadata tableMetadata, RexNode orPred, List<SchemaPath> fieldNames) {
     SchemaPath col = getColumn(orPred, fieldNames);
     if (col != null) {
-      ColumnStatistics columnStatistics = tableMetadata != null ? tableMetadata.getColumnStatistics(col) : null;
+      ColumnStatistics<?> columnStatistics = tableMetadata != null ? tableMetadata.getColumnStatistics(col) : null;
       Double nonNullCount = columnStatistics != null ? ColumnStatisticsKind.NON_NULL_COUNT.getFrom(columnStatistics) : null;
       if (nonNullCount != null) {
         // Cap selectivity below Calcite Guess
@@ -423,6 +424,7 @@
     try {
       RexVisitor<Void> visitor =
           new RexVisitorImpl<Void>(true) {
+            @Override
             public Void visitCall(RexCall call) {
               for (RexNode child : call.getOperands()) {
                 child.accept(this);
@@ -430,6 +432,7 @@
               return super.visitCall(call);
             }
 
+            @Override
             public Void visitInputRef(RexInputRef inputRef) {
               throw new Util.FoundOne(inputRef);
             }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java
index a9bff98..ac2c3ff 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java
@@ -229,7 +229,7 @@
    * @return group scan with applied filter expression
    */
   @Override
-  public AbstractGroupScanWithMetadata applyFilter(LogicalExpression filterExpr, UdfUtilities udfUtilities,
+  public AbstractGroupScanWithMetadata<?> applyFilter(LogicalExpression filterExpr, UdfUtilities udfUtilities,
       FunctionImplementationRegistry functionImplementationRegistry, OptionManager optionManager) {
     // Builds filter for pruning. If filter cannot be built, null should be returned.
     FilterPredicate<?> filterPredicate = getFilterPredicate(filterExpr, udfUtilities, functionImplementationRegistry, optionManager, true);
@@ -481,7 +481,7 @@
   protected abstract static class RowGroupScanFilterer<B extends RowGroupScanFilterer<B>> extends GroupScanWithMetadataFilterer<B> {
     protected Multimap<Path, RowGroupMetadata> rowGroups = LinkedListMultimap.create();
 
-    public RowGroupScanFilterer(AbstractGroupScanWithMetadata source) {
+    public RowGroupScanFilterer(AbstractGroupScanWithMetadata<?> source) {
       super(source);
     }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
index de555c2..74b4c17 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
@@ -102,7 +102,7 @@
       Path prevRowGroupPath = null;
       Metadata_V4.ParquetTableMetadata_v4 tableMetadataV4 = null;
       Metadata_V4.ParquetFileAndRowCountMetadata fileMetadataV4 = null;
-      FilterPredicate filterPredicate = null;
+      FilterPredicate<?> filterPredicate = null;
       Set<SchemaPath> schemaPathsInExpr = null;
       Set<SchemaPath> columnsInExpr = null;
       // for debug/info logging
@@ -135,7 +135,7 @@
         Here we could store a map from file names to footers, to prevent re-reading the footer for each row group in a file
         TODO - to prevent reading the footer again in the parquet record reader (it is read earlier in the ParquetStorageEngine)
         we should add more information to the RowGroupInfo that will be populated upon the first read to
-        provide the reader with all of th file meta-data it needs
+        provide the reader with all of the file meta-data it needs
         These fields will be added to the constructor below
         */
 
@@ -190,7 +190,7 @@
 
             MetadataBase.RowGroupMetadata rowGroupMetadata = fileMetadataV4.getFileMetadata().getRowGroups().get(rowGroup.getRowGroupIndex());
 
-            Map<SchemaPath, ColumnStatistics> columnsStatistics = ParquetTableMetadataUtils.getRowGroupColumnStatistics(tableMetadataV4, rowGroupMetadata);
+            Map<SchemaPath, ColumnStatistics<?>> columnsStatistics = ParquetTableMetadataUtils.getRowGroupColumnStatistics(tableMetadataV4, rowGroupMetadata);
 
             try {
               Map<SchemaPath, TypeProtos.MajorType> intermediateColumns =
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/BaseParquetMetadataProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/BaseParquetMetadataProvider.java
index ccf5c15..b535e0f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/BaseParquetMetadataProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/BaseParquetMetadataProvider.java
@@ -207,7 +207,7 @@
   @Override
   public TableMetadata getTableMetadata() {
     if (tableMetadata == null) {
-      List<StatisticsHolder> tableStatistics = new ArrayList<>(DrillStatsTable.getEstimatedTableStats(statsTable));
+      List<StatisticsHolder<?>> tableStatistics = new ArrayList<>(DrillStatsTable.getEstimatedTableStats(statsTable));
       Map<SchemaPath, TypeProtos.MajorType> fields = ParquetTableMetadataUtils.resolveFields(parquetTableMetadata);
       Map<SchemaPath, TypeProtos.MajorType> intermediateFields = ParquetTableMetadataUtils.resolveIntermediateFields(parquetTableMetadata);
 
@@ -223,7 +223,7 @@
         });
       }
 
-      Map<SchemaPath, ColumnStatistics> columnsStatistics;
+      Map<SchemaPath, ColumnStatistics<?>> columnsStatistics;
       if (collectMetadata) {
         Collection<? extends BaseMetadata> metadata = getFilesMetadataMap().values();
         if (metadata.isEmpty()) {
@@ -243,18 +243,18 @@
         fields.forEach((columnPath, value) -> {
           long columnValueCount = getParquetGroupScanStatistics().getColumnValueCount(columnPath);
           // Adds statistics values itself if statistics is available
-          List<StatisticsHolder> stats = new ArrayList<>(DrillStatsTable.getEstimatedColumnStats(statsTable, columnPath));
+          List<StatisticsHolder<?>> stats = new ArrayList<>(DrillStatsTable.getEstimatedColumnStats(statsTable, columnPath));
           unhandledColumns.remove(columnPath);
 
           // adds statistics for partition columns
           stats.add(new StatisticsHolder<>(columnValueCount, TableStatisticsKind.ROW_COUNT));
           stats.add(new StatisticsHolder<>(getParquetGroupScanStatistics().getRowCount() - columnValueCount, ColumnStatisticsKind.NULLS_COUNT));
-          columnsStatistics.put(columnPath, new ColumnStatistics(stats, value.getMinorType()));
+          columnsStatistics.put(columnPath, new ColumnStatistics<>(stats, value.getMinorType()));
         });
 
         for (SchemaPath column : unhandledColumns) {
           columnsStatistics.put(column,
-              new ColumnStatistics(DrillStatsTable.getEstimatedColumnStats(statsTable, column)));
+              new ColumnStatistics<>(DrillStatsTable.getEstimatedColumnStats(statsTable, column)));
         }
       }
       MetadataInfo metadataInfo = MetadataInfo.builder().type(MetadataType.TABLE).build();
@@ -327,9 +327,9 @@
           partitionPaths.forEach((path, value) -> partitionsForValue.put(value, path));
 
           partitionsForValue.asMap().forEach((partitionKey, value) -> {
-            Map<SchemaPath, ColumnStatistics> columnsStatistics = new HashMap<>();
+            Map<SchemaPath, ColumnStatistics<?>> columnsStatistics = new HashMap<>();
 
-            List<StatisticsHolder> statistics = new ArrayList<>();
+            List<StatisticsHolder<?>> statistics = new ArrayList<>();
             partitionKey = partitionKey == NULL_VALUE ? null : partitionKey;
             statistics.add(new StatisticsHolder<>(partitionKey, ColumnStatisticsKind.MIN_VALUE));
             statistics.add(new StatisticsHolder<>(partitionKey, ColumnStatisticsKind.MAX_VALUE));
@@ -381,6 +381,7 @@
         .collect(Collectors.toList());
   }
 
+  @SuppressWarnings("unused")
   @Override
   public Map<Path, SegmentMetadata> getSegmentsMetadataMap() {
     if (segments == null) {
@@ -456,7 +457,7 @@
    */
   private static <T extends BaseMetadata & LocationProvider> SegmentMetadata combineToSegmentMetadata(Collection<T> metadataList,
       SchemaPath column, Set<Path> metadataLocations) {
-    List<StatisticsHolder> segmentStatistics =
+    List<StatisticsHolder<?>> segmentStatistics =
         Collections.singletonList(
             new StatisticsHolder<>(
                 TableStatisticsKind.ROW_COUNT.mergeStatistics(metadataList),
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FilterEvaluatorUtils.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FilterEvaluatorUtils.java
index 022975e..ffde9c3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FilterEvaluatorUtils.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FilterEvaluatorUtils.java
@@ -66,7 +66,7 @@
 
     RowGroupMetadata rowGroupMetadata = new ArrayList<>(ParquetTableMetadataUtils.getRowGroupsMetadata(footer).values()).get(rowGroupIndex);
     NonInterestingColumnsMetadata nonInterestingColumnsMetadata = ParquetTableMetadataUtils.getNonInterestingColumnsMeta(footer);
-    Map<SchemaPath, ColumnStatistics> columnsStatistics = rowGroupMetadata.getColumnsStatistics();
+    Map<SchemaPath, ColumnStatistics<?>> columnsStatistics = rowGroupMetadata.getColumnsStatistics();
 
     // Add column statistics of non-interesting columns if there are any
     columnsStatistics.putAll(nonInterestingColumnsMetadata.getColumnsStatistics());
@@ -78,7 +78,7 @@
         fragmentContext, fragmentContext.getFunctionRegistry(), new HashSet<>(schemaPathsInExpr));
   }
 
-  public static RowsMatch matches(LogicalExpression expr, Map<SchemaPath, ColumnStatistics> columnsStatistics, TupleMetadata schema,
+  public static RowsMatch matches(LogicalExpression expr, Map<SchemaPath, ColumnStatistics<?>> columnsStatistics, TupleMetadata schema,
                                   long rowCount, UdfUtilities udfUtilities, FunctionLookupContext functionImplementationRegistry,
                                   Set<SchemaPath> schemaPathsInExpr) {
     ErrorCollector errorCollector = new ErrorCollectorImpl();
@@ -95,21 +95,22 @@
     }
 
     Set<LogicalExpression> constantBoundaries = ConstantExpressionIdentifier.getConstantExpressionSet(materializedFilter);
-    FilterPredicate parquetPredicate = FilterBuilder.buildFilterPredicate(
+    FilterPredicate<?> parquetPredicate = FilterBuilder.buildFilterPredicate(
         materializedFilter, constantBoundaries, udfUtilities, true);
 
     return matches(parquetPredicate, columnsStatistics, rowCount, schema, schemaPathsInExpr);
   }
 
   @SuppressWarnings("unchecked")
-  public static RowsMatch matches(FilterPredicate parquetPredicate,
-                                  Map<SchemaPath, ColumnStatistics> columnsStatistics,
+  public static <T extends Comparable<T>> RowsMatch matches(FilterPredicate<T> parquetPredicate,
+                                  Map<SchemaPath, ColumnStatistics<?>> columnsStatistics,
                                   long rowCount,
                                   TupleMetadata fileMetadata,
                                   Set<SchemaPath> schemaPathsInExpr) {
     RowsMatch rowsMatch = RowsMatch.SOME;
     if (parquetPredicate != null) {
-      StatisticsProvider rangeExprEvaluator = new StatisticsProvider(columnsStatistics, rowCount);
+      @SuppressWarnings("rawtypes")
+      StatisticsProvider<T> rangeExprEvaluator = new StatisticsProvider(columnsStatistics, rowCount);
       rowsMatch = parquetPredicate.matches(rangeExprEvaluator);
     }
     return rowsMatch == RowsMatch.ALL && isRepeated(schemaPathsInExpr, fileMetadata) ? RowsMatch.SOME : rowsMatch;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
index 051b436..5c0e2e4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
@@ -65,10 +65,10 @@
   private final ParquetFormatPlugin formatPlugin;
   private final ParquetFormatConfig formatConfig;
 
-  private boolean usedMetadataCache; // false by default
+  private final boolean usedMetadataCache; // false by default
   // may change when filter push down / partition pruning is applied
-  private Path selectionRoot;
-  private Path cacheFileRoot;
+  private final Path selectionRoot;
+  private final Path cacheFileRoot;
 
   @SuppressWarnings("unused")
   @JsonCreator
@@ -198,6 +198,7 @@
     return formatPlugin.getStorageConfig();
   }
 
+  @Override
   @JsonProperty
   public Path getSelectionRoot() {
     return selectionRoot;
@@ -288,7 +289,7 @@
   }
 
   @Override
-  protected RowGroupScanFilterer getFilterer() {
+  protected RowGroupScanFilterer<?> getFilterer() {
     return new ParquetGroupScanFilterer(this);
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScanStatistics.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScanStatistics.java
index 17a7c55..f15409d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScanStatistics.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScanStatistics.java
@@ -59,8 +59,7 @@
     collect(rowGroupInfos);
   }
 
-  @SuppressWarnings("unchecked")
-  public ParquetGroupScanStatistics(ParquetGroupScanStatistics that) {
+  public ParquetGroupScanStatistics(ParquetGroupScanStatistics<T> that) {
     this.partitionValueMap = HashBasedTable.create(that.partitionValueMap);
     this.partitionColTypeMap = new HashMap<>(that.partitionColTypeMap);
     this.columnValueCounts = new HashMap<>(that.columnValueCounts);
@@ -101,9 +100,9 @@
     boolean first = true;
     for (T metadata : metadataList) {
       long localRowCount = TableStatisticsKind.ROW_COUNT.getValue(metadata);
-      for (Map.Entry<SchemaPath, ColumnStatistics> columnsStatistics : metadata.getColumnsStatistics().entrySet()) {
+      for (Map.Entry<SchemaPath, ColumnStatistics<?>> columnsStatistics : metadata.getColumnsStatistics().entrySet()) {
         SchemaPath schemaPath = columnsStatistics.getKey();
-        ColumnStatistics statistics = columnsStatistics.getValue();
+        ColumnStatistics<?> statistics = columnsStatistics.getValue();
         MutableLong emptyCount = new MutableLong();
         MutableLong previousCount = columnValueCounts.putIfAbsent(schemaPath, emptyCount);
         if (previousCount == null) {
@@ -164,7 +163,7 @@
    * @param rowCount         row count
    * @return whether column is a potential partition column
    */
-  private boolean checkForPartitionColumn(ColumnStatistics columnStatistics,
+  private boolean checkForPartitionColumn(ColumnStatistics<?> columnStatistics,
                                           boolean first,
                                           long rowCount,
                                           TypeProtos.MajorType type,
@@ -202,11 +201,11 @@
    * @param rowCount         rows count in column chunk
    * @return true if column has single value
    */
-  private boolean hasSingleValue(ColumnStatistics columnStatistics, long rowCount) {
+  private boolean hasSingleValue(ColumnStatistics<?> columnStatistics, long rowCount) {
     return columnStatistics != null && isSingleVal(columnStatistics, rowCount);
   }
 
-  private boolean isSingleVal(ColumnStatistics columnStatistics, long rowCount) {
+  private boolean isSingleVal(ColumnStatistics<?> columnStatistics, long rowCount) {
     Long numNulls = ColumnStatisticsKind.NULLS_COUNT.getFrom(columnStatistics);
     if (numNulls != null && numNulls != Statistic.NO_COLUMN_STATS) {
       Object min = columnStatistics.get(ColumnStatisticsKind.MIN_VALUE);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetPushDownFilter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetPushDownFilter.java
index bd2e119..219359b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetPushDownFilter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetPushDownFilter.java
@@ -138,7 +138,7 @@
 
     final List<RexNode> qualifiedPredList = new ArrayList<>();
 
-    // list of predicates which cannot be converted to parquet filter predicate
+    // list of predicates which cannot be converted to Parquet filter predicate
     List<RexNode> nonConvertedPredList = new ArrayList<>();
 
     for (RexNode pred : predList) {
@@ -147,7 +147,7 @@
             new DrillParseContext(PrelUtil.getPlannerSettings(call.getPlanner())), scan, pred);
 
         // checks whether predicate may be used for filter pushdown
-        FilterPredicate parquetFilterPredicate =
+        FilterPredicate<?> parquetFilterPredicate =
             groupScan.getFilterPredicate(drillPredicate,
                 optimizerContext,
                 optimizerContext.getFunctionRegistry(), optimizerContext.getPlannerSettings().getOptions(), false);
@@ -171,11 +171,11 @@
     LogicalExpression conditionExp = DrillOptiq.toDrill(
         new DrillParseContext(PrelUtil.getPlannerSettings(call.getPlanner())), scan, qualifiedPred);
 
-    // Default - pass the original filter expr to (potentialy) be used at run-time
+    // Default - pass the original filter expr to (potentially) be used at run-time
     groupScan.setFilterForRuntime(conditionExp, optimizerContext); // later may remove or set to another filter (see below)
 
     Stopwatch timer = logger.isDebugEnabled() ? Stopwatch.createStarted() : null;
-    AbstractGroupScanWithMetadata newGroupScan = groupScan.applyFilter(conditionExp, optimizerContext,
+    AbstractGroupScanWithMetadata<?> newGroupScan = groupScan.applyFilter(conditionExp, optimizerContext,
         optimizerContext.getFunctionRegistry(), optimizerContext.getPlannerSettings().getOptions());
     if (timer != null) {
       logger.debug("Took {} ms to apply filter on parquet row groups. ", timer.elapsed(TimeUnit.MILLISECONDS));
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetTableMetadataUtils.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetTableMetadataUtils.java
index 718f3c0..68254d3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetTableMetadataUtils.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetTableMetadataUtils.java
@@ -80,7 +80,7 @@
 @SuppressWarnings("WeakerAccess")
 public class ParquetTableMetadataUtils {
 
-  static final List<CollectableColumnStatisticsKind> PARQUET_COLUMN_STATISTICS =
+  static final List<CollectableColumnStatisticsKind<?>> PARQUET_COLUMN_STATISTICS =
           ImmutableList.of(
               ColumnStatisticsKind.MAX_VALUE,
               ColumnStatisticsKind.MIN_VALUE,
@@ -102,8 +102,8 @@
    * @param supportsFileImplicitColumns whether implicit columns are supported
    * @return map with added statistics for implicit and partition (dir) columns
    */
-  public static Map<SchemaPath, ColumnStatistics> addImplicitColumnsStatistics(
-      Map<SchemaPath, ColumnStatistics> columnsStatistics, List<SchemaPath> columns,
+  public static Map<SchemaPath, ColumnStatistics<?>> addImplicitColumnsStatistics(
+      Map<SchemaPath, ColumnStatistics<?>> columnsStatistics, List<SchemaPath> columns,
       List<String> partitionValues, OptionManager optionManager, Path location, boolean supportsFileImplicitColumns) {
     ColumnExplorer columnExplorer = new ColumnExplorer(optionManager, columns);
 
@@ -157,11 +157,11 @@
    */
   public static RowGroupMetadata getRowGroupMetadata(MetadataBase.ParquetTableMetadataBase tableMetadata,
       MetadataBase.RowGroupMetadata rowGroupMetadata, int rgIndexInFile, Path location) {
-    Map<SchemaPath, ColumnStatistics> columnsStatistics = getRowGroupColumnStatistics(tableMetadata, rowGroupMetadata);
-    List<StatisticsHolder> rowGroupStatistics = new ArrayList<>();
+    Map<SchemaPath, ColumnStatistics<?>> columnsStatistics = getRowGroupColumnStatistics(tableMetadata, rowGroupMetadata);
+    List<StatisticsHolder<?>> rowGroupStatistics = new ArrayList<>();
     rowGroupStatistics.add(new StatisticsHolder<>(rowGroupMetadata.getRowCount(), TableStatisticsKind.ROW_COUNT));
-    rowGroupStatistics.add(new StatisticsHolder<>(rowGroupMetadata.getStart(), new BaseStatisticsKind(ExactStatisticsConstants.START, true)));
-    rowGroupStatistics.add(new StatisticsHolder<>(rowGroupMetadata.getLength(), new BaseStatisticsKind(ExactStatisticsConstants.LENGTH, true)));
+    rowGroupStatistics.add(new StatisticsHolder<>(rowGroupMetadata.getStart(), new BaseStatisticsKind<>(ExactStatisticsConstants.START, true)));
+    rowGroupStatistics.add(new StatisticsHolder<>(rowGroupMetadata.getLength(), new BaseStatisticsKind<>(ExactStatisticsConstants.LENGTH, true)));
 
     Map<SchemaPath, TypeProtos.MajorType> columns = getRowGroupFields(tableMetadata, rowGroupMetadata);
     Map<SchemaPath, TypeProtos.MajorType> intermediateColumns = getIntermediateFields(tableMetadata, rowGroupMetadata);
@@ -195,7 +195,7 @@
     if (rowGroups.isEmpty()) {
       return null;
     }
-    List<StatisticsHolder> fileStatistics = new ArrayList<>();
+    List<StatisticsHolder<?>> fileStatistics = new ArrayList<>();
     fileStatistics.add(new StatisticsHolder<>(TableStatisticsKind.ROW_COUNT.mergeStatistics(rowGroups), TableStatisticsKind.ROW_COUNT));
 
     RowGroupMetadata rowGroupMetadata = rowGroups.iterator().next();
@@ -255,10 +255,10 @@
    * @param rowGroupMetadata metadata to convert
    * @return map with converted row group metadata
    */
-  public static Map<SchemaPath, ColumnStatistics> getRowGroupColumnStatistics(
+  public static Map<SchemaPath, ColumnStatistics<?>> getRowGroupColumnStatistics(
       MetadataBase.ParquetTableMetadataBase tableMetadata, MetadataBase.RowGroupMetadata rowGroupMetadata) {
 
-    Map<SchemaPath, ColumnStatistics> columnsStatistics = new HashMap<>();
+    Map<SchemaPath, ColumnStatistics<?>> columnsStatistics = new HashMap<>();
 
     for (MetadataBase.ColumnMetadata column : rowGroupMetadata.getColumns()) {
       SchemaPath colPath = SchemaPath.getCompoundPath(column.getName());
@@ -271,7 +271,7 @@
       OriginalType originalType = getOriginalType(tableMetadata, column);
       TypeProtos.MinorType type = ParquetReaderUtility.getMinorType(primitiveType, originalType);
 
-      List<StatisticsHolder> statistics = new ArrayList<>();
+      List<StatisticsHolder<?>> statistics = new ArrayList<>();
       statistics.add(new StatisticsHolder<>(getValue(column.getMinValue(), primitiveType, originalType), ColumnStatisticsKind.MIN_VALUE));
       statistics.add(new StatisticsHolder<>(getValue(column.getMaxValue(), primitiveType, originalType), ColumnStatisticsKind.MAX_VALUE));
       statistics.add(new StatisticsHolder<>(nulls, ColumnStatisticsKind.NULLS_COUNT));
@@ -286,7 +286,7 @@
    * @return returns non-interesting columns metadata
    */
   public static NonInterestingColumnsMetadata getNonInterestingColumnsMeta(MetadataBase.ParquetTableMetadataBase parquetTableMetadata) {
-    Map<SchemaPath, ColumnStatistics> columnsStatistics = new HashMap<>();
+    Map<SchemaPath, ColumnStatistics<?>> columnsStatistics = new HashMap<>();
     if (parquetTableMetadata instanceof Metadata_V4.ParquetTableMetadata_v4) {
       Map<Metadata_V4.ColumnTypeMetadata_v4.Key, Metadata_V4.ColumnTypeMetadata_v4> columnTypeInfoMap =
               ((Metadata_V4.ParquetTableMetadata_v4) parquetTableMetadata).getColumnTypeInfoMap();
@@ -298,7 +298,7 @@
       for (Metadata_V4.ColumnTypeMetadata_v4 columnTypeMetadata : columnTypeInfoMap.values()) {
         if (!columnTypeMetadata.isInteresting) {
           SchemaPath schemaPath = SchemaPath.getCompoundPath(columnTypeMetadata.name);
-          List<StatisticsHolder> statistics = new ArrayList<>();
+          List<StatisticsHolder<?>> statistics = new ArrayList<>();
           statistics.add(new StatisticsHolder<>(Statistic.NO_COLUMN_STATS, ColumnStatisticsKind.NULLS_COUNT));
           PrimitiveType.PrimitiveTypeName primitiveType = columnTypeMetadata.primitiveType;
           OriginalType originalType = columnTypeMetadata.originalType;
@@ -371,7 +371,8 @@
     } else if (value instanceof String) { // value is obtained from metadata cache v2+
       return ((String) value).getBytes();
     } else if (value instanceof Map) { // value is obtained from metadata cache v1
-      String bytesString = (String) ((Map) value).get("bytes");
+      @SuppressWarnings("unchecked")
+      String bytesString = ((Map<String,String>) value).get("bytes");
       if (bytesString != null) {
         return bytesString.getBytes();
       }
@@ -647,7 +648,7 @@
    * @param statistics source of column statistics
    * @return map with schema path and {@link ColumnStatistics}
    */
-  public static Map<SchemaPath, ColumnStatistics> getColumnStatistics(TupleMetadata schema, DrillStatsTable statistics) {
+  public static Map<SchemaPath, ColumnStatistics<?>> getColumnStatistics(TupleMetadata schema, DrillStatsTable statistics) {
     List<SchemaPath> schemaPaths = SchemaUtil.getSchemaPaths(schema);
 
     return schemaPaths.stream()
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestInfoSchemaWithMetastore.java b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestInfoSchemaWithMetastore.java
index 6ce32e4..c89472b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestInfoSchemaWithMetastore.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestInfoSchemaWithMetastore.java
@@ -209,21 +209,21 @@
     schema.addColumn(varcharCol);
     schema.addColumn(timestampColumn);
 
-    Map<SchemaPath, ColumnStatistics> columnsStatistics = new HashMap<>();
+    Map<SchemaPath, ColumnStatistics<?>> columnsStatistics = new HashMap<>();
     columnsStatistics.put(SchemaPath.parseFromString("varchar_col"),
-      new ColumnStatistics(Arrays.asList(
+      new ColumnStatistics<>(Arrays.asList(
         new StatisticsHolder<>("aaa", ColumnStatisticsKind.MIN_VALUE),
         new StatisticsHolder<>("zzz", ColumnStatisticsKind.MAX_VALUE))));
     columnsStatistics.put(SchemaPath.parseFromString("struct_col.nested_struct.nested_struct_varchar"),
-      new ColumnStatistics(Arrays.asList(
+      new ColumnStatistics<>(Arrays.asList(
         new StatisticsHolder<>("bbb", ColumnStatisticsKind.MIN_VALUE),
         new StatisticsHolder<>("ccc", ColumnStatisticsKind.MAX_VALUE))));
     columnsStatistics.put(SchemaPath.parseFromString("bigint_col"),
-      new ColumnStatistics(Arrays.asList(
+      new ColumnStatistics<>(Arrays.asList(
         new StatisticsHolder<>(100L, ColumnStatisticsKind.NULLS_COUNT),
         new StatisticsHolder<>(10.5D, ColumnStatisticsKind.NDV))));
     columnsStatistics.put(SchemaPath.parseFromString("struct_col.struct_bigint"),
-      new ColumnStatistics(Collections.singletonList(
+      new ColumnStatistics<>(Collections.singletonList(
         new StatisticsHolder<>(10.5D, ColumnStatisticsKind.NON_NULL_COUNT))));
 
     ZonedDateTime currentTime = currentUtcTime();
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestMetastoreCommands.java b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestMetastoreCommands.java
index ff2108b..b8bf889 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestMetastoreCommands.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestMetastoreCommands.java
@@ -81,7 +81,6 @@
 import static org.junit.Assert.assertTrue;
 
 @Category({SlowTest.class, MetastoreTest.class})
-@SuppressWarnings({"rawtypes", "unchecked"})
 public class TestMetastoreCommands extends ClusterTest {
 
   private static final TupleMetadata SCHEMA = new SchemaBuilder()
@@ -98,7 +97,8 @@
       .add("o_comment", TypeProtos.MinorType.VARCHAR)
       .build();
 
-  private static final Map<SchemaPath, ColumnStatistics> TABLE_COLUMN_STATISTICS = ImmutableMap.<SchemaPath, ColumnStatistics>builder()
+  private static final Map<SchemaPath, ColumnStatistics<?>> TABLE_COLUMN_STATISTICS =
+      ImmutableMap.<SchemaPath, ColumnStatistics<?>>builder()
       .put(SchemaPath.getSimplePath("o_shippriority"),
           getColumnStatistics(0, 0, 120L, TypeProtos.MinorType.INT))
       .put(SchemaPath.getSimplePath("o_orderstatus"),
@@ -124,7 +124,8 @@
           getColumnStatistics(757382400000L, 850953600000L, 120L, TypeProtos.MinorType.DATE))
       .build();
 
-  private static final Map<SchemaPath, ColumnStatistics> DIR0_1994_SEGMENT_COLUMN_STATISTICS = ImmutableMap.<SchemaPath, ColumnStatistics>builder()
+  private static final Map<SchemaPath, ColumnStatistics<?>> DIR0_1994_SEGMENT_COLUMN_STATISTICS =
+      ImmutableMap.<SchemaPath, ColumnStatistics<?>>builder()
       .put(SchemaPath.getSimplePath("o_shippriority"),
           getColumnStatistics(0, 0, 40L, TypeProtos.MinorType.INT))
       .put(SchemaPath.getSimplePath("o_orderstatus"),
@@ -150,7 +151,8 @@
           getColumnStatistics(757382400000L, 788140800000L, 40L, TypeProtos.MinorType.DATE))
       .build();
 
-  private static final Map<SchemaPath, ColumnStatistics> DIR0_1994_Q1_SEGMENT_COLUMN_STATISTICS = ImmutableMap.<SchemaPath, ColumnStatistics>builder()
+  private static final Map<SchemaPath, ColumnStatistics<?>> DIR0_1994_Q1_SEGMENT_COLUMN_STATISTICS =
+      ImmutableMap.<SchemaPath, ColumnStatistics<?>>builder()
       .put(SchemaPath.getSimplePath("o_shippriority"),
           getColumnStatistics(0, 0, 10L, TypeProtos.MinorType.INT))
       .put(SchemaPath.getSimplePath("o_orderstatus"),
@@ -594,7 +596,7 @@
 
     TableInfo tableInfo = getTableInfo(tableName, "tmp");
 
-    Map<SchemaPath, ColumnStatistics> updatedTableColumnStatistics = new HashMap<>();
+    Map<SchemaPath, ColumnStatistics<?>> updatedTableColumnStatistics = new HashMap<>();
 
     SchemaPath orderStatusPath = SchemaPath.getSimplePath("o_orderstatus");
     SchemaPath dir0Path = SchemaPath.getSimplePath("dir0");
@@ -646,7 +648,7 @@
 
     TableInfo tableInfo = getTableInfo(tableName, "tmp");
 
-    Map<SchemaPath, ColumnStatistics> updatedTableColumnStatistics = new HashMap<>();
+    Map<SchemaPath, ColumnStatistics<?>> updatedTableColumnStatistics = new HashMap<>();
 
     SchemaPath dir0Path = SchemaPath.getSimplePath("dir0");
     SchemaPath dir1Path = SchemaPath.getSimplePath("dir1");
@@ -696,7 +698,7 @@
 
     TableInfo tableInfo = getTableInfo(tableName, "tmp");
 
-    Map<SchemaPath, ColumnStatistics> updatedTableColumnStatistics = new HashMap<>();
+    Map<SchemaPath, ColumnStatistics<?>> updatedTableColumnStatistics = new HashMap<>();
 
     SchemaPath orderStatusPath = SchemaPath.getSimplePath("o_orderstatus");
     SchemaPath orderDatePath = SchemaPath.getSimplePath("o_orderdate");
@@ -767,7 +769,7 @@
 
     TableInfo tableInfo = getTableInfo(tableName, "tmp");
 
-    Map<SchemaPath, ColumnStatistics> updatedTableColumnStatistics = new HashMap<>();
+    Map<SchemaPath, ColumnStatistics<?>> updatedTableColumnStatistics = new HashMap<>();
 
     SchemaPath orderStatusPath = SchemaPath.getSimplePath("o_orderstatus");
     SchemaPath orderDatePath = SchemaPath.getSimplePath("o_orderdate");
@@ -844,7 +846,7 @@
 
     TableInfo tableInfo = getTableInfo(tableName, "tmp");
 
-    Map<SchemaPath, ColumnStatistics> updatedTableColumnStatistics = new HashMap<>();
+    Map<SchemaPath, ColumnStatistics<?>> updatedTableColumnStatistics = new HashMap<>();
 
     SchemaPath orderStatusPath = SchemaPath.getSimplePath("o_orderstatus");
     SchemaPath orderDatePath = SchemaPath.getSimplePath("o_orderdate");
@@ -977,7 +979,7 @@
     TableInfo tableInfo = getTableInfo(tableName, "tmp");
 
     // updates statistics values due to new segment
-    Map<SchemaPath, ColumnStatistics> updatedStatistics = new HashMap<>(TABLE_COLUMN_STATISTICS);
+    Map<SchemaPath, ColumnStatistics<?>> updatedStatistics = new HashMap<>(TABLE_COLUMN_STATISTICS);
     updatedStatistics.replaceAll((logicalExpressions, columnStatistics) ->
         columnStatistics.cloneWith(new ColumnStatistics<>(
             Arrays.asList(
@@ -1068,15 +1070,15 @@
     TableInfo tableInfo = getTableInfo(tableName, "tmp");
 
     // updates statistics values due to new segment
-    Map<SchemaPath, ColumnStatistics> updatedStatistics = new HashMap<>(TABLE_COLUMN_STATISTICS);
+    Map<SchemaPath, ColumnStatistics<?>> updatedStatistics = new HashMap<>(TABLE_COLUMN_STATISTICS);
     updatedStatistics.replaceAll((logicalExpressions, columnStatistics) ->
-        columnStatistics.cloneWith(new ColumnStatistics(
+        columnStatistics.cloneWith(new ColumnStatistics<>(
             Arrays.asList(
                 new StatisticsHolder<>(130L, TableStatisticsKind.ROW_COUNT),
                 new StatisticsHolder<>(130L, ColumnStatisticsKind.NON_NULL_VALUES_COUNT)))));
 
     updatedStatistics.computeIfPresent(SchemaPath.getSimplePath("dir1"), (logicalExpressions, columnStatistics) ->
-        columnStatistics.cloneWith(new ColumnStatistics(
+        columnStatistics.cloneWith(new ColumnStatistics<>(
             Collections.singletonList(new StatisticsHolder<>("Q5", ColumnStatisticsKind.MAX_VALUE)))));
 
     BaseTableMetadata expectedTableMetadata = BaseTableMetadata.builder()
@@ -1152,9 +1154,9 @@
     TableInfo tableInfo = getTableInfo(tableName, "tmp");
 
     // updates statistics values due to new segment
-    Map<SchemaPath, ColumnStatistics> updatedStatistics = new HashMap<>(TABLE_COLUMN_STATISTICS);
+    Map<SchemaPath, ColumnStatistics<?>> updatedStatistics = new HashMap<>(TABLE_COLUMN_STATISTICS);
     updatedStatistics.replaceAll((logicalExpressions, columnStatistics) ->
-        columnStatistics.cloneWith(new ColumnStatistics(
+        columnStatistics.cloneWith(new ColumnStatistics<>(
             Arrays.asList(
                 new StatisticsHolder<>(130L, TableStatisticsKind.ROW_COUNT),
                 new StatisticsHolder<>(130L, ColumnStatisticsKind.NON_NULL_VALUES_COUNT)))));
@@ -1625,15 +1627,15 @@
           .basicRequests()
           .tableMetadata(tableInfo);
 
-      Map<SchemaPath, ColumnStatistics> tableColumnStatistics = new HashMap<>(TABLE_COLUMN_STATISTICS);
+      Map<SchemaPath, ColumnStatistics<?>> tableColumnStatistics = new HashMap<>(TABLE_COLUMN_STATISTICS);
       tableColumnStatistics.computeIfPresent(SchemaPath.getSimplePath("o_clerk"),
           (logicalExpressions, columnStatistics) ->
-              columnStatistics.cloneWith(new ColumnStatistics(
+              columnStatistics.cloneWith(new ColumnStatistics<>(
                   Collections.singletonList(new StatisticsHolder<>("Clerk#000000006", ColumnStatisticsKind.MIN_VALUE)))));
 
       tableColumnStatistics.computeIfPresent(SchemaPath.getSimplePath("o_totalprice"),
           (logicalExpressions, columnStatistics) ->
-              columnStatistics.cloneWith(new ColumnStatistics(
+              columnStatistics.cloneWith(new ColumnStatistics<>(
                   Collections.singletonList(new StatisticsHolder<>(328207.15, ColumnStatisticsKind.MAX_VALUE)))));
 
       BaseTableMetadata expectedTableMetadata = BaseTableMetadata.builder()
@@ -1779,7 +1781,7 @@
 
     TableInfo tableInfo = getTableInfo(tableName, "tmp");
 
-    Map<SchemaPath, ColumnStatistics> tableColumnStatistics = new HashMap<>(TABLE_COLUMN_STATISTICS);
+    Map<SchemaPath, ColumnStatistics<?>> tableColumnStatistics = new HashMap<>(TABLE_COLUMN_STATISTICS);
     tableColumnStatistics.remove(SchemaPath.getSimplePath("dir0"));
     tableColumnStatistics.remove(SchemaPath.getSimplePath("dir1"));
 
@@ -1800,7 +1802,7 @@
             getColumnStatistics(757382400000L, 764640000000L, 120L, TypeProtos.MinorType.DATE));
 
     tableColumnStatistics.replaceAll((logicalExpressions, columnStatistics) ->
-        columnStatistics.cloneWith(new ColumnStatistics(
+        columnStatistics.cloneWith(new ColumnStatistics<>(
             Arrays.asList(
                 new StatisticsHolder<>(10L, TableStatisticsKind.ROW_COUNT),
                 new StatisticsHolder<>(10L, ColumnStatisticsKind.NON_NULL_VALUES_COUNT)))));
@@ -1905,7 +1907,7 @@
             .resumeSchema()
         .build();
 
-    Map<SchemaPath, ColumnStatistics> columnStatistics = ImmutableMap.<SchemaPath, ColumnStatistics>builder()
+    Map<SchemaPath, ColumnStatistics<?>> columnStatistics = ImmutableMap.<SchemaPath, ColumnStatistics<?>>builder()
         .put(SchemaPath.getCompoundPath("user_info", "state"),
             getColumnStatistics("ct", "nj", 5L, TypeProtos.MinorType.VARCHAR))
         .put(SchemaPath.getSimplePath("date"),
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestFileGenerator.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestFileGenerator.java
index f198d3c..c44ef23 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestFileGenerator.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestFileGenerator.java
@@ -105,35 +105,6 @@
     props.fields.put("S_COMMENT", new FieldInfo("binary", "bin2", -1, bin2Vals, TypeProtos.MinorType.VARBINARY, props));
   }
 
-  private static abstract class ValueProducer {
-
-    public abstract void reset();
-    public abstract Object getValue();
-  }
-
-  private static class ValueRepeaterProducer extends ValueProducer {
-
-    WrapAroundCounter position;
-    Object[] values;
-
-    public ValueRepeaterProducer(Object[] values) {
-      this.values = values;
-      position = new WrapAroundCounter(values.length);
-    }
-
-    @Override
-    public void reset() {
-      position.reset();
-    }
-
-    @Override
-    public Object getValue() {
-      Object ret = values[position.val];
-      position.increment();
-      return ret;
-    }
-  }
-
   public static void generateParquetFile(String filename, ParquetTestProperties props) throws Exception {
 
     int currentBooleanByte = 0;
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java
index 86a7bf9..e51e311 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java
@@ -608,13 +608,13 @@
     Mockito.when(booleanStatistics.get(ColumnStatisticsKind.MIN_VALUE)).thenReturn(false); // min false
     Mockito.when(booleanStatistics.get(ColumnStatisticsKind.MAX_VALUE)).thenReturn(true); // max true
     Mockito.when(booleanStatistics.getValueComparator()).thenReturn(Comparator.nullsFirst(Comparator.naturalOrder())); // comparator
-    IsPredicate isTrue = (IsPredicate) IsPredicate.createIsPredicate(FunctionGenerationHelper.IS_TRUE, le);
+    IsPredicate<Boolean> isTrue = (IsPredicate<Boolean>) IsPredicate.createIsPredicate(FunctionGenerationHelper.IS_TRUE, le);
     assertEquals(RowsMatch.SOME, isTrue.matches(re));
-    IsPredicate isFalse = (IsPredicate)  IsPredicate.createIsPredicate(FunctionGenerationHelper.IS_FALSE, le);
+    IsPredicate<Boolean> isFalse = (IsPredicate<Boolean>) IsPredicate.createIsPredicate(FunctionGenerationHelper.IS_FALSE, le);
     assertEquals(RowsMatch.SOME, isFalse.matches(re));
-    IsPredicate isNotTrue = (IsPredicate)  IsPredicate.createIsPredicate(FunctionGenerationHelper.IS_NOT_TRUE, le);
+    IsPredicate<Boolean> isNotTrue = (IsPredicate<Boolean>) IsPredicate.createIsPredicate(FunctionGenerationHelper.IS_NOT_TRUE, le);
     assertEquals(RowsMatch.SOME, isNotTrue.matches(re));
-    IsPredicate isNotFalse = (IsPredicate)  IsPredicate.createIsPredicate(FunctionGenerationHelper.IS_NOT_FALSE, le);
+    IsPredicate<Boolean> isNotFalse = (IsPredicate<Boolean>) IsPredicate.createIsPredicate(FunctionGenerationHelper.IS_NOT_FALSE, le);
     assertEquals(RowsMatch.SOME, isNotFalse.matches(re));
   }
 
@@ -631,13 +631,13 @@
     Mockito.when(booleanStatistics.get(ColumnStatisticsKind.MIN_VALUE)).thenReturn(false); // min false
     Mockito.when(booleanStatistics.get(ColumnStatisticsKind.MAX_VALUE)).thenReturn(false); // max false
     Mockito.when(booleanStatistics.getValueComparator()).thenReturn(Comparator.nullsFirst(Comparator.naturalOrder())); // comparator
-    IsPredicate isTrue = (IsPredicate) IsPredicate.createIsPredicate(FunctionGenerationHelper.IS_TRUE, le);
+    IsPredicate<Boolean> isTrue = (IsPredicate<Boolean>) IsPredicate.createIsPredicate(FunctionGenerationHelper.IS_TRUE, le);
     assertEquals(RowsMatch.NONE, isTrue.matches(re));
-    IsPredicate isFalse = (IsPredicate)  IsPredicate.createIsPredicate(FunctionGenerationHelper.IS_FALSE, le);
+    IsPredicate<Boolean> isFalse = (IsPredicate<Boolean>) IsPredicate.createIsPredicate(FunctionGenerationHelper.IS_FALSE, le);
     assertEquals(RowsMatch.ALL, isFalse.matches(re));
-    IsPredicate isNotTrue = (IsPredicate)  IsPredicate.createIsPredicate(FunctionGenerationHelper.IS_NOT_TRUE, le);
+    IsPredicate<Boolean> isNotTrue = (IsPredicate<Boolean>) IsPredicate.createIsPredicate(FunctionGenerationHelper.IS_NOT_TRUE, le);
     assertEquals(RowsMatch.ALL, isNotTrue.matches(re));
-    IsPredicate isNotFalse = (IsPredicate)  IsPredicate.createIsPredicate(FunctionGenerationHelper.IS_NOT_FALSE, le);
+    IsPredicate<Boolean> isNotFalse = (IsPredicate<Boolean>) IsPredicate.createIsPredicate(FunctionGenerationHelper.IS_NOT_FALSE, le);
     assertEquals(RowsMatch.NONE, isNotFalse.matches(re));
   }
 
@@ -653,13 +653,13 @@
     Mockito.when(booleanStatistics.get(ColumnStatisticsKind.NULLS_COUNT)).thenReturn(0L); // no nulls
     Mockito.when(booleanStatistics.get(ColumnStatisticsKind.MIN_VALUE)).thenReturn(true); // min false
     Mockito.when(booleanStatistics.get(ColumnStatisticsKind.MAX_VALUE)).thenReturn(true); // max false
-    IsPredicate isTrue = (IsPredicate) IsPredicate.createIsPredicate(FunctionGenerationHelper.IS_TRUE, le);
+    IsPredicate<Boolean> isTrue = (IsPredicate<Boolean>) IsPredicate.createIsPredicate(FunctionGenerationHelper.IS_TRUE, le);
     assertEquals(RowsMatch.ALL, isTrue.matches(re));
-    IsPredicate isFalse = (IsPredicate)  IsPredicate.createIsPredicate(FunctionGenerationHelper.IS_FALSE, le);
+    IsPredicate<Boolean> isFalse = (IsPredicate<Boolean>) IsPredicate.createIsPredicate(FunctionGenerationHelper.IS_FALSE, le);
     assertEquals(RowsMatch.NONE, isFalse.matches(re));
-    IsPredicate isNotTrue = (IsPredicate)  IsPredicate.createIsPredicate(FunctionGenerationHelper.IS_NOT_TRUE, le);
+    IsPredicate<Boolean> isNotTrue = (IsPredicate<Boolean>) IsPredicate.createIsPredicate(FunctionGenerationHelper.IS_NOT_TRUE, le);
     assertEquals(RowsMatch.NONE, isNotTrue.matches(re));
-    IsPredicate isNotFalse = (IsPredicate)  IsPredicate.createIsPredicate(FunctionGenerationHelper.IS_NOT_FALSE, le);
+    IsPredicate<Boolean> isNotFalse = (IsPredicate<Boolean>) IsPredicate.createIsPredicate(FunctionGenerationHelper.IS_NOT_FALSE, le);
     assertEquals(RowsMatch.ALL, isNotFalse.matches(re));
   }
 
diff --git a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/components/tables/BasicTablesTransformer.java b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/components/tables/BasicTablesTransformer.java
index 863ba9d..d25fd2e 100644
--- a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/components/tables/BasicTablesTransformer.java
+++ b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/components/tables/BasicTablesTransformer.java
@@ -99,6 +99,9 @@
         case PARTITION:
           partitions.add(PartitionMetadata.builder().metadataUnit(unit).build());
           break;
+        default:
+          // Ignore unsupported type
+          break;
       }
     }
     return new MetadataHolder(tables, segments, files, rowGroups, partitions);
diff --git a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/components/tables/TableMetadataUnit.java b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/components/tables/TableMetadataUnit.java
index 7863a4d..1d6ed84 100644
--- a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/components/tables/TableMetadataUnit.java
+++ b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/components/tables/TableMetadataUnit.java
@@ -511,6 +511,8 @@
               rowGroupColumns.add(name);
               partitionColumns.add(name);
               break;
+            default:
+              throw new IllegalStateException(scope.name());
           }
         }
 
diff --git a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/exceptions/MetastoreException.java b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/exceptions/MetastoreException.java
index 96516df..f94d128 100644
--- a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/exceptions/MetastoreException.java
+++ b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/exceptions/MetastoreException.java
@@ -18,9 +18,11 @@
 package org.apache.drill.metastore.exceptions;
 
 /**
- * Drill Metastore runtime exception to indicate that exception was caused by Drill Metastore.
- * Drill Metastore implementations can use or extend it to throw Metastore specific exceptions.
+ * Drill Metastore runtime exception to indicate that exception was caused by
+ * Drill Metastore. Drill Metastore implementations can use or extend it to
+ * throw Metastore specific exceptions.
  */
+@SuppressWarnings("serial")
 public class MetastoreException extends RuntimeException {
 
   public MetastoreException(String message, Throwable cause) {
diff --git a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/metadata/BaseMetadata.java b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/metadata/BaseMetadata.java
index ae6c547..c24b54f 100644
--- a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/metadata/BaseMetadata.java
+++ b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/metadata/BaseMetadata.java
@@ -42,8 +42,8 @@
   protected final TableInfo tableInfo;
   protected final MetadataInfo metadataInfo;
   protected final TupleMetadata schema;
-  protected final Map<SchemaPath, ColumnStatistics> columnsStatistics;
-  protected final Map<String, StatisticsHolder> metadataStatistics;
+  protected final Map<SchemaPath, ColumnStatistics<?>> columnsStatistics;
+  protected final Map<String, StatisticsHolder<?>> metadataStatistics;
   protected final long lastModifiedTime;
 
   protected <T extends BaseMetadataBuilder<T>> BaseMetadata(BaseMetadataBuilder<T> builder) {
@@ -60,12 +60,12 @@
   }
 
   @Override
-  public Map<SchemaPath, ColumnStatistics> getColumnsStatistics() {
+  public Map<SchemaPath, ColumnStatistics<?>> getColumnsStatistics() {
     return columnsStatistics;
   }
 
   @Override
-  public ColumnStatistics getColumnStatistics(SchemaPath columnName) {
+  public ColumnStatistics<?> getColumnStatistics(SchemaPath columnName) {
     return columnsStatistics.get(columnName);
   }
 
@@ -77,20 +77,20 @@
   @Override
   @SuppressWarnings("unchecked")
   public <V> V getStatistic(StatisticsKind<V> statisticsKind) {
-    StatisticsHolder<V> statisticsHolder = metadataStatistics.get(statisticsKind.getName());
+    StatisticsHolder<V> statisticsHolder = (StatisticsHolder<V>)
+        metadataStatistics.get(statisticsKind.getName());
     return statisticsHolder != null ? statisticsHolder.getStatisticsValue() : null;
   }
 
   @Override
-  public boolean containsExactStatistics(StatisticsKind statisticsKind) {
-    StatisticsHolder statisticsHolder = metadataStatistics.get(statisticsKind.getName());
+  public boolean containsExactStatistics(StatisticsKind<?> statisticsKind) {
+    StatisticsHolder<?> statisticsHolder = metadataStatistics.get(statisticsKind.getName());
     return statisticsHolder != null && statisticsHolder.getStatisticsKind().isExact();
   }
 
   @Override
-  @SuppressWarnings("unchecked")
   public <V> V getStatisticsForColumn(SchemaPath columnName, StatisticsKind<V> statisticsKind) {
-    return (V) columnsStatistics.get(columnName).get(statisticsKind);
+    return columnsStatistics.get(columnName).get(statisticsKind);
   }
 
   @Override
@@ -172,14 +172,14 @@
 
   protected abstract void toMetadataUnitBuilder(TableMetadataUnit.Builder builder);
 
-  protected abstract BaseMetadataBuilder toBuilder();
+  protected abstract BaseMetadataBuilder<?> toBuilder();
 
   public static abstract class BaseMetadataBuilder<T extends BaseMetadataBuilder<T>> {
     protected TableInfo tableInfo;
     protected MetadataInfo metadataInfo;
     protected TupleMetadata schema;
-    protected Map<SchemaPath, ColumnStatistics> columnsStatistics;
-    protected Collection<StatisticsHolder> metadataStatistics;
+    protected Map<SchemaPath, ColumnStatistics<?>> columnsStatistics;
+    protected Collection<StatisticsHolder<?>> metadataStatistics;
     protected long lastModifiedTime = UNDEFINED_TIME;
 
     public T tableInfo(TableInfo tableInfo) {
@@ -197,12 +197,12 @@
       return self();
     }
 
-    public T columnsStatistics(Map<SchemaPath, ColumnStatistics> columnsStatistics) {
+    public T columnsStatistics(Map<SchemaPath, ColumnStatistics<?>> columnsStatistics) {
       this.columnsStatistics = columnsStatistics;
       return self();
     }
 
-    public T metadataStatistics(Collection<StatisticsHolder> metadataStatistics) {
+    public T metadataStatistics(Collection<StatisticsHolder<?>> metadataStatistics) {
       this.metadataStatistics = metadataStatistics;
       return self();
     }
diff --git a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/metadata/BaseTableMetadata.java b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/metadata/BaseTableMetadata.java
index 3b6922d..839cd51 100644
--- a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/metadata/BaseTableMetadata.java
+++ b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/metadata/BaseTableMetadata.java
@@ -102,9 +102,8 @@
   }
 
   @Override
-  @SuppressWarnings("unchecked")
-  public BaseTableMetadata cloneWithStats(Map<SchemaPath, ColumnStatistics> columnStatistics, List<StatisticsHolder> tableStatistics) {
-    Map<String, StatisticsHolder> mergedTableStatistics = new HashMap<>(this.metadataStatistics);
+  public BaseTableMetadata cloneWithStats(Map<SchemaPath, ColumnStatistics<?>> columnStatistics, List<StatisticsHolder<?>> tableStatistics) {
+    Map<String, StatisticsHolder<?>> mergedTableStatistics = new HashMap<>(this.metadataStatistics);
 
     // overrides statistics value for the case when new statistics is exact or existing one was estimated
     tableStatistics.stream()
@@ -113,12 +112,12 @@
               || !this.metadataStatistics.get(statisticsHolder.getStatisticsKind().getName()).getStatisticsKind().isExact())
         .forEach(statisticsHolder -> mergedTableStatistics.put(statisticsHolder.getStatisticsKind().getName(), statisticsHolder));
 
-    Map<SchemaPath, ColumnStatistics> newColumnsStatistics = new HashMap<>(this.columnsStatistics);
+    Map<SchemaPath, ColumnStatistics<?>> newColumnsStatistics = new HashMap<>(this.columnsStatistics);
     this.columnsStatistics.forEach(
         (columnName, value) -> {
-          ColumnStatistics sourceStatistics = columnStatistics.get(columnName);
+          ColumnStatistics<?> sourceStatistics = columnStatistics.get(columnName);
           if (sourceStatistics != null) {
-            newColumnsStatistics.put(columnName, value.cloneWith(sourceStatistics));
+            newColumnsStatistics.put(columnName, value.genericClone(sourceStatistics));
           }
         });
 
@@ -148,6 +147,7 @@
     }
   }
 
+  @Override
   public BaseTableMetadataBuilder toBuilder() {
     return builder()
         .tableInfo(tableInfo)
diff --git a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/metadata/Metadata.java b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/metadata/Metadata.java
index 6b40d95..3d99ae7 100644
--- a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/metadata/Metadata.java
+++ b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/metadata/Metadata.java
@@ -37,7 +37,7 @@
    *
    * @return statistics stored in current metadata
    */
-  Map<SchemaPath, ColumnStatistics> getColumnsStatistics();
+  Map<SchemaPath, ColumnStatistics<?>> getColumnsStatistics();
 
   /**
    * Returns statistics for specified column stored in current metadata.
@@ -45,7 +45,7 @@
    * @param columnName column whose statistics should be returned
    * @return statistics for specified column
    */
-  ColumnStatistics getColumnStatistics(SchemaPath columnName);
+  ColumnStatistics<?> getColumnStatistics(SchemaPath columnName);
 
   /**
    * Returns schema stored in current metadata represented as
@@ -70,7 +70,7 @@
    * @param statisticsKind statistics kind to check
    * @return true if value which corresponds to the specified statistics kind is exact
    */
-  boolean containsExactStatistics(StatisticsKind statisticsKind);
+  boolean containsExactStatistics(StatisticsKind<?> statisticsKind);
 
   /**
    * Returns value of column statistics which corresponds to specified {@link StatisticsKind}
diff --git a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/metadata/NonInterestingColumnsMetadata.java b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/metadata/NonInterestingColumnsMetadata.java
index 6944ab0..26cf665 100644
--- a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/metadata/NonInterestingColumnsMetadata.java
+++ b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/metadata/NonInterestingColumnsMetadata.java
@@ -34,19 +34,19 @@
  * to NonInterestingColumnsMetadata.
  */
 public class NonInterestingColumnsMetadata implements Metadata {
-  private final Map<SchemaPath, ColumnStatistics> columnsStatistics;
+  private final Map<SchemaPath, ColumnStatistics<?>> columnsStatistics;
 
-  public NonInterestingColumnsMetadata(Map<SchemaPath, ColumnStatistics> columnsStatistics) {
+  public NonInterestingColumnsMetadata(Map<SchemaPath, ColumnStatistics<?>> columnsStatistics) {
     this.columnsStatistics = columnsStatistics;
   }
 
   @Override
-  public Map<SchemaPath, ColumnStatistics> getColumnsStatistics() {
+  public Map<SchemaPath, ColumnStatistics<?>> getColumnsStatistics() {
     return columnsStatistics;
   }
 
   @Override
-  public ColumnStatistics getColumnStatistics(SchemaPath columnName) {
+  public ColumnStatistics<?> getColumnStatistics(SchemaPath columnName) {
     return columnsStatistics.get(columnName);
   }
 
@@ -61,14 +61,13 @@
   }
 
   @Override
-  public boolean containsExactStatistics(StatisticsKind statisticsKind) {
+  public boolean containsExactStatistics(StatisticsKind<?> statisticsKind) {
     return false;
   }
 
   @Override
-  @SuppressWarnings("unchecked")
   public <V> V getStatisticsForColumn(SchemaPath columnName, StatisticsKind<V> statisticsKind) {
-    return (V) columnsStatistics.get(columnName).get(statisticsKind);
+    return columnsStatistics.get(columnName).get(statisticsKind);
   }
 
   @Override
diff --git a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/metadata/TableMetadata.java b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/metadata/TableMetadata.java
index 517d232..d4704fc 100644
--- a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/metadata/TableMetadata.java
+++ b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/metadata/TableMetadata.java
@@ -32,6 +32,7 @@
 
   Path getLocation();
   long getLastModifiedTime();
-  TableMetadata cloneWithStats(Map<SchemaPath, ColumnStatistics> columnStatistics, List<StatisticsHolder> tableStatistics);
+  TableMetadata cloneWithStats(Map<SchemaPath, ColumnStatistics<?>> columnStatistics,
+      List<StatisticsHolder<?>> tableStatistics);
   List<SchemaPath> getInterestingColumns();
 }
diff --git a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/statistics/CollectableColumnStatisticsKind.java b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/statistics/CollectableColumnStatisticsKind.java
index 00b1f1d..8c4a2a8 100644
--- a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/statistics/CollectableColumnStatisticsKind.java
+++ b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/statistics/CollectableColumnStatisticsKind.java
@@ -32,5 +32,5 @@
    * @param statistics list of {@link ColumnStatistics} instances to be collected
    * @return column statistics value received by collecting specified {@link ColumnStatistics}
    */
-  Object mergeStatistics(List<? extends ColumnStatistics> statistics);
+  Object mergeStatistics(List<? extends ColumnStatistics<?>> statistics);
 }
diff --git a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/statistics/ColumnStatistics.java b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/statistics/ColumnStatistics.java
index 0cb33db..5fbb85a 100644
--- a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/statistics/ColumnStatistics.java
+++ b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/statistics/ColumnStatistics.java
@@ -74,13 +74,13 @@
       .registerModule(new JodaModule())
       .readerFor(ColumnStatistics.class);
 
-  private final Map<String, StatisticsHolder> statistics;
+  private final Map<String, StatisticsHolder<?>> statistics;
   private final Comparator<T> valueComparator;
   private final TypeProtos.MinorType type;
 
   @JsonCreator
   @SuppressWarnings("unchecked")
-  public ColumnStatistics(@JsonProperty("statistics") Collection<StatisticsHolder> statistics,
+  public ColumnStatistics(@JsonProperty("statistics") Collection<StatisticsHolder<?>> statistics,
                           @JsonProperty("type") TypeProtos.MinorType type) {
     this.type = type;
     this.valueComparator = type != null
@@ -93,7 +93,7 @@
             (a, b) -> a.getStatisticsKind().isExact() ? a : b));
   }
 
-  public ColumnStatistics(Collection<StatisticsHolder> statistics) {
+  public ColumnStatistics(Collection<StatisticsHolder<?>> statistics) {
     this(statistics, TypeProtos.MinorType.INT);
   }
 
@@ -105,7 +105,8 @@
    */
   @SuppressWarnings("unchecked")
   public <V> V get(StatisticsKind<V> statisticsKind) {
-    StatisticsHolder<V> statisticsHolder = statistics.get(statisticsKind.getName());
+    StatisticsHolder<V> statisticsHolder = (StatisticsHolder<V>)
+        statistics.get(statisticsKind.getName());
     if (statisticsHolder != null) {
       return statisticsHolder.getStatisticsValue();
     }
@@ -118,7 +119,7 @@
    * @param statisticsKind statistics kind to check
    * @return true if specified statistics kind is set
    */
-  public boolean contains(StatisticsKind statisticsKind) {
+  public boolean contains(StatisticsKind<?> statisticsKind) {
     return statistics.containsKey(statisticsKind.getName());
   }
 
@@ -129,8 +130,8 @@
    * @param statisticsKind statistics kind to check
    * @return true if value which corresponds to the specified statistics kind is exact
    */
-  public boolean containsExact(StatisticsKind statisticsKind) {
-    StatisticsHolder statisticsHolder = statistics.get(statisticsKind.getName());
+  public boolean containsExact(StatisticsKind<?> statisticsKind) {
+    StatisticsHolder<?> statisticsHolder = statistics.get(statisticsKind.getName());
     if (statisticsHolder != null) {
       return statisticsHolder.getStatisticsKind().isExact();
     }
@@ -153,10 +154,10 @@
    * @return new {@link ColumnStatistics} instance with overridden statistics
    */
   public ColumnStatistics<T> cloneWith(ColumnStatistics<T> sourceStatistics) {
-    Map<String, StatisticsHolder> newStats = new HashMap<>(this.statistics);
+    Map<String, StatisticsHolder<?>> newStats = new HashMap<>(this.statistics);
     sourceStatistics.statistics.values().forEach(statisticsHolder -> {
-      StatisticsKind statisticsKindToMerge = statisticsHolder.getStatisticsKind();
-      StatisticsHolder oldStatistics = statistics.get(statisticsKindToMerge.getName());
+      StatisticsKind<?> statisticsKindToMerge = statisticsHolder.getStatisticsKind();
+      StatisticsHolder<?> oldStatistics = statistics.get(statisticsKindToMerge.getName());
       if (oldStatistics == null
           || !oldStatistics.getStatisticsKind().isExact()
           || statisticsKindToMerge.isExact()) {
@@ -167,9 +168,14 @@
     return new ColumnStatistics<>(newStats.values(), type);
   }
 
+  @SuppressWarnings("unchecked")
+  public ColumnStatistics<T> genericClone(ColumnStatistics<?> sourceStatistics) {
+    return cloneWith((ColumnStatistics<T>) sourceStatistics);
+  }
+
   @JsonProperty("statistics")
   @SuppressWarnings("unused") // used for serialization
-  private Collection<StatisticsHolder> getAll() {
+  private Collection<StatisticsHolder<?>> getAll() {
     return statistics.values();
   }
 
@@ -212,7 +218,7 @@
         .toString();
   }
 
-  public static ColumnStatistics of(String columnStatistics) {
+  public static ColumnStatistics<?> of(String columnStatistics) {
     try {
       return OBJECT_READER.readValue(columnStatistics);
     } catch (IOException e) {
diff --git a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/statistics/ColumnStatisticsKind.java b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/statistics/ColumnStatisticsKind.java
index 613b602..5cfed03 100644
--- a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/statistics/ColumnStatisticsKind.java
+++ b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/statistics/ColumnStatisticsKind.java
@@ -19,6 +19,7 @@
 
 import org.apache.drill.metastore.metadata.BaseMetadata;
 
+import java.util.Comparator;
 import java.util.List;
 
 /**
@@ -33,7 +34,7 @@
   public static final ColumnStatisticsKind<Long> NULLS_COUNT =
       new ColumnStatisticsKind<Long>(ExactStatisticsConstants.NULLS_COUNT, true) {
         @Override
-        public Long mergeStatistics(List<? extends ColumnStatistics> statisticsList) {
+        public Long mergeStatistics(List<? extends ColumnStatistics<?>> statisticsList) {
           long nullsCount = 0;
           for (ColumnStatistics<?> statistics : statisticsList) {
             Long statNullsCount = statistics.get(this);
@@ -47,7 +48,7 @@
         }
 
         @Override
-        public Long getFrom(ColumnStatistics metadata) {
+        public Long getFrom(ColumnStatistics<?> metadata) {
           Long rowCount = super.getFrom(metadata);
           return rowCount != null ? rowCount : Statistic.NO_COLUMN_STATS;
         }
@@ -60,11 +61,12 @@
       new ColumnStatisticsKind<Object>(ExactStatisticsConstants.MIN_VALUE, true) {
         @Override
         @SuppressWarnings("unchecked")
-        public Object mergeStatistics(List<? extends ColumnStatistics> statisticsList) {
+        public Object mergeStatistics(List<? extends ColumnStatistics<?>> statisticsList) {
           Object minValue = null;
-          for (ColumnStatistics statistics : statisticsList) {
+          for (ColumnStatistics<?> statistics : statisticsList) {
             Object statMinValue = getValueStatistic(statistics);
-            if (statMinValue != null && (statistics.getValueComparator().compare(minValue, statMinValue) > 0 || minValue == null)) {
+            Comparator<Object> comp = (Comparator<Object>) statistics.getValueComparator();
+            if (statMinValue != null && (comp.compare(minValue, statMinValue) > 0 || minValue == null)) {
               minValue = statMinValue;
             }
           }
@@ -79,11 +81,12 @@
       new ColumnStatisticsKind<Object>(ExactStatisticsConstants.MAX_VALUE, true) {
         @Override
         @SuppressWarnings("unchecked")
-        public Object mergeStatistics(List<? extends ColumnStatistics> statisticsList) {
+        public Object mergeStatistics(List<? extends ColumnStatistics<?>> statisticsList) {
           Object maxValue = null;
-          for (ColumnStatistics statistics : statisticsList) {
+          for (ColumnStatistics<?> statistics : statisticsList) {
             Object statMaxValue = getValueStatistic(statistics);
-            if (statMaxValue != null && statistics.getValueComparator().compare(maxValue, statMaxValue) < 0) {
+            Comparator<Object> comp = (Comparator<Object>) statistics.getValueComparator();
+            if (statMaxValue != null && comp.compare(maxValue, statMaxValue) < 0) {
               maxValue = statMaxValue;
             }
           }
@@ -97,7 +100,7 @@
   public static final ColumnStatisticsKind<Long> NON_NULL_VALUES_COUNT =
       new ColumnStatisticsKind<Long>(ExactStatisticsConstants.NON_NULL_VALUES_COUNT, true) {
         @Override
-        public Long mergeStatistics(List<? extends ColumnStatistics> statisticsList) {
+        public Long mergeStatistics(List<? extends ColumnStatistics<?>> statisticsList) {
           long nonNullRowCount = 0;
           for (ColumnStatistics<?> statistics : statisticsList) {
             Long nnRowCount = statistics.get(this);
@@ -115,7 +118,7 @@
   public static final ColumnStatisticsKind<Double> NON_NULL_COUNT =
       new ColumnStatisticsKind<Double>(Statistic.NNROWCOUNT, false) {
         @Override
-        public Double mergeStatistics(List<? extends ColumnStatistics> statisticsList) {
+        public Double mergeStatistics(List<? extends ColumnStatistics<?>> statisticsList) {
           double nonNullRowCount = 0;
           for (ColumnStatistics<?> statistics : statisticsList) {
             Double nnRowCount = statistics.get(this);
@@ -133,7 +136,7 @@
   public static final ColumnStatisticsKind<Double> ROWCOUNT =
       new ColumnStatisticsKind<Double>(Statistic.ROWCOUNT, false) {
         @Override
-        public Double mergeStatistics(List<? extends ColumnStatistics> statisticsList) {
+        public Double mergeStatistics(List<? extends ColumnStatistics<?>> statisticsList) {
           double rowCount = 0;
           for (ColumnStatistics<?> statistics : statisticsList) {
             Double count = getFrom(statistics);
@@ -154,8 +157,8 @@
   /**
    * Column statistics kind which is the width of the specific column.
    */
-  public static final ColumnStatisticsKind AVG_WIDTH =
-      new ColumnStatisticsKind(Statistic.AVG_WIDTH, false);
+  public static final ColumnStatisticsKind<?> AVG_WIDTH =
+      new ColumnStatisticsKind<>(Statistic.AVG_WIDTH, false);
 
   /**
    * Column statistics kind which is the histogram of the specific column.
@@ -184,7 +187,7 @@
   }
 
   @Override
-  public T mergeStatistics(List<? extends ColumnStatistics> statistics) {
+  public T mergeStatistics(List<? extends ColumnStatistics<?>> statistics) {
     throw new UnsupportedOperationException("Cannot merge statistics for " + statisticKey);
   }
 }
diff --git a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/statistics/StatisticsHolder.java b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/statistics/StatisticsHolder.java
index 94e8b10..ffbe3ac 100644
--- a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/statistics/StatisticsHolder.java
+++ b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/statistics/StatisticsHolder.java
@@ -42,19 +42,19 @@
   private static final ObjectReader OBJECT_READER = new ObjectMapper().readerFor(StatisticsHolder.class);
 
   private final T statisticsValue;
-  private final BaseStatisticsKind statisticsKind;
+  private final BaseStatisticsKind<?> statisticsKind;
 
   @JsonCreator
   public StatisticsHolder(@JsonProperty("statisticsValue") T statisticsValue,
-                          @JsonProperty("statisticsKind") BaseStatisticsKind statisticsKind) {
+                          @JsonProperty("statisticsKind") BaseStatisticsKind<?> statisticsKind) {
     this.statisticsValue = statisticsValue;
     this.statisticsKind = statisticsKind;
   }
 
   public StatisticsHolder(T statisticsValue,
-                          StatisticsKind statisticsKind) {
+                          StatisticsKind<?> statisticsKind) {
     this.statisticsValue = statisticsValue;
-    this.statisticsKind = (BaseStatisticsKind) statisticsKind;
+    this.statisticsKind = (BaseStatisticsKind<?>) statisticsKind;
   }
 
   @JsonTypeInfo(use = JsonTypeInfo.Id.CLASS,
@@ -63,7 +63,7 @@
     return statisticsValue;
   }
 
-  public StatisticsKind getStatisticsKind() {
+  public StatisticsKind<?> getStatisticsKind() {
     return statisticsKind;
   }
 
@@ -101,7 +101,7 @@
         .toString();
   }
 
-  public static StatisticsHolder of(String serialized) {
+  public static StatisticsHolder<?> of(String serialized) {
     try {
       return OBJECT_READER.readValue(serialized);
     } catch (IOException e) {
diff --git a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/util/TableMetadataUtils.java b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/util/TableMetadataUtils.java
index 1aac570..2443663 100644
--- a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/util/TableMetadataUtils.java
+++ b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/util/TableMetadataUtils.java
@@ -46,21 +46,32 @@
    * @param type type of the column
    * @return {@link Comparator} instance
    */
-  public static Comparator getComparator(TypeProtos.MinorType type) {
+  @SuppressWarnings("unchecked")
+  public static <T> Comparator<T> getComparator(TypeProtos.MinorType type) {
     switch (type) {
       case INTERVALDAY:
       case INTERVAL:
       case INTERVALYEAR:
-        return Comparator.nullsFirst(UnsignedBytes.lexicographicalComparator());
+        // This odd cast is needed because this method is poorly designed.
+        // The method is statically typed to type T. But, the type
+        // is selected dynamically at runtime via the type parameter.
+        // As a result, we are casting a comparator to the WRONG type
+        // in some cases. We have to remove the byte[] type, then force
+        // the type to T. This works because we should only use this
+        // case if T is byte[]. But, this is a horrible hack and should
+        // be fixed.
+        return (Comparator<T>) (Comparator<?>)
+            Comparator.nullsFirst(UnsignedBytes.lexicographicalComparator());
       case UINT1:
-        return Comparator.nullsFirst(UnsignedBytes::compare);
+        return (Comparator<T>)
+            Comparator.nullsFirst(UnsignedBytes::compare);
       case UINT2:
       case UINT4:
-        return Comparator.nullsFirst(Integer::compareUnsigned);
+        return (Comparator<T>) Comparator.nullsFirst(Integer::compareUnsigned);
       case UINT8:
-        return Comparator.nullsFirst(Long::compareUnsigned);
+        return (Comparator<T>) Comparator.nullsFirst(Long::compareUnsigned);
       default:
-        return getNaturalNullsFirstComparator();
+        return (Comparator<T>) getNaturalNullsFirstComparator();
     }
   }
 
@@ -83,14 +94,15 @@
    * @param statisticsToCollect kinds of statistics that should be collected
    * @return list of merged metadata
    */
-  public static <T extends BaseMetadata> Map<SchemaPath, ColumnStatistics> mergeColumnsStatistics(
-      Collection<T> metadataList, Set<SchemaPath> columns, List<CollectableColumnStatisticsKind> statisticsToCollect) {
-    Map<SchemaPath, ColumnStatistics> columnsStatistics = new HashMap<>();
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  public static <T extends BaseMetadata> Map<SchemaPath, ColumnStatistics<?>> mergeColumnsStatistics(
+      Collection<T> metadataList, Set<SchemaPath> columns, List<CollectableColumnStatisticsKind<?>> statisticsToCollect) {
+    Map<SchemaPath, ColumnStatistics<?>> columnsStatistics = new HashMap<>();
 
     for (SchemaPath column : columns) {
-      List<ColumnStatistics> statisticsList = new ArrayList<>();
+      List<ColumnStatistics<?>> statisticsList = new ArrayList<>();
       for (T metadata : metadataList) {
-        ColumnStatistics statistics = metadata.getColumnsStatistics().get(column);
+        ColumnStatistics<?> statistics = metadata.getColumnsStatistics().get(column);
         if (statistics == null) {
           // schema change happened, set statistics which represents all nulls
           statistics = new ColumnStatistics(
@@ -99,12 +111,12 @@
         }
         statisticsList.add(statistics);
       }
-      List<StatisticsHolder> statisticsHolders = new ArrayList<>();
-      for (CollectableColumnStatisticsKind statisticsKind : statisticsToCollect) {
+      List<StatisticsHolder<?>> statisticsHolders = new ArrayList<>();
+      for (CollectableColumnStatisticsKind<?> statisticsKind : statisticsToCollect) {
         Object mergedStatistic = statisticsKind.mergeStatistics(statisticsList);
         statisticsHolders.add(new StatisticsHolder<>(mergedStatistic, statisticsKind));
       }
-      Iterator<ColumnStatistics> iterator = statisticsList.iterator();
+      Iterator<ColumnStatistics<?>> iterator = statisticsList.iterator();
       // Use INT if statistics wasn't provided
       TypeProtos.MinorType comparatorType = iterator.hasNext() ? iterator.next().getComparatorType() : TypeProtos.MinorType.INT;
       columnsStatistics.put(column, new ColumnStatistics<>(statisticsHolders, comparatorType));
@@ -120,13 +132,13 @@
    * @return new {@link TableMetadata} instance with updated statistics
    */
   public static TableMetadata updateRowCount(TableMetadata tableMetadata, Collection<? extends BaseMetadata> statistics) {
-    List<StatisticsHolder> newStats = new ArrayList<>();
+    List<StatisticsHolder<?>> newStats = new ArrayList<>();
 
     newStats.add(new StatisticsHolder<>(TableStatisticsKind.ROW_COUNT.mergeStatistics(statistics), TableStatisticsKind.ROW_COUNT));
 
     Set<SchemaPath> columns = tableMetadata.getColumnsStatistics().keySet();
 
-    Map<SchemaPath, ColumnStatistics> columnsStatistics =
+    Map<SchemaPath, ColumnStatistics<?>> columnsStatistics =
         mergeColumnsStatistics(statistics, columns,
             Collections.singletonList(ColumnStatisticsKind.NULLS_COUNT));
 
diff --git a/metastore/metastore-api/src/test/java/org/apache/drill/metastore/components/tables/TestTableMetadataUnitConversion.java b/metastore/metastore-api/src/test/java/org/apache/drill/metastore/components/tables/TestTableMetadataUnitConversion.java
index c31c254..7a6212c 100644
--- a/metastore/metastore-api/src/test/java/org/apache/drill/metastore/components/tables/TestTableMetadataUnitConversion.java
+++ b/metastore/metastore-api/src/test/java/org/apache/drill/metastore/components/tables/TestTableMetadataUnitConversion.java
@@ -387,9 +387,9 @@
 
     private final TableInfo fullTableInfo;
     private final TableInfo basicTableInfo;
-    private final Map<SchemaPath, ColumnStatistics> columnsStatistics;
+    private final Map<SchemaPath, ColumnStatistics<?>> columnsStatistics;
     private final Map<String, String> unitColumnsStatistics;
-    private final Collection<StatisticsHolder> metadataStatistics;
+    private final Collection<StatisticsHolder<?>> metadataStatistics;
     private final List<String> unitMetadataStatistics;
     private final TupleMetadata schema;
     private final String unitSchema;
diff --git a/metastore/metastore-api/src/test/java/org/apache/drill/metastore/metadata/MetadataSerDeTest.java b/metastore/metastore-api/src/test/java/org/apache/drill/metastore/metadata/MetadataSerDeTest.java
index 520e59e..bc6fad2 100644
--- a/metastore/metastore-api/src/test/java/org/apache/drill/metastore/metadata/MetadataSerDeTest.java
+++ b/metastore/metastore-api/src/test/java/org/apache/drill/metastore/metadata/MetadataSerDeTest.java
@@ -70,7 +70,7 @@
 
   @Test
   public void testColumnStatisticsSerialization() {
-    List<StatisticsHolder> statistics = Arrays.asList(
+    List<StatisticsHolder<?>> statistics = Arrays.asList(
         new StatisticsHolder<>("aaa", ColumnStatisticsKind.MIN_VALUE),
         new StatisticsHolder<>("zzz", ColumnStatisticsKind.MAX_VALUE),
         new StatisticsHolder<>(3, ColumnStatisticsKind.NULLS_COUNT),
@@ -95,7 +95,7 @@
 
   @Test
   public void testColumnStatisticsDeserialization() {
-    List<StatisticsHolder> statistics = Arrays.asList(
+    List<StatisticsHolder<?>> statistics = Arrays.asList(
         new StatisticsHolder<>("aaa", ColumnStatisticsKind.MIN_VALUE),
         new StatisticsHolder<>("zzz", ColumnStatisticsKind.MAX_VALUE),
         new StatisticsHolder<>(3, ColumnStatisticsKind.NULLS_COUNT),
@@ -103,13 +103,13 @@
     ColumnStatistics<String> columnStatistics = new ColumnStatistics<>(statistics, TypeProtos.MinorType.VARCHAR);
     String serializedColumnStatistics = columnStatistics.jsonString();
 
-    ColumnStatistics deserialized = ColumnStatistics.of(serializedColumnStatistics);
+    ColumnStatistics<?> deserialized = ColumnStatistics.of(serializedColumnStatistics);
 
     assertEquals("Type was incorrectly deserialized",
         columnStatistics.getComparatorType(),
         deserialized.getComparatorType());
 
-    for (StatisticsHolder statistic : statistics) {
+    for (StatisticsHolder<?> statistic : statistics) {
       assertEquals("Statistics kind was incorrectly deserialized",
           statistic.getStatisticsKind().isExact(),
           deserialized.containsExact(statistic.getStatisticsKind()));
@@ -120,7 +120,7 @@
   }
 
   private <T> void checkStatisticsHolderSerialization(T statisticsValue,
-      BaseStatisticsKind statisticsKind, String expectedString) {
+      BaseStatisticsKind<?> statisticsKind, String expectedString) {
     StatisticsHolder<T> statisticsHolder =
         new StatisticsHolder<>(statisticsValue, statisticsKind);
     String serializedStatisticsHolder = statisticsHolder.jsonString();
@@ -131,10 +131,10 @@
   }
 
   private <T> void checkStatisticsHolderDeserialization(T statisticsValue,
-      BaseStatisticsKind statisticsKind) {
+      BaseStatisticsKind<?> statisticsKind) {
     StatisticsHolder<T> rowCount =
         new StatisticsHolder<>(statisticsValue, statisticsKind);
-    StatisticsHolder deserializedRowCount = StatisticsHolder.of(rowCount.jsonString());
+    StatisticsHolder<?> deserializedRowCount = StatisticsHolder.of(rowCount.jsonString());
 
     assertTrue("Statistics value was incorrectly deserialized",
         Objects.deepEquals(rowCount.getStatisticsValue(), deserializedRowCount.getStatisticsValue()));
@@ -142,7 +142,7 @@
     assertStatisticsKindsEquals(rowCount, deserializedRowCount);
   }
 
-  private <T> void assertStatisticsKindsEquals(StatisticsHolder<T> expected, StatisticsHolder actual) {
+  private <T> void assertStatisticsKindsEquals(StatisticsHolder<T> expected, StatisticsHolder<?> actual) {
     assertEquals("isExact statistics kind was incorrectly deserialized",
         expected.getStatisticsKind().isExact(),
         actual.getStatisticsKind().isExact());