IMPALA-8891: Fix non-standard null handling in concat_ws()

This patch fixes the non-standard null handling logic for
function 'concat_ws', while maintaining the original null
handling for function 'concat'

Existing statuses:
For function concat_ws, any null string element in array
argument 'strs' will result in null result, just like below:
------------------------------------------------
select concat_ws('-','foo',null,'bar') as expr1;
+-------+
| expr1 |
+-------+
| NULL  |
+-------+

New Statuses:
In this implementation, the function conforms to hive standard:
1.will join all the non-null string objects as the result
2.if all string objects are null, return empty string
3.if separator is null, return null
below is a example:
-------------------------------------------------
select concat_ws('-','foo',null,'bar') as expr1;
+----------+
|  expr1   |
+----------+
| foo-bar  |
+----------+
------------------------------------------------

Key changes:
* Reimplement function StringFunctions::ConcatWs by filtering the
  null value and only process the valid string values, based on
  original code structure.
* StringFunctions::Concat was also reimplemented, as it used to
  call ConcatWs but should keep the original NULL handling.

Testing:
* Ran exaustive tests.

Change-Id: I64cd3bfbb952e431a0cf52a5835ac05d2513d29b
Reviewed-on: http://gerrit.cloudera.org:8080/14885
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
diff --git a/be/src/exprs/expr-test.cc b/be/src/exprs/expr-test.cc
index 0b80f10..f76d620 100644
--- a/be/src/exprs/expr-test.cc
+++ b/be/src/exprs/expr-test.cc
@@ -4652,6 +4652,7 @@
   TestIsNull("concat(NULL)", TYPE_STRING);
   TestIsNull("concat('a', NULL, 'b')", TYPE_STRING);
   TestIsNull("concat('a', 'b', NULL)", TYPE_STRING);
+  TestStringValue("concat('', '', '')", "");
 
   TestStringValue("concat_ws(',', 'a')", "a");
   TestStringValue("concat_ws(',', 'a', 'b')", "a,b");
@@ -4661,8 +4662,17 @@
   TestStringValue("concat_ws('|','a', 'b', 'cde', '', 'fg', '')", "a|b|cde||fg|");
   TestStringValue("concat_ws('', '', '', '')", "");
   TestIsNull("concat_ws(NULL, NULL)", TYPE_STRING);
-  TestIsNull("concat_ws(',', NULL, 'b')", TYPE_STRING);
-  TestIsNull("concat_ws(',', 'b', NULL)", TYPE_STRING);
+  TestStringValue("concat_ws(',', NULL, 'b')", "b");
+  TestStringValue("concat_ws(',', 'b', NULL)", "b");
+  TestStringValue("concat_ws(',', NULL, 'b', 'a')", "b,a");
+  TestStringValue("concat_ws(',', 'b', NULL, 'a')", "b,a");
+  TestStringValue("concat_ws(',', 'b', 'a', NULL)", "b,a");
+  TestStringValue("concat_ws('', NULL, 'b', 'a')", "ba");
+  TestStringValue("concat_ws('', 'b', NULL, 'a')", "ba");
+  TestStringValue("concat_ws('', 'b', 'a', NULL)", "ba");
+  TestStringValue("concat_ws(',', NULL, NULL)", "");
+  TestStringValue("concat_ws(',', '', '')", ",");
+  TestStringValue("concat_ws(',', '')", "");
 
   TestValue("find_in_set('ab', 'ab,ab,ab,ade,cde')", TYPE_INT, 1);
   TestValue("find_in_set('ab', 'abc,xyz,abc,ade,ab')", TYPE_INT, 5);
diff --git a/be/src/exprs/string-functions-ir.cc b/be/src/exprs/string-functions-ir.cc
index 2aae65c..5b777e3 100644
--- a/be/src/exprs/string-functions-ir.cc
+++ b/be/src/exprs/string-functions-ir.cc
@@ -854,36 +854,82 @@
   return IntVal(count);
 }
 
-StringVal StringFunctions::Concat(FunctionContext* context, int num_children,
-    const StringVal* strs) {
-  return ConcatWs(context, StringVal(), num_children, strs);
-}
-
-StringVal StringFunctions::ConcatWs(FunctionContext* context, const StringVal& sep,
-    int num_children, const StringVal* strs) {
+// NULL handling of function Concat and ConcatWs are different.
+// Function concat was reimplemented to keep the original
+// NULL handling.
+StringVal StringFunctions::Concat(
+    FunctionContext* context, int num_children, const StringVal* strs) {
   DCHECK_GE(num_children, 1);
-  DCHECK(strs != NULL);
-  if (sep.is_null) return StringVal::null();
-
-  // Pass through if there's only one argument
+  DCHECK(strs != nullptr);
+  // Pass through if there's only one argument.
   if (num_children == 1) return strs[0];
 
-  if (strs[0].is_null) return StringVal::null();
-  int32_t total_size = strs[0].len;
-
   // Loop once to compute the final size and reserve space.
-  for (int32_t i = 1; i < num_children; ++i) {
+  int32_t total_size = 0;
+  for (int32_t i = 0; i < num_children; ++i) {
     if (strs[i].is_null) return StringVal::null();
-    total_size += sep.len + strs[i].len;
+    total_size += strs[i].len;
   }
+  // If total_size is zero, directly returns empty string
+  if (total_size <= 0) return StringVal();
+
   StringVal result(context, total_size);
   if (UNLIKELY(result.is_null)) return StringVal::null();
 
   // Loop again to append the data.
   uint8_t* ptr = result.ptr;
-  Ubsan::MemCpy(ptr, strs[0].ptr, strs[0].len);
-  ptr += strs[0].len;
-  for (int32_t i = 1; i < num_children; ++i) {
+  for (int32_t i = 0; i < num_children; ++i) {
+    Ubsan::MemCpy(ptr, strs[i].ptr, strs[i].len);
+    ptr += strs[i].len;
+  }
+  return result;
+}
+
+StringVal StringFunctions::ConcatWs(FunctionContext* context, const StringVal& sep,
+    int num_children, const StringVal* strs) {
+  DCHECK_GE(num_children, 1);
+  DCHECK(strs != nullptr);
+  if (sep.is_null) return StringVal::null();
+
+  // Loop once to compute valid start index, final string size and valid string object
+  // count.
+  int32_t valid_num_children = 0;
+  int32_t valid_start_index = -1;
+  int32_t total_size = 0;
+  for (int32_t i = 0; i < num_children; ++i) {
+    if (strs[i].is_null) continue;
+
+    if (valid_start_index == -1) {
+      valid_start_index = i;
+      // Calculate the space required by first valid string object.
+      total_size += strs[i].len;
+    } else {
+      // Calculate the space required by subsequent valid string object.
+      total_size += sep.len + strs[i].len;
+    }
+    // Record the count of valid string object.
+    valid_num_children++;
+  }
+
+  // If all data are invalid, or data size is zero, return empty string.
+  if (valid_start_index < 0 || total_size <= 0) {
+    return StringVal();
+  }
+  DCHECK_GT(valid_num_children, 0);
+
+  // Pass through if there's only one argument.
+  if (valid_num_children == 1) return strs[valid_start_index];
+
+  // Reserve space needed by final result.
+  StringVal result(context, total_size);
+  if (UNLIKELY(result.is_null)) return StringVal::null();
+
+  // Loop to append the data.
+  uint8_t* ptr = result.ptr;
+  Ubsan::MemCpy(ptr, strs[valid_start_index].ptr, strs[valid_start_index].len);
+  ptr += strs[valid_start_index].len;
+  for (int32_t i = valid_start_index + 1; i < num_children; ++i) {
+    if (strs[i].is_null) continue;
     Ubsan::MemCpy(ptr, sep.ptr, sep.len);
     ptr += sep.len;
     Ubsan::MemCpy(ptr, strs[i].ptr, strs[i].len);