blob: fefcdc66f1bad9a94263bfb0855d8505006592db [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/
package org.apache.storm.messaging.netty;
import java.io.IOException;
import java.security.Principal;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import javax.security.auth.Subject;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.auth.kerberos.KerberosPrincipal;
import javax.security.auth.kerberos.KerberosTicket;
import javax.security.auth.login.LoginException;
import javax.security.sasl.AuthorizeCallback;
import javax.security.sasl.Sasl;
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;
import org.apache.storm.security.auth.ClientAuthUtils;
import org.apache.storm.security.auth.KerberosPrincipalToLocal;
import org.apache.storm.shade.org.apache.zookeeper.server.auth.KerberosName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
class KerberosSaslNettyServer {
private static final Logger LOG = LoggerFactory
.getLogger(KerberosSaslNettyServer.class);
private SaslServer saslServer;
private Subject subject;
private List<String> authorizedUsers;
KerberosSaslNettyServer(Map<String, Object> topoConf, String jaasSection, List<String> authorizedUsers) {
this.authorizedUsers = authorizedUsers;
LOG.debug("KerberosSaslNettyServer: authmethod {}", SaslUtils.KERBEROS);
KerberosSaslCallbackHandler ch = new KerberosSaslNettyServer.KerberosSaslCallbackHandler(authorizedUsers);
String jaasConfFile = ClientAuthUtils.getJaasConf(topoConf);
//login our principal
subject = null;
try {
LOG.debug("Trying to login using {}.", jaasConfFile);
Login login = new Login(jaasSection, ch, jaasConfFile);
subject = login.getSubject();
LOG.debug("Got Subject: {}", subject.toString());
} catch (LoginException ex) {
LOG.error("Server failed to login in principal:", ex);
throw new RuntimeException(ex);
}
//check the credential of our principal
if (subject.getPrivateCredentials(KerberosTicket.class).isEmpty()) {
LOG.error("Failed to verifyuser principal.");
throw new RuntimeException("Fail to verify user principal with section \""
+ jaasSection
+ "\" in login configuration file "
+ jaasConfFile);
}
try {
LOG.info("Creating Kerberos Server.");
final CallbackHandler fch = ch;
Principal p = (Principal) subject.getPrincipals().toArray()[0];
KerberosName kerberosName = new KerberosName(p.getName());
final String hostName = kerberosName.getHostName();
final String serviceName = kerberosName.getServiceName();
LOG.debug("Server with host: {}", hostName);
saslServer =
Subject.doAs(subject, new PrivilegedExceptionAction<SaslServer>() {
@Override
public SaslServer run() {
try {
Map<String, String> props = new TreeMap<String, String>();
props.put(Sasl.QOP, "auth");
props.put(Sasl.SERVER_AUTH, "false");
return Sasl.createSaslServer(SaslUtils.KERBEROS,
serviceName,
hostName, props, fch);
} catch (Exception e) {
LOG.error("Subject failed to create sasl server.", e);
return null;
}
}
});
LOG.info("Got Server: {}", saslServer);
} catch (PrivilegedActionException e) {
LOG.error("KerberosSaslNettyServer: Could not create SaslServer: ", e);
throw new RuntimeException(e);
}
}
public boolean isComplete() {
return saslServer.isComplete();
}
public String getUserName() {
return saslServer.getAuthorizationID();
}
/**
* Used by SaslTokenMessage::processToken() to respond to server SASL tokens.
*
* @param token Server's SASL token
* @return token to send back to the server.
*/
public byte[] response(final byte[] token) {
try {
byte[] retval = Subject.doAs(subject, new PrivilegedExceptionAction<byte[]>() {
@Override
public byte[] run() {
try {
LOG.debug("response: Responding to input token of length: {}",
token.length);
byte[] retval = saslServer.evaluateResponse(token);
return retval;
} catch (SaslException e) {
LOG.error("response: Failed to evaluate client token of length: {} : {}",
token.length, e);
throw new RuntimeException(e);
}
}
});
return retval;
} catch (PrivilegedActionException e) {
LOG.error("Failed to generate response for token: ", e);
throw new RuntimeException(e);
}
}
/**
* CallbackHandler for SASL DIGEST-MD5 mechanism.
*/
public static class KerberosSaslCallbackHandler implements CallbackHandler {
/**
* Used to authenticate the clients.
*/
private List<String> authorizedUsers;
KerberosSaslCallbackHandler(List<String> authorizedUsers) {
LOG.debug("KerberosSaslCallback: Creating KerberosSaslCallback handler.");
this.authorizedUsers = authorizedUsers;
}
@Override
public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
for (Callback callback : callbacks) {
LOG.info("Kerberos Callback Handler got callback: {}", callback.getClass());
if (callback instanceof AuthorizeCallback) {
AuthorizeCallback ac = (AuthorizeCallback) callback;
if (!ac.getAuthenticationID().equals(ac.getAuthorizationID())) {
LOG.debug("{} != {}", ac.getAuthenticationID(), ac.getAuthorizationID());
continue;
}
LOG.debug("Authorized Users: {}", authorizedUsers);
LOG.debug("Checking authorization for: {}", ac.getAuthorizationID());
for (String user : authorizedUsers) {
String requester = ac.getAuthorizationID();
KerberosPrincipal principal = new KerberosPrincipal(requester);
requester = new KerberosPrincipalToLocal().toLocal(principal);
if (requester.equals(user)) {
ac.setAuthorized(true);
break;
}
}
}
}
}
}
}