blob: 22701425cb517a75795108f52c0bea675df32cd8 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/
package org.apache.storm.blobstore;
import java.io.IOException;
import java.security.Principal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.security.auth.Subject;
import org.apache.storm.Config;
import org.apache.storm.generated.AccessControl;
import org.apache.storm.generated.AccessControlType;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.SettableBlobMeta;
import org.apache.storm.security.auth.ClientAuthUtils;
import org.apache.storm.security.auth.IGroupMappingServiceProvider;
import org.apache.storm.security.auth.IPrincipalToLocal;
import org.apache.storm.security.auth.NimbusPrincipal;
import org.apache.storm.shade.org.apache.commons.lang.StringUtils;
import org.apache.storm.utils.WrappedAuthorizationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Provides common handling of acls for Blobstores. Also contains some static utility functions related to Blobstores.
*/
public class BlobStoreAclHandler {
public static final Logger LOG = LoggerFactory.getLogger(BlobStoreAclHandler.class);
public static final int READ = 0x01;
public static final int WRITE = 0x02;
public static final int ADMIN = 0x04;
public static final List<AccessControl> WORLD_EVERYTHING =
Arrays.asList(new AccessControl(AccessControlType.OTHER, READ | WRITE | ADMIN));
public static final List<AccessControl> DEFAULT = new ArrayList<AccessControl>();
private final IPrincipalToLocal ptol;
private final IGroupMappingServiceProvider groupMappingServiceProvider;
private Set<String> supervisors;
private Set<String> admins;
private Set<String> adminsGroups;
private boolean doAclValidation;
public BlobStoreAclHandler(Map<String, Object> conf) {
ptol = ClientAuthUtils.getPrincipalToLocalPlugin(conf);
if (conf.get(Config.STORM_GROUP_MAPPING_SERVICE_PROVIDER_PLUGIN) != null) {
groupMappingServiceProvider = ClientAuthUtils.getGroupMappingServiceProviderPlugin(conf);
} else {
groupMappingServiceProvider = null;
}
supervisors = new HashSet<String>();
admins = new HashSet<String>();
adminsGroups = new HashSet<>();
if (conf.containsKey(Config.NIMBUS_SUPERVISOR_USERS)) {
supervisors.addAll((List<String>) conf.get(Config.NIMBUS_SUPERVISOR_USERS));
}
if (conf.containsKey(Config.NIMBUS_ADMINS)) {
admins.addAll((List<String>) conf.get(Config.NIMBUS_ADMINS));
}
if (conf.containsKey(Config.NIMBUS_ADMINS_GROUPS)) {
adminsGroups.addAll((List<String>) conf.get(Config.NIMBUS_ADMINS_GROUPS));
}
if (conf.containsKey(Config.STORM_BLOBSTORE_ACL_VALIDATION_ENABLED)) {
doAclValidation = (boolean) conf.get(Config.STORM_BLOBSTORE_ACL_VALIDATION_ENABLED);
}
}
private static AccessControlType parseAclType(String type) {
if ("other".equalsIgnoreCase(type) || "o".equalsIgnoreCase(type)) {
return AccessControlType.OTHER;
} else if ("user".equalsIgnoreCase(type) || "u".equalsIgnoreCase(type)) {
return AccessControlType.USER;
}
throw new IllegalArgumentException(type + " is not a valid access control type");
}
private static int parseAccess(String access) {
int ret = 0;
for (char c : access.toCharArray()) {
if ('r' == c) {
ret = ret | READ;
} else if ('w' == c) {
ret = ret | WRITE;
} else if ('a' == c) {
ret = ret | ADMIN;
} else if ('-' == c) {
//ignored
} else {
throw new IllegalArgumentException("");
}
}
return ret;
}
public static AccessControl parseAccessControl(String str) {
String[] parts = str.split(":");
String type = "other";
String name = "";
String access = "-";
if (parts.length > 3) {
throw new IllegalArgumentException("Don't know how to parse " + str + " into an ACL value");
} else if (parts.length == 1) {
type = "other";
name = "";
access = parts[0];
} else if (parts.length == 2) {
type = "user";
name = parts[0];
access = parts[1];
} else if (parts.length == 3) {
type = parts[0];
name = parts[1];
access = parts[2];
}
AccessControl ret = new AccessControl();
ret.set_type(parseAclType(type));
ret.set_name(name);
ret.set_access(parseAccess(access));
return ret;
}
private static String accessToString(int access) {
StringBuilder ret = new StringBuilder();
ret.append(((access & READ) > 0) ? "r" : "-");
ret.append(((access & WRITE) > 0) ? "w" : "-");
ret.append(((access & ADMIN) > 0) ? "a" : "-");
return ret.toString();
}
public static String accessControlToString(AccessControl ac) {
StringBuilder ret = new StringBuilder();
switch (ac.get_type()) {
case OTHER:
ret.append("o");
break;
case USER:
ret.append("u");
break;
default:
throw new IllegalArgumentException("Don't know what a type of " + ac.get_type() + " means ");
}
ret.append(":");
if (ac.is_set_name()) {
ret.append(ac.get_name());
}
ret.append(":");
ret.append(accessToString(ac.get_access()));
return ret.toString();
}
@SuppressWarnings("checkstyle:AbbreviationAsWordInName")
public static void validateSettableACLs(String key, List<AccessControl> acls) throws AuthorizationException {
Set<String> aclUsers = new HashSet<>();
List<String> duplicateUsers = new ArrayList<>();
for (AccessControl acl : acls) {
String aclUser = acl.get_name();
if (!StringUtils.isEmpty(aclUser) && !aclUsers.add(aclUser)) {
LOG.error("'{}' user can't appear more than once in the ACLs", aclUser);
duplicateUsers.add(aclUser);
}
}
if (duplicateUsers.size() > 0) {
String errorMessage = "user " + Arrays.toString(duplicateUsers.toArray())
+ " can't appear more than once in the ACLs for key [" + key + "].";
throw new WrappedAuthorizationException(errorMessage);
}
}
private Set<String> constructUserFromPrincipals(Subject who) {
Set<String> user = new HashSet<String>();
if (who != null) {
for (Principal p : who.getPrincipals()) {
user.add(ptol.toLocal(p));
}
}
return user;
}
private boolean isAdmin(Subject who) {
Set<String> user = constructUserFromPrincipals(who);
for (String u : user) {
if (admins.contains(u)) {
return true;
}
if (adminsGroups.size() > 0 && groupMappingServiceProvider != null) {
Set<String> userGroups = null;
try {
userGroups = groupMappingServiceProvider.getGroups(u);
} catch (IOException e) {
LOG.warn("Error while trying to fetch user groups", e);
}
if (userGroups != null) {
for (String tgroup : userGroups) {
if (adminsGroups.contains(tgroup)) {
return true;
}
}
}
}
}
return false;
}
private boolean isReadOperation(int operation) {
if (operation == 1) {
return true;
}
return false;
}
private boolean isSupervisor(Subject who, int operation) {
Set<String> user = constructUserFromPrincipals(who);
if (isReadOperation(operation)) {
for (String u : user) {
if (supervisors.contains(u)) {
return true;
}
}
}
return false;
}
private boolean isNimbus(Subject who) {
Set<Principal> principals;
boolean isNimbusInstance = false;
if (who != null) {
principals = who.getPrincipals();
for (Principal principal : principals) {
if (principal instanceof NimbusPrincipal) {
isNimbusInstance = true;
}
}
}
return isNimbusInstance;
}
public boolean checkForValidUsers(Subject who, int mask) {
return isNimbus(who) || isAdmin(who) || isSupervisor(who, mask);
}
/**
* The user should be able to see the metadata if and only if they have any of READ, WRITE, or ADMIN.
*/
public void validateUserCanReadMeta(List<AccessControl> acl, Subject who, String key) throws AuthorizationException {
hasAnyPermissions(acl, (READ | WRITE | ADMIN), who, key);
}
/**
* Validates if the user has any of the permissions mentioned in the mask.
*
* @param acl ACL for the key.
* @param mask mask holds the cumulative value of READ = 1, WRITE = 2 or ADMIN = 4 permissions. mask = 1 implies READ privilege. mask =
* 5 implies READ and ADMIN privileges.
* @param who Is the user against whom the permissions are validated for a key using the ACL and the mask.
* @param key Key used to identify the blob.
*/
public void hasAnyPermissions(List<AccessControl> acl, int mask, Subject who, String key) throws AuthorizationException {
if (!doAclValidation) {
return;
}
Set<String> user = constructUserFromPrincipals(who);
LOG.debug("user {}", user);
if (checkForValidUsers(who, mask)) {
return;
}
for (AccessControl ac : acl) {
int allowed = getAllowed(ac, user);
LOG.debug(" user: {} allowed: {} key: {}", user, allowed, key);
if ((allowed & mask) > 0) {
return;
}
}
throw new WrappedAuthorizationException(
user + " does not have access to " + key);
}
/**
* Validates if the user has at least the set of permissions mentioned in the mask.
*
* @param acl ACL for the key.
* @param mask mask holds the cumulative value of READ = 1, WRITE = 2 or ADMIN = 4 permissions. mask = 1 implies READ privilege. mask =
* 5 implies READ and ADMIN privileges.
* @param who Is the user against whom the permissions are validated for a key using the ACL and the mask.
* @param key Key used to identify the blob.
*/
public void hasPermissions(List<AccessControl> acl, int mask, Subject who, String key) throws AuthorizationException {
if (!doAclValidation) {
return;
}
Set<String> user = constructUserFromPrincipals(who);
LOG.debug("user {}", user);
if (checkForValidUsers(who, mask)) {
return;
}
for (AccessControl ac : acl) {
int allowed = getAllowed(ac, user);
mask = ~allowed & mask;
LOG.debug(" user: {} allowed: {} disallowed: {} key: {}", user, allowed, mask, key);
}
if (mask == 0) {
return;
}
throw new WrappedAuthorizationException(
user + " does not have " + namedPerms(mask) + " access to " + key);
}
public void normalizeSettableBlobMeta(String key, SettableBlobMeta meta, Subject who, int opMask) {
meta.set_acl(normalizeSettableAcls(key, meta.get_acl(), who, opMask));
}
private String namedPerms(int mask) {
StringBuilder b = new StringBuilder();
b.append("[");
if ((mask & READ) > 0) {
b.append("READ ");
}
if ((mask & WRITE) > 0) {
b.append("WRITE ");
}
if ((mask & ADMIN) > 0) {
b.append("ADMIN ");
}
b.append("]");
return b.toString();
}
private int getAllowed(AccessControl ac, Set<String> users) {
switch (ac.get_type()) {
case OTHER:
return ac.get_access();
case USER:
if (users.contains(ac.get_name())) {
return ac.get_access();
}
return 0;
default:
return 0;
}
}
private List<AccessControl> removeBadAcls(List<AccessControl> accessControls) {
List<AccessControl> resultAcl = new ArrayList<AccessControl>();
for (AccessControl control : accessControls) {
if (control.get_type().equals(AccessControlType.OTHER) && (control.get_access() == 0)) {
LOG.debug("Removing invalid blobstore world ACL "
+ BlobStoreAclHandler.accessControlToString(control));
continue;
}
resultAcl.add(control);
}
return resultAcl;
}
private final List<AccessControl> normalizeSettableAcls(String key, List<AccessControl> acls, Subject who,
int opMask) {
List<AccessControl> cleanAcls = removeBadAcls(acls);
Set<String> userNames = getUserNamesFromSubject(who);
for (String user : userNames) {
fixAclsForUser(cleanAcls, user, opMask);
}
fixEmptyNameACLForUsers(cleanAcls, userNames, opMask);
if ((who == null || userNames.isEmpty()) && !worldEverything(acls)) {
cleanAcls.addAll(BlobStoreAclHandler.WORLD_EVERYTHING);
LOG.debug("Access Control for key {} is normalized to world everything {}", key, cleanAcls);
if (!acls.isEmpty()) {
LOG.warn("Access control for blob with key {} is normalized to WORLD_EVERYTHING", key);
}
}
return cleanAcls;
}
private boolean worldEverything(List<AccessControl> acls) {
boolean isWorldEverything = false;
for (AccessControl acl : acls) {
if (acl.get_type() == AccessControlType.OTHER && acl.get_access() == (READ | WRITE | ADMIN)) {
isWorldEverything = true;
break;
}
}
return isWorldEverything;
}
private void fixAclsForUser(List<AccessControl> acls, String user, int mask) {
boolean foundUserAcl = false;
List<AccessControl> emptyUserAcls = new ArrayList<>();
for (AccessControl control : acls) {
if (control.get_type() == AccessControlType.USER) {
if (!control.is_set_name()) {
emptyUserAcls.add(control);
} else if (control.get_name().equals(user)) {
int currentAccess = control.get_access();
if ((currentAccess & mask) != mask) {
control.set_access(currentAccess | mask);
}
foundUserAcl = true;
}
}
}
// if ACLs have two user ACLs for empty user and principal, discard empty user ACL
if (!emptyUserAcls.isEmpty() && foundUserAcl) {
acls.removeAll(emptyUserAcls);
}
// add default user ACL when only empty user ACL is not present
if (emptyUserAcls.isEmpty() && !foundUserAcl) {
AccessControl userAcl = new AccessControl();
userAcl.set_type(AccessControlType.USER);
userAcl.set_name(user);
userAcl.set_access(mask);
acls.add(userAcl);
}
}
@SuppressWarnings("checkstyle:AbbreviationAsWordInName")
private void fixEmptyNameACLForUsers(List<AccessControl> acls, Set<String> users, int mask) {
List<AccessControl> aclsToAdd = new ArrayList<>();
List<AccessControl> aclsToRemove = new ArrayList<>();
for (AccessControl control : acls) {
if (control.get_type() == AccessControlType.USER && !control.is_set_name()) {
aclsToRemove.add(control);
int currentAccess = control.get_access();
if ((currentAccess & mask) != mask) {
control.set_access(currentAccess | mask);
}
for (String user : users) {
AccessControl copiedControl = new AccessControl(control);
copiedControl.set_name(user);
aclsToAdd.add(copiedControl);
}
}
}
acls.removeAll(aclsToRemove);
acls.addAll(aclsToAdd);
}
private Set<String> getUserNamesFromSubject(Subject who) {
Set<String> user = new HashSet<String>();
if (who != null) {
for (Principal p : who.getPrincipals()) {
user.add(ptol.toLocal(p));
}
}
return user;
}
}