IMPALA-7351: Add estimates to kudu table sink

The kudu table sink allocates untracked memory which is bounded by
limits that impala enforces through the kudu client API. This patch
adds a constant estimate to this table sink which is based on those
limits.

Testing:
Modified planner tests accordingly.

Change-Id: I89a45dce0cfbbe3cc0bc17d55ffdbd41cd7dbfbd
Reviewed-on: http://gerrit.cloudera.org:8080/12077
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
diff --git a/be/src/util/backend-gflag-util.cc b/be/src/util/backend-gflag-util.cc
index 9f4f2d0..f78b317 100644
--- a/be/src/util/backend-gflag-util.cc
+++ b/be/src/util/backend-gflag-util.cc
@@ -68,6 +68,8 @@
 DECLARE_int32(catalog_max_parallel_partial_fetch_rpc);
 DECLARE_int64(catalog_partial_fetch_rpc_queue_timeout_s);
 DECLARE_int64(exchg_node_buffer_size_bytes);
+DECLARE_int32(kudu_mutation_buffer_size);
+DECLARE_int32(kudu_error_buffer_size);
 
 namespace impala {
 
@@ -135,6 +137,8 @@
       FLAGS_catalog_partial_fetch_rpc_queue_timeout_s);
   cfg.__set_exchg_node_buffer_size_bytes(
       FLAGS_exchg_node_buffer_size_bytes);
+  cfg.__set_kudu_mutation_buffer_size(FLAGS_kudu_mutation_buffer_size);
+  cfg.__set_kudu_error_buffer_size(FLAGS_kudu_error_buffer_size);
   RETURN_IF_ERROR(SerializeThriftMsg(jni_env, &cfg, cfg_bytes));
   return Status::OK();
 }
diff --git a/common/thrift/BackendGflags.thrift b/common/thrift/BackendGflags.thrift
index dff4b8c..a56b260 100644
--- a/common/thrift/BackendGflags.thrift
+++ b/common/thrift/BackendGflags.thrift
@@ -109,4 +109,8 @@
   41: required i64 catalog_partial_fetch_rpc_queue_timeout_s
 
   42: required i64 exchg_node_buffer_size_bytes
+
+  43: required i32 kudu_mutation_buffer_size
+
+  44: required i32 kudu_error_buffer_size
 }
diff --git a/fe/src/main/java/org/apache/impala/planner/KuduTableSink.java b/fe/src/main/java/org/apache/impala/planner/KuduTableSink.java
index bb57b1f..ba6f93a 100644
--- a/fe/src/main/java/org/apache/impala/planner/KuduTableSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/KuduTableSink.java
@@ -23,6 +23,7 @@
 
 import org.apache.impala.analysis.DescriptorTable;
 import org.apache.impala.catalog.FeTable;
+import org.apache.impala.service.BackendConfig;
 import org.apache.impala.thrift.TDataSink;
 import org.apache.impala.thrift.TDataSinkType;
 import org.apache.impala.thrift.TExplainLevel;
@@ -64,8 +65,16 @@
 
   @Override
   public void computeResourceProfile(TQueryOptions queryOptions) {
-    // TODO: add a memory estimate
-    resourceProfile_ = ResourceProfile.noReservation(0);
+    // The major chunk of memory used by this node is untracked. Part of which
+    // is allocated by the KuduSession on the write path and the rest is the
+    // memory used to store kudu client error messages. Fortunately, both of
+    // them have an upper limit which is used directly to set the estimates here.
+    long kuduMutationBufferSize = BackendConfig.INSTANCE.getBackendCfg().
+        kudu_mutation_buffer_size;
+    long kuduErrorBufferSize = BackendConfig.INSTANCE.getBackendCfg().
+        kudu_error_buffer_size;
+    resourceProfile_ = ResourceProfile.noReservation(kuduMutationBufferSize +
+        kuduErrorBufferSize);
   }
 
   @Override
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test b/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
index f86dd5a..7f4a12f 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
@@ -2422,7 +2422,7 @@
 ---- PLAN
 Max Per-Host Resource Reservation: Memory=51.00MB Threads=3
 Per-Host Resource Estimates: Memory=446MB
-Analyzed query: SELECT * FROM tpch.lineitem INNER JOIN 
+Analyzed query: SELECT * FROM tpch.lineitem INNER JOIN
 -- +shuffle
 tpch.orders ON l_orderkey = o_orderkey
 
@@ -2462,7 +2462,7 @@
 ---- DISTRIBUTEDPLAN
 Max Per-Host Resource Reservation: Memory=52.00MB Threads=6
 Per-Host Resource Estimates: Memory=300MB
-Analyzed query: SELECT * FROM tpch.lineitem INNER JOIN 
+Analyzed query: SELECT * FROM tpch.lineitem INNER JOIN
 -- +shuffle
 tpch.orders ON l_orderkey = o_orderkey
 
@@ -2523,7 +2523,7 @@
 ---- PARALLELPLANS
 Max Per-Host Resource Reservation: Memory=104.00MB Threads=7
 Per-Host Resource Estimates: Memory=481MB
-Analyzed query: SELECT * FROM tpch.lineitem INNER JOIN 
+Analyzed query: SELECT * FROM tpch.lineitem INNER JOIN
 -- +shuffle
 tpch.orders ON l_orderkey = o_orderkey
 
@@ -4953,11 +4953,11 @@
 ---- PLAN
 Max Per-Host Resource Reservation: Memory=99.00MB Threads=5
 Per-Host Resource Estimates: Memory=180MB
-Analyzed query: SELECT 
+Analyzed query: SELECT
 -- +straight_join
-* FROM tpch_parquet.orders t1 INNER JOIN (SELECT 
+* FROM tpch_parquet.orders t1 INNER JOIN (SELECT
 -- +straight_join
-t2.o_orderkey k2, k3, k4 FROM tpch_parquet.orders t2 INNER JOIN (SELECT 
+t2.o_orderkey k2, k3, k4 FROM tpch_parquet.orders t2 INNER JOIN (SELECT
 -- +straight_join
 t3.o_orderkey k3, t4.o_orderkey k4 FROM tpch_parquet.orders t3 INNER JOIN
 tpch_parquet.orders t4 ON t3.o_orderkey = t4.o_orderkey) v2 ON v2.k3 =
@@ -5037,11 +5037,11 @@
 ---- DISTRIBUTEDPLAN
 Max Per-Host Resource Reservation: Memory=100.50MB Threads=10
 Per-Host Resource Estimates: Memory=260MB
-Analyzed query: SELECT 
+Analyzed query: SELECT
 -- +straight_join
-* FROM tpch_parquet.orders t1 INNER JOIN (SELECT 
+* FROM tpch_parquet.orders t1 INNER JOIN (SELECT
 -- +straight_join
-t2.o_orderkey k2, k3, k4 FROM tpch_parquet.orders t2 INNER JOIN (SELECT 
+t2.o_orderkey k2, k3, k4 FROM tpch_parquet.orders t2 INNER JOIN (SELECT
 -- +straight_join
 t3.o_orderkey k3, t4.o_orderkey k4 FROM tpch_parquet.orders t3 INNER JOIN
 tpch_parquet.orders t4 ON t3.o_orderkey = t4.o_orderkey) v2 ON v2.k3 =
@@ -5156,11 +5156,11 @@
 ---- PARALLELPLANS
 Max Per-Host Resource Reservation: Memory=176.50MB Threads=11
 Per-Host Resource Estimates: Memory=454MB
-Analyzed query: SELECT 
+Analyzed query: SELECT
 -- +straight_join
-* FROM tpch_parquet.orders t1 INNER JOIN (SELECT 
+* FROM tpch_parquet.orders t1 INNER JOIN (SELECT
 -- +straight_join
-t2.o_orderkey k2, k3, k4 FROM tpch_parquet.orders t2 INNER JOIN (SELECT 
+t2.o_orderkey k2, k3, k4 FROM tpch_parquet.orders t2 INNER JOIN (SELECT
 -- +straight_join
 t3.o_orderkey k3, t4.o_orderkey k4 FROM tpch_parquet.orders t3 INNER JOIN
 tpch_parquet.orders t4 ON t3.o_orderkey = t4.o_orderkey) v2 ON v2.k3 =
@@ -5315,11 +5315,11 @@
 ---- PLAN
 Max Per-Host Resource Reservation: Memory=176.00KB Threads=5
 Per-Host Resource Estimates: Memory=138MB
-Analyzed query: SELECT 
+Analyzed query: SELECT
 -- +straight_join
-* FROM tpch_parquet.nation t1 INNER JOIN (SELECT 
+* FROM tpch_parquet.nation t1 INNER JOIN (SELECT
 -- +straight_join
-t2.n_nationkey k2, k3, k4 FROM tpch_parquet.nation t2 INNER JOIN (SELECT 
+t2.n_nationkey k2, k3, k4 FROM tpch_parquet.nation t2 INNER JOIN (SELECT
 -- +straight_join
 t3.n_nationkey k3, t4.s_suppkey k4 FROM tpch_parquet.nation t3 INNER JOIN
 tpch_parquet.supplier t4) v2) v1
@@ -5386,11 +5386,11 @@
 ---- DISTRIBUTEDPLAN
 Max Per-Host Resource Reservation: Memory=176.00KB Threads=9
 Per-Host Resource Estimates: Memory=161MB
-Analyzed query: SELECT 
+Analyzed query: SELECT
 -- +straight_join
-* FROM tpch_parquet.nation t1 INNER JOIN (SELECT 
+* FROM tpch_parquet.nation t1 INNER JOIN (SELECT
 -- +straight_join
-t2.n_nationkey k2, k3, k4 FROM tpch_parquet.nation t2 INNER JOIN (SELECT 
+t2.n_nationkey k2, k3, k4 FROM tpch_parquet.nation t2 INNER JOIN (SELECT
 -- +straight_join
 t3.n_nationkey k3, t4.s_suppkey k4 FROM tpch_parquet.nation t3 INNER JOIN
 tpch_parquet.supplier t4) v2) v1
@@ -5485,11 +5485,11 @@
 ---- PARALLELPLANS
 Max Per-Host Resource Reservation: Memory=352.00KB Threads=9
 Per-Host Resource Estimates: Memory=311MB
-Analyzed query: SELECT 
+Analyzed query: SELECT
 -- +straight_join
-* FROM tpch_parquet.nation t1 INNER JOIN (SELECT 
+* FROM tpch_parquet.nation t1 INNER JOIN (SELECT
 -- +straight_join
-t2.n_nationkey k2, k3, k4 FROM tpch_parquet.nation t2 INNER JOIN (SELECT 
+t2.n_nationkey k2, k3, k4 FROM tpch_parquet.nation t2 INNER JOIN (SELECT
 -- +straight_join
 t3.n_nationkey k3, t4.s_suppkey k4 FROM tpch_parquet.nation t3 INNER JOIN
 tpch_parquet.supplier t4) v2) v1
@@ -5830,3 +5830,52 @@
    tuple-ids=0 row-size=231B cardinality=6001215
    in pipelines: 00(GETNEXT)
 ====
+# kudu insert
+insert into functional_kudu.tinyinttable values(1);
+---- PLAN
+Max Per-Host Resource Reservation: Memory=0B Threads=1
+Per-Host Resource Estimates: Memory=20MB
+Codegen disabled by planner
+Analyzed query: SELECT CAST(1 AS TINYINT) UNION SELECT CAST(1 AS TINYINT)
+
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=20.00MB mem-reservation=0B thread-reservation=1
+INSERT INTO KUDU [functional_kudu.tinyinttable]
+|  mem-estimate=20.00MB mem-reservation=0B thread-reservation=0
+|
+00:UNION
+   constant-operands=1
+   mem-estimate=0B mem-reservation=0B thread-reservation=0
+   tuple-ids=0 row-size=1B cardinality=1
+   in pipelines:
+---- DISTRIBUTEDPLAN
+Max Per-Host Resource Reservation: Memory=2.00MB Threads=2
+Per-Host Resource Estimates: Memory=22MB
+Codegen disabled by planner
+Analyzed query: SELECT CAST(1 AS TINYINT) UNION SELECT CAST(1 AS TINYINT)
+
+F01:PLAN FRAGMENT [KUDU(KuduPartition(1))] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=22.02MB mem-reservation=2.00MB thread-reservation=1
+INSERT INTO KUDU [functional_kudu.tinyinttable]
+|  mem-estimate=20.00MB mem-reservation=0B thread-reservation=0
+|
+02:PARTIAL SORT
+|  order by: KuduPartition(1) ASC NULLS LAST, 1 ASC NULLS LAST
+|  materialized: KuduPartition(1)
+|  mem-estimate=2.00MB mem-reservation=2.00MB spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=1 row-size=5B cardinality=1
+|  in pipelines:
+|
+01:EXCHANGE [KUDU(KuduPartition(1))]
+|  mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
+|  tuple-ids=0 row-size=1B cardinality=1
+|  in pipelines:
+|
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+Per-Host Resources: mem-estimate=0B mem-reservation=0B thread-reservation=1
+00:UNION
+   constant-operands=1
+   mem-estimate=0B mem-reservation=0B thread-reservation=0
+   tuple-ids=0 row-size=1B cardinality=1
+   in pipelines:
+====