DRILL-7968: ANALYZE TABLE ... REFRESH METADATA fails with FLOAT4 column. (#2267)

* DRILL-7968: ANALYZE TABLE ... REFRESH METADATA fails with FLOAT4 column.

This commit adds support for java.lang.Float to PojoWriters.java and a
test for the problem described in DRILL-7968 to TestMetastoreCommands.java.

* Address review comments and fix path to alltypes_optional.parquet in test.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoWriters.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoWriters.java
index cd7d4bb..d71899c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoWriters.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoWriters.java
@@ -28,10 +28,12 @@
 import org.apache.drill.exec.expr.holders.NullableVarCharHolder;
 import org.apache.drill.exec.vector.BigIntVector;
 import org.apache.drill.exec.vector.BitVector;
+import org.apache.drill.exec.vector.Float4Vector;
 import org.apache.drill.exec.vector.Float8Vector;
 import org.apache.drill.exec.vector.IntVector;
 import org.apache.drill.exec.vector.NullableBigIntVector;
 import org.apache.drill.exec.vector.NullableBitVector;
+import org.apache.drill.exec.vector.NullableFloat4Vector;
 import org.apache.drill.exec.vector.NullableFloat8Vector;
 import org.apache.drill.exec.vector.NullableIntVector;
 import org.apache.drill.exec.vector.NullableTimeStampVector;
@@ -59,6 +61,8 @@
       return new NBigIntWriter(fieldName);
     } else if (type == Boolean.class) {
       return new NBooleanWriter(fieldName);
+    } else if (type == Float.class) {
+      return new NFloatWriter(fieldName);
     } else if (type == Double.class) {
       return new NDoubleWriter(fieldName);
     } else if (type.isEnum()) {
@@ -72,6 +76,8 @@
       // primitives
     } else if (type == int.class) {
       return new IntWriter(fieldName);
+    } else if (type == float.class) {
+      return new FloatWriter(fieldName);
     } else if (type == double.class) {
       return new DoubleWriter(fieldName);
     } else if (type == boolean.class) {
@@ -131,6 +137,21 @@
   }
 
   /**
+   * Pojo writer for float. Does not expect to write null value.
+   */
+  public static class FloatWriter extends AbstractPojoWriter<Float4Vector> {
+
+    public FloatWriter(String fieldName) {
+      super(fieldName, Types.required(MinorType.FLOAT4));
+    }
+
+    @Override
+    public void writeField(Object value, int outboundIndex) {
+      vector.getMutator().setSafe(outboundIndex, (float) value);
+    }
+
+  }
+  /**
    * Pojo writer for double. Does not expect to write null value.
    */
   public static class DoubleWriter extends AbstractPojoWriter<Float8Vector> {
@@ -282,6 +303,24 @@
   }
 
   /**
+   * Pojo writer for Float. If null is encountered does not write it.
+   */
+  public static class NFloatWriter extends AbstractPojoWriter<NullableFloat4Vector> {
+
+    public NFloatWriter(String fieldName) {
+      super(fieldName, Types.optional(MinorType.FLOAT4));
+    }
+
+    @Override
+    public void writeField(Object value, int outboundIndex) {
+      if (value != null) {
+        vector.getMutator().setSafe(outboundIndex, (Float) value);
+      }
+    }
+
+  }
+
+  /**
    * Pojo writer for Double. If null is encountered does not write it.
    */
   public static class NDoubleWriter extends AbstractPojoWriter<NullableFloat8Vector> {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestMetastoreCommands.java b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestMetastoreCommands.java
index 3ca7aa6..75f9086 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestMetastoreCommands.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestMetastoreCommands.java
@@ -3531,6 +3531,39 @@
     run("analyze table dfs.%s.%s refresh metadata", workspaceName, tableName);
   }
 
+  @Test
+  public void testAnalyzeAllTypes7kRows() throws Exception {
+    // DRILL-7968.
+
+    // enable CROSS JOIN
+    client.alterSession(PlannerSettings.NLJOIN_FOR_SCALAR.getOptionName(), false);
+    String tableName = "alltypes_7k";
+    // create a ~7k row table with the schema of alltypes_optional.parquet
+    run("create table dfs.tmp.%s as select a.* from cp.`parquet/alltypes_optional.parquet` a cross join cp.`employee.json` e", tableName);
+
+    try {
+      testBuilder()
+          .sqlQuery("ANALYZE TABLE dfs.tmp.`%s` REFRESH METADATA", tableName)
+          .unOrdered()
+          .baselineColumns("ok", "summary")
+          .baselineValues(true, String.format("Collected / refreshed metadata for table [dfs.tmp.%s]", tableName))
+          .go();
+
+      String query = "select * from dfs.tmp.`%s`";
+
+      queryBuilder()
+          .sql(query, tableName)
+          .planMatcher()
+          .include("usedMetastore=true")
+          .match();
+
+    } finally {
+      run("analyze table dfs.tmp.`%s` drop metadata if exists", tableName);
+      run("drop table if exists dfs.tmp.`%s`", tableName);
+      client.resetSession(PlannerSettings.NLJOIN_FOR_SCALAR.getOptionName());
+    }
+  }
+
   public static <T> ColumnStatistics<T> getColumnStatistics(T minValue, T maxValue,
       long rowCount, TypeProtos.MinorType minorType) {
     return new ColumnStatistics<>(