// blob: 6a9b703ee399e5dcfc5d1f6a97b67be6fd1c0d4e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.storm.security.auth;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.URIParameter;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import javax.security.auth.Subject;
import javax.security.auth.kerberos.KerberosTicket;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;
import org.apache.storm.Config;
import org.apache.storm.generated.WorkerToken;
import org.apache.storm.generated.WorkerTokenInfo;
import org.apache.storm.generated.WorkerTokenServiceType;
import org.apache.storm.security.INimbusCredentialPlugin;
import org.apache.storm.shade.org.apache.commons.codec.binary.Hex;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.ReflectionUtils;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Static helpers for client-side authentication: loading the JAAS configuration, instantiating
 * the auth plugins named in the configuration, storing and locating worker tokens, populating
 * subjects from credentials, and serializing Kerberos tickets.
 */
public class ClientAuthUtils {
// JAAS login-context section names. None of them is referenced by the code visible in this
// class; presumably callers pass them as the "section" argument of the lookup helpers - verify.
public static final String LOGIN_CONTEXT_SERVER = "StormServer";
public static final String LOGIN_CONTEXT_CLIENT = "StormClient";
public static final String LOGIN_CONTEXT_PACEMAKER_DIGEST = "PacemakerDigest";
public static final String LOGIN_CONTEXT_PACEMAKER_SERVER = "PacemakerServer";
public static final String LOGIN_CONTEXT_PACEMAKER_CLIENT = "PacemakerClient";
// NOTE(review): looks like a service name used by callers outside this class - confirm.
public static final String SERVICE = "storm_thrift_server";
private static final Logger LOG = LoggerFactory.getLogger(ClientAuthUtils.class);
// Option keys that makeDigestPayload reads out of a JAAS section.
private static final String USERNAME = "username";
private static final String PASSWORD = "password";
/**
 * Look up the JAAS login configuration file path in a configuration map.
 *
 * @param topoConf the configuration map to read from
 * @return the value stored under "java.security.auth.login.config", or null if there is none
 */
public static String getJaasConf(Map<String, Object> topoConf) {
    final Object jaasPath = topoConf.get("java.security.auth.login.config");
    return (String) jaasPath;
}
/**
 * Build a JAAS {@link Configuration} from the login file named in the storm configuration.
 *
 * @param topoConf Storm configuration
 * @return the loaded JAAS configuration, or null when no login file is configured
 *         (missing or empty path)
 * @throws RuntimeException if the configured file cannot be read or cannot be loaded
 */
public static Configuration getConfiguration(Map<String, Object> topoConf) {
    final String jaasPath = getJaasConf(topoConf);
    if (jaasPath == null || jaasPath.length() == 0) {
        // Nothing configured: callers treat null as "no JAAS configuration".
        return null;
    }

    final File jaasFile = new File(jaasPath);
    if (!jaasFile.canRead()) {
        throw new RuntimeException("File " + jaasPath + " cannot be read.");
    }

    try {
        return Configuration.getInstance("JavaLoginConfig", new URIParameter(jaasFile.toURI()));
    } catch (Exception ex) {
        throw new RuntimeException(ex);
    }
}
/**
 * Fetch the application configuration entries registered under a section name.
 *
 * @param configuration the JAAS configuration to query; may be null
 * @param section       the app configuration entry name to look up
 * @return the entries of the section, or null if {@code configuration} is null
 * @throws IOException if the configuration has no entry for {@code section}
 */
public static AppConfigurationEntry[] getEntries(Configuration configuration,
                                                 String section) throws IOException {
    if (configuration != null) {
        final AppConfigurationEntry[] entries = configuration.getAppConfigurationEntry(section);
        if (entries != null) {
            return entries;
        }
        throw new IOException("Could not find a '" + section + "' entry in this configuration.");
    }
    return null;
}
/**
 * Collect every option of a JAAS section into one sorted map.
 *
 * <p>Options of all entries in the section are merged; when the same key occurs in more than
 * one entry, the value from the later entry wins.
 *
 * @param topoConf The config containing the jaas conf file.
 * @param section  The app configuration entry name to get stuff from.
 * @return the merged options, or null when no JAAS configuration is available
 * @throws IOException if the JAAS configuration has no entry for {@code section}
 */
public static SortedMap<String, ?> pullConfig(Map<String, Object> topoConf,
                                              String section) throws IOException {
    final Configuration jaasConf = getConfiguration(topoConf);
    final AppConfigurationEntry[] entries = getEntries(jaasConf, section);
    if (entries == null) {
        return null;
    }

    final TreeMap<String, Object> merged = new TreeMap<>();
    for (AppConfigurationEntry entry : entries) {
        for (Map.Entry<String, ?> option : entry.getOptions().entrySet()) {
            merged.put(option.getKey(), option.getValue());
        }
    }
    return merged;
}
/**
 * Look up a single option value in a section of the JAAS configuration named by the storm config.
 *
 * @param topoConf The config containing the jaas conf file.
 * @param section  The app configuration entry name to get stuff from.
 * @param key      The key to look up inside of the section
 * @return the String value of the option, or null if it is not set or no JAAS configuration
 *         is available
 * @throws IOException if the JAAS configuration has no entry for {@code section}
 */
public static String get(Map<String, Object> topoConf, String section, String key) throws IOException {
    return get(getConfiguration(topoConf), section, key);
}
/**
 * Look up a single option value in a section of an already loaded JAAS configuration.
 *
 * @param configuration the JAAS configuration to search; may be null
 * @param section       the app configuration entry name to search in
 * @param key           the option key to look up
 * @return the first non-null value found for {@code key} across the section's entries,
 *         or null if there is none or {@code configuration} is null
 * @throws IOException if the configuration has no entry for {@code section}
 */
static String get(Configuration configuration, String section, String key) throws IOException {
    final AppConfigurationEntry[] entries = getEntries(configuration, section);
    if (entries != null) {
        for (AppConfigurationEntry candidate : entries) {
            final Object value = candidate.getOptions().get(key);
            if (value != null) {
                return (String) value;
            }
        }
    }
    return null;
}
/**
 * Instantiate and prepare the principal-to-local plugin named in the configuration.
 *
 * @param topoConf storm configuration
 * @return the prepared plugin, or null if no plugin class is configured
 * @throws RuntimeException wrapping any failure while reading the setting, instantiating or
 *                          preparing the plugin
 */
public static IPrincipalToLocal getPrincipalToLocalPlugin(Map<String, Object> topoConf) {
    try {
        final String className = (String) topoConf.get(Config.STORM_PRINCIPAL_TO_LOCAL_PLUGIN);
        if (className == null) {
            LOG.warn("No principal to local given {}", Config.STORM_PRINCIPAL_TO_LOCAL_PLUGIN);
            return null;
        }
        final IPrincipalToLocal plugin = ReflectionUtils.newInstance(className);
        //TODO a null instance can only happen if someone is doing something odd with mocking.
        // We should really fix the mocking and remove this check.
        if (plugin != null) {
            plugin.prepare(topoConf);
        }
        return plugin;
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}
/**
 * Instantiate and prepare the group mapping service provider plugin named in the configuration.
 *
 * @param conf daemon configuration
 * @return the prepared plugin, or null if no plugin class is configured
 * @throws RuntimeException wrapping any failure while reading the setting, instantiating or
 *                          preparing the plugin
 */
public static IGroupMappingServiceProvider getGroupMappingServiceProviderPlugin(Map<String, Object> conf) {
    try {
        final String className = (String) conf.get(Config.STORM_GROUP_MAPPING_SERVICE_PROVIDER_PLUGIN);
        if (className == null) {
            LOG.warn("No group mapper given {}", Config.STORM_GROUP_MAPPING_SERVICE_PROVIDER_PLUGIN);
            return null;
        }
        final IGroupMappingServiceProvider provider = ReflectionUtils.newInstance(className);
        if (provider != null) {
            provider.prepare(conf);
        }
        return provider;
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}
/**
 * Instantiate and prepare every credential renewer listed in the configuration.
 *
 * @param conf the storm configuration to use.
 * @return the prepared credential renewers; empty if none are configured.
 * @throws RuntimeException wrapping any failure while instantiating or preparing a renewer
 */
public static Collection<ICredentialsRenewer> getCredentialRenewers(Map<String, Object> conf) {
    try {
        final Set<ICredentialsRenewer> renewers = new HashSet<>();
        final Collection<String> classNames = (Collection<String>) conf.get(Config.NIMBUS_CREDENTIAL_RENEWERS);
        if (classNames == null) {
            return renewers;
        }
        for (String className : classNames) {
            final ICredentialsRenewer renewer = ReflectionUtils.newInstance(className);
            renewer.prepare(conf);
            renewers.add(renewer);
        }
        return renewers;
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}
/**
 * Instantiate and prepare every Nimbus auto credential plugin listed in the configuration.
 *
 * @param conf nimbus configuration to use.
 * @return the prepared plugins; empty if none are configured.
 * @throws RuntimeException wrapping any failure while instantiating or preparing a plugin
 */
public static Collection<INimbusCredentialPlugin> getNimbusAutoCredPlugins(Map<String, Object> conf) {
    try {
        final Set<INimbusCredentialPlugin> plugins = new HashSet<>();
        final Collection<String> classNames = (Collection<String>) conf.get(Config.NIMBUS_AUTO_CRED_PLUGINS);
        if (classNames == null) {
            return plugins;
        }
        for (String className : classNames) {
            final INimbusCredentialPlugin plugin = ReflectionUtils.newInstance(className);
            plugin.prepare(conf);
            plugins.add(plugin);
        }
        return plugins;
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}
/**
 * Get all of the configured AutoCredential Plugins.
 *
 * <p>Each class named under {@code Config.TOPOLOGY_AUTO_CREDENTIALS} is instantiated and
 * prepared with the given configuration.
 *
 * @param topoConf the storm configuration to use.
 * @return the configured auto credentials; empty if none are configured.
 * @throws RuntimeException wrapping any failure while instantiating or preparing a plugin
 */
public static Collection<IAutoCredentials> getAutoCredentials(Map<String, Object> topoConf) {
    try {
        Set<IAutoCredentials> autos = new HashSet<>();
        Collection<String> clazzes = (Collection<String>) topoConf.get(Config.TOPOLOGY_AUTO_CREDENTIALS);
        if (clazzes != null) {
            for (String clazz : clazzes) {
                IAutoCredentials a = ReflectionUtils.newInstance(clazz);
                a.prepare(topoConf);
                autos.add(a);
            }
        }
        // Parameterized form: the set is only rendered to a string when INFO is enabled.
        LOG.info("Got AutoCreds {}", autos);
        return autos;
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}
/**
 * Build the credentials-map key under which the WorkerToken of a service type is stored.
 *
 * @param type the type of service to get.
 * @return "STORM_WORKER_TOKEN_" followed by the enum constant name of {@code type}.
 */
public static String workerTokenCredentialsKey(WorkerTokenServiceType type) {
    final String serviceName = type.name();
    return "STORM_WORKER_TOKEN_" + serviceName;
}
/**
 * Look up and deserialize the WorkerToken of a service type from a credentials map.
 *
 * @param credentials the credentials map.
 * @param type        the type of service we are looking for.
 * @return the deserialized WorkerToken, or null if the map holds no entry for that type.
 */
public static WorkerToken readWorkerToken(Map<String, String> credentials, WorkerTokenServiceType type) {
    final String serialized = credentials.get(workerTokenCredentialsKey(type));
    if (serialized == null) {
        return null;
    }
    return Utils.deserializeFromString(serialized, WorkerToken.class);
}
/**
 * Serialize a worker token into a credentials map, keyed by the token's service type.
 * The token can be read back with readWorkerToken.
 *
 * @param credentials the credentials map to write to.
 * @param token       the token you want to store.
 */
public static void setWorkerToken(Map<String, String> credentials, WorkerToken token) {
    final WorkerTokenServiceType serviceType = token.get_serviceType();
    final String credentialsKey = workerTokenCredentialsKey(serviceType);
    final String encodedToken = Utils.serializeToString(token);
    credentials.put(credentialsKey, encodedToken);
}
/**
 * Search the private credentials of a subject for a worker token of the given service type.
 *
 * @param subject what to look in.
 * @param type    the type of token to look for.
 * @return a matching token, or null if the subject holds none of that type.
 */
public static WorkerToken findWorkerToken(Subject subject, final WorkerTokenServiceType type) {
    final Set<WorkerToken> candidates = subject.getPrivateCredentials(WorkerToken.class);
    synchronized (candidates) {
        for (WorkerToken candidate : candidates) {
            if (candidate.get_serviceType() == type) {
                return candidate;
            }
        }
        return null;
    }
}
/**
 * Decide whether worker tokens may be used with the given configuration.
 *
 * @param conf the daemon configuration
 * @return true if Utils.isZkAuthenticationConfiguredStormServer accepts the configuration, or
 *         if the testing-only override flag is set (an error is logged in that case)
 */
private static boolean willWorkerTokensBeStoredSecurely(Map<String, Object> conf) {
    // Read the override first, exactly as before, so a malformed value is reported regardless
    // of the ZooKeeper authentication check.
    final boolean insecureOverride =
        ObjectReader.getBoolean(conf.get("TESTING.ONLY.ENABLE.INSECURE.WORKER.TOKENS"), false);
    if (Utils.isZkAuthenticationConfiguredStormServer(conf)) {
        return true;
    }
    if (!insecureOverride) {
        return false;
    }
    LOG.error("\n\n\t\tYOU HAVE ENABLED INSECURE WORKER TOKENS. IF THIS IS NOT A UNIT TEST PLEASE STOP NOW!!!\n\n");
    return true;
}
/**
 * Check if worker tokens should be enabled on the server side or not.
 *
 * @param server a Thrift server, asked whether its transport supports worker tokens; there is
 *               no need to create a token if the transport does not support it.
 * @param conf   the daemon configuration to be sure the tokens are secure.
 * @return true if the server supports worker tokens and they will be stored securely.
 */
public static boolean areWorkerTokensEnabledServer(ThriftServer server, Map<String, Object> conf) {
    if (!server.supportsWorkerTokens()) {
        return false;
    }
    return willWorkerTokensBeStoredSecurely(conf);
}
/**
 * Check if worker tokens should be enabled on the server side or not (for a given server).
 *
 * @param connectionType the type of server this is for.
 * @param conf           the daemon configuration to be sure the tokens are secure.
 * @return true if the connection type has a worker token type and tokens will be stored
 *         securely, else false.
 */
public static boolean areWorkerTokensEnabledServer(ThriftConnectionType connectionType, Map<String, Object> conf) {
    if (connectionType.getWtType() == null) {
        return false;
    }
    return willWorkerTokensBeStoredSecurely(conf);
}
/**
 * Serialize a WorkerTokenInfo into a byte array using Utils.serialize.
 *
 * @param wti what to serialize.
 * @return the resulting byte array.
 */
public static byte[] serializeWorkerTokenInfo(WorkerTokenInfo wti) {
    final byte[] encoded = Utils.serialize(wti);
    return encoded;
}
/**
 * Deserialize the WorkerTokenInfo carried in the info field of a worker token.
 *
 * @param wt the token.
 * @return the deserialized info.
 */
public static WorkerTokenInfo getWorkerTokenInfo(WorkerToken wt) {
    final WorkerTokenInfo decoded = Utils.deserialize(wt.get_info(), WorkerTokenInfo.class);
    return decoded;
}
/**
 * Copy every worker token found in the credentials into the subject's private credentials,
 * replacing any token of the same service type that the subject already holds.
 * Support for worker tokens, similar to an IAutoCredentials implementation.
 *
 * @param subject     the subject to update
 * @param credentials the credentials to read tokens from; may be null, in which case the
 *                    subject is returned untouched
 * @return the same subject instance that was passed in
 */
private static Subject insertWorkerTokens(Subject subject, Map<String, String> credentials) {
    if (credentials == null) {
        return subject;
    }
    for (WorkerTokenServiceType type : WorkerTokenServiceType.values()) {
        WorkerToken token = readWorkerToken(credentials, type);
        if (token != null) {
            Set<Object> creds = subject.getPrivateCredentials();
            synchronized (creds) {
                WorkerToken previous = findWorkerToken(subject, type);
                // Add before removing so the set never goes through a state without a token
                // of this type.
                creds.add(token);
                // The Subject's credential set rejects an element equal to one it already
                // holds, so when the incoming token equals the previous one the add above was
                // a no-op and removing "previous" would delete the only copy. Only remove a
                // token that is actually being replaced.
                if (previous != null && !previous.equals(token)) {
                    creds.remove(previous);
                }
            }
        }
    }
    return subject;
}
/**
 * Populate a subject from credentials using the IAutoCredentials, then add any worker tokens
 * found in the credentials.
 *
 * @param subject     the subject to populate or null if a new Subject should be created.
 * @param autos       the IAutoCredentials to call to populate the subject.
 * @param credentials the credentials to pull from
 * @return the populated subject.
 * @throws RuntimeException wrapping any failure raised while populating
 */
public static Subject populateSubject(Subject subject, Collection<IAutoCredentials> autos, Map<String, String> credentials) {
    try {
        final Subject target = (subject != null) ? subject : new Subject();
        for (IAutoCredentials populator : autos) {
            populator.populateSubject(target, credentials);
        }
        return insertWorkerTokens(target, credentials);
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}
/**
 * Update a subject from credentials using the IAutoCredentials, then refresh its worker tokens.
 *
 * @param subject     the subject to update
 * @param autos       the IAutoCredentials to call to update the subject.
 * @param credentials the credentials to pull from
 * @throws RuntimeException if {@code subject} or {@code autos} is null, or wrapping any failure
 *                          raised by one of the IAutoCredentials
 */
public static void updateSubject(Subject subject, Collection<IAutoCredentials> autos, Map<String, String> credentials) {
    final boolean missingArgument = (subject == null) || (autos == null);
    if (missingArgument) {
        throw new RuntimeException("The subject or auto credentials cannot be null when updating a subject with credentials");
    }

    try {
        for (IAutoCredentials updater : autos) {
            updater.updateSubject(subject, credentials);
        }
    } catch (Exception e) {
        throw new RuntimeException(e);
    }

    // Worker tokens are refreshed outside the try block, so failures here are not wrapped.
    insertWorkerTokens(subject, credentials);
}
/**
 * Instantiate and prepare the transport plugin that the connection type selects from the
 * storm configuration.
 *
 * @param type     the connection type, asked for the transport plugin class name
 * @param topoConf storm configuration
 * @return the prepared transport plugin
 * @throws RuntimeException wrapping any failure while instantiating or preparing the plugin
 */
public static ITransportPlugin getTransportPlugin(ThriftConnectionType type, Map<String, Object> topoConf) {
    try {
        final String pluginClassName = type.getTransportPlugin(topoConf);
        final ITransportPlugin plugin = ReflectionUtils.newInstance(pluginClassName);
        plugin.prepare(type, topoConf);
        return plugin;
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}
/**
 * Build a hex-encoded SHA-512 digest of "username:password" taken from a JAAS section.
 *
 * <p>Reading the credentials is best effort: any failure is logged and treated as "no
 * credentials".
 *
 * @param topoConf      the config containing the jaas conf file
 * @param configSection the JAAS section holding the "username" and "password" options
 * @return the hex digest, or null if the username or password could not be obtained
 * @throws RuntimeException if the SHA-512 algorithm is not available
 */
public static String makeDigestPayload(Map<String, Object> topoConf, String configSection) {
    String username = null;
    String password = null;
    try {
        Map<String, ?> results = ClientAuthUtils.pullConfig(topoConf, configSection);
        if (results == null) {
            // pullConfig returns null when no JAAS configuration is available; report that
            // directly instead of relying on a NullPointerException being caught below.
            LOG.error("Failed to pull username/password out of jaas conf: no configuration available for section '{}'",
                      configSection);
        } else {
            username = (String) results.get(USERNAME);
            password = (String) results.get(PASSWORD);
        }
    } catch (Exception e) {
        LOG.error("Failed to pull username/password out of jaas conf", e);
    }
    if (username == null || password == null) {
        return null;
    }
    try {
        MessageDigest digest = MessageDigest.getInstance("SHA-512");
        // Fixed charset: the platform default would make the digest depend on the host's
        // locale settings for non-ASCII credentials.
        byte[] output = digest.digest((username + ":" + password).getBytes(StandardCharsets.UTF_8));
        return Hex.encodeHexString(output);
    } catch (java.security.NoSuchAlgorithmException e) {
        LOG.error("Cant run SHA-512 digest. Algorithm not available.", e);
        throw new RuntimeException(e);
    }
}
/**
 * Serialize a Kerberos ticket to bytes using Java object serialization.
 *
 * @param tgt the ticket to serialize
 * @return the serialized form of the ticket
 * @throws Exception if the ticket cannot be written
 */
public static byte[] serializeKerberosTicket(KerberosTicket tgt) throws Exception {
    ByteArrayOutputStream bao = new ByteArrayOutputStream();
    // try-with-resources closes (and thereby flushes) the stream even when writeObject fails.
    try (ObjectOutputStream out = new ObjectOutputStream(bao)) {
        out.writeObject(tgt);
    }
    return bao.toByteArray();
}
/**
 * Rebuild a Kerberos ticket from bytes produced by Java object serialization.
 *
 * @param tgtBytes the serialized ticket
 * @return the deserialized ticket
 * @throws RuntimeException wrapping any failure, including malformed or null input and a
 *                          payload that is not a KerberosTicket
 */
public static KerberosTicket deserializeKerberosTicket(byte[] tgtBytes) {
    // NOTE(review): this is Java-native deserialization. It must only be fed bytes from a
    // trusted source; consider an ObjectInputFilter if the input can be attacker-controlled.
    // try-with-resources closes the stream even when readObject or the cast fails.
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(tgtBytes))) {
        return (KerberosTicket) in.readObject();
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}
/**
 * Make an independent copy of a Kerberos ticket by serializing and deserializing it.
 *
 * @param kerberosTicket the ticket to copy; may be null
 * @return the copy, or null if {@code kerberosTicket} is null
 * @throws RuntimeException if the ticket cannot be serialized or deserialized
 */
public static KerberosTicket cloneKerberosTicket(KerberosTicket kerberosTicket) {
    if (kerberosTicket == null) {
        return null;
    }
    try {
        final byte[] serialized = serializeKerberosTicket(kerberosTicket);
        return deserializeKerberosTicket(serialized);
    } catch (Exception e) {
        throw new RuntimeException("Failed to clone KerberosTicket TGT!!", e);
    }
}
}