取消在select中检测writeable的fd,因为这有可能一直在发生,导致CPU100%
diff --git a/dubbo/connection/connections.py b/dubbo/connection/connections.py
index c2726fa..75d8cae 100644
--- a/dubbo/connection/connections.py
+++ b/dubbo/connection/connections.py
@@ -54,7 +54,7 @@
self.conn_events[invoke_id] = event
# 发送数据
logger.debug('Request has been send for request id {}'.format(invoke_id))
- conn.send(request_data)
+ conn.write(request_data)
event.wait(timeout)
del self.conn_events[invoke_id]
@@ -171,7 +171,7 @@
logger.debug('❤ request -> {}'.format(conn.remote_host()))
msg_id = data[4:12]
heartbeat_response = CLI_HEARTBEAT_RES_HEAD + list(msg_id) + CLI_HEARTBEAT_TAIL
- conn.send(bytearray(heartbeat_response))
+ conn.write(bytearray(heartbeat_response))
return body_length, 3, None if body_length > 0 else DEFAULT_READ_PARAMS
elif heartbeat == 1:
logger.debug('❤ response -> {}'.format(conn.remote_host()))
@@ -263,7 +263,7 @@
self.client_heartbeats[host] += 1
invoke_id = list(bytearray(pack('!q', get_invoke_id())))
req = CLI_HEARTBEAT_REQ_HEAD + invoke_id + CLI_HEARTBEAT_TAIL
- conn.send(bytearray(req))
+ conn.write(bytearray(req))
logger.debug('Head has been send for request id {}'.format(invoke_id))
@@ -280,12 +280,10 @@
while 1:
try:
conns = self._connection_pool.values()
- readable, writeable, exceptional = select.select(conns, conns, [], self.select_timeout)
+ readable, writeable, exceptional = select.select(conns, [], [], self.select_timeout)
except select.error as e:
logger.exception(e)
break
- for conn in writeable:
- conn.write()
for conn in readable:
conn.read(self._callback)
@@ -318,8 +316,6 @@
self.read_length, self.read_type, self.invoke_id = DEFAULT_READ_PARAMS
self.read_buffer = []
- self.write_buffer = []
- self.write_lock = threading.Lock()
self.last_active = time.time()
@@ -331,33 +327,13 @@
"""
return self.__sock.fileno()
- def send(self, data):
- """
- 客户端执行发送操作
- :param data:
- :return:
- """
- self.write_lock.acquire()
- try:
- self.write_buffer.extend(list(data))
- finally:
- self.write_lock.release()
-
- def write(self):
+ def write(self, data):
"""
向远程主机写数据
:return:
"""
- if len(self.write_buffer) == 0:
- return
- self.write_lock.acquire()
- try:
- if len(self.write_buffer) > 0:
- length = self.__sock.send(bytearray(self.write_buffer))
- self.write_buffer = self.write_buffer[length:]
- self.last_active = time.time()
- finally:
- self.write_lock.release()
+ # TODO 这里有可能存在一次无法发送完毕的情况
+ self.__sock.send(data)
def read(self, callback):
"""