blob: ec18a703384eb361196492bc1c8d9b2cd477887f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.livy.rsc.rpc;
import java.io.IOException;
import javax.security.sasl.Sasl;
import javax.security.sasl.SaslException;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.livy.rsc.RSCConf;
/**
* Abstract SASL handler. Abstracts the auth protocol handling and encryption, if it's enabled.
* Needs subclasses to provide access to the actual underlying SASL implementation (client or
* server).
*/
abstract class SaslHandler extends SimpleChannelInboundHandler<Rpc.SaslMessage>
implements KryoMessageCodec.EncryptionHandler {
// LOG is not static to make debugging easier (being able to identify which sub-class
// generated the log message).
private final Logger LOG;
private final boolean requiresEncryption;
private KryoMessageCodec kryo;
private boolean hasAuthResponse = false;
protected SaslHandler(RSCConf config) {
this.requiresEncryption = Rpc.SASL_AUTH_CONF.equals(config.get(RSCConf.Entry.SASL_QOP));
this.LOG = LoggerFactory.getLogger(getClass());
}
// Use a separate method to make it easier to create a SaslHandler without having to
// plumb the KryoMessageCodec instance through the constructors.
void setKryoMessageCodec(KryoMessageCodec kryo) {
this.kryo = kryo;
}
@Override
protected final void channelRead0(ChannelHandlerContext ctx, Rpc.SaslMessage msg)
throws Exception {
LOG.debug("Handling SASL challenge message...");
Rpc.SaslMessage response = update(msg);
if (response != null) {
LOG.debug("Sending SASL challenge response...");
hasAuthResponse = true;
ctx.channel().writeAndFlush(response).sync();
}
if (!isComplete()) {
return;
}
// If negotiation is complete, remove this handler from the pipeline, and register it with
// the Kryo instance to handle encryption if needed.
ctx.channel().pipeline().remove(this);
String qop = getNegotiatedProperty(Sasl.QOP);
LOG.debug("SASL negotiation finished with QOP {}.", qop);
if (Rpc.SASL_AUTH_CONF.equals(qop)) {
LOG.info("SASL confidentiality enabled.");
kryo.setEncryptionHandler(this);
} else {
if (requiresEncryption) {
throw new SaslException("Encryption required, but SASL negotiation did not set it up.");
}
dispose();
}
onComplete();
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
dispose();
super.channelInactive(ctx);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
if (!isComplete()) {
LOG.info("Exception in SASL negotiation.", cause);
onError(cause);
ctx.close();
}
ctx.fireExceptionCaught(cause);
}
protected abstract boolean isComplete();
protected abstract String getNegotiatedProperty(String name);
protected abstract Rpc.SaslMessage update(Rpc.SaslMessage challenge) throws IOException;
protected abstract void onComplete() throws Exception;
protected abstract void onError(Throwable t);
}