blob: 49a4641fb71d274e222658a3f3e173cb6d95b8dc [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/
package org.apache.storm.security.auth;
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.security.Principal;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.security.auth.Subject;
import org.apache.storm.Config;
import org.apache.storm.Testing;
import org.apache.storm.cluster.ClusterStateContext;
import org.apache.storm.cluster.ClusterUtils;
import org.apache.storm.cluster.DaemonType;
import org.apache.storm.cluster.IStormClusterState;
import org.apache.storm.generated.Nimbus;
import org.apache.storm.generated.WorkerToken;
import org.apache.storm.generated.WorkerTokenServiceType;
import org.apache.storm.security.auth.authorizer.ImpersonationAuthorizer;
import org.apache.storm.security.auth.authorizer.SimpleACLAuthorizer;
import org.apache.storm.security.auth.authorizer.SimpleWhitelistAuthorizer;
import org.apache.storm.security.auth.digest.DigestSaslTransportPlugin;
import org.apache.storm.security.auth.workertoken.WorkerTokenManager;
import org.apache.storm.testing.InProcessZookeeper;
import org.apache.storm.thrift.transport.TTransportException;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.NimbusClient;
import org.apache.storm.utils.Time;
import org.apache.storm.utils.Utils;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
public class AuthTest {
//3 seconds in milliseconds
public static final int NIMBUS_TIMEOUT = 3_000;
private static final Logger LOG = LoggerFactory.getLogger(AuthTest.class);
private static final File BASE = new File("./src/test/resources/");
private static final String DIGEST_JAAS_CONF = new File(BASE, "jaas_digest.conf").getAbsolutePath();
private static final String BAD_PASSWORD_CONF = new File(BASE, "jaas_digest_bad_password.conf").getAbsolutePath();
private static final String WRONG_USER_CONF = new File(BASE, "jaas_digest_unknown_user.conf").getAbsolutePath();
private static final String MISSING_CLIENT = new File(BASE, "jaas_digest_missing_client.conf").getAbsolutePath();
/**
 * Builds a minimal {@link Principal} for tests whose identity is defined solely by its name.
 *
 * @param name the value returned by {@code getName()} and {@code toString()}, and used for
 *             {@code equals}/{@code hashCode}
 * @return a principal that compares equal to any other {@link Principal} reporting the same name
 */
public static Principal mkPrincipal(final String name) {
    return new Principal() {
        @Override
        public int hashCode() {
            return name.hashCode();
        }

        @Override
        public boolean equals(Object other) {
            //Anything that is not a Principal (including null) can never match
            if (!(other instanceof Principal)) {
                return false;
            }
            final Principal that = (Principal) other;
            return name.equals(that.getName());
        }

        @Override
        public String getName() {
            return name;
        }

        @Override
        public String toString() {
            return name;
        }
    };
}
/**
 * Wraps a single named principal in a {@link Subject} with no public or private credentials.
 *
 * @param name the name of the only principal placed in the subject
 * @return a subject constructed with its read-only flag set to {@code true}
 */
public static Subject mkSubject(String name) {
    final boolean readOnly = true;
    final Set<Principal> principals = Collections.singleton(mkPrincipal(name));
    return new Subject(readOnly, principals, Collections.emptySet(), Collections.emptySet());
}
/**
 * Runs {@code body} against a freshly started thrift server that has no JAAS login configuration,
 * no ZooKeeper and no extra configuration.
 *
 * @param transportPluginClass the transport plugin the server is configured with
 * @param impl the Nimbus handler to serve, or {@code null} to serve a Mockito mock
 * @param body the test logic; receives the running server and the configuration it was built from
 * @throws Exception whatever {@code body} or the server setup throws
 */
public static void withServer(Class<? extends ITransportPlugin> transportPluginClass,
                              Nimbus.Iface impl,
                              MyBiConsumer<ThriftServer, Map<String, Object>> body) throws Exception {
    //This overload never supplies a login configuration
    final String noLoginCfg = null;
    withServer(noLoginCfg, transportPluginClass, impl, body);
}
/**
 * Runs {@code body} against a freshly started thrift server without ZooKeeper or extra configuration.
 *
 * @param loginCfg path stored under {@code java.security.auth.login.config}, or {@code null} to leave it unset
 * @param transportPluginClass the transport plugin the server is configured with
 * @param impl the Nimbus handler to serve, or {@code null} to serve a Mockito mock
 * @param body the test logic; receives the running server and the configuration it was built from
 * @throws Exception whatever {@code body} or the server setup throws
 */
public static void withServer(String loginCfg,
                              Class<? extends ITransportPlugin> transportPluginClass,
                              Nimbus.Iface impl,
                              MyBiConsumer<ThriftServer, Map<String, Object>> body) throws Exception {
    final InProcessZookeeper noZookeeper = null;
    final Map<String, Object> noExtraConfs = null;
    withServer(loginCfg, transportPluginClass, impl, noZookeeper, noExtraConfs, body);
}
/**
 * Starts a Nimbus thrift server on a background thread, waits until it reports that it is serving,
 * runs {@code body} against it and always stops the server afterwards.
 *
 * @param loginCfg path stored under {@code java.security.auth.login.config}, or {@code null} to leave it unset
 * @param transportPluginClass the transport plugin the server is configured with
 * @param impl the Nimbus handler to serve, or {@code null} to serve a Mockito mock
 * @param zk when non-null, the server configuration points at this ZooKeeper on localhost
 * @param extraConfs when non-null, copied into the configuration after all other settings
 * @param body the test logic; receives the running server and the configuration it was built from
 * @throws Exception whatever {@code body} or the server setup throws
 */
public static void withServer(String loginCfg,
                              Class<? extends ITransportPlugin> transportPluginClass,
                              Nimbus.Iface impl,
                              InProcessZookeeper zk,
                              Map<String, Object> extraConfs,
                              MyBiConsumer<ThriftServer, Map<String, Object>> body) throws Exception {
    final Map<String, Object> serverConf = ConfigUtils.readStormConfig();
    //Port 0: presumably lets the server pick a free port; callers read it back via server.getPort()
    serverConf.put(Config.NIMBUS_THRIFT_PORT, 0);
    serverConf.put(Config.STORM_THRIFT_TRANSPORT_PLUGIN, transportPluginClass.getName());
    if (loginCfg != null) {
        serverConf.put("java.security.auth.login.config", loginCfg);
    }
    if (zk != null) {
        serverConf.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList("localhost"));
        serverConf.put(Config.STORM_ZOOKEEPER_PORT, zk.getPort());
    }
    if (extraConfs != null) {
        //Copied last so caller supplied entries override everything set above
        serverConf.putAll(extraConfs);
    }

    final Nimbus.Iface handler;
    if (impl == null) {
        handler = mock(Nimbus.Iface.class);
    } else {
        handler = impl;
    }
    final ThriftServer thriftServer =
        new ThriftServer(serverConf, new Nimbus.Processor<>(handler), ThriftConnectionType.NIMBUS);
    LOG.info("Created Server... {}", thriftServer);

    final Thread servingThread = new Thread(() -> {
        LOG.info("Starting Serving...");
        thriftServer.serve();
    });
    servingThread.start();

    //Poll every 100 ms until the server reports that it is serving
    Testing.whileTimeout(
        () -> !thriftServer.isServing(),
        () -> {
            try {
                Time.sleep(100);
            } catch (InterruptedException e) {
                //Ignored
            }
        });

    try {
        LOG.info("Starting to run {}", body);
        body.accept(thriftServer, serverConf);
        LOG.info("{} finished with no exceptions", body);
    } finally {
        LOG.info("Stopping server {}", thriftServer);
        thriftServer.stop();
    }
}
/**
 * Verifies that a client using the given JAAS configuration cannot talk to the server.
 *
 * <p>The check uses JUnit's {@code assertTrue} rather than the Java {@code assert} statement, because the
 * latter is skipped entirely when the JVM runs without {@code -ea}.
 *
 * @param server the running server to connect to on localhost
 * @param conf the base client configuration; it is copied, never modified
 * @param jaas path stored under {@code java.security.auth.login.config} in the copied configuration
 * @param expectedException the exception type {@code Utils.exceptionCauseIsInstanceOf} must find in the failure
 */
public static void verifyIncorrectJaasConf(ThriftServer server, Map<String, Object> conf, String jaas,
                                           Class<? extends Exception> expectedException) {
    Map<String, Object> badConf = new HashMap<>(conf);
    badConf.put("java.security.auth.login.config", jaas);
    try (NimbusClient client = new NimbusClient(badConf, "localhost", server.getPort(), NIMBUS_TIMEOUT)) {
        client.getClient().activate("bad_auth_test_topology");
        //fail() throws an AssertionError, which the catch (Exception) below does not intercept
        fail("An exception should have been thrown trying to connect.");
    } catch (Exception e) {
        LOG.info("Got Exception...", e);
        assertTrue("Expected " + expectedException.getName() + " but got " + e,
                   Utils.exceptionCauseIsInstanceOf(expectedException, e));
    }
}
/**
 * Creates a fresh {@link Subject} and populates it from a credentials map holding the given worker token.
 *
 * @param wt the worker token to store in the credentials
 * @return the new subject after {@code ClientAuthUtils.updateSubject} has been applied to it
 */
public static Subject createSubjectWith(WorkerToken wt) {
    //Not pretty, but it goes through the same APIs a worker would use
    final Subject tokenHolder = new Subject();
    final Map<String, String> credentials = new HashMap<>();
    ClientAuthUtils.setWorkerToken(credentials, wt);
    ClientAuthUtils.updateSubject(tokenHolder, Collections.emptyList(), credentials);
    return tokenHolder;
}
/**
 * Opens a Nimbus client to the server on localhost as the given subject and calls {@code activate(topoId)}.
 *
 * @param conf the client configuration
 * @param server the running server; its port is used for the connection
 * @param subject the subject the connection attempt runs as
 * @param topoId the value passed to {@code activate}
 * @throws PrivilegedActionException when the connection or the call fails inside {@code Subject.doAs}
 */
public static void tryConnectAs(Map<String, Object> conf, ThriftServer server, Subject subject, String topoId)
    throws PrivilegedActionException {
    final PrivilegedExceptionAction<Void> connectAndActivate = () -> {
        try (NimbusClient nimbus = new NimbusClient(conf, "localhost", server.getPort(), NIMBUS_TIMEOUT)) {
            //Strictly this should be a topology name, but passing the id keeps the test simple
            nimbus.getClient().activate(topoId);
        }
        return null;
    };
    Subject.doAs(subject, connectAndActivate);
}
/**
 * Creates (or updates) a NIMBUS worker token for the user and topology, then connects with it.
 *
 * @param wtMan the manager that issues the token
 * @param conf the client configuration
 * @param server the running server to connect to
 * @param user the user the token is issued for
 * @param topoId the topology id the token is issued for; also passed to {@code activate}
 * @return the subject carrying the token, so callers can reuse it for later connection attempts
 * @throws PrivilegedActionException when the connection attempt fails
 */
public static Subject testConnectWithTokenFor(WorkerTokenManager wtMan, Map<String, Object> conf, ThriftServer server,
                                              String user, String topoId) throws PrivilegedActionException {
    final Subject tokenSubject =
        createSubjectWith(wtMan.createOrUpdateTokenFor(WorkerTokenServiceType.NIMBUS, user, topoId));
    tryConnectAs(conf, server, tokenSubject, topoId);
    return tokenSubject;
}
/**
 * Asserts that a request context was captured for the expected, non-impersonating user, then clears it.
 *
 * @param user holder of the most recently captured request context
 * @param userName the principal name the captured context must report
 */
public static void verifyUserIs(AtomicReference<ReqContext> user, String userName) {
    final ReqContext captured = user.get();
    assertNotNull(captured);
    final String actualName = captured.principal().getName();
    assertEquals(userName, actualName);
    assertFalse(captured.isImpersonating());
    //Reset only after every check passed so the next verification starts from an empty holder
    user.set(null);
}
/**
 * Builds a request context in which one user acts on behalf of another.
 *
 * @param impersonatingUser name set as the context's real principal
 * @param userBeingIUmpersonated name of the principal in the context's subject
 * @param remoteAddress address recorded as the origin of the request
 * @return the populated request context
 */
public static ReqContext mkImpersonatingReqContext(String impersonatingUser, String userBeingIUmpersonated, InetAddress remoteAddress) {
    final Subject impersonatedSubject = mkSubject(userBeingIUmpersonated);
    final ReqContext context = new ReqContext(impersonatedSubject);
    context.setRemoteAddress(remoteAddress);
    final Principal realPrincipal = mkPrincipal(impersonatingUser);
    context.setRealPrincipal(realPrincipal);
    return context;
}
/**
 * Checks the local names {@link KerberosPrincipalToLocal} produces for a few principal name shapes.
 */
@Test
public void kerbToLocalTest() {
    final KerberosPrincipalToLocal toLocal = new KerberosPrincipalToLocal();
    toLocal.prepare(Collections.emptyMap());
    //Each row is {principal name, expected local name}
    final String[][] cases = {
        {"me@realm", "me"},
        {"simple", "simple"},
        {"someone/host@realm", "someone"},
    };
    for (String[] row : cases) {
        assertEquals(row[1], toLocal.toLocal(mkPrincipal(row[0])));
    }
}
/**
 * A server running {@link SimpleTransportPlugin} must serve a simple client and reject a digest client.
 *
 * <p>The rejection is checked with JUnit's {@code assertTrue}; a Java {@code assert} statement would be
 * skipped whenever the JVM runs without {@code -ea}.
 */
@Test
public void simpleAuthTest() throws Exception {
    Nimbus.Iface impl = mock(Nimbus.Iface.class);
    withServer(SimpleTransportPlugin.class,
               impl,
               (ThriftServer server, Map<String, Object> conf) -> {
                   try (NimbusClient client = new NimbusClient(conf, "localhost", server.getPort(), NIMBUS_TIMEOUT)) {
                       client.getClient().activate("security_auth_test_topology");
                   }
                   //Verify digest is rejected...
                   Map<String, Object> badConf = new HashMap<>(conf);
                   badConf.put(Config.STORM_THRIFT_TRANSPORT_PLUGIN, DigestSaslTransportPlugin.class.getName());
                   badConf.put("java.security.auth.login.config", DIGEST_JAAS_CONF);
                   badConf.put(Config.STORM_NIMBUS_RETRY_TIMES, 0);
                   try (NimbusClient client = new NimbusClient(badConf, "localhost", server.getPort(), NIMBUS_TIMEOUT)) {
                       client.getClient().activate("bad_security_auth_test_topology");
                       fail("An exception should have been thrown trying to connect.");
                   } catch (Exception te) {
                       LOG.info("Got Exception...", te);
                       assertTrue("Expected a TTransportException but got " + te,
                                  Utils.exceptionCauseIsInstanceOf(TTransportException.class, te));
                   }
               });
    //Only the call made over the matching transport may have reached the handler
    verify(impl).activate("security_auth_test_topology");
    verify(impl, never()).activate("bad_security_auth_test_topology");
}
/**
 * A server running {@link DigestSaslTransportPlugin} must serve a client with the matching JAAS
 * configuration, reject a simple-transport client and reject several broken JAAS configurations.
 *
 * <p>Rejections are checked with JUnit's {@code assertTrue}; a Java {@code assert} statement would be
 * skipped whenever the JVM runs without {@code -ea}.
 */
@Test
public void digestAuthTest() throws Exception {
    Nimbus.Iface impl = mock(Nimbus.Iface.class);
    final AtomicReference<ReqContext> user = new AtomicReference<>();
    //Capture a copy of the request context seen by the handler on every activate call
    doAnswer((invocation) -> {
        user.set(new ReqContext(ReqContext.context()));
        return null;
    }).when(impl).activate(anyString());
    withServer(DIGEST_JAAS_CONF,
               DigestSaslTransportPlugin.class,
               impl,
               (ThriftServer server, Map<String, Object> conf) -> {
                   try (NimbusClient client = new NimbusClient(conf, "localhost", server.getPort(), NIMBUS_TIMEOUT)) {
                       client.getClient().activate("security_auth_test_topology");
                   }
                   conf.put(Config.STORM_NIMBUS_RETRY_TIMES, 0);
                   //Verify simple is rejected...
                   Map<String, Object> badTransport = new HashMap<>(conf);
                   badTransport.put(Config.STORM_THRIFT_TRANSPORT_PLUGIN, SimpleTransportPlugin.class.getName());
                   try (NimbusClient client = new NimbusClient(badTransport, "localhost", server.getPort(), NIMBUS_TIMEOUT)) {
                       client.getClient().activate("bad_security_auth_test_topology");
                       fail("An exception should have been thrown trying to connect.");
                   } catch (Exception te) {
                       LOG.info("Got Exception...", te);
                       assertTrue("Expected a TTransportException but got " + te,
                                  Utils.exceptionCauseIsInstanceOf(TTransportException.class, te));
                   }
                   //The user here from the jaas conf is bob. No impersonation is done, so verify that
                   verifyUserIs(user, "bob");
                   verifyIncorrectJaasConf(server, conf, BAD_PASSWORD_CONF, TTransportException.class);
                   verifyIncorrectJaasConf(server, conf, WRONG_USER_CONF, TTransportException.class);
                   verifyIncorrectJaasConf(server, conf, "./nonexistent.conf", RuntimeException.class);
                   verifyIncorrectJaasConf(server, conf, MISSING_CLIENT, IOException.class);
               });
    verify(impl).activate("security_auth_test_topology");
    //Neither the wrong-transport attempt nor any bad JAAS attempt may have reached the handler
    verify(impl, never()).activate("bad_security_auth_test_topology");
    verify(impl, never()).activate("bad_auth_test_topology");
}
/**
 * Exercises worker-token authentication against a digest server whose JAAS configuration has no
 * client section, under simulated time.
 *
 * <p>Sequence: a connection without a token must fail; bob connects with a fresh token; 12 simulated
 * hours later alice connects with a fresh token; 13 hours after that bob's original token must be
 * rejected while alice's still works; finally a re-issued token lets bob in again.
 *
 * <p>Rejections are checked with JUnit's {@code assertTrue}; a Java {@code assert} statement would be
 * skipped whenever the JVM runs without {@code -ea}.
 */
@Test
public void workerTokenDigestAuthTest() throws Exception {
    LOG.info("\n\n\t\tworkerTokenDigestAuthTest - START\n\n");
    Nimbus.Iface impl = mock(Nimbus.Iface.class);
    final AtomicReference<ReqContext> user = new AtomicReference<>();
    //Capture a copy of the request context seen by the handler on every activate call
    doAnswer((invocation) -> {
        user.set(new ReqContext(ReqContext.context()));
        return null;
    }).when(impl).activate(anyString());
    Map<String, Object> extraConfs = new HashMap<>();
    //Let worker tokens work on insecure ZK...
    extraConfs.put("TESTING.ONLY.ENABLE.INSECURE.WORKER.TOKENS", true);
    try (InProcessZookeeper zk = new InProcessZookeeper()) {
        withServer(MISSING_CLIENT,
                   DigestSaslTransportPlugin.class,
                   impl,
                   zk,
                   extraConfs,
                   (ThriftServer server, Map<String, Object> conf) -> {
                       try (Time.SimulatedTime sim = new Time.SimulatedTime()) {
                           conf.put(Config.STORM_NIMBUS_RETRY_TIMES, 0);
                           //We cannot connect if there is no client section in the jaas conf...
                           try (NimbusClient client = new NimbusClient(conf, "localhost", server.getPort(), NIMBUS_TIMEOUT)) {
                               client.getClient().activate("bad_auth_test_topology");
                               fail("We should not be able to connect without a token...");
                           } catch (Exception e) {
                               assertTrue("Expected an IOException but got " + e,
                                          Utils.exceptionCauseIsInstanceOf(IOException.class, e));
                           }
                           //Now lets create a token and verify that we can connect...
                           IStormClusterState state =
                               ClusterUtils.mkStormClusterState(conf, new ClusterStateContext(DaemonType.NIMBUS, conf));
                           WorkerTokenManager wtMan = new WorkerTokenManager(conf, state);
                           Subject bob = testConnectWithTokenFor(wtMan, conf, server, "bob", "topo-bob");
                           verifyUserIs(user, "bob");
                           Time.advanceTimeSecs(TimeUnit.HOURS.toSeconds(12));
                           //Alice has no digest jaas section at all...
                           Subject alice = testConnectWithTokenFor(wtMan, conf, server, "alice", "topo-alice");
                           verifyUserIs(user, "alice");
                           Time.advanceTimeSecs(TimeUnit.HOURS.toSeconds(13));
                           //Verify that bob's token has expired
                           try {
                               tryConnectAs(conf, server, bob, "bad_auth_test_topology");
                               fail("We should not be able to connect with bad auth");
                           } catch (Exception e) {
                               assertTrue("Expected a TTransportException but got " + e,
                                          Utils.exceptionCauseIsInstanceOf(TTransportException.class, e));
                           }
                           //Alice's token is only 13 simulated hours old at this point and must still work
                           tryConnectAs(conf, server, alice, "topo-alice");
                           verifyUserIs(user, "alice");
                           //Now see if we can create a new token for bob and try again.
                           bob = testConnectWithTokenFor(wtMan, conf, server, "bob", "topo-bob");
                           verifyUserIs(user, "bob");
                           tryConnectAs(conf, server, alice, "topo-alice");
                           verifyUserIs(user, "alice");
                       }
                   });
    }
    verify(impl, times(2)).activate("topo-bob");
    verify(impl, times(3)).activate("topo-alice");
    verify(impl, never()).activate("bad_auth_test_topology");
    LOG.info("\n\n\t\tworkerTokenDigestAuthTest - END\n\n");
}
/**
 * With no whitelist entry added to the configuration, "user" must not be permitted to activate.
 */
@Test
public void negativeWhitelistAuthroizationTest() {
    final Map<String, Object> stormConf = ConfigUtils.readStormConfig();
    final SimpleWhitelistAuthorizer whitelist = new SimpleWhitelistAuthorizer();
    whitelist.prepare(stormConf);
    final ReqContext userContext = new ReqContext(mkSubject("user"));
    final boolean permitted = whitelist.permit(userContext, "activate", stormConf);
    assertFalse(permitted);
}
/**
 * Once "user" is listed under the whitelist configuration key, the same request must be permitted.
 */
@Test
public void positiveWhitelistAuthroizationTest() {
    final Map<String, Object> stormConf = ConfigUtils.readStormConfig();
    stormConf.put(SimpleWhitelistAuthorizer.WHITELIST_USERS_CONF, Arrays.asList("user"));
    final SimpleWhitelistAuthorizer whitelist = new SimpleWhitelistAuthorizer();
    whitelist.prepare(stormConf);
    final ReqContext userContext = new ReqContext(mkSubject("user"));
    final boolean permitted = whitelist.permit(userContext, "activate", stormConf);
    assertTrue(permitted);
}
/**
 * Checks {@link SimpleACLAuthorizer} decisions for an admin, a supervisor, a topology user (user-a)
 * and an unrelated user (user-b) across cluster-level and topology-level operations.
 */
@Test
public void simpleAclUserAuthTest() {
    final Map<String, Object> clusterConf = ConfigUtils.readStormConfig();
    clusterConf.put(Config.NIMBUS_ADMINS, Arrays.asList("admin"));
    clusterConf.put(Config.NIMBUS_SUPERVISOR_USERS, Arrays.asList("supervisor"));

    final ReqContext admin = new ReqContext(mkSubject("admin"));
    final ReqContext supervisor = new ReqContext(mkSubject("supervisor"));
    final ReqContext userA = new ReqContext(mkSubject("user-a"));
    final ReqContext userB = new ReqContext(mkSubject("user-b"));

    final Map<String, Object> noTopoConf = Collections.emptyMap();
    final Map<String, Object> topoOfUserA = new HashMap<>();
    topoOfUserA.put(Config.TOPOLOGY_USERS, Arrays.asList("user-a"));

    final SimpleACLAuthorizer acl = new SimpleACLAuthorizer();
    acl.prepare(clusterConf);

    //Submission with an empty topology conf: both users and the admin pass, the supervisor does not
    assertTrue(acl.permit(userA, "submitTopology", noTopoConf));
    assertTrue(acl.permit(userB, "submitTopology", noTopoConf));
    assertTrue(acl.permit(admin, "submitTopology", noTopoConf));
    assertFalse(acl.permit(supervisor, "submitTopology", noTopoConf));

    //Operations checked without any topology conf: same outcome as submission
    for (String op : Arrays.asList("fileUpload", "getNimbusConf", "getClusterInfo")) {
        assertTrue(acl.permit(userA, op, null));
        assertTrue(acl.permit(userB, op, null));
        assertTrue(acl.permit(admin, op, null));
        assertFalse(acl.permit(supervisor, op, null));
    }

    //fileDownload is the reverse for non-admins: users are refused, the supervisor is allowed
    assertFalse(acl.permit(userA, "fileDownload", null));
    assertFalse(acl.permit(userB, "fileDownload", null));
    assertTrue(acl.permit(admin, "fileDownload", null));
    assertTrue(acl.permit(supervisor, "fileDownload", null));

    //Operations on a topology that lists only user-a: user-a and the admin pass, everyone else fails
    for (String op : Arrays.asList("killTopology", "uploadNewCredentials", "rebalance", "activate", "deactivate",
                                   "getTopologyConf", "getTopology", "getUserTopology", "getTopologyInfo")) {
        assertTrue(acl.permit(userA, op, topoOfUserA));
        assertFalse(acl.permit(userB, op, topoOfUserA));
        assertTrue(acl.permit(admin, op, topoOfUserA));
        assertFalse(acl.permit(supervisor, op, topoOfUserA));
    }
}
/**
 * With {@code Config.NIMBUS_USERS} restricted to user-a, only user-a may submit; the admin and
 * supervisor checks for file transfer still pass.
 */
@Test
public void simpleAclNimbusUsersAuthTest() {
    final Map<String, Object> clusterConf = ConfigUtils.readStormConfig();
    clusterConf.put(Config.NIMBUS_ADMINS, Arrays.asList("admin"));
    clusterConf.put(Config.NIMBUS_SUPERVISOR_USERS, Arrays.asList("supervisor"));
    clusterConf.put(Config.NIMBUS_USERS, Arrays.asList("user-a"));

    final ReqContext adminCtx = new ReqContext(mkSubject("admin"));
    final ReqContext supervisorCtx = new ReqContext(mkSubject("supervisor"));
    final ReqContext listedUser = new ReqContext(mkSubject("user-a"));
    final ReqContext unlistedUser = new ReqContext(mkSubject("user-b"));
    final Map<String, Object> noTopoConf = Collections.emptyMap();

    final SimpleACLAuthorizer acl = new SimpleACLAuthorizer();
    acl.prepare(clusterConf);

    //Only the listed nimbus user may submit
    assertTrue(acl.permit(listedUser, "submitTopology", noTopoConf));
    assertFalse(acl.permit(unlistedUser, "submitTopology", noTopoConf));
    assertTrue(acl.permit(adminCtx, "fileUpload", null));
    assertTrue(acl.permit(supervisorCtx, "fileDownload", null));
}
/**
 * Admin rights granted through a group: "admin" is mapped to "admin-group" by {@link FixedGroupsMapping}
 * and must be permitted, while "not-admin" (mapped to another group) and user-b must not.
 */
@Test
public void simpleAclNimbusGroupsAuthTest() {
    final Map<String, Object> clusterConf = ConfigUtils.readStormConfig();
    clusterConf.put(Config.NIMBUS_ADMINS_GROUPS, Arrays.asList("admin-group"));
    clusterConf.put(Config.NIMBUS_SUPERVISOR_USERS, Arrays.asList("supervisor"));
    clusterConf.put(Config.NIMBUS_USERS, Arrays.asList("user-a"));
    clusterConf.put(Config.STORM_GROUP_MAPPING_SERVICE_PROVIDER_PLUGIN, FixedGroupsMapping.class.getName());

    //Fixed user -> groups table handed to the mapping plugin through its params
    final Map<String, Object> userToGroups = new HashMap<>();
    userToGroups.put("admin", Collections.singleton("admin-group"));
    userToGroups.put("not-admin", Collections.singleton("not-admin-group"));
    final Map<String, Object> mappingParams = new HashMap<>();
    mappingParams.put(FixedGroupsMapping.STORM_FIXED_GROUP_MAPPING, userToGroups);
    clusterConf.put(Config.STORM_GROUP_MAPPING_SERVICE_PARAMS, mappingParams);

    final ReqContext adminCtx = new ReqContext(mkSubject("admin"));
    final ReqContext notAdminCtx = new ReqContext(mkSubject("not-admin"));
    final ReqContext supervisorCtx = new ReqContext(mkSubject("supervisor"));
    final ReqContext listedUser = new ReqContext(mkSubject("user-a"));
    final ReqContext unlistedUser = new ReqContext(mkSubject("user-b"));
    final Map<String, Object> noTopoConf = Collections.emptyMap();

    final SimpleACLAuthorizer acl = new SimpleACLAuthorizer();
    acl.prepare(clusterConf);

    assertTrue(acl.permit(listedUser, "submitTopology", noTopoConf));
    assertFalse(acl.permit(unlistedUser, "submitTopology", noTopoConf));
    //Only membership in admin-group grants the upload
    assertTrue(acl.permit(adminCtx, "fileUpload", null));
    assertFalse(acl.permit(notAdminCtx, "fileUpload", null));
    assertFalse(acl.permit(unlistedUser, "fileUpload", null));
    assertTrue(acl.permit(supervisorCtx, "fileDownload", null));
}
/**
 * When the same user is configured both as admin and as supervisor, every operation must be permitted.
 *
 * <p>The topology-level list includes "deactivate" so that it covers the same operations as
 * {@code simpleAclUserAuthTest}.
 */
@Test
public void simpleAclSameUserAuthTest() {
    Map<String, Object> clusterConf = ConfigUtils.readStormConfig();
    clusterConf.put(Config.NIMBUS_ADMINS, Arrays.asList("admin"));
    clusterConf.put(Config.NIMBUS_SUPERVISOR_USERS, Arrays.asList("admin"));
    ReqContext admin = new ReqContext(mkSubject("admin"));
    final Map<String, Object> empty = Collections.emptyMap();
    //Topology conf that lists only user-a, i.e. not the admin
    final Map<String, Object> aAllowed = new HashMap<>();
    aAllowed.put(Config.TOPOLOGY_USERS, Arrays.asList("user-a"));
    SimpleACLAuthorizer authorizer = new SimpleACLAuthorizer();
    authorizer.prepare(clusterConf);
    assertTrue(authorizer.permit(admin, "submitTopology", empty));
    assertTrue(authorizer.permit(admin, "fileUpload", null));
    assertTrue(authorizer.permit(admin, "getNimbusConf", null));
    assertTrue(authorizer.permit(admin, "getClusterInfo", null));
    assertTrue(authorizer.permit(admin, "fileDownload", null));
    assertTrue(authorizer.permit(admin, "killTopology", aAllowed));
    assertTrue(authorizer.permit(admin, "uploadNewCredentials", aAllowed));
    assertTrue(authorizer.permit(admin, "rebalance", aAllowed));
    assertTrue(authorizer.permit(admin, "activate", aAllowed));
    assertTrue(authorizer.permit(admin, "deactivate", aAllowed));
    assertTrue(authorizer.permit(admin, "getTopologyConf", aAllowed));
    assertTrue(authorizer.permit(admin, "getTopology", aAllowed));
    assertTrue(authorizer.permit(admin, "getUserTopology", aAllowed));
    assertTrue(authorizer.permit(admin, "getTopologyInfo", aAllowed));
}
/**
 * Checks that {@link ShellBasedGroupsMapping} answers for the current OS user and returns an empty
 * result for an unknown user and for {@code null}.
 */
@Test
public void shellBaseGroupsMappingTest() throws Exception {
    Map<String, Object> clusterConf = ConfigUtils.readStormConfig();
    ShellBasedGroupsMapping groups = new ShellBasedGroupsMapping();
    groups.prepare(clusterConf);
    String userName = System.getProperty("user.name");
    //A "size() >= 0" check can never fail; what is actually required is that the lookup
    //completes and returns a (possibly empty) collection for the current user
    assertNotNull("Group lookup for the current user returned null", groups.getGroups(userName));
    assertEquals(0, groups.getGroups("userDoesNotExist").size());
    assertEquals(0, groups.getGroups(null).size());
}
/**
 * Asking for a transport plugin whose configured class name is "null.invalid" must raise a
 * {@link RuntimeException}.
 */
@Test(expected = RuntimeException.class)
public void getTransportPluginThrowsRunimeTest() {
    final Map<String, Object> brokenConf = ConfigUtils.readStormConfig();
    final String bogusPluginClass = "null.invalid";
    brokenConf.put(Config.STORM_THRIFT_TRANSPORT_PLUGIN, bogusPluginClass);
    ClientAuthUtils.getTransportPlugin(ThriftConnectionType.NIMBUS, brokenConf, null);
}
/**
 * Checks {@link ImpersonationAuthorizer} with an ACL that lets "admin" impersonate members of the
 * current OS user's groups, but only from the local host.
 */
@Test
public void impersonationAuthorizerTest() throws Exception {
    final String impersonator = "admin";
    final String impersonated = System.getProperty("user.name");
    final Map<String, Object> clusterConf = ConfigUtils.readStormConfig();

    //The groups of the current OS user become the groups the impersonator may act for
    final ShellBasedGroupsMapping groupLookup = new ShellBasedGroupsMapping();
    groupLookup.prepare(clusterConf);
    final Set<String> allowedGroups = groupLookup.getGroups(impersonated);
    final InetAddress localHost = InetAddress.getLocalHost();

    final Map<String, Object> adminRule = new HashMap<>();
    adminRule.put("hosts", Arrays.asList(localHost.getHostName()));
    adminRule.put("groups", allowedGroups);
    final Map<String, Object> impersonationAcl = new HashMap<>();
    impersonationAcl.put(impersonator, adminRule);
    clusterConf.put(Config.NIMBUS_IMPERSONATION_ACL, impersonationAcl);

    final InetAddress unauthorizedHost = com.google.common.net.InetAddresses.forString("10.10.10.10");
    final ImpersonationAuthorizer authorizer = new ImpersonationAuthorizer();
    authorizer.prepare(clusterConf);

    //A request that does not impersonate anyone is permitted
    final ReqContext plainRequest = new ReqContext(mkSubject("anyuser"));
    assertTrue(authorizer.permit(plainRequest, "fileUplaod", null));

    //An impersonating user without any ACL entry is rejected
    final ReqContext noAclRequest = mkImpersonatingReqContext("user-with-no-acl", impersonated, localHost);
    assertFalse(authorizer.permit(noAclRequest, "someOperation", null));

    //A request coming from a host that is not listed in the ACL is rejected
    final ReqContext wrongHostRequest = mkImpersonatingReqContext(impersonator, impersonated, unauthorizedHost);
    assertFalse(authorizer.permit(wrongHostRequest, "someOperation", null));

    //Impersonating a user outside the authorized groups is rejected
    final ReqContext wrongGroupRequest = mkImpersonatingReqContext(impersonator, "unauthorized-user", localHost);
    assertFalse(authorizer.permit(wrongGroupRequest, "someOperation", null));

    //Authorized host and authorized group together are permitted
    final ReqContext allowedRequest = mkImpersonatingReqContext(impersonator, impersonated, localHost);
    assertTrue(authorizer.permit(allowedRequest, "someOperation", null));
}
/**
 * A two-argument callback that, unlike {@code java.util.function.BiConsumer}, may throw checked exceptions.
 *
 * <p>Marked {@link FunctionalInterface} because it is used as a lambda target; the compiler then
 * rejects any change that adds a second abstract method.
 *
 * @param <T> type of the first argument
 * @param <U> type of the second argument
 */
@FunctionalInterface
public interface MyBiConsumer<T, U> {
    /**
     * Performs the operation on the given arguments.
     *
     * @param t the first argument
     * @param u the second argument
     * @throws Exception any failure raised by the operation
     */
    void accept(T t, U u) throws Exception;
}
}