blob: c141b26b677213434dca24bd466037e77a9afc2c [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import ctypes
import inspect
import logging
import time
from io import SEEK_CUR
import attr
from pyignite.api.result import APIResult
from pyignite.connection import Connection, AioConnection
from pyignite.constants import MAX_LONG, RHF_TOPOLOGY_CHANGED
from pyignite.queries import op_codes
from pyignite.queries.response import Response
from pyignite.stream import AioBinaryStream, BinaryStream, READ_BACKWARD
# Module logger named after the parent package: everything before the last
# dot of this module's dotted name (empty string when there is no dot).
logger = logging.getLogger(__name__.rpartition('.')[0])
def query_perform(query_struct, conn, post_process_fun=None, **kwargs):
    """
    Run a query on a connection, choosing the sync or async code path.

    :param query_struct: object exposing ``perform`` and ``perform_async``,
    :param conn: connection to run the query on. An ``AioConnection``
     selects the asynchronous path,
    :param post_process_fun: (optional) callable applied to the query
     result before it is returned. Skipped when falsy,
    :param kwargs: forwarded unchanged to ``perform``/``perform_async``,
    :return: the (optionally post-processed) result for a synchronous
     connection, or a coroutine producing it for an ``AioConnection``.
    """
    def _finalize(outcome):
        # Apply the optional post-processing hook to the raw result.
        return post_process_fun(outcome) if post_process_fun else outcome

    if isinstance(conn, AioConnection):
        async def _run_async():
            return _finalize(await query_struct.perform_async(conn, **kwargs))

        # Nothing is executed until the caller awaits the coroutine.
        return _run_async()

    return _finalize(query_struct.perform(conn, **kwargs))
# Last query id handed out by _get_query_id().
_QUERY_COUNTER = 0


def _get_query_id():
    """
    Return the next query id from the module-wide counter.

    Ids count up from 1. Once the counter has reached ``MAX_LONG`` the call
    returns 0 and restarts the counter, so the following calls yield
    1, 2, ... again instead of returning 0 forever.

    No locking is performed here.

    :return: int query id in the range ``0..MAX_LONG``.
    """
    global _QUERY_COUNTER
    if _QUERY_COUNTER >= MAX_LONG:
        # Wrap around: without the reset every later query would share id 0.
        _QUERY_COUNTER = 0
        return 0
    _QUERY_COUNTER += 1
    return _QUERY_COUNTER
# Reverse lookup table: op code value -> name of the ``OP_*`` member of the
# ``op_codes`` module that holds it.
_OP_CODES = dict(
    (value, member)
    for member, value in inspect.getmembers(op_codes)
    if member.startswith('OP_')
)


def _get_op_code_name(code):
    """
    Look up the symbolic name of an operation code.

    :param code: operation code value,
    :return: name of the matching ``OP_*`` constant, or None when the code
     is not in the table.
    """
    return _OP_CODES.get(code)
def _sec_to_millis(secs):
return int(secs * 1000)
@attr.s
class Query:
    """
    Client request: serializes a header plus parameters into a stream,
    sends it over a connection and parses the response.

    :param op_code: operation code written into the request header,
    :param following: (optional) list of (name, type) pairs. Each type's
     ``from_python`` serializes the same-named query parameter after the
     header. Defaults to an empty list,
    :param query_id: (optional) request id written into the header.
     Defaults to the next value of ``_get_query_id()``,
    :param response_type: (optional) class instantiated to parse the
     response. Defaults to ``Response``.
    """
    op_code = attr.ib(type=int)
    following = attr.ib(type=list, factory=list)
    query_id = attr.ib(type=int)
    response_type = attr.ib(type=type(Response), default=Response)
    # ctypes header structure, built on first use and cached on the class.
    _query_c_type = None
    # time.monotonic() reading taken by _on_query_started().
    _start_ts = 0.0

    @query_id.default
    def _set_query_id(self):
        """Default for ``query_id``: the next id from the module counter."""
        return _get_query_id()

    @classmethod
    def build_c_type(cls):
        """
        Return the ctypes structure describing the request header,
        creating and caching it on the class on first call.

        The structure is little-endian, packed to 1 byte, with the fields
        ``length`` (int), ``op_code`` (short) and ``query_id`` (long long).
        """
        if cls._query_c_type is None:
            cls._query_c_type = type(
                cls.__name__,
                (ctypes.LittleEndianStructure,),
                {
                    '_pack_': 1,
                    '_fields_': [
                        ('length', ctypes.c_int),
                        ('op_code', ctypes.c_short),
                        ('query_id', ctypes.c_longlong),
                    ],
                },
            )
        return cls._query_c_type

    def from_python(self, stream, values: dict = None):
        """
        Serialize the request (header and parameters) into the stream.

        :param stream: stream to write to, starting at its current position,
        :param values: (optional) dict of parameter values keyed by the
         names listed in ``following``.
        """
        init_pos, header = stream.tell(), self._build_header(stream)
        # With an empty dict a missing parameter surfaces as a KeyError
        # naming it, rather than a TypeError from subscripting None.
        values = values or {}

        for name, c_type in self.following:
            c_type.from_python(stream, values[name])

        self.__write_header(stream, header, init_pos)

    async def from_python_async(self, stream, values: dict = None):
        """
        Asynchronous counterpart of :meth:`from_python`: parameters are
        serialized through each type's ``from_python_async``.

        :param stream: stream to write to, starting at its current position,
        :param values: (optional) dict of parameter values keyed by the
         names listed in ``following``.
        """
        init_pos, header = stream.tell(), self._build_header(stream)
        # Same normalisation as in from_python().
        values = values or {}

        for name, c_type in self.following:
            await c_type.from_python_async(stream, values[name])

        self.__write_header(stream, header, init_pos)

    def _build_header(self, stream):
        """
        Skip over the space the header will occupy in the stream and return
        a header instance with ``op_code`` and ``query_id`` filled in.

        ``length`` is not set here; it is assigned when the header is
        written, after the parameters have been serialized.
        """
        header_class = self.build_c_type()
        header_len = ctypes.sizeof(header_class)
        # Leave a gap for the header; it is written back once the total
        # size is known.
        stream.seek(header_len, SEEK_CUR)

        header = header_class()
        header.op_code = self.op_code
        header.query_id = self.query_id
        return header

    @staticmethod
    def __write_header(stream, header, init_pos):
        """
        Fill in ``header.length`` and write the header at ``init_pos``.

        The length is the number of bytes between ``init_pos`` and the
        current stream position, minus the size of the length field itself.
        """
        header.length = stream.tell() - init_pos - ctypes.sizeof(ctypes.c_int)
        stream.seek(init_pos)
        stream.write(header)

    def perform(
        self, conn: Connection, query_params: dict = None,
        response_config: list = None, **kwargs,
    ) -> APIResult:
        """
        Perform query and process result.

        :param conn: connection to Ignite server,
        :param query_params: (optional) dict of named query parameters.
         Defaults to no parameters,
        :param response_config: (optional) response configuration - list of
         (name, type_hint) tuples. Defaults to empty return value,
        :return: instance of :class:`~pyignite.api.result.APIResult` with raw
         value (may undergo further processing in API functions).
        """
        try:
            self._on_query_started(conn)

            with BinaryStream(conn.client) as stream:
                self.from_python(stream, query_params)
                response_data = conn.request(stream.getvalue())

            response_struct = self.response_type(protocol_context=conn.protocol_context,
                                                 following=response_config, **kwargs)

            with BinaryStream(conn.client, response_data) as stream:
                response_ctype = response_struct.parse(stream)
                response = stream.read_ctype(response_ctype, direction=READ_BACKWARD)

            result = self.__post_process_response(conn, response_struct, response)
            if result.status == 0:
                result.value = response_struct.to_python(response)
            self._on_query_finished(conn, result=result)
            return result
        except Exception as e:
            # Report the failure, then let the original exception propagate.
            self._on_query_finished(conn, err=e)
            raise

    async def perform_async(
        self, conn: AioConnection, query_params: dict = None,
        response_config: list = None, **kwargs,
    ) -> APIResult:
        """
        Perform query and process result.

        :param conn: connection to Ignite server,
        :param query_params: (optional) dict of named query parameters.
         Defaults to no parameters,
        :param response_config: (optional) response configuration - list of
         (name, type_hint) tuples. Defaults to empty return value,
        :return: instance of :class:`~pyignite.api.result.APIResult` with raw
         value (may undergo further processing in API functions).
        """
        try:
            self._on_query_started(conn)

            with AioBinaryStream(conn.client) as stream:
                await self.from_python_async(stream, query_params)
                data = await conn.request(self.query_id, stream.getvalue())

            response_struct = self.response_type(protocol_context=conn.protocol_context,
                                                 following=response_config, **kwargs)

            with AioBinaryStream(conn.client, data) as stream:
                response_ctype = await response_struct.parse_async(stream)
                response = stream.read_ctype(response_ctype, direction=READ_BACKWARD)

            result = self.__post_process_response(conn, response_struct, response)
            if result.status == 0:
                result.value = await response_struct.to_python_async(response)
            self._on_query_finished(conn, result=result)
            return result
        except Exception as e:
            # Report the failure, then let the original exception propagate.
            self._on_query_finished(conn, err=e)
            raise

    @staticmethod
    def __post_process_response(conn, response_struct, response):
        """
        Update the client's affinity version when the response carries the
        topology-changed flag, then wrap the response in an ``APIResult``.

        ``response_struct`` is accepted but not used.
        """
        # Responses without a ``flags`` attribute are treated as having no
        # flags set.
        if getattr(response, 'flags', 0) & RHF_TOPOLOGY_CHANGED:
            # update latest affinity version
            new_affinity = (response.affinity_version, response.affinity_minor)
            old_affinity = conn.client.affinity_version

            if new_affinity > old_affinity:
                conn.client.affinity_version = new_affinity

        # build result
        return APIResult(response)

    @staticmethod
    def _enabled_query_listener(conn):
        """
        Return a truthy value when the connection's client has event
        listeners and their ``enabled_query_listener`` attribute is truthy.
        """
        client = conn.client
        return client._event_listeners and client._event_listeners.enabled_query_listener

    @staticmethod
    def _event_listener(conn):
        """Return the event listeners object of the connection's client."""
        return conn.client._event_listeners

    def _on_query_started(self, conn):
        """
        Record the start time, log the start at DEBUG level and publish a
        query-start event when a query listener is enabled.
        """
        self._start_ts = time.monotonic()
        op_name = _get_op_code_name(self.op_code)

        if logger.isEnabledFor(logging.DEBUG):
            logger.debug("Start query(query_id=%d, op_type=%s, host=%s, port=%d, node_id=%s)",
                         self.query_id, op_name, conn.host, conn.port, conn.uuid)

        if self._enabled_query_listener(conn):
            self._event_listener(conn).publish_query_start(conn.host, conn.port, conn.uuid, self.query_id,
                                                           self.op_code, op_name)

    def _on_query_finished(self, conn, result=None, err=None):
        """
        Log the outcome at DEBUG level and publish a query-success or
        query-fail event when a query listener is enabled.

        Listener notification does not depend on the log level, matching
        :meth:`_on_query_started`.

        :param conn: connection the query ran on,
        :param result: (optional) query result. A non-zero ``status`` turns
         the outcome into a failure with ``result.message`` as the error,
        :param err: (optional) error that aborted the query.
        """
        # Computed unconditionally: both the log record and the listener
        # events need it.
        dur_ms = _sec_to_millis(time.monotonic() - self._start_ts)
        if result and result.status != 0:
            err = result.message

        debug_enabled = logger.isEnabledFor(logging.DEBUG)
        op_name = _get_op_code_name(self.op_code)

        if err:
            if debug_enabled:
                logger.debug("Failed to perform query(query_id=%d, op_type=%s, host=%s, port=%d, node_id=%s) "
                             "in %d ms: %s", self.query_id, op_name,
                             conn.host, conn.port, conn.uuid, dur_ms, err)
            if self._enabled_query_listener(conn):
                self._event_listener(conn).publish_query_fail(conn.host, conn.port, conn.uuid, self.query_id,
                                                              self.op_code, op_name,
                                                              dur_ms, err)
        else:
            if debug_enabled:
                logger.debug("Finished query(query_id=%d, op_type=%s, host=%s, port=%d, node_id=%s) "
                             "successfully in %d ms", self.query_id, op_name,
                             conn.host, conn.port, conn.uuid, dur_ms)
            if self._enabled_query_listener(conn):
                self._event_listener(conn).publish_query_success(conn.host, conn.port, conn.uuid, self.query_id,
                                                                 self.op_code, op_name,
                                                                 dur_ms)
class ConfigQuery(Query):
    """
    Special query used for creating caches with configuration.

    Its header carries one extra field, ``config_length``, after the
    fields of the base query header.
    """
    # Own cache slot, so the header type built for ``Query`` is not reused.
    _query_c_type = None

    @classmethod
    def build_c_type(cls):
        """
        Return the ctypes header structure for this query, creating and
        caching it on the class on first call.

        The structure is little-endian, packed to 1 byte, with the fields
        ``length`` (int), ``op_code`` (short), ``query_id`` (long long) and
        ``config_length`` (int).
        """
        cached = cls._query_c_type
        if cached is not None:
            return cached

        header_fields = [
            ('length', ctypes.c_int),
            ('op_code', ctypes.c_short),
            ('query_id', ctypes.c_longlong),
            ('config_length', ctypes.c_int),
        ]
        cls._query_c_type = type(
            cls.__name__,
            (ctypes.LittleEndianStructure,),
            {'_pack_': 1, '_fields_': header_fields},
        )
        return cls._query_c_type

    def _build_header(self, stream):
        """
        Build the header via the parent implementation and additionally
        set ``config_length``.
        """
        header = super()._build_header(stream)
        header_size = ctypes.sizeof(type(header))
        # NOTE(review): the parent ``_build_header`` in this file assigns
        # only ``op_code`` and ``query_id``; ``length`` is assigned later,
        # when the header is written. ``header.length`` therefore looks
        # unset at this point - confirm whether the server relies on
        # ``config_length`` before changing this expression.
        header.config_length = header.length - header_size
        return header