| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| """ Tools to monitor client's events. |
| |
| For example, a simple query logger might be implemented like this:: |
| |
| import logging |
| |
| from pyignite import monitoring |
| |
| class QueryLogger(monitoring.QueryEventListener): |
| |
| def on_query_start(self, event): |
| logging.info(f"Query {event.op_name} with query id " |
| f"{event.query_id} started on server " |
| f"{event.host}:{event.port}") |
| |
| def on_query_fail(self, event): |
| logging.info(f"Query {event.op_name} with query id " |
| f"{event.query_id} on server " |
| f"{event.host}:{event.port} " |
| f"failed in {event.duration}ms " |
| f"with error {event.err_msg}") |
| |
| def on_query_success(self, event): |
| logging.info(f"Query {event.op_name} with query id " |
| f"{event.query_id} on server " |
| f"{event.host}:{event.port} " |
| f"succeeded in {event.duration}ms") |
| |
| :class:`~ConnectionEventListener` is also available. |
| |
| Event listeners can be registered by passing the ``event_listeners`` parameter to the |
| :class:`~pyignite.client.Client` or :class:`~pyignite.aio_client.AioClient` constructor:: |
| |
| client = Client(event_listeners=[QueryLogger()]) |
| with client.connect('127.0.0.1', 10800): |
| .... |
| |
| .. note:: Events are delivered **synchronously**. Application threads block |
| waiting for event handlers. Care must be taken to ensure that your event handlers are efficient |
| enough to not adversely affect overall application performance. |
| |
| .. note:: Debug logging is also available through the standard ``logging`` module: set the |
| ``DEBUG`` level on the *pyignite* logger. |
| """ |
import logging
from typing import Optional, Sequence
| |
| |
| class _BaseEvent: |
| def __init__(self, **kwargs): |
| if kwargs: |
| for k, v in kwargs.items(): |
| object.__setattr__(self, k, v) |
| |
| def __setattr__(self, name, value): |
| raise TypeError(f'{self.__class__.__name__} is immutable') |
| |
| def __repr__(self): |
| pass |
| |
| |
class _ConnectionEvent(_BaseEvent):
    """
    Base class of the events that relate to a connection with a node.

    :ivar host: Host of the node, as passed to the constructor,
    :ivar port: Port of the node, as passed to the constructor.
    """
    __slots__ = ('host', 'port')
    host: str
    port: int

    def __init__(self, host, port, **kwargs):
        """
        Store ``host`` and ``port`` together with any additional attributes
        that subclasses forward through ``kwargs``.
        """
        attributes = dict(host=host, port=port)
        attributes.update(kwargs)
        super().__init__(**attributes)
| |
| |
class _HandshakeEvent(_ConnectionEvent):
    """
    Base class of the handshake related events.

    :ivar host: Host of the node, as passed to the constructor,
    :ivar port: Port of the node, as passed to the constructor,
    :ivar protocol_context: Copy of the protocol context passed to the
     constructor, or ``None`` if a falsy value was passed.
    """
    __slots__ = ('protocol_context',)
    protocol_context: Optional['ProtocolContext']

    def __init__(self, host, port, protocol_context=None, **kwargs):
        """
        Store the connection attributes and a copy of ``protocol_context``.
        """
        # The event keeps its own copy (made with ``.copy()``) rather than
        # the object that was passed in.
        context_snapshot = None
        if protocol_context:
            context_snapshot = protocol_context.copy()
        super().__init__(host, port, protocol_context=context_snapshot, **kwargs)

    def __repr__(self):
        return (
            f"{self.__class__.__name__}(host={self.host}, port={self.port}, "
            f"protocol_context={self.protocol_context})"
        )
| |
| |
class HandshakeStartEvent(_HandshakeEvent):
    """
    Event published when a handshake has started.

    :ivar host: Address of the node to connect,
    :ivar port: Port number of the node to connect,
    :ivar protocol_context: Client's protocol context.
    """
    def __init__(self, host, port, protocol_context=None, **kwargs):
        """
        Instances are created by the client; users are not expected to
        construct this class themselves.
        """
        super().__init__(host, port, protocol_context=protocol_context, **kwargs)
| |
| |
class HandshakeFailedEvent(_HandshakeEvent):
    """
    Event published when a handshake has failed.

    :ivar host: Address of the node to connect,
    :ivar port: Port number of the node to connect,
    :ivar protocol_context: Client's protocol context,
    :ivar error_msg: Error message: ``repr`` of the error, or an empty
     string when no (or a falsy) error was given.
    """
    __slots__ = ('error_msg',)
    error_msg: str

    def __init__(self, host, port, protocol_context=None, err=None, **kwargs):
        """
        Instances are created by the client; users are not expected to
        construct this class themselves.
        """
        if err:
            message = repr(err)
        else:
            message = ''
        super().__init__(host, port, protocol_context, error_msg=message, **kwargs)

    def __repr__(self):
        return (
            f"{self.__class__.__name__}(host={self.host}, port={self.port}, "
            f"protocol_context={self.protocol_context}, error_msg={self.error_msg})"
        )
| |
| |
class AuthenticationFailedEvent(HandshakeFailedEvent):
    """
    Event published when authentication has failed.

    Carries exactly the same attributes as :class:`HandshakeFailedEvent`.

    :ivar host: Address of the node to connect,
    :ivar port: Port number of the node to connect,
    :ivar protocol_context: Client's protocol context,
    :ivar error_msg: Error message.
    """
| |
| |
class HandshakeSuccessEvent(_HandshakeEvent):
    """
    Event published when a handshake has succeeded.

    :ivar host: Address of the node to connect,
    :ivar port: Port number of the node to connect,
    :ivar protocol_context: Client's protocol context,
    :ivar node_uuid: Node's uuid converted to a string, or an empty string
     when a falsy uuid was given.
    """
    __slots__ = ('node_uuid',)
    node_uuid: str

    def __init__(self, host, port, protocol_context, node_uuid, **kwargs):
        """
        Instances are created by the client; users are not expected to
        construct this class themselves.
        """
        if node_uuid:
            uuid_text = str(node_uuid)
        else:
            uuid_text = ''
        super().__init__(host, port, protocol_context, node_uuid=uuid_text, **kwargs)

    def __repr__(self):
        return (
            f"{self.__class__.__name__}(host={self.host}, port={self.port}, "
            f"node_uuid={self.node_uuid}, protocol_context={self.protocol_context})"
        )
| |
| |
class ConnectionClosedEvent(_ConnectionEvent):
    """
    Event published when a connection to the node is closed as expected.

    :ivar host: Address of the node,
    :ivar port: Port number of the node,
    :ivar node_uuid: Node's uuid converted to a string, or an empty string
     when a falsy uuid was given.
    """
    __slots__ = ('node_uuid',)
    node_uuid: str

    def __init__(self, host, port, node_uuid, **kwargs):
        """
        Instances are created by the client; users are not expected to
        construct this class themselves.
        """
        if node_uuid:
            uuid_text = str(node_uuid)
        else:
            uuid_text = ''
        super().__init__(host, port, node_uuid=uuid_text, **kwargs)

    def __repr__(self):
        return (
            f"{self.__class__.__name__}(host={self.host}, port={self.port}, node_uuid={self.node_uuid})"
        )
| |
| |
class ConnectionLostEvent(ConnectionClosedEvent):
    """
    Event published when a connection to the node is lost.

    :ivar host: Address of the node,
    :ivar port: Port number of the node,
    :ivar node_uuid: Node's uuid, string,
    :ivar error_msg: Error message: ``repr`` of the error, or an empty
     string when a falsy error was given.
    """
    __slots__ = ('error_msg',)
    node_uuid: str
    error_msg: str

    def __init__(self, host, port, node_uuid, err, **kwargs):
        """
        Instances are created by the client; users are not expected to
        construct this class themselves.
        """
        if err:
            message = repr(err)
        else:
            message = ''
        super().__init__(host, port, node_uuid, error_msg=message, **kwargs)

    def __repr__(self):
        return (
            f"{self.__class__.__name__}(host={self.host}, port={self.port}, "
            f"node_uuid={self.node_uuid}, error_msg={self.error_msg})"
        )
| |
| |
| class _EventListener: |
| pass |
| |
| |
class ConnectionEventListener(_EventListener):
    """
    Base class for listeners of connection events.

    Every handler does nothing by default; override the ones of interest.
    """
    def on_handshake_start(self, event: HandshakeStartEvent):
        """
        Called with a handshake start event.

        :param event: Instance of :class:`HandshakeStartEvent`.
        """

    def on_handshake_success(self, event: HandshakeSuccessEvent):
        """
        Called with a handshake success event.

        :param event: Instance of :class:`HandshakeSuccessEvent`.
        """

    def on_handshake_fail(self, event: HandshakeFailedEvent):
        """
        Called with a handshake failed event.

        :param event: Instance of :class:`HandshakeFailedEvent`.
        """

    def on_authentication_fail(self, event: AuthenticationFailedEvent):
        """
        Called with an authentication failed event.

        :param event: Instance of :class:`AuthenticationFailedEvent`.
        """

    def on_connection_closed(self, event: ConnectionClosedEvent):
        """
        Called with a connection closed event.

        :param event: Instance of :class:`ConnectionClosedEvent`.
        """

    def on_connection_lost(self, event: ConnectionLostEvent):
        """
        Called with a connection lost event.

        :param event: Instance of :class:`ConnectionLostEvent`.
        """
| |
| |
class _QueryEvent(_BaseEvent):
    """
    Base class of the query related events.

    :ivar host: Address of the node on which the query is executed,
    :ivar port: Port number of the node on which the query is executed,
    :ivar node_uuid: Node's uuid converted to a string, or an empty string
     when a falsy uuid was given,
    :ivar query_id: Query's id,
    :ivar op_code: Operation's id,
    :ivar op_name: Operation's name.
    """
    __slots__ = ('host', 'port', 'node_uuid', 'query_id', 'op_code', 'op_name')
    host: str
    port: int
    node_uuid: str
    query_id: int
    op_code: int
    op_name: str

    def __init__(self, host, port, node_uuid, query_id, op_code, op_name, **kwargs):
        """
        Instances are created by the client; users are not expected to
        construct this class themselves.
        """
        if node_uuid:
            uuid_text = str(node_uuid)
        else:
            uuid_text = ''
        super().__init__(
            host=host,
            port=port,
            node_uuid=uuid_text,
            query_id=query_id,
            op_code=op_code,
            op_name=op_name,
            **kwargs,
        )

    def __repr__(self):
        return (
            f"{self.__class__.__name__}(host={self.host}, port={self.port}, "
            f"node_uuid={self.node_uuid}, query_id={self.query_id}, "
            f"op_code={self.op_code}, op_name={self.op_name})"
        )
| |
| |
class QueryStartEvent(_QueryEvent):
    """
    Event published when a client's query has started.

    :ivar host: Address of the node on which the query is executed,
    :ivar port: Port number of the node on which the query is executed,
    :ivar node_uuid: Node's uuid, string,
    :ivar query_id: Query's id,
    :ivar op_code: Operation's id,
    :ivar op_name: Operation's name.
    """
| |
| |
class QuerySuccessEvent(_QueryEvent):
    """
    Event published when a client's query has finished successfully.

    :ivar host: Address of the node on which the query is executed,
    :ivar port: Port number of the node on which the query is executed,
    :ivar node_uuid: Node's uuid, string,
    :ivar query_id: Query's id,
    :ivar op_code: Operation's id,
    :ivar op_name: Operation's name,
    :ivar duration: Query's duration in milliseconds.
    """
    __slots__ = ('duration', )
    duration: int

    def __init__(self, host, port, node_uuid, query_id, op_code, op_name, duration, **kwargs):
        """
        Store the common query attributes plus the query ``duration``.
        """
        super().__init__(
            host=host,
            port=port,
            node_uuid=node_uuid,
            query_id=query_id,
            op_code=op_code,
            op_name=op_name,
            duration=duration,
            **kwargs,
        )

    def __repr__(self):
        return (
            f"{self.__class__.__name__}(host={self.host}, port={self.port}, "
            f"node_uuid={self.node_uuid}, query_id={self.query_id}, "
            f"op_code={self.op_code}, op_name={self.op_name}, duration={self.duration})"
        )
| |
| |
class QueryFailEvent(_QueryEvent):
    """
    Published when a client's query failed.

    :ivar host: Address of the node on which the query is executed,
    :ivar port: Port number of the node on which the query is executed,
    :ivar node_uuid: Node's uuid, string,
    :ivar query_id: Query's id,
    :ivar op_code: Operation's id,
    :ivar op_name: Operation's name,
    :ivar duration: Query's duration in milliseconds,
    :ivar err_msg: Error message: ``repr`` of the error, or an empty string
     when a falsy error was given,
    :ivar error_msg: Read-only alias of ``err_msg``, named like the
     corresponding attribute of the connection events.
    """
    __slots__ = ('duration', 'err_msg')
    duration: int
    err_msg: str

    def __init__(self, host, port, node_uuid, query_id, op_code, op_name, duration, err, **kwargs):
        """
        Store the common query attributes plus ``duration`` and the error
        message derived from ``err``.
        """
        super().__init__(host, port, node_uuid, query_id, op_code, op_name, duration=duration,
                         err_msg=repr(err) if err else '', **kwargs)

    @property
    def error_msg(self) -> str:
        """
        Error message; same value as ``err_msg``.

        The stored attribute is called ``err_msg``, while the other failure
        events in this module expose ``error_msg``. This alias makes both
        spellings work without changing the existing attribute.
        """
        return self.err_msg

    def __repr__(self):
        return f"{self.__class__.__name__}(host={self.host}, port={self.port}, " \
               f"node_uuid={self.node_uuid}, query_id={self.query_id}, op_code={self.op_code}, " \
               f"op_name={self.op_name}, duration={self.duration}, err_msg={self.err_msg})"
| |
| |
class QueryEventListener(_EventListener):
    """
    Base class for listeners of query events.

    Every handler does nothing by default; override the ones of interest.
    """
    def on_query_start(self, event: QueryStartEvent):
        """
        Called with a query start event.

        :param event: Instance of :class:`QueryStartEvent`.
        """

    def on_query_success(self, event: QuerySuccessEvent):
        """
        Called with a query success event.

        :param event: Instance of :class:`QuerySuccessEvent`.
        """

    def on_query_fail(self, event: QueryFailEvent):
        """
        Called with a query fail event.

        :param event: Instance of :class:`QueryFailEvent`.
        """
| |
| |
| class _EventListeners: |
| def __init__(self, listeners: Optional[Sequence]): |
| self.__connection_listeners = [] |
| self.__query_listeners = [] |
| if listeners: |
| for listener in listeners: |
| if isinstance(listener, ConnectionEventListener): |
| self.__connection_listeners.append(listener) |
| elif isinstance(listener, QueryEventListener): |
| self.__query_listeners.append(listener) |
| |
| @property |
| def enabled_connection_listener(self): |
| return bool(self.__connection_listeners) |
| |
| @property |
| def enabled_query_listener(self): |
| return bool(self.__query_listeners) |
| |
| def publish_handshake_start(self, host, port, protocol_context): |
| evt = HandshakeStartEvent(host, port, protocol_context) |
| self.__publish_connection_events(lambda listener: listener.on_handshake_start(evt)) |
| |
| def publish_handshake_success(self, host, port, protocol_context, node_uuid): |
| evt = HandshakeSuccessEvent(host, port, protocol_context, node_uuid) |
| self.__publish_connection_events(lambda listener: listener.on_handshake_success(evt)) |
| |
| def publish_handshake_fail(self, host, port, protocol_context, err): |
| evt = HandshakeFailedEvent(host, port, protocol_context, err) |
| self.__publish_connection_events(lambda listener: listener.on_handshake_fail(evt)) |
| |
| def publish_authentication_fail(self, host, port, protocol_context, err): |
| evt = AuthenticationFailedEvent(host, port, protocol_context, err) |
| self.__publish_connection_events(lambda listener: listener.on_authentication_fail(evt)) |
| |
| def publish_connection_closed(self, host, port, node_uuid): |
| evt = ConnectionClosedEvent(host, port, node_uuid) |
| self.__publish_connection_events(lambda listener: listener.on_connection_closed(evt)) |
| |
| def publish_connection_lost(self, host, port, node_uuid, err): |
| evt = ConnectionLostEvent(host, port, node_uuid, err) |
| self.__publish_connection_events(lambda listener: listener.on_connection_lost(evt)) |
| |
| def publish_query_start(self, host, port, node_uuid, query_id, op_code, op_name): |
| evt = QueryStartEvent(host, port, node_uuid, query_id, op_code, op_name) |
| self.__publish_query_events(lambda listener: listener.on_query_start(evt)) |
| |
| def publish_query_success(self, host, port, node_uuid, query_id, op_code, op_name, duration): |
| evt = QuerySuccessEvent(host, port, node_uuid, query_id, op_code, op_name, duration) |
| self.__publish_query_events(lambda listener: listener.on_query_success(evt)) |
| |
| def publish_query_fail(self, host, port, node_uuid, query_id, op_code, op_name, duration, err): |
| evt = QueryFailEvent(host, port, node_uuid, query_id, op_code, op_name, duration, err) |
| self.__publish_query_events(lambda listener: listener.on_query_fail(evt)) |
| |
| def __publish_connection_events(self, callback): |
| try: |
| for listener in self.__connection_listeners: |
| callback(listener) |
| except: # noqa: 13 |
| pass |
| |
| def __publish_query_events(self, callback): |
| try: |
| for listener in self.__query_listeners: |
| callback(listener) |
| except: # noqa: 13 |
| pass |