blob: 17a74429f0a7f6598eb0a90d563360c4978c4254 [file] [log] [blame]
/*
* Copyright (C) 2010-2012 The Async HBase Authors. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
* - Redistributions of source code must retain the aabove copyright notice,
* this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* - Neither the name of the StumbleUpon nor the names of its contributors
* may be used to endorse or promote products derived from this software
* without specific prior written permission.
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
package org.apache.kudu.client;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.security.cert.Certificate;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLException;
import javax.net.ssl.SSLPeerUnverifiedException;
import javax.security.auth.Subject;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.callback.NameCallback;
import javax.security.auth.callback.PasswordCallback;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.auth.kerberos.KerberosTicket;
import javax.security.sasl.Sasl;
import javax.security.sasl.SaslClient;
import javax.security.sasl.SaslException;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.protobuf.ByteString;
import com.google.protobuf.UnsafeByteOperations;
import org.apache.yetus.audience.InterfaceAudience;
import org.ietf.jgss.GSSException;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.handler.codec.embedder.DecoderEmbedder;
import org.jboss.netty.handler.ssl.SslHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.kudu.rpc.RpcHeader;
import org.apache.kudu.rpc.RpcHeader.AuthenticationTypePB;
import org.apache.kudu.rpc.RpcHeader.NegotiatePB;
import org.apache.kudu.rpc.RpcHeader.NegotiatePB.NegotiateStep;
import org.apache.kudu.rpc.RpcHeader.RpcFeatureFlag;
import org.apache.kudu.security.Token.SignedTokenPB;
import org.apache.kudu.util.SecurityUtil;
/**
* Netty Pipeline handler which runs connection negotiation with
* the server. When negotiation is complete, this removes itself
* from the pipeline and fires a Negotiator.Success or Negotiator.Failure upstream.
*/
@InterfaceAudience.Private
public class Negotiator extends SimpleChannelUpstreamHandler {
private static final Logger LOG = LoggerFactory.getLogger(Negotiator.class);
private final SaslClientCallbackHandler SASL_CALLBACK = new SaslClientCallbackHandler();
private static final ImmutableSet<RpcHeader.RpcFeatureFlag> SUPPORTED_RPC_FEATURES =
ImmutableSet.of(
RpcHeader.RpcFeatureFlag.APPLICATION_FEATURE_FLAGS,
RpcHeader.RpcFeatureFlag.TLS);
/**
* Set of SASL mechanisms supported by the client, in descending priority order.
* The client will pick the first of these mechanisms that is supported by
* the server and also succeeds to initialize.
*/
private enum SaslMechanism {
GSSAPI,
PLAIN,
}
static final int CONNECTION_CTX_CALL_ID = -3;
static final int SASL_CALL_ID = -33;
/**
* The cipher suites, in order of our preference.
* This list is based on the kDefaultTlsCiphers list in security_flags.cc,
* See that file for details on how it was derived.
*/
static final String[] PREFERRED_CIPHER_SUITES = new String[] {
"TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384", // Java 8
"TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384", // Java 8
"TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256", // Java 8
"TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256", // Java 8
"TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA384", // Java 7 (TLS 1.2+ only)
"TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA384", // Java 7 (TLS 1.2+ only)
"TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA256", // Java 7 (TLS 1.2+ only)
"TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256", // Java 7 (TLS 1.2+ only)
"TLS_RSA_WITH_AES_256_GCM_SHA384", // Java 8
"TLS_RSA_WITH_AES_128_GCM_SHA256", // Java 8
"TLS_RSA_WITH_AES_256_CBC_SHA256", // Java 7 (TLS 1.2+ only)
"TLS_RSA_WITH_AES_128_CBC_SHA256", // Java 7 (TLS 1.2+ only)
// The following two are critical to allow the client to connect to
// servers running versions of OpenSSL that don't support TLS 1.2.
"TLS_RSA_WITH_AES_256_CBC_SHA", // All Java versions
"TLS_RSA_WITH_AES_128_CBC_SHA" // All Java versions
};
private enum State {
INITIAL,
AWAIT_NEGOTIATE,
AWAIT_TLS_HANDSHAKE,
AWAIT_TOKEN_EXCHANGE,
AWAIT_SASL,
FINISHED
}
/** The remote hostname we're connecting to, used by TLS and GSSAPI */
private final String remoteHostname;
/** The security context holding the client credentials */
private final SecurityContext securityContext;
/**
* The authentication token we'll try to connect with, maybe null.
* This is fetched from {@link #securityContext} in the constructor to
* ensure that it doesn't change over the course of a negotiation attempt.
*/
private final SignedTokenPB authnToken;
private static enum AuthnTokenNotUsedReason {
NONE_AVAILABLE("no token is available"),
NO_TRUSTED_CERTS("no TLS certificates are trusted by the client"),
FORBIDDEN_BY_POLICY("this connection will be used to acquire a new token and " +
"therefore requires primary credentials"),
NOT_CHOSEN_BY_SERVER("the server chose not to accept token authentication");
AuthnTokenNotUsedReason(String msg) {
this.msg = msg;
}
final String msg;
};
private AuthnTokenNotUsedReason authnTokenNotUsedReason = null;
private State state = State.INITIAL;
private SaslClient saslClient;
/** The negotiated mechanism, set after NEGOTIATE stage. */
private SaslMechanism chosenMech;
/** The negotiated authentication type, set after NEGOTIATE state. */
private AuthenticationTypePB.TypeCase chosenAuthnType;
/** The features supported by the server, set after NEGOTIATE stage. */
private Set<RpcHeader.RpcFeatureFlag> serverFeatures;
/**
* The negotiation protocol relies on tunneling the TLS handshake through
* protobufs. The embedder holds a Netty SslHandler which can perform the
* handshake. Once the handshake is complete, we will stop using the embedder
* and add the handler directly to the real ChannelPipeline.
* Only non-null once TLS is initiated.
*/
private DecoderEmbedder<ChannelBuffer> sslEmbedder;
/**
* The nonce sent from the server to the client, or null if negotiation has
* not yet taken place, or the server does not send a nonce.
*/
private byte[] nonce;
/**
* Future indicating whether the embedded handshake has completed.
* Only non-null once TLS is initiated.
*/
private ChannelFuture sslHandshakeFuture;
private Certificate peerCert;
@InterfaceAudience.LimitedPrivate("Test")
boolean overrideLoopbackForTests;
public Negotiator(String remoteHostname,
SecurityContext securityContext,
boolean ignoreAuthnToken) {
this.remoteHostname = remoteHostname;
this.securityContext = securityContext;
SignedTokenPB token = securityContext.getAuthenticationToken();
if (token != null) {
if (ignoreAuthnToken) {
this.authnToken = null;
this.authnTokenNotUsedReason = AuthnTokenNotUsedReason.FORBIDDEN_BY_POLICY;
} else if (!securityContext.hasTrustedCerts()) {
this.authnToken = null;
this.authnTokenNotUsedReason = AuthnTokenNotUsedReason.NO_TRUSTED_CERTS;
} else {
this.authnToken = token;
}
} else {
this.authnToken = null;
this.authnTokenNotUsedReason = AuthnTokenNotUsedReason.NONE_AVAILABLE;
}
}
public void sendHello(Channel channel) {
sendNegotiateMessage(channel);
}
private void sendNegotiateMessage(Channel channel) {
RpcHeader.NegotiatePB.Builder builder = RpcHeader.NegotiatePB.newBuilder()
.setStep(RpcHeader.NegotiatePB.NegotiateStep.NEGOTIATE);
// Advertise our supported features
// ----------------------------------
for (RpcHeader.RpcFeatureFlag flag : SUPPORTED_RPC_FEATURES) {
builder.addSupportedFeatures(flag);
}
if (isLoopbackConnection(channel)) {
builder.addSupportedFeatures(RpcFeatureFlag.TLS_AUTHENTICATION_ONLY);
}
// Advertise our authentication types.
// ----------------------------------
// We always advertise SASL.
builder.addAuthnTypesBuilder().setSasl(
AuthenticationTypePB.Sasl.getDefaultInstance());
// We may also have a token. But, we can only use the token
// if we are able to use authenticated TLS to authenticate the server.
if (authnToken != null) {
builder.addAuthnTypesBuilder().setToken(
AuthenticationTypePB.Token.getDefaultInstance());
}
// We currently don't support client-certificate authentication from the
// Java client.
state = State.AWAIT_NEGOTIATE;
sendSaslMessage(channel, builder.build());
}
private void sendSaslMessage(Channel channel, RpcHeader.NegotiatePB msg) {
Preconditions.checkNotNull(channel);
RpcHeader.RequestHeader.Builder builder = RpcHeader.RequestHeader.newBuilder();
builder.setCallId(SASL_CALL_ID);
Channels.write(channel, new RpcOutboundMessage(builder, msg));
}
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt) throws IOException {
Object m = evt.getMessage();
if (!(m instanceof CallResponse)) {
ctx.sendUpstream(evt);
return;
}
handleResponse(ctx.getChannel(), (CallResponse)m);
}
private void handleResponse(Channel chan, CallResponse callResponse) throws IOException {
final RpcHeader.ResponseHeader header = callResponse.getHeader();
if (header.getIsError()) {
final RpcHeader.ErrorStatusPB.Builder errBuilder = RpcHeader.ErrorStatusPB.newBuilder();
KuduRpc.readProtobuf(callResponse.getPBMessage(), errBuilder);
final RpcHeader.ErrorStatusPB error = errBuilder.build();
LOG.debug("peer {} sent connection negotiation error: {}",
chan.getRemoteAddress(), error.getMessage());
// The upstream code should handle the negotiation failure.
state = State.FINISHED;
chan.getPipeline().remove(this);
Channels.fireMessageReceived(chan, new Failure(error));
return;
}
RpcHeader.NegotiatePB response = parseSaslMsgResponse(callResponse);
// TODO: check that the message type matches the expected one in all
// of the below implementations.
switch (state) {
case AWAIT_NEGOTIATE:
handleNegotiateResponse(chan, response);
break;
case AWAIT_SASL:
handleSaslMessage(chan, response);
break;
case AWAIT_TOKEN_EXCHANGE:
handleTokenExchangeResponse(chan, response);
break;
case AWAIT_TLS_HANDSHAKE:
handleTlsMessage(chan, response);
break;
default:
throw new IllegalStateException("received a message in unexpected state: " +
state.toString());
}
}
private void handleSaslMessage(Channel chan, NegotiatePB response) throws IOException {
switch (response.getStep()) {
case SASL_CHALLENGE:
handleChallengeResponse(chan, response);
break;
case SASL_SUCCESS:
handleSuccessResponse(chan, response);
break;
default:
throw new IllegalStateException("Wrong negotiation step: " +
response.getStep());
}
}
private RpcHeader.NegotiatePB parseSaslMsgResponse(CallResponse response) {
RpcHeader.ResponseHeader responseHeader = response.getHeader();
int id = responseHeader.getCallId();
if (id != SASL_CALL_ID) {
throw new IllegalStateException("Received a call that wasn't for SASL");
}
RpcHeader.NegotiatePB.Builder saslBuilder = RpcHeader.NegotiatePB.newBuilder();
KuduRpc.readProtobuf(response.getPBMessage(), saslBuilder);
return saslBuilder.build();
}
private void handleNegotiateResponse(Channel chan,
RpcHeader.NegotiatePB response) throws IOException {
Preconditions.checkState(response.getStep() == NegotiateStep.NEGOTIATE,
"Expected NEGOTIATE message, got {}", response.getStep());
// Store the supported features advertised by the server.
serverFeatures = getFeatureFlags(response);
// If the server supports TLS, we will always speak TLS to it.
final boolean negotiatedTls = serverFeatures.contains(RpcFeatureFlag.TLS);
// Check the negotiated authentication type sent by the server.
chosenAuthnType = chooseAuthenticationType(response);
if (chosenAuthnType == AuthenticationTypePB.TypeCase.SASL) {
chooseAndInitializeSaslMech(response);
}
// If we negotiated TLS, then we want to start the TLS handshake; otherwise,
// we can move directly to the authentication phase.
if (negotiatedTls) {
startTlsHandshake(chan);
} else {
startAuthentication(chan);
}
}
/**
* Determine whether the given channel is a loopback connection (i.e. the server
* and client are on the same host).
*/
private boolean isLoopbackConnection(Channel channel) {
if (overrideLoopbackForTests) {
return true;
}
try {
InetAddress local = ((InetSocketAddress)channel.getLocalAddress()).getAddress();
InetAddress remote = ((InetSocketAddress)channel.getRemoteAddress()).getAddress();
return local.equals(remote);
} catch (ClassCastException cce) {
// In the off chance that we have some other type of local/remote address,
// we'll just assume it's not loopback.
return false;
}
}
private void chooseAndInitializeSaslMech(NegotiatePB response) throws KuduException {
securityContext.refreshSubject();
// Gather the set of server-supported mechanisms.
Map<String, String> errorsByMech = Maps.newHashMap();
Set<SaslMechanism> serverMechs = Sets.newHashSet();
for (RpcHeader.NegotiatePB.SaslMechanism mech : response.getSaslMechanismsList()) {
switch (mech.getMechanism().toUpperCase()) {
case "GSSAPI":
serverMechs.add(SaslMechanism.GSSAPI);
break;
case "PLAIN":
serverMechs.add(SaslMechanism.PLAIN);
break;
default:
errorsByMech.put(mech.getMechanism(), "unrecognized mechanism");
break;
}
}
// For each of our own mechanisms, in descending priority, check if
// the server also supports them. If so, try to initialize saslClient.
// If we find a common mechanism that also can be successfully initialized,
// choose that mech.
for (SaslMechanism clientMech : SaslMechanism.values()) {
if (clientMech.equals(SaslMechanism.GSSAPI)) {
Subject s = securityContext.getSubject();
if (s == null ||
s.getPrivateCredentials(KerberosTicket.class).isEmpty()) {
errorsByMech.put(clientMech.name(), "client does not have Kerberos credentials (tgt)");
continue;
}
if (SecurityUtil.isTgtExpired(s)) {
errorsByMech.put(clientMech.name(), "client Kerberos credentials (TGT) have expired");
continue;
}
}
if (!serverMechs.contains(clientMech)) {
errorsByMech.put(clientMech.name(), "not advertised by server");
continue;
}
Map<String, String> props = Maps.newHashMap();
// If the negotiated mechanism is GSSAPI (Kerberos), configure SASL to use
// integrity protection so that the channel bindings and nonce can be
// verified.
if (clientMech == SaslMechanism.GSSAPI) {
props.put(Sasl.QOP, "auth-int");
}
try {
saslClient = Sasl.createSaslClient(new String[]{ clientMech.name() },
null,
"kudu",
remoteHostname,
props,
SASL_CALLBACK);
chosenMech = clientMech;
break;
} catch (SaslException e) {
errorsByMech.put(clientMech.name(), e.getMessage());
}
}
if (chosenMech != null) {
LOG.debug("SASL mechanism {} chosen for peer {}", chosenMech.name(), remoteHostname);
return;
}
// TODO(KUDU-1948): when the Java client has an option to require security, detect the case
// where the server is configured without Kerberos and the client requires it.
String message;
if (serverMechs.size() == 1 && serverMechs.contains(SaslMechanism.GSSAPI)) {
// Give a better error diagnostic for common case of an unauthenticated client connecting
// to a secure server.
message = "server requires authentication, but " +
errorsByMech.get(SaslMechanism.GSSAPI.name());
} else {
message = "client/server supported SASL mechanism mismatch: [" +
Joiner.on(", ").withKeyValueSeparator(": ").join(errorsByMech) + "]";
}
if (authnTokenNotUsedReason != null) {
message += ". Authentication tokens were not used because " +
authnTokenNotUsedReason.msg;
}
// If client has valid secondary authn credentials (such as authn token),
// but it does not have primary authn credentials (such as Kerberos creds),
// throw a recoverable exception. So that the request can be retried as long
// as the original call hasn't timed out, for cases documented in KUDU-2267,
// e.g. masters are in the process of the very first leader election after
// started up and does not have CA signed cert.
if (authnToken != null) {
throw new RecoverableException(Status.NotAuthorized(message));
} else {
throw new NonRecoverableException(Status.NotAuthorized(message));
}
}
private AuthenticationTypePB.TypeCase chooseAuthenticationType(NegotiatePB response) {
Preconditions.checkArgument(response.getAuthnTypesCount() <= 1,
"Expected server to reply with at most one authn type");
if (response.getAuthnTypesCount() == 0) {
// Default to SASL for compatibility with old servers.
return AuthenticationTypePB.TypeCase.SASL;
}
AuthenticationTypePB.TypeCase type = response.getAuthnTypes(0).getTypeCase();
switch (type) {
case SASL:
if (authnToken != null) {
authnTokenNotUsedReason = AuthnTokenNotUsedReason.NOT_CHOSEN_BY_SERVER;
}
break;
case TOKEN:
if (authnToken == null) {
// TODO(todd): should we also check whether we have a CA cert?
// it seems like this should have the same logic as whether we advertised it
throw new IllegalArgumentException("server chose token authentication " +
"but client had no valid token");
}
break;
default:
throw new IllegalArgumentException("server chose bad authn type " + chosenAuthnType);
}
return type;
}
private Set<RpcFeatureFlag> getFeatureFlags(NegotiatePB response) {
ImmutableSet.Builder<RpcHeader.RpcFeatureFlag> features = ImmutableSet.builder();
for (RpcHeader.RpcFeatureFlag feature : response.getSupportedFeaturesList()) {
if (feature != RpcFeatureFlag.UNKNOWN) {
features.add(feature);
}
}
return features.build();
}
/**
* Send the initial TLS "ClientHello" message.
*/
private void startTlsHandshake(Channel chan) throws SSLException {
SSLEngine engine;
switch (chosenAuthnType) {
case SASL:
engine = securityContext.createSSLEngineTrustAll();
break;
case TOKEN:
engine = securityContext.createSSLEngine();
break;
default:
throw new AssertionError("unreachable");
}
engine.setUseClientMode(true);
Set<String> supported = Sets.newHashSet(engine.getSupportedCipherSuites());
List<String> toEnable = Lists.newArrayList();
for (String cipher : PREFERRED_CIPHER_SUITES) {
if (supported.contains(cipher)) {
toEnable.add(cipher);
}
}
if (toEnable.isEmpty()) {
// This should never be the case given the cipher suites we picked are
// supported by the standard JDK, but just in case, better to have a clear
// exception.
throw new RuntimeException("No preferred cipher suites were supported. " +
"Supported suites: " + Joiner.on(',').join(supported));
}
engine.setEnabledCipherSuites(toEnable.toArray(new String[0]));
SslHandler handler = new SslHandler(engine);
handler.setEnableRenegotiation(false);
sslEmbedder = new DecoderEmbedder<>(handler);
sslHandshakeFuture = handler.handshake();
state = State.AWAIT_TLS_HANDSHAKE;
boolean sent = sendPendingOutboundTls(chan);
assert sent;
}
/**
* Handle an inbound message during the TLS handshake. If this message
* causes the handshake to complete, triggers the beginning of SASL initiation.
*/
private void handleTlsMessage(Channel chan, NegotiatePB response) throws IOException {
Preconditions.checkState(response.getStep() == NegotiateStep.TLS_HANDSHAKE);
Preconditions.checkArgument(!response.getTlsHandshake().isEmpty(),
"empty TLS message from server");
// Pass the TLS message into our embedded SslHandler.
sslEmbedder.offer(ChannelBuffers.copiedBuffer(
response.getTlsHandshake().asReadOnlyByteBuffer()));
if (sendPendingOutboundTls(chan)) {
// Data was sent -- we must continue the handshake process.
return;
}
// The handshake completed.
// Insert the SSL handler into the pipeline so that all following traffic
// gets encrypted, and then move on to the SASL portion of negotiation.
//
// NOTE: this takes effect immediately (i.e. the following SASL initiation
// sequence is encrypted).
SslHandler handler = (SslHandler)sslEmbedder.getPipeline().getFirst();
Certificate[] certs = handler.getEngine().getSession().getPeerCertificates();
if (certs.length == 0) {
throw new SSLPeerUnverifiedException("no peer cert found");
}
// The first element of the array is the peer's own certificate.
peerCert = certs[0];
// Don't wrap the TLS socket if we are using TLS for authentication only.
boolean isAuthOnly = serverFeatures.contains(RpcFeatureFlag.TLS_AUTHENTICATION_ONLY) &&
isLoopbackConnection(chan);
if (!isAuthOnly) {
chan.getPipeline().addFirst("tls", handler);
}
startAuthentication(chan);
}
/**
* If the embedded SslHandler has data to send outbound, gather
* it all, send it tunneled in a NegotiatePB message, and return true.
*
* Otherwise, indicates that the handshake is complete by returning false.
*/
private boolean sendPendingOutboundTls(Channel chan) {
// The SslHandler can generate multiple TLS messages in response
// (e.g. ClientKeyExchange, ChangeCipherSpec, ClientFinished).
// We poll the handler until it stops giving us buffers.
List<ByteString> bufs = Lists.newArrayList();
while (sslEmbedder.peek() != null) {
bufs.add(ByteString.copyFrom(sslEmbedder.poll().toByteBuffer()));
}
ByteString data = ByteString.copyFrom(bufs);
if (sslHandshakeFuture.isDone()) {
// TODO(todd): should check sslHandshakeFuture.isSuccess()
// TODO(danburkert): is this a correct assumption? would the
// client ever be "done" but also produce handshake data?
// if it did, would we want to encrypt the SSL message or no?
assert data.isEmpty();
return false;
} else {
assert data.size() > 0;
sendTunneledTls(chan, data);
return true;
}
}
/**
* Send a buffer of data for the TLS handshake, encapsulated in the
* appropriate TLS_HANDSHAKE negotiation message.
*/
private void sendTunneledTls(Channel chan, ByteString buf) {
sendSaslMessage(chan, NegotiatePB.newBuilder()
.setStep(NegotiateStep.TLS_HANDSHAKE)
.setTlsHandshake(buf)
.build());
}
private void startAuthentication(Channel chan) throws SaslException, NonRecoverableException {
switch (chosenAuthnType) {
case SASL:
sendSaslInitiate(chan);
break;
case TOKEN:
sendTokenExchange(chan);
break;
default:
throw new AssertionError("unreachable");
}
}
private void sendTokenExchange(Channel chan) {
// We must not send a token unless we have successfully finished
// authenticating via TLS.
Preconditions.checkNotNull(authnToken);
Preconditions.checkNotNull(sslHandshakeFuture);
Preconditions.checkState(sslHandshakeFuture.isSuccess());
RpcHeader.NegotiatePB.Builder builder = RpcHeader.NegotiatePB.newBuilder()
.setStep(NegotiateStep.TOKEN_EXCHANGE)
.setAuthnToken(authnToken);
state = State.AWAIT_TOKEN_EXCHANGE;
sendSaslMessage(chan, builder.build());
}
private void handleTokenExchangeResponse(Channel chan, NegotiatePB response)
throws SaslException {
Preconditions.checkArgument(response.getStep() == NegotiateStep.TOKEN_EXCHANGE,
"expected TOKEN_EXCHANGE, got step: {}", response.getStep());
// The token response doesn't have any actual data in it, so we can just move on.
finish(chan);
}
private void sendSaslInitiate(Channel chan) throws SaslException, NonRecoverableException {
RpcHeader.NegotiatePB.Builder builder = RpcHeader.NegotiatePB.newBuilder();
if (saslClient.hasInitialResponse()) {
byte[] initialResponse = evaluateChallenge(new byte[0]);
builder.setToken(UnsafeByteOperations.unsafeWrap(initialResponse));
}
builder.setStep(RpcHeader.NegotiatePB.NegotiateStep.SASL_INITIATE);
builder.addSaslMechanismsBuilder().setMechanism(chosenMech.name());
state = State.AWAIT_SASL;
sendSaslMessage(chan, builder.build());
}
private void handleChallengeResponse(Channel chan, RpcHeader.NegotiatePB response)
throws SaslException, NonRecoverableException {
byte[] saslToken = evaluateChallenge(response.getToken().toByteArray());
if (saslToken == null) {
throw new IllegalStateException("Not expecting an empty token");
}
RpcHeader.NegotiatePB.Builder builder = RpcHeader.NegotiatePB.newBuilder();
builder.setToken(UnsafeByteOperations.unsafeWrap(saslToken));
builder.setStep(RpcHeader.NegotiatePB.NegotiateStep.SASL_RESPONSE);
sendSaslMessage(chan, builder.build());
}
/**
* Verify the channel bindings included in 'response'. This is used only
* for GSSAPI-authenticated connections over TLS.
* @throws SSLPeerUnverifiedException on failure to verify
*/
private void verifyChannelBindings(NegotiatePB response) throws IOException {
byte[] expected = SecurityUtil.getEndpointChannelBindings(peerCert);
if (!response.hasChannelBindings()) {
throw new SSLPeerUnverifiedException("no channel bindings provided by remote peer");
}
byte[] provided = response.getChannelBindings().toByteArray();
// NOTE: the C SASL library's implementation of sasl_encode() actually
// includes a length prefix. Java's equivalents do not. So, we have to
// chop off the length prefix here before unwrapping.
if (provided.length < 4) {
throw new SSLPeerUnverifiedException("invalid too-short channel bindings");
}
byte[] unwrapped = saslClient.unwrap(provided, 4, provided.length - 4);
if (!Bytes.equals(expected, unwrapped)) {
throw new SSLPeerUnverifiedException("invalid channel bindings provided by remote peer");
}
}
private void handleSuccessResponse(Channel chan, NegotiatePB response) throws IOException {
Preconditions.checkState(saslClient.isComplete(),
"server sent SASL_SUCCESS step, but SASL negotiation is not complete");
if (chosenMech == SaslMechanism.GSSAPI) {
if (response.hasNonce()) {
// Grab the nonce from the server, if it has sent one. We'll send it back
// later with SASL integrity protection as part of the connection context.
nonce = response.getNonce().toByteArray();
}
if (peerCert != null) {
// Check the channel bindings provided by the server against the expected channel bindings.
verifyChannelBindings(response);
}
}
finish(chan);
}
/**
* Marks the negotiation as finished, and sends the connection context to the server.
* @param chan the connection channel
*/
private void finish(Channel chan) throws SaslException {
state = State.FINISHED;
chan.getPipeline().remove(this);
Channels.write(chan, makeConnectionContext());
LOG.debug("Authenticated connection {} using {}/{}",
chan, chosenAuthnType, chosenMech);
Channels.fireMessageReceived(chan, new Success(serverFeatures));
}
private RpcOutboundMessage makeConnectionContext() throws SaslException {
RpcHeader.ConnectionContextPB.Builder builder = RpcHeader.ConnectionContextPB.newBuilder();
// The UserInformationPB is deprecated, but used by servers prior to Kudu 1.1.
RpcHeader.UserInformationPB.Builder userBuilder = RpcHeader.UserInformationPB.newBuilder();
String user = securityContext.getRealUser();
userBuilder.setEffectiveUser(user);
userBuilder.setRealUser(user);
builder.setDEPRECATEDUserInfo(userBuilder.build());
if (nonce != null) {
// Reply with the SASL-protected nonce. We only set the nonce when using SASL GSSAPI.
// The Java SASL client does not automatically add the length header,
// so we have to do it ourselves.
byte[] encodedNonce = saslClient.wrap(nonce, 0, nonce.length);
ByteBuffer buf = ByteBuffer.allocate(encodedNonce.length + 4);
buf.order(ByteOrder.BIG_ENDIAN);
buf.putInt(encodedNonce.length);
buf.put(encodedNonce);
builder.setEncodedNonce(UnsafeByteOperations.unsafeWrap(buf.array()));
}
RpcHeader.ConnectionContextPB pb = builder.build();
RpcHeader.RequestHeader.Builder header =
RpcHeader.RequestHeader.newBuilder().setCallId(CONNECTION_CTX_CALL_ID);
return new RpcOutboundMessage(header, pb);
}
private byte[] evaluateChallenge(final byte[] challenge)
throws SaslException, NonRecoverableException {
try {
return Subject.doAs(securityContext.getSubject(),
new PrivilegedExceptionAction<byte[]>() {
@Override
public byte[] run() throws SaslException {
return saslClient.evaluateChallenge(challenge);
}
});
} catch (PrivilegedActionException e) {
// This cast is safe because the action above only throws checked SaslException.
SaslException saslException = (SaslException) e.getCause();
// TODO(KUDU-2121): We should never get to this point if the client does not have
// Kerberos credentials, but it seems that on certain platforms it can happen.
// So, we try and determine whether the evaluateChallenge failed due to missing
// credentials, and return a nicer error message if so.
Throwable cause = saslException.getCause();
if (cause instanceof GSSException &&
((GSSException) cause).getMajor() == GSSException.NO_CRED) {
throw new NonRecoverableException(
Status.ConfigurationError(
"Server requires Kerberos, but this client is not authenticated " +
"(missing or expired TGT)"),
saslException);
}
throw saslException;
}
}
private class SaslClientCallbackHandler implements CallbackHandler {
@Override
public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
for (Callback callback : callbacks) {
if (callback instanceof NameCallback) {
((NameCallback) callback).setName(securityContext.getRealUser());
} else if (callback instanceof PasswordCallback) {
((PasswordCallback) callback).setPassword(new char[0]);
} else {
throw new UnsupportedCallbackException(callback,
"Unrecognized SASL client callback");
}
}
}
}
/**
* The results of a successful negotiation. This is sent to upstream handlers in the
* Netty pipeline after negotiation completes.
*/
static class Success {
final Set<RpcFeatureFlag> serverFeatures;
public Success(Set<RpcFeatureFlag> serverFeatures) {
this.serverFeatures = serverFeatures;
}
}
/**
* The results of a failed negotiation. This is sent to upstream handlers in the Netty pipeline
* when a negotiation fails.
*/
static class Failure {
/** The RPC error received from the server. */
final RpcHeader.ErrorStatusPB status;
public Failure(RpcHeader.ErrorStatusPB status) {
this.status = status;
}
}
}