把读写操作转化为非阻塞
diff --git a/dubbo/connection/connections.py b/dubbo/connection/connections.py
index 26a9e6f..3a62199 100644
--- a/dubbo/connection/connections.py
+++ b/dubbo/connection/connections.py
@@ -45,7 +45,7 @@
event = threading.Event()
self.conn_events[invoke_id] = event
# 发送数据
- conn.write(request_data)
+ conn.send(request_data)
event.wait(timeout)
del self.conn_events[invoke_id]
@@ -99,69 +99,82 @@
"""
raise NotImplementedError()
- def _read(self, conn):
+ def _callback(self, data, conn, data_type, invoke_id):
"""
- 从指定的连接读取数据
- :param conn:
+ 读取的数据满足之后触发的回调函数,由于connection是共有
+ 的,所以我们要把这一大坨和连接相关的状态保存在各自连接中
+ :param data: 收到的数据
+ :param conn: 对应的连接
+ :param data_type:
+ 1 头部
+ 2 因为头部的解析错误,需要被读取的错误body
+ 3 正确的body
+ :param invoke_id
:return:
+ next_read_length 下一次读取需要读取的数据长度
+ next_read_type 下一次读取需要读取的数据类型
+ invoke_id 此次调用的id
"""
host = conn.remote_host()
- # 数据的头部大小为16个字节
- head = conn.read(16)
- if not head: # 连接已关闭
+ # 关闭连接
+ if not data:
logger.debug('{} closed by remote server'.format(host))
self._delete_connection(conn)
- return
+ return 0, 0, 0
- try:
- heartbeat, body_length = parse_response_head(head)
- except DubboResponseException as e: # 这里是dubbo的内部异常,与response中的业务异常不一样
- logger.exception(e)
- body_length = unpack('!i', head[12:])[0]
- body = conn.read(body_length)
- res = Response(body)
+ if data_type == 1:
+ try:
+ heartbeat, body_length = parse_response_head(data)
+ except DubboResponseException as e: # 这里是dubbo的内部异常,与response中的业务异常不一样
+ logger.exception(e)
+ body_length = unpack('!i', data[12:])[0]
+ # 重新设置invoke_id
+ invoke_id = unpack('!q', data[4:12])[0]
+ # 下一次需要读取错误的响应体
+ return body_length, 2, invoke_id
+ # 远程主机发送的心跳请求数据包
+ if heartbeat == 2:
+ logger.debug('❤ request -> {}'.format(conn.remote_host()))
+ msg_id = data[4:12]
+ heartbeat_response = CLI_HEARTBEAT_RES_HEAD + list(msg_id) + CLI_HEARTBEAT_TAIL
+ conn.write(bytearray(heartbeat_response))
+ # 下一次继续读取新的头部数据
+ return 16, 1, None
+ # 远程主机发送的心跳响应数据包
+ elif heartbeat == 1:
+ logger.debug('❤ response -> {}'.format(conn.remote_host()))
+ self.client_heartbeats[host] -= 1
+ # 下一次继续读取新的头部数据
+ return 16, 1, None
+ # 普通的数据包
+ else:
+ # 重新设置invoke_id
+ invoke_id = unpack('!q', data[4:12])[0]
+ # 下一次读取正常的响应体
+ return body_length, 3, invoke_id
+ elif data_type == 2:
+ res = Response(data)
error = res.read_next()
- invoke_id = unpack('!q', head[4:12])[0]
- self.results[invoke_id] = DubboResponseException('\n{}\n{}'.format(e.message, error))
+
+ self.results[invoke_id] = DubboResponseException('\n{}'.format(error))
self.conn_events[invoke_id].set()
- return
- body = conn.read(body_length)
- self._parse_remote_data(head, body, heartbeat, conn, host)
-
- def _parse_remote_data(self, head, body, heartbeat, conn, host):
- """
- 对从远程主机读取到的数据进行解析
- :param head:
- :param body:
- :param heartbeat:
- :param conn:
- :param host:
- :return:
- """
- # 远程主机发送的心跳请求数据包
- if heartbeat == 2:
- logger.debug('❤ request -> {}'.format(conn.remote_host()))
- msg_id = head[4:12]
- heartbeat_response = CLI_HEARTBEAT_RES_HEAD + list(msg_id) + CLI_HEARTBEAT_TAIL
- conn.write(bytearray(heartbeat_response))
- # 远程主机发送的心跳响应数据包
- elif heartbeat == 1:
- logger.debug('❤ response -> {}'.format(conn.remote_host()))
- self.client_heartbeats[host] -= 1
- # 普通的数据包
+ # 下一次继续读取新的头部数据
+ return 16, 1, None
+ elif data_type == 3:
+ self._parse_response(invoke_id, data)
+ # 下一次继续读取新的头部数据
+ return 16, 1, None
else:
- self._parse_response(head, body)
+ raise RuntimeError('Unknown data type {}.'.format(data_type))
- def _parse_response(self, head, body):
+ def _parse_response(self, invoke_id, body):
"""
对dubbo的响应数据进行解析
- :param head:
+ :param invoke_id:
:param body:
:return:
"""
- # 请求的调用id,目的是将请求和请求所对应的响应对应起来
- invoke_id = unpack('!q', head[4:12])[0]
try:
res = Response(body)
flag = res.read_int()
@@ -219,38 +232,6 @@
time.sleep(TIMEOUT_CHECK_INTERVAL - time_delta)
-class EpollConnectionPool(BaseConnectionPool):
- """
- epoll模型只支持Linux及其发行版
- """
-
- def __init__(self):
- self.__fds = {} # 文件描述符所对应的连接
- self.__epoll = select.epoll()
- BaseConnectionPool.__init__(self)
-
- def _read_from_server(self):
- while 1:
- events = self.__epoll.poll(1)
- for fd, event in events:
- if event & select.EPOLLIN:
- conn = self.__fds[fd]
- self._read(conn)
-
- def _new_connection(self, host):
- ip, port = host.split(':')
- conn = Connection(ip, int(port))
- self.__epoll.register(conn.fileno(), select.EPOLLIN)
- self.__fds[conn.fileno()] = conn
-
- self._connection_pool[host] = conn
-
- def _delete_connection(self, conn):
- self.__epoll.unregister(conn.fileno())
- host = conn.remote_host()
- del self._connection_pool[host]
-
-
class SelectConnectionPool(BaseConnectionPool):
"""
select模型支持大多数的现代操作系统
@@ -264,12 +245,14 @@
while 1:
try:
conns = self._connection_pool.values()
- readable, writeable, exceptional = select.select(conns, [], [], self.select_timeout)
+ readable, writeable, exceptional = select.select(conns, conns, [], self.select_timeout)
except select.error as e:
logger.exception(e)
break
+ for conn in writeable:
+ conn.write()
for conn in readable:
- self._read(conn)
+ conn.read(self._callback)
def _new_connection(self, host):
ip, port = host.split(':')
@@ -282,10 +265,6 @@
# connection_pool在整个进程中是单例的
-# if hasattr(select, 'epoll'):
-# connection_pool = EpollConnectionPool()
-# else:
-# connection_pool = SelectConnectionPool()
connection_pool = SelectConnectionPool()
@@ -297,8 +276,20 @@
def __init__(self, host, port):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((host, port))
+ # 在创建好连接之后设置IO为非阻塞
+ sock.setblocking(False)
self.__sock = sock
self.__host = '{0}:{1}'.format(host, port)
+
+ # 数据的头部大小为16个字节
+ self.read_length = 16
+ # 读取的数据类型:1 head; 2 error_body; 3 common_body;
+ self.read_type = 1
+ self.invoke_id = None
+ self.read_buffer = []
+
+ self.write_buffer = []
+
self.last_active = time.time()
def fileno(self):
@@ -308,23 +299,44 @@
"""
return self.__sock.fileno()
- def write(self, data):
+ def send(self, data):
"""
- 向远程主机写数据
+ 客户端执行发送操作
:param data:
:return:
"""
- self.last_active = time.time()
- self.__sock.sendall(data)
+ self.write_buffer.extend(list(data))
- def read(self, length):
+ def write(self):
+ """
+ 向远程主机写数据
+ :return:
+ """
+ if len(self.write_buffer) > 0:
+ length = self.__sock.send(bytearray(self.write_buffer))
+ self.write_buffer = self.write_buffer[length:]
+ self.last_active = time.time()
+
+ def read(self, callback):
"""
读取远程主机的数据
- :param length:
+ :param callback:
:return:
"""
self.last_active = time.time()
- return bytearray(self.__sock.recv(length, socket.MSG_WAITALL))
+
+ data = list(bytearray(self.__sock.recv(self.read_length - len(self.read_buffer))))
+ # 断开连接
+ if not data:
+ callback([], self, None, None)
+ return
+
+ self.read_buffer.extend(data)
+ # 数据读取已经满足要求
+ if len(self.read_buffer) == self.read_length:
+ self.read_length, self.read_type, self.invoke_id \
+ = callback(bytearray(self.read_buffer), self, self.read_type, self.invoke_id)
+ self.read_buffer = []
def close(self):
"""