| <?php |
| namespace com\fenqile\fsof\common\protocol\fsof; |
| |
| use com\fenqile\fsof\consumer\Type; |
| use Icecave\Flax\Serialization\Encoder; |
| use Icecave\Flax\DubboParser as Decoder; |
| use com\fenqile\fsof\consumer\ConsumerException; |
| |
| /** |
| * |
| * Dubbo网络协议 |
| * +------------------------------------------------------------------------------------------+ |
| * | 包头(二进制数据 16bit) | 包体 | |
| * +-----------------------------------------------------------------------------------------+ |
| * | 版本号 | 命令&serialize | 空白 | 包序号| 长度 | 数据 | |
| * +-----------------------------------------------------------------------------------------+ |
| * | magic(2) | cmd&serialize(1)|(1) |sn(8) | len(4) | data(N)| |
| * +-----------------------------------------------------------------------------------------+ |
| * |
| * magic:协议包起始标识, 0xdabb |
| * -------------------------------------------------------------------------------------------- |
| * cmd:命令类型:FLAG_REQUEST为0x80, FLAG_TWOWAY为0x40, FLAG_EVENT为0x20 |
| * serialize:序列化方案编号:与cmd共用一个字节,采用json,对应dubbo中编号为6 |
| * -------------------------------------------------------------------------------------------- |
| * sn:请求序号,consumer会为每个请求编制一个进程内唯一序号 |
| * ,provider处理完请求后在返回的数据包中会携带该sn号,供consumer判断当前的数据是对应哪个请求 |
| * --------------------------------------------------------------------------------------------* |
| * len:数据报文长度 |
| * -------------------------------------------------------------------------------------------- |
| * data:数据报文,目前采用json进行序列化 |
| * -------------------------------------------------------------------------------------------- |
| */ |
| class DubboParser |
| { |
| //dubbo协议基本信息 |
| const PACKAGE_HEDA_LEN = 16; |
| const MAX_RECV_LEN = 1048576;//1024*1024; |
| const RESPONSE_TCP_SEGMENT_LEN = 1048576;//1*1024*1024; |
| |
| //fsof协议ver字段,ver字段既指示协议版本信息,也作为magic使用 |
| const DUBBO_PROTOCOL_MAGIC = 0xdabb; |
| |
| //serialize 方案编号 |
| const DUBBO_PROTOCOL_SERIALIZE_FAST_JSON = 6; //fastjson serialization code |
| |
| const DUBBO_PROTOCOL_SERIALIZE_HESSIAN2 = 2; //hessian2 serialization code |
| |
| const DUBBO_PROTOCOL_NAME_MAP_CODE = [ |
| 'hessian2' => self::DUBBO_PROTOCOL_SERIALIZE_HESSIAN2, |
| 'fastjson' => self::DUBBO_PROTOCOL_SERIALIZE_FAST_JSON |
| ]; |
| |
| |
| //fsof协议包头cmd字段含义 |
| const FLAG_REQUEST = 0x80; //request |
| const FLAG_TWOWAY = 0x40; //two_way |
| const FLAG_HEARTBEAT_EVENT = 0x20; //heart_event |
| const SERIALIZATION_MASK = 0x1f; //serialization_mask |
| |
| const UPPER_MASK = 0xffffffff00000000; |
| const LOWER_MASK = 0x00000000ffffffff; |
| |
| const RESPONSE_WITH_EXCEPTION = 0; |
| const RESPONSE_VALUE = 1; |
| const RESPONSE_NULL_VALUE = 2; |
| |
| private $logger; |
| |
| public function __construct() |
| { |
| $this->logger = \Logger::getLogger(__CLASS__); |
| } |
| |
| public function packRequest(DubboRequest $request) |
| { |
| if (self::DUBBO_PROTOCOL_SERIALIZE_HESSIAN2 == (self::DUBBO_PROTOCOL_NAME_MAP_CODE[$request->getSerialization()]??null)) { |
| $reqData = $this->buildBodyForHessian2($request); |
| $serialize_type = self::DUBBO_PROTOCOL_SERIALIZE_HESSIAN2; |
| } else { |
| $reqData = $this->buildBodyForFastJson($request); |
| $serialize_type = self::DUBBO_PROTOCOL_SERIALIZE_FAST_JSON; |
| } |
| $upper = ($request->getSn() & self::UPPER_MASK) >> 32; |
| $lower = $request->getSn() & self::LOWER_MASK; |
| $flag = (self::FLAG_REQUEST | $serialize_type); |
| if ($request->isTwoWay()) $flag |= self::FLAG_TWOWAY; |
| if ($request->isHeartbeatEvent()) $flag |= self::FLAG_HEARTBEAT_EVENT; |
| $out = pack("n1C1a1N1N1N1", |
| self::DUBBO_PROTOCOL_MAGIC, |
| $flag, |
| "", |
| $upper, |
| $lower, |
| strlen($reqData)); |
| return $out . $reqData; |
| } |
| |
| public function buildBodyForFastJson(DubboRequest $request) |
| { |
| $reqData = json_encode($request->getDubboVersion()) . PHP_EOL . |
| json_encode($request->getService()) . PHP_EOL; |
| if ($request->getVersion()) { |
| $reqData .= json_encode($request->getVersion()) . PHP_EOL; |
| } else { |
| $reqData .= '""' . PHP_EOL; |
| } |
| $reqData .= json_encode($request->getMethod()) . PHP_EOL; |
| $reqData .= json_encode($this->typeRefs($request)) . PHP_EOL; |
| foreach ($request->getParams() as $value) { |
| $reqData .= json_encode($value) . PHP_EOL; |
| } |
| $attach = array(); |
| $attach['path'] = $request->getService(); |
| $attach['interface'] = $request->getService(); |
| if ($request->getGroup()) { |
| $attach['group'] = $request->getGroup(); |
| } |
| if ($request->getVersion()) { |
| $attach['version'] = $request->getVersion(); |
| } |
| $attach['timeout'] = $request->getTimeout(); |
| $request->setAttach($attach); |
| $reqData .= json_encode($request->getAttach()); |
| return $reqData; |
| |
| } |
| |
| public function buildBodyForHessian2(DubboRequest $request) |
| { |
| $encode = new Encoder(); |
| $reqData = ''; |
| $reqData .= $encode->encode($request->getDubboVersion()); |
| $reqData .= $encode->encode($request->getService()); |
| if ($request->getVersion()) { |
| $reqData .= $encode->encode($request->getVersion()); |
| } else { |
| $reqData .= $encode->encode(''); |
| } |
| $reqData .= $encode->encode($request->getMethod()); |
| $reqData .= $encode->encode($this->typeRefs($request)); |
| foreach ($request->getParams() as $value) { |
| $reqData .= $encode->encode($value); |
| } |
| $attach = ['path' => $request->getService(), 'interface' => $request->getService(), 'timeout' => $request->getTimeout()]; |
| if ($request->getGroup()) { |
| $attach['group'] = $request->getGroup(); |
| } |
| if ($request->getVersion()) { |
| $attach['version'] = $request->getVersion(); |
| } |
| $reqData .= $encode->encode($attach); |
| return $reqData; |
| } |
| |
| private function typeRefs(DubboRequest $request) |
| { |
| $typeRefs = ''; |
| foreach ($request->getTypes() as $type) { |
| $typeRefs .= $type; |
| } |
| return $typeRefs; |
| } |
| |
| |
| public function parseResponseHeader(DubboResponse $response) |
| { |
| $res_header = substr($response->getFullData(), 0, self::PACKAGE_HEDA_LEN); |
| $format = 'n1magic/C1flag/C1status/N1upper/N1lower/N1len'; |
| $_arr = unpack($format, $res_header); |
| $response->setStatus($_arr['status']); |
| $response->setSn($_arr["upper"] << 32 | $_arr["lower"]); |
| $flag = $_arr["flag"]; |
| if (($flag & self::FLAG_HEARTBEAT_EVENT) != 0) { |
| $response->setHeartbeatEvent(true); |
| } |
| $response->setSerialization($flag & self::SERIALIZATION_MASK); |
| $response->setLen($_arr["len"]); |
| return $response; |
| } |
| |
| public function parseResponseBody(DubboResponse $response) |
| { |
| if (DubboResponse::OK == $response->getStatus()) { |
| if (self::DUBBO_PROTOCOL_SERIALIZE_FAST_JSON == $response->getSerialization()) { |
| $this->parseResponseBodyForFastJson($response); |
| } else if (self::DUBBO_PROTOCOL_SERIALIZE_HESSIAN2 == $response->getSerialization()) { |
| $this->parseResponseBodyForHessian2($response); |
| } else { |
| throw new ConsumerException(sprintf('返回的序列化类型:(%s), 不支持解析!', $response->getSerialization())); |
| } |
| } else { |
| throw new ConsumerException($response->getFullData()); |
| } |
| return $response; |
| } |
| |
| private function parseResponseBodyForFastJson(DubboResponse $response) |
| { |
| $_data = substr($response->getFullData(), self::PACKAGE_HEDA_LEN); |
| $response->setResponseBody($_data); |
| list($status, $content) = explode(PHP_EOL, $_data); |
| if ($response->isHeartbeatEvent()) { |
| $response->setResult(json_decode($status, true)); |
| } else { |
| switch ($status) { |
| case self::RESPONSE_NULL_VALUE: |
| break; |
| case self::RESPONSE_VALUE: |
| $response->setResult(json_decode($content, true)); |
| break; |
| case self::RESPONSE_WITH_EXCEPTION: |
| $exception = json_decode($content, true); |
| if (is_array($exception) && array_key_exists('message', $exception)) { |
| throw new ConsumerException($exception['message']); |
| } else if (is_string($exception)) { |
| throw new ConsumerException($exception); |
| } else { |
| throw new ConsumerException("provider occur error"); |
| } |
| break; |
| default: |
| return false; |
| } |
| } |
| return $response; |
| } |
| |
| private function parseResponseBodyForHessian2(DubboResponse $response) |
| { |
| if (!$response->isHeartbeatEvent()) { |
| $_data = $response->getFullData(); |
| $decode = new Decoder($_data); |
| $content = $decode->getData($_data); |
| $response->setResult($content); |
| } |
| return $response; |
| } |
| |
| |
| public function parseRequestHeader(DubboRequest &$request) |
| { |
| $_data = substr($request->getFullData(), 0, self::PACKAGE_HEDA_LEN); |
| $format = 'n1magic/C1flag/C1blank/N1upper/N1lower/N1len'; |
| $_arr = unpack($format, $_data); |
| $flag = $_arr['flag']; |
| $request->setTwoWay(($flag & self::FLAG_TWOWAY) != 0); |
| if (($flag & self::FLAG_HEARTBEAT_EVENT) != 0) { |
| $request->setHeartbeatEvent(true); |
| } |
| $request->setSerialization($flag & self::DUBBO_PROTOCOL_SERIALIZE_FAST_JSON); |
| $request->setSn($_arr['upper'] << 32 | $_arr['lower']); |
| $request->setDataLen($_arr['len']); |
| $request->setRequestLen($request->getDataLen() + self::PACKAGE_HEDA_LEN); |
| return $request; |
| } |
| |
| public function parseRequestBody(DubboRequest &$request) |
| { |
| if ($request->getSerialization() != self::DUBBO_PROTOCOL_SERIALIZE_FAST_JSON) { |
| $this->logger->error("unknown serialization type :" . $request->getSerialization()); |
| return false; |
| } |
| $cliData = substr($request->getFullData(), self::PACKAGE_HEDA_LEN); |
| if ($cliData) { |
| if ($request->isHeartbeatEvent()) { |
| //心跳请求,不需要数据回送 |
| } else { |
| $dataArr = explode(PHP_EOL, $cliData); |
| $request->setDubboVersion(json_decode($dataArr[0], true)); |
| $request->setService(json_decode($dataArr[1], true)); |
| $request->setVersion(json_decode($dataArr[2], true)); |
| $methodName = json_decode($dataArr[3], true); |
| if ($methodName == "\$invoke") { |
| //泛化调用 |
| $request->setMethod(json_decode($dataArr[5], true)); |
| $request->setParams(json_decode($dataArr[7], true)); |
| $attach = json_decode($dataArr[8], true); |
| } else { |
| //非泛化调用 |
| $request->setMethod($methodName); |
| $paramTypes = json_decode($dataArr[4], true); |
| if ($paramTypes == "") { |
| //调用没有参数的方法 |
| $request->setTypes(NULL); |
| $request->setParams(NULL); |
| $attach = json_decode($dataArr[5], true); |
| } else { |
| $typeArr = explode(";", $paramTypes); |
| $typeArrLen = count($typeArr); |
| $request->setParamNum($typeArrLen - 1); |
| $params = array(); |
| for ($i = 0; $i < $typeArrLen - 1; $i++) { |
| $params[$i] = json_decode($dataArr[5 + $i], true); |
| } |
| $request->setParams($params); |
| $attach = json_decode($dataArr[5 + ($typeArrLen - 1)], true); |
| } |
| } |
| $request->setAttach($attach); |
| if (array_key_exists('group', $attach)) { |
| $request->setGroup($attach['group']); |
| } |
| return $request; |
| } |
| } |
| return false; |
| } |
| |
| |
| public function packResponse(DubboResponse &$response) |
| { |
| if ($response->getStatus() != DubboResponse::OK) { |
| $resData = json_encode($response->getErrorMsg()); |
| } else { |
| if ($response->getErrorMsg() != NULL && $response->getErrorMsg() != "") { |
| $resData = json_encode(self::RESPONSE_WITH_EXCEPTION) . PHP_EOL . json_encode($response->getErrorMsg()); |
| } else if ($response->getResult() == NULL) { |
| $resData = json_encode(self::RESPONSE_NULL_VALUE); |
| } else { |
| $resData = json_encode(self::RESPONSE_VALUE) . PHP_EOL . json_encode($response->getResult()); |
| } |
| } |
| $resData = $resData . PHP_EOL; |
| $upper = ($response->getSn() & self::UPPER_MASK) >> 32; |
| $lower = $response->getSn() & self::LOWER_MASK; |
| $flag = self::DUBBO_PROTOCOL_SERIALIZE_FAST_JSON; |
| if ($response->isHeartbeatEvent()) { |
| $flag |= self::FLAG_HEARTBEAT_EVENT; |
| } |
| $out = pack("n1C1C1N1N1N1", |
| self::DUBBO_PROTOCOL_MAGIC, |
| $flag, |
| $response->getStatus(), |
| $upper, |
| $lower, |
| strlen($resData)); |
| |
| return $out . $resData; |
| } |
| |
| public function isNormalResponse(DubboResponse $response) |
| { |
| return !($response->isHeartbeatEvent()); |
| } |
| |
| public function isNormalRequest(DubboRequest $request) |
| { |
| return !($request->isHeartbeatEvent()); |
| } |
| |
| public function isOneWayRequest(DubboRequest $request) |
| { |
| return !($request->isTwoWay()); |
| } |
| |
| public function isHearBeatRequest(DubboRequest $request) |
| { |
| return $request->isHeartbeatEvent(); |
| } |
| |
| public function isHearBeatResponse(DubboResponse $response) |
| { |
| return $response->isHeartbeatEvent(); |
| } |
| |
| public static function getReqInQueueTime(DubboRequest $request) |
| { |
| $ret = 0; |
| if (!empty($request->reqInfo)) { |
| $ret = isset($request->reqInfo['inqueue_time']) ? $request->reqInfo['inqueue_time'] : 0; |
| } |
| return $ret; |
| } |
| } |