IGNITE-14167 Simplify reconnecting, fix affinity topology change detection
This closes #16
diff --git a/.travis.yml b/.travis.yml
index f884bdb..3095941 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+language: python
sudo: required
addons:
@@ -21,7 +22,9 @@
- openjdk-8-jdk
env:
- - IGNITE_VERSION=2.9.1 IGNITE_HOME=/opt/ignite
+ global:
+ - IGNITE_VERSION=2.9.1
+ - IGNITE_HOME=/opt/ignite
before_install:
- curl -L https://apache-mirror.rbc.ru/pub/apache/ignite/${IGNITE_VERSION}/apache-ignite-slim-${IGNITE_VERSION}-bin.zip > ignite.zip
@@ -29,10 +32,17 @@
- mv /opt/apache-ignite-slim-${IGNITE_VERSION}-bin /opt/ignite
- mv /opt/ignite/libs/optional/ignite-log4j2 /opt/ignite/libs/
-language: python
-python:
- - "3.6"
- - "3.7"
- - "3.8"
-install: pip install tox-travis
+jobs:
+ include:
+ - python: '3.6'
+ arch: amd64
+ env: TOXENV=py36-no-ssl,py36-ssl,py36-ssl-password
+ - python: '3.7'
+ arch: amd64
+ env: TOXENV=py37-no-ssl,py37-ssl,py37-ssl-password
+ - python: '3.8'
+ arch: amd64
+ env: TOXENV=py38-no-ssl,py38-ssl,py38-ssl-password
+
+install: pip install tox
script: tox
\ No newline at end of file
diff --git a/pyignite/api/affinity.py b/pyignite/api/affinity.py
index 16148a1..7d09517 100644
--- a/pyignite/api/affinity.py
+++ b/pyignite/api/affinity.py
@@ -55,11 +55,13 @@
partition_mapping = StructArray([
('is_applicable', Bool),
- ('cache_mapping', Conditional(lambda ctx: ctx['is_applicable'] and ctx['is_applicable'].value == 1,
+ ('cache_mapping', Conditional(['is_applicable'],
+ lambda ctx: ctx['is_applicable'] and ctx['is_applicable'].value == 1,
lambda ctx: ctx['is_applicable'],
cache_mapping, empty_cache_mapping)),
- ('node_mapping', Conditional(lambda ctx: ctx['is_applicable'] and ctx['is_applicable'].value == 1,
+ ('node_mapping', Conditional(['is_applicable'],
+ lambda ctx: ctx['is_applicable'] and ctx['is_applicable'].value == 1,
lambda ctx: ctx['is_applicable'],
node_mapping, empty_node_mapping)),
])
diff --git a/pyignite/cache.py b/pyignite/cache.py
index dd7dac4..ea672a8 100644
--- a/pyignite/cache.py
+++ b/pyignite/cache.py
@@ -283,6 +283,10 @@
parts += len(p)
self.affinity['number_of_partitions'] = parts
+
+ for conn in self.client._nodes:
+ if not conn.alive:
+ conn.reconnect()
else:
# get number of partitions
parts = self.affinity.get('number_of_partitions')
diff --git a/pyignite/client.py b/pyignite/client.py
index 83cb196..77c6373 100644
--- a/pyignite/client.py
+++ b/pyignite/client.py
@@ -182,15 +182,9 @@
if not self.partition_aware:
# do not try to open more nodes
self._current_node = i
- else:
- # take a chance to schedule the reconnection
- # for all the failed connections, that was probed
- # before this
- for failed_node in self._nodes[:i]:
- failed_node.reconnect()
except connection_errors:
- conn._fail()
+ conn.failed = True
if self.partition_aware:
# schedule the reconnection
conn.reconnect()
diff --git a/pyignite/connection/__init__.py b/pyignite/connection/__init__.py
index 0e793f8..1114594 100644
--- a/pyignite/connection/__init__.py
+++ b/pyignite/connection/__init__.py
@@ -33,414 +33,6 @@
as well as Ignite protocol handshaking.
"""
-from collections import OrderedDict
-import socket
-from threading import RLock
-from typing import Union
-
-from pyignite.constants import *
-from pyignite.exceptions import (
- HandshakeError, ParameterError, SocketError, connection_errors,
-)
-from pyignite.datatypes import Byte, Int, Short, String, UUIDObject
-from pyignite.datatypes.internal import Struct
-from pyignite.utils import DaemonicTimer
-
-from .handshake import HandshakeRequest
-from .ssl import wrap
-
+from .connection import Connection
__all__ = ['Connection']
-
-from ..stream import BinaryStream, READ_BACKWARD
-
-
-class Connection:
- """
- This is a `pyignite` class, that represents a connection to Ignite
- node. It serves multiple purposes:
-
- * socket wrapper. Detects fragmentation and network errors. See also
- https://docs.python.org/3/howto/sockets.html,
- * binary protocol connector. Incapsulates handshake and failover reconnection.
- """
-
- _socket = None
- _failed = None
- _in_use = None
-
- client = None
- host = None
- port = None
- timeout = None
- username = None
- password = None
- ssl_params = {}
- uuid = None
-
- @staticmethod
- def _check_ssl_params(params):
- expected_args = [
- 'use_ssl',
- 'ssl_version',
- 'ssl_ciphers',
- 'ssl_cert_reqs',
- 'ssl_keyfile',
- 'ssl_keyfile_password',
- 'ssl_certfile',
- 'ssl_ca_certfile',
- ]
- for param in params:
- if param not in expected_args:
- raise ParameterError((
- 'Unexpected parameter for connection initialization: `{}`'
- ).format(param))
-
- def __init__(
- self, client: 'Client', timeout: float = 2.0,
- username: str = None, password: str = None, **ssl_params
- ):
- """
- Initialize connection.
-
- For the use of the SSL-related parameters see
- https://docs.python.org/3/library/ssl.html#ssl-certificates.
-
- :param client: Ignite client object,
- :param timeout: (optional) sets timeout (in seconds) for each socket
- operation including `connect`. 0 means non-blocking mode, which is
- virtually guaranteed to fail. Can accept integer or float value.
- Default is None (blocking mode),
- :param use_ssl: (optional) set to True if Ignite server uses SSL
- on its binary connector. Defaults to use SSL when username
- and password has been supplied, not to use SSL otherwise,
- :param ssl_version: (optional) SSL version constant from standard
- `ssl` module. Defaults to TLS v1.1, as in Ignite 2.5,
- :param ssl_ciphers: (optional) ciphers to use. If not provided,
- `ssl` default ciphers are used,
- :param ssl_cert_reqs: (optional) determines how the remote side
- certificate is treated:
-
- * `ssl.CERT_NONE` − remote certificate is ignored (default),
- * `ssl.CERT_OPTIONAL` − remote certificate will be validated,
- if provided,
- * `ssl.CERT_REQUIRED` − valid remote certificate is required,
-
- :param ssl_keyfile: (optional) a path to SSL key file to identify
- local (client) party,
- :param ssl_keyfile_password: (optional) password for SSL key file,
- can be provided when key file is encrypted to prevent OpenSSL
- password prompt,
- :param ssl_certfile: (optional) a path to ssl certificate file
- to identify local (client) party,
- :param ssl_ca_certfile: (optional) a path to a trusted certificate
- or a certificate chain. Required to check the validity of the remote
- (server-side) certificate,
- :param username: (optional) user name to authenticate to Ignite
- cluster,
- :param password: (optional) password to authenticate to Ignite cluster.
- """
- self.client = client
- self.timeout = timeout
- self.username = username
- self.password = password
- self._check_ssl_params(ssl_params)
- if self.username and self.password and 'use_ssl' not in ssl_params:
- ssl_params['use_ssl'] = True
- self.ssl_params = ssl_params
- self._failed = False
- self._mux = RLock()
- self._in_use = False
-
- @property
- def socket(self) -> socket.socket:
- """ Network socket. """
- return self._socket
-
- @property
- def closed(self) -> bool:
- """ Tells if socket is closed. """
- with self._mux:
- return self._socket is None
-
- @property
- def failed(self) -> bool:
- """ Tells if connection is failed. """
- with self._mux:
- return self._failed
-
- @property
- def alive(self) -> bool:
- """ Tells if connection is up and no failure detected. """
- with self._mux:
- return not (self._failed or self.closed)
-
- def __repr__(self) -> str:
- return '{}:{}'.format(self.host or '?', self.port or '?')
-
- _wrap = wrap
-
- def get_protocol_version(self):
- """
- Returns the tuple of major, minor, and revision numbers of the used
- thin protocol version, or None, if no connection to the Ignite cluster
- was yet established.
- """
- return self.client.protocol_version
-
- def _fail(self):
- """ set client to failed state. """
- with self._mux:
- self._failed = True
-
- self._in_use = False
-
- def read_response(self) -> Union[dict, OrderedDict]:
- """
- Processes server's response to the handshake request.
-
- :return: handshake data.
- """
- response_start = Struct([
- ('length', Int),
- ('op_code', Byte),
- ])
- with BinaryStream(self, self.recv()) as stream:
- start_class = response_start.parse(stream)
- start = stream.read_ctype(start_class, direction=READ_BACKWARD)
- data = response_start.to_python(start)
- response_end = None
- if data['op_code'] == 0:
- response_end = Struct([
- ('version_major', Short),
- ('version_minor', Short),
- ('version_patch', Short),
- ('message', String),
- ])
- elif self.get_protocol_version() >= (1, 4, 0):
- response_end = Struct([
- ('node_uuid', UUIDObject),
- ])
- if response_end:
- end_class = response_end.parse(stream)
- end = stream.read_ctype(end_class, direction=READ_BACKWARD)
- data.update(response_end.to_python(end))
- return data
-
- def connect(
- self, host: str = None, port: int = None
- ) -> Union[dict, OrderedDict]:
- """
- Connect to the given server node with protocol version fallback.
-
- :param host: Ignite server node's host name or IP,
- :param port: Ignite server node's port number.
- """
- detecting_protocol = False
-
- with self._mux:
- if self._in_use:
- raise ConnectionError('Connection is in use.')
- self._in_use = True
-
- # choose highest version first
- if self.client.protocol_version is None:
- detecting_protocol = True
- self.client.protocol_version = max(PROTOCOLS)
-
- try:
- result = self._connect_version(host, port)
- except HandshakeError as e:
- if e.expected_version in PROTOCOLS:
- self.client.protocol_version = e.expected_version
- result = self._connect_version(host, port)
- else:
- raise e
- except connection_errors:
- # restore undefined protocol version
- if detecting_protocol:
- self.client.protocol_version = None
- raise
-
- # connection is ready for end user
- self.uuid = result.get('node_uuid', None) # version-specific (1.4+)
-
- self._failed = False
- return result
-
- def _connect_version(
- self, host: str = None, port: int = None,
- ) -> Union[dict, OrderedDict]:
- """
- Connect to the given server node using protocol version
- defined on client.
-
- :param host: Ignite server node's host name or IP,
- :param port: Ignite server node's port number.
- """
-
- host = host or IGNITE_DEFAULT_HOST
- port = port or IGNITE_DEFAULT_PORT
-
- self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self._socket.settimeout(self.timeout)
- self._socket = self._wrap(self.socket)
- self._socket.connect((host, port))
-
- protocol_version = self.client.protocol_version
-
- hs_request = HandshakeRequest(
- protocol_version,
- self.username,
- self.password
- )
-
- with BinaryStream(self) as stream:
- hs_request.from_python(stream)
- self.send(stream.getbuffer())
-
- hs_response = self.read_response()
- if hs_response['op_code'] == 0:
- # disconnect but keep in use
- self.close(release=False)
-
- error_text = 'Handshake error: {}'.format(hs_response['message'])
- # if handshake fails for any reason other than protocol mismatch
- # (i.e. authentication error), server version is 0.0.0
- if any([
- hs_response['version_major'],
- hs_response['version_minor'],
- hs_response['version_patch'],
- ]):
- error_text += (
- ' Server expects binary protocol version '
- '{version_major}.{version_minor}.{version_patch}. Client '
- 'provides {client_major}.{client_minor}.{client_patch}.'
- ).format(
- client_major=protocol_version[0],
- client_minor=protocol_version[1],
- client_patch=protocol_version[2],
- **hs_response
- )
- raise HandshakeError((
- hs_response['version_major'],
- hs_response['version_minor'],
- hs_response['version_patch'],
- ), error_text)
- self.host, self.port = host, port
- return hs_response
-
- def reconnect(self, seq_no=0):
- """
- Tries to reconnect synchronously, then in background.
- """
-
- # stop trying to reconnect
- if seq_no >= len(RECONNECT_BACKOFF_SEQUENCE):
- self._failed = False
-
- self._reconnect()
-
- if self.failed:
- DaemonicTimer(
- RECONNECT_BACKOFF_SEQUENCE[seq_no],
- self.reconnect,
- kwargs={'seq_no': seq_no + 1},
- ).start()
-
- def _reconnect(self):
- # do not reconnect if connection is already working
- # or was closed on purpose
- if not self.failed:
- return
-
- self.close()
-
- # connect and silence the connection errors
- try:
- self.connect(self.host, self.port)
- except connection_errors:
- pass
-
- def _transfer_params(self, to: 'Connection'):
- """
- Transfer non-SSL parameters to target connection object.
-
- :param to: connection object to transfer parameters to.
- """
- to.username = self.username
- to.password = self.password
- to.client = self.client
- to.host = self.host
- to.port = self.port
-
- def send(self, data: Union[bytes, bytearray, memoryview], flags=None):
- """
- Send data down the socket.
-
- :param data: bytes to send,
- :param flags: (optional) OS-specific flags.
- """
- if self.closed:
- raise SocketError('Attempt to use closed connection.')
-
- kwargs = {}
- if flags is not None:
- kwargs['flags'] = flags
-
- try:
- self.socket.sendall(data, **kwargs)
- except Exception:
- self._fail()
- self.reconnect()
- raise
-
- def recv(self, flags=None) -> bytearray:
- def _recv(buffer, num_bytes):
- bytes_to_receive = num_bytes
- while bytes_to_receive > 0:
- try:
- bytes_rcvd = self.socket.recv_into(buffer, bytes_to_receive, **kwargs)
- if bytes_rcvd == 0:
- raise SocketError('Connection broken.')
- except connection_errors:
- self._fail()
- self.reconnect()
- raise
-
- buffer = buffer[bytes_rcvd:]
- bytes_to_receive -= bytes_rcvd
-
- if self.closed:
- raise SocketError('Attempt to use closed connection.')
-
- kwargs = {}
- if flags is not None:
- kwargs['flags'] = flags
-
- data = bytearray(4)
- _recv(memoryview(data), 4)
- response_len = int.from_bytes(data, PROTOCOL_BYTE_ORDER)
-
- data.extend(bytearray(response_len))
- _recv(memoryview(data)[4:], response_len)
- return data
-
-
- def close(self, release=True):
- """
- Try to mark socket closed, then unlink it. This is recommended but
- not required, since sockets are automatically closed when
- garbage-collected.
- """
- with self._mux:
- if self._socket:
- try:
- self._socket.shutdown(socket.SHUT_RDWR)
- self._socket.close()
- except connection_errors:
- pass
- self._socket = None
-
- if release:
- self._in_use = False
diff --git a/pyignite/connection/connection.py b/pyignite/connection/connection.py
new file mode 100644
index 0000000..6ab6c6a
--- /dev/null
+++ b/pyignite/connection/connection.py
@@ -0,0 +1,381 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from collections import OrderedDict
+import socket
+from typing import Union
+
+from pyignite.constants import *
+from pyignite.exceptions import (
+ HandshakeError, ParameterError, SocketError, connection_errors,
+)
+from pyignite.datatypes import Byte, Int, Short, String, UUIDObject
+from pyignite.datatypes.internal import Struct
+
+from .handshake import HandshakeRequest
+from .ssl import wrap
+from ..stream import BinaryStream, READ_BACKWARD
+
+
+class Connection:
+ """
+ This is a `pyignite` class, that represents a connection to Ignite
+ node. It serves multiple purposes:
+
+ * socket wrapper. Detects fragmentation and network errors. See also
+ https://docs.python.org/3/howto/sockets.html,
+ * binary protocol connector. Incapsulates handshake and failover reconnection.
+ """
+
+ _socket = None
+ _failed = None
+
+ client = None
+ host = None
+ port = None
+ timeout = None
+ username = None
+ password = None
+ ssl_params = {}
+ uuid = None
+
+ @staticmethod
+ def _check_ssl_params(params):
+ expected_args = [
+ 'use_ssl',
+ 'ssl_version',
+ 'ssl_ciphers',
+ 'ssl_cert_reqs',
+ 'ssl_keyfile',
+ 'ssl_keyfile_password',
+ 'ssl_certfile',
+ 'ssl_ca_certfile',
+ ]
+ for param in params:
+ if param not in expected_args:
+ raise ParameterError((
+ 'Unexpected parameter for connection initialization: `{}`'
+ ).format(param))
+
+ def __init__(
+ self, client: 'Client', timeout: float = 2.0,
+ username: str = None, password: str = None, **ssl_params
+ ):
+ """
+ Initialize connection.
+
+ For the use of the SSL-related parameters see
+ https://docs.python.org/3/library/ssl.html#ssl-certificates.
+
+ :param client: Ignite client object,
+ :param timeout: (optional) sets timeout (in seconds) for each socket
+ operation including `connect`. 0 means non-blocking mode, which is
+ virtually guaranteed to fail. Can accept integer or float value.
+ Default is None (blocking mode),
+ :param use_ssl: (optional) set to True if Ignite server uses SSL
+ on its binary connector. Defaults to use SSL when username
+ and password has been supplied, not to use SSL otherwise,
+ :param ssl_version: (optional) SSL version constant from standard
+ `ssl` module. Defaults to TLS v1.1, as in Ignite 2.5,
+ :param ssl_ciphers: (optional) ciphers to use. If not provided,
+ `ssl` default ciphers are used,
+ :param ssl_cert_reqs: (optional) determines how the remote side
+ certificate is treated:
+
+ * `ssl.CERT_NONE` − remote certificate is ignored (default),
+ * `ssl.CERT_OPTIONAL` − remote certificate will be validated,
+ if provided,
+ * `ssl.CERT_REQUIRED` − valid remote certificate is required,
+
+ :param ssl_keyfile: (optional) a path to SSL key file to identify
+ local (client) party,
+ :param ssl_keyfile_password: (optional) password for SSL key file,
+ can be provided when key file is encrypted to prevent OpenSSL
+ password prompt,
+ :param ssl_certfile: (optional) a path to ssl certificate file
+ to identify local (client) party,
+ :param ssl_ca_certfile: (optional) a path to a trusted certificate
+ or a certificate chain. Required to check the validity of the remote
+ (server-side) certificate,
+ :param username: (optional) user name to authenticate to Ignite
+ cluster,
+ :param password: (optional) password to authenticate to Ignite cluster.
+ """
+ self.client = client
+ self.timeout = timeout
+ self.username = username
+ self.password = password
+ self._check_ssl_params(ssl_params)
+ if self.username and self.password and 'use_ssl' not in ssl_params:
+ ssl_params['use_ssl'] = True
+ self.ssl_params = ssl_params
+ self._failed = False
+
+ @property
+ def closed(self) -> bool:
+ """ Tells if socket is closed. """
+ return self._socket is None
+
+ @property
+ def failed(self) -> bool:
+ """ Tells if connection is failed. """
+ return self._failed
+
+ @failed.setter
+ def failed(self, value):
+ self._failed = value
+
+ @property
+ def alive(self) -> bool:
+ """ Tells if connection is up and no failure detected. """
+ return not self.failed and not self.closed
+
+ def __repr__(self) -> str:
+ return '{}:{}'.format(self.host or '?', self.port or '?')
+
+ _wrap = wrap
+
+ def get_protocol_version(self):
+ """
+ Returns the tuple of major, minor, and revision numbers of the used
+ thin protocol version, or None, if no connection to the Ignite cluster
+ was yet established.
+ """
+ return self.client.protocol_version
+
+ def read_response(self) -> Union[dict, OrderedDict]:
+ """
+ Processes server's response to the handshake request.
+
+ :return: handshake data.
+ """
+ response_start = Struct([
+ ('length', Int),
+ ('op_code', Byte),
+ ])
+ with BinaryStream(self, self.recv()) as stream:
+ start_class = response_start.parse(stream)
+ start = stream.read_ctype(start_class, direction=READ_BACKWARD)
+ data = response_start.to_python(start)
+ response_end = None
+ if data['op_code'] == 0:
+ response_end = Struct([
+ ('version_major', Short),
+ ('version_minor', Short),
+ ('version_patch', Short),
+ ('message', String),
+ ])
+ elif self.get_protocol_version() >= (1, 4, 0):
+ response_end = Struct([
+ ('node_uuid', UUIDObject),
+ ])
+ if response_end:
+ end_class = response_end.parse(stream)
+ end = stream.read_ctype(end_class, direction=READ_BACKWARD)
+ data.update(response_end.to_python(end))
+ return data
+
+ def connect(
+ self, host: str = None, port: int = None
+ ) -> Union[dict, OrderedDict]:
+ """
+ Connect to the given server node with protocol version fallback.
+
+ :param host: Ignite server node's host name or IP,
+ :param port: Ignite server node's port number.
+ """
+ detecting_protocol = False
+
+ # choose highest version first
+ if self.client.protocol_version is None:
+ detecting_protocol = True
+ self.client.protocol_version = max(PROTOCOLS)
+
+ try:
+ result = self._connect_version(host, port)
+ except HandshakeError as e:
+ if e.expected_version in PROTOCOLS:
+ self.client.protocol_version = e.expected_version
+ result = self._connect_version(host, port)
+ else:
+ raise e
+ except connection_errors:
+ # restore undefined protocol version
+ if detecting_protocol:
+ self.client.protocol_version = None
+ raise
+
+ # connection is ready for end user
+ self.uuid = result.get('node_uuid', None) # version-specific (1.4+)
+
+ self.failed = False
+ return result
+
+ def _connect_version(
+ self, host: str = None, port: int = None,
+ ) -> Union[dict, OrderedDict]:
+ """
+ Connect to the given server node using protocol version
+ defined on client.
+
+ :param host: Ignite server node's host name or IP,
+ :param port: Ignite server node's port number.
+ """
+
+ host = host or IGNITE_DEFAULT_HOST
+ port = port or IGNITE_DEFAULT_PORT
+
+ self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self._socket.settimeout(self.timeout)
+ self._socket = self._wrap(self._socket)
+ self._socket.connect((host, port))
+
+ protocol_version = self.client.protocol_version
+
+ hs_request = HandshakeRequest(
+ protocol_version,
+ self.username,
+ self.password
+ )
+
+ with BinaryStream(self) as stream:
+ hs_request.from_python(stream)
+ self.send(stream.getbuffer())
+
+ hs_response = self.read_response()
+ if hs_response['op_code'] == 0:
+ self.close()
+
+ error_text = 'Handshake error: {}'.format(hs_response['message'])
+ # if handshake fails for any reason other than protocol mismatch
+ # (i.e. authentication error), server version is 0.0.0
+ if any([
+ hs_response['version_major'],
+ hs_response['version_minor'],
+ hs_response['version_patch'],
+ ]):
+ error_text += (
+ ' Server expects binary protocol version '
+ '{version_major}.{version_minor}.{version_patch}. Client '
+ 'provides {client_major}.{client_minor}.{client_patch}.'
+ ).format(
+ client_major=protocol_version[0],
+ client_minor=protocol_version[1],
+ client_patch=protocol_version[2],
+ **hs_response
+ )
+ raise HandshakeError((
+ hs_response['version_major'],
+ hs_response['version_minor'],
+ hs_response['version_patch'],
+ ), error_text)
+ self.host, self.port = host, port
+ return hs_response
+
+ def reconnect(self):
+ # do not reconnect if connection is already working
+ # or was closed on purpose
+ if not self.failed:
+ return
+
+ self.close()
+
+ # connect and silence the connection errors
+ try:
+ self.connect(self.host, self.port)
+ except connection_errors:
+ pass
+
+ def send(self, data: Union[bytes, bytearray, memoryview], flags=None):
+ """
+ Send data down the socket.
+
+ :param data: bytes to send,
+ :param flags: (optional) OS-specific flags.
+ """
+ if self.closed:
+ raise SocketError('Attempt to use closed connection.')
+
+ kwargs = {}
+ if flags is not None:
+ kwargs['flags'] = flags
+
+ try:
+ self._socket.sendall(data, **kwargs)
+ except connection_errors:
+ self.failed = True
+ self.reconnect()
+ raise
+
+ def recv(self, flags=None) -> bytearray:
+ def _recv(buffer, num_bytes):
+ bytes_to_receive = num_bytes
+ while bytes_to_receive > 0:
+ try:
+ bytes_rcvd = self._socket.recv_into(buffer, bytes_to_receive, **kwargs)
+ if bytes_rcvd == 0:
+ raise SocketError('Connection broken.')
+ except connection_errors:
+ self.failed = True
+ self.reconnect()
+ raise
+
+ buffer = buffer[bytes_rcvd:]
+ bytes_to_receive -= bytes_rcvd
+
+ if self.closed:
+ raise SocketError('Attempt to use closed connection.')
+
+ kwargs = {}
+ if flags is not None:
+ kwargs['flags'] = flags
+
+ data = bytearray(4)
+ _recv(memoryview(data), 4)
+ response_len = int.from_bytes(data, PROTOCOL_BYTE_ORDER)
+
+ data.extend(bytearray(response_len))
+ _recv(memoryview(data)[4:], response_len)
+ return data
+
+ def close(self):
+ """
+ Try to mark socket closed, then unlink it. This is recommended but
+ not required, since sockets are automatically closed when
+ garbage-collected.
+ """
+ if self._socket:
+ try:
+ self._socket.shutdown(socket.SHUT_RDWR)
+ self._socket.close()
+ except connection_errors:
+ pass
+
+ self._socket = None
diff --git a/pyignite/datatypes/complex.py b/pyignite/datatypes/complex.py
index aed3cda..b8d9c02 100644
--- a/pyignite/datatypes/complex.py
+++ b/pyignite/datatypes/complex.py
@@ -564,8 +564,8 @@
@classmethod
def from_python_not_null(cls, stream, value):
- stream.register_binary_type(value.__class__)
if getattr(value, '_buffer', None):
stream.write(value._buffer)
else:
+ stream.register_binary_type(value.__class__)
value._from_python(stream)
diff --git a/pyignite/datatypes/internal.py b/pyignite/datatypes/internal.py
index 0111a22..a6da9fe 100644
--- a/pyignite/datatypes/internal.py
+++ b/pyignite/datatypes/internal.py
@@ -18,7 +18,7 @@
import decimal
from datetime import date, datetime, timedelta
from io import SEEK_CUR
-from typing import Any, Tuple, Union, Callable
+from typing import Any, Tuple, Union, Callable, List
import uuid
import attr
@@ -115,8 +115,9 @@
class Conditional:
-
- def __init__(self, predicate1: Callable[[any], bool], predicate2: Callable[[any], bool], var1, var2):
+ def __init__(self, fields: List, predicate1: Callable[[any], bool],
+ predicate2: Callable[[any], bool], var1, var2):
+ self.fields = fields
self.predicate1 = predicate1
self.predicate2 = predicate2
self.var1 = var1
@@ -209,12 +210,19 @@
defaults = attr.ib(type=dict, default={})
def parse(self, stream):
- fields, values = [], {}
+ fields, ctx = [], {}
+
+ for _, c_type in self.fields:
+ if isinstance(c_type, Conditional):
+ for name in c_type.fields:
+ ctx[name] = None
+
for name, c_type in self.fields:
is_cond = isinstance(c_type, Conditional)
- c_type = c_type.parse(stream, values) if is_cond else c_type.parse(stream)
+ c_type = c_type.parse(stream, ctx) if is_cond else c_type.parse(stream)
fields.append((name, c_type))
- values[name] = stream.read_ctype(c_type, direction=READ_BACKWARD)
+ if name in ctx:
+ ctx[name] = stream.read_ctype(c_type, direction=READ_BACKWARD)
data_class = type(
'Struct',
diff --git a/pyignite/queries/query.py b/pyignite/queries/query.py
index 5bd114b..b5be753 100644
--- a/pyignite/queries/query.py
+++ b/pyignite/queries/query.py
@@ -105,9 +105,11 @@
# this test depends on protocol version
if getattr(response, 'flags', False) & RHF_TOPOLOGY_CHANGED:
# update latest affinity version
- conn.client.affinity_version = (
- response.affinity_version, response.affinity_minor
- )
+ new_affinity = (response.affinity_version, response.affinity_minor)
+ old_affinity = conn.client.affinity_version
+
+ if new_affinity > old_affinity:
+ conn.client.affinity_version = new_affinity
# build result
result = APIResult(response)
diff --git a/pyignite/stream/binary_stream.py b/pyignite/stream/binary_stream.py
index 1ecdcfb..46ac683 100644
--- a/pyignite/stream/binary_stream.py
+++ b/pyignite/stream/binary_stream.py
@@ -95,7 +95,10 @@
return self
def __exit__(self, exc_type, exc_value, traceback):
- self.stream.close()
+ try:
+ self.stream.close()
+ except BufferError:
+ pass
def get_dataclass(self, header):
# get field names from outer space
diff --git a/pyignite/utils.py b/pyignite/utils.py
index 3d0378f..6c636ae 100644
--- a/pyignite/utils.py
+++ b/pyignite/utils.py
@@ -18,7 +18,6 @@
import warnings
from functools import wraps
-from threading import Event, Thread
from typing import Any, Optional, Type, Tuple, Union
from pyignite.datatypes.base import IgniteDataType
@@ -255,30 +254,6 @@
return c_type(value).value
-class DaemonicTimer(Thread):
- """
- Same as normal `threading.Timer`, but do not delay the program exit.
- """
-
- def __init__(self, interval, function, args=None, kwargs=None):
- Thread.__init__(self, daemon=True)
- self.interval = interval
- self.function = function
- self.args = args if args is not None else []
- self.kwargs = kwargs if kwargs is not None else {}
- self.finished = Event()
-
- def cancel(self):
- """Stop the timer if it hasn't finished yet."""
- self.finished.set()
-
- def run(self):
- self.finished.wait(self.interval)
- if not self.finished.is_set():
- self.function(*self.args, **self.kwargs)
- self.finished.set()
-
-
def capitalize(string: str) -> str:
"""
Capitalizing the string, assuming the first character is a letter.
diff --git a/tests/config/log4j.xml.jinja2 b/tests/config/log4j.xml.jinja2
index 628f66c..983ae9e 100644
--- a/tests/config/log4j.xml.jinja2
+++ b/tests/config/log4j.xml.jinja2
@@ -33,7 +33,6 @@
</RollingFile>
</Appenders>
<Loggers>
- <Logger name="org.apache.ignite.internal.processors.odbc.ClientListenerNioListener" level="debug"/>
<Root level="info">
<AppenderRef ref="CONSOLE"/>
<AppenderRef ref="FILE"/>
diff --git a/tests/conftest.py b/tests/conftest.py
index 54a7fda..bc8804d 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -120,7 +120,7 @@
password
):
node = node[:1]
- yield from client(node, timeout, True, use_ssl, ssl_keyfile, ssl_keyfile_password, ssl_certfile, ssl_ca_certfile,
+ yield from client0(node, timeout, True, use_ssl, ssl_keyfile, ssl_keyfile_password, ssl_certfile, ssl_ca_certfile,
ssl_cert_reqs, ssl_ciphers, ssl_version, username, password)
@@ -211,7 +211,7 @@
'--timeout',
action='store',
type=float,
- default=None,
+ default=2.0,
help=(
'Timeout (in seconds) for each socket operation. Can accept '
'integer or float value. Default is None'
diff --git a/tests/test_affinity_request_routing.py b/tests/test_affinity_request_routing.py
index 866222b..3489dea 100644
--- a/tests/test_affinity_request_routing.py
+++ b/tests/test_affinity_request_routing.py
@@ -18,6 +18,7 @@
from pyignite import *
from pyignite.connection import Connection
+from pyignite.constants import PROTOCOL_BYTE_ORDER
from pyignite.datatypes import *
from pyignite.datatypes.cache_config import CacheMode
from pyignite.datatypes.prop_codes import *
@@ -30,7 +31,12 @@
def patched_send(self, *args, **kwargs):
"""Patched send function that push to queue idx of server to which request is routed."""
- requests.append(self.port % 100)
+ buf = args[0]
+ if buf and len(buf) >= 6:
+ op_code = int.from_bytes(buf[4:6], byteorder=PROTOCOL_BYTE_ORDER)
+ # Filter only caches operation.
+ if 1000 <= op_code < 1100:
+ requests.append(self.port % 100)
return old_send(self, *args, **kwargs)
diff --git a/tox.ini b/tox.ini
index 4361413..eb7d1a6 100644
--- a/tox.ini
+++ b/tox.ini
@@ -17,12 +17,6 @@
skipsdist = True
envlist = py{36,37,38}-{no-ssl,ssl,ssl-password}
-[travis]
-python =
- 3.6: py36-{no-ssl,ssl,ssl-password}
- 3.7: py37-{no-ssl,ssl,ssl-password}
- 3.8: py38-{no-ssl,ssl,ssl-password}
-
[testenv]
passenv = TEAMCITY_VERSION IGNITE_HOME
envdir = {homedir}/.virtualenvs/pyignite-{envname}