SENTRY-2313: alter database set owner command can be executed only by user with proper privilege (Na Li, reviewed by Kalyan Kumar Kalvagadda)
diff --git a/sentry-binding/sentry-binding-hive/src/main/java/org/apache/sentry/binding/hive/HiveAuthzBindingHook.java b/sentry-binding/sentry-binding-hive/src/main/java/org/apache/sentry/binding/hive/HiveAuthzBindingHook.java
index 09bd9b5..0ab636a 100644
--- a/sentry-binding/sentry-binding-hive/src/main/java/org/apache/sentry/binding/hive/HiveAuthzBindingHook.java
+++ b/sentry-binding/sentry-binding-hive/src/main/java/org/apache/sentry/binding/hive/HiveAuthzBindingHook.java
@@ -278,6 +278,10 @@
       }
 
       break;
+
+    case HiveParser.TOK_ALTERDATABASE_OWNER:
+      currDB = currOutDB = new Database(ast.getChild(0).getText());
+      break;
     default:
         currDB = getCanonicalDb();
         break;
diff --git a/sentry-binding/sentry-binding-hive/src/main/java/org/apache/sentry/binding/hive/authz/HiveAuthzPrivilegesMap.java b/sentry-binding/sentry-binding-hive/src/main/java/org/apache/sentry/binding/hive/authz/HiveAuthzPrivilegesMap.java
index d976512..78742fd 100644
--- a/sentry-binding/sentry-binding-hive/src/main/java/org/apache/sentry/binding/hive/authz/HiveAuthzPrivilegesMap.java
+++ b/sentry-binding/sentry-binding-hive/src/main/java/org/apache/sentry/binding/hive/authz/HiveAuthzPrivilegesMap.java
@@ -64,6 +64,12 @@
         setOperationScope(HiveOperationScope.DATABASE).
         setOperationType(HiveOperationType.DDL).
         build();
+    HiveAuthzPrivileges alterDbSetOwnerPrivilege = new HiveAuthzPrivileges.AuthzPrivilegeBuilder().
+        addOutputObjectPriviledge(AuthorizableType.Db, EnumSet.of(DBModelAction.ALL)).
+        setOperationScope(HiveOperationScope.DATABASE).
+        setOperationType(HiveOperationType.DDL).
+        setGrantOption(true).
+        build();
 
     HiveAuthzPrivileges alterTablePrivilege = new HiveAuthzPrivileges.AuthzPrivilegeBuilder().
         addOutputObjectPriviledge(AuthorizableType.Table, EnumSet.of(DBModelAction.ALTER)).
@@ -236,7 +242,7 @@
     hiveAuthzStmtPrivMap.put(HiveOperation.DROPDATABASE, dropDbPrivilege);
     hiveAuthzStmtPrivMap.put(HiveOperation.CREATETABLE, tableCreatePrivilege);
     hiveAuthzStmtPrivMap.put(HiveOperation.ALTERDATABASE, alterDbPrivilege);
-    hiveAuthzStmtPrivMap.put(HiveOperation.ALTERDATABASE_OWNER, alterDbPrivilege);
+    hiveAuthzStmtPrivMap.put(HiveOperation.ALTERDATABASE_OWNER, alterDbSetOwnerPrivilege);
     hiveAuthzStmtPrivMap.put(HiveOperation.CREATEMACRO, macroCreatePrivilege);
     hiveAuthzStmtPrivMap.put(HiveOperation.DROPMACRO, dropMacroPrivilege);
 
diff --git a/sentry-binding/sentry-binding-hive/src/main/java/org/apache/sentry/binding/metastore/SentryHmsEvent.java b/sentry-binding/sentry-binding-hive/src/main/java/org/apache/sentry/binding/metastore/SentryHmsEvent.java
index 0dd8bf1..e5d4437 100644
--- a/sentry-binding/sentry-binding-hive/src/main/java/org/apache/sentry/binding/metastore/SentryHmsEvent.java
+++ b/sentry-binding/sentry-binding-hive/src/main/java/org/apache/sentry/binding/metastore/SentryHmsEvent.java
@@ -25,6 +25,7 @@
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.events.CreateDatabaseEvent;
 import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent;
+// import org.apache.hadoop.hive.metastore.events.AlterDatabaseEvent; TODO: enable one HIVE-18031 is available
 import org.apache.hadoop.hive.metastore.events.CreateTableEvent;
 import org.apache.hadoop.hive.metastore.events.DropTableEvent;
 import org.apache.hadoop.hive.metastore.events.AlterTableEvent;
@@ -112,6 +113,27 @@
   }
 
   /**
+   * Construct SentryHmsEvent from AlterDatabaseEvent
+   *
+   * event, transaction, owner and authorizable info is initialized from event.
+   * @param inServerName name of the server associated with the event
+   * @param event AlterDatabaseEvent
+   */
+  /* TODO: Enable once HIVE-18031 is available
+  public SentryHmsEvent(String inServerName, AlterDatabaseEvent event) {
+    this(event, EventType.ALTER_DATABASE);
+
+    if (!StringUtils.equals(event.getOldDatabase().getOwnerName(), event.getNewDatabase().getOwnerName())) {
+      // Owner Changed. We don't set owner info for other cases of alter database.
+      // In this way, sentry server only updates owner privilege when object is created, dropped or
+      // owner is updated
+      setOwnerInfo(event.getNewDatabase());
+    }
+    setAuthorizable(inServerName, event.getNewDatabase());
+  }
+  */
+
+  /**
    * Construct SentryHmsEvent from CreateDatabaseEvent
    *
    * event, transaction, owner and authorizable info is initialized from event.
diff --git a/sentry-binding/sentry-binding-hive/src/main/java/org/apache/sentry/binding/metastore/SentrySyncHMSNotificationsPostEventListener.java b/sentry-binding/sentry-binding-hive/src/main/java/org/apache/sentry/binding/metastore/SentrySyncHMSNotificationsPostEventListener.java
index 080eda8..c37f370 100644
--- a/sentry-binding/sentry-binding-hive/src/main/java/org/apache/sentry/binding/metastore/SentrySyncHMSNotificationsPostEventListener.java
+++ b/sentry-binding/sentry-binding-hive/src/main/java/org/apache/sentry/binding/metastore/SentrySyncHMSNotificationsPostEventListener.java
@@ -33,6 +33,7 @@
 import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent;
 import org.apache.hadoop.hive.metastore.events.AlterTableEvent;
 import org.apache.hadoop.hive.metastore.events.CreateDatabaseEvent;
+// import org.apache.hadoop.hive.metastore.events.AlterDatabaseEvent; TODO: Enable once HIVE-18031 is available
 import org.apache.hadoop.hive.metastore.events.CreateTableEvent;
 import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent;
 import org.apache.hadoop.hive.metastore.events.DropPartitionEvent;
@@ -211,6 +212,24 @@
   }
 
   /**
+   * Notify sentry server when database is altered
+   *
+   * @param dbEvent Alter database event
+   * @throws MetaException
+   */
+  /* TODO: Enable once HIVE-18031 is available
+  @Override
+  public void onAlterDatabase(AlterDatabaseEvent dbEvent) throws MetaException {
+    // Failure event, Need not be notified.
+    if (failedEvent(dbEvent, EventType.ALTER_DATABASE)) {
+      return;
+    }
+    SentryHmsEvent event = new SentryHmsEvent(serverName, dbEvent);
+    notifyHmsEvent(event);
+  }
+  */
+
+  /**
    * Notifies sentry server about the HMS Event and related metadata.
    *
    * @param event Sentry HMS event.
diff --git a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/api/service/thrift/SentryPolicyStoreProcessor.java b/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/api/service/thrift/SentryPolicyStoreProcessor.java
index 8a4588c..07221af 100644
--- a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/api/service/thrift/SentryPolicyStoreProcessor.java
+++ b/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/api/service/thrift/SentryPolicyStoreProcessor.java
@@ -1327,14 +1327,14 @@
              1. Owner Update
              2. Table Rename
           */
+        // case ALTER_DATABASE: TODO: Enable once HIVE-18031 is available
           // Wait till Sentry server processes HMS Notification Event.
           if(request.getId() > 0) {
             response.setId(syncEventId(request.getId()));
           } else {
             response.setId(0L);
           }
-          // Owner is updated. There is no need to wait till Sentry processes HMS Notification Event.
-          // Revoke owner privilege from old owners and grant one to the new owner.
+          // When owner is updated, revoke owner privilege from old owners and grant one to the new owner.
           updateOwnerPrivilege(request);
           break;
         default:
diff --git a/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/dbprovider/TestOwnerPrivileges.java b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/dbprovider/TestOwnerPrivileges.java
index 70bc3d8..c085a0c 100644
--- a/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/dbprovider/TestOwnerPrivileges.java
+++ b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/dbprovider/TestOwnerPrivileges.java
@@ -48,10 +48,9 @@
 
   private static final Logger LOGGER = LoggerFactory
       .getLogger(TestHDFSIntegrationBase.class);
-  private final static String tableName1 = "tb_1";
+  protected final static String tableName1 = "tb_1";
 
-  protected static final String ALL_DB1 = "server=server1->db=db_1",
-      ADMIN1 = StaticUserGroup.ADMIN1,
+  protected static final String ADMIN1 = StaticUserGroup.ADMIN1,
       ADMINGROUP = StaticUserGroup.ADMINGROUP,
       USER1_1 = StaticUserGroup.USER1_1,
       USER1_2 = StaticUserGroup.USER1_2,
@@ -61,8 +60,8 @@
       DB1 = "db_1";
 
   private final static String renameTag = "_new";
-  private Connection connection;
-  private Statement statement;
+  protected Connection connection;
+  protected Statement statement;
 
   @BeforeClass
   public static void setup() throws Exception {
@@ -106,7 +105,7 @@
     statementUSER1_1.execute("CREATE DATABASE " + DB1);
 
     // verify privileges created for new database
-    verifyTableOwnerPrivilegeExistForEntity(statementUSER1_1, SentryPrincipalType.USER, Lists.newArrayList(USER1_1),
+    verifyTableOwnerPrivilegeExistForPrincipal(statementUSER1_1, SentryPrincipalType.USER, Lists.newArrayList(USER1_1),
         DB1, "", 1);
 
     // verify that user has all privilege on this database, i.e., "OWNER" means "ALL"
@@ -152,7 +151,7 @@
     // verify user user1_2 has no privileges created for new database
     Connection connectionUSER1_2 = hiveServer2.createConnection(USER1_2, USER1_2);
     Statement statementUSER1_2 = connectionUSER1_2.createStatement();
-    verifyTableOwnerPrivilegeExistForEntity(statementUSER1_2, SentryPrincipalType.USER, Lists.newArrayList(USER1_2),
+    verifyTableOwnerPrivilegeExistForPrincipal(statementUSER1_2, SentryPrincipalType.USER, Lists.newArrayList(USER1_2),
         DB1, "", 0);
 
     // verify that user user1_2 does not have any privilege on this database except create
@@ -193,7 +192,7 @@
     statement.execute("CREATE DATABASE " + DB1);
 
     // verify no privileges created for new database
-    verifyTableOwnerPrivilegeExistForEntity(statement, SentryPrincipalType.USER, Lists.newArrayList(admin),
+    verifyTableOwnerPrivilegeExistForPrincipal(statement, SentryPrincipalType.USER, Lists.newArrayList(admin),
         DB1, "", 1);
 
     statement.close();
@@ -226,7 +225,7 @@
     statementUSER1_1.execute("DROP DATABASE " + DB1 + " CASCADE");
 
     // verify owner privileges created for new database no longer exists
-    verifyTableOwnerPrivilegeExistForEntity(statementUSER1_1, SentryPrincipalType.USER, Lists.newArrayList(USER1_1),
+    verifyTableOwnerPrivilegeExistForPrincipal(statementUSER1_1, SentryPrincipalType.USER, Lists.newArrayList(USER1_1),
         DB1, "", 0);
 
     statement.close();
@@ -236,6 +235,107 @@
     connectionUSER1_1.close();
   }
 
+  /**
+   * Verify that the user who can call alter database set owner on this table
+   *
+   * @throws Exception
+   */
+  @Ignore("Enable the test once HIVE-18031 is in the hiver version integrated with Sentry")
+  @Test
+  public void testAuthorizeAlterDatabaseSetOwner() throws Exception {
+    String ownerRole = "owner_role";
+    String allWithGrantRole = "allWithGrant_role";
+    dbNames = new String[]{DB1};
+    roles = new String[]{"admin_role", "create_on_server", ownerRole};
+
+    // create required roles, and assign them to USERGROUP1
+    setupUserRoles(roles, statement);
+
+    // create test DB
+    statement.execute("DROP DATABASE IF EXISTS " + DB1 + " CASCADE");
+
+    // setup privileges for USER1
+    statement.execute("GRANT CREATE ON SERVER " + SERVER_NAME + " TO ROLE create_on_server");
+
+    // USER1_1 create database
+    Connection connectionUSER1_1 = hiveServer2.createConnection(USER1_1, USER1_1);
+    Statement statementUSER1_1 = connectionUSER1_1.createStatement();
+    statementUSER1_1.execute("CREATE DATABASE " + DB1);
+
+    if (!ownerPrivilegeGrantEnabled) {
+      try {
+        statementUSER1_1.execute("ALTER DATABASE " + DB1 + " SET OWNER ROLE " + ownerRole);
+        Assert.fail("Expect altering database set owner to fail for owner without grant option");
+      } catch(Exception ex){
+        // owner without grant option cannot issue this command
+      }
+    }
+
+
+    // admin issues alter database set owner
+    try {
+      statement.execute("ALTER DATABASE " + DB1 + " SET OWNER ROLE " + ownerRole);
+      Assert.fail("Expect altering database set owner to fail for admin");
+    } catch (Exception ex) {
+      // admin does not have all with grant option, so cannot issue this command
+    }
+
+    Connection connectionUSER2_1 = hiveServer2.createConnection(USER2_1, USER2_1);
+    Statement statementUSER2_1 = connectionUSER2_1.createStatement();
+
+    try {
+      // create role that has all with grant on the table
+      statement.execute("create role " + allWithGrantRole);
+      statement.execute("grant role " + allWithGrantRole + " to group " + USERGROUP2);
+      statement.execute("GRANT ALL ON DATABASE " + DB1 + " to role " +
+          allWithGrantRole + " with grant option");
+
+      // cannot issue command on a different database
+      try {
+        statementUSER2_1.execute("ALTER DATABASE NON_EXIST_DB" + " SET OWNER ROLE " + ownerRole);
+        Assert.fail("Expect altering database set owner to fail on db that USER2_1 has no all with grant");
+      } catch (Exception ex) {
+        // USER2_1 does not have all with grant option on NON_EXIST_DB, so cannot issue this command
+      }
+
+      // user2_1 having all with grant on this DB and can issue command: alter database set owner
+      // alter database set owner to a role
+      statementUSER2_1
+          .execute("ALTER DATABASE " + DB1 + " SET OWNER ROLE " + ownerRole);
+
+      // verify privileges is transferred to role owner_role, which is associated with USERGROUP1,
+      // therefore to USER1_1
+      verifyTableOwnerPrivilegeExistForPrincipal(statement, SentryPrincipalType.ROLE,
+          Lists.newArrayList(ownerRole),
+          DB1, "", 1);
+
+      // alter database set owner to user USER1_1 and verify privileges is transferred to USER USER1_1
+      statementUSER2_1
+          .execute("ALTER DATABASE " + DB1 + " SET OWNER USER " + USER1_1);
+      verifyTableOwnerPrivilegeExistForPrincipal(statement, SentryPrincipalType.USER,
+          Lists.newArrayList(USER1_1), DB1, "", 1);
+
+      // alter database set owner to user USER2_1, who already has explicit all with grant
+      statementUSER2_1
+          .execute("ALTER DATABASE " + DB1 + " SET OWNER USER " + USER2_1);
+      verifyTableOwnerPrivilegeExistForPrincipal(statement, SentryPrincipalType.USER,
+          Lists.newArrayList(USER2_1),
+          DB1, "", 1);
+
+    } finally {
+      statement.execute("drop role " + allWithGrantRole);
+
+      statement.close();
+      connection.close();
+
+      statementUSER1_1.close();
+      connectionUSER1_1.close();
+
+      statementUSER2_1.close();
+      connectionUSER2_1.close();
+    }
+  }
+
 
   /**
    * Verify that the user who creases table has owner privilege on this table
@@ -266,7 +366,7 @@
 
 
     // verify privileges created for new table
-    verifyTableOwnerPrivilegeExistForEntity(statementUSER1_1, SentryPrincipalType.USER, Lists.newArrayList(USER1_1),
+    verifyTableOwnerPrivilegeExistForPrincipal(statementUSER1_1, SentryPrincipalType.USER, Lists.newArrayList(USER1_1),
         DB1, tableName1, 1);
 
     // verify that user has all privilege on this table, i.e., "OWNER" means "ALL"
@@ -318,7 +418,7 @@
     // verify user1_2 does not have privileges on table created by user1_1
     Connection connectionUSER1_2 = hiveServer2.createConnection(USER1_2, USER1_2);
     Statement statementUSER1_2 = connectionUSER1_2.createStatement();
-    verifyTableOwnerPrivilegeExistForEntity(statementUSER1_2, SentryPrincipalType.USER, Lists.newArrayList(USER1_2),
+    verifyTableOwnerPrivilegeExistForPrincipal(statementUSER1_2, SentryPrincipalType.USER, Lists.newArrayList(USER1_2),
         DB1, tableName1, 0);
 
     // verify that user user1_2 does not have any privilege on this table
@@ -375,7 +475,7 @@
         + " (under_col int comment 'the under column')");
 
     // verify no owner privileges created for new table
-    verifyTableOwnerPrivilegeExistForEntity(statement, SentryPrincipalType.USER, Lists.newArrayList(admin),
+    verifyTableOwnerPrivilegeExistForPrincipal(statement, SentryPrincipalType.USER, Lists.newArrayList(admin),
         DB1, tableName1, 1);
 
     statement.close();
@@ -410,7 +510,7 @@
     statementUSER1_1.execute("DROP TABLE " + DB1 + "." + tableName1);
 
     // verify privileges created for new table
-    verifyTableOwnerPrivilegeExistForEntity(statementUSER1_1, SentryPrincipalType.USER, Lists.newArrayList(USER1_1),
+    verifyTableOwnerPrivilegeExistForPrincipal(statementUSER1_1, SentryPrincipalType.USER, Lists.newArrayList(USER1_1),
         DB1, tableName1, 0);
 
     statement.close();
@@ -447,7 +547,7 @@
 
 
     // verify privileges created for new table
-    verifyTableOwnerPrivilegeExistForEntity(statementUSER1_1, SentryPrincipalType.USER, Lists.newArrayList(USER1_1),
+    verifyTableOwnerPrivilegeExistForPrincipal(statementUSER1_1, SentryPrincipalType.USER, Lists.newArrayList(USER1_1),
         DB1, tableName1, 1);
 
     // verify that user has all privilege on this table, i.e., "OWNER" means "ALL"
@@ -463,11 +563,11 @@
     Thread.sleep(WAIT_BEFORE_TESTVERIFY);
 
     // Verify that old owner does not have owner privilege
-    verifyTableOwnerPrivilegeExistForEntity(statementUSER1_1, SentryPrincipalType.USER, Lists.newArrayList(USER1_1),
+    verifyTableOwnerPrivilegeExistForPrincipal(statementUSER1_1, SentryPrincipalType.USER, Lists.newArrayList(USER1_1),
         DB1, tableName1, 0);
     // Verify that new owner has owner privilege
 
-    verifyTableOwnerPrivilegeExistForEntity(statementUSER1_1, SentryPrincipalType.ROLE, Lists.newArrayList("owner_role"),
+    verifyTableOwnerPrivilegeExistForPrincipal(statementUSER1_1, SentryPrincipalType.ROLE, Lists.newArrayList("owner_role"),
         DB1, tableName1, 1);
 
 
@@ -476,11 +576,11 @@
         USER1_1);
 
     // Verify that old owner does not have owner privilege
-    verifyTableOwnerPrivilegeExistForEntity(statementUSER1_1, SentryPrincipalType.ROLE, Lists.newArrayList("owner_role"),
+    verifyTableOwnerPrivilegeExistForPrincipal(statementUSER1_1, SentryPrincipalType.ROLE, Lists.newArrayList("owner_role"),
         DB1, tableName1, 0);
 
     // Verify that new owner has owner privilege
-    verifyTableOwnerPrivilegeExistForEntity(statementUSER1_1, SentryPrincipalType.USER, Lists.newArrayList(USER1_1),
+    verifyTableOwnerPrivilegeExistForPrincipal(statementUSER1_1, SentryPrincipalType.USER, Lists.newArrayList(USER1_1),
         DB1, tableName1, 1);
 
     statement.close();
@@ -531,6 +631,17 @@
       }
     }
 
+    // owner issues alter table set owner
+    if (!ownerPrivilegeGrantEnabled) {
+      try {
+        statementUSER1_1
+            .execute("ALTER TABLE " + DB1 + "." + tableName1 + " SET OWNER ROLE " + ownerRole);
+        Assert.fail("Expect altering table set owner to fail for owner without grant option");
+      } catch (Exception ex) {
+        // owner without grant option cannot issue this command
+      }
+    }
+
     // admin issues alter table set owner
     try {
       statement.execute("ALTER TABLE " + DB1 + "." + tableName1 + " SET OWNER ROLE " + ownerRole);
@@ -564,20 +675,20 @@
 
       // verify privileges is transferred to role owner_role, which is associated with USERGROUP1,
       // therefore to USER1_1
-      verifyTableOwnerPrivilegeExistForEntity(statement, SentryPrincipalType.ROLE,
+      verifyTableOwnerPrivilegeExistForPrincipal(statement, SentryPrincipalType.ROLE,
           Lists.newArrayList(ownerRole),
           DB1, tableName1, 1);
 
       // alter table set owner to user USER1_1 and verify privileges is transferred to USER USER1_1
       statementUSER2_1
           .execute("ALTER TABLE " + DB1 + "." + tableName1 + " SET OWNER USER " + USER1_1);
-      verifyTableOwnerPrivilegeExistForEntity(statement, SentryPrincipalType.USER,
+      verifyTableOwnerPrivilegeExistForPrincipal(statement, SentryPrincipalType.USER,
           Lists.newArrayList(USER1_1), DB1, tableName1, 1);
 
       // alter table set owner to user USER2_1, who already has explicit all with grant
       statementUSER2_1
           .execute("ALTER TABLE " + DB1 + "." + tableName1 + " SET OWNER USER " + USER2_1);
-      verifyTableOwnerPrivilegeExistForEntity(statement, SentryPrincipalType.USER,
+      verifyTableOwnerPrivilegeExistForPrincipal(statement, SentryPrincipalType.USER,
           Lists.newArrayList(USER2_1),
           DB1, tableName1, 1);
 
@@ -623,7 +734,7 @@
         + " (under_col int comment 'the under column')");
 
     // verify owner privileges created for new table
-    verifyTableOwnerPrivilegeExistForEntity(statement, SentryPrincipalType.USER, Lists.newArrayList(USER1_1),
+    verifyTableOwnerPrivilegeExistForPrincipal(statement, SentryPrincipalType.USER, Lists.newArrayList(USER1_1),
         DB1, tableName1, 1);
 
     // Changing the owner to an admin user
@@ -632,7 +743,7 @@
 
     // verify no owner privileges to the new owner as the owner is admin user
 
-    verifyTableOwnerPrivilegeExistForEntity(statement, SentryPrincipalType.USER, Lists.newArrayList(admin),
+    verifyTableOwnerPrivilegeExistForPrincipal(statement, SentryPrincipalType.USER, Lists.newArrayList(admin),
         DB1, tableName1, 1);
 
     statement.close();
@@ -640,7 +751,7 @@
   }
 
   // Create test roles
-  private void setupUserRoles(String[] roles, Statement statement) throws Exception {
+  protected void setupUserRoles(String[] roles, Statement statement) throws Exception {
     Set<String> userRoles = Sets.newHashSet(roles);
     userRoles.remove("admin_role");
 
@@ -652,16 +763,16 @@
 
   // verify given table is part of every user in the list
   // verify that each entity in the list has owner privilege on the given database or table
-  protected void verifyTableOwnerPrivilegeExistForEntity(Statement statement, SentryPrincipalType entityType,
-      List<String> entities, String dbName, String tableName, int expectedResultCount) throws Exception {
+  protected void verifyTableOwnerPrivilegeExistForPrincipal(Statement statement, SentryPrincipalType principalType,
+      List<String> principals, String dbName, String tableName, int expectedResultCount) throws Exception {
 
-    for (String entity : entities) {
+    for (String principal : principals) {
       String command;
 
       if (Strings.isNullOrEmpty(tableName)) {
-        command = "SHOW GRANT " + entityType.toString() + " " + entity + " ON DATABASE " + dbName;
+        command = "SHOW GRANT " + principalType.toString() + " " + principal + " ON DATABASE " + dbName;
       } else {
-        command = "SHOW GRANT " + entityType.toString() + " " + entity + " ON TABLE " + dbName + "." + tableName;
+        command = "SHOW GRANT " + principalType.toString() + " " + principal + " ON TABLE " + dbName + "." + tableName;
       }
 
       ResultSet resultSet = statement.executeQuery(command);
@@ -679,14 +790,15 @@
 
         if (!StringUtils.equalsIgnoreCase(tableName, resultSet.getString(2))) {
           // it is possible the entity has owner privilege on both DB and table
-          // only check the owner privilege on table
+          // only check the owner privilege on intended table. If tableName is "",
+          // resultSet.getString(2) should be "" as well
           continue;
         }
 
         assertThat(resultSet.getString(3), equalToIgnoringCase(""));//partition
         assertThat(resultSet.getString(4), equalToIgnoringCase(""));//column
-        assertThat(resultSet.getString(5), equalToIgnoringCase(entity));//principalName
-        assertThat(resultSet.getString(6), equalToIgnoringCase(entityType.toString()));//principalType
+        assertThat(resultSet.getString(5), equalToIgnoringCase(principal));//principalName
+        assertThat(resultSet.getString(6), equalToIgnoringCase(principalType.toString()));//principalType
         assertThat(resultSet.getBoolean(8), is(ownerPrivilegeGrantEnabled));//grantOption
         resultSize ++;
       }
diff --git a/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/dbprovider/TestOwnerPrivilegesWithGrantOption.java b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/dbprovider/TestOwnerPrivilegesWithGrantOption.java
index 4de7475..b3d98cb 100644
--- a/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/dbprovider/TestOwnerPrivilegesWithGrantOption.java
+++ b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/dbprovider/TestOwnerPrivilegesWithGrantOption.java
@@ -17,7 +17,13 @@
  */
 package org.apache.sentry.tests.e2e.dbprovider;
 
+import com.google.common.collect.Lists;
+import java.sql.Connection;
+import java.sql.Statement;
+import org.apache.sentry.service.common.ServiceConstants.SentryPrincipalType;
 import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
 
 public class TestOwnerPrivilegesWithGrantOption extends TestOwnerPrivileges {
   @BeforeClass
@@ -27,4 +33,68 @@
     TestOwnerPrivileges.setup();
   }
 
+  /**
+   * Verify that the owner with grant option can call alter table set owner on this table
+   *
+   * @throws Exception
+   */
+  @Ignore("Enable the test once HIVE-18762 is in the hiver version integrated with Sentry")
+  @Test
+  public void testAuthorizeAlterTableSetOwnerByOwner() throws Exception {
+    String ownerRole = "owner_role";
+    dbNames = new String[]{DB1};
+    roles = new String[]{"admin_role", "create_db1", ownerRole};
+
+    // create required roles, and assign them to USERGROUP1
+    setupUserRoles(roles, statement);
+
+    // create test DB
+    statement.execute("DROP DATABASE IF EXISTS " + DB1 + " CASCADE");
+    statement.execute("CREATE DATABASE " + DB1);
+
+    // setup privileges for USER1
+    statement.execute("GRANT CREATE ON DATABASE " + DB1 + " TO ROLE create_db1");
+    statement.execute("USE " + DB1);
+
+    // USER1_1 create table
+    Connection connectionUSER1_1 = hiveServer2.createConnection(USER1_1, USER1_1);
+    Statement statementUSER1_1 = connectionUSER1_1.createStatement();
+    statementUSER1_1.execute("CREATE TABLE " + DB1 + "." + tableName1
+        + " (under_col int comment 'the under column')");
+
+    Connection connectionUSER2_1 = hiveServer2.createConnection(USER2_1, USER2_1);
+    Statement statementUSER2_1 = connectionUSER2_1.createStatement();
+
+    try {
+      // user1_1 is owner of the table having all with grant on this table and can issue
+      // command: alter table set owner for user
+      statementUSER1_1
+          .execute("ALTER TABLE " + DB1 + "." + tableName1 + " SET OWNER USER " + USER2_1);
+
+      // verify privileges is transferred to USER2_1
+      verifyTableOwnerPrivilegeExistForPrincipal(statement, SentryPrincipalType.USER,
+          Lists.newArrayList(USER2_1),
+          DB1, tableName1, 1);
+
+      // alter table set owner for role
+      statementUSER2_1
+          .execute("ALTER TABLE " + DB1 + "." + tableName1 + " SET OWNER ROLE " + ownerRole);
+
+      // verify privileges is transferred to ownerRole
+      verifyTableOwnerPrivilegeExistForPrincipal(statement, SentryPrincipalType.ROLE,
+          Lists.newArrayList(ownerRole),
+          DB1, tableName1, 1);
+
+    } finally {
+      statement.close();
+      connection.close();
+
+      statementUSER1_1.close();
+      connectionUSER1_1.close();
+
+      statementUSER2_1.close();
+      connectionUSER2_1.close();
+    }
+  }
+
 }
diff --git a/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegrationBase.java b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegrationBase.java
index f5e4db8..becdc52 100644
--- a/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegrationBase.java
+++ b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegrationBase.java
@@ -196,6 +196,7 @@
   protected String tmpHDFSDirStr;
   protected String tmpHDFSPartitionStr;
   protected Path partitionDir;
+  protected static final String SERVER_NAME = "server1";
   protected String[] dbNames;
   protected String[] roles;
   protected String admin;
@@ -583,7 +584,7 @@
         hiveConf.set("sentry.hive.provider", LocalGroupResourceAuthorizationProvider.class.getName());
         hiveConf.set("sentry.hive.provider.resource", policyFileLocation.getPath());
         hiveConf.set("sentry.hive.testing.mode", "true");
-        hiveConf.set("sentry.hive.server", "server1");
+        hiveConf.set("sentry.hive.server", SERVER_NAME);
 
         hiveConf.set(ServerConfig.SENTRY_STORE_GROUP_MAPPING, ServerConfig.SENTRY_STORE_LOCAL_GROUP_MAPPING);
         hiveConf.set(ServerConfig.SENTRY_STORE_GROUP_MAPPING_RESOURCE, policyFileLocation.getPath());
@@ -863,6 +864,9 @@
           sentryProperties.put(ServerConfig.SENTRY_HMSFOLLOWER_INIT_DELAY_MILLS, "10000");
           sentryProperties.put(ServerConfig.SENTRY_HMSFOLLOWER_INTERVAL_MILLS, String.valueOf(HMSFOLLOWER_INTERVAL_MILLS));
           sentryProperties.put(ServerConfig.RPC_MIN_THREADS, "3");
+          sentryProperties.put("sentry.hive.sync.drop", "true");
+          sentryProperties.put("sentry.hive.sync.create", "true");
+
           if(hiveSyncOnCreate) {
             sentryProperties.put("sentry.hive.sync.create", "true");
           } else {
diff --git a/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hive/TestOperationsPart1.java b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hive/TestOperationsPart1.java
index f3edae2..42971d8 100644
--- a/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hive/TestOperationsPart1.java
+++ b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hive/TestOperationsPart1.java
@@ -348,9 +348,11 @@
     statement = context.createStatement(connection);
     statement.execute("ALTER DATABASE " + DB1 + " SET DBPROPERTIES ('comment'='comment')");
 
+    // Negative case for admin
     connection = context.createConnection(ADMIN1);
     statement = context.createStatement(connection);
-    statement.execute("ALTER DATABASE " + DB1 + " SET OWNER USER " + USER1_1);
+    context.assertSentrySemanticException(statement, "ALTER DATABASE " + DB1 + " SET OWNER USER " + USER1_1, semanticException);
+
     statement.close();
     connection.close();