把读写操作转化为非阻塞
diff --git a/dubbo/connection/connections.py b/dubbo/connection/connections.py
index 26a9e6f..3a62199 100644
--- a/dubbo/connection/connections.py
+++ b/dubbo/connection/connections.py
@@ -45,7 +45,7 @@
         event = threading.Event()
         self.conn_events[invoke_id] = event
         # 发送数据
-        conn.write(request_data)
+        conn.send(request_data)
         event.wait(timeout)
         del self.conn_events[invoke_id]
 
@@ -99,69 +99,82 @@
         """
         raise NotImplementedError()
 
-    def _read(self, conn):
+    def _callback(self, data, conn, data_type, invoke_id):
         """
-        从指定的连接读取数据
-        :param conn:
+        读取的数据满足之后触发的回调函数,由于connection是共有
+        的,所以我们要把这一大坨和连接相关的状态保存在各自连接中
+        :param data: 收到的数据
+        :param conn: 对应的连接
+        :param data_type:
+                1 头部
+                2 因为头部的解析错误,需要被读取的错误body
+                3 正确的body
+        :param invoke_id
         :return:
+            next_read_length 下一次读取需要读取的数据长度
+            next_read_type   下一次读取需要读取的数据类型
+            invoke_id        此次调用的id
         """
         host = conn.remote_host()
 
-        # 数据的头部大小为16个字节
-        head = conn.read(16)
-        if not head:  # 连接已关闭
+        # 关闭连接
+        if not data:
             logger.debug('{} closed by remote server'.format(host))
             self._delete_connection(conn)
-            return
+            return 0, 0, 0
 
-        try:
-            heartbeat, body_length = parse_response_head(head)
-        except DubboResponseException as e:  # 这里是dubbo的内部异常,与response中的业务异常不一样
-            logger.exception(e)
-            body_length = unpack('!i', head[12:])[0]
-            body = conn.read(body_length)
-            res = Response(body)
+        if data_type == 1:
+            try:
+                heartbeat, body_length = parse_response_head(data)
+            except DubboResponseException as e:  # 这里是dubbo的内部异常,与response中的业务异常不一样
+                logger.exception(e)
+                body_length = unpack('!i', data[12:])[0]
+                # 重新设置invoke_id
+                invoke_id = unpack('!q', data[4:12])[0]
+                # 下一次需要读取错误的响应体
+                return body_length, 2, invoke_id
+            # 远程主机发送的心跳请求数据包
+            if heartbeat == 2:
+                logger.debug('❤ request  -> {}'.format(conn.remote_host()))
+                msg_id = data[4:12]
+                heartbeat_response = CLI_HEARTBEAT_RES_HEAD + list(msg_id) + CLI_HEARTBEAT_TAIL
+                conn.write(bytearray(heartbeat_response))
+                # 下一次继续读取新的头部数据
+                return 16, 1, None
+            # 远程主机发送的心跳响应数据包
+            elif heartbeat == 1:
+                logger.debug('❤ response -> {}'.format(conn.remote_host()))
+                self.client_heartbeats[host] -= 1
+                # 下一次继续读取新的头部数据
+                return 16, 1, None
+            # 普通的数据包
+            else:
+                # 重新设置invoke_id
+                invoke_id = unpack('!q', data[4:12])[0]
+                # 下一次读取正常的响应体
+                return body_length, 3, invoke_id
+        elif data_type == 2:
+            res = Response(data)
             error = res.read_next()
-            invoke_id = unpack('!q', head[4:12])[0]
-            self.results[invoke_id] = DubboResponseException('\n{}\n{}'.format(e.message, error))
+
+            self.results[invoke_id] = DubboResponseException('\n{}'.format(error))
             self.conn_events[invoke_id].set()
-            return
-        body = conn.read(body_length)
-        self._parse_remote_data(head, body, heartbeat, conn, host)
-
-    def _parse_remote_data(self, head, body, heartbeat, conn, host):
-        """
-        对从远程主机读取到的数据进行解析
-        :param head:
-        :param body:
-        :param heartbeat:
-        :param conn:
-        :param host:
-        :return:
-        """
-        # 远程主机发送的心跳请求数据包
-        if heartbeat == 2:
-            logger.debug('❤ request  -> {}'.format(conn.remote_host()))
-            msg_id = head[4:12]
-            heartbeat_response = CLI_HEARTBEAT_RES_HEAD + list(msg_id) + CLI_HEARTBEAT_TAIL
-            conn.write(bytearray(heartbeat_response))
-        # 远程主机发送的心跳响应数据包
-        elif heartbeat == 1:
-            logger.debug('❤ response -> {}'.format(conn.remote_host()))
-            self.client_heartbeats[host] -= 1
-        # 普通的数据包
+            # 下一次继续读取新的头部数据
+            return 16, 1, None
+        elif data_type == 3:
+            self._parse_response(invoke_id, data)
+            # 下一次继续读取新的头部数据
+            return 16, 1, None
         else:
-            self._parse_response(head, body)
+            raise RuntimeError('Unknown data type {}.'.format(data_type))
 
-    def _parse_response(self, head, body):
+    def _parse_response(self, invoke_id, body):
         """
         对dubbo的响应数据进行解析
-        :param head:
+        :param invoke_id:
         :param body:
         :return:
         """
-        # 请求的调用id,目的是将请求和请求所对应的响应对应起来
-        invoke_id = unpack('!q', head[4:12])[0]
         try:
             res = Response(body)
             flag = res.read_int()
@@ -219,38 +232,6 @@
                 time.sleep(TIMEOUT_CHECK_INTERVAL - time_delta)
 
 
-class EpollConnectionPool(BaseConnectionPool):
-    """
-    epoll模型只支持Linux及其发行版
-    """
-
-    def __init__(self):
-        self.__fds = {}  # 文件描述符所对应的连接
-        self.__epoll = select.epoll()
-        BaseConnectionPool.__init__(self)
-
-    def _read_from_server(self):
-        while 1:
-            events = self.__epoll.poll(1)
-            for fd, event in events:
-                if event & select.EPOLLIN:
-                    conn = self.__fds[fd]
-                    self._read(conn)
-
-    def _new_connection(self, host):
-        ip, port = host.split(':')
-        conn = Connection(ip, int(port))
-        self.__epoll.register(conn.fileno(), select.EPOLLIN)
-        self.__fds[conn.fileno()] = conn
-
-        self._connection_pool[host] = conn
-
-    def _delete_connection(self, conn):
-        self.__epoll.unregister(conn.fileno())
-        host = conn.remote_host()
-        del self._connection_pool[host]
-
-
 class SelectConnectionPool(BaseConnectionPool):
     """
     select模型支持大多数的现代操作系统
@@ -264,12 +245,14 @@
         while 1:
             try:
                 conns = self._connection_pool.values()
-                readable, writeable, exceptional = select.select(conns, [], [], self.select_timeout)
+                readable, writeable, exceptional = select.select(conns, conns, [], self.select_timeout)
             except select.error as e:
                 logger.exception(e)
                 break
+            for conn in writeable:
+                conn.write()
             for conn in readable:
-                self._read(conn)
+                conn.read(self._callback)
 
     def _new_connection(self, host):
         ip, port = host.split(':')
@@ -282,10 +265,6 @@
 
 
 # connection_pool在整个进程中是单例的
-# if hasattr(select, 'epoll'):
-#     connection_pool = EpollConnectionPool()
-# else:
-#     connection_pool = SelectConnectionPool()
 connection_pool = SelectConnectionPool()
 
 
@@ -297,8 +276,20 @@
     def __init__(self, host, port):
         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
         sock.connect((host, port))
+        # 在创建好连接之后设置IO为非阻塞
+        sock.setblocking(False)
         self.__sock = sock
         self.__host = '{0}:{1}'.format(host, port)
+
+        # 数据的头部大小为16个字节
+        self.read_length = 16
+        # 读取的数据类型:1 head; 2 error_body; 3 common_body;
+        self.read_type = 1
+        self.invoke_id = None
+        self.read_buffer = []
+
+        self.write_buffer = []
+
         self.last_active = time.time()
 
     def fileno(self):
@@ -308,23 +299,44 @@
         """
         return self.__sock.fileno()
 
-    def write(self, data):
+    def send(self, data):
         """
-        向远程主机写数据
+        客户端执行发送操作
         :param data:
         :return:
         """
-        self.last_active = time.time()
-        self.__sock.sendall(data)
+        self.write_buffer.extend(list(data))
 
-    def read(self, length):
+    def write(self):
+        """
+        向远程主机写数据
+        :return:
+        """
+        if len(self.write_buffer) > 0:
+            length = self.__sock.send(bytearray(self.write_buffer))
+            self.write_buffer = self.write_buffer[length:]
+            self.last_active = time.time()
+
+    def read(self, callback):
         """
         读取远程主机的数据
-        :param length:
+        :param callback:
         :return:
         """
         self.last_active = time.time()
-        return bytearray(self.__sock.recv(length, socket.MSG_WAITALL))
+
+        data = list(bytearray(self.__sock.recv(self.read_length - len(self.read_buffer))))
+        # 断开连接
+        if not data:
+            callback([], self, None, None)
+            return
+
+        self.read_buffer.extend(data)
+        # 数据读取已经满足要求
+        if len(self.read_buffer) == self.read_length:
+            self.read_length, self.read_type, self.invoke_id \
+                = callback(bytearray(self.read_buffer), self, self.read_type, self.invoke_id)
+            self.read_buffer = []
 
     def close(self):
         """