blob: 901ea4e4aa03380db5350196cf73eeeec8e3a56b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.proxy.server;
import static com.google.common.base.Preconditions.checkArgument;
import java.net.SocketAddress;
import java.util.concurrent.TimeUnit;
import javax.naming.AuthenticationException;
import javax.net.ssl.SSLSession;
import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.PulsarHandler;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandConnect;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopic;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandPartitionedTopicMetadata;
import org.apache.pulsar.common.api.proto.PulsarApi.ServerError;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.prometheus.client.Counter;
import io.prometheus.client.Gauge;
/**
* Handles incoming discovery request from client and sends appropriate response back to client
*
*/
public class ProxyConnection extends PulsarHandler implements FutureListener<Void> {
private ProxyService service;
String clientAuthRole = null;
private State state;
private LookupProxyHandler lookupProxyHandler = null;
private DirectProxyHandler directProxyHandler = null;
enum State {
Init,
// Proxy the lookup requests to a random broker
// Follow redirects
ProxyLookupRequests,
// If we are proxying a connection to a specific broker, we
// are just forwarding data between the 2 connections, without
// looking into it
ProxyConnectionToBroker,
Closed,
}
private static final Gauge activeConnections = Gauge
.build("pulsar_proxy_active_connections", "Number of connections currently active in the proxy").create()
.register();
private static final Counter newConnections = Counter
.build("pulsar_proxy_new_connections", "Counter of connections being opened in the proxy").create()
.register();
public ProxyConnection(ProxyService proxyService) {
super(30, TimeUnit.SECONDS);
this.service = proxyService;
this.state = State.Init;
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
activeConnections.inc();
newConnections.inc();
LOG.info("[{}] New connection opened", remoteAddress);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
activeConnections.dec();
if (directProxyHandler != null && directProxyHandler.outboundChannel != null) {
directProxyHandler.outboundChannel.close();
}
}
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
switch (state) {
case Init:
case ProxyLookupRequests:
// Do the regular decoding for the Connected message
super.channelRead(ctx, msg);
break;
case ProxyConnectionToBroker:
// Pass the buffer to the outbound connection and schedule next read only
// if we can write on the connection
directProxyHandler.outboundChannel.writeAndFlush(msg).addListener(this);
break;
default:
break;
}
}
@Override
public void operationComplete(Future<Void> future) throws Exception {
// This is invoked when the write operation on the paired connection is completed
if (future.isSuccess()) {
ctx.read();
} else {
LOG.warn("[{}] Error in writing to inbound channel. Closing", remoteAddress, future.cause());
directProxyHandler.outboundChannel.close();
}
}
/**
* handles connect request and sends {@code State.Connected} ack to client
*/
@Override
protected void handleConnect(CommandConnect connect) {
checkArgument(state == State.Init);
remoteEndpointProtocolVersion = connect.getProtocolVersion();
if (LOG.isDebugEnabled()) {
LOG.debug("Received CONNECT from {} proxyToBroker={}", remoteAddress,
connect.hasProxyToBrokerUrl() ? connect.getProxyToBrokerUrl() : "null");
}
// Client need to do some minimal cooperation logic.
if (remoteEndpointProtocolVersion < PulsarApi.ProtocolVersion.v10_VALUE) {
LOG.warn("[{}] Client doesn't support connecting through proxy", remoteAddress);
ctx.close();
return;
}
if (!verifyAuthenticationIfNeeded(connect)) {
ctx.writeAndFlush(Commands.newError(-1, ServerError.AuthenticationError, "Failed to authenticate"));
close();
return;
}
if (connect.hasProxyToBrokerUrl()) {
// Client already knows which broker to connect. Let's open a connection
// there and just pass bytes in both directions
state = State.ProxyConnectionToBroker;
directProxyHandler = new DirectProxyHandler(service, this, connect.getProxyToBrokerUrl());
} else {
// Client is doing a lookup, we can consider the handshake complete and we'll take care of just topics and
// partitions metadata lookups
state = State.ProxyLookupRequests;
lookupProxyHandler = new LookupProxyHandler(service, this);
ctx.writeAndFlush(Commands.newConnected(connect.getProtocolVersion()));
}
}
@Override
protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata partitionMetadata) {
checkArgument(state == State.ProxyLookupRequests);
lookupProxyHandler.handlePartitionMetadataResponse(partitionMetadata);
}
/**
* handles discovery request from client ands sends next active broker address
*/
@Override
protected void handleLookup(CommandLookupTopic lookup) {
checkArgument(state == State.ProxyLookupRequests);
lookupProxyHandler.handleLookup(lookup);
}
private void close() {
state = State.Closed;
ctx.close();
}
private boolean verifyAuthenticationIfNeeded(CommandConnect connect) {
if (!service.getConfiguration().isAuthenticationEnabled()) {
return true;
}
try {
String authMethod = "none";
if (connect.hasAuthMethodName()) {
authMethod = connect.getAuthMethodName();
} else if (connect.hasAuthMethod()) {
// Legacy client is passing enum
authMethod = connect.getAuthMethod().name().substring(10).toLowerCase();
}
String authData = connect.getAuthData().toStringUtf8();
ChannelHandler sslHandler = ctx.channel().pipeline().get("tls");
SSLSession sslSession = null;
if (sslHandler != null) {
sslSession = ((SslHandler) sslHandler).engine().getSession();
}
clientAuthRole = service.getAuthenticationService()
.authenticate(new AuthenticationDataCommand(authData, remoteAddress, sslSession), authMethod);
LOG.info("[{}] Client successfully authenticated with {} role {}", remoteAddress, authMethod,
clientAuthRole);
return true;
} catch (AuthenticationException e) {
LOG.warn("[{}] Unable to authenticate: {}", remoteAddress, e.getMessage());
return false;
}
}
@Override
protected boolean isHandshakeCompleted() {
return state != State.Init;
}
SocketAddress clientAddress() {
return remoteAddress;
}
ChannelHandlerContext ctx() {
return ctx;
}
private static final Logger LOG = LoggerFactory.getLogger(ProxyConnection.class);
}