IMPALA-7351: Add memory estimates to Kudu table sink
The Kudu table sink allocates untracked memory, which is bounded by
limits that Impala enforces through the Kudu client API. This patch
adds a constant memory estimate for the table sink, based on those
limits.
Testing:
Modified planner tests accordingly.
Change-Id: I89a45dce0cfbbe3cc0bc17d55ffdbd41cd7dbfbd
Reviewed-on: http://gerrit.cloudera.org:8080/12077
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
diff --git a/be/src/util/backend-gflag-util.cc b/be/src/util/backend-gflag-util.cc
index 9f4f2d0..f78b317 100644
--- a/be/src/util/backend-gflag-util.cc
+++ b/be/src/util/backend-gflag-util.cc
@@ -68,6 +68,8 @@
DECLARE_int32(catalog_max_parallel_partial_fetch_rpc);
DECLARE_int64(catalog_partial_fetch_rpc_queue_timeout_s);
DECLARE_int64(exchg_node_buffer_size_bytes);
+DECLARE_int32(kudu_mutation_buffer_size);
+DECLARE_int32(kudu_error_buffer_size);
namespace impala {
@@ -135,6 +137,8 @@
FLAGS_catalog_partial_fetch_rpc_queue_timeout_s);
cfg.__set_exchg_node_buffer_size_bytes(
FLAGS_exchg_node_buffer_size_bytes);
+ cfg.__set_kudu_mutation_buffer_size(FLAGS_kudu_mutation_buffer_size);
+ cfg.__set_kudu_error_buffer_size(FLAGS_kudu_error_buffer_size);
RETURN_IF_ERROR(SerializeThriftMsg(jni_env, &cfg, cfg_bytes));
return Status::OK();
}
diff --git a/common/thrift/BackendGflags.thrift b/common/thrift/BackendGflags.thrift
index dff4b8c..a56b260 100644
--- a/common/thrift/BackendGflags.thrift
+++ b/common/thrift/BackendGflags.thrift
@@ -109,4 +109,8 @@
41: required i64 catalog_partial_fetch_rpc_queue_timeout_s
42: required i64 exchg_node_buffer_size_bytes
+
+ 43: required i32 kudu_mutation_buffer_size
+
+ 44: required i32 kudu_error_buffer_size
}
diff --git a/fe/src/main/java/org/apache/impala/planner/KuduTableSink.java b/fe/src/main/java/org/apache/impala/planner/KuduTableSink.java
index bb57b1f..ba6f93a 100644
--- a/fe/src/main/java/org/apache/impala/planner/KuduTableSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/KuduTableSink.java
@@ -23,6 +23,7 @@
import org.apache.impala.analysis.DescriptorTable;
import org.apache.impala.catalog.FeTable;
+import org.apache.impala.service.BackendConfig;
import org.apache.impala.thrift.TDataSink;
import org.apache.impala.thrift.TDataSinkType;
import org.apache.impala.thrift.TExplainLevel;
@@ -64,8 +65,16 @@
@Override
public void computeResourceProfile(TQueryOptions queryOptions) {
- // TODO: add a memory estimate
- resourceProfile_ = ResourceProfile.noReservation(0);
+ // The major chunk of memory used by this node is untracked. Part of it
+ // is allocated by the KuduSession on the write path and the rest is the
+ // memory used to store Kudu client error messages. Fortunately, both of
+ // them have an upper limit, which is used directly to set the estimates here.
+ long kuduMutationBufferSize = BackendConfig.INSTANCE.getBackendCfg().
+ kudu_mutation_buffer_size;
+ long kuduErrorBufferSize = BackendConfig.INSTANCE.getBackendCfg().
+ kudu_error_buffer_size;
+ resourceProfile_ = ResourceProfile.noReservation(kuduMutationBufferSize +
+ kuduErrorBufferSize);
}
@Override
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test b/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
index f86dd5a..7f4a12f 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
@@ -2422,7 +2422,7 @@
---- PLAN
Max Per-Host Resource Reservation: Memory=51.00MB Threads=3
Per-Host Resource Estimates: Memory=446MB
-Analyzed query: SELECT * FROM tpch.lineitem INNER JOIN
+Analyzed query: SELECT * FROM tpch.lineitem INNER JOIN
-- +shuffle
tpch.orders ON l_orderkey = o_orderkey
@@ -2462,7 +2462,7 @@
---- DISTRIBUTEDPLAN
Max Per-Host Resource Reservation: Memory=52.00MB Threads=6
Per-Host Resource Estimates: Memory=300MB
-Analyzed query: SELECT * FROM tpch.lineitem INNER JOIN
+Analyzed query: SELECT * FROM tpch.lineitem INNER JOIN
-- +shuffle
tpch.orders ON l_orderkey = o_orderkey
@@ -2523,7 +2523,7 @@
---- PARALLELPLANS
Max Per-Host Resource Reservation: Memory=104.00MB Threads=7
Per-Host Resource Estimates: Memory=481MB
-Analyzed query: SELECT * FROM tpch.lineitem INNER JOIN
+Analyzed query: SELECT * FROM tpch.lineitem INNER JOIN
-- +shuffle
tpch.orders ON l_orderkey = o_orderkey
@@ -4953,11 +4953,11 @@
---- PLAN
Max Per-Host Resource Reservation: Memory=99.00MB Threads=5
Per-Host Resource Estimates: Memory=180MB
-Analyzed query: SELECT
+Analyzed query: SELECT
-- +straight_join
-* FROM tpch_parquet.orders t1 INNER JOIN (SELECT
+* FROM tpch_parquet.orders t1 INNER JOIN (SELECT
-- +straight_join
-t2.o_orderkey k2, k3, k4 FROM tpch_parquet.orders t2 INNER JOIN (SELECT
+t2.o_orderkey k2, k3, k4 FROM tpch_parquet.orders t2 INNER JOIN (SELECT
-- +straight_join
t3.o_orderkey k3, t4.o_orderkey k4 FROM tpch_parquet.orders t3 INNER JOIN
tpch_parquet.orders t4 ON t3.o_orderkey = t4.o_orderkey) v2 ON v2.k3 =
@@ -5037,11 +5037,11 @@
---- DISTRIBUTEDPLAN
Max Per-Host Resource Reservation: Memory=100.50MB Threads=10
Per-Host Resource Estimates: Memory=260MB
-Analyzed query: SELECT
+Analyzed query: SELECT
-- +straight_join
-* FROM tpch_parquet.orders t1 INNER JOIN (SELECT
+* FROM tpch_parquet.orders t1 INNER JOIN (SELECT
-- +straight_join
-t2.o_orderkey k2, k3, k4 FROM tpch_parquet.orders t2 INNER JOIN (SELECT
+t2.o_orderkey k2, k3, k4 FROM tpch_parquet.orders t2 INNER JOIN (SELECT
-- +straight_join
t3.o_orderkey k3, t4.o_orderkey k4 FROM tpch_parquet.orders t3 INNER JOIN
tpch_parquet.orders t4 ON t3.o_orderkey = t4.o_orderkey) v2 ON v2.k3 =
@@ -5156,11 +5156,11 @@
---- PARALLELPLANS
Max Per-Host Resource Reservation: Memory=176.50MB Threads=11
Per-Host Resource Estimates: Memory=454MB
-Analyzed query: SELECT
+Analyzed query: SELECT
-- +straight_join
-* FROM tpch_parquet.orders t1 INNER JOIN (SELECT
+* FROM tpch_parquet.orders t1 INNER JOIN (SELECT
-- +straight_join
-t2.o_orderkey k2, k3, k4 FROM tpch_parquet.orders t2 INNER JOIN (SELECT
+t2.o_orderkey k2, k3, k4 FROM tpch_parquet.orders t2 INNER JOIN (SELECT
-- +straight_join
t3.o_orderkey k3, t4.o_orderkey k4 FROM tpch_parquet.orders t3 INNER JOIN
tpch_parquet.orders t4 ON t3.o_orderkey = t4.o_orderkey) v2 ON v2.k3 =
@@ -5315,11 +5315,11 @@
---- PLAN
Max Per-Host Resource Reservation: Memory=176.00KB Threads=5
Per-Host Resource Estimates: Memory=138MB
-Analyzed query: SELECT
+Analyzed query: SELECT
-- +straight_join
-* FROM tpch_parquet.nation t1 INNER JOIN (SELECT
+* FROM tpch_parquet.nation t1 INNER JOIN (SELECT
-- +straight_join
-t2.n_nationkey k2, k3, k4 FROM tpch_parquet.nation t2 INNER JOIN (SELECT
+t2.n_nationkey k2, k3, k4 FROM tpch_parquet.nation t2 INNER JOIN (SELECT
-- +straight_join
t3.n_nationkey k3, t4.s_suppkey k4 FROM tpch_parquet.nation t3 INNER JOIN
tpch_parquet.supplier t4) v2) v1
@@ -5386,11 +5386,11 @@
---- DISTRIBUTEDPLAN
Max Per-Host Resource Reservation: Memory=176.00KB Threads=9
Per-Host Resource Estimates: Memory=161MB
-Analyzed query: SELECT
+Analyzed query: SELECT
-- +straight_join
-* FROM tpch_parquet.nation t1 INNER JOIN (SELECT
+* FROM tpch_parquet.nation t1 INNER JOIN (SELECT
-- +straight_join
-t2.n_nationkey k2, k3, k4 FROM tpch_parquet.nation t2 INNER JOIN (SELECT
+t2.n_nationkey k2, k3, k4 FROM tpch_parquet.nation t2 INNER JOIN (SELECT
-- +straight_join
t3.n_nationkey k3, t4.s_suppkey k4 FROM tpch_parquet.nation t3 INNER JOIN
tpch_parquet.supplier t4) v2) v1
@@ -5485,11 +5485,11 @@
---- PARALLELPLANS
Max Per-Host Resource Reservation: Memory=352.00KB Threads=9
Per-Host Resource Estimates: Memory=311MB
-Analyzed query: SELECT
+Analyzed query: SELECT
-- +straight_join
-* FROM tpch_parquet.nation t1 INNER JOIN (SELECT
+* FROM tpch_parquet.nation t1 INNER JOIN (SELECT
-- +straight_join
-t2.n_nationkey k2, k3, k4 FROM tpch_parquet.nation t2 INNER JOIN (SELECT
+t2.n_nationkey k2, k3, k4 FROM tpch_parquet.nation t2 INNER JOIN (SELECT
-- +straight_join
t3.n_nationkey k3, t4.s_suppkey k4 FROM tpch_parquet.nation t3 INNER JOIN
tpch_parquet.supplier t4) v2) v1
@@ -5830,3 +5830,52 @@
tuple-ids=0 row-size=231B cardinality=6001215
in pipelines: 00(GETNEXT)
====
+# kudu insert
+insert into functional_kudu.tinyinttable values(1);
+---- PLAN
+Max Per-Host Resource Reservation: Memory=0B Threads=1
+Per-Host Resource Estimates: Memory=20MB
+Codegen disabled by planner
+Analyzed query: SELECT CAST(1 AS TINYINT) UNION SELECT CAST(1 AS TINYINT)
+
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+| Per-Host Resources: mem-estimate=20.00MB mem-reservation=0B thread-reservation=1
+INSERT INTO KUDU [functional_kudu.tinyinttable]
+| mem-estimate=20.00MB mem-reservation=0B thread-reservation=0
+|
+00:UNION
+ constant-operands=1
+ mem-estimate=0B mem-reservation=0B thread-reservation=0
+ tuple-ids=0 row-size=1B cardinality=1
+ in pipelines:
+---- DISTRIBUTEDPLAN
+Max Per-Host Resource Reservation: Memory=2.00MB Threads=2
+Per-Host Resource Estimates: Memory=22MB
+Codegen disabled by planner
+Analyzed query: SELECT CAST(1 AS TINYINT) UNION SELECT CAST(1 AS TINYINT)
+
+F01:PLAN FRAGMENT [KUDU(KuduPartition(1))] hosts=1 instances=1
+| Per-Host Resources: mem-estimate=22.02MB mem-reservation=2.00MB thread-reservation=1
+INSERT INTO KUDU [functional_kudu.tinyinttable]
+| mem-estimate=20.00MB mem-reservation=0B thread-reservation=0
+|
+02:PARTIAL SORT
+| order by: KuduPartition(1) ASC NULLS LAST, 1 ASC NULLS LAST
+| materialized: KuduPartition(1)
+| mem-estimate=2.00MB mem-reservation=2.00MB spill-buffer=2.00MB thread-reservation=0
+| tuple-ids=1 row-size=5B cardinality=1
+| in pipelines:
+|
+01:EXCHANGE [KUDU(KuduPartition(1))]
+| mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
+| tuple-ids=0 row-size=1B cardinality=1
+| in pipelines:
+|
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+Per-Host Resources: mem-estimate=0B mem-reservation=0B thread-reservation=1
+00:UNION
+ constant-operands=1
+ mem-estimate=0B mem-reservation=0B thread-reservation=0
+ tuple-ids=0 row-size=1B cardinality=1
+ in pipelines:
+====