IMPALA-9302: disable ineffective filters for mt_dop > 0
The optimisation of disabling ineffective row-level
runtime filters was not implemented in the MT scan
code paths, because the ProcessSplit() functions,
where it was implemented, are not used for mt_dop > 0.
This change adds it to HdfsScanner::GetNext(), which
is used for mt_dop > 0 but not mt_dop = 0.
Testing:
Run existing runtime row filters test with mt_dop.
This reproduced the issue before I fixed it.
Change-Id: I8a55a9d4ac9e0d93cb3675dd2d5da086cb7d941d
Reviewed-on: http://gerrit.cloudera.org:8080/15065
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
diff --git a/be/src/exec/hdfs-scanner.h b/be/src/exec/hdfs-scanner.h
index 62c1cb7..3e19658 100644
--- a/be/src/exec/hdfs-scanner.h
+++ b/be/src/exec/hdfs-scanner.h
@@ -130,7 +130,12 @@
/// Only valid to call if the parent scan node is single-threaded.
Status GetNext(RowBatch* row_batch) WARN_UNUSED_RESULT {
DCHECK(!scan_node_->HasRowBatchQueue());
- return GetNextInternal(row_batch);
+ RETURN_IF_ERROR(GetNextInternal(row_batch));
+ ++getnext_batches_returned_;
+ if ((getnext_batches_returned_ & (BATCHES_PER_FILTER_SELECTIVITY_CHECK - 1)) == 0) {
+ CheckFiltersEffectiveness();
+ }
+ return Status::OK();
}
/// Process an entire split, reading bytes from the context's streams. Context is
@@ -326,6 +331,10 @@
/// Close().
vector<LocalFilterStats> filter_stats_;
+ /// Counter for the number of batches returned from GetNext(). Only updated by the
+ /// GetNext() API (i.e. mt_dop > 0), not the ProcessSplit() API.
+ int64_t getnext_batches_returned_ = 0;
+
/// Size of the file footer for ORC and Parquet. This is a guess. If this value is too
/// little, we will need to issue another read.
static const int64_t FOOTER_SIZE = 1024 * 100;
diff --git a/tests/custom_cluster/test_mt_dop.py b/tests/custom_cluster/test_mt_dop.py
index d4f664e..3bb4eef 100644
--- a/tests/custom_cluster/test_mt_dop.py
+++ b/tests/custom_cluster/test_mt_dop.py
@@ -49,6 +49,9 @@
vector.get_value('exec_option')['mt_dop'] = 1
self.run_test_case('QueryTest/runtime_filters', vector,
test_file_vars={'$RUNTIME_FILTER_WAIT_TIME_MS': str(WAIT_TIME_MS)})
+ self.run_test_case('QueryTest/runtime_row_filters', vector,
+ use_db="functional_parquet",
+ test_file_vars={'$RUNTIME_FILTER_WAIT_TIME_MS' : str(WAIT_TIME_MS)})
# Allow test to override num_nodes.
del vector.get_value('exec_option')['num_nodes']