IMPALA-8709: Add Damerau-Levenshtein edit distance built-in function

This patch adds new built-in functions to calculate restricted
Damerau-Levenshtein edit distance (optimal string alignment).
Implmented as dle_dst() and damerau_levenshtein(). If either value is
NULL or both values are NULL returns NULL which differs from Netezza's
dle_dst() which returns the length of the not NULL value or 0 if both
values are NULL. The NULL behavior matches the existing levenshtein()
function.

Also cleans up levenshtein tests.

Testing:
- Added unit tests to expr-test.cc
- Manual testing on over 1400 string pairs from
  http://marvin.cs.uidaho.edu/misspell.html and results match Netezza

Change-Id: Ib759817ec15e7075bf49d51e494e45c8af4db94d
Reviewed-on: http://gerrit.cloudera.org:8080/13794
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Csaba Ringhofer <csringhofer@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
diff --git a/be/src/exprs/expr-test.cc b/be/src/exprs/expr-test.cc
index e932a04..0b80f10 100644
--- a/be/src/exprs/expr-test.cc
+++ b/be/src/exprs/expr-test.cc
@@ -3992,37 +3992,51 @@
 }
 
 TEST_P(ExprTest, StringFunctions) {
-  TestValue("levenshtein('levenshtein', 'frankenstein')", TYPE_INT, 6);
-  TestValue("levenshtein('example', 'samples')", TYPE_INT, 3);
-  TestValue("levenshtein('sturgeon', 'urgently')", TYPE_INT, 6);
-  TestValue("levenshtein('distance', 'difference')", TYPE_INT, 5);
-  TestValue("levenshtein('kitten', 'sitting')", TYPE_INT, 3);
-  TestValue("levenshtein('levenshtein', 'levenshtein')", TYPE_INT, 0);
-  TestValue("levenshtein('', 'levenshtein')", TYPE_INT, 11);
-  TestValue("levenshtein('levenshtein', '')", TYPE_INT, 11);
-  TestIsNull("levenshtein('foo', NULL)", TYPE_INT);
-  TestIsNull("levenshtein(NULL, 'foo')", TYPE_INT);
-  TestIsNull("levenshtein(NULL, NULL)", TYPE_INT);
-  TestErrorString("levenshtein('z', repeat('x', 256))",
-      "levenshtein argument exceeds maximum length of 255 characters\n");
-  TestErrorString("levenshtein(repeat('x', 256), 'z')",
-      "levenshtein argument exceeds maximum length of 255 characters\n");
 
-  TestValue("le_dst('levenshtein', 'frankenstein')", TYPE_INT, 6);
-  TestValue("le_dst('example', 'samples')", TYPE_INT, 3);
-  TestValue("le_dst('sturgeon', 'urgently')", TYPE_INT, 6);
-  TestValue("le_dst('distance', 'difference')", TYPE_INT, 5);
-  TestValue("le_dst('kitten', 'sitting')", TYPE_INT, 3);
-  TestValue("le_dst('levenshtein', 'levenshtein')", TYPE_INT, 0);
-  TestValue("le_dst('', 'levenshtein')", TYPE_INT, 11);
-  TestValue("le_dst('levenshtein', '')", TYPE_INT, 11);
-  TestIsNull("le_dst('foo', NULL)", TYPE_INT);
-  TestIsNull("le_dst(NULL, 'foo')", TYPE_INT);
-  TestIsNull("le_dst(NULL, NULL)", TYPE_INT);
-  TestErrorString("le_dst('z', repeat('x', 256))",
-      "levenshtein argument exceeds maximum length of 255 characters\n");
-  TestErrorString("le_dst(repeat('x', 256), 'z')",
-      "levenshtein argument exceeds maximum length of 255 characters\n");
+  for (const string fn_name: { "levenshtein", "le_dst" }) {
+    TestValue(fn_name + "('levenshtein', 'frankenstein')", TYPE_INT, 6);
+    TestValue(fn_name + "('example', 'samples')", TYPE_INT, 3);
+    TestValue(fn_name + "('sturgeon', 'urgently')", TYPE_INT, 6);
+    TestValue(fn_name + "('distance', 'difference')", TYPE_INT, 5);
+    TestValue(fn_name + "('kitten', 'sitting')", TYPE_INT, 3);
+    TestValue(fn_name + "('levenshtein', 'levenshtein')", TYPE_INT, 0);
+    TestValue(fn_name + "('', 'levenshtein')", TYPE_INT, 11);
+    TestValue(fn_name + "('levenshtein', '')", TYPE_INT, 11);
+    TestIsNull(fn_name + "('foo', NULL)", TYPE_INT);
+    TestIsNull(fn_name + "(NULL, 'foo')", TYPE_INT);
+    TestIsNull(fn_name + "(NULL, NULL)", TYPE_INT);
+    TestErrorString(fn_name + "('z', repeat('x', 256))",
+        "levenshtein argument exceeds maximum length of 255 characters\n");
+    TestErrorString(fn_name + "(repeat('x', 256), 'z')",
+        "levenshtein argument exceeds maximum length of 255 characters\n");
+  }
+
+  for (const string fn_name: { "damerau_levenshtein", "dle_dst" }) {
+    TestValue(fn_name + "('', '')", TYPE_INT, 0);
+    TestValue(fn_name + "('abc', 'abc')", TYPE_INT, 0);
+    TestValue(fn_name + "('a', 'b')", TYPE_INT, 1);
+    TestValue(fn_name + "('a', '')", TYPE_INT, 1);
+    TestValue(fn_name + "('aabc', 'abc')", TYPE_INT, 1);
+    TestValue(fn_name + "('abcc', 'abc')", TYPE_INT, 1);
+    TestValue(fn_name + "('', 'a')", TYPE_INT, 1);
+    TestValue(fn_name + "('abc', 'abcc')", TYPE_INT, 1);
+    TestValue(fn_name + "('abc', 'aabc')", TYPE_INT, 1);
+    TestValue(fn_name + "('teh', 'the')", TYPE_INT, 1);
+    TestValue(fn_name + "('tets', 'test')", TYPE_INT, 1);
+    TestValue(fn_name + "('fuor', 'four')", TYPE_INT, 1);
+    TestValue(fn_name + "('kitten', 'sitting')", TYPE_INT, 3);
+    TestValue(fn_name + "('Saturday', 'Sunday')", TYPE_INT, 3);
+    TestValue(fn_name + "('rosettacode', 'raisethysword')", TYPE_INT, 8);
+    TestValue(fn_name + "('CA', 'ABC')", TYPE_INT, 3);
+    TestValue(fn_name + "(repeat('z', 255), repeat('x', 255))", TYPE_INT, 255);
+    TestIsNull(fn_name + "('foo', NULL)", TYPE_INT);
+    TestIsNull(fn_name + "(NULL, 'foo')", TYPE_INT);
+    TestIsNull(fn_name + "(NULL, NULL)", TYPE_INT);
+    TestErrorString(fn_name + "('z', repeat('x', 256))",
+        "damerau-levenshtein argument exceeds maximum length of 255 characters\n");
+    TestErrorString(fn_name + "(repeat('x', 256), 'z')",
+        "damerau-levenshtein argument exceeds maximum length of 255 characters\n");
+  }
 
   for (const string fn_name: { "jaro_dst", "jaro_distance" }) {
     TestIsNull(fn_name + "('foo', NULL)", TYPE_DOUBLE);
@@ -4039,6 +4053,7 @@
     TestValue(fn_name + "('frog', 'fog')", TYPE_DOUBLE, 0.08333333333333337);
     TestValue(fn_name + "('hello', 'haloa')", TYPE_DOUBLE, 0.2666666666666666);
     TestValue(fn_name + "('atcg', 'tagc')", TYPE_DOUBLE, 0.1666666666666667);
+    TestValue(fn_name + "(repeat('z', 255), repeat('x', 255))", TYPE_DOUBLE, 1.0);
     TestErrorString(fn_name + "('z', repeat('x', 256))",
         "jaro argument exceeds maximum length of 255 characters\n");
     TestErrorString(fn_name + "(repeat('x', 256), 'z')",
@@ -4060,6 +4075,7 @@
     TestValue(fn_name + "('frog', 'fog')", TYPE_DOUBLE, 0.9166666666666666);
     TestValue(fn_name + "('hello', 'haloa')", TYPE_DOUBLE, 0.73333333333333334);
     TestValue(fn_name + "('atcg', 'tagc')", TYPE_DOUBLE, 0.8333333333333333);
+    TestValue(fn_name + "(repeat('z', 255), repeat('x', 255))", TYPE_DOUBLE, 0.0);
     TestErrorString(fn_name + "('z', repeat('x', 256))",
         "jaro argument exceeds maximum length of 255 characters\n");
     TestErrorString(fn_name + "(repeat('x', 256), 'z')",
diff --git a/be/src/exprs/string-functions-ir.cc b/be/src/exprs/string-functions-ir.cc
index 11336e3..2aae65c 100644
--- a/be/src/exprs/string-functions-ir.cc
+++ b/be/src/exprs/string-functions-ir.cc
@@ -1166,7 +1166,11 @@
 
   int column_start = 1;
 
-  auto column = reinterpret_cast<int*>(ctx->Allocate(sizeof(int) * (s1len + 1)));
+  int* column = reinterpret_cast<int*>(ctx->Allocate(sizeof(int) * (s1len + 1)));
+  if (UNLIKELY(column == nullptr)) {
+    DCHECK(!ctx->impl()->state()->GetQueryStatus().ok());
+    return IntVal::null();
+  }
 
   std::iota(column + column_start - 1, column + s1len + 1, column_start - 1);
 
@@ -1213,8 +1217,19 @@
   // the window size to search for matches in the other string
   int max_range = std::max(0, std::max(s1len, s2len) / 2 - 1);
 
-  int s1_matching[s1len];
-  int s2_matching[s2len];
+  int* s1_matching = reinterpret_cast<int*>(ctx->Allocate(sizeof(int) * (s1len)));
+  if (UNLIKELY(s1_matching == nullptr)) {
+    DCHECK(!ctx->impl()->state()->GetQueryStatus().ok());
+    return DoubleVal::null();
+  }
+
+  int* s2_matching = reinterpret_cast<int*>(ctx->Allocate(sizeof(int) * (s2len)));
+  if (UNLIKELY(s2_matching == nullptr)) {
+    ctx->Free(reinterpret_cast<uint8_t*>(s1_matching));
+    DCHECK(!ctx->impl()->state()->GetQueryStatus().ok());
+    return DoubleVal::null();
+  }
+
   std::fill_n(s1_matching, s1len, -1);
   std::fill_n(s2_matching, s2len, -1);
 
@@ -1236,7 +1251,11 @@
     }
   }
 
-  if (matching_characters == 0) return DoubleVal(0.0);
+  if (matching_characters == 0) {
+    ctx->Free(reinterpret_cast<uint8_t*>(s1_matching));
+    ctx->Free(reinterpret_cast<uint8_t*>(s2_matching));
+    return DoubleVal(0.0);
+  }
 
   // transpositions (one-way only)
   double transpositions = 0.0;
@@ -1247,9 +1266,7 @@
     while (s2_matching[s2i] == -1) {
       s2i++;
     }
-    if (s1.ptr[s1i] != s2.ptr[s2i]) {
-      transpositions += 0.5;
-    }
+    if (s1.ptr[s1i] != s2.ptr[s2i]) transpositions += 0.5;
     s1i++;
     s2i++;
   }
@@ -1258,6 +1275,9 @@
                                         + m / static_cast<double>(s2len)
                                         + (m - transpositions) / m );
 
+  ctx->Free(reinterpret_cast<uint8_t*>(s1_matching));
+  ctx->Free(reinterpret_cast<uint8_t*>(s2_matching));
+
   return DoubleVal(jaro_similarity);
 }
 
@@ -1359,4 +1379,78 @@
   }
   return DoubleVal(jaro_winkler_similarity);
 }
+
+IntVal StringFunctions::DamerauLevenshtein(
+    FunctionContext* ctx, const StringVal& s1, const StringVal& s2) {
+  // Based on https://en.wikipedia.org/wiki/Damerau%E2%80%93Levenshtein_distance
+  // Implements restricted Damerau-Levenshtein (optimal string alignment)
+
+  int s1len = s1.len;
+  int s2len = s2.len;
+
+  // error if either input exceeds 255 characters
+  if (s1len > 255 || s2len > 255) {
+    ctx->SetError("damerau-levenshtein argument exceeds maximum length of 255 "
+                  "characters");
+    return IntVal(-1);
+  }
+
+  // short cut cases:
+  // - null strings
+  // - zero length strings
+  // - identical length and value strings
+  if (s1.is_null || s2.is_null) return IntVal::null();
+  if (s1len == 0) return IntVal(s2len);
+  if (s2len == 0) return IntVal(s1len);
+  if (s1len == s2len && memcmp(s1.ptr, s2.ptr, s1len) == 0) return IntVal(0);
+
+  int i;
+  int j;
+  int l_cost;
+  int ptr_array_length = sizeof(int*) * (s1len + 1);
+  int int_array_length = sizeof(int) * (s2len + 1) * (s1len + 1);
+
+  // Allocating a 2D array (with d being an array of pointers to the start of the rows)
+  int** d = reinterpret_cast<int**>(ctx->Allocate(ptr_array_length));
+  if (UNLIKELY(d == nullptr)) {
+    DCHECK(!ctx->impl()->state()->GetQueryStatus().ok());
+    return IntVal::null();
+  }
+  int* rows = reinterpret_cast<int*>(ctx->Allocate(int_array_length));
+  if (UNLIKELY(rows == nullptr)) {
+    ctx->Free(reinterpret_cast<uint8_t*>(d));
+    DCHECK(!ctx->impl()->state()->GetQueryStatus().ok());
+    return IntVal::null();
+  }
+  // Setting the pointers in the pointer-array to the start of (s2len + 1) length
+  // intervals and initializing its values based on the mentioned algorithm.
+  for (i = 0; i <= s1len; ++i) {
+    d[i] = rows + (s2len + 1) * i;
+    d[i][0] = i;
+  }
+  std::iota(d[0], d[0] + s2len + 1, 0);
+
+  for (i = 1; i <= s1len; ++i) {
+    for (j = 1; j <= s2len; ++j) {
+      if (s1.ptr[i - 1] == s2.ptr[j - 1]) {
+        l_cost = 0;
+      } else {
+        l_cost = 1;
+      }
+      d[i][j] = std::min(d[i - 1][j - 1] + l_cost, // substitution
+                         std::min(d[i][j - 1] + 1, // insertion
+                                  d[i - 1][j] + 1) // deletion
+      );
+      if (i > 1 && j > 1 && s1.ptr[i - 1] == s2.ptr[j - 2]
+          && s1.ptr[i - 2] == s2.ptr[j - 1]) {
+        d[i][j] = std::min(d[i][j], d[i - 2][j - 2] + l_cost); // transposition
+      }
+    }
+  }
+  int result = d[s1len][s2len];
+
+  ctx->Free(reinterpret_cast<uint8_t*>(d));
+  ctx->Free(reinterpret_cast<uint8_t*>(rows));
+  return IntVal(result);
+}
 }
diff --git a/be/src/exprs/string-functions.h b/be/src/exprs/string-functions.h
index 8386461..09a16b5 100644
--- a/be/src/exprs/string-functions.h
+++ b/be/src/exprs/string-functions.h
@@ -158,6 +158,9 @@
   static IntVal Levenshtein(
       FunctionContext* context, const StringVal& s1, const StringVal& s2);
 
+  static IntVal DamerauLevenshtein(
+      FunctionContext* context, const StringVal& s1, const StringVal& s2);
+
   static DoubleVal JaroDistance(
       FunctionContext* ctx, const StringVal& s1, const StringVal& s2);
 
diff --git a/common/function-registry/impala_functions.py b/common/function-registry/impala_functions.py
index d7d1ceb..f132743 100644
--- a/common/function-registry/impala_functions.py
+++ b/common/function-registry/impala_functions.py
@@ -586,6 +586,8 @@
    'impala::StringFunctions::GetJsonObject'],
   [['levenshtein', 'le_dst'], 'INT', ['STRING', 'STRING'],
    '_ZN6impala15StringFunctions11LevenshteinEPN10impala_udf15FunctionContextERKNS1_9StringValES6_'],
+  [['damerau_levenshtein', 'dle_dst'], 'INT', ['STRING', 'STRING'],
+   '_ZN6impala15StringFunctions18DamerauLevenshteinEPN10impala_udf15FunctionContextERKNS1_9StringValES6_'],
   [['jaro_distance', 'jaro_dst'], 'DOUBLE', ['STRING', 'STRING'],
    '_ZN6impala15StringFunctions12JaroDistanceEPN10impala_udf15FunctionContextERKNS1_9StringValES6_'],
   [['jaro_similarity', 'jaro_sim'], 'DOUBLE', ['STRING', 'STRING'],