Polish the protocol/encoder/decoder etc.
diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/RemotingService.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/RemotingService.java
index 2f6343c..8802148 100644
--- a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/RemotingService.java
+++ b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/RemotingService.java
@@ -22,7 +22,7 @@
import org.apache.rocketmq.remoting.api.interceptor.Interceptor;
import org.apache.rocketmq.remoting.common.Pair;
-public interface RemotingService extends RemotingMarshaller, ConnectionService, ObjectLifecycle {
+public interface RemotingService extends ConnectionService, ObjectLifecycle {
void registerInterceptor(Interceptor interceptor);
void registerRequestProcessor(final String requestCode, final RequestProcessor processor,
diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/command/RemotingCommand.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/command/RemotingCommand.java
index f21a45d..8496b04 100644
--- a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/command/RemotingCommand.java
+++ b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/command/RemotingCommand.java
@@ -17,31 +17,28 @@
package org.apache.rocketmq.remoting.api.command;
-import java.lang.reflect.Type;
import java.util.Map;
-import org.apache.rocketmq.remoting.api.serializable.SerializerFactory;
-import org.apache.rocketmq.remoting.common.TypePresentation;
public interface RemotingCommand {
- byte protocolType();
+ short cmdCode();
- void protocolType(byte value);
+ void cmdCode(short code);
+
+ short cmdVersion();
+
+ void cmdVersion(short version);
int requestID();
void requestID(int value);
- byte serializerType();
-
- void serializerType(byte value);
-
TrafficType trafficType();
void trafficType(TrafficType value);
- String opCode();
+ short opCode();
- void opCode(String value);
+ void opCode(short value);
String remark();
@@ -55,36 +52,7 @@
void property(String key, String value);
- Object parameter();
+ byte[] payload();
- void parameter(Object value);
-
- byte[] parameterBytes();
-
- void parameterBytes(byte[] value);
-
- byte[] extraPayload();
-
- void extraPayload(byte[] value);
-
- <T> T parameter(final SerializerFactory serializerFactory, Class<T> c);
-
- <T> T parameter(final SerializerFactory serializerFactory, final TypePresentation<T> typePresentation);
-
- <T> T parameter(final SerializerFactory serializerFactory, final Type type);
-
- enum CommandFlag {
- SUCCESS("0"),
- ERROR("-1");
-
- private String flag;
-
- CommandFlag(final String flag) {
- this.flag = flag;
- }
-
- public String flag() {
- return flag;
- }
- }
+ void payload(byte[] payload);
}
\ No newline at end of file
diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/compressable/Compressor.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/compression/Compressor.java
similarity index 94%
rename from remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/compressable/Compressor.java
rename to remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/compression/Compressor.java
index 4688c45..d5c378e 100644
--- a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/compressable/Compressor.java
+++ b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/compression/Compressor.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.rocketmq.remoting.api.compressable;
+package org.apache.rocketmq.remoting.api.compression;
public interface Compressor {
String name();
diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/compressable/CompressorFactory.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/compression/CompressorFactory.java
similarity index 94%
rename from remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/compressable/CompressorFactory.java
rename to remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/compression/CompressorFactory.java
index 2494c78..4afd599 100644
--- a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/compressable/CompressorFactory.java
+++ b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/compression/CompressorFactory.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.rocketmq.remoting.api.compressable;
+package org.apache.rocketmq.remoting.api.compression;
public interface CompressorFactory {
void register(Compressor compressor);
diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/serializable/Serializer.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/serializable/Serializer.java
deleted file mode 100644
index 8ef8dcd..0000000
--- a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/serializable/Serializer.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.remoting.api.serializable;
-
-import java.lang.reflect.Type;
-import java.nio.ByteBuffer;
-import org.apache.rocketmq.remoting.common.TypePresentation;
-
-public interface Serializer {
- String name();
-
- byte type();
-
- <T> T decode(final byte[] content, final Class<T> c);
-
- <T> T decode(final byte[] content, final TypePresentation<T> typePresentation);
-
- <T> T decode(final byte[] content, final Type type);
-
- ByteBuffer encode(final Object object);
-}
diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/serializable/SerializerFactory.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/serializable/SerializerFactory.java
deleted file mode 100644
index b47bf99..0000000
--- a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/serializable/SerializerFactory.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.remoting.api.serializable;
-
-public interface SerializerFactory {
- void register(Serializer serialization);
-
- byte type(String serializationName);
-
- Serializer get(byte type);
-
- void clearAll();
-}
diff --git a/remoting-core/remoting-impl/pom.xml b/remoting-core/remoting-impl/pom.xml
index c8be150..604416b 100644
--- a/remoting-core/remoting-impl/pom.xml
+++ b/remoting-core/remoting-impl/pom.xml
@@ -25,6 +25,17 @@
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.esotericsoftware</groupId>
+ <artifactId>kryo</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.msgpack</groupId>
+ <artifactId>msgpack</artifactId>
+ <scope>provided</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
@@ -35,14 +46,6 @@
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
- <groupId>com.esotericsoftware</groupId>
- <artifactId>kryo</artifactId>
- </dependency>
- <dependency>
- <groupId>org.msgpack</groupId>
- <artifactId>msgpack</artifactId>
- </dependency>
- <dependency>
<groupId>io.netty</groupId>
<artifactId>netty-tcnative-boringssl-static</artifactId>
<version>1.1.33.Fork26</version>
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingConfig.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingConfig.java
index f7a4b67..3f5282c 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingConfig.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingConfig.java
@@ -19,8 +19,7 @@
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
-import org.apache.rocketmq.remoting.impl.protocol.compression.GZipCompressor;
-import org.apache.rocketmq.remoting.impl.protocol.serializer.JsonSerializer;
+import org.apache.rocketmq.remoting.impl.compression.GZipCompressor;
public class RemotingConfig extends TcpSocketConfig {
private int connectionMaxRetries = 3;
@@ -37,7 +36,6 @@
private int threadTaskLowWaterMark = 30000;
private int threadTaskHighWaterMark = 50000;
private int connectionRetryBackoffMillis = 3000;
- private String serializerName = JsonSerializer.SERIALIZER_NAME;
private String compressorName = GZipCompressor.COMPRESSOR_NAME;
private int serviceThreadBlockQueueSize = 50000;
private boolean clientNativeEpollEnable = false;
@@ -147,14 +145,6 @@
this.connectionRetryBackoffMillis = connectionRetryBackoffMillis;
}
- public String getSerializerName() {
- return serializerName;
- }
-
- public void setSerializerName(final String serializerName) {
- this.serializerName = serializerName;
- }
-
public String getCompressorName() {
return compressorName;
}
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/CodecHelper.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/CodecHelper.java
index 44d4fd9..bfc536b 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/CodecHelper.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/CodecHelper.java
@@ -17,36 +17,43 @@
package org.apache.rocketmq.remoting.impl.command;
-import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.Map.Entry;
+import org.apache.rocketmq.remoting.api.buffer.ByteBufferWrapper;
import org.apache.rocketmq.remoting.api.command.RemotingCommand;
import org.apache.rocketmq.remoting.api.command.TrafficType;
import org.apache.rocketmq.remoting.api.exception.RemoteCodecException;
public class CodecHelper {
- //ProtocolType + TotalLength + RequestId + SerializeType + TrafficType + CodeLength + RemarkLength + PropertiesSize + ParameterLength
- public final static int MIN_PROTOCOL_LEN = 1 + 4 + 4 + 1 + 1 + 2 + 2 + 2 + 4;
- public final static char PROPERTY_SEPARATOR = '\n';
- public final static Charset REMOTING_CHARSET = Charset.forName("UTF-8");
+ // ProtocolMagic(1) + TotalLength(4) + CmdCode(2) + CmdVersion(2) + RequestID(4) + TrafficType(1) + OpCode(2)
+ // + RemarkLen(2) + PropertiesSize(2) + PayloadLen(4);
+ public final static int MIN_PROTOCOL_LEN = 1 + 4 + 2 + 2 + 4 + 1 + 2 + 2 + 2 + 4;
+ private final static char PROPERTY_SEPARATOR = '\n';
+ private final static Charset REMOTING_CHARSET = Charset.forName("UTF-8");
- public final static int CODE_MAX_LEN = 512;
- public final static int PARAMETER_MAX_LEN = 33554432;
- public final static int BODY_MAX_LEN = 33554432;
- public final static int PACKET_MAX_LEN = 33554432;
+ public final static byte PROTOCOL_MAGIC = 0x14;
+ private final static int REMARK_MAX_LEN = Short.MAX_VALUE;
+ private final static int PROPERTY_MAX_LEN = 524288; // 512KB
+ private final static int PAYLOAD_MAX_LEN = 16777216; // 16MB
+ public final static int PACKET_MAX_LEN = MIN_PROTOCOL_LEN + REMARK_MAX_LEN + PROPERTY_MAX_LEN + PAYLOAD_MAX_LEN;
- public static ByteBuffer encodeHeader(final RemotingCommand command, final int parameterLength,
- final int extraPayload) {
- byte[] code = command.opCode().getBytes(REMOTING_CHARSET);
- int codeLength = code.length;
+ public static void encodeCommand(final RemotingCommand command, final ByteBufferWrapper out) {
+ out.writeByte(PROTOCOL_MAGIC);
- byte[] remark = command.remark().getBytes(REMOTING_CHARSET);
- int remarkLength = remark.length;
+ short remarkLen = 0;
+ byte [] remark = null;
+ if (command.remark() != null) {
+ remark = command.remark().getBytes(REMOTING_CHARSET);
+ if (remark.length > REMARK_MAX_LEN) {
+ throw new RemoteCodecException(String.format("Remark len: %d over max limit: %d", remark.length, REMARK_MAX_LEN));
+ }
+ remarkLen = (short) remark.length;
+ }
byte[][] props = null;
- int propsLength = 0;
+ int propsLen = 0;
StringBuilder sb = new StringBuilder();
- if (!command.properties().isEmpty()) {
+ if (command.properties() != null && !command.properties().isEmpty()) {
props = new byte[command.properties().size()][];
int i = 0;
for (Entry<String, String> next : command.properties().entrySet()) {
@@ -57,122 +64,110 @@
props[i] = sb.toString().getBytes(REMOTING_CHARSET);
- propsLength += 2;
- propsLength += props[i].length;
+ if (props[i].length > Short.MAX_VALUE) {
+ throw new RemoteCodecException(String.format("Property KV len: %d over max limit: %d", props[i].length, Short.MAX_VALUE));
+ }
+
+ propsLen += 2;
+ propsLen += props[i].length;
i++;
}
}
- int totalLength = MIN_PROTOCOL_LEN - 1 - 4
- + codeLength
- + remarkLength
- + propsLength
- + parameterLength
- + extraPayload;
-
- int headerLength = 1 + 4 + totalLength - parameterLength - extraPayload;
-
- ByteBuffer buf = ByteBuffer.allocate(headerLength);
- buf.put(command.protocolType());
- buf.putInt(totalLength);
- buf.putInt(command.requestID());
- buf.put(command.serializerType());
- buf.put((byte) command.trafficType().ordinal());
-
- buf.putShort((short) codeLength);
- if (codeLength > 0) {
- buf.put(code);
+ if (propsLen > PROPERTY_MAX_LEN) {
+ throw new RemoteCodecException(String.format("Properties total len: %d over max limit: %d", propsLen, PROPERTY_MAX_LEN));
}
- buf.putShort((short) remarkLength);
- if (remarkLength > 0) {
- buf.put(remark);
+
+ int payloadLen = command.payload() == null ? 0 : command.payload().length;
+
+ if (payloadLen > PAYLOAD_MAX_LEN) {
+ throw new RemoteCodecException(String.format("Payload len: %d over max limit: %d", payloadLen, PAYLOAD_MAX_LEN));
}
- if (props != null) {
- buf.putShort((short) props.length);
+
+ int totalLength = MIN_PROTOCOL_LEN
+ + remarkLen
+ + propsLen
+ + payloadLen;
+
+ out.writeInt(totalLength);
+ out.writeShort(command.cmdCode());
+ out.writeShort(command.cmdVersion());
+ out.writeInt(command.requestID());
+ out.writeByte((byte) command.trafficType().ordinal());
+ out.writeShort(command.opCode());
+
+ out.writeShort(remarkLen);
+ if (remarkLen != 0) {
+ out.writeBytes(remark);
+ }
+
+ if (propsLen != 0) {
+ out.writeShort((short) props.length);
for (byte[] prop : props) {
- buf.putShort((short) prop.length);
- buf.put(prop);
+ out.writeShort((short) prop.length);
+ out.writeBytes(prop);
}
- } else {
- buf.putShort((short) 0);
}
- buf.putInt(parameterLength);
-
- buf.flip();
-
- return buf;
+ out.writeInt(payloadLen);
+ if (payloadLen != 0) {
+ out.writeBytes(command.payload());
+ }
}
- public static RemotingCommand decode(final ByteBuffer byteBuffer) {
+ public static RemotingCommand decode(final ByteBufferWrapper in) {
RemotingCommandImpl cmd = new RemotingCommandImpl();
- int totalLength = byteBuffer.limit();
- cmd.requestID(byteBuffer.getInt());
- cmd.serializerType(byteBuffer.get());
- cmd.trafficType(TrafficType.parse(byteBuffer.get()));
- {
- short size = byteBuffer.getShort();
- if (size > 0 && size <= CODE_MAX_LEN) {
- byte[] bytes = new byte[size];
- byteBuffer.get(bytes);
- String str = new String(bytes, REMOTING_CHARSET);
- cmd.opCode(str);
- } else {
- throw new RemoteCodecException(String.format("Code length: %d over max limit: %d", size, CODE_MAX_LEN));
- }
+ cmd.cmdCode(in.readShort());
+ cmd.cmdVersion(in.readShort());
+ cmd.requestID(in.readInt());
+ cmd.trafficType(TrafficType.parse(in.readByte()));
+ cmd.opCode(in.readShort());
+
+
+ short remarkLen = in.readShort();
+ if (remarkLen > 0) {
+ byte[] bytes = new byte[remarkLen];
+ in.readBytes(bytes);
+ String str = new String(bytes, REMOTING_CHARSET);
+ cmd.remark(str);
}
- {
- short size = byteBuffer.getShort();
- if (size > 0) {
- byte[] bytes = new byte[size];
- byteBuffer.get(bytes);
- String str = new String(bytes, REMOTING_CHARSET);
- cmd.remark(str);
- }
- }
-
- {
- short size = byteBuffer.getShort();
- if (size > 0) {
- for (int i = 0; i < size; i++) {
- short length = byteBuffer.getShort();
- if (length > 0) {
- byte[] bytes = new byte[length];
- byteBuffer.get(bytes);
- String str = new String(bytes, REMOTING_CHARSET);
- int index = str.indexOf(PROPERTY_SEPARATOR);
- if (index > 0) {
- String key = str.substring(0, index);
- String value = str.substring(index + 1);
- cmd.property(key, value);
- }
+ short propsSize = in.readShort();
+ int propsLen = 0;
+ if (propsSize > 0) {
+ for (int i = 0; i < propsSize; i++) {
+ short length = in.readShort();
+ if (length > 0) {
+ byte[] bytes = new byte[length];
+ in.readBytes(bytes);
+ String str = new String(bytes, REMOTING_CHARSET);
+ int index = str.indexOf(PROPERTY_SEPARATOR);
+ if (index > 0) {
+ String key = str.substring(0, index);
+ String value = str.substring(index + 1);
+ cmd.property(key, value);
}
}
+
+ propsLen += 2;
+ propsLen += length;
+ if (propsLen > PROPERTY_MAX_LEN) {
+ throw new RemoteCodecException(String.format("Properties total len: %d over max limit: %d", propsLen, PROPERTY_MAX_LEN));
+ }
}
}
- {
- int size = byteBuffer.getInt();
- if (size > 0 && size <= PARAMETER_MAX_LEN) {
- byte[] bytes = new byte[size];
- byteBuffer.get(bytes);
- cmd.parameterBytes(bytes);
- } else if (size != 0) {
- throw new RemoteCodecException(String.format("Parameter size: %d over max limit: %d", size, PARAMETER_MAX_LEN));
- }
+ int payloadLen = in.readInt();
+
+ if (payloadLen > PAYLOAD_MAX_LEN) {
+ throw new RemoteCodecException(String.format("Payload len: %d over max limit: %d", payloadLen, PAYLOAD_MAX_LEN));
}
- {
- int size = totalLength - byteBuffer.position();
- if (size > 0 && size <= BODY_MAX_LEN) {
- byte[] bytes = new byte[size];
- byteBuffer.get(bytes);
- cmd.extraPayload(bytes);
- } else if (size != 0) {
- throw new RemoteCodecException(String.format("Body size: %d over max limit: %d", size, BODY_MAX_LEN));
- }
+ if (payloadLen > 0) {
+ byte[] bytes = new byte[payloadLen];
+ in.readBytes(bytes);
+ cmd.payload(bytes);
}
return cmd;
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandFactoryImpl.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandFactoryImpl.java
index 6e1efaa..adbb42c 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandFactoryImpl.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandFactoryImpl.java
@@ -20,37 +20,23 @@
import org.apache.rocketmq.remoting.api.command.RemotingCommand;
import org.apache.rocketmq.remoting.api.command.RemotingCommandFactory;
import org.apache.rocketmq.remoting.api.command.TrafficType;
-import org.apache.rocketmq.remoting.api.serializable.SerializerFactory;
-import org.apache.rocketmq.remoting.impl.protocol.serializer.JsonSerializer;
-import org.apache.rocketmq.remoting.impl.protocol.serializer.SerializerFactoryImpl;
public class RemotingCommandFactoryImpl implements RemotingCommandFactory {
- private final SerializerFactory serializerFactory = new SerializerFactoryImpl();
- private byte serializeType = JsonSerializer.SERIALIZER_TYPE;
-
- private byte PROTOCOL_MAGIC = 0x14;
-
public RemotingCommandFactoryImpl() {
}
- public RemotingCommandFactoryImpl(final String serializeName) {
- this.serializeType = serializerFactory.type(serializeName);
- }
-
@Override
public RemotingCommand createRequest() {
RemotingCommand request = new RemotingCommandImpl();
- request.protocolType(this.PROTOCOL_MAGIC);
- request.serializerType(this.serializeType);
return request;
}
@Override
- public RemotingCommand createResponse(final RemotingCommand command) {
+ public RemotingCommand createResponse(final RemotingCommand request) {
RemotingCommand response = new RemotingCommandImpl();
- response.requestID(command.requestID());
- response.protocolType(command.protocolType());
- response.serializerType(command.serializerType());
+ response.cmdCode(request.cmdCode());
+ response.cmdVersion(request.cmdVersion());
+ response.requestID(request.requestID());
response.trafficType(TrafficType.RESPONSE);
return response;
}
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandImpl.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandImpl.java
index bcf2338..b405eda 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandImpl.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandImpl.java
@@ -17,7 +17,6 @@
package org.apache.rocketmq.remoting.impl.command;
-import java.lang.reflect.Type;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang3.builder.EqualsBuilder;
@@ -26,39 +25,23 @@
import org.apache.commons.lang3.builder.ToStringStyle;
import org.apache.rocketmq.remoting.api.command.RemotingCommand;
import org.apache.rocketmq.remoting.api.command.TrafficType;
-import org.apache.rocketmq.remoting.api.serializable.SerializerFactory;
-import org.apache.rocketmq.remoting.common.TypePresentation;
public class RemotingCommandImpl implements RemotingCommand {
public final static RequestIdGenerator REQUEST_ID_GENERATOR = RequestIdGenerator.inst;
- private byte protocolType;
- private byte serializeType;
-
+ private short cmdCode;
+ private short cmdVersion;
private volatile int requestId = REQUEST_ID_GENERATOR.incrementAndGet();
private TrafficType trafficType = TrafficType.REQUEST_SYNC;
- private String code = CommandFlag.SUCCESS.flag();
+ private short opCode = RemotingSysResponseCode.SUCCESS;
private String remark = "";
- private Map<String, String> properties = new HashMap<String, String>();
- private Object parameter;
- private byte[] extraPayload;
-
- private byte[] parameterByte;
+ private Map<String, String> properties = new HashMap<>();
+ private byte[] payload;
protected RemotingCommandImpl() {
}
@Override
- public byte protocolType() {
- return this.protocolType;
- }
-
- @Override
- public void protocolType(byte value) {
- this.protocolType = value;
- }
-
- @Override
public int requestID() {
return requestId;
}
@@ -69,16 +52,6 @@
}
@Override
- public byte serializerType() {
- return this.serializeType;
- }
-
- @Override
- public void serializerType(byte value) {
- this.serializeType = value;
- }
-
- @Override
public TrafficType trafficType() {
return this.trafficType;
}
@@ -89,13 +62,13 @@
}
@Override
- public String opCode() {
- return this.code;
+ public short opCode() {
+ return this.opCode;
}
@Override
- public void opCode(String value) {
- this.code = value;
+ public void opCode(short value) {
+ this.opCode = value;
}
@Override
@@ -129,68 +102,33 @@
}
@Override
- public Object parameter() {
- return this.parameter;
+ public short cmdCode() {
+ return this.cmdCode;
}
@Override
- public void parameter(Object value) {
- this.parameter = value;
+ public void cmdCode(short code) {
+ this.cmdCode = code;
}
@Override
- public byte[] parameterBytes() {
- return this.getParameterByte();
- }
-
- public byte[] getParameterByte() {
- return parameterByte;
- }
-
- public void setParameterByte(byte[] parameterByte) {
- this.parameterByte = parameterByte;
+ public short cmdVersion() {
+ return this.cmdVersion;
}
@Override
- public void parameterBytes(byte[] value) {
- this.setParameterByte(value);
+ public void cmdVersion(short version) {
+ this.cmdVersion = version;
}
@Override
- public byte[] extraPayload() {
- return this.extraPayload;
+ public byte[] payload() {
+ return this.payload;
}
@Override
- public void extraPayload(byte[] value) {
- this.extraPayload = value;
- }
-
- @Override
- public <T> T parameter(SerializerFactory serializerFactory, Class<T> c) {
- if (this.parameter() != null)
- return (T) this.parameter();
- final T decode = serializerFactory.get(this.serializerType()).decode(this.parameterBytes(), c);
- this.parameter(decode);
- return decode;
- }
-
- @Override
- public <T> T parameter(SerializerFactory serializerFactory, TypePresentation<T> typePresentation) {
- if (this.parameter() != null)
- return (T) this.parameter();
- final T decode = serializerFactory.get(this.serializerType()).decode(this.parameterBytes(), typePresentation);
- this.parameter(decode);
- return decode;
- }
-
- @Override
- public <T> T parameter(SerializerFactory serializerFactory, Type type) {
- if (this.parameter() != null)
- return (T) this.parameter();
- final T decode = serializerFactory.get(this.serializerType()).decode(this.parameterBytes(), type);
- this.parameter(decode);
- return decode;
+ public void payload(byte[] payload) {
+ this.payload = payload;
}
@Override
diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/RemotingMarshaller.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingSysResponseCode.java
similarity index 69%
rename from remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/RemotingMarshaller.java
rename to remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingSysResponseCode.java
index 62c9dda..ae76c6f 100644
--- a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/RemotingMarshaller.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingSysResponseCode.java
@@ -15,10 +15,17 @@
* limitations under the License.
*/
-package org.apache.rocketmq.remoting.api;
+package org.apache.rocketmq.remoting.impl.command;
-import org.apache.rocketmq.remoting.api.serializable.SerializerFactory;
+public class RemotingSysResponseCode {
-public interface RemotingMarshaller {
- SerializerFactory serializerFactory();
+ public static final short SUCCESS = 0;
+
+ public static final short SYSTEM_ERROR = 1;
+
+ public static final short SYSTEM_BUSY = 2;
+
+ public static final short REQUEST_CODE_NOT_SUPPORTED = 3;
+
+ public static final short TRANSACTION_FAILED = 4;
}
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/compression/CompressorFactoryImpl.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/compression/CompressorFactoryImpl.java
similarity index 91%
rename from remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/compression/CompressorFactoryImpl.java
rename to remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/compression/CompressorFactoryImpl.java
index 10e97ba..40576ad 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/compression/CompressorFactoryImpl.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/compression/CompressorFactoryImpl.java
@@ -15,10 +15,10 @@
* limitations under the License.
*/
-package org.apache.rocketmq.remoting.impl.protocol.compression;
+package org.apache.rocketmq.remoting.impl.compression;
-import org.apache.rocketmq.remoting.api.compressable.Compressor;
-import org.apache.rocketmq.remoting.api.compressable.CompressorFactory;
+import org.apache.rocketmq.remoting.api.compression.Compressor;
+import org.apache.rocketmq.remoting.api.compression.CompressorFactory;
public class CompressorFactoryImpl implements CompressorFactory {
private static final int MAX_COUNT = 0x0FF;
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/compression/GZipCompressor.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/compression/GZipCompressor.java
similarity index 95%
rename from remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/compression/GZipCompressor.java
rename to remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/compression/GZipCompressor.java
index fc33f4c..53dd4bf 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/compression/GZipCompressor.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/compression/GZipCompressor.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.rocketmq.remoting.impl.protocol.compression;
+package org.apache.rocketmq.remoting.impl.compression;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@@ -23,7 +23,7 @@
import java.io.OutputStream;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
-import org.apache.rocketmq.remoting.api.compressable.Compressor;
+import org.apache.rocketmq.remoting.api.compression.Compressor;
public class GZipCompressor implements Compressor {
public static final int BUFFER = 1024;
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java
index a4c33e1..5026224 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java
@@ -48,8 +48,6 @@
import org.apache.rocketmq.remoting.api.interceptor.InterceptorGroup;
import org.apache.rocketmq.remoting.api.interceptor.RequestContext;
import org.apache.rocketmq.remoting.api.interceptor.ResponseContext;
-import org.apache.rocketmq.remoting.api.serializable.Serializer;
-import org.apache.rocketmq.remoting.api.serializable.SerializerFactory;
import org.apache.rocketmq.remoting.common.ChannelEventListenerGroup;
import org.apache.rocketmq.remoting.common.Pair;
import org.apache.rocketmq.remoting.common.ResponseResult;
@@ -58,7 +56,7 @@
import org.apache.rocketmq.remoting.external.ThreadUtils;
import org.apache.rocketmq.remoting.impl.channel.NettyChannelImpl;
import org.apache.rocketmq.remoting.impl.command.RemotingCommandFactoryImpl;
-import org.apache.rocketmq.remoting.impl.protocol.serializer.SerializerFactoryImpl;
+import org.apache.rocketmq.remoting.impl.command.RemotingSysResponseCode;
import org.apache.rocketmq.remoting.internal.UIDGenerator;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
@@ -66,7 +64,6 @@
public abstract class NettyRemotingAbstract implements RemotingService {
protected static final Logger LOG = LoggerFactory.getLogger(NettyRemotingAbstract.class);
- protected final SerializerFactory serializerFactory = new SerializerFactoryImpl();
protected final ChannelEventExecutor channelEventExecutor = new ChannelEventExecutor("ChannelEventExecutor");
private final Semaphore semaphoreOneway;
private final Semaphore semaphoreAsync;
@@ -87,11 +84,7 @@
this.publicExecutor = ThreadUtils.newFixedThreadPool(
clientConfig.getClientAsyncCallbackExecutorThreads(),
10000, "Remoting-PublicExecutor", true);
- this.remotingCommandFactory = new RemotingCommandFactoryImpl(clientConfig.getSerializerName());
- }
-
- public SerializerFactory getSerializerFactory() {
- return serializerFactory;
+ this.remotingCommandFactory = new RemotingCommandFactoryImpl();
}
protected void putNettyEvent(final NettyChannelEvent event) {
@@ -158,7 +151,7 @@
extractRemoteAddress(ctx.channel()), cmd, e, "FLOW_CONTROL"));
RemotingCommand response = remotingCommandFactory.createResponse(cmd);
- response.opCode(RemotingCommand.CommandFlag.ERROR.flag());
+ response.opCode(RemotingSysResponseCode.SYSTEM_BUSY);
response.remark("SYSTEM_BUSY");
writeAndFlush(ctx.channel(), response);
}
@@ -194,19 +187,9 @@
if (cmd.trafficType() != TrafficType.REQUEST_ONEWAY) {
//FiXME Exception interceptor can not throw exception
interceptorGroup.onException(new ExceptionContext(RemotingEndPoint.RESPONSE, extractRemoteAddress(ctx.channel()), cmd, e, ""));
- RemotingCommand response = remotingCommandFactory.createResponse(cmd);
- response.opCode(RemotingCommand.CommandFlag.ERROR.flag());
- response.remark(serializeException(cmd.serializerType(), e));
- response.property("Exception", e.getClass().getName());
- ctx.writeAndFlush(response);
}
}
- private String serializeException(byte serializeType, Throwable exception) {
- final Serializer serialization = getSerializerFactory().get(serializeType);
- return serialization.encode(exception).toString();
- }
-
private void handleResponse(RemotingCommand response, RemotingCommand cmd, ChannelHandlerContext ctx) {
if (cmd.trafficType() != TrafficType.REQUEST_ONEWAY) {
if (response != null) {
@@ -508,11 +491,6 @@
}
@Override
- public SerializerFactory serializerFactory() {
- return this.serializerFactory;
- }
-
- @Override
public RemotingCommandFactory commandFactory() {
return this.remotingCommandFactory;
}
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Decoder.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Decoder.java
index ec1d69d..f239ee9 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Decoder.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Decoder.java
@@ -22,7 +22,6 @@
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
-import java.nio.ByteBuffer;
import java.util.List;
import org.apache.rocketmq.remoting.api.buffer.ByteBufferWrapper;
import org.apache.rocketmq.remoting.api.command.RemotingCommand;
@@ -55,13 +54,12 @@
private Object decode(final ChannelHandlerContext ctx, ByteBufferWrapper wrapper) throws Exception {
int originReaderIndex = wrapper.readerIndex();
- byte type = wrapper.readByte();
+ byte magic = wrapper.readByte();
try {
- RemotingCommand cmd = decode(wrapper, originReaderIndex);
- if (cmd != null) {
- cmd.protocolType(type);
+ if (magic != CodecHelper.PROTOCOL_MAGIC) {
+ throw new RemoteCodecException(String.format("MagicCode %d is wrong, expect %d", magic, CodecHelper.PROTOCOL_MAGIC));
}
- return cmd;
+ return decode(wrapper, originReaderIndex);
} catch (final RemoteCodecException e) {
LOG.warn("Decode error {}, close the channel {}", e.getMessage(), ctx.channel());
ctx.channel().close().addListener(new ChannelFutureListener() {
@@ -76,7 +74,7 @@
public RemotingCommand decode(final ByteBufferWrapper wrapper, final int originReaderIndex) {
// Full message isn't available yet, return nothing for now
- if (wrapper.readableBytes() < CodecHelper.MIN_PROTOCOL_LEN - 1) {
+ if (wrapper.readableBytes() < CodecHelper.MIN_PROTOCOL_LEN - 1 /*MagicCode*/) {
wrapper.setReaderIndex(originReaderIndex);
return null;
}
@@ -91,17 +89,10 @@
throw new IllegalArgumentException(String.format("Total length %d is more than limit %d", totalLength, CodecHelper.PACKET_MAX_LEN));
}
- if (wrapper.readableBytes() < totalLength) {
+ if (wrapper.readableBytes() < totalLength - 1 /*MagicCode*/ - 4 /*TotalLen*/) {
wrapper.setReaderIndex(originReaderIndex);
return null;
}
-
- ByteBuffer totalBuffer = ByteBuffer.allocate(totalLength);
-
- wrapper.readBytes(totalBuffer);
-
- totalBuffer.flip();
-
- return CodecHelper.decode(totalBuffer);
+ return CodecHelper.decode(wrapper);
}
}
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Encoder.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Encoder.java
index 10aa504..78e5e5c 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Encoder.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Encoder.java
@@ -23,22 +23,16 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
import org.apache.rocketmq.remoting.api.buffer.ByteBufferWrapper;
import org.apache.rocketmq.remoting.api.command.RemotingCommand;
-import org.apache.rocketmq.remoting.api.serializable.Serializer;
-import org.apache.rocketmq.remoting.api.serializable.SerializerFactory;
import org.apache.rocketmq.remoting.impl.buffer.NettyByteBufferWrapper;
import org.apache.rocketmq.remoting.impl.command.CodecHelper;
-import org.apache.rocketmq.remoting.impl.protocol.serializer.SerializerFactoryImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class Encoder extends MessageToByteEncoder<RemotingCommand> {
private static final Logger LOG = LoggerFactory.getLogger(Encoder.class);
- private final SerializerFactory serializerFactory = new SerializerFactoryImpl();
-
public Encoder() {
}
@@ -47,7 +41,7 @@
try {
ByteBufferWrapper wrapper = new NettyByteBufferWrapper(out);
- encode(serializerFactory, remotingCommand, wrapper);
+ encode(remotingCommand, wrapper);
} catch (final Exception e) {
LOG.error("Error occurred when encoding response for channel " + ((InetSocketAddress) ctx.channel().remoteAddress()).getAddress().getHostAddress(), e);
if (remotingCommand != null) {
@@ -62,28 +56,7 @@
}
}
- public void encode(final SerializerFactory serializerFactory, final RemotingCommand remotingCommand,
- final ByteBufferWrapper out) {
- ByteBuffer encodeParameter = null;
- if (remotingCommand.parameterBytes() != null) {
- encodeParameter = ByteBuffer.wrap(remotingCommand.parameterBytes());
- } else if (remotingCommand.parameter() != null) {
- final Serializer serialization = serializerFactory.get(remotingCommand.serializerType());
- encodeParameter = serialization.encode(remotingCommand.parameter());
- }
-
- int parameterLength = encodeParameter != null ? encodeParameter.limit() : 0;
- int extBodyLength = remotingCommand.extraPayload() != null ? remotingCommand.extraPayload().length : 0;
-
- ByteBuffer header = CodecHelper.encodeHeader(remotingCommand, parameterLength, extBodyLength);
- out.writeBytes(header);
-
- if (encodeParameter != null) {
- out.writeBytes(encodeParameter);
- }
-
- if (remotingCommand.extraPayload() != null) {
- out.writeBytes(remotingCommand.extraPayload());
- }
+ public void encode(final RemotingCommand remotingCommand, final ByteBufferWrapper out) {
+ CodecHelper.encodeCommand(remotingCommand, out);
}
}
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/serializer/JsonSerializer.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/serializer/JsonSerializer.java
deleted file mode 100644
index c85d44b..0000000
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/serializer/JsonSerializer.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.remoting.impl.protocol.serializer;
-
-import com.alibaba.fastjson.JSON;
-import java.lang.reflect.Type;
-import java.nio.ByteBuffer;
-import org.apache.rocketmq.remoting.api.serializable.Serializer;
-import org.apache.rocketmq.remoting.common.TypePresentation;
-import org.apache.rocketmq.remoting.impl.command.CodecHelper;
-
-public class JsonSerializer implements Serializer {
- public static final String SERIALIZER_NAME = JsonSerializer.class.getSimpleName();
- public static final byte SERIALIZER_TYPE = 'J';
-
- public JsonSerializer() {
- }
-
- @Override
- public String name() {
- return SERIALIZER_NAME;
- }
-
- @Override
- public byte type() {
- return SERIALIZER_TYPE;
- }
-
- @Override
- public <T> T decode(final byte[] content, final Class<T> c) {
- if (content != null) {
- try {
- final String jsonString = new String(content, CodecHelper.REMOTING_CHARSET);
- return JSON.parseObject(jsonString, c);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- return null;
- }
-
- @Override
- public <T> T decode(final byte[] content, final TypePresentation<T> typePresentation) {
- return decode(content, typePresentation.getType());
- }
-
- @Override
- public <T> T decode(byte[] content, Type type) {
- if (content != null) {
- try {
- final String jsonString = new String(content, CodecHelper.REMOTING_CHARSET);
- return JSON.parseObject(jsonString, type);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- return null;
- }
-
- @Override
- public ByteBuffer encode(final Object object) {
- if (object != null) {
- String jsonString = JSON.toJSONString(object);
- byte[] bytes = jsonString.getBytes(CodecHelper.REMOTING_CHARSET);
- try {
- return ByteBuffer.wrap(bytes, 0, bytes.length);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- return null;
- }
-}
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/serializer/Kryo3Serializer.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/serializer/Kryo3Serializer.java
deleted file mode 100644
index 06ea217..0000000
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/serializer/Kryo3Serializer.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.remoting.impl.protocol.serializer;
-
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-import java.lang.reflect.ParameterizedType;
-import java.lang.reflect.Type;
-import java.nio.ByteBuffer;
-import org.apache.rocketmq.remoting.api.serializable.Serializer;
-import org.apache.rocketmq.remoting.common.TypePresentation;
-
-public class Kryo3Serializer implements Serializer {
- public static final String SERIALIZER_NAME = Kryo3Serializer.class.getSimpleName();
- public static final byte SERIALIZER_TYPE = 'K';
-
- public Kryo3Serializer() {
- }
-
- @Override
- public String name() {
- return SERIALIZER_NAME;
- }
-
- @Override
- public byte type() {
- return SERIALIZER_TYPE;
- }
-
- @Override
- public <T> T decode(final byte[] content, final Class<T> c) {
- if (content != null) {
- Input input = null;
- try {
- input = new Input(content);
- return (T) ThreadSafeKryo.getKryoInstance().readClassAndObject(input);
- } catch (Exception e) {
- throw new RuntimeException(e);
- } finally {
- input.close();
- }
- }
-
- return null;
- }
-
- @Override
- public <T> T decode(final byte[] content, final TypePresentation<T> typePresentation) {
- return decode(content, typePresentation.getType());
- }
-
- @Override
- public <T> T decode(byte[] content, Type type) {
- if (type instanceof ParameterizedType) {
- return decode(content, (Class<? extends T>) ((ParameterizedType) type).getRawType());
- } else if (type instanceof Class) {
- return decode(content, (Class<? extends T>) type);
- }
- return null;
- }
-
- @Override
- public ByteBuffer encode(final Object object) {
- if (object != null) {
- try (Output output = new Output(1024, 1024 * 1024 * 6)) {
- ThreadSafeKryo.getKryoInstance().writeClassAndObject(output, object);
- return ByteBuffer.wrap(output.getBuffer(), 0, output.position());
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- return null;
- }
-}
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/serializer/MsgPackSerializer.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/serializer/MsgPackSerializer.java
deleted file mode 100644
index 1097f8f..0000000
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/serializer/MsgPackSerializer.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.remoting.impl.protocol.serializer;
-
-import java.lang.reflect.Type;
-import java.nio.ByteBuffer;
-import org.apache.rocketmq.remoting.api.serializable.Serializer;
-import org.apache.rocketmq.remoting.common.TypePresentation;
-import org.msgpack.MessagePack;
-import org.msgpack.template.Template;
-
-public class MsgPackSerializer implements Serializer {
- public static final String SERIALIZER_NAME = MsgPackSerializer.class.getSimpleName();
- public static final byte SERIALIZER_TYPE = 'M';
- private final MessagePack messagePack = new MessagePack();
-
- public MsgPackSerializer() {
- }
-
- @Override
- public String name() {
- return SERIALIZER_NAME;
- }
-
- @Override
- public byte type() {
- return SERIALIZER_TYPE;
- }
-
- @Override
- public <T> T decode(final byte[] content, final Class<T> c) {
- try {
- return messagePack.read(content, c);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public <T> T decode(final byte[] content, final TypePresentation<T> typePresentation) {
- return decode(content, typePresentation.getType());
- }
-
- @Override
- public <T> T decode(byte[] content, Type type) {
- Template<T> template = (Template<T>) messagePack.lookup(type);
- try {
- return messagePack.read(content, template);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public ByteBuffer encode(final Object object) {
- try {
- byte[] data = messagePack.write(object);
- return ByteBuffer.wrap(data);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-}
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/serializer/SerializerFactoryImpl.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/serializer/SerializerFactoryImpl.java
deleted file mode 100644
index 632b61f..0000000
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/serializer/SerializerFactoryImpl.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.remoting.impl.protocol.serializer;
-
-import org.apache.rocketmq.remoting.api.serializable.Serializer;
-import org.apache.rocketmq.remoting.api.serializable.SerializerFactory;
-
-public class SerializerFactoryImpl implements SerializerFactory {
- private static final int MAX_COUNT = 0x0FF;
- private final Serializer[] tables = new Serializer[MAX_COUNT];
-
- public SerializerFactoryImpl() {
- this.register(new JsonSerializer());
- this.register(new Kryo3Serializer());
- this.register(new MsgPackSerializer());
- }
-
- @Override
- public void register(Serializer serialization) {
- if (tables[serialization.type() & MAX_COUNT] != null) {
- throw new RuntimeException("serialization header's sign is overlapped");
- }
- tables[serialization.type() & MAX_COUNT] = serialization;
- }
-
- @Override
- public byte type(final String serializationName) {
- for (Serializer table : this.tables) {
- if (table != null) {
- if (table.name().equalsIgnoreCase(serializationName)) {
- return table.type();
- }
- }
- }
-
- throw new IllegalArgumentException(String.format("the serialization: %s not exist", serializationName));
- }
-
- @Override
- public Serializer get(byte type) {
- return tables[type & MAX_COUNT];
- }
-
- @Override
- public void clearAll() {
- for (int i = 0; i < this.tables.length; i++) {
- this.tables[i] = null;
- }
- }
-
- public Serializer[] getTables() {
- return tables;
- }
-}
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/serializer/ThreadSafeKryo.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/serializer/ThreadSafeKryo.java
deleted file mode 100644
index cadfc27..0000000
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/serializer/ThreadSafeKryo.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.remoting.impl.protocol.serializer;
-
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.KryoSerializable;
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.sql.Timestamp;
-import java.util.ArrayList;
-import java.util.BitSet;
-import java.util.Calendar;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Currency;
-import java.util.Date;
-import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.Locale;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.TreeSet;
-import org.objenesis.strategy.StdInstantiatorStrategy;
-
-public class ThreadSafeKryo {
- private static final ThreadLocal<Kryo> KRYOS = new ThreadLocal<Kryo>() {
- protected Kryo initialValue() {
- Kryo kryo = new Kryo();
-
- kryo.register(byte[].class);
- kryo.register(char[].class);
- kryo.register(short[].class);
- kryo.register(int[].class);
- kryo.register(long[].class);
- kryo.register(float[].class);
- kryo.register(double[].class);
- kryo.register(boolean[].class);
- kryo.register(String[].class);
- kryo.register(Object[].class);
- kryo.register(KryoSerializable.class);
- kryo.register(BigInteger.class);
- kryo.register(BigDecimal.class);
- kryo.register(Class.class);
- kryo.register(Date.class);
- // kryo.register(Enum.class);
- kryo.register(EnumSet.class);
- kryo.register(Currency.class);
- kryo.register(StringBuffer.class);
- kryo.register(StringBuilder.class);
- kryo.register(Collections.EMPTY_LIST.getClass());
- kryo.register(Collections.EMPTY_MAP.getClass());
- kryo.register(Collections.EMPTY_SET.getClass());
- kryo.register(Collections.singletonList(null).getClass());
- kryo.register(Collections.singletonMap(null, null).getClass());
- kryo.register(Collections.singleton(null).getClass());
- kryo.register(TreeSet.class);
- kryo.register(Collection.class);
- kryo.register(TreeMap.class);
- kryo.register(Map.class);
- try {
- kryo.register(Class.forName("sun.util.calendar.ZoneInfo"));
- } catch (ClassNotFoundException e) {
- // Noop
- }
- kryo.register(Calendar.class);
- kryo.register(Locale.class);
-
- kryo.register(BitSet.class);
- kryo.register(HashMap.class);
- kryo.register(Timestamp.class);
- kryo.register(ArrayList.class);
-
- // kryo.setRegistrationRequired(true);
- kryo.setReferences(false);
- kryo.setInstantiatorStrategy(new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy()));
-
- return kryo;
- }
- };
-
- public static Kryo getKryoInstance() {
- return KRYOS.get();
- }
-}