IMPALA-8984: Fix race condition in creating Kudu table

This patch fixes the race condition when using 'CREATE IF NOT EXISTS'
to create the same managed Kudu table in parallel. Note that it won't
happen if Kudu-HMS integration is enabled. The bug would cause the
table to be deleted in Kudu but remain in HMS.

The solution is to add a check for HMS table existence after obtaining
'metastoreDdlLock_' and before creating the table in HMS. If the HMS
table has already been created by another concurrent thread, just return
with 'Table already exists'. This way we don't fail when creating the HMS
table and won't roll back the creation of the Kudu table.

Tests:
  * Add custom cluster test test_concurrent_kudu_create.py
  * Ran all front-end tests

Change-Id: I1a4047bcdaa6b346765b96e8c36bb747a2b0091d
Reviewed-on: http://gerrit.cloudera.org:8080/14319
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index f177e65..14b67df 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -2245,7 +2245,15 @@
       synchronized (metastoreDdlLock_) {
         if (createHMSTable) {
           try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
-            msClient.getHiveClient().createTable(newTable);
+            boolean tableInMetastore =
+                msClient.getHiveClient().tableExists(newTable.getDbName(),
+                                                     newTable.getTableName());
+            if (!tableInMetastore) {
+              msClient.getHiveClient().createTable(newTable);
+            } else {
+              addSummary(response, "Table already exists.");
+              return false;
+            }
           }
         }
         // Add the table to the catalog cache
diff --git a/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java
index eef1e5f..0508334 100644
--- a/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java
@@ -66,6 +66,8 @@
 public class KuduCatalogOpExecutor {
   public static final Logger LOG = Logger.getLogger(KuduCatalogOpExecutor.class);
 
+  private static final Object kuduDdlLock_ = new Object();
+
   /**
    * Create a table in Kudu with a schema equivalent to the schema stored in 'msTbl'.
    * Throws an exception if 'msTbl' represents an external table or if the table couldn't
@@ -84,28 +86,31 @@
     }
     KuduClient kudu = KuduUtil.getKuduClient(masterHosts);
     try {
-      // TODO: The IF NOT EXISTS case should be handled by Kudu to ensure atomicity.
-      // (see KUDU-1710).
-      boolean tableExists = kudu.tableExists(kuduTableName);
-      if (tableExists && params.if_not_exists) return;
+      // Acquire lock to protect table existence check and table creation, see IMPALA-8984
+      synchronized (kuduDdlLock_) {
+        // TODO: The IF NOT EXISTS case should be handled by Kudu to ensure atomicity.
+        // (see KUDU-1710).
+        boolean tableExists = kudu.tableExists(kuduTableName);
+        if (tableExists && params.if_not_exists) return;
 
-      // if table is managed or external with external.purge.table = true in
-      // tblproperties we should create the Kudu table if it does not exist
-      if (tableExists) {
-        throw new ImpalaRuntimeException(String.format(
-            "Table '%s' already exists in Kudu.", kuduTableName));
-      }
-      Preconditions.checkState(!Strings.isNullOrEmpty(kuduTableName));
-      Schema schema = createTableSchema(params);
-      CreateTableOptions tableOpts = buildTableOptions(msTbl, params, schema);
-      org.apache.kudu.client.KuduTable table =
-          kudu.createTable(kuduTableName, schema, tableOpts);
-      // Populate table ID from Kudu table if Kudu's integration with the Hive
-      // Metastore is enabled.
-      if (KuduTable.isHMSIntegrationEnabled(masterHosts)) {
-        String tableId = table.getTableId();
-        Preconditions.checkNotNull(tableId);
-        msTbl.getParameters().put(KuduTable.KEY_TABLE_ID, tableId);
+        // if table is managed or external with external.purge.table = true in
+        // tblproperties we should create the Kudu table if it does not exist
+        if (tableExists) {
+          throw new ImpalaRuntimeException(String.format(
+              "Table '%s' already exists in Kudu.", kuduTableName));
+        }
+        Preconditions.checkState(!Strings.isNullOrEmpty(kuduTableName));
+        Schema schema = createTableSchema(params);
+        CreateTableOptions tableOpts = buildTableOptions(msTbl, params, schema);
+        org.apache.kudu.client.KuduTable table =
+            kudu.createTable(kuduTableName, schema, tableOpts);
+        // Populate table ID from Kudu table if Kudu's integration with the Hive
+        // Metastore is enabled.
+        if (KuduTable.isHMSIntegrationEnabled(masterHosts)) {
+          String tableId = table.getTableId();
+          Preconditions.checkNotNull(tableId);
+          msTbl.getParameters().put(KuduTable.KEY_TABLE_ID, tableId);
+        }
       }
     } catch (Exception e) {
       throw new ImpalaRuntimeException(String.format("Error creating Kudu table '%s'",
diff --git a/tests/custom_cluster/test_concurrent_kudu_create.py b/tests/custom_cluster/test_concurrent_kudu_create.py
new file mode 100644
index 0000000..900fffa
--- /dev/null
+++ b/tests/custom_cluster/test_concurrent_kudu_create.py
@@ -0,0 +1,66 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+import threading
+import time
+
+from multiprocessing.pool import ThreadPool
+
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+
+TBL_NAME = "test_concurrent_kudu_create"
+
+
+class TestConcurrentKuduCreate(CustomClusterTestSuite):
+  """Test concurrent create kudu managed table"""
+
+  @pytest.mark.execute_serially
+  def test_concurrent_create_kudu_table(self, unique_database):
+    table_name = unique_database + "." + TBL_NAME
+    test_self = self
+
+    class ThreadLocalClient(threading.local):
+      def __init__(self):
+        self.client = test_self.create_impala_client()
+
+    tls = ThreadLocalClient()
+
+    def run_create_table_if_not_exists():
+      self.execute_query_expect_success(
+        tls.client, "create table if not exists %s "
+                    "(id int, primary key(id)) stored as kudu" % table_name)
+      tls.client.close()
+
+    # Drop the table, if it exists, before running the test.
+    self.execute_query("drop table if exists %s" % table_name)
+    NUM_ITERS = 20
+    for i in xrange(NUM_ITERS):
+      # Issue the same CREATE from several threads, staggered in time, to hit the race.
+      pool = ThreadPool(processes=3)
+      r1 = pool.apply_async(run_create_table_if_not_exists)
+      r2 = pool.apply_async(run_create_table_if_not_exists)
+      # Sleep so that the race conflict can happen at a different point.
+      time.sleep(1)
+      r3 = pool.apply_async(run_create_table_if_not_exists)
+      r1.get()
+      r2.get()
+      r3.get()
+      pool.terminate()
+      # If IMPALA-8984 is hit, this query fails because the table was deleted in Kudu.
+      self.execute_query_expect_success(tls.client, "select * from %s" % table_name)
+      self.execute_query("drop table if exists %s" % table_name)