blob: 3e6de20afeda093f8a5d01fe26cd79c7879eb562 [file] [log] [blame]
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package com.baidu.hugegraph.computer.core.network.netty;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.junit.Test;
import org.mockito.Mockito;
import com.baidu.hugegraph.computer.core.network.MockUnDecodeMessage;
import com.baidu.hugegraph.computer.core.network.buffer.NettyBuffer;
import com.baidu.hugegraph.computer.core.network.buffer.NetworkBuffer;
import com.baidu.hugegraph.computer.core.network.buffer.NioBuffer;
import com.baidu.hugegraph.computer.core.network.message.DataMessage;
import com.baidu.hugegraph.computer.core.network.message.MessageType;
import com.baidu.hugegraph.computer.core.network.message.StartMessage;
import com.baidu.hugegraph.computer.core.network.netty.codec.FrameDecoder;
import com.baidu.hugegraph.computer.core.util.StringEncoding;
import com.baidu.hugegraph.computer.suite.unit.UnitTestBase;
import com.baidu.hugegraph.testutil.Assert;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.embedded.EmbeddedChannel;
/**
 * Tests for message encoding/decoding on the netty transport: error
 * callbacks on the client and server handlers, {@link FrameDecoder}
 * behaviour on an {@link EmbeddedChannel}, and reference counting of a
 * {@link DataMessage} body.
 */
public class NettyEncodeDecodeHandlerTest extends AbstractNetworkTest {

    /** Max wait (ms) for callbacks in the encoder-exception test. */
    private static final long ENCODE_VERIFY_TIMEOUT = 3000L;
    /** Max wait (ms) for callbacks in the server decode-exception test. */
    private static final long DECODE_VERIFY_TIMEOUT = 2000L;
    /** Max wait (ms) for the client-side exception callback. */
    private static final long CLIENT_DECODE_VERIFY_TIMEOUT = 15_000L;
    /** Pause (ms) before the server writes its undecodable message. */
    private static final long SERVER_WRITE_DELAY = 200L;

    /**
     * Intentionally empty: this test class sets nothing here.
     */
    @Override
    protected void initOption() {
    }

    /**
     * Writes a {@link DataMessage} whose message type is {@code null} and
     * expects, each exactly once within the timeout: the client handler's
     * {@code onChannelActive} and {@code exceptionCaught}, and the write
     * listener's {@code onFailure}.
     */
    @Test
    public void testSendMsgWithEncoderExceptionMock() throws IOException {
        NettyTransportClient transportClient =
                             (NettyTransportClient) this.oneClient();
        transportClient.startSession();

        int sequence = 1;
        int partitionId = 1;
        ByteBuffer payload = ByteBuffer.wrap(
                             StringEncoding.encode("mock msg"));
        NetworkBuffer messageBody = new NioBuffer(payload);
        // The message type is deliberately null
        DataMessage message = new DataMessage(null, sequence,
                                              partitionId, messageBody);

        // Spy on the write listener so its onFailure call can be verified
        ChannelFutureListenerOnWrite writeListener = Mockito.spy(
                new ChannelFutureListenerOnWrite(clientHandler));
        transportClient.channel()
                       .writeAndFlush(message)
                       .addListener(writeListener);

        Mockito.verify(clientHandler,
                       Mockito.timeout(ENCODE_VERIFY_TIMEOUT).times(1))
               .onChannelActive(Mockito.any());
        Mockito.verify(clientHandler,
                       Mockito.timeout(ENCODE_VERIFY_TIMEOUT).times(1))
               .exceptionCaught(Mockito.any(), Mockito.any());
        Mockito.verify(writeListener,
                       Mockito.timeout(ENCODE_VERIFY_TIMEOUT).times(1))
               .onFailure(Mockito.any(), Mockito.any());
    }

    /**
     * Sends a {@link MockUnDecodeMessage} from the client and expects the
     * server handler's {@code onChannelActive} and {@code exceptionCaught}
     * to each be invoked exactly once within the timeout.
     */
    @Test
    public void testSendMsgWithDecodeException() throws IOException {
        NettyTransportClient transportClient =
                             (NettyTransportClient) this.oneClient();
        transportClient.channel().writeAndFlush(new MockUnDecodeMessage());

        Mockito.verify(serverHandler,
                       Mockito.timeout(DECODE_VERIFY_TIMEOUT).times(1))
               .onChannelActive(Mockito.any());
        Mockito.verify(serverHandler,
                       Mockito.timeout(DECODE_VERIFY_TIMEOUT).times(1))
               .exceptionCaught(Mockito.any(), Mockito.any());
    }

    /**
     * Feeds an encoded {@link StartMessage} through a {@link FrameDecoder}
     * and expects both {@code writeInbound} and {@code finish} to report
     * true, then releases the backing buffer.
     */
    @Test
    public void testSendMsgWithFrameDecode() {
        EmbeddedChannel decoderChannel = newFrameDecoderChannel();
        NetworkBuffer frameBuffer = new NettyBuffer(Unpooled.buffer());
        ByteBuf encoded = encodeStartMessage(frameBuffer);

        Assert.assertTrue(decoderChannel.writeInbound(encoded));
        Assert.assertTrue(decoderChannel.finish());
        frameBuffer.release();
    }

    /**
     * Overwrites the short at index 0 of an encoded {@link StartMessage}
     * (presumably the magic number, per the test name) and expects the
     * {@link FrameDecoder} to emit no inbound message.
     */
    @Test
    public void testSendMsgWithFrameDecodeMagicError() {
        EmbeddedChannel decoderChannel = newFrameDecoderChannel();
        ByteBuf encoded = encodeStartMessage(
                          new NettyBuffer(Unpooled.buffer()));
        short invalidMagic = 10;
        encoded.setShort(0, invalidMagic);

        decoderChannel.writeInbound(encoded);

        Assert.assertFalse(decoderChannel.finish());
        Assert.assertNull(decoderChannel.readInbound());
    }

    /**
     * Overwrites the byte at index 2 of an encoded {@link StartMessage}
     * (presumably the version, per the test name) and expects the
     * {@link FrameDecoder} to emit no inbound message.
     */
    @Test
    public void testSendMsgWithFrameDecodeVersionError() {
        EmbeddedChannel decoderChannel = newFrameDecoderChannel();
        ByteBuf encoded = encodeStartMessage(
                          new NettyBuffer(Unpooled.buffer()));
        byte invalidVersion = 10;
        encoded.setByte(2, invalidVersion);

        decoderChannel.writeInbound(encoded);

        Assert.assertFalse(decoderChannel.finish());
        Assert.assertNull(decoderChannel.readInbound());
    }

    /**
     * Stubs the server pipeline initialization so that, after the real
     * initialization and a short pause, the server writes a
     * {@link MockUnDecodeMessage} to the channel; expects the client
     * handler's {@code exceptionCaught} exactly once within the timeout.
     */
    @Test
    public void testClientDecodeException() throws IOException {
        Mockito.doAnswer(invocation -> {
            // Run the real initialization before writing the message
            invocation.callRealMethod();
            UnitTestBase.sleep(SERVER_WRITE_DELAY);
            Channel serverSideChannel = invocation.getArgument(0);
            serverSideChannel.writeAndFlush(new MockUnDecodeMessage());
            return null;
        }).when(serverProtocol)
          .initializeServerPipeline(Mockito.any(), Mockito.any());

        this.oneClient();

        Mockito.verify(clientHandler,
                       Mockito.timeout(CLIENT_DECODE_VERIFY_TIMEOUT)
                              .times(1))
               .exceptionCaught(Mockito.any(), Mockito.any());
    }

    /**
     * Checks the string form of a {@link DataMessage} and that releasing
     * the message drops its body's reference count from 1 to 0.
     */
    @Test
    public void testMessageRelease() {
        int sequence = 99;
        int partitionId = 1;
        ByteBuf directBuf = Unpooled.directBuffer()
                                    .writeBytes(
                                     StringEncoding.encode("mock msg"));
        try {
            NettyBuffer body = new NettyBuffer(directBuf);
            DataMessage message = new DataMessage(MessageType.MSG, sequence,
                                                  partitionId, body);

            String expected = "DataMessage[messageType=MSG," +
                              "sequenceNumber=99,partition=1," +
                              "hasBody=true,bodyLength=8]";
            Assert.assertEquals(expected, message.toString());

            Assert.assertEquals(1, body.referenceCount());
            message.release();
            Assert.assertEquals(0, body.referenceCount());
        } finally {
            // Free the direct buffer if the test exited before release
            if (directBuf.refCnt() > 0) {
                directBuf.release();
            }
        }
    }

    /**
     * Creates an embedded channel whose only handler is a new
     * {@link FrameDecoder}.
     */
    private static EmbeddedChannel newFrameDecoderChannel() {
        return new EmbeddedChannel(new FrameDecoder());
    }

    /**
     * Encodes {@link StartMessage#INSTANCE} into the netty buffer obtained
     * from {@code target} and returns that netty buffer.
     */
    private static ByteBuf encodeStartMessage(NetworkBuffer target) {
        ByteBuf nettyBuf = target.nettyByteBuf();
        StartMessage.INSTANCE.encode(nettyBuf);
        return nettyBuf;
    }
}