IGNITE-14167 Simplify reconnecting, fix affinity topology change detection

This closes #16
diff --git a/.travis.yml b/.travis.yml
index f884bdb..3095941 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -13,6 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+language: python
 sudo: required
 
 addons:
@@ -21,7 +22,9 @@
       - openjdk-8-jdk
 
 env:
-  - IGNITE_VERSION=2.9.1 IGNITE_HOME=/opt/ignite
+  global:
+    - IGNITE_VERSION=2.9.1
+    - IGNITE_HOME=/opt/ignite
 
 before_install:
   - curl -L https://apache-mirror.rbc.ru/pub/apache/ignite/${IGNITE_VERSION}/apache-ignite-slim-${IGNITE_VERSION}-bin.zip > ignite.zip
@@ -29,10 +32,17 @@
   - mv /opt/apache-ignite-slim-${IGNITE_VERSION}-bin /opt/ignite
   - mv /opt/ignite/libs/optional/ignite-log4j2 /opt/ignite/libs/
 
-language: python
-python:
-  - "3.6"
-  - "3.7"
-  - "3.8"
-install: pip install tox-travis
+jobs:
+  include:
+    - python: '3.6'
+      arch: amd64
+      env: TOXENV=py36-no-ssl,py36-ssl,py36-ssl-password
+    - python: '3.7'
+      arch: amd64
+      env: TOXENV=py37-no-ssl,py37-ssl,py37-ssl-password
+    - python: '3.8'
+      arch: amd64
+      env: TOXENV=py38-no-ssl,py38-ssl,py38-ssl-password
+
+install: pip install tox
 script: tox
\ No newline at end of file
diff --git a/pyignite/api/affinity.py b/pyignite/api/affinity.py
index 16148a1..7d09517 100644
--- a/pyignite/api/affinity.py
+++ b/pyignite/api/affinity.py
@@ -55,11 +55,13 @@
 partition_mapping = StructArray([
     ('is_applicable', Bool),
 
-    ('cache_mapping', Conditional(lambda ctx: ctx['is_applicable'] and ctx['is_applicable'].value == 1,
+    ('cache_mapping', Conditional(['is_applicable'],
+                                  lambda ctx: ctx['is_applicable'] and ctx['is_applicable'].value == 1,
                                   lambda ctx: ctx['is_applicable'],
                                   cache_mapping, empty_cache_mapping)),
 
-    ('node_mapping', Conditional(lambda ctx: ctx['is_applicable'] and ctx['is_applicable'].value == 1,
+    ('node_mapping', Conditional(['is_applicable'],
+                                 lambda ctx: ctx['is_applicable'] and ctx['is_applicable'].value == 1,
                                  lambda ctx: ctx['is_applicable'],
                                  node_mapping, empty_node_mapping)),
 ])
diff --git a/pyignite/cache.py b/pyignite/cache.py
index dd7dac4..ea672a8 100644
--- a/pyignite/cache.py
+++ b/pyignite/cache.py
@@ -283,6 +283,10 @@
                         parts += len(p)
 
                 self.affinity['number_of_partitions'] = parts
+
+                for conn in self.client._nodes:
+                    if not conn.alive:
+                        conn.reconnect()
             else:
                 # get number of partitions
                 parts = self.affinity.get('number_of_partitions')
diff --git a/pyignite/client.py b/pyignite/client.py
index 83cb196..77c6373 100644
--- a/pyignite/client.py
+++ b/pyignite/client.py
@@ -182,15 +182,9 @@
                     if not self.partition_aware:
                         # do not try to open more nodes
                         self._current_node = i
-                    else:
-                        # take a chance to schedule the reconnection
-                        # for all the failed connections, that was probed
-                        # before this
-                        for failed_node in self._nodes[:i]:
-                            failed_node.reconnect()
 
             except connection_errors:
-                conn._fail()
+                conn.failed = True
                 if self.partition_aware:
                     # schedule the reconnection
                     conn.reconnect()
diff --git a/pyignite/connection/__init__.py b/pyignite/connection/__init__.py
index 0e793f8..1114594 100644
--- a/pyignite/connection/__init__.py
+++ b/pyignite/connection/__init__.py
@@ -33,414 +33,6 @@
 as well as Ignite protocol handshaking.
 """
 
-from collections import OrderedDict
-import socket
-from threading import RLock
-from typing import Union
-
-from pyignite.constants import *
-from pyignite.exceptions import (
-    HandshakeError, ParameterError, SocketError, connection_errors,
-)
-from pyignite.datatypes import Byte, Int, Short, String, UUIDObject
-from pyignite.datatypes.internal import Struct
-from pyignite.utils import DaemonicTimer
-
-from .handshake import HandshakeRequest
-from .ssl import wrap
-
+from .connection import Connection
 
 __all__ = ['Connection']
-
-from ..stream import BinaryStream, READ_BACKWARD
-
-
-class Connection:
-    """
-    This is a `pyignite` class, that represents a connection to Ignite
-    node. It serves multiple purposes:
-
-     * socket wrapper. Detects fragmentation and network errors. See also
-       https://docs.python.org/3/howto/sockets.html,
-     * binary protocol connector. Incapsulates handshake and failover reconnection.
-    """
-
-    _socket = None
-    _failed = None
-    _in_use = None
-
-    client = None
-    host = None
-    port = None
-    timeout = None
-    username = None
-    password = None
-    ssl_params = {}
-    uuid = None
-
-    @staticmethod
-    def _check_ssl_params(params):
-        expected_args = [
-            'use_ssl',
-            'ssl_version',
-            'ssl_ciphers',
-            'ssl_cert_reqs',
-            'ssl_keyfile',
-            'ssl_keyfile_password',
-            'ssl_certfile',
-            'ssl_ca_certfile',
-        ]
-        for param in params:
-            if param not in expected_args:
-                raise ParameterError((
-                    'Unexpected parameter for connection initialization: `{}`'
-                ).format(param))
-
-    def __init__(
-        self, client: 'Client', timeout: float = 2.0,
-        username: str = None, password: str = None, **ssl_params
-    ):
-        """
-        Initialize connection.
-
-        For the use of the SSL-related parameters see
-        https://docs.python.org/3/library/ssl.html#ssl-certificates.
-
-        :param client: Ignite client object,
-        :param timeout: (optional) sets timeout (in seconds) for each socket
-         operation including `connect`. 0 means non-blocking mode, which is
-         virtually guaranteed to fail. Can accept integer or float value.
-         Default is None (blocking mode),
-        :param use_ssl: (optional) set to True if Ignite server uses SSL
-         on its binary connector. Defaults to use SSL when username
-         and password has been supplied, not to use SSL otherwise,
-        :param ssl_version: (optional) SSL version constant from standard
-         `ssl` module. Defaults to TLS v1.1, as in Ignite 2.5,
-        :param ssl_ciphers: (optional) ciphers to use. If not provided,
-         `ssl` default ciphers are used,
-        :param ssl_cert_reqs: (optional) determines how the remote side
-         certificate is treated:
-
-         * `ssl.CERT_NONE` − remote certificate is ignored (default),
-         * `ssl.CERT_OPTIONAL` − remote certificate will be validated,
-           if provided,
-         * `ssl.CERT_REQUIRED` − valid remote certificate is required,
-
-        :param ssl_keyfile: (optional) a path to SSL key file to identify
-         local (client) party,
-        :param ssl_keyfile_password: (optional) password for SSL key file,
-         can be provided when key file is encrypted to prevent OpenSSL
-         password prompt,
-        :param ssl_certfile: (optional) a path to ssl certificate file
-         to identify local (client) party,
-        :param ssl_ca_certfile: (optional) a path to a trusted certificate
-         or a certificate chain. Required to check the validity of the remote
-         (server-side) certificate,
-        :param username: (optional) user name to authenticate to Ignite
-         cluster,
-        :param password: (optional) password to authenticate to Ignite cluster.
-        """
-        self.client = client
-        self.timeout = timeout
-        self.username = username
-        self.password = password
-        self._check_ssl_params(ssl_params)
-        if self.username and self.password and 'use_ssl' not in ssl_params:
-            ssl_params['use_ssl'] = True
-        self.ssl_params = ssl_params
-        self._failed = False
-        self._mux = RLock()
-        self._in_use = False
-
-    @property
-    def socket(self) -> socket.socket:
-        """ Network socket. """
-        return self._socket
-
-    @property
-    def closed(self) -> bool:
-        """ Tells if socket is closed. """
-        with self._mux:
-            return self._socket is None
-
-    @property
-    def failed(self) -> bool:
-        """ Tells if connection is failed. """
-        with self._mux:
-            return self._failed
-
-    @property
-    def alive(self) -> bool:
-        """ Tells if connection is up and no failure detected. """
-        with self._mux:
-            return not (self._failed or self.closed)
-
-    def __repr__(self) -> str:
-        return '{}:{}'.format(self.host or '?', self.port or '?')
-
-    _wrap = wrap
-
-    def get_protocol_version(self):
-        """
-        Returns the tuple of major, minor, and revision numbers of the used
-        thin protocol version, or None, if no connection to the Ignite cluster
-        was yet established.
-        """
-        return self.client.protocol_version
-
-    def _fail(self):
-        """ set client to failed state. """
-        with self._mux:
-            self._failed = True
-
-            self._in_use = False
-
-    def read_response(self) -> Union[dict, OrderedDict]:
-        """
-        Processes server's response to the handshake request.
-
-        :return: handshake data.
-        """
-        response_start = Struct([
-            ('length', Int),
-            ('op_code', Byte),
-        ])
-        with BinaryStream(self, self.recv()) as stream:
-            start_class = response_start.parse(stream)
-            start = stream.read_ctype(start_class, direction=READ_BACKWARD)
-            data = response_start.to_python(start)
-            response_end = None
-            if data['op_code'] == 0:
-                response_end = Struct([
-                    ('version_major', Short),
-                    ('version_minor', Short),
-                    ('version_patch', Short),
-                    ('message', String),
-                ])
-            elif self.get_protocol_version() >= (1, 4, 0):
-                response_end = Struct([
-                    ('node_uuid', UUIDObject),
-                ])
-            if response_end:
-                end_class = response_end.parse(stream)
-                end = stream.read_ctype(end_class, direction=READ_BACKWARD)
-                data.update(response_end.to_python(end))
-            return data
-
-    def connect(
-        self, host: str = None, port: int = None
-    ) -> Union[dict, OrderedDict]:
-        """
-        Connect to the given server node with protocol version fallback.
-
-        :param host: Ignite server node's host name or IP,
-        :param port: Ignite server node's port number.
-        """
-        detecting_protocol = False
-
-        with self._mux:
-            if self._in_use:
-                raise ConnectionError('Connection is in use.')
-            self._in_use = True
-
-        # choose highest version first
-        if self.client.protocol_version is None:
-            detecting_protocol = True
-            self.client.protocol_version = max(PROTOCOLS)
-
-        try:
-            result = self._connect_version(host, port)
-        except HandshakeError as e:
-            if e.expected_version in PROTOCOLS:
-                self.client.protocol_version = e.expected_version
-                result = self._connect_version(host, port)
-            else:
-                raise e
-        except connection_errors:
-            # restore undefined protocol version
-            if detecting_protocol:
-                self.client.protocol_version = None
-            raise
-
-        # connection is ready for end user
-        self.uuid = result.get('node_uuid', None)  # version-specific (1.4+)
-
-        self._failed = False
-        return result
-
-    def _connect_version(
-        self, host: str = None, port: int = None,
-    ) -> Union[dict, OrderedDict]:
-        """
-        Connect to the given server node using protocol version
-        defined on client.
-
-        :param host: Ignite server node's host name or IP,
-        :param port: Ignite server node's port number.
-        """
-
-        host = host or IGNITE_DEFAULT_HOST
-        port = port or IGNITE_DEFAULT_PORT
-
-        self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-        self._socket.settimeout(self.timeout)
-        self._socket = self._wrap(self.socket)
-        self._socket.connect((host, port))
-
-        protocol_version = self.client.protocol_version
-
-        hs_request = HandshakeRequest(
-            protocol_version,
-            self.username,
-            self.password
-        )
-
-        with BinaryStream(self) as stream:
-            hs_request.from_python(stream)
-            self.send(stream.getbuffer())
-
-        hs_response = self.read_response()
-        if hs_response['op_code'] == 0:
-            # disconnect but keep in use
-            self.close(release=False)
-
-            error_text = 'Handshake error: {}'.format(hs_response['message'])
-            # if handshake fails for any reason other than protocol mismatch
-            # (i.e. authentication error), server version is 0.0.0
-            if any([
-                hs_response['version_major'],
-                hs_response['version_minor'],
-                hs_response['version_patch'],
-            ]):
-                error_text += (
-                    ' Server expects binary protocol version '
-                    '{version_major}.{version_minor}.{version_patch}. Client '
-                    'provides {client_major}.{client_minor}.{client_patch}.'
-                ).format(
-                    client_major=protocol_version[0],
-                    client_minor=protocol_version[1],
-                    client_patch=protocol_version[2],
-                    **hs_response
-                )
-            raise HandshakeError((
-                hs_response['version_major'],
-                hs_response['version_minor'],
-                hs_response['version_patch'],
-            ), error_text)
-        self.host, self.port = host, port
-        return hs_response
-
-    def reconnect(self, seq_no=0):
-        """
-        Tries to reconnect synchronously, then in background.
-        """
-
-        # stop trying to reconnect
-        if seq_no >= len(RECONNECT_BACKOFF_SEQUENCE):
-            self._failed = False
-
-        self._reconnect()
-
-        if self.failed:
-            DaemonicTimer(
-                RECONNECT_BACKOFF_SEQUENCE[seq_no],
-                self.reconnect,
-                kwargs={'seq_no': seq_no + 1},
-            ).start()
-
-    def _reconnect(self):
-        # do not reconnect if connection is already working
-        # or was closed on purpose
-        if not self.failed:
-            return
-
-        self.close()
-
-        # connect and silence the connection errors
-        try:
-            self.connect(self.host, self.port)
-        except connection_errors:
-            pass
-
-    def _transfer_params(self, to: 'Connection'):
-        """
-        Transfer non-SSL parameters to target connection object.
-
-        :param to: connection object to transfer parameters to.
-        """
-        to.username = self.username
-        to.password = self.password
-        to.client = self.client
-        to.host = self.host
-        to.port = self.port
-
-    def send(self, data: Union[bytes, bytearray, memoryview], flags=None):
-        """
-        Send data down the socket.
-
-        :param data: bytes to send,
-        :param flags: (optional) OS-specific flags.
-        """
-        if self.closed:
-            raise SocketError('Attempt to use closed connection.')
-
-        kwargs = {}
-        if flags is not None:
-            kwargs['flags'] = flags
-
-        try:
-            self.socket.sendall(data, **kwargs)
-        except Exception:
-            self._fail()
-            self.reconnect()
-            raise
-
-    def recv(self, flags=None) -> bytearray:
-        def _recv(buffer, num_bytes):
-            bytes_to_receive = num_bytes
-            while bytes_to_receive > 0:
-                try:
-                    bytes_rcvd = self.socket.recv_into(buffer, bytes_to_receive, **kwargs)
-                    if bytes_rcvd == 0:
-                        raise SocketError('Connection broken.')
-                except connection_errors:
-                    self._fail()
-                    self.reconnect()
-                    raise
-
-                buffer = buffer[bytes_rcvd:]
-                bytes_to_receive -= bytes_rcvd
-
-        if self.closed:
-            raise SocketError('Attempt to use closed connection.')
-
-        kwargs = {}
-        if flags is not None:
-            kwargs['flags'] = flags
-
-        data = bytearray(4)
-        _recv(memoryview(data), 4)
-        response_len = int.from_bytes(data, PROTOCOL_BYTE_ORDER)
-
-        data.extend(bytearray(response_len))
-        _recv(memoryview(data)[4:], response_len)
-        return data
-
-
-    def close(self, release=True):
-        """
-        Try to mark socket closed, then unlink it. This is recommended but
-        not required, since sockets are automatically closed when
-        garbage-collected.
-        """
-        with self._mux:
-            if self._socket:
-                try:
-                    self._socket.shutdown(socket.SHUT_RDWR)
-                    self._socket.close()
-                except connection_errors:
-                    pass
-                self._socket = None
-
-            if release:
-                self._in_use = False
diff --git a/pyignite/connection/connection.py b/pyignite/connection/connection.py
new file mode 100644
index 0000000..6ab6c6a
--- /dev/null
+++ b/pyignite/connection/connection.py
@@ -0,0 +1,381 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from collections import OrderedDict
+import socket
+from typing import Union
+
+from pyignite.constants import *
+from pyignite.exceptions import (
+    HandshakeError, ParameterError, SocketError, connection_errors,
+)
+from pyignite.datatypes import Byte, Int, Short, String, UUIDObject
+from pyignite.datatypes.internal import Struct
+
+from .handshake import HandshakeRequest
+from .ssl import wrap
+from ..stream import BinaryStream, READ_BACKWARD
+
+
+class Connection:
+    """
+    This is a `pyignite` class, that represents a connection to Ignite
+    node. It serves multiple purposes:
+
+     * socket wrapper. Detects fragmentation and network errors. See also
+       https://docs.python.org/3/howto/sockets.html,
+     * binary protocol connector. Incapsulates handshake and failover reconnection.
+    """
+
+    _socket = None
+    _failed = None
+
+    client = None
+    host = None
+    port = None
+    timeout = None
+    username = None
+    password = None
+    ssl_params = {}
+    uuid = None
+
+    @staticmethod
+    def _check_ssl_params(params):
+        expected_args = [
+            'use_ssl',
+            'ssl_version',
+            'ssl_ciphers',
+            'ssl_cert_reqs',
+            'ssl_keyfile',
+            'ssl_keyfile_password',
+            'ssl_certfile',
+            'ssl_ca_certfile',
+        ]
+        for param in params:
+            if param not in expected_args:
+                raise ParameterError((
+                    'Unexpected parameter for connection initialization: `{}`'
+                ).format(param))
+
+    def __init__(
+        self, client: 'Client', timeout: float = 2.0,
+        username: str = None, password: str = None, **ssl_params
+    ):
+        """
+        Initialize connection.
+
+        For the use of the SSL-related parameters see
+        https://docs.python.org/3/library/ssl.html#ssl-certificates.
+
+        :param client: Ignite client object,
+        :param timeout: (optional) sets timeout (in seconds) for each socket
+         operation including `connect`. 0 means non-blocking mode, which is
+         virtually guaranteed to fail. Can accept integer or float value.
+         Default is None (blocking mode),
+        :param use_ssl: (optional) set to True if Ignite server uses SSL
+         on its binary connector. Defaults to use SSL when username
+         and password has been supplied, not to use SSL otherwise,
+        :param ssl_version: (optional) SSL version constant from standard
+         `ssl` module. Defaults to TLS v1.1, as in Ignite 2.5,
+        :param ssl_ciphers: (optional) ciphers to use. If not provided,
+         `ssl` default ciphers are used,
+        :param ssl_cert_reqs: (optional) determines how the remote side
+         certificate is treated:
+
+         * `ssl.CERT_NONE` − remote certificate is ignored (default),
+         * `ssl.CERT_OPTIONAL` − remote certificate will be validated,
+           if provided,
+         * `ssl.CERT_REQUIRED` − valid remote certificate is required,
+
+        :param ssl_keyfile: (optional) a path to SSL key file to identify
+         local (client) party,
+        :param ssl_keyfile_password: (optional) password for SSL key file,
+         can be provided when key file is encrypted to prevent OpenSSL
+         password prompt,
+        :param ssl_certfile: (optional) a path to ssl certificate file
+         to identify local (client) party,
+        :param ssl_ca_certfile: (optional) a path to a trusted certificate
+         or a certificate chain. Required to check the validity of the remote
+         (server-side) certificate,
+        :param username: (optional) user name to authenticate to Ignite
+         cluster,
+        :param password: (optional) password to authenticate to Ignite cluster.
+        """
+        self.client = client
+        self.timeout = timeout
+        self.username = username
+        self.password = password
+        self._check_ssl_params(ssl_params)
+        if self.username and self.password and 'use_ssl' not in ssl_params:
+            ssl_params['use_ssl'] = True
+        self.ssl_params = ssl_params
+        self._failed = False
+
+    @property
+    def closed(self) -> bool:
+        """ Tells if socket is closed. """
+        return self._socket is None
+
+    @property
+    def failed(self) -> bool:
+        """ Tells if connection is failed. """
+        return self._failed
+
+    @failed.setter
+    def failed(self, value):
+        self._failed = value
+
+    @property
+    def alive(self) -> bool:
+        """ Tells if connection is up and no failure detected. """
+        return not self.failed and not self.closed
+
+    def __repr__(self) -> str:
+        return '{}:{}'.format(self.host or '?', self.port or '?')
+
+    _wrap = wrap
+
+    def get_protocol_version(self):
+        """
+        Returns the tuple of major, minor, and revision numbers of the used
+        thin protocol version, or None, if no connection to the Ignite cluster
+        was yet established.
+        """
+        return self.client.protocol_version
+
+    def read_response(self) -> Union[dict, OrderedDict]:
+        """
+        Processes server's response to the handshake request.
+
+        :return: handshake data.
+        """
+        response_start = Struct([
+            ('length', Int),
+            ('op_code', Byte),
+        ])
+        with BinaryStream(self, self.recv()) as stream:
+            start_class = response_start.parse(stream)
+            start = stream.read_ctype(start_class, direction=READ_BACKWARD)
+            data = response_start.to_python(start)
+            response_end = None
+            if data['op_code'] == 0:
+                response_end = Struct([
+                    ('version_major', Short),
+                    ('version_minor', Short),
+                    ('version_patch', Short),
+                    ('message', String),
+                ])
+            elif self.get_protocol_version() >= (1, 4, 0):
+                response_end = Struct([
+                    ('node_uuid', UUIDObject),
+                ])
+            if response_end:
+                end_class = response_end.parse(stream)
+                end = stream.read_ctype(end_class, direction=READ_BACKWARD)
+                data.update(response_end.to_python(end))
+            return data
+
+    def connect(
+        self, host: str = None, port: int = None
+    ) -> Union[dict, OrderedDict]:
+        """
+        Connect to the given server node with protocol version fallback.
+
+        :param host: Ignite server node's host name or IP,
+        :param port: Ignite server node's port number.
+        """
+        detecting_protocol = False
+
+        # choose highest version first
+        if self.client.protocol_version is None:
+            detecting_protocol = True
+            self.client.protocol_version = max(PROTOCOLS)
+
+        try:
+            result = self._connect_version(host, port)
+        except HandshakeError as e:
+            if e.expected_version in PROTOCOLS:
+                self.client.protocol_version = e.expected_version
+                result = self._connect_version(host, port)
+            else:
+                raise e
+        except connection_errors:
+            # restore undefined protocol version
+            if detecting_protocol:
+                self.client.protocol_version = None
+            raise
+
+        # connection is ready for end user
+        self.uuid = result.get('node_uuid', None)  # version-specific (1.4+)
+
+        self.failed = False
+        return result
+
+    def _connect_version(
+        self, host: str = None, port: int = None,
+    ) -> Union[dict, OrderedDict]:
+        """
+        Connect to the given server node using protocol version
+        defined on client.
+
+        :param host: Ignite server node's host name or IP,
+        :param port: Ignite server node's port number.
+        """
+
+        host = host or IGNITE_DEFAULT_HOST
+        port = port or IGNITE_DEFAULT_PORT
+
+        self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        self._socket.settimeout(self.timeout)
+        self._socket = self._wrap(self._socket)
+        self._socket.connect((host, port))
+
+        protocol_version = self.client.protocol_version
+
+        hs_request = HandshakeRequest(
+            protocol_version,
+            self.username,
+            self.password
+        )
+
+        with BinaryStream(self) as stream:
+            hs_request.from_python(stream)
+            self.send(stream.getbuffer())
+
+        hs_response = self.read_response()
+        if hs_response['op_code'] == 0:
+            self.close()
+
+            error_text = 'Handshake error: {}'.format(hs_response['message'])
+            # if handshake fails for any reason other than protocol mismatch
+            # (i.e. authentication error), server version is 0.0.0
+            if any([
+                hs_response['version_major'],
+                hs_response['version_minor'],
+                hs_response['version_patch'],
+            ]):
+                error_text += (
+                    ' Server expects binary protocol version '
+                    '{version_major}.{version_minor}.{version_patch}. Client '
+                    'provides {client_major}.{client_minor}.{client_patch}.'
+                ).format(
+                    client_major=protocol_version[0],
+                    client_minor=protocol_version[1],
+                    client_patch=protocol_version[2],
+                    **hs_response
+                )
+            raise HandshakeError((
+                hs_response['version_major'],
+                hs_response['version_minor'],
+                hs_response['version_patch'],
+            ), error_text)
+        self.host, self.port = host, port
+        return hs_response
+
+    def reconnect(self):
+        # do not reconnect if connection is already working
+        # or was closed on purpose
+        if not self.failed:
+            return
+
+        self.close()
+
+        # connect and silence the connection errors
+        try:
+            self.connect(self.host, self.port)
+        except connection_errors:
+            pass
+
+    def send(self, data: Union[bytes, bytearray, memoryview], flags=None):
+        """
+        Send data down the socket.
+
+        :param data: bytes to send,
+        :param flags: (optional) OS-specific flags.
+        """
+        if self.closed:
+            raise SocketError('Attempt to use closed connection.')
+
+        kwargs = {}
+        if flags is not None:
+            kwargs['flags'] = flags
+
+        try:
+            self._socket.sendall(data, **kwargs)
+        except connection_errors:
+            self.failed = True
+            self.reconnect()
+            raise
+
+    def recv(self, flags=None) -> bytearray:
+        def _recv(buffer, num_bytes):
+            bytes_to_receive = num_bytes
+            while bytes_to_receive > 0:
+                try:
+                    bytes_rcvd = self._socket.recv_into(buffer, bytes_to_receive, **kwargs)
+                    if bytes_rcvd == 0:
+                        raise SocketError('Connection broken.')
+                except connection_errors:
+                    self.failed = True
+                    self.reconnect()
+                    raise
+
+                buffer = buffer[bytes_rcvd:]
+                bytes_to_receive -= bytes_rcvd
+
+        if self.closed:
+            raise SocketError('Attempt to use closed connection.')
+
+        kwargs = {}
+        if flags is not None:
+            kwargs['flags'] = flags
+
+        data = bytearray(4)
+        _recv(memoryview(data), 4)
+        response_len = int.from_bytes(data, PROTOCOL_BYTE_ORDER)
+
+        data.extend(bytearray(response_len))
+        _recv(memoryview(data)[4:], response_len)
+        return data
+
+    def close(self):
+        """
+        Try to mark socket closed, then unlink it. This is recommended but
+        not required, since sockets are automatically closed when
+        garbage-collected.
+        """
+        if self._socket:
+            try:
+                self._socket.shutdown(socket.SHUT_RDWR)
+                self._socket.close()
+            except connection_errors:
+                pass
+
+            self._socket = None
diff --git a/pyignite/datatypes/complex.py b/pyignite/datatypes/complex.py
index aed3cda..b8d9c02 100644
--- a/pyignite/datatypes/complex.py
+++ b/pyignite/datatypes/complex.py
@@ -564,8 +564,8 @@
 
     @classmethod
     def from_python_not_null(cls, stream, value):
-        stream.register_binary_type(value.__class__)
         if getattr(value, '_buffer', None):
             stream.write(value._buffer)
         else:
+            stream.register_binary_type(value.__class__)
             value._from_python(stream)
diff --git a/pyignite/datatypes/internal.py b/pyignite/datatypes/internal.py
index 0111a22..a6da9fe 100644
--- a/pyignite/datatypes/internal.py
+++ b/pyignite/datatypes/internal.py
@@ -18,7 +18,7 @@
 import decimal
 from datetime import date, datetime, timedelta
 from io import SEEK_CUR
-from typing import Any, Tuple, Union, Callable
+from typing import Any, Tuple, Union, Callable, List
 import uuid
 
 import attr
@@ -115,8 +115,9 @@
 
 
 class Conditional:
-
-    def __init__(self, predicate1: Callable[[any], bool], predicate2: Callable[[any], bool], var1, var2):
+    def __init__(self, fields: List, predicate1: Callable[[any], bool],
+                 predicate2: Callable[[any], bool], var1, var2):
+        self.fields = fields
         self.predicate1 = predicate1
         self.predicate2 = predicate2
         self.var1 = var1
@@ -209,12 +210,19 @@
     defaults = attr.ib(type=dict, default={})
 
     def parse(self, stream):
-        fields, values = [], {}
+        fields, ctx = [], {}
+
+        for _, c_type in self.fields:
+            if isinstance(c_type, Conditional):
+                for name in c_type.fields:
+                    ctx[name] = None
+
         for name, c_type in self.fields:
             is_cond = isinstance(c_type, Conditional)
-            c_type = c_type.parse(stream, values) if is_cond else c_type.parse(stream)
+            c_type = c_type.parse(stream, ctx) if is_cond else c_type.parse(stream)
             fields.append((name, c_type))
-            values[name] = stream.read_ctype(c_type, direction=READ_BACKWARD)
+            if name in ctx:
+                ctx[name] = stream.read_ctype(c_type, direction=READ_BACKWARD)
 
         data_class = type(
             'Struct',
diff --git a/pyignite/queries/query.py b/pyignite/queries/query.py
index 5bd114b..b5be753 100644
--- a/pyignite/queries/query.py
+++ b/pyignite/queries/query.py
@@ -105,9 +105,11 @@
         # this test depends on protocol version
         if getattr(response, 'flags', False) & RHF_TOPOLOGY_CHANGED:
             # update latest affinity version
-            conn.client.affinity_version = (
-                response.affinity_version, response.affinity_minor
-            )
+            new_affinity = (response.affinity_version, response.affinity_minor)
+            old_affinity = conn.client.affinity_version
+
+            if new_affinity > old_affinity:
+                conn.client.affinity_version = new_affinity
 
         # build result
         result = APIResult(response)
diff --git a/pyignite/stream/binary_stream.py b/pyignite/stream/binary_stream.py
index 1ecdcfb..46ac683 100644
--- a/pyignite/stream/binary_stream.py
+++ b/pyignite/stream/binary_stream.py
@@ -95,7 +95,10 @@
         return self
 
     def __exit__(self, exc_type, exc_value, traceback):
-        self.stream.close()
+        try:
+            self.stream.close()
+        except BufferError:
+            pass
 
     def get_dataclass(self, header):
         # get field names from outer space
diff --git a/pyignite/utils.py b/pyignite/utils.py
index 3d0378f..6c636ae 100644
--- a/pyignite/utils.py
+++ b/pyignite/utils.py
@@ -18,7 +18,6 @@
 import warnings
 
 from functools import wraps
-from threading import Event, Thread
 from typing import Any, Optional, Type, Tuple, Union
 
 from pyignite.datatypes.base import IgniteDataType
@@ -255,30 +254,6 @@
     return c_type(value).value
 
 
-class DaemonicTimer(Thread):
-    """
-    Same as normal `threading.Timer`, but do not delay the program exit.
-    """
-
-    def __init__(self, interval, function, args=None, kwargs=None):
-        Thread.__init__(self, daemon=True)
-        self.interval = interval
-        self.function = function
-        self.args = args if args is not None else []
-        self.kwargs = kwargs if kwargs is not None else {}
-        self.finished = Event()
-
-    def cancel(self):
-        """Stop the timer if it hasn't finished yet."""
-        self.finished.set()
-
-    def run(self):
-        self.finished.wait(self.interval)
-        if not self.finished.is_set():
-            self.function(*self.args, **self.kwargs)
-        self.finished.set()
-
-
 def capitalize(string: str) -> str:
     """
     Capitalizing the string, assuming the first character is a letter.
diff --git a/tests/config/log4j.xml.jinja2 b/tests/config/log4j.xml.jinja2
index 628f66c..983ae9e 100644
--- a/tests/config/log4j.xml.jinja2
+++ b/tests/config/log4j.xml.jinja2
@@ -33,7 +33,6 @@
         </RollingFile>
     </Appenders>
     <Loggers>
-        <Logger name="org.apache.ignite.internal.processors.odbc.ClientListenerNioListener" level="debug"/>
         <Root level="info">
             <AppenderRef ref="CONSOLE"/>
             <AppenderRef ref="FILE"/>
diff --git a/tests/conftest.py b/tests/conftest.py
index 54a7fda..bc8804d 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -120,7 +120,7 @@
         password
 ):
     node = node[:1]
-    yield from client(node, timeout, True, use_ssl, ssl_keyfile, ssl_keyfile_password, ssl_certfile, ssl_ca_certfile,
+    yield from client0(node, timeout, True, use_ssl, ssl_keyfile, ssl_keyfile_password, ssl_certfile, ssl_ca_certfile,
                       ssl_cert_reqs, ssl_ciphers, ssl_version, username, password)
 
 
@@ -211,7 +211,7 @@
         '--timeout',
         action='store',
         type=float,
-        default=None,
+        default=2.0,
         help=(
             'Timeout (in seconds) for each socket operation. Can accept '
             'integer or float value. Default is None'
diff --git a/tests/test_affinity_request_routing.py b/tests/test_affinity_request_routing.py
index 866222b..3489dea 100644
--- a/tests/test_affinity_request_routing.py
+++ b/tests/test_affinity_request_routing.py
@@ -18,6 +18,7 @@
 
 from pyignite import *
 from pyignite.connection import Connection
+from pyignite.constants import PROTOCOL_BYTE_ORDER
 from pyignite.datatypes import *
 from pyignite.datatypes.cache_config import CacheMode
 from pyignite.datatypes.prop_codes import *
@@ -30,7 +31,12 @@
 
 def patched_send(self, *args, **kwargs):
     """Patched send function that push to queue idx of server to which request is routed."""
-    requests.append(self.port % 100)
+    buf = args[0]
+    if buf and len(buf) >= 6:
+        op_code = int.from_bytes(buf[4:6], byteorder=PROTOCOL_BYTE_ORDER)
+        # Filter only caches operation.
+        if 1000 <= op_code < 1100:
+            requests.append(self.port % 100)
     return old_send(self, *args, **kwargs)
 
 
diff --git a/tox.ini b/tox.ini
index 4361413..eb7d1a6 100644
--- a/tox.ini
+++ b/tox.ini
@@ -17,12 +17,6 @@
 skipsdist = True
 envlist = py{36,37,38}-{no-ssl,ssl,ssl-password}
 
-[travis]
-python =
-  3.6: py36-{no-ssl,ssl,ssl-password}
-  3.7: py37-{no-ssl,ssl,ssl-password}
-  3.8: py38-{no-ssl,ssl,ssl-password}
-
 [testenv]
 passenv = TEAMCITY_VERSION IGNITE_HOME
 envdir = {homedir}/.virtualenvs/pyignite-{envname}