IMPALA-5799: Kudu DML can crash if schema has changed

We check that the number/types of columns in a Kudu DML match the
underlying table during analysis. However, it's possible that the
schema may be modified between analysis and execution, and if it's
modified in incompatible ways it can cause Impala to crash.

Once the KuduTable object has been opened by the KuduTableSink, its
schema will remain the same, so we can check in Open() that the schema
is what we're expecting.

If the schema changes between Open() and when the WriteOp is sent to Kudu,
Kudu will send back an error and we already handle this gracefully.

Testing:
- Added an e2e test that concurrently inserts into a Kudu table while
  dropping and then adding a column. It relies on timing, but running
  in a loop locally it caused Impala to crash every time without this
  change.

Change-Id: I9fd6bf164310df0041144f75f5ee722665e9f587
Reviewed-on: http://gerrit.cloudera.org:8080/7688
Reviewed-by: Matthew Jacobs <mj@cloudera.com>
Tested-by: Impala Public Jenkins
diff --git a/be/src/exec/kudu-table-sink.cc b/be/src/exec/kudu-table-sink.cc
index 541d398..87eeb84 100644
--- a/be/src/exec/kudu-table-sink.cc
+++ b/be/src/exec/kudu-table-sink.cc
@@ -137,6 +137,25 @@
   KUDU_RETURN_IF_ERROR(client_->OpenTable(table_desc_->table_name(), &table_),
       "Unable to open Kudu table");
 
+  // Verify the KuduTable's schema is what we expect, in case it was modified since
+  // analysis. If the underlying schema is changed after this point but before the write
+  // completes, the KuduTable's schema will stay the same and we'll get an error back from
+  // the Kudu server.
+  for (int i = 0; i < output_expr_evals_.size(); ++i) {
+    int col_idx = kudu_table_sink_.referenced_columns.empty() ?
+        i : kudu_table_sink_.referenced_columns[i];
+    if (col_idx >= table_->schema().num_columns()) {
+      return Status(strings::Substitute(
+          "Table $0 has fewer columns than expected.", table_desc_->name()));
+    }
+    ColumnType type = KuduDataTypeToColumnType(table_->schema().Column(col_idx).type());
+    if (type != output_expr_evals_[i]->root().type()) {
+      return Status(strings::Substitute("Column $0 has unexpected type. ($1 vs. $2)",
+          table_->schema().Column(col_idx).name(), type.DebugString(),
+          output_expr_evals_[i]->root().type().DebugString()));
+    }
+  }
+
   session_ = client_->NewSession();
   session_->SetTimeoutMillis(FLAGS_kudu_operation_timeout_ms);
 
diff --git a/be/src/exec/kudu-util.cc b/be/src/exec/kudu-util.cc
index 6688b7e..e5a822a 100644
--- a/be/src/exec/kudu-util.cc
+++ b/be/src/exec/kudu-util.cc
@@ -34,6 +34,7 @@
 using kudu::client::KuduClient;
 using kudu::client::KuduClientBuilder;
 using kudu::client::KuduColumnSchema;
+using DataType = kudu::client::KuduColumnSchema::DataType;
 
 DECLARE_bool(disable_kudu);
 DECLARE_int32(kudu_operation_timeout_ms);
@@ -177,4 +178,21 @@
 
   return Status::OK();
 }
+
+ColumnType KuduDataTypeToColumnType(DataType type) {
+  switch (type) {
+    case DataType::INT8: return ColumnType(PrimitiveType::TYPE_TINYINT);
+    case DataType::INT16: return ColumnType(PrimitiveType::TYPE_SMALLINT);
+    case DataType::INT32: return ColumnType(PrimitiveType::TYPE_INT);
+    case DataType::INT64: return ColumnType(PrimitiveType::TYPE_BIGINT);
+    case DataType::STRING: return ColumnType(PrimitiveType::TYPE_STRING);
+    case DataType::BOOL: return ColumnType(PrimitiveType::TYPE_BOOLEAN);
+    case DataType::FLOAT: return ColumnType(PrimitiveType::TYPE_FLOAT);
+    case DataType::DOUBLE: return ColumnType(PrimitiveType::TYPE_DOUBLE);
+    case DataType::BINARY: return ColumnType(PrimitiveType::TYPE_BINARY);
+    case DataType::UNIXTIME_MICROS: return ColumnType(PrimitiveType::TYPE_TIMESTAMP);
+  }
+  return ColumnType(PrimitiveType::INVALID_TYPE);
+}
+
 }  // namespace impala
diff --git a/be/src/exec/kudu-util.h b/be/src/exec/kudu-util.h
index 4d9b77b..28c6b27 100644
--- a/be/src/exec/kudu-util.h
+++ b/be/src/exec/kudu-util.h
@@ -78,5 +78,8 @@
 Status WriteKuduValue(int col, PrimitiveType type, const void* value,
     bool copy_strings, kudu::KuduPartialRow* row) WARN_UNUSED_RESULT;
 
+/// Takes a Kudu client DataType and returns the corresponding Impala ColumnType.
+ColumnType KuduDataTypeToColumnType(kudu::client::KuduColumnSchema::DataType type);
+
 } /// namespace impala
 #endif
diff --git a/tests/query_test/test_kudu.py b/tests/query_test/test_kudu.py
index 9715b4e..b951237 100644
--- a/tests/query_test/test_kudu.py
+++ b/tests/query_test/test_kudu.py
@@ -30,7 +30,10 @@
 from kudu.client import Partitioning
 import logging
 import pytest
+import random
 import textwrap
+import threading
+import time
 from datetime import datetime
 from pytz import utc
 
@@ -395,6 +398,48 @@
     cursor.execute("select count(*) from %s" % table_name)
     print cursor.fetchall() == [(i, )]
 
+  def test_concurrent_schema_change(self, cursor, unique_database):
+    """Tests that an insert into a Kudu table with a concurrent schema change either
+    succeeds or fails gracefully."""
+    table_name = "%s.test_schema_change" % unique_database
+    cursor.execute("""create table %s (col0 bigint primary key, col1 bigint)
+    partition by hash(col0) partitions 16 stored as kudu""" % table_name)
+
+    iters = 5
+    def insert_values():
+      threading.current_thread().errors = []
+      client = self.create_impala_client()
+      for i in range(0, iters):
+        time.sleep(random.random()) # sleeps for up to one second
+        try:
+          client.execute("insert into %s values (0, 0), (1, 1)" % table_name)
+        except Exception as e:
+          threading.current_thread().errors.append(e)
+
+    insert_thread = threading.Thread(target=insert_values)
+    insert_thread.start()
+
+    for i in range(0, iters):
+      time.sleep(random.random()) # sleeps for up to one second
+      cursor.execute("alter table %s drop column col1" % table_name)
+      if i % 2 == 0:
+        cursor.execute("alter table %s add columns (col1 string)" % table_name)
+      else:
+        cursor.execute("alter table %s add columns (col1 bigint)" % table_name)
+
+    insert_thread.join()
+
+    for error in insert_thread.errors:
+      msg = str(error)
+      # The first two are AnalysisExceptions, the next two come from KuduTableSink::Open()
+      # if the schema has changed since analysis, the last comes from the Kudu server if
+      # the schema changes between KuduTableSink::Open() and when the write ops are sent.
+      assert "has fewer columns (1) than the SELECT / VALUES clause returns (2)" in msg \
+        or "(type: TINYINT) is not compatible with column 'col1' (type: STRING)" in msg \
+        or "has fewer columns than expected." in msg \
+        or "Column col1 has unexpected type." in msg \
+        or "Client provided column col1[int64 NULLABLE] not present in tablet" in msg
+
 class TestCreateExternalTable(KuduTestSuite):
 
   def test_external_timestamp_default_value(self, cursor, kudu_client, unique_database):