blob: dd3f0e5092f564285f735218fc9cb7f93f1948f1 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.proxy.server;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.PulsarDecoder;
import org.apache.pulsar.common.api.PulsarLengthFieldFrameDecoder;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandConnected;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.socket.SocketChannel;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
public class DirectProxyHandler {
private Channel inboundChannel;
Channel outboundChannel;
private final Authentication authentication;
public DirectProxyHandler(ProxyService service, ProxyConnection proxyConnection, String targetBrokerUrl) {
this.authentication = service.getClientAuthentication();
this.inboundChannel = proxyConnection.ctx().channel();
// Start the connection attempt.
Bootstrap b = new Bootstrap();
// Tie the backend connection on the same thread to avoid context switches when passing data between the 2
// connections
b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
b.group(inboundChannel.eventLoop()).channel(inboundChannel.getClass()).option(ChannelOption.AUTO_READ, false);
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast("frameDecoder",
new PulsarLengthFieldFrameDecoder(PulsarDecoder.MaxFrameSize, 0, 4, 0, 4));
ch.pipeline().addLast(new ProxyBackendHandler());
}
});
URI targetBroker;
try {
// targetBrokerUrl is coming in the "hostname:6650" form, so we need to extract host and port
targetBroker = new URI("pulsar://" + targetBrokerUrl);
} catch (URISyntaxException e) {
log.warn("[{}] Failed to parse broker url '{}'", inboundChannel, targetBrokerUrl, e);
inboundChannel.close();
return;
}
ChannelFuture f = b.connect(targetBroker.getHost(), targetBroker.getPort());
outboundChannel = f.channel();
f.addListener(future -> {
if (!future.isSuccess()) {
// Close the connection if the connection attempt has failed.
inboundChannel.close();
}
});
}
enum BackendState {
Init, HandshakeCompleted
}
public class ProxyBackendHandler extends PulsarDecoder implements FutureListener<Void> {
private BackendState state = BackendState.Init;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// Send the Connect command to broker
String authData = "";
if (authentication.getAuthData().hasDataFromCommand()) {
authData = authentication.getAuthData().getCommandData();
}
outboundChannel
.writeAndFlush(Commands.newConnect(authentication.getAuthMethodName(), authData, "Pulsar proxy"));
outboundChannel.read();
}
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
switch (state) {
case Init:
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Received msg on broker connection: {}", inboundChannel, outboundChannel,
msg.getClass());
}
// Do the regular decoding for the Connected message
super.channelRead(ctx, msg);
break;
case HandshakeCompleted:
inboundChannel.writeAndFlush(msg).addListener(this);
break;
default:
break;
}
}
@Override
public void operationComplete(Future<Void> future) throws Exception {
// This is invoked when the write operation on the paired connection is completed
if (future.isSuccess()) {
outboundChannel.read();
} else {
log.warn("[{}] [{}] Failed to write on proxy connection. Closing both connections.", inboundChannel,
outboundChannel, future.cause());
inboundChannel.close();
}
}
@Override
protected void messageReceived() {
// no-op
}
@Override
protected void handleConnected(CommandConnected connected) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Received Connected from broker", inboundChannel, outboundChannel);
}
state = BackendState.HandshakeCompleted;
inboundChannel.writeAndFlush(Commands.newConnected(connected.getProtocolVersion())).addListener(future -> {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Removing decoder from pipeline", inboundChannel, outboundChannel);
}
inboundChannel.pipeline().remove("frameDecoder");
outboundChannel.pipeline().remove("frameDecoder");
// Start reading from both connections
inboundChannel.read();
outboundChannel.read();
});
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
inboundChannel.close();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
log.warn("[{}] [{}] Caught exception: {}", inboundChannel, outboundChannel, cause.getMessage(), cause);
ctx.close();
}
}
private static final Logger log = LoggerFactory.getLogger(DirectProxyHandler.class);
}