blob: d7c5dd1a3e58515cf97bf8452209cdeba6a3a546 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.controller;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.metadata.AccessControlEntryRecord;
import org.apache.kafka.common.metadata.RemoveAccessControlEntryRecord;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.metadata.authorizer.StandardAcl;
import org.apache.kafka.metadata.authorizer.StandardAclWithId;
import org.apache.kafka.server.authorizer.AclCreateResult;
import org.apache.kafka.server.authorizer.AclDeleteResult;
import org.apache.kafka.server.authorizer.AclDeleteResult.AclBindingDeleteResult;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.mutable.BoundedList;
import org.apache.kafka.server.mutable.BoundedListTooLongException;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;
import org.apache.kafka.timeline.TimelineHashSet;
import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import static org.apache.kafka.controller.QuorumController.MAX_RECORDS_PER_USER_OP;
/**
* The AclControlManager manages any ACLs that are stored in the __cluster_metadata topic.
* If the ACLs are stored externally (such as in ZooKeeper) then there will be nothing for
* this manager to do.
*/
public class AclControlManager {
static class Builder {
private LogContext logContext = null;
private SnapshotRegistry snapshotRegistry = null;
Builder setLogContext(LogContext logContext) {
this.logContext = logContext;
return this;
}
Builder setSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
this.snapshotRegistry = snapshotRegistry;
return this;
}
AclControlManager build() {
if (logContext == null) logContext = new LogContext();
if (snapshotRegistry == null) snapshotRegistry = new SnapshotRegistry(logContext);
return new AclControlManager(logContext, snapshotRegistry);
}
}
private final Logger log;
private final TimelineHashMap<Uuid, StandardAcl> idToAcl;
private final TimelineHashSet<StandardAcl> existingAcls;
private AclControlManager(
LogContext logContext,
SnapshotRegistry snapshotRegistry
) {
this.log = logContext.logger(AclControlManager.class);
this.idToAcl = new TimelineHashMap<>(snapshotRegistry, 0);
this.existingAcls = new TimelineHashSet<>(snapshotRegistry, 0);
}
ControllerResult<List<AclCreateResult>> createAcls(List<AclBinding> acls) {
List<AclCreateResult> results = new ArrayList<>(acls.size());
List<ApiMessageAndVersion> records =
BoundedList.newArrayBacked(MAX_RECORDS_PER_USER_OP);
for (AclBinding acl : acls) {
try {
validateNewAcl(acl);
} catch (Throwable t) {
ApiException e = (t instanceof ApiException) ? (ApiException) t :
new UnknownServerException("Unknown error while trying to create ACL", t);
results.add(new AclCreateResult(e));
continue;
}
StandardAcl standardAcl = StandardAcl.fromAclBinding(acl);
if (existingAcls.add(standardAcl)) {
StandardAclWithId standardAclWithId = new StandardAclWithId(newAclId(), standardAcl);
idToAcl.put(standardAclWithId.id(), standardAcl);
records.add(new ApiMessageAndVersion(standardAclWithId.toRecord(), (short) 0));
}
results.add(AclCreateResult.SUCCESS);
}
return new ControllerResult<>(records, results, true);
}
Uuid newAclId() {
Uuid uuid;
do {
uuid = Uuid.randomUuid();
} while (idToAcl.containsKey(uuid));
return uuid;
}
static void validateNewAcl(AclBinding binding) {
switch (binding.pattern().resourceType()) {
case UNKNOWN:
case ANY:
throw new InvalidRequestException("Invalid resourceType " +
binding.pattern().resourceType());
default:
break;
}
switch (binding.pattern().patternType()) {
case LITERAL:
case PREFIXED:
break;
default:
throw new InvalidRequestException("Invalid patternType " +
binding.pattern().patternType());
}
switch (binding.entry().operation()) {
case UNKNOWN:
case ANY:
throw new InvalidRequestException("Invalid operation " +
binding.entry().operation());
default:
break;
}
switch (binding.entry().permissionType()) {
case DENY:
case ALLOW:
break;
default:
throw new InvalidRequestException("Invalid permissionType " +
binding.entry().permissionType());
}
if (binding.pattern().name() == null || binding.pattern().name().isEmpty()) {
throw new InvalidRequestException("Resource name should not be empty");
}
}
ControllerResult<List<AclDeleteResult>> deleteAcls(List<AclBindingFilter> filters) {
List<AclDeleteResult> results = new ArrayList<>();
Set<ApiMessageAndVersion> records = new HashSet<>();
for (AclBindingFilter filter : filters) {
try {
validateFilter(filter);
AclDeleteResult result = deleteAclsForFilter(filter, records);
results.add(result);
} catch (Throwable e) {
results.add(new AclDeleteResult(ApiError.fromThrowable(e).exception()));
}
}
return ControllerResult.atomicOf(new ArrayList<>(records), results);
}
AclDeleteResult deleteAclsForFilter(AclBindingFilter filter,
Set<ApiMessageAndVersion> records) {
List<AclBindingDeleteResult> deleted = new ArrayList<>();
for (Entry<Uuid, StandardAcl> entry : idToAcl.entrySet()) {
Uuid id = entry.getKey();
StandardAcl acl = entry.getValue();
AclBinding binding = acl.toBinding();
if (filter.matches(binding)) {
deleted.add(new AclBindingDeleteResult(binding));
records.add(new ApiMessageAndVersion(
new RemoveAccessControlEntryRecord().setId(id), (short) 0));
if (records.size() > MAX_RECORDS_PER_USER_OP) {
throw new BoundedListTooLongException("Cannot remove more than " +
MAX_RECORDS_PER_USER_OP + " acls in a single delete operation.");
}
}
}
return new AclDeleteResult(deleted);
}
static void validateFilter(AclBindingFilter filter) {
if (filter.patternFilter().isUnknown()) {
throw new InvalidRequestException("Unknown patternFilter.");
}
if (filter.entryFilter().isUnknown()) {
throw new InvalidRequestException("Unknown entryFilter.");
}
}
public void replay(AccessControlEntryRecord record) {
StandardAclWithId aclWithId = StandardAclWithId.fromRecord(record);
idToAcl.put(aclWithId.id(), aclWithId.acl());
existingAcls.add(aclWithId.acl());
log.info("Replayed AccessControlEntryRecord for {}, setting {}", record.id(),
aclWithId.acl());
}
public void replay(RemoveAccessControlEntryRecord record) {
StandardAcl acl = idToAcl.remove(record.id());
if (acl == null) {
throw new RuntimeException("Unable to replay " + record + ": no acl with " +
"that ID found.");
}
if (!existingAcls.remove(acl)) {
throw new RuntimeException("Unable to replay " + record + " for " + acl +
": acl not found " + "in existingAcls.");
}
log.info("Replayed RemoveAccessControlEntryRecord for {}, removing {}", record.id(), acl);
}
Map<Uuid, StandardAcl> idToAcl() {
return Collections.unmodifiableMap(idToAcl);
}
}