/* | |
* Licensed to the Apache Software Foundation (ASF) under one or more | |
* contributor license agreements. See the NOTICE file distributed with | |
* this work for additional information regarding copyright ownership. | |
* The ASF licenses this file to You under the Apache License, Version 2.0 | |
* (the "License"); you may not use this file except in compliance with | |
* the License. You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
package com.alibaba.dubbo.remoting.exchange.codec; | |
import com.alibaba.dubbo.common.Version; | |
import com.alibaba.dubbo.common.io.Bytes; | |
import com.alibaba.dubbo.common.io.StreamUtils; | |
import com.alibaba.dubbo.common.logger.Logger; | |
import com.alibaba.dubbo.common.logger.LoggerFactory; | |
import com.alibaba.dubbo.common.serialize.Cleanable; | |
import com.alibaba.dubbo.common.serialize.ObjectInput; | |
import com.alibaba.dubbo.common.serialize.ObjectOutput; | |
import com.alibaba.dubbo.common.serialize.Serialization; | |
import com.alibaba.dubbo.common.utils.StringUtils; | |
import com.alibaba.dubbo.remoting.Channel; | |
import com.alibaba.dubbo.remoting.RemotingException; | |
import com.alibaba.dubbo.remoting.buffer.ChannelBuffer; | |
import com.alibaba.dubbo.remoting.buffer.ChannelBufferInputStream; | |
import com.alibaba.dubbo.remoting.buffer.ChannelBufferOutputStream; | |
import com.alibaba.dubbo.remoting.exchange.Request; | |
import com.alibaba.dubbo.remoting.exchange.Response; | |
import com.alibaba.dubbo.remoting.exchange.support.DefaultFuture; | |
import com.alibaba.dubbo.remoting.telnet.codec.TelnetCodec; | |
import com.alibaba.dubbo.remoting.transport.CodecSupport; | |
import com.alibaba.dubbo.remoting.transport.ExceedPayloadLimitException; | |
import java.io.IOException; | |
import java.io.InputStream; | |
/** | |
* ExchangeCodec. | |
* | |
* | |
* | |
*/ | |
public class ExchangeCodec extends TelnetCodec { | |
// header length. | |
protected static final int HEADER_LENGTH = 16; | |
// magic header. | |
protected static final short MAGIC = (short) 0xdabb; | |
protected static final byte MAGIC_HIGH = Bytes.short2bytes(MAGIC)[0]; | |
protected static final byte MAGIC_LOW = Bytes.short2bytes(MAGIC)[1]; | |
// message flag. | |
protected static final byte FLAG_REQUEST = (byte) 0x80; | |
protected static final byte FLAG_TWOWAY = (byte) 0x40; | |
protected static final byte FLAG_EVENT = (byte) 0x20; | |
protected static final int SERIALIZATION_MASK = 0x1f; | |
private static final Logger logger = LoggerFactory.getLogger(ExchangeCodec.class); | |
public Short getMagicCode() { | |
return MAGIC; | |
} | |
@Override | |
public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException { | |
if (msg instanceof Request) { | |
encodeRequest(channel, buffer, (Request) msg); | |
} else if (msg instanceof Response) { | |
encodeResponse(channel, buffer, (Response) msg); | |
} else { | |
super.encode(channel, buffer, msg); | |
} | |
} | |
@Override | |
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException { | |
int readable = buffer.readableBytes(); | |
byte[] header = new byte[Math.min(readable, HEADER_LENGTH)]; | |
buffer.readBytes(header); | |
return decode(channel, buffer, readable, header); | |
} | |
@Override | |
protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException { | |
// check magic number. | |
if (readable > 0 && header[0] != MAGIC_HIGH | |
|| readable > 1 && header[1] != MAGIC_LOW) { | |
int length = header.length; | |
if (header.length < readable) { | |
header = Bytes.copyOf(header, readable); | |
buffer.readBytes(header, length, readable - length); | |
} | |
for (int i = 1; i < header.length - 1; i++) { | |
if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) { | |
buffer.readerIndex(buffer.readerIndex() - header.length + i); | |
header = Bytes.copyOf(header, i); | |
break; | |
} | |
} | |
return super.decode(channel, buffer, readable, header); | |
} | |
// check length. | |
if (readable < HEADER_LENGTH) { | |
return DecodeResult.NEED_MORE_INPUT; | |
} | |
// get data length. | |
int len = Bytes.bytes2int(header, 12); | |
checkPayload(channel, len); | |
int tt = len + HEADER_LENGTH; | |
if (readable < tt) { | |
return DecodeResult.NEED_MORE_INPUT; | |
} | |
// limit input stream. | |
ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len); | |
try { | |
return decodeBody(channel, is, header); | |
} finally { | |
if (is.available() > 0) { | |
try { | |
if (logger.isWarnEnabled()) { | |
logger.warn("Skip input stream " + is.available()); | |
} | |
StreamUtils.skipUnusedStream(is); | |
} catch (IOException e) { | |
logger.warn(e.getMessage(), e); | |
} | |
} | |
} | |
} | |
protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException { | |
byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK); | |
Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto); | |
ObjectInput in = s.deserialize(channel.getUrl(), is); | |
// get request id. | |
long id = Bytes.bytes2long(header, 4); | |
if ((flag & FLAG_REQUEST) == 0) { | |
// decode response. | |
Response res = new Response(id); | |
if ((flag & FLAG_EVENT) != 0) { | |
res.setEvent(Response.HEARTBEAT_EVENT); | |
} | |
// get status. | |
byte status = header[3]; | |
res.setStatus(status); | |
if (status == Response.OK) { | |
try { | |
Object data; | |
if (res.isHeartbeat()) { | |
data = decodeHeartbeatData(channel, in); | |
} else if (res.isEvent()) { | |
data = decodeEventData(channel, in); | |
} else { | |
data = decodeResponseData(channel, in, getRequestData(id)); | |
} | |
res.setResult(data); | |
} catch (Throwable t) { | |
res.setStatus(Response.CLIENT_ERROR); | |
res.setErrorMessage(StringUtils.toString(t)); | |
} | |
} else { | |
res.setErrorMessage(in.readUTF()); | |
} | |
return res; | |
} else { | |
// decode request. | |
Request req = new Request(id); | |
req.setVersion(Version.getProtocolVersion()); | |
req.setTwoWay((flag & FLAG_TWOWAY) != 0); | |
if ((flag & FLAG_EVENT) != 0) { | |
req.setEvent(Request.HEARTBEAT_EVENT); | |
} | |
try { | |
Object data; | |
if (req.isHeartbeat()) { | |
data = decodeHeartbeatData(channel, in); | |
} else if (req.isEvent()) { | |
data = decodeEventData(channel, in); | |
} else { | |
data = decodeRequestData(channel, in); | |
} | |
req.setData(data); | |
} catch (Throwable t) { | |
// bad request | |
req.setBroken(true); | |
req.setData(t); | |
} | |
return req; | |
} | |
} | |
protected Object getRequestData(long id) { | |
DefaultFuture future = DefaultFuture.getFuture(id); | |
if (future == null) | |
return null; | |
Request req = future.getRequest(); | |
if (req == null) | |
return null; | |
return req.getData(); | |
} | |
protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException { | |
Serialization serialization = getSerialization(channel); | |
// header. | |
byte[] header = new byte[HEADER_LENGTH]; | |
// set magic number. | |
Bytes.short2bytes(MAGIC, header); | |
// set request and serialization flag. | |
header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId()); | |
if (req.isTwoWay()) header[2] |= FLAG_TWOWAY; | |
if (req.isEvent()) header[2] |= FLAG_EVENT; | |
// set request id. | |
Bytes.long2bytes(req.getId(), header, 4); | |
// encode request data. | |
int savedWriteIndex = buffer.writerIndex(); | |
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH); | |
ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer); | |
ObjectOutput out = serialization.serialize(channel.getUrl(), bos); | |
if (req.isEvent()) { | |
encodeEventData(channel, out, req.getData()); | |
} else { | |
encodeRequestData(channel, out, req.getData(), req.getVersion()); | |
} | |
out.flushBuffer(); | |
if (out instanceof Cleanable) { | |
((Cleanable) out).cleanup(); | |
} | |
bos.flush(); | |
bos.close(); | |
int len = bos.writtenBytes(); | |
checkPayload(channel, len); | |
Bytes.int2bytes(len, header, 12); | |
// write | |
buffer.writerIndex(savedWriteIndex); | |
buffer.writeBytes(header); // write header. | |
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len); | |
} | |
protected void encodeResponse(Channel channel, ChannelBuffer buffer, Response res) throws IOException { | |
int savedWriteIndex = buffer.writerIndex(); | |
try { | |
Serialization serialization = getSerialization(channel); | |
// header. | |
byte[] header = new byte[HEADER_LENGTH]; | |
// set magic number. | |
Bytes.short2bytes(MAGIC, header); | |
// set request and serialization flag. | |
header[2] = serialization.getContentTypeId(); | |
if (res.isHeartbeat()) header[2] |= FLAG_EVENT; | |
// set response status. | |
byte status = res.getStatus(); | |
header[3] = status; | |
// set request id. | |
Bytes.long2bytes(res.getId(), header, 4); | |
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH); | |
ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer); | |
ObjectOutput out = serialization.serialize(channel.getUrl(), bos); | |
// encode response data or error message. | |
if (status == Response.OK) { | |
if (res.isHeartbeat()) { | |
encodeHeartbeatData(channel, out, res.getResult()); | |
} else { | |
encodeResponseData(channel, out, res.getResult(), res.getVersion()); | |
} | |
} else out.writeUTF(res.getErrorMessage()); | |
out.flushBuffer(); | |
if (out instanceof Cleanable) { | |
((Cleanable) out).cleanup(); | |
} | |
bos.flush(); | |
bos.close(); | |
int len = bos.writtenBytes(); | |
checkPayload(channel, len); | |
Bytes.int2bytes(len, header, 12); | |
// write | |
buffer.writerIndex(savedWriteIndex); | |
buffer.writeBytes(header); // write header. | |
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len); | |
} catch (Throwable t) { | |
// clear buffer | |
buffer.writerIndex(savedWriteIndex); | |
// send error message to Consumer, otherwise, Consumer will wait till timeout. | |
if (!res.isEvent() && res.getStatus() != Response.BAD_RESPONSE) { | |
Response r = new Response(res.getId(), res.getVersion()); | |
r.setStatus(Response.BAD_RESPONSE); | |
if (t instanceof ExceedPayloadLimitException) { | |
logger.warn(t.getMessage(), t); | |
try { | |
r.setErrorMessage(t.getMessage()); | |
channel.send(r); | |
return; | |
} catch (RemotingException e) { | |
logger.warn("Failed to send bad_response info back: " + t.getMessage() + ", cause: " + e.getMessage(), e); | |
} | |
} else { | |
// FIXME log error message in Codec and handle in caught() of IoHanndler? | |
logger.warn("Fail to encode response: " + res + ", send bad_response info instead, cause: " + t.getMessage(), t); | |
try { | |
r.setErrorMessage("Failed to send response: " + res + ", cause: " + StringUtils.toString(t)); | |
channel.send(r); | |
return; | |
} catch (RemotingException e) { | |
logger.warn("Failed to send bad_response info back: " + res + ", cause: " + e.getMessage(), e); | |
} | |
} | |
} | |
// Rethrow exception | |
if (t instanceof IOException) { | |
throw (IOException) t; | |
} else if (t instanceof RuntimeException) { | |
throw (RuntimeException) t; | |
} else if (t instanceof Error) { | |
throw (Error) t; | |
} else { | |
throw new RuntimeException(t.getMessage(), t); | |
} | |
} | |
} | |
@Override | |
protected Object decodeData(ObjectInput in) throws IOException { | |
return decodeRequestData(in); | |
} | |
@Deprecated | |
protected Object decodeHeartbeatData(ObjectInput in) throws IOException { | |
try { | |
return in.readObject(); | |
} catch (ClassNotFoundException e) { | |
throw new IOException(StringUtils.toString("Read object failed.", e)); | |
} | |
} | |
protected Object decodeRequestData(ObjectInput in) throws IOException { | |
try { | |
return in.readObject(); | |
} catch (ClassNotFoundException e) { | |
throw new IOException(StringUtils.toString("Read object failed.", e)); | |
} | |
} | |
protected Object decodeResponseData(ObjectInput in) throws IOException { | |
try { | |
return in.readObject(); | |
} catch (ClassNotFoundException e) { | |
throw new IOException(StringUtils.toString("Read object failed.", e)); | |
} | |
} | |
@Override | |
protected void encodeData(ObjectOutput out, Object data) throws IOException { | |
encodeRequestData(out, data); | |
} | |
private void encodeEventData(ObjectOutput out, Object data) throws IOException { | |
out.writeObject(data); | |
} | |
@Deprecated | |
protected void encodeHeartbeatData(ObjectOutput out, Object data) throws IOException { | |
encodeEventData(out, data); | |
} | |
protected void encodeRequestData(ObjectOutput out, Object data) throws IOException { | |
out.writeObject(data); | |
} | |
protected void encodeResponseData(ObjectOutput out, Object data) throws IOException { | |
out.writeObject(data); | |
} | |
@Override | |
protected Object decodeData(Channel channel, ObjectInput in) throws IOException { | |
return decodeRequestData(channel, in); | |
} | |
protected Object decodeEventData(Channel channel, ObjectInput in) throws IOException { | |
try { | |
return in.readObject(); | |
} catch (ClassNotFoundException e) { | |
throw new IOException(StringUtils.toString("Read object failed.", e)); | |
} | |
} | |
@Deprecated | |
protected Object decodeHeartbeatData(Channel channel, ObjectInput in) throws IOException { | |
try { | |
return in.readObject(); | |
} catch (ClassNotFoundException e) { | |
throw new IOException(StringUtils.toString("Read object failed.", e)); | |
} | |
} | |
protected Object decodeRequestData(Channel channel, ObjectInput in) throws IOException { | |
return decodeRequestData(in); | |
} | |
protected Object decodeResponseData(Channel channel, ObjectInput in) throws IOException { | |
return decodeResponseData(in); | |
} | |
protected Object decodeResponseData(Channel channel, ObjectInput in, Object requestData) throws IOException { | |
return decodeResponseData(channel, in); | |
} | |
@Override | |
protected void encodeData(Channel channel, ObjectOutput out, Object data) throws IOException { | |
encodeRequestData(channel, out, data); | |
} | |
private void encodeEventData(Channel channel, ObjectOutput out, Object data) throws IOException { | |
encodeEventData(out, data); | |
} | |
@Deprecated | |
protected void encodeHeartbeatData(Channel channel, ObjectOutput out, Object data) throws IOException { | |
encodeHeartbeatData(out, data); | |
} | |
protected void encodeRequestData(Channel channel, ObjectOutput out, Object data) throws IOException { | |
encodeRequestData(out, data); | |
} | |
protected void encodeResponseData(Channel channel, ObjectOutput out, Object data) throws IOException { | |
encodeResponseData(out, data); | |
} | |
protected void encodeRequestData(Channel channel, ObjectOutput out, Object data, String version) throws IOException { | |
encodeRequestData(out, data); | |
} | |
protected void encodeResponseData(Channel channel, ObjectOutput out, Object data, String version) throws IOException { | |
encodeResponseData(out, data); | |
} | |
} |