blob: 8854968436854563c9b464feb3c45ed9b65fc81e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocol.datatransfer.sasl;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY;
import static org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil.*;
import com.google.common.annotations.VisibleForTesting;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;
import java.util.Map;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.callback.NameCallback;
import javax.security.auth.callback.PasswordCallback;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.sasl.AuthorizeCallback;
import javax.security.sasl.RealmCallback;
import javax.security.sasl.Sasl;
import javax.security.sasl.SaslException;
import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.CipherOption;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.DataTransferEncryptorStatus;
import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.DNConf;
import org.apache.hadoop.security.SaslPropertiesResolver;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Charsets;
import com.google.common.collect.Lists;
/**
* Negotiates SASL for DataTransferProtocol on behalf of a server. There are
* two possible supported variants of SASL negotiation: either a general-purpose
* negotiation supporting any quality of protection, or a specialized
* negotiation that enforces privacy as the quality of protection using a
* cryptographically strong encryption key.
*
* This class is used in the DataNode for handling inbound connections.
*/
@InterfaceAudience.Private
public class SaslDataTransferServer {
private static final Logger LOG = LoggerFactory.getLogger(
SaslDataTransferServer.class);
private final BlockPoolTokenSecretManager blockPoolTokenSecretManager;
private final DNConf dnConf;
// Stores the QOP negotiated by the most recent successful handshake;
// for testing purposes only.
private String negotiatedQOP;
/**
 * Builds a server-side SASL negotiator for DataTransferProtocol.
 *
 * @param dnConf configuration of the DataNode
 * @param blockPoolTokenSecretManager secret manager used for checking block
 *   access tokens and encryption keys
 */
public SaslDataTransferServer(DNConf dnConf,
    BlockPoolTokenSecretManager blockPoolTokenSecretManager) {
  this.dnConf = dnConf;
  this.blockPoolTokenSecretManager = blockPoolTokenSecretManager;
}
/**
 * Receives SASL negotiation from a peer on behalf of a server.
 *
 * The handshake variant is chosen by the first matching condition:
 * <ol>
 *   <li>data transfer encryption is enabled: encrypted handshake;</li>
 *   <li>security is disabled: no handshake, streams returned as-is;</li>
 *   <li>{@code xferPort} is a privileged port: no handshake;</li>
 *   <li>a SASL properties resolver is configured: general handshake;</li>
 *   <li>secure ports are ignored for testing: no handshake;</li>
 *   <li>otherwise: the configuration is rejected with an IOException.</li>
 * </ol>
 *
 * @param peer connection peer
 * @param underlyingOut connection output stream
 * @param underlyingIn connection input stream
 * @param xferPort data transfer port of DataNode accepting connection
 * @param datanodeId ID of DataNode accepting connection
 * @return new pair of streams, wrapped after SASL negotiation
 * @throws IOException for any error
 */
public IOStreamPair receive(Peer peer, OutputStream underlyingOut,
    InputStream underlyingIn, int xferPort, DatanodeID datanodeId)
    throws IOException {
  if (dnConf.getEncryptDataTransfer()) {
    LOG.debug(
        "SASL server doing encrypted handshake for peer = {}, datanodeId = {}",
        peer, datanodeId);
    return getEncryptedStreams(peer, underlyingOut, underlyingIn);
  } else if (!UserGroupInformation.isSecurityEnabled()) {
    LOG.debug(
        "SASL server skipping handshake in unsecured configuration for "
        + "peer = {}, datanodeId = {}", peer, datanodeId);
    return new IOStreamPair(underlyingIn, underlyingOut);
  } else if (SecurityUtil.isPrivilegedPort(xferPort)) {
    LOG.debug(
        "SASL server skipping handshake in secured configuration for "
        + "peer = {}, datanodeId = {}", peer, datanodeId);
    return new IOStreamPair(underlyingIn, underlyingOut);
  } else if (dnConf.getSaslPropsResolver() != null) {
    LOG.debug(
        "SASL server doing general handshake for peer = {}, datanodeId = {}",
        peer, datanodeId);
    return getSaslStreams(peer, underlyingOut, underlyingIn);
  } else if (dnConf.getIgnoreSecurePortsForTesting()) {
    // It's a secured cluster using non-privileged ports, but no SASL. The
    // only way this can happen is if the DataNode has
    // ignore.secure.ports.for.testing configured, so this is a rare edge case.
    LOG.debug(
        "SASL server skipping handshake in secured configuration with no SASL "
        + "protection configured for peer = {}, datanodeId = {}",
        peer, datanodeId);
    return new IOStreamPair(underlyingIn, underlyingOut);
  } else {
    // The error message here intentionally does not mention
    // ignore.secure.ports.for.testing. That's intended for dev use only.
    // This code path is not expected to execute ever, because DataNode startup
    // checks for invalid configuration and aborts.
    //
    // Report xferPort, the value the privileged-port check above actually
    // examined, rather than re-deriving a port from datanodeId.
    throw new IOException(String.format("Cannot create a secured " +
        "connection if DataNode listens on unprivileged port (%d) and no " +
        "protection is defined in configuration property %s.",
        xferPort, DFS_DATA_TRANSFER_PROTECTION_KEY));
  }
}
/**
 * Performs the server side of the specialized encrypted handshake. When the
 * peer already has a secure channel, or its address is reported as trusted by
 * the configured trusted channel resolver, no negotiation takes place and the
 * given streams are returned unwrapped.
 *
 * @param peer connection peer
 * @param underlyingOut connection output stream
 * @param underlyingIn connection input stream
 * @return new pair of streams, wrapped after SASL negotiation
 * @throws IOException for any error
 */
private IOStreamPair getEncryptedStreams(Peer peer,
    OutputStream underlyingOut, InputStream underlyingIn) throws IOException {
  final boolean channelAlreadyProtected = peer.hasSecureChannel() ||
      dnConf.getTrustedChannelResolver().isTrusted(getPeerAddress(peer));
  if (channelAlreadyProtected) {
    return new IOStreamPair(underlyingIn, underlyingOut);
  }

  final Map<String, String> encryptionProps =
      createSaslPropertiesForEncryption(dnConf.getEncryptionAlgorithm());

  if (LOG.isDebugEnabled()) {
    LOG.debug("Server using encryption algorithm " +
        dnConf.getEncryptionAlgorithm());
  }

  // The password is derived from the encryption key that the user name
  // identifies.
  final PasswordFunction keyPassword = new PasswordFunction() {
    @Override
    public char[] apply(String userName) throws IOException {
      byte[] encryptionKey = getEncryptionKeyFromUserName(userName);
      return encryptionKeyToPassword(encryptionKey);
    }
  };
  final CallbackHandler handler = new SaslServerCallbackHandler(keyPassword);

  return doSaslHandshake(peer, underlyingOut, underlyingIn, encryptionProps,
      handler);
}
/**
* Strategy for computing the SASL password that the server expects for a
* given user name. The encrypted and general-purpose handshakes derive the
* password differently, so each supplies its own implementation. It's similar
* to a Guava Function, but it is declared separately because implementations
* need to be able to throw IOException.
*/
private interface PasswordFunction {
/**
* Returns the SASL password for the given user name.
*
* @param userName SASL user name
* @return SASL password
* @throws IOException for any error
*/
char[] apply(String userName) throws IOException;
}
/**
 * Answers the callbacks issued by the server-side SASL object: it supplies
 * the expected password for the user name presented in the exchange, and
 * approves the authorization ID carried by an AuthorizeCallback.
 */
private static final class SaslServerCallbackHandler
    implements CallbackHandler {

  private final PasswordFunction passwordFunction;

  /**
   * Creates a new SaslServerCallbackHandler.
   *
   * @param passwordFunction for determining the user's password
   */
  public SaslServerCallbackHandler(PasswordFunction passwordFunction) {
    this.passwordFunction = passwordFunction;
  }

  /**
   * Handles name, password, authorize and realm callbacks. Realm callbacks
   * are ignored. A password callback is answered with the password computed
   * from the default name of the accompanying name callback. An authorize
   * callback is always approved, with the authorized ID set to the requested
   * authorization ID.
   *
   * @param callbacks callbacks issued by the SASL server
   * @throws IOException if the password cannot be determined, or if a
   *   password is requested without an accompanying name callback
   * @throws UnsupportedCallbackException if a callback of any other type is
   *   received
   */
  @Override
  public void handle(Callback[] callbacks) throws IOException,
      UnsupportedCallbackException {
    NameCallback nc = null;
    PasswordCallback pc = null;
    AuthorizeCallback ac = null;
    for (Callback callback : callbacks) {
      if (callback instanceof AuthorizeCallback) {
        ac = (AuthorizeCallback) callback;
      } else if (callback instanceof PasswordCallback) {
        pc = (PasswordCallback) callback;
      } else if (callback instanceof NameCallback) {
        nc = (NameCallback) callback;
      } else if (callback instanceof RealmCallback) {
        continue; // realm is ignored
      } else {
        throw new UnsupportedCallbackException(callback,
            "Unrecognized SASL DIGEST-MD5 Callback: " + callback);
      }
    }

    if (pc != null) {
      // The password is looked up by user name, so a password request that
      // arrives without a name cannot be answered. Fail with a descriptive
      // IOException instead of a NullPointerException.
      if (nc == null) {
        throw new IOException("SASL PasswordCallback received without a "
            + "NameCallback; cannot determine the user name.");
      }
      pc.setPassword(passwordFunction.apply(nc.getDefaultName()));
    }

    if (ac != null) {
      ac.setAuthorized(true);
      ac.setAuthorizedID(ac.getAuthorizationID());
    }
  }
}
/**
 * Given a user name encoded for the encrypted handshake, determines the
 * encryption key by asking the block pool token secret manager.
 *
 * The user name must consist of exactly three components separated by
 * NAME_DELIMITER: an integer key ID, the block pool ID, and a base64-encoded
 * nonce.
 *
 * @param userName containing the keyId, blockPoolId, and nonce.
 * @return secret encryption key.
 * @throws IOException if the user name does not have exactly three
 *   components, if the key ID is not a valid integer, or if the secret
 *   manager lookup fails
 */
private byte[] getEncryptionKeyFromUserName(String userName)
    throws IOException {
  String[] nameComponents = userName.split(NAME_DELIMITER);
  if (nameComponents.length != 3) {
    throw new IOException("Provided name '" + userName + "' has " +
        nameComponents.length + " components instead of the expected 3.");
  }

  // The user name comes from the remote peer, so a malformed key ID must be
  // reported as an IOException like every other validation failure here,
  // not as an unchecked NumberFormatException.
  int keyId;
  try {
    keyId = Integer.parseInt(nameComponents[0]);
  } catch (NumberFormatException nfe) {
    throw new IOException("Provided name '" + userName + "' has a key ID '" +
        nameComponents[0] + "' that is not a valid integer.", nfe);
  }
  String blockPoolId = nameComponents[1];
  byte[] nonce = Base64.decodeBase64(nameComponents[2]);
  return blockPoolTokenSecretManager.retrieveDataEncryptionKey(keyId,
      blockPoolId, nonce);
}
/**
 * Performs the server side of the general-purpose handshake. When the peer
 * already has a secure channel, or its address is reported as trusted by the
 * configured trusted channel resolver, no negotiation takes place and the
 * given streams are returned unwrapped.
 *
 * @param peer connection peer
 * @param underlyingOut connection output stream
 * @param underlyingIn connection input stream
 * @return new pair of streams, wrapped after SASL negotiation
 * @throws IOException for any error
 */
private IOStreamPair getSaslStreams(Peer peer, OutputStream underlyingOut,
    InputStream underlyingIn) throws IOException {
  final boolean channelAlreadyProtected = peer.hasSecureChannel() ||
      dnConf.getTrustedChannelResolver().isTrusted(getPeerAddress(peer));
  if (channelAlreadyProtected) {
    return new IOStreamPair(underlyingIn, underlyingOut);
  }

  // SASL properties are resolved per peer address.
  final Map<String, String> negotiationProps = dnConf.getSaslPropsResolver()
      .getServerProperties(getPeerAddress(peer));

  // The password is derived from the block access token that the user name
  // carries.
  final PasswordFunction tokenPassword = new PasswordFunction() {
    @Override
    public char[] apply(String userName) throws IOException {
      return buildServerPassword(userName);
    }
  };
  final CallbackHandler handler = new SaslServerCallbackHandler(tokenPassword);

  return doSaslHandshake(peer, underlyingOut, underlyingIn, negotiationProps,
      handler);
}
/**
 * Computes the password the server expects for the general-purpose
 * handshake. The user name is deserialized into a block access token
 * identifier, the secret manager is asked for that identifier's password,
 * and the password bytes are base64-encoded (unchunked) into characters.
 *
 * @param userName SASL user name containing serialized block access token
 *   identifier
 * @return expected correct SASL password
 * @throws IOException for any error
 */
private char[] buildServerPassword(String userName) throws IOException {
  final BlockTokenIdentifier tokenId = deserializeIdentifier(userName);
  final byte[] rawPassword =
      blockPoolTokenSecretManager.retrievePassword(tokenId);
  final byte[] encodedPassword = Base64.encodeBase64(rawPassword, false);
  final String passwordText = new String(encodedPassword, Charsets.UTF_8);
  return passwordText.toCharArray();
}
/**
 * Rebuilds a block access token identifier from its base64-encoded binary
 * form.
 *
 * @param str String to deserialize
 * @return BlockTokenIdentifier deserialized from str
 * @throws IOException if there is any I/O error
 */
private BlockTokenIdentifier deserializeIdentifier(String str)
    throws IOException {
  final BlockTokenIdentifier tokenId = new BlockTokenIdentifier();
  final byte[] serialized = Base64.decodeBase64(str);
  final DataInputStream tokenStream =
      new DataInputStream(new ByteArrayInputStream(serialized));
  tokenId.readFields(tokenStream);
  return tokenId;
}
/**
 * Returns the QOP recorded by the most recent handshake that completed on
 * this server, or null if none has been recorded yet.
 *
 * @return the most recently negotiated QOP
 */
@VisibleForTesting
public String getNegotiatedQOP() {
  return this.negotiatedQOP;
}
/**
 * This method actually executes the server-side SASL handshake.
 *
 * The peer must first send SASL_TRANSFER_MAGIC_NUMBER. The handshake then
 * takes two round trips; when the negotiated QOP is privacy, a cipher option
 * is also negotiated and, if one is agreed on, it is used to build the
 * returned stream pair instead of the SASL wrapper. The negotiated QOP is
 * recorded for {@link #getNegotiatedQOP()}.
 *
 * If the first message carries a handshake secret, the secret is used as the
 * QOP for this negotiation. The override is applied to a private copy of the
 * properties, so the map passed in by the caller is never modified.
 *
 * If an IOException occurs after the magic number has been validated, a SASL
 * error message is sent to the peer before the exception is rethrown.
 *
 * @param peer connection peer
 * @param underlyingOut connection output stream
 * @param underlyingIn connection input stream
 * @param saslProps properties of SASL negotiation
 * @param callbackHandler for responding to SASL callbacks
 * @return new pair of streams, wrapped after SASL negotiation
 * @throws IOException for any error
 */
private IOStreamPair doSaslHandshake(Peer peer, OutputStream underlyingOut,
    InputStream underlyingIn, Map<String, String> saslProps,
    CallbackHandler callbackHandler) throws IOException {

  DataInputStream in = new DataInputStream(underlyingIn);
  DataOutputStream out = new DataOutputStream(underlyingOut);

  int magicNumber = in.readInt();
  if (magicNumber != SASL_TRANSFER_MAGIC_NUMBER) {
    throw new InvalidMagicNumberException(magicNumber,
        dnConf.getEncryptDataTransfer());
  }
  try {
    // step 1
    SaslMessageWithHandshake message = readSaslMessageWithHandshakeSecret(in);
    byte[] secret = message.getSecret();
    String bpid = message.getBpid();
    Map<String, String> effectiveSaslProps = saslProps;
    if (secret != null || bpid != null) {
      // Sanity check: if one is present, the other must be present too.
      // This is validated explicitly because the values come from the peer
      // and assertions are normally disabled at runtime.
      if (secret == null || bpid == null) {
        throw new IOException("Malformed SASL handshake message: handshake "
            + "secret and block pool ID must be supplied together.");
      }
      String qop = new String(secret, Charsets.UTF_8);
      // Override the QOP on a copy so that a per-connection value does not
      // leak into a map the caller may reuse for other connections.
      effectiveSaslProps = new java.util.HashMap<String, String>(saslProps);
      effectiveSaslProps.put(Sasl.QOP, qop);
    }
    SaslParticipant sasl = SaslParticipant.createServerSaslParticipant(
        effectiveSaslProps, callbackHandler);

    byte[] remoteResponse = message.getPayload();
    byte[] localResponse = sasl.evaluateChallengeOrResponse(remoteResponse);
    sendSaslMessage(out, localResponse);

    // step 2 (server-side only)
    List<CipherOption> cipherOptions = Lists.newArrayList();
    remoteResponse = readSaslMessageAndNegotiationCipherOptions(
        in, cipherOptions);
    localResponse = sasl.evaluateChallengeOrResponse(remoteResponse);

    // SASL handshake is complete
    checkSaslComplete(sasl, effectiveSaslProps);

    CipherOption cipherOption = null;
    negotiatedQOP = sasl.getNegotiatedQop();
    if (sasl.isNegotiatedQopPrivacy()) {
      // Negotiate a cipher option
      Configuration conf = dnConf.getConf();
      cipherOption = negotiateCipherOption(conf, cipherOptions);
      if (LOG.isDebugEnabled()) {
        if (cipherOption == null) {
          // No cipher suite is negotiated
          String cipherSuites =
              conf.get(DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY);
          if (cipherSuites != null && !cipherSuites.isEmpty()) {
            // the server accepts some cipher suites, but the client does not.
            LOG.debug("Server accepts cipher suites {}, "
                + "but client {} does not accept any of them",
                cipherSuites, peer.getRemoteAddressString());
          }
        } else {
          LOG.debug("Server using cipher suite {} with client {}",
              cipherOption.getCipherSuite().getName(),
              peer.getRemoteAddressString());
        }
      }
    }

    // If negotiated cipher option is not null, wrap it before sending.
    sendSaslMessageAndNegotiatedCipherOption(out, localResponse,
        wrap(cipherOption, sasl));

    // If negotiated cipher option is not null, we will use it to create
    // stream pair.
    return cipherOption != null ? createStreamPair(
        dnConf.getConf(), cipherOption, underlyingOut, underlyingIn, true) :
        sasl.createStreamPair(out, in);
  } catch (IOException ioe) {
    // instanceof is false for a null cause, so no separate null check is
    // needed.
    if (ioe instanceof SaslException &&
        ioe.getCause() instanceof InvalidEncryptionKeyException) {
      // This could just be because the client is long-lived and hasn't gotten
      // a new encryption key from the NN in a while. Upon receiving this
      // error, the client will get a new encryption key from the NN and retry
      // connecting to this DN.
      sendInvalidKeySaslErrorMessage(out, ioe.getCause().getMessage());
    } else {
      sendGenericSaslErrorMessage(out, ioe.getMessage());
    }
    throw ioe;
  }
}
/**
 * Writes a SASL negotiation message with status ERROR_UNKNOWN_KEY, no
 * payload, and the given text, telling the peer that its key was not
 * accepted.
 *
 * @param out stream to receive message
 * @param message to send
 * @throws IOException for any error
 */
private static void sendInvalidKeySaslErrorMessage(DataOutputStream out,
    String message) throws IOException {
  final DataTransferEncryptorStatus invalidKeyStatus =
      DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY;
  sendSaslMessage(out, invalidKeyStatus, null, message);
}
}