blob: a9c19a58f887b172b832ea75525dc03803cdd217 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.security.authenticator;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.security.Principal;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import javax.security.auth.login.Configuration;
import javax.security.auth.Subject;
import javax.security.sasl.Sasl;
import javax.security.sasl.SaslServer;
import javax.security.sasl.SaslException;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.security.kerberos.KerberosName;
import org.apache.kafka.common.security.kerberos.KerberosShortNamer;
import org.ietf.jgss.GSSContext;
import org.ietf.jgss.GSSCredential;
import org.ietf.jgss.GSSException;
import org.ietf.jgss.GSSManager;
import org.ietf.jgss.GSSName;
import org.ietf.jgss.Oid;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.kafka.common.security.auth.AuthCallbackHandler;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.IllegalSaslStateException;
import org.apache.kafka.common.errors.UnsupportedSaslMechanismException;
import org.apache.kafka.common.network.Authenticator;
import org.apache.kafka.common.network.Mode;
import org.apache.kafka.common.network.NetworkSend;
import org.apache.kafka.common.network.NetworkReceive;
import org.apache.kafka.common.network.TransportLayer;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.SchemaException;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractRequestResponse;
import org.apache.kafka.common.requests.ApiVersionsRequest;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.requests.ResponseHeader;
import org.apache.kafka.common.requests.ResponseSend;
import org.apache.kafka.common.requests.SaslHandshakeRequest;
import org.apache.kafka.common.requests.SaslHandshakeResponse;
import org.apache.kafka.common.security.auth.PrincipalBuilder;
public class SaslServerAuthenticator implements Authenticator {
private static final Logger LOG = LoggerFactory.getLogger(SaslServerAuthenticator.class);
public enum SaslState {
HANDSHAKE_REQUEST, AUTHENTICATE, COMPLETE, FAILED
}
private final String node;
private final Subject subject;
private final KerberosShortNamer kerberosNamer;
private final int maxReceiveSize;
private final String host;
// Current SASL state
private SaslState saslState = SaslState.HANDSHAKE_REQUEST;
// Next SASL state to be set when outgoing writes associated with the current SASL state complete
private SaslState pendingSaslState = null;
private SaslServer saslServer;
private String saslMechanism;
private AuthCallbackHandler callbackHandler;
// assigned in `configure`
private TransportLayer transportLayer;
private Set<String> enabledMechanisms;
private Map<String, ?> configs;
// buffers used in `authenticate`
private NetworkReceive netInBuffer;
private NetworkSend netOutBuffer;
public SaslServerAuthenticator(String node, final Subject subject, KerberosShortNamer kerberosNameParser, String host, int maxReceiveSize) throws IOException {
if (subject == null)
throw new IllegalArgumentException("subject cannot be null");
this.node = node;
this.subject = subject;
this.kerberosNamer = kerberosNameParser;
this.maxReceiveSize = maxReceiveSize;
this.host = host;
}
public void configure(TransportLayer transportLayer, PrincipalBuilder principalBuilder, Map<String, ?> configs) {
this.transportLayer = transportLayer;
this.configs = configs;
List<String> enabledMechanisms = (List<String>) this.configs.get(SaslConfigs.SASL_ENABLED_MECHANISMS);
if (enabledMechanisms == null || enabledMechanisms.isEmpty())
throw new IllegalArgumentException("No SASL mechanisms are enabled");
this.enabledMechanisms = new HashSet<>(enabledMechanisms);
}
private void createSaslServer(String mechanism) throws IOException {
this.saslMechanism = mechanism;
callbackHandler = new SaslServerCallbackHandler(Configuration.getConfiguration(), kerberosNamer);
callbackHandler.configure(configs, Mode.SERVER, subject, saslMechanism);
if (mechanism.equals(SaslConfigs.GSSAPI_MECHANISM)) {
if (subject.getPrincipals().isEmpty())
throw new IllegalArgumentException("subject must have at least one principal");
saslServer = createSaslKerberosServer(callbackHandler, configs);
} else {
try {
saslServer = Subject.doAs(subject, new PrivilegedExceptionAction<SaslServer>() {
public SaslServer run() throws SaslException {
return Sasl.createSaslServer(saslMechanism, "kafka", host, configs, callbackHandler);
}
});
} catch (PrivilegedActionException e) {
throw new SaslException("Kafka Server failed to create a SaslServer to interact with a client during session authentication", e.getCause());
}
}
}
private SaslServer createSaslKerberosServer(final AuthCallbackHandler saslServerCallbackHandler, final Map<String, ?> configs) throws IOException {
// server is using a JAAS-authenticated subject: determine service principal name and hostname from kafka server's subject.
final Principal servicePrincipal = subject.getPrincipals().iterator().next();
KerberosName kerberosName;
try {
kerberosName = KerberosName.parse(servicePrincipal.getName());
} catch (IllegalArgumentException e) {
throw new KafkaException("Principal has name with unexpected format " + servicePrincipal);
}
final String servicePrincipalName = kerberosName.serviceName();
final String serviceHostname = kerberosName.hostName();
LOG.debug("Creating SaslServer for {} with mechanism {}", kerberosName, saslMechanism);
// As described in http://docs.oracle.com/javase/8/docs/technotes/guides/security/jgss/jgss-features.html:
// "To enable Java GSS to delegate to the native GSS library and its list of native mechanisms,
// set the system property "sun.security.jgss.native" to true"
// "In addition, when performing operations as a particular Subject, for example, Subject.doAs(...)
// or Subject.doAsPrivileged(...), the to-be-used GSSCredential should be added to Subject's
// private credential set. Otherwise, the GSS operations will fail since no credential is found."
boolean usingNativeJgss = Boolean.getBoolean("sun.security.jgss.native");
if (usingNativeJgss) {
try {
GSSManager manager = GSSManager.getInstance();
// This Oid is used to represent the Kerberos version 5 GSS-API mechanism. It is defined in
// RFC 1964.
Oid krb5Mechanism = new Oid("1.2.840.113554.1.2.2");
GSSName gssName = manager.createName(servicePrincipalName + "@" + serviceHostname, GSSName.NT_HOSTBASED_SERVICE);
GSSCredential cred = manager.createCredential(gssName, GSSContext.INDEFINITE_LIFETIME, krb5Mechanism, GSSCredential.ACCEPT_ONLY);
subject.getPrivateCredentials().add(cred);
} catch (GSSException ex) {
LOG.warn("Cannot add private credential to subject; clients authentication may fail", ex);
}
}
try {
return Subject.doAs(subject, new PrivilegedExceptionAction<SaslServer>() {
public SaslServer run() throws SaslException {
return Sasl.createSaslServer(saslMechanism, servicePrincipalName, serviceHostname, configs, saslServerCallbackHandler);
}
});
} catch (PrivilegedActionException e) {
throw new SaslException("Kafka Server failed to create a SaslServer to interact with a client during session authentication", e.getCause());
}
}
/**
* Evaluates client responses via `SaslServer.evaluateResponse` and returns the issued challenge to the client until
* authentication succeeds or fails.
*
* The messages are sent and received as size delimited bytes that consists of a 4 byte network-ordered size N
* followed by N bytes representing the opaque payload.
*/
public void authenticate() throws IOException {
if (netOutBuffer != null && !flushNetOutBufferAndUpdateInterestOps())
return;
if (saslServer != null && saslServer.isComplete()) {
setSaslState(SaslState.COMPLETE);
return;
}
if (netInBuffer == null) netInBuffer = new NetworkReceive(maxReceiveSize, node);
netInBuffer.readFrom(transportLayer);
if (netInBuffer.complete()) {
netInBuffer.payload().rewind();
byte[] clientToken = new byte[netInBuffer.payload().remaining()];
netInBuffer.payload().get(clientToken, 0, clientToken.length);
netInBuffer = null; // reset the networkReceive as we read all the data.
try {
switch (saslState) {
case HANDSHAKE_REQUEST:
if (handleKafkaRequest(clientToken))
break;
// For default GSSAPI, fall through to authenticate using the client token as the first GSSAPI packet.
// This is required for interoperability with 0.9.0.x clients which do not send handshake request
case AUTHENTICATE:
byte[] response = saslServer.evaluateResponse(clientToken);
if (response != null) {
netOutBuffer = new NetworkSend(node, ByteBuffer.wrap(response));
flushNetOutBufferAndUpdateInterestOps();
}
// When the authentication exchange is complete and no more tokens are expected from the client,
// update SASL state. Current SASL state will be updated when outgoing writes to the client complete.
if (saslServer.isComplete())
setSaslState(SaslState.COMPLETE);
break;
default:
break;
}
} catch (Exception e) {
setSaslState(SaslState.FAILED);
throw new IOException(e);
}
}
}
public Principal principal() {
return new KafkaPrincipal(KafkaPrincipal.USER_TYPE, saslServer.getAuthorizationID());
}
public boolean complete() {
return saslState == SaslState.COMPLETE;
}
public void close() throws IOException {
if (saslServer != null)
saslServer.dispose();
if (callbackHandler != null)
callbackHandler.close();
}
private void setSaslState(SaslState saslState) {
if (netOutBuffer != null && !netOutBuffer.completed())
pendingSaslState = saslState;
else {
this.pendingSaslState = null;
this.saslState = saslState;
LOG.debug("Set SASL server state to {}", saslState);
}
}
private boolean flushNetOutBufferAndUpdateInterestOps() throws IOException {
boolean flushedCompletely = flushNetOutBuffer();
if (flushedCompletely) {
transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
if (pendingSaslState != null)
setSaslState(pendingSaslState);
} else
transportLayer.addInterestOps(SelectionKey.OP_WRITE);
return flushedCompletely;
}
private boolean flushNetOutBuffer() throws IOException {
if (!netOutBuffer.completed())
netOutBuffer.writeTo(transportLayer);
return netOutBuffer.completed();
}
private boolean handleKafkaRequest(byte[] requestBytes) throws IOException, AuthenticationException {
boolean isKafkaRequest = false;
String clientMechanism = null;
try {
ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);
RequestHeader requestHeader = RequestHeader.parse(requestBuffer);
AbstractRequest request = AbstractRequest.getRequest(requestHeader.apiKey(), requestHeader.apiVersion(), requestBuffer);
isKafkaRequest = true;
ApiKeys apiKey = ApiKeys.forId(requestHeader.apiKey());
LOG.debug("Handle Kafka request {}", apiKey);
switch (apiKey) {
case API_VERSIONS:
handleApiVersionsRequest(requestHeader, (ApiVersionsRequest) request);
break;
case SASL_HANDSHAKE:
clientMechanism = handleHandshakeRequest(requestHeader, (SaslHandshakeRequest) request);
break;
default:
throw new IllegalSaslStateException("Unexpected Kafka request of type " + apiKey + " during SASL handshake.");
}
} catch (SchemaException | IllegalArgumentException e) {
// SchemaException is thrown if the request is not in Kafka format. IIlegalArgumentException is thrown
// if the API key is invalid. For compatibility with 0.9.0.x where the first packet is a GSSAPI token
// starting with 0x60, revert to GSSAPI for both these exceptions.
if (LOG.isDebugEnabled()) {
StringBuilder tokenBuilder = new StringBuilder();
for (byte b : requestBytes) {
tokenBuilder.append(String.format("%02x", b));
if (tokenBuilder.length() >= 20)
break;
}
LOG.debug("Received client packet of length {} starting with bytes 0x{}, process as GSSAPI packet", requestBytes.length, tokenBuilder);
}
if (enabledMechanisms.contains(SaslConfigs.GSSAPI_MECHANISM)) {
LOG.debug("First client packet is not a SASL mechanism request, using default mechanism GSSAPI");
clientMechanism = SaslConfigs.GSSAPI_MECHANISM;
} else
throw new UnsupportedSaslMechanismException("Exception handling first SASL packet from client, GSSAPI is not supported by server", e);
}
if (clientMechanism != null) {
createSaslServer(clientMechanism);
setSaslState(SaslState.AUTHENTICATE);
}
return isKafkaRequest;
}
private String handleHandshakeRequest(RequestHeader requestHeader, SaslHandshakeRequest handshakeRequest) throws IOException, UnsupportedSaslMechanismException {
String clientMechanism = handshakeRequest.mechanism();
if (enabledMechanisms.contains(clientMechanism)) {
LOG.debug("Using SASL mechanism '{}' provided by client", clientMechanism);
sendKafkaResponse(requestHeader, new SaslHandshakeResponse((short) 0, enabledMechanisms));
return clientMechanism;
} else {
LOG.debug("SASL mechanism '{}' requested by client is not supported", clientMechanism);
sendKafkaResponse(requestHeader, new SaslHandshakeResponse(Errors.UNSUPPORTED_SASL_MECHANISM.code(), enabledMechanisms));
throw new UnsupportedSaslMechanismException("Unsupported SASL mechanism " + clientMechanism);
}
}
private void handleApiVersionsRequest(RequestHeader requestHeader, ApiVersionsRequest versionRequest) throws IOException, UnsupportedSaslMechanismException {
sendKafkaResponse(requestHeader, ApiVersionsResponse.apiVersionsResponse());
}
private void sendKafkaResponse(RequestHeader requestHeader, AbstractRequestResponse response) throws IOException {
ResponseHeader responseHeader = new ResponseHeader(requestHeader.correlationId());
netOutBuffer = new NetworkSend(node, ResponseSend.serialize(responseHeader, response.toStruct()));
flushNetOutBufferAndUpdateInterestOps();
}
}