| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| """ |
This module contains the `Client` class, which lets you communicate with an
Apache Ignite cluster node by means of the Ignite binary client protocol.
| |
To start the communication, connect to the node of your choice
by instantiating the `Client` object and calling the
:py:meth:`~pyignite.client.Client.connect` method with proper
parameters.
| |
The storage of an Ignite cluster is split up into named structures,
called caches. To access a particular cache in key-value style
(a la Redis or memcached), first create
a :class:`~pyignite.cache.Cache` object by calling the
:py:meth:`~pyignite.client.Client.create_cache` or
:py:meth:`~pyignite.client.Client.get_or_create_cache` method, then call
:class:`~pyignite.cache.Cache` methods. If you wish to create a cache object
without communicating with the server, there is also the
:py:meth:`~pyignite.client.Client.get_cache` method that does just that.
| |
To use Ignite SQL, call the :py:meth:`~pyignite.client.Client.sql` method.
It returns a cursor that yields the result rows.
| |
The :py:meth:`~pyignite.client.Client.register_binary_type` and
:py:meth:`~pyignite.client.Client.query_binary_type` methods operate on
the local (class-wise) registry of Ignite Complex objects.
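
A minimal usage sketch; the address and the cache name are illustrative::

    from pyignite import Client

    client = Client()
    with client.connect('127.0.0.1', 10800):
        cache = client.get_or_create_cache('my-cache')
        cache.put(1, 'value')
        print(cache.get(1))  # 'value'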
| """ |
| import time |
| from collections import defaultdict, OrderedDict |
| import random |
| import re |
| from itertools import chain |
| from typing import Iterable, Type, Union, Any, Dict, Optional, Sequence |
| |
| from .api import cache_get_node_partitions |
| from .api.binary import get_binary_type, put_binary_type |
| from .api.cache_config import cache_get_names |
| from .cluster import Cluster |
| from .cursors import SqlFieldsCursor |
| from .cache import Cache, create_cache, get_cache, get_or_create_cache, BaseCache |
| from .connection import Connection |
| from .constants import IGNITE_DEFAULT_HOST, IGNITE_DEFAULT_PORT, PROTOCOL_BYTE_ORDER, AFFINITY_RETRIES, AFFINITY_DELAY |
| from .datatypes import BinaryObject, AnyDataObject, TransactionConcurrency, TransactionIsolation |
| from .datatypes.base import IgniteDataType |
| from .datatypes.internal import tc_map |
| from .exceptions import BinaryTypeError, CacheError, ReconnectError, connection_errors |
| from .queries.cache_info import CacheInfo |
| from .stream import BinaryStream, READ_BACKWARD |
| from .transaction import Transaction |
| from .utils import ( |
| cache_id, capitalize, entity_id, schema_id, process_delimiter, status_to_exception, is_iterable, |
| get_field_by_id, unsigned |
| ) |
| from .binary import GenericObjectMeta |
| from .monitoring import _EventListeners |
| |
| |
| __all__ = ['Client'] |
| |
| |
| class BaseClient: |
    # used for sanitizing Complex object data class names
| _identifier = re.compile(r'[^0-9a-zA-Z_.+$]', re.UNICODE) |
| _ident_start = re.compile(r'^[^a-zA-Z_]+', re.UNICODE) |
| |
| def __init__(self, compact_footer: bool = None, partition_aware: bool = False, |
| event_listeners: Optional[Sequence] = None, **kwargs): |
| self._compact_footer = compact_footer |
| self._partition_aware = partition_aware |
| self._connection_args = kwargs |
| self._registry = defaultdict(dict) |
| self._nodes = [] |
| self._current_node = 0 |
| self.affinity_version = (0, 0) |
| self._affinity = {'version': self.affinity_version, 'partition_mapping': defaultdict(dict)} |
| self._protocol_context = None |
| self._event_listeners = _EventListeners(event_listeners) |
| |
| @property |
| def protocol_context(self): |
| """ |
        Returns the protocol context, or None, if no connection to the Ignite
        cluster has yet been established.

        This method is not a part of the public API. Unless you wish to
        extend the `pyignite` capabilities (with additional testing, logging,
        examining connections, etc.) you probably should not use it.
| """ |
| return self._protocol_context |
| |
| @protocol_context.setter |
| def protocol_context(self, value): |
| self._protocol_context = value |
| |
| @property |
| def partition_aware(self): |
| return self._partition_aware and self.partition_awareness_supported_by_protocol |
| |
| @property |
| def partition_awareness_supported_by_protocol(self): |
| return self.protocol_context is not None \ |
| and self.protocol_context.is_partition_awareness_supported() |
| |
| @property |
| def compact_footer(self) -> bool: |
| """ |
        Remembers which Complex object schema encoding approach (compact or
        full) was used when decoding Complex objects, so that the same
        approach is used when encoding them.

        :return: True if compact schema was used by the server or no Complex
         object decoding has yet taken place, False if full schema was used.
| """ |
| # this is an ordinary object property, but its backing storage |
| # is a class attribute |
| |
| # use compact schema by default, but leave initial (falsy) backing |
| # value unchanged |
| return self._compact_footer or self._compact_footer is None |
| |
| @compact_footer.setter |
| def compact_footer(self, value: bool): |
| # normally schema approach should not change |
| if self._compact_footer not in (value, None): |
| raise Warning('Can not change client schema approach.') |
| else: |
| self._compact_footer = value |
| |
| @staticmethod |
| def _process_connect_args(*args): |
| if len(args) == 0: |
| # no parameters − use default Ignite host and port |
| return [(IGNITE_DEFAULT_HOST, IGNITE_DEFAULT_PORT)] |
| if len(args) == 1 and is_iterable(args[0]): |
| # iterable of host-port pairs is given |
| return args[0] |
| if len(args) == 2 and isinstance(args[0], str) and isinstance(args[1], int): |
| # host and port are given |
| return [args] |
| |
| raise ConnectionError('Connection parameters are not valid.') |
| |
| def _process_get_binary_type_result(self, result): |
| if result.status != 0 or not result.value['type_exists']: |
| return result |
| |
| binary_fields = result.value.pop('binary_fields') |
| old_format_schemas = result.value.pop('schema') |
| result.value['schemas'] = [] |
| for s_id, field_ids in old_format_schemas.items(): |
| result.value['schemas'].append(self._convert_schema(field_ids, binary_fields)) |
| return result |
| |
| @staticmethod |
| def _convert_type(tc_type: int): |
| try: |
| return tc_map(tc_type.to_bytes(1, PROTOCOL_BYTE_ORDER)) |
| except (KeyError, OverflowError): |
| # if conversion to char or type lookup failed, |
| # we probably have a binary object type ID |
| return BinaryObject |
| |
| def _convert_schema(self, field_ids: list, binary_fields: list) -> OrderedDict: |
| converted_schema = OrderedDict() |
| for field_id in field_ids: |
| binary_field = next(x for x in binary_fields if x['field_id'] == field_id) |
| converted_schema[binary_field['field_name']] = self._convert_type(binary_field['type_id']) |
| return converted_schema |
| |
| @staticmethod |
| def _create_dataclass(type_name: str, schema: OrderedDict = None) -> Type: |
| """ |
| Creates default (generic) class for Ignite Complex object. |
| |
| :param type_name: Complex object type name, |
| :param schema: Complex object schema, |
| :return: the resulting class. |
| """ |
| schema = schema or {} |
| return GenericObjectMeta(type_name, (), {}, schema=schema) |
| |
| @classmethod |
| def _create_type_name(cls, type_name: str) -> str: |
| """ |
| Creates Python data class name from Ignite binary type name. |
| |
| Handles all the special cases found in |
        `org.apache.ignite.binary.BinaryBasicNameMapper.simpleName()`.
| Tries to adhere to PEP8 along the way. |
| """ |
| |
| # general sanitizing |
| type_name = cls._identifier.sub('', type_name) |
| |
| # - name ending with '$' (Scala) |
| # - name + '$' + some digits (anonymous class) |
| # - '$$Lambda$' in the middle |
| type_name = process_delimiter(type_name, '$') |
| |
| # .NET outer/inner class delimiter |
| type_name = process_delimiter(type_name, '+') |
| |
| # Java fully qualified class name |
| type_name = process_delimiter(type_name, '.') |
| |
| # start chars sanitizing |
| type_name = capitalize(cls._ident_start.sub('', type_name)) |
| |
| return type_name |
| |
| def _sync_binary_registry(self, type_id: int, type_info: dict): |
| """ |
        Sync the binary registry.

        :param type_id: Complex object type ID,
| :param type_info: Complex object type info. |
| """ |
| if type_info['type_exists']: |
| for schema in type_info['schemas']: |
| if not self._registry[type_id].get(schema_id(schema), None): |
| data_class = self._create_dataclass( |
| self._create_type_name(type_info['type_name']), |
| schema, |
| ) |
| self._registry[type_id][schema_id(schema)] = data_class |
| |
| def _get_from_registry(self, type_id, schema): |
| """ |
| Get binary type info from registry. |
| |
| :param type_id: Complex object type ID. |
| :param schema: Complex object schema. |
| """ |
| if schema: |
| try: |
| return self._registry[type_id][schema_id(schema)] |
| except KeyError: |
| return None |
| return self._registry[type_id] |
| |
| def register_cache(self, cache_id: int): |
        if self.partition_aware and cache_id not in self._affinity['partition_mapping']:
| self._affinity['partition_mapping'][cache_id] = {} |
| |
| def _get_affinity_key(self, cache_id, key, key_hint=None): |
| if key_hint is None: |
| key_hint = AnyDataObject.map_python_type(key) |
| |
| cache_partition_mapping = self._cache_partition_mapping(cache_id) |
| if cache_partition_mapping and cache_partition_mapping.get('is_applicable'): |
| config = cache_partition_mapping.get('cache_config') |
| if config: |
| affinity_key_id = config.get(key_hint.type_id) |
| |
| if affinity_key_id and isinstance(key, GenericObjectMeta): |
| return get_field_by_id(key, affinity_key_id) |
| |
| return key, key_hint |
| |
| def _update_affinity(self, full_affinity): |
| self._affinity['version'] = full_affinity['version'] |
| |
| full_mapping = full_affinity.get('partition_mapping') |
| if full_mapping: |
| self._affinity['partition_mapping'].update(full_mapping) |
| |
| def _caches_to_update_affinity(self): |
| if self._affinity['version'] < self.affinity_version: |
| return list(self._affinity['partition_mapping'].keys()) |
| else: |
| return list(c_id for c_id, c_mapping in self._affinity['partition_mapping'].items() if not c_mapping) |
| |
| def _cache_partition_mapping(self, cache_id): |
| return self._affinity['partition_mapping'][cache_id] |
| |
| def _get_node_by_hashcode(self, cache_id, hashcode, parts): |
| """ |
        Get node by key hashcode: calculate the partition and return the node
        on which that partition is primary.
| (algorithm is taken from `RendezvousAffinityFunction.java`) |
| """ |
| |
| # calculate partition for key or affinity key |
| mask = parts - 1 |
| |
| if parts & mask == 0: |
| part = (hashcode ^ (unsigned(hashcode) >> 16)) & mask |
| else: |
            part = abs(hashcode) % parts
| |
| assert 0 <= part < parts, 'Partition calculation has failed' |
| |
| node_mapping = self._cache_partition_mapping(cache_id).get('node_mapping') |
| if not node_mapping: |
| return None |
| |
| node_uuid, best_conn = None, None |
| for u, p in node_mapping.items(): |
| if part in p: |
| node_uuid = u |
| break |
| |
| if node_uuid: |
| for n in self._nodes: |
| if n.uuid == node_uuid: |
| best_conn = n |
| break |
| if best_conn and best_conn.alive: |
| return best_conn |
| |
| |
| class _ConnectionContextManager: |
| def __init__(self, client, nodes): |
| self.client = client |
| self.nodes = nodes |
| self.client._connect(self.nodes) |
| |
| def __enter__(self): |
| return self |
| |
| def __exit__(self, exc_type, exc_val, exc_tb): |
| self.client.close() |
| |
| |
| class Client(BaseClient): |
| """ |
| Synchronous Client implementation. |
| """ |
| |
| def __init__(self, compact_footer: bool = None, partition_aware: bool = True, |
| event_listeners: Optional[Sequence] = None, **kwargs): |
| """ |
| Initialize client. |
| |
| For the use of the SSL-related parameters see |
| https://docs.python.org/3/library/ssl.html#ssl-certificates. |
| |
| :param compact_footer: (optional) use compact (True, recommended) or |
| full (False) schema approach when serializing Complex objects. |
| Default is to use the same approach the server is using (None). |
| Apache Ignite binary protocol documentation on this topic: |
| https://ignite.apache.org/docs/latest/binary-client-protocol/data-format#schema |
        :param partition_aware: (optional) try to calculate the exact data
         placement from the key before issuing the key operation to the
         server node, `True` by default,
| :param event_listeners: (optional) event listeners, |
| :param timeout: (optional) sets timeout (in seconds) for each socket |
| operation including `connect`. 0 means non-blocking mode, which is |
| virtually guaranteed to fail. Can accept integer or float value. |
| Default is None (blocking mode), |
| :param handshake_timeout: (optional) sets timeout (in seconds) for performing handshake (connection) |
| with node. Default is 10.0 seconds, |
        :param use_ssl: (optional) set to True if Ignite server uses SSL
         on its binary connector. Defaults to use SSL when username
         and password have been supplied, not to use SSL otherwise,
| :param ssl_version: (optional) SSL version constant from standard |
| `ssl` module. Defaults to TLS v1.2, |
| :param ssl_ciphers: (optional) ciphers to use. If not provided, |
| `ssl` default ciphers are used, |
| :param ssl_cert_reqs: (optional) determines how the remote side |
| certificate is treated: |
| |
| * `ssl.CERT_NONE` − remote certificate is ignored (default), |
| * `ssl.CERT_OPTIONAL` − remote certificate will be validated, |
| if provided, |
| * `ssl.CERT_REQUIRED` − valid remote certificate is required, |
| |
| :param ssl_keyfile: (optional) a path to SSL key file to identify |
| local (client) party, |
| :param ssl_keyfile_password: (optional) password for SSL key file, |
| can be provided when key file is encrypted to prevent OpenSSL |
| password prompt, |
| :param ssl_certfile: (optional) a path to ssl certificate file |
| to identify local (client) party, |
| :param ssl_ca_certfile: (optional) a path to a trusted certificate |
| or a certificate chain. Required to check the validity of the remote |
| (server-side) certificate, |
| :param username: (optional) user name to authenticate to Ignite |
| cluster, |
| :param password: (optional) password to authenticate to Ignite cluster. |
| """ |
| super().__init__(compact_footer, partition_aware, event_listeners, **kwargs) |
| |
| def connect(self, *args): |
| """ |
| Connect to Ignite cluster node(s). |
| |
| :param args: (optional) host(s) and port(s) to connect to. |
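
        A sketch of the accepted call forms; the addresses are illustrative::

            client = Client()

            # default host and port
            client.connect()

            # a single host and port
            client.connect('127.0.0.1', 10800)

            # an iterable of (host, port) pairs; with partition awareness
            # enabled, the client maintains connections to all of them
            client.connect([('10.0.0.1', 10800), ('10.0.0.2', 10800)])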
| """ |
| nodes = self._process_connect_args(*args) |
| return _ConnectionContextManager(self, nodes) |
| |
| def _connect(self, nodes): |
| # the following code is quite twisted, because the protocol version |
| # is initially unknown |
| |
| # TODO: open first node in foreground, others − in background |
| for i, node in enumerate(nodes): |
| host, port = node |
| conn = Connection(self, host, port, **self._connection_args) |
| |
| try: |
| if self.protocol_context is None or self.partition_aware: |
| # open connection before adding to the pool |
| conn.connect() |
| |
| # now we have the protocol version |
| if not self.partition_aware: |
| # do not try to open more nodes |
| self._current_node = i |
| |
| except connection_errors: |
| if self.partition_aware: |
| # schedule the reconnection |
| conn.reconnect() |
| |
| self._nodes.append(conn) |
| |
| if self.protocol_context is None: |
| raise ReconnectError('Can not connect.') |
| |
| def close(self): |
| for conn in self._nodes: |
| conn.close() |
| self._nodes.clear() |
| |
| @property |
| def random_node(self) -> Connection: |
| """ |
        Returns a random usable node.

        This method is not a part of the public API. Unless you wish to
        extend the `pyignite` capabilities (with additional testing, logging,
        examining connections, etc.) you probably should not use it.
| """ |
| if self.partition_aware: |
| # if partition awareness is used just pick a random connected node |
| return self._get_random_node() |
| else: |
| # if partition awareness is not used then just return the current |
| # node if it's alive or the next usable node if connection with the |
| # current is broken |
| node = self._nodes[self._current_node] |
| if node.alive: |
| return node |
| |
| # close current (supposedly failed) node |
| self._nodes[self._current_node].close() |
| |
| # advance the node index |
| self._current_node += 1 |
| if self._current_node >= len(self._nodes): |
| self._current_node = 0 |
| |
| # prepare the list of node indexes to try to connect to |
| num_nodes = len(self._nodes) |
| for i in chain(range(self._current_node, num_nodes), range(self._current_node)): |
| node = self._nodes[i] |
| try: |
| node.connect() |
| except connection_errors: |
| pass |
| else: |
| return node |
| |
| # no nodes left |
| raise ReconnectError('Can not reconnect: out of nodes.') |
| |
| def _get_random_node(self, reconnect=True): |
| alive_nodes = [n for n in self._nodes if n.alive] |
| if alive_nodes: |
| return random.choice(alive_nodes) |
| elif reconnect: |
| for n in self._nodes: |
| n.reconnect() |
| |
| return self._get_random_node(reconnect=False) |
| else: |
| # cannot choose from an empty sequence |
| raise ReconnectError('Can not reconnect: out of nodes.') from None |
| |
| @status_to_exception(BinaryTypeError) |
| def get_binary_type(self, binary_type: Union[str, int]) -> dict: |
| """ |
| Gets the binary type information from the Ignite server. This is quite |
| a low-level implementation of Ignite thin client protocol's |
| `OP_GET_BINARY_TYPE` operation. You would probably want to use |
| :py:meth:`~pyignite.client.Client.query_binary_type` instead. |
| |
| :param binary_type: binary type name or ID, |
| :return: binary type description − a dict with the following fields: |
| |
| - `type_exists`: True if the type is registered, False otherwise. In |
| the latter case all the following fields are omitted, |
| - `type_id`: Complex object type ID, |
| - `type_name`: Complex object type name, |
| - `affinity_key_field`: string value or None, |
| - `is_enum`: False in case of Complex object registration, |
| - `schemas`: a list, containing the Complex object schemas in format: |
| OrderedDict[field name: field type hint]. A schema can be empty. |
| """ |
| result = get_binary_type(self.random_node, binary_type) |
| return self._process_get_binary_type_result(result) |
| |
| @status_to_exception(BinaryTypeError) |
| def put_binary_type( |
| self, type_name: str, affinity_key_field: str = None, |
| is_enum=False, schema: dict = None |
| ): |
| """ |
        Registers binary type information in the cluster. Does not update the
        binary registry. This is a literal implementation of the Ignite thin
        client protocol's `OP_PUT_BINARY_TYPE` operation. You would probably
        want to use :py:meth:`~pyignite.client.Client.register_binary_type`
        instead.

        :param type_name: name of the data type being registered,
        :param affinity_key_field: (optional) name of the affinity key field,
        :param is_enum: (optional) register an enum if True, a binary object
         otherwise. Defaults to False,
        :param schema: (optional) when registering an enum, pass a dict with
         enumerated parameter names as keys and integers as values. When
         registering a binary type, pass a dict of field names: field types.
         A binary type with no fields is OK.
| """ |
| return put_binary_type(self.random_node, type_name, affinity_key_field, is_enum, schema) |
| |
| def register_binary_type(self, data_class: Type, affinity_key_field: str = None): |
| """ |
        Register the given class as a representation of a certain Complex
        object type. Discards any autogenerated or previously registered class.
| |
| :param data_class: Complex object class, |
| :param affinity_key_field: (optional) affinity parameter. |
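
        A sketch of a typical registration; the `Person` class and its
        schema are illustrative::

            from collections import OrderedDict

            from pyignite import GenericObjectMeta
            from pyignite.datatypes import IntObject, String

            class Person(metaclass=GenericObjectMeta, schema=OrderedDict([
                ('first_name', String),
                ('age', IntObject),
            ])):
                pass

            client.register_binary_type(Person)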
| """ |
| if not self.query_binary_type(data_class.type_id, data_class.schema_id): |
| self.put_binary_type(data_class.type_name, affinity_key_field, schema=data_class.schema) |
| self._registry[data_class.type_id][data_class.schema_id] = data_class |
| |
| def query_binary_type(self, binary_type: Union[int, str], schema: Union[int, dict] = None): |
| """ |
| Queries the registry of Complex object classes. |
| |
| :param binary_type: Complex object type name or ID, |
        :param schema: (optional) Complex object schema or schema ID,
| :return: found dataclass or None, if `schema` parameter is provided, |
| a dict of {schema ID: dataclass} format otherwise. |
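
        A lookup sketch; the type name is illustrative::

            # all registered dataclasses for the given type, keyed by schema ID
            classes = client.query_binary_type('Person')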
| """ |
| type_id = entity_id(binary_type) |
| |
| result = self._get_from_registry(type_id, schema) |
| if not result: |
| type_info = self.get_binary_type(type_id) |
| self._sync_binary_registry(type_id, type_info) |
| return self._get_from_registry(type_id, schema) |
| |
| return result |
| |
| def unwrap_binary(self, value: Any) -> Any: |
| """ |
        Detects and recursively unwraps a Binary Object or a collection of
        Binary Objects.

        :param value: anything that could be a Binary Object or a collection
         of Binary Objects,
        :return: the result of the Binary Object unwrapping, with all other
         data left intact.
| """ |
| if isinstance(value, tuple) and len(value) == 2: |
| if type(value[0]) is bytes and type(value[1]) is int: |
| blob, offset = value |
| with BinaryStream(self, blob) as stream: |
| data_class = BinaryObject.parse(stream) |
| return BinaryObject.to_python(stream.read_ctype(data_class, direction=READ_BACKWARD), client=self) |
| |
| if isinstance(value[0], int): |
| col_type, collection = value |
| if isinstance(collection, list): |
| return col_type, [self.unwrap_binary(v) for v in collection] |
| |
| if isinstance(collection, dict): |
| return col_type, {self.unwrap_binary(k): self.unwrap_binary(v) for k, v in collection.items()} |
| return value |
| |
| @status_to_exception(CacheError) |
| def _get_affinity(self, conn: 'Connection', caches: Iterable[int]) -> Dict: |
| """ |
| Queries server for affinity mappings. Retries in case |
| of an intermittent error (most probably “Getting affinity for topology |
| version earlier than affinity is calculated”). |
| |
| :param conn: connection to Ignite server, |
        :param caches: IDs of caches,
| :return: OP_CACHE_PARTITIONS operation result value. |
| """ |
| for _ in range(AFFINITY_RETRIES or 1): |
| result = cache_get_node_partitions(conn, caches) |
| if result.status == 0: |
| break |
| time.sleep(AFFINITY_DELAY) |
| |
| return result |
| |
| def get_best_node( |
| self, cache: Union[int, str, 'BaseCache'], key: Any = None, key_hint: 'IgniteDataType' = None |
| ) -> 'Connection': |
| """ |
        Returns the node, from the list of nodes opened by the client, that
        most probably contains the needed key-value pair. See IEP-23.

        This method is not a part of the public API. Unless you wish to
        extend the `pyignite` capabilities (with additional testing, logging,
        examining connections, etc.) you probably should not use it.
| |
| :param cache: Ignite cache, cache name or cache id, |
| :param key: (optional) pythonic key, |
| :param key_hint: (optional) Ignite data type, for which the given key |
| should be converted, |
| :return: Ignite connection object. |
| """ |
| conn = self.random_node |
| |
| if self.partition_aware and key is not None: |
| caches = self._caches_to_update_affinity() |
| if caches: |
| # update partition mapping |
| while True: |
| try: |
| full_affinity = self._get_affinity(conn, caches) |
| break |
| except connection_errors: |
| # retry if connection failed |
| conn = self.random_node |
| except CacheError: |
| # server did not create mapping in time |
| return conn |
| |
| self._update_affinity(full_affinity) |
| |
| for node in self._nodes: |
| if not node.alive: |
| node.reconnect() |
| |
| c_id = cache.cache_id if isinstance(cache, BaseCache) else cache_id(cache) |
| parts = self._cache_partition_mapping(c_id).get('number_of_partitions') |
| |
| if not parts: |
| return conn |
| |
| key, key_hint = self._get_affinity_key(c_id, key, key_hint) |
| hashcode = key_hint.hashcode(key, client=self) |
| |
| best_node = self._get_node_by_hashcode(c_id, hashcode, parts) |
| if best_node: |
| return best_node |
| |
| return conn |
| |
| def create_cache(self, settings: Union[str, dict]) -> 'Cache': |
| """ |
        Creates an Ignite cache by name. Raises `CacheError` if such a cache
        already exists.
| |
| :param settings: cache name or dict of cache properties' codes |
| and values. All cache properties are documented here: |
| :ref:`cache_props`. See also the |
| :ref:`cache creation example <sql_cache_create>`, |
| :return: :class:`~pyignite.cache.Cache` object. |
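
        A sketch of both ways to create a cache; the cache name and the
        property values are illustrative::

            cache = client.create_cache('my_cache')

            # or, with explicit cache properties
            from pyignite.datatypes.prop_codes import PROP_NAME, PROP_BACKUPS_NUMBER

            cache = client.create_cache({
                PROP_NAME: 'my_cache',
                PROP_BACKUPS_NUMBER: 1,
            })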
| """ |
| return create_cache(self, settings) |
| |
| def get_or_create_cache(self, settings: Union[str, dict]) -> 'Cache': |
| """ |
        Creates an Ignite cache, if it does not exist.
| |
| :param settings: cache name or dict of cache properties' codes |
| and values. All cache properties are documented here: |
| :ref:`cache_props`. See also the |
| :ref:`cache creation example <sql_cache_create>`, |
| :return: :class:`~pyignite.cache.Cache` object. |
| """ |
| return get_or_create_cache(self, settings) |
| |
| def get_cache(self, settings: Union[str, dict]) -> 'Cache': |
| """ |
        Creates a Cache object with the given cache name, without checking it
        on the server. If such a cache does not exist, some kind of exception
        (most probably `CacheError`) may be raised later.
| |
| :param settings: cache name or cache properties (but only `PROP_NAME` |
| property is allowed), |
| :return: :class:`~pyignite.cache.Cache` object. |
| """ |
| return get_cache(self, settings) |
| |
| @status_to_exception(CacheError) |
| def get_cache_names(self) -> list: |
| """ |
| Gets existing cache names. |
| |
| :return: list of cache names. |
| """ |
| return cache_get_names(self.random_node) |
| |
| def sql( |
| self, query_str: str, page_size: int = 1024, |
| query_args: Iterable = None, schema: str = 'PUBLIC', |
| statement_type: int = 0, distributed_joins: bool = False, |
| local: bool = False, replicated_only: bool = False, |
| enforce_join_order: bool = False, collocated: bool = False, |
| lazy: bool = False, include_field_names: bool = False, |
| max_rows: int = -1, timeout: int = 0, |
| cache: Union[int, str, Cache] = None |
| ) -> SqlFieldsCursor: |
| """ |
| Runs an SQL query and returns its result. |
| |
| :param query_str: SQL query string, |
| :param page_size: (optional) cursor page size. Default is 1024, which |
| means that client makes one server call per 1024 rows, |
| :param query_args: (optional) query arguments. List of values or |
| (value, type hint) tuples, |
| :param schema: (optional) schema for the query. Defaults to `PUBLIC`, |
| :param statement_type: (optional) statement type. Can be: |
| |
| * StatementType.ALL − any type (default), |
| * StatementType.SELECT − select, |
| * StatementType.UPDATE − update. |
| |
| :param distributed_joins: (optional) distributed joins. Defaults |
| to False, |
| :param local: (optional) pass True if this query should be executed |
| on local node only. Defaults to False, |
| :param replicated_only: (optional) whether query contains only |
| replicated tables or not. Defaults to False, |
| :param enforce_join_order: (optional) enforce join order. Defaults |
| to False, |
| :param collocated: (optional) whether your data is co-located or not. |
| Defaults to False, |
| :param lazy: (optional) lazy query execution. Defaults to False, |
| :param include_field_names: (optional) include field names in result. |
| Defaults to False, |
| :param max_rows: (optional) query-wide maximum of rows. Defaults to -1 |
| (all rows), |
| :param timeout: (optional) non-negative timeout value in ms. |
| Zero disables timeout (default), |
| :param cache: (optional) Name or ID of the cache to use to infer schema. |
| If set, 'schema' argument is ignored, |
        :return: sql fields cursor with result rows as lists. If
         `include_field_names` was set, the first row will hold field names.
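
        A minimal sketch; the table and the query are illustrative::

            cursor = client.sql('SELECT id, name FROM Person', include_field_names=True)
            for row in cursor:
                print(row)  # the first row holds the field names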
| """ |
| if isinstance(cache, (int, str)): |
| c_info = CacheInfo(cache_id=cache_id(cache), protocol_context=self.protocol_context) |
| elif isinstance(cache, Cache): |
| c_info = cache.cache_info |
| else: |
| c_info = CacheInfo(protocol_context=self.protocol_context) |
| |
| if c_info.cache_id: |
| schema = None |
| |
| return SqlFieldsCursor(self, c_info, query_str, page_size, query_args, schema, statement_type, |
| distributed_joins, local, replicated_only, enforce_join_order, collocated, lazy, |
| include_field_names, max_rows, timeout) |
| |
| def get_cluster(self) -> 'Cluster': |
| """ |
| Get client cluster facade. |
| |
| :return: :py:class:`~pyignite.cluster.Cluster` instance. |
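
        A usage sketch::

            from pyignite.datatypes.cluster_state import ClusterState

            cluster = client.get_cluster()
            cluster.set_state(ClusterState.ACTIVE)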
| """ |
| return Cluster(self) |
| |
| def tx_start(self, concurrency: TransactionConcurrency = TransactionConcurrency.PESSIMISTIC, |
| isolation: TransactionIsolation = TransactionIsolation.REPEATABLE_READ, |
| timeout: int = 0, label: Optional[str] = None) -> 'Transaction': |
| """ |
| Start thin client transaction. |
| |
| :param concurrency: (optional) transaction concurrency, see |
| :py:class:`~pyignite.datatypes.transactions.TransactionConcurrency`, |
| :param isolation: (optional) transaction isolation level, see |
| :py:class:`~pyignite.datatypes.transactions.TransactionIsolation`, |
| :param timeout: (optional) transaction timeout in milliseconds, |
| :param label: (optional) transaction label. |
| :return: :py:class:`~pyignite.transaction.Transaction` instance. |
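
        A usage sketch, assuming an existing TRANSACTIONAL cache `cache`
        (transactions require such a cache)::

            with client.tx_start(
                isolation=TransactionIsolation.SERIALIZABLE
            ) as tx:
                cache.put(1, 'value')
                tx.commit()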
| """ |
| return Transaction(self, concurrency, isolation, timeout, label) |