| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package com.alibaba.dubbo.rpc.protocol.dubbo; |
| |
| import com.alibaba.dubbo.common.Constants; |
| import com.alibaba.dubbo.common.URL; |
| import com.alibaba.dubbo.common.Version; |
| import com.alibaba.dubbo.common.io.Bytes; |
| import com.alibaba.dubbo.common.io.UnsafeByteArrayInputStream; |
| import com.alibaba.dubbo.common.logger.Logger; |
| import com.alibaba.dubbo.common.logger.LoggerFactory; |
| import com.alibaba.dubbo.common.serialize.ObjectInput; |
| import com.alibaba.dubbo.common.serialize.ObjectOutput; |
| import com.alibaba.dubbo.common.serialize.Serialization; |
| import com.alibaba.dubbo.common.utils.ReflectUtils; |
| import com.alibaba.dubbo.common.utils.StringUtils; |
| import com.alibaba.dubbo.remoting.Channel; |
| import com.alibaba.dubbo.remoting.Codec2; |
| import com.alibaba.dubbo.remoting.exchange.Request; |
| import com.alibaba.dubbo.remoting.exchange.Response; |
| import com.alibaba.dubbo.remoting.exchange.codec.ExchangeCodec; |
| import com.alibaba.dubbo.remoting.transport.CodecSupport; |
| import com.alibaba.dubbo.rpc.Invocation; |
| import com.alibaba.dubbo.rpc.Result; |
| import com.alibaba.dubbo.rpc.RpcInvocation; |
| |
| import java.io.IOException; |
| import java.io.InputStream; |
| |
| import static com.alibaba.dubbo.rpc.protocol.dubbo.CallbackServiceCodec.encodeInvocationArgument; |
| |
| /** |
| * Dubbo codec. |
| */ |
| public class DubboCodec extends ExchangeCodec implements Codec2 { |
| |
| public static final String NAME = "dubbo"; |
| public static final String DUBBO_VERSION = Version.getProtocolVersion(); |
| public static final byte RESPONSE_WITH_EXCEPTION = 0; |
| public static final byte RESPONSE_VALUE = 1; |
| public static final byte RESPONSE_NULL_VALUE = 2; |
| public static final byte RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS = 3; |
| public static final byte RESPONSE_VALUE_WITH_ATTACHMENTS = 4; |
| public static final byte RESPONSE_NULL_VALUE_WITH_ATTACHMENTS = 5; |
| public static final Object[] EMPTY_OBJECT_ARRAY = new Object[0]; |
| public static final Class<?>[] EMPTY_CLASS_ARRAY = new Class<?>[0]; |
| private static final Logger log = LoggerFactory.getLogger(DubboCodec.class); |
| |
| @Override |
| protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException { |
| byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK); |
| Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto); |
| // get request id. |
| long id = Bytes.bytes2long(header, 4); |
| if ((flag & FLAG_REQUEST) == 0) { |
| // decode response. |
| Response res = new Response(id); |
| if ((flag & FLAG_EVENT) != 0) { |
| res.setEvent(Response.HEARTBEAT_EVENT); |
| } |
| // get status. |
| byte status = header[3]; |
| res.setStatus(status); |
| if (status == Response.OK) { |
| try { |
| Object data; |
| if (res.isHeartbeat()) { |
| data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is)); |
| } else if (res.isEvent()) { |
| data = decodeEventData(channel, deserialize(s, channel.getUrl(), is)); |
| } else { |
| DecodeableRpcResult result; |
| if (channel.getUrl().getParameter( |
| Constants.DECODE_IN_IO_THREAD_KEY, |
| Constants.DEFAULT_DECODE_IN_IO_THREAD)) { |
| result = new DecodeableRpcResult(channel, res, is, |
| (Invocation) getRequestData(id), proto); |
| result.decode(); |
| } else { |
| result = new DecodeableRpcResult(channel, res, |
| new UnsafeByteArrayInputStream(readMessageData(is)), |
| (Invocation) getRequestData(id), proto); |
| } |
| data = result; |
| } |
| res.setResult(data); |
| } catch (Throwable t) { |
| if (log.isWarnEnabled()) { |
| log.warn("Decode response failed: " + t.getMessage(), t); |
| } |
| res.setStatus(Response.CLIENT_ERROR); |
| res.setErrorMessage(StringUtils.toString(t)); |
| } |
| } else { |
| res.setErrorMessage(deserialize(s, channel.getUrl(), is).readUTF()); |
| } |
| return res; |
| } else { |
| // decode request. |
| Request req = new Request(id); |
| req.setVersion(Version.getProtocolVersion()); |
| req.setTwoWay((flag & FLAG_TWOWAY) != 0); |
| if ((flag & FLAG_EVENT) != 0) { |
| req.setEvent(Request.HEARTBEAT_EVENT); |
| } |
| try { |
| Object data; |
| if (req.isHeartbeat()) { |
| data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is)); |
| } else if (req.isEvent()) { |
| data = decodeEventData(channel, deserialize(s, channel.getUrl(), is)); |
| } else { |
| DecodeableRpcInvocation inv; |
| if (channel.getUrl().getParameter( |
| Constants.DECODE_IN_IO_THREAD_KEY, |
| Constants.DEFAULT_DECODE_IN_IO_THREAD)) { |
| inv = new DecodeableRpcInvocation(channel, req, is, proto); |
| inv.decode(); |
| } else { |
| inv = new DecodeableRpcInvocation(channel, req, |
| new UnsafeByteArrayInputStream(readMessageData(is)), proto); |
| } |
| data = inv; |
| } |
| req.setData(data); |
| } catch (Throwable t) { |
| if (log.isWarnEnabled()) { |
| log.warn("Decode request failed: " + t.getMessage(), t); |
| } |
| // bad request |
| req.setBroken(true); |
| req.setData(t); |
| } |
| return req; |
| } |
| } |
| |
| private ObjectInput deserialize(Serialization serialization, URL url, InputStream is) |
| throws IOException { |
| return serialization.deserialize(url, is); |
| } |
| |
| private byte[] readMessageData(InputStream is) throws IOException { |
| if (is.available() > 0) { |
| byte[] result = new byte[is.available()]; |
| is.read(result); |
| return result; |
| } |
| return new byte[]{}; |
| } |
| |
| @Override |
| protected void encodeRequestData(Channel channel, ObjectOutput out, Object data) throws IOException { |
| encodeRequestData(channel, out, data, DUBBO_VERSION); |
| } |
| |
| @Override |
| protected void encodeResponseData(Channel channel, ObjectOutput out, Object data) throws IOException { |
| encodeResponseData(channel, out, data, DUBBO_VERSION); |
| } |
| |
| @Override |
| protected void encodeRequestData(Channel channel, ObjectOutput out, Object data, String version) throws IOException { |
| RpcInvocation inv = (RpcInvocation) data; |
| |
| out.writeUTF(version); |
| out.writeUTF(inv.getAttachment(Constants.PATH_KEY)); |
| out.writeUTF(inv.getAttachment(Constants.VERSION_KEY)); |
| |
| out.writeUTF(inv.getMethodName()); |
| out.writeUTF(ReflectUtils.getDesc(inv.getParameterTypes())); |
| Object[] args = inv.getArguments(); |
| if (args != null) |
| for (int i = 0; i < args.length; i++) { |
| out.writeObject(encodeInvocationArgument(channel, inv, i)); |
| } |
| out.writeObject(inv.getAttachments()); |
| } |
| |
| @Override |
| protected void encodeResponseData(Channel channel, ObjectOutput out, Object data, String version) throws IOException { |
| Result result = (Result) data; |
| // currently, the version value in Response records the version of Request |
| boolean attach = Version.isSupportResponseAttatchment(version); |
| Throwable th = result.getException(); |
| if (th == null) { |
| Object ret = result.getValue(); |
| if (ret == null) { |
| out.writeByte(attach ? RESPONSE_NULL_VALUE_WITH_ATTACHMENTS : RESPONSE_NULL_VALUE); |
| } else { |
| out.writeByte(attach ? RESPONSE_VALUE_WITH_ATTACHMENTS : RESPONSE_VALUE); |
| out.writeObject(ret); |
| } |
| } else { |
| out.writeByte(attach ? RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS : RESPONSE_WITH_EXCEPTION); |
| out.writeObject(th); |
| } |
| |
| if (attach) { |
| // returns current version of Response to consumer side. |
| result.getAttachments().put(Constants.DUBBO_VERSION_KEY, Version.getProtocolVersion()); |
| out.writeObject(result.getAttachments()); |
| } |
| } |
| } |