IMPALA-8498: Write column index for floating types when NaN is not present

IMPALA-7307 disabled column index writing for floating point columns
until PARQUET-1222 is resolved. However, the problematic values are
only the NaNs. Therefore we can write column index if NaNs are not
present in data.

Testing:
  * Added tests which should fail if a column index is
    present while having NaN values in the column.

Change-Id: Ic9d367500243c8ca142a16ebfeef6c841f013434
Reviewed-on: http://gerrit.cloudera.org:8080/14264
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
diff --git a/be/src/exec/parquet/hdfs-parquet-table-writer.cc b/be/src/exec/parquet/hdfs-parquet-table-writer.cc
index 00ded5c..b6a4391 100644
--- a/be/src/exec/parquet/hdfs-parquet-table-writer.cc
+++ b/be/src/exec/parquet/hdfs-parquet-table-writer.cc
@@ -412,16 +412,11 @@
       plain_encoded_value_size_(
           ParquetPlainEncoder::EncodedByteSize(eval->root().type())) {
     DCHECK_NE(eval->root().type().type, TYPE_BOOLEAN);
-    // IMPALA-7304: Don't write column index for floating-point columns until
-    // PARQUET-1222 is resolved.
-    if (std::is_floating_point<T>::value) valid_column_index_ = false;
   }
 
   virtual void Reset() {
     BaseColumnWriter::Reset();
-    // IMPALA-7304: Don't write column index for floating-point columns until
-    // PARQUET-1222 is resolved.
-    if (std::is_floating_point<T>::value) valid_column_index_ = false;
+    valid_column_index_ = true;
     // Default to dictionary encoding.  If the cardinality ends up being too high,
     // it will fall back to plain.
     current_encoding_ = parquet::Encoding::PLAIN_DICTIONARY;
@@ -440,6 +435,7 @@
 
  protected:
   virtual bool ProcessValue(void* value, int64_t* bytes_needed) {
+    T* val = CastValue(value);
     if (current_encoding_ == parquet::Encoding::PLAIN_DICTIONARY) {
       if (UNLIKELY(num_values_since_dict_size_check_ >=
                    DICTIONARY_DATA_PAGE_SIZE_CHECK_PERIOD)) {
@@ -447,7 +443,7 @@
         if (dict_encoder_->EstimatedDataEncodedSize() >= page_size_) return false;
       }
       ++num_values_since_dict_size_check_;
-      *bytes_needed = dict_encoder_->Put(*CastValue(value));
+      *bytes_needed = dict_encoder_->Put(*val);
       // If the dictionary contains the maximum number of values, switch to plain
       // encoding for the next page. The current page is full and must be written out.
       if (UNLIKELY(*bytes_needed < 0)) {
@@ -456,23 +452,31 @@
       }
       parent_->file_size_estimate_ += *bytes_needed;
     } else if (current_encoding_ == parquet::Encoding::PLAIN) {
-      T* v = CastValue(value);
       *bytes_needed = plain_encoded_value_size_ < 0 ?
-          ParquetPlainEncoder::ByteSize<T>(*v) :
+          ParquetPlainEncoder::ByteSize<T>(*val) :
           plain_encoded_value_size_;
       if (current_page_->header.uncompressed_page_size + *bytes_needed > page_size_) {
         return false;
       }
       uint8_t* dst_ptr = values_buffer_ + current_page_->header.uncompressed_page_size;
       int64_t written_len =
-          ParquetPlainEncoder::Encode(*v, plain_encoded_value_size_, dst_ptr);
+          ParquetPlainEncoder::Encode(*val, plain_encoded_value_size_, dst_ptr);
       DCHECK_EQ(*bytes_needed, written_len);
       current_page_->header.uncompressed_page_size += written_len;
     } else {
       // TODO: support other encodings here
       DCHECK(false);
     }
-    page_stats_->Update(*CastValue(value));
+    // IMPALA-8498: Write column index for floating types when NaN is not present
+    if (std::is_same<float, std::remove_cv_t<T>>::value &&
+        UNLIKELY(std::isnan(*static_cast<float*>(value)))) {
+      valid_column_index_ = false;
+    } else if (std::is_same<double, std::remove_cv_t<T>>::value &&
+        UNLIKELY(std::isnan(*static_cast<double*>(value)))) {
+      valid_column_index_ = false;
+    }
+
+    page_stats_->Update(*val);
     return true;
   }
 
diff --git a/testdata/workloads/functional-query/queries/QueryTest/stats-extrapolation.test b/testdata/workloads/functional-query/queries/QueryTest/stats-extrapolation.test
index eac7916..313c5b9 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/stats-extrapolation.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/stats-extrapolation.test
@@ -14,7 +14,7 @@
 '     partitions: 0/12 rows=unavailable'
 '     columns: unavailable'
 row_regex:.* extrapolated-rows=unavailable.*
-'   tuple-ids=0 row-size=4B cardinality=5.92K'
+'   tuple-ids=0 row-size=4B cardinality=5.97K'
 ---- TYPES
 STRING
 ====
@@ -45,8 +45,11 @@
 ---- LABELS
 YEAR, MONTH, #ROWS, EXTRAP #ROWS, #FILES, SIZE, BYTES CACHED, CACHE REPLICATION, FORMAT, INCREMENTAL STATS, LOCATION
 ---- RESULTS
-'2009','1',-1,308,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=1'
-'2009','2',-1,289,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=2'
+'2009','1',-1,307,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=1'
+'2009','10',-1,307,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=10'
+'2009','11',-1,302,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=11'
+'2009','12',-1,307,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=12'
+'2009','2',-1,290,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=2'
 '2009','3',-1,307,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=3'
 '2009','4',-1,302,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=4'
 '2009','5',-1,307,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=5'
@@ -54,9 +57,6 @@
 '2009','7',-1,307,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=7'
 '2009','8',-1,307,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=8'
 '2009','9',-1,302,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=9'
-'2009','10',-1,307,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=10'
-'2009','11',-1,302,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=11'
-'2009','12',-1,307,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=12'
 'Total','',3650,3650,12,regex:.*B,'0B','','','',''
 ---- TYPES
 STRING,STRING,BIGINT,BIGINT,BIGINT,STRING,STRING,STRING,STRING,STRING,STRING
@@ -221,7 +221,7 @@
 '     partitions: 0/24 rows=unavailable'
 '     columns: unavailable'
 row_regex:.* extrapolated-rows=unavailable.*
-'   tuple-ids=0 row-size=4B cardinality=17.76K'
+'   tuple-ids=0 row-size=4B cardinality=17.91K'
 '   in pipelines: 00(GETNEXT)'
 ---- TYPES
 STRING
diff --git a/tests/query_test/test_parquet_page_index.py b/tests/query_test/test_parquet_page_index.py
index 40c7eb4..cd81277 100644
--- a/tests/query_test/test_parquet_page_index.py
+++ b/tests/query_test/test_parquet_page_index.py
@@ -231,11 +231,6 @@
           index_size = len(column_info.offset_index.page_locations)
           assert index_size > 0
           self._validate_page_locations(column_info.offset_index.page_locations)
-          # IMPALA-7304: Impala doesn't write column index for floating-point columns
-          # until PARQUET-1222 is resolved.
-          if column_info.schema.type in [4, 5]:
-            assert column_info.column_index is None
-            continue
           self._validate_null_stats(index_size, column_info)
           self._validate_min_max_values(index_size, column_info)
           self._validate_boundary_order(column_info)
@@ -274,8 +269,8 @@
     qualified_source_table = "{0}.{1}".format(unique_database, source_table)
     self._validate_parquet_page_index(hdfs_path, tmpdir.join(qualified_source_table))
 
-  def _create_string_table_with_values(self, vector, unique_database, table_name,
-                                       values_sql):
+  def _create_table_with_values_of_type(self, col_type, vector, unique_database,
+                                       table_name, values_sql):
     """Creates a parquet table that has a single string column, then invokes an insert
     statement on it with the 'values_sql' parameter. E.g. 'values_sql' is "('asdf')".
     It returns the HDFS path for the table.
@@ -283,8 +278,8 @@
     qualified_table_name = "{0}.{1}".format(unique_database, table_name)
     self.execute_query("drop table if exists {0}".format(qualified_table_name))
     vector.get_value('exec_option')['num_nodes'] = 1
-    query = ("create table {0} (str string) stored as parquet").format(
-        qualified_table_name)
+    query = ("create table {0} (x {1}) stored as parquet").format(
+        qualified_table_name, col_type)
     self.execute_query(query, vector.get_value('exec_option'))
     self.execute_query("insert into {0} values {1}".format(qualified_table_name,
         values_sql), vector.get_value('exec_option'))
@@ -345,23 +340,26 @@
   def test_max_string_values(self, vector, unique_database, tmpdir):
     """Test string values that are all 0xFFs or end with 0xFFs."""
 
+    col_type = "STRING"
     # String value is all of 0xFFs but its length is less than PAGE_INDEX_TRUNCATE_LENGTH.
     short_tbl = "short_tbl"
-    short_hdfs_path = self._create_string_table_with_values(vector, unique_database,
-        short_tbl, "(rpad('', {0}, chr(255)))".format(PAGE_INDEX_MAX_STRING_LENGTH - 1))
+    short_hdfs_path = self._create_table_with_values_of_type(col_type, vector,
+        unique_database, short_tbl,
+        "(rpad('', {0}, chr(255)))".format(PAGE_INDEX_MAX_STRING_LENGTH - 1))
     self._validate_parquet_page_index(short_hdfs_path, tmpdir.join(short_tbl))
 
     # String value is all of 0xFFs and its length is PAGE_INDEX_TRUNCATE_LENGTH.
     fit_tbl = "fit_tbl"
-    fit_hdfs_path = self._create_string_table_with_values(vector, unique_database,
-        fit_tbl, "(rpad('', {0}, chr(255)))".format(PAGE_INDEX_MAX_STRING_LENGTH))
+    fit_hdfs_path = self._create_table_with_values_of_type(col_type, vector,
+        unique_database, fit_tbl,
+        "(rpad('', {0}, chr(255)))".format(PAGE_INDEX_MAX_STRING_LENGTH))
     self._validate_parquet_page_index(fit_hdfs_path, tmpdir.join(fit_tbl))
 
     # All bytes are 0xFFs and the string is longer then PAGE_INDEX_TRUNCATE_LENGTH, so we
     # should not write page statistics.
     too_long_tbl = "too_long_tbl"
-    too_long_hdfs_path = self._create_string_table_with_values(vector, unique_database,
-        too_long_tbl, "(rpad('', {0}, chr(255)))".format(
+    too_long_hdfs_path = self._create_table_with_values_of_type(col_type, vector,
+        unique_database, too_long_tbl, "(rpad('', {0}, chr(255)))".format(
             PAGE_INDEX_MAX_STRING_LENGTH + 1))
     row_group_indexes = self._get_row_groups_from_hdfs_folder(too_long_hdfs_path,
         tmpdir.join(too_long_tbl))
@@ -373,8 +371,9 @@
     # Test string with value that starts with 'aaa' following with 0xFFs and its length is
     # greater than PAGE_INDEX_TRUNCATE_LENGTH. Max value should be 'aab'.
     aaa_tbl = "aaa_tbl"
-    aaa_hdfs_path = self._create_string_table_with_values(vector, unique_database,
-        aaa_tbl, "(rpad('aaa', {0}, chr(255)))".format(PAGE_INDEX_MAX_STRING_LENGTH + 1))
+    aaa_hdfs_path = self._create_table_with_values_of_type(col_type, vector,
+        unique_database, aaa_tbl,
+        "(rpad('aaa', {0}, chr(255)))".format(PAGE_INDEX_MAX_STRING_LENGTH + 1))
     row_group_indexes = self._get_row_groups_from_hdfs_folder(aaa_hdfs_path,
         tmpdir.join(aaa_tbl))
     column = row_group_indexes[0][0]
@@ -409,8 +408,8 @@
     """
     vector.get_value('exec_option')['parquet_page_row_count_limit'] = 2
     null_tbl = 'null_table'
-    null_tbl_path = self._create_string_table_with_values(vector, unique_database,
-        null_tbl, "(NULL), (NULL), ('foo'), (NULL), (NULL), (NULL)")
+    null_tbl_path = self._create_table_with_values_of_type("STRING", vector,
+        unique_database, null_tbl, "(NULL), (NULL), ('foo'), (NULL), (NULL), (NULL)")
     row_group_indexes = self._get_row_groups_from_hdfs_folder(null_tbl_path,
         tmpdir.join(null_tbl))
     column = row_group_indexes[0][0]
@@ -435,3 +434,59 @@
       for column in row_group:
         assert column.offset_index is None
         assert column.column_index is None
+
+  def test_nan_values_for_floating_types(self, vector, unique_database, tmpdir):
+    """ IMPALA-7304: Impala doesn't write column index for floating-point columns
+    until PARQUET-1222 is resolved. This is modified by:
+    IMPALA-8498: Write column index for floating types when NaN is not present.
+    """
+    for col_type in ["float", "double"]:
+      nan_val = "(CAST('NaN' as " + col_type + "))"
+      # Table contains no NaN values.
+      no_nan_tbl = "no_nan_tbl_" + col_type
+      no_nan_hdfs_path = self._create_table_with_values_of_type(col_type, vector,
+          unique_database, no_nan_tbl, "(1.5), (2.3), (4.5), (42.42), (3.1415), (0.0)")
+      self._validate_parquet_page_index(no_nan_hdfs_path, tmpdir.join(no_nan_tbl))
+      row_group_indexes = self._get_row_groups_from_hdfs_folder(no_nan_hdfs_path,
+          tmpdir.join(no_nan_tbl))
+      column = row_group_indexes[0][0]
+      assert column.column_index is not None
+
+      # Table contains NaN as first value.
+      first_nan_tbl = "first_nan_tbl_" + col_type
+      first_nan_hdfs_path = self._create_table_with_values_of_type(col_type, vector,
+          unique_database, first_nan_tbl,
+          nan_val + ", (2.3), (4.5), (42.42), (3.1415), (0.0)")
+      row_group_indexes = self._get_row_groups_from_hdfs_folder(first_nan_hdfs_path,
+          tmpdir.join(first_nan_tbl))
+      column = row_group_indexes[0][0]
+      assert column.column_index is None
+
+      # Table contains NaN as last value.
+      last_nan_tbl = "last_nan_tbl_" + col_type
+      last_nan_hdfs_path = self._create_table_with_values_of_type(col_type, vector,
+          unique_database, last_nan_tbl,
+          "(1.5), (2.3), (42.42), (3.1415), (0.0), " + nan_val)
+      row_group_indexes = self._get_row_groups_from_hdfs_folder(last_nan_hdfs_path,
+          tmpdir.join(last_nan_tbl))
+      column = row_group_indexes[0][0]
+      assert column.column_index is None
+
+      # Table contains NaN value in the middle.
+      mid_nan_tbl = "mid_nan_tbl_" + col_type
+      mid_nan_hdfs_path = self._create_table_with_values_of_type(col_type, vector,
+          unique_database, mid_nan_tbl,
+          "(2.3), (4.5), " + nan_val + ", (42.42), (3.1415), (0.0)")
+      row_group_indexes = self._get_row_groups_from_hdfs_folder(mid_nan_hdfs_path,
+          tmpdir.join(mid_nan_tbl))
+      column = row_group_indexes[0][0]
+      assert column.column_index is None
+
+      # Table contains only NaN values.
+      only_nan_tbl = "only_nan_tbl_" + col_type
+      only_nan_hdfs_path = self._create_table_with_values_of_type(col_type, vector,
+          unique_database, only_nan_tbl, (nan_val + ", ") * 3 + nan_val)
+      row_group_indexes = self._get_row_groups_from_hdfs_folder(only_nan_hdfs_path,
+          tmpdir.join(only_nan_tbl))
+      column = row_group_indexes[0][0]
+      assert column.column_index is None