| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.dubbo.remoting.codec; |
| |
| import org.apache.dubbo.common.Version; |
| import org.apache.dubbo.common.extension.ExtensionLoader; |
| import org.apache.dubbo.common.io.Bytes; |
| import org.apache.dubbo.common.io.UnsafeByteArrayOutputStream; |
| import org.apache.dubbo.common.serialize.ObjectOutput; |
| import org.apache.dubbo.common.serialize.Serialization; |
| import org.apache.dubbo.common.serialize.support.DefaultSerializationSelector; |
| import org.apache.dubbo.remoting.Channel; |
| import org.apache.dubbo.remoting.Constants; |
| import org.apache.dubbo.remoting.buffer.ChannelBuffer; |
| import org.apache.dubbo.remoting.buffer.ChannelBuffers; |
| import org.apache.dubbo.remoting.exchange.Request; |
| import org.apache.dubbo.remoting.exchange.Response; |
| import org.apache.dubbo.remoting.exchange.codec.ExchangeCodec; |
| import org.apache.dubbo.remoting.exchange.support.DefaultFuture; |
| import org.apache.dubbo.remoting.telnet.codec.TelnetCodec; |
| import org.apache.dubbo.rpc.model.FrameworkModel; |
| |
| import java.io.ByteArrayOutputStream; |
| import java.io.IOException; |
| import java.util.Date; |
| import java.util.HashMap; |
| import java.util.Map; |
| |
| import org.junit.jupiter.api.Assertions; |
| import org.junit.jupiter.api.BeforeEach; |
| import org.junit.jupiter.api.Disabled; |
| import org.junit.jupiter.api.Test; |
| import org.mockito.Mockito; |
| |
| import static org.apache.dubbo.common.constants.CommonConstants.READONLY_EVENT; |
| |
/**
 * Tests for {@link ExchangeCodec}.
 *
 * <p>Layout of the 16-byte exchange header:
 * <pre>
 * byte 0-1    magic code
 * byte 2      flag
 *               bit 8    1 - request / 0 - response
 *               bit 7    two way
 *               bit 6    heartbeat
 *               bit 1-5  serialization id
 * byte 3      status
 *               20  ok
 *               90  error?
 * byte 4-11   id (long)
 * byte 12-15  data length
 * </pre>
 */
| class ExchangeCodecTest extends TelnetCodecTest { |
| // magic header. |
| private static final short MAGIC = (short) 0xdabb; |
| private static final byte MAGIC_HIGH = (byte) Bytes.short2bytes(MAGIC)[0]; |
| private static final byte MAGIC_LOW = (byte) Bytes.short2bytes(MAGIC)[1]; |
| Serialization serialization = getSerialization(DefaultSerializationSelector.getDefaultRemotingSerialization()); |
| private static final byte SERIALIZATION_BYTE = FrameworkModel.defaultModel() |
| .getExtension(Serialization.class, DefaultSerializationSelector.getDefaultRemotingSerialization()) |
| .getContentTypeId(); |
| |
| private static Serialization getSerialization(String name) { |
| Serialization serialization = |
| ExtensionLoader.getExtensionLoader(Serialization.class).getExtension(name); |
| return serialization; |
| } |
| |
| private Object decode(byte[] request) throws IOException { |
| ChannelBuffer buffer = ChannelBuffers.wrappedBuffer(request); |
| AbstractMockChannel channel = getServerSideChannel(url); |
| // decode |
| Object obj = codec.decode(channel, buffer); |
| return obj; |
| } |
| |
| private byte[] getRequestBytes(Object obj, byte[] header) throws IOException { |
| // encode request data. |
| UnsafeByteArrayOutputStream bos = new UnsafeByteArrayOutputStream(1024); |
| ObjectOutput out = serialization.serialize(url, bos); |
| out.writeObject(obj); |
| |
| out.flushBuffer(); |
| bos.flush(); |
| bos.close(); |
| byte[] data = bos.toByteArray(); |
| byte[] len = Bytes.int2bytes(data.length); |
| System.arraycopy(len, 0, header, 12, 4); |
| byte[] request = join(header, data); |
| return request; |
| } |
| |
| private byte[] getReadonlyEventRequestBytes(Object obj, byte[] header) throws IOException { |
| // encode request data. |
| UnsafeByteArrayOutputStream bos = new UnsafeByteArrayOutputStream(1024); |
| ObjectOutput out = serialization.serialize(url, bos); |
| out.writeObject(obj); |
| |
| out.flushBuffer(); |
| bos.flush(); |
| bos.close(); |
| byte[] data = bos.toByteArray(); |
| // byte[] len = Bytes.int2bytes(data.length); |
| System.arraycopy(data, 0, header, 12, data.length); |
| byte[] request = join(header, data); |
| return request; |
| } |
| |
| private byte[] assemblyDataProtocol(byte[] header) { |
| Person request = new Person(); |
| byte[] newbuf = join(header, objectToByte(request)); |
| return newbuf; |
| } |
| // =================================================================================== |
| |
| @BeforeEach |
| public void setUp() throws Exception { |
| codec = new ExchangeCodec(); |
| } |
| |
| @Test |
| void test_Decode_Error_MagicNum() throws IOException { |
| HashMap<byte[], Object> inputBytes = new HashMap<byte[], Object>(); |
| inputBytes.put(new byte[] {0}, TelnetCodec.DecodeResult.NEED_MORE_INPUT); |
| inputBytes.put(new byte[] {MAGIC_HIGH, 0}, TelnetCodec.DecodeResult.NEED_MORE_INPUT); |
| inputBytes.put(new byte[] {0, MAGIC_LOW}, TelnetCodec.DecodeResult.NEED_MORE_INPUT); |
| |
| for (Map.Entry<byte[], Object> entry : inputBytes.entrySet()) { |
| testDecode_assertEquals(assemblyDataProtocol(entry.getKey()), entry.getValue()); |
| } |
| } |
| |
| @Test |
| void test_Decode_Error_Length() throws IOException { |
| DefaultFuture future = DefaultFuture.newFuture(Mockito.mock(Channel.class), new Request(0), 100000, null); |
| |
| byte[] header = new byte[] {MAGIC_HIGH, MAGIC_LOW, SERIALIZATION_BYTE, 20, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}; |
| Person person = new Person(); |
| byte[] request = getRequestBytes(person, header); |
| |
| Channel channel = getServerSideChannel(url); |
| byte[] baddata = new byte[] {1, 2}; |
| ChannelBuffer buffer = ChannelBuffers.wrappedBuffer(join(request, baddata)); |
| Response obj = (Response) codec.decode(channel, buffer); |
| Assertions.assertEquals(person, obj.getResult()); |
| // only decode necessary bytes |
| Assertions.assertEquals(request.length, buffer.readerIndex()); |
| |
| future.cancel(); |
| } |
| |
| @Test |
| void test_Decode_Error_Response_Object() throws IOException { |
| // 00000010-response/oneway/hearbeat=true |20-stats=ok|id=0|length=0 |
| byte[] header = new byte[] {MAGIC_HIGH, MAGIC_LOW, SERIALIZATION_BYTE, 20, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}; |
| Person person = new Person(); |
| byte[] request = getRequestBytes(person, header); |
| // bad object |
| byte[] badbytes = new byte[] {-1, -2, -3, -4, -3, -4, -3, -4, -3, -4, -3, -4}; |
| System.arraycopy(badbytes, 0, request, 21, badbytes.length); |
| |
| Response obj = (Response) decode(request); |
| Assertions.assertEquals(90, obj.getStatus()); |
| } |
| |
| @Test |
| void testInvalidSerializaitonId() throws Exception { |
| byte[] header = new byte[] {MAGIC_HIGH, MAGIC_LOW, (byte) 0x8F, 20, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}; |
| Object obj = decode(header); |
| Assertions.assertTrue(obj instanceof Request); |
| Request request = (Request) obj; |
| Assertions.assertTrue(request.isBroken()); |
| Assertions.assertTrue(request.getData() instanceof IOException); |
| header = new byte[] {MAGIC_HIGH, MAGIC_LOW, (byte) 0x1F, 20, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}; |
| |
| obj = decode(header); |
| Assertions.assertTrue(obj instanceof Response); |
| Response response = (Response) obj; |
| Assertions.assertEquals(response.getStatus(), Response.CLIENT_ERROR); |
| Assertions.assertTrue(response.getErrorMessage().contains("IOException")); |
| } |
| |
| @Test |
| void test_Decode_Check_Payload() throws IOException { |
| byte[] header = new byte[] {MAGIC_HIGH, MAGIC_LOW, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1}; |
| byte[] request = assemblyDataProtocol(header); |
| |
| try { |
| Channel channel = getServerSideChannel(url); |
| ChannelBuffer buffer = ChannelBuffers.wrappedBuffer(request); |
| Object obj = codec.decode(channel, buffer); |
| |
| Assertions.assertTrue(obj instanceof Response); |
| Assertions.assertTrue(((Response) obj) |
| .getErrorMessage() |
| .startsWith("Data length too large: " + Bytes.bytes2int(new byte[] {1, 1, 1, 1}))); |
| } catch (IOException expected) { |
| Assertions.assertTrue(expected.getMessage() |
| .startsWith("Data length too large: " + Bytes.bytes2int(new byte[] {1, 1, 1, 1}))); |
| } |
| } |
| |
| @Test |
| void test_Decode_Header_Need_Readmore() throws IOException { |
| byte[] header = new byte[] {MAGIC_HIGH, MAGIC_LOW, 0, 0, 0, 0, 0, 0, 0, 0, 0}; |
| testDecode_assertEquals(header, TelnetCodec.DecodeResult.NEED_MORE_INPUT); |
| } |
| |
| @Test |
| void test_Decode_Body_Need_Readmore() throws IOException { |
| byte[] header = new byte[] {MAGIC_HIGH, MAGIC_LOW, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 'a', 'a'}; |
| testDecode_assertEquals(header, TelnetCodec.DecodeResult.NEED_MORE_INPUT); |
| } |
| |
| @Test |
| void test_Decode_MigicCodec_Contain_ExchangeHeader() throws IOException { |
| byte[] header = new byte[] {0, 0, MAGIC_HIGH, MAGIC_LOW, 0, 0, 0, 0, 0, 0, 0, 0, 0}; |
| |
| Channel channel = getServerSideChannel(url); |
| ChannelBuffer buffer = ChannelBuffers.wrappedBuffer(header); |
| Object obj = codec.decode(channel, buffer); |
| Assertions.assertEquals(TelnetCodec.DecodeResult.NEED_MORE_INPUT, obj); |
| // If the telnet data and request data are in the same data packet, we should guarantee that the receipt of |
| // request data won't be affected by the factor that telnet does not have an end characters. |
| Assertions.assertEquals(2, buffer.readerIndex()); |
| } |
| |
| @Test |
| void test_Decode_Return_Response_Person() throws IOException { |
| DefaultFuture future = DefaultFuture.newFuture(Mockito.mock(Channel.class), new Request(0), 100000, null); |
| |
| // 00000010-response/oneway/hearbeat=false/hessian |20-stats=ok|id=0|length=0 |
| byte[] header = new byte[] {MAGIC_HIGH, MAGIC_LOW, SERIALIZATION_BYTE, 20, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}; |
| Person person = new Person(); |
| byte[] request = getRequestBytes(person, header); |
| |
| Response obj = (Response) decode(request); |
| Assertions.assertEquals(20, obj.getStatus()); |
| Assertions.assertEquals(person, obj.getResult()); |
| System.out.println(obj); |
| |
| future.cancel(); |
| } |
| |
| @Test // The status input has a problem, and the read information is wrong when the serialization is serialized. |
| public void test_Decode_Return_Response_Error() throws IOException { |
| byte[] header = new byte[] {MAGIC_HIGH, MAGIC_LOW, SERIALIZATION_BYTE, 90, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}; |
| String errorString = "encode request data error "; |
| byte[] request = getRequestBytes(errorString, header); |
| Response obj = (Response) decode(request); |
| Assertions.assertEquals(90, obj.getStatus()); |
| Assertions.assertEquals(errorString, obj.getErrorMessage()); |
| } |
| |
| @Test |
| @Disabled("Event should not be object.") |
| void test_Decode_Return_Request_Event_Object() throws IOException { |
| // |10011111|20-stats=ok|id=0|length=0 |
| byte[] header = new byte[] { |
| MAGIC_HIGH, MAGIC_LOW, (byte) (SERIALIZATION_BYTE | (byte) 0xe0), 20, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 |
| }; |
| Person person = new Person(); |
| byte[] request = getRequestBytes(person, header); |
| |
| System.setProperty("deserialization.event.size", "100"); |
| Request obj = (Request) decode(request); |
| Assertions.assertEquals(person, obj.getData()); |
| Assertions.assertTrue(obj.isTwoWay()); |
| Assertions.assertTrue(obj.isEvent()); |
| Assertions.assertEquals(Version.getProtocolVersion(), obj.getVersion()); |
| System.out.println(obj); |
| System.clearProperty("deserialization.event.size"); |
| } |
| |
| @Test |
| void test_Decode_Return_Request_Event_String() throws IOException { |
| // |10011111|20-stats=ok|id=0|length=0 |
| byte[] header = new byte[] { |
| MAGIC_HIGH, MAGIC_LOW, (byte) (SERIALIZATION_BYTE | (byte) 0xe0), 20, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 |
| }; |
| String event = READONLY_EVENT; |
| byte[] request = getRequestBytes(event, header); |
| |
| Request obj = (Request) decode(request); |
| Assertions.assertEquals(event, obj.getData()); |
| Assertions.assertTrue(obj.isTwoWay()); |
| Assertions.assertTrue(obj.isEvent()); |
| Assertions.assertEquals(Version.getProtocolVersion(), obj.getVersion()); |
| System.out.println(obj); |
| } |
| |
| @Test |
| void test_Decode_Return_Request_Heartbeat_Object() throws IOException { |
| // |10011111|20-stats=ok|id=0|length=0 |
| byte[] header = new byte[] { |
| MAGIC_HIGH, MAGIC_LOW, (byte) (SERIALIZATION_BYTE | (byte) 0xe0), 20, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 |
| }; |
| byte[] request = getRequestBytes(null, header); |
| Request obj = (Request) decode(request); |
| Assertions.assertNull(obj.getData()); |
| Assertions.assertTrue(obj.isTwoWay()); |
| Assertions.assertTrue(obj.isHeartbeat()); |
| Assertions.assertEquals(Version.getProtocolVersion(), obj.getVersion()); |
| System.out.println(obj); |
| } |
| |
| @Test |
| @Disabled("Event should not be object.") |
| void test_Decode_Return_Request_Object() throws IOException { |
| // |10011111|20-stats=ok|id=0|length=0 |
| byte[] header = new byte[] { |
| MAGIC_HIGH, MAGIC_LOW, (byte) (SERIALIZATION_BYTE | (byte) 0xe0), 20, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 |
| }; |
| Person person = new Person(); |
| byte[] request = getRequestBytes(person, header); |
| |
| System.setProperty("deserialization.event.size", "100"); |
| Request obj = (Request) decode(request); |
| Assertions.assertEquals(person, obj.getData()); |
| Assertions.assertTrue(obj.isTwoWay()); |
| Assertions.assertFalse(obj.isHeartbeat()); |
| Assertions.assertEquals(Version.getProtocolVersion(), obj.getVersion()); |
| System.out.println(obj); |
| System.clearProperty("deserialization.event.size"); |
| } |
| |
| @Test |
| void test_Decode_Error_Request_Object() throws IOException { |
| // 00000010-response/oneway/hearbeat=true |20-stats=ok|id=0|length=0 |
| byte[] header = new byte[] { |
| MAGIC_HIGH, MAGIC_LOW, (byte) (SERIALIZATION_BYTE | (byte) 0xe0), 20, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 |
| }; |
| Person person = new Person(); |
| byte[] request = getRequestBytes(person, header); |
| // bad object |
| byte[] badbytes = new byte[] {-1, -2, -3, -4, -3, -4, -3, -4, -3, -4, -3, -4}; |
| System.arraycopy(badbytes, 0, request, 21, badbytes.length); |
| |
| Request obj = (Request) decode(request); |
| Assertions.assertTrue(obj.isBroken()); |
| Assertions.assertTrue(obj.getData() instanceof Throwable); |
| } |
| |
| @Test |
| void test_Header_Response_NoSerializationFlag() throws IOException { |
| DefaultFuture future = DefaultFuture.newFuture(Mockito.mock(Channel.class), new Request(0), 100000, null); |
| |
| // 00000010-response/oneway/hearbeat=false/noset |20-stats=ok|id=0|length=0 |
| byte[] header = new byte[] {MAGIC_HIGH, MAGIC_LOW, SERIALIZATION_BYTE, 20, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}; |
| Person person = new Person(); |
| byte[] request = getRequestBytes(person, header); |
| |
| Response obj = (Response) decode(request); |
| Assertions.assertEquals(20, obj.getStatus()); |
| Assertions.assertEquals(person, obj.getResult()); |
| System.out.println(obj); |
| |
| future.cancel(); |
| } |
| |
| @Test |
| void test_Header_Response_Heartbeat() throws IOException { |
| DefaultFuture future = DefaultFuture.newFuture(Mockito.mock(Channel.class), new Request(0), 100000, null); |
| |
| // 00000010-response/oneway/hearbeat=true |20-stats=ok|id=0|length=0 |
| byte[] header = new byte[] {MAGIC_HIGH, MAGIC_LOW, SERIALIZATION_BYTE, 20, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}; |
| Person person = new Person(); |
| byte[] request = getRequestBytes(person, header); |
| |
| Response obj = (Response) decode(request); |
| Assertions.assertEquals(20, obj.getStatus()); |
| Assertions.assertEquals(person, obj.getResult()); |
| System.out.println(obj); |
| |
| future.cancel(); |
| } |
| |
| @Test |
| void test_Encode_Request() throws IOException { |
| ChannelBuffer encodeBuffer = ChannelBuffers.dynamicBuffer(2014); |
| Channel channel = getClientSideChannel(url); |
| Request request = new Request(); |
| Person person = new Person(); |
| request.setData(person); |
| |
| codec.encode(channel, encodeBuffer, request); |
| |
| // encode resault check need decode |
| byte[] data = new byte[encodeBuffer.writerIndex()]; |
| encodeBuffer.readBytes(data); |
| ChannelBuffer decodeBuffer = ChannelBuffers.wrappedBuffer(data); |
| Request obj = (Request) codec.decode(channel, decodeBuffer); |
| Assertions.assertEquals(request.isBroken(), obj.isBroken()); |
| Assertions.assertEquals(request.isHeartbeat(), obj.isHeartbeat()); |
| Assertions.assertEquals(request.isTwoWay(), obj.isTwoWay()); |
| Assertions.assertEquals(person, obj.getData()); |
| } |
| |
| @Test |
| @Disabled("Event should not be object.") |
| void test_Encode_Response() throws IOException { |
| DefaultFuture future = DefaultFuture.newFuture(Mockito.mock(Channel.class), new Request(1001), 100000, null); |
| |
| ChannelBuffer encodeBuffer = ChannelBuffers.dynamicBuffer(1024); |
| Channel channel = getClientSideChannel(url); |
| Response response = new Response(); |
| response.setHeartbeat(true); |
| response.setId(1001L); |
| response.setStatus((byte) 20); |
| response.setVersion("11"); |
| Person person = new Person(); |
| response.setResult(person); |
| |
| codec.encode(channel, encodeBuffer, response); |
| byte[] data = new byte[encodeBuffer.writerIndex()]; |
| encodeBuffer.readBytes(data); |
| |
| // encode resault check need decode |
| ChannelBuffer decodeBuffer = ChannelBuffers.wrappedBuffer(data); |
| Response obj = (Response) codec.decode(channel, decodeBuffer); |
| |
| Assertions.assertEquals(response.getId(), obj.getId()); |
| Assertions.assertEquals(response.getStatus(), obj.getStatus()); |
| Assertions.assertEquals(response.isHeartbeat(), obj.isHeartbeat()); |
| Assertions.assertEquals(person, obj.getResult()); |
| // encode response version ?? |
| // Assertions.assertEquals(response.getProtocolVersion(), obj.getVersion()); |
| |
| future.cancel(); |
| } |
| |
| @Test |
| void test_Encode_Error_Response() throws IOException { |
| ChannelBuffer encodeBuffer = ChannelBuffers.dynamicBuffer(1024); |
| Channel channel = getClientSideChannel(url); |
| Response response = new Response(); |
| response.setHeartbeat(true); |
| response.setId(1001L); |
| response.setStatus((byte) 10); |
| response.setVersion("11"); |
| String badString = "bad"; |
| response.setErrorMessage(badString); |
| Person person = new Person(); |
| response.setResult(person); |
| |
| codec.encode(channel, encodeBuffer, response); |
| byte[] data = new byte[encodeBuffer.writerIndex()]; |
| encodeBuffer.readBytes(data); |
| |
| // encode resault check need decode |
| ChannelBuffer decodeBuffer = ChannelBuffers.wrappedBuffer(data); |
| Response obj = (Response) codec.decode(channel, decodeBuffer); |
| Assertions.assertEquals(response.getId(), obj.getId()); |
| Assertions.assertEquals(response.getStatus(), obj.getStatus()); |
| Assertions.assertEquals(response.isHeartbeat(), obj.isHeartbeat()); |
| Assertions.assertEquals(badString, obj.getErrorMessage()); |
| Assertions.assertNull(obj.getResult()); |
| // Assertions.assertEquals(response.getProtocolVersion(), obj.getVersion()); |
| } |
| |
| @Test |
| void testMessageLengthGreaterThanMessageActualLength() throws Exception { |
| Channel channel = getClientSideChannel(url); |
| Request request = new Request(1L); |
| request.setVersion(Version.getProtocolVersion()); |
| Date date = new Date(); |
| request.setData(date); |
| ChannelBuffer encodeBuffer = ChannelBuffers.dynamicBuffer(1024); |
| codec.encode(channel, encodeBuffer, request); |
| byte[] bytes = new byte[encodeBuffer.writerIndex()]; |
| encodeBuffer.readBytes(bytes); |
| int len = Bytes.bytes2int(bytes, 12); |
| ByteArrayOutputStream out = new ByteArrayOutputStream(1024); |
| out.write(bytes, 0, 12); |
| /* |
| * The fill length can not be less than 256, because by default, hessian reads 256 bytes from the stream each time. |
| * Refer Hessian2Input.readBuffer for more details |
| */ |
| int padding = 512; |
| out.write(Bytes.int2bytes(len + padding)); |
| out.write(bytes, 16, bytes.length - 16); |
| for (int i = 0; i < padding; i++) { |
| out.write(1); |
| } |
| out.write(bytes); |
| /* request|1111...|request */ |
| ChannelBuffer decodeBuffer = ChannelBuffers.wrappedBuffer(out.toByteArray()); |
| Request decodedRequest = (Request) codec.decode(channel, decodeBuffer); |
| Assertions.assertEquals(date, decodedRequest.getData()); |
| Assertions.assertEquals(bytes.length + padding, decodeBuffer.readerIndex()); |
| decodedRequest = (Request) codec.decode(channel, decodeBuffer); |
| Assertions.assertEquals(date, decodedRequest.getData()); |
| } |
| |
| @Test |
| void testMessageLengthExceedPayloadLimitWhenEncode() throws Exception { |
| Request request = new Request(1L); |
| request.setData("hello"); |
| ChannelBuffer encodeBuffer = ChannelBuffers.dynamicBuffer(512); |
| AbstractMockChannel channel = getClientSideChannel(url.addParameter(Constants.PAYLOAD_KEY, 4)); |
| try { |
| codec.encode(channel, encodeBuffer, request); |
| Assertions.fail(); |
| } catch (IOException e) { |
| Assertions.assertTrue(e.getMessage().startsWith("Data length too large: ")); |
| Assertions.assertTrue(e.getMessage() |
| .contains("max payload: 4, channel: org.apache.dubbo.remoting.codec.AbstractMockChannel")); |
| } |
| |
| Response response = new Response(1L); |
| response.setResult("hello"); |
| encodeBuffer = ChannelBuffers.dynamicBuffer(512); |
| channel = getServerSideChannel(url.addParameter(Constants.PAYLOAD_KEY, 4)); |
| codec.encode(channel, encodeBuffer, response); |
| Assertions.assertTrue(channel.getReceivedMessage() instanceof Response); |
| Response receiveMessage = (Response) channel.getReceivedMessage(); |
| Assertions.assertEquals(Response.SERIALIZATION_ERROR, receiveMessage.getStatus()); |
| Assertions.assertTrue(receiveMessage.getErrorMessage().contains("Data length too large: ")); |
| } |
| } |