blob: 44d4fd9957a61d76d83e93c6b3664a338517c0df [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.remoting.impl.command;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.Map.Entry;
import org.apache.rocketmq.remoting.api.command.RemotingCommand;
import org.apache.rocketmq.remoting.api.command.TrafficType;
import org.apache.rocketmq.remoting.api.exception.RemoteCodecException;
public class CodecHelper {
//ProtocolType + TotalLength + RequestId + SerializeType + TrafficType + CodeLength + RemarkLength + PropertiesSize + ParameterLength
public final static int MIN_PROTOCOL_LEN = 1 + 4 + 4 + 1 + 1 + 2 + 2 + 2 + 4;
public final static char PROPERTY_SEPARATOR = '\n';
public final static Charset REMOTING_CHARSET = Charset.forName("UTF-8");
public final static int CODE_MAX_LEN = 512;
public final static int PARAMETER_MAX_LEN = 33554432;
public final static int BODY_MAX_LEN = 33554432;
public final static int PACKET_MAX_LEN = 33554432;
public static ByteBuffer encodeHeader(final RemotingCommand command, final int parameterLength,
final int extraPayload) {
byte[] code = command.opCode().getBytes(REMOTING_CHARSET);
int codeLength = code.length;
byte[] remark = command.remark().getBytes(REMOTING_CHARSET);
int remarkLength = remark.length;
byte[][] props = null;
int propsLength = 0;
StringBuilder sb = new StringBuilder();
if (!command.properties().isEmpty()) {
props = new byte[command.properties().size()][];
int i = 0;
for (Entry<String, String> next : command.properties().entrySet()) {
sb.setLength(0);
sb.append(next.getKey());
sb.append(PROPERTY_SEPARATOR);
sb.append(next.getValue());
props[i] = sb.toString().getBytes(REMOTING_CHARSET);
propsLength += 2;
propsLength += props[i].length;
i++;
}
}
int totalLength = MIN_PROTOCOL_LEN - 1 - 4
+ codeLength
+ remarkLength
+ propsLength
+ parameterLength
+ extraPayload;
int headerLength = 1 + 4 + totalLength - parameterLength - extraPayload;
ByteBuffer buf = ByteBuffer.allocate(headerLength);
buf.put(command.protocolType());
buf.putInt(totalLength);
buf.putInt(command.requestID());
buf.put(command.serializerType());
buf.put((byte) command.trafficType().ordinal());
buf.putShort((short) codeLength);
if (codeLength > 0) {
buf.put(code);
}
buf.putShort((short) remarkLength);
if (remarkLength > 0) {
buf.put(remark);
}
if (props != null) {
buf.putShort((short) props.length);
for (byte[] prop : props) {
buf.putShort((short) prop.length);
buf.put(prop);
}
} else {
buf.putShort((short) 0);
}
buf.putInt(parameterLength);
buf.flip();
return buf;
}
public static RemotingCommand decode(final ByteBuffer byteBuffer) {
RemotingCommandImpl cmd = new RemotingCommandImpl();
int totalLength = byteBuffer.limit();
cmd.requestID(byteBuffer.getInt());
cmd.serializerType(byteBuffer.get());
cmd.trafficType(TrafficType.parse(byteBuffer.get()));
{
short size = byteBuffer.getShort();
if (size > 0 && size <= CODE_MAX_LEN) {
byte[] bytes = new byte[size];
byteBuffer.get(bytes);
String str = new String(bytes, REMOTING_CHARSET);
cmd.opCode(str);
} else {
throw new RemoteCodecException(String.format("Code length: %d over max limit: %d", size, CODE_MAX_LEN));
}
}
{
short size = byteBuffer.getShort();
if (size > 0) {
byte[] bytes = new byte[size];
byteBuffer.get(bytes);
String str = new String(bytes, REMOTING_CHARSET);
cmd.remark(str);
}
}
{
short size = byteBuffer.getShort();
if (size > 0) {
for (int i = 0; i < size; i++) {
short length = byteBuffer.getShort();
if (length > 0) {
byte[] bytes = new byte[length];
byteBuffer.get(bytes);
String str = new String(bytes, REMOTING_CHARSET);
int index = str.indexOf(PROPERTY_SEPARATOR);
if (index > 0) {
String key = str.substring(0, index);
String value = str.substring(index + 1);
cmd.property(key, value);
}
}
}
}
}
{
int size = byteBuffer.getInt();
if (size > 0 && size <= PARAMETER_MAX_LEN) {
byte[] bytes = new byte[size];
byteBuffer.get(bytes);
cmd.parameterBytes(bytes);
} else if (size != 0) {
throw new RemoteCodecException(String.format("Parameter size: %d over max limit: %d", size, PARAMETER_MAX_LEN));
}
}
{
int size = totalLength - byteBuffer.position();
if (size > 0 && size <= BODY_MAX_LEN) {
byte[] bytes = new byte[size];
byteBuffer.get(bytes);
cmd.extraPayload(bytes);
} else if (size != 0) {
throw new RemoteCodecException(String.format("Body size: %d over max limit: %d", size, BODY_MAX_LEN));
}
}
return cmd;
}
}