修改线程同步的方式
diff --git a/dubbo/client.py b/dubbo/client.py
index 07e571e..99cb17c 100644
--- a/dubbo/client.py
+++ b/dubbo/client.py
@@ -54,9 +54,7 @@
* double
* java.lang.String
* java.lang.Object
- :param timeout: 请求超时时间(秒),不设置则不会超时。默认不设置,如无特殊需求不建议设置
- * 不设置超时时间在某些极限情况下可能导致此连接一直阻塞;
- * 设置超时时间会增加远程调用的时间;
+ :param timeout: 请求超时时间(秒),不设置则不会超时
:return:
"""
if not isinstance(args, (list, tuple)):
diff --git a/dubbo/connection/connections.py b/dubbo/connection/connections.py
index 9f6ca83..718e2b4 100644
--- a/dubbo/connection/connections.py
+++ b/dubbo/connection/connections.py
@@ -26,7 +26,7 @@
self.client_heartbeats = {}
# 创建连接的锁
self.__conn_lock = threading.Lock()
- self.__event = threading.Event()
+ self.__events = {}
reading_thread = threading.Thread(target=self._read_from_server)
reading_thread.setDaemon(True) # 当主线程退出时此线程同时退出
@@ -40,20 +40,16 @@
conn = self._get_connection(host)
request = Request(request_param)
request_data = request.encode()
- request_invoke_id = request.invoke_id
+ invoke_id = request.invoke_id
+ event = threading.Event()
+ self.__events[invoke_id] = event
+ # 发送数据
conn.write(request_data)
- since_request = time.time() # 从发出请求开始计时
-
- # 这里的实现很不严谨,我试图使用别的线程的唤醒事件来避免某个未设置超时的线程永远阻塞
- default_timeout = timeout or 60
- while request_invoke_id not in self.results:
- if time.time() - since_request > default_timeout:
- raise DubboRequestTimeoutException(
- "Socket(host='{}'): Read timed out. (read timeout={})".format(host, default_timeout))
- self.__event.clear()
- self.__event.wait(timeout=timeout)
- result = self.results.pop(request_invoke_id)
+ event.wait(timeout)
+ # 此event已经无效,应该删除
+ del self.__events[invoke_id]
+ result = self.results.pop(invoke_id)
if isinstance(result, Exception):
raise result
@@ -125,7 +121,7 @@
error = res.read_next()
invoke_id = unpack('!q', head[4:12])[0]
self.results[invoke_id] = DubboResponseException('\n{}\n{}'.format(e.message, error))
- self.__event.set()
+ self.__events[invoke_id].set()
return
body = conn.read(body_length)
self._parse_remote_data(head, body, heartbeat, conn, host)
@@ -179,7 +175,7 @@
logger.exception(e)
self.results[invoke_id] = e
finally:
- self.__event.set() # 唤醒请求线程
+ self.__events[invoke_id].set() # 唤醒请求线程
@staticmethod
def _parse_error(res):
diff --git a/tests/run_test.py b/tests/run_test.py
index ac3ecef..82e8580 100644
--- a/tests/run_test.py
+++ b/tests/run_test.py
@@ -16,9 +16,6 @@
self.dubbo = DubboClient('com.qianmi.pc.es.api.EsGoodsQueryProvider', zk_register=zk)
def test_run(self):
- # for i in xrange(10):
- # thread = threading.Thread(target=run, args=(self.dubbo,))
- # thread.start()
# goods_query_request = Object('com.qianmi.pc.es.api.request.EsGoodsQueryRequest', values={
# 'chainMasterId': 'A859315',
# 'fromSys': 2,
@@ -33,14 +30,20 @@
# result = self.dubbo.call('listByIdString', goods_list_by_id_request)
# pretty_print(result)
- zk = ZkRegister('172.19.71.7:2181')
- dubbo_cli = DubboClient('com.qianmi.pc.es.api.EsGoodsQueryProvider', zk_register=zk)
- product_request = Object('com.qianmi.pc.es.api.request.EsGoodsListByIdStringRequest', values={
- 'chainMasterId': 'A000000',
- 'idString': 'NotUsed:g10529'
- })
- result = dubbo_cli.call('listByIdString', product_request)
- pretty_print(result)
+ # zk = ZkRegister('172.19.71.7:2181')
+ # dubbo_cli = DubboClient('com.qianmi.pc.es.api.EsGoodsQueryProvider', zk_register=zk)
+ # product_request = Object('com.qianmi.pc.es.api.request.EsGoodsListByIdStringRequest', values={
+ # 'chainMasterId': 'A000000',
+ # 'idString': 'NotUsed:g10529'
+ # })
+ # result = dubbo_cli.call('listByIdString', product_request)
+ # pretty_print(result)
+
+ zk = ZkRegister('127.0.0.1:2181')
+ dubbo_cli = DubboClient('me.hourui.echo.provider.Echo', zk_register=zk)
+ for i in xrange(4):
+ thread = threading.Thread(target=run, args=(dubbo_cli,))
+ thread.start()
def pretty_print(value):
@@ -48,8 +51,8 @@
def run(_dubbo):
- for j in xrange(1000):
- _dubbo.call('echo18', timeout=1)
+ for j in xrange(100000):
+ _dubbo.call('echo18')
if __name__ == '__main__':