Merge pull request #22 from crazyxman/dev

新增hessian序列化及其他功能,功能点已列到评论处
diff --git a/agent/src/c/Makefile b/agent/src/c/Makefile
index a228884..274f73a 100644
--- a/agent/src/c/Makefile
+++ b/agent/src/c/Makefile
@@ -1,5 +1,5 @@
 TARGET = release/agent
-CFLAGS = -g -Wall -lpthread -lzookeeper_mt   -L./lib -lhiredis -lzlog  -I./include
+CFLAGS = -g -Wall -lpthread -lzookeeper_mt -L./lib -lhiredis -lzlog  -I./include
 CC = gcc
 $(TARGET):service/fsof_mq.c src/fsof_main.c service/fsof_zookeeper.c common/strmap.c service/fsof_redis.c service/fsof_log.c common/fsof_url.c common/fsof_util.c service/mt_array.c
 	$(CC) $^ $(CFLAGS)  -o $@ 
diff --git a/agent/src/c/release/agent b/agent/src/c/release/agent
new file mode 100755
index 0000000..b6ff71d
--- /dev/null
+++ b/agent/src/c/release/agent
Binary files differ
diff --git a/agent/src/c/release/agent_server b/agent/src/c/release/agent_server
new file mode 100755
index 0000000..b6ff71d
--- /dev/null
+++ b/agent/src/c/release/agent_server
Binary files differ
diff --git a/common/config/FSOFConstants.php b/common/config/FSOFConstants.php
index 0b38d0c..aa880da 100644
--- a/common/config/FSOFConstants.php
+++ b/common/config/FSOFConstants.php
@@ -29,6 +29,13 @@
 	//monitor定时监控时间,暂定5分钟
 	const FSOF_MONITOR_TIMER = 300000;
 
-	//所使用的redis端口号
+	//默认所使用的redis端口号
 	const FSOF_SERVICE_REDIS_PORT =6379;
+    //默认所使用的redis地址
+	const FSOF_SERVICE_REDIS_HOST ='127.0.0.1';
+
+    const FSOF_SERVICE_REDIS_CONNECT_TYPE_TCP = 'TCP';
+
+    const FSOF_SERVICE_REDIS_CONNECT_TYPE_SOCK = 'SOCK';
+
 }
\ No newline at end of file
diff --git a/common/file/FSOFRedis.php b/common/file/FSOFRedis.php
index 9ec56a1..42a162b 100644
--- a/common/file/FSOFRedis.php
+++ b/common/file/FSOFRedis.php
@@ -22,21 +22,33 @@
 
 class FSOFRedis
 {
-    const REDIS_TIME_OUT = 1;
 
     private static $_instance;
         
     private $m_redis = null;
 
     private $logger;
+
+    private $connect_timeout = 1;
+
+    private $read_timeout = 2;
+
+    private $retry_count = 1;
+
+    private $connect_type = FSOFConstants::FSOF_SERVICE_REDIS_CONNECT_TYPE_TCP;
+
+    private $hosts = [
+        [FSOFConstants::FSOF_SERVICE_REDIS_HOST, FSOFConstants::FSOF_SERVICE_REDIS_PORT],
+    ];
         
-    public static function instance()
+    public static function instance($config = [])
     {
         if (extension_loaded('redis'))
         {
             if (!isset(FSOFRedis::$_instance))
             {
-                FSOFRedis::$_instance = new FSOFRedis();
+                FSOFRedis::$_instance = new FSOFRedis($config);
+                FSOFRedis::$_instance->get_redis();
             }
             return FSOFRedis::$_instance;
         }
@@ -47,33 +59,68 @@
         return NULL;
     }
     
-    public function  __construct()
+    public function  __construct($config = [])
     {
         $this->logger = \Logger::getLogger(__CLASS__);
-        $this->get_redis();
+        if(isset($config['redis_hosts']))
+        {
+            $this->hosts = [];
+            $address = explode(',', $config['redis_hosts']);
+            foreach ($address as $node){
+                list($host, $port) = explode(':', $node);
+                $this->hosts[] = [$host, $port??FSOFConstants::FSOF_SERVICE_REDIS_PORT];
+            }
+        }
+        if(isset($config['redis_connect_timeout']))
+        {
+            $this->connect_timeout = $config['redis_connect_timeout'];
+        }
+        if(isset($config['redis_read_timeout']))
+        {
+            $this->read_timeout = $config['redis_read_timeout'];
+        }
+        if(isset($config['redis_connect_type']))
+        {
+            $this->connect_type = $config['redis_connect_type'];
+        }
+        if(isset($config['redis_retry_count']))
+        {
+            $this->retry = min($config['redis_retry_count'], 1);
+        }
     }
 
     public function get_redis()
     {
         if (!isset($this->m_redis))
         {
-            try
-            {
-                $redis_cli = new \Redis();
-				$ret = $redis_cli->connect("127.0.0.1",FSOFConstants::FSOF_SERVICE_REDIS_PORT,self::REDIS_TIME_OUT);
-				if (!$ret)
-				{
-                    $this->logger->warn("connect redis failed[127.0.0.1:6379]");
-					$ret = $redis_cli->connect("/var/fsof/redis.sock",-1,self::REDIS_TIME_OUT);
-				}
-            }
-            catch (\Exception $e)
-            {
-                $ret = false;
-                $this->logger->error('connect redis excepiton:'.$e->getMessage().', errno:' . $e->getCode());
-                throw new \Exception($e->getMessage());
-            }
-
+            $hosts_count = count($this->hosts);
+            $retry = $this->retry_count;
+            $rand_num = rand() % $hosts_count;
+            $ret = false;
+            do{
+                try{
+                    $redis_cli = new \Redis();
+                    if($this->connect_type == FSOFConstants::FSOF_SERVICE_REDIS_CONNECT_TYPE_TCP)
+                    {
+                        $node = $this->hosts[$rand_num];
+                        $ret = $redis_cli->connect($node[0],$node[1],$this->connect_timeout);
+                        $redis_cli->setOption(\Redis::OPT_READ_TIMEOUT, $this->read_timeout);
+                        $rand_num = ($rand_num + 1)%$hosts_count;
+                        if (!$ret)
+                        {
+                            $this->logger->warn("connect redis failed[{$node[0]}:{$node[1]}]");
+                        }
+                    }else{
+                        $ret = $redis_cli->connect("/var/fsof/redis.sock",-1,FSOFConstants::FSOF_SERVICE_REDIS_PORT,$this->connect_timeout);
+                    }
+                    if($ret)
+                    {
+                        break;
+                    }
+                }catch (\Exception $e){
+                    $this->logger->error('connect redis excepiton:'.$e->getMessage().', errno:' . $e->getCode());
+                }
+            }while($retry-- > 0);
             if($ret)
             {
                 $this->m_redis = $redis_cli;
@@ -81,7 +128,7 @@
             else
             {
                 $this->logger->error('connect redis failed:|errno:' . $redis_cli->getLastError());
-                throw new \Exception("连接本地redis异常");
+                throw new \Exception("连接redis异常");
             }
         }
 
@@ -120,27 +167,22 @@
 
     public function getlist($key)
     {
-		$ret = NULL;
-        if (!empty($key)) 
+        if (!empty($key) && isset($this->m_redis))
         {
-			try
-			{
-				if(!isset($this->m_redis))
-				{
-					$this->get_redis();
-				}
-				$ret = $this->getlRange($key);
-			}
-			catch (\Exception $e)
-			{
+            try{
+                return $this->getlRange($key);
+            }catch (\Exception $e){
                 $this->logger->warn('redis current connect excepiton'.' |errcode:'.$e->getCode().' |errmsg:'.$e->getMessage());
-				$this->close();
-				//重试一次
-				$this->get_redis();
-				$ret = $this->getlRange($key);
-			}
-        } 
- 		return $ret;
+                $this->close();
+                //重试一次防止连接成功后,连接断开
+                $this->get_redis();
+                return $this->getlRange($key);
+            }
+        }
+        else
+        {
+            return null;
+        }
     }
 
     public function set($key, $value)
diff --git a/common/protocol/fsof/DubboParser.php b/common/protocol/fsof/DubboParser.php
index 0aee201..0497f5f 100644
--- a/common/protocol/fsof/DubboParser.php
+++ b/common/protocol/fsof/DubboParser.php
@@ -1,6 +1,11 @@
 <?php
 namespace com\fenqile\fsof\common\protocol\fsof;
 
+use com\fenqile\fsof\consumer\Type;
+use Icecave\Flax\Serialization\Encoder;
+use Icecave\Flax\DubboParser as Decoder;
+use com\fenqile\fsof\consumer\ConsumerException;
+
 /**
  *
  * Dubbo网络协议
@@ -36,7 +41,14 @@
     const DUBBO_PROTOCOL_MAGIC = 0xdabb;
 
     //serialize 方案编号
-    const DUBBO_PROTOCOL_SERIALIZE_FAST_JSON = 6;     //serialization code
+    const DUBBO_PROTOCOL_SERIALIZE_FAST_JSON = 6;     //fastjson serialization code
+
+    const DUBBO_PROTOCOL_SERIALIZE_HESSIAN2 = 2;     //hessian2 serialization code
+
+    const DUBBO_PROTOCOL_NAME_MAP_CODE = [
+        'hessian2' => self::DUBBO_PROTOCOL_SERIALIZE_HESSIAN2,
+        'fastjson' => self::DUBBO_PROTOCOL_SERIALIZE_FAST_JSON
+    ];
 
 
     //fsof协议包头cmd字段含义
@@ -54,37 +66,23 @@
 
     private $logger;
 
-    public function  __construct()
+    public function __construct()
     {
         $this->logger = \Logger::getLogger(__CLASS__);
     }
 
-    public function packRequest(DubboRequest &$request)
+    public function packRequest(DubboRequest $request)
     {
-        $reqData = json_encode($request->getDubboVersion()) . PHP_EOL .
-            json_encode($request->getService()) . PHP_EOL .
-            json_encode($request->getVersion()) . PHP_EOL .
-            json_encode($request->getMethod()) . PHP_EOL ;
-        $typeStr = "";
-        for($i=0;$i < count($request->getTypes());$i++){
-           $typeStr = $typeStr.$request->getTypes()[$i];
+        if (self::DUBBO_PROTOCOL_SERIALIZE_HESSIAN2 == (self::DUBBO_PROTOCOL_NAME_MAP_CODE[$request->getSerialization()]??null)) {
+            $reqData = $this->buildBodyForHessian2($request);
+            $serialize_type = self::DUBBO_PROTOCOL_SERIALIZE_HESSIAN2;
+        } else {
+            $reqData = $this->buildBodyForFastJson($request);
+            $serialize_type = self::DUBBO_PROTOCOL_SERIALIZE_FAST_JSON;
         }
-        $reqData = $reqData.json_encode($typeStr) . PHP_EOL;
-        for ($x = 0; $x < count($request->getParams()); $x++) {
-            $reqData = $reqData . json_encode($request->getParams()[$x]) . PHP_EOL;
-        }
-        $attach = array();
-        $attach['path'] = $request->getService();
-        $attach['interface'] = $request->getService();
-        $attach['group'] = $request->getGroup();
-        $attach['timeout'] = $request->getTimeout();
-        $attach['version'] = $request->getVersion();
-        $request->setAttach($attach);
-        $reqData = $reqData . json_encode($request->getAttach());
-
         $upper = ($request->getSn() & self::UPPER_MASK) >> 32;
         $lower = $request->getSn() & self::LOWER_MASK;
-        $flag = (self::FLAG_REQUEST | self::DUBBO_PROTOCOL_SERIALIZE_FAST_JSON);
+        $flag = (self::FLAG_REQUEST | $serialize_type);
         if ($request->isTwoWay()) $flag |= self::FLAG_TWOWAY;
         if ($request->isHeartbeatEvent()) $flag |= self::FLAG_HEARTBEAT_EVENT;
         $out = pack("n1C1a1N1N1N1",
@@ -97,8 +95,74 @@
         return $out . $reqData;
     }
 
+    public function buildBodyForFastJson(DubboRequest $request)
+    {
+        $reqData = json_encode($request->getDubboVersion()) . PHP_EOL .
+            json_encode($request->getService()) . PHP_EOL;
+        if ($request->getVersion()) {
+            $reqData .= json_encode($request->getVersion()) . PHP_EOL;
+        } else {
+            $reqData .= '""' . PHP_EOL;
+        }
+        $reqData .= json_encode($request->getMethod()) . PHP_EOL;
+        $reqData .= json_encode($this->typeRefs($request)) . PHP_EOL;
+        foreach ($request->getParams() as $value) {
+            $reqData .= json_encode($value) . PHP_EOL;
+        }
+        $attach = array();
+        $attach['path'] = $request->getService();
+        $attach['interface'] = $request->getService();
+        if ($request->getGroup()) {
+            $attach['group'] = $request->getGroup();
+        }
+        if ($request->getVersion()) {
+            $attach['version'] = $request->getVersion();
+        }
+        $attach['timeout'] = $request->getTimeout();
+        $request->setAttach($attach);
+        $reqData .= json_encode($request->getAttach());
+        return $reqData;
 
-    public function parseResponseHeader(DubboResponse &$response)
+    }
+
+    public function buildBodyForHessian2(DubboRequest $request)
+    {
+        $encode = new Encoder();
+        $reqData = '';
+        $reqData .= $encode->encode($request->getDubboVersion());
+        $reqData .= $encode->encode($request->getService());
+        if ($request->getVersion()) {
+            $reqData .= $encode->encode($request->getVersion());
+        } else {
+            $reqData .= $encode->encode('');
+        }
+        $reqData .= $encode->encode($request->getMethod());
+        $reqData .= $encode->encode($this->typeRefs($request));
+        foreach ($request->getParams() as $value) {
+            $reqData .= $encode->encode($value);
+        }
+        $attach = ['path' => $request->getService(), 'interface' => $request->getService(), 'timeout' => $request->getTimeout()];
+        if ($request->getGroup()) {
+            $attach['group'] = $request->getGroup();
+        }
+        if ($request->getVersion()) {
+            $attach['version'] = $request->getVersion();
+        }
+        $reqData .= $encode->encode($attach);
+        return $reqData;
+    }
+
+    private function typeRefs(DubboRequest $request)
+    {
+        $typeRefs = '';
+        foreach ($request->getTypes() as $type) {
+            $typeRefs .= $type;
+        }
+        return $typeRefs;
+    }
+
+
+    public function parseResponseHeader(DubboResponse $response)
     {
         $res_header = substr($response->getFullData(), 0, self::PACKAGE_HEDA_LEN);
         $format = 'n1magic/C1flag/C1status/N1upper/N1lower/N1len';
@@ -106,55 +170,70 @@
         $response->setStatus($_arr['status']);
         $response->setSn($_arr["upper"] << 32 | $_arr["lower"]);
         $flag = $_arr["flag"];
-        if (($flag & self::FLAG_HEARTBEAT_EVENT) != 0)
-        {
+        if (($flag & self::FLAG_HEARTBEAT_EVENT) != 0) {
             $response->setHeartbeatEvent(true);
         }
-        $response->setSerialization($flag & self::DUBBO_PROTOCOL_SERIALIZE_FAST_JSON);
+        $response->setSerialization($flag & self::SERIALIZATION_MASK);
         $response->setLen($_arr["len"]);
         return $response;
     }
 
-    public function parseResponseBody(DubboResponse &$response)
+    public function parseResponseBody(DubboResponse $response)
+    {
+        if (DubboResponse::OK == $response->getStatus()) {
+            if (self::DUBBO_PROTOCOL_SERIALIZE_FAST_JSON == $response->getSerialization()) {
+                $this->parseResponseBodyForFastJson($response);
+            } else if (self::DUBBO_PROTOCOL_SERIALIZE_HESSIAN2 == $response->getSerialization()) {
+                $this->parseResponseBodyForHessian2($response);
+            } else {
+                throw new ConsumerException(sprintf('返回的序列化类型:(%s), 不支持解析!', $response->getSerialization()));
+            }
+        } else {
+            throw new ConsumerException($response->getFullData());
+        }
+        return $response;
+    }
+
+    private function parseResponseBodyForFastJson(DubboResponse $response)
     {
         $_data = substr($response->getFullData(), self::PACKAGE_HEDA_LEN);
         $response->setResponseBody($_data);
-        $dataArr = array();
-        if ($_data){
-            $dataArr = explode(PHP_EOL,$_data);
-        }
-        if ($response->getStatus() == DubboResponse::OK)
-        {
-            if ($response->isHeartbeatEvent())
-            {
-                $response->setResult(json_decode($dataArr[0], true));
-            } else
-            {
-                switch ($dataArr[0])
-                {
-                    case self::RESPONSE_NULL_VALUE:
-                        break;
-                    case self::RESPONSE_VALUE:
-                        $response->setResult(json_decode($dataArr[1], true));
-                        break;
-                    case self::RESPONSE_WITH_EXCEPTION:
-                        $exception = json_decode($dataArr[1], true);
-                        if(is_array($exception) && array_key_exists('message', $exception)){
-                            throw new \Exception($exception['message']);
-                        }else if(is_string($exception)){
-                            throw new \Exception($exception);
-                        }else{
-                            throw new \Exception("provider occur error");
-                        }
-                        break;
-                    default:
-                        return false;
-                }
-            }
-            return $response;
+        list($status, $content) = explode(PHP_EOL, $_data);
+        if ($response->isHeartbeatEvent()) {
+            $response->setResult(json_decode($status, true));
         } else {
-            throw new \Exception(json_decode($dataArr[0], true));
+            switch ($status) {
+                case self::RESPONSE_NULL_VALUE:
+                    break;
+                case self::RESPONSE_VALUE:
+                    $response->setResult(json_decode($content, true));
+                    break;
+                case self::RESPONSE_WITH_EXCEPTION:
+                    $exception = json_decode($content, true);
+                    if (is_array($exception) && array_key_exists('message', $exception)) {
+                        throw new ConsumerException($exception['message']);
+                    } else if (is_string($exception)) {
+                        throw new ConsumerException($exception);
+                    } else {
+                        throw new ConsumerException("provider occur error");
+                    }
+                    break;
+                default:
+                    return false;
+            }
         }
+        return $response;
+    }
+
+    private function parseResponseBodyForHessian2(DubboResponse $response)
+    {
+        if (!$response->isHeartbeatEvent()) {
+            $_data = $response->getFullData();
+            $decode = new Decoder($_data);
+            $content = $decode->getData($_data);
+            $response->setResult($content);
+        }
+        return $response;
     }
 
 
@@ -165,8 +244,7 @@
         $_arr = unpack($format, $_data);
         $flag = $_arr['flag'];
         $request->setTwoWay(($flag & self::FLAG_TWOWAY) != 0);
-        if (($flag & self::FLAG_HEARTBEAT_EVENT) != 0)
-        {
+        if (($flag & self::FLAG_HEARTBEAT_EVENT) != 0) {
             $request->setHeartbeatEvent(true);
         }
         $request->setSerialization($flag & self::DUBBO_PROTOCOL_SERIALIZE_FAST_JSON);
@@ -178,57 +256,48 @@
 
     public function parseRequestBody(DubboRequest &$request)
     {
-        if ($request->getSerialization() != self::DUBBO_PROTOCOL_SERIALIZE_FAST_JSON)
-        {
+        if ($request->getSerialization() != self::DUBBO_PROTOCOL_SERIALIZE_FAST_JSON) {
             $this->logger->error("unknown serialization type :" . $request->getSerialization());
             return false;
         }
         $cliData = substr($request->getFullData(), self::PACKAGE_HEDA_LEN);
-        if ($cliData)
-        {
-            if ($request->isHeartbeatEvent())
-            {
+        if ($cliData) {
+            if ($request->isHeartbeatEvent()) {
                 //心跳请求,不需要数据回送
-            } else
-            {
+            } else {
                 $dataArr = explode(PHP_EOL, $cliData);
                 $request->setDubboVersion(json_decode($dataArr[0], true));
                 $request->setService(json_decode($dataArr[1], true));
                 $request->setVersion(json_decode($dataArr[2], true));
-                $methodName = json_decode($dataArr[3],true);
-                if ($methodName == "\$invoke")
-                {
+                $methodName = json_decode($dataArr[3], true);
+                if ($methodName == "\$invoke") {
                     //泛化调用
-                    $request->setMethod(json_decode($dataArr[5],true));
-                    $request->setParams(json_decode($dataArr[7],true));
-                    $attach = json_decode($dataArr[8],true);
-                } else
-                {
+                    $request->setMethod(json_decode($dataArr[5], true));
+                    $request->setParams(json_decode($dataArr[7], true));
+                    $attach = json_decode($dataArr[8], true);
+                } else {
                     //非泛化调用
                     $request->setMethod($methodName);
                     $paramTypes = json_decode($dataArr[4], true);
-                    if ($paramTypes == "")
-                    {
+                    if ($paramTypes == "") {
                         //调用没有参数的方法
                         $request->setTypes(NULL);
                         $request->setParams(NULL);
-                        $attach = json_decode($dataArr[5],true);
-                    } else
-                    {
+                        $attach = json_decode($dataArr[5], true);
+                    } else {
                         $typeArr = explode(";", $paramTypes);
                         $typeArrLen = count($typeArr);
                         $request->setParamNum($typeArrLen - 1);
                         $params = array();
                         for ($i = 0; $i < $typeArrLen - 1; $i++) {
-                            $params[$i] = json_decode($dataArr[5 + $i],true);
+                            $params[$i] = json_decode($dataArr[5 + $i], true);
                         }
                         $request->setParams($params);
-                        $attach = json_decode($dataArr[5 + ($typeArrLen - 1)],true);
+                        $attach = json_decode($dataArr[5 + ($typeArrLen - 1)], true);
                     }
                 }
                 $request->setAttach($attach);
-                if (array_key_exists('group', $attach))
-                {
+                if (array_key_exists('group', $attach)) {
                     $request->setGroup($attach['group']);
                 }
                 return $request;
@@ -241,17 +310,17 @@
     public function packResponse(DubboResponse &$response)
     {
         if ($response->getStatus() != DubboResponse::OK) {
-            $resData = json_encode($response->getErrorMsg()) ;
+            $resData = json_encode($response->getErrorMsg());
         } else {
-            if($response->getErrorMsg() != NULL && $response->getErrorMsg() != ""){
+            if ($response->getErrorMsg() != NULL && $response->getErrorMsg() != "") {
                 $resData = json_encode(self::RESPONSE_WITH_EXCEPTION) . PHP_EOL . json_encode($response->getErrorMsg());
-            }else if ($response->getResult() == NULL) {
+            } else if ($response->getResult() == NULL) {
                 $resData = json_encode(self::RESPONSE_NULL_VALUE);
             } else {
                 $resData = json_encode(self::RESPONSE_VALUE) . PHP_EOL . json_encode($response->getResult());
             }
         }
-        $resData =  $resData.PHP_EOL;
+        $resData = $resData . PHP_EOL;
         $upper = ($response->getSn() & self::UPPER_MASK) >> 32;
         $lower = $response->getSn() & self::LOWER_MASK;
         $flag = self::DUBBO_PROTOCOL_SERIALIZE_FAST_JSON;
diff --git a/common/protocol/fsof/DubboRequest.php b/common/protocol/fsof/DubboRequest.php
index cb30cb7..2d64f8c 100644
--- a/common/protocol/fsof/DubboRequest.php
+++ b/common/protocol/fsof/DubboRequest.php
@@ -15,8 +15,11 @@
  *  See the License for the specific language governing permissions and
  *  limitations under the License.
  */
+
 namespace com\fenqile\fsof\common\protocol\fsof;
 
+use com\fenqile\fsof\consumer\Type;
+
 class DubboRequest
 {
     //包头字段
@@ -89,7 +92,6 @@
     }
 
 
-
     /**
      * @return boolean
      */
@@ -107,8 +109,6 @@
     }
 
 
-
-
     /**
      * @return mixed
      */
@@ -142,7 +142,6 @@
     }
 
 
-
     /**
      * @return mixed
      */
@@ -269,6 +268,16 @@
     public function setParams($params)
     {
         $this->params = $params;
+        foreach ($this->params as &$value) {
+            if ($value instanceof Type) {
+                $value = $value->object;
+            } elseif (is_object($value)) {
+                $value = new \stdClass();
+                foreach ($value as $property => $val) {
+                    $value->$property = $val;
+                }
+            }
+        }
     }
 
     /**
@@ -320,7 +329,6 @@
     }
 
 
-
     //用于监控service中各方法的性能
     public $startTime;            //开始处理请求的时间
     public $endTime;            //请求处理结束的时间
diff --git a/common/url/FSOFUrl.php b/common/url/FSOFUrl.php
index 7b5637a..bb9fcb4 100644
--- a/common/url/FSOFUrl.php
+++ b/common/url/FSOFUrl.php
@@ -35,6 +35,7 @@
     const URL_CATEGORY = 'category';
 
 	const URL_WEIGHT = 'weight';
+	const URL_SERIALIZATION = 'serialization';
 
     private $originUrl = NULL;
     private $encodedUrl = NULL;
@@ -50,6 +51,7 @@
     private $version = NULL;
     private $params = NULL;
 	private $weight = NULL;
+	private $serialization = NULL;
 
     private $logger;
 
@@ -147,6 +149,13 @@
 			{
 				$this->service = $getArgs[self::URL_SERVICE];
 			}
+
+			if(isset($getArgs[self::URL_SERIALIZATION]))
+            {
+                $this->serialization = $getArgs[self::URL_SERIALIZATION];
+            } else {
+                $this->serialization = 'hessian2';
+            }
 		}
 
         $this->joinUrlStr();
@@ -270,6 +279,23 @@
 		$this->weight = $weight;
 	}
 
+	public function getSerialization($defaultValue = NULL)
+    {
+        if(empty($this->serialization))
+        {
+            return $defaultValue;
+        }
+        else
+        {
+            return $this->serialization;
+        }
+    }
+
+	public function setSerialization($serialization)
+    {
+        $this->serialization = $serialization;
+    }
+
 	public function getParams($key)
 	{
 		return $this->getparam($key);
diff --git a/composer.json b/composer.json
index ef301f0..dff3f4e 100644
--- a/composer.json
+++ b/composer.json
@@ -1,5 +1,6 @@
 {
     "require": {
-        "apache/log4php": "^2.3"
+        "apache/log4php": "^2.3",
+        "crazyxman/hessian-parser": "dev-master"
     }
 }
diff --git a/consumer/ConsumerException.php b/consumer/ConsumerException.php
new file mode 100644
index 0000000..dc2f465
--- /dev/null
+++ b/consumer/ConsumerException.php
@@ -0,0 +1,16 @@
+<?php
+namespace com\fenqile\fsof\consumer;
+
+use Exception;
+
+class ConsumerException extends Exception
+{
+    /**
+     * @param string         $message  The exception message.
+     * @param Exception|null $previous The previous exception, if any.
+     */
+    public function __construct($message, Exception $previous = null)
+    {
+        parent::__construct($message, 0, $previous);
+    }
+}
\ No newline at end of file
diff --git a/consumer/Type.php b/consumer/Type.php
new file mode 100644
index 0000000..20c8123
--- /dev/null
+++ b/consumer/Type.php
@@ -0,0 +1,112 @@
+<?php
+
+namespace com\fenqile\fsof\consumer;
+
+use com\fenqile\fsof\consumer\ConsumerException;
+use Icecave\Collections\Collection;
+
+class Type
+{
+    /*
+    const SHORT = 1;
+    const INT = 2;
+    const INTEGER = 2;
+    const LONG = 3;
+    const FLOAT = 4;
+    const DOUBLE = 5;
+    const STRING = 6;
+    const BOOL = 7;
+    const BOOLEAN = 7;
+    const MAP = 8;
+    */
+    const ARRAYLIST = 9;
+    const DEFAULT_TYPE = 10;
+
+    const adapter = [
+        /*
+        Type::SHORT => 'S',
+        Type::INT => 'I',
+        Type::LONG => 'J',
+        Type::FLOAT => 'F',
+        Type::DOUBLE => 'D',
+        Type::BOOLEAN => 'Z',
+        Type::STRING => 'Ljava/lang/String;',
+        Type::MAP => 'Ljava/util/Map;',
+        */
+        Type::ARRAYLIST => 'Ljava/util/ArrayList;',
+        Type::DEFAULT_TYPE => 'Ljava/lang/Object;'
+    ];
+
+    private function __construct()
+    {
+    }
+
+    /**
+     *
+     * @param integer $value
+     * @return UniversalObject
+     */
+    public static function object($class, $properties)
+    {
+        $typeObj = new self();
+        $typeObj->className = $class;
+        $std = new \stdClass;
+        foreach ($properties as $key => $value) {
+            $std->$key = $value;
+        }
+        $typeObj->object = $std;
+        return $typeObj;
+    }
+
+    /**
+     *
+     * @param mixed $arg
+     * @return string
+     * @throws ConsumerException
+     */
+    public static function argToType($arg)
+    {
+        $type = gettype($arg);
+        switch ($type) {
+            case 'integer':
+            case 'boolean':
+            case 'double':
+            case 'string':
+            case 'NULL':
+                return self::adapter[Type::DEFAULT_TYPE];
+            case 'array':
+                if (Collection::isSequential($arg)) {
+                    return self::adapter[Type::ARRAYLIST];
+                } else {
+                    return self::adapter[Type::DEFAULT_TYPE];
+                }
+            case 'object':
+                if ($arg instanceof Type) {
+                    $className = $arg->className;
+                } else {
+                    $className = get_class($arg);
+                }
+                return 'L' . str_replace(['.', '\\'], '/', $className) . ';';
+            default:
+                throw new ConsumerException("Handler for type {$type} not implemented");
+        }
+    }
+    /**
+     *
+     * @param int $arg
+     * @return int
+     */
+
+    /*
+    private static function numToType($value)
+    {
+        if (-32768 <= $value && $value <= 32767) {
+            return Type::SHORT;
+        } elseif (-2147483648 <= $value && $value <= 2147483647) {
+            return Type::INT;
+        }
+        return Type::LONG;
+    }
+    */
+
+}
\ No newline at end of file
diff --git a/consumer/fsof/FSOFProcessor.php b/consumer/fsof/FSOFProcessor.php
index 1c1cb87..e3ec2f2 100644
--- a/consumer/fsof/FSOFProcessor.php
+++ b/consumer/fsof/FSOFProcessor.php
@@ -22,6 +22,7 @@
 use com\fenqile\fsof\common\protocol\fsof\DubboRequest;
 use com\fenqile\fsof\common\protocol\fsof\DubboResponse;
 use com\fenqile\fsof\consumer\client\FSOFClient4Linux;
+use com\fenqile\fsof\consumer\ConsumerException;
 
 class FSOFProcessor
 {
@@ -35,6 +36,8 @@
 
     private  $logger;
 
+    private $iotimeout = 3;
+
     public function __construct()
     {
         $this->logger = \Logger::getLogger(__CLASS__);
@@ -43,6 +46,7 @@
 
     public function executeRequest(DubboRequest $request, $svrAddr, $ioTimeOut, &$providerAddr)
     {
+        $this->iotimeout = $ioTimeOut;
         //计算服务端个数
         $svrNum = count($svrAddr);
         //连接异常重试次数最多2次
@@ -64,11 +68,12 @@
                 //透传到服务端字段
                 $request->host = $host;
                 $request->port = $port;
-                $request->setGroup($svrUrl->getGroup(FSOFConstants::FSOF_SERVICE_GROUP_ANY));
-                $request->setVersion( $svrUrl->getVersion(FSOFConstants::FSOF_SERVICE_VERSION_DEFAULT));
-                $request->setTimeout($ioTimeOut * 1000);
+                $request->setGroup($svrUrl->getGroup());
+                $request->setVersion( $svrUrl->getVersion());
+                $request->setTimeout($this->iotimeout * 1000);
+                $request->setSerialization($svrUrl->getSerialization(DubboParser::DUBBO_PROTOCOL_SERIALIZE_FAST_JSON));
 
-                $client = $this->connectProvider($host, $port, $ioTimeOut);
+                $client = $this->connectProvider($host, $port, $this->iotimeout);
                 if(empty($client))
                 {
                     //记录连接错误日志
@@ -125,7 +130,7 @@
                         $msg = mb_substr($msg, 0, 512, 'UTF-8').' ...(len:'.strlen($msg).")";
                     }
                     $this->logger->error("send date failed:" . $msg);
-                    throw new \Exception("发送请求数据失败");
+                    throw new ConsumerException("发送请求数据失败");
                 }
             }
             catch (\Exception $e)
@@ -138,7 +143,7 @@
                     $msg = mb_substr($msg, 0, 512, 'UTF-8').' ...(len:'.strlen($msg).")";
                 }
                 $this->logger->error("send date failed:" . $msg, $e);
-                throw new \Exception("发送请求数据失败");
+                throw new ConsumerException("发送请求数据失败");
             }
 
             try
@@ -156,7 +161,7 @@
         }
         else
         {
-            throw new \Exception("与服务器建立连接失败");
+            throw new ConsumerException("与服务器建立连接失败");
         }
         return $ret;
     }
@@ -205,11 +210,11 @@
         {
             if (0 == $socket->getlasterror())
             {
-                throw new \Exception("provider端己关闭网络连接");
+                throw new ConsumerException("provider端己关闭网络连接");
             }
             else
             {
-                throw new \Exception("接收应答数据超时");
+                throw new ConsumerException("接收应答数据超时");
             }
         }
 
@@ -220,7 +225,7 @@
         if (($response) && ($response->getSn() != $request->getSn()))
         {
             $this->logger->error("response sn[{$response->getSn()}] != request sn[{$request->getSn()}]");
-            throw new \Exception("请求包中的sn非法");
+            throw new ConsumerException("请求包中的sn非法");
         }
 
         //接收消息体
@@ -255,25 +260,25 @@
                 $tmpdata = $this->Recv($socket, $cur_len);
                 if ($tmpdata)
                 {
-                    $recv_data = $recv_data . $tmpdata;
+                    $recv_data .= $tmpdata;
                     $resv_len -= $cur_len;
                 }
                 else
                 {
                     if (0 == $socket->getlasterror())
                     {
-                        throw new \Exception("provider端己关闭网络连接");
+                        throw new ConsumerException("provider端己关闭网络连接");
                     }
                     else
                     {
-                        throw new \Exception("接收应答数据超时");
+                        throw new ConsumerException("接收应答数据超时");
                     }
                 }
-                //如果超过15秒就当超时处理
-                if ((microtime(true) - $start_time) > 15)
+                //如果超过设置的iotimeout就当超时处理
+                if ((microtime(true) - $start_time) > $this->iotimeout)
                 {
                     $this->logger->error("Multi recv {$resv_len} bytes data timeout");
-                    throw new \Exception("接收应答数据超时");
+                    throw new ConsumerException("接收应答数据超时");
                 }
             } while ($resv_len > 0);
 
@@ -284,7 +289,7 @@
         {
             if(DubboResponse::OK != $response->getStatus())
             {
-                throw new \Exception($response->getErrorMsg());
+                throw new ConsumerException($response->getErrorMsg());
             }
             else
             {
@@ -294,7 +299,7 @@
         else
         {
             $this->logger->error("parse response body err:".$response->__toString());
-            throw new \Exception("未知异常");
+            throw new ConsumerException("未知异常");
         }
     }
 
@@ -302,21 +307,20 @@
     {
         try
         {
+            $start_time = microtime(true);
             $resv_len = $len;
             $_data = '';
-            $cnt = 20;//最多循环20次,防止provider端挂掉时,consumer陷入死循环
             do
             {
-                $cnt--;
                 $tmp_data = $socket->recv($resv_len);
                 if (!$tmp_data)
                 {
                     $this->logger->warn("socket->recv faile:$resv_len");
                     break;
                 }
-                $_data = $_data . $tmp_data;
+                $_data .= $tmp_data;
                 $resv_len -= strlen($tmp_data);
-            } while (($resv_len > 0) && ($cnt > 0));
+            } while (($resv_len > 0) && ( (microtime(true) - $start_time) < $this->iotimeout)); //读取数据不能超过设置的io时长
 
             if ($resv_len > 0)
             {
@@ -331,11 +335,11 @@
             $this->logger->error('recv data exception',$e);
             if(self::FSOF_CONNECTION_RESET == $e->getCode())
             {
-                throw new \Exception("未知异常");
+                throw new ConsumerException("未知异常");
             }
             else
             {
-                throw new \Exception("接收应答数据超时");
+                throw new ConsumerException("接收应答数据超时");
             }
         }
     }
diff --git a/consumer/proxy/Proxy.php b/consumer/proxy/Proxy.php
index cecadce..3914ee4 100644
--- a/consumer/proxy/Proxy.php
+++ b/consumer/proxy/Proxy.php
@@ -18,8 +18,9 @@
 namespace com\fenqile\fsof\consumer\proxy;
 
 use com\fenqile\fsof\common\protocol\fsof\DubboRequest;
+use com\fenqile\fsof\consumer\ConsumerException;
 use com\fenqile\fsof\consumer\fsof\FSOFProcessor;
-
+use com\fenqile\fsof\consumer\Type;
 
 final class Proxy
 {
@@ -81,12 +82,11 @@
         return $rand_number;
     }
 
-    protected function generateParamType($num)
+    protected function generateParamType($args)
     {
-        $types = array();
-        for($i = 0; $i < $num; $i++)
-        {
-            $types[] = 'Ljava/lang/Object;';
+        $types = [];
+        foreach ($args as $val) {
+            $types[] = Type::argToType($val);
         }
         return $types;
     }
@@ -114,14 +114,14 @@
             $request->setService($this->serviceInterface);
             $request->setMethod($args[0]);
             array_shift($args);
+            $request->setTypes($this->generateParamType($args));
             $request->setParams($args);
-            $request->setTypes($this->generateParamType(count($request->getParams())));
             $result = $this->fsofProcessor->executeRequest($request, $this->serviceAddress, $this->ioTimeOut, $providerAddress);
         }catch (\Exception $e) {
             $cost_time = (int)((microtime(true) - $begin_time) * 1000000);
             //记录consumer接口告警日志
             $this->setAccLog($request, $cost_time, $e->getMessage());
-            throw $e;
+            throw new ConsumerException($e->getMessage(), $e);
         }
         $cost_time = (int)((microtime(true) - $begin_time) * 1000000);
         //记录consumer接口告警日志
diff --git a/consumer/proxy/ProxyFactory.php b/consumer/proxy/ProxyFactory.php
index d1c3c4e..0b4c8da 100644
--- a/consumer/proxy/ProxyFactory.php
+++ b/consumer/proxy/ProxyFactory.php
@@ -22,6 +22,7 @@
 use com\fenqile\fsof\common\config\FSOFConstants;
 use com\fenqile\fsof\common\config\FSOFCommonUtil;
 use com\fenqile\fsof\registry\automatic\ConsumerProxy;
+use com\fenqile\fsof\consumer\ConsumerException;
 
 
 final class ProxyFactory
@@ -61,6 +62,11 @@
 	 */
 	protected static $serviceConsumers = array();
 
+    /**
+     * @var  *.consumer中配置的config信息
+     */
+	protected static $configConsumer = Array();
+
     private static $logger;
 
 	public static function setConsumerConfig($configData, $consumerConfigFile, $initSettings)
@@ -89,6 +95,11 @@
 			self::$appVersion = $configData['consumer_config']['version'];
 		}
 
+        if(isset($configData['consumer_config']))
+        {
+            self::$configConsumer = $configData['consumer_config'];
+        }
+
 		if(isset($configData['consumer_services']))
 		{
 			self::$serviceConsumers = $configData['consumer_services'];
@@ -99,7 +110,7 @@
     private static function getInstancByRedis($service, $ioTimeOut, $version, $group)
 	{
         $ret = NULL;
-        $providerInfo = ConsumerProxy::instance()->getProviders($service, $version, $group);
+        $providerInfo = ConsumerProxy::instance(self::$configConsumer)->getProviders($service, $version, $group);
         if(!empty($providerInfo))
         {
             $cacheKey = $service.':'.$version.':'.$group;
@@ -164,7 +175,7 @@
         return $ret;         
     }
 
-    public static function getInstance($consumerInterface, $ioTimeOut = 3)
+    public static function getInstance($consumerInterface, $ioTimeOut = 3, $version = null, $group = null)
     {
     	$ret = NULL;
         $route = '';
@@ -210,7 +221,7 @@
 			if (empty($ret))
 			{
 				$errMsg = "current_address:".FSOFSystemUtil::getLocalIP()."|".$consumerInterface;
-                throw new \Exception($errMsg);
+                throw new ConsumerException($errMsg);
 			}
 			else
 			{
@@ -223,6 +234,7 @@
         {
             self::$logger->error('consumer_app:'.self::$appName.'|app_config_file:'.self::$appConfigFile.
                 '|version:'.$version.'|group:'.$group.'|provider_service:'.$consumerInterface.'|errmsg:'. $e->getMessage().'|exceptionmsg:'.$e);
+            throw new ConsumerException($e->getMessage(), $e);
         }
     	return $ret;
     }
diff --git a/registry/automatic/ConsumerProxy.php b/registry/automatic/ConsumerProxy.php
index 2533821..b86fc89 100644
--- a/registry/automatic/ConsumerProxy.php
+++ b/registry/automatic/ConsumerProxy.php
@@ -25,19 +25,21 @@
 {
     private static $_instance;
     private $logger;
+    private $config = [];
 
-    public static function instance()
+    public static function instance($config)
     {
         if (!isset(ConsumerProxy::$_instance)) 
         {
-            ConsumerProxy::$_instance = new ConsumerProxy();
+            ConsumerProxy::$_instance = new ConsumerProxy($config);
         }
         return ConsumerProxy::$_instance;
     }
 
-    public function __construct()
+    public function __construct($config)
     {
         $this->logger = \Logger::getLogger(__CLASS__);
+        $this->config = $config;
     }
 
     /**
@@ -48,7 +50,7 @@
         try
 		{
 			//获取路由信息
-            $providerInfo = FSOFRedis::instance()->getProviderInfo($service);
+            $providerInfo = FSOFRedis::instance($this->config)->getProviderInfo($service);
 			return $this->filterProviderUrls($providerInfo, $version, $group, $service);
         }
         catch (\Exception $e)