Merge pull request #22 from crazyxman/dev
新增hessian序列化及其他功能,功能点已列到评论处
diff --git a/agent/src/c/Makefile b/agent/src/c/Makefile
index a228884..274f73a 100644
--- a/agent/src/c/Makefile
+++ b/agent/src/c/Makefile
@@ -1,5 +1,5 @@
TARGET = release/agent
-CFLAGS = -g -Wall -lpthread -lzookeeper_mt -L./lib -lhiredis -lzlog -I./include
+CFLAGS = -g -Wall -lpthread -lzookeeper_mt -L./lib -lhiredis -lzlog -I./include
CC = gcc
$(TARGET):service/fsof_mq.c src/fsof_main.c service/fsof_zookeeper.c common/strmap.c service/fsof_redis.c service/fsof_log.c common/fsof_url.c common/fsof_util.c service/mt_array.c
$(CC) $^ $(CFLAGS) -o $@
diff --git a/agent/src/c/release/agent b/agent/src/c/release/agent
new file mode 100755
index 0000000..b6ff71d
--- /dev/null
+++ b/agent/src/c/release/agent
Binary files differ
diff --git a/agent/src/c/release/agent_server b/agent/src/c/release/agent_server
new file mode 100755
index 0000000..b6ff71d
--- /dev/null
+++ b/agent/src/c/release/agent_server
Binary files differ
diff --git a/common/config/FSOFConstants.php b/common/config/FSOFConstants.php
index 0b38d0c..aa880da 100644
--- a/common/config/FSOFConstants.php
+++ b/common/config/FSOFConstants.php
@@ -29,6 +29,13 @@
//monitor定时监控时间,暂定5分钟
const FSOF_MONITOR_TIMER = 300000;
- //所使用的redis端口号
+ //默认所使用的redis端口号
const FSOF_SERVICE_REDIS_PORT =6379;
+ //默认所使用的redis地址
+ const FSOF_SERVICE_REDIS_HOST ='127.0.0.1';
+
+ const FSOF_SERVICE_REDIS_CONNECT_TYPE_TCP = 'TCP';
+
+ const FSOF_SERVICE_REDIS_CONNECT_TYPE_SOCK = 'SOCK';
+
}
\ No newline at end of file
diff --git a/common/file/FSOFRedis.php b/common/file/FSOFRedis.php
index 9ec56a1..42a162b 100644
--- a/common/file/FSOFRedis.php
+++ b/common/file/FSOFRedis.php
@@ -22,21 +22,33 @@
class FSOFRedis
{
- const REDIS_TIME_OUT = 1;
private static $_instance;
private $m_redis = null;
private $logger;
+
+ private $connect_timeout = 1;
+
+ private $read_timeout = 2;
+
+ private $retry_count = 1;
+
+ private $connect_type = FSOFConstants::FSOF_SERVICE_REDIS_CONNECT_TYPE_TCP;
+
+ private $hosts = [
+ [FSOFConstants::FSOF_SERVICE_REDIS_HOST, FSOFConstants::FSOF_SERVICE_REDIS_PORT],
+ ];
- public static function instance()
+ public static function instance($config = [])
{
if (extension_loaded('redis'))
{
if (!isset(FSOFRedis::$_instance))
{
- FSOFRedis::$_instance = new FSOFRedis();
+ FSOFRedis::$_instance = new FSOFRedis($config);
+ FSOFRedis::$_instance->get_redis();
}
return FSOFRedis::$_instance;
}
@@ -47,33 +59,68 @@
return NULL;
}
- public function __construct()
+ public function __construct($config = [])
{
$this->logger = \Logger::getLogger(__CLASS__);
- $this->get_redis();
+ if(isset($config['redis_hosts']))
+ {
+ $this->hosts = [];
+ $address = explode(',', $config['redis_hosts']);
+ foreach ($address as $node){
+ list($host, $port) = explode(':', $node);
+ $this->hosts[] = [$host, $port??FSOFConstants::FSOF_SERVICE_REDIS_PORT];
+ }
+ }
+ if(isset($config['redis_connect_timeout']))
+ {
+ $this->connect_timeout = $config['redis_connect_timeout'];
+ }
+ if(isset($config['redis_read_timeout']))
+ {
+ $this->read_timeout = $config['redis_read_timeout'];
+ }
+ if(isset($config['redis_connect_type']))
+ {
+ $this->connect_type = $config['redis_connect_type'];
+ }
+ if(isset($config['redis_retry_count']))
+ {
+ $this->retry = min($config['redis_retry_count'], 1);
+ }
}
public function get_redis()
{
if (!isset($this->m_redis))
{
- try
- {
- $redis_cli = new \Redis();
- $ret = $redis_cli->connect("127.0.0.1",FSOFConstants::FSOF_SERVICE_REDIS_PORT,self::REDIS_TIME_OUT);
- if (!$ret)
- {
- $this->logger->warn("connect redis failed[127.0.0.1:6379]");
- $ret = $redis_cli->connect("/var/fsof/redis.sock",-1,self::REDIS_TIME_OUT);
- }
- }
- catch (\Exception $e)
- {
- $ret = false;
- $this->logger->error('connect redis excepiton:'.$e->getMessage().', errno:' . $e->getCode());
- throw new \Exception($e->getMessage());
- }
-
+ $hosts_count = count($this->hosts);
+ $retry = $this->retry_count;
+ $rand_num = rand() % $hosts_count;
+ $ret = false;
+ do{
+ try{
+ $redis_cli = new \Redis();
+ if($this->connect_type == FSOFConstants::FSOF_SERVICE_REDIS_CONNECT_TYPE_TCP)
+ {
+ $node = $this->hosts[$rand_num];
+ $ret = $redis_cli->connect($node[0],$node[1],$this->connect_timeout);
+ $redis_cli->setOption(\Redis::OPT_READ_TIMEOUT, $this->read_timeout);
+ $rand_num = ($rand_num + 1)%$hosts_count;
+ if (!$ret)
+ {
+ $this->logger->warn("connect redis failed[{$node[0]}:{$node[1]}]");
+ }
+ }else{
+ $ret = $redis_cli->connect("/var/fsof/redis.sock",-1,FSOFConstants::FSOF_SERVICE_REDIS_PORT,$this->connect_timeout);
+ }
+ if($ret)
+ {
+ break;
+ }
+ }catch (\Exception $e){
+ $this->logger->error('connect redis excepiton:'.$e->getMessage().', errno:' . $e->getCode());
+ }
+ }while($retry-- > 0);
if($ret)
{
$this->m_redis = $redis_cli;
@@ -81,7 +128,7 @@
else
{
$this->logger->error('connect redis failed:|errno:' . $redis_cli->getLastError());
- throw new \Exception("连接本地redis异常");
+ throw new \Exception("连接redis异常");
}
}
@@ -120,27 +167,22 @@
public function getlist($key)
{
- $ret = NULL;
- if (!empty($key))
+ if (!empty($key) && isset($this->m_redis))
{
- try
- {
- if(!isset($this->m_redis))
- {
- $this->get_redis();
- }
- $ret = $this->getlRange($key);
- }
- catch (\Exception $e)
- {
+ try{
+ return $this->getlRange($key);
+ }catch (\Exception $e){
$this->logger->warn('redis current connect excepiton'.' |errcode:'.$e->getCode().' |errmsg:'.$e->getMessage());
- $this->close();
- //重试一次
- $this->get_redis();
- $ret = $this->getlRange($key);
- }
- }
- return $ret;
+ $this->close();
+ //重试一次防止连接成功后,连接断开
+ $this->get_redis();
+ return $this->getlRange($key);
+ }
+ }
+ else
+ {
+ return null;
+ }
}
public function set($key, $value)
diff --git a/common/protocol/fsof/DubboParser.php b/common/protocol/fsof/DubboParser.php
index 0aee201..0497f5f 100644
--- a/common/protocol/fsof/DubboParser.php
+++ b/common/protocol/fsof/DubboParser.php
@@ -1,6 +1,11 @@
<?php
namespace com\fenqile\fsof\common\protocol\fsof;
+use com\fenqile\fsof\consumer\Type;
+use Icecave\Flax\Serialization\Encoder;
+use Icecave\Flax\DubboParser as Decoder;
+use com\fenqile\fsof\consumer\ConsumerException;
+
/**
*
* Dubbo网络协议
@@ -36,7 +41,14 @@
const DUBBO_PROTOCOL_MAGIC = 0xdabb;
//serialize 方案编号
- const DUBBO_PROTOCOL_SERIALIZE_FAST_JSON = 6; //serialization code
+ const DUBBO_PROTOCOL_SERIALIZE_FAST_JSON = 6; //fastjson serialization code
+
+ const DUBBO_PROTOCOL_SERIALIZE_HESSIAN2 = 2; //hessian2 serialization code
+
+ const DUBBO_PROTOCOL_NAME_MAP_CODE = [
+ 'hessian2' => self::DUBBO_PROTOCOL_SERIALIZE_HESSIAN2,
+ 'fastjson' => self::DUBBO_PROTOCOL_SERIALIZE_FAST_JSON
+ ];
//fsof协议包头cmd字段含义
@@ -54,37 +66,23 @@
private $logger;
- public function __construct()
+ public function __construct()
{
$this->logger = \Logger::getLogger(__CLASS__);
}
- public function packRequest(DubboRequest &$request)
+ public function packRequest(DubboRequest $request)
{
- $reqData = json_encode($request->getDubboVersion()) . PHP_EOL .
- json_encode($request->getService()) . PHP_EOL .
- json_encode($request->getVersion()) . PHP_EOL .
- json_encode($request->getMethod()) . PHP_EOL ;
- $typeStr = "";
- for($i=0;$i < count($request->getTypes());$i++){
- $typeStr = $typeStr.$request->getTypes()[$i];
+ if (self::DUBBO_PROTOCOL_SERIALIZE_HESSIAN2 == (self::DUBBO_PROTOCOL_NAME_MAP_CODE[$request->getSerialization()]??null)) {
+ $reqData = $this->buildBodyForHessian2($request);
+ $serialize_type = self::DUBBO_PROTOCOL_SERIALIZE_HESSIAN2;
+ } else {
+ $reqData = $this->buildBodyForFastJson($request);
+ $serialize_type = self::DUBBO_PROTOCOL_SERIALIZE_FAST_JSON;
}
- $reqData = $reqData.json_encode($typeStr) . PHP_EOL;
- for ($x = 0; $x < count($request->getParams()); $x++) {
- $reqData = $reqData . json_encode($request->getParams()[$x]) . PHP_EOL;
- }
- $attach = array();
- $attach['path'] = $request->getService();
- $attach['interface'] = $request->getService();
- $attach['group'] = $request->getGroup();
- $attach['timeout'] = $request->getTimeout();
- $attach['version'] = $request->getVersion();
- $request->setAttach($attach);
- $reqData = $reqData . json_encode($request->getAttach());
-
$upper = ($request->getSn() & self::UPPER_MASK) >> 32;
$lower = $request->getSn() & self::LOWER_MASK;
- $flag = (self::FLAG_REQUEST | self::DUBBO_PROTOCOL_SERIALIZE_FAST_JSON);
+ $flag = (self::FLAG_REQUEST | $serialize_type);
if ($request->isTwoWay()) $flag |= self::FLAG_TWOWAY;
if ($request->isHeartbeatEvent()) $flag |= self::FLAG_HEARTBEAT_EVENT;
$out = pack("n1C1a1N1N1N1",
@@ -97,8 +95,74 @@
return $out . $reqData;
}
+ public function buildBodyForFastJson(DubboRequest $request)
+ {
+ $reqData = json_encode($request->getDubboVersion()) . PHP_EOL .
+ json_encode($request->getService()) . PHP_EOL;
+ if ($request->getVersion()) {
+ $reqData .= json_encode($request->getVersion()) . PHP_EOL;
+ } else {
+ $reqData .= '""' . PHP_EOL;
+ }
+ $reqData .= json_encode($request->getMethod()) . PHP_EOL;
+ $reqData .= json_encode($this->typeRefs($request)) . PHP_EOL;
+ foreach ($request->getParams() as $value) {
+ $reqData .= json_encode($value) . PHP_EOL;
+ }
+ $attach = array();
+ $attach['path'] = $request->getService();
+ $attach['interface'] = $request->getService();
+ if ($request->getGroup()) {
+ $attach['group'] = $request->getGroup();
+ }
+ if ($request->getVersion()) {
+ $attach['version'] = $request->getVersion();
+ }
+ $attach['timeout'] = $request->getTimeout();
+ $request->setAttach($attach);
+ $reqData .= json_encode($request->getAttach());
+ return $reqData;
- public function parseResponseHeader(DubboResponse &$response)
+ }
+
+ public function buildBodyForHessian2(DubboRequest $request)
+ {
+ $encode = new Encoder();
+ $reqData = '';
+ $reqData .= $encode->encode($request->getDubboVersion());
+ $reqData .= $encode->encode($request->getService());
+ if ($request->getVersion()) {
+ $reqData .= $encode->encode($request->getVersion());
+ } else {
+ $reqData .= $encode->encode('');
+ }
+ $reqData .= $encode->encode($request->getMethod());
+ $reqData .= $encode->encode($this->typeRefs($request));
+ foreach ($request->getParams() as $value) {
+ $reqData .= $encode->encode($value);
+ }
+ $attach = ['path' => $request->getService(), 'interface' => $request->getService(), 'timeout' => $request->getTimeout()];
+ if ($request->getGroup()) {
+ $attach['group'] = $request->getGroup();
+ }
+ if ($request->getVersion()) {
+ $attach['version'] = $request->getVersion();
+ }
+ $reqData .= $encode->encode($attach);
+ return $reqData;
+ }
+
+ private function typeRefs(DubboRequest $request)
+ {
+ $typeRefs = '';
+ foreach ($request->getTypes() as $type) {
+ $typeRefs .= $type;
+ }
+ return $typeRefs;
+ }
+
+
+ public function parseResponseHeader(DubboResponse $response)
{
$res_header = substr($response->getFullData(), 0, self::PACKAGE_HEDA_LEN);
$format = 'n1magic/C1flag/C1status/N1upper/N1lower/N1len';
@@ -106,55 +170,70 @@
$response->setStatus($_arr['status']);
$response->setSn($_arr["upper"] << 32 | $_arr["lower"]);
$flag = $_arr["flag"];
- if (($flag & self::FLAG_HEARTBEAT_EVENT) != 0)
- {
+ if (($flag & self::FLAG_HEARTBEAT_EVENT) != 0) {
$response->setHeartbeatEvent(true);
}
- $response->setSerialization($flag & self::DUBBO_PROTOCOL_SERIALIZE_FAST_JSON);
+ $response->setSerialization($flag & self::SERIALIZATION_MASK);
$response->setLen($_arr["len"]);
return $response;
}
- public function parseResponseBody(DubboResponse &$response)
+ public function parseResponseBody(DubboResponse $response)
+ {
+ if (DubboResponse::OK == $response->getStatus()) {
+ if (self::DUBBO_PROTOCOL_SERIALIZE_FAST_JSON == $response->getSerialization()) {
+ $this->parseResponseBodyForFastJson($response);
+ } else if (self::DUBBO_PROTOCOL_SERIALIZE_HESSIAN2 == $response->getSerialization()) {
+ $this->parseResponseBodyForHessian2($response);
+ } else {
+ throw new ConsumerException(sprintf('返回的序列化类型:(%s), 不支持解析!', $response->getSerialization()));
+ }
+ } else {
+ throw new ConsumerException($response->getFullData());
+ }
+ return $response;
+ }
+
+ private function parseResponseBodyForFastJson(DubboResponse $response)
{
$_data = substr($response->getFullData(), self::PACKAGE_HEDA_LEN);
$response->setResponseBody($_data);
- $dataArr = array();
- if ($_data){
- $dataArr = explode(PHP_EOL,$_data);
- }
- if ($response->getStatus() == DubboResponse::OK)
- {
- if ($response->isHeartbeatEvent())
- {
- $response->setResult(json_decode($dataArr[0], true));
- } else
- {
- switch ($dataArr[0])
- {
- case self::RESPONSE_NULL_VALUE:
- break;
- case self::RESPONSE_VALUE:
- $response->setResult(json_decode($dataArr[1], true));
- break;
- case self::RESPONSE_WITH_EXCEPTION:
- $exception = json_decode($dataArr[1], true);
- if(is_array($exception) && array_key_exists('message', $exception)){
- throw new \Exception($exception['message']);
- }else if(is_string($exception)){
- throw new \Exception($exception);
- }else{
- throw new \Exception("provider occur error");
- }
- break;
- default:
- return false;
- }
- }
- return $response;
+ list($status, $content) = explode(PHP_EOL, $_data);
+ if ($response->isHeartbeatEvent()) {
+ $response->setResult(json_decode($status, true));
} else {
- throw new \Exception(json_decode($dataArr[0], true));
+ switch ($status) {
+ case self::RESPONSE_NULL_VALUE:
+ break;
+ case self::RESPONSE_VALUE:
+ $response->setResult(json_decode($content, true));
+ break;
+ case self::RESPONSE_WITH_EXCEPTION:
+ $exception = json_decode($content, true);
+ if (is_array($exception) && array_key_exists('message', $exception)) {
+ throw new ConsumerException($exception['message']);
+ } else if (is_string($exception)) {
+ throw new ConsumerException($exception);
+ } else {
+ throw new ConsumerException("provider occur error");
+ }
+ break;
+ default:
+ return false;
+ }
}
+ return $response;
+ }
+
+ private function parseResponseBodyForHessian2(DubboResponse $response)
+ {
+ if (!$response->isHeartbeatEvent()) {
+ $_data = $response->getFullData();
+ $decode = new Decoder($_data);
+ $content = $decode->getData($_data);
+ $response->setResult($content);
+ }
+ return $response;
}
@@ -165,8 +244,7 @@
$_arr = unpack($format, $_data);
$flag = $_arr['flag'];
$request->setTwoWay(($flag & self::FLAG_TWOWAY) != 0);
- if (($flag & self::FLAG_HEARTBEAT_EVENT) != 0)
- {
+ if (($flag & self::FLAG_HEARTBEAT_EVENT) != 0) {
$request->setHeartbeatEvent(true);
}
$request->setSerialization($flag & self::DUBBO_PROTOCOL_SERIALIZE_FAST_JSON);
@@ -178,57 +256,48 @@
public function parseRequestBody(DubboRequest &$request)
{
- if ($request->getSerialization() != self::DUBBO_PROTOCOL_SERIALIZE_FAST_JSON)
- {
+ if ($request->getSerialization() != self::DUBBO_PROTOCOL_SERIALIZE_FAST_JSON) {
$this->logger->error("unknown serialization type :" . $request->getSerialization());
return false;
}
$cliData = substr($request->getFullData(), self::PACKAGE_HEDA_LEN);
- if ($cliData)
- {
- if ($request->isHeartbeatEvent())
- {
+ if ($cliData) {
+ if ($request->isHeartbeatEvent()) {
//心跳请求,不需要数据回送
- } else
- {
+ } else {
$dataArr = explode(PHP_EOL, $cliData);
$request->setDubboVersion(json_decode($dataArr[0], true));
$request->setService(json_decode($dataArr[1], true));
$request->setVersion(json_decode($dataArr[2], true));
- $methodName = json_decode($dataArr[3],true);
- if ($methodName == "\$invoke")
- {
+ $methodName = json_decode($dataArr[3], true);
+ if ($methodName == "\$invoke") {
//泛化调用
- $request->setMethod(json_decode($dataArr[5],true));
- $request->setParams(json_decode($dataArr[7],true));
- $attach = json_decode($dataArr[8],true);
- } else
- {
+ $request->setMethod(json_decode($dataArr[5], true));
+ $request->setParams(json_decode($dataArr[7], true));
+ $attach = json_decode($dataArr[8], true);
+ } else {
//非泛化调用
$request->setMethod($methodName);
$paramTypes = json_decode($dataArr[4], true);
- if ($paramTypes == "")
- {
+ if ($paramTypes == "") {
//调用没有参数的方法
$request->setTypes(NULL);
$request->setParams(NULL);
- $attach = json_decode($dataArr[5],true);
- } else
- {
+ $attach = json_decode($dataArr[5], true);
+ } else {
$typeArr = explode(";", $paramTypes);
$typeArrLen = count($typeArr);
$request->setParamNum($typeArrLen - 1);
$params = array();
for ($i = 0; $i < $typeArrLen - 1; $i++) {
- $params[$i] = json_decode($dataArr[5 + $i],true);
+ $params[$i] = json_decode($dataArr[5 + $i], true);
}
$request->setParams($params);
- $attach = json_decode($dataArr[5 + ($typeArrLen - 1)],true);
+ $attach = json_decode($dataArr[5 + ($typeArrLen - 1)], true);
}
}
$request->setAttach($attach);
- if (array_key_exists('group', $attach))
- {
+ if (array_key_exists('group', $attach)) {
$request->setGroup($attach['group']);
}
return $request;
@@ -241,17 +310,17 @@
public function packResponse(DubboResponse &$response)
{
if ($response->getStatus() != DubboResponse::OK) {
- $resData = json_encode($response->getErrorMsg()) ;
+ $resData = json_encode($response->getErrorMsg());
} else {
- if($response->getErrorMsg() != NULL && $response->getErrorMsg() != ""){
+ if ($response->getErrorMsg() != NULL && $response->getErrorMsg() != "") {
$resData = json_encode(self::RESPONSE_WITH_EXCEPTION) . PHP_EOL . json_encode($response->getErrorMsg());
- }else if ($response->getResult() == NULL) {
+ } else if ($response->getResult() == NULL) {
$resData = json_encode(self::RESPONSE_NULL_VALUE);
} else {
$resData = json_encode(self::RESPONSE_VALUE) . PHP_EOL . json_encode($response->getResult());
}
}
- $resData = $resData.PHP_EOL;
+ $resData = $resData . PHP_EOL;
$upper = ($response->getSn() & self::UPPER_MASK) >> 32;
$lower = $response->getSn() & self::LOWER_MASK;
$flag = self::DUBBO_PROTOCOL_SERIALIZE_FAST_JSON;
diff --git a/common/protocol/fsof/DubboRequest.php b/common/protocol/fsof/DubboRequest.php
index cb30cb7..2d64f8c 100644
--- a/common/protocol/fsof/DubboRequest.php
+++ b/common/protocol/fsof/DubboRequest.php
@@ -15,8 +15,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
namespace com\fenqile\fsof\common\protocol\fsof;
+use com\fenqile\fsof\consumer\Type;
+
class DubboRequest
{
//包头字段
@@ -89,7 +92,6 @@
}
-
/**
* @return boolean
*/
@@ -107,8 +109,6 @@
}
-
-
/**
* @return mixed
*/
@@ -142,7 +142,6 @@
}
-
/**
* @return mixed
*/
@@ -269,6 +268,16 @@
public function setParams($params)
{
$this->params = $params;
+ foreach ($this->params as &$value) {
+ if ($value instanceof Type) {
+ $value = $value->object;
+ } elseif (is_object($value)) {
+ $value = new \stdClass();
+ foreach ($value as $property => $val) {
+ $value->$property = $val;
+ }
+ }
+ }
}
/**
@@ -320,7 +329,6 @@
}
-
//用于监控service中各方法的性能
public $startTime; //开始处理请求的时间
public $endTime; //请求处理结束的时间
diff --git a/common/url/FSOFUrl.php b/common/url/FSOFUrl.php
index 7b5637a..bb9fcb4 100644
--- a/common/url/FSOFUrl.php
+++ b/common/url/FSOFUrl.php
@@ -35,6 +35,7 @@
const URL_CATEGORY = 'category';
const URL_WEIGHT = 'weight';
+ const URL_SERIALIZATION = 'serialization';
private $originUrl = NULL;
private $encodedUrl = NULL;
@@ -50,6 +51,7 @@
private $version = NULL;
private $params = NULL;
private $weight = NULL;
+ private $serialization = NULL;
private $logger;
@@ -147,6 +149,13 @@
{
$this->service = $getArgs[self::URL_SERVICE];
}
+
+ if(isset($getArgs[self::URL_SERIALIZATION]))
+ {
+ $this->serialization = $getArgs[self::URL_SERIALIZATION];
+ } else {
+ $this->serialization = 'hessian2';
+ }
}
$this->joinUrlStr();
@@ -270,6 +279,23 @@
$this->weight = $weight;
}
+ public function getSerialization($defaultValue = NULL)
+ {
+ if(empty($this->serialization))
+ {
+ return $defaultValue;
+ }
+ else
+ {
+ return $this->serialization;
+ }
+ }
+
+ public function setSerialization($serialization)
+ {
+ $this->serialization = $serialization;
+ }
+
public function getParams($key)
{
return $this->getparam($key);
diff --git a/composer.json b/composer.json
index ef301f0..dff3f4e 100644
--- a/composer.json
+++ b/composer.json
@@ -1,5 +1,6 @@
{
"require": {
- "apache/log4php": "^2.3"
+ "apache/log4php": "^2.3",
+ "crazyxman/hessian-parser": "dev-master"
}
}
diff --git a/consumer/ConsumerException.php b/consumer/ConsumerException.php
new file mode 100644
index 0000000..dc2f465
--- /dev/null
+++ b/consumer/ConsumerException.php
@@ -0,0 +1,16 @@
+<?php
+namespace com\fenqile\fsof\consumer;
+
+use Exception;
+
+class ConsumerException extends Exception
+{
+ /**
+ * @param string $message The exception message.
+ * @param Exception|null $previous The previous exception, if any.
+ */
+ public function __construct($message, Exception $previous = null)
+ {
+ parent::__construct($message, 0, $previous);
+ }
+}
\ No newline at end of file
diff --git a/consumer/Type.php b/consumer/Type.php
new file mode 100644
index 0000000..20c8123
--- /dev/null
+++ b/consumer/Type.php
@@ -0,0 +1,112 @@
+<?php
+
+namespace com\fenqile\fsof\consumer;
+
+use com\fenqile\fsof\consumer\ConsumerException;
+use Icecave\Collections\Collection;
+
+class Type
+{
+ /*
+ const SHORT = 1;
+ const INT = 2;
+ const INTEGER = 2;
+ const LONG = 3;
+ const FLOAT = 4;
+ const DOUBLE = 5;
+ const STRING = 6;
+ const BOOL = 7;
+ const BOOLEAN = 7;
+ const MAP = 8;
+ */
+ const ARRAYLIST = 9;
+ const DEFAULT_TYPE = 10;
+
+ const adapter = [
+ /*
+ Type::SHORT => 'S',
+ Type::INT => 'I',
+ Type::LONG => 'J',
+ Type::FLOAT => 'F',
+ Type::DOUBLE => 'D',
+ Type::BOOLEAN => 'Z',
+ Type::STRING => 'Ljava/lang/String;',
+ Type::MAP => 'Ljava/util/Map;',
+ */
+ Type::ARRAYLIST => 'Ljava/util/ArrayList;',
+ Type::DEFAULT_TYPE => 'Ljava/lang/Object;'
+ ];
+
+ private function __construct()
+ {
+ }
+
+ /**
+ *
+ * @param integer $value
+ * @return UniversalObject
+ */
+ public static function object($class, $properties)
+ {
+ $typeObj = new self();
+ $typeObj->className = $class;
+ $std = new \stdClass;
+ foreach ($properties as $key => $value) {
+ $std->$key = $value;
+ }
+ $typeObj->object = $std;
+ return $typeObj;
+ }
+
+ /**
+ *
+ * @param mixed $arg
+ * @return string
+ * @throws ConsumerException
+ */
+ public static function argToType($arg)
+ {
+ $type = gettype($arg);
+ switch ($type) {
+ case 'integer':
+ case 'boolean':
+ case 'double':
+ case 'string':
+ case 'NULL':
+ return self::adapter[Type::DEFAULT_TYPE];
+ case 'array':
+ if (Collection::isSequential($arg)) {
+ return self::adapter[Type::ARRAYLIST];
+ } else {
+ return self::adapter[Type::DEFAULT_TYPE];
+ }
+ case 'object':
+ if ($arg instanceof Type) {
+ $className = $arg->className;
+ } else {
+ $className = get_class($arg);
+ }
+ return 'L' . str_replace(['.', '\\'], '/', $className) . ';';
+ default:
+ throw new ConsumerException("Handler for type {$type} not implemented");
+ }
+ }
+ /**
+ *
+ * @param int $arg
+ * @return int
+ */
+
+ /*
+ private static function numToType($value)
+ {
+ if (-32768 <= $value && $value <= 32767) {
+ return Type::SHORT;
+ } elseif (-2147483648 <= $value && $value <= 2147483647) {
+ return Type::INT;
+ }
+ return Type::LONG;
+ }
+ */
+
+}
\ No newline at end of file
diff --git a/consumer/fsof/FSOFProcessor.php b/consumer/fsof/FSOFProcessor.php
index 1c1cb87..e3ec2f2 100644
--- a/consumer/fsof/FSOFProcessor.php
+++ b/consumer/fsof/FSOFProcessor.php
@@ -22,6 +22,7 @@
use com\fenqile\fsof\common\protocol\fsof\DubboRequest;
use com\fenqile\fsof\common\protocol\fsof\DubboResponse;
use com\fenqile\fsof\consumer\client\FSOFClient4Linux;
+use com\fenqile\fsof\consumer\ConsumerException;
class FSOFProcessor
{
@@ -35,6 +36,8 @@
private $logger;
+ private $iotimeout = 3;
+
public function __construct()
{
$this->logger = \Logger::getLogger(__CLASS__);
@@ -43,6 +46,7 @@
public function executeRequest(DubboRequest $request, $svrAddr, $ioTimeOut, &$providerAddr)
{
+ $this->iotimeout = $ioTimeOut;
//计算服务端个数
$svrNum = count($svrAddr);
//连接异常重试次数最多2次
@@ -64,11 +68,12 @@
//透传到服务端字段
$request->host = $host;
$request->port = $port;
- $request->setGroup($svrUrl->getGroup(FSOFConstants::FSOF_SERVICE_GROUP_ANY));
- $request->setVersion( $svrUrl->getVersion(FSOFConstants::FSOF_SERVICE_VERSION_DEFAULT));
- $request->setTimeout($ioTimeOut * 1000);
+ $request->setGroup($svrUrl->getGroup());
+ $request->setVersion( $svrUrl->getVersion());
+ $request->setTimeout($this->iotimeout * 1000);
+ $request->setSerialization($svrUrl->getSerialization(DubboParser::DUBBO_PROTOCOL_SERIALIZE_FAST_JSON));
- $client = $this->connectProvider($host, $port, $ioTimeOut);
+ $client = $this->connectProvider($host, $port, $this->iotimeout);
if(empty($client))
{
//记录连接错误日志
@@ -125,7 +130,7 @@
$msg = mb_substr($msg, 0, 512, 'UTF-8').' ...(len:'.strlen($msg).")";
}
$this->logger->error("send date failed:" . $msg);
- throw new \Exception("发送请求数据失败");
+ throw new ConsumerException("发送请求数据失败");
}
}
catch (\Exception $e)
@@ -138,7 +143,7 @@
$msg = mb_substr($msg, 0, 512, 'UTF-8').' ...(len:'.strlen($msg).")";
}
$this->logger->error("send date failed:" . $msg, $e);
- throw new \Exception("发送请求数据失败");
+ throw new ConsumerException("发送请求数据失败");
}
try
@@ -156,7 +161,7 @@
}
else
{
- throw new \Exception("与服务器建立连接失败");
+ throw new ConsumerException("与服务器建立连接失败");
}
return $ret;
}
@@ -205,11 +210,11 @@
{
if (0 == $socket->getlasterror())
{
- throw new \Exception("provider端己关闭网络连接");
+ throw new ConsumerException("provider端己关闭网络连接");
}
else
{
- throw new \Exception("接收应答数据超时");
+ throw new ConsumerException("接收应答数据超时");
}
}
@@ -220,7 +225,7 @@
if (($response) && ($response->getSn() != $request->getSn()))
{
$this->logger->error("response sn[{$response->getSn()}] != request sn[{$request->getSn()}]");
- throw new \Exception("请求包中的sn非法");
+ throw new ConsumerException("请求包中的sn非法");
}
//接收消息体
@@ -255,25 +260,25 @@
$tmpdata = $this->Recv($socket, $cur_len);
if ($tmpdata)
{
- $recv_data = $recv_data . $tmpdata;
+ $recv_data .= $tmpdata;
$resv_len -= $cur_len;
}
else
{
if (0 == $socket->getlasterror())
{
- throw new \Exception("provider端己关闭网络连接");
+ throw new ConsumerException("provider端己关闭网络连接");
}
else
{
- throw new \Exception("接收应答数据超时");
+ throw new ConsumerException("接收应答数据超时");
}
}
- //如果超过15秒就当超时处理
- if ((microtime(true) - $start_time) > 15)
+ //如果超过设置的iotimeout就当超时处理
+ if ((microtime(true) - $start_time) > $this->iotimeout)
{
$this->logger->error("Multi recv {$resv_len} bytes data timeout");
- throw new \Exception("接收应答数据超时");
+ throw new ConsumerException("接收应答数据超时");
}
} while ($resv_len > 0);
@@ -284,7 +289,7 @@
{
if(DubboResponse::OK != $response->getStatus())
{
- throw new \Exception($response->getErrorMsg());
+ throw new ConsumerException($response->getErrorMsg());
}
else
{
@@ -294,7 +299,7 @@
else
{
$this->logger->error("parse response body err:".$response->__toString());
- throw new \Exception("未知异常");
+ throw new ConsumerException("未知异常");
}
}
@@ -302,21 +307,20 @@
{
try
{
+ $start_time = microtime(true);
$resv_len = $len;
$_data = '';
- $cnt = 20;//最多循环20次,防止provider端挂掉时,consumer陷入死循环
do
{
- $cnt--;
$tmp_data = $socket->recv($resv_len);
if (!$tmp_data)
{
$this->logger->warn("socket->recv faile:$resv_len");
break;
}
- $_data = $_data . $tmp_data;
+ $_data .= $tmp_data;
$resv_len -= strlen($tmp_data);
- } while (($resv_len > 0) && ($cnt > 0));
+ } while (($resv_len > 0) && ( (microtime(true) - $start_time) < $this->iotimeout)); //读取数据不能超过设置的io时长
if ($resv_len > 0)
{
@@ -331,11 +335,11 @@
$this->logger->error('recv data exception',$e);
if(self::FSOF_CONNECTION_RESET == $e->getCode())
{
- throw new \Exception("未知异常");
+ throw new ConsumerException("未知异常");
}
else
{
- throw new \Exception("接收应答数据超时");
+ throw new ConsumerException("接收应答数据超时");
}
}
}
diff --git a/consumer/proxy/Proxy.php b/consumer/proxy/Proxy.php
index cecadce..3914ee4 100644
--- a/consumer/proxy/Proxy.php
+++ b/consumer/proxy/Proxy.php
@@ -18,8 +18,9 @@
namespace com\fenqile\fsof\consumer\proxy;
use com\fenqile\fsof\common\protocol\fsof\DubboRequest;
+use com\fenqile\fsof\consumer\ConsumerException;
use com\fenqile\fsof\consumer\fsof\FSOFProcessor;
-
+use com\fenqile\fsof\consumer\Type;
final class Proxy
{
@@ -81,12 +82,11 @@
return $rand_number;
}
- protected function generateParamType($num)
+ protected function generateParamType($args)
{
- $types = array();
- for($i = 0; $i < $num; $i++)
- {
- $types[] = 'Ljava/lang/Object;';
+ $types = [];
+ foreach ($args as $val) {
+ $types[] = Type::argToType($val);
}
return $types;
}
@@ -114,14 +114,14 @@
$request->setService($this->serviceInterface);
$request->setMethod($args[0]);
array_shift($args);
+ $request->setTypes($this->generateParamType($args));
$request->setParams($args);
- $request->setTypes($this->generateParamType(count($request->getParams())));
$result = $this->fsofProcessor->executeRequest($request, $this->serviceAddress, $this->ioTimeOut, $providerAddress);
}catch (\Exception $e) {
$cost_time = (int)((microtime(true) - $begin_time) * 1000000);
//记录consumer接口告警日志
$this->setAccLog($request, $cost_time, $e->getMessage());
- throw $e;
+ throw new ConsumerException($e->getMessage(), $e);
}
$cost_time = (int)((microtime(true) - $begin_time) * 1000000);
//记录consumer接口告警日志
diff --git a/consumer/proxy/ProxyFactory.php b/consumer/proxy/ProxyFactory.php
index d1c3c4e..0b4c8da 100644
--- a/consumer/proxy/ProxyFactory.php
+++ b/consumer/proxy/ProxyFactory.php
@@ -22,6 +22,7 @@
use com\fenqile\fsof\common\config\FSOFConstants;
use com\fenqile\fsof\common\config\FSOFCommonUtil;
use com\fenqile\fsof\registry\automatic\ConsumerProxy;
+use com\fenqile\fsof\consumer\ConsumerException;
final class ProxyFactory
@@ -61,6 +62,11 @@
*/
protected static $serviceConsumers = array();
+ /**
+ * @var *.consumer中配置的config信息
+ */
+ protected static $configConsumer = Array();
+
private static $logger;
public static function setConsumerConfig($configData, $consumerConfigFile, $initSettings)
@@ -89,6 +95,11 @@
self::$appVersion = $configData['consumer_config']['version'];
}
+ if(isset($configData['consumer_config']))
+ {
+ self::$configConsumer = $configData['consumer_config'];
+ }
+
if(isset($configData['consumer_services']))
{
self::$serviceConsumers = $configData['consumer_services'];
@@ -99,7 +110,7 @@
private static function getInstancByRedis($service, $ioTimeOut, $version, $group)
{
$ret = NULL;
- $providerInfo = ConsumerProxy::instance()->getProviders($service, $version, $group);
+ $providerInfo = ConsumerProxy::instance(self::$configConsumer)->getProviders($service, $version, $group);
if(!empty($providerInfo))
{
$cacheKey = $service.':'.$version.':'.$group;
@@ -164,7 +175,7 @@
return $ret;
}
- public static function getInstance($consumerInterface, $ioTimeOut = 3)
+ public static function getInstance($consumerInterface, $ioTimeOut = 3, $version = null, $group = null)
{
$ret = NULL;
$route = '';
@@ -210,7 +221,7 @@
if (empty($ret))
{
$errMsg = "current_address:".FSOFSystemUtil::getLocalIP()."|".$consumerInterface;
- throw new \Exception($errMsg);
+ throw new ConsumerException($errMsg);
}
else
{
@@ -223,6 +234,7 @@
{
self::$logger->error('consumer_app:'.self::$appName.'|app_config_file:'.self::$appConfigFile.
'|version:'.$version.'|group:'.$group.'|provider_service:'.$consumerInterface.'|errmsg:'. $e->getMessage().'|exceptionmsg:'.$e);
+ throw new ConsumerException($e->getMessage(), $e);
}
return $ret;
}
diff --git a/registry/automatic/ConsumerProxy.php b/registry/automatic/ConsumerProxy.php
index 2533821..b86fc89 100644
--- a/registry/automatic/ConsumerProxy.php
+++ b/registry/automatic/ConsumerProxy.php
@@ -25,19 +25,21 @@
{
private static $_instance;
private $logger;
+ private $config = [];
- public static function instance()
+ public static function instance($config)
{
if (!isset(ConsumerProxy::$_instance))
{
- ConsumerProxy::$_instance = new ConsumerProxy();
+ ConsumerProxy::$_instance = new ConsumerProxy($config);
}
return ConsumerProxy::$_instance;
}
- public function __construct()
+ public function __construct($config)
{
$this->logger = \Logger::getLogger(__CLASS__);
+ $this->config = $config;
}
/**
@@ -48,7 +50,7 @@
try
{
//获取路由信息
- $providerInfo = FSOFRedis::instance()->getProviderInfo($service);
+ $providerInfo = FSOFRedis::instance($this->config)->getProviderInfo($service);
return $this->filterProviderUrls($providerInfo, $version, $group, $service);
}
catch (\Exception $e)