[hms] Avoid blocking table alterations that are not relevant to Kudu

This patch adjust the KuduMetastorePlugin to prevent blocking table
alterations that are not relevant to Kudu metadata which needs to be
synchronized. Additionally it prevents calling the Kudu master to check
if HMS sync is enabled when the metadata changes are not relevant to
Kudu’s HMS synchronization.

Change-Id: I2517d891ab7168164700bb3ae642c49bde54b9db
Reviewed-on: http://gerrit.cloudera.org:8080/17531
Tested-by: Kudu Jenkins
Reviewed-by: Andrew Wong <awong@cloudera.com>
Reviewed-by: Alexey Serbin <aserbin@cloudera.com>
diff --git a/java/kudu-hive/src/main/java/org/apache/kudu/hive/metastore/KuduMetastorePlugin.java b/java/kudu-hive/src/main/java/org/apache/kudu/hive/metastore/KuduMetastorePlugin.java
index 878a362..a0f8469 100644
--- a/java/kudu-hive/src/main/java/org/apache/kudu/hive/metastore/KuduMetastorePlugin.java
+++ b/java/kudu-hive/src/main/java/org/apache/kudu/hive/metastore/KuduMetastorePlugin.java
@@ -19,6 +19,7 @@
 
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
@@ -28,6 +29,7 @@
 import org.apache.hadoop.hive.metastore.MetaStoreEventListener;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
@@ -80,13 +82,15 @@
   @VisibleForTesting
   static final String LEGACY_KUDU_STORAGE_HANDLER = "com.cloudera.kudu.hive.KuduStorageHandler";
   @VisibleForTesting
+  static final String KUDU_CLUSTER_ID_KEY = "kudu.cluster_id";
+  @VisibleForTesting
   static final String KUDU_TABLE_ID_KEY = "kudu.table_id";
   @VisibleForTesting
-  static final String KUDU_TABLE_NAME = "kudu.table_name";
+  static final String KUDU_TABLE_NAME_KEY = "kudu.table_name";
   @VisibleForTesting
   static final String KUDU_MASTER_ADDRS_KEY = "kudu.master_addresses";
   @VisibleForTesting
-  static final String KUDU_MASTER_EVENT = "kudu.master_event";
+  static final String KUDU_MASTER_EVENT_KEY = "kudu.master_event";
   @VisibleForTesting
   static final String KUDU_CHECK_ID_KEY = "kudu.check_id";
   // The key should keep in sync with the one used in
@@ -96,6 +100,8 @@
 
   static final String EXTERNAL_PURGE_KEY = "external.table.purge";
 
+  static final String COMMENT_KEY = "comment";
+
   // System env to track if the HMS plugin validation should be skipped.
   static final String SKIP_VALIDATION_ENV = "KUDU_SKIP_HMS_PLUGIN_VALIDATION";
 
@@ -226,6 +232,14 @@
       return;
     }
 
+    // Check if the alter changes any of the Kudu metadata.
+    // If not, we can skip checking for synchronization given Kudu doesn't care about the changes.
+    // We primarily expect this case to occur when a table is migrated from a managed table
+    // to an external table with the purge property. This change is effectively a no-op to Kudu.
+    if (kuduMetadataUnchanged(oldTable, newTable)) {
+      return;
+    }
+
     // Only validate tables for clusters with HMS sync enabled.
     if (!kuduSyncEnabled(tableEvent, oldTable)) {
       return;
@@ -398,6 +412,77 @@
           "non-Kudu table entry must not contain a table ID property (%s)",
           KUDU_TABLE_ID_KEY));
     }
+    if (table.getParameters().containsKey(KUDU_CLUSTER_ID_KEY)) {
+      throw new MetaException(String.format(
+          "non-Kudu table entry must not contain a cluster ID property (%s)",
+          KUDU_CLUSTER_ID_KEY));
+    }
+  }
+
+  /**
+   * Checks that the metadata relevant to Kudu is unchanged between the before and after table.
+   * See HmsCatalog::PopulateTable in hms_catalog.cc for a reference to the relevant metadata.
+   *
+   * @param before the table to be altered
+   * @param after the new altered table
+   * @return true if no Kudu relevant metadata has changed
+   */
+  @VisibleForTesting
+  static boolean kuduMetadataUnchanged(Table before, Table after) {
+    // If any of the Kudu table properties have changed, return false.
+    Map<String, String> beforeParams = before.getParameters();
+    Map<String, String> afterParams = after.getParameters();
+    if (!Objects.equals(beforeParams.get(hive_metastoreConstants.META_TABLE_STORAGE),
+            afterParams.get(hive_metastoreConstants.META_TABLE_STORAGE)) ||
+        !Objects.equals(beforeParams.get(KUDU_MASTER_ADDRS_KEY),
+            afterParams.get(KUDU_MASTER_ADDRS_KEY)) ||
+        !Objects.equals(beforeParams.get(KUDU_TABLE_ID_KEY),
+            afterParams.get(KUDU_TABLE_ID_KEY)) ||
+        !Objects.equals(beforeParams.get(KUDU_TABLE_NAME_KEY),
+            afterParams.get(KUDU_TABLE_NAME_KEY)) ||
+        !Objects.equals(beforeParams.get(KUDU_CLUSTER_ID_KEY),
+            afterParams.get(KUDU_CLUSTER_ID_KEY))) {
+      return false;
+    }
+
+    // If the table synchronization has changed, return false.
+    // Kudu doesn't care if the table is managed vs external with the purge property set
+    // to true, it just cares that he table is synchronized.
+    if (isSynchronizedTable(before) != isSynchronizedTable(after)) {
+      return false;
+    }
+
+    // If the table database, name, owner, or comment have changed, return false.
+    if (!Objects.equals(before.getDbName(), after.getDbName()) ||
+        !Objects.equals(before.getTableName(), after.getTableName()) ||
+        !Objects.equals(before.getOwner(), after.getOwner()) ||
+        !Objects.equals(beforeParams.get(COMMENT_KEY),
+            afterParams.get(COMMENT_KEY))) {
+      return false;
+    }
+
+    // If the column count has changed, return false.
+    List<FieldSchema> beforeCols = before.getSd().getCols();
+    List<FieldSchema> afterCols = after.getSd().getCols();
+    if (beforeCols.size() != afterCols.size()) {
+      return false;
+    }
+
+    // If any of the columns have changed (name, type, or comment), return false.
+    // We don't have the Kudu internal column ID, so we assume the column index
+    // in both tables aligns if there are no changes.
+    for (int i = 0; i < beforeCols.size(); i++) {
+      FieldSchema beforeCol = beforeCols.get(i);
+      FieldSchema afterCol = afterCols.get(i);
+      if (!Objects.equals(beforeCol.getName(), afterCol.getName()) ||
+          !Objects.equals(beforeCol.getType(), afterCol.getType()) ||
+          !Objects.equals(beforeCol.getComment(), afterCol.getComment())) {
+        return false;
+      }
+    }
+
+    // Kudu doesn't have metadata related to all other changes.
+    return true;
   }
 
   /**
@@ -428,11 +513,11 @@
       return false;
     }
 
-    if (!properties.containsKey(KUDU_MASTER_EVENT)) {
+    if (!properties.containsKey(KUDU_MASTER_EVENT_KEY)) {
       return false;
     }
 
-    return Boolean.parseBoolean(properties.get(KUDU_MASTER_EVENT));
+    return Boolean.parseBoolean(properties.get(KUDU_MASTER_EVENT_KEY));
   }
 
   /**
diff --git a/java/kudu-hive/src/test/java/org/apache/kudu/hive/metastore/TestKuduMetastorePlugin.java b/java/kudu-hive/src/test/java/org/apache/kudu/hive/metastore/TestKuduMetastorePlugin.java
index f35b025..350badf 100644
--- a/java/kudu-hive/src/test/java/org/apache/kudu/hive/metastore/TestKuduMetastorePlugin.java
+++ b/java/kudu-hive/src/test/java/org/apache/kudu/hive/metastore/TestKuduMetastorePlugin.java
@@ -17,6 +17,7 @@
 
 package org.apache.kudu.hive.metastore;
 
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -55,7 +56,8 @@
   private MiniKuduCluster miniCluster;
 
   private EnvironmentContext masterContext() {
-    return new EnvironmentContext(ImmutableMap.of(KuduMetastorePlugin.KUDU_MASTER_EVENT, "true"));
+    return new EnvironmentContext(
+        ImmutableMap.of(KuduMetastorePlugin.KUDU_MASTER_EVENT_KEY, "true"));
   }
 
   public void startCluster(boolean syncEnabled) throws Exception {
@@ -408,8 +410,15 @@
 
       // Check that adding a column succeeds with the master event property set.
       client.alter_table_with_environmentContext(
-          table.getDbName(), table.getTableName(), table,
-          new EnvironmentContext(ImmutableMap.of(KuduMetastorePlugin.KUDU_MASTER_EVENT, "true")));
+          table.getDbName(), table.getTableName(), table, new EnvironmentContext(
+              ImmutableMap.of(KuduMetastorePlugin.KUDU_MASTER_EVENT_KEY, "true")));
+
+      // Check that altering a table property unrelated to Kudu succeeds.
+      {
+        Table alteredTable = table.deepCopy();
+        alteredTable.putToParameters("some.random.property", "foo");
+        client.alter_table(table.getDbName(), table.getTableName(), alteredTable);
+      }
 
       // Check that altering table with Kudu storage handler to legacy format
       // succeeds.
@@ -421,7 +430,7 @@
         alteredTable.putToParameters(KuduMetastorePlugin.EXTERNAL_PURGE_KEY, "TRUE");
         alteredTable.putToParameters(hive_metastoreConstants.META_TABLE_STORAGE,
             KuduMetastorePlugin.LEGACY_KUDU_STORAGE_HANDLER);
-        alteredTable.putToParameters(KuduMetastorePlugin.KUDU_TABLE_NAME,
+        alteredTable.putToParameters(KuduMetastorePlugin.KUDU_TABLE_NAME_KEY,
             "legacy_table");
         alteredTable.putToParameters(KuduMetastorePlugin.KUDU_MASTER_ADDRS_KEY,
             miniCluster.getMasterAddressesAsString());
@@ -633,4 +642,86 @@
     // A Kudu table should should be allowed to be dropped via Hive.
     client.dropTable(table.getDbName(), table.getTableName());
   }
+
+  @Test
+  public void testKuduMetadataUnchanged() throws Exception {
+    startCluster(/* syncEnabled */ true);
+
+    Table before = newTable("table");
+
+    // Changing from external purge to managed is true (and vice versa).
+    {
+      Table after = before.deepCopy();
+      after.setTableType(TableType.MANAGED_TABLE.name());
+      after.putToParameters(KuduMetastorePlugin.EXTERNAL_TABLE_KEY, "FALSE");
+      after.putToParameters(KuduMetastorePlugin.EXTERNAL_PURGE_KEY, "FALSE");
+      assertTrue(KuduMetastorePlugin.kuduMetadataUnchanged(before, after));
+      assertTrue(KuduMetastorePlugin.kuduMetadataUnchanged(after, before));
+    }
+
+    // Changing from external purge to just external is false (and vice versa).
+    {
+      Table after = before.deepCopy();
+      after.putToParameters(KuduMetastorePlugin.EXTERNAL_PURGE_KEY, "FALSE");
+      assertFalse(KuduMetastorePlugin.kuduMetadataUnchanged(before, after));
+      assertFalse(KuduMetastorePlugin.kuduMetadataUnchanged(after, before));
+    }
+
+    // Changing an unrelated property is true.
+    {
+      Table after = before.deepCopy();
+      after.putToParameters("some.random.property", "foo");
+      assertTrue(KuduMetastorePlugin.kuduMetadataUnchanged(before, after));
+    }
+
+    // Changing location is true.
+    {
+      Table after = before.deepCopy();
+      after.getSd().setLocation("path/to/foo");
+      assertTrue(KuduMetastorePlugin.kuduMetadataUnchanged(before, after));
+    }
+
+    // Changing the master addresses is false.
+    {
+      Table after = before.deepCopy();
+      after.putToParameters(KuduMetastorePlugin.KUDU_MASTER_ADDRS_KEY, "somehost");
+      assertFalse(KuduMetastorePlugin.kuduMetadataUnchanged(before, after));
+    }
+
+    // Changing the table name is false.
+    {
+      Table after = before.deepCopy();
+      after.setTableName("different");
+      assertFalse(KuduMetastorePlugin.kuduMetadataUnchanged(before, after));
+    }
+
+    // Changing the table owner is false.
+    {
+      Table after = before.deepCopy();
+      after.setOwner("different");
+      assertFalse(KuduMetastorePlugin.kuduMetadataUnchanged(before, after));
+    }
+
+    // Changing the table comment is false.
+    {
+      Table after = before.deepCopy();
+      after.putToParameters(KuduMetastorePlugin.COMMENT_KEY, "new comment");
+      assertFalse(KuduMetastorePlugin.kuduMetadataUnchanged(before, after));
+    }
+
+    // Adding a column or removing a column is false.
+    {
+      Table after = before.deepCopy();
+      after.getSd().addToCols(new FieldSchema("b", "int", ""));
+      assertFalse(KuduMetastorePlugin.kuduMetadataUnchanged(before, after));
+      assertFalse(KuduMetastorePlugin.kuduMetadataUnchanged(after, before));
+    }
+
+    // Changing a column comment is false.
+    {
+      Table after = before.deepCopy();
+      after.getSd().getCols().get(0).setComment("new comment");
+      assertFalse(KuduMetastorePlugin.kuduMetadataUnchanged(before, after));
+    }
+  }
 }