blob: 7383e50d8192c5c2906c7b1ba144d42f605a49e1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sentry.policy.kafka;
import static org.apache.sentry.policy.common.PolicyConstants.AUTHORIZABLE_SPLITTER;
import static org.apache.sentry.policy.common.PolicyConstants.PRIVILEGE_PREFIX;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.sentry.core.model.kafka.KafkaActionFactory;
import org.apache.sentry.core.model.kafka.KafkaAuthorizable;
import org.apache.sentry.core.model.kafka.Host;
import org.apache.sentry.policy.common.PrivilegeValidator;
import org.apache.sentry.policy.common.PrivilegeValidatorContext;
import org.apache.shiro.config.ConfigurationException;
import com.google.common.collect.Lists;
/**
* Validator for Kafka privileges.
* Below are the requirements for a kafka privilege to be valid.
* 1. Privilege must start with Host resource.
* 2. Privilege must have at most one non Host resource, Cluster or Topic or ConsumerGroup, followed
* by Host resource.
* 3. Privilege must end with exactly one action.
*/
public class KafkaPrivilegeValidator implements PrivilegeValidator {
public static final String KafkaPrivilegeHelpMsg =
"Invalid Kafka privilege." +
" Kafka privilege must be of the form host=<HOST>-><RESOURCE>=<RESOURCE_NAME>->action=<ACTION>," +
" where <HOST> can be '*' or any valid host name," +
" <RESOURCE> can be one of " + Arrays.toString(getKafkaAuthorizablesExceptHost()) +
" <RESOURCE_NAME> is name of the resource," +
" <ACTION> can be one of " + Arrays.toString(KafkaActionFactory.KafkaActionType.values()) +
".";
private static KafkaAuthorizable.AuthorizableType[] getKafkaAuthorizablesExceptHost() {
final KafkaAuthorizable.AuthorizableType[] authorizableTypes = KafkaAuthorizable.AuthorizableType.values();
List<KafkaAuthorizable.AuthorizableType> authorizableTypesWithoutHost = new ArrayList<>(authorizableTypes.length - 1);
for (KafkaAuthorizable.AuthorizableType authorizableType: authorizableTypes) {
if (!authorizableType.equals(KafkaAuthorizable.AuthorizableType.HOST)) {
authorizableTypesWithoutHost.add(authorizableType);
}
}
return authorizableTypesWithoutHost.toArray(new KafkaAuthorizable.AuthorizableType[authorizableTypesWithoutHost.size()]);
}
public KafkaPrivilegeValidator() {
}
@Override
public void validate(PrivilegeValidatorContext context) throws ConfigurationException {
List<String> splits = Lists.newArrayList();
for (String section : AUTHORIZABLE_SPLITTER.split(context.getPrivilege())) {
splits.add(section);
}
// Check privilege splits length is 2 or 3
if (splits.size() < 2 || splits.size() > 3) {
throw new ConfigurationException(KafkaPrivilegeHelpMsg);
}
// Check privilege starts with Host resource
if (isAction(splits.get(0))) {
throw new ConfigurationException("Kafka privilege can not start with an action.\n" + KafkaPrivilegeHelpMsg);
}
KafkaAuthorizable hostAuthorizable = KafkaModelAuthorizables.from(splits.get(0));
if (hostAuthorizable == null) {
throw new ConfigurationException("No Kafka authorizable found for " + splits.get(0) + "\n." + KafkaPrivilegeHelpMsg);
}
if (!(hostAuthorizable instanceof Host)) {
throw new ConfigurationException("Kafka privilege must begin with host authorizable.\n" + KafkaPrivilegeHelpMsg);
}
// Check privilege has at most one non Host resource following Host resource
if (splits.size() == 3) {
if (isAction(splits.get(1))) {
throw new ConfigurationException("Kafka privilege can have action only at the end of privilege.\n" + KafkaPrivilegeHelpMsg);
}
KafkaAuthorizable authorizable = KafkaModelAuthorizables.from(splits.get(1));
if (authorizable == null) {
throw new ConfigurationException("No Kafka authorizable found for " + splits.get(1) + "\n." + KafkaPrivilegeHelpMsg);
}
if (authorizable instanceof Host) {
throw new ConfigurationException("Host authorizable can be specified just once in a Kafka privilege.\n" + KafkaPrivilegeHelpMsg);
}
}
// Check privilege ends with exactly one valid action
if (!isAction(splits.get(splits.size() - 1))) {
throw new ConfigurationException("Kafka privilege must end with a valid action.\n" + KafkaPrivilegeHelpMsg);
}
}
private boolean isAction(String privilegePart) {
final String privilege = privilegePart.toLowerCase();
final String action = privilege.replace(PRIVILEGE_PREFIX, "").toLowerCase();
return privilege.startsWith(PRIVILEGE_PREFIX) &&
KafkaActionFactory.getInstance().getActionByName(action) != null;
}
}