Polish the protocol/encoder/decoder etc.
diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/RemotingService.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/RemotingService.java
index 2f6343c..8802148 100644
--- a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/RemotingService.java
+++ b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/RemotingService.java
@@ -22,7 +22,7 @@
 import org.apache.rocketmq.remoting.api.interceptor.Interceptor;
 import org.apache.rocketmq.remoting.common.Pair;
 
-public interface RemotingService extends RemotingMarshaller, ConnectionService, ObjectLifecycle {
+public interface RemotingService extends ConnectionService, ObjectLifecycle {
     void registerInterceptor(Interceptor interceptor);
 
     void registerRequestProcessor(final String requestCode, final RequestProcessor processor,
diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/command/RemotingCommand.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/command/RemotingCommand.java
index f21a45d..8496b04 100644
--- a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/command/RemotingCommand.java
+++ b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/command/RemotingCommand.java
@@ -17,31 +17,28 @@
 
 package org.apache.rocketmq.remoting.api.command;
 
-import java.lang.reflect.Type;
 import java.util.Map;
-import org.apache.rocketmq.remoting.api.serializable.SerializerFactory;
-import org.apache.rocketmq.remoting.common.TypePresentation;
 
 public interface RemotingCommand {
-    byte protocolType();
+    short cmdCode();
 
-    void protocolType(byte value);
+    void cmdCode(short code);
+
+    short cmdVersion();
+
+    void cmdVersion(short version);
 
     int requestID();
 
     void requestID(int value);
 
-    byte serializerType();
-
-    void serializerType(byte value);
-
     TrafficType trafficType();
 
     void trafficType(TrafficType value);
 
-    String opCode();
+    short opCode();
 
-    void opCode(String value);
+    void opCode(short value);
 
     String remark();
 
@@ -55,36 +52,7 @@
 
     void property(String key, String value);
 
-    Object parameter();
+    byte[] payload();
 
-    void parameter(Object value);
-
-    byte[] parameterBytes();
-
-    void parameterBytes(byte[] value);
-
-    byte[] extraPayload();
-
-    void extraPayload(byte[] value);
-
-    <T> T parameter(final SerializerFactory serializerFactory, Class<T> c);
-
-    <T> T parameter(final SerializerFactory serializerFactory, final TypePresentation<T> typePresentation);
-
-    <T> T parameter(final SerializerFactory serializerFactory, final Type type);
-
-    enum CommandFlag {
-        SUCCESS("0"),
-        ERROR("-1");
-
-        private String flag;
-
-        CommandFlag(final String flag) {
-            this.flag = flag;
-        }
-
-        public String flag() {
-            return flag;
-        }
-    }
+    void payload(byte[] payload);
 }
\ No newline at end of file
diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/compressable/Compressor.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/compression/Compressor.java
similarity index 94%
rename from remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/compressable/Compressor.java
rename to remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/compression/Compressor.java
index 4688c45..d5c378e 100644
--- a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/compressable/Compressor.java
+++ b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/compression/Compressor.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.remoting.api.compressable;
+package org.apache.rocketmq.remoting.api.compression;
 
 public interface Compressor {
     String name();
diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/compressable/CompressorFactory.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/compression/CompressorFactory.java
similarity index 94%
rename from remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/compressable/CompressorFactory.java
rename to remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/compression/CompressorFactory.java
index 2494c78..4afd599 100644
--- a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/compressable/CompressorFactory.java
+++ b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/compression/CompressorFactory.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.remoting.api.compressable;
+package org.apache.rocketmq.remoting.api.compression;
 
 public interface CompressorFactory {
     void register(Compressor compressor);
diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/serializable/Serializer.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/serializable/Serializer.java
deleted file mode 100644
index 8ef8dcd..0000000
--- a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/serializable/Serializer.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.remoting.api.serializable;
-
-import java.lang.reflect.Type;
-import java.nio.ByteBuffer;
-import org.apache.rocketmq.remoting.common.TypePresentation;
-
-public interface Serializer {
-    String name();
-
-    byte type();
-
-    <T> T decode(final byte[] content, final Class<T> c);
-
-    <T> T decode(final byte[] content, final TypePresentation<T> typePresentation);
-
-    <T> T decode(final byte[] content, final Type type);
-
-    ByteBuffer encode(final Object object);
-}
diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/serializable/SerializerFactory.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/serializable/SerializerFactory.java
deleted file mode 100644
index b47bf99..0000000
--- a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/serializable/SerializerFactory.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.remoting.api.serializable;
-
-public interface SerializerFactory {
-    void register(Serializer serialization);
-
-    byte type(String serializationName);
-
-    Serializer get(byte type);
-
-    void clearAll();
-}
diff --git a/remoting-core/remoting-impl/pom.xml b/remoting-core/remoting-impl/pom.xml
index c8be150..604416b 100644
--- a/remoting-core/remoting-impl/pom.xml
+++ b/remoting-core/remoting-impl/pom.xml
@@ -25,6 +25,17 @@
         <dependency>
             <groupId>com.alibaba</groupId>
             <artifactId>fastjson</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.esotericsoftware</groupId>
+            <artifactId>kryo</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.msgpack</groupId>
+            <artifactId>msgpack</artifactId>
+            <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>io.netty</groupId>
@@ -35,14 +46,6 @@
             <artifactId>slf4j-api</artifactId>
         </dependency>
         <dependency>
-            <groupId>com.esotericsoftware</groupId>
-            <artifactId>kryo</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.msgpack</groupId>
-            <artifactId>msgpack</artifactId>
-        </dependency>
-        <dependency>
             <groupId>io.netty</groupId>
             <artifactId>netty-tcnative-boringssl-static</artifactId>
             <version>1.1.33.Fork26</version>
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingConfig.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingConfig.java
index f7a4b67..3f5282c 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingConfig.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingConfig.java
@@ -19,8 +19,7 @@
 
 import org.apache.commons.lang3.builder.ToStringBuilder;
 import org.apache.commons.lang3.builder.ToStringStyle;
-import org.apache.rocketmq.remoting.impl.protocol.compression.GZipCompressor;
-import org.apache.rocketmq.remoting.impl.protocol.serializer.JsonSerializer;
+import org.apache.rocketmq.remoting.impl.compression.GZipCompressor;
 
 public class RemotingConfig extends TcpSocketConfig {
     private int connectionMaxRetries = 3;
@@ -37,7 +36,6 @@
     private int threadTaskLowWaterMark = 30000;
     private int threadTaskHighWaterMark = 50000;
     private int connectionRetryBackoffMillis = 3000;
-    private String serializerName = JsonSerializer.SERIALIZER_NAME;
     private String compressorName = GZipCompressor.COMPRESSOR_NAME;
     private int serviceThreadBlockQueueSize = 50000;
     private boolean clientNativeEpollEnable = false;
@@ -147,14 +145,6 @@
         this.connectionRetryBackoffMillis = connectionRetryBackoffMillis;
     }
 
-    public String getSerializerName() {
-        return serializerName;
-    }
-
-    public void setSerializerName(final String serializerName) {
-        this.serializerName = serializerName;
-    }
-
     public String getCompressorName() {
         return compressorName;
     }
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/CodecHelper.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/CodecHelper.java
index 44d4fd9..bfc536b 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/CodecHelper.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/CodecHelper.java
@@ -17,36 +17,43 @@
 
 package org.apache.rocketmq.remoting.impl.command;
 
-import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
 import java.util.Map.Entry;
+import org.apache.rocketmq.remoting.api.buffer.ByteBufferWrapper;
 import org.apache.rocketmq.remoting.api.command.RemotingCommand;
 import org.apache.rocketmq.remoting.api.command.TrafficType;
 import org.apache.rocketmq.remoting.api.exception.RemoteCodecException;
 
 public class CodecHelper {
-    //ProtocolType + TotalLength + RequestId + SerializeType + TrafficType + CodeLength + RemarkLength + PropertiesSize + ParameterLength
-    public final static int MIN_PROTOCOL_LEN = 1 + 4 + 4 + 1 + 1 + 2 + 2 + 2 + 4;
-    public final static char PROPERTY_SEPARATOR = '\n';
-    public final static Charset REMOTING_CHARSET = Charset.forName("UTF-8");
+    // ProtocolMagic(1) + TotalLength(4) + CmdCode(2) + CmdVersion(2) + RequestID(4) + TrafficType(1) + OpCode(2)
+    // + RemarkLen(2) + PropertiesSize(2) + PayloadLen(4);
+    public final static int MIN_PROTOCOL_LEN = 1 + 4 + 2 + 2 + 4 + 1 + 2 + 2 + 2 + 4;
+    private final static char PROPERTY_SEPARATOR = '\n';
+    private final static Charset REMOTING_CHARSET = Charset.forName("UTF-8");
 
-    public final static int CODE_MAX_LEN = 512;
-    public final static int PARAMETER_MAX_LEN = 33554432;
-    public final static int BODY_MAX_LEN = 33554432;
-    public final static int PACKET_MAX_LEN = 33554432;
+    public final static byte PROTOCOL_MAGIC = 0x14;
+    private final static int REMARK_MAX_LEN = Short.MAX_VALUE;
+    private final static int PROPERTY_MAX_LEN = 524288; // 512KB
+    private final static int PAYLOAD_MAX_LEN = 16777216; // 16MB
+    public final static int PACKET_MAX_LEN = MIN_PROTOCOL_LEN + REMARK_MAX_LEN + PROPERTY_MAX_LEN + PAYLOAD_MAX_LEN;
 
-    public static ByteBuffer encodeHeader(final RemotingCommand command, final int parameterLength,
-        final int extraPayload) {
-        byte[] code = command.opCode().getBytes(REMOTING_CHARSET);
-        int codeLength = code.length;
+    public static void encodeCommand(final RemotingCommand command, final ByteBufferWrapper out) {
+        out.writeByte(PROTOCOL_MAGIC);
 
-        byte[] remark = command.remark().getBytes(REMOTING_CHARSET);
-        int remarkLength = remark.length;
+        short remarkLen = 0;
+        byte [] remark = null;
+        if (command.remark() != null) {
+            remark = command.remark().getBytes(REMOTING_CHARSET);
+            if (remark.length > REMARK_MAX_LEN) {
+                throw new RemoteCodecException(String.format("Remark len: %d over max limit: %d", remark.length, REMARK_MAX_LEN));
+            }
+            remarkLen = (short) remark.length;
+        }
 
         byte[][] props = null;
-        int propsLength = 0;
+        int propsLen = 0;
         StringBuilder sb = new StringBuilder();
-        if (!command.properties().isEmpty()) {
+        if (command.properties() != null && !command.properties().isEmpty()) {
             props = new byte[command.properties().size()][];
             int i = 0;
             for (Entry<String, String> next : command.properties().entrySet()) {
@@ -57,122 +64,110 @@
 
                 props[i] = sb.toString().getBytes(REMOTING_CHARSET);
 
-                propsLength += 2;
-                propsLength += props[i].length;
+                if (props[i].length > Short.MAX_VALUE) {
+                    throw new RemoteCodecException(String.format("Property KV len: %d over max limit: %d", props[i].length, Short.MAX_VALUE));
+                }
+
+                propsLen += 2;
+                propsLen += props[i].length;
                 i++;
             }
         }
 
-        int totalLength = MIN_PROTOCOL_LEN - 1 - 4
-            + codeLength
-            + remarkLength
-            + propsLength
-            + parameterLength
-            + extraPayload;
-
-        int headerLength = 1 + 4 + totalLength - parameterLength - extraPayload;
-
-        ByteBuffer buf = ByteBuffer.allocate(headerLength);
-        buf.put(command.protocolType());
-        buf.putInt(totalLength);
-        buf.putInt(command.requestID());
-        buf.put(command.serializerType());
-        buf.put((byte) command.trafficType().ordinal());
-
-        buf.putShort((short) codeLength);
-        if (codeLength > 0) {
-            buf.put(code);
+        if (propsLen > PROPERTY_MAX_LEN) {
+            throw new RemoteCodecException(String.format("Properties total len: %d over max limit: %d", propsLen, PROPERTY_MAX_LEN));
         }
-        buf.putShort((short) remarkLength);
-        if (remarkLength > 0) {
-            buf.put(remark);
+
+        int payloadLen = command.payload() == null ? 0 : command.payload().length;
+
+        if (payloadLen > PAYLOAD_MAX_LEN) {
+            throw new RemoteCodecException(String.format("Payload len: %d over max limit: %d", payloadLen, PAYLOAD_MAX_LEN));
         }
-        if (props != null) {
-            buf.putShort((short) props.length);
+
+        int totalLength = MIN_PROTOCOL_LEN
+            + remarkLen
+            + propsLen
+            + payloadLen;
+
+        out.writeInt(totalLength);
+        out.writeShort(command.cmdCode());
+        out.writeShort(command.cmdVersion());
+        out.writeInt(command.requestID());
+        out.writeByte((byte) command.trafficType().ordinal());
+        out.writeShort(command.opCode());
+
+        out.writeShort(remarkLen);
+        if (remarkLen != 0) {
+            out.writeBytes(remark);
+        }
+
+        if (propsLen != 0) {
+            out.writeShort((short) props.length);
             for (byte[] prop : props) {
-                buf.putShort((short) prop.length);
-                buf.put(prop);
+                out.writeShort((short) prop.length);
+                out.writeBytes(prop);
             }
-        } else {
-            buf.putShort((short) 0);
         }
 
-        buf.putInt(parameterLength);
-
-        buf.flip();
-
-        return buf;
+        out.writeInt(payloadLen);
+        if (payloadLen != 0) {
+            out.writeBytes(command.payload());
+        }
     }
 
-    public static RemotingCommand decode(final ByteBuffer byteBuffer) {
+    public static RemotingCommand decode(final ByteBufferWrapper in) {
         RemotingCommandImpl cmd = new RemotingCommandImpl();
-        int totalLength = byteBuffer.limit();
-        cmd.requestID(byteBuffer.getInt());
-        cmd.serializerType(byteBuffer.get());
-        cmd.trafficType(TrafficType.parse(byteBuffer.get()));
 
-        {
-            short size = byteBuffer.getShort();
-            if (size > 0 && size <= CODE_MAX_LEN) {
-                byte[] bytes = new byte[size];
-                byteBuffer.get(bytes);
-                String str = new String(bytes, REMOTING_CHARSET);
-                cmd.opCode(str);
-            } else {
-                throw new RemoteCodecException(String.format("Code length: %d over max limit: %d", size, CODE_MAX_LEN));
-            }
+        cmd.cmdCode(in.readShort());
+        cmd.cmdVersion(in.readShort());
+        cmd.requestID(in.readInt());
+        cmd.trafficType(TrafficType.parse(in.readByte()));
+        cmd.opCode(in.readShort());
+
+
+        short remarkLen = in.readShort();
+        if (remarkLen > 0) {
+            byte[] bytes = new byte[remarkLen];
+            in.readBytes(bytes);
+            String str = new String(bytes, REMOTING_CHARSET);
+            cmd.remark(str);
         }
 
-        {
-            short size = byteBuffer.getShort();
-            if (size > 0) {
-                byte[] bytes = new byte[size];
-                byteBuffer.get(bytes);
-                String str = new String(bytes, REMOTING_CHARSET);
-                cmd.remark(str);
-            }
-        }
-
-        {
-            short size = byteBuffer.getShort();
-            if (size > 0) {
-                for (int i = 0; i < size; i++) {
-                    short length = byteBuffer.getShort();
-                    if (length > 0) {
-                        byte[] bytes = new byte[length];
-                        byteBuffer.get(bytes);
-                        String str = new String(bytes, REMOTING_CHARSET);
-                        int index = str.indexOf(PROPERTY_SEPARATOR);
-                        if (index > 0) {
-                            String key = str.substring(0, index);
-                            String value = str.substring(index + 1);
-                            cmd.property(key, value);
-                        }
+        short propsSize = in.readShort();
+        int propsLen = 0;
+        if (propsSize > 0) {
+            for (int i = 0; i < propsSize; i++) {
+                short length = in.readShort();
+                if (length > 0) {
+                    byte[] bytes = new byte[length];
+                    in.readBytes(bytes);
+                    String str = new String(bytes, REMOTING_CHARSET);
+                    int index = str.indexOf(PROPERTY_SEPARATOR);
+                    if (index > 0) {
+                        String key = str.substring(0, index);
+                        String value = str.substring(index + 1);
+                        cmd.property(key, value);
                     }
                 }
+
+                propsLen += 2;
+                propsLen += length;
+                if (propsLen > PROPERTY_MAX_LEN) {
+                    throw new RemoteCodecException(String.format("Properties total len: %d over max limit: %d", propsLen, PROPERTY_MAX_LEN));
+                }
             }
         }
 
-        {
-            int size = byteBuffer.getInt();
-            if (size > 0 && size <= PARAMETER_MAX_LEN) {
-                byte[] bytes = new byte[size];
-                byteBuffer.get(bytes);
-                cmd.parameterBytes(bytes);
-            } else if (size != 0) {
-                throw new RemoteCodecException(String.format("Parameter size: %d over max limit: %d", size, PARAMETER_MAX_LEN));
-            }
+        int payloadLen = in.readInt();
+
+        if (payloadLen > PAYLOAD_MAX_LEN) {
+            throw new RemoteCodecException(String.format("Payload len: %d over max limit: %d", payloadLen, PAYLOAD_MAX_LEN));
         }
 
-        {
-            int size = totalLength - byteBuffer.position();
-            if (size > 0 && size <= BODY_MAX_LEN) {
-                byte[] bytes = new byte[size];
-                byteBuffer.get(bytes);
-                cmd.extraPayload(bytes);
-            } else if (size != 0) {
-                throw new RemoteCodecException(String.format("Body size: %d over max limit: %d", size, BODY_MAX_LEN));
-            }
+        if (payloadLen > 0) {
+            byte[] bytes = new byte[payloadLen];
+            in.readBytes(bytes);
+            cmd.payload(bytes);
         }
 
         return cmd;
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandFactoryImpl.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandFactoryImpl.java
index 6e1efaa..adbb42c 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandFactoryImpl.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandFactoryImpl.java
@@ -20,37 +20,23 @@
 import org.apache.rocketmq.remoting.api.command.RemotingCommand;
 import org.apache.rocketmq.remoting.api.command.RemotingCommandFactory;
 import org.apache.rocketmq.remoting.api.command.TrafficType;
-import org.apache.rocketmq.remoting.api.serializable.SerializerFactory;
-import org.apache.rocketmq.remoting.impl.protocol.serializer.JsonSerializer;
-import org.apache.rocketmq.remoting.impl.protocol.serializer.SerializerFactoryImpl;
 
 public class RemotingCommandFactoryImpl implements RemotingCommandFactory {
-    private final SerializerFactory serializerFactory = new SerializerFactoryImpl();
-    private byte serializeType = JsonSerializer.SERIALIZER_TYPE;
-
-    private byte PROTOCOL_MAGIC = 0x14;
-
     public RemotingCommandFactoryImpl() {
     }
 
-    public RemotingCommandFactoryImpl(final String serializeName) {
-        this.serializeType = serializerFactory.type(serializeName);
-    }
-
     @Override
     public RemotingCommand createRequest() {
         RemotingCommand request = new RemotingCommandImpl();
-        request.protocolType(this.PROTOCOL_MAGIC);
-        request.serializerType(this.serializeType);
         return request;
     }
 
     @Override
-    public RemotingCommand createResponse(final RemotingCommand command) {
+    public RemotingCommand createResponse(final RemotingCommand request) {
         RemotingCommand response = new RemotingCommandImpl();
-        response.requestID(command.requestID());
-        response.protocolType(command.protocolType());
-        response.serializerType(command.serializerType());
+        response.cmdCode(request.cmdCode());
+        response.cmdVersion(request.cmdVersion());
+        response.requestID(request.requestID());
         response.trafficType(TrafficType.RESPONSE);
         return response;
     }
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandImpl.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandImpl.java
index bcf2338..b405eda 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandImpl.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandImpl.java
@@ -17,7 +17,6 @@
 
 package org.apache.rocketmq.remoting.impl.command;
 
-import java.lang.reflect.Type;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.commons.lang3.builder.EqualsBuilder;
@@ -26,39 +25,23 @@
 import org.apache.commons.lang3.builder.ToStringStyle;
 import org.apache.rocketmq.remoting.api.command.RemotingCommand;
 import org.apache.rocketmq.remoting.api.command.TrafficType;
-import org.apache.rocketmq.remoting.api.serializable.SerializerFactory;
-import org.apache.rocketmq.remoting.common.TypePresentation;
 
 public class RemotingCommandImpl implements RemotingCommand {
     public final static RequestIdGenerator REQUEST_ID_GENERATOR = RequestIdGenerator.inst;
 
-    private byte protocolType;
-    private byte serializeType;
-
+    private short cmdCode;
+    private short cmdVersion;
     private volatile int requestId = REQUEST_ID_GENERATOR.incrementAndGet();
     private TrafficType trafficType = TrafficType.REQUEST_SYNC;
-    private String code = CommandFlag.SUCCESS.flag();
+    private short opCode = RemotingSysResponseCode.SUCCESS;
     private String remark = "";
-    private Map<String, String> properties = new HashMap<String, String>();
-    private Object parameter;
-    private byte[] extraPayload;
-
-    private byte[] parameterByte;
+    private Map<String, String> properties = new HashMap<>();
+    private byte[] payload;
 
     protected RemotingCommandImpl() {
     }
 
     @Override
-    public byte protocolType() {
-        return this.protocolType;
-    }
-
-    @Override
-    public void protocolType(byte value) {
-        this.protocolType = value;
-    }
-
-    @Override
     public int requestID() {
         return requestId;
     }
@@ -69,16 +52,6 @@
     }
 
     @Override
-    public byte serializerType() {
-        return this.serializeType;
-    }
-
-    @Override
-    public void serializerType(byte value) {
-        this.serializeType = value;
-    }
-
-    @Override
     public TrafficType trafficType() {
         return this.trafficType;
     }
@@ -89,13 +62,13 @@
     }
 
     @Override
-    public String opCode() {
-        return this.code;
+    public short opCode() {
+        return this.opCode;
     }
 
     @Override
-    public void opCode(String value) {
-        this.code = value;
+    public void opCode(short value) {
+        this.opCode = value;
     }
 
     @Override
@@ -129,68 +102,33 @@
     }
 
     @Override
-    public Object parameter() {
-        return this.parameter;
+    public short cmdCode() {
+        return this.cmdCode;
     }
 
     @Override
-    public void parameter(Object value) {
-        this.parameter = value;
+    public void cmdCode(short code) {
+        this.cmdCode = code;
     }
 
     @Override
-    public byte[] parameterBytes() {
-        return this.getParameterByte();
-    }
-
-    public byte[] getParameterByte() {
-        return parameterByte;
-    }
-
-    public void setParameterByte(byte[] parameterByte) {
-        this.parameterByte = parameterByte;
+    public short cmdVersion() {
+        return this.cmdVersion;
     }
 
     @Override
-    public void parameterBytes(byte[] value) {
-        this.setParameterByte(value);
+    public void cmdVersion(short version) {
+        this.cmdVersion = version;
     }
 
     @Override
-    public byte[] extraPayload() {
-        return this.extraPayload;
+    public byte[] payload() {
+        return this.payload;
     }
 
     @Override
-    public void extraPayload(byte[] value) {
-        this.extraPayload = value;
-    }
-
-    @Override
-    public <T> T parameter(SerializerFactory serializerFactory, Class<T> c) {
-        if (this.parameter() != null)
-            return (T) this.parameter();
-        final T decode = serializerFactory.get(this.serializerType()).decode(this.parameterBytes(), c);
-        this.parameter(decode);
-        return decode;
-    }
-
-    @Override
-    public <T> T parameter(SerializerFactory serializerFactory, TypePresentation<T> typePresentation) {
-        if (this.parameter() != null)
-            return (T) this.parameter();
-        final T decode = serializerFactory.get(this.serializerType()).decode(this.parameterBytes(), typePresentation);
-        this.parameter(decode);
-        return decode;
-    }
-
-    @Override
-    public <T> T parameter(SerializerFactory serializerFactory, Type type) {
-        if (this.parameter() != null)
-            return (T) this.parameter();
-        final T decode = serializerFactory.get(this.serializerType()).decode(this.parameterBytes(), type);
-        this.parameter(decode);
-        return decode;
+    public void payload(byte[] payload) {
+        this.payload = payload;
     }
 
     @Override
diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/RemotingMarshaller.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingSysResponseCode.java
similarity index 69%
rename from remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/RemotingMarshaller.java
rename to remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingSysResponseCode.java
index 62c9dda..ae76c6f 100644
--- a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/RemotingMarshaller.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingSysResponseCode.java
@@ -15,10 +15,17 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.remoting.api;
+package org.apache.rocketmq.remoting.impl.command;
 
-import org.apache.rocketmq.remoting.api.serializable.SerializerFactory;
+public class RemotingSysResponseCode {
 
-public interface RemotingMarshaller {
-    SerializerFactory serializerFactory();
+    public static final short SUCCESS = 0;
+
+    public static final short SYSTEM_ERROR = 1;
+
+    public static final short SYSTEM_BUSY = 2;
+
+    public static final short REQUEST_CODE_NOT_SUPPORTED = 3;
+
+    public static final short TRANSACTION_FAILED = 4;
 }
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/compression/CompressorFactoryImpl.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/compression/CompressorFactoryImpl.java
similarity index 91%
rename from remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/compression/CompressorFactoryImpl.java
rename to remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/compression/CompressorFactoryImpl.java
index 10e97ba..40576ad 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/compression/CompressorFactoryImpl.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/compression/CompressorFactoryImpl.java
@@ -15,10 +15,10 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.remoting.impl.protocol.compression;
+package org.apache.rocketmq.remoting.impl.compression;
 
-import org.apache.rocketmq.remoting.api.compressable.Compressor;
-import org.apache.rocketmq.remoting.api.compressable.CompressorFactory;
+import org.apache.rocketmq.remoting.api.compression.Compressor;
+import org.apache.rocketmq.remoting.api.compression.CompressorFactory;
 
 public class CompressorFactoryImpl implements CompressorFactory {
     private static final int MAX_COUNT = 0x0FF;
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/compression/GZipCompressor.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/compression/GZipCompressor.java
similarity index 95%
rename from remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/compression/GZipCompressor.java
rename to remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/compression/GZipCompressor.java
index fc33f4c..53dd4bf 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/compression/GZipCompressor.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/compression/GZipCompressor.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.remoting.impl.protocol.compression;
+package org.apache.rocketmq.remoting.impl.compression;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -23,7 +23,7 @@
 import java.io.OutputStream;
 import java.util.zip.GZIPInputStream;
 import java.util.zip.GZIPOutputStream;
-import org.apache.rocketmq.remoting.api.compressable.Compressor;
+import org.apache.rocketmq.remoting.api.compression.Compressor;
 
 public class GZipCompressor implements Compressor {
     public static final int BUFFER = 1024;
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java
index a4c33e1..5026224 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java
@@ -48,8 +48,6 @@
 import org.apache.rocketmq.remoting.api.interceptor.InterceptorGroup;
 import org.apache.rocketmq.remoting.api.interceptor.RequestContext;
 import org.apache.rocketmq.remoting.api.interceptor.ResponseContext;
-import org.apache.rocketmq.remoting.api.serializable.Serializer;
-import org.apache.rocketmq.remoting.api.serializable.SerializerFactory;
 import org.apache.rocketmq.remoting.common.ChannelEventListenerGroup;
 import org.apache.rocketmq.remoting.common.Pair;
 import org.apache.rocketmq.remoting.common.ResponseResult;
@@ -58,7 +56,7 @@
 import org.apache.rocketmq.remoting.external.ThreadUtils;
 import org.apache.rocketmq.remoting.impl.channel.NettyChannelImpl;
 import org.apache.rocketmq.remoting.impl.command.RemotingCommandFactoryImpl;
-import org.apache.rocketmq.remoting.impl.protocol.serializer.SerializerFactoryImpl;
+import org.apache.rocketmq.remoting.impl.command.RemotingSysResponseCode;
 import org.apache.rocketmq.remoting.internal.UIDGenerator;
 import org.jetbrains.annotations.NotNull;
 import org.slf4j.Logger;
@@ -66,7 +64,6 @@
 
 public abstract class NettyRemotingAbstract implements RemotingService {
     protected static final Logger LOG = LoggerFactory.getLogger(NettyRemotingAbstract.class);
-    protected final SerializerFactory serializerFactory = new SerializerFactoryImpl();
     protected final ChannelEventExecutor channelEventExecutor = new ChannelEventExecutor("ChannelEventExecutor");
     private final Semaphore semaphoreOneway;
     private final Semaphore semaphoreAsync;
@@ -87,11 +84,7 @@
         this.publicExecutor = ThreadUtils.newFixedThreadPool(
             clientConfig.getClientAsyncCallbackExecutorThreads(),
             10000, "Remoting-PublicExecutor", true);
-        this.remotingCommandFactory = new RemotingCommandFactoryImpl(clientConfig.getSerializerName());
-    }
-
-    public SerializerFactory getSerializerFactory() {
-        return serializerFactory;
+        this.remotingCommandFactory = new RemotingCommandFactoryImpl();
     }
 
     protected void putNettyEvent(final NettyChannelEvent event) {
@@ -158,7 +151,7 @@
                     extractRemoteAddress(ctx.channel()), cmd, e, "FLOW_CONTROL"));
 
                 RemotingCommand response = remotingCommandFactory.createResponse(cmd);
-                response.opCode(RemotingCommand.CommandFlag.ERROR.flag());
+                response.opCode(RemotingSysResponseCode.SYSTEM_BUSY);
                 response.remark("SYSTEM_BUSY");
                 writeAndFlush(ctx.channel(), response);
             }
@@ -194,19 +187,9 @@
         if (cmd.trafficType() != TrafficType.REQUEST_ONEWAY) {
             //FiXME Exception interceptor can not throw exception
             interceptorGroup.onException(new ExceptionContext(RemotingEndPoint.RESPONSE, extractRemoteAddress(ctx.channel()), cmd, e, ""));
-            RemotingCommand response = remotingCommandFactory.createResponse(cmd);
-            response.opCode(RemotingCommand.CommandFlag.ERROR.flag());
-            response.remark(serializeException(cmd.serializerType(), e));
-            response.property("Exception", e.getClass().getName());
-            ctx.writeAndFlush(response);
         }
     }
 
-    private String serializeException(byte serializeType, Throwable exception) {
-        final Serializer serialization = getSerializerFactory().get(serializeType);
-        return serialization.encode(exception).toString();
-    }
-
     private void handleResponse(RemotingCommand response, RemotingCommand cmd, ChannelHandlerContext ctx) {
         if (cmd.trafficType() != TrafficType.REQUEST_ONEWAY) {
             if (response != null) {
@@ -508,11 +491,6 @@
     }
 
     @Override
-    public SerializerFactory serializerFactory() {
-        return this.serializerFactory;
-    }
-
-    @Override
     public RemotingCommandFactory commandFactory() {
         return this.remotingCommandFactory;
     }
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Decoder.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Decoder.java
index ec1d69d..f239ee9 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Decoder.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Decoder.java
@@ -22,7 +22,6 @@
 import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.ByteToMessageDecoder;
-import java.nio.ByteBuffer;
 import java.util.List;
 import org.apache.rocketmq.remoting.api.buffer.ByteBufferWrapper;
 import org.apache.rocketmq.remoting.api.command.RemotingCommand;
@@ -55,13 +54,12 @@
     private Object decode(final ChannelHandlerContext ctx, ByteBufferWrapper wrapper) throws Exception {
         int originReaderIndex = wrapper.readerIndex();
 
-        byte type = wrapper.readByte();
+        byte magic = wrapper.readByte();
         try {
-            RemotingCommand cmd = decode(wrapper, originReaderIndex);
-            if (cmd != null) {
-                cmd.protocolType(type);
+            if (magic != CodecHelper.PROTOCOL_MAGIC) {
+                throw new RemoteCodecException(String.format("MagicCode %d is wrong, expect %d", magic, CodecHelper.PROTOCOL_MAGIC));
             }
-            return cmd;
+            return decode(wrapper, originReaderIndex);
         } catch (final RemoteCodecException e) {
             LOG.warn("Decode error {}, close the channel {}", e.getMessage(), ctx.channel());
             ctx.channel().close().addListener(new ChannelFutureListener() {
@@ -76,7 +74,7 @@
 
     public RemotingCommand decode(final ByteBufferWrapper wrapper, final int originReaderIndex) {
         // Full message isn't available yet, return nothing for now
-        if (wrapper.readableBytes() < CodecHelper.MIN_PROTOCOL_LEN - 1) {
+        if (wrapper.readableBytes() < CodecHelper.MIN_PROTOCOL_LEN - 1 /*MagicCode*/) {
             wrapper.setReaderIndex(originReaderIndex);
             return null;
         }
@@ -91,17 +89,10 @@
             throw new IllegalArgumentException(String.format("Total length %d is more than limit %d", totalLength, CodecHelper.PACKET_MAX_LEN));
         }
 
-        if (wrapper.readableBytes() < totalLength) {
+        if (wrapper.readableBytes() < totalLength - 1 /*MagicCode*/ - 4 /*TotalLen*/) {
             wrapper.setReaderIndex(originReaderIndex);
             return null;
         }
-
-        ByteBuffer totalBuffer = ByteBuffer.allocate(totalLength);
-
-        wrapper.readBytes(totalBuffer);
-
-        totalBuffer.flip();
-
-        return CodecHelper.decode(totalBuffer);
+        return CodecHelper.decode(wrapper);
     }
 }
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Encoder.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Encoder.java
index 10aa504..78e5e5c 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Encoder.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Encoder.java
@@ -23,22 +23,16 @@
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.MessageToByteEncoder;
 import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
 import org.apache.rocketmq.remoting.api.buffer.ByteBufferWrapper;
 import org.apache.rocketmq.remoting.api.command.RemotingCommand;
-import org.apache.rocketmq.remoting.api.serializable.Serializer;
-import org.apache.rocketmq.remoting.api.serializable.SerializerFactory;
 import org.apache.rocketmq.remoting.impl.buffer.NettyByteBufferWrapper;
 import org.apache.rocketmq.remoting.impl.command.CodecHelper;
-import org.apache.rocketmq.remoting.impl.protocol.serializer.SerializerFactoryImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class Encoder extends MessageToByteEncoder<RemotingCommand> {
     private static final Logger LOG = LoggerFactory.getLogger(Encoder.class);
 
-    private final SerializerFactory serializerFactory = new SerializerFactoryImpl();
-
     public Encoder() {
     }
 
@@ -47,7 +41,7 @@
         try {
             ByteBufferWrapper wrapper = new NettyByteBufferWrapper(out);
 
-            encode(serializerFactory, remotingCommand, wrapper);
+            encode(remotingCommand, wrapper);
         } catch (final Exception e) {
             LOG.error("Error occurred when encoding response for channel " + ((InetSocketAddress) ctx.channel().remoteAddress()).getAddress().getHostAddress(), e);
             if (remotingCommand != null) {
@@ -62,28 +56,7 @@
         }
     }
 
-    public void encode(final SerializerFactory serializerFactory, final RemotingCommand remotingCommand,
-        final ByteBufferWrapper out) {
-        ByteBuffer encodeParameter = null;
-        if (remotingCommand.parameterBytes() != null) {
-            encodeParameter = ByteBuffer.wrap(remotingCommand.parameterBytes());
-        } else if (remotingCommand.parameter() != null) {
-            final Serializer serialization = serializerFactory.get(remotingCommand.serializerType());
-            encodeParameter = serialization.encode(remotingCommand.parameter());
-        }
-
-        int parameterLength = encodeParameter != null ? encodeParameter.limit() : 0;
-        int extBodyLength = remotingCommand.extraPayload() != null ? remotingCommand.extraPayload().length : 0;
-
-        ByteBuffer header = CodecHelper.encodeHeader(remotingCommand, parameterLength, extBodyLength);
-        out.writeBytes(header);
-
-        if (encodeParameter != null) {
-            out.writeBytes(encodeParameter);
-        }
-
-        if (remotingCommand.extraPayload() != null) {
-            out.writeBytes(remotingCommand.extraPayload());
-        }
+    public void encode(final RemotingCommand remotingCommand, final ByteBufferWrapper out) {
+        CodecHelper.encodeCommand(remotingCommand, out);
     }
 }
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/serializer/JsonSerializer.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/serializer/JsonSerializer.java
deleted file mode 100644
index c85d44b..0000000
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/serializer/JsonSerializer.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.remoting.impl.protocol.serializer;
-
-import com.alibaba.fastjson.JSON;
-import java.lang.reflect.Type;
-import java.nio.ByteBuffer;
-import org.apache.rocketmq.remoting.api.serializable.Serializer;
-import org.apache.rocketmq.remoting.common.TypePresentation;
-import org.apache.rocketmq.remoting.impl.command.CodecHelper;
-
-public class JsonSerializer implements Serializer {
-    public static final String SERIALIZER_NAME = JsonSerializer.class.getSimpleName();
-    public static final byte SERIALIZER_TYPE = 'J';
-
-    public JsonSerializer() {
-    }
-
-    @Override
-    public String name() {
-        return SERIALIZER_NAME;
-    }
-
-    @Override
-    public byte type() {
-        return SERIALIZER_TYPE;
-    }
-
-    @Override
-    public <T> T decode(final byte[] content, final Class<T> c) {
-        if (content != null) {
-            try {
-                final String jsonString = new String(content, CodecHelper.REMOTING_CHARSET);
-                return JSON.parseObject(jsonString, c);
-            } catch (Exception e) {
-                throw new RuntimeException(e);
-            }
-        }
-        return null;
-    }
-
-    @Override
-    public <T> T decode(final byte[] content, final TypePresentation<T> typePresentation) {
-        return decode(content, typePresentation.getType());
-    }
-
-    @Override
-    public <T> T decode(byte[] content, Type type) {
-        if (content != null) {
-            try {
-                final String jsonString = new String(content, CodecHelper.REMOTING_CHARSET);
-                return JSON.parseObject(jsonString, type);
-            } catch (Exception e) {
-                throw new RuntimeException(e);
-            }
-        }
-        return null;
-    }
-
-    @Override
-    public ByteBuffer encode(final Object object) {
-        if (object != null) {
-            String jsonString = JSON.toJSONString(object);
-            byte[] bytes = jsonString.getBytes(CodecHelper.REMOTING_CHARSET);
-            try {
-                return ByteBuffer.wrap(bytes, 0, bytes.length);
-            } catch (Exception e) {
-                throw new RuntimeException(e);
-            }
-        }
-        return null;
-    }
-}
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/serializer/Kryo3Serializer.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/serializer/Kryo3Serializer.java
deleted file mode 100644
index 06ea217..0000000
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/serializer/Kryo3Serializer.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.remoting.impl.protocol.serializer;
-
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-import java.lang.reflect.ParameterizedType;
-import java.lang.reflect.Type;
-import java.nio.ByteBuffer;
-import org.apache.rocketmq.remoting.api.serializable.Serializer;
-import org.apache.rocketmq.remoting.common.TypePresentation;
-
-public class Kryo3Serializer implements Serializer {
-    public static final String SERIALIZER_NAME = Kryo3Serializer.class.getSimpleName();
-    public static final byte SERIALIZER_TYPE = 'K';
-
-    public Kryo3Serializer() {
-    }
-
-    @Override
-    public String name() {
-        return SERIALIZER_NAME;
-    }
-
-    @Override
-    public byte type() {
-        return SERIALIZER_TYPE;
-    }
-
-    @Override
-    public <T> T decode(final byte[] content, final Class<T> c) {
-        if (content != null) {
-            Input input = null;
-            try {
-                input = new Input(content);
-                return (T) ThreadSafeKryo.getKryoInstance().readClassAndObject(input);
-            } catch (Exception e) {
-                throw new RuntimeException(e);
-            } finally {
-                input.close();
-            }
-        }
-
-        return null;
-    }
-
-    @Override
-    public <T> T decode(final byte[] content, final TypePresentation<T> typePresentation) {
-        return decode(content, typePresentation.getType());
-    }
-
-    @Override
-    public <T> T decode(byte[] content, Type type) {
-        if (type instanceof ParameterizedType) {
-            return decode(content, (Class<? extends T>) ((ParameterizedType) type).getRawType());
-        } else if (type instanceof Class) {
-            return decode(content, (Class<? extends T>) type);
-        }
-        return null;
-    }
-
-    @Override
-    public ByteBuffer encode(final Object object) {
-        if (object != null) {
-            try (Output output = new Output(1024, 1024 * 1024 * 6)) {
-                ThreadSafeKryo.getKryoInstance().writeClassAndObject(output, object);
-                return ByteBuffer.wrap(output.getBuffer(), 0, output.position());
-            } catch (Exception e) {
-                throw new RuntimeException(e);
-            }
-        }
-
-        return null;
-    }
-}
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/serializer/MsgPackSerializer.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/serializer/MsgPackSerializer.java
deleted file mode 100644
index 1097f8f..0000000
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/serializer/MsgPackSerializer.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.remoting.impl.protocol.serializer;
-
-import java.lang.reflect.Type;
-import java.nio.ByteBuffer;
-import org.apache.rocketmq.remoting.api.serializable.Serializer;
-import org.apache.rocketmq.remoting.common.TypePresentation;
-import org.msgpack.MessagePack;
-import org.msgpack.template.Template;
-
-public class MsgPackSerializer implements Serializer {
-    public static final String SERIALIZER_NAME = MsgPackSerializer.class.getSimpleName();
-    public static final byte SERIALIZER_TYPE = 'M';
-    private final MessagePack messagePack = new MessagePack();
-
-    public MsgPackSerializer() {
-    }
-
-    @Override
-    public String name() {
-        return SERIALIZER_NAME;
-    }
-
-    @Override
-    public byte type() {
-        return SERIALIZER_TYPE;
-    }
-
-    @Override
-    public <T> T decode(final byte[] content, final Class<T> c) {
-        try {
-            return messagePack.read(content, c);
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    public <T> T decode(final byte[] content, final TypePresentation<T> typePresentation) {
-        return decode(content, typePresentation.getType());
-    }
-
-    @Override
-    public <T> T decode(byte[] content, Type type) {
-        Template<T> template = (Template<T>) messagePack.lookup(type);
-        try {
-            return messagePack.read(content, template);
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    public ByteBuffer encode(final Object object) {
-        try {
-            byte[] data = messagePack.write(object);
-            return ByteBuffer.wrap(data);
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-}
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/serializer/SerializerFactoryImpl.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/serializer/SerializerFactoryImpl.java
deleted file mode 100644
index 632b61f..0000000
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/serializer/SerializerFactoryImpl.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.remoting.impl.protocol.serializer;
-
-import org.apache.rocketmq.remoting.api.serializable.Serializer;
-import org.apache.rocketmq.remoting.api.serializable.SerializerFactory;
-
-public class SerializerFactoryImpl implements SerializerFactory {
-    private static final int MAX_COUNT = 0x0FF;
-    private final Serializer[] tables = new Serializer[MAX_COUNT];
-
-    public SerializerFactoryImpl() {
-        this.register(new JsonSerializer());
-        this.register(new Kryo3Serializer());
-        this.register(new MsgPackSerializer());
-    }
-
-    @Override
-    public void register(Serializer serialization) {
-        if (tables[serialization.type() & MAX_COUNT] != null) {
-            throw new RuntimeException("serialization header's sign is overlapped");
-        }
-        tables[serialization.type() & MAX_COUNT] = serialization;
-    }
-
-    @Override
-    public byte type(final String serializationName) {
-        for (Serializer table : this.tables) {
-            if (table != null) {
-                if (table.name().equalsIgnoreCase(serializationName)) {
-                    return table.type();
-                }
-            }
-        }
-
-        throw new IllegalArgumentException(String.format("the serialization: %s not exist", serializationName));
-    }
-
-    @Override
-    public Serializer get(byte type) {
-        return tables[type & MAX_COUNT];
-    }
-
-    @Override
-    public void clearAll() {
-        for (int i = 0; i < this.tables.length; i++) {
-            this.tables[i] = null;
-        }
-    }
-
-    public Serializer[] getTables() {
-        return tables;
-    }
-}
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/serializer/ThreadSafeKryo.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/serializer/ThreadSafeKryo.java
deleted file mode 100644
index cadfc27..0000000
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/serializer/ThreadSafeKryo.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.remoting.impl.protocol.serializer;
-
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.KryoSerializable;
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.sql.Timestamp;
-import java.util.ArrayList;
-import java.util.BitSet;
-import java.util.Calendar;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Currency;
-import java.util.Date;
-import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.Locale;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.TreeSet;
-import org.objenesis.strategy.StdInstantiatorStrategy;
-
-public class ThreadSafeKryo {
-    private static final ThreadLocal<Kryo> KRYOS = new ThreadLocal<Kryo>() {
-        protected Kryo initialValue() {
-            Kryo kryo = new Kryo();
-
-            kryo.register(byte[].class);
-            kryo.register(char[].class);
-            kryo.register(short[].class);
-            kryo.register(int[].class);
-            kryo.register(long[].class);
-            kryo.register(float[].class);
-            kryo.register(double[].class);
-            kryo.register(boolean[].class);
-            kryo.register(String[].class);
-            kryo.register(Object[].class);
-            kryo.register(KryoSerializable.class);
-            kryo.register(BigInteger.class);
-            kryo.register(BigDecimal.class);
-            kryo.register(Class.class);
-            kryo.register(Date.class);
-            // kryo.register(Enum.class);
-            kryo.register(EnumSet.class);
-            kryo.register(Currency.class);
-            kryo.register(StringBuffer.class);
-            kryo.register(StringBuilder.class);
-            kryo.register(Collections.EMPTY_LIST.getClass());
-            kryo.register(Collections.EMPTY_MAP.getClass());
-            kryo.register(Collections.EMPTY_SET.getClass());
-            kryo.register(Collections.singletonList(null).getClass());
-            kryo.register(Collections.singletonMap(null, null).getClass());
-            kryo.register(Collections.singleton(null).getClass());
-            kryo.register(TreeSet.class);
-            kryo.register(Collection.class);
-            kryo.register(TreeMap.class);
-            kryo.register(Map.class);
-            try {
-                kryo.register(Class.forName("sun.util.calendar.ZoneInfo"));
-            } catch (ClassNotFoundException e) {
-                // Noop
-            }
-            kryo.register(Calendar.class);
-            kryo.register(Locale.class);
-
-            kryo.register(BitSet.class);
-            kryo.register(HashMap.class);
-            kryo.register(Timestamp.class);
-            kryo.register(ArrayList.class);
-
-            // kryo.setRegistrationRequired(true);
-            kryo.setReferences(false);
-            kryo.setInstantiatorStrategy(new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy()));
-
-            return kryo;
-        }
-    };
-
-    public static Kryo getKryoInstance() {
-        return KRYOS.get();
-    }
-}