blob: 96a36abe97d6a5543e65b5bc68d5454f1234bcfa [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.ranger.authorization.kafka.authorizer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.security.JaasContext;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.SecurityUtils;
import org.apache.kafka.server.authorizer.AclCreateResult;
import org.apache.kafka.server.authorizer.AclDeleteResult;
import org.apache.kafka.server.authorizer.Action;
import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
import org.apache.kafka.server.authorizer.AuthorizationResult;
import org.apache.kafka.server.authorizer.Authorizer;
import org.apache.kafka.server.authorizer.AuthorizerServerInfo;
import org.apache.ranger.audit.provider.MiscUtil;
import org.apache.ranger.plugin.policyengine.RangerAccessRequest;
import org.apache.ranger.plugin.policyengine.RangerAccessRequestImpl;
import org.apache.ranger.plugin.policyengine.RangerAccessResourceImpl;
import org.apache.ranger.plugin.policyengine.RangerAccessResult;
import org.apache.ranger.plugin.service.RangerBasePlugin;
import org.apache.ranger.plugin.util.RangerPerfTracer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class RangerKafkaAuthorizer implements Authorizer {
private static final Logger logger = LoggerFactory.getLogger(RangerKafkaAuthorizer.class);
private static final Logger PERF_KAFKAAUTH_REQUEST_LOG = RangerPerfTracer.getPerfLogger("kafkaauth.request");
public static final String ACCESS_TYPE_ALTER_CONFIGS = "alter_configs";
public static final String KEY_TOPIC = "topic";
public static final String KEY_CLUSTER = "cluster";
public static final String KEY_CONSUMER_GROUP = "consumergroup";
public static final String KEY_TRANSACTIONALID = "transactionalid";
public static final String KEY_DELEGATIONTOKEN = "delegationtoken";
public static final String ACCESS_TYPE_READ = "consume";
public static final String ACCESS_TYPE_WRITE = "publish";
public static final String ACCESS_TYPE_CREATE = "create";
public static final String ACCESS_TYPE_DELETE = "delete";
public static final String ACCESS_TYPE_CONFIGURE = "configure";
public static final String ACCESS_TYPE_DESCRIBE = "describe";
public static final String ACCESS_TYPE_DESCRIBE_CONFIGS = "describe_configs";
public static final String ACCESS_TYPE_CLUSTER_ACTION = "cluster_action";
public static final String ACCESS_TYPE_IDEMPOTENT_WRITE = "idempotent_write";
private static volatile RangerBasePlugin rangerPlugin = null;
RangerKafkaAuditHandler auditHandler = null;
public RangerKafkaAuthorizer() {
}
private static String mapToRangerAccessType(AclOperation operation) {
switch (operation) {
case READ:
return ACCESS_TYPE_READ;
case WRITE:
return ACCESS_TYPE_WRITE;
case ALTER:
return ACCESS_TYPE_CONFIGURE;
case DESCRIBE:
return ACCESS_TYPE_DESCRIBE;
case CLUSTER_ACTION:
return ACCESS_TYPE_CLUSTER_ACTION;
case CREATE:
return ACCESS_TYPE_CREATE;
case DELETE:
return ACCESS_TYPE_DELETE;
case DESCRIBE_CONFIGS:
return ACCESS_TYPE_DESCRIBE_CONFIGS;
case ALTER_CONFIGS:
return ACCESS_TYPE_ALTER_CONFIGS;
case IDEMPOTENT_WRITE:
return ACCESS_TYPE_IDEMPOTENT_WRITE;
case UNKNOWN:
case ANY:
case ALL:
default:
return null;
}
}
private static String mapToResourceType(ResourceType resourceType) {
switch (resourceType) {
case TOPIC:
return KEY_TOPIC;
case CLUSTER:
return KEY_CLUSTER;
case GROUP:
return KEY_CONSUMER_GROUP;
case TRANSACTIONAL_ID:
return KEY_TRANSACTIONALID;
case DELEGATION_TOKEN:
return KEY_DELEGATIONTOKEN;
case ANY:
case UNKNOWN:
default:
return null;
}
}
private static RangerAccessResourceImpl createRangerAccessResource(String resourceTypeKey, String resourceName) {
RangerAccessResourceImpl rangerResource = new RangerAccessResourceImpl();
rangerResource.setValue(resourceTypeKey, resourceName);
return rangerResource;
}
private static RangerAccessRequestImpl createRangerAccessRequest(String userName,
Set<String> userGroups,
String ip,
Date eventTime,
String resourceTypeKey,
String resourceName,
String accessType) {
RangerAccessRequestImpl rangerRequest = new RangerAccessRequestImpl();
rangerRequest.setResource(createRangerAccessResource(resourceTypeKey, resourceName));
rangerRequest.setUser(userName);
rangerRequest.setUserGroups(userGroups);
rangerRequest.setClientIPAddress(ip);
rangerRequest.setAccessTime(eventTime);
rangerRequest.setAccessType(accessType);
rangerRequest.setAction(accessType);
rangerRequest.setRequestData(resourceName);
return rangerRequest;
}
private static List<AuthorizationResult> denyAll(List<Action> actions) {
return actions.stream().map(a -> AuthorizationResult.DENIED).collect(Collectors.toList());
}
private static List<AuthorizationResult> mapResults(List<Action> actions, Collection<RangerAccessResult> results) {
if (CollectionUtils.isEmpty(results)) {
logger.error("Ranger Plugin returned null or empty. Returning Denied for all");
return denyAll(actions);
}
return results.stream()
.map(r -> r != null && r.getIsAllowed() ? AuthorizationResult.ALLOWED : AuthorizationResult.DENIED)
.collect(Collectors.toList());
}
private static String toString(AuthorizableRequestContext requestContext) {
return requestContext == null ? null :
String.format("AuthorizableRequestContext{principal=%s, clientAddress=%s, clientId=%s}",
requestContext.principal(), requestContext.clientAddress(), requestContext.clientId());
}
@Override
public void close() {
logger.info("close() called on authorizer.");
try {
if (rangerPlugin != null) {
rangerPlugin.cleanup();
}
} catch (Throwable t) {
logger.error("Error closing RangerPlugin.", t);
}
}
@Override
public void configure(Map<String, ?> configs) {
RangerBasePlugin me = rangerPlugin;
if (me == null) {
synchronized (RangerKafkaAuthorizer.class) {
me = rangerPlugin;
if (me == null) {
try {
// Possible to override JAAS configuration which is used by Ranger, otherwise
// SASL_PLAINTEXT is used, which force Kafka to use 'sasl_plaintext.KafkaServer',
// if it's not defined, then it reverts to 'KafkaServer' configuration.
final Object jaasContext = configs.get("ranger.jaas.context");
final String listenerName = (jaasContext instanceof String
&& StringUtils.isNotEmpty((String) jaasContext)) ? (String) jaasContext
: SecurityProtocol.SASL_PLAINTEXT.name();
final String saslMechanism = SaslConfigs.GSSAPI_MECHANISM;
JaasContext context = JaasContext.loadServerContext(new ListenerName(listenerName), saslMechanism, configs);
MiscUtil.setUGIFromJAASConfig(context.name());
UserGroupInformation loginUser = MiscUtil.getUGILoginUser();
logger.info("LoginUser={}", loginUser);
} catch (Throwable t) {
logger.error("Error getting principal.", t);
}
rangerPlugin = new RangerBasePlugin("kafka", "kafka");
logger.info("Calling plugin.init()");
rangerPlugin.init();
auditHandler = new RangerKafkaAuditHandler();
rangerPlugin.setResultProcessor(auditHandler);
}
}
}
}
@Override
public Map<Endpoint, ? extends CompletionStage<Void>> start(AuthorizerServerInfo serverInfo) {
return serverInfo.endpoints().stream()
.collect(Collectors.toMap(endpoint -> endpoint, endpoint -> CompletableFuture.completedFuture(null), (a, b) -> b));
}
@Override
public List<AuthorizationResult> authorize(AuthorizableRequestContext requestContext, List<Action> actions) {
if (rangerPlugin == null) {
MiscUtil.logErrorMessageByInterval(logger, "Authorizer is still not initialized");
return denyAll(actions);
}
RangerPerfTracer perf = null;
if (RangerPerfTracer.isPerfTraceEnabled(PERF_KAFKAAUTH_REQUEST_LOG)) {
perf = RangerPerfTracer.getPerfTracer(PERF_KAFKAAUTH_REQUEST_LOG, "RangerKafkaAuthorizer.authorize(actions=" + actions + ")");
}
try {
return wrappedAuthorization(requestContext, actions);
} finally {
RangerPerfTracer.log(perf);
}
}
private List<AuthorizationResult> wrappedAuthorization(AuthorizableRequestContext requestContext, List<Action> actions) {
if (CollectionUtils.isEmpty(actions)) {
return Collections.emptyList();
}
String userName = requestContext.principal() == null ? null : requestContext.principal().getName();
Set<String> userGroups = MiscUtil.getGroupsForRequestUser(userName);
String hostAddress = requestContext.clientAddress() == null ? null : requestContext.clientAddress().getHostAddress();
String ip = StringUtils.isNotEmpty(hostAddress) && hostAddress.charAt(0) == '/' ? hostAddress.substring(1) : hostAddress;
Date eventTime = new Date();
List<RangerAccessRequest> rangerRequests = new ArrayList<>();
for (Action action : actions) {
String accessType = mapToRangerAccessType(action.operation());
if (accessType == null) {
MiscUtil.logErrorMessageByInterval(logger, "Unsupported access type, requestContext=" + toString(requestContext) +
", actions=" + actions + ", operation=" + action.operation());
return denyAll(actions);
}
String resourceTypeKey = mapToResourceType(action.resourcePattern().resourceType());
if (resourceTypeKey == null) {
MiscUtil.logErrorMessageByInterval(logger, "Unsupported resource type, requestContext=" + toString(requestContext) +
", actions=" + actions + ", resourceType=" + action.resourcePattern().resourceType());
return denyAll(actions);
}
RangerAccessRequestImpl rangerAccessRequest = createRangerAccessRequest(
userName,
userGroups,
ip,
eventTime,
resourceTypeKey,
action.resourcePattern().name(),
accessType);
rangerRequests.add(rangerAccessRequest);
}
Collection<RangerAccessResult> results = callRangerPlugin(rangerRequests);
List<AuthorizationResult> authorizationResults = mapResults(actions, results);
logger.debug("rangerRequests={}, return={}", rangerRequests, authorizationResults);
return authorizationResults;
}
private Collection<RangerAccessResult> callRangerPlugin(List<RangerAccessRequest> rangerRequests) {
try {
return rangerPlugin.isAccessAllowed(rangerRequests);
} catch (Throwable t) {
logger.error("Error while calling isAccessAllowed(). requests={}", rangerRequests, t);
return null;
} finally {
auditHandler.flushAudit();
}
}
@Override
public List<? extends CompletionStage<AclCreateResult>> createAcls(AuthorizableRequestContext requestContext, List<AclBinding> aclBindings) {
logger.error("createAcls is not supported by Ranger for Kafka");
return aclBindings.stream()
.map(ab -> {
CompletableFuture<AclCreateResult> completableFuture = new CompletableFuture<>();
completableFuture.completeExceptionally(new UnsupportedOperationException("createAcls is not supported by Ranger for Kafka"));
return completableFuture;
})
.collect(Collectors.toList());
}
@Override
public List<? extends CompletionStage<AclDeleteResult>> deleteAcls(AuthorizableRequestContext requestContext, List<AclBindingFilter> aclBindingFilters) {
logger.error("deleteAcls is not supported by Ranger for Kafka");
return aclBindingFilters.stream()
.map(ab -> {
CompletableFuture<AclDeleteResult> completableFuture = new CompletableFuture<>();
completableFuture.completeExceptionally(new UnsupportedOperationException("deleteAcls is not supported by Ranger for Kafka"));
return completableFuture;
})
.collect(Collectors.toList());
}
// TODO: provide a real implementation (RANGER-3809)
// Currently we return a dummy implementation because KAFKA-13598 makes producers idempotent by default and this causes
// a failure in the InitProducerId API call on the broker side because of the missing acls() method implementation.
// Overriding this with a dummy impl will make Kafka return an authorization error instead of an exception if the
// IDEMPOTENT_WRITE permission wasn't set on the producer.
@Override
public AuthorizationResult authorizeByResourceType(AuthorizableRequestContext requestContext, AclOperation op, ResourceType resourceType) {
SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType);
logger.debug("authorizeByResourceType call is not supported by Ranger for Kafka yet");
return AuthorizationResult.DENIED;
}
@Override
public Iterable<AclBinding> acls(AclBindingFilter filter) {
logger.error("(getting) acls is not supported by Ranger for Kafka");
throw new UnsupportedOperationException("(getting) acls is not supported by Ranger for Kafka");
}
}