| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.pulsar.socks5.handler; |
| |
| import io.netty.bootstrap.Bootstrap; |
| import io.netty.channel.ChannelFuture; |
| import io.netty.channel.ChannelFutureListener; |
| import io.netty.channel.ChannelHandlerContext; |
| import io.netty.channel.ChannelInboundHandlerAdapter; |
| import io.netty.channel.ChannelInitializer; |
| import io.netty.channel.ChannelOption; |
| import io.netty.channel.SimpleChannelInboundHandler; |
| import io.netty.channel.socket.SocketChannel; |
| import io.netty.channel.socket.nio.NioSocketChannel; |
| import io.netty.handler.codec.socksx.v5.DefaultSocks5CommandRequest; |
| import io.netty.handler.codec.socksx.v5.DefaultSocks5CommandResponse; |
| import io.netty.handler.codec.socksx.v5.Socks5AddressType; |
| import io.netty.handler.codec.socksx.v5.Socks5CommandStatus; |
| import io.netty.handler.codec.socksx.v5.Socks5CommandType; |
| import lombok.extern.slf4j.Slf4j; |
| import org.apache.pulsar.socks5.Socks5Server; |
| |
| @Slf4j |
| public class CommandRequestHandler extends SimpleChannelInboundHandler<DefaultSocks5CommandRequest> { |
| |
| private final Socks5Server socks5Server; |
| |
| public CommandRequestHandler(Socks5Server socks5Server) { |
| this.socks5Server = socks5Server; |
| } |
| |
| @Override |
| protected void channelRead0(final ChannelHandlerContext clientChannelContext, DefaultSocks5CommandRequest msg) throws Exception { |
| if (Socks5CommandType.CONNECT.equals(msg.type())) { |
| Bootstrap bootstrap = new Bootstrap(); |
| bootstrap.group(socks5Server.getBoss()) |
| .channel(NioSocketChannel.class) |
| .option(ChannelOption.TCP_NODELAY, true) |
| .handler(new ChannelInitializer<SocketChannel>() { |
| @Override |
| protected void initChannel(SocketChannel ch) throws Exception { |
| ch.pipeline().addLast(new ClientHandler(clientChannelContext)); |
| } |
| }); |
| ChannelFuture future = bootstrap.connect(msg.dstAddr(), msg.dstPort()); |
| future.addListener(new ChannelFutureListener() { |
| |
| public void operationComplete(final ChannelFuture future) throws Exception { |
| if (future.isSuccess()) { |
| if (log.isDebugEnabled()) { |
| log.debug("connected : {} {}", msg.dstAddr(), msg.dstPort()); |
| } |
| clientChannelContext.pipeline().addLast(new TargetHandler(future)); |
| clientChannelContext.writeAndFlush(new DefaultSocks5CommandResponse(Socks5CommandStatus.SUCCESS, Socks5AddressType.IPv4)); |
| } else { |
| clientChannelContext.writeAndFlush(new DefaultSocks5CommandResponse(Socks5CommandStatus.FAILURE, Socks5AddressType.IPv4)); |
| } |
| } |
| }); |
| } else { |
| clientChannelContext.fireChannelRead(msg); |
| } |
| } |
| |
| private static class ClientHandler extends ChannelInboundHandlerAdapter { |
| |
| private ChannelHandlerContext clientChannelContext; |
| |
| public ClientHandler(ChannelHandlerContext clientChannelContext) { |
| this.clientChannelContext = clientChannelContext; |
| } |
| |
| @Override |
| public void channelRead(ChannelHandlerContext ctx2, Object destMsg) throws Exception { |
| clientChannelContext.writeAndFlush(destMsg); |
| } |
| |
| @Override |
| public void channelInactive(ChannelHandlerContext ctx2) throws Exception { |
| clientChannelContext.channel().close(); |
| } |
| } |
| |
| private static class TargetHandler extends ChannelInboundHandlerAdapter { |
| |
| private ChannelFuture targetChannel; |
| |
| public TargetHandler(ChannelFuture targetChannel) { |
| this.targetChannel = targetChannel; |
| } |
| |
| @Override |
| public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { |
| targetChannel.channel().writeAndFlush(msg); |
| } |
| |
| @Override |
| public void channelInactive(ChannelHandlerContext ctx) throws Exception { |
| targetChannel.channel().close(); |
| } |
| } |
| |
| } |