SENTRY-2296: Add PermissionsUpdate for adding owner privilege on owner transfer. (Kalyan Kumar Kalvagadda reviewed by Lina li)
diff --git a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/api/service/thrift/SentryPolicyStoreProcessor.java b/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/api/service/thrift/SentryPolicyStoreProcessor.java
index 07221af..0ac19a7 100644
--- a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/api/service/thrift/SentryPolicyStoreProcessor.java
+++ b/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/api/service/thrift/SentryPolicyStoreProcessor.java
@@ -1459,20 +1459,7 @@
     Preconditions.checkState(sentryPlugins.size() <= 1);
     Set<TSentryPrivilege> privSet = Collections.singleton(ownerPrivilege);
     Map<TSentryPrivilege, Update> privilegesUpdateMap = new HashMap<>();
-    switch (request.getOwnerType()) {
-      case ROLE:
-        for (SentryPolicyStorePlugin plugin : sentryPlugins) {
-          plugin.onAlterSentryRoleGrantPrivilege(request.getOwnerName(), privSet, privilegesUpdateMap);
-        }
-        break;
-      case USER:
-        for (SentryPolicyStorePlugin plugin : sentryPlugins) {
-          plugin.onAlterSentryUserGrantPrivilege(request.getOwnerName(), privSet, privilegesUpdateMap);
-        }
-        break;
-      default:
-        LOGGER.error("Invalid owner Type");
-    }
+    getOwnerPrivilegeUpdateForGrant(request.getOwnerName(), request.getOwnerType(), privSet, privilegesUpdateMap);
 
     // Grants owner privilege to the principal
     try {
@@ -1529,19 +1516,21 @@
     // There should only one owner privilege for an authorizable but the current schema
     // doesn't have constraints to limit it. It is possible to have multiple owners for an authorizable (which is unlikely)
     // This logic makes sure of revoking all the owner privilege.
-    for (SentryOwnerInfo ownerInfo : ownerInfoList) {
-      if (ownerInfo.getOwnerType() == SentryPrincipalType.USER) {
-        for (SentryPolicyStorePlugin plugin : sentryPlugins) {
+    for (SentryPolicyStorePlugin plugin : sentryPlugins) {
+      for (SentryOwnerInfo ownerInfo : ownerInfoList) {
+        if (ownerInfo.getOwnerType().equals(SentryPrincipalType.USER)) {
           plugin.onAlterSentryUserRevokePrivilege(ownerInfo.getOwnerName(), privSet, privilegesUpdateMap);
           updateList.add(privilegesUpdateMap.get(ownerPrivilege));
-        }
-      } else if (ownerInfo.getOwnerType() == SentryPrincipalType.ROLE) {
-        for (SentryPolicyStorePlugin plugin : sentryPlugins) {
+          privilegesUpdateMap.clear();
+        } else if (ownerInfo.getOwnerType().equals(SentryPrincipalType.ROLE)) {
           plugin.onAlterSentryRoleRevokePrivilege(request.getOwnerName(), privSet, privilegesUpdateMap);
           updateList.add(privilegesUpdateMap.get(ownerPrivilege));
+          privilegesUpdateMap.clear();
         }
       }
     }
+    getOwnerPrivilegeUpdateForGrant(request.getOwnerName(), request.getOwnerType(), privSet, privilegesUpdateMap);
+    updateList.add(privilegesUpdateMap.get(ownerPrivilege));
 
     // Revokes old owner privileges and grants owner privilege for new owner.
     try {
@@ -1563,6 +1552,33 @@
   }
 
   /**
+   * Adds privilege update for grant into the privilegesUpdateMap provided.
+   * @param ownerName
+   * @param ownerType
+   * @param privSet
+   * @param privilegesUpdateMap
+   * @throws Exception
+   */
+  private void getOwnerPrivilegeUpdateForGrant(String ownerName, TSentryPrincipalType ownerType,
+      Set<TSentryPrivilege> privSet,
+      Map<TSentryPrivilege, Update> privilegesUpdateMap) throws Exception {
+    for (SentryPolicyStorePlugin plugin : sentryPlugins) {
+      switch (ownerType) {
+        case ROLE:
+          plugin.onAlterSentryRoleGrantPrivilege(ownerName, privSet, privilegesUpdateMap);
+          break;
+        case USER:
+          plugin.onAlterSentryUserGrantPrivilege(ownerName, privSet, privilegesUpdateMap);
+          break;
+        default:
+          String error = "Invalid owner type : " + ownerType;
+          LOGGER.error(error);
+          throw new SentryInvalidInputException(error);
+      }
+    }
+  }
+
+  /**
    * This API constructs (@Link TSentryPrivilege} for authorizable provided
    * based on the configurations.
    *