blob: 58203eee51c28fc60b1e179f168456e78d47cf58 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.proxy.server;
import static com.google.common.base.Preconditions.checkArgument;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.haproxy.HAProxyMessage;
import io.netty.handler.ssl.SslHandler;
import java.net.SocketAddress;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import javax.naming.AuthenticationException;
import javax.net.ssl.SSLSession;
import lombok.Getter;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
import org.apache.pulsar.broker.authentication.AuthenticationState;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.PulsarChannelInitializer;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.api.AuthData;
import org.apache.pulsar.common.api.proto.CommandAuthResponse;
import org.apache.pulsar.common.api.proto.CommandConnect;
import org.apache.pulsar.common.api.proto.CommandGetSchema;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.api.proto.CommandLookupTopic;
import org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadata;
import org.apache.pulsar.common.api.proto.ProtocolVersion;
import org.apache.pulsar.common.api.proto.ServerError;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.PulsarHandler;
import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Handles incoming discovery request from client and sends appropriate response back to client.
*
*/
public class ProxyConnection extends PulsarHandler {
private static final Logger LOG = LoggerFactory.getLogger(ProxyConnection.class);
// ConnectionPool is used by the proxy to issue lookup requests
private ConnectionPool connectionPool;
private final AtomicLong requestIdGenerator =
new AtomicLong(ThreadLocalRandom.current().nextLong(0, Long.MAX_VALUE / 2));
private final ProxyService service;
AuthenticationDataSource authenticationData;
private State state;
private final Supplier<SslHandler> sslHandlerSupplier;
private LookupProxyHandler lookupProxyHandler = null;
@Getter
private DirectProxyHandler directProxyHandler = null;
private final BrokerProxyValidator brokerProxyValidator;
String clientAuthRole;
AuthData clientAuthData;
String clientAuthMethod;
private String authMethod = "none";
AuthenticationProvider authenticationProvider;
AuthenticationState authState;
private ClientConfigurationData clientConf;
private boolean hasProxyToBrokerUrl;
private int protocolVersionToAdvertise;
private String proxyToBrokerUrl;
private HAProxyMessage haProxyMessage;
private static final byte[] EMPTY_CREDENTIALS = new byte[0];
enum State {
Init,
// Connecting between user client and proxy server.
// Mutual authn needs verify between client and proxy server several times.
Connecting,
// Proxy the lookup requests to a random broker
// Follow redirects
ProxyLookupRequests,
// If we are proxying a connection to a specific broker, we
// are just forwarding data between the 2 connections, without
// looking into it
ProxyConnectionToBroker,
Closing,
Closed,
}
ConnectionPool getConnectionPool() {
return connectionPool;
}
public ProxyConnection(ProxyService proxyService, Supplier<SslHandler> sslHandlerSupplier) {
super(30, TimeUnit.SECONDS);
this.service = proxyService;
this.state = State.Init;
this.sslHandlerSupplier = sslHandlerSupplier;
this.brokerProxyValidator = service.getBrokerProxyValidator();
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
super.channelRegistered(ctx);
ProxyService.ACTIVE_CONNECTIONS.inc();
if (ProxyService.ACTIVE_CONNECTIONS.get() > service.getConfiguration().getMaxConcurrentInboundConnections()) {
state = State.Closing;
ctx.close();
ProxyService.REJECTED_CONNECTIONS.inc();
}
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
super.channelUnregistered(ctx);
ProxyService.ACTIVE_CONNECTIONS.dec();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
ProxyService.NEW_CONNECTIONS.inc();
service.getClientCnxs().add(this);
LOG.info("[{}] New connection opened", remoteAddress);
}
@Override
public synchronized void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
if (directProxyHandler != null && directProxyHandler.outboundChannel != null) {
directProxyHandler.outboundChannel.close();
directProxyHandler = null;
}
service.getClientCnxs().remove(this);
LOG.info("[{}] Connection closed", remoteAddress);
if (connectionPool != null) {
try {
connectionPool.close();
connectionPool = null;
} catch (Exception e) {
LOG.error("Failed to close connection pool {}", e.getMessage(), e);
}
}
state = State.Closed;
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
state = State.Closing;
super.exceptionCaught(ctx, cause);
LOG.warn("[{}] Got exception {} : {} {}", remoteAddress, cause.getClass().getSimpleName(), cause.getMessage(),
ClientCnx.isKnownException(cause) ? null : cause);
ctx.close();
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
if (directProxyHandler != null && directProxyHandler.outboundChannel != null) {
// handle backpressure
// stop/resume reading input from connection between the proxy and the broker
// when the writability of the connection between the client and the proxy changes
directProxyHandler.outboundChannel.config().setAutoRead(ctx.channel().isWritable());
}
super.channelWritabilityChanged(ctx);
}
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof HAProxyMessage) {
haProxyMessage = (HAProxyMessage) msg;
return;
}
switch (state) {
case Init:
case Connecting:
case ProxyLookupRequests:
// Do the regular decoding for the Connected message
super.channelRead(ctx, msg);
break;
case ProxyConnectionToBroker:
// Pass the buffer to the outbound connection and schedule next read
// only if we can write on the connection
ProxyService.OPS_COUNTER.inc();
if (msg instanceof ByteBuf) {
int bytes = ((ByteBuf) msg).readableBytes();
directProxyHandler.getInboundChannelRequestsRate().recordEvent(bytes);
ProxyService.BYTES_COUNTER.inc(bytes);
}
directProxyHandler.outboundChannel.writeAndFlush(msg)
.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
break;
default:
break;
}
}
private synchronized void completeConnect(AuthData clientData) throws PulsarClientException {
if (service.getConfiguration().isAuthenticationEnabled()) {
if (service.getConfiguration().isForwardAuthorizationCredentials()) {
this.clientAuthData = clientData;
this.clientAuthMethod = authMethod;
}
if (this.connectionPool == null) {
this.connectionPool = new ProxyConnectionPool(clientConf, service.getWorkerGroup(),
() -> new ProxyClientCnx(clientConf, service.getWorkerGroup(), clientAuthRole, clientAuthData,
clientAuthMethod, protocolVersionToAdvertise));
} else {
LOG.error("BUG! Connection Pool has already been created for proxy connection to {} state {} role {}",
remoteAddress, state, clientAuthRole);
}
} else {
if (this.connectionPool == null) {
this.connectionPool = new ProxyConnectionPool(clientConf, service.getWorkerGroup(),
() -> new ClientCnx(clientConf, service.getWorkerGroup(), protocolVersionToAdvertise));
} else {
LOG.error("BUG! Connection Pool has already been created for proxy connection to {} state {}",
remoteAddress, state);
}
}
LOG.info("[{}] complete connection, init proxy handler. authenticated with {} role {}, hasProxyToBrokerUrl: {}",
remoteAddress, authMethod, clientAuthRole, hasProxyToBrokerUrl);
if (hasProxyToBrokerUrl) {
// Optimize proxy connection to fail-fast if the target broker isn't active
// Pulsar client will retry connecting after a back off timeout
if (service.getConfiguration().isCheckActiveBrokers()
&& !isBrokerActive(proxyToBrokerUrl)) {
state = State.Closing;
LOG.warn("[{}] Target broker '{}' isn't available. authenticated with {} role {}.",
remoteAddress, proxyToBrokerUrl, authMethod, clientAuthRole);
ctx()
.writeAndFlush(
Commands.newError(-1, ServerError.ServiceNotReady, "Target broker isn't available."))
.addListener(ChannelFutureListener.CLOSE);
return;
}
brokerProxyValidator.resolveAndCheckTargetAddress(proxyToBrokerUrl)
.thenAcceptAsync(address -> {
// Client already knows which broker to connect. Let's open a
// connection there and just pass bytes in both directions
state = State.ProxyConnectionToBroker;
directProxyHandler = new DirectProxyHandler(service, this, proxyToBrokerUrl, address,
protocolVersionToAdvertise, sslHandlerSupplier);
}, ctx.executor())
.exceptionally(throwable -> {
if (throwable instanceof TargetAddressDeniedException
|| throwable.getCause() instanceof TargetAddressDeniedException) {
TargetAddressDeniedException targetAddressDeniedException =
(TargetAddressDeniedException) (throwable instanceof TargetAddressDeniedException
? throwable : throwable.getCause());
LOG.warn("[{}] Target broker '{}' cannot be validated. {}. authenticated with {} role {}.",
remoteAddress, proxyToBrokerUrl, targetAddressDeniedException.getMessage(),
authMethod, clientAuthRole);
} else {
LOG.error("[{}] Error validating target broker '{}'. authenticated with {} role {}.",
remoteAddress, proxyToBrokerUrl, authMethod, clientAuthRole, throwable);
}
ctx()
.writeAndFlush(
Commands.newError(-1, ServerError.ServiceNotReady,
"Target broker cannot be validated."))
.addListener(ChannelFutureListener.CLOSE);
return null;
});
} else {
// Client is doing a lookup, we can consider the handshake complete
// and we'll take care of just topics and
// partitions metadata lookups
state = State.ProxyLookupRequests;
lookupProxyHandler = new LookupProxyHandler(service, this);
ctx.writeAndFlush(Commands.newConnected(protocolVersionToAdvertise))
.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
}
}
// According to auth result, send newConnected or newAuthChallenge command.
private void doAuthentication(AuthData clientData) throws Exception {
AuthData brokerData = authState.authenticate(clientData);
// authentication has completed, will send newConnected command.
if (authState.isComplete()) {
clientAuthRole = authState.getAuthRole();
if (LOG.isDebugEnabled()) {
LOG.debug("[{}] Client successfully authenticated with {} role {}",
remoteAddress, authMethod, clientAuthRole);
}
completeConnect(clientData);
return;
}
// auth not complete, continue auth with client side.
ctx.writeAndFlush(Commands.newAuthChallenge(authMethod, brokerData, protocolVersionToAdvertise))
.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
if (LOG.isDebugEnabled()) {
LOG.debug("[{}] Authentication in progress client by method {}.",
remoteAddress, authMethod);
}
state = State.Connecting;
}
@Override
protected void handleConnect(CommandConnect connect) {
checkArgument(state == State.Init);
this.setRemoteEndpointProtocolVersion(connect.getProtocolVersion());
this.hasProxyToBrokerUrl = connect.hasProxyToBrokerUrl();
this.protocolVersionToAdvertise = getProtocolVersionToAdvertise(connect);
this.proxyToBrokerUrl = connect.hasProxyToBrokerUrl() ? connect.getProxyToBrokerUrl() : "null";
if (LOG.isDebugEnabled()) {
LOG.debug("Received CONNECT from {} proxyToBroker={}", remoteAddress, proxyToBrokerUrl);
LOG.debug(
"[{}] Protocol version to advertise to broker is {}, clientProtocolVersion={}, proxyProtocolVersion={}",
remoteAddress, protocolVersionToAdvertise, getRemoteEndpointProtocolVersion(),
Commands.getCurrentProtocolVersion());
}
if (getRemoteEndpointProtocolVersion() < ProtocolVersion.v10.getValue()) {
LOG.warn("[{}] Client doesn't support connecting through proxy", remoteAddress);
state = State.Closing;
ctx.close();
return;
}
try {
// init authn
this.clientConf = createClientConfiguration();
// authn not enabled, complete
if (!service.getConfiguration().isAuthenticationEnabled()) {
completeConnect(null);
return;
}
AuthData clientData = AuthData.of(connect.hasAuthData() ? connect.getAuthData() : EMPTY_CREDENTIALS);
if (connect.hasAuthMethodName()) {
authMethod = connect.getAuthMethodName();
} else if (connect.hasAuthMethod()) {
// Legacy client is passing enum
authMethod = connect.getAuthMethod().name().substring(10).toLowerCase();
} else {
authMethod = "none";
}
authenticationProvider = service
.getAuthenticationService()
.getAuthenticationProvider(authMethod);
// Not find provider named authMethod. Most used for tests.
// In AuthenticationDisabled, it will set authMethod "none".
if (authenticationProvider == null) {
clientAuthRole = service.getAuthenticationService().getAnonymousUserRole()
.orElseThrow(() ->
new AuthenticationException("No anonymous role, and no authentication provider configured"));
completeConnect(clientData);
return;
}
// init authState and other var
ChannelHandler sslHandler = ctx.channel().pipeline().get(PulsarChannelInitializer.TLS_HANDLER);
SSLSession sslSession = null;
if (sslHandler != null) {
sslSession = ((SslHandler) sslHandler).engine().getSession();
}
authState = authenticationProvider.newAuthState(clientData, remoteAddress, sslSession);
authenticationData = authState.getAuthDataSource();
doAuthentication(clientData);
} catch (Exception e) {
LOG.warn("[{}] Unable to authenticate: ", remoteAddress, e);
ctx.writeAndFlush(Commands.newError(-1, ServerError.AuthenticationError, "Failed to authenticate"))
.addListener(ChannelFutureListener.CLOSE);
}
}
@Override
protected void handleAuthResponse(CommandAuthResponse authResponse) {
checkArgument(state == State.Connecting);
checkArgument(authResponse.hasResponse());
checkArgument(authResponse.getResponse().hasAuthData() && authResponse.getResponse().hasAuthMethodName());
if (LOG.isDebugEnabled()) {
LOG.debug("Received AuthResponse from {}, auth method: {}",
remoteAddress, authResponse.getResponse().getAuthMethodName());
}
try {
AuthData clientData = AuthData.of(authResponse.getResponse().getAuthData());
doAuthentication(clientData);
} catch (Exception e) {
String msg = "Unable to handleAuthResponse";
LOG.warn("[{}] {} ", remoteAddress, msg, e);
ctx.writeAndFlush(Commands.newError(-1, ServerError.AuthenticationError, msg))
.addListener(ChannelFutureListener.CLOSE);
}
}
@Override
protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata partitionMetadata) {
checkArgument(state == State.ProxyLookupRequests);
lookupProxyHandler.handlePartitionMetadataResponse(partitionMetadata);
}
@Override
protected void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGetTopicsOfNamespace) {
checkArgument(state == State.ProxyLookupRequests);
lookupProxyHandler.handleGetTopicsOfNamespace(commandGetTopicsOfNamespace);
}
@Override
protected void handleGetSchema(CommandGetSchema commandGetSchema) {
checkArgument(state == State.ProxyLookupRequests);
lookupProxyHandler.handleGetSchema(commandGetSchema);
}
/**
* handles discovery request from client ands sends next active broker address.
*/
@Override
protected void handleLookup(CommandLookupTopic lookup) {
checkArgument(state == State.ProxyLookupRequests);
lookupProxyHandler.handleLookup(lookup);
}
ClientConfigurationData createClientConfiguration() {
ClientConfigurationData clientConf = new ClientConfigurationData();
clientConf.setServiceUrl(service.getServiceUrl());
ProxyConfiguration proxyConfig = service.getConfiguration();
clientConf.setAuthentication(this.getClientAuthentication());
if (proxyConfig.isTlsEnabledWithBroker()) {
clientConf.setUseTls(true);
if (proxyConfig.isBrokerClientTlsEnabledWithKeyStore()) {
clientConf.setUseKeyStoreTls(true);
clientConf.setTlsTrustStoreType(proxyConfig.getBrokerClientTlsTrustStoreType());
clientConf.setTlsTrustStorePath(proxyConfig.getBrokerClientTlsTrustStore());
clientConf.setTlsTrustStorePassword(proxyConfig.getBrokerClientTlsTrustStorePassword());
} else {
clientConf.setTlsTrustCertsFilePath(proxyConfig.getBrokerClientTrustCertsFilePath());
clientConf.setTlsAllowInsecureConnection(proxyConfig.isTlsAllowInsecureConnection());
}
}
return clientConf;
}
private static int getProtocolVersionToAdvertise(CommandConnect connect) {
return Math.min(connect.getProtocolVersion(), Commands.getCurrentProtocolVersion());
}
long newRequestId() {
return requestIdGenerator.getAndIncrement();
}
public Authentication getClientAuthentication() {
return service.getProxyClientAuthenticationPlugin();
}
@Override
protected boolean isHandshakeCompleted() {
return state != State.Init;
}
SocketAddress clientAddress() {
return remoteAddress;
}
ChannelHandlerContext ctx() {
return ctx;
}
public boolean hasHAProxyMessage() {
return haProxyMessage != null;
}
public HAProxyMessage getHAProxyMessage() {
return haProxyMessage;
}
private boolean isBrokerActive(String targetBrokerHostPort) {
for (ServiceLookupData serviceLookupData : getAvailableBrokers()) {
if (matchesHostAndPort("pulsar://", serviceLookupData.getPulsarServiceUrl(), targetBrokerHostPort)
|| matchesHostAndPort("pulsar+ssl://", serviceLookupData.getPulsarServiceUrlTls(),
targetBrokerHostPort)) {
return true;
}
}
return false;
}
private List<? extends ServiceLookupData> getAvailableBrokers() {
if (service.getDiscoveryProvider() == null) {
LOG.warn("Unable to retrieve active brokers. service.getDiscoveryProvider() is null."
+ "zookeeperServers and configurationStoreServers must be configured in proxy configuration "
+ "when checkActiveBrokers is enabled.");
return Collections.emptyList();
}
try {
return service.getDiscoveryProvider().getAvailableBrokers();
} catch (PulsarServerException e) {
LOG.error("Unable to get available brokers", e);
return Collections.emptyList();
}
}
static boolean matchesHostAndPort(String expectedPrefix, String pulsarServiceUrl, String brokerHostPort) {
return pulsarServiceUrl != null
&& pulsarServiceUrl.length() == expectedPrefix.length() + brokerHostPort.length()
&& pulsarServiceUrl.startsWith(expectedPrefix)
&& pulsarServiceUrl.startsWith(brokerHostPort, expectedPrefix.length());
}
}