IMPALA-9302: disable ineffective filters for mt_dop > 0

The optimisation of disabling ineffective row-level
runtime filters was not implemented in the MT scan
code paths, because the ProcessSplit() functions,
where it was implemented, are not used for mt_dop > 0.

This change adds it to HdfsScanner::GetNext(), which
is used for mt_dop > 0 but not mt_dop = 0.

Testing:
Run existing runtime row filters test with mt_dop.
This reproduced the issue before I fixed it.

Change-Id: I8a55a9d4ac9e0d93cb3675dd2d5da086cb7d941d
Reviewed-on: http://gerrit.cloudera.org:8080/15065
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
diff --git a/be/src/exec/hdfs-scanner.h b/be/src/exec/hdfs-scanner.h
index 62c1cb7..3e19658 100644
--- a/be/src/exec/hdfs-scanner.h
+++ b/be/src/exec/hdfs-scanner.h
@@ -130,7 +130,12 @@
   /// Only valid to call if the parent scan node is single-threaded.
   Status GetNext(RowBatch* row_batch) WARN_UNUSED_RESULT {
     DCHECK(!scan_node_->HasRowBatchQueue());
-    return GetNextInternal(row_batch);
+    RETURN_IF_ERROR(GetNextInternal(row_batch));
+    ++getnext_batches_returned_;
+    if ((getnext_batches_returned_ & (BATCHES_PER_FILTER_SELECTIVITY_CHECK - 1)) == 0) {
+      CheckFiltersEffectiveness();
+    }
+    return Status::OK();
   }
 
   /// Process an entire split, reading bytes from the context's streams.  Context is
@@ -326,6 +331,10 @@
   /// Close().
   vector<LocalFilterStats> filter_stats_;
 
+  /// Counter for the number of batches returned from GetNext(). Only updated by the
+  /// GetNext() API (i.e. mt_dop > 0), not the ProcessSplit() API.
+  int64_t getnext_batches_returned_ = 0;
+
   /// Size of the file footer for ORC and Parquet. This is a guess. If this value is too
   /// little, we will need to issue another read.
   static const int64_t FOOTER_SIZE = 1024 * 100;
diff --git a/tests/custom_cluster/test_mt_dop.py b/tests/custom_cluster/test_mt_dop.py
index d4f664e..3bb4eef 100644
--- a/tests/custom_cluster/test_mt_dop.py
+++ b/tests/custom_cluster/test_mt_dop.py
@@ -49,6 +49,9 @@
     vector.get_value('exec_option')['mt_dop'] = 1
     self.run_test_case('QueryTest/runtime_filters', vector,
        test_file_vars={'$RUNTIME_FILTER_WAIT_TIME_MS': str(WAIT_TIME_MS)})
+    self.run_test_case('QueryTest/runtime_row_filters', vector,
+        use_db="functional_parquet",
+        test_file_vars={'$RUNTIME_FILTER_WAIT_TIME_MS' : str(WAIT_TIME_MS)})
 
     # Allow test to override num_nodes.
     del vector.get_value('exec_option')['num_nodes']