RANGER-2692:RangerKafkaAuthorizer support for ConsumerGroup resource for authorization
diff --git a/agents-common/src/main/resources/service-defs/ranger-servicedef-kafka.json b/agents-common/src/main/resources/service-defs/ranger-servicedef-kafka.json
index 38bc31c..0fcdbb5 100644
--- a/agents-common/src/main/resources/service-defs/ranger-servicedef-kafka.json
+++ b/agents-common/src/main/resources/service-defs/ranger-servicedef-kafka.json
@@ -57,7 +57,7 @@
},
"label":"Cluster",
"description":"Cluster",
- "accessTypeRestrictions": ["create", "configure", "alter_configs", "describe", "describe_configs", "kafka_admin", "idempotent_write"]
+ "accessTypeRestrictions": ["create", "configure", "alter_configs", "describe", "describe_configs", "kafka_admin", "idempotent_write", "cluster_action"]
},
{
"itemId":4,
@@ -74,6 +74,22 @@
"label":"Delegation Token",
"description":"Delegation Token",
"accessTypeRestrictions": ["describe"]
+ },
+ {
+ "itemId":5,
+ "name":"consumergroup",
+ "type":"string",
+ "level":1,
+ "mandatory":true,
+ "excludesSupported":true,
+ "matcher":"org.apache.ranger.plugin.resourcematcher.RangerDefaultResourceMatcher",
+ "matcherOptions":{
+ "wildCard":true,
+ "ignoreCase":true
+ },
+ "label":"Consumer Group",
+ "description":"Consumer Group",
+ "accessTypeRestrictions": ["consume", "describe", "delete"]
}
],
"accessTypes":[
@@ -107,6 +123,23 @@
"label":"Describe"
},
{
+ "itemId":7,
+ "name":"kafka_admin",
+ "label":"Kafka Admin",
+ "impliedGrants":[
+ "publish",
+ "consume",
+ "configure",
+ "describe",
+ "create",
+ "delete",
+ "describe_configs",
+ "alter_configs",
+ "idempotent_write",
+ "cluster_action"
+ ]
+ },
+ {
"itemId":8,
"name":"create",
"label":"Create"
@@ -120,19 +153,6 @@
]
},
{
- "itemId":7,
- "name":"kafka_admin",
- "label":"Kafka Admin",
- "impliedGrants":[
- "publish",
- "consume",
- "configure",
- "describe",
- "create",
- "delete"
- ]
- },
- {
"itemId":10,
"name":"idempotent_write",
"label":"Idempotent Write"
@@ -149,6 +169,11 @@
"impliedGrants":[
"describe_configs"
]
+ },
+ {
+ "itemId":13,
+ "name":"cluster_action",
+ "label":"Cluster Action"
}
],
"configs":[
diff --git a/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java b/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
index 43dd35f..8674521 100644
--- a/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
+++ b/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
@@ -56,7 +56,7 @@
public static final String KEY_TOPIC = "topic";
public static final String KEY_CLUSTER = "cluster";
- public static final String KEY_CONSUMER_GROUP = "consumer_group";
+ public static final String KEY_CONSUMER_GROUP = "consumergroup";
public static final String KEY_TRANSACTIONALID = "transactionalid";
public static final String KEY_DELEGATIONTOKEN = "delegationtoken";
@@ -66,10 +66,10 @@
public static final String ACCESS_TYPE_DELETE = "delete";
public static final String ACCESS_TYPE_CONFIGURE = "configure";
public static final String ACCESS_TYPE_DESCRIBE = "describe";
- public static final String ACCESS_TYPE_KAFKA_ADMIN = "kafka_admin";
public static final String ACCESS_TYPE_DESCRIBE_CONFIGS = "describe_configs";
public static final String ACCESS_TYPE_ALTER_CONFIGS = "alter_configs";
public static final String ACCESS_TYPE_IDEMPOTENT_WRITE = "idempotent_write";
+ public static final String ACCESS_TYPE_CLUSTER_ACTION = "cluster_action";
private static volatile RangerBasePlugin rangerPlugin = null;
RangerKafkaAuditHandler auditHandler = null;
@@ -142,14 +142,6 @@
return false;
}
- // TODO: If resource type is consumer group, then allow it by default
- if (resource.resourceType().equals(Group$.MODULE$)) {
- if (logger.isDebugEnabled()) {
- logger.debug("If resource type is consumer group, then we allow it by default! Returning true");
- }
- return true;
- }
-
RangerPerfTracer perf = null;
if(RangerPerfTracer.isPerfTraceEnabled(PERF_KAFKAAUTH_REQUEST_LOG)) {
@@ -332,7 +324,7 @@
} else if (operation.equals(Describe$.MODULE$)) {
return ACCESS_TYPE_DESCRIBE;
} else if (operation.equals(ClusterAction$.MODULE$)) {
- return ACCESS_TYPE_KAFKA_ADMIN;
+ return ACCESS_TYPE_CLUSTER_ACTION;
} else if (operation.equals(Create$.MODULE$)) {
return ACCESS_TYPE_CREATE;
} else if (operation.equals(Delete$.MODULE$)) {
diff --git a/plugin-kafka/src/test/resources/kafka-policies.json b/plugin-kafka/src/test/resources/kafka-policies.json
index e4f5db1..70c978c 100644
--- a/plugin-kafka/src/test/resources/kafka-policies.json
+++ b/plugin-kafka/src/test/resources/kafka-policies.json
@@ -61,6 +61,10 @@
{
"type": "alter_configs",
"isAllowed": true
+ },
+ {
+ "type": "cluster_action",
+ "isAllowed": true
}
],
"users": [
@@ -139,6 +143,10 @@
{
"type": "alter_configs",
"isAllowed": true
+ },
+ {
+ "type": "cluster_action",
+ "isAllowed": true
}
],
"users": [
@@ -201,6 +209,10 @@
{
"type": "alter_configs",
"isAllowed": true
+ },
+ {
+ "type": "cluster_action",
+ "isAllowed": true
}
],
"users": [],
@@ -261,6 +273,10 @@
{
"type": "alter_configs",
"isAllowed": true
+ },
+ {
+ "type": "cluster_action",
+ "isAllowed": true
}
],
"users": ["kafka"],
@@ -399,6 +415,56 @@
"id": 31,
"isEnabled": true,
"version": 2
+ },
+ {
+ "service": "cl1_kafka",
+ "name": "ConsumerGroup Policy",
+ "policyType": 0,
+ "description": "ConsumerGroup Policy",
+ "isAuditEnabled": true,
+ "resources": {
+ "consumergroup": {
+ "values": [
+ "*"
+ ],
+ "isExcludes": false,
+ "isRecursive": false
+ }
+ },
+ "policyItems": [
+ {
+ "accesses": [
+ {
+ "type": "consume",
+ "isAllowed": true
+ },
+ {
+ "type": "describe",
+ "isAllowed": true
+ },
+ {
+ "type": "delete",
+ "isAllowed": true
+ }
+ ],
+ "users": [
+ "admin","kafka", "client"
+ ],
+ "groups": [
+ "IT"
+ ],
+ "conditions": [],
+ "delegateAdmin": true
+ }
+ ],
+ "denyPolicyItems": [],
+ "allowExceptions": [],
+ "denyExceptions": [],
+ "dataMaskPolicyItems": [],
+ "rowFilterPolicyItems": [],
+ "id": 32,
+ "isEnabled": true,
+ "version": 2
}
],
"serviceDef": {
@@ -518,6 +584,26 @@
"uiHint":"",
"label":"Delegation Token",
"description":"Delegation Token"
+ },
+ {
+ "itemId":5,
+ "name":"consumergroup",
+ "type":"string",
+ "level":1,
+ "mandatory":true,
+ "lookupSupported":false,
+ "recursiveSupported":false,
+ "excludesSupported":true,
+ "matcher":"org.apache.ranger.plugin.resourcematcher.RangerDefaultResourceMatcher",
+ "matcherOptions":{
+ "wildCard":true,
+ "ignoreCase":true
+ },
+ "validationRegEx":"",
+ "validationMessage":"",
+ "uiHint":"",
+ "label":"Consumer Group",
+ "description":"Consumer Group"
}
],
"accessTypes": [
@@ -573,7 +659,11 @@
"configure",
"describe",
"create",
- "delete"
+ "delete",
+ "describe_configs",
+ "alter_configs",
+ "idempotent_write",
+ "cluster_action"
]
},
{
@@ -593,6 +683,11 @@
"impliedGrants":[
"describe_configs"
]
+ },
+ {
+ "itemId":13,
+ "name":"cluster_action",
+ "label":"Cluster Action"
}
],
"policyConditions": [
@@ -853,6 +948,10 @@
"isAllowed": true
},
{
+ "type": "kafka:cluster_action",
+ "isAllowed": true
+ },
+ {
"type": "atlas:read",
"isAllowed": true
},
@@ -1306,7 +1405,11 @@
"kafka:configure",
"kafka:describe",
"kafka:create",
- "kafka:delete"
+ "kafka:delete",
+ "kafka:describe_configs",
+ "kafka:alter_configs",
+ "kafka:idempotent_write",
+ "kafka:cluster_action"
]
},
{
@@ -1328,6 +1431,12 @@
]
},
{
+ "itemId":9022,
+ "name":"cluster_action",
+ "label":"Cluster Action",
+ "impliedGrants": []
+ },
+ {
"itemId": 11012,
"name": "atlas:read",
"label": "read",
diff --git a/security-admin/src/main/java/org/apache/ranger/patch/PatchForKafkaServiceDefUpdate_J10033.java b/security-admin/src/main/java/org/apache/ranger/patch/PatchForKafkaServiceDefUpdate_J10033.java
new file mode 100644
index 0000000..b2e9b74
--- /dev/null
+++ b/security-admin/src/main/java/org/apache/ranger/patch/PatchForKafkaServiceDefUpdate_J10033.java
@@ -0,0 +1,440 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ranger.patch;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.log4j.Logger;
+import org.apache.ranger.authorization.utils.JsonUtils;
+import org.apache.ranger.biz.RangerBizUtil;
+import org.apache.ranger.biz.ServiceDBStore;
+import org.apache.ranger.common.GUIDUtil;
+import org.apache.ranger.common.JSONUtil;
+import org.apache.ranger.common.RangerValidatorFactory;
+import org.apache.ranger.common.StringUtil;
+import org.apache.ranger.db.RangerDaoManager;
+import org.apache.ranger.entity.XXAccessTypeDef;
+import org.apache.ranger.entity.XXPolicy;
+import org.apache.ranger.entity.XXPolicyItem;
+import org.apache.ranger.entity.XXPolicyItemAccess;
+import org.apache.ranger.entity.XXPolicyItemUserPerm;
+import org.apache.ranger.entity.XXPolicyResource;
+import org.apache.ranger.entity.XXPolicyResourceMap;
+import org.apache.ranger.entity.XXPortalUser;
+import org.apache.ranger.entity.XXResourceDef;
+import org.apache.ranger.entity.XXService;
+import org.apache.ranger.entity.XXServiceDef;
+import org.apache.ranger.entity.XXUser;
+import org.apache.ranger.plugin.model.RangerPolicy;
+import org.apache.ranger.plugin.model.RangerPolicyResourceSignature;
+import org.apache.ranger.plugin.model.RangerServiceDef;
+import org.apache.ranger.plugin.model.validation.RangerServiceDefValidator;
+import org.apache.ranger.plugin.model.validation.RangerValidator.Action;
+import org.apache.ranger.plugin.store.EmbeddedServiceDefsUtil;
+import org.apache.ranger.service.RangerPolicyService;
+import org.apache.ranger.service.XPermMapService;
+import org.apache.ranger.service.XPolicyService;
+import org.apache.ranger.util.CLIUtil;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@Component
+public class PatchForKafkaServiceDefUpdate_J10033 extends BaseLoader {
+ private static final Logger logger = Logger.getLogger(PatchForKafkaServiceDefUpdate_J10033.class);
+ private static final String POLICY_NAME = "all - consumergroup";
+ private static final String LOGIN_ID_ADMIN = "admin";
+
+ private static final List<String> DEFAULT_POLICY_USERS = new ArrayList<>(Arrays.asList("kafka","rangerlookup"));
+
+ public static final String SERVICEDBSTORE_SERVICEDEFBYNAME_KAFKA_NAME = "kafka";
+ public static final String CONSUMERGROUP_RESOURCE_NAME = "consumergroup";
+
+
+ @Autowired
+ RangerDaoManager daoMgr;
+
+ @Autowired
+ ServiceDBStore svcDBStore;
+
+ @Autowired
+ JSONUtil jsonUtil;
+
+ @Autowired
+ RangerPolicyService policyService;
+
+ @Autowired
+ StringUtil stringUtil;
+
+ @Autowired
+ GUIDUtil guidUtil;
+
+ @Autowired
+ XPolicyService xPolService;
+
+ @Autowired
+ XPermMapService xPermMapService;
+
+ @Autowired
+ RangerBizUtil bizUtil;
+
+ @Autowired
+ RangerValidatorFactory validatorFactory;
+
+ @Autowired
+ ServiceDBStore svcStore;
+
+ public static void main(String[] args) {
+ logger.info("main()");
+ try {
+ PatchForKafkaServiceDefUpdate_J10033 loader = (PatchForKafkaServiceDefUpdate_J10033) CLIUtil.getBean(PatchForKafkaServiceDefUpdate_J10033.class);
+ loader.init();
+ while (loader.isMoreToProcess()) {
+ loader.load();
+ }
+ logger.info("Load complete. Exiting!!!");
+ System.exit(0);
+ } catch (Exception e) {
+ logger.error("Error loading", e);
+ System.exit(1);
+ }
+ }
+
+ @Override
+ public void init() throws Exception {
+ // Do Nothing
+ }
+
+ @Override
+ public void execLoad() {
+ logger.info("==> PatchForKafkaServiceDefUpdate_J10033.execLoad()");
+ try {
+ updateKafkaServiceDef();
+ } catch (Exception e) {
+ logger.error("Error while applying PatchForKafkaServiceDefUpdate_J10033...", e);
+ }
+ logger.info("<== PatchForKafkaServiceDefUpdate_J10033.execLoad()");
+ }
+
+ @Override
+ public void printStats() {
+ logger.info("PatchForKafkaServiceDefUpdate_J10033 ");
+ }
+
+ private void updateKafkaServiceDef(){
+ RangerServiceDef ret = null;
+ RangerServiceDef embeddedKafkaServiceDef = null;
+ RangerServiceDef dbKafkaServiceDef = null;
+ List<RangerServiceDef.RangerResourceDef> embeddedKafkaResourceDefs = null;
+ List<RangerServiceDef.RangerAccessTypeDef> embeddedKafkaAccessTypes = null;
+ XXServiceDef xXServiceDefObj = null;
+ try{
+ embeddedKafkaServiceDef=EmbeddedServiceDefsUtil.instance().getEmbeddedServiceDef(SERVICEDBSTORE_SERVICEDEFBYNAME_KAFKA_NAME);
+ if(embeddedKafkaServiceDef!=null){
+
+ xXServiceDefObj = daoMgr.getXXServiceDef().findByName(SERVICEDBSTORE_SERVICEDEFBYNAME_KAFKA_NAME);
+ Map<String, String> serviceDefOptionsPreUpdate=null;
+ String jsonStrPreUpdate=null;
+ if(xXServiceDefObj!=null) {
+ jsonStrPreUpdate=xXServiceDefObj.getDefOptions();
+ serviceDefOptionsPreUpdate=jsonStringToMap(jsonStrPreUpdate);
+ xXServiceDefObj=null;
+ }
+ dbKafkaServiceDef=svcDBStore.getServiceDefByName(SERVICEDBSTORE_SERVICEDEFBYNAME_KAFKA_NAME);
+
+ if(dbKafkaServiceDef!=null){
+ embeddedKafkaResourceDefs = embeddedKafkaServiceDef.getResources();
+ embeddedKafkaAccessTypes = embeddedKafkaServiceDef.getAccessTypes();
+
+ if (checkNewKafkaresourcePresent(embeddedKafkaResourceDefs)) {
+ // This is to check if CONSUMERGROUP resource is added to the resource definition, if so update the resource def and accessType def
+ if (embeddedKafkaResourceDefs != null) {
+ dbKafkaServiceDef.setResources(embeddedKafkaResourceDefs);
+ }
+ if (embeddedKafkaAccessTypes != null) {
+ if(!embeddedKafkaAccessTypes.toString().equalsIgnoreCase(dbKafkaServiceDef.getAccessTypes().toString())) {
+ dbKafkaServiceDef.setAccessTypes(embeddedKafkaAccessTypes);
+ }
+ }
+ }
+
+ RangerServiceDefValidator validator = validatorFactory.getServiceDefValidator(svcStore);
+ validator.validate(dbKafkaServiceDef, Action.UPDATE);
+
+ ret = svcStore.updateServiceDef(dbKafkaServiceDef);
+ if(ret==null){
+ logger.error("Error while updating "+SERVICEDBSTORE_SERVICEDEFBYNAME_KAFKA_NAME+"service-def");
+ throw new RuntimeException("Error while updating "+SERVICEDBSTORE_SERVICEDEFBYNAME_KAFKA_NAME+"service-def");
+ }
+ xXServiceDefObj = daoMgr.getXXServiceDef().findByName(SERVICEDBSTORE_SERVICEDEFBYNAME_KAFKA_NAME);
+ if(xXServiceDefObj!=null) {
+ String jsonStrPostUpdate=xXServiceDefObj.getDefOptions();
+ Map<String, String> serviceDefOptionsPostUpdate=jsonStringToMap(jsonStrPostUpdate);
+ if (serviceDefOptionsPostUpdate != null && serviceDefOptionsPostUpdate.containsKey(RangerServiceDef.OPTION_ENABLE_DENY_AND_EXCEPTIONS_IN_POLICIES)) {
+ if(serviceDefOptionsPreUpdate == null || !serviceDefOptionsPreUpdate.containsKey(RangerServiceDef.OPTION_ENABLE_DENY_AND_EXCEPTIONS_IN_POLICIES)) {
+ String preUpdateValue = serviceDefOptionsPreUpdate == null ? null : serviceDefOptionsPreUpdate.get(RangerServiceDef.OPTION_ENABLE_DENY_AND_EXCEPTIONS_IN_POLICIES);
+ if (preUpdateValue == null) {
+ serviceDefOptionsPostUpdate.remove(RangerServiceDef.OPTION_ENABLE_DENY_AND_EXCEPTIONS_IN_POLICIES);
+ } else {
+ serviceDefOptionsPostUpdate.put(RangerServiceDef.OPTION_ENABLE_DENY_AND_EXCEPTIONS_IN_POLICIES, preUpdateValue);
+ }
+ xXServiceDefObj.setDefOptions(mapToJsonString(serviceDefOptionsPostUpdate));
+ daoMgr.getXXServiceDef().update(xXServiceDefObj);
+ }
+ }
+ createDefaultPolicyForNewResources();
+ }
+ }
+ }
+ }catch(Exception e)
+ {
+ logger.error("Error while updating "+SERVICEDBSTORE_SERVICEDEFBYNAME_KAFKA_NAME+"service-def", e);
+ }
+ }
+
+ private boolean checkNewKafkaresourcePresent(List<RangerServiceDef.RangerResourceDef> resourceDefs) {
+ boolean ret = false;
+ for(RangerServiceDef.RangerResourceDef resourceDef : resourceDefs) {
+ if (CONSUMERGROUP_RESOURCE_NAME.equals(resourceDef.getName()) ) {
+ ret = true ;
+ break;
+ }
+ }
+ return ret;
+ }
+
+ private String mapToJsonString(Map<String, String> map) {
+ String ret = null;
+ if(map != null) {
+ try {
+ ret = jsonUtil.readMapToString(map);
+ } catch(Exception excp) {
+ logger.warn("mapToJsonString() failed to convert map: " + map, excp);
+ }
+ }
+ return ret;
+ }
+
+ protected Map<String, String> jsonStringToMap(String jsonStr) {
+ Map<String, String> ret = null;
+ if(!StringUtils.isEmpty(jsonStr)) {
+ try {
+ ret = jsonUtil.jsonToMap(jsonStr);
+ } catch(Exception excp) {
+ // fallback to earlier format: "name1=value1;name2=value2"
+ for(String optionString : jsonStr.split(";")) {
+ if(StringUtils.isEmpty(optionString)) {
+ continue;
+ }
+ String[] nvArr = optionString.split("=");
+ String name = (nvArr != null && nvArr.length > 0) ? nvArr[0].trim() : null;
+ String value = (nvArr != null && nvArr.length > 1) ? nvArr[1].trim() : null;
+ if(StringUtils.isEmpty(name)) {
+ continue;
+ }
+ if(ret == null) {
+ ret = new HashMap<String, String>();
+ }
+ ret.put(name, value);
+ }
+ }
+ }
+ return ret;
+ }
+
+ private void createDefaultPolicyForNewResources() {
+ logger.info("==> createDefaultPolicyForNewResources ");
+ XXPortalUser xxPortalUser = daoMgr.getXXPortalUser().findByLoginId(LOGIN_ID_ADMIN);
+ Long currentUserId = xxPortalUser.getId();
+
+ XXServiceDef xXServiceDefObj = daoMgr.getXXServiceDef()
+ .findByName(EmbeddedServiceDefsUtil.EMBEDDED_SERVICEDEF_KAFKA_NAME);
+ if (xXServiceDefObj == null) {
+ logger.debug("ServiceDef not fount with name :" + EmbeddedServiceDefsUtil.EMBEDDED_SERVICEDEF_KAFKA_NAME);
+ return;
+ }
+
+ Long xServiceDefId = xXServiceDefObj.getId();
+ List<XXService> xxServices = daoMgr.getXXService().findByServiceDefId(xServiceDefId);
+
+ for (XXService xxService : xxServices) {
+ int resourceMapOrder = 0;
+ XXPolicy xxPolicy = new XXPolicy();
+ xxPolicy.setName(POLICY_NAME);
+ xxPolicy.setDescription(POLICY_NAME);
+ xxPolicy.setService(xxService.getId());
+ xxPolicy.setPolicyPriority(RangerPolicy.POLICY_PRIORITY_NORMAL);
+ xxPolicy.setIsAuditEnabled(Boolean.TRUE);
+ xxPolicy.setIsEnabled(Boolean.TRUE);
+ xxPolicy.setPolicyType(RangerPolicy.POLICY_TYPE_ACCESS);
+ xxPolicy.setGuid(guidUtil.genGUID());
+ xxPolicy.setAddedByUserId(currentUserId);
+ xxPolicy.setUpdatedByUserId(currentUserId);
+ RangerPolicy rangerPolicy = getRangerPolicy(POLICY_NAME,xxPortalUser,xxService);
+ xxPolicy.setPolicyText(JsonUtils.objectToJson(rangerPolicy));
+ xxPolicy.setResourceSignature(rangerPolicy.getResourceSignature());
+ xxPolicy.setZoneId(1L);
+ XXPolicy createdPolicy = daoMgr.getXXPolicy().create(xxPolicy);
+
+ XXPolicyItem xxPolicyItem = new XXPolicyItem();
+ xxPolicyItem.setIsEnabled(Boolean.TRUE);
+ xxPolicyItem.setDelegateAdmin(Boolean.TRUE);
+ xxPolicyItem.setItemType(0);
+ xxPolicyItem.setOrder(0);
+ xxPolicyItem.setAddedByUserId(currentUserId);
+ xxPolicyItem.setUpdatedByUserId(currentUserId);
+ xxPolicyItem.setPolicyId(createdPolicy.getId());
+ XXPolicyItem createdXXPolicyItem = daoMgr.getXXPolicyItem().create(xxPolicyItem);
+
+ List<String> accessTypes = getAccessTypes();
+ for (int i = 0; i < accessTypes.size(); i++) {
+ XXAccessTypeDef xAccTypeDef = daoMgr.getXXAccessTypeDef().findByNameAndServiceId(accessTypes.get(i),
+ xxPolicy.getService());
+ if (xAccTypeDef == null) {
+ throw new RuntimeException(accessTypes.get(i) + ": is not a valid access-type. policy='"
+ + xxPolicy.getName() + "' service='" + xxPolicy.getService() + "'");
+ }
+ XXPolicyItemAccess xPolItemAcc = new XXPolicyItemAccess();
+ xPolItemAcc.setIsAllowed(Boolean.TRUE);
+ xPolItemAcc.setType(xAccTypeDef.getId());
+ xPolItemAcc.setOrder(i);
+ xPolItemAcc.setAddedByUserId(currentUserId);
+ xPolItemAcc.setUpdatedByUserId(currentUserId);
+ xPolItemAcc.setPolicyitemid(createdXXPolicyItem.getId());
+ daoMgr.getXXPolicyItemAccess().create(xPolItemAcc);
+ }
+
+ for (int i = 0; i < DEFAULT_POLICY_USERS.size(); i++) {
+ String user = DEFAULT_POLICY_USERS.get(i);
+ if (StringUtils.isBlank(user)) {
+ continue;
+ }
+ XXUser xxUser = daoMgr.getXXUser().findByUserName(user);
+ if (xxUser == null) {
+ throw new RuntimeException(user + ": user does not exist. policy='" + xxPolicy.getName()
+ + "' service='" + xxPolicy.getService() + "' user='" + user + "'");
+ }
+ XXPolicyItemUserPerm xUserPerm = new XXPolicyItemUserPerm();
+ xUserPerm.setUserId(xxUser.getId());
+ xUserPerm.setPolicyItemId(createdXXPolicyItem.getId());
+ xUserPerm.setOrder(i);
+ xUserPerm.setAddedByUserId(currentUserId);
+ xUserPerm.setUpdatedByUserId(currentUserId);
+ daoMgr.getXXPolicyItemUserPerm().create(xUserPerm);
+ }
+
+
+ String policyResourceName = CONSUMERGROUP_RESOURCE_NAME;
+
+ XXResourceDef xResDef = daoMgr.getXXResourceDef().findByNameAndPolicyId(policyResourceName,
+ createdPolicy.getId());
+ if (xResDef == null) {
+ throw new RuntimeException(policyResourceName + ": is not a valid resource-type. policy='"
+ + createdPolicy.getName() + "' service='" + createdPolicy.getService() + "'");
+ }
+
+ XXPolicyResource xPolRes = new XXPolicyResource();
+
+ xPolRes.setAddedByUserId(currentUserId);
+ xPolRes.setUpdatedByUserId(currentUserId);
+ xPolRes.setIsExcludes(Boolean.FALSE);
+ xPolRes.setIsRecursive(Boolean.FALSE);
+ xPolRes.setPolicyId(createdPolicy.getId());
+ xPolRes.setResDefId(xResDef.getId());
+ xPolRes = daoMgr.getXXPolicyResource().create(xPolRes);
+
+ XXPolicyResourceMap xPolResMap = new XXPolicyResourceMap();
+ xPolResMap.setResourceId(xPolRes.getId());
+ xPolResMap.setValue("*");
+ xPolResMap.setOrder(resourceMapOrder);
+ xPolResMap.setAddedByUserId(currentUserId);
+ xPolResMap.setUpdatedByUserId(currentUserId);
+ daoMgr.getXXPolicyResourceMap().create(xPolResMap);
+ resourceMapOrder++;
+ logger.info("Creating policy for service id : " + xxService.getId());
+ }
+ logger.info("<== createDefaultPolicyForNewResources ");
+ }
+
+
+ private RangerPolicy getRangerPolicy(String newResource, XXPortalUser xxPortalUser, XXService xxService) {
+ RangerPolicy policy = new RangerPolicy();
+
+ List<RangerPolicy.RangerPolicyItemAccess> accesses = getPolicyItemAccesses();
+ List<String> users = new ArrayList<>(DEFAULT_POLICY_USERS);
+ List<String> groups = new ArrayList<>();
+ List<RangerPolicy.RangerPolicyItemCondition> conditions = new ArrayList<>();
+ List<RangerPolicy.RangerPolicyItem> policyItems = new ArrayList<>();
+ RangerPolicy.RangerPolicyItem rangerPolicyItem = new RangerPolicy.RangerPolicyItem();
+ rangerPolicyItem.setAccesses(accesses);
+ rangerPolicyItem.setConditions(conditions);
+ rangerPolicyItem.setGroups(groups);
+ rangerPolicyItem.setUsers(users);
+ rangerPolicyItem.setDelegateAdmin(false);
+
+ policyItems.add(rangerPolicyItem);
+
+ Map<String, RangerPolicy.RangerPolicyResource> policyResource = new HashMap<>();
+ RangerPolicy.RangerPolicyResource rangerPolicyResource = new RangerPolicy.RangerPolicyResource();
+ rangerPolicyResource.setIsExcludes(false);
+ rangerPolicyResource.setIsRecursive(false);
+ rangerPolicyResource.setValue("*");
+ String policyResourceName = CONSUMERGROUP_RESOURCE_NAME;
+ policyResource.put(policyResourceName, rangerPolicyResource);
+ policy.setCreateTime(new Date());
+ policy.setDescription(newResource);
+ policy.setIsEnabled(true);
+ policy.setName(newResource);
+ policy.setCreatedBy(xxPortalUser.getLoginId());
+ policy.setUpdatedBy(xxPortalUser.getLoginId());
+ policy.setUpdateTime(new Date());
+ policy.setService(xxService.getName());
+ policy.setIsAuditEnabled(true);
+ policy.setPolicyItems(policyItems);
+ policy.setResources(policyResource);
+ policy.setPolicyType(0);
+ policy.setId(0L);
+ policy.setGuid("");
+ policy.setPolicyLabels(new ArrayList<>());
+ policy.setVersion(1L);
+ RangerPolicyResourceSignature resourceSignature = new RangerPolicyResourceSignature(policy);
+ policy.setResourceSignature(resourceSignature.getSignature());
+ return policy;
+ }
+
+ private List<String> getAccessTypes() {
+ List<String> accessTypes = Arrays.asList("consume", "describe", "delete");
+ return accessTypes;
+ }
+
+ private ArrayList<RangerPolicy.RangerPolicyItemAccess> getPolicyItemAccesses() {
+ ArrayList<RangerPolicy.RangerPolicyItemAccess> rangerPolicyItemAccesses = new ArrayList<>();
+ for(String type:getAccessTypes()) {
+ RangerPolicy.RangerPolicyItemAccess policyItemAccess = new RangerPolicy.RangerPolicyItemAccess();
+ policyItemAccess.setType(type);
+ policyItemAccess.setIsAllowed(true);
+ rangerPolicyItemAccesses.add(policyItemAccess);
+ }
+ return rangerPolicyItemAccesses;
+ }
+}
\ No newline at end of file