blob: a26eb3d50b546c106ff168f61ac150d7ba6e09ea [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.metadata.authorizer;
import org.apache.kafka.common.ClusterResource;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.acl.AccessControlEntryFilter;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.errors.AuthorizerNotReadyException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourcePatternFilter;
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.server.authorizer.Action;
import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
import org.apache.kafka.server.authorizer.AuthorizerServerInfo;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.CompletionStage;
import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
import static org.apache.kafka.common.acl.AclOperation.ALL;
import static org.apache.kafka.common.acl.AclOperation.ALTER;
import static org.apache.kafka.common.acl.AclOperation.ALTER_CONFIGS;
import static org.apache.kafka.common.acl.AclOperation.CREATE;
import static org.apache.kafka.common.acl.AclOperation.DELETE;
import static org.apache.kafka.common.acl.AclOperation.DESCRIBE;
import static org.apache.kafka.common.acl.AclOperation.DESCRIBE_CONFIGS;
import static org.apache.kafka.common.acl.AclOperation.READ;
import static org.apache.kafka.common.acl.AclOperation.WRITE;
import static org.apache.kafka.common.acl.AclPermissionType.ALLOW;
import static org.apache.kafka.common.acl.AclPermissionType.DENY;
import static org.apache.kafka.common.resource.PatternType.LITERAL;
import static org.apache.kafka.common.resource.PatternType.PREFIXED;
import static org.apache.kafka.common.resource.ResourceType.GROUP;
import static org.apache.kafka.common.resource.ResourceType.TOPIC;
import static org.apache.kafka.common.security.auth.KafkaPrincipal.USER_TYPE;
import static org.apache.kafka.metadata.authorizer.StandardAuthorizer.ALLOW_EVERYONE_IF_NO_ACL_IS_FOUND_CONFIG;
import static org.apache.kafka.metadata.authorizer.StandardAuthorizer.SUPER_USERS_CONFIG;
import static org.apache.kafka.metadata.authorizer.StandardAuthorizer.getConfiguredSuperUsers;
import static org.apache.kafka.metadata.authorizer.StandardAuthorizer.getDefaultResult;
import static org.apache.kafka.metadata.authorizer.StandardAuthorizerData.WILDCARD;
import static org.apache.kafka.metadata.authorizer.StandardAuthorizerData.WILDCARD_PRINCIPAL;
import static org.apache.kafka.metadata.authorizer.StandardAuthorizerData.findResult;
import static org.apache.kafka.server.authorizer.AuthorizationResult.ALLOWED;
import static org.apache.kafka.server.authorizer.AuthorizationResult.DENIED;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@Timeout(value = 40)
public class StandardAuthorizerTest {
public static final Endpoint PLAINTEXT = new Endpoint("PLAINTEXT",
SecurityProtocol.PLAINTEXT,
"127.0.0.1",
9020);
public static final Endpoint CONTROLLER = new Endpoint("CONTROLLER",
SecurityProtocol.PLAINTEXT,
"127.0.0.1",
9020);
static class AuthorizerTestServerInfo implements AuthorizerServerInfo {
private final Collection<Endpoint> endpoints;
AuthorizerTestServerInfo(Collection<Endpoint> endpoints) {
assertFalse(endpoints.isEmpty());
this.endpoints = endpoints;
}
@Override
public ClusterResource clusterResource() {
return new ClusterResource(Uuid.fromString("r7mqHQrxTNmzbKvCvWZzLQ").toString());
}
@Override
public int brokerId() {
return 0;
}
@Override
public Collection<Endpoint> endpoints() {
return endpoints;
}
@Override
public Endpoint interBrokerEndpoint() {
return endpoints.iterator().next();
}
@Override
public Collection<String> earlyStartListeners() {
List<String> result = new ArrayList<>();
for (Endpoint endpoint : endpoints) {
if (endpoint.listenerName().get().equals("CONTROLLER")) {
result.add(endpoint.listenerName().get());
}
}
return result;
}
}
@Test
public void testGetConfiguredSuperUsers() {
assertEquals(Collections.emptySet(),
getConfiguredSuperUsers(Collections.emptyMap()));
assertEquals(Collections.emptySet(),
getConfiguredSuperUsers(Collections.singletonMap(SUPER_USERS_CONFIG, " ")));
assertEquals(new HashSet<>(asList("User:bob", "User:alice")),
getConfiguredSuperUsers(Collections.singletonMap(SUPER_USERS_CONFIG, "User:bob;User:alice ")));
assertEquals(new HashSet<>(asList("User:bob", "User:alice")),
getConfiguredSuperUsers(Collections.singletonMap(SUPER_USERS_CONFIG, "; User:bob ; User:alice ")));
assertEquals("expected a string in format principalType:principalName but got bob",
assertThrows(IllegalArgumentException.class, () -> getConfiguredSuperUsers(
Collections.singletonMap(SUPER_USERS_CONFIG, "bob;:alice"))).getMessage());
}
@Test
public void testGetDefaultResult() {
assertEquals(DENIED, getDefaultResult(Collections.emptyMap()));
assertEquals(ALLOWED, getDefaultResult(Collections.singletonMap(
ALLOW_EVERYONE_IF_NO_ACL_IS_FOUND_CONFIG, "true")));
assertEquals(DENIED, getDefaultResult(Collections.singletonMap(
ALLOW_EVERYONE_IF_NO_ACL_IS_FOUND_CONFIG, "false")));
}
@Test
public void testConfigure() {
StandardAuthorizer authorizer = new StandardAuthorizer();
HashMap<String, Object> configs = new HashMap<>();
configs.put(SUPER_USERS_CONFIG, "User:alice;User:chris");
configs.put(ALLOW_EVERYONE_IF_NO_ACL_IS_FOUND_CONFIG, "true");
authorizer.configure(configs);
assertEquals(new HashSet<>(asList("User:alice", "User:chris")), authorizer.superUsers());
assertEquals(ALLOWED, authorizer.defaultResult());
}
static Action newAction(AclOperation aclOperation,
ResourceType resourceType,
String resourceName) {
return new Action(aclOperation,
new ResourcePattern(resourceType, resourceName, LITERAL), 1, false, false);
}
static StandardAuthorizer createAndInitializeStandardAuthorizer() {
StandardAuthorizer authorizer = new StandardAuthorizer();
authorizer.configure(Collections.singletonMap(SUPER_USERS_CONFIG, "User:superman"));
authorizer.start(new AuthorizerTestServerInfo(Collections.singletonList(PLAINTEXT)));
authorizer.completeInitialLoad();
return authorizer;
}
private final static AtomicLong NEXT_ID = new AtomicLong(0);
static StandardAcl newFooAcl(AclOperation op, AclPermissionType permission) {
return new StandardAcl(
TOPIC,
"foo_",
PREFIXED,
"User:bob",
WILDCARD,
op,
permission);
}
static StandardAclWithId withId(StandardAcl acl) {
return new StandardAclWithId(new Uuid(acl.hashCode(), acl.hashCode()), acl);
}
@Test
public void testFindResultImplication() throws Exception {
// These permissions all imply DESCRIBE.
for (AclOperation op : asList(DESCRIBE, READ, WRITE, DELETE, ALTER)) {
assertEquals(ALLOWED, findResult(newAction(DESCRIBE, TOPIC, "foo_bar"),
new MockAuthorizableRequestContext.Builder().
setPrincipal(new KafkaPrincipal(USER_TYPE, "bob")).build(),
newFooAcl(op, ALLOW)));
}
// CREATE does not imply DESCRIBE
assertEquals(null, findResult(newAction(DESCRIBE, TOPIC, "foo_bar"),
new MockAuthorizableRequestContext.Builder().
setPrincipal(new KafkaPrincipal(USER_TYPE, "bob")).build(),
newFooAcl(CREATE, ALLOW)));
// Deny ACLs don't do "implication".
for (AclOperation op : asList(READ, WRITE, DELETE, ALTER)) {
assertEquals(null, findResult(newAction(DESCRIBE, TOPIC, "foo_bar"),
new MockAuthorizableRequestContext.Builder().
setPrincipal(new KafkaPrincipal(USER_TYPE, "bob")).build(),
newFooAcl(op, DENY)));
}
// Exact match
assertEquals(DENIED, findResult(newAction(DESCRIBE, TOPIC, "foo_bar"),
new MockAuthorizableRequestContext.Builder().
setPrincipal(new KafkaPrincipal(USER_TYPE, "bob")).build(),
newFooAcl(DESCRIBE, DENY)));
// These permissions all imply DESCRIBE_CONFIGS.
for (AclOperation op : asList(DESCRIBE_CONFIGS, ALTER_CONFIGS)) {
assertEquals(ALLOWED, findResult(newAction(DESCRIBE_CONFIGS, TOPIC, "foo_bar"),
new MockAuthorizableRequestContext.Builder().
setPrincipal(new KafkaPrincipal(USER_TYPE, "bob")).build(),
newFooAcl(op, ALLOW)));
}
// Deny ACLs don't do "implication".
assertEquals(null, findResult(newAction(DESCRIBE_CONFIGS, TOPIC, "foo_bar"),
new MockAuthorizableRequestContext.Builder().
setPrincipal(new KafkaPrincipal(USER_TYPE, "bob")).build(),
newFooAcl(ALTER_CONFIGS, DENY)));
// Exact match
assertEquals(DENIED, findResult(newAction(ALTER_CONFIGS, TOPIC, "foo_bar"),
new MockAuthorizableRequestContext.Builder().
setPrincipal(new KafkaPrincipal(USER_TYPE, "bob")).build(),
newFooAcl(ALTER_CONFIGS, DENY)));
}
static StandardAcl newBarAcl(AclOperation op, AclPermissionType permission) {
return new StandardAcl(
GROUP,
"bar",
LITERAL,
WILDCARD_PRINCIPAL,
WILDCARD,
op,
permission);
}
@Test
public void testFindResultPrincipalMatching() throws Exception {
assertEquals(ALLOWED, findResult(newAction(READ, TOPIC, "foo_bar"),
new MockAuthorizableRequestContext.Builder().
setPrincipal(new KafkaPrincipal(USER_TYPE, "bob")).build(),
newFooAcl(READ, ALLOW)));
// Principal does not match.
assertEquals(null, findResult(newAction(READ, TOPIC, "foo_bar"),
new MockAuthorizableRequestContext.Builder().
setPrincipal(new KafkaPrincipal(USER_TYPE, "alice")).build(),
newFooAcl(READ, ALLOW)));
// Wildcard principal matches anything.
assertEquals(DENIED, findResult(newAction(READ, GROUP, "bar"),
new MockAuthorizableRequestContext.Builder().
setPrincipal(new KafkaPrincipal(USER_TYPE, "alice")).build(),
newBarAcl(READ, DENY)));
}
private static void assertContains(Iterable<AclBinding> iterable, StandardAcl... acls) {
Iterator<AclBinding> iterator = iterable.iterator();
for (int i = 0; iterator.hasNext(); i++) {
AclBinding acl = iterator.next();
assertTrue(i < acls.length, "Only expected " + i + " element(s)");
assertEquals(acls[i].toBinding(), acl, "Unexpected element " + i);
}
assertFalse(iterator.hasNext(), "Expected only " + acls.length + " element(s)");
}
@Test
public void testListAcls() throws Exception {
StandardAuthorizer authorizer = createAndInitializeStandardAuthorizer();
List<StandardAclWithId> fooAcls = asList(
withId(newFooAcl(READ, ALLOW)),
withId(newFooAcl(WRITE, ALLOW)));
List<StandardAclWithId> barAcls = asList(
withId(newBarAcl(DESCRIBE_CONFIGS, DENY)),
withId(newBarAcl(ALTER_CONFIGS, DENY)));
fooAcls.forEach(a -> authorizer.addAcl(a.id(), a.acl()));
barAcls.forEach(a -> authorizer.addAcl(a.id(), a.acl()));
assertContains(authorizer.acls(AclBindingFilter.ANY),
fooAcls.get(0).acl(), fooAcls.get(1).acl(), barAcls.get(0).acl(), barAcls.get(1).acl());
authorizer.removeAcl(fooAcls.get(1).id());
assertContains(authorizer.acls(AclBindingFilter.ANY),
fooAcls.get(0).acl(), barAcls.get(0).acl(), barAcls.get(1).acl());
assertContains(authorizer.acls(new AclBindingFilter(new ResourcePatternFilter(
TOPIC, null, PatternType.ANY), AccessControlEntryFilter.ANY)),
fooAcls.get(0).acl());
}
@Test
public void testSimpleAuthorizations() throws Exception {
StandardAuthorizer authorizer = createAndInitializeStandardAuthorizer();
List<StandardAclWithId> fooAcls = asList(
withId(newFooAcl(READ, ALLOW)),
withId(newFooAcl(WRITE, ALLOW)));
List<StandardAclWithId> barAcls = asList(
withId(newBarAcl(DESCRIBE_CONFIGS, ALLOW)),
withId(newBarAcl(ALTER_CONFIGS, ALLOW)));
fooAcls.forEach(a -> authorizer.addAcl(a.id(), a.acl()));
barAcls.forEach(a -> authorizer.addAcl(a.id(), a.acl()));
assertEquals(singletonList(ALLOWED),
authorizer.authorize(new MockAuthorizableRequestContext.Builder().
setPrincipal(new KafkaPrincipal(USER_TYPE, "bob")).build(),
singletonList(newAction(READ, TOPIC, "foo_"))));
assertEquals(singletonList(ALLOWED),
authorizer.authorize(new MockAuthorizableRequestContext.Builder().
setPrincipal(new KafkaPrincipal(USER_TYPE, "fred")).build(),
singletonList(newAction(ALTER_CONFIGS, GROUP, "bar"))));
}
@Test
public void testDenyPrecedenceWithOperationAll() throws Exception {
StandardAuthorizer authorizer = createAndInitializeStandardAuthorizer();
List<StandardAcl> acls = Arrays.asList(
new StandardAcl(TOPIC, "foo", LITERAL, "User:alice", "*", ALL, DENY),
new StandardAcl(TOPIC, "foo", PREFIXED, "User:alice", "*", READ, ALLOW),
new StandardAcl(TOPIC, "foo", LITERAL, "User:*", "*", ALL, DENY),
new StandardAcl(TOPIC, "foo", PREFIXED, "User:*", "*", DESCRIBE, ALLOW)
);
acls.forEach(acl -> {
StandardAclWithId aclWithId = withId(acl);
authorizer.addAcl(aclWithId.id(), aclWithId.acl());
});
assertEquals(Arrays.asList(DENIED, DENIED, DENIED, ALLOWED), authorizer.authorize(
newRequestContext("alice"),
Arrays.asList(
newAction(WRITE, TOPIC, "foo"),
newAction(READ, TOPIC, "foo"),
newAction(DESCRIBE, TOPIC, "foo"),
newAction(READ, TOPIC, "foobar"))));
assertEquals(Arrays.asList(DENIED, DENIED, DENIED, ALLOWED, DENIED), authorizer.authorize(
newRequestContext("bob"),
Arrays.asList(
newAction(DESCRIBE, TOPIC, "foo"),
newAction(READ, TOPIC, "foo"),
newAction(WRITE, TOPIC, "foo"),
newAction(DESCRIBE, TOPIC, "foobaz"),
newAction(READ, TOPIC, "foobaz"))));
}
@Test
public void testTopicAclWithOperationAll() throws Exception {
StandardAuthorizer authorizer = createAndInitializeStandardAuthorizer();
List<StandardAcl> acls = Arrays.asList(
new StandardAcl(TOPIC, "foo", LITERAL, "User:*", "*", ALL, ALLOW),
new StandardAcl(TOPIC, "bar", PREFIXED, "User:alice", "*", ALL, ALLOW),
new StandardAcl(TOPIC, "baz", LITERAL, "User:bob", "*", ALL, ALLOW)
);
acls.forEach(acl -> {
StandardAclWithId aclWithId = withId(acl);
authorizer.addAcl(aclWithId.id(), aclWithId.acl());
});
assertEquals(Arrays.asList(ALLOWED, ALLOWED, DENIED), authorizer.authorize(
newRequestContext("alice"),
Arrays.asList(
newAction(WRITE, TOPIC, "foo"),
newAction(DESCRIBE_CONFIGS, TOPIC, "bar"),
newAction(DESCRIBE, TOPIC, "baz"))));
assertEquals(Arrays.asList(ALLOWED, DENIED, ALLOWED), authorizer.authorize(
newRequestContext("bob"),
Arrays.asList(
newAction(WRITE, TOPIC, "foo"),
newAction(READ, TOPIC, "bar"),
newAction(DESCRIBE, TOPIC, "baz"))));
assertEquals(Arrays.asList(ALLOWED, DENIED, DENIED), authorizer.authorize(
newRequestContext("malory"),
Arrays.asList(
newAction(DESCRIBE, TOPIC, "foo"),
newAction(WRITE, TOPIC, "bar"),
newAction(READ, TOPIC, "baz"))));
}
private AuthorizableRequestContext newRequestContext(String principal) throws Exception {
return new MockAuthorizableRequestContext.Builder()
.setPrincipal(new KafkaPrincipal(USER_TYPE, principal))
.build();
}
@Test
public void testHostAddressAclValidation() throws Exception {
InetAddress host1 = InetAddress.getByName("192.168.1.1");
InetAddress host2 = InetAddress.getByName("192.168.1.2");
StandardAuthorizer authorizer = createAndInitializeStandardAuthorizer();
List<StandardAcl> acls = Arrays.asList(
new StandardAcl(TOPIC, "foo", LITERAL, "User:alice", host1.getHostAddress(), READ, DENY),
new StandardAcl(TOPIC, "foo", LITERAL, "User:alice", "*", READ, ALLOW),
new StandardAcl(TOPIC, "bar", LITERAL, "User:bob", host2.getHostAddress(), READ, ALLOW),
new StandardAcl(TOPIC, "bar", LITERAL, "User:*", InetAddress.getLocalHost().getHostAddress(), DESCRIBE, ALLOW)
);
acls.forEach(acl -> {
StandardAclWithId aclWithId = withId(acl);
authorizer.addAcl(aclWithId.id(), aclWithId.acl());
});
List<Action> actions = Arrays.asList(
newAction(READ, TOPIC, "foo"),
newAction(READ, TOPIC, "bar"),
newAction(DESCRIBE, TOPIC, "bar")
);
assertEquals(Arrays.asList(ALLOWED, DENIED, ALLOWED), authorizer.authorize(
newRequestContext("alice", InetAddress.getLocalHost()), actions));
assertEquals(Arrays.asList(DENIED, DENIED, DENIED), authorizer.authorize(
newRequestContext("alice", host1), actions));
assertEquals(Arrays.asList(ALLOWED, DENIED, DENIED), authorizer.authorize(
newRequestContext("alice", host2), actions));
assertEquals(Arrays.asList(DENIED, DENIED, ALLOWED), authorizer.authorize(
newRequestContext("bob", InetAddress.getLocalHost()), actions));
assertEquals(Arrays.asList(DENIED, DENIED, DENIED), authorizer.authorize(
newRequestContext("bob", host1), actions));
assertEquals(Arrays.asList(DENIED, ALLOWED, ALLOWED), authorizer.authorize(
newRequestContext("bob", host2), actions));
}
private AuthorizableRequestContext newRequestContext(String principal, InetAddress clientAddress) throws Exception {
return new MockAuthorizableRequestContext.Builder()
.setPrincipal(new KafkaPrincipal(USER_TYPE, principal))
.setClientAddress(clientAddress)
.build();
}
private static void addManyAcls(StandardAuthorizer authorizer) {
List<StandardAcl> acls = Arrays.asList(
new StandardAcl(TOPIC, "green2", LITERAL, "User:*", "*", READ, ALLOW),
new StandardAcl(TOPIC, "green", PREFIXED, "User:bob", "*", READ, ALLOW),
new StandardAcl(TOPIC, "betamax4", LITERAL, "User:bob", "*", READ, ALLOW),
new StandardAcl(TOPIC, "betamax", LITERAL, "User:bob", "*", READ, ALLOW),
new StandardAcl(TOPIC, "beta", PREFIXED, "User:*", "*", READ, ALLOW),
new StandardAcl(TOPIC, "alpha", PREFIXED, "User:*", "*", READ, ALLOW),
new StandardAcl(TOPIC, "alp", PREFIXED, "User:bob", "*", READ, DENY),
new StandardAcl(GROUP, "*", LITERAL, "User:bob", "*", WRITE, ALLOW),
new StandardAcl(GROUP, "wheel", LITERAL, "User:*", "*", WRITE, DENY)
);
acls.forEach(acl -> {
StandardAclWithId aclWithId = withId(acl);
authorizer.addAcl(aclWithId.id(), aclWithId.acl());
});
}
@Test
public void testAuthorizationWithManyAcls() throws Exception {
StandardAuthorizer authorizer = createAndInitializeStandardAuthorizer();
addManyAcls(authorizer);
assertEquals(Arrays.asList(ALLOWED, DENIED),
authorizer.authorize(new MockAuthorizableRequestContext.Builder().
setPrincipal(new KafkaPrincipal(USER_TYPE, "bob")).build(),
Arrays.asList(newAction(READ, TOPIC, "green1"),
newAction(WRITE, GROUP, "wheel"))));
assertEquals(Arrays.asList(DENIED, ALLOWED, DENIED),
authorizer.authorize(new MockAuthorizableRequestContext.Builder().
setPrincipal(new KafkaPrincipal(USER_TYPE, "bob")).build(),
Arrays.asList(newAction(READ, TOPIC, "alpha"),
newAction(WRITE, GROUP, "arbitrary"),
newAction(READ, TOPIC, "ala"))));
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testDenyAuditLogging(boolean logIfDenied) throws Exception {
try (MockedStatic<LoggerFactory> mockedLoggerFactory = Mockito.mockStatic(LoggerFactory.class)) {
Logger otherLog = Mockito.mock(Logger.class);
Logger auditLog = Mockito.mock(Logger.class);
mockedLoggerFactory
.when(() -> LoggerFactory.getLogger("kafka.authorizer.logger"))
.thenReturn(auditLog);
mockedLoggerFactory
.when(() -> LoggerFactory.getLogger(Mockito.any(Class.class)))
.thenReturn(otherLog);
Mockito.when(auditLog.isDebugEnabled()).thenReturn(true);
Mockito.when(auditLog.isTraceEnabled()).thenReturn(true);
StandardAuthorizer authorizer = createAndInitializeStandardAuthorizer();
addManyAcls(authorizer);
ResourcePattern topicResource = new ResourcePattern(TOPIC, "alpha", LITERAL);
Action action = new Action(READ, topicResource, 1, false, logIfDenied);
MockAuthorizableRequestContext requestContext = new MockAuthorizableRequestContext.Builder()
.setPrincipal(new KafkaPrincipal(USER_TYPE, "bob"))
.setClientAddress(InetAddress.getByName("127.0.0.1"))
.build();
assertEquals(singletonList(DENIED), authorizer.authorize(requestContext, singletonList(action)));
String expectedAuditLog = "Principal = User:bob is Denied operation = READ " +
"from host = 127.0.0.1 on resource = Topic:LITERAL:alpha for request = Fetch " +
"with resourceRefCount = 1 based on rule MatchingAcl(acl=StandardAcl(resourceType=TOPIC, " +
"resourceName=alp, patternType=PREFIXED, principal=User:bob, host=*, operation=READ, " +
"permissionType=DENY))";
if (logIfDenied) {
Mockito.verify(auditLog).info(expectedAuditLog);
} else {
Mockito.verify(auditLog).trace(expectedAuditLog);
}
}
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testAllowAuditLogging(boolean logIfAllowed) throws Exception {
try (MockedStatic<LoggerFactory> mockedLoggerFactory = Mockito.mockStatic(LoggerFactory.class)) {
Logger otherLog = Mockito.mock(Logger.class);
Logger auditLog = Mockito.mock(Logger.class);
mockedLoggerFactory
.when(() -> LoggerFactory.getLogger("kafka.authorizer.logger"))
.thenReturn(auditLog);
mockedLoggerFactory
.when(() -> LoggerFactory.getLogger(Mockito.any(Class.class)))
.thenReturn(otherLog);
Mockito.when(auditLog.isDebugEnabled()).thenReturn(true);
Mockito.when(auditLog.isTraceEnabled()).thenReturn(true);
StandardAuthorizer authorizer = createAndInitializeStandardAuthorizer();
addManyAcls(authorizer);
ResourcePattern topicResource = new ResourcePattern(TOPIC, "green1", LITERAL);
Action action = new Action(READ, topicResource, 1, logIfAllowed, false);
MockAuthorizableRequestContext requestContext = new MockAuthorizableRequestContext.Builder()
.setPrincipal(new KafkaPrincipal(USER_TYPE, "bob"))
.setClientAddress(InetAddress.getByName("127.0.0.1"))
.build();
assertEquals(singletonList(ALLOWED), authorizer.authorize(requestContext, singletonList(action)));
String expectedAuditLog = "Principal = User:bob is Allowed operation = READ " +
"from host = 127.0.0.1 on resource = Topic:LITERAL:green1 for request = Fetch " +
"with resourceRefCount = 1 based on rule MatchingAcl(acl=StandardAcl(resourceType=TOPIC, " +
"resourceName=green, patternType=PREFIXED, principal=User:bob, host=*, operation=READ, " +
"permissionType=ALLOW))";
if (logIfAllowed) {
Mockito.verify(auditLog).debug(expectedAuditLog);
} else {
Mockito.verify(auditLog).trace(expectedAuditLog);
}
}
}
/**
* Test that StandardAuthorizer#start returns a completed future for early start
* listeners.
*/
@Test
public void testStartWithEarlyStartListeners() throws Exception {
StandardAuthorizer authorizer = new StandardAuthorizer();
authorizer.configure(Collections.singletonMap(SUPER_USERS_CONFIG, "User:superman"));
Map<Endpoint, ? extends CompletionStage<Void>> futures2 = authorizer.
start(new AuthorizerTestServerInfo(Arrays.asList(PLAINTEXT, CONTROLLER)));
assertEquals(new HashSet<>(Arrays.asList(PLAINTEXT, CONTROLLER)), futures2.keySet());
assertFalse(futures2.get(PLAINTEXT).toCompletableFuture().isDone());
assertTrue(futures2.get(CONTROLLER).toCompletableFuture().isDone());
}
/**
* Test attempts to authorize prior to completeInitialLoad. During this time, only
* superusers can be authorized. Other users will get an AuthorizerNotReadyException
* exception. Not even an authorization result, just an exception thrown for the whole
* batch.
*/
@Test
public void testAuthorizationPriorToCompleteInitialLoad() throws Exception {
StandardAuthorizer authorizer = new StandardAuthorizer();
authorizer.configure(Collections.singletonMap(SUPER_USERS_CONFIG, "User:superman"));
assertThrows(AuthorizerNotReadyException.class, () ->
authorizer.authorize(new MockAuthorizableRequestContext.Builder().
setPrincipal(new KafkaPrincipal(USER_TYPE, "bob")).build(),
Arrays.asList(newAction(READ, TOPIC, "green1"),
newAction(READ, TOPIC, "green2"))));
assertEquals(Arrays.asList(ALLOWED, ALLOWED),
authorizer.authorize(new MockAuthorizableRequestContext.Builder().
setPrincipal(new KafkaPrincipal(USER_TYPE, "superman")).build(),
Arrays.asList(newAction(READ, TOPIC, "green1"),
newAction(WRITE, GROUP, "wheel"))));
}
@Test
public void testCompleteInitialLoad() throws Exception {
StandardAuthorizer authorizer = new StandardAuthorizer();
authorizer.configure(Collections.singletonMap(SUPER_USERS_CONFIG, "User:superman"));
Map<Endpoint, ? extends CompletionStage<Void>> futures = authorizer.
start(new AuthorizerTestServerInfo(Collections.singleton(PLAINTEXT)));
assertEquals(Collections.singleton(PLAINTEXT), futures.keySet());
assertFalse(futures.get(PLAINTEXT).toCompletableFuture().isDone());
authorizer.completeInitialLoad();
assertTrue(futures.get(PLAINTEXT).toCompletableFuture().isDone());
assertFalse(futures.get(PLAINTEXT).toCompletableFuture().isCompletedExceptionally());
}
@Test
public void testCompleteInitialLoadWithException() throws Exception {
StandardAuthorizer authorizer = new StandardAuthorizer();
authorizer.configure(Collections.singletonMap(SUPER_USERS_CONFIG, "User:superman"));
Map<Endpoint, ? extends CompletionStage<Void>> futures = authorizer.
start(new AuthorizerTestServerInfo(Arrays.asList(PLAINTEXT, CONTROLLER)));
assertEquals(new HashSet<>(Arrays.asList(PLAINTEXT, CONTROLLER)), futures.keySet());
assertFalse(futures.get(PLAINTEXT).toCompletableFuture().isDone());
assertTrue(futures.get(CONTROLLER).toCompletableFuture().isDone());
authorizer.completeInitialLoad(new TimeoutException("timed out"));
assertTrue(futures.get(PLAINTEXT).toCompletableFuture().isDone());
assertTrue(futures.get(PLAINTEXT).toCompletableFuture().isCompletedExceptionally());
assertTrue(futures.get(CONTROLLER).toCompletableFuture().isDone());
assertFalse(futures.get(CONTROLLER).toCompletableFuture().isCompletedExceptionally());
}
@Test
public void testPrefixAcls() throws Exception {
StandardAuthorizer authorizer = createAndInitializeStandardAuthorizer();
List<StandardAcl> acls = Arrays.asList(
new StandardAcl(TOPIC, "fooa", PREFIXED, "User:alice", "*", ALL, ALLOW),
new StandardAcl(TOPIC, "foobar", LITERAL, "User:bob", "*", ALL, ALLOW),
new StandardAcl(TOPIC, "f", PREFIXED, "User:bob", "*", ALL, ALLOW)
);
acls.forEach(acl -> {
StandardAclWithId aclWithId = withId(acl);
authorizer.addAcl(aclWithId.id(), aclWithId.acl());
});
assertEquals(Arrays.asList(ALLOWED, DENIED, ALLOWED), authorizer.authorize(
newRequestContext("bob"),
Arrays.asList(
newAction(WRITE, TOPIC, "foobarr"),
newAction(READ, TOPIC, "goobar"),
newAction(READ, TOPIC, "fooa"))));
assertEquals(Arrays.asList(ALLOWED, DENIED, DENIED), authorizer.authorize(
newRequestContext("alice"),
Arrays.asList(
newAction(DESCRIBE, TOPIC, "fooa"),
newAction(WRITE, TOPIC, "bar"),
newAction(READ, TOPIC, "baz"))));
}
}