blob: f84bdf38ead1fd073b7b717a595114794c2a5d24 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocol.datatransfer;
import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Map;
import java.util.TreeMap;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.callback.NameCallback;
import javax.security.auth.callback.PasswordCallback;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.sasl.AuthorizeCallback;
import javax.security.sasl.RealmCallback;
import javax.security.sasl.RealmChoiceCallback;
import javax.security.sasl.Sasl;
import javax.security.sasl.SaslClient;
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.DataTransferEncryptorStatus;
import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.security.SaslInputStream;
import org.apache.hadoop.security.SaslOutputStream;
import com.google.common.base.Charsets;
import com.google.common.collect.Maps;
import com.google.protobuf.ByteString;
/**
* A class which, given connected input/output streams, will perform a
* handshake using those streams based on SASL to produce new Input/Output
* streams which will encrypt/decrypt all data written/read from said streams.
* Much of this is inspired by or borrowed from the TSaslTransport in Apache
* Thrift, but with some HDFS-specific tweaks.
*/
@InterfaceAudience.Private
public class DataTransferEncryptor {
public static final Log LOG = LogFactory.getLog(DataTransferEncryptor.class);
/**
* Sent by clients and validated by servers. We use a number that's unlikely
* to ever be sent as the value of the DATA_TRANSFER_VERSION.
*/
private static final int ENCRYPTED_TRANSFER_MAGIC_NUMBER = 0xDEADBEEF;
/**
* Delimiter for the three-part SASL username string.
*/
private static final String NAME_DELIMITER = " ";
// This has to be set as part of the SASL spec, but it don't matter for
// our purposes, but may not be empty. It's sent over the wire, so use
// a short string.
private static final String SERVER_NAME = "0";
private static final String PROTOCOL = "hdfs";
private static final String MECHANISM = "DIGEST-MD5";
private static final Map<String, String> SASL_PROPS = new TreeMap<String, String>();
static {
SASL_PROPS.put(Sasl.QOP, "auth-conf");
SASL_PROPS.put(Sasl.SERVER_AUTH, "true");
}
/**
* Factory method for DNs, where the nonce, keyId, and encryption key are not
* yet known. The nonce and keyId will be sent by the client, and the DN
* will then use those pieces of info and the secret key shared with the NN
* to determine the encryptionKey used for the SASL handshake/encryption.
*
* Establishes a secure connection assuming that the party on the other end
* has the same shared secret. This does a SASL connection handshake, but not
* a general-purpose one. It's specific to the MD5-DIGEST SASL mechanism with
* auth-conf enabled. In particular, it doesn't support an arbitrary number of
* challenge/response rounds, and we know that the client will never have an
* initial response, so we don't check for one.
*
* @param underlyingOut output stream to write to the other party
* @param underlyingIn input stream to read from the other party
* @param blockPoolTokenSecretManager secret manager capable of constructing
* encryption key based on keyId, blockPoolId, and nonce
* @return a pair of streams which wrap the given streams and encrypt/decrypt
* all data read/written
* @throws IOException in the event of error
*/
public static IOStreamPair getEncryptedStreams(
OutputStream underlyingOut, InputStream underlyingIn,
BlockPoolTokenSecretManager blockPoolTokenSecretManager,
String encryptionAlgorithm) throws IOException {
DataInputStream in = new DataInputStream(underlyingIn);
DataOutputStream out = new DataOutputStream(underlyingOut);
Map<String, String> saslProps = Maps.newHashMap(SASL_PROPS);
saslProps.put("com.sun.security.sasl.digest.cipher", encryptionAlgorithm);
if (LOG.isDebugEnabled()) {
LOG.debug("Server using encryption algorithm " + encryptionAlgorithm);
}
SaslParticipant sasl = new SaslParticipant(Sasl.createSaslServer(MECHANISM,
PROTOCOL, SERVER_NAME, saslProps,
new SaslServerCallbackHandler(blockPoolTokenSecretManager)));
int magicNumber = in.readInt();
if (magicNumber != ENCRYPTED_TRANSFER_MAGIC_NUMBER) {
throw new InvalidMagicNumberException(magicNumber);
}
try {
// step 1
performSaslStep1(out, in, sasl);
// step 2 (server-side only)
byte[] remoteResponse = readSaslMessage(in);
byte[] localResponse = sasl.evaluateChallengeOrResponse(remoteResponse);
sendSaslMessage(out, localResponse);
// SASL handshake is complete
checkSaslComplete(sasl);
return sasl.createEncryptedStreamPair(out, in);
} catch (IOException ioe) {
if (ioe instanceof SaslException &&
ioe.getCause() != null &&
ioe.getCause() instanceof InvalidEncryptionKeyException) {
// This could just be because the client is long-lived and hasn't gotten
// a new encryption key from the NN in a while. Upon receiving this
// error, the client will get a new encryption key from the NN and retry
// connecting to this DN.
sendInvalidKeySaslErrorMessage(out, ioe.getCause().getMessage());
} else {
sendGenericSaslErrorMessage(out, ioe.getMessage());
}
throw ioe;
}
}
/**
* Factory method for clients, where the encryption token is already created.
*
* Establishes a secure connection assuming that the party on the other end
* has the same shared secret. This does a SASL connection handshake, but not
* a general-purpose one. It's specific to the MD5-DIGEST SASL mechanism with
* auth-conf enabled. In particular, it doesn't support an arbitrary number of
* challenge/response rounds, and we know that the client will never have an
* initial response, so we don't check for one.
*
* @param underlyingOut output stream to write to the other party
* @param underlyingIn input stream to read from the other party
* @param encryptionKey all info required to establish an encrypted stream
* @return a pair of streams which wrap the given streams and encrypt/decrypt
* all data read/written
* @throws IOException in the event of error
*/
public static IOStreamPair getEncryptedStreams(
OutputStream underlyingOut, InputStream underlyingIn,
DataEncryptionKey encryptionKey)
throws IOException {
Map<String, String> saslProps = Maps.newHashMap(SASL_PROPS);
saslProps.put("com.sun.security.sasl.digest.cipher",
encryptionKey.encryptionAlgorithm);
if (LOG.isDebugEnabled()) {
LOG.debug("Client using encryption algorithm " +
encryptionKey.encryptionAlgorithm);
}
DataOutputStream out = new DataOutputStream(underlyingOut);
DataInputStream in = new DataInputStream(underlyingIn);
String userName = getUserNameFromEncryptionKey(encryptionKey);
SaslParticipant sasl = new SaslParticipant(Sasl.createSaslClient(
new String[] { MECHANISM }, userName, PROTOCOL, SERVER_NAME, saslProps,
new SaslClientCallbackHandler(encryptionKey.encryptionKey, userName)));
out.writeInt(ENCRYPTED_TRANSFER_MAGIC_NUMBER);
out.flush();
try {
// Start of handshake - "initial response" in SASL terminology.
sendSaslMessage(out, new byte[0]);
// step 1
performSaslStep1(out, in, sasl);
// step 2 (client-side only)
byte[] remoteResponse = readSaslMessage(in);
byte[] localResponse = sasl.evaluateChallengeOrResponse(remoteResponse);
assert localResponse == null;
// SASL handshake is complete
checkSaslComplete(sasl);
return sasl.createEncryptedStreamPair(out, in);
} catch (IOException ioe) {
sendGenericSaslErrorMessage(out, ioe.getMessage());
throw ioe;
}
}
private static void performSaslStep1(DataOutputStream out, DataInputStream in,
SaslParticipant sasl) throws IOException {
byte[] remoteResponse = readSaslMessage(in);
byte[] localResponse = sasl.evaluateChallengeOrResponse(remoteResponse);
sendSaslMessage(out, localResponse);
}
private static void checkSaslComplete(SaslParticipant sasl) throws IOException {
if (!sasl.isComplete()) {
throw new IOException("Failed to complete SASL handshake");
}
if (!sasl.supportsConfidentiality()) {
throw new IOException("SASL handshake completed, but channel does not " +
"support encryption");
}
}
private static void sendSaslMessage(DataOutputStream out, byte[] payload)
throws IOException {
sendSaslMessage(out, DataTransferEncryptorStatus.SUCCESS, payload, null);
}
private static void sendInvalidKeySaslErrorMessage(DataOutputStream out,
String message) throws IOException {
sendSaslMessage(out, DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY, null,
message);
}
private static void sendGenericSaslErrorMessage(DataOutputStream out,
String message) throws IOException {
sendSaslMessage(out, DataTransferEncryptorStatus.ERROR, null, message);
}
private static void sendSaslMessage(OutputStream out,
DataTransferEncryptorStatus status, byte[] payload, String message)
throws IOException {
DataTransferEncryptorMessageProto.Builder builder =
DataTransferEncryptorMessageProto.newBuilder();
builder.setStatus(status);
if (payload != null) {
builder.setPayload(ByteString.copyFrom(payload));
}
if (message != null) {
builder.setMessage(message);
}
DataTransferEncryptorMessageProto proto = builder.build();
proto.writeDelimitedTo(out);
out.flush();
}
private static byte[] readSaslMessage(DataInputStream in) throws IOException {
DataTransferEncryptorMessageProto proto =
DataTransferEncryptorMessageProto.parseFrom(vintPrefixed(in));
if (proto.getStatus() == DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY) {
throw new InvalidEncryptionKeyException(proto.getMessage());
} else if (proto.getStatus() == DataTransferEncryptorStatus.ERROR) {
throw new IOException(proto.getMessage());
} else {
return proto.getPayload().toByteArray();
}
}
/**
* Set the encryption key when asked by the server-side SASL object.
*/
private static class SaslServerCallbackHandler implements CallbackHandler {
private BlockPoolTokenSecretManager blockPoolTokenSecretManager;
public SaslServerCallbackHandler(BlockPoolTokenSecretManager
blockPoolTokenSecretManager) {
this.blockPoolTokenSecretManager = blockPoolTokenSecretManager;
}
@Override
public void handle(Callback[] callbacks) throws IOException,
UnsupportedCallbackException {
NameCallback nc = null;
PasswordCallback pc = null;
AuthorizeCallback ac = null;
for (Callback callback : callbacks) {
if (callback instanceof AuthorizeCallback) {
ac = (AuthorizeCallback) callback;
} else if (callback instanceof PasswordCallback) {
pc = (PasswordCallback) callback;
} else if (callback instanceof NameCallback) {
nc = (NameCallback) callback;
} else if (callback instanceof RealmCallback) {
continue; // realm is ignored
} else {
throw new UnsupportedCallbackException(callback,
"Unrecognized SASL DIGEST-MD5 Callback: " + callback);
}
}
if (pc != null) {
byte[] encryptionKey = getEncryptionKeyFromUserName(
blockPoolTokenSecretManager, nc.getDefaultName());
pc.setPassword(encryptionKeyToPassword(encryptionKey));
}
if (ac != null) {
ac.setAuthorized(true);
ac.setAuthorizedID(ac.getAuthorizationID());
}
}
}
/**
* Set the encryption key when asked by the client-side SASL object.
*/
private static class SaslClientCallbackHandler implements CallbackHandler {
private byte[] encryptionKey;
private String userName;
public SaslClientCallbackHandler(byte[] encryptionKey, String userName) {
this.encryptionKey = encryptionKey;
this.userName = userName;
}
@Override
public void handle(Callback[] callbacks) throws IOException,
UnsupportedCallbackException {
NameCallback nc = null;
PasswordCallback pc = null;
RealmCallback rc = null;
for (Callback callback : callbacks) {
if (callback instanceof RealmChoiceCallback) {
continue;
} else if (callback instanceof NameCallback) {
nc = (NameCallback) callback;
} else if (callback instanceof PasswordCallback) {
pc = (PasswordCallback) callback;
} else if (callback instanceof RealmCallback) {
rc = (RealmCallback) callback;
} else {
throw new UnsupportedCallbackException(callback,
"Unrecognized SASL client callback");
}
}
if (nc != null) {
nc.setName(userName);
}
if (pc != null) {
pc.setPassword(encryptionKeyToPassword(encryptionKey));
}
if (rc != null) {
rc.setText(rc.getDefaultText());
}
}
}
/**
* The SASL username consists of the keyId, blockPoolId, and nonce with the
* first two encoded as Strings, and the third encoded using Base64. The
* fields are each separated by a single space.
*
* @param encryptionKey the encryption key to encode as a SASL username.
* @return encoded username containing keyId, blockPoolId, and nonce
*/
private static String getUserNameFromEncryptionKey(
DataEncryptionKey encryptionKey) {
return encryptionKey.keyId + NAME_DELIMITER +
encryptionKey.blockPoolId + NAME_DELIMITER +
new String(Base64.encodeBase64(encryptionKey.nonce, false), Charsets.UTF_8);
}
/**
* Given a secret manager and a username encoded as described above, determine
* the encryption key.
*
* @param blockPoolTokenSecretManager to determine the encryption key.
* @param userName containing the keyId, blockPoolId, and nonce.
* @return secret encryption key.
* @throws IOException
*/
private static byte[] getEncryptionKeyFromUserName(
BlockPoolTokenSecretManager blockPoolTokenSecretManager, String userName)
throws IOException {
String[] nameComponents = userName.split(NAME_DELIMITER);
if (nameComponents.length != 3) {
throw new IOException("Provided name '" + userName + "' has " +
nameComponents.length + " components instead of the expected 3.");
}
int keyId = Integer.parseInt(nameComponents[0]);
String blockPoolId = nameComponents[1];
byte[] nonce = Base64.decodeBase64(nameComponents[2]);
return blockPoolTokenSecretManager.retrieveDataEncryptionKey(keyId,
blockPoolId, nonce);
}
private static char[] encryptionKeyToPassword(byte[] encryptionKey) {
return new String(Base64.encodeBase64(encryptionKey, false), Charsets.UTF_8).toCharArray();
}
/**
* Strongly inspired by Thrift's TSaslTransport class.
*
* Used to abstract over the <code>SaslServer</code> and
* <code>SaslClient</code> classes, which share a lot of their interface, but
* unfortunately don't share a common superclass.
*/
private static class SaslParticipant {
// One of these will always be null.
public SaslServer saslServer;
public SaslClient saslClient;
public SaslParticipant(SaslServer saslServer) {
this.saslServer = saslServer;
}
public SaslParticipant(SaslClient saslClient) {
this.saslClient = saslClient;
}
public byte[] evaluateChallengeOrResponse(byte[] challengeOrResponse) throws SaslException {
if (saslClient != null) {
return saslClient.evaluateChallenge(challengeOrResponse);
} else {
return saslServer.evaluateResponse(challengeOrResponse);
}
}
public boolean isComplete() {
if (saslClient != null)
return saslClient.isComplete();
else
return saslServer.isComplete();
}
public boolean supportsConfidentiality() {
String qop = null;
if (saslClient != null) {
qop = (String) saslClient.getNegotiatedProperty(Sasl.QOP);
} else {
qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP);
}
return qop != null && qop.equals("auth-conf");
}
// Return some input/output streams that will henceforth have their
// communication encrypted.
private IOStreamPair createEncryptedStreamPair(
DataOutputStream out, DataInputStream in) {
if (saslClient != null) {
return new IOStreamPair(
new SaslInputStream(in, saslClient),
new SaslOutputStream(out, saslClient));
} else {
return new IOStreamPair(
new SaslInputStream(in, saslServer),
new SaslOutputStream(out, saslServer));
}
}
}
@InterfaceAudience.Private
public static class InvalidMagicNumberException extends IOException {
private static final long serialVersionUID = 1L;
public InvalidMagicNumberException(int magicNumber) {
super(String.format("Received %x instead of %x from client.",
magicNumber, ENCRYPTED_TRANSFER_MAGIC_NUMBER));
}
}
}