blob: 3c66d7558ae953bfe2096da6bc61e9a682496c42 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.rpc.protocol.dubbo.decode;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.utils.ReflectUtils;
import org.apache.dubbo.remoting.Codec2;
import org.apache.dubbo.remoting.buffer.ChannelBuffer;
import org.apache.dubbo.remoting.exchange.ExchangeChannel;
import org.apache.dubbo.remoting.exchange.Request;
import org.apache.dubbo.remoting.exchange.support.ExchangeHandlerAdapter;
import org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeHandler;
import org.apache.dubbo.remoting.transport.DecodeHandler;
import org.apache.dubbo.remoting.transport.MultiMessageHandler;
import org.apache.dubbo.remoting.transport.netty4.NettyBackedChannelBuffer;
import org.apache.dubbo.remoting.transport.netty4.NettyCodecAdapter;
import org.apache.dubbo.rpc.AppResponse;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.model.ApplicationModel;
import org.apache.dubbo.rpc.protocol.dubbo.DecodeableRpcInvocation;
import org.apache.dubbo.rpc.protocol.dubbo.DubboCodec;
import org.apache.dubbo.rpc.protocol.dubbo.support.DemoService;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* These junit tests aim to test unpack and stick pack of dubbo and telnet
*/
public class DubboTelnetDecodeTest {
private static AtomicInteger dubbo = new AtomicInteger(0);
private static AtomicInteger telnet = new AtomicInteger(0);
private static AtomicInteger telnetDubbo = new AtomicInteger(0);
private static AtomicInteger dubboDubbo = new AtomicInteger(0);
private static AtomicInteger dubboTelnet = new AtomicInteger(0);
private static AtomicInteger telnetTelnet = new AtomicInteger(0);
@BeforeAll
public static void setup() {
ApplicationModel.getServiceRepository().destroy();
ApplicationModel.getServiceRepository().registerService(DemoService.class);
}
@AfterAll
public static void teardown() {
ApplicationModel.getServiceRepository().destroy();
}
/**
* just dubbo request
*
* @throws InterruptedException
*/
@Test
public void testDubboDecode() throws InterruptedException, IOException {
ByteBuf dubboByteBuf = createDubboByteBuf();
EmbeddedChannel ch = null;
try {
Codec2 codec = ExtensionLoader.getExtensionLoader(Codec2.class).getExtension("dubbo");
URL url = new URL("dubbo", "localhost", 22226);
NettyCodecAdapter adapter = new NettyCodecAdapter(codec, url, new MockChannelHandler());
MockHandler mockHandler = new MockHandler(null,
new MultiMessageHandler(
new DecodeHandler(
new HeaderExchangeHandler(new ExchangeHandlerAdapter() {
@Override
public CompletableFuture<Object> reply(ExchangeChannel channel, Object msg) {
if (checkDubboDecoded(msg)) {
dubbo.incrementAndGet();
}
return getDefaultFuture();
}
}))));
ch = new LocalEmbeddedChannel();
ch.pipeline()
.addLast("decoder", adapter.getDecoder())
.addLast("handler", mockHandler);
ch.writeInbound(dubboByteBuf);
} catch (Exception e) {
e.printStackTrace();
} finally {
if (ch != null) {
ch.close().await(200, TimeUnit.MILLISECONDS);
}
}
TimeUnit.MILLISECONDS.sleep(100);
Assertions.assertEquals(1, dubbo.get());
}
/**
* just telnet request
*
* @throws InterruptedException
*/
@Test
public void testTelnetDecode() throws InterruptedException {
ByteBuf telnetByteBuf = Unpooled.wrappedBuffer("test\r\n".getBytes());
EmbeddedChannel ch = null;
try {
Codec2 codec = ExtensionLoader.getExtensionLoader(Codec2.class).getExtension("dubbo");
URL url = new URL("dubbo", "localhost", 22226);
NettyCodecAdapter adapter = new NettyCodecAdapter(codec, url, new MockChannelHandler());
MockHandler mockHandler = new MockHandler((msg) -> {
if (checkTelnetDecoded(msg)) {
telnet.incrementAndGet();
}
},
new MultiMessageHandler(
new DecodeHandler(
new HeaderExchangeHandler(new ExchangeHandlerAdapter() {
@Override
public CompletableFuture<Object> reply(ExchangeChannel channel, Object msg) {
return getDefaultFuture();
}
}))));
ch = new LocalEmbeddedChannel();
ch.pipeline()
.addLast("decoder", adapter.getDecoder())
.addLast("handler", mockHandler);
ch.writeInbound(telnetByteBuf);
} catch (Exception e) {
e.printStackTrace();
} finally {
if (ch != null) {
ch.close().await(200, TimeUnit.MILLISECONDS);
}
}
TimeUnit.MILLISECONDS.sleep(100);
Assertions.assertEquals(1, telnet.get());
}
/**
* telnet and dubbo request
*
* <p>
* First ByteBuf:
* +--------------------------------------------------+
* | telnet(incomplete) |
* +--------------------------------------------------+
* <p>
*
* Second ByteBuf:
* +--------------------------++----------------------+
* | telnet(the remaining) || dubbo(complete) |
* +--------------------------++----------------------+
* ||
* Magic Code
*
* @throws InterruptedException
*/
@Test
public void testTelnetDubboDecoded() throws InterruptedException, IOException {
ByteBuf dubboByteBuf = createDubboByteBuf();
ByteBuf telnetByteBuf = Unpooled.wrappedBuffer("test\r".getBytes());
EmbeddedChannel ch = null;
try {
Codec2 codec = ExtensionLoader.getExtensionLoader(Codec2.class).getExtension("dubbo");
URL url = new URL("dubbo", "localhost", 22226);
NettyCodecAdapter adapter = new NettyCodecAdapter(codec, url, new MockChannelHandler());
MockHandler mockHandler = new MockHandler((msg) -> {
if (checkTelnetDecoded(msg)) {
telnetDubbo.incrementAndGet();
}
},
new MultiMessageHandler(
new DecodeHandler(
new HeaderExchangeHandler(new ExchangeHandlerAdapter() {
@Override
public CompletableFuture<Object> reply(ExchangeChannel channel, Object msg) {
if (checkDubboDecoded(msg)) {
telnetDubbo.incrementAndGet();
}
return getDefaultFuture();
}
}))));
ch = new LocalEmbeddedChannel();
ch.pipeline()
.addLast("decoder", adapter.getDecoder())
.addLast("handler", mockHandler);
ch.writeInbound(telnetByteBuf);
ch.writeInbound(Unpooled.wrappedBuffer(Unpooled.wrappedBuffer("\n".getBytes()), dubboByteBuf));
} catch (Exception e) {
e.printStackTrace();
} finally {
if (ch != null) {
ch.close().await(200, TimeUnit.MILLISECONDS);
}
}
TimeUnit.MILLISECONDS.sleep(100);
Assertions.assertEquals(2, telnetDubbo.get());
}
/**
* NOTE: This test case actually will fail, but the probability of this case is very small,
* and users should use telnet in new QOS port(default port is 22222) since dubbo 2.5.8,
* so we could ignore this problem.
*
* <p>
* telnet and telnet request
*
* <p>
* First ByteBuf (firstByteBuf):
* +--------------------------------------------------+
* | telnet(incomplete) |
* +--------------------------------------------------+
* <p>
*
* Second ByteBuf (secondByteBuf):
* +--------------------------------------------------+
* | telnet(the remaining) | telnet(complete) |
* +--------------------------------------------------+
*
* @throws InterruptedException
*/
// @Test
public void testTelnetTelnetDecoded() throws InterruptedException {
ByteBuf firstByteBuf = Unpooled.wrappedBuffer("ls\r".getBytes());
ByteBuf secondByteBuf = Unpooled.wrappedBuffer("\nls\r\n".getBytes());
EmbeddedChannel ch = null;
try {
Codec2 codec = ExtensionLoader.getExtensionLoader(Codec2.class).getExtension("dubbo");
URL url = new URL("dubbo", "localhost", 22226);
NettyCodecAdapter adapter = new NettyCodecAdapter(codec, url, new MockChannelHandler());
MockHandler mockHandler = new MockHandler((msg) -> {
if (checkTelnetDecoded(msg)) {
telnetTelnet.incrementAndGet();
}
},
new MultiMessageHandler(
new DecodeHandler(
new HeaderExchangeHandler(new ExchangeHandlerAdapter() {
@Override
public CompletableFuture<Object> reply(ExchangeChannel channel, Object msg) {
return getDefaultFuture();
}
}))));
ch = new LocalEmbeddedChannel();
ch.pipeline()
.addLast("decoder", adapter.getDecoder())
.addLast("handler", mockHandler);
ch.writeInbound(firstByteBuf);
ch.writeInbound(secondByteBuf);
} catch (Exception e) {
e.printStackTrace();
} finally {
if (ch != null) {
ch.close().await(200, TimeUnit.MILLISECONDS);
}
}
TimeUnit.MILLISECONDS.sleep(100);
Assertions.assertEquals(2, telnetTelnet.get());
}
/**
* dubbo and dubbo request
*
* <p>
* First ByteBuf (firstDubboByteBuf):
* ++-------------------------------------------------+
* || dubbo(incomplete) |
* ++-------------------------------------------------+
* ||
* Magic Code
* <p>
*
* <p>
* Second ByteBuf (secondDubboByteBuf):
* +-------------------------++-----------------------+
* | dubbo(the remaining) || dubbo(complete) |
* +-------------------------++-----------------------+
* ||
* Magic Code
*
* @throws InterruptedException
*/
@Test
public void testDubboDubboDecoded() throws InterruptedException, IOException {
ByteBuf dubboByteBuf = createDubboByteBuf();
ByteBuf firstDubboByteBuf = dubboByteBuf.copy(0, 50);
ByteBuf secondLeftDubboByteBuf = dubboByteBuf.copy(50, dubboByteBuf.readableBytes() - 50);
ByteBuf secondDubboByteBuf = Unpooled.wrappedBuffer(secondLeftDubboByteBuf, dubboByteBuf);
EmbeddedChannel ch = null;
try {
Codec2 codec = ExtensionLoader.getExtensionLoader(Codec2.class).getExtension("dubbo");
URL url = new URL("dubbo", "localhost", 22226);
NettyCodecAdapter adapter = new NettyCodecAdapter(codec, url, new MockChannelHandler());
MockHandler mockHandler = new MockHandler(null,
new MultiMessageHandler(
new DecodeHandler(
new HeaderExchangeHandler(new ExchangeHandlerAdapter() {
@Override
public CompletableFuture<Object> reply(ExchangeChannel channel, Object msg) {
if (checkDubboDecoded(msg)) {
dubboDubbo.incrementAndGet();
}
return getDefaultFuture();
}
}))));
ch = new LocalEmbeddedChannel();
ch.pipeline()
.addLast("decoder", adapter.getDecoder())
.addLast("handler", mockHandler);
ch.writeInbound(firstDubboByteBuf);
ch.writeInbound(secondDubboByteBuf);
} catch (Exception e) {
e.printStackTrace();
} finally {
if (ch != null) {
ch.close().await(200, TimeUnit.MILLISECONDS);
}
}
TimeUnit.MILLISECONDS.sleep(100);
Assertions.assertEquals(2, dubboDubbo.get());
}
/**
* dubbo and telnet request
*
* <p>
* First ByteBuf:
* ++-------------------------------------------------+
* || dubbo(incomplete) |
* ++-------------------------------------------------+
* ||
* Magic Code
*
* <p>
* Second ByteBuf:
* +--------------------------------------------------+
* | dubbo(the remaining) | telnet(complete) |
* +--------------------------------------------------+
*
* @throws InterruptedException
*/
@Test
public void testDubboTelnetDecoded() throws InterruptedException, IOException {
ByteBuf dubboByteBuf = createDubboByteBuf();
ByteBuf firstDubboByteBuf = dubboByteBuf.copy(0, 50);
ByteBuf secondLeftDubboByteBuf = dubboByteBuf.copy(50, dubboByteBuf.readableBytes());
ByteBuf telnetByteBuf = Unpooled.wrappedBuffer("\r\n".getBytes());
ByteBuf secondByteBuf = Unpooled.wrappedBuffer(secondLeftDubboByteBuf, telnetByteBuf);
EmbeddedChannel ch = null;
try {
Codec2 codec = ExtensionLoader.getExtensionLoader(Codec2.class).getExtension("dubbo");
URL url = new URL("dubbo", "localhost", 22226);
NettyCodecAdapter adapter = new NettyCodecAdapter(codec, url, new MockChannelHandler());
MockHandler mockHandler = new MockHandler((msg) -> {
if (checkTelnetDecoded(msg)) {
dubboTelnet.incrementAndGet();
}
},
new MultiMessageHandler(
new DecodeHandler(
new HeaderExchangeHandler(new ExchangeHandlerAdapter() {
@Override
public CompletableFuture<Object> reply(ExchangeChannel channel, Object msg) {
if (checkDubboDecoded(msg)) {
dubboTelnet.incrementAndGet();
}
return getDefaultFuture();
}
}))));
ch = new LocalEmbeddedChannel();
ch.pipeline()
.addLast("decoder", adapter.getDecoder())
.addLast("handler", mockHandler);
ch.writeInbound(firstDubboByteBuf);
ch.writeInbound(secondByteBuf);
} catch (Exception e) {
e.printStackTrace();
} finally {
if (ch != null) {
ch.close().await(200, TimeUnit.MILLISECONDS);
}
}
TimeUnit.MILLISECONDS.sleep(100);
Assertions.assertEquals(2, dubboTelnet.get());
}
private ByteBuf createDubboByteBuf() throws IOException {
Request request = new Request();
RpcInvocation rpcInvocation = new RpcInvocation();
rpcInvocation.setMethodName("sayHello");
rpcInvocation.setParameterTypes(new Class[]{String.class});
rpcInvocation.setParameterTypesDesc(ReflectUtils.getDesc(new Class[]{String.class}));
rpcInvocation.setArguments(new String[]{"dubbo"});
rpcInvocation.setAttachment("path", DemoService.class.getName());
rpcInvocation.setAttachment("interface", DemoService.class.getName());
rpcInvocation.setAttachment("version", "0.0.0");
request.setData(rpcInvocation);
request.setVersion("2.0.2");
ByteBuf dubboByteBuf = Unpooled.buffer();
ChannelBuffer buffer = new NettyBackedChannelBuffer(dubboByteBuf);
DubboCodec dubboCodec = new DubboCodec();
dubboCodec.encode(new MockChannel(), buffer, request);
return dubboByteBuf;
}
private static boolean checkTelnetDecoded(Object msg) {
if (msg instanceof String && !msg.toString().contains("Unsupported command:")) {
return true;
}
return false;
}
private static boolean checkDubboDecoded(Object msg) {
if (msg instanceof DecodeableRpcInvocation) {
DecodeableRpcInvocation invocation = (DecodeableRpcInvocation) msg;
if ("sayHello".equals(invocation.getMethodName())
&& invocation.getParameterTypes().length == 1
&& String.class.equals(invocation.getParameterTypes()[0])
&& invocation.getArguments().length == 1
&& "dubbo".equals(invocation.getArguments()[0])
&& DemoService.class.getName().equals(invocation.getAttachment("path"))
&& DemoService.class.getName().equals(invocation.getAttachment("interface"))
&& "0.0.0".equals(invocation.getAttachment("version"))) {
return true;
}
}
return false;
}
private static CompletableFuture<Object> getDefaultFuture() {
CompletableFuture<Object> future = new CompletableFuture<>();
AppResponse result = new AppResponse();
result.setValue("default result");
future.complete(result);
return future;
}
}