RANGER-2918:Ranger Audit for HBase does not capture predicates
diff --git a/hbase-agent/src/main/java/org/apache/ranger/authorization/hbase/HbaseConstants.java b/hbase-agent/src/main/java/org/apache/ranger/authorization/hbase/HbaseConstants.java
new file mode 100644
index 0000000..22c6983
--- /dev/null
+++ b/hbase-agent/src/main/java/org/apache/ranger/authorization/hbase/HbaseConstants.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.authorization.hbase;
+
+public class HbaseConstants {
+    public static final String HBASE_META_TABLE = "hbase:meta";
+    public static final String SCAN             = "scan";
+    public static final String GET              = "get";
+    public static final String SPACE            = " ";
+    public static final String COMMA            = ",";
+    public static final String OPEN_BRACES      = "{";
+    public static final String CLOSED_BRACES    = "}";
+    public static final String STARTROW         = "startRow";
+    public static final String STOPROW          = "stopRow";
+    public static final String FILTER           = "filter";
+    public static final String INFO             = "info";
+    public static final String ROW              = "row";
+    public static final String FAMILIES         = "families";
+    public static final String SINGLE_QUOTES    = "'";
+    public static final String ARROW            = "=>";
+}
diff --git a/hbase-agent/src/main/java/org/apache/ranger/authorization/hbase/RangerAuthorizationCoprocessor.java b/hbase-agent/src/main/java/org/apache/ranger/authorization/hbase/RangerAuthorizationCoprocessor.java
index c50a192..ec6dfdd 100644
--- a/hbase-agent/src/main/java/org/apache/ranger/authorization/hbase/RangerAuthorizationCoprocessor.java
+++ b/hbase-agent/src/main/java/org/apache/ranger/authorization/hbase/RangerAuthorizationCoprocessor.java
@@ -26,6 +26,8 @@
 import com.google.protobuf.Message;
 import com.google.protobuf.Service;
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -309,7 +311,7 @@
 	}
 	
 	ColumnFamilyAccessResult evaluateAccess(ObserverContext<?> ctx, String operation, Action action, final RegionCoprocessorEnvironment env,
-											final Map<byte[], ? extends Collection<?>> familyMap) throws AccessDeniedException {
+											final Map<byte[], ? extends Collection<?>> familyMap, String commandStr) throws AccessDeniedException {
 
 		String access = _authUtils.getAccess(action);
 		User user = getActiveUser(ctx);
@@ -345,6 +347,7 @@
 		HbaseAuditHandler auditHandler = _factory.getAuditHandler();
 		AuthorizationSession session = new AuthorizationSession(hbasePlugin)
 				.operation(operation)
+				.otherInformation(commandStr)
 				.remoteAddress(getRemoteAddress())
 				.auditHandler(auditHandler)
 				.user(user)
@@ -525,7 +528,7 @@
 		return result;
 	}
 
-	Filter authorizeAccess(ObserverContext<?> ctx, String operation, Action action, final RegionCoprocessorEnvironment env, final Map<byte[], NavigableSet<byte[]>> familyMap) throws AccessDeniedException {
+	Filter authorizeAccess(ObserverContext<?> ctx, String operation, Action action, final RegionCoprocessorEnvironment env, final Map<byte[], NavigableSet<byte[]>> familyMap, String commandStr) throws AccessDeniedException {
 
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("==> authorizeAccess");
@@ -535,7 +538,7 @@
 		try {
 			perf = RangerPerfTracer.getPerfTracer(PERF_HBASEAUTH_REQUEST_LOG, "RangerAuthorizationCoprocessor.authorizeAccess(request=Operation[" + operation + "]");
 
-			ColumnFamilyAccessResult accessResult = evaluateAccess(ctx, operation, action, env, familyMap);
+			ColumnFamilyAccessResult accessResult = evaluateAccess(ctx, operation, action, env, familyMap, commandStr);
 			RangerDefaultAuditHandler auditHandler = new RangerDefaultAuditHandler(hbasePlugin.getConfig());
 			if (accessResult._everythingIsAccessible) {
 				auditHandler.logAuthzAudits(accessResult._accessAllowedEvents);
@@ -578,7 +581,7 @@
 			if (RangerPerfTracer.isPerfTraceEnabled(PERF_HBASEAUTH_REQUEST_LOG)) {
 				perf = RangerPerfTracer.getPerfTracer(PERF_HBASEAUTH_REQUEST_LOG, "RangerAuthorizationCoprocessor.requirePermission(request=Operation[" + operation + "]");
 			}
-			ColumnFamilyAccessResult accessResult = evaluateAccess(ctx, operation, action, regionServerEnv, familyMap);
+			ColumnFamilyAccessResult accessResult = evaluateAccess(ctx, operation, action, regionServerEnv, familyMap, null);
 			RangerDefaultAuditHandler auditHandler = new RangerDefaultAuditHandler(hbasePlugin.getConfig());
 			if (accessResult._everythingIsAccessible) {
 				auditHandler.logAuthzAudits(accessResult._accessAllowedEvents);
@@ -988,13 +991,16 @@
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("==> preScannerOpen");
 		}
-
+		String commandStr = null;
 		try {
 			RegionCoprocessorEnvironment e = c.getEnvironment();
 
 			Map<byte[], NavigableSet<byte[]>> familyMap = scan.getFamilyMap();
-			String operation = "scannerOpen";
-			Filter filter = authorizeAccess(c, operation, Action.READ, e, familyMap);
+			String operation    = "scannerOpen";
+			byte[] tableName    = getTableName(e);
+			String tableNameStr = tableName != null ?  new String(tableName):" ";
+			commandStr          = getCommandString(HbaseConstants.SCAN, tableNameStr, scan.toMap());
+			Filter filter       = authorizeAccess(c, operation, Action.READ, e, familyMap, commandStr);
 			if (filter == null) {
 				if (LOG.isDebugEnabled()) {
 					LOG.debug("preScannerOpen: Access allowed for all families/column.  No filter added");
@@ -1009,7 +1015,7 @@
 			}
 		} finally {
 			if (LOG.isDebugEnabled()) {
-				LOG.debug("<== preScannerOpen");
+				LOG.debug("<== preScannerOpen: commandStr: " + commandStr);
 			}
 		}
 	}
@@ -1128,12 +1134,16 @@
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("==> preGetOp");
 		}
+		String commandStr = null;
 		try {
 			RegionCoprocessorEnvironment e = rEnv.getEnvironment();
 			Map<byte[], NavigableSet<byte[]>> familyMap = get.getFamilyMap();
 
-			String operation = "get";
-			Filter filter = authorizeAccess(rEnv, operation, Action.READ, e, familyMap);
+			String operation    = "get";
+			byte[] tableName    = getTableName(e);
+			String tableNameStr = tableName != null ?  new String(tableName):" ";
+			commandStr          = getCommandString(HbaseConstants.GET, tableNameStr, get.toMap());
+			Filter filter       = authorizeAccess(rEnv, operation, Action.READ, e, familyMap, commandStr);
 			if (filter == null) {
 				if (LOG.isDebugEnabled()) {
 					LOG.debug("preGetOp: all access allowed, no filter returned");
@@ -1148,7 +1158,7 @@
 			}
 		} finally {
 			if (LOG.isDebugEnabled()) {
-				LOG.debug("<== preGetOp");
+				LOG.debug("<== preGetOp: commandStr: " + commandStr );
 			}
 		}
 	}
@@ -1645,6 +1655,108 @@
 
 		return ret;
 	}
+	private String getCommandString(String operationName, String tableNameStr, Map<String,Object> opMetaData) {
+		StringBuilder ret = new StringBuilder();
+		if (!HbaseConstants.HBASE_META_TABLE.equals(tableNameStr)) {
+			ret.append(operationName);
+			ret.append(HbaseConstants.SPACE);
+			ret.append(tableNameStr).append(HbaseConstants.COMMA).append(HbaseConstants.SPACE);
+			ret.append(getPredicates(operationName, opMetaData));
+		}
+		return ret.toString();
+	}
+
+	private String getPredicates(String operationName, Map<String,Object> opMetaData) {
+		StringBuilder ret = new StringBuilder();
+
+		if (MapUtils.isNotEmpty(opMetaData)) {
+			HashMap<String, ArrayList<?>> families    = (HashMap<String, ArrayList<?>>) opMetaData.get(HbaseConstants.FAMILIES);
+			String 						  startRowVal = (String) opMetaData.get(HbaseConstants.STARTROW);
+			String                        stopRowVal  = (String) opMetaData.get(HbaseConstants.STOPROW);
+			String                        filterVal   = (String) opMetaData.get(HbaseConstants.FILTER);
+			String                        rowVal      = (String) opMetaData.get(HbaseConstants.ROW);
+
+			if (!isQueryforInfo(families)) {
+				ret.append(HbaseConstants.OPEN_BRACES);
+				if (HbaseConstants.SCAN.equals(operationName)) {
+					if (StringUtils.isNotEmpty(startRowVal)) {
+						ret.append(formatPredicate(ret, PredicateType.STARTROW, startRowVal));
+					}
+					if (StringUtils.isNotEmpty(stopRowVal)) {
+						ret.append(formatPredicate(ret, PredicateType.STOPROW, stopRowVal));
+					}
+				} else {
+					if(StringUtils.isNotEmpty(rowVal)) {
+						ret.append(formatPredicate(ret, PredicateType.ROW, rowVal));
+					}
+				}
+				if (StringUtils.isNotEmpty(filterVal)) {
+					ret.append(formatPredicate(ret, PredicateType.FILTER, filterVal));
+				}
+				if (MapUtils.isNotEmpty(families)) {
+					String colfamily = families.toString();
+					ret.append(formatPredicate(ret, PredicateType.COLUMNS, colfamily));
+				}
+				ret.append(HbaseConstants.SPACE).append(HbaseConstants.CLOSED_BRACES);
+			}
+		}
+		return ret.toString();
+	}
+
+	private boolean isQueryforInfo(HashMap<String, ArrayList<?>> families) {
+		boolean ret = false;
+		for(HashMap.Entry family : families.entrySet()) {
+			String familyKey = (String) family.getKey();
+			if (HbaseConstants.INFO.equals(familyKey)) {
+				ret = true;
+				break;
+			}
+		}
+		return ret;
+	}
+
+	private String formatPredicate(StringBuilder commandStr, PredicateType predicateType, String val) {
+		StringBuilder ret = new StringBuilder();
+		if (HbaseConstants.OPEN_BRACES.equals(commandStr.toString())) {
+			ret.append(HbaseConstants.SPACE);
+		} else {
+			ret.append(HbaseConstants.COMMA).append(HbaseConstants.SPACE);
+		}
+		ret.append(buildPredicate(predicateType, val));
+		return ret.toString();
+	}
+
+	private String buildPredicate(PredicateType predicateType, String val) {
+		StringBuilder ret = new StringBuilder();
+		switch (predicateType) {
+			case STARTROW:
+				ret.append(PredicateType.STARTROW.name().toUpperCase());
+				ret.append(HbaseConstants.ARROW);
+				ret.append(HbaseConstants.SINGLE_QUOTES).append(val).append(HbaseConstants.SINGLE_QUOTES);
+				break;
+			case STOPROW:
+				ret.append(PredicateType.STOPROW.name().toUpperCase());
+				ret.append(HbaseConstants.ARROW);
+				ret.append(HbaseConstants.SINGLE_QUOTES).append(val).append(HbaseConstants.SINGLE_QUOTES);
+				break;
+			case FILTER:
+				ret.append(PredicateType.FILTER.name().toUpperCase());
+				ret.append(HbaseConstants.ARROW);
+				ret.append(HbaseConstants.SINGLE_QUOTES).append(val).append(HbaseConstants.SINGLE_QUOTES);
+				break;
+			case COLUMNS:
+				ret.append(PredicateType.COLUMNS.name().toUpperCase());
+				ret.append(HbaseConstants.ARROW);
+				ret.append(HbaseConstants.SINGLE_QUOTES).append(val).append(HbaseConstants.SINGLE_QUOTES);
+				break;
+			case ROW:
+				ret.append(val);
+				break;
+		}
+		return ret.toString();
+	}
+
+	enum PredicateType {STARTROW, STOPROW, FILTER, COLUMNS, ROW};
 }