blob: 3fdb0d5e88be90f8c44b1d4d3e197c4d85b5c6d6 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.proxy.server;
import static com.google.common.base.Preconditions.checkArgument;
import java.net.SocketAddress;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import javax.naming.AuthenticationException;
import javax.net.ssl.SSLSession;
import io.netty.handler.codec.haproxy.HAProxyMessage;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
import org.apache.pulsar.broker.authentication.AuthenticationState;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.PulsarChannelInitializer;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.api.AuthData;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.PulsarHandler;
import org.apache.pulsar.common.api.proto.CommandAuthResponse;
import org.apache.pulsar.common.api.proto.CommandConnect;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.api.proto.CommandLookupTopic;
import org.apache.pulsar.common.api.proto.CommandGetSchema;
import org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadata;
import org.apache.pulsar.common.api.proto.ProtocolVersion;
import org.apache.pulsar.common.api.proto.ServerError;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import lombok.Getter;
/**
* Handles incoming discovery request from client and sends appropriate response back to client
*
*/
public class ProxyConnection extends PulsarHandler implements FutureListener<Void> {
// ConnectionPool is used by the proxy to issue lookup requests
private PulsarClientImpl client;
private ProxyService service;
private Authentication clientAuthentication;
AuthenticationDataSource authenticationData;
private State state;
private final Supplier<SslHandler> sslHandlerSupplier;
private LookupProxyHandler lookupProxyHandler = null;
@Getter
private DirectProxyHandler directProxyHandler = null;
String clientAuthRole;
AuthData clientAuthData;
String clientAuthMethod;
private String authMethod = "none";
AuthenticationProvider authenticationProvider;
AuthenticationState authState;
private ClientConfigurationData clientConf;
private boolean hasProxyToBrokerUrl;
private int protocolVersionToAdvertise;
private String proxyToBrokerUrl;
private HAProxyMessage haProxyMessage;
enum State {
Init,
// Connecting between user client and proxy server.
// Mutual authn needs verify between client and proxy server several times.
Connecting,
// Proxy the lookup requests to a random broker
// Follow redirects
ProxyLookupRequests,
// If we are proxying a connection to a specific broker, we
// are just forwarding data between the 2 connections, without
// looking into it
ProxyConnectionToBroker,
Closed,
}
ConnectionPool getConnectionPool() {
return client.getCnxPool();
}
public ProxyConnection(ProxyService proxyService, Supplier<SslHandler> sslHandlerSupplier) {
super(30, TimeUnit.SECONDS);
this.service = proxyService;
this.state = State.Init;
this.sslHandlerSupplier = sslHandlerSupplier;
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
super.channelRegistered(ctx);
ProxyService.activeConnections.inc();
if (ProxyService.activeConnections.get() > service.getConfiguration().getMaxConcurrentInboundConnections()) {
ctx.close();
ProxyService.rejectedConnections.inc();
return;
}
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
super.channelUnregistered(ctx);
ProxyService.activeConnections.dec();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
ProxyService.newConnections.inc();
service.getClientCnxs().add(this);
LOG.info("[{}] New connection opened", remoteAddress);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
if (directProxyHandler != null && directProxyHandler.outboundChannel != null) {
directProxyHandler.outboundChannel.close();
}
if (client != null) {
client.close();
}
service.getClientCnxs().remove(this);
LOG.info("[{}] Connection closed", remoteAddress);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
LOG.warn("[{}] Got exception {} : {} {}", remoteAddress, cause.getClass().getSimpleName(), cause.getMessage(),
ClientCnx.isKnownException(cause) ? null : cause);
ctx.close();
}
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof HAProxyMessage) {
haProxyMessage = (HAProxyMessage) msg;
return;
}
switch (state) {
case Init:
case Connecting:
case ProxyLookupRequests:
// Do the regular decoding for the Connected message
super.channelRead(ctx, msg);
break;
case ProxyConnectionToBroker:
// Pass the buffer to the outbound connection and schedule next read
// only if we can write on the connection
ProxyService.opsCounter.inc();
if (msg instanceof ByteBuf) {
int bytes = ((ByteBuf) msg).readableBytes();
directProxyHandler.getInboundChannelRequestsRate().recordEvent(bytes);
ProxyService.bytesCounter.inc(bytes);
}
directProxyHandler.outboundChannel.writeAndFlush(msg).addListener(this);
break;
default:
break;
}
}
@Override
public void operationComplete(Future<Void> future) throws Exception {
// This is invoked when the write operation on the paired connection is
// completed
if (future.isSuccess()) {
ctx.read();
} else {
LOG.warn("[{}] Error in writing to inbound channel. Closing", remoteAddress, future.cause());
directProxyHandler.outboundChannel.close();
}
}
private void completeConnect() {
LOG.info("[{}] complete connection, init proxy handler. authenticated with {} role {}, hasProxyToBrokerUrl: {}",
remoteAddress, authMethod, clientAuthRole, hasProxyToBrokerUrl);
if (hasProxyToBrokerUrl) {
// Client already knows which broker to connect. Let's open a
// connection there and just pass bytes in both directions
state = State.ProxyConnectionToBroker;
directProxyHandler = new DirectProxyHandler(service, this, proxyToBrokerUrl,
protocolVersionToAdvertise, sslHandlerSupplier);
cancelKeepAliveTask();
} else {
// Client is doing a lookup, we can consider the handshake complete
// and we'll take care of just topics and
// partitions metadata lookups
state = State.ProxyLookupRequests;
lookupProxyHandler = new LookupProxyHandler(service, this);
ctx.writeAndFlush(Commands.newConnected(protocolVersionToAdvertise));
}
}
private void createClientAndCompleteConnect(AuthData clientData)
throws PulsarClientException {
if (service.getConfiguration().isForwardAuthorizationCredentials()) {
this.clientAuthData = clientData;
this.clientAuthMethod = authMethod;
}
this.client = createClient(clientConf, this.clientAuthData, this.clientAuthMethod, protocolVersionToAdvertise);
completeConnect();
}
// According to auth result, send newConnected or newAuthChallenge command.
private void doAuthentication(AuthData clientData) throws Exception {
AuthData brokerData = authState.authenticate(clientData);
// authentication has completed, will send newConnected command.
if (authState.isComplete()) {
clientAuthRole = authState.getAuthRole();
if (LOG.isDebugEnabled()) {
LOG.debug("[{}] Client successfully authenticated with {} role {}",
remoteAddress, authMethod, clientAuthRole);
}
createClientAndCompleteConnect(clientData);
return;
}
// auth not complete, continue auth with client side.
ctx.writeAndFlush(Commands.newAuthChallenge(authMethod, brokerData, protocolVersionToAdvertise));
if (LOG.isDebugEnabled()) {
LOG.debug("[{}] Authentication in progress client by method {}.",
remoteAddress, authMethod);
}
state = State.Connecting;
return;
}
@Override
protected void handleConnect(CommandConnect connect) {
checkArgument(state == State.Init);
this.setRemoteEndpointProtocolVersion(connect.getProtocolVersion());
this.hasProxyToBrokerUrl = connect.hasProxyToBrokerUrl();
this.protocolVersionToAdvertise = getProtocolVersionToAdvertise(connect);
this.proxyToBrokerUrl = connect.hasProxyToBrokerUrl() ? connect.getProxyToBrokerUrl() : "null";
if (LOG.isDebugEnabled()) {
LOG.debug("Received CONNECT from {} proxyToBroker={}", remoteAddress, proxyToBrokerUrl);
LOG.debug(
"[{}] Protocol version to advertise to broker is {}, clientProtocolVersion={}, proxyProtocolVersion={}",
remoteAddress, protocolVersionToAdvertise, getRemoteEndpointProtocolVersion(),
Commands.getCurrentProtocolVersion());
}
if (getRemoteEndpointProtocolVersion() < ProtocolVersion.v10.getValue()) {
LOG.warn("[{}] Client doesn't support connecting through proxy", remoteAddress);
ctx.close();
return;
}
try {
// init authn
this.clientConf = createClientConfiguration();
this.clientAuthentication = clientConf.getAuthentication();
int protocolVersion = getProtocolVersionToAdvertise(connect);
// authn not enabled, complete
if (!service.getConfiguration().isAuthenticationEnabled()) {
this.client = new PulsarClientImpl(clientConf, service.getWorkerGroup(),
new ProxyConnectionPool(clientConf, service.getWorkerGroup(),
() -> new ClientCnx(clientConf, service.getWorkerGroup(), protocolVersion)), service.getTimer());
completeConnect();
return;
}
AuthData clientData = AuthData.of(connect.getAuthData());
if (connect.hasAuthMethodName()) {
authMethod = connect.getAuthMethodName();
} else if (connect.hasAuthMethod()) {
// Legacy client is passing enum
authMethod = connect.getAuthMethod().name().substring(10).toLowerCase();
} else {
authMethod = "none";
}
authenticationProvider = service
.getAuthenticationService()
.getAuthenticationProvider(authMethod);
// Not find provider named authMethod. Most used for tests.
// In AuthenticationDisabled, it will set authMethod "none".
if (authenticationProvider == null) {
clientAuthRole = service.getAuthenticationService().getAnonymousUserRole()
.orElseThrow(() ->
new AuthenticationException("No anonymous role, and no authentication provider configured"));
createClientAndCompleteConnect(clientData);
return;
}
// init authState and other var
ChannelHandler sslHandler = ctx.channel().pipeline().get(PulsarChannelInitializer.TLS_HANDLER);
SSLSession sslSession = null;
if (sslHandler != null) {
sslSession = ((SslHandler) sslHandler).engine().getSession();
}
authState = authenticationProvider.newAuthState(clientData, remoteAddress, sslSession);
authenticationData = authState.getAuthDataSource();
doAuthentication(clientData);
} catch (Exception e) {
LOG.warn("[{}] Unable to authenticate: ", remoteAddress, e);
ctx.writeAndFlush(Commands.newError(-1, ServerError.AuthenticationError, "Failed to authenticate"));
close();
return;
}
}
@Override
protected void handleAuthResponse(CommandAuthResponse authResponse) {
checkArgument(state == State.Connecting);
checkArgument(authResponse.hasResponse());
checkArgument(authResponse.getResponse().hasAuthData() && authResponse.getResponse().hasAuthMethodName());
if (LOG.isDebugEnabled()) {
LOG.debug("Received AuthResponse from {}, auth method: {}",
remoteAddress, authResponse.getResponse().getAuthMethodName());
}
try {
AuthData clientData = AuthData.of(authResponse.getResponse().getAuthData());
doAuthentication(clientData);
} catch (Exception e) {
String msg = "Unable to handleAuthResponse";
LOG.warn("[{}] {} ", remoteAddress, msg, e);
ctx.writeAndFlush(Commands.newError(-1, ServerError.AuthenticationError, msg));
close();
}
}
@Override
protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata partitionMetadata) {
checkArgument(state == State.ProxyLookupRequests);
lookupProxyHandler.handlePartitionMetadataResponse(partitionMetadata);
}
@Override
protected void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGetTopicsOfNamespace) {
checkArgument(state == State.ProxyLookupRequests);
lookupProxyHandler.handleGetTopicsOfNamespace(commandGetTopicsOfNamespace);
}
@Override
protected void handleGetSchema(CommandGetSchema commandGetSchema) {
checkArgument(state == State.ProxyLookupRequests);
lookupProxyHandler.handleGetSchema(commandGetSchema);
}
/**
* handles discovery request from client ands sends next active broker address
*/
@Override
protected void handleLookup(CommandLookupTopic lookup) {
checkArgument(state == State.ProxyLookupRequests);
lookupProxyHandler.handleLookup(lookup);
}
private void close() {
state = State.Closed;
ctx.close();
try {
client.close();
} catch (PulsarClientException e) {
LOG.error("Unable to close pulsar client - {}. Error - {}", client, e.getMessage());
}
}
ClientConfigurationData createClientConfiguration() throws UnsupportedAuthenticationException {
ClientConfigurationData clientConf = new ClientConfigurationData();
clientConf.setServiceUrl(service.getServiceUrl());
ProxyConfiguration proxyConfig = service.getConfiguration();
if (proxyConfig.getBrokerClientAuthenticationPlugin() != null) {
clientConf.setAuthentication(AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
proxyConfig.getBrokerClientAuthenticationParameters()));
}
if (proxyConfig.isTlsEnabledWithBroker()) {
clientConf.setUseTls(true);
if (proxyConfig.isBrokerClientTlsEnabledWithKeyStore()) {
clientConf.setUseKeyStoreTls(true);
clientConf.setTlsTrustStoreType(proxyConfig.getBrokerClientTlsTrustStoreType());
clientConf.setTlsTrustStorePath(proxyConfig.getBrokerClientTlsTrustStore());
clientConf.setTlsTrustStorePassword(proxyConfig.getBrokerClientTlsTrustStorePassword());
} else {
clientConf.setTlsTrustCertsFilePath(proxyConfig.getBrokerClientTrustCertsFilePath());
clientConf.setTlsAllowInsecureConnection(proxyConfig.isTlsAllowInsecureConnection());
}
}
return clientConf;
}
private PulsarClientImpl createClient(final ClientConfigurationData clientConf, final AuthData clientAuthData,
final String clientAuthMethod, final int protocolVersion) throws PulsarClientException {
return new PulsarClientImpl(clientConf, service.getWorkerGroup(),
new ProxyConnectionPool(clientConf, service.getWorkerGroup(), () -> new ProxyClientCnx(clientConf,
service.getWorkerGroup(), clientAuthRole, clientAuthData, clientAuthMethod, protocolVersion)), service.getTimer());
}
private static int getProtocolVersionToAdvertise(CommandConnect connect) {
return Math.min(connect.getProtocolVersion(), Commands.getCurrentProtocolVersion());
}
long newRequestId() {
return client.newRequestId();
}
public Authentication getClientAuthentication() {
return clientAuthentication;
}
@Override
protected boolean isHandshakeCompleted() {
return state != State.Init;
}
SocketAddress clientAddress() {
return remoteAddress;
}
ChannelHandlerContext ctx() {
return ctx;
}
public boolean hasHAProxyMessage() {
return haProxyMessage != null;
}
public HAProxyMessage getHAProxyMessage() {
return haProxyMessage;
}
private static final Logger LOG = LoggerFactory.getLogger(ProxyConnection.class);
}