SENTRY-2315: The grant all operation is not dropping the create/alter/drop/index/lock privileges (Sergio Pena, reviewed by Na Li)
diff --git a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java b/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
index db8c08b..1eda41b 100644
--- a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
+++ b/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
@@ -813,25 +813,11 @@
     if ((!isNULL(privilege.getColumnName()) || !isNULL(privilege.getTableName())
         || !isNULL(privilege.getDbName()))
         && !AccessConstants.OWNER.equalsIgnoreCase(privilege.getAction())) {
-      // If Grant is for ALL and Either INSERT/SELECT already exists..
+      // If Grant is for ALL and individual privileges already exists (i.e. insert/select/create..)
       // need to remove it and GRANT ALL..
       if (AccessConstants.ALL.equalsIgnoreCase(privilege.getAction())
           || AccessConstants.ACTION_ALL.equalsIgnoreCase(privilege.getAction())) {
-        TSentryPrivilege tNotAll = new TSentryPrivilege(privilege);
-        tNotAll.setAction(AccessConstants.SELECT);
-        MSentryPrivilege mSelect =
-            findMatchPrivilege(mEntity.getPrivileges(), convertToMSentryPrivilege(tNotAll));
-        tNotAll.setAction(AccessConstants.INSERT);
-        MSentryPrivilege mInsert =
-            findMatchPrivilege(mEntity.getPrivileges(), convertToMSentryPrivilege(tNotAll));
-        if (mSelect != null) {
-          mSelect.removePrincipal(mEntity);
-          persistPrivilege(pm, mSelect);
-        }
-        if (mInsert != null) {
-          mInsert.removePrincipal(mEntity);
-          persistPrivilege(pm, mInsert);
-        }
+        dropPrivilegesForGrantAll(pm, mEntity, privilege);
       } else {
         // If Grant is for Either INSERT/SELECT and ALL already exists..
         // do nothing..
@@ -861,6 +847,39 @@
   }
 
   /**
+   * Drop all individual privileges from the privilege entity that form the grant all operation.
+   *
+   * @param pm The PersistenceManager to persist the changes.
+   * @param principal The Sentry principal from where to drop the privileges.
+   * @param privilege The Sentry privilege that has the authorizable object from where to drop the privileges.
+   * @throws SentryInvalidInputException If an error occurs when dropping the privileges.
+   */
+  private void dropPrivilegesForGrantAll(PersistenceManager pm, PrivilegePrincipal principal,
+    TSentryPrivilege privilege) throws SentryInvalidInputException {
+
+    // Re-use this object to search for the specific privilege
+    TSentryPrivilege tNotAll = new TSentryPrivilege(privilege);
+
+    for (String action : ALL_ACTIONS) {
+      // These privileges do not form part of the grant all operation.
+      // For instance, a role/user may have the OWNER and ALL privileges together.
+      if (action.equalsIgnoreCase(AccessConstants.OWNER)) {
+        continue;
+      }
+
+      // Set the action to search in the set of privileges of the entity
+      tNotAll.setAction(action);
+
+      MSentryPrivilege mAction =
+        findMatchPrivilege(principal.getPrivileges(), convertToMSentryPrivilege(tNotAll));
+      if (mAction != null) {
+        mAction.removePrincipal(principal);
+        persistPrivilege(pm, mAction);
+      }
+    }
+  }
+
+  /**
    * Alter a given sentry user to grant a set of privileges.
    * Internally calls alterSentryGrantPrivilege.
    *
diff --git a/sentry-service/sentry-service-server/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java b/sentry-service/sentry-service-server/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java
index 6b732ea..d8c0ab4 100644
--- a/sentry-service/sentry-service-server/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java
+++ b/sentry-service/sentry-service-server/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java
@@ -44,6 +44,7 @@
 import org.apache.hadoop.security.alias.CredentialProviderFactory;
 import org.apache.hadoop.security.alias.UserProvider;
 import org.apache.sentry.SentryOwnerInfo;
+import org.apache.sentry.api.common.ApiConstants.PrivilegeScope;
 import org.apache.sentry.api.service.thrift.TSentryPrivilegeMap;
 import org.apache.sentry.core.common.exception.SentryAccessDeniedException;
 import org.apache.sentry.core.common.exception.SentryInvalidInputException;
@@ -922,6 +923,81 @@
 
   }
 
+  @Test
+  public void testDropIndividualPrivilegesWhenGrantAllIsGranted() throws Exception {
+    final String GRANTOR = "g1";
+    final String ROLE_NAME = "r1";
+    final String SERVER_NAME = "server1";
+    final String DB_NAME = "db1";
+    final String TABLE_NAME = "table1";
+
+    TSentryPrivilege allPrivilege = toTSentryPrivilege(AccessConstants.ACTION_ALL,
+      PrivilegeScope.SERVER.toString(), SERVER_NAME, DB_NAME, TABLE_NAME, TSentryGrantOption.FALSE);
+
+    TSentryPrivilege allWithGrant = toTSentryPrivilege(AccessConstants.ACTION_ALL,
+      PrivilegeScope.SERVER.toString(), SERVER_NAME, DB_NAME, TABLE_NAME, TSentryGrantOption.TRUE);
+
+    TSentryPrivilege ownerPrivilege = toTSentryPrivilege(AccessConstants.OWNER,
+      PrivilegeScope.DATABASE.toString(), SERVER_NAME, DB_NAME, TABLE_NAME);
+
+    Set<TSentryPrivilege> grantPrivileges = Sets.newHashSet(
+      toTSentryPrivilege(AccessConstants.SELECT, PrivilegeScope.SERVER.toString(), SERVER_NAME,
+        DB_NAME, TABLE_NAME),
+      toTSentryPrivilege(AccessConstants.INSERT, PrivilegeScope.SERVER.toString(), SERVER_NAME,
+        DB_NAME, TABLE_NAME),
+      toTSentryPrivilege(AccessConstants.CREATE, PrivilegeScope.SERVER.toString(), SERVER_NAME,
+        DB_NAME, TABLE_NAME),
+      toTSentryPrivilege(AccessConstants.ALTER, PrivilegeScope.SERVER.toString(), SERVER_NAME,
+        DB_NAME, TABLE_NAME),
+      toTSentryPrivilege(AccessConstants.DROP, PrivilegeScope.SERVER.toString(), SERVER_NAME,
+        DB_NAME, TABLE_NAME),
+      toTSentryPrivilege(AccessConstants.INDEX, PrivilegeScope.SERVER.toString(), SERVER_NAME,
+        DB_NAME, TABLE_NAME),
+      toTSentryPrivilege(AccessConstants.LOCK, PrivilegeScope.SERVER.toString(), SERVER_NAME,
+        DB_NAME, TABLE_NAME),
+
+      // This special privilege will not be removed when granting all privileges
+      ownerPrivilege
+    );
+
+    // Grant individual privileges to a role
+    createRole(ROLE_NAME);
+    sentryStore.alterSentryRoleGrantPrivileges(GRANTOR, ROLE_NAME, grantPrivileges);
+
+    // Check those individual privileges are granted
+    Set<TSentryPrivilege> rolePrivileges = sentryStore.getAllTSentryPrivilegesByRoleName(ROLE_NAME);
+    assertEquals(grantPrivileges.size(), rolePrivileges.size());
+    for (TSentryPrivilege privilege : grantPrivileges) {
+      assertTrue(String.format("Privilege %s was not granted.", privilege.getAction()),
+        rolePrivileges.contains(privilege));
+    }
+
+    // Grant the ALL privilege (this should remove all individual privileges, and grant only ALL)
+    sentryStore.alterSentryRoleGrantPrivileges(GRANTOR, ROLE_NAME, Sets.newHashSet(allPrivilege));
+
+    // Check the ALL and OWNER privileges are the only privileges
+    rolePrivileges = sentryStore.getAllTSentryPrivilegesByRoleName(ROLE_NAME);
+    assertEquals(2, rolePrivileges.size()); // ALL and OWNER privileges should be there
+    assertTrue("Privilege ALL was not granted.", rolePrivileges.contains(allPrivilege));
+    assertTrue("Privilege OWNER was dropped.", rolePrivileges.contains(ownerPrivilege));
+
+    // Check the ALL WITH GRANT privilege just replaces the ALL and keeps the OWNER privilege
+    sentryStore.alterSentryRoleGrantPrivileges(GRANTOR, ROLE_NAME, Sets.newHashSet(allWithGrant));
+    rolePrivileges = sentryStore.getAllTSentryPrivilegesByRoleName(ROLE_NAME);
+    assertTrue("Privilege ALL WITH GRANT was not granted.", rolePrivileges.contains(allWithGrant));
+    assertTrue("Privilege OWNER was dropped.", rolePrivileges.contains(ownerPrivilege));
+
+    // Check the ALL privilege just replaces the ALL WITH GRANT and keeps the OWNER privilege
+    sentryStore.alterSentryRoleGrantPrivileges(GRANTOR, ROLE_NAME, Sets.newHashSet(allPrivilege));
+    rolePrivileges = sentryStore.getAllTSentryPrivilegesByRoleName(ROLE_NAME);
+    assertTrue("Privilege ALL was not granted.", rolePrivileges.contains(allPrivilege));
+    assertTrue("Privilege OWNER was dropped.", rolePrivileges.contains(ownerPrivilege));
+
+
+    // Clean-up test
+    sentryStore.dropSentryRole(ROLE_NAME);
+  }
+
   //TODO Use new transaction Manager logic, Instead of
 
   @Test
@@ -4464,4 +4540,11 @@
 
     return privilege;
   }
+
+  private TSentryPrivilege toTSentryPrivilege(String action, String scope, String server,
+    String dbName, String tableName, TSentryGrantOption grantOption) {
+    TSentryPrivilege privilege = toTSentryPrivilege(action, scope, server, dbName, tableName);
+    privilege.setGrantOption(grantOption);
+    return privilege;
+  }
 }