blob: 0adb41e7292f352fecd57fdf2cbc32b8f18528f5 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocol.datatransfer.sasl;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_NEW_QOP_KEY;
import static org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil.*;
import com.google.common.annotations.VisibleForTesting;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.Socket;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.crypto.SecretKey;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.callback.NameCallback;
import javax.security.auth.callback.PasswordCallback;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.sasl.RealmCallback;
import javax.security.sasl.RealmChoiceCallback;
import javax.security.sasl.Sasl;
import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.CipherOption;
import org.apache.hadoop.crypto.CipherSuite;
import org.apache.hadoop.hdfs.net.EncryptedPeer;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.security.SaslPropertiesResolver;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.Token;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Charsets;
import com.google.common.collect.Lists;
/**
* Negotiates SASL for DataTransferProtocol on behalf of a client. There are
* two possible supported variants of SASL negotiation: either a general-purpose
* negotiation supporting any quality of protection, or a specialized
* negotiation that enforces privacy as the quality of protection using a
* cryptographically strong encryption key.
*
* This class is used in both the HDFS client and the DataNode. The DataNode
* needs it, because it acts as a client to other DataNodes during write
* pipelines and block transfers.
*/
@InterfaceAudience.Private
public class SaslDataTransferClient {
private static final Logger LOG = LoggerFactory.getLogger(
SaslDataTransferClient.class);
private final Configuration conf;
private final AtomicBoolean fallbackToSimpleAuth;
private final SaslPropertiesResolver saslPropsResolver;
private final TrustedChannelResolver trustedChannelResolver;
// Store the most recent successfully negotiated QOP,
// for testing purpose only
private String targetQOP;
/**
* Creates a new SaslDataTransferClient. This constructor is used in cases
* where it is not relevant to track if a secure client did a fallback to
* simple auth. For intra-cluster connections between data nodes in the same
* cluster, we can assume that all run under the same security configuration.
*
* @param conf the configuration
* @param saslPropsResolver for determining properties of SASL negotiation
* @param trustedChannelResolver for identifying trusted connections that do
* not require SASL negotiation
*/
public SaslDataTransferClient(Configuration conf,
SaslPropertiesResolver saslPropsResolver,
TrustedChannelResolver trustedChannelResolver) {
this(conf, saslPropsResolver, trustedChannelResolver, null);
}
/**
* Creates a new SaslDataTransferClient.
*
* @param conf the configuration
* @param saslPropsResolver for determining properties of SASL negotiation
* @param trustedChannelResolver for identifying trusted connections that do
* not require SASL negotiation
* @param fallbackToSimpleAuth checked on each attempt at general SASL
* handshake, if true forces use of simple auth
*/
public SaslDataTransferClient(Configuration conf,
SaslPropertiesResolver saslPropsResolver,
TrustedChannelResolver trustedChannelResolver,
AtomicBoolean fallbackToSimpleAuth) {
this.conf = conf;
this.fallbackToSimpleAuth = fallbackToSimpleAuth;
this.saslPropsResolver = saslPropsResolver;
this.trustedChannelResolver = trustedChannelResolver;
}
/**
* Sends client SASL negotiation for a newly allocated socket if required.
*
* @param socket connection socket
* @param underlyingOut connection output stream
* @param underlyingIn connection input stream
* @param encryptionKeyFactory for creation of an encryption key
* @param accessToken connection block access token
* @param datanodeId ID of destination DataNode
* @return new pair of streams, wrapped after SASL negotiation
* @throws IOException for any error
*/
public IOStreamPair newSocketSend(Socket socket, OutputStream underlyingOut,
InputStream underlyingIn, DataEncryptionKeyFactory encryptionKeyFactory,
Token<BlockTokenIdentifier> accessToken, DatanodeID datanodeId)
throws IOException {
// The encryption key factory only returns a key if encryption is enabled.
DataEncryptionKey encryptionKey = !trustedChannelResolver.isTrusted() ?
encryptionKeyFactory.newDataEncryptionKey() : null;
IOStreamPair ios = send(socket.getInetAddress(), underlyingOut,
underlyingIn, encryptionKey, accessToken, datanodeId, null);
return ios != null ? ios : new IOStreamPair(underlyingIn, underlyingOut);
}
/**
* Sends client SASL negotiation for a peer if required.
*
* @param peer connection peer
* @param encryptionKeyFactory for creation of an encryption key
* @param accessToken connection block access token
* @param datanodeId ID of destination DataNode
* @return new pair of streams, wrapped after SASL negotiation
* @throws IOException for any error
*/
public Peer peerSend(Peer peer, DataEncryptionKeyFactory encryptionKeyFactory,
Token<BlockTokenIdentifier> accessToken, DatanodeID datanodeId)
throws IOException {
IOStreamPair ios = checkTrustAndSend(getPeerAddress(peer),
peer.getOutputStream(), peer.getInputStream(), encryptionKeyFactory,
accessToken, datanodeId);
// TODO: Consider renaming EncryptedPeer to SaslPeer.
return ios != null ? new EncryptedPeer(peer, ios) : peer;
}
/**
* Sends client SASL negotiation for a socket if required.
*
* @param socket connection socket
* @param underlyingOut connection output stream
* @param underlyingIn connection input stream
* @param encryptionKeyFactory for creation of an encryption key
* @param accessToken connection block access token
* @param datanodeId ID of destination DataNode
* @return new pair of streams, wrapped after SASL negotiation
* @throws IOException for any error
*/
public IOStreamPair socketSend(Socket socket, OutputStream underlyingOut,
InputStream underlyingIn, DataEncryptionKeyFactory encryptionKeyFactory,
Token<BlockTokenIdentifier> accessToken, DatanodeID datanodeId)
throws IOException {
return socketSend(socket, underlyingOut, underlyingIn, encryptionKeyFactory,
accessToken, datanodeId, null);
}
public IOStreamPair socketSend(
Socket socket, OutputStream underlyingOut, InputStream underlyingIn,
DataEncryptionKeyFactory encryptionKeyFactory,
Token<BlockTokenIdentifier> accessToken, DatanodeID datanodeId,
SecretKey secretKey)
throws IOException {
IOStreamPair ios = checkTrustAndSend(socket.getInetAddress(), underlyingOut,
underlyingIn, encryptionKeyFactory, accessToken, datanodeId,
secretKey);
return ios != null ? ios : new IOStreamPair(underlyingIn, underlyingOut);
}
/**
* Checks if an address is already trusted and then sends client SASL
* negotiation if required.
*
* @param addr connection address
* @param underlyingOut connection output stream
* @param underlyingIn connection input stream
* @param encryptionKeyFactory for creation of an encryption key
* @param accessToken connection block access token
* @param datanodeId ID of destination DataNode
* @return new pair of streams, wrapped after SASL negotiation
* @throws IOException for any error
*/
private IOStreamPair checkTrustAndSend(InetAddress addr,
OutputStream underlyingOut, InputStream underlyingIn,
DataEncryptionKeyFactory encryptionKeyFactory,
Token<BlockTokenIdentifier> accessToken, DatanodeID datanodeId)
throws IOException {
return checkTrustAndSend(addr, underlyingOut, underlyingIn,
encryptionKeyFactory, accessToken, datanodeId, null);
}
private IOStreamPair checkTrustAndSend(
InetAddress addr, OutputStream underlyingOut, InputStream underlyingIn,
DataEncryptionKeyFactory encryptionKeyFactory,
Token<BlockTokenIdentifier> accessToken, DatanodeID datanodeId,
SecretKey secretKey)
throws IOException {
boolean localTrusted = trustedChannelResolver.isTrusted();
boolean remoteTrusted = trustedChannelResolver.isTrusted(addr);
LOG.info("SASL encryption trust check: localHostTrusted = {}, "
+ "remoteHostTrusted = {}", localTrusted, remoteTrusted);
if (!localTrusted || !remoteTrusted) {
// The encryption key factory only returns a key if encryption is enabled.
DataEncryptionKey encryptionKey =
encryptionKeyFactory.newDataEncryptionKey();
return send(addr, underlyingOut, underlyingIn, encryptionKey, accessToken,
datanodeId, secretKey);
} else {
LOG.debug(
"SASL client skipping handshake on trusted connection for addr = {}, "
+ "datanodeId = {}", addr, datanodeId);
return null;
}
}
/**
* Sends client SASL negotiation if required. Determines the correct type of
* SASL handshake based on configuration.
*
* @param addr connection address
* @param underlyingOut connection output stream
* @param underlyingIn connection input stream
* @param encryptionKey for an encrypted SASL handshake
* @param accessToken connection block access token
* @param datanodeId ID of destination DataNode
* @return new pair of streams, wrapped after SASL negotiation
* @throws IOException for any error
*/
private IOStreamPair send(InetAddress addr, OutputStream underlyingOut,
InputStream underlyingIn, DataEncryptionKey encryptionKey,
Token<BlockTokenIdentifier> accessToken, DatanodeID datanodeId,
SecretKey secretKey)
throws IOException {
if (encryptionKey != null) {
LOG.debug("SASL client doing encrypted handshake for addr = {}, "
+ "datanodeId = {}", addr, datanodeId);
return getEncryptedStreams(addr, underlyingOut, underlyingIn,
encryptionKey, accessToken, secretKey);
} else if (!UserGroupInformation.isSecurityEnabled()) {
LOG.debug("SASL client skipping handshake in unsecured configuration for "
+ "addr = {}, datanodeId = {}", addr, datanodeId);
return null;
} else if (SecurityUtil.isPrivilegedPort(datanodeId.getXferPort())) {
LOG.debug(
"SASL client skipping handshake in secured configuration with "
+ "privileged port for addr = {}, datanodeId = {}",
addr, datanodeId);
return null;
} else if (fallbackToSimpleAuth != null && fallbackToSimpleAuth.get()) {
LOG.debug(
"SASL client skipping handshake in secured configuration with "
+ "unsecured cluster for addr = {}, datanodeId = {}",
addr, datanodeId);
return null;
} else if (saslPropsResolver != null) {
LOG.debug(
"SASL client doing general handshake for addr = {}, datanodeId = {}",
addr, datanodeId);
return getSaslStreams(addr, underlyingOut, underlyingIn,
accessToken, secretKey);
} else {
// It's a secured cluster using non-privileged ports, but no SASL. The
// only way this can happen is if the DataNode has
// ignore.secure.ports.for.testing configured so this is a rare edge case.
LOG.debug("SASL client skipping handshake in secured configuration with "
+ "no SASL protection configured for addr = {}, datanodeId = {}",
addr, datanodeId);
return null;
}
}
/**
* Sends client SASL negotiation for specialized encrypted handshake.
*
* @param addr connection address
* @param underlyingOut connection output stream
* @param underlyingIn connection input stream
* @param encryptionKey for an encrypted SASL handshake
* @return new pair of streams, wrapped after SASL negotiation
* @throws IOException for any error
*/
private IOStreamPair getEncryptedStreams(InetAddress addr,
OutputStream underlyingOut, InputStream underlyingIn,
DataEncryptionKey encryptionKey,
Token<BlockTokenIdentifier> accessToken,
SecretKey secretKey)
throws IOException {
Map<String, String> saslProps = createSaslPropertiesForEncryption(
encryptionKey.encryptionAlgorithm);
if (secretKey != null) {
LOG.debug("DataNode overwriting downstream QOP" +
saslProps.get(Sasl.QOP));
updateToken(accessToken, secretKey, saslProps);
}
LOG.debug("Client using encryption algorithm {}",
encryptionKey.encryptionAlgorithm);
String userName = getUserNameFromEncryptionKey(encryptionKey);
char[] password = encryptionKeyToPassword(encryptionKey.encryptionKey);
CallbackHandler callbackHandler = new SaslClientCallbackHandler(userName,
password);
return doSaslHandshake(addr, underlyingOut, underlyingIn, userName,
saslProps, callbackHandler, accessToken);
}
/**
* The SASL username for an encrypted handshake consists of the keyId,
* blockPoolId, and nonce with the first two encoded as Strings, and the third
* encoded using Base64. The fields are each separated by a single space.
*
* @param encryptionKey the encryption key to encode as a SASL username.
* @return encoded username containing keyId, blockPoolId, and nonce
*/
private static String getUserNameFromEncryptionKey(
DataEncryptionKey encryptionKey) {
return encryptionKey.keyId + NAME_DELIMITER +
encryptionKey.blockPoolId + NAME_DELIMITER +
new String(Base64.encodeBase64(encryptionKey.nonce, false),
Charsets.UTF_8);
}
/**
* Sets user name and password when asked by the client-side SASL object.
*/
private static final class SaslClientCallbackHandler
implements CallbackHandler {
private final char[] password;
private final String userName;
/**
* Creates a new SaslClientCallbackHandler.
*
* @param userName SASL user name
* @param password SASL password
*/
public SaslClientCallbackHandler(String userName, char[] password) {
this.password = password;
this.userName = userName;
}
@Override
public void handle(Callback[] callbacks) throws IOException,
UnsupportedCallbackException {
NameCallback nc = null;
PasswordCallback pc = null;
RealmCallback rc = null;
for (Callback callback : callbacks) {
if (callback instanceof NameCallback) {
nc = (NameCallback) callback;
} else if (callback instanceof PasswordCallback) {
pc = (PasswordCallback) callback;
} else if (callback instanceof RealmCallback) {
rc = (RealmCallback) callback;
} else if (!(callback instanceof RealmChoiceCallback)) {
throw new UnsupportedCallbackException(callback,
"Unrecognized SASL client callback");
}
}
if (nc != null) {
nc.setName(userName);
}
if (pc != null) {
pc.setPassword(password);
}
if (rc != null) {
rc.setText(rc.getDefaultText());
}
}
}
@VisibleForTesting
public String getTargetQOP() {
return targetQOP;
}
/**
* Sends client SASL negotiation for general-purpose handshake.
*
* @param addr connection address
* @param underlyingOut connection output stream
* @param underlyingIn connection input stream
* @param accessToken connection block access token
* @return new pair of streams, wrapped after SASL negotiation
* @throws IOException for any error
*/
private IOStreamPair getSaslStreams(InetAddress addr,
OutputStream underlyingOut, InputStream underlyingIn,
Token<BlockTokenIdentifier> accessToken,
SecretKey secretKey)
throws IOException {
Map<String, String> saslProps = saslPropsResolver.getClientProperties(addr);
// secretKey != null only happens when this is called by DN
// sending to downstream DN. If called from client, this will be null,
// as there is no key for client to generate mac instance.
// So that, if a different QOP is desired for inter-DN communication,
// the check below will use new QOP to create a secret, which includes
// the new QOP.
if (secretKey != null) {
String newQOP = conf
.get(DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_NEW_QOP_KEY);
if (newQOP != null) {
saslProps.put(Sasl.QOP, newQOP);
}
LOG.debug("DataNode overwriting downstream QOP " +
saslProps.get(Sasl.QOP));
updateToken(accessToken, secretKey, saslProps);
}
targetQOP = saslProps.get(Sasl.QOP);
String userName = buildUserName(accessToken);
char[] password = buildClientPassword(accessToken);
CallbackHandler callbackHandler = new SaslClientCallbackHandler(userName,
password);
return doSaslHandshake(addr, underlyingOut, underlyingIn, userName,
saslProps, callbackHandler, accessToken);
}
private void updateToken(Token<BlockTokenIdentifier> accessToken,
SecretKey secretKey, Map<String, String> saslProps)
throws IOException {
byte[] newSecret = saslProps.get(Sasl.QOP).getBytes(Charsets.UTF_8);
BlockTokenIdentifier bkid = accessToken.decodeIdentifier();
bkid.setHandshakeMsg(newSecret);
byte[] bkidBytes = bkid.getBytes();
accessToken.setPassword(
SecretManager.createPassword(bkidBytes, secretKey));
accessToken.setID(bkidBytes);
}
/**
* Builds the client's user name for the general-purpose handshake, consisting
* of the base64-encoded serialized block access token identifier. Note that
* this includes only the token identifier, not the token itself, which would
* include the password. The password is a shared secret, and we must not
* write it on the network during the SASL authentication exchange.
*
* @param blockToken for block access
* @return SASL user name
*/
private static String buildUserName(Token<BlockTokenIdentifier> blockToken) {
return new String(Base64.encodeBase64(blockToken.getIdentifier(), false),
Charsets.UTF_8);
}
/**
* Calculates the password on the client side for the general-purpose
* handshake. The password consists of the block access token's password.
*
* @param blockToken for block access
* @return SASL password
*/
private char[] buildClientPassword(Token<BlockTokenIdentifier> blockToken) {
return new String(Base64.encodeBase64(blockToken.getPassword(), false),
Charsets.UTF_8).toCharArray();
}
/**
* This method actually executes the client-side SASL handshake.
*
* @param addr connection address
* @param underlyingOut connection output stream
* @param underlyingIn connection input stream
* @param userName SASL user name
* @param saslProps properties of SASL negotiation
* @param callbackHandler for responding to SASL callbacks
* @return new pair of streams, wrapped after SASL negotiation
* @throws IOException for any error
*/
private IOStreamPair doSaslHandshake(InetAddress addr,
OutputStream underlyingOut, InputStream underlyingIn, String userName,
Map<String, String> saslProps, CallbackHandler callbackHandler,
Token<BlockTokenIdentifier> accessToken) throws IOException {
DataOutputStream out = new DataOutputStream(underlyingOut);
DataInputStream in = new DataInputStream(underlyingIn);
SaslParticipant sasl= SaslParticipant.createClientSaslParticipant(userName,
saslProps, callbackHandler);
out.writeInt(SASL_TRANSFER_MAGIC_NUMBER);
out.flush();
try {
// Start of handshake - "initial response" in SASL terminology.
// The handshake secret can be null, this happens when client is running
// a new version but the cluster does not have this feature.
// In which case there will be no encrypted secret sent from NN.
BlockTokenIdentifier blockTokenIdentifier =
accessToken.decodeIdentifier();
if (blockTokenIdentifier != null) {
byte[] handshakeSecret =
accessToken.decodeIdentifier().getHandshakeMsg();
if (handshakeSecret == null || handshakeSecret.length == 0) {
LOG.debug("Handshake secret is null, "
+ "sending without handshake secret.");
sendSaslMessage(out, new byte[0]);
} else {
LOG.debug("Sending handshake secret.");
BlockTokenIdentifier identifier = new BlockTokenIdentifier();
identifier.readFields(new DataInputStream(
new ByteArrayInputStream(accessToken.getIdentifier())));
String bpid = identifier.getBlockPoolId();
sendSaslMessageHandshakeSecret(out, new byte[0],
handshakeSecret, bpid);
}
} else {
LOG.debug("Block token id is null, sending without handshake secret.");
sendSaslMessage(out, new byte[0]);
}
// step 1
byte[] remoteResponse = readSaslMessage(in);
byte[] localResponse = sasl.evaluateChallengeOrResponse(remoteResponse);
List<CipherOption> cipherOptions = null;
String cipherSuites = conf.get(
DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY);
if (requestedQopContainsPrivacy(saslProps)) {
// Negotiate cipher suites if configured. Currently, the only supported
// cipher suite is AES/CTR/NoPadding, but the protocol allows multiple
// values for future expansion.
if (cipherSuites != null && !cipherSuites.isEmpty()) {
if (!cipherSuites.equals(CipherSuite.AES_CTR_NOPADDING.getName())) {
throw new IOException(String.format("Invalid cipher suite, %s=%s",
DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY, cipherSuites));
}
CipherOption option = new CipherOption(CipherSuite.AES_CTR_NOPADDING);
cipherOptions = Lists.newArrayListWithCapacity(1);
cipherOptions.add(option);
}
}
sendSaslMessageAndNegotiationCipherOptions(out, localResponse,
cipherOptions);
// step 2 (client-side only)
SaslResponseWithNegotiatedCipherOption response =
readSaslMessageAndNegotiatedCipherOption(in);
localResponse = sasl.evaluateChallengeOrResponse(response.payload);
assert localResponse == null;
// SASL handshake is complete
checkSaslComplete(sasl, saslProps);
CipherOption cipherOption = null;
if (sasl.isNegotiatedQopPrivacy()) {
// Unwrap the negotiated cipher option
cipherOption = unwrap(response.cipherOption, sasl);
if (LOG.isDebugEnabled()) {
if (cipherOption == null) {
// No cipher suite is negotiated
if (cipherSuites != null && !cipherSuites.isEmpty()) {
// the client accepts some cipher suites, but the server does not.
LOG.debug("Client accepts cipher suites {}, "
+ "but server {} does not accept any of them",
cipherSuites, addr.toString());
}
} else {
LOG.debug("Client using cipher suite {} with server {}",
cipherOption.getCipherSuite().getName(), addr.toString());
}
}
}
// If negotiated cipher option is not null, we will use it to create
// stream pair.
return cipherOption != null ? createStreamPair(
conf, cipherOption, underlyingOut, underlyingIn, false) :
sasl.createStreamPair(out, in);
} catch (IOException ioe) {
sendGenericSaslErrorMessage(out, ioe.getMessage());
throw ioe;
}
}
}