blob: 61233f10a66164acdbedd89e155e07db7755e0fb [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl.auth;
import static com.google.common.base.Preconditions.checkArgument;
import com.google.common.base.Strings;
import java.security.Principal;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.Locale;
import javax.naming.AuthenticationException;
import javax.security.auth.Subject;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.sasl.AuthorizeCallback;
import javax.security.sasl.Sasl;
import javax.security.sasl.SaslClient;
import javax.security.sasl.SaslException;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.common.api.AuthData;
import org.apache.pulsar.common.sasl.KerberosName;
import org.apache.pulsar.common.sasl.SaslConstants;
/**
* A SASL Client object.
* This is added for support Kerberos authentication.
*/
@Slf4j
public class PulsarSaslClient {
private final SaslClient saslClient;
private final Subject clientSubject;
public PulsarSaslClient(String serverHostname, String serverType, Subject subject) throws SaslException {
checkArgument(subject != null, "Cannot create SASL client with NULL JAAS subject");
checkArgument(!Strings.isNullOrEmpty(serverHostname), "Cannot create SASL client with NUll server name");
if (!serverType.equals(SaslConstants.SASL_BROKER_PROTOCOL) && !serverType
.equals(SaslConstants.SASL_PROXY_PROTOCOL)) {
log.warn("The server type {} is not recommended", serverType);
}
String serverPrincipal = serverType.toLowerCase() + "/" + serverHostname;
this.clientSubject = subject;
if (clientSubject.getPrincipals().isEmpty()) {
throw new SaslException("Cannot create SASL client with empty JAAS subject principal");
}
// GSSAPI/Kerberos
final Object[] principals = clientSubject.getPrincipals().toArray();
final Principal clientPrincipal = (Principal) principals[0];
final KerberosName clientKerberosName = new KerberosName(clientPrincipal.getName());
KerberosName serviceKerberosName = new KerberosName(serverPrincipal + "@" + clientKerberosName.getRealm());
final String serviceName = serviceKerberosName.getServiceName();
final String serviceHostname = serviceKerberosName.getHostName();
final String clientPrincipalName = clientKerberosName.toString();
log.info("Using JAAS/SASL/GSSAPI auth to connect to server Principal {},",
serverPrincipal);
try {
this.saslClient = Subject.doAs(clientSubject, new PrivilegedExceptionAction<SaslClient>() {
@Override
public SaslClient run() throws SaslException {
String[] mechs = {"GSSAPI"};
return Sasl.createSaslClient(mechs, clientPrincipalName, serviceName, serviceHostname, null,
new ClientCallbackHandler());
}
});
} catch (PrivilegedActionException err) {
log.error("GSSAPI client error", err.getCause());
throw new SaslException("error while booting GSSAPI client", err.getCause());
}
if (saslClient == null) {
throw new SaslException("Cannot create JVM SASL Client");
}
}
public AuthData evaluateChallenge(final AuthData saslToken) throws AuthenticationException {
if (saslToken == null) {
throw new AuthenticationException("saslToken is null");
}
try {
if (clientSubject != null) {
final byte[] retval = Subject.doAs(clientSubject, new PrivilegedExceptionAction<byte[]>() {
@Override
public byte[] run() throws SaslException {
return saslClient.evaluateChallenge(saslToken.getBytes());
}
});
return AuthData.of(retval);
} else {
return AuthData.of(saslClient.evaluateChallenge(saslToken.getBytes()));
}
} catch (Exception e) {
log.error("SASL error", e.getCause());
throw new AuthenticationException("SASL/JAAS error" + e.getCause());
}
}
public boolean hasInitialResponse() {
return saslClient.hasInitialResponse();
}
static class ClientCallbackHandler implements CallbackHandler {
@Override
public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
for (Callback callback : callbacks) {
if (callback instanceof AuthorizeCallback) {
handleAuthorizeCallback((AuthorizeCallback) callback);
} else {
throw new UnsupportedCallbackException(callback, "Unrecognized SASL GSSAPI Client Callback.");
}
}
}
private void handleAuthorizeCallback(AuthorizeCallback ac) {
String authid = ac.getAuthenticationID();
String authzid = ac.getAuthorizationID();
if (authid.equals(authzid)) {
ac.setAuthorized(true);
} else {
ac.setAuthorized(false);
}
if (ac.isAuthorized()) {
ac.setAuthorizedID(authzid);
}
log.info("Successfully authenticated. authenticationID: {}; authorizationID: {}.",
authid, authzid);
}
}
public boolean isComplete() {
return saslClient.isComplete();
}
}