IMPALA-9092: Add support for creating external Kudu table

In HMS-3 the translation layer converts a managed kudu table into an
external kudu table and adds additional table property
'external.table.purge' to 'true'. This means any installation which
is using HMS-3 (or a Hive version which has HIVE-22158) will always
create Kudu tables as external tables. This is problematic since the
output of show create table will now be different and may confuse
the users.

In order to improve the user experience of such synchronized tables
(external tables with external.table.purge property set to true),
this patch adds support in Impala to create
external Kudu tables. Previous versions of Impala disallowed
creating a external Kudu table if the Kudu table did not exist.
After this patch, Impala will check if the Kudu table exists and if
it does not it will create a Kudu table based on the schema provided
in the create table statement. The command will error out if the Kudu
table already exists. However, this applies to only the synchronized
tables. Previous way to create a pure external table behaves the
same.

Following syntax of creating a synchronized table is now allowed:

CREATE EXTERNAL TABLE foo (
  id int PRIMARY KEY,
  name string)
PARTITION BY HASH PARTITIONS 8
STORED AS KUDU
TBLPROPERTIES ('external.table.purge'='true')

The syntax is very similar to creating a managed table, except for
the EXTERNAL keyword and additional table property. A synchronized
table will behave similar to managed Kudu tables (drops and renames
are allowed). The output of show create table on a synchronized
table will display the full column and partition spec similar to the
managed tables.

Testing:
1. After the CDP version bump all of the existing Kudu tables now
create synchronized tables so there is good coverage there.
2. Added additional tests which create synchronized tables and
compares the show create table output.
3. Ran exhaustive tests with both CDP and CDH builds.

Change-Id: I76f81d41db0cf2269ee1b365857164a43677e14d
Reviewed-on: http://gerrit.cloudera.org:8080/14750
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
diff --git a/fe/src/main/java/org/apache/impala/analysis/AlterTableSetTblProperties.java b/fe/src/main/java/org/apache/impala/analysis/AlterTableSetTblProperties.java
index 6f7431c..dce7fae 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AlterTableSetTblProperties.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AlterTableSetTblProperties.java
@@ -118,18 +118,13 @@
     // TODO IMPALA-6375: Allow setting kudu.table_name for synchronized Kudu tables
     if (KuduTable.isSynchronizedTable(table_.getMetaStoreTable())) {
       AnalysisUtils.throwIfNotNull(tblProperties_.get(KuduTable.KEY_TABLE_NAME),
-          String.format("Not allowed to set '%s' manually for managed Kudu tables .",
+          String.format("Not allowed to set '%s' manually for synchronized Kudu tables .",
               KuduTable.KEY_TABLE_NAME));
     }
     // Throw error if kudu.table_id is provided for Kudu tables.
     AnalysisUtils.throwIfNotNull(tblProperties_.get(KuduTable.KEY_TABLE_ID),
         String.format("Property '%s' cannot be altered for Kudu tables",
             KuduTable.KEY_TABLE_ID));
-    // Throw error if 'external.table.purge' is manually set.
-    AnalysisUtils.throwIfNotNull(
-        tblProperties_.get(KuduTable.TBL_PROP_EXTERNAL_TABLE_PURGE),
-        String.format("Property '%s' cannot be altered for Kudu tables",
-            KuduTable.TBL_PROP_EXTERNAL_TABLE_PURGE));
     AuthorizationConfig authzConfig = analyzer.getAuthzConfig();
     if (authzConfig.isEnabled()) {
       // Checking for 'EXTERNAL' is case-insensitive, see IMPALA-5637.
diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
index 7e4d33d..4218248 100644
--- a/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
@@ -28,6 +28,7 @@
 import org.apache.impala.authorization.AuthorizationConfig;
 import org.apache.impala.catalog.KuduTable;
 import org.apache.impala.catalog.RowFormat;
+import org.apache.impala.catalog.Table;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.common.RuntimeEnv;
@@ -286,10 +287,14 @@
     }
 
     analyzeKuduTableProperties(analyzer);
-    if (isExternal()) {
+    if (isExternal() && !Boolean.parseBoolean(getTblProperties().get(
+        Table.TBL_PROP_EXTERNAL_TABLE_PURGE))) {
+      // this is an external table
       analyzeExternalKuduTableParams(analyzer);
     } else {
-      analyzeManagedKuduTableParams(analyzer);
+      // this is either a managed table or an external table with external.table.purge
+      // property set to true
+      analyzeSynchronizedKuduTableParams(analyzer);
     }
   }
 
@@ -323,7 +328,7 @@
     putGeneratedKuduProperty(KuduTable.KEY_STORAGE_HANDLER,
         KuduTable.KUDU_STORAGE_HANDLER);
 
-    String kuduMasters = populateKuduMasters(analyzer);
+    String kuduMasters = getKuduMasters(analyzer);
     if (kuduMasters.isEmpty()) {
       throw new AnalysisException(String.format(
           "Table property '%s' is required when the impalad startup flag " +
@@ -348,7 +353,7 @@
    *  Populates Kudu master addresses either from table property or
    *  the -kudu_master_hosts flag.
    */
-  private String populateKuduMasters(Analyzer analyzer) {
+  private String getKuduMasters(Analyzer analyzer) {
     String kuduMasters = getTblProperties().get(KuduTable.KEY_MASTER_HOSTS);
     if (Strings.isNullOrEmpty(kuduMasters)) {
       kuduMasters = analyzer.getCatalog().getDefaultKuduMasterHosts();
@@ -361,6 +366,9 @@
    */
   private void analyzeExternalKuduTableParams(Analyzer analyzer)
       throws AnalysisException {
+    Preconditions.checkState(!Boolean
+        .parseBoolean(getTblProperties().get(KuduTable.TBL_PROP_EXTERNAL_TABLE_PURGE)));
+    // this is just a regular external table. Kudu table name must be specified
     AnalysisUtils.throwIfNull(getTblProperties().get(KuduTable.KEY_TABLE_NAME),
         String.format("Table property %s must be specified when creating " +
             "an external Kudu table.", KuduTable.KEY_TABLE_NAME));
@@ -372,13 +380,6 @@
     AnalysisUtils.throwIfNotNull(getTblProperties().get(KuduTable.KEY_TABLET_REPLICAS),
         String.format("Table property '%s' cannot be used with an external Kudu table.",
             KuduTable.KEY_TABLET_REPLICAS));
-    // External table cannot have 'external.table.purge' property set, which is considered
-    // equivalent to managed table.
-    if (Boolean.parseBoolean(
-            getTblProperties().get(KuduTable.TBL_PROP_EXTERNAL_TABLE_PURGE))) {
-      throw new AnalysisException(String.format("Table property '%s' cannot be set to " +
-          "true with an external Kudu table.", KuduTable.TBL_PROP_EXTERNAL_TABLE_PURGE));
-    }
     AnalysisUtils.throwIfNotEmpty(getColumnDefs(),
         "Columns cannot be specified with an external Kudu table.");
     AnalysisUtils.throwIfNotEmpty(getKuduPartitionParams(),
@@ -386,10 +387,17 @@
   }
 
   /**
-   * Analyzes and checks parameters specified for managed Kudu tables.
+   * Analyzes and checks parameters specified for synchronized Kudu tables.
    */
-  private void analyzeManagedKuduTableParams(Analyzer analyzer) throws AnalysisException {
-    analyzeManagedKuduTableName(analyzer);
+  private void analyzeSynchronizedKuduTableParams(Analyzer analyzer)
+      throws AnalysisException {
+    // A managed table cannot have 'external.table.purge' property set
+    if (!isExternal() && Boolean.parseBoolean(
+        getTblProperties().get(KuduTable.TBL_PROP_EXTERNAL_TABLE_PURGE))) {
+      throw new AnalysisException(String.format("Table property '%s' cannot be set to " +
+          "true with an managed Kudu table.", KuduTable.TBL_PROP_EXTERNAL_TABLE_PURGE));
+    }
+    analyzeSynchronizedKuduTableName(analyzer);
 
     // Check column types are valid Kudu types
     for (ColumnDef col: getColumnDefs()) {
@@ -418,13 +426,7 @@
             "zero. Given number of replicas is: " + r.toString());
       }
     }
-
-    if (!getKuduPartitionParams().isEmpty()) {
-      analyzeKuduPartitionParams(analyzer);
-    } else {
-      analyzer.addWarning(
-          "Unpartitioned Kudu tables are inefficient for large data sizes.");
-    }
+    analyzeKuduPartitionParams(analyzer);
   }
 
   /**
@@ -432,11 +434,12 @@
    * it in TableDef.generatedKuduTableName_. Throws if the Kudu table
    * name was given manually via TBLPROPERTIES.
    */
-  private void analyzeManagedKuduTableName(Analyzer analyzer) throws AnalysisException {
+  private void analyzeSynchronizedKuduTableName(Analyzer analyzer)
+      throws AnalysisException {
     AnalysisUtils.throwIfNotNull(getTblProperties().get(KuduTable.KEY_TABLE_NAME),
-        String.format("Not allowed to set '%s' manually for managed Kudu tables .",
+        String.format("Not allowed to set '%s' manually for synchronized Kudu tables.",
             KuduTable.KEY_TABLE_NAME));
-    String kuduMasters = populateKuduMasters(analyzer);
+    String kuduMasters = getKuduMasters(analyzer);
     boolean isHMSIntegrationEnabled;
     try {
       // Check if Kudu's integration with the Hive Metastore is enabled. Validation
@@ -452,10 +455,16 @@
   }
 
   /**
-   * Analyzes the partitioning schemes specified in the CREATE TABLE statement.
+   * Analyzes the partitioning schemes specified in the CREATE TABLE statement. Also,
+   * adds primary keys to the partitioning scheme if no partitioning keys are provided
    */
   private void analyzeKuduPartitionParams(Analyzer analyzer) throws AnalysisException {
     Preconditions.checkState(getFileFormat() == THdfsFileFormat.KUDU);
+    if (getKuduPartitionParams().isEmpty()) {
+      analyzer.addWarning(
+          "Unpartitioned Kudu tables are inefficient for large data sizes.");
+      return;
+    }
     Map<String, ColumnDef> pkColDefsByName =
         ColumnDef.mapByColumnNames(getPrimaryKeyColumnDefs());
     for (KuduPartitionParam partitionParam: getKuduPartitionParams()) {
diff --git a/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java b/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
index 2df6eb8..5b2dedf 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
@@ -72,7 +72,7 @@
   @VisibleForTesting
   protected static final ImmutableSet<String> HIDDEN_TABLE_PROPERTIES = ImmutableSet.of(
       "EXTERNAL", "comment", AlterTableSortByStmt.TBL_PROP_SORT_COLUMNS,
-      AlterTableSortByStmt.TBL_PROP_SORT_ORDER);
+      AlterTableSortByStmt.TBL_PROP_SORT_ORDER, "TRANSLATED_TO_EXTERNAL");
 
   /**
    * Removes all hidden properties from the given 'tblProperties' map.
@@ -311,8 +311,8 @@
     if (properties.containsKey(Table.TBL_PROP_LAST_DDL_TIME)) {
       properties.remove(Table.TBL_PROP_LAST_DDL_TIME);
     }
-    boolean isExternal = msTable.getTableType() != null &&
-        msTable.getTableType().equals(TableType.EXTERNAL_TABLE.toString());
+    boolean isExternal = Table.isExternalTable(msTable);
+
     List<String> sortColsSql = getSortColumns(properties);
     TSortingOrder sortingOrder = TSortingOrder.valueOf(getSortingOrder(properties));
     String comment = properties.get("comment");
@@ -346,7 +346,7 @@
       storageHandlerClassName = null;
       properties.remove(KuduTable.KEY_STORAGE_HANDLER);
       String kuduTableName = properties.get(KuduTable.KEY_TABLE_NAME);
-      // Remove the hidden table property 'kudu.table_name' for a managed Kudu table.
+      // Remove the hidden table property 'kudu.table_name' for a synchronized Kudu table.
       if (kuduTableName != null &&
           KuduUtil.isDefaultKuduTableName(kuduTableName,
               table.getDb().getName(), table.getName())) {
@@ -357,7 +357,7 @@
       // Internal property, should not be exposed to the user.
       properties.remove(StatsSetupConst.DO_NOT_UPDATE_STATS);
 
-      if (!isExternal) {
+      if (KuduTable.isSynchronizedTable(msTable)) {
         primaryKeySql.addAll(kuduTable.getPrimaryKeyColumnNames());
 
         List<String> paramsSql = new ArrayList<>();
@@ -366,7 +366,8 @@
         }
         kuduPartitionByParams = Joiner.on(", ").join(paramsSql);
       } else {
-        // We shouldn't output the columns for external tables
+        // we don't output the column spec if this is not a synchronized table (not
+        // managed and not external.purge table)
         colsSql = null;
       }
     } else if (table instanceof FeFsTable) {
diff --git a/fe/src/main/java/org/apache/impala/catalog/KuduTable.java b/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
index 4ef5d95..6bdcf17 100644
--- a/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
@@ -143,8 +143,7 @@
     // HIVE-22158: A translated table can have external purge property set to true
     // in such case we sync operations in Impala and Kudu
     // it is possible that in older versions of HMS a managed Kudu table is present
-    return isManagedTable(msTbl) || (isExternalTable(msTbl) && Boolean
-        .parseBoolean(msTbl.getParameters().get(TBL_PROP_EXTERNAL_TABLE_PURGE)));
+    return isManagedTable(msTbl) || isExternalPurgeTable(msTbl);
   }
 
   /**
@@ -155,18 +154,6 @@
     return msTbl.getTableType().equalsIgnoreCase(TableType.MANAGED_TABLE.toString());
   }
 
-  /**
-   * Returns if the given HMS table is external table or not based on table type or table
-   * properties. Implementation is based on org.apache.hadoop.hive.metastore.utils
-   * .MetaStoreUtils.isExternalTable()
-   */
-  public static boolean isExternalTable(
-      org.apache.hadoop.hive.metastore.api.Table msTbl) {
-    // HIVE-19253: table property can also indicate an external table.
-    return (msTbl.getTableType().equalsIgnoreCase(TableType.EXTERNAL_TABLE.toString()) ||
-        ("TRUE").equalsIgnoreCase(msTbl.getParameters().get(TBL_PROP_EXTERNAL_TABLE)));
-  }
-
   @Override
   public TCatalogObjectType getCatalogObjectType() { return TCatalogObjectType.TABLE; }
 
diff --git a/fe/src/main/java/org/apache/impala/catalog/Table.java b/fe/src/main/java/org/apache/impala/catalog/Table.java
index ee6e92a..ed3b5ad 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Table.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Table.java
@@ -195,6 +195,29 @@
     initMetrics();
   }
 
+  /**
+   * Returns if the given HMS table is an external table (uses table type if
+   * available or else uses table properties). Implementation is based on org.apache
+   * .hadoop.hive.metastore.utils.MetaStoreUtils.isExternalTable()
+   */
+  public static boolean isExternalTable(
+      org.apache.hadoop.hive.metastore.api.Table msTbl) {
+    // HIVE-19253: table property can also indicate an external table.
+    return (msTbl.getTableType().equalsIgnoreCase(TableType.EXTERNAL_TABLE.toString()) ||
+        ("TRUE").equalsIgnoreCase(msTbl.getParameters().get(TBL_PROP_EXTERNAL_TABLE)));
+  }
+
+  /**
+   * In certain versions of Hive (See HIVE-22158) HMS translates a managed table to a
+   * external and sets additional property of "external.table.purge" = true. This
+   * method can be used to identify such translated tables.
+   */
+  public static boolean isExternalPurgeTable(
+      org.apache.hadoop.hive.metastore.api.Table msTbl) {
+    return isExternalTable(msTbl) && Boolean
+        .parseBoolean(msTbl.getParameters().get(TBL_PROP_EXTERNAL_TABLE_PURGE));
+  }
+
   public ReentrantLock getLock() { return tableLock_; }
   @Override
   public abstract TTableDescriptor toThriftDescriptor(
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 5689a0d..f177e65 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -2209,7 +2209,8 @@
    * tables should be treated as managed (synchronized) tables to keep the user facing
    * behavior consistent.
    *
-   * For managed tables:
+   * For synchronized tables (managed or external tables with external.table.purge=true
+   * in tblproperties):
    *  1. If Kudu's integration with the Hive Metastore is not enabled, the Kudu
    *     table is first created in Kudu, then in the HMS.
    *  2. Otherwise, when the table is created in Kudu, we rely on Kudu to have
@@ -2226,20 +2227,23 @@
   private boolean createKuduTable(org.apache.hadoop.hive.metastore.api.Table newTable,
       TCreateTableParams params, TDdlExecResponse response) throws ImpalaException {
     Preconditions.checkState(KuduTable.isKuduTable(newTable));
-    if (KuduTable.isExternalTable(newTable)) {
+    boolean createHMSTable;
+    if (!KuduTable.isSynchronizedTable(newTable)) {
+      // if this is not a synchronized table, we assume that the table must be existing
+      // in kudu and use the column spec from Kudu
       KuduCatalogOpExecutor.populateExternalTableColsFromKudu(newTable);
+      createHMSTable = true;
     } else {
-      KuduCatalogOpExecutor.createManagedTable(newTable, params);
+      // if this is a synchronized table (managed or external.purge table) then we
+      // create it in Kudu first
+      KuduCatalogOpExecutor.createSynchronizedTable(newTable, params);
+      createHMSTable = !isKuduHmsIntegrationEnabled(newTable);
     }
-    // When Kudu's integration with the Hive Metastore is enabled, Kudu will create
-    // the HMS table for managed tables.
-    boolean createsHMSTable = KuduTable.isExternalTable(newTable) ?
-        true : !isKuduHmsIntegrationEnabled(newTable);
     try {
       // Add the table to the HMS and the catalog cache. Acquire metastoreDdlLock_ to
       // ensure the atomicity of these operations.
       synchronized (metastoreDdlLock_) {
-        if (createsHMSTable) {
+        if (createHMSTable) {
           try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
             msClient.getHiveClient().createTable(newTable);
           }
@@ -2251,8 +2255,8 @@
       }
     } catch (Exception e) {
       try {
-        // Error creating the table in HMS, drop the managed table from Kudu.
-        if (!KuduTable.isExternalTable(newTable)) {
+        // Error creating the table in HMS, drop the synchronized table from Kudu.
+        if (!KuduTable.isSynchronizedTable(newTable)) {
           KuduCatalogOpExecutor.dropTable(newTable, false);
         }
       } catch (Exception logged) {
diff --git a/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java
index a04e342..eef1e5f 100644
--- a/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java
@@ -27,6 +27,7 @@
 import org.apache.impala.catalog.FeKuduTable;
 import org.apache.impala.catalog.KuduColumn;
 import org.apache.impala.catalog.KuduTable;
+import org.apache.impala.catalog.Table;
 import org.apache.impala.catalog.TableNotFoundException;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.common.ImpalaRuntimeException;
@@ -70,9 +71,9 @@
    * Throws an exception if 'msTbl' represents an external table or if the table couldn't
    * be created in Kudu.
    */
-  static void createManagedTable(org.apache.hadoop.hive.metastore.api.Table msTbl,
+  static void createSynchronizedTable(org.apache.hadoop.hive.metastore.api.Table msTbl,
       TCreateTableParams params) throws ImpalaRuntimeException {
-    Preconditions.checkState(!KuduTable.isExternalTable(msTbl));
+    Preconditions.checkState(KuduTable.isSynchronizedTable(msTbl));
     Preconditions.checkState(
         msTbl.getParameters().get(KuduTable.KEY_TABLE_ID) == null);
     String kuduTableName = msTbl.getParameters().get(KuduTable.KEY_TABLE_NAME);
@@ -85,11 +86,16 @@
     try {
       // TODO: The IF NOT EXISTS case should be handled by Kudu to ensure atomicity.
       // (see KUDU-1710).
-      if (kudu.tableExists(kuduTableName)) {
-        if (params.if_not_exists) return;
+      boolean tableExists = kudu.tableExists(kuduTableName);
+      if (tableExists && params.if_not_exists) return;
+
+      // if table is managed or external with external.purge.table = true in
+      // tblproperties we should create the Kudu table if it does not exist
+      if (tableExists) {
         throw new ImpalaRuntimeException(String.format(
             "Table '%s' already exists in Kudu.", kuduTableName));
       }
+      Preconditions.checkState(!Strings.isNullOrEmpty(kuduTableName));
       Schema schema = createTableSchema(params);
       CreateTableOptions tableOpts = buildTableOptions(msTbl, params, schema);
       org.apache.kudu.client.KuduTable table =
@@ -283,7 +289,7 @@
     org.apache.hadoop.hive.metastore.api.Table msTblCopy = msTbl.deepCopy();
     List<FieldSchema> cols = msTblCopy.getSd().getCols();
     // External table should not have table ID.
-    Preconditions.checkState(KuduTable.isExternalTable(msTbl));
+    Preconditions.checkState(Table.isExternalTable(msTbl));
     Preconditions.checkState(
         msTblCopy.getParameters().get(KuduTable.KEY_TABLE_ID) == null);
     String kuduTableName = msTblCopy.getParameters().get(KuduTable.KEY_TABLE_NAME);
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeKuduDDLTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeKuduDDLTest.java
index 8e9b632..288bce3 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeKuduDDLTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeKuduDDLTest.java
@@ -35,212 +35,256 @@
  */
 public class AnalyzeKuduDDLTest extends FrontendTestBase {
 
-  @Test
-  public void TestCreateManagedKuduTable() {
+  /**
+   * This is wrapper around super.AnalyzesOk method. The additional boolean is used to
+   * add tblproperties for synchronized table
+   */
+  public ParseNode AnalyzesOk(String stmt, String errorStr, boolean isExternalPurgeTbl) {
+    return super
+        .AnalyzesOk(appendSynchronizedTblProps(stmt, isExternalPurgeTbl), errorStr);
+  }
+
+  /**
+   * Wrapper around super.AnalyzesOk with additional boolean for adding synchronized
+   * table properties
+   */
+  public ParseNode AnalyzesOk(String stmt, boolean isExternalPurgeTbl) {
+    return super.AnalyzesOk(appendSynchronizedTblProps(stmt, isExternalPurgeTbl));
+  }
+
+  private String appendSynchronizedTblProps(String stmt, boolean append) {
+    if (!append) { return stmt; }
+
+    stmt = stmt.replace("create table", "create external table");
+    if (!stmt.contains("tblproperties")) {
+      stmt += " tblproperties ('external.table.purge'='true')";
+    } else {
+      stmt = stmt.replaceAll("tblproperties\\s*\\(",
+          "tblproperties ('external.table.purge'='true', ");
+    }
+    return stmt;
+  }
+
+  public void AnalysisError(String stmt, String expectedError,
+      boolean isExternalPurgeTbl) {
+    super.AnalysisError(appendSynchronizedTblProps(stmt, isExternalPurgeTbl),
+        expectedError);
+  }
+
+  private void testDDlsOnKuduTable(boolean isExternalPurgeTbl) {
     TestUtils.assumeKuduIsSupported();
     // Test primary keys and partition by clauses
     AnalyzesOk("create table tab (x int primary key) partition by hash(x) " +
-        "partitions 8 stored as kudu");
+        "partitions 8 stored as kudu", isExternalPurgeTbl);
     AnalyzesOk("create table tab (x int, primary key(x)) partition by hash(x) " +
-        "partitions 8 stored as kudu");
+        "partitions 8 stored as kudu", isExternalPurgeTbl);
     AnalyzesOk("create table tab (x int, y int, primary key (x, y)) " +
-        "partition by hash(x, y) partitions 8 stored as kudu");
+        "partition by hash(x, y) partitions 8 stored as kudu", isExternalPurgeTbl);
     AnalyzesOk("create table tab (x int, y int, primary key (x)) " +
-        "partition by hash(x) partitions 8 stored as kudu");
+        "partition by hash(x) partitions 8 stored as kudu", isExternalPurgeTbl);
     AnalyzesOk("create table tab (x int, y int, primary key(x, y)) " +
-        "partition by hash(y) partitions 8 stored as kudu");
+        "partition by hash(y) partitions 8 stored as kudu", isExternalPurgeTbl);
     AnalyzesOk("create table tab (x timestamp, y timestamp, primary key(x)) " +
-        "partition by hash(x) partitions 8 stored as kudu");
+        "partition by hash(x) partitions 8 stored as kudu", isExternalPurgeTbl);
     AnalyzesOk("create table tab (x int, y string, primary key (x)) partition by " +
         "hash (x) partitions 3, range (x) (partition values < 1, partition " +
         "1 <= values < 10, partition 10 <= values < 20, partition value = 30) " +
-        "stored as kudu");
+        "stored as kudu", isExternalPurgeTbl);
     AnalyzesOk("create table tab (x int, y int, primary key (x, y)) partition by " +
         "range (x, y) (partition value = (2001, 1), partition value = (2002, 1), " +
-        "partition value = (2003, 2)) stored as kudu");
+        "partition value = (2003, 2)) stored as kudu", isExternalPurgeTbl);
     // Non-literal boundary values in range partitions
     AnalyzesOk("create table tab (x int, y int, primary key (x)) partition by " +
         "range (x) (partition values < 1 + 1, partition (1+3) + 2 < values < 10, " +
         "partition factorial(4) < values < factorial(5), " +
-        "partition value = factorial(6)) stored as kudu");
+        "partition value = factorial(6)) stored as kudu", isExternalPurgeTbl);
     AnalyzesOk("create table tab (x int, y int, primary key(x, y)) partition by " +
         "range(x, y) (partition value = (1+1, 2+2), partition value = ((1+1+1)+1, 10), " +
-        "partition value = (cast (30 as int), factorial(5))) stored as kudu");
+        "partition value = (cast (30 as int), factorial(5))) stored as kudu",
+        isExternalPurgeTbl);
     AnalysisError("create table tab (x int primary key) partition by range (x) " +
         "(partition values < x + 1) stored as kudu", "Only constant values are allowed " +
-        "for range-partition bounds: x + 1");
+        "for range-partition bounds: x + 1", isExternalPurgeTbl);
     AnalysisError("create table tab (x int primary key) partition by range (x) " +
         "(partition values <= isnull(null, null)) stored as kudu", "Range partition " +
         "values cannot be NULL. Range partition: 'PARTITION VALUES <= " +
-        "isnull(NULL, NULL)'");
+        "isnull(NULL, NULL)'", isExternalPurgeTbl);
     AnalysisError("create table tab (x int primary key) partition by range (x) " +
         "(partition values <= (select count(*) from functional.alltypestiny)) " +
         "stored as kudu", "Only constant values are allowed for range-partition " +
-        "bounds: (SELECT count(*) FROM functional.alltypestiny)");
+        "bounds: (SELECT count(*) FROM functional.alltypestiny)", isExternalPurgeTbl);
     // Multilevel partitioning. Data is split into 3 buckets based on 'x' and each
     // bucket is partitioned into 4 tablets based on the range partitions of 'y'.
     AnalyzesOk("create table tab (x int, y string, primary key(x, y)) " +
         "partition by hash(x) partitions 3, range(y) " +
         "(partition values < 'aa', partition 'aa' <= values < 'bb', " +
         "partition 'bb' <= values < 'cc', partition 'cc' <= values) " +
-        "stored as kudu");
+        "stored as kudu", isExternalPurgeTbl);
     // Key column in upper case
     AnalyzesOk("create table tab (x int, y int, primary key (X)) " +
-        "partition by hash (x) partitions 8 stored as kudu");
+        "partition by hash (x) partitions 8 stored as kudu", isExternalPurgeTbl);
     // Flexible Partitioning
     AnalyzesOk("create table tab (a int, b int, c int, d int, primary key (a, b, c))" +
         "partition by hash (a, b) partitions 8, hash(c) partitions 2 stored as " +
-        "kudu");
+        "kudu", isExternalPurgeTbl);
     // No columns specified in the PARTITION BY HASH clause
     AnalyzesOk("create table tab (a int primary key, b int, c int, d int) " +
-        "partition by hash partitions 8 stored as kudu");
+        "partition by hash partitions 8 stored as kudu", isExternalPurgeTbl);
     // Distribute range data types are picked up during analysis and forwarded to Kudu.
     // Column names in distribute params should also be case-insensitive.
     AnalyzesOk("create table tab (a int, b int, c int, d int, primary key(a, b, c, d))" +
         "partition by hash (a, B, c) partitions 8, " +
         "range (A) (partition values < 1, partition 1 <= values < 2, " +
         "partition 2 <= values < 3, partition 3 <= values < 4, partition 4 <= values) " +
-        "stored as kudu");
+        "stored as kudu", isExternalPurgeTbl);
     // Allowing range partitioning on a subset of the primary keys
     AnalyzesOk("create table tab (id int, name string, valf float, vali bigint, " +
         "primary key (id, name)) partition by range (name) " +
-        "(partition 'aa' < values <= 'bb') stored as kudu");
+        "(partition 'aa' < values <= 'bb') stored as kudu", isExternalPurgeTbl);
     // Null values in range partition values
     AnalysisError("create table tab (id int, name string, primary key(id, name)) " +
         "partition by hash (id) partitions 3, range (name) " +
         "(partition value = null, partition value = 1) stored as kudu",
-"Range partition values cannot be NULL. Range partition: 'PARTITION " +
-        "VALUE = NULL'");
+        "Range partition values cannot be NULL. Range partition: 'PARTITION " +
+        "VALUE = NULL'", isExternalPurgeTbl);
     // Primary key specified in tblproperties
     AnalysisError(String.format("create table tab (x int) partition by hash (x) " +
         "partitions 8 stored as kudu tblproperties ('%s' = 'x')",
         KuduTable.KEY_KEY_COLUMNS), "PRIMARY KEY must be used instead of the table " +
-        "property");
+        "property", isExternalPurgeTbl);
     // Primary key column that doesn't exist
     AnalysisError("create table tab (x int, y int, primary key (z)) " +
         "partition by hash (x) partitions 8 stored as kudu",
-        "PRIMARY KEY column 'z' does not exist in the table");
+        "PRIMARY KEY column 'z' does not exist in the table", isExternalPurgeTbl);
     // Invalid composite primary key
     AnalysisError("create table tab (x int primary key, primary key(x)) stored " +
         "as kudu", "Multiple primary keys specified. Composite primary keys can " +
         "be specified using the PRIMARY KEY (col1, col2, ...) syntax at the end " +
-        "of the column definition.");
+        "of the column definition.", isExternalPurgeTbl);
     AnalysisError("create table tab (x int primary key, y int primary key) stored " +
         "as kudu", "Multiple primary keys specified. Composite primary keys can " +
         "be specified using the PRIMARY KEY (col1, col2, ...) syntax at the end " +
-        "of the column definition.");
+        "of the column definition.", isExternalPurgeTbl);
     // Specifying the same primary key column multiple times
     AnalysisError("create table tab (x int, primary key (x, x)) partition by hash (x) " +
         "partitions 8 stored as kudu",
-        "Column 'x' is listed multiple times as a PRIMARY KEY.");
+        "Column 'x' is listed multiple times as a PRIMARY KEY.", isExternalPurgeTbl);
     // Number of range partition boundary values should be equal to the number of range
     // columns.
     AnalysisError("create table tab (a int, b int, c int, d int, primary key(a, b, c)) " +
         "partition by range(a) (partition value = (1, 2), " +
         "partition value = 3, partition value = 4) stored as kudu",
-"Number of specified range partition values is different than the number of " +
-        "partitioning columns: (2 vs 1). Range partition: 'PARTITION VALUE = (1, 2)'");
+        "Number of specified range partition values is different than the number of " +
+        "partitioning columns: (2 vs 1). Range partition: 'PARTITION VALUE = (1, 2)'",
+        isExternalPurgeTbl);
     // Key ranges must match the column types.
     AnalysisError("create table tab (a int, b int, c int, d int, primary key(a, b, c)) " +
         "partition by hash (a, b, c) partitions 8, range (a) " +
         "(partition value = 1, partition value = 'abc', partition 3 <= values) " +
         "stored as kudu", "Range partition value 'abc' (type: STRING) is not type " +
-        "compatible with partitioning column 'a' (type: INT).");
+        "compatible with partitioning column 'a' (type: INT).", isExternalPurgeTbl);
     AnalysisError("create table tab (a tinyint primary key) partition by range (a) " +
         "(partition value = 128) stored as kudu", "Range partition value 128 " +
         "(type: SMALLINT) is not type compatible with partitioning column 'a' " +
-        "(type: TINYINT)");
+        "(type: TINYINT)", isExternalPurgeTbl);
     AnalysisError("create table tab (a smallint primary key) partition by range (a) " +
         "(partition value = 32768) stored as kudu", "Range partition value 32768 " +
         "(type: INT) is not type compatible with partitioning column 'a' " +
-        "(type: SMALLINT)");
+        "(type: SMALLINT)", isExternalPurgeTbl);
     AnalysisError("create table tab (a int primary key) partition by range (a) " +
         "(partition value = 2147483648) stored as kudu", "Range partition value " +
         "2147483648 (type: BIGINT) is not type compatible with partitioning column 'a' " +
-        "(type: INT)");
+        "(type: INT)", isExternalPurgeTbl);
     AnalysisError("create table tab (a bigint primary key) partition by range (a) " +
         "(partition value = 9223372036854775808) stored as kudu", "Range partition " +
         "value 9223372036854775808 (type: DECIMAL(19,0)) is not type compatible with " +
-        "partitioning column 'a' (type: BIGINT)");
+        "partitioning column 'a' (type: BIGINT)", isExternalPurgeTbl);
     // Test implicit casting/folding of partition values.
     AnalyzesOk("create table tab (a int primary key) partition by range (a) " +
-        "(partition value = false, partition value = true) stored as kudu");
+        "(partition value = false, partition value = true) stored as kudu",
+        isExternalPurgeTbl);
     // Non-key column used in PARTITION BY
     AnalysisError("create table tab (a int, b string, c bigint, primary key (a)) " +
         "partition by range (b) (partition value = 'abc') stored as kudu",
-  "Column 'b' in 'RANGE (b) (PARTITION VALUE = 'abc')' is not a key column. " +
-        "Only key columns can be used in PARTITION BY.");
+        "Column 'b' in 'RANGE (b) (PARTITION VALUE = 'abc')' is not a key column. " +
+        "Only key columns can be used in PARTITION BY.", isExternalPurgeTbl);
     // No float range partition values
     AnalysisError("create table tab (a int, b int, c int, d int, primary key (a, b, c))" +
         "partition by hash (a, b, c) partitions 8, " +
         "range (a) (partition value = 1.2, partition value = 2) stored as kudu",
         "Range partition value 1.2 (type: DECIMAL(2,1)) is not type compatible with " +
-        "partitioning column 'a' (type: INT).");
+        "partitioning column 'a' (type: INT).", isExternalPurgeTbl);
     // Non-existing column used in PARTITION BY
     AnalysisError("create table tab (a int, b int, primary key (a, b)) " +
         "partition by range(unknown_column) (partition value = 'abc') stored as kudu",
         "Column 'unknown_column' in 'RANGE (unknown_column) " +
         "(PARTITION VALUE = 'abc')' is not a key column. Only key columns can be used " +
-        "in PARTITION BY");
+        "in PARTITION BY", isExternalPurgeTbl);
     // Kudu num_tablet_replicas is specified in tblproperties
     String kuduMasters = catalog_.getDefaultKuduMasterHosts();
     AnalyzesOk(String.format("create table tab (x int primary key) partition by " +
         "hash (x) partitions 8 stored as kudu tblproperties " +
         "('kudu.num_tablet_replicas'='1', 'kudu.master_addresses' = '%s')",
-        kuduMasters));
+        kuduMasters), isExternalPurgeTbl);
     // Kudu table name is specified in tblproperties resulting in an error
     AnalysisError("create table tab (x int primary key) partition by hash (x) " +
         "partitions 8 stored as kudu tblproperties ('kudu.table_name'='tab')",
-        "Not allowed to set 'kudu.table_name' manually for managed Kudu tables");
+        "Not allowed to set 'kudu.table_name' manually for synchronized Kudu tables.",
+        isExternalPurgeTbl);
     // No port is specified in kudu master address
     AnalyzesOk(String.format("create table tdata_no_port (id int primary key, " +
         "name string, valf float, vali bigint) partition by range(id) " +
         "(partition values <= 10, partition 10 < values <= 30, " +
         "partition 30 < values) stored as kudu tblproperties" +
-        "('kudu.master_addresses' = '%s')", kuduMasters));
+        "('kudu.master_addresses' = '%s')", kuduMasters), isExternalPurgeTbl);
     // Not using the STORED AS KUDU syntax to specify a Kudu table
     AnalysisError("create table tab (x int) tblproperties (" +
         "'storage_handler'='org.apache.hadoop.hive.kudu.KuduStorageHandler')",
-        CreateTableStmt.KUDU_STORAGE_HANDLER_ERROR_MESSAGE);
+        CreateTableStmt.KUDU_STORAGE_HANDLER_ERROR_MESSAGE, isExternalPurgeTbl);
     // Creating unpartitioned table results in a warning.
     AnalyzesOk("create table tab (x int primary key) stored as kudu tblproperties (" +
         "'storage_handler'='org.apache.hadoop.hive.kudu.KuduStorageHandler')",
-        "Unpartitioned Kudu tables are inefficient for large data sizes.");
+        "Unpartitioned Kudu tables are inefficient for large data sizes.",
+        isExternalPurgeTbl);
     // Invalid value for number of replicas
     AnalysisError("create table t (x int primary key) stored as kudu tblproperties (" +
         "'kudu.num_tablet_replicas'='1.1')",
-        "Table property 'kudu.num_tablet_replicas' must be an integer.");
+        "Table property 'kudu.num_tablet_replicas' must be an integer.",
+        isExternalPurgeTbl);
     // Don't allow caching
     AnalysisError("create table tab (x int primary key) stored as kudu cached in " +
-        "'testPool'", "A Kudu table cannot be cached in HDFS.");
+        "'testPool'", "A Kudu table cannot be cached in HDFS.", isExternalPurgeTbl);
     // LOCATION cannot be used with Kudu tables
     AnalysisError("create table tab (a int primary key) partition by hash (a) " +
         "partitions 3 stored as kudu location '/test-warehouse/'",
-        "LOCATION cannot be specified for a Kudu table.");
+        "LOCATION cannot be specified for a Kudu table.", isExternalPurgeTbl);
     // Creating unpartitioned table results in a warning.
     AnalyzesOk("create table tab (a int, primary key (a)) stored as kudu",
-        "Unpartitioned Kudu tables are inefficient for large data sizes.");
+        "Unpartitioned Kudu tables are inefficient for large data sizes.",
+        isExternalPurgeTbl);
     AnalysisError("create table tab (a int) stored as kudu",
-        "A primary key is required for a Kudu table.");
+        "A primary key is required for a Kudu table.", isExternalPurgeTbl);
     // Using ROW FORMAT with a Kudu table
     AnalysisError("create table tab (x int primary key) " +
         "row format delimited escaped by 'X' stored as kudu",
-        "ROW FORMAT cannot be specified for file format KUDU.");
+        "ROW FORMAT cannot be specified for file format KUDU.", isExternalPurgeTbl);
     // Using PARTITIONED BY with a Kudu table
     AnalysisError("create table tab (x int primary key) " +
         "partitioned by (y int) stored as kudu", "PARTITIONED BY cannot be used " +
-        "in Kudu tables.");
+        "in Kudu tables.", isExternalPurgeTbl);
     // Multi-column range partitions
     AnalyzesOk("create table tab (a bigint, b tinyint, c double, primary key(a, b)) " +
-        "partition by range(a, b) (partition (0, 0) < values <= (1, 1)) stored as kudu");
+        "partition by range(a, b) (partition (0, 0) < values <= (1, 1)) stored as kudu",
+        isExternalPurgeTbl);
     AnalysisError("create table tab (a bigint, b tinyint, c double, primary key(a, b)) " +
         "partition by range(a, b) (partition values <= (1, 'b')) stored as kudu",
         "Range partition value 'b' (type: STRING) is not type compatible with " +
-        "partitioning column 'b' (type: TINYINT)");
+        "partitioning column 'b' (type: TINYINT)", isExternalPurgeTbl);
     AnalysisError("create table tab (a bigint, b tinyint, c double, primary key(a, b)) " +
         "partition by range(a, b) (partition 0 < values <= 1) stored as kudu",
         "Number of specified range partition values is different than the number of " +
-        "partitioning columns: (1 vs 2). Range partition: 'PARTITION 0 < VALUES <= 1'");
+        "partitioning columns: (1 vs 2). Range partition: 'PARTITION 0 < VALUES <= 1'",
+        isExternalPurgeTbl);
 
 
     // Test unsupported Kudu types
@@ -253,12 +297,12 @@
       // Unsupported type is PK and partition col
       String stmt = String.format("create table tab (x %s primary key) " +
           "partition by hash(x) partitions 3 stored as kudu", t);
-      AnalysisError(stmt, expectedError);
+      AnalysisError(stmt, expectedError, isExternalPurgeTbl);
 
       // Unsupported type is not PK/partition col
       stmt = String.format("create table tab (x int primary key, y %s) " +
           "partition by hash(x) partitions 3 stored as kudu", t);
-      AnalysisError(stmt, expectedError);
+      AnalysisError(stmt, expectedError, isExternalPurgeTbl);
     }
 
     // Test column options
@@ -275,16 +319,17 @@
                   "not null encoding %s compression %s %s %s, y int encoding %s " +
                   "compression %s %s %s %s) partition by hash (x) " +
                   "partitions 3 stored as kudu", enc, comp, def, block, enc,
-                  comp, def, nul, block));
+                  comp, def, nul, block), isExternalPurgeTbl);
 
               // For a key column
               String createTblStr = String.format("create table tab (x int primary key " +
                   "%s encoding %s compression %s %s %s) partition by hash (x) " +
                   "partitions 3 stored as kudu", nul, enc, comp, def, block);
               if (nul.equals("null")) {
-                AnalysisError(createTblStr, "Primary key columns cannot be nullable");
+                AnalysisError(createTblStr, "Primary key columns cannot be nullable",
+                    isExternalPurgeTbl);
               } else {
-                AnalyzesOk(createTblStr);
+                AnalyzesOk(createTblStr, isExternalPurgeTbl);
               }
             }
           }
@@ -296,33 +341,34 @@
         "i2 smallint default null, i3 int default null, i4 bigint default null, " +
         "vals string default null, valf float default null, vald double default null, " +
         "valb boolean default null, valdec decimal(10, 5) default null) " +
-        "partition by hash (x) partitions 3 stored as kudu");
+        "partition by hash (x) partitions 3 stored as kudu", isExternalPurgeTbl);
     // Use NULL as a default value on a non-nullable column
     AnalysisError("create table tab (x int primary key, y int not null default null) " +
         "partition by hash (x) partitions 3 stored as kudu", "Default value of NULL " +
-        "not allowed on non-nullable column: 'y'");
+        "not allowed on non-nullable column: 'y'", isExternalPurgeTbl);
     // Primary key specified using the PRIMARY KEY clause
     AnalyzesOk("create table tab (x int not null encoding plain_encoding " +
         "compression snappy block_size 1, y int null encoding rle compression lz4 " +
         "default 1, primary key(x)) partition by hash (x) partitions 3 " +
-        "stored as kudu");
+        "stored as kudu", isExternalPurgeTbl);
     // Primary keys can't be null
     AnalysisError("create table tab (x int primary key null, y int not null) " +
         "partition by hash (x) partitions 3 stored as kudu", "Primary key columns " +
-        "cannot be nullable: x INT PRIMARY KEY NULL");
+        "cannot be nullable: x INT PRIMARY KEY NULL", isExternalPurgeTbl);
     AnalysisError("create table tab (x int not null, y int null, primary key (x, y)) " +
         "partition by hash (x) partitions 3 stored as kudu", "Primary key columns " +
-        "cannot be nullable: y INT NULL");
+        "cannot be nullable: y INT NULL", isExternalPurgeTbl);
     // Unsupported encoding value
     AnalysisError("create table tab (x int primary key, y int encoding invalid_enc) " +
         "partition by hash (x) partitions 3 stored as kudu", "Unsupported encoding " +
         "value 'INVALID_ENC'. Supported encoding values are: " +
-        Joiner.on(", ").join(Encoding.values()));
+        Joiner.on(", ").join(Encoding.values()), isExternalPurgeTbl);
     // Unsupported compression algorithm
     AnalysisError("create table tab (x int primary key, y int compression " +
         "invalid_comp) partition by hash (x) partitions 3 stored as kudu",
         "Unsupported compression algorithm 'INVALID_COMP'. Supported compression " +
-        "algorithms are: " + Joiner.on(", ").join(CompressionAlgorithm.values()));
+        "algorithms are: " + Joiner.on(", ").join(CompressionAlgorithm.values()),
+        isExternalPurgeTbl);
     // Default values
     AnalyzesOk("create table tab (i1 tinyint default 1, i2 smallint default 10, " +
         "i3 int default 100, i4 bigint default 1000, vals string default 'test', " +
@@ -330,119 +376,137 @@
         "cast(3.1452 as double), valb boolean default true, " +
         "valdec decimal(10, 5) default 3.14159, " +
         "primary key (i1, i2, i3, i4, vals)) partition by hash (i1) partitions 3 " +
-        "stored as kudu");
+        "stored as kudu", isExternalPurgeTbl);
     AnalyzesOk("create table tab (i int primary key default 1+1+1) " +
-        "partition by hash (i) partitions 3 stored as kudu");
+        "partition by hash (i) partitions 3 stored as kudu", isExternalPurgeTbl);
     AnalyzesOk("create table tab (i int primary key default factorial(5)) " +
-        "partition by hash (i) partitions 3 stored as kudu");
+        "partition by hash (i) partitions 3 stored as kudu", isExternalPurgeTbl);
     AnalyzesOk("create table tab (i int primary key, x int null default " +
-        "isnull(null, null)) partition by hash (i) partitions 3 stored as kudu");
+        "isnull(null, null)) partition by hash (i) partitions 3 stored as kudu",
+        isExternalPurgeTbl);
     // Invalid default values
     AnalysisError("create table tab (i int primary key default 'string_val') " +
         "partition by hash (i) partitions 3 stored as kudu", "Default value " +
-        "'string_val' (type: STRING) is not compatible with column 'i' (type: INT).");
+        "'string_val' (type: STRING) is not compatible with column 'i' (type: INT).",
+        isExternalPurgeTbl);
     AnalysisError("create table tab (i int primary key, x int default 1.1) " +
         "partition by hash (i) partitions 3 stored as kudu",
         "Default value 1.1 (type: DECIMAL(2,1)) is not compatible with column " +
-        "'x' (type: INT).");
+        "'x' (type: INT).", isExternalPurgeTbl);
     AnalysisError("create table tab (i tinyint primary key default 128) " +
         "partition by hash (i) partitions 3 stored as kudu", "Default value " +
-        "128 (type: SMALLINT) is not compatible with column 'i' (type: TINYINT).");
+        "128 (type: SMALLINT) is not compatible with column 'i' (type: TINYINT).",
+        isExternalPurgeTbl);
     AnalysisError("create table tab (i int primary key default isnull(null, null)) " +
         "partition by hash (i) partitions 3 stored as kudu", "Default value of " +
-        "NULL not allowed on non-nullable column: 'i'");
+        "NULL not allowed on non-nullable column: 'i'", isExternalPurgeTbl);
     AnalysisError("create table tab (i int primary key, x int not null " +
         "default isnull(null, null)) partition by hash (i) partitions 3 " +
         "stored as kudu", "Default value of NULL not allowed on non-nullable column: " +
-        "'x'");
+        "'x'", isExternalPurgeTbl);
     // Invalid block_size values
     AnalysisError("create table tab (i int primary key block_size 1.1) " +
         "partition by hash (i) partitions 3 stored as kudu", "Invalid value " +
-        "for BLOCK_SIZE: 1.1. A positive INTEGER value is expected.");
+        "for BLOCK_SIZE: 1.1. A positive INTEGER value is expected.",
+        isExternalPurgeTbl);
     AnalysisError("create table tab (i int primary key block_size 'val') " +
         "partition by hash (i) partitions 3 stored as kudu", "Invalid value " +
-        "for BLOCK_SIZE: 'val'. A positive INTEGER value is expected.");
+        "for BLOCK_SIZE: 'val'. A positive INTEGER value is expected.",
+        isExternalPurgeTbl);
 
     // Sort columns are not supported for Kudu tables.
     AnalysisError("create table tab (i int, x int primary key) partition by hash(x) " +
         "partitions 8 sort by(i) stored as kudu", "SORT BY is not supported for Kudu " +
-        "tables.");
+        "tables.", isExternalPurgeTbl);
 
     // Z-Sort columns are not supported for Kudu tables.
     BackendConfig.INSTANCE.setZOrderSortUnlocked(true);
 
     AnalysisError("create table tab (i int, x int primary key) partition by hash(x) " +
         "partitions 8 sort by zorder(i) stored as kudu", "SORT BY is not " +
-        "supported for Kudu tables.");
+        "supported for Kudu tables.", isExternalPurgeTbl);
 
     BackendConfig.INSTANCE.setZOrderSortUnlocked(false);
 
     // Range partitions with TIMESTAMP
     AnalyzesOk("create table ts_ranges (ts timestamp primary key) " +
         "partition by range (partition cast('2009-01-01 00:00:00' as timestamp) " +
-        "<= VALUES < '2009-01-02 00:00:00') stored as kudu");
+        "<= VALUES < '2009-01-02 00:00:00') stored as kudu", isExternalPurgeTbl);
     AnalyzesOk("create table ts_ranges (ts timestamp primary key) " +
         "partition by range (partition value = cast('2009-01-01 00:00:00' as timestamp" +
-        ")) stored as kudu");
+        ")) stored as kudu", isExternalPurgeTbl);
     AnalyzesOk("create table ts_ranges (ts timestamp primary key) " +
         "partition by range (partition value = '2009-01-01 00:00:00') " +
-        "stored as kudu");
+        "stored as kudu", isExternalPurgeTbl);
     AnalyzesOk("create table ts_ranges (id int, ts timestamp, primary key(id, ts))" +
         "partition by range (partition value = (9, cast('2009-01-01 00:00:00' as " +
-        "timestamp))) stored as kudu");
+        "timestamp))) stored as kudu", isExternalPurgeTbl);
     AnalyzesOk("create table ts_ranges (id int, ts timestamp, primary key(id, ts))" +
         "partition by range (partition value = (9, '2009-01-01 00:00:00')) " +
-        "stored as kudu");
+        "stored as kudu", isExternalPurgeTbl);
     AnalysisError("create table ts_ranges (ts timestamp primary key, i int)" +
         "partition by range (partition '2009-01-01 00:00:00' <= VALUES < " +
         "'NOT A TIMESTAMP') stored as kudu",
         "Range partition value 'NOT A TIMESTAMP' cannot be cast to target TIMESTAMP " +
-        "partitioning column.");
+        "partitioning column.", isExternalPurgeTbl);
     AnalysisError("create table ts_ranges (ts timestamp primary key, i int)" +
         "partition by range (partition 100 <= VALUES < 200) stored as kudu",
         "Range partition value 100 (type: TINYINT) is not type " +
-        "compatible with partitioning column 'ts' (type: TIMESTAMP).");
+        "compatible with partitioning column 'ts' (type: TIMESTAMP).",
+        isExternalPurgeTbl);
 
     // TIMESTAMP columns with default values
     AnalyzesOk("create table tdefault (id int primary key, ts timestamp default now())" +
-        "partition by hash(id) partitions 3 stored as kudu");
+        "partition by hash(id) partitions 3 stored as kudu", isExternalPurgeTbl);
     AnalyzesOk("create table tdefault (id int primary key, ts timestamp default " +
         "unix_micros_to_utc_timestamp(1230768000000000)) partition by hash(id) " +
-        "partitions 3 stored as kudu");
+        "partitions 3 stored as kudu", isExternalPurgeTbl);
     AnalyzesOk("create table tdefault (id int primary key, " +
         "ts timestamp not null default '2009-01-01 00:00:00') " +
-        "partition by hash(id) partitions 3 stored as kudu");
+        "partition by hash(id) partitions 3 stored as kudu", isExternalPurgeTbl);
     AnalyzesOk("create table tdefault (id int primary key, " +
         "ts timestamp not null default cast('2009-01-01 00:00:00' as timestamp)) " +
-        "partition by hash(id) partitions 3 stored as kudu");
+        "partition by hash(id) partitions 3 stored as kudu", isExternalPurgeTbl);
     AnalysisError("create table tdefault (id int primary key, ts timestamp " +
         "default null) partition by hash(id) partitions 3 stored as kudu",
-        "NULL cannot be cast to a TIMESTAMP literal.");
+        "NULL cannot be cast to a TIMESTAMP literal.", isExternalPurgeTbl);
     AnalysisError("create table tdefault (id int primary key, " +
         "ts timestamp not null default cast('00:00:00' as timestamp)) " +
         "partition by hash(id) partitions 3 stored as kudu",
-        "CAST('00:00:00' AS TIMESTAMP) cannot be cast to a TIMESTAMP literal.");
+        "CAST('00:00:00' AS TIMESTAMP) cannot be cast to a TIMESTAMP literal.",
+        isExternalPurgeTbl);
     AnalysisError("create table tdefault (id int primary key, " +
         "ts timestamp not null default '2009-1 foo') " +
         "partition by hash(id) partitions 3 stored as kudu",
-        "String '2009-1 foo' cannot be cast to a TIMESTAMP literal.");
+        "String '2009-1 foo' cannot be cast to a TIMESTAMP literal.",
+        isExternalPurgeTbl);
 
     // Test column comments.
     AnalyzesOk("create table tab (x int comment 'x', y int comment 'y', " +
-        "primary key (x, y)) stored as kudu");
+        "primary key (x, y)) stored as kudu", isExternalPurgeTbl);
 
     // Managed table is not allowed to set table property 'kudu.table_id'.
     AnalysisError("create table tab (x int primary key) partition by hash(x) " +
         "partitions 8 stored as kudu tblproperties ('kudu.table_id'='123456')",
         String.format("Table property %s should not be specified when " +
-            "creating a Kudu table.", KuduTable.KEY_TABLE_ID));
+            "creating a Kudu table.", KuduTable.KEY_TABLE_ID), isExternalPurgeTbl);
 
     // Kudu master address needs to be valid.
     AnalysisError("create table tab (x int primary key) partition by " +
         "hash (x) partitions 8 stored as kudu tblproperties " +
         "('kudu.master_addresses' = 'foo')",
         "Cannot analyze Kudu table 'tab': Error determining if Kudu's integration " +
-        "with the Hive Metastore is enabled");
+        "with the Hive Metastore is enabled", isExternalPurgeTbl);
+  }
+
+  @Test
+  public void TestCreateManagedKuduTable() {
+    testDDlsOnKuduTable(false);
+  }
+
+  @Test
+  public void TestCreateSynchronizedKuduTable() {
+    testDDlsOnKuduTable(true);
   }
 
   @Test
@@ -502,12 +566,15 @@
         "('kudu.table_name'='t', 'kudu.table_id'='123456')",
         String.format("Table property %s should not be specified when creating " +
             "a Kudu table.", KuduTable.KEY_TABLE_ID));
-    // External table is not allowed to set table property 'external.table.purge'
-    // to true.
+    // External table is not allowed to set table property 'kudu.table_name'
     AnalysisError("create external table t stored as kudu tblproperties " +
         "('external.table.purge'='true', 'kudu.table_name'='t')",
-        "Table property 'external.table.purge' cannot be set " +
-        "to true with an external Kudu table.");
+        "Not allowed to set 'kudu.table_name' manually for synchronized Kudu tables");
+    // trying to create the legacy external table syntax with external.table.purge
+    // property should error out
+    AnalysisError("create external table t stored as kudu tblproperties " +
+            "('external.table.purge'='true')",
+        "A primary key is required for a Kudu table.");
   }
 
   @Test
@@ -613,21 +680,23 @@
         "TBLPROPERTIES ('kudu.table_id' = '1234')",
         "Property 'kudu.table_id' cannot be altered for Kudu tables");
 
-    // Setting 'external.table.purge' is not allowed for Kudu tables.
-    AnalysisError("ALTER TABLE functional_kudu.testtbl SET " +
-        "TBLPROPERTIES ('external.table.purge' = 'true')",
-        "Property 'external.table.purge' cannot be altered for Kudu tables");
+    // Setting 'external.table.purge' is allowed for Kudu tables.
+    AnalyzesOk("ALTER TABLE functional_kudu.testtbl SET " +
+        "TBLPROPERTIES ('external.table.purge' = 'true')");
+
+    AnalyzesOk("ALTER TABLE functional_kudu.testtbl SET " +
+        "TBLPROPERTIES ('external.table.purge' = 'false')");
 
     // Rename the underlying Kudu table is not supported for managed Kudu tables.
     AnalysisError("ALTER TABLE functional_kudu.testtbl SET " +
         "TBLPROPERTIES ('kudu.table_name' = 'Hans')",
-        "Not allowed to set 'kudu.table_name' manually for managed Kudu tables");
+        "Not allowed to set 'kudu.table_name' manually for synchronized Kudu tables");
 
     // TODO IMPALA-6375: Allow setting kudu.table_name for managed Kudu tables
     // if the 'EXTERNAL' property is set to TRUE in the same step.
     AnalysisError("ALTER TABLE functional_kudu.testtbl SET " +
         "TBLPROPERTIES ('EXTERNAL' = 'TRUE','kudu.table_name' = 'Hans')",
-        "Not allowed to set 'kudu.table_name' manually for managed Kudu tables");
+        "Not allowed to set 'kudu.table_name' manually for synchronized Kudu tables");
 
     // ALTER TABLE RENAME TO
     AnalyzesOk("ALTER TABLE functional_kudu.testtbl RENAME TO new_testtbl");
diff --git a/fe/src/test/java/org/apache/impala/catalog/CatalogTest.java b/fe/src/test/java/org/apache/impala/catalog/CatalogTest.java
index a737f0b..e486111 100644
--- a/fe/src/test/java/org/apache/impala/catalog/CatalogTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/CatalogTest.java
@@ -686,17 +686,44 @@
     assertEquals(System.getProperty("user.name"), table.getMetaStoreTable().getOwner());
     assertEquals(TableType.EXTERNAL_TABLE.toString(),
         table.getMetaStoreTable().getTableType());
+  }
+
+  /**
+   * In Hive-3 the HMS translation layer converts non-transactional managed
+   * table definitions to external tables. This test makes sure that such tables
+   * are seen as EXTERNAL tables when loaded in catalog
+   * @throws CatalogException
+   */
+  @Test
+  public void testCreateTableMetadataHive3() throws CatalogException {
+    Assume.assumeTrue(TestUtils.getHiveMajorVersion() > 2);
     // alltypesinsert is created using CREATE TABLE LIKE and is a MANAGED table
-    table = catalog_.getOrLoadTable("functional", "alltypesinsert", "test");
+    Table table = catalog_.getOrLoadTable("functional", "alltypesinsert", "test");
     assertEquals(System.getProperty("user.name"), table.getMetaStoreTable().getOwner());
-    if (TestUtils.getHiveMajorVersion() == 2) {
-      assertEquals(TableType.MANAGED_TABLE.toString(),
-          table.getMetaStoreTable().getTableType());
-    } else {
-      // in Hive-3 due to HMS translation, this table becomes an external table
-      assertEquals(TableType.EXTERNAL_TABLE.toString(),
-          table.getMetaStoreTable().getTableType());
-    }
+    assertEquals(TableType.EXTERNAL_TABLE.toString(),
+        table.getMetaStoreTable().getTableType());
+    // ACID tables should be loaded as MANAGED tables
+    table = catalog_.getOrLoadTable("functional", "insert_only_transactional_table",
+        "test");
+    assertEquals(System.getProperty("user.name"), table.getMetaStoreTable().getOwner());
+    assertEquals(TableType.MANAGED_TABLE.toString(),
+        table.getMetaStoreTable().getTableType());
+  }
+
+  /**
+   * In Hive-2 there is no HMS translation which converts non-transactional managed
+   * table definitions to external tables. This test makes sure that the such tables
+   * are seen as MANAGED tables in catalog
+   * @throws CatalogException
+   */
+  @Test
+  public void testCreateTableMetadataHive2() throws CatalogException {
+    Assume.assumeTrue(TestUtils.getHiveMajorVersion() <= 2);
+    // alltypesinsert is created using CREATE TABLE LIKE and is a MANAGED table
+    Table table = catalog_.getOrLoadTable("functional", "alltypesinsert", "test");
+    assertEquals(System.getProperty("user.name"), table.getMetaStoreTable().getOwner());
+    assertEquals(TableType.MANAGED_TABLE.toString(),
+        table.getMetaStoreTable().getTableType());
   }
 
   @Test
diff --git a/fe/src/test/java/org/apache/impala/catalog/local/LocalCatalogTest.java b/fe/src/test/java/org/apache/impala/catalog/local/LocalCatalogTest.java
index 34ffe74..2926fa0 100644
--- a/fe/src/test/java/org/apache/impala/catalog/local/LocalCatalogTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/local/LocalCatalogTest.java
@@ -42,6 +42,7 @@
 import org.apache.impala.service.FeSupport;
 import org.apache.impala.service.Frontend;
 import org.apache.impala.service.MetadataOp;
+import org.apache.impala.testutil.TestUtils;
 import org.apache.impala.thrift.TCatalogObjectType;
 import org.apache.impala.thrift.TMetadataOpRequest;
 import org.apache.impala.thrift.TMetadataOpcode;
@@ -248,18 +249,17 @@
     assertEquals("SELECT * FROM functional.alltypes", v.getQueryStmt().toSql());
   }
 
-  @Ignore("Ignored until IMPALA-9092 is fixed")
   @Test
   public void testKuduTable() throws Exception {
     LocalKuduTable t = (LocalKuduTable) catalog_.getTable("functional_kudu",  "alltypes");
     assertEquals("id,bool_col,tinyint_col,smallint_col,int_col," +
         "bigint_col,float_col,double_col,date_string_col,string_col," +
         "timestamp_col,year,month", Joiner.on(",").join(t.getColumnNames()));
-    // Assert on the generated SQL for the table, but not the table properties, since
-    // those might change based on whether this test runs before or after other
-    // tests which compute stats, etc.
-    Assert.assertThat(ToSqlUtils.getCreateTableSql(t), CoreMatchers.startsWith(
-        "CREATE TABLE functional_kudu.alltypes (\n" +
+    boolean areDefaultSynchronizedTablesExternal = TestUtils.getHiveMajorVersion() > 2;
+    String expectedOutputPrefix = areDefaultSynchronizedTablesExternal ? "CREATE "
+        + "EXTERNAL TABLE" : "CREATE TABLE";
+    String expectedOutput =
+        expectedOutputPrefix + " functional_kudu.alltypes (\n" +
         "  id INT NOT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,\n" +
         "  bool_col BOOLEAN NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,\n" +
         "  tinyint_col TINYINT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,\n" +
@@ -277,7 +277,28 @@
         ")\n" +
         "PARTITION BY HASH (id) PARTITIONS 3\n" +
         "STORED AS KUDU\n" +
-        "TBLPROPERTIES"));
+        "TBLPROPERTIES";
+
+    if (areDefaultSynchronizedTablesExternal) {
+      // Assert on the generated SQL for the table, but not the table properties, since
+      // those might change based on whether this test runs before or after other
+      // tests which compute stats, etc.
+      String output = ToSqlUtils.getCreateTableSql(t);
+      Assert.assertThat(output, CoreMatchers.startsWith(expectedOutput));
+      // the tblproperties have keys which are not in a deterministic order
+      // we will confirm if the 'external.table.purge'='TRUE' is available in the
+      // tblproperties substring separately
+      Assert.assertTrue("Synchronized Kudu tables in Hive-3 must contain external.table"
+          + ".purge table property", output.contains("'external.table.purge'='TRUE'"));
+      Assert.assertFalse("Found internal property TRANSLATED_TO_EXTERNAL in table "
+          + "properties", output.contains("TRANSLATED_TO_EXTERNAL"));
+    } else {
+    // Assert on the generated SQL for the table, but not the table properties, since
+    // those might change based on whether this test runs before or after other
+    // tests which compute stats, etc.
+      Assert.assertThat(ToSqlUtils.getCreateTableSql(t),
+          CoreMatchers.startsWith(expectedOutput));
+    }
   }
 
   @Test
diff --git a/fe/src/test/java/org/apache/impala/common/FrontendFixture.java b/fe/src/test/java/org/apache/impala/common/FrontendFixture.java
index 606ea39..de1e9d8 100644
--- a/fe/src/test/java/org/apache/impala/common/FrontendFixture.java
+++ b/fe/src/test/java/org/apache/impala/common/FrontendFixture.java
@@ -208,7 +208,7 @@
         fail("Failed to add test table:\n" + createTableSql);
       }
     } else if (dummyTable instanceof KuduTable) {
-      if (!KuduTable.isExternalTable(msTbl)) {
+      if (!Table.isExternalTable(msTbl)) {
         fail("Failed to add table, external kudu table expected:\n" + createTableSql);
       }
       try {
diff --git a/testdata/workloads/functional-query/queries/QueryTest/show-create-table.test b/testdata/workloads/functional-query/queries/QueryTest/show-create-table.test
index a0efa71..fae008c 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/show-create-table.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/show-create-table.test
@@ -5,12 +5,19 @@
   id INT
 )
 STORED AS TEXTFILE
----- RESULTS
+---- RESULTS-HIVE
 CREATE TABLE show_create_table_test_db.test1 (
   id INT
 )
 STORED AS TEXTFILE
 LOCATION '$$location_uri$$'
+---- RESULTS-HIVE-3
+CREATE EXTERNAL TABLE show_create_table_test_db.test1 (
+  id INT
+)
+STORED AS TEXTFILE
+LOCATION '$$location_uri$$'
+TBLPROPERTIES ('external.table.purge'='TRUE')
 ====
 ---- CREATE_TABLE
 # simple table with all types
@@ -30,7 +37,7 @@
   timestamp_col TIMESTAMP
 )
 STORED AS TEXTFILE
----- RESULTS
+---- RESULTS-HIVE
 CREATE TABLE show_create_table_test_db.test2 (
   year INT,
   month INT,
@@ -48,6 +55,25 @@
 )
 STORED AS TEXTFILE
 LOCATION '$$location_uri$$'
+---- RESULTS-HIVE-3
+CREATE EXTERNAL TABLE show_create_table_test_db.test2 (
+  year INT,
+  month INT,
+  id INT COMMENT 'Add a comment',
+  bool_col BOOLEAN,
+  tinyint_col TINYINT,
+  smallint_col SMALLINT,
+  int_col INT,
+  bigint_col BIGINT,
+  float_col FLOAT,
+  double_col DOUBLE,
+  date_string_col STRING,
+  string_col STRING,
+  timestamp_col TIMESTAMP
+)
+STORED AS TEXTFILE
+LOCATION '$$location_uri$$'
+TBLPROPERTIES ('external.table.purge'='TRUE')
 ====
 ---- CREATE_TABLE
 # all types and partitioned
@@ -73,7 +99,7 @@
 )
 COMMENT 'This is a test'
 STORED AS TEXTFILE
----- RESULTS
+---- RESULTS-HIVE
 CREATE TABLE show_create_table_test_db.test3 (
   year INT,
   month INT,
@@ -97,6 +123,31 @@
 COMMENT 'This is a test'
 STORED AS TEXTFILE
 LOCATION '$$location_uri$$'
+---- RESULTS-HIVE-3
+CREATE EXTERNAL TABLE show_create_table_test_db.test3 (
+  year INT,
+  month INT,
+  id INT COMMENT 'Add a comment',
+  bool_col BOOLEAN,
+  tinyint_col TINYINT,
+  smallint_col SMALLINT,
+  int_col INT,
+  bigint_col BIGINT,
+  float_col FLOAT,
+  double_col DOUBLE,
+  date_string_col STRING,
+  string_col STRING,
+  timestamp_col TIMESTAMP
+)
+PARTITIONED BY (
+  x INT,
+  y INT,
+  a BOOLEAN
+)
+COMMENT 'This is a test'
+STORED AS TEXTFILE
+LOCATION '$$location_uri$$'
+TBLPROPERTIES ('external.table.purge'='TRUE')
 ====
 ---- CREATE_TABLE
 # With a table comment
@@ -107,7 +158,7 @@
 )
 COMMENT 'This is a test'
 STORED AS TEXTFILE
----- RESULTS
+---- RESULTS-HIVE
 CREATE TABLE show_create_table_test_db.test4 (
   year INT,
   month INT,
@@ -116,6 +167,16 @@
 COMMENT 'This is a test'
 STORED AS TEXTFILE
 LOCATION '$$location_uri$$'
+---- RESULTS-HIVE-3
+CREATE EXTERNAL TABLE show_create_table_test_db.test4 (
+  year INT,
+  month INT,
+  id INT COMMENT 'Add a comment'
+)
+COMMENT 'This is a test'
+STORED AS TEXTFILE
+LOCATION '$$location_uri$$'
+TBLPROPERTIES ('external.table.purge'='TRUE')
 ====
 ---- CREATE_TABLE
 # With the row format specified
@@ -126,7 +187,7 @@
 )
 ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' ESCAPED BY '\\' LINES TERMINATED BY '\n'
 STORED AS TEXTFILE
----- RESULTS
+---- RESULTS-HIVE
 CREATE TABLE show_create_table_test_db.test5 (
   year INT,
   month INT,
@@ -137,6 +198,18 @@
                       'serialization.format'=',', 'escape.delim'='\\')
 STORED AS TEXTFILE
 LOCATION '$$location_uri$$'
+---- RESULTS-HIVE-3
+CREATE EXTERNAL TABLE show_create_table_test_db.test5 (
+  year INT,
+  month INT,
+  id INT COMMENT 'Add a comment'
+)
+ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' ESCAPED BY '\\' LINES TERMINATED BY '\n'
+WITH SERDEPROPERTIES ('line.delim'='\n', 'field.delim'=',',
+                      'serialization.format'=',', 'escape.delim'='\\')
+STORED AS TEXTFILE
+LOCATION '$$location_uri$$'
+TBLPROPERTIES ('external.table.purge'='TRUE')
 ====
 ---- CREATE_TABLE
 # testing with parquet specified
@@ -146,7 +219,7 @@
   id INT COMMENT 'Add a comment'
 )
 STORED AS PARQUET
----- RESULTS
+---- RESULTS-HIVE
 CREATE TABLE show_create_table_test_db.test6 (
   year INT,
   month INT,
@@ -154,6 +227,15 @@
 )
 STORED AS PARQUET
 LOCATION '$$location_uri$$'
+---- RESULTS-HIVE-3
+CREATE EXTERNAL TABLE show_create_table_test_db.test6 (
+  year INT,
+  month INT,
+  id INT COMMENT 'Add a comment'
+)
+STORED AS PARQUET
+LOCATION '$$location_uri$$'
+TBLPROPERTIES ('external.table.purge'='TRUE')
 ====
 ---- CREATE_TABLE
 # with extra table properties and sequencefile
@@ -164,7 +246,7 @@
 )
 STORED AS SEQUENCEFILE
 TBLPROPERTIES ('key3'='val3', 'key2'='val2', 'key1'='val1')
----- RESULTS
+---- RESULTS-HIVE
 CREATE TABLE show_create_table_test_db.test7 (
   year INT,
   month INT,
@@ -173,6 +255,15 @@
 STORED AS SEQUENCEFILE
 LOCATION '$$location_uri$$'
 TBLPROPERTIES ('key3'='val3', 'key2'='val2', 'key1'='val1')
+---- RESULTS-HIVE-3
+CREATE EXTERNAL TABLE show_create_table_test_db.test7 (
+  year INT,
+  month INT,
+  id INT COMMENT 'Add a comment'
+)
+STORED AS SEQUENCEFILE
+LOCATION '$$location_uri$$'
+TBLPROPERTIES ('key3'='val3', 'key2'='val2', 'key1'='val1', 'external.table.purge'='TRUE')
 ====
 ---- CREATE_TABLE
 # testing with rcfile specified
@@ -182,7 +273,7 @@
   id INT COMMENT 'Add a comment'
 )
 STORED AS RCFILE
----- RESULTS
+---- RESULTS-HIVE
 CREATE TABLE show_create_table_test_db.test8 (
   year INT,
   month INT,
@@ -190,6 +281,15 @@
 )
 STORED AS RCFILE
 LOCATION '$$location_uri$$'
+---- RESULTS-HIVE-3
+CREATE EXTERNAL TABLE show_create_table_test_db.test8 (
+  year INT,
+  month INT,
+  id INT COMMENT 'Add a comment'
+)
+STORED AS RCFILE
+LOCATION '$$location_uri$$'
+TBLPROPERTIES ('external.table.purge'='TRUE')
 ====
 ---- CREATE_TABLE
 # Test create table as select
@@ -209,7 +309,7 @@
   month INT
 )
 STORED AS TEXTFILE
----- RESULTS
+---- RESULTS-HIVE
 CREATE TABLE show_create_table_test_db.test_as_select (
   id INT,
   bool_col BOOLEAN,
@@ -227,11 +327,30 @@
 )
 STORED AS TEXTFILE
 LOCATION '$$location_uri$$'
+---- RESULTS-HIVE-3
+CREATE EXTERNAL TABLE show_create_table_test_db.test_as_select (
+  id INT,
+  bool_col BOOLEAN,
+  tinyint_col TINYINT,
+  smallint_col SMALLINT,
+  int_col INT,
+  bigint_col BIGINT,
+  float_col FLOAT,
+  double_col DOUBLE,
+  date_string_col STRING,
+  string_col STRING,
+  timestamp_col TIMESTAMP,
+  year INT,
+  month INT
+)
+STORED AS TEXTFILE
+LOCATION '$$location_uri$$'
+TBLPROPERTIES ('external.table.purge'='TRUE')
 ====
 ---- CREATE_TABLE
 create table i_1687_p partitioned by (int_col) as
   select bigint_col, int_col from functional.alltypessmall;
----- RESULTS
+---- RESULTS-HIVE
 CREATE TABLE show_create_table_test_db.i_1687_p (
   bigint_col BIGINT
 )
@@ -240,10 +359,20 @@
 )
 STORED AS TEXTFILE
 LOCATION '$$location_uri$$'
+---- RESULTS-HIVE-3
+CREATE EXTERNAL TABLE show_create_table_test_db.i_1687_p (
+  bigint_col BIGINT
+)
+PARTITIONED BY (
+  int_col INT
+)
+STORED AS TEXTFILE
+LOCATION '$$location_uri$$'
+TBLPROPERTIES ('external.table.purge'='TRUE')
 ====
 ---- QUERY
 SHOW CREATE TABLE functional_text_lzo.tinytable
----- RESULTS
+---- RESULTS-HIVE
 CREATE EXTERNAL TABLE functional_text_lzo.tinytable (
   a STRING,
   b STRING
@@ -255,7 +384,7 @@
 ====
 ---- QUERY
 SHOW CREATE TABLE functional.allcomplextypes
----- RESULTS
+---- RESULTS-HIVE
 CREATE EXTERNAL TABLE functional.allcomplextypes (
   id INT,
   int_array_col ARRAY<INT>,
@@ -283,14 +412,14 @@
 ====
 ---- QUERY
 SHOW CREATE VIEW functional.alltypes_view
----- RESULTS
+---- RESULTS-HIVE
 CREATE VIEW functional.alltypes_view AS
 SELECT * FROM functional.alltypes
 ====
 ---- QUERY
 # SHOW CREATE TABLE should also work for views.
 SHOW CREATE TABLE functional.alltypes_view
----- RESULTS
+---- RESULTS-HIVE
 CREATE VIEW functional.alltypes_view AS
 SELECT * FROM functional.alltypes
 ====
@@ -299,7 +428,7 @@
 CREATE VIEW column_aliases_view (foo, bar, baz) AS
 SELECT tinyint_col, id, bigint_col
 FROM functional.alltypes;
----- RESULTS
+---- RESULTS-HIVE
 # A view with column aliases is expanded into a SELECT with an inline view.
 CREATE VIEW show_create_table_test_db.column_aliases_view AS
 SELECT column_aliases_view.tinyint_col foo, column_aliases_view.id bar, column_aliases_view.bigint_col baz FROM (SELECT tinyint_col, id, bigint_col FROM functional.alltypes) column_aliases_view
@@ -307,14 +436,14 @@
 ---- QUERY
 # Test views referencing views.
 SHOW CREATE VIEW functional.view_view;
----- RESULTS
+---- RESULTS-HIVE
 CREATE VIEW functional.view_view AS
 SELECT * FROM functional.alltypes_view
 ====
 ---- QUERY
 # Test complex views with multiple tables and Hive-style column aliases.
 SHOW CREATE VIEW functional.complex_view
----- RESULTS
+---- RESULTS-HIVE
 CREATE VIEW functional.complex_view AS
 SELECT complex_view.`_c0` abc, complex_view.string_col xyz FROM (SELECT count(a.bigint_col), b.string_col FROM functional.alltypesagg a INNER JOIN functional.alltypestiny b ON a.id = b.id WHERE a.bigint_col < 50 GROUP BY b.string_col HAVING count(a.bigint_col) > 1 ORDER BY b.string_col ASC LIMIT 100) complex_view
 ====
@@ -323,14 +452,14 @@
 CREATE VIEW _quote_view (_foo, bar) AS
 SELECT tinyint_col, id _id
 FROM functional.alltypes;
----- RESULTS
+---- RESULTS-HIVE
 CREATE VIEW show_create_table_test_db.`_quote_view` AS
 SELECT `_quote_view`.tinyint_col `_foo`, `_quote_view`.`_id` bar FROM (SELECT tinyint_col, id `_id` FROM functional.alltypes) `_quote_view`
 ====
 ---- QUERY
 # SHOW CREATE VIEW should also work on tables.
 SHOW CREATE VIEW functional_parquet.tinytable;
----- RESULTS
+---- RESULTS-HIVE
 CREATE EXTERNAL TABLE functional_parquet.tinytable (
   a STRING,
   b STRING
@@ -343,7 +472,7 @@
 # Create view that contains a subquery (IMPALA-4579)
 CREATE VIEW view_with_subquery AS SELECT * FROM functional.alltypestiny
   WHERE id IN (SELECT id FROM functional.alltypesagg);
----- RESULTS
+---- RESULTS-HIVE
 CREATE VIEW show_create_table_test_db.view_with_subquery
 AS SELECT * FROM functional.alltypestiny
 WHERE id IN (SELECT id FROM functional.alltypesagg)
@@ -353,7 +482,13 @@
 CREATE TABLE test1 (id INT)
 SORT BY (id)
 STORED AS TEXTFILE
----- RESULTS
+---- RESULTS-HIVE-3
+CREATE EXTERNAL TABLE show_create_table_test_db.test1 (id INT)
+SORT BY LEXICAL (id)
+STORED AS TEXTFILE
+LOCATION '$$location_uri$$'
+TBLPROPERTIES ('external.table.purge'='TRUE')
+---- RESULTS-HIVE
 CREATE TABLE show_create_table_test_db.test1 (id INT)
 SORT BY LEXICAL (id)
 STORED AS TEXTFILE
@@ -365,7 +500,14 @@
 PARTITIONED BY (x INT, y INT)
 SORT BY (id)
 STORED AS TEXTFILE
----- RESULTS
+---- RESULTS-HIVE-3
+CREATE EXTERNAL TABLE show_create_table_test_db.test1 (id INT)
+PARTITIONED BY (x INT, y INT)
+SORT BY LEXICAL (id)
+STORED AS TEXTFILE
+LOCATION '$$location_uri$$'
+TBLPROPERTIES ('external.table.purge'='TRUE')
+---- RESULTS-HIVE
 CREATE TABLE show_create_table_test_db.test1 (id INT)
 PARTITIONED BY (x INT, y INT)
 SORT BY LEXICAL (id)
@@ -374,7 +516,7 @@
 ====
 ---- QUERY
 SHOW CREATE TABLE functional_hbase.alltypes
----- RESULTS
+---- RESULTS-HIVE
 CREATE EXTERNAL TABLE functional_hbase.alltypes (
   id INT COMMENT 'Add a comment',
   bigint_col BIGINT,
diff --git a/tests/common/skip.py b/tests/common/skip.py
index 08a8358..2ab4250 100644
--- a/tests/common/skip.py
+++ b/tests/common/skip.py
@@ -217,9 +217,6 @@
       reason="Kudu is not tested with Hive 3 notifications yet, see IMPALA-8751.")
   col_stat_separated_by_engine = pytest.mark.skipif(HIVE_MAJOR_VERSION >= 3,
       reason="Hive 3 separates column statistics by engine")
-  kudu_with_hms_translation = pytest.mark.skipif(HIVE_MAJOR_VERSION >= 3,
-      reason="Show create table output is different for HMS translated Kudu tables. "
-             "See IMPALA-9092 for details")
 
 
 class SkipIfHive2:
@@ -227,6 +224,9 @@
       reason="Acid tables are only supported with Hive 3.")
   col_stat_not_separated_by_engine = pytest.mark.skipif(HIVE_MAJOR_VERSION == 2,
       reason="Hive 2 doesnt support separating column statistics by engine")
+  create_external_kudu_table = pytest.mark.skipif(HIVE_MAJOR_VERSION == 2,
+      reason="Hive 2 does not support creating external.table.purge Kudu tables."
+             " See IMPALA-9092 for details.")
 
 
 class SkipIfCatalogV2:
diff --git a/tests/metadata/test_show_create_table.py b/tests/metadata/test_show_create_table.py
index e813d1b..dbcc852 100644
--- a/tests/metadata/test_show_create_table.py
+++ b/tests/metadata/test_show_create_table.py
@@ -23,6 +23,7 @@
 from tests.common.skip import SkipIf, SkipIfHive3
 from tests.common.test_dimensions import create_uncompressed_text_dimension
 from tests.util.test_file_parser import QueryTestSectionReader, remove_comments
+from tests.common.environ import HIVE_MAJOR_VERSION
 
 
 # The purpose of the show create table tests are to ensure that the "SHOW CREATE TABLE"
@@ -30,13 +31,15 @@
 # definition. The table is created, then the output of "SHOW CREATE TABLE" is used to
 # test if the table can be recreated. This test class does not support --update-results.
 class TestShowCreateTable(ImpalaTestSuite):
-  VALID_SECTION_NAMES = ["CREATE_TABLE", "CREATE_VIEW", "QUERY", "RESULTS"]
+  VALID_SECTION_NAMES = ["CREATE_TABLE", "CREATE_VIEW", "QUERY", "RESULTS-HIVE",
+                         "RESULTS-HIVE-3"]
   # Properties to filter before comparing results
   FILTER_TBL_PROPERTIES = ["transient_lastDdlTime", "numFiles", "numPartitions",
                            "numRows", "rawDataSize", "totalSize", "COLUMN_STATS_ACCURATE",
                            "STATS_GENERATED_VIA_STATS_TASK", "last_modified_by",
                            "last_modified_time", "numFilesErasureCoded",
-                           "bucketing_version", "OBJCAPABILITIES"]
+                           "bucketing_version", "OBJCAPABILITIES",
+                           "TRANSLATED_TO_EXTERNAL"]
 
   @classmethod
   def get_workload(self):
@@ -54,7 +57,6 @@
         lambda v: v.get_value('table_format').file_format == 'text' and
         v.get_value('table_format').compression_codec == 'none')
 
-  @SkipIfHive3.kudu_with_hms_translation
   def test_show_create_table(self, vector, unique_database):
     self.__run_show_create_table_test_case('QueryTest/show-create-table', vector,
                                            unique_database)
@@ -209,7 +211,13 @@
       assert 0, 'Error in test file %s. Test cases require a '\
           'CREATE_TABLE section.\n%s' %\
           (test_file_name, pprint.pformat(test_section))
-    expected_result = remove_comments(test_section['RESULTS'])
+    results_key = 'RESULTS-HIVE'
+    if HIVE_MAJOR_VERSION > 2:
+      if 'RESULTS-HIVE-3' in test_section:
+        # If the hive version is greater than 2 use the RESULTS-HIVE-3 available
+        results_key = 'RESULTS-HIVE-3'
+
+    expected_result = remove_comments(test_section[results_key])
     self.expected_result = expected_result.replace(
         ShowCreateTableTestCase.RESULTS_DB_NAME_TOKEN, test_db_name)
 
@@ -264,7 +272,6 @@
                              'l_comment')}]
 
   @SkipIf.kudu_not_supported
-  @SkipIfHive3.kudu_with_hms_translation
   @pytest.mark.parametrize('table_primary_keys_map', TABLE_PRIMARY_KEYS_MAPS)
   def test_primary_key_parse(self, impala_testinfra_cursor, table_primary_keys_map):
     """
@@ -276,7 +283,6 @@
         table_primary_keys_map['table']) == table_primary_keys_map['primary_keys']
 
   @SkipIf.kudu_not_supported
-  @SkipIfHive3.kudu_with_hms_translation
   @pytest.mark.parametrize('table_primary_keys_map', TABLE_PRIMARY_KEYS_MAPS)
   def test_load_table_with_primary_key_attr(self, impala_testinfra_cursor,
                                             table_primary_keys_map):
diff --git a/tests/query_test/test_kudu.py b/tests/query_test/test_kudu.py
index aa94ea0..1de184a 100644
--- a/tests/query_test/test_kudu.py
+++ b/tests/query_test/test_kudu.py
@@ -37,10 +37,10 @@
 from datetime import datetime
 from pytz import utc
 
-from tests.common.environ import ImpalaTestClusterProperties
+from tests.common.environ import ImpalaTestClusterProperties, HIVE_MAJOR_VERSION
 from tests.common.kudu_test_suite import KuduTestSuite
 from tests.common.impala_cluster import ImpalaCluster
-from tests.common.skip import SkipIfNotHdfsMinicluster, SkipIfKudu, SkipIfHive3
+from tests.common.skip import SkipIfNotHdfsMinicluster, SkipIfKudu, SkipIfHive2
 from tests.common.test_dimensions import add_exec_option_dimension
 from tests.verifiers.metric_verifier import MetricVerifier
 
@@ -809,23 +809,42 @@
       if kudu_client.table_exists(table_name):
         kudu_client.delete_table(table_name)
 
+
 class TestShowCreateTable(KuduTestSuite):
   column_properties = "ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION"
 
-  def assert_show_create_equals(self, cursor, create_sql, show_create_sql):
+  def assert_show_create_equals(self, cursor, create_sql, show_create_sql,
+                                do_exact_match=False):
     """Executes 'create_sql' to create a table, then runs "SHOW CREATE TABLE" and checks
        that the output is the same as 'show_create_sql'. 'create_sql' and
        'show_create_sql' can be templates that can be used with str.format(). format()
-       will be called with 'table' and 'db' as keyword args.
+       will be called with 'table' and 'db' as keyword args. Also, compares HMS-3 specific
+       output due to HMS translation. If do_exact_match is True does not manipulate the
+       output and compares exactly with the show_create_sql parameter.
     """
     format_args = {"table": self.random_table_name(), "db": cursor.conn.db_name}
     cursor.execute(create_sql.format(**format_args))
     cursor.execute("SHOW CREATE TABLE {table}".format(**format_args))
-    assert cursor.fetchall()[0][0] == \
+    output = cursor.fetchall()[0][0]
+    if not do_exact_match and HIVE_MAJOR_VERSION > 2:
+      # in case of HMS-3 all Kudu tables are translated to external tables with some
+      # additional properties. This code below makes sure that we have the expected table
+      # properties and the table is external
+      # TODO we should move these tests to a query.test file so that we can have better
+      # way to compare the output against different hive versions
+      assert output.startswith("CREATE EXTERNAL TABLE")
+      assert "TBLPROPERTIES ('external.table.purge'='TRUE', " in output
+      # We have made sure that the output starts with CREATE EXTERNAL TABLE, now we can
+      # change it to "CREATE TABLE" to make it easier to compare rest of the str
+      output = output.replace("CREATE EXTERNAL TABLE", "CREATE TABLE")
+      # We should also remove the additional tbl property external.table.purge so that we
+      # can compare the rest of output
+      output = output.replace("TBLPROPERTIES ('external.table.purge'='TRUE', ",
+                              "TBLPROPERTIES (")
+    assert output == \
         textwrap.dedent(show_create_sql.format(**format_args)).strip()
 
   @SkipIfKudu.hms_integration_enabled
-  @SkipIfHive3.kudu_with_hms_translation
   def test_primary_key_and_distribution(self, cursor):
     # TODO: Add case with BLOCK_SIZE
     self.assert_show_create_equals(cursor,
@@ -927,7 +946,6 @@
             kudu_addr=KUDU_MASTER_HOSTS))
 
   @SkipIfKudu.hms_integration_enabled
-  @SkipIfHive3.kudu_with_hms_translation
   def test_timestamp_default_value(self, cursor):
     create_sql_fmt = """
         CREATE TABLE {table} (c INT, d TIMESTAMP,
@@ -987,13 +1005,12 @@
           STORED AS KUDU
           TBLPROPERTIES ('kudu.master_addresses'='{kudu_addr}', {kudu_table})""".format(
               db=cursor.conn.db_name, kudu_addr=KUDU_MASTER_HOSTS,
-              kudu_table=table_name_prop))
+              kudu_table=table_name_prop), True)
     finally:
       if kudu_client.table_exists(kudu_table_name):
         kudu_client.delete_table(kudu_table_name)
 
   @SkipIfKudu.hms_integration_enabled
-  @SkipIfHive3.kudu_with_hms_translation
   def test_managed_kudu_table_name_with_show_create(self, cursor):
     """Check that the generated kudu.table_name tblproperty is not present with
        show create table with managed Kudu tables.
@@ -1013,6 +1030,49 @@
         TBLPROPERTIES ('kudu.master_addresses'='{kudu_addr}')""".format(
             db=cursor.conn.db_name, kudu_addr=KUDU_MASTER_HOSTS))
 
+  def test_synchronized_kudu_table_with_show_create(self, cursor):
+    # in this case we do exact match with the provided input since this is specifically
+    # creating a synchronized table
+    self.assert_show_create_equals(cursor,
+        """
+        CREATE EXTERNAL TABLE {table} (
+          id BIGINT,
+          name STRING,
+          PRIMARY KEY(id))
+        PARTITION BY HASH PARTITIONS 16
+        STORED AS KUDU
+        TBLPROPERTIES('external.table.purge'='true')""",
+        """
+        CREATE EXTERNAL TABLE {db}.{{table}} (
+          id BIGINT NOT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
+          name STRING NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
+          PRIMARY KEY (id)
+        )
+        PARTITION BY HASH (id) PARTITIONS 16
+        STORED AS KUDU
+        TBLPROPERTIES ('external.table.purge'='true', 'kudu.master_addresses'='{kudu_addr}')"""
+          .format(db=cursor.conn.db_name, kudu_addr=KUDU_MASTER_HOSTS), True)
+
+    self.assert_show_create_equals(cursor,
+        """
+        CREATE EXTERNAL TABLE {table} (
+          id BIGINT PRIMARY KEY,
+          name STRING)
+        PARTITION BY HASH(id) PARTITIONS 16
+        STORED AS KUDU
+        TBLPROPERTIES('external.table.purge'='true')""",
+        """
+        CREATE EXTERNAL TABLE {db}.{{table}} (
+          id BIGINT NOT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
+          name STRING NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
+          PRIMARY KEY (id)
+        )
+        PARTITION BY HASH (id) PARTITIONS 16
+        STORED AS KUDU
+        TBLPROPERTIES ('external.table.purge'='true', 'kudu.master_addresses'='{kudu_addr}')"""
+          .format(db=cursor.conn.db_name, kudu_addr=KUDU_MASTER_HOSTS), True)
+
+
 class TestDropDb(KuduTestSuite):
 
   @SkipIfKudu.hms_integration_enabled
@@ -1185,3 +1245,136 @@
                  for i in ImpalaCluster.get_e2e_test_cluster().impalads]
     for v in verifiers:
       v.wait_for_metric("impala-server.num-fragments-in-flight", 0, timeout=30)
+
+
+@SkipIfHive2.create_external_kudu_table
+class TestCreateSynchronizedTable(KuduTestSuite):
+
+  def test_create_synchronized_table(self, cursor, kudu_client, unique_database):
+    """
+    Creates a synchronized Kudu table and makes sure that the statement does not fail.
+    """
+    table_name = self.random_table_name()
+    # create a external kudu table with external.table.purge=true
+    cursor.execute("""
+      CREATE EXTERNAL TABLE %s.%s (
+        id int PRIMARY KEY,
+        name string)
+      PARTITION BY HASH PARTITIONS 8
+      STORED AS KUDU
+      TBLPROPERTIES ('external.table.purge'='true')
+    """ % (unique_database, table_name))
+    # make sure that the table was created
+    cursor.execute("SHOW TABLES IN %s" % unique_database)
+    assert (table_name,) in cursor.fetchall()
+    # make sure that the kudu table was created with default name
+    assert kudu_client.table_exists(self.to_kudu_table_name(unique_database, table_name))
+    # make sure that the external.table.purge property can be changed
+    cursor.execute("ALTER TABLE %s.%s set TBLPROPERTIES ("
+                   "'external.table.purge'='FALSE')" % (unique_database, table_name))
+    cursor.execute("SHOW TABLES IN %s" % unique_database)
+    assert (table_name,) in cursor.fetchall()
+    cursor.execute("ALTER TABLE %s.%s set TBLPROPERTIES ("
+                   "'external.table.purge'='TRUE')" % (unique_database, table_name))
+    cursor.execute("SHOW TABLES IN %s" % unique_database)
+    assert (table_name,) in cursor.fetchall()
+    # make sure that table can be renamed
+    new_table_name = self.random_table_name()
+    cursor.execute("ALTER TABLE %s.%s rename to %s.%s" %
+                   (unique_database, table_name, unique_database, new_table_name))
+    cursor.execute("SHOW TABLES IN %s" % unique_database)
+    assert (new_table_name,) in cursor.fetchall()
+    # make sure that the kudu table was created with default name
+    assert kudu_client.table_exists(
+      self.to_kudu_table_name(unique_database, new_table_name))
+    # now make sure that table disappears after we remove it
+    cursor.execute("DROP TABLE %s.%s" % (unique_database, new_table_name))
+    cursor.execute("SHOW TABLES IN %s" % unique_database)
+    assert (new_table_name,) not in cursor.fetchall()
+    assert not kudu_client.table_exists(
+      self.to_kudu_table_name(unique_database, new_table_name))
+
+  def test_invalid_sync_table_stmts(self, cursor, kudu_client, unique_database):
+    """
+    Test makes sure that a invalid way to create a synchronized table is erroring out
+    """
+    table_name = self.random_table_name()
+    try:
+      cursor.execute("""
+        CREATE EXTERNAL TABLE %s.%s (
+          a int PRIMARY KEY)
+        PARTITION BY HASH PARTITIONS 8
+        STORED AS KUDU
+        TBLPROPERTIES ('external.table.purge'='false')
+      """ % (unique_database, table_name))
+      assert False,\
+        "Create table statement with external.table.purge=False should error out"
+    except Exception as e:
+      # We throw this exception since the analyzer checks for properties one by one.
+      # This is the first property that it checks for an external table
+      assert "Table property kudu.table_name must be specified when " \
+             "creating an external Kudu table" in str(e)
+
+    try:
+      # missing external.table.purge in TBLPROPERTIES
+      cursor.execute("""
+        CREATE EXTERNAL TABLE %s.%s (
+          a int PRIMARY KEY)
+        PARTITION BY HASH PARTITIONS 8
+        STORED AS KUDU
+        TBLPROPERTIES ('FOO'='BAR')
+        """ % (unique_database, table_name))
+      assert False, \
+        "Create external table statement must include external.table.purge property"
+    except Exception as e:
+      # We throw this exception since the analyzer checks for properties one by one.
+      # This is the first property that it checks for an external table
+      assert "Table property kudu.table_name must be specified when " \
+             "creating an external Kudu table" in str(e)
+
+    try:
+      # Trying to create a managed table with external.purge.table property in it
+      cursor.execute("""
+        CREATE TABLE %s.%s (
+          a int PRIMARY KEY)
+        PARTITION BY HASH PARTITIONS 8
+        STORED AS KUDU
+        TBLPROPERTIES ('external.table.purge'='true')
+              """ % (unique_database, table_name))
+      assert False, \
+        "Managed table creation with external.table.purge property must be disallowed"
+    except Exception as e:
+      assert "Table property 'external.table.purge' cannot be set to true " \
+             "with an managed Kudu table." in str(e)
+
+    # TODO should we block this?
+    cursor.execute("""
+      CREATE TABLE %s.%s (
+        a int PRIMARY KEY)
+      PARTITION BY HASH PARTITIONS 8
+      STORED AS KUDU
+      TBLPROPERTIES ('external.table.purge'='False')""" % (unique_database, table_name))
+    cursor.execute("SHOW TABLES IN %s" % unique_database)
+    assert (table_name,) in cursor.fetchall()
+
+  def test_sync_tbl_with_kudu_table(self, cursor, kudu_client, unique_database):
+    """
+    Test tries to create a synchronized table with an existing Kudu table name and
+    makes sure it fails.
+    """
+    with self.temp_kudu_table(kudu_client, [INT32]) as kudu_table:
+      table_name = self.random_table_name()
+      try:
+        cursor.execute("""
+            CREATE EXTERNAL TABLE %s.%s (
+              a int PRIMARY KEY)
+            PARTITION BY HASH PARTITIONS 8
+            STORED AS KUDU
+            TBLPROPERTIES('external.table.purge'='true', 'kudu.table_name' = '%s')"""
+                       % (unique_database, table_name,
+                          self.get_kudu_table_base_name(kudu_table.name)))
+        assert False, "External tables with external.purge.table property must fail " \
+          "if the kudu table already exists"
+      except Exception as e:
+        assert "Not allowed to set 'kudu.table_name' manually for" \
+               " synchronized Kudu tables" in str(e)