IGNITE-15103 Implement debug and error logging of connections and queries - Fixes #45.
diff --git a/pyignite/connection/aio_connection.py b/pyignite/connection/aio_connection.py
index c6ecbc6..c5fa24d 100644
--- a/pyignite/connection/aio_connection.py
+++ b/pyignite/connection/aio_connection.py
@@ -33,7 +33,7 @@
from typing import Union
from pyignite.constants import PROTOCOLS, PROTOCOL_BYTE_ORDER
-from pyignite.exceptions import HandshakeError, SocketError, connection_errors
+from pyignite.exceptions import HandshakeError, SocketError, connection_errors, AuthenticationError
from .bitmask_feature import BitmaskFeature
from .connection import BaseConnection
@@ -68,7 +68,7 @@
hs_response = self.__parse_handshake(packet, self._conn.client)
self._handshake_fut.set_result(hs_response)
else:
- self._conn.on_message(packet)
+ self._conn.process_message(packet)
self._buffer = self._buffer[packet_sz:len(self._buffer)]
def __has_full_response(self):
@@ -84,7 +84,7 @@
connected = self._handshake_fut.done()
if not connected:
self._handshake_fut.set_exception(exc)
- self._conn.on_connection_lost(exc, connected)
+ self._conn.process_connection_lost(exc, connected)
@staticmethod
def __send_handshake(transport, conn):
@@ -177,38 +177,41 @@
self.client.protocol_context = ProtocolContext(max(PROTOCOLS), BitmaskFeature.all_supported())
try:
+ self._on_handshake_start()
result = await self._connect_version()
except HandshakeError as e:
if e.expected_version in PROTOCOLS:
self.client.protocol_context.version = e.expected_version
result = await self._connect_version()
else:
+ self._on_handshake_fail(e)
raise e
- except connection_errors:
+ except AuthenticationError as e:
+ self._on_handshake_fail(e)
+ raise e
+ except Exception as e:
# restore undefined protocol version
if detecting_protocol:
self.client.protocol_context = None
- raise
+ self._on_handshake_fail(e)
+ raise e
- # connection is ready for end user
- features = BitmaskFeature.from_array(result.get('features', None))
- self.client.protocol_context.features = features
- self.uuid = result.get('node_uuid', None) # version-specific (1.4+)
- self.failed = False
+ self._on_handshake_success(result)
- def on_connection_lost(self, error, reconnect=False):
+ def process_connection_lost(self, err, reconnect=False):
self.failed = True
for _, fut in self._pending_reqs.items():
- fut.set_exception(error)
+ fut.set_exception(err)
self._pending_reqs.clear()
if self._transport_closed_fut and not self._transport_closed_fut.done():
self._transport_closed_fut.set_result(None)
if reconnect and not self._closed:
+ self._on_connection_lost(err)
self._loop.create_task(self._reconnect())
- def on_message(self, data):
+ def process_message(self, data):
req_id = int.from_bytes(data[4:12], byteorder=PROTOCOL_BYTE_ORDER, signed=True)
if req_id in self._pending_reqs:
self._pending_reqs[req_id].set_result(data)
@@ -227,7 +230,7 @@
hs_response = await handshake_fut
if hs_response.op_code == 0:
- await self._close_transport()
+ await self.close()
self._process_handshake_error(hs_response)
return hs_response
@@ -281,4 +284,5 @@
except asyncio.TimeoutError:
pass
finally:
+ self._on_connection_lost(expected=True)
self._transport_closed_fut = None
diff --git a/pyignite/connection/connection.py b/pyignite/connection/connection.py
index 2e6d6aa..ae5587a 100644
--- a/pyignite/connection/connection.py
+++ b/pyignite/connection/connection.py
@@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import logging
from collections import OrderedDict
import socket
from typing import Union
@@ -28,6 +29,8 @@
CLIENT_STATUS_AUTH_FAILURE = 2000
+logger = logging.getLogger('.'.join(__name__.split('.')[:-1]))
+
class BaseConnection:
def __init__(self, client, host: str = None, port: int = None, username: str = None, password: str = None,
@@ -78,21 +81,53 @@
return self.client.protocol_context
def _process_handshake_error(self, response):
- error_text = f'Handshake error: {response.message}'
# if handshake fails for any reason other than protocol mismatch
# (i.e. authentication error), server version is 0.0.0
+ if response.client_status == CLIENT_STATUS_AUTH_FAILURE:
+ raise AuthenticationError(response.message)
+
protocol_version = self.client.protocol_context.version
server_version = (response.version_major, response.version_minor, response.version_patch)
-
+ error_text = f'Handshake error: {response.message}'
if any(server_version):
error_text += f' Server expects binary protocol version ' \
f'{server_version[0]}.{server_version[1]}.{server_version[2]}. ' \
f'Client provides ' \
f'{protocol_version[0]}.{protocol_version[1]}.{protocol_version[2]}.'
- elif response.client_status == CLIENT_STATUS_AUTH_FAILURE:
- raise AuthenticationError(error_text)
raise HandshakeError(server_version, error_text)
+ def _on_handshake_start(self):
+ if logger.isEnabledFor(logging.DEBUG):
+ logger.debug("Connecting to node(address=%s, port=%d) with protocol context %s",
+ self.host, self.port, self.client.protocol_context)
+
+ def _on_handshake_success(self, result):
+ features = BitmaskFeature.from_array(result.get('features', None))
+ self.client.protocol_context.features = features
+ self.uuid = result.get('node_uuid', None) # version-specific (1.4+)
+ self.failed = False
+
+ if logger.isEnabledFor(logging.DEBUG):
+ logger.debug("Connected to node(address=%s, port=%d, node_uuid=%s) with protocol context %s",
+ self.host, self.port, self.uuid, self.client.protocol_context)
+
+ def _on_handshake_fail(self, err):
+ if isinstance(err, AuthenticationError):
+ logger.error("Authentication failed while connecting to node(address=%s, port=%d): %s",
+ self.host, self.port, err)
+ else:
+ logger.error("Failed to perform handshake, connection to node(address=%s, port=%d) "
+ "with protocol context %s failed: %s",
+ self.host, self.port, self.client.protocol_context, err, exc_info=True)
+
+ def _on_connection_lost(self, err=None, expected=False):
+ if expected and logger.isEnabledFor(logging.DEBUG):
+ logger.debug("Connection closed to node(address=%s, port=%d, node_uuid=%s)",
+ self.host, self.port, self.uuid)
+ else:
+ logger.info("Connection lost to node(address=%s, port=%d, node_uuid=%s): %s",
+ self.host, self.port, self.uuid, err)
+
class Connection(BaseConnection):
"""
@@ -168,24 +203,26 @@
self.client.protocol_context = ProtocolContext(max(PROTOCOLS), BitmaskFeature.all_supported())
try:
+ self._on_handshake_start()
result = self._connect_version()
except HandshakeError as e:
if e.expected_version in PROTOCOLS:
self.client.protocol_context.version = e.expected_version
result = self._connect_version()
else:
+ self._on_handshake_fail(e)
raise e
- except connection_errors:
+ except AuthenticationError as e:
+ self._on_handshake_fail(e)
+ raise e
+ except Exception as e:
# restore undefined protocol version
if detecting_protocol:
self.client.protocol_context = None
- raise
+ self._on_handshake_fail(e)
+ raise e
- # connection is ready for end user
- features = BitmaskFeature.from_array(result.get('features', None))
- self.client.protocol_context.features = features
- self.uuid = result.get('node_uuid', None) # version-specific (1.4+)
- self.failed = False
+ self._on_handshake_success(result)
def _connect_version(self) -> Union[dict, OrderedDict]:
"""
@@ -258,11 +295,12 @@
try:
self._socket.sendall(data, **kwargs)
- except connection_errors:
+ except connection_errors as e:
self.failed = True
if reconnect:
+ self._on_connection_lost(e)
self.reconnect()
- raise
+ raise e
def recv(self, flags=None, reconnect=True) -> bytearray:
"""
@@ -287,11 +325,12 @@
if bytes_received == 0:
raise SocketError('Connection broken.')
bytes_total_received += bytes_received
- except connection_errors:
+ except connection_errors as e:
self.failed = True
if reconnect:
+ self._on_connection_lost(e)
self.reconnect()
- raise
+ raise e
if bytes_total_received < 4:
continue
@@ -325,5 +364,5 @@
self._socket.close()
except connection_errors:
pass
-
+ self._on_connection_lost(expected=True)
self._socket = None
diff --git a/pyignite/connection/protocol_context.py b/pyignite/connection/protocol_context.py
index 0f43aa4..58f509e 100644
--- a/pyignite/connection/protocol_context.py
+++ b/pyignite/connection/protocol_context.py
@@ -37,6 +37,9 @@
self.version == other.version and \
self.features == other.features
+ def __str__(self):
+ return f'ProtocolContext(version={self._version}, features={self._features})'
+
def _ensure_consistency(self):
if not self.is_feature_flags_supported():
self._features = None
diff --git a/pyignite/queries/query.py b/pyignite/queries/query.py
index 4bcab9f..89c354e 100644
--- a/pyignite/queries/query.py
+++ b/pyignite/queries/query.py
@@ -14,6 +14,9 @@
# limitations under the License.
import ctypes
+import inspect
+import logging
+import time
from io import SEEK_CUR
import attr
@@ -21,9 +24,12 @@
from pyignite.api.result import APIResult
from pyignite.connection import Connection, AioConnection
from pyignite.constants import MAX_LONG, RHF_TOPOLOGY_CHANGED
+from pyignite.queries import op_codes
from pyignite.queries.response import Response
from pyignite.stream import AioBinaryStream, BinaryStream, READ_BACKWARD
+logger = logging.getLogger('.'.join(__name__.split('.')[:-1]))
+
def query_perform(query_struct, conn, post_process_fun=None, **kwargs):
async def _async_internal():
@@ -54,6 +60,18 @@
return _QUERY_COUNTER
+_OP_CODES = {code: name for name, code in inspect.getmembers(op_codes) if name.startswith('OP_')}
+
+
+def _get_op_code_name(code):
+ global _OP_CODES
+ return _OP_CODES.get(code)
+
+
+def _sec_to_millis(secs):
+ return int(secs * 1000)
+
+
@attr.s
class Query:
op_code = attr.ib(type=int)
@@ -61,6 +79,7 @@
query_id = attr.ib(type=int)
response_type = attr.ib(type=type(Response), default=Response)
_query_c_type = None
+ _start_ts = 0.0
@query_id.default
def _set_query_id(self):
@@ -134,22 +153,28 @@
:return: instance of :class:`~pyignite.api.result.APIResult` with raw
value (may undergo further processing in API functions).
"""
- with BinaryStream(conn.client) as stream:
- self.from_python(stream, query_params)
- response_data = conn.request(stream.getvalue())
+ try:
+ self._on_query_started(conn)
- response_struct = self.response_type(protocol_context=conn.protocol_context,
- following=response_config, **kwargs)
+ with BinaryStream(conn.client) as stream:
+ self.from_python(stream, query_params)
+ response_data = conn.request(stream.getvalue())
- with BinaryStream(conn.client, response_data) as stream:
- response_ctype = response_struct.parse(stream)
- response = stream.read_ctype(response_ctype, direction=READ_BACKWARD)
+ response_struct = self.response_type(protocol_context=conn.protocol_context,
+ following=response_config, **kwargs)
- result = self.__post_process_response(conn, response_struct, response)
+ with BinaryStream(conn.client, response_data) as stream:
+ response_ctype = response_struct.parse(stream)
+ response = stream.read_ctype(response_ctype, direction=READ_BACKWARD)
- if result.status == 0:
- result.value = response_struct.to_python(response)
- return result
+ result = self.__post_process_response(conn, response_struct, response)
+ if result.status == 0:
+ result.value = response_struct.to_python(response)
+ self._on_query_finished(conn, result=result)
+ return result
+ except Exception as e:
+ self._on_query_finished(conn, err=e)
+ raise e
async def perform_async(
self, conn: AioConnection, query_params: dict = None,
@@ -166,22 +191,28 @@
:return: instance of :class:`~pyignite.api.result.APIResult` with raw
value (may undergo further processing in API functions).
"""
- with AioBinaryStream(conn.client) as stream:
- await self.from_python_async(stream, query_params)
- data = await conn.request(self.query_id, stream.getvalue())
+ try:
+ self._on_query_started(conn)
- response_struct = self.response_type(protocol_context=conn.protocol_context,
- following=response_config, **kwargs)
+ with AioBinaryStream(conn.client) as stream:
+ await self.from_python_async(stream, query_params)
+ data = await conn.request(self.query_id, stream.getvalue())
- with AioBinaryStream(conn.client, data) as stream:
- response_ctype = await response_struct.parse_async(stream)
- response = stream.read_ctype(response_ctype, direction=READ_BACKWARD)
+ response_struct = self.response_type(protocol_context=conn.protocol_context,
+ following=response_config, **kwargs)
- result = self.__post_process_response(conn, response_struct, response)
+ with AioBinaryStream(conn.client, data) as stream:
+ response_ctype = await response_struct.parse_async(stream)
+ response = stream.read_ctype(response_ctype, direction=READ_BACKWARD)
- if result.status == 0:
- result.value = await response_struct.to_python_async(response)
- return result
+ result = self.__post_process_response(conn, response_struct, response)
+ if result.status == 0:
+ result.value = await response_struct.to_python_async(response)
+ self._on_query_finished(conn, result=result)
+ return result
+ except Exception as e:
+ self._on_query_finished(conn, err=e)
+ raise e
@staticmethod
def __post_process_response(conn, response_struct, response):
@@ -196,6 +227,26 @@
# build result
return APIResult(response)
+ def _on_query_started(self, conn):
+ if logger.isEnabledFor(logging.DEBUG):
+ self._start_ts = time.monotonic()
+ logger.debug("Start query(query_id=%d, op_type=%s, host=%s, port=%d, node_id=%s)",
+ self.query_id, _get_op_code_name(self.op_code), conn.host, conn.port, conn.uuid)
+
+ def _on_query_finished(self, conn, result=None, err=None):
+ if logger.isEnabledFor(logging.DEBUG):
+ dur_ms = _sec_to_millis(time.monotonic() - self._start_ts)
+ if result and result.status != 0:
+ err = result.message
+ if err:
+ logger.debug("Failed to perform query(query_id=%d, op_type=%s, host=%s, port=%d, node_id=%s) "
+ "in %.3f ms: %s", self.query_id, _get_op_code_name(self.op_code),
+ conn.host, conn.port, conn.uuid, dur_ms, err)
+ else:
+ logger.debug("Finished query(query_id=%d, op_type=%s, host=%s, port=%d, node_id=%s) "
+ "successfully in %.3f ms", self.query_id, _get_op_code_name(self.op_code),
+ conn.host, conn.port, conn.uuid, dur_ms)
+
class ConfigQuery(Query):
"""
diff --git a/tests/conftest.py b/tests/conftest.py
index 1c65356..70995a2 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -13,9 +13,19 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
+import logging
+import sys
import pytest
+logger = logging.getLogger('pyignite')
+logger.setLevel(logging.DEBUG)
+handler = logging.StreamHandler(stream=sys.stdout)
+handler.setFormatter(
+ logging.Formatter('%(asctime)s %(name)s %(levelname)s %(message)s')
+)
+logger.addHandler(handler)
+
@pytest.fixture(autouse=True)
def run_examples(request):
diff --git a/tests/security/test_auth.py b/tests/security/test_auth.py
index b02f224..3586c91 100644
--- a/tests/security/test_auth.py
+++ b/tests/security/test_auth.py
@@ -12,6 +12,9 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+import logging
+import re
+
import pytest
from pyignite import Client, AioClient
@@ -39,19 +42,32 @@
clear_ignite_work_dir()
-def test_auth_success(with_ssl, ssl_params):
+def test_auth_success(with_ssl, ssl_params, caplog):
ssl_params['use_ssl'] = with_ssl
client = Client(username=DEFAULT_IGNITE_USERNAME, password=DEFAULT_IGNITE_PASSWORD, **ssl_params)
- with client.connect("127.0.0.1", 10801):
- assert all(node.alive for node in client._nodes)
+ with caplog.at_level(logger='pyignite', level=logging.DEBUG):
+ with client.connect("127.0.0.1", 10801):
+ assert all(node.alive for node in client._nodes)
+
+ __assert_successful_connect_log(caplog)
@pytest.mark.asyncio
-async def test_auth_success_async(with_ssl, ssl_params):
+async def test_auth_success_async(with_ssl, ssl_params, caplog):
ssl_params['use_ssl'] = with_ssl
client = AioClient(username=DEFAULT_IGNITE_USERNAME, password=DEFAULT_IGNITE_PASSWORD, **ssl_params)
- async with client.connect("127.0.0.1", 10801):
- assert all(node.alive for node in client._nodes)
+ with caplog.at_level(logger='pyignite', level=logging.DEBUG):
+ async with client.connect("127.0.0.1", 10801):
+ assert all(node.alive for node in client._nodes)
+
+ __assert_successful_connect_log(caplog)
+
+
+def __assert_successful_connect_log(caplog):
+ assert any(re.match(r'Connecting to node\(address=127.0.0.1,\s+port=10801', r.message) for r in caplog.records)
+ assert any(re.match(r'Connected to node\(address=127.0.0.1,\s+port=10801', r.message) for r in caplog.records)
+ assert any(re.match(r'Connection closed to node\(address=127.0.0.1,\s+port=10801', r.message)
+ for r in caplog.records)
auth_failed_params = [
@@ -65,7 +81,7 @@
'username, password',
auth_failed_params
)
-def test_auth_failed(username, password, with_ssl, ssl_params):
+def test_auth_failed(username, password, with_ssl, ssl_params, caplog):
ssl_params['use_ssl'] = with_ssl
with pytest.raises(AuthenticationError):
@@ -73,16 +89,25 @@
with client.connect("127.0.0.1", 10801):
pass
+ __assert_auth_failed_log(caplog)
+
@pytest.mark.parametrize(
'username, password',
auth_failed_params
)
@pytest.mark.asyncio
-async def test_auth_failed_async(username, password, with_ssl, ssl_params):
+async def test_auth_failed_async(username, password, with_ssl, ssl_params, caplog):
ssl_params['use_ssl'] = with_ssl
with pytest.raises(AuthenticationError):
client = AioClient(username=username, password=password, **ssl_params)
async with client.connect("127.0.0.1", 10801):
pass
+
+ __assert_auth_failed_log(caplog)
+
+
+def __assert_auth_failed_log(caplog):
+ pattern = r'Authentication failed while connecting to node\(address=127.0.0.1,\s+port=10801'
+ assert any(re.match(pattern, r.message) and r.levelname == logging.ERROR for r in caplog.records)
diff --git a/tests/security/test_ssl.py b/tests/security/test_ssl.py
index 7736864..2cbed4b 100644
--- a/tests/security/test_ssl.py
+++ b/tests/security/test_ssl.py
@@ -12,6 +12,9 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+import logging
+import re
+
import pytest
from pyignite import Client, AioClient
@@ -72,17 +75,26 @@
@pytest.mark.parametrize('invalid_ssl_params', invalid_params)
-def test_connection_error_with_incorrect_config(invalid_ssl_params):
+def test_connection_error_with_incorrect_config(invalid_ssl_params, caplog):
with pytest.raises(ReconnectError):
client = Client(**invalid_ssl_params)
with client.connect([("127.0.0.1", 10801)]):
pass
+ __assert_handshake_failed_log(caplog)
+
@pytest.mark.parametrize('invalid_ssl_params', invalid_params)
@pytest.mark.asyncio
-async def test_connection_error_with_incorrect_config_async(invalid_ssl_params):
+async def test_connection_error_with_incorrect_config_async(invalid_ssl_params, caplog):
with pytest.raises(ReconnectError):
client = AioClient(**invalid_ssl_params)
async with client.connect([("127.0.0.1", 10801)]):
pass
+
+ __assert_handshake_failed_log(caplog)
+
+
+def __assert_handshake_failed_log(caplog):
+ pattern = r'Failed to perform handshake, connection to node\(address=127.0.0.1,\s+port=10801.*failed:'
+ assert any(re.match(pattern, r.message) and r.levelname == logging.ERROR for r in caplog.records)