ATLAS-4158 : Atlas authorization for Add/Update/Remove classification on entities.
diff --git a/authorization/src/main/java/org/apache/atlas/authorize/simple/AtlasSimpleAuthorizer.java b/authorization/src/main/java/org/apache/atlas/authorize/simple/AtlasSimpleAuthorizer.java
index 5636438..7b99e97 100644
--- a/authorization/src/main/java/org/apache/atlas/authorize/simple/AtlasSimpleAuthorizer.java
+++ b/authorization/src/main/java/org/apache/atlas/authorize/simple/AtlasSimpleAuthorizer.java
@@ -42,6 +42,9 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.atlas.authorize.AtlasPrivilege.ENTITY_ADD_CLASSIFICATION;
+import static org.apache.atlas.authorize.AtlasPrivilege.ENTITY_REMOVE_CLASSIFICATION;
+import static org.apache.atlas.authorize.AtlasPrivilege.ENTITY_UPDATE_CLASSIFICATION;
 import static org.apache.atlas.authorize.AtlasPrivilege.TYPE_CREATE;
 import static org.apache.atlas.authorize.AtlasPrivilege.TYPE_DELETE;
 import static org.apache.atlas.authorize.AtlasPrivilege.TYPE_READ;
@@ -53,6 +56,12 @@
 
     private final static String WILDCARD_ASTERISK = "*";
 
+    private final static Set<AtlasPrivilege> CLASSIFICATION_PRIVILEGES = new HashSet<AtlasPrivilege>() {{
+                                                                                add(ENTITY_ADD_CLASSIFICATION);
+                                                                                add(ENTITY_REMOVE_CLASSIFICATION);
+                                                                                add(ENTITY_UPDATE_CLASSIFICATION);
+                                                                            }};
+
     private AtlasSimpleAuthzPolicy authzPolicy;
 
 
@@ -233,43 +242,38 @@
             LOG.debug("==> SimpleAtlasAuthorizer.isAccessAllowed({})", request);
         }
 
+        boolean           ret            = false;
         final String      action         = request.getAction() != null ? request.getAction().getType() : null;
         final Set<String> entityTypes    = request.getEntityTypeAndAllSuperTypes();
         final String      entityId       = request.getEntityId();
-        final String      classification = request.getClassification() != null ? request.getClassification().getTypeName() : null;
         final String      attribute      = request.getAttributeName();
         final Set<String> entClsToAuthz  = new HashSet<>(request.getEntityClassifications());
         final Set<String> roles          = getRoles(request.getUser(), request.getUserGroups());
-        boolean hasEntityAccess          = false;
-        boolean hasClassificationsAccess = false;
 
         for (String role : roles) {
             List<AtlasEntityPermission> permissions = getEntityPermissionsForRole(role);
 
             if (permissions != null) {
                 for (AtlasEntityPermission permission : permissions) {
-                    // match entity-type/entity-id/label/business-metadata/attribute
-                    if (isMatchAny(entityTypes, permission.getEntityTypes()) && isMatch(entityId, permission.getEntityIds()) && isMatch(attribute, permission.getAttributes())
-                         && isLabelMatch(request, permission) && isBusinessMetadataMatch(request, permission)) {
-                        // match permission/classification
-                        if (!hasEntityAccess) {
-                            if (isMatch(action, permission.getPrivileges()) && isMatch(classification, permission.getClassifications())) {
-                                hasEntityAccess = true;
-                            }
-                        }
+                    if (isMatch(action, permission.getPrivileges()) && isMatchAny(entityTypes, permission.getEntityTypes()) &&
+                        isMatch(entityId, permission.getEntityIds()) && isMatch(attribute, permission.getAttributes()) &&
+                        isLabelMatch(request, permission) && isBusinessMetadataMatch(request, permission) && isClassificationMatch(request, permission)) {
 
-                        // match entity-classifications
-                        for (Iterator<String> iter = entClsToAuthz.iterator(); iter.hasNext();) {
+                        // 1. entity could have multiple classifications
+                        // 2. access for these classifications could be granted by multiple AtlasEntityPermission entries
+                        // 3. classifications allowed by the current permission will be removed from entClsToAuthz
+                        // 4. request will be allowed once entClsToAuthz is empty i.e. user has permission for all classifications
+                        for (Iterator<String> iter = entClsToAuthz.iterator(); iter.hasNext(); ) {
                             String entityClassification = iter.next();
 
-                            if (isMatchAny(request.getClassificationTypeAndAllSuperTypes(entityClassification), permission.getClassifications())) {
+                            if (isMatchAny(request.getClassificationTypeAndAllSuperTypes(entityClassification), permission.getEntityClassifications())) {
                                 iter.remove();
                             }
                         }
 
-                        hasClassificationsAccess = CollectionUtils.isEmpty(entClsToAuthz);
+                        ret = CollectionUtils.isEmpty(entClsToAuthz);
 
-                        if (hasEntityAccess && hasClassificationsAccess) {
+                        if (ret) {
                             break;
                         }
                     }
@@ -277,11 +281,9 @@
             }
         }
 
-        boolean ret = hasEntityAccess && hasClassificationsAccess;
-
         if (LOG.isDebugEnabled()) {
             if (!ret) {
-                LOG.debug("hasEntityAccess={}; hasClassificationsAccess={}, classificationsWithNoAccess={}", hasEntityAccess, hasClassificationsAccess, entClsToAuthz);
+                LOG.debug("isAccessAllowed={}; classificationsWithNoAccess={}", ret, entClsToAuthz);
             }
 
             LOG.debug("<== SimpleAtlasAuthorizer.isAccessAllowed({}): {}", request, ret);
@@ -490,6 +492,10 @@
         return AtlasPrivilege.ENTITY_UPDATE_BUSINESS_METADATA.equals(request.getAction()) ? isMatch(request.getBusinessMetadata(), permission.getBusinessMetadata()) : true;
     }
 
+    private boolean isClassificationMatch(AtlasEntityAccessRequest request, AtlasEntityPermission permission) {
+        return (CLASSIFICATION_PRIVILEGES.contains(request.getAction()) && request.getClassification() != null) ? isMatch(request.getClassification().getTypeName(), permission.getClassifications()) : true;
+    }
+
     private void filterTypes(AtlasAccessRequest request, List<? extends AtlasBaseTypeDef> typeDefs)throws AtlasAuthorizationException {
         if (typeDefs != null) {
             for (ListIterator<? extends AtlasBaseTypeDef> iter = typeDefs.listIterator(); iter.hasNext();) {
diff --git a/authorization/src/main/java/org/apache/atlas/authorize/simple/AtlasSimpleAuthzPolicy.java b/authorization/src/main/java/org/apache/atlas/authorize/simple/AtlasSimpleAuthzPolicy.java
index d191128..ee13233 100644
--- a/authorization/src/main/java/org/apache/atlas/authorize/simple/AtlasSimpleAuthzPolicy.java
+++ b/authorization/src/main/java/org/apache/atlas/authorize/simple/AtlasSimpleAuthzPolicy.java
@@ -205,29 +205,31 @@
     public static class AtlasEntityPermission implements Serializable {
         private static final long serialVersionUID = 1L;
 
-        private List<String> privileges;       // name of AtlasPrivilege enum, wildcards supported
-        private List<String> entityTypes;      // name of entity-type, wildcards supported
-        private List<String> entityIds;        // value of entity-unique attribute, wildcards supported
-        private List<String> classifications;  // name of classification-type, wildcards supported
-        private List<String> labels;           // labels, wildcards supported
-        private List<String> businessMetadata; // name of business-metadata, wildcards supported
-        private List<String> attributes;       // name of entity-attribute, wildcards supported
+        private List<String> privileges;            // name of AtlasPrivilege enum, wildcards supported
+        private List<String> entityTypes;           // name of entity-type, wildcards supported
+        private List<String> entityIds;             // value of entity-unique attribute, wildcards supported
+        private List<String> entityClassifications; // name of entity classification-type, wildcards supported
+        private List<String> labels;                // labels, wildcards supported
+        private List<String> businessMetadata;      // name of business-metadata, wildcards supported
+        private List<String> attributes;            // name of entity-attribute, wildcards supported
+        private List<String> classifications;       // name of classification-type, wildcards supported
 
         public AtlasEntityPermission() {
         }
 
         public AtlasEntityPermission(List<String> privileges, List<String> entityTypes, List<String> entityIds, List<String> classifications, List<String> attributes) {
-            this(privileges, entityTypes, entityIds, classifications, attributes, null, null);
+            this(privileges, entityTypes, entityIds, classifications, attributes, null, null, null);
         }
 
-        public AtlasEntityPermission(List<String> privileges, List<String> entityTypes, List<String> entityIds, List<String> classifications, List<String> labels, List<String> businessMetadata, List<String> attributes) {
-            this.privileges       = privileges;
-            this.entityTypes      = entityTypes;
-            this.entityIds        = entityIds;
-            this.classifications  = classifications;
-            this.labels           = labels;
-            this.businessMetadata = businessMetadata;
-            this.attributes       = attributes;
+        public AtlasEntityPermission(List<String> privileges, List<String> entityTypes, List<String> entityIds, List<String> entityClassifications, List<String> labels, List<String> businessMetadata, List<String> attributes, List<String> classifications) {
+            this.privileges            = privileges;
+            this.entityTypes           = entityTypes;
+            this.entityIds             = entityIds;
+            this.entityClassifications = entityClassifications;
+            this.labels                = labels;
+            this.businessMetadata      = businessMetadata;
+            this.attributes            = attributes;
+            this.classifications       = classifications;
         }
 
         public List<String> getPrivileges() {
@@ -254,12 +256,12 @@
             this.entityIds = entityIds;
         }
 
-        public List<String> getClassifications() {
-            return classifications;
+        public List<String> getEntityClassifications() {
+            return entityClassifications;
         }
 
-        public void setClassifications(List<String> classifications) {
-            this.classifications = classifications;
+        public void setEntityClassifications(List<String> entityClassifications) {
+            this.entityClassifications = entityClassifications;
         }
 
         public List<String> getLabels() {
@@ -285,6 +287,14 @@
         public void setAttributes(List<String> attributes) {
             this.attributes = attributes;
         }
+
+        public List<String> getClassifications() {
+            return classifications;
+        }
+
+        public void setClassifications(List<String> classifications) {
+            this.classifications = classifications;
+        }
     }
 
 
diff --git a/authorization/src/main/resources/atlas-simple-authz-policy.json b/authorization/src/main/resources/atlas-simple-authz-policy.json
index 6b20012..bd9d5ed 100644
--- a/authorization/src/main/resources/atlas-simple-authz-policy.json
+++ b/authorization/src/main/resources/atlas-simple-authz-policy.json
@@ -15,12 +15,14 @@
       ],
       "entityPermissions": [
         {
-          "privileges":      [ ".*" ],
-          "entityTypes":     [ ".*" ],
-          "entityIds":       [ ".*" ],
-          "classifications": [ ".*" ],
-          "labels":          [ ".*" ],
-          "namespaces":      [ ".*" ]
+          "privileges":            [ ".*" ],
+          "entityTypes":           [ ".*" ],
+          "entityIds":             [ ".*" ],
+          "entityClassifications": [ ".*" ],
+          "labels":                [ ".*" ],
+          "businessMetadata":      [ ".*" ],
+          "attributes":            [ ".*" ],
+          "classifications":       [ ".*" ]
         }
       ],
       "relationshipPermissions": [
@@ -40,12 +42,14 @@
     "DATA_SCIENTIST": {
       "entityPermissions": [
         {
-          "privileges":      [ "entity-read", "entity-read-classification" ],
-          "entityTypes":     [ ".*" ],
-          "entityIds":       [ ".*" ],
-          "classifications": [ ".*" ],
-          "labels":          [ ".*" ],
-          "businessMetadata":      [ ".*" ]
+          "privileges":            [ "entity-read", "entity-read-classification" ],
+          "entityTypes":           [ ".*" ],
+          "entityIds":             [ ".*" ],
+          "entityClassifications": [ ".*" ],
+          "labels":                [ ".*" ],
+          "businessMetadata":      [ ".*" ],
+          "attributes":            [ ".*" ],
+          "classifications":       [ ".*" ]
         }
       ]
     },
@@ -53,12 +57,14 @@
     "DATA_STEWARD": {
       "entityPermissions": [
         {
-          "privileges":      [ "entity-read", "entity-create", "entity-update", "entity-read-classification", "entity-add-classification", "entity-update-classification", "entity-remove-classification", "entity-add-label", "entity-remove-label", "entity-update-business-metadata" ],
-          "entityTypes":     [ ".*" ],
-          "entityIds":       [ ".*" ],
-          "classifications": [ ".*" ],
-          "labels":          [ ".*" ],
-          "businessMetadata":      [ ".*" ]
+          "privileges":            [ "entity-read", "entity-create", "entity-update", "entity-read-classification", "entity-add-classification", "entity-update-classification", "entity-remove-classification", "entity-add-label", "entity-remove-label", "entity-update-business-metadata" ],
+          "entityTypes":           [ ".*" ],
+          "entityIds":             [ ".*" ],
+          "entityClassifications": [ ".*" ],
+          "labels":                [ ".*" ],
+          "businessMetadata":      [ ".*" ],
+          "attributes":            [ ".*" ],
+          "classifications":       [ ".*" ]
         }
       ],
       "relationshipPermissions": [
diff --git a/authorization/src/test/java/org/apache/atlas/authorize/simple/AtlasSimpleAuthorizerTest.java b/authorization/src/test/java/org/apache/atlas/authorize/simple/AtlasSimpleAuthorizerTest.java
index 7083b82..8d38ebe 100644
--- a/authorization/src/test/java/org/apache/atlas/authorize/simple/AtlasSimpleAuthorizerTest.java
+++ b/authorization/src/test/java/org/apache/atlas/authorize/simple/AtlasSimpleAuthorizerTest.java
@@ -17,6 +17,8 @@
 package org.apache.atlas.authorize.simple;
 
 import org.apache.atlas.authorize.*;
+import org.apache.atlas.model.instance.AtlasClassification;
+import org.apache.atlas.model.instance.AtlasEntityHeader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.annotations.AfterClass;
@@ -24,14 +26,48 @@
 import org.testng.annotations.Test;
 import org.testng.AssertJUnit;
 
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 public class AtlasSimpleAuthorizerTest {
     private static Logger LOG = LoggerFactory.getLogger(AtlasSimpleAuthorizerTest.class);
 
-    private static final String USER_DATA_SCIENTIST = "dataScientist1";
-    private static final String USER_DATA_STEWARD   = "dataSteward1";
+    private static final String USER_ADMIN            = "admin";
+    private static final String USER_DATA_SCIENTIST   = "dataScientist";
+    private static final String USER_DATA_STEWARD     = "dataSteward";
+    private static final String USER_DATA_STEWARD_EX  = "dataStewardEx";
+    private static final String USER_FINANCE          = "finance";
+    private static final String USER_FINANCE_PII      = "financePII";
+    private static final String USER_IN_ADMIN_GROUP   = "admin-group-user";
+    private static final String USER_IN_UNKNOWN_GROUP = "unknown-group-user";
 
+    private static final Map<String, Set<String>> USER_GROUPS = new HashMap<String, Set<String>>() {{
+                                                                        put(USER_ADMIN, Collections.singleton("ROLE_ADMIN"));
+                                                                        put(USER_DATA_STEWARD, Collections.emptySet());
+                                                                        put(USER_DATA_SCIENTIST, Collections.emptySet());
+                                                                        put(USER_DATA_STEWARD_EX, Collections.singleton("DATA_STEWARD_EX"));
+                                                                        put(USER_FINANCE, Collections.singleton("FINANCE"));
+                                                                        put(USER_FINANCE_PII, Collections.singleton("FINANCE_PII"));
+                                                                        put(USER_IN_ADMIN_GROUP, Collections.singleton("ROLE_ADMIN"));
+                                                                        put(USER_IN_UNKNOWN_GROUP, Collections.singleton("UNKNOWN_GROUP"));
+                                                                    }};
+
+    private static final List<AtlasPrivilege> ENTITY_PRIVILEGES = Arrays.asList(AtlasPrivilege.ENTITY_CREATE,
+                                                                                AtlasPrivilege.ENTITY_UPDATE,
+                                                                                AtlasPrivilege.ENTITY_READ,
+                                                                                AtlasPrivilege.ENTITY_ADD_CLASSIFICATION,
+                                                                                AtlasPrivilege.ENTITY_UPDATE_CLASSIFICATION,
+                                                                                AtlasPrivilege.ENTITY_REMOVE_CLASSIFICATION,
+                                                                                AtlasPrivilege.ENTITY_READ_CLASSIFICATION,
+                                                                                AtlasPrivilege.ENTITY_ADD_LABEL,
+                                                                                AtlasPrivilege.ENTITY_REMOVE_LABEL,
+                                                                                AtlasPrivilege.ENTITY_UPDATE_BUSINESS_METADATA);
+
+    private static final List<AtlasPrivilege> LABEL_PRIVILEGES = Arrays.asList(AtlasPrivilege.ENTITY_ADD_LABEL, AtlasPrivilege.ENTITY_REMOVE_LABEL);
 
     private String          originalConf;
     private AtlasAuthorizer authorizer;
@@ -59,15 +95,17 @@
     }
 
     @Test(enabled = true)
-    public void testAccessAllowedForUserAndGroup() {
+    public void testAllAllowedForAdminUser() {
         try {
-            AtlasEntityAccessRequest request = new AtlasEntityAccessRequest(null, AtlasPrivilege.ENTITY_UPDATE);
+            for (AtlasPrivilege privilege : AtlasPrivilege.values()) {
+                AtlasEntityAccessRequest request = new AtlasEntityAccessRequest(null, privilege);
 
-            request.setUser("admin", Collections.singleton("ROLE_ADMIN"));
+                setUser(request, USER_ADMIN);
 
-            boolean isAccessAllowed = authorizer.isAccessAllowed(request);
+                boolean isAccessAllowed = authorizer.isAccessAllowed(request);
 
-            AssertJUnit.assertEquals(true, isAccessAllowed);
+                AssertJUnit.assertEquals(privilege.name() + " should have been allowed for user " + USER_DATA_SCIENTIST, true, isAccessAllowed);
+            }
         } catch (Exception e) {
             LOG.error("Exception in AtlasSimpleAuthorizerTest", e);
 
@@ -76,15 +114,162 @@
     }
 
     @Test(enabled = true)
-    public void testAccessAllowedForGroup() {
+    public void testAddPIIForStewardExUser() {
         try {
-            AtlasEntityAccessRequest request = new AtlasEntityAccessRequest(null, AtlasPrivilege.ENTITY_UPDATE);
+            AtlasEntityAccessRequest request = new AtlasEntityAccessRequest(null , AtlasPrivilege.ENTITY_ADD_CLASSIFICATION, null,  new AtlasClassification("PII"));
 
-            request.setUser("nonmappeduser", Collections.singleton("ROLE_ADMIN"));
+            setUser(request, USER_DATA_STEWARD_EX);
 
             boolean isAccessAllowed = authorizer.isAccessAllowed(request);
 
-            AssertJUnit.assertEquals(true, isAccessAllowed);
+            AssertJUnit.assertEquals("user " + USER_DATA_STEWARD_EX + " should have been allowed to add PII", true, isAccessAllowed);
+        } catch (Exception e) {
+            LOG.error("Exception in AtlasSimpleAuthorizerTest", e);
+
+            AssertJUnit.fail();
+        }
+    }
+
+    @Test(enabled = true)
+    public void testAddClassificationOnEntityWithClassificationForStewardExUser() {
+        try {
+
+            AtlasEntityHeader entityHeader = new AtlasEntityHeader();
+            entityHeader.setClassifications(Arrays.asList(new AtlasClassification("PII_1"), new AtlasClassification("PII_2")));
+
+            AtlasEntityAccessRequest request = new AtlasEntityAccessRequest(null, AtlasPrivilege.ENTITY_ADD_CLASSIFICATION, entityHeader, new AtlasClassification("PII"));
+
+            setUser(request, USER_DATA_STEWARD_EX);
+
+            boolean isAccessAllowed = authorizer.isAccessAllowed(request);
+
+            AssertJUnit.assertEquals("user " + USER_DATA_STEWARD_EX + " should have been allowed to add PII", true, isAccessAllowed);
+        } catch (Exception e) {
+            LOG.error("Exception in AtlasSimpleAuthorizerTest", e);
+
+            AssertJUnit.fail();
+        }
+    }
+
+    @Test(enabled = true)
+    public void testAddClassificationOnEntityWithClassificationForStewardExUserShouldFail() {
+        try {
+
+            AtlasEntityHeader entityHeader = new AtlasEntityHeader();
+            entityHeader.setClassifications(Arrays.asList(new AtlasClassification("TAG1"), new AtlasClassification("TAG2")));
+
+            AtlasEntityAccessRequest request = new AtlasEntityAccessRequest(null, AtlasPrivilege.ENTITY_ADD_CLASSIFICATION, entityHeader, new AtlasClassification("PII"));
+
+            setUser(request, USER_DATA_STEWARD_EX);
+
+            boolean isAccessAllowed = authorizer.isAccessAllowed(request);
+
+            AssertJUnit.assertEquals("user " + USER_DATA_STEWARD_EX + " should have not been allowed to add PII on entity with TAG1,TAG2 classification ", false, isAccessAllowed);
+        } catch (Exception e) {
+            LOG.error("Exception in AtlasSimpleAuthorizerTest", e);
+
+            AssertJUnit.fail();
+        }
+    }
+
+    @Test(enabled = true)
+    public void testAddPIIForStewardUser() {
+        try {
+            AtlasEntityAccessRequest request = new AtlasEntityAccessRequest(null , AtlasPrivilege.ENTITY_ADD_CLASSIFICATION, null,  new AtlasClassification("PII"));
+
+            setUser(request, USER_DATA_STEWARD);
+
+            boolean isAccessAllowed = authorizer.isAccessAllowed(request);
+
+            AssertJUnit.assertEquals("user " + USER_DATA_STEWARD + " should not have been allowed to add PII", false, isAccessAllowed);
+        } catch (Exception e) {
+            LOG.error("Exception in AtlasSimpleAuthorizerTest", e);
+
+            AssertJUnit.fail();
+        }
+    }
+
+    @Test(enabled = true)
+    public void testFinancePIIEntityAccessForFinancePIIUser() {
+        try {
+            AtlasEntityHeader entity = new AtlasEntityHeader() {{
+                                                setClassifications(Arrays.asList(new AtlasClassification("FINANCE"), new AtlasClassification("PII")));
+                                            }};
+
+            for (AtlasPrivilege privilege : ENTITY_PRIVILEGES) {
+                AtlasEntityAccessRequest request = new AtlasEntityAccessRequest(null, privilege, entity, new AtlasClassification("PII"));
+
+                setUser(request, USER_FINANCE_PII);
+
+                boolean isAccessAllowed = authorizer.isAccessAllowed(request);
+
+                AssertJUnit.assertEquals("user " + USER_FINANCE_PII + " should have been allowed " + privilege + " on entity with FINANCE & PII", true, isAccessAllowed);
+            }
+        } catch (Exception e) {
+            LOG.error("Exception in AtlasSimpleAuthorizerTest", e);
+
+            AssertJUnit.fail();
+        }
+    }
+
+    @Test(enabled = true)
+    public void testFinancePIIEntityAccessForFinanceUser() {
+        try {
+            AtlasEntityHeader entity = new AtlasEntityHeader() {{
+                                                setClassifications(Arrays.asList(new AtlasClassification("FINANCE"), new AtlasClassification("PII")));
+                                            }};
+
+            for (AtlasPrivilege privilege : ENTITY_PRIVILEGES) {
+                AtlasEntityAccessRequest request = new AtlasEntityAccessRequest(null, privilege, entity, new AtlasClassification("PII"));
+
+                setUser(request, USER_FINANCE);
+
+                boolean isAccessAllowed = authorizer.isAccessAllowed(request);
+
+                AssertJUnit.assertEquals("user " + USER_FINANCE + " should not have been allowed " + privilege + " on entity with FINANCE & PII", false, isAccessAllowed);
+            }
+        } catch (Exception e) {
+            LOG.error("Exception in AtlasSimpleAuthorizerTest", e);
+
+            AssertJUnit.fail();
+        }
+    }
+
+    @Test(enabled = true)
+    public void testFinanceEntityAccess() {
+        try {
+            AtlasEntityHeader entity = new AtlasEntityHeader() {{
+                setClassifications(Arrays.asList(new AtlasClassification("FINANCE")));
+            }};
+
+            for (String userName : Arrays.asList(USER_FINANCE_PII, USER_FINANCE)) {
+                for (AtlasPrivilege privilege : ENTITY_PRIVILEGES) {
+                    AtlasEntityAccessRequest request = new AtlasEntityAccessRequest(null, privilege, entity, new AtlasClassification("FINANCE"));
+
+                    setUser(request, userName);
+
+                    boolean isAccessAllowed = authorizer.isAccessAllowed(request);
+
+                    AssertJUnit.assertEquals("user " + userName + " should have been allowed " + privilege + " on entity with FINANCE", true, isAccessAllowed);
+                }
+            }
+        } catch (Exception e) {
+            LOG.error("Exception in AtlasSimpleAuthorizerTest", e);
+
+            AssertJUnit.fail();
+        }
+    }
+
+    @Test(enabled = true)
+    public void testAccessForUserInAdminGroup() {
+        try {
+            AtlasEntityAccessRequest request = new AtlasEntityAccessRequest(null, AtlasPrivilege.ENTITY_UPDATE);
+
+            setUser(request, USER_IN_ADMIN_GROUP);
+
+            boolean isAccessAllowed = authorizer.isAccessAllowed(request);
+
+            AssertJUnit.assertEquals("user " + USER_IN_ADMIN_GROUP + " should have been allowed " + AtlasPrivilege.ENTITY_UPDATE, true, isAccessAllowed);
         } catch (AtlasAuthorizationException e) {
             LOG.error("Exception in AtlasSimpleAuthorizerTest", e);
 
@@ -93,15 +278,15 @@
     }
 
     @Test(enabled = true)
-    public void testAccessNotAllowedForUserAndGroup() {
+    public void testAccessForUserInUnknownGroup() {
         try {
             AtlasEntityAccessRequest request = new AtlasEntityAccessRequest(null, AtlasPrivilege.ENTITY_UPDATE);
 
-            request.setUser("nonmappeduser", Collections.singleton("GROUP-NOT-IN-POLICYFILE"));
+            setUser(request, USER_IN_UNKNOWN_GROUP);
 
             boolean isAccessAllowed = authorizer.isAccessAllowed(request);
 
-            AssertJUnit.assertEquals(false, isAccessAllowed);
+            AssertJUnit.assertEquals("user " + USER_IN_UNKNOWN_GROUP + " should not have been allowed " + AtlasPrivilege.ENTITY_UPDATE, false, isAccessAllowed);
         } catch (AtlasAuthorizationException e) {
             LOG.error("Exception in AtlasSimpleAuthorizerTest", e);
 
@@ -112,36 +297,25 @@
     @Test(enabled = true)
     public void testLabels() {
         try {
-            AtlasEntityAccessRequest request = new AtlasEntityAccessRequest(null, AtlasPrivilege.ENTITY_ADD_LABEL);
+            for (AtlasPrivilege privilege : LABEL_PRIVILEGES) {
+                for (String userName : Arrays.asList(USER_DATA_SCIENTIST, USER_DATA_STEWARD)) {
+                    AtlasEntityAccessRequest request = new AtlasEntityAccessRequest(null, privilege);
 
-            request.setUser(USER_DATA_SCIENTIST, Collections.emptySet());
+                    setUser(request, userName);
 
-            boolean isAccessAllowed = authorizer.isAccessAllowed(request);
+                    boolean isAccessAllowed = authorizer.isAccessAllowed(request);
 
-            AssertJUnit.assertEquals("user " + USER_DATA_SCIENTIST + " shouldn't be allowed to add label", false, isAccessAllowed);
+                    AssertJUnit.assertEquals("user " + userName + " should not have been allowed " + privilege, false, isAccessAllowed);
+                }
 
+                AtlasEntityAccessRequest request = new AtlasEntityAccessRequest(null, privilege);
 
-            request.setUser(USER_DATA_STEWARD, Collections.emptySet());
+                setUser(request, USER_DATA_STEWARD_EX);
 
-            isAccessAllowed = authorizer.isAccessAllowed(request);
+                boolean isAccessAllowed = authorizer.isAccessAllowed(request);
 
-            AssertJUnit.assertEquals("user " + USER_DATA_STEWARD + " should be allowed to add label", true, isAccessAllowed);
-
-
-            request = new AtlasEntityAccessRequest(null, AtlasPrivilege.ENTITY_REMOVE_LABEL);
-
-            request.setUser(USER_DATA_SCIENTIST, Collections.emptySet());
-
-            isAccessAllowed = authorizer.isAccessAllowed(request);
-
-            AssertJUnit.assertEquals("user " + USER_DATA_SCIENTIST + " shouldn't be allowed to remove label", false, isAccessAllowed);
-
-
-            request.setUser(USER_DATA_STEWARD, Collections.emptySet());
-
-            isAccessAllowed = authorizer.isAccessAllowed(request);
-
-            AssertJUnit.assertEquals("user " + USER_DATA_STEWARD + " should be allowed to remove label", true, isAccessAllowed);
+                AssertJUnit.assertEquals("user " + USER_DATA_STEWARD_EX + " should have been allowed " + privilege, true, isAccessAllowed);
+            }
         } catch (AtlasAuthorizationException e) {
             LOG.error("Exception in AtlasSimpleAuthorizerTest", e);
 
@@ -152,23 +326,33 @@
     @Test(enabled = true)
     public void testBusinessMetadata() {
         try {
+            for (String userName : Arrays.asList(USER_DATA_SCIENTIST, USER_DATA_STEWARD)) {
+                AtlasEntityAccessRequest request = new AtlasEntityAccessRequest(null, AtlasPrivilege.ENTITY_UPDATE_BUSINESS_METADATA);
+
+                setUser(request, userName);
+
+                boolean isAccessAllowed = authorizer.isAccessAllowed(request);
+
+                AssertJUnit.assertEquals("user " + userName + " should not have been allowed " + AtlasPrivilege.ENTITY_UPDATE_BUSINESS_METADATA, false, isAccessAllowed);
+            }
+
             AtlasEntityAccessRequest request = new AtlasEntityAccessRequest(null, AtlasPrivilege.ENTITY_UPDATE_BUSINESS_METADATA);
 
-            request.setUser(USER_DATA_SCIENTIST, Collections.emptySet());
+            setUser(request, USER_DATA_STEWARD_EX);
 
             boolean isAccessAllowed = authorizer.isAccessAllowed(request);
 
-            AssertJUnit.assertEquals("user " + USER_DATA_SCIENTIST + " shouldn't be allowed to update business-metadata", false, isAccessAllowed);
-
-            request.setUser(USER_DATA_STEWARD, Collections.emptySet());
-
-            isAccessAllowed = authorizer.isAccessAllowed(request);
-
-            AssertJUnit.assertEquals("user " + USER_DATA_STEWARD + " should be allowed to update business-metadata", true, isAccessAllowed);
+            AssertJUnit.assertEquals("user " + USER_DATA_STEWARD_EX + " should have been allowed " + AtlasPrivilege.ENTITY_UPDATE_BUSINESS_METADATA, true, isAccessAllowed);
         } catch (AtlasAuthorizationException e) {
             LOG.error("Exception in AtlasSimpleAuthorizerTest", e);
 
             AssertJUnit.fail();
         }
     }
+
+    private void setUser(AtlasAccessRequest request, String userName) {
+        Set<String> userGroups = USER_GROUPS.get(userName);
+
+        request.setUser(userName, userGroups != null ? userGroups : Collections.emptySet());
+    }
 }
diff --git a/authorization/src/test/resources/atlas-simple-authz-policy.json b/authorization/src/test/resources/atlas-simple-authz-policy.json
index 9db6505..cada904 100644
--- a/authorization/src/test/resources/atlas-simple-authz-policy.json
+++ b/authorization/src/test/resources/atlas-simple-authz-policy.json
@@ -9,10 +9,11 @@
 
       "entityPermissions": [
         {
-          "privileges":      [ ".*" ],
-          "entityTypes":     [ ".*" ],
-          "entityIds":       [ ".*" ],
-          "classifications": [ ".*" ]
+          "privileges":            [ ".*" ],
+          "entityTypes":           [ ".*" ],
+          "entityIds":             [ ".*" ],
+          "classifications":       [ ".*" ],
+          "entityClassifications": [ ".*" ]
         }
       ],
 
@@ -53,12 +54,77 @@
     "DATA_STEWARD": {
       "entityPermissions": [
         {
-          "privileges":       [ "entity-read", "entity-create", "entity-update", "entity-read-classification", "entity-add-classification", "entity-update-classification", "entity-remove-classification", "entity-add-label", "entity-remove-label", "entity-update-business-metadata" ],
-          "entityTypes":      [ ".*" ],
-          "entityIds":        [ ".*" ],
-          "classifications":  [ ".*" ],
-          "labels":           [ ".*" ],
-          "businessMetadata": [ ".*" ]
+          "privileges":            [ "entity-read", "entity-create", "entity-update", "entity-read-classification", "entity-add-classification", "entity-update-classification", "entity-remove-classification", "entity-add-label", "entity-remove-label", "entity-update-business-metadata" ],
+          "entityTypes":           [ ".*" ],
+          "entityIds":             [ ".*" ],
+          "entityClassifications": [ ".*" ],
+          "labels":                [ ".*" ],
+          "businessMetadata":      [ ".*" ],
+          "attributes":            [ ".*" ],
+          "classifications":       [ ".*" ]
+        }
+      ]
+    },
+
+    "FINANCE": {
+      "entityPermissions": [
+        {
+          "privileges":            [ "entity-read", "entity-create", "entity-update", "entity-read-classification", "entity-add-label", "entity-remove-label", "entity-update-business-metadata" ],
+          "entityTypes":           [ ".*" ],
+          "entityIds":             [ ".*" ],
+          "entityClassifications": [ "FINANCE" ],
+          "labels":                [ ".*" ],
+          "businessMetadata":      [ ".*" ],
+          "attributes":            [ ".*" ]
+        },
+        {
+          "privileges":            [ "entity-add-classification", "entity-update-classification", "entity-remove-classification" ],
+          "entityTypes":           [ ".*" ],
+          "entityIds":             [ ".*" ],
+          "entityClassifications": [ "FINANCE" ],
+          "labels":                [ ".*" ],
+          "businessMetadata":      [ ".*" ],
+          "attributes":            [ ".*" ],
+          "classifications":       [ "FINANCE" ]
+        }
+      ]
+    },
+
+    "FINANCE_PII": {
+      "entityPermissions": [
+        {
+          "privileges":            [ "entity-read", "entity-create", "entity-update", "entity-read-classification", "entity-add-label", "entity-remove-label", "entity-update-business-metadata" ],
+          "entityTypes":           [ ".*" ],
+          "entityIds":             [ ".*" ],
+          "entityClassifications": [ "FINANCE", "PII" ],
+          "labels":                [ ".*" ],
+          "businessMetadata":      [ ".*" ],
+          "attributes":            [ ".*" ]
+        },
+        {
+          "privileges":            [ "entity-add-classification", "entity-update-classification", "entity-remove-classification" ],
+          "entityTypes":           [ ".*" ],
+          "entityIds":             [ ".*" ],
+          "entityClassifications": [ "FINANCE", "PII" ],
+          "labels":                [ ".*" ],
+          "businessMetadata":      [ ".*" ],
+          "attributes":            [ ".*" ],
+          "classifications":       [ "FINANCE", "PII" ]
+        }
+      ]
+    },
+
+    "DATA_STEWARD_EX": {
+      "entityPermissions": [
+        {
+          "privileges":            [ "entity-read", "entity-create", "entity-update", "entity-read-classification", "entity-add-classification", "entity-update-classification", "entity-remove-classification", "entity-add-label", "entity-remove-label", "entity-update-business-metadata" ],
+          "entityTypes":           [ ".*" ],
+          "entityIds":             [ ".*" ],
+          "entityClassifications": [ "PII.*" ],
+          "labels":                [ ".*" ],
+          "businessMetadata":      [ ".*" ],
+          "attributes":            [ ".*" ],
+          "classifications":       [ "PII.*" ]
         }
       ]
     }
@@ -75,6 +141,10 @@
     "ROLE_ADMIN":      [ "ROLE_ADMIN" ],
     "hadoop":          [ "DATA_STEWARD" ],
     "DATA_STEWARD":    [ "DATA_STEWARD" ],
-    "RANGER_TAG_SYNC": [ "DATA_SCIENTIST" ]
+    "RANGER_TAG_SYNC": [ "DATA_SCIENTIST" ],
+    "FINANCE":         [ "FINANCE" ],
+    "FINANCE_PII":     [ "FINANCE_PII" ],
+    "RANGER_TAG_SYNC": [ "DATA_SCIENTIST" ],
+    "DATA_STEWARD_EX": [ "DATA_STEWARD_EX" ]
   }
 }
diff --git a/distro/src/conf/atlas-simple-authz-policy.json b/distro/src/conf/atlas-simple-authz-policy.json
index 8a3ce60..a5761cc 100644
--- a/distro/src/conf/atlas-simple-authz-policy.json
+++ b/distro/src/conf/atlas-simple-authz-policy.json
@@ -15,13 +15,14 @@
       ],
       "entityPermissions": [
         {
-          "privileges":      [ ".*" ],
-          "entityTypes":     [ ".*" ],
-          "entityIds":       [ ".*" ],
-          "classifications": [ ".*" ],
-          "labels" : [ ".*" ],
-          "businessMetadata" : [ ".*" ],
-          "attributes" :[ ".*" ]
+          "privileges":            [ ".*" ],
+          "entityTypes":           [ ".*" ],
+          "entityIds":             [ ".*" ],
+          "entityClassifications": [ ".*" ],
+          "labels":                [ ".*" ],
+          "businessMetadata":      [ ".*" ],
+          "attributes":            [ ".*" ],
+          "classifications":       [ ".*" ]
         }
       ],
       "relationshipPermissions": [
@@ -41,10 +42,13 @@
     "DATA_SCIENTIST": {
       "entityPermissions": [
         {
-          "privileges":      [ "entity-read", "entity-read-classification" ],
-          "entityTypes":     [ ".*" ],
-          "entityIds":       [ ".*" ],
-          "classifications": [ ".*" ]
+          "privileges":            [ "entity-read", "entity-read-classification" ],
+          "entityTypes":           [ ".*" ],
+          "entityIds":             [ ".*" ],
+          "entityClassifications": [ ".*" ],
+          "labels":                [ ".*" ],
+          "businessMetadata":      [ ".*" ],
+          "attributes":            [ ".*" ]
         }
       ]
     },
@@ -52,10 +56,14 @@
     "DATA_STEWARD": {
       "entityPermissions": [
         {
-          "privileges":      [ "entity-read", "entity-create", "entity-update", "entity-read-classification", "entity-add-classification", "entity-update-classification", "entity-remove-classification" ],
-          "entityTypes":     [ ".*" ],
-          "entityIds":       [ ".*" ],
-          "classifications": [ ".*" ]
+          "privileges":            [ "entity-read", "entity-create", "entity-update", "entity-read-classification", "entity-add-classification", "entity-update-classification", "entity-remove-classification" ],
+          "entityTypes":           [ ".*" ],
+          "entityIds":             [ ".*" ],
+          "entityClassifications": [ ".*" ],
+          "labels":                [ ".*" ],
+          "businessMetadata":      [ ".*" ],
+          "attributes":            [ ".*" ],
+          "classifications":       [ ".*" ]
         }
       ],
       "relationshipPermissions": [