blob: 83cb196096d5b79c244d00555f2309fabc51e9a1 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
This module contains `Client` class, that lets you communicate with Apache
Ignite cluster node by the means of Ignite binary client protocol.
To start the communication, you may connect to the node of your choice
by instantiating the `Client` object and calling
:py:meth:`~pyignite.connection.Connection.connect` method with proper
parameters.
The whole storage room of Ignite cluster is split up into named structures,
called caches. For accessing the particular cache in key-value style
(a-la Redis or memcached) you should first create
the :class:`~pyignite.cache.Cache` object by calling
:py:meth:`~pyignite.client.Client.create_cache` or
:py:meth:`~pyignite.client.Client.get_or_create_cache()` methods, then call
:class:`~pyignite.cache.Cache` methods. If you wish to create a cache object
without communicating with server, there is also a
:py:meth:`~pyignite.client.Client.get_cache()` method that does just that.
For using Ignite SQL, call :py:meth:`~pyignite.client.Client.sql` method.
It returns a generator with result rows.
:py:meth:`~pyignite.client.Client.register_binary_type` and
:py:meth:`~pyignite.client.Client.query_binary_type` methods operate on
the local (class-wise) registry for Ignite Complex objects.
"""
from collections import defaultdict, OrderedDict
import random
import re
from itertools import chain
from typing import Dict, Iterable, List, Optional, Tuple, Type, Union
from .api.binary import get_binary_type, put_binary_type
from .api.cache_config import cache_get_names
from .api.sql import sql_fields, sql_fields_cursor_get_page
from .cache import Cache
from .connection import Connection
from .constants import *
from .datatypes import BinaryObject
from .datatypes.internal import tc_map
from .exceptions import (
BinaryTypeError, CacheError, ReconnectError, SQLError, connection_errors,
)
from .utils import (
capitalize, entity_id, schema_id, process_delimiter,
status_to_exception, is_iterable,
)
from .binary import GenericObjectMeta
__all__ = ['Client']
class Client:
"""
This is the main `pyignite` class. It is built upon
:class:`~pyignite.connection.Connection`: the client keeps a pool of
`Connection` objects (one per node passed to `connect`) rather than
inheriting from that class. `Client` implements the following features:
* cache factory. Cache objects are used for key-value operations,
* Ignite SQL endpoint,
* binary types registration endpoint.
"""
# registry of Complex object data classes in the form
# {type ID: {schema ID: data class}}; it is a class attribute, so it is
# shared by all `Client` instances
_registry = defaultdict(dict)
# class-level backing storage of the `compact_footer` property
_compact_footer: bool = None
# keyword arguments forwarded to every `Connection` created by `connect`
_connection_args: Dict = None
# index (in `_nodes`) of the node used when partition awareness is off
_current_node: int = None
# pool of connections, in the order the nodes were passed to `connect`
_nodes: List[Connection] = None
# used for Complex object data class names sanitizing:
# `_identifier` matches every character that is not an ASCII letter, digit,
# underscore, dot, plus or dollar sign; `_ident_start` matches a leading run
# of characters that are neither ASCII letters nor underscores
_identifier = re.compile(r'[^0-9a-zA-Z_.+$]', re.UNICODE)
_ident_start = re.compile(r'^[^a-zA-Z_]+', re.UNICODE)
# set to (0, 0) by `__init__`; not modified anywhere else in this class
affinity_version: Optional[Tuple] = None
# thin protocol version; stays None until it becomes known (this class
# only reads it, so it is presumably assigned by `Connection` -- verify)
protocol_version: Optional[Tuple] = None
def __init__(
    self, compact_footer: bool = None, partition_aware: bool = False,
    **kwargs
):
    """
    Prepare a client that is not yet connected to any node.

    :param compact_footer: (optional) use compact (True, recommended) or
     full (False) schema approach when serializing Complex objects.
     Default is to use the same approach the server is using (None).
     Apache Ignite binary protocol documentation on this topic:
     https://apacheignite.readme.io/docs/binary-client-protocol-data-format#section-schema
    :param partition_aware: (optional) try to calculate the exact data
     placement from the key before issuing the key operation to the
     server node:
     https://cwiki.apache.org/confluence/display/IGNITE/IEP-23%3A+Best+Effort+Affinity+for+thin+clients
     The feature is in experimental status, so the parameter is `False`
     by default. This will be changed later.
    :param kwargs: remembered as is and passed to every `Connection`
     object that `connect` creates.
    """
    # start with an empty pool; `_current_node` is an index into it
    self._nodes, self._current_node = [], 0
    self._connection_args = kwargs
    self._partition_aware = partition_aware
    # NOTE(review): this sets an *instance* attribute, while
    # the `compact_footer` property reads and writes the class-level
    # `_compact_footer` -- confirm the value given here is meant to be
    # picked up by that property
    self._compact_footer = compact_footer
    self.affinity_version = (0, 0)
def get_protocol_version(self) -> Optional[Tuple]:
    """
    Returns the tuple of major, minor, and revision numbers of the thin
    protocol version in use, or None, if the protocol version is not
    known yet (no connection to the Ignite cluster was established).

    This method is not a part of the public API. Unless you wish to
    extend the `pyignite` capabilities (with additional testing, logging,
    examining connections, etc.) you probably should not use it.
    """
    version = self.protocol_version
    return version
@property
def partition_aware(self):
    """
    Tells whether partition awareness is actually in effect: it must be
    both requested by the user (see `__init__`) and supported by
    the protocol version in use.
    """
    requested = self._partition_aware
    if not requested:
        # hand back the stored (falsy) flag itself
        return requested
    return self.partition_awareness_supported_by_protocol
@property
def partition_awareness_supported_by_protocol(self):
    """
    True if the protocol version is known and is not lower than 1.4.0,
    False otherwise.
    """
    # TODO: Need to re-factor this. I believe, we need separate class or
    #  set of functions to work with protocol versions without manually
    #  comparing versions with just some random tuples
    version = self.protocol_version
    if version is None:
        return False
    return version >= (1, 4, 0)
def connect(self, *args):
    """
    Connect to Ignite cluster node(s).

    Accepted forms of positional arguments:

    * nothing: the default Ignite host and port are used,
    * a single iterable of (host, port) pairs,
    * two arguments: host (str) and port (int).

    One `Connection` object per node is created and added to the pool,
    whether or not it could be opened.

    :param args: (optional) host(s) and port(s) to connect to.
    :raises ConnectionError: if the arguments match none of the forms
     listed above,
    :raises ReconnectError: if the protocol version is still unknown
     after all the nodes were processed.
    """
    if not args:
        # no parameters - use default Ignite host and port
        endpoints = [(IGNITE_DEFAULT_HOST, IGNITE_DEFAULT_PORT)]
    elif len(args) == 1 and is_iterable(args[0]):
        # iterable of host-port pairs is given
        endpoints = args[0]
    elif (
        len(args) == 2
        and isinstance(args[0], str)
        and isinstance(args[1], int)
    ):
        # host and port are given
        endpoints = [args]
    else:
        raise ConnectionError('Connection parameters are not valid.')

    # the following code is quite twisted, because the protocol version
    # is initially unknown

    # TODO: open first node in foreground, others - in background
    for index, (host, port) in enumerate(endpoints):
        connection = Connection(self, **self._connection_args)
        connection.host = host
        connection.port = port

        try:
            if self.protocol_version is None or self.partition_aware:
                # open connection before adding to the pool
                connection.connect(host, port)

                # now we have the protocol version
                if self.partition_aware:
                    # take a chance to schedule the reconnection
                    # for all the failed connections, that was probed
                    # before this
                    for earlier in self._nodes[:index]:
                        earlier.reconnect()
                else:
                    # do not try to open more nodes
                    self._current_node = index
        except connection_errors:
            connection._fail()
            if self.partition_aware:
                # schedule the reconnection
                connection.reconnect()

        # the connection joins the pool whether it was opened or not
        self._nodes.append(connection)

    if self.protocol_version is None:
        raise ReconnectError('Can not connect.')
def close(self):
    """
    Close every connection in the pool, then empty the pool (in place).
    """
    pool = self._nodes
    for connection in pool:
        connection.close()
    pool.clear()
@property
def random_node(self) -> 'Connection':
    """
    Returns random usable node.

    With partition awareness in effect, a random node is chosen among
    the nodes that are currently alive. Otherwise the current node is
    returned if it is alive; if it is not, it is closed and the remaining
    nodes are tried in a round-robin order, starting with the next one.
    The first node that could be (re)connected becomes the current node
    and is returned.

    This method is not a part of the public API. Unless you wish to
    extend the `pyignite` capabilities (with additional testing, logging,
    examining connections, etc.) you probably should not use it.

    :return: usable `Connection` object,
    :raises ReconnectError: if no usable node is left.
    """
    if self.partition_aware:
        # if partition awareness is used just pick a random connected node
        alive_nodes = [node for node in self._nodes if node.alive]
        if not alive_nodes:
            raise ReconnectError('Can not reconnect: out of nodes.')
        return random.choice(alive_nodes)

    # if partition awareness is not used then just return the current
    # node if it's alive or the next usable node if connection with the
    # current is broken
    node = self._nodes[self._current_node]
    if node.alive:
        return node

    # close current (supposedly failed) node
    node.close()

    # advance the node index, wrapping around the end of the pool
    num_nodes = len(self._nodes)
    self._current_node += 1
    if self._current_node >= num_nodes:
        self._current_node = 0

    # try every node once, the just-closed one being the last resort
    for i in chain(
        range(self._current_node, num_nodes), range(self._current_node)
    ):
        candidate = self._nodes[i]
        try:
            candidate.connect(candidate.host, candidate.port)
        except connection_errors:
            continue
        # remember which node is actually in use: otherwise the next call
        # would start from a node that is known to be down and could
        # call `connect()` again on this, already connected, node
        self._current_node = i
        return candidate

    # no nodes left
    raise ReconnectError('Can not reconnect: out of nodes.')
@status_to_exception(BinaryTypeError)
def get_binary_type(self, binary_type: Union[str, int]) -> dict:
    """
    Gets the binary type information from the Ignite server. This is quite
    a low-level implementation of Ignite thin client protocol's
    `OP_GET_BINARY_TYPE` operation. You would probably want to use
    :py:meth:`~pyignite.client.Client.query_binary_type` instead.

    The server response is post-processed here: its `binary_fields` and
    `schema` entries are removed and replaced with the `schemas` list
    described below.

    :param binary_type: binary type name or ID,
    :return: binary type description - a dict with the following fields:

     - `type_exists`: True if the type is registered, False otherwise. In
       the latter case all the following fields are omitted,
     - `type_id`: Complex object type ID,
     - `type_name`: Complex object type name,
     - `affinity_key_field`: string value or None,
     - `is_enum`: False in case of Complex object registration,
     - `schemas`: a list, containing the Complex object schemas in format:
       OrderedDict[field name: field type hint]. A schema can be empty.
    """
    def convert_type(tc_type: int):
        # map a one-byte type code to the parser/constructor class
        try:
            return tc_map(tc_type.to_bytes(1, PROTOCOL_BYTE_ORDER))
        except (KeyError, OverflowError):
            # if conversion to char or type lookup failed,
            # we probably have a binary object type ID
            return BinaryObject

    def convert_schema(
        field_ids: list, binary_fields: list
    ) -> OrderedDict:
        # build {field name: type hint}, keeping the order of `field_ids`
        converted = OrderedDict()
        for field_id in field_ids:
            matching = [
                field
                for field in binary_fields
                if field['field_id'] == field_id
            ]
            # the first field description with this ID is the one used
            field = matching[0]
            converted[field['field_name']] = convert_type(field['type_id'])
        return converted

    response = get_binary_type(self.random_node, binary_type)
    if response.status != 0 or not response.value['type_exists']:
        # error or unknown type: nothing to post-process
        return response

    binary_fields = response.value.pop('binary_fields')
    old_format_schemas = response.value.pop('schema')
    # schema IDs (the keys) are not needed for the conversion
    response.value['schemas'] = [
        convert_schema(field_ids, binary_fields)
        for field_ids in old_format_schemas.values()
    ]
    return response
@property
def compact_footer(self) -> bool:
    """
    This property remembers Complex object schema encoding approach when
    decoding any Complex object, to use the same approach on Complex
    object encoding.

    :return: True if compact schema was used by server or no Complex
     object decoding has yet taken place, False if full schema was used.
    """
    # this is an ordinary object property, but its backing storage
    # is a class attribute
    stored = self.__class__._compact_footer

    # use compact schema by default (backing value is None), but leave
    # the backing value itself unchanged
    return stored or stored is None


@compact_footer.setter
def compact_footer(self, value: bool):
    """
    Store the schema approach in the class-level backing attribute.

    :raises Warning: on an attempt to replace an already stored approach
     with a different one.
    """
    owner = self.__class__
    if owner._compact_footer in (value, None):
        owner._compact_footer = value
        return
    # normally schema approach should not change
    raise Warning('Can not change client schema approach.')
@status_to_exception(BinaryTypeError)
def put_binary_type(
    self, type_name: str, affinity_key_field: str = None,
    is_enum=False, schema: dict = None
):
    """
    Registers binary type information in cluster. Does not update
    the local binary registry. This is a literal implementation of Ignite
    thin client protocol's `OP_PUT_BINARY_TYPE` operation. You would
    probably want to use
    :py:meth:`~pyignite.client.Client.register_binary_type` instead.

    :param type_name: name of the data type being registered,
    :param affinity_key_field: (optional) name of the affinity key field,
    :param is_enum: (optional) register enum if True, binary object
     otherwise. Defaults to False,
    :param schema: (optional) when registering an enum, pass a dict
     of enumerated parameter names as keys and integers as values.
     When registering a binary type, pass a dict of field names: field
     types. Binary type with no fields is OK.
    """
    node = self.random_node
    return put_binary_type(
        node, type_name, affinity_key_field, is_enum, schema
    )
@staticmethod
def _create_dataclass(type_name: str, schema: OrderedDict = None) -> Type:
    """
    Creates default (generic) class for Ignite Complex object.

    :param type_name: Complex object type name,
    :param schema: Complex object schema; a missing or empty schema is
     replaced with an empty dict,
    :return: the resulting class.
    """
    if not schema:
        schema = {}
    return GenericObjectMeta(type_name, (), {}, schema=schema)
def _sync_binary_registry(self, type_id: int):
    """
    Reads Complex object description from Ignite server. Creates default
    Complex object classes and puts them in the registry, if not already
    there.

    :param type_id: Complex object type ID.
    """
    type_info = self.get_binary_type(type_id)
    if not type_info['type_exists']:
        # the server does not know this type: nothing to register
        return

    for schema in type_info['schemas']:
        if self._registry[type_id].get(schema_id(schema)):
            # a class for this schema is already registered, keep it
            continue
        self._registry[type_id][schema_id(schema)] = self._create_dataclass(
            self._create_type_name(type_info['type_name']),
            schema,
        )
@classmethod
def _create_type_name(cls, type_name: str) -> str:
    """
    Creates Python data class name from Ignite binary type name.

    Handles all the special cases found in
    `java.org.apache.ignite.binary.BinaryBasicNameMapper.simpleName()`.
    Tries to adhere to PEP8 along the way.
    """
    # general sanitizing: drop everything `_identifier` matches
    sanitized = cls._identifier.sub('', type_name)

    # delimiters are processed in this exact order:
    # '$' - name ending with '$' (Scala), name + '$' + some digits
    #       (anonymous class), '$$Lambda$' in the middle;
    # '+' - .NET outer/inner class delimiter;
    # '.' - Java fully qualified class name
    for delimiter in ('$', '+', '.'):
        sanitized = process_delimiter(sanitized, delimiter)

    # start chars sanitizing
    return capitalize(cls._ident_start.sub('', sanitized))
def register_binary_type(
    self, data_class: Type, affinity_key_field: str = None,
):
    """
    Register the given class as a representation of a certain Complex
    object type. Discards autogenerated or previously registered class.

    The type is put to the cluster only if the registry lookup (which may
    consult the server) finds no class for this type ID and schema ID.

    :param data_class: Complex object class,
    :param affinity_key_field: (optional) affinity parameter.
    """
    known = self.query_binary_type(
        data_class.type_id, data_class.schema_id
    )
    if not known:
        self.put_binary_type(
            data_class.type_name,
            affinity_key_field,
            schema=data_class.schema,
        )
    # the given class always replaces whatever was registered before
    per_type = self._registry[data_class.type_id]
    per_type[data_class.schema_id] = data_class
def query_binary_type(
    self, binary_type: Union[int, str], schema: Union[int, dict] = None,
    sync: bool = True
):
    """
    Queries the registry of Complex object classes.

    :param binary_type: Complex object type name or ID,
    :param schema: (optional) Complex object schema or schema ID,
    :param sync: (optional) look up the Ignite server for registered
     Complex objects and create data classes for them if needed,
    :return: found dataclass or None, if `schema` parameter is provided,
     a dict of {schema ID: dataclass} format otherwise.
    """
    type_id = entity_id(binary_type)
    s_id = schema_id(schema)

    if not schema:
        # no (or falsy) schema given: return all the classes of the type
        found = self._registry[type_id]
    else:
        try:
            found = self._registry[type_id][s_id]
        except KeyError:
            found = None

    if found or not sync:
        return found

    # nothing found locally: ask the server, then repeat the lookup once,
    # this time without synchronization
    self._sync_binary_registry(type_id)
    return self.query_binary_type(type_id, s_id, sync=False)
def create_cache(self, settings: Union[str, dict]) -> 'Cache':
    """
    Creates Ignite cache by name. Raises `CacheError` if such a cache
    already exists.

    :param settings: cache name or dict of cache properties' codes
     and values. All cache properties are documented here:
     :ref:`cache_props`. See also the
     :ref:`cache creation example <sql_cache_create>`,
    :return: :class:`~pyignite.cache.Cache` object.
    """
    cache = Cache(self, settings)
    return cache
def get_or_create_cache(self, settings: Union[str, dict]) -> 'Cache':
    """
    Creates Ignite cache, if it does not exist.

    :param settings: cache name or dict of cache properties' codes
     and values. All cache properties are documented here:
     :ref:`cache_props`. See also the
     :ref:`cache creation example <sql_cache_create>`,
    :return: :class:`~pyignite.cache.Cache` object.
    """
    cache = Cache(self, settings, with_get=True)
    return cache
def get_cache(self, settings: Union[str, dict]) -> 'Cache':
    """
    Creates Cache object with a given cache name without checking it up
    on server. If such a cache does not exist, some kind of exception
    (most probably `CacheError`) may be raised later.

    :param settings: cache name or cache properties (but only `PROP_NAME`
     property is allowed),
    :return: :class:`~pyignite.cache.Cache` object.
    """
    cache = Cache(self, settings, get_only=True)
    return cache
@status_to_exception(CacheError)
def get_cache_names(self) -> list:
    """
    Gets existing cache names.

    :return: list of cache names.
    """
    node = self.random_node
    return cache_get_names(node)
def sql(
    self, query_str: str, page_size: int = 1024, query_args: Iterable = None,
    schema: Union[int, str] = 'PUBLIC',
    statement_type: int = 0, distributed_joins: bool = False,
    local: bool = False, replicated_only: bool = False,
    enforce_join_order: bool = False, collocated: bool = False,
    lazy: bool = False, include_field_names: bool = False,
    max_rows: int = -1, timeout: int = 0,
):
    """
    Runs an SQL query and returns its result.

    The first page of data is requested immediately; the following pages
    are requested lazily, while the returned generator is being consumed.

    :param query_str: SQL query string,
    :param page_size: (optional) cursor page size. Default is 1024, which
     means that client makes one server call per 1024 rows,
    :param query_args: (optional) query arguments. List of values or
     (value, type hint) tuples,
    :param schema: (optional) schema for the query. Defaults to `PUBLIC`,
    :param statement_type: (optional) statement type. Can be:

     * StatementType.ALL - any type (default),
     * StatementType.SELECT - select,
     * StatementType.UPDATE - update.

    :param distributed_joins: (optional) distributed joins. Defaults
     to False,
    :param local: (optional) pass True if this query should be executed
     on local node only. Defaults to False,
    :param replicated_only: (optional) whether query contains only
     replicated tables or not. Defaults to False,
    :param enforce_join_order: (optional) enforce join order. Defaults
     to False,
    :param collocated: (optional) whether your data is co-located or not.
     Defaults to False,
    :param lazy: (optional) lazy query execution. Defaults to False,
    :param include_field_names: (optional) include field names in result.
     Defaults to False,
    :param max_rows: (optional) query-wide maximum of rows. Defaults to -1
     (all rows),
    :param timeout: (optional) non-negative timeout value in ms.
     Zero disables timeout (default),
    :return: generator with result rows as lists. If
     `include_field_names` was set, the first row will hold field names,
    :raises SQLError: if the query itself fails (raised by this call) or
     if fetching one of the following pages fails (raised while
     the generator is being consumed).
    """
    def generate_result(value):
        cursor = value['cursor']
        more = value['more']

        if include_field_names:
            yield value['fields']
            field_count = len(value['fields'])
        else:
            field_count = value['field_count']

        # rows of the first page came along with the query response
        yield from value['data']

        # all the following pages are fetched through the same connection
        while more:
            page = sql_fields_cursor_get_page(conn, cursor, field_count)
            if page.status != 0:
                # report the failure of this very page request, not
                # the message of the initial (successful) response
                raise SQLError(page.message)
            more = page.value['more']
            yield from page.value['data']

    conn = self.random_node

    # the schema is addressed the same way a cache is: by ID and name
    schema = self.get_cache(schema)
    result = sql_fields(
        conn, schema.cache_id, query_str,
        page_size, query_args, schema.name,
        statement_type, distributed_joins, local, replicated_only,
        enforce_join_order, collocated, lazy, include_field_names,
        max_rows, timeout,
    )
    if result.status != 0:
        raise SQLError(result.message)

    return generate_result(result.value)