blob: 1f925f99a860bea33c7f4dae6be53b08aa9cfc70 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.security.authenticator;
import java.io.IOException;
import java.util.Map;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.security.Principal;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import javax.security.auth.login.Configuration;
import javax.security.auth.Subject;
import javax.security.sasl.Sasl;
import javax.security.sasl.SaslServer;
import javax.security.sasl.SaslException;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.security.kerberos.KerberosName;
import org.apache.kafka.common.security.kerberos.KerberosShortNamer;
import org.ietf.jgss.GSSContext;
import org.ietf.jgss.GSSCredential;
import org.ietf.jgss.GSSException;
import org.ietf.jgss.GSSManager;
import org.ietf.jgss.GSSName;
import org.ietf.jgss.Oid;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.network.Authenticator;
import org.apache.kafka.common.network.NetworkSend;
import org.apache.kafka.common.network.NetworkReceive;
import org.apache.kafka.common.network.TransportLayer;
import org.apache.kafka.common.security.auth.PrincipalBuilder;
public class SaslServerAuthenticator implements Authenticator {
private static final Logger LOG = LoggerFactory.getLogger(SaslServerAuthenticator.class);
private final SaslServer saslServer;
private final Subject subject;
private final String node;
private final KerberosShortNamer kerberosNamer;
private final int maxReceiveSize;
// assigned in `configure`
private TransportLayer transportLayer;
// buffers used in `authenticate`
private NetworkReceive netInBuffer;
private NetworkSend netOutBuffer;
public SaslServerAuthenticator(String node, final Subject subject, KerberosShortNamer kerberosNameParser, int maxReceiveSize) throws IOException {
if (subject == null)
throw new IllegalArgumentException("subject cannot be null");
if (subject.getPrincipals().isEmpty())
throw new IllegalArgumentException("subject must have at least one principal");
this.node = node;
this.subject = subject;
this.kerberosNamer = kerberosNameParser;
this.maxReceiveSize = maxReceiveSize;
saslServer = createSaslServer();
}
public void configure(TransportLayer transportLayer, PrincipalBuilder principalBuilder, Map<String, ?> configs) {
this.transportLayer = transportLayer;
}
private SaslServer createSaslServer() throws IOException {
// server is using a JAAS-authenticated subject: determine service principal name and hostname from kafka server's subject.
final SaslServerCallbackHandler saslServerCallbackHandler = new SaslServerCallbackHandler(
Configuration.getConfiguration(), kerberosNamer);
final Principal servicePrincipal = subject.getPrincipals().iterator().next();
KerberosName kerberosName;
try {
kerberosName = KerberosName.parse(servicePrincipal.getName());
} catch (IllegalArgumentException e) {
throw new KafkaException("Principal has name with unexpected format " + servicePrincipal);
}
final String servicePrincipalName = kerberosName.serviceName();
final String serviceHostname = kerberosName.hostName();
final String mech = "GSSAPI";
LOG.debug("Creating SaslServer for {} with mechanism {}", kerberosName, mech);
// As described in http://docs.oracle.com/javase/8/docs/technotes/guides/security/jgss/jgss-features.html:
// "To enable Java GSS to delegate to the native GSS library and its list of native mechanisms,
// set the system property "sun.security.jgss.native" to true"
// "In addition, when performing operations as a particular Subject, for example, Subject.doAs(...)
// or Subject.doAsPrivileged(...), the to-be-used GSSCredential should be added to Subject's
// private credential set. Otherwise, the GSS operations will fail since no credential is found."
boolean usingNativeJgss = Boolean.getBoolean("sun.security.jgss.native");
if (usingNativeJgss) {
try {
GSSManager manager = GSSManager.getInstance();
// This Oid is used to represent the Kerberos version 5 GSS-API mechanism. It is defined in
// RFC 1964.
Oid krb5Mechanism = new Oid("1.2.840.113554.1.2.2");
GSSName gssName = manager.createName(servicePrincipalName + "@" + serviceHostname, GSSName.NT_HOSTBASED_SERVICE);
GSSCredential cred = manager.createCredential(gssName, GSSContext.INDEFINITE_LIFETIME, krb5Mechanism, GSSCredential.ACCEPT_ONLY);
subject.getPrivateCredentials().add(cred);
} catch (GSSException ex) {
LOG.warn("Cannot add private credential to subject; clients authentication may fail", ex);
}
}
try {
return Subject.doAs(subject, new PrivilegedExceptionAction<SaslServer>() {
public SaslServer run() throws SaslException {
return Sasl.createSaslServer(mech, servicePrincipalName, serviceHostname, null, saslServerCallbackHandler);
}
});
} catch (PrivilegedActionException e) {
throw new SaslException("Kafka Server failed to create a SaslServer to interact with a client during session authentication", e.getCause());
}
}
/**
* Evaluates client responses via `SaslServer.evaluateResponse` and returns the issued challenge to the client until
* authentication succeeds or fails.
*
* The messages are sent and received as size delimited bytes that consists of a 4 byte network-ordered size N
* followed by N bytes representing the opaque payload.
*/
public void authenticate() throws IOException {
if (netOutBuffer != null && !flushNetOutBufferAndUpdateInterestOps())
return;
if (saslServer.isComplete()) {
transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
return;
}
if (netInBuffer == null) netInBuffer = new NetworkReceive(maxReceiveSize, node);
netInBuffer.readFrom(transportLayer);
if (netInBuffer.complete()) {
netInBuffer.payload().rewind();
byte[] clientToken = new byte[netInBuffer.payload().remaining()];
netInBuffer.payload().get(clientToken, 0, clientToken.length);
netInBuffer = null; // reset the networkReceive as we read all the data.
try {
byte[] response = saslServer.evaluateResponse(clientToken);
if (response != null) {
netOutBuffer = new NetworkSend(node, ByteBuffer.wrap(response));
flushNetOutBufferAndUpdateInterestOps();
}
} catch (Exception e) {
throw new IOException(e);
}
}
}
public Principal principal() {
return new KafkaPrincipal(KafkaPrincipal.USER_TYPE, saslServer.getAuthorizationID());
}
public boolean complete() {
return saslServer.isComplete();
}
public void close() throws IOException {
saslServer.dispose();
}
private boolean flushNetOutBufferAndUpdateInterestOps() throws IOException {
boolean flushedCompletely = flushNetOutBuffer();
if (flushedCompletely)
transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
else
transportLayer.addInterestOps(SelectionKey.OP_WRITE);
return flushedCompletely;
}
private boolean flushNetOutBuffer() throws IOException {
if (!netOutBuffer.completed())
netOutBuffer.writeTo(transportLayer);
return netOutBuffer.completed();
}
}