blob: ba201dcd67144c1453decd3c677378b847f4587b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.security.authenticator;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.util.Arrays;
import java.util.Map;
import java.security.Principal;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import javax.security.auth.Subject;
import javax.security.sasl.Sasl;
import javax.security.sasl.SaslClient;
import javax.security.sasl.SaslException;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.IllegalSaslStateException;
import org.apache.kafka.common.errors.UnsupportedSaslMechanismException;
import org.apache.kafka.common.network.Authenticator;
import org.apache.kafka.common.network.Mode;
import org.apache.kafka.common.network.NetworkSend;
import org.apache.kafka.common.network.NetworkReceive;
import org.apache.kafka.common.network.TransportLayer;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.SchemaException;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.requests.RequestSend;
import org.apache.kafka.common.requests.SaslHandshakeRequest;
import org.apache.kafka.common.requests.SaslHandshakeResponse;
import org.apache.kafka.common.security.auth.AuthCallbackHandler;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.PrincipalBuilder;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.common.KafkaException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SaslClientAuthenticator implements Authenticator {
public enum SaslState {
SEND_HANDSHAKE_REQUEST, RECEIVE_HANDSHAKE_RESPONSE, INITIAL, INTERMEDIATE, COMPLETE, FAILED
}
private static final Logger LOG = LoggerFactory.getLogger(SaslClientAuthenticator.class);
private final Subject subject;
private final String servicePrincipal;
private final String host;
private final String node;
private final String mechanism;
private final boolean handshakeRequestEnable;
// assigned in `configure`
private SaslClient saslClient;
private Map<String, ?> configs;
private String clientPrincipalName;
private AuthCallbackHandler callbackHandler;
private TransportLayer transportLayer;
// buffers used in `authenticate`
private NetworkReceive netInBuffer;
private NetworkSend netOutBuffer;
// Current SASL state
private SaslState saslState;
// Next SASL state to be set when outgoing writes associated with the current SASL state complete
private SaslState pendingSaslState;
// Correlation ID for the next request
private int correlationId;
// Request header for which response from the server is pending
private RequestHeader currentRequestHeader;
public SaslClientAuthenticator(String node, Subject subject, String servicePrincipal, String host, String mechanism, boolean handshakeRequestEnable) throws IOException {
this.node = node;
this.subject = subject;
this.host = host;
this.servicePrincipal = servicePrincipal;
this.mechanism = mechanism;
this.handshakeRequestEnable = handshakeRequestEnable;
this.correlationId = -1;
}
public void configure(TransportLayer transportLayer, PrincipalBuilder principalBuilder, Map<String, ?> configs) throws KafkaException {
try {
this.transportLayer = transportLayer;
this.configs = configs;
setSaslState(handshakeRequestEnable ? SaslState.SEND_HANDSHAKE_REQUEST : SaslState.INITIAL);
// determine client principal from subject.
if (!subject.getPrincipals().isEmpty()) {
Principal clientPrincipal = subject.getPrincipals().iterator().next();
this.clientPrincipalName = clientPrincipal.getName();
} else {
clientPrincipalName = null;
}
callbackHandler = new SaslClientCallbackHandler();
callbackHandler.configure(configs, Mode.CLIENT, subject, mechanism);
saslClient = createSaslClient();
} catch (Exception e) {
throw new KafkaException("Failed to configure SaslClientAuthenticator", e);
}
}
private SaslClient createSaslClient() {
try {
return Subject.doAs(subject, new PrivilegedExceptionAction<SaslClient>() {
public SaslClient run() throws SaslException {
String[] mechs = {mechanism};
LOG.debug("Creating SaslClient: client={};service={};serviceHostname={};mechs={}",
clientPrincipalName, servicePrincipal, host, Arrays.toString(mechs));
return Sasl.createSaslClient(mechs, clientPrincipalName, servicePrincipal, host, configs, callbackHandler);
}
});
} catch (PrivilegedActionException e) {
throw new KafkaException("Failed to create SaslClient with mechanism " + mechanism, e.getCause());
}
}
/**
* Sends an empty message to the server to initiate the authentication process. It then evaluates server challenges
* via `SaslClient.evaluateChallenge` and returns client responses until authentication succeeds or fails.
*
* The messages are sent and received as size delimited bytes that consists of a 4 byte network-ordered size N
* followed by N bytes representing the opaque payload.
*/
public void authenticate() throws IOException {
if (netOutBuffer != null && !flushNetOutBufferAndUpdateInterestOps())
return;
switch (saslState) {
case SEND_HANDSHAKE_REQUEST:
String clientId = (String) configs.get(CommonClientConfigs.CLIENT_ID_CONFIG);
currentRequestHeader = new RequestHeader(ApiKeys.SASL_HANDSHAKE.id, clientId, correlationId++);
SaslHandshakeRequest handshakeRequest = new SaslHandshakeRequest(mechanism);
send(RequestSend.serialize(currentRequestHeader, handshakeRequest.toStruct()));
setSaslState(SaslState.RECEIVE_HANDSHAKE_RESPONSE);
break;
case RECEIVE_HANDSHAKE_RESPONSE:
byte[] responseBytes = receiveResponseOrToken();
if (responseBytes == null)
break;
else {
try {
handleKafkaResponse(currentRequestHeader, responseBytes);
currentRequestHeader = null;
} catch (Exception e) {
setSaslState(SaslState.FAILED);
throw e;
}
setSaslState(SaslState.INITIAL);
// Fall through and start SASL authentication using the configured client mechanism
}
case INITIAL:
sendSaslToken(new byte[0], true);
setSaslState(SaslState.INTERMEDIATE);
break;
case INTERMEDIATE:
byte[] serverToken = receiveResponseOrToken();
if (serverToken != null) {
sendSaslToken(serverToken, false);
}
if (saslClient.isComplete()) {
setSaslState(SaslState.COMPLETE);
transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
}
break;
case COMPLETE:
break;
case FAILED:
throw new IOException("SASL handshake failed");
}
}
private void setSaslState(SaslState saslState) {
if (netOutBuffer != null && !netOutBuffer.completed())
pendingSaslState = saslState;
else {
this.pendingSaslState = null;
this.saslState = saslState;
LOG.debug("Set SASL client state to {}", saslState);
}
}
private void sendSaslToken(byte[] serverToken, boolean isInitial) throws IOException {
if (!saslClient.isComplete()) {
byte[] saslToken = createSaslToken(serverToken, isInitial);
if (saslToken != null)
send(ByteBuffer.wrap(saslToken));
}
}
private void send(ByteBuffer buffer) throws IOException {
try {
netOutBuffer = new NetworkSend(node, buffer);
flushNetOutBufferAndUpdateInterestOps();
} catch (IOException e) {
setSaslState(SaslState.FAILED);
throw e;
}
}
private boolean flushNetOutBufferAndUpdateInterestOps() throws IOException {
boolean flushedCompletely = flushNetOutBuffer();
if (flushedCompletely) {
transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
if (pendingSaslState != null)
setSaslState(pendingSaslState);
} else
transportLayer.addInterestOps(SelectionKey.OP_WRITE);
return flushedCompletely;
}
private byte[] receiveResponseOrToken() throws IOException {
if (netInBuffer == null) netInBuffer = new NetworkReceive(node);
netInBuffer.readFrom(transportLayer);
byte[] serverPacket = null;
if (netInBuffer.complete()) {
netInBuffer.payload().rewind();
serverPacket = new byte[netInBuffer.payload().remaining()];
netInBuffer.payload().get(serverPacket, 0, serverPacket.length);
netInBuffer = null; // reset the networkReceive as we read all the data.
}
return serverPacket;
}
public Principal principal() {
return new KafkaPrincipal(KafkaPrincipal.USER_TYPE, clientPrincipalName);
}
public boolean complete() {
return saslState == SaslState.COMPLETE;
}
public void close() throws IOException {
if (saslClient != null)
saslClient.dispose();
if (callbackHandler != null)
callbackHandler.close();
}
private byte[] createSaslToken(final byte[] saslToken, boolean isInitial) throws SaslException {
if (saslToken == null)
throw new SaslException("Error authenticating with the Kafka Broker: received a `null` saslToken.");
try {
if (isInitial && !saslClient.hasInitialResponse())
return saslToken;
else
return Subject.doAs(subject, new PrivilegedExceptionAction<byte[]>() {
public byte[] run() throws SaslException {
return saslClient.evaluateChallenge(saslToken);
}
});
} catch (PrivilegedActionException e) {
String error = "An error: (" + e + ") occurred when evaluating SASL token received from the Kafka Broker.";
// Try to provide hints to use about what went wrong so they can fix their configuration.
// TODO: introspect about e: look for GSS information.
final String unknownServerErrorText =
"(Mechanism level: Server not found in Kerberos database (7) - UNKNOWN_SERVER)";
if (e.toString().indexOf(unknownServerErrorText) > -1) {
error += " This may be caused by Java's being unable to resolve the Kafka Broker's" +
" hostname correctly. You may want to try to adding" +
" '-Dsun.net.spi.nameservice.provider.1=dns,sun' to your client's JVMFLAGS environment." +
" Users must configure FQDN of kafka brokers when authenticating using SASL and" +
" `socketChannel.socket().getInetAddress().getHostName()` must match the hostname in `principal/hostname@realm`";
}
error += " Kafka Client will go to AUTH_FAILED state.";
//Unwrap the SaslException inside `PrivilegedActionException`
throw new SaslException(error, e.getCause());
}
}
private boolean flushNetOutBuffer() throws IOException {
if (!netOutBuffer.completed()) {
netOutBuffer.writeTo(transportLayer);
}
return netOutBuffer.completed();
}
private void handleKafkaResponse(RequestHeader requestHeader, byte[] responseBytes) {
Struct struct;
ApiKeys apiKey;
try {
struct = NetworkClient.parseResponse(ByteBuffer.wrap(responseBytes), requestHeader);
apiKey = ApiKeys.forId(requestHeader.apiKey());
} catch (SchemaException | IllegalArgumentException e) {
LOG.debug("Invalid SASL mechanism response, server may be expecting only GSSAPI tokens");
throw new AuthenticationException("Invalid SASL mechanism response", e);
}
switch (apiKey) {
case SASL_HANDSHAKE:
handleSaslHandshakeResponse(new SaslHandshakeResponse(struct));
break;
default:
throw new IllegalStateException("Unexpected API key during handshake: " + apiKey);
}
}
private void handleSaslHandshakeResponse(SaslHandshakeResponse response) {
Errors error = Errors.forCode(response.errorCode());
switch (error) {
case NONE:
break;
case UNSUPPORTED_SASL_MECHANISM:
throw new UnsupportedSaslMechanismException(String.format("Client SASL mechanism '%s' not enabled in the server, enabled mechanisms are %s",
mechanism, response.enabledMechanisms()));
case ILLEGAL_SASL_STATE:
throw new IllegalSaslStateException(String.format("Unexpected handshake request with client mechanism %s, enabled mechanisms are %s",
mechanism, response.enabledMechanisms()));
default:
throw new AuthenticationException(String.format("Unknown error code %d, client mechanism is %s, enabled mechanisms are %s",
response.errorCode(), mechanism, response.enabledMechanisms()));
}
}
}