RANGER-4741: Hive plugin optimization to avoid excessive metastore API calls
diff --git a/hive-agent/src/main/java/org/apache/ranger/authorization/hive/authorizer/RangerHiveAuthorizer.java b/hive-agent/src/main/java/org/apache/ranger/authorization/hive/authorizer/RangerHiveAuthorizer.java
index 9b25e2b..0492108 100644
--- a/hive-agent/src/main/java/org/apache/ranger/authorization/hive/authorizer/RangerHiveAuthorizer.java
+++ b/hive-agent/src/main/java/org/apache/ranger/authorization/hive/authorizer/RangerHiveAuthorizer.java
@@ -719,7 +719,7 @@
 
 		try {
 			List<HivePrivilegeObject> outputs = new ArrayList<>(Arrays.asList(hivePrivObject));
-			RangerHiveResource resource = getHiveResource(HiveOperationType.GRANT_PRIVILEGE, hivePrivObject, null, outputs);
+			RangerHiveResource resource = getHiveResource(HiveOperationType.GRANT_PRIVILEGE, hivePrivObject, null, outputs, null);
 			GrantRevokeRequest request  = createGrantRevokeData(resource, hivePrincipals, hivePrivileges, grantorPrincipal, grantOption);
 
 			LOG.info("grantPrivileges(): " + request);
@@ -760,7 +760,7 @@
 
 		try {
 			List<HivePrivilegeObject> outputs = new ArrayList<>(Arrays.asList(hivePrivObject));
-			RangerHiveResource resource = getHiveResource(HiveOperationType.REVOKE_PRIVILEGE, hivePrivObject, null, outputs);
+			RangerHiveResource resource = getHiveResource(HiveOperationType.REVOKE_PRIVILEGE, hivePrivObject, null, outputs, null);
 			GrantRevokeRequest request  = createGrantRevokeData(resource, hivePrincipals, hivePrivileges, grantorPrincipal, grantOption);
 
 			LOG.info("revokePrivileges(): " + request);
@@ -806,6 +806,7 @@
 			String                  user           = ugi.getShortUserName();
 			Set<String>             groups         = Sets.newHashSet(ugi.getGroupNames());
 			Set<String>             roles          = getCurrentRolesForUser(user, groups);
+			Map<String, String>     objOwners      = new HashMap<>();
 
 			if(LOG.isDebugEnabled()) {
 				LOG.debug(toString(hiveOpType, inputHObjs, outputHObjs, context, sessionContext));
@@ -825,7 +826,7 @@
 
 			if(!CollectionUtils.isEmpty(inputHObjs)) {
 				for(HivePrivilegeObject hiveObj : inputHObjs) {
-					RangerHiveResource resource = getHiveResource(hiveOpType, hiveObj, inputHObjs, outputHObjs);
+					RangerHiveResource resource = getHiveResource(hiveOpType, hiveObj, inputHObjs, outputHObjs, objOwners);
 
 					if (resource == null) { // possible if input object/object is of a kind that we don't currently authorize
 						continue;
@@ -923,7 +924,7 @@
 
 			if(!CollectionUtils.isEmpty(outputHObjs)) {
 				for(HivePrivilegeObject hiveObj : outputHObjs) {
-					RangerHiveResource resource = getHiveResource(hiveOpType, hiveObj, inputHObjs, outputHObjs);
+					RangerHiveResource resource = getHiveResource(hiveOpType, hiveObj, inputHObjs, outputHObjs, objOwners);
 
 					if (resource == null) { // possible if input object/object is of a kind that we don't currently authorize
 						continue;
@@ -1158,6 +1159,8 @@
 			String user = ugi.getShortUserName();
 			Set<String> groups = Sets.newHashSet(ugi.getGroupNames());
 			Set<String> roles  = getCurrentRolesForUser(user, groups);
+			Map<String, String> objOwners = new HashMap<>();
+
 			if (LOG.isDebugEnabled()) {
 				LOG.debug(String.format("filterListCmdObjects: user[%s], groups[%s], roles[%s] ", user, groups, roles));
 			}
@@ -1180,7 +1183,7 @@
 					LOG.debug(String.format(format, actionType, objectType, objectName, dbName, columns, partitionKeys, commandString, ipAddress));
 				}
 				
-				RangerHiveResource resource = createHiveResourceForFiltering(privilegeObject);
+				RangerHiveResource resource = createHiveResourceForFiltering(privilegeObject, objOwners);
 				if (resource == null) {
 					LOG.error("filterListCmdObjects: RangerHiveResource returned by createHiveResource is null");
 				} else {
@@ -1230,7 +1233,8 @@
 		}
 
 		if(CollectionUtils.isNotEmpty(hiveObjs)) {
-			IMetaStoreClient metaStoreClient = getMetaStoreClient();
+			IMetaStoreClient    metaStoreClient = getMetaStoreClient();
+			Map<String, String> objOwners       = new HashMap<>();
 
 			for (HivePrivilegeObject hiveObj : hiveObjs) {
 				HivePrivilegeObjectType hiveObjType = hiveObj.getType();
@@ -1249,7 +1253,7 @@
 					String database = hiveObj.getDbname();
 					String table    = hiveObj.getObjectName();
 
-					String rowFilterExpr = getRowFilterExpression(queryContext, hiveObj, metaStoreClient);
+					String rowFilterExpr = getRowFilterExpression(queryContext, hiveObj, metaStoreClient, objOwners);
 
 					if (StringUtils.isNotBlank(rowFilterExpr)) {
 						if(LOG.isDebugEnabled()) {
@@ -1264,7 +1268,7 @@
 						List<String> columnTransformers = new ArrayList<String>();
 
 						for (String column : hiveObj.getColumns()) {
-							boolean isColumnTransformed = addCellValueTransformerAndCheckIfTransformed(queryContext, hiveObj, column, columnTransformers, metaStoreClient);
+							boolean isColumnTransformed = addCellValueTransformerAndCheckIfTransformed(queryContext, hiveObj, column, columnTransformers, metaStoreClient, objOwners);
 
 							if(LOG.isDebugEnabled()) {
 								LOG.debug("addCellValueTransformerAndCheckIfTransformed(database=" + database + ", table=" + table + ", column=" + column + "): " + isColumnTransformed);
@@ -1333,7 +1337,7 @@
 		return result != null && result.isRowFilterEnabled() && StringUtils.isNotEmpty(result.getFilterExpr());
 	}
 
-	private String getRowFilterExpression(HiveAuthzContext context, HivePrivilegeObject tableOrView, IMetaStoreClient metaStoreClient) throws SemanticException {
+	private String getRowFilterExpression(HiveAuthzContext context, HivePrivilegeObject tableOrView, IMetaStoreClient metaStoreClient, Map<String, String> objOwners) throws SemanticException {
 		UserGroupInformation ugi = getCurrentUserGroupInfo();
 
 		if(ugi == null) {
@@ -1359,7 +1363,7 @@
 			HiveObjectType          objectType     = HiveObjectType.TABLE;
 			RangerHiveResource      resource       = new RangerHiveResource(objectType, databaseName, tableOrViewName);
 
-			setOwnerUser(resource, tableOrView, metaStoreClient);
+			setOwnerUser(resource, tableOrView, metaStoreClient, objOwners);
 
 			RangerHiveAccessRequest request = new RangerHiveAccessRequest(resource, user, groups, roles, objectType.name(), HiveAccessType.SELECT, context, sessionContext);
 			RangerAccessResult      result  = hivePlugin.evalRowFilterPolicies(request, auditHandler);
@@ -1378,7 +1382,7 @@
 		return ret;
 	}
 
-	private boolean addCellValueTransformerAndCheckIfTransformed(HiveAuthzContext context, HivePrivilegeObject tableOrView, String columnName, List<String> columnTransformers, IMetaStoreClient metaStoreClient) throws SemanticException {
+	private boolean addCellValueTransformerAndCheckIfTransformed(HiveAuthzContext context, HivePrivilegeObject tableOrView, String columnName, List<String> columnTransformers, IMetaStoreClient metaStoreClient, Map<String, String> objOwners) throws SemanticException {
 		UserGroupInformation ugi = getCurrentUserGroupInfo();
 
 		if(ugi == null) {
@@ -1406,7 +1410,7 @@
 			HiveObjectType          objectType     = HiveObjectType.COLUMN;
 			RangerHiveResource      resource       = new RangerHiveResource(objectType, databaseName, tableOrViewName, columnName);
 
-			setOwnerUser(resource, tableOrView, metaStoreClient);
+			setOwnerUser(resource, tableOrView, metaStoreClient, objOwners);
 
 			RangerHiveAccessRequest request = new RangerHiveAccessRequest(resource, user, groups, roles, objectType.name(), HiveAccessType.SELECT, context, sessionContext);
 			RangerAccessResult      result  = hivePlugin.evalDataMaskPolicies(request, auditHandler);
@@ -1457,7 +1461,7 @@
 		return ret;
 	}
 
-	private RangerHiveResource createHiveResourceForFiltering(HivePrivilegeObject privilegeObject) {
+	private RangerHiveResource createHiveResourceForFiltering(HivePrivilegeObject privilegeObject, Map<String, String> objOwners) {
 		RangerHiveResource resource = null;
 
 		HivePrivilegeObjectType objectType = privilegeObject.getType();
@@ -1465,7 +1469,7 @@
 		switch(objectType) {
 			case DATABASE:
 			case TABLE_OR_VIEW:
-				resource = createHiveResource(privilegeObject);
+				resource = createHiveResource(privilegeObject, objOwners);
 				break;
 			default:
 				LOG.warn("RangerHiveAuthorizer.createHiveResourceForFiltering: unexpected objectType:" + objectType);
@@ -1475,6 +1479,10 @@
 	}
 
 	RangerHiveResource createHiveResource(HivePrivilegeObject privilegeObject) {
+		return createHiveResource(privilegeObject, null);
+	}
+
+	RangerHiveResource createHiveResource(HivePrivilegeObject privilegeObject, Map<String, String> objOwners) {
 		RangerHiveResource resource = null;
 		HivePrivilegeObjectType objectType = privilegeObject.getType();
 		String objectName = privilegeObject.getObjectName();
@@ -1501,7 +1509,7 @@
 		}
 
 		if (resource != null) {
-			setOwnerUser(resource, privilegeObject, getMetaStoreClient());
+			setOwnerUser(resource, privilegeObject, getMetaStoreClient(), objOwners);
 
 			resource.setServiceDef(hivePlugin == null ? null : hivePlugin.getServiceDef());
 		}
@@ -1513,7 +1521,8 @@
 	private RangerHiveResource getHiveResource(HiveOperationType   hiveOpType,
 											   HivePrivilegeObject hiveObj,
 											   List<HivePrivilegeObject> inputs,
-											   List<HivePrivilegeObject> outputs) {
+											   List<HivePrivilegeObject> outputs,
+											   Map<String, String> objOwners) {
 		RangerHiveResource ret = null;
 
 		HiveObjectType objectType = getObjectType(hiveObj, hiveOpType);
@@ -1522,7 +1531,7 @@
 			case DATABASE:
 				ret = new RangerHiveResource(objectType, hiveObj.getDbname());
 				if (!isCreateOperation(hiveOpType)) {
-					setOwnerUser(ret, hiveObj, getMetaStoreClient());
+					setOwnerUser(ret, hiveObj, getMetaStoreClient(), objOwners);
 				}
 			break;
 	
@@ -1536,12 +1545,12 @@
 							", Size of outputs = [" + (CollectionUtils.isNotEmpty(outputs) ? outputs.size() : 0) + "]");
 				}
 
-				setOwnerUser(ret, hiveObj, getMetaStoreClient());
+				setOwnerUser(ret, hiveObj, getMetaStoreClient(), objOwners);
 
 				if (isCreateOperation(hiveOpType)) {
 					HivePrivilegeObject dbObject = getDatabaseObject(hiveObj.getDbname(), inputs, outputs);
 					if (dbObject != null) {
-						setOwnerUser(ret, dbObject, getMetaStoreClient());
+						setOwnerUser(ret, dbObject, getMetaStoreClient(), objOwners);
 					}
 				}
 
@@ -1554,7 +1563,7 @@
 	
 			case COLUMN:
 				ret = new RangerHiveResource(objectType, hiveObj.getDbname(), hiveObj.getObjectName(), StringUtils.join(hiveObj.getColumns(), COLUMN_SEP));
-				setOwnerUser(ret, hiveObj, getMetaStoreClient());
+				setOwnerUser(ret, hiveObj, getMetaStoreClient(), objOwners);
 			break;
 
             case URI:
@@ -3127,16 +3136,28 @@
 		return request;
 	}
 
-	static void setOwnerUser(RangerHiveResource resource, HivePrivilegeObject hiveObj, IMetaStoreClient metaStoreClient) {
+	static void setOwnerUser(RangerHiveResource resource, HivePrivilegeObject hiveObj, IMetaStoreClient metaStoreClient, Map<String, String> objOwners) {
 		if (hiveObj != null) {
+			String objName = null;
+			String owner   = null;
+
 			// resource.setOwnerUser(hiveObj.getOwnerName());
 			switch (hiveObj.getType()) {
 				case DATABASE:
 					try {
-						Database database = metaStoreClient != null ? metaStoreClient.getDatabase(hiveObj.getDbname()) : null;
+						objName = hiveObj.getDbname();
+						owner   = objOwners != null ? objOwners.get(objName) : null;
 
-						if (database != null) {
-							resource.setOwnerUser(database.getOwnerName());
+						if (StringUtils.isBlank(owner)) {
+							Database database = metaStoreClient != null ? metaStoreClient.getDatabase(hiveObj.getDbname()) : null;
+
+							if (database != null) {
+								owner = database.getOwnerName();
+							}
+						} else {
+							if (LOG.isDebugEnabled()) {
+								LOG.debug("Owner for database " + objName + " is already known");
+							}
 						}
 					} catch (Exception excp) {
 						LOG.error("failed to get database object from Hive metastore. dbName=" + hiveObj.getDbname(), excp);
@@ -3146,16 +3167,33 @@
 				case TABLE_OR_VIEW:
 				case COLUMN:
 					try {
-						Table table = metaStoreClient != null ? metaStoreClient.getTable(hiveObj.getDbname(), hiveObj.getObjectName()) : null;
+						objName = hiveObj.getDbname() + "." + hiveObj.getObjectName();
+						owner   = objOwners != null ? objOwners.get(objName) : null;
 
-						if (table != null) {
-							resource.setOwnerUser(table.getOwner());
+						if (StringUtils.isBlank(owner)) {
+							Table table = metaStoreClient != null ? metaStoreClient.getTable(hiveObj.getDbname(), hiveObj.getObjectName()) : null;
+
+							if (table != null) {
+								owner = table.getOwner();
+							}
+						} else {
+							if (LOG.isDebugEnabled()) {
+								LOG.debug("Owner for table " + objName + " is already known");
+							}
 						}
 					} catch (Exception excp) {
 						LOG.error("failed to get table object from Hive metastore. dbName=" + hiveObj.getDbname() + ", tblName=" + hiveObj.getObjectName(), excp);
 					}
 					break;
 			}
+
+			if (objOwners != null && objName != null) {
+				objOwners.put(objName, owner);
+			}
+
+			if (StringUtils.isNotBlank(objName) && StringUtils.isNotBlank(owner)) {
+				resource.setOwnerUser(owner);
+			}
 		}
 
 		if (LOG.isDebugEnabled()) {