重构代码
diff --git a/dubbo/common/constants.py b/dubbo/common/constants.py
index 4e42745..5223bac 100644
--- a/dubbo/common/constants.py
+++ b/dubbo/common/constants.py
@@ -40,3 +40,8 @@
TIMEOUT_IDLE = 60
# 连接允许的最多的超时次数
TIMEOUT_MAX_TIMES = 3
+
+# 数据的头部大小为16个字节
+# 读取的数据类型:1 head; 2 error_body; 3 common_body;
+# 头部信息不存在invoke_id,所以为None
+DEFAULT_READ_PARAMS = 16, 1, None
diff --git a/dubbo/connection/connections.py b/dubbo/connection/connections.py
index 9cabf3e..8899750 100644
--- a/dubbo/connection/connections.py
+++ b/dubbo/connection/connections.py
@@ -9,7 +9,7 @@
from dubbo.codec.encoder import Request
from dubbo.codec.decoder import Response, parse_response_head
from dubbo.common.constants import CLI_HEARTBEAT_RES_HEAD, CLI_HEARTBEAT_TAIL, CLI_HEARTBEAT_REQ_HEAD, \
- TIMEOUT_CHECK_INTERVAL, TIMEOUT_IDLE, TIMEOUT_MAX_TIMES
+ TIMEOUT_CHECK_INTERVAL, TIMEOUT_IDLE, TIMEOUT_MAX_TIMES, DEFAULT_READ_PARAMS
from dubbo.common.exceptions import DubboResponseException, DubboRequestTimeoutException
from dubbo.common.util import get_invoke_id
@@ -26,6 +26,7 @@
self.client_heartbeats = {}
# 创建连接的锁
self.conn_lock = threading.Lock()
+ # 用于在数据读取完毕之后唤醒主线程
self.conn_events = {}
reading_thread = threading.Thread(target=self._read_from_server)
@@ -37,6 +38,13 @@
scanning_thread.start()
def get(self, host, request_param, timeout=None):
+ """
+ 执行远程调用获取数据
+ :param host:
+ :param request_param:
+ :param timeout:
+ :return:
+ """
conn = self._get_connection(host)
request = Request(request_param)
request_data = request.encode()
@@ -115,44 +123,15 @@
next_read_type 下一次读取需要读取的数据类型
invoke_id 此次调用的id
"""
- host = conn.remote_host()
-
# 关闭连接
if not data:
+ host = conn.remote_host()
logger.debug('{} closed by remote server'.format(host))
self._delete_connection(conn)
return 0, 0, 0
if data_type == 1:
- try:
- heartbeat, body_length = parse_response_head(data)
- except DubboResponseException as e: # 这里是dubbo的内部异常,与response中的业务异常不一样
- logger.exception(e)
- body_length = unpack('!i', data[12:])[0]
- # 重新设置invoke_id
- invoke_id = unpack('!q', data[4:12])[0]
- # 下一次需要读取错误的响应体
- return body_length, 2, invoke_id
- # 远程主机发送的心跳请求数据包
- if heartbeat == 2:
- logger.debug('❤ request -> {}'.format(conn.remote_host()))
- msg_id = data[4:12]
- heartbeat_response = CLI_HEARTBEAT_RES_HEAD + list(msg_id) + CLI_HEARTBEAT_TAIL
- conn.write(bytearray(heartbeat_response))
- # 下一次继续读取新的头部数据
- return 16, 1, None
- # 远程主机发送的心跳响应数据包
- elif heartbeat == 1:
- logger.debug('❤ response -> {}'.format(conn.remote_host()))
- self.client_heartbeats[host] -= 1
- # 下一次继续读取新的头部数据
- return 16, 1, None
- # 普通的数据包
- else:
- # 重新设置invoke_id
- invoke_id = unpack('!q', data[4:12])[0]
- # 下一次读取正常的响应体
- return body_length, 3, invoke_id
+ return self._parse_head(data, conn)
elif data_type == 2:
res = Response(data)
error = res.read_next()
@@ -160,14 +139,52 @@
self.results[invoke_id] = DubboResponseException('\n{}'.format(error))
self.conn_events[invoke_id].set()
# 下一次继续读取新的头部数据
- return 16, 1, None
+ return DEFAULT_READ_PARAMS
elif data_type == 3:
self._parse_response(invoke_id, data)
# 下一次继续读取新的头部数据
- return 16, 1, None
+ return DEFAULT_READ_PARAMS
else:
raise RuntimeError('Unknown data type {}.'.format(data_type))
+ def _parse_head(self, data, conn):
+ """
+ 对dubbo响应的头部信息进行解析
+ :param data:
+ :param conn:
+ :return:
+ """
+ try:
+ heartbeat, body_length = parse_response_head(data)
+ except DubboResponseException as e: # 这里是dubbo的内部异常,与response中的业务异常不一样
+ logger.exception(e)
+ body_length = unpack('!i', data[12:])[0]
+ # 获取响应的invoke_id
+ invoke_id = unpack('!q', data[4:12])[0]
+ # 下一次需要读取错误的响应体
+ return body_length, 2, invoke_id
+ # 远程主机发送的心跳请求数据包
+ if heartbeat == 2:
+ logger.debug('❤ request -> {}'.format(conn.remote_host()))
+ msg_id = data[4:12]
+ heartbeat_response = CLI_HEARTBEAT_RES_HEAD + list(msg_id) + CLI_HEARTBEAT_TAIL
+ conn.write(bytearray(heartbeat_response))
+ # 下一次继续读取新的头部数据
+ return DEFAULT_READ_PARAMS
+ # 远程主机发送的心跳响应数据包
+ elif heartbeat == 1:
+ logger.debug('❤ response -> {}'.format(conn.remote_host()))
+ host = conn.remote_host()
+ self.client_heartbeats[host] -= 1
+ # 下一次继续读取新的头部数据
+ return DEFAULT_READ_PARAMS
+ # 普通的数据包
+ else:
+ # 获取响应的invoke_id
+ invoke_id = unpack('!q', data[4:12])[0]
+ # 下一次读取正常的响应体
+ return body_length, 3, invoke_id
+
def _parse_response(self, invoke_id, body):
"""
对dubbo的响应数据进行解析
@@ -215,22 +232,36 @@
while 1:
starting = time.time()
for host in self._connection_pool.keys():
- conn = self._connection_pool[host]
- if time.time() - conn.last_active > TIMEOUT_IDLE:
- if self.client_heartbeats[host] >= TIMEOUT_MAX_TIMES:
- self._delete_connection(conn)
- conn.close() # 客户端主动关闭连接
- logger.debug('{} closed by client'.format(host))
- continue
- self.client_heartbeats[host] += 1
- invoke_id = list(bytearray(pack('!q', get_invoke_id())))
- req = CLI_HEARTBEAT_REQ_HEAD + invoke_id + CLI_HEARTBEAT_TAIL
- conn.write(bytearray(req))
+ self._check_conn(host)
ending = time.time()
time_delta = ending - starting
if time_delta < TIMEOUT_CHECK_INTERVAL:
time.sleep(TIMEOUT_CHECK_INTERVAL - time_delta)
+ def _check_conn(self, host):
+ """
+ 对连接进行检查,查看是否超时或者已经达到最大的超时次数
+ :param host:
+ :return:
+ """
+ conn = self._connection_pool[host]
+ # 如果未达到最大的超时时间,则不进行任何操作
+ if time.time() - conn.last_active <= TIMEOUT_IDLE:
+ return
+
+ # 达到最大的超时次数,移除此连接
+ if self.client_heartbeats[host] >= TIMEOUT_MAX_TIMES:
+ self._delete_connection(conn)
+ conn.close() # 客户端主动关闭连接
+ logger.debug('{} closed by client'.format(host))
+
+ # 未达到最大的超时次数,超时次数+1且发送心跳包
+ else:
+ self.client_heartbeats[host] += 1
+ invoke_id = list(bytearray(pack('!q', get_invoke_id())))
+ req = CLI_HEARTBEAT_REQ_HEAD + invoke_id + CLI_HEARTBEAT_TAIL
+ conn.write(bytearray(req))
+
class SelectConnectionPool(BaseConnectionPool):
"""
@@ -281,21 +312,16 @@
self.__sock = sock
self.__host = '{0}:{1}'.format(host, port)
- # 数据的头部大小为16个字节
- self.read_length = 16
- # 读取的数据类型:1 head; 2 error_body; 3 common_body;
- self.read_type = 1
- self.invoke_id = None
+ self.read_length, self.read_type, self.invoke_id = DEFAULT_READ_PARAMS
self.read_buffer = []
-
self.write_buffer = []
+ self.write_lock = threading.Lock()
self.last_active = time.time()
- self.write_lock = threading.Lock()
-
def fileno(self):
"""
+ Get file descriptor
https://stackoverflow.com/a/39328021/4614538
:return:
"""