Merge branch 'master' into RANGER-3923
diff --git a/security-admin/src/main/java/org/apache/ranger/rest/ServiceREST.java b/security-admin/src/main/java/org/apache/ranger/rest/ServiceREST.java
index dcad9f2..a14c7a9 100644
--- a/security-admin/src/main/java/org/apache/ranger/rest/ServiceREST.java
+++ b/security-admin/src/main/java/org/apache/ranger/rest/ServiceREST.java
@@ -1255,7 +1255,7 @@
 					String               userName   = grantRequest.getGrantor();
 					Set<String>          userGroups = CollectionUtils.isNotEmpty(grantRequest.getGrantorGroups()) ? grantRequest.getGrantorGroups() : userMgr.getGroupsForUser(userName);
 					String				 ownerUser  = grantRequest.getOwnerUser();
-					RangerAccessResource resource   = new RangerAccessResourceImpl(StringUtil.toStringObjectMap(grantRequest.getResource()), ownerUser);
+					RangerAccessResource resource   = new RangerAccessResourceImpl(getAccessResourceObjectMap(grantRequest.getResource()), ownerUser);
 					Set<String>			 accessTypes = grantRequest.getAccessTypes();
 					VXUser               vxUser = xUserService.getXUserByUserName(userName);
 
@@ -1301,10 +1301,7 @@
 			
 						if(! CollectionUtils.isEmpty(resourceNames)) {
 							for(String resourceName : resourceNames) {
-								RangerPolicyResource policyResource = new RangerPolicyResource((String) resource.getValue(resourceName));
-								policyResource.setIsRecursive(grantRequest.getIsRecursive());
-		
-								policyResources.put(resourceName, policyResource);
+								policyResources.put(resourceName, getPolicyResource(resource.getValue(resourceName), grantRequest));
 							}
 						}
 						policy.setResources(policyResources);
@@ -1379,7 +1376,7 @@
 					Set<String>          userGroups = grantRequest.getGrantorGroups();
 					String				 ownerUser  = grantRequest.getOwnerUser();
 
-					RangerAccessResource resource   = new RangerAccessResourceImpl(StringUtil.toStringObjectMap(grantRequest.getResource()), ownerUser);
+					RangerAccessResource resource   = new RangerAccessResourceImpl(getAccessResourceObjectMap(grantRequest.getResource()), ownerUser);
 					Set<String>			 accessTypes = grantRequest.getAccessTypes();
 					String               zoneName   = getRangerAdminZoneName(serviceName, grantRequest);
 
@@ -1423,10 +1420,7 @@
 
 							if(! CollectionUtils.isEmpty(resourceNames)) {
 								for(String resourceName : resourceNames) {
-									RangerPolicyResource policyResource = new RangerPolicyResource((String) resource.getValue(resourceName));
-									policyResource.setIsRecursive(grantRequest.getIsRecursive());
-
-									policyResources.put(resourceName, policyResource);
+									policyResources.put(resourceName, getPolicyResource(resource.getValue(resourceName), grantRequest));
 								}
 							}
 							policy.setResources(policyResources);
@@ -1502,7 +1496,7 @@
 					String               userName   = revokeRequest.getGrantor();
 					Set<String>          userGroups = CollectionUtils.isNotEmpty(revokeRequest.getGrantorGroups()) ? revokeRequest.getGrantorGroups() : userMgr.getGroupsForUser(userName);
 					String				 ownerUser  = revokeRequest.getOwnerUser();
-					RangerAccessResource resource   = new RangerAccessResourceImpl(StringUtil.toStringObjectMap(revokeRequest.getResource()), ownerUser);
+					RangerAccessResource resource   = new RangerAccessResourceImpl(getAccessResourceObjectMap(revokeRequest.getResource()), ownerUser);
 					Set<String>			 accessTypes = revokeRequest.getAccessTypes();
 					VXUser vxUser = xUserService.getXUserByUserName(userName);
 
@@ -1591,7 +1585,7 @@
 					Set<String> userGroups = revokeRequest.getGrantorGroups();
 					String ownerUser = revokeRequest.getOwnerUser();
 
-					RangerAccessResource resource = new RangerAccessResourceImpl(StringUtil.toStringObjectMap(revokeRequest.getResource()), ownerUser);
+					RangerAccessResource resource = new RangerAccessResourceImpl(getAccessResourceObjectMap(revokeRequest.getResource()), ownerUser);
 					Set<String>			 accessTypes = revokeRequest.getAccessTypes();
 					String               zoneName = getRangerAdminZoneName(serviceName, revokeRequest);
 
@@ -4040,6 +4034,37 @@
 		return ret;
 	}
 
+	/** Builds the policy resource for one grant/revoke resource value: a single String, or a List of Strings when multi-valued. */
+	public RangerPolicyResource getPolicyResource(Object resourceName, GrantRevokeRequest grantRequest) {
+		if (!(resourceName instanceof List)) {
+			// Single value: construct from the string, then copy the recursive flag from the request.
+			RangerPolicyResource single = new RangerPolicyResource((String) resourceName);
+			single.setIsRecursive(grantRequest.getIsRecursive());
+			return single;
+		}
+		List<String> values = (List<String>) resourceName; // element type is not checked at runtime
+		return new RangerPolicyResource(values, false, grantRequest.getIsRecursive());
+	}
+
+	/** Copies {@code map}; a value containing ',' is replaced by the List of its comma-separated parts. A null map yields null. */
+	public static Map<String, Object> getAccessResourceObjectMap(Map<String, String> map) {
+		if (map == null) {
+			return null;
+		}
+
+		Map<String, Object> ret = new HashMap<>(map.size());
+		for (Map.Entry<String, String> e : map.entrySet()) {
+			String value = e.getValue();
+			// A null value has nothing to split: copy it through rather than fail with a NullPointerException.
+			if (value != null && value.contains(",")) {
+				ret.put(e.getKey(), Arrays.asList(value.split(",")));
+			} else {
+				ret.put(e.getKey(), value);
+			}
+		}
+		return ret;
+	}
+
 	private HashMap<String, Object> getCSRFPropertiesMap(HttpServletRequest request) {
 		HashMap<String, Object> map = new HashMap<String, Object>();
 		map.put(isCSRF_ENABLED, PropertiesUtil.getBooleanProperty(isCSRF_ENABLED, true));
diff --git a/security-admin/src/test/java/org/apache/ranger/rest/TestServiceREST.java b/security-admin/src/test/java/org/apache/ranger/rest/TestServiceREST.java
index 2c826df..ff5fe21 100644
--- a/security-admin/src/test/java/org/apache/ranger/rest/TestServiceREST.java
+++ b/security-admin/src/test/java/org/apache/ranger/rest/TestServiceREST.java
@@ -21,19 +21,13 @@
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 import javax.ws.rs.WebApplicationException;
 
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.ranger.admin.client.datatype.RESTResponse;
@@ -78,6 +72,8 @@
 import org.apache.ranger.plugin.model.validation.RangerPolicyValidator;
 import org.apache.ranger.plugin.model.validation.RangerServiceDefValidator;
 import org.apache.ranger.plugin.model.validation.RangerServiceValidator;
+import org.apache.ranger.plugin.policyengine.RangerAccessResource;
+import org.apache.ranger.plugin.policyengine.RangerAccessResourceImpl;
 import org.apache.ranger.plugin.policyengine.RangerPolicyEngineImpl;
 import org.apache.ranger.plugin.service.ResourceLookupContext;
 import org.apache.ranger.plugin.store.EmbeddedServiceDefsUtil;
@@ -822,6 +818,73 @@
 	}
 
 	@Test
+	public void test14_1_grantAccessWithMultiColumns() throws Exception {
+		// Scenario: a grant request whose "column" resource is a comma-separated list of column names.
+		HttpServletRequest request = Mockito.mock(HttpServletRequest.class);
+
+		String serviceName = "HIVE";
+		Set<String> userList = new HashSet<String>();
+		userList.add("user1");
+		userList.add("user2");
+		userList.add("user3");
+
+		Map<String, String> grantResource = new HashMap<>();
+		grantResource.put("database", "demo");
+		grantResource.put("table", "testtbl");
+		grantResource.put("column", "column1,column2,colum3");
+		GrantRevokeRequest grantRequestObj = new GrantRevokeRequest();
+
+		grantRequestObj.setResource(grantResource);
+		grantRequestObj.setUsers(userList);
+		grantRequestObj.setAccessTypes(new HashSet<>(Arrays.asList("select")));
+		grantRequestObj.setDelegateAdmin(true);
+		grantRequestObj.setEnableAudit(true);
+		grantRequestObj.setGrantor("systest");
+		grantRequestObj.setIsRecursive(true);
+
+		// The helper is static, so it is invoked on the class rather than through the instance under test.
+		RangerAccessResource resource = new RangerAccessResourceImpl(ServiceREST.getAccessResourceObjectMap(grantRequestObj.getResource()), "systest");
+
+		RangerPolicy createPolicy = new RangerPolicy();
+		createPolicy.setService(serviceName);
+		createPolicy.setName("grant-" + System.currentTimeMillis());
+		createPolicy.setDescription("created by grant");
+		createPolicy.setIsAuditEnabled(grantRequestObj.getEnableAudit());
+
+		Map<String, RangerPolicyResource> policyResources = new HashMap<>();
+		Set<String> resourceNames = resource.getKeys();
+
+		if (!CollectionUtils.isEmpty(resourceNames)) {
+			for (String resourceName : resourceNames) {
+				policyResources.put(resourceName, serviceREST.getPolicyResource(resource.getValue(resourceName), grantRequestObj));
+			}
+		}
+		createPolicy.setResources(policyResources);
+
+		RangerPolicyItem policyItem = new RangerPolicyItem();
+		policyItem.setDelegateAdmin(grantRequestObj.getDelegateAdmin());
+		policyItem.getUsers().addAll(grantRequestObj.getUsers());
+		for (String accessType : grantRequestObj.getAccessTypes()) {
+			policyItem.getAccesses().add(new RangerPolicyItemAccess(accessType, Boolean.TRUE));
+		}
+		createPolicy.getPolicyItems().add(policyItem);
+		createPolicy.setZoneName(null);
+
+		// The comma-separated value must have been split into its three parts, all present in the policy resource.
+		@SuppressWarnings("unchecked")
+		List<String> grantColumns = (List<String>) resource.getValue("column");
+		List<String> createdPolicyColumns = createPolicy.getResources().get("column").getValues();
+
+		Assert.assertEquals(3, grantColumns.size());
+		Assert.assertTrue(createdPolicyColumns.containsAll(grantColumns));
+
+		Mockito.when(serviceUtil.isValidateHttpsAuthentication(serviceName, request)).thenReturn(false);
+		RESTResponse restResponse = serviceREST.grantAccess(serviceName, grantRequestObj, request);
+		Assert.assertNotNull(restResponse);
+		Mockito.verify(serviceUtil).isValidateHttpsAuthentication(serviceName, request);
+	}
+
+	@Test
 	public void test15revokeAccess() throws Exception {
 		HttpServletRequest request = Mockito.mock(HttpServletRequest.class);
 		String serviceName = "HDFS_1";