blob: f77ab9dea4068edf7d4b705826f4942402c79f77 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.authorization.ranger;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.impala.analysis.AnalysisContext.AnalysisResult;
import org.apache.impala.authorization.Authorizable;
import org.apache.impala.authorization.Authorizable.Type;
import org.apache.impala.authorization.AuthorizationChecker;
import org.apache.impala.authorization.AuthorizationConfig;
import org.apache.impala.authorization.AuthorizationContext;
import org.apache.impala.authorization.AuthorizationException;
import org.apache.impala.authorization.BaseAuthorizationChecker;
import org.apache.impala.authorization.DefaultAuthorizableFactory;
import org.apache.impala.authorization.Privilege;
import org.apache.impala.authorization.PrivilegeRequest;
import org.apache.impala.authorization.User;
import org.apache.impala.catalog.FeCatalog;
import org.apache.impala.common.InternalException;
import org.apache.impala.common.RuntimeEnv;
import org.apache.impala.thrift.TSessionState;
import org.apache.impala.util.EventSequence;
import org.apache.ranger.audit.model.AuthzAuditEvent;
import org.apache.ranger.plugin.model.RangerPolicy;
import org.apache.ranger.plugin.model.RangerServiceDef;
import org.apache.ranger.plugin.policyengine.RangerAccessRequest;
import org.apache.ranger.plugin.policyengine.RangerAccessRequestImpl;
import org.apache.ranger.plugin.policyengine.RangerAccessResourceImpl;
import org.apache.ranger.plugin.policyengine.RangerAccessResult;
import org.apache.ranger.plugin.policyengine.RangerPolicyEngine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
/**
* An implementation of {@link AuthorizationChecker} that uses Ranger.
*
* The Ranger implementation does not use catalog to cache the authorization policy.
* Ranger plugin uses its own cache.
*/
public class RangerAuthorizationChecker extends BaseAuthorizationChecker {
private static final Logger LOG = LoggerFactory.getLogger(
RangerAuthorizationChecker.class);
// These are Ranger access types (privileges).
public static final String UPDATE_ACCESS_TYPE = "update";
public static final String SELECT_ACCESS_TYPE = "select";
private final RangerImpalaPlugin plugin_;
public RangerAuthorizationChecker(AuthorizationConfig authzConfig) {
super(authzConfig);
Preconditions.checkArgument(authzConfig instanceof RangerAuthorizationConfig);
RangerAuthorizationConfig rangerConfig = (RangerAuthorizationConfig) authzConfig;
plugin_ = new RangerImpalaPlugin(
rangerConfig.getServiceType(), rangerConfig.getAppId());
plugin_.init();
}
@Override
protected boolean authorizeResource(AuthorizationContext authzCtx, User user,
PrivilegeRequest request) throws InternalException {
Preconditions.checkArgument(authzCtx instanceof RangerAuthorizationContext);
Preconditions.checkNotNull(user);
Preconditions.checkNotNull(request);
RangerAuthorizationContext rangerAuthzCtx = (RangerAuthorizationContext) authzCtx;
List<RangerAccessResourceImpl> resources = new ArrayList<>();
Authorizable authorizable = request.getAuthorizable();
switch (authorizable.getType()) {
case SERVER:
// Hive service definition does not have a concept of server. So we define
// server to mean all access to all resource sets.
resources.add(new RangerImpalaResourceBuilder()
.database("*").table("*").column("*").build());
resources.add(new RangerImpalaResourceBuilder()
.database("*").function("*").build());
resources.add(new RangerImpalaResourceBuilder().uri("*").build());
break;
case DB:
resources.add(new RangerImpalaResourceBuilder()
.database(authorizable.getDbName())
.owner(authorizable.getOwnerUser())
.build());
break;
case TABLE:
resources.add(new RangerImpalaResourceBuilder()
.database(authorizable.getDbName())
.table(authorizable.getTableName())
.owner(authorizable.getOwnerUser())
.build());
break;
case COLUMN:
RangerImpalaResourceBuilder builder = new RangerImpalaResourceBuilder();
builder.database(authorizable.getDbName());
// * in Ranger means "all". For example, to check access for all columns, we need
// to create a request, such as:
// [server=server1, database=foo, table=bar, column=*]
//
// "Any" column access is special in Ranger. For example if we want to check if
// we have access to "any" column on a particular table, we need to build a
// request without the column defined in the resource and use a special
// ANY_ACCESS access type.
//
// For example any column in foo.bar table:
// access type: RangerPolicyEngine.ANY_ACCESS
// resources: [server=server1, database=foo, table=bar]
if (request.getPrivilege() != Privilege.ANY ||
!DefaultAuthorizableFactory.ALL.equals(authorizable.getTableName())) {
builder.table(authorizable.getTableName());
}
if (request.getPrivilege() != Privilege.ANY ||
!DefaultAuthorizableFactory.ALL.equals(authorizable.getColumnName())) {
builder.column(authorizable.getColumnName());
}
builder.owner(authorizable.getOwnerUser());
resources.add(builder.build());
break;
case FUNCTION:
resources.add(new RangerImpalaResourceBuilder()
.database(authorizable.getDbName())
.function(authorizable.getFnName())
.build());
break;
case URI:
resources.add(new RangerImpalaResourceBuilder()
.uri(authorizable.getName())
.build());
break;
default:
throw new IllegalArgumentException(String.format("Invalid authorizable type: %s",
authorizable.getType()));
}
for (RangerAccessResourceImpl resource: resources) {
if (request.getPrivilege() == Privilege.ANY) {
if (!authorizeResource(rangerAuthzCtx, resource, user, request.getPrivilege(),
((RangerAuthorizationContext) authzCtx).getAuditHandler())) {
return false;
}
} else {
boolean authorized = request.getPrivilege().hasAnyOf() ?
authorizeAny(rangerAuthzCtx, resource, user, request.getPrivilege()) :
authorizeAll(rangerAuthzCtx, resource, user, request.getPrivilege());
if (!authorized) {
return false;
}
}
}
return true;
}
@Override
public void postAuthorize(AuthorizationContext authzCtx) {
Preconditions.checkArgument(authzCtx instanceof RangerAuthorizationContext);
super.postAuthorize(authzCtx);
RangerBufferAuditHandler auditHandler =
((RangerAuthorizationContext) authzCtx).getAuditHandler();
auditHandler.flush();
}
@Override
protected void authorizeRowFilterAndColumnMask(User user,
List<PrivilegeRequest> privilegeRequests)
throws AuthorizationException, InternalException {
for (PrivilegeRequest request : privilegeRequests) {
if (request.getAuthorizable().getType() == Type.TABLE) {
authorizeRowFilter(user,
request.getAuthorizable().getDbName(),
request.getAuthorizable().getTableName());
}
}
}
@Override
public void invalidateAuthorizationCache() {
long startTime = System.currentTimeMillis();
try {
plugin_.refreshPoliciesAndTags();
} finally {
LOG.debug("Refreshing Ranger policies took {} ms",
(System.currentTimeMillis() - startTime));
}
}
@Override
public AuthorizationContext createAuthorizationContext(boolean doAudits,
String sqlStmt, TSessionState sessionState, Optional<EventSequence> timeline)
throws InternalException {
RangerAuthorizationContext authzCtx =
new RangerAuthorizationContext(sessionState, timeline);
if (doAudits) {
// Any statement that goes through {@link authorize} will need to have audit logs.
if (sqlStmt != null) {
authzCtx.setAuditHandler(new RangerBufferAuditHandler(
sqlStmt, plugin_.getClusterName(),
sessionState.getNetwork_address().getHostname()));
} else {
authzCtx.setAuditHandler(new RangerBufferAuditHandler());
}
}
return authzCtx;
}
@Override
protected void authorizeTableAccess(AuthorizationContext authzCtx,
AnalysisResult analysisResult, FeCatalog catalog, List<PrivilegeRequest> requests)
throws AuthorizationException, InternalException {
RangerAuthorizationContext originalCtx = (RangerAuthorizationContext) authzCtx;
RangerBufferAuditHandler originalAuditHandler = originalCtx.getAuditHandler();
// case 1: table (select) OK --> add the table event
// case 2: table (non-select) ERROR --> add the table event
// case 3: table (select) ERROR, columns (select) OK -> only add the column events
// case 4: table (select) ERROR, columns (select) ERROR --> only add the first column
// event
RangerAuthorizationContext tmpCtx = new RangerAuthorizationContext(
originalCtx.getSessionState(), originalCtx.getTimeline());
tmpCtx.setAuditHandler(new RangerBufferAuditHandler(originalAuditHandler));
try {
super.authorizeTableAccess(tmpCtx, analysisResult, catalog, requests);
} catch (AuthorizationException e) {
tmpCtx.getAuditHandler().getAuthzEvents().stream()
.filter(evt ->
// case 2: get the first failing non-select table
(!"select".equalsIgnoreCase(evt.getAccessType()) &&
"@table".equals(evt.getResourceType())) ||
// case 4: get the first failing column
("@column".equals(evt.getResourceType()) && evt.getAccessResult() == 0))
.findFirst()
.ifPresent(evt -> originalCtx.getAuditHandler().getAuthzEvents().add(evt));
throw e;
} finally {
// case 1 & 4: we only add the successful events. The first table-level access
// check is only for the short-circuit, we don't want to add an event for that.
List<AuthzAuditEvent> events = tmpCtx.getAuditHandler().getAuthzEvents().stream()
.filter(evt -> evt.getAccessResult() != 0)
.collect(Collectors.toList());
originalCtx.getAuditHandler().getAuthzEvents().addAll(events);
}
}
@Override
public boolean needsMaskingOrFiltering(User user, String dbName, String tableName,
List<String> requiredColumns) throws InternalException {
for (String column: requiredColumns) {
if (evalColumnMask(user, dbName, tableName, column).isMaskEnabled()) {
return true;
}
}
return false;
}
@Override
public String createColumnMask(User user, String dbName, String tableName,
String columnName) throws InternalException {
RangerAccessResult accessResult = evalColumnMask(user, dbName, tableName,
columnName);
// No column masking policies, return the original column.
if (!accessResult.isMaskEnabled()) return columnName;
String maskType = accessResult.getMaskType();
RangerServiceDef.RangerDataMaskTypeDef maskTypeDef = accessResult.getMaskTypeDef();
Preconditions.checkNotNull(maskType);
// The expression used to replace the original column.
String maskedColumn = columnName;
// The expression of the mask type. Column names are referenced by "{col}".
// Transformer examples for the builtin mask types:
// mask type transformer
// MASK mask({col})
// MASK_SHOW_LAST_4 mask_show_last_n({col}, 4, 'x', 'x', 'x', -1, '1')
// MASK_SHOW_FIRST_4 mask_show_first_n({col}, 4, 'x', 'x', 'x', -1, '1')
// MASK_HASH mask_hash({col})
// MASK_DATE_SHOW_YEAR mask({col}, 'x', 'x', 'x', -1, '1', 1, 0, -1)
String transformer = null;
if (maskTypeDef != null) {
transformer = maskTypeDef.getTransformer();
}
if (StringUtils.equalsIgnoreCase(maskType, RangerPolicy.MASK_TYPE_NULL)) {
maskedColumn = "NULL";
} else if (StringUtils.equalsIgnoreCase(maskType, RangerPolicy.MASK_TYPE_CUSTOM)) {
String maskedValue = accessResult.getMaskedValue();
if (maskedValue == null) {
maskedColumn = "NULL";
} else {
maskedColumn = maskedValue.replace("{col}", columnName);
}
} else if (StringUtils.isNotEmpty(transformer)) {
maskedColumn = transformer.replace("{col}", columnName);
}
LOG.info("dbName: {}, tableName: {}, column: {}, maskType: {}, columnTransformer: {}",
dbName, tableName, columnName, maskType, maskedColumn);
return maskedColumn;
}
/**
* Evaluate column masking policies on the given column and returns the result.
* A RangerAccessResult contains the matched policy details and the masked column.
*/
private RangerAccessResult evalColumnMask(User user, String dbName,
String tableName, String columnName) throws InternalException {
RangerAccessResourceImpl resource = new RangerImpalaResourceBuilder()
.database(dbName)
.table(tableName)
.column(columnName)
.build();
RangerAccessRequest req = new RangerAccessRequestImpl(resource,
SELECT_ACCESS_TYPE, user.getShortName(), getUserGroups(user));
return plugin_.evalDataMaskPolicies(req, null);
}
/**
* This method checks if row filter is enabled on the given tables and deny access
* when row filter is enabled by throwing an {@link AuthorizationException} . This is
* to prevent data leak when Hive has row filter enabled but not in Impala.
*/
private void authorizeRowFilter(User user, String dbName, String tableName)
throws InternalException, AuthorizationException {
RangerAccessResourceImpl resource = new RangerImpalaResourceBuilder()
.database(dbName)
.table(tableName)
.build();
RangerAccessRequest req = new RangerAccessRequestImpl(resource,
SELECT_ACCESS_TYPE, user.getShortName(), getUserGroups(user));
if (plugin_.evalRowFilterPolicies(req, null).isRowFilterEnabled()) {
throw new AuthorizationException(String.format(
"Impala does not support row filtering yet. Row filtering is enabled " +
"on table: %s.%s", dbName, tableName));
}
}
@Override
public Set<String> getUserGroups(User user) throws InternalException {
UserGroupInformation ugi;
if (RuntimeEnv.INSTANCE.isTestEnv()) {
ugi = UserGroupInformation.createUserForTesting(user.getShortName(),
new String[]{user.getShortName()});
} else {
ugi = UserGroupInformation.createRemoteUser(user.getShortName());
}
return new HashSet<>(ugi.getGroups());
}
private boolean authorizeAny(RangerAuthorizationContext authzCtx,
RangerAccessResourceImpl resource, User user, Privilege privilege)
throws InternalException {
boolean authorized = false;
RangerBufferAuditHandler originalAuditHandler = authzCtx.getAuditHandler();
// Use a temporary audit handler instead of the original audit handler
// so that we know all the audit events generated by the temporary audit
// handler are contained to the given resource.
RangerBufferAuditHandler tmpAuditHandler = originalAuditHandler == null ?
null : new RangerBufferAuditHandler(originalAuditHandler);
for (Privilege impliedPrivilege: privilege.getImpliedPrivileges()) {
if (authorizeResource(authzCtx, resource, user, impliedPrivilege,
tmpAuditHandler)) {
authorized = true;
break;
}
}
if (originalAuditHandler != null) {
updateAuditEvents(tmpAuditHandler, originalAuditHandler, true /*any*/,
privilege);
}
return authorized;
}
private boolean authorizeAll(RangerAuthorizationContext authzCtx,
RangerAccessResourceImpl resource, User user, Privilege privilege)
throws InternalException {
boolean authorized = true;
RangerBufferAuditHandler originalAuditHandler = authzCtx.getAuditHandler();
// Use a temporary audit handler instead of the original audit handler
// so that we know all the audit events generated by the temporary audit
// handler are contained to the given resource.
RangerBufferAuditHandler tmpAuditHandler = originalAuditHandler == null ?
null : new RangerBufferAuditHandler(originalAuditHandler);
for (Privilege impliedPrivilege: privilege.getImpliedPrivileges()) {
if (!authorizeResource(authzCtx, resource, user, impliedPrivilege,
tmpAuditHandler)) {
authorized = false;
break;
}
}
if (originalAuditHandler != null) {
updateAuditEvents(tmpAuditHandler, originalAuditHandler, false /*not any*/,
privilege);
}
return authorized;
}
/**
* Updates the {@link AuthzAuditEvent} from the source to the destination
* {@link RangerBufferAuditHandler} with the given privilege.
*
* For example we want to log:
* VIEW_METADATA - Allowed
* vs
* INSERT - Denied, INSERT - Allowed.
*/
private static void updateAuditEvents(RangerBufferAuditHandler source,
RangerBufferAuditHandler dest, boolean isAny, Privilege privilege) {
// A custom audit event to log the actual privilege instead of the implied
// privileges.
AuthzAuditEvent newAuditEvent = null;
for (AuthzAuditEvent auditEvent : source.getAuthzEvents()) {
if (auditEvent.getAccessResult() == (isAny ? 1 : 0)) {
newAuditEvent = auditEvent;
break; // break if at least one is a success.
} else {
newAuditEvent = auditEvent;
}
}
if (newAuditEvent != null) {
newAuditEvent.setAccessType(privilege.name().toLowerCase());
dest.getAuthzEvents().add(newAuditEvent);
}
}
private boolean authorizeResource(RangerAuthorizationContext authzCtx,
RangerAccessResourceImpl resource, User user, Privilege privilege,
RangerBufferAuditHandler auditHandler) throws InternalException {
String accessType;
if (privilege == Privilege.ANY) {
accessType = RangerPolicyEngine.ANY_ACCESS;
} else if (privilege == Privilege.INSERT) {
// Ranger plugin for Hive considers INSERT to be UPDATE.
accessType = UPDATE_ACCESS_TYPE;
} else {
accessType = privilege.name().toLowerCase();
}
RangerAccessRequestImpl request = new RangerAccessRequestImpl(resource,
accessType, user.getShortName(), getUserGroups(user));
request.setClusterName(plugin_.getClusterName());
if (authzCtx.getSessionState() != null) {
request.setClientIPAddress(
authzCtx.getSessionState().getNetwork_address().getHostname());
}
RangerAccessResult authorized = plugin_.isAccessAllowed(request, auditHandler);
return authorized != null && authorized.getIsAllowed();
}
@VisibleForTesting
public RangerImpalaPlugin getRangerImpalaPlugin() { return plugin_; }
}