// blob: 276758e32d632e3a0c28d35b773e187787fbbb53 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.rpc.user;
import com.google.protobuf.MessageLite;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import org.apache.drill.common.config.DrillProperties;
import org.apache.drill.common.exceptions.DrillException;
import org.apache.drill.exec.exception.DrillbitStartupException;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch;
import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
import org.apache.drill.exec.proto.GeneralRPCProtos.RpcMode;
import org.apache.drill.exec.proto.UserBitShared.QueryResult;
import org.apache.drill.exec.proto.UserBitShared.UserCredentials;
import org.apache.drill.exec.proto.UserProtos.BitToUserHandshake;
import org.apache.drill.exec.proto.UserProtos.HandshakeStatus;
import org.apache.drill.exec.proto.UserProtos.Property;
import org.apache.drill.exec.proto.UserProtos.RpcType;
import org.apache.drill.exec.proto.UserProtos.SaslSupport;
import org.apache.drill.exec.proto.UserProtos.UserProperties;
import org.apache.drill.exec.proto.UserProtos.UserToBitHandshake;
import org.apache.drill.exec.rpc.AbstractServerConnection;
import org.apache.drill.exec.rpc.BasicServer;
import org.apache.drill.exec.rpc.OutOfMemoryHandler;
import org.apache.drill.exec.rpc.OutboundRpcMessage;
import org.apache.drill.exec.rpc.ProtobufLengthDecoder;
import org.apache.drill.exec.rpc.RpcConstants;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.RpcOutcomeListener;
import org.apache.drill.exec.rpc.UserClientConnection;
import org.apache.drill.exec.rpc.security.ServerAuthenticationHandler;
import org.apache.drill.exec.rpc.security.plain.PlainFactory;
import org.apache.drill.exec.rpc.user.UserServer.BitToUserConnection;
import org.apache.drill.exec.rpc.user.security.UserAuthenticationException;
import org.apache.drill.exec.server.BootStrapContext;
import org.apache.drill.exec.ssl.SSLConfig;
import org.apache.drill.exec.ssl.SSLConfigBuilder;
import org.apache.drill.exec.work.user.UserWorker;
import org.apache.hadoop.security.HadoopKerberosName;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import javax.net.ssl.SSLEngine;
import javax.security.sasl.SaslException;
import java.io.IOException;
import java.net.SocketAddress;
import java.util.List;
import java.util.Map.Entry;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
public class UserServer extends BasicServer<RpcType, BitToUserConnection> {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UserServer.class);
private static final String SERVER_NAME = "Apache Drill Server";
private final UserConnectionConfig config;
private final SSLConfig sslConfig;
private Channel sslChannel;
private final UserWorker userWorker;
private static final ConcurrentHashMap<BitToUserConnection, BitToUserConnectionConfig> userConnectionMap;
//Initializing the singleton map during startup
static {
userConnectionMap = new ConcurrentHashMap<>();
}
/**
 * Renders a {@link org.apache.drill.exec.proto.UserProtos.UserToBitHandshake} as multi-line text suitable
 * for logging. Every property is written except the one whose key matches the password key
 * (case-insensitively), so the password value never ends up in the log.
 *
 * @param inbound handshake instance for serialization
 * @return String of serialized object
 */
private String serializeUserToBitHandshakeWithoutPassword(UserToBitHandshake inbound) {
  StringBuilder sb = new StringBuilder();
  sb.append("rpc_version: ");
  sb.append(inbound.getRpcVersion());
  sb.append("\ncredentials:\n\t");
  sb.append(inbound.getCredentials());
  sb.append("properties:");
  List<Property> props = inbound.getProperties().getPropertiesList();
  for (Property p : props) {
    // Same key the handshake handler uses when it reads the password out of the properties.
    if (DrillProperties.PASSWORD.equalsIgnoreCase(p.getKey())) {
      continue;
    }
    sb.append("\n\tproperty:\n\t\t");
    sb.append("key: \"");
    sb.append(p.getKey());
    sb.append("\"\n\t\tvalue: \"");
    sb.append(p.getValue());
    sb.append("\"");
  }
  sb.append("\nsupport_complex_types: ");
  sb.append(inbound.getSupportComplexTypes());
  sb.append("\nsupport_timeout: ");
  sb.append(inbound.getSupportTimeout());
  // Start a new line here: the preceding value is a bare boolean, so without the separator the two
  // fields would run together as "support_timeout: truesasl_support: ...".
  sb.append("\nsasl_support: ");
  sb.append(inbound.getSaslSupport());
  sb.append("\nclient_infos:\n\t");
  // Indent the nested message so it lines up under its heading.
  sb.append(inbound.getClientInfos().toString().replace("\n", "\n\t"));
  return sb.toString();
}
/**
 * Creates the user-facing RPC server.
 *
 * <p>Builds the connection configuration around a {@link UserServerRequestHandler} for the given worker,
 * builds the server-mode SSL configuration from the Drill config, and initializes the shared
 * {@link UserRpcMetrics} instance.
 *
 * @param context bootstrap context supplying the Drill config and executor
 * @param allocator allocator used for RPC buffers, the connection config and the metrics
 * @param eventLoopGroup Netty event loop group passed to the base server
 * @param worker worker that handles user requests and supplies system options for new sessions
 * @throws DrillbitStartupException if the SSL configuration cannot be built
 */
public UserServer(BootStrapContext context, BufferAllocator allocator, EventLoopGroup eventLoopGroup,
    UserWorker worker) throws DrillbitStartupException {
  super(UserRpcConfig.getMapping(context.getConfig(), context.getExecutor()),
      allocator.getAsByteBufAllocator(),
      eventLoopGroup);
  this.config = new UserConnectionConfig(allocator, context, new UserServerRequestHandler(worker));
  this.sslChannel = null;
  try {
    this.sslConfig = new SSLConfigBuilder()
        .config(context.getConfig())
        .mode(SSLConfig.Mode.SERVER)
        .initializeSSLContext(true)
        .validateKeyStore(true)
        .build();
    logger.info("Rpc server configured to use TLS protocol '{}'", sslConfig.getProtocol());
  } catch (DrillException e) {
    // Chain the caught exception itself rather than only its cause, so its stack trace is kept and a
    // cause is still recorded when the DrillException has none of its own.
    throw new DrillbitStartupException(e.getMessage(), e);
  }
  this.userWorker = worker;
  // Initialize Singleton instance of UserRpcMetrics.
  ((UserRpcMetrics) UserRpcMetrics.getInstance()).initialize(config.isEncryptionEnabled(), allocator);
}
/**
 * Creates an SSL engine from the server SSL configuration and puts an {@link SslHandler} wrapping it
 * at the front of the given pipeline.
 *
 * @param pipe pipeline that receives the SSL handler
 */
@Override
protected void setupSSL(ChannelPipeline pipe) {
  final SSLEngine engine = sslConfig.createSSLEngine(config.getAllocator(), null, 0);
  final SslHandler handler = new SslHandler(engine);
  // Registered under the shared handler name, ahead of every handler already in the pipeline.
  pipe.addFirst(RpcConstants.SSL_HANDLER, handler);
  logger.debug("SSL communication between client and server is enabled.");
  logger.debug(sslConfig.toString());
}
/**
 * Reports whether SSL is enabled for user connections.
 *
 * @return the value of {@code sslConfig.isUserSslEnabled()}
 */
@Override
protected boolean isSslEnabled() {
  final boolean userSslEnabled = sslConfig.isUserSslEnabled();
  return userSslEnabled;
}
/**
 * Remembers the given channel so that {@code closeSSL()} can close it later.
 *
 * @param c channel to remember
 */
@Override
public void setSslChannel(Channel c) {
  this.sslChannel = c;
}
/**
 * Closes the remembered SSL channel. Does nothing when SSL is disabled or no channel has been set.
 */
@Override
protected void closeSSL() {
  if (!isSslEnabled() || sslChannel == null) {
    return;
  }
  sslChannel.close();
}
/**
 * Returns the prototype message used to decode a response of the given RPC type.
 *
 * @param rpcType numeric RPC type of the response
 * @return the default {@link Ack} instance when {@code rpcType} is {@code RpcType.ACK_VALUE}
 * @throws UnsupportedOperationException for every other RPC type
 */
@Override
protected MessageLite getResponseDefaultInstance(int rpcType) throws RpcException {
  // a user server only expects acknowledgments on messages it creates.
  if (rpcType == RpcType.ACK_VALUE) {
    return Ack.getDefaultInstance();
  }
  throw new UnsupportedOperationException();
}
/**
 * Gives access to the connections currently registered in the shared connection map, each paired with
 * the details recorded when it was registered.
 *
 * @return the entry set of the connection map (a view of the map, not a copy)
 */
public static Set<Entry<BitToUserConnection, BitToUserConnectionConfig>> getUserConnections() {
  final Set<Entry<BitToUserConnection, BitToUserConnectionConfig>> registered = userConnectionMap.entrySet();
  return registered;
}
/**
 * It represents a client connection accepted by Foreman Drillbit's UserServer from a DrillClient. This connection
 * is used to get hold of {@link UserSession} which stores all session related information like session options
 * changed over the lifetime of this connection. There is a 1:1 mapping between a BitToUserConnection and a
 * UserSession. This connection object is also used to send query data and result back to the client submitted as part
 * of the session tied to this connection.
 */
public class BitToUserConnection extends AbstractServerConnection<BitToUserConnection>
    implements UserClientConnection {
  // Created in finalizeSession(); stays null until then.
  private UserSession session;
  // Handshake message received from the client; supplies the properties used to build the session.
  private UserToBitHandshake inbound;
  // User name passed to finalizeSession(); used in the login/logout log lines.
  private String authenticatedUser;

  /**
   * Wraps the accepted channel. When authentication is disabled requests go straight to the configured
   * message handler; otherwise they first pass through a {@link ServerAuthenticationHandler}.
   *
   * @param channel the accepted client channel
   */
  BitToUserConnection(SocketChannel channel) {
    super(channel, config, !config.isAuthEnabled()
        ? config.getMessageHandler()
        : new ServerAuthenticationHandler<>(config.getMessageHandler(),
            RpcType.SASL_MESSAGE_VALUE, RpcType.SASL_MESSAGE));
    // Increase the connection count here since at this point it means that we already have the TCP connection.
    // Later when connection fails for any reason then we will decrease the counter based on Netty's connection close
    // handler.
    incConnectionCounter();
  }

  /**
   * Removes the read-timeout handler from this connection's pipeline, if one is installed.
   */
  void disableReadTimeout() {
    final ChannelPipeline pipeline = getChannel().pipeline();
    // ChannelPipeline.remove(String) throws NoSuchElementException when no handler has that name,
    // so check first and treat an absent timeout handler as already disabled.
    if (pipeline.get(RpcConstants.TIMEOUT_HANDLER) != null) {
      pipeline.remove(RpcConstants.TIMEOUT_HANDLER);
    }
  }

  /**
   * Stores the client's handshake message for later use by {@link #finalizeSession(String)}.
   *
   * @param inbound handshake received from the client
   */
  void setHandshake(final UserToBitHandshake inbound) {
    this.inbound = inbound;
  }

  /**
   * Completes a SASL login: takes the authorization ID from the SASL server, reduces it to a short
   * name with {@link HadoopKerberosName}, finalizes the session for that user and logs the login.
   *
   * @throws IOException declared by the overridden method; may be raised while deriving the short name
   */
  @Override
  public void finalizeSaslSession() throws IOException {
    final String authorizationID = getSaslServer().getAuthorizationID();
    final String userName = new HadoopKerberosName(authorizationID).getShortName();
    finalizeSession(userName);
    logger.info("User {} logged in from {}", authenticatedUser, getRemoteAddress());
  }

  /**
   * Sets the user on the session, and finalizes the session.
   *
   * @param userName user name to set on the session
   *
   */
  void finalizeSession(String userName) {
    // create a session
    session = UserSession.Builder.newBuilder()
        .withCredentials(UserCredentials.newBuilder()
            .setUserName(userName)
            .build())
        .withOptionManager(userWorker.getSystemOptions())
        .withUserProperties(inbound.getProperties())
        .setSupportComplexTypes(inbound.getSupportComplexTypes())
        .build();
    this.authenticatedUser = userName;
    // if inbound impersonation is enabled and a target is mentioned
    final String targetName = session.getTargetUserName();
    if (config.getImpersonationManager() != null && targetName != null) {
      config.getImpersonationManager().replaceUserOnSession(targetName, session);
    }
  }

  /**
   * @return the session created by {@link #finalizeSession(String)}, or {@code null} if none exists yet
   */
  @Override
  public UserSession getSession() {
    return session;
  }

  /**
   * Sends a query result message to the client.
   *
   * @param listener receives the outcome of the send
   * @param result result message to send
   */
  @Override
  public void sendResult(final RpcOutcomeListener<Ack> listener, final QueryResult result) {
    logger.trace("Sending result to client with {}", result);
    send(listener, this, RpcType.QUERY_RESULT, result, Ack.class, true);
  }

  /**
   * Sends a batch of query data (header plus buffers) to the client.
   *
   * @param listener receives the outcome of the send
   * @param result batch whose header and buffers are sent
   */
  @Override
  public void sendData(final RpcOutcomeListener<Ack> listener, final QueryWritableBatch result) {
    logger.trace("Sending data to client with {}", result);
    send(listener, this, RpcType.QUERY_DATA, result.getHeader(), Ack.class, false, result.getBuffers());
  }

  @Override
  protected Logger getLogger() {
    return logger;
  }

  /**
   * Returns the channel's close future after attaching a listener that closes the session once the
   * channel has closed. Note that every call attaches one more listener.
   *
   * @return the close future of the underlying channel
   */
  @Override
  public ChannelFuture getChannelClosureFuture() {
    return getChannel().closeFuture()
        .addListener(new GenericFutureListener<Future<? super Void>>() {
          @Override
          public void operationComplete(Future<? super Void> future) throws Exception {
            cleanup();
          }
        });
  }

  /**
   * @return the remote address of the underlying channel
   */
  @Override
  public SocketAddress getRemoteAddress() {
    return getChannel().remoteAddress();
  }

  /**
   * Logs the logout (only when authentication is enabled) and then delegates to the superclass.
   *
   * @param ex passed through unchanged to the superclass
   */
  @Override
  public void channelClosed(RpcException ex) {
    // log the logged out event only when authentication is enabled
    if (config.isAuthEnabled()) {
      logger.info("User {} logged out from {}", authenticatedUser, getRemoteAddress());
    }
    super.channelClosed(ex);
  }

  /** Closes the session if one has been created. */
  private void cleanup() {
    if (session != null) {
      session.close();
    }
  }

  /** Closes the session (if any) and then the connection itself. */
  @Override
  public void close() {
    cleanup();
    super.close();
  }

  /** Adds one to the connection count kept by {@link UserRpcMetrics}. */
  @Override
  public void incConnectionCounter() {
    UserRpcMetrics.getInstance().addConnectionCount();
  }

  /**
   * Subtracts one from the connection count kept by {@link UserRpcMetrics} and drops this connection
   * from the shared connection map.
   */
  @Override
  public void decConnectionCounter() {
    UserRpcMetrics.getInstance().decConnectionCount();
    //Removing entry in connection map (sys table)
    userConnectionMap.remove(this);
  }
}
/**
 * Runs the superclass initialization for the accepted channel (its return value is not used) and then
 * creates and registers the {@link BitToUserConnection} for it.
 *
 * @param channel the accepted client channel
 * @return the newly registered connection
 */
@Override
protected BitToUserConnection initRemoteConnection(SocketChannel channel) {
  super.initRemoteConnection(channel);
  final BitToUserConnection registered = registerAndGetConnection(channel);
  return registered;
}
/**
 * Creates the connection object for an accepted channel and records it in the shared connection map
 * together with a fresh {@link BitToUserConnectionConfig}.
 *
 * @param channel the accepted client channel
 * @return the newly created connection
 */
private BitToUserConnection registerAndGetConnection(SocketChannel channel) {
  // A "new" expression never yields null, so the connection is registered unconditionally.
  final BitToUserConnection connection = new BitToUserConnection(channel);
  userConnectionMap.put(connection, new BitToUserConnectionConfig());
  return connection;
}
/**
 * Builds the handler that processes a client's initial handshake on the given connection.
 *
 * <p>The response is decided in this order:
 * <ol>
 *   <li>RPC version mismatch: {@code RPC_VERSION_MISMATCH}.</li>
 *   <li>Authentication disabled: the session is finalized with the user name from the handshake and
 *       {@code SUCCESS} is returned.</li>
 *   <li>Encryption enabled but the client has no SASL support, or only {@code SASL_AUTH}: rejected
 *       (reported as {@code UNKNOWN_FAILURE} through the outer catch).</li>
 *   <li>Client without the {@code sasl_support} field: authenticated during the handshake with the
 *       PLAIN mechanism, giving {@code SUCCESS} or {@code AUTH_FAILED}.</li>
 *   <li>Otherwise: {@code AUTH_REQUIRED} together with the configured mechanisms.</li>
 * </ol>
 *
 * @param connection connection the handshake arrives on
 * @return handler for the handshake message
 */
@Override
protected ServerHandshakeHandler<UserToBitHandshake> getHandshakeHandler(final BitToUserConnection connection) {
  return new ServerHandshakeHandler<UserToBitHandshake>(RpcType.HANDSHAKE, UserToBitHandshake.PARSER) {

    /**
     * Writes the handshake response to the client and then fails the connection unless the status is
     * SUCCESS or AUTH_REQUIRED.
     */
    @Override
    protected void consumeHandshake(ChannelHandlerContext ctx, UserToBitHandshake inbound) throws Exception {
      BitToUserHandshake handshakeResp = getHandshakeResponse(inbound);
      OutboundRpcMessage msg = new OutboundRpcMessage(RpcMode.RESPONSE, this.handshakeType, coordinationId,
          handshakeResp);
      // The response is written before any exception is thrown, so the client still receives the error.
      ctx.writeAndFlush(msg);
      if (handshakeResp.getStatus() != HandshakeStatus.SUCCESS &&
          handshakeResp.getStatus() != HandshakeStatus.AUTH_REQUIRED) {
        // If handling handshake results in an error, throw an exception to terminate the connection.
        throw new RpcException("Handshake request failed: " + handshakeResp.getErrorMessage());
      }
    }

    /** Computes the handshake response for the given client handshake. */
    @Override
    public BitToUserHandshake getHandshakeResponse(UserToBitHandshake inbound) throws Exception {
      if (logger.isTraceEnabled()) {
        logger.trace("Handling handshake from user to bit. {}", serializeUserToBitHandshakeWithoutPassword(inbound));
      }
      // if timeout is unsupported or is set to false, disable timeout.
      if (!inbound.hasSupportTimeout() || !inbound.getSupportTimeout()) {
        connection.disableReadTimeout();
        // The placeholder is required for the connection name to appear in the message.
        logger.warn("Timeout Disabled as client doesn't support it. Connection: {}", connection.getName());
      }
      BitToUserHandshake.Builder respBuilder = BitToUserHandshake.newBuilder()
          .setRpcVersion(UserRpcConfig.RPC_VERSION)
          .setServerInfos(UserRpcUtils.getRpcEndpointInfos(SERVER_NAME))
          .addAllSupportedMethods(UserRpcConfig.SUPPORTED_SERVER_METHODS);
      try {
        if (inbound.getRpcVersion() != UserRpcConfig.RPC_VERSION) {
          final String errMsg = String.format("Invalid rpc version. Expected %d, actual %d.",
              UserRpcConfig.RPC_VERSION, inbound.getRpcVersion());
          return handleFailure(respBuilder, HandshakeStatus.RPC_VERSION_MISMATCH, errMsg, null);
        }
        connection.setHandshake(inbound);
        if (!config.isAuthEnabled()) {
          connection.finalizeSession(inbound.getCredentials().getUserName());
          respBuilder.setStatus(HandshakeStatus.SUCCESS);
          return respBuilder.build();
        }
        // If sasl_support field is absent in handshake message then treat the client as < 1.10 client
        final boolean clientSupportsSasl = inbound.hasSaslSupport();
        // saslSupport will be UNKNOWN_SASL_SUPPORT if the sasl_support field is absent, or is set to a value
        // which is unknown to this server. We will treat the latter clients as ones which know SASL protocol.
        final SaslSupport saslSupport = clientSupportsSasl
            ? inbound.getSaslSupport()
            : SaslSupport.UNKNOWN_SASL_SUPPORT;
        // Check if client doesn't support SASL or only supports SASL_AUTH and server has encryption enabled
        if ((!clientSupportsSasl || saslSupport == SaslSupport.SASL_AUTH)
            && config.isEncryptionEnabled()) {
          throw new UserAuthenticationException("The server doesn't allow client without encryption support." +
              " Please upgrade your client or talk to your system administrator.");
        }
        if (!clientSupportsSasl) { // for backward compatibility < 1.10
          final String userName = inbound.getCredentials().getUserName();
          if (logger.isTraceEnabled()) {
            logger.trace("User {} on connection {} is likely using an older client.",
                userName, connection.getRemoteAddress());
          }
          try {
            // An absent password property is treated as an empty password.
            String password = "";
            final UserProperties props = inbound.getProperties();
            for (int i = 0; i < props.getPropertiesCount(); i++) {
              Property prop = props.getProperties(i);
              if (DrillProperties.PASSWORD.equalsIgnoreCase(prop.getKey())) {
                password = prop.getValue();
                break;
              }
            }
            final PlainFactory plainFactory;
            try {
              plainFactory = (PlainFactory) config.getAuthProvider()
                  .getAuthenticatorFactory(PlainFactory.SIMPLE_NAME);
            } catch (final SaslException e) {
              throw new UserAuthenticationException("The server no longer supports username/password" +
                  " based authentication. Please talk to your system administrator.");
            }
            plainFactory.getAuthenticator()
                .authenticate(userName, password);
            // Authentication already happened here, so switch to the regular message handler.
            connection.changeHandlerTo(config.getMessageHandler());
            connection.finalizeSession(userName);
            respBuilder.setStatus(HandshakeStatus.SUCCESS);
            logger.info("Authenticated {} from {} successfully using PLAIN", userName,
                connection.getRemoteAddress());
            return respBuilder.build();
          } catch (UserAuthenticationException ex) {
            return handleFailure(respBuilder, HandshakeStatus.AUTH_FAILED, ex.getMessage(), ex);
          }
        }
        // Offer all the configured mechanisms to client. If certain mechanism doesn't support encryption
        // like PLAIN, those should fail during the SASL handshake negotiation.
        respBuilder.addAllAuthenticationMechanisms(config.getAuthProvider().getAllFactoryNames());
        // set the encrypted flag in handshake message. For older clients this field is optional so will be ignored
        respBuilder.setEncrypted(connection.isEncryptionEnabled());
        respBuilder.setMaxWrappedSize(connection.getMaxWrappedSize());
        // for now, this means PLAIN credentials will be sent over twice
        // (during handshake and during sasl exchange)
        respBuilder.setStatus(HandshakeStatus.AUTH_REQUIRED);
        return respBuilder.build();
      } catch (Exception e) {
        return handleFailure(respBuilder, HandshakeStatus.UNKNOWN_FAILURE, e.getMessage(), e);
      }
    }
  };
}
/**
 * Complete building the given builder for <i>BitToUserHandshake</i> message with given status and error details.
 * The failure is logged at error level under a freshly generated error id, which is also placed in the
 * response.
 *
 * @param respBuilder Instance of {@link org.apache.drill.exec.proto.UserProtos.BitToUserHandshake} builder which
 *                    has RPC version field already set.
 * @param status Status of handling handshake request.
 * @param errMsg Error message. May be {@code null} (for example when it comes from an exception without a
 *               message); a description of {@code exception}, or a generic text, is used instead.
 * @param exception Optional exception.
 * @return the built handshake response carrying the status, error id and error message
 */
private static BitToUserHandshake handleFailure(BitToUserHandshake.Builder respBuilder, HandshakeStatus status,
    String errMsg, Exception exception) {
  final String errorId = UUID.randomUUID().toString();
  // Protobuf builders reject null strings with a NullPointerException, which would hide the failure
  // being reported here, so always hand the builder a non-null message.
  final String message;
  if (errMsg != null) {
    message = errMsg;
  } else if (exception != null) {
    message = exception.toString();
  } else {
    message = "No error message available";
  }
  if (exception != null) {
    logger.error("Error {} in Handling handshake request: {}, {}", errorId, status, message, exception);
  } else {
    logger.error("Error {} in Handling handshake request: {}, {}", errorId, status, message);
  }
  return respBuilder
      .setStatus(status)
      .setErrorId(errorId)
      .setErrorMessage(message)
      .build();
}
/**
 * Supplies the length decoder used for inbound user RPC messages.
 *
 * @param allocator allocator handed to the decoder
 * @param outOfMemoryHandler handler handed to the decoder
 * @return a new {@link UserProtobufLengthDecoder}
 */
@Override
protected ProtobufLengthDecoder getDecoder(BufferAllocator allocator, OutOfMemoryHandler outOfMemoryHandler) {
  final ProtobufLengthDecoder decoder = new UserProtobufLengthDecoder(allocator, outOfMemoryHandler);
  return decoder;
}
/**
 * User Connection's config for System Table access.
 *
 * <p>Captures, at construction time, the current time and the authentication, encryption and SSL
 * settings of the enclosing server's connection config. The values never change afterwards.
 */
public class BitToUserConnectionConfig {
  // Assigned once in the constructor; final so that instances shared through the connection map are immutable.
  private final DateTime established;
  private final boolean isAuthEnabled;
  private final boolean isEncryptionEnabled;
  private final boolean isSSLEnabled;

  /** Records the current time and the enclosing server's security settings. */
  public BitToUserConnectionConfig() {
    established = new DateTime(); //Current Joda-based Time
    isAuthEnabled = config.isAuthEnabled();
    isEncryptionEnabled = config.isEncryptionEnabled();
    isSSLEnabled = config.isSSLEnabled();
  }

  /** @return whether authentication was enabled when this object was created */
  public boolean isAuthEnabled() {
    return isAuthEnabled;
  }

  /** @return whether encryption was enabled when this object was created */
  public boolean isEncryptionEnabled() {
    return isEncryptionEnabled;
  }

  /** @return whether SSL was enabled when this object was created */
  public boolean isSSLEnabled() {
    return isSSLEnabled;
  }

  /** @return the time at which this object was created */
  public DateTime getEstablished() {
    return established;
  }
}
}