blob: a91a3cf67421817ae501ca672b81f322e13b961a [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time
from typing import Any, Dict, Iterable, Optional, Tuple, Union
from .constants import *
from .binary import GenericObjectMeta, unwrap_binary
from .datatypes import prop_codes
from .datatypes.internal import AnyDataObject
from .exceptions import (
CacheCreationError, CacheError, ParameterError, SQLError,
connection_errors,
)
from .utils import (
cache_id, get_field_by_id, is_wrapped,
status_to_exception, unsigned
)
from .api.cache_config import (
cache_create, cache_create_with_config,
cache_get_or_create, cache_get_or_create_with_config,
cache_destroy, cache_get_configuration,
)
from .api.key_value import (
cache_get, cache_put, cache_get_all, cache_put_all, cache_replace,
cache_clear, cache_clear_key, cache_clear_keys,
cache_contains_key, cache_contains_keys,
cache_get_and_put, cache_get_and_put_if_absent, cache_put_if_absent,
cache_get_and_remove, cache_get_and_replace,
cache_remove_key, cache_remove_keys, cache_remove_all,
cache_remove_if_equals, cache_replace_if_equals, cache_get_size,
)
from .api.sql import scan, scan_cursor_get_page, sql, sql_cursor_get_page
from .api.affinity import cache_get_node_partitions
PROP_CODES = set([
getattr(prop_codes, x)
for x in dir(prop_codes)
if x.startswith('PROP_')
])
CACHE_CREATE_FUNCS = {
True: {
True: cache_get_or_create_with_config,
False: cache_create_with_config,
},
False: {
True: cache_get_or_create,
False: cache_create,
},
}
class Cache:
"""
Ignite cache abstraction. Users should never use this class directly,
but construct its instances with
:py:meth:`~pyignite.client.Client.create_cache`,
:py:meth:`~pyignite.client.Client.get_or_create_cache` or
:py:meth:`~pyignite.client.Client.get_cache` methods instead. See
:ref:`this example <create_cache>` on how to do it.
"""
affinity = None
_cache_id = None
_name = None
_client = None
_settings = None
@staticmethod
def _validate_settings(
settings: Union[str, dict] = None, get_only: bool = False,
):
if any([
not settings,
type(settings) not in (str, dict),
type(settings) is dict and prop_codes.PROP_NAME not in settings,
]):
raise ParameterError('You should supply at least cache name')
if all([
type(settings) is dict,
not set(settings).issubset(PROP_CODES),
]):
raise ParameterError('One or more settings was not recognized')
if get_only and type(settings) is dict and len(settings) != 1:
raise ParameterError('Only cache name allowed as a parameter')
def __init__(
self, client: 'Client', settings: Union[str, dict] = None,
with_get: bool = False, get_only: bool = False,
):
"""
Initialize cache object.
:param client: Ignite client,
:param settings: cache settings. Can be a string (cache name) or a dict
of cache properties and their values. In this case PROP_NAME is
mandatory,
:param with_get: (optional) do not raise exception, if the cache
is already exists. Defaults to False,
:param get_only: (optional) do not communicate with Ignite server
at all, only create Cache instance. Defaults to False.
"""
self._client = client
self._validate_settings(settings)
if type(settings) == str:
self._name = settings
else:
self._name = settings[prop_codes.PROP_NAME]
if not get_only:
func = CACHE_CREATE_FUNCS[type(settings) is dict][with_get]
result = func(client.random_node, settings)
if result.status != 0:
raise CacheCreationError(result.message)
self._cache_id = cache_id(self._name)
self.affinity = {
'version': (0, 0),
}
def get_protocol_version(self) -> Optional[Tuple]:
"""
Returns the tuple of major, minor, and revision numbers of the used
thin protocol version, or None, if no connection to the Ignite cluster
was not yet established.
This method is not a part of the public API. Unless you wish to
extend the `pyignite` capabilities (with additional testing, logging,
examining connections, et c.) you probably should not use it.
"""
return self.client.protocol_version
@property
def settings(self) -> Optional[dict]:
"""
Lazy Cache settings. See the :ref:`example <sql_cache_read>`
of reading this property.
All cache properties are documented here: :ref:`cache_props`.
:return: dict of cache properties and their values.
"""
if self._settings is None:
config_result = cache_get_configuration(
self.get_best_node(),
self._cache_id
)
if config_result.status == 0:
self._settings = config_result.value
else:
raise CacheError(config_result.message)
return self._settings
@property
def name(self) -> str:
"""
Lazy cache name.
:return: cache name string.
"""
if self._name is None:
self._name = self.settings[prop_codes.PROP_NAME]
return self._name
@property
def client(self) -> 'Client':
"""
Ignite :class:`~pyignite.client.Client` object.
:return: Client object, through which the cache is accessed.
"""
return self._client
@property
def cache_id(self) -> int:
"""
Cache ID.
:return: integer value of the cache ID.
"""
return self._cache_id
def _process_binary(self, value: Any) -> Any:
"""
Detects and recursively unwraps Binary Object.
:param value: anything that could be a Binary Object,
:return: the result of the Binary Object unwrapping with all other data
left intact.
"""
if is_wrapped(value):
return unwrap_binary(self._client, value)
return value
@status_to_exception(CacheError)
def destroy(self):
"""
Destroys cache with a given name.
"""
return cache_destroy(self.get_best_node(), self._cache_id)
@status_to_exception(CacheError)
def _get_affinity(self, conn: 'Connection') -> Dict:
"""
Queries server for affinity mappings. Retries in case
of an intermittent error (most probably “Getting affinity for topology
version earlier than affinity is calculated”).
:param conn: connection to Igneite server,
:return: OP_CACHE_PARTITIONS operation result value.
"""
for _ in range(AFFINITY_RETRIES or 1):
result = cache_get_node_partitions(conn, self._cache_id)
if result.status == 0 and result.value['partition_mapping']:
break
time.sleep(AFFINITY_DELAY)
return result
def get_best_node(
self, key: Any = None, key_hint: 'IgniteDataType' = None,
) -> 'Connection':
"""
Returns the node from the list of the nodes, opened by client, that
most probably contains the needed key-value pair. See IEP-23.
This method is not a part of the public API. Unless you wish to
extend the `pyignite` capabilities (with additional testing, logging,
examining connections, et c.) you probably should not use it.
:param key: (optional) pythonic key,
:param key_hint: (optional) Ignite data type, for which the given key
should be converted,
:return: Ignite connection object.
"""
conn = self._client.random_node
if self.client.partition_aware and key is not None:
if key_hint is None:
key_hint = AnyDataObject.map_python_type(key)
if self.affinity['version'] < self._client.affinity_version:
# update partition mapping
while True:
try:
self.affinity = self._get_affinity(conn)
break
except connection_errors:
# retry if connection failed
conn = self._client.random_node
pass
except CacheError:
# server did not create mapping in time
return conn
# flatten it a bit
try:
self.affinity.update(self.affinity['partition_mapping'][0])
except IndexError:
return conn
del self.affinity['partition_mapping']
# calculate the number of partitions
parts = 0
if 'node_mapping' in self.affinity:
for p in self.affinity['node_mapping'].values():
parts += len(p)
self.affinity['number_of_partitions'] = parts
for conn in self.client._nodes:
if not conn.alive:
conn.reconnect()
else:
# get number of partitions
parts = self.affinity.get('number_of_partitions')
if not parts:
return conn
if self.affinity['is_applicable']:
affinity_key_id = self.affinity['cache_config'].get(
key_hint.type_id,
None
)
if affinity_key_id and isinstance(key, GenericObjectMeta):
key, key_hint = get_field_by_id(key, affinity_key_id)
# calculate partition for key or affinity key
# (algorithm is taken from `RendezvousAffinityFunction.java`)
base_value = key_hint.hashcode(key, self._client)
mask = parts - 1
if parts & mask == 0:
part = (base_value ^ (unsigned(base_value) >> 16)) & mask
else:
part = abs(base_value // parts)
assert 0 <= part < parts, 'Partition calculation has failed'
# search for connection
try:
node_uuid, best_conn = None, None
for u, p in self.affinity['node_mapping'].items():
if part in p:
node_uuid = u
break
if node_uuid:
for n in conn.client._nodes:
if n.uuid == node_uuid:
best_conn = n
break
if best_conn and best_conn.alive:
conn = best_conn
except KeyError:
pass
return conn
@status_to_exception(CacheError)
def get(self, key, key_hint: object = None) -> Any:
"""
Retrieves a value from cache by key.
:param key: key for the cache entry. Can be of any supported type,
:param key_hint: (optional) Ignite data type, for which the given key
should be converted,
:return: value retrieved.
"""
if key_hint is None:
key_hint = AnyDataObject.map_python_type(key)
result = cache_get(
self.get_best_node(key, key_hint),
self._cache_id,
key,
key_hint=key_hint
)
result.value = self._process_binary(result.value)
return result
@status_to_exception(CacheError)
def put(
self, key, value, key_hint: object = None, value_hint: object = None
):
"""
Puts a value with a given key to cache (overwriting existing value
if any).
:param key: key for the cache entry. Can be of any supported type,
:param value: value for the key,
:param key_hint: (optional) Ignite data type, for which the given key
should be converted,
:param value_hint: (optional) Ignite data type, for which the given
value should be converted.
"""
if key_hint is None:
key_hint = AnyDataObject.map_python_type(key)
return cache_put(
self.get_best_node(key, key_hint),
self._cache_id, key, value,
key_hint=key_hint, value_hint=value_hint
)
@status_to_exception(CacheError)
def get_all(self, keys: list) -> list:
"""
Retrieves multiple key-value pairs from cache.
:param keys: list of keys or tuples of (key, key_hint),
:return: a dict of key-value pairs.
"""
result = cache_get_all(self.get_best_node(), self._cache_id, keys)
if result.value:
for key, value in result.value.items():
result.value[key] = self._process_binary(value)
return result
@status_to_exception(CacheError)
def put_all(self, pairs: dict):
"""
Puts multiple key-value pairs to cache (overwriting existing
associations if any).
:param pairs: dictionary type parameters, contains key-value pairs
to save. Each key or value can be an item of representable
Python type or a tuple of (item, hint),
"""
return cache_put_all(self.get_best_node(), self._cache_id, pairs)
@status_to_exception(CacheError)
def replace(
self, key, value, key_hint: object = None, value_hint: object = None
):
"""
Puts a value with a given key to cache only if the key already exist.
:param key: key for the cache entry. Can be of any supported type,
:param value: value for the key,
:param key_hint: (optional) Ignite data type, for which the given key
should be converted,
:param value_hint: (optional) Ignite data type, for which the given
value should be converted.
"""
if key_hint is None:
key_hint = AnyDataObject.map_python_type(key)
result = cache_replace(
self.get_best_node(key, key_hint),
self._cache_id, key, value,
key_hint=key_hint, value_hint=value_hint
)
result.value = self._process_binary(result.value)
return result
@status_to_exception(CacheError)
def clear(self, keys: Optional[list] = None):
"""
Clears the cache without notifying listeners or cache writers.
:param keys: (optional) list of cache keys or (key, key type
hint) tuples to clear (default: clear all).
"""
conn = self.get_best_node()
if keys:
return cache_clear_keys(conn, self._cache_id, keys)
else:
return cache_clear(conn, self._cache_id)
@status_to_exception(CacheError)
def clear_key(self, key, key_hint: object = None):
"""
Clears the cache key without notifying listeners or cache writers.
:param key: key for the cache entry,
:param key_hint: (optional) Ignite data type, for which the given key
should be converted,
"""
if key_hint is None:
key_hint = AnyDataObject.map_python_type(key)
return cache_clear_key(
self.get_best_node(key, key_hint),
self._cache_id,
key,
key_hint=key_hint
)
@status_to_exception(CacheError)
def contains_key(self, key, key_hint=None) -> bool:
"""
Returns a value indicating whether given key is present in cache.
:param key: key for the cache entry. Can be of any supported type,
:param key_hint: (optional) Ignite data type, for which the given key
should be converted,
:return: boolean `True` when key is present, `False` otherwise.
"""
if key_hint is None:
key_hint = AnyDataObject.map_python_type(key)
return cache_contains_key(
self.get_best_node(key, key_hint),
self._cache_id,
key,
key_hint=key_hint
)
@status_to_exception(CacheError)
def contains_keys(self, keys: Iterable) -> bool:
"""
Returns a value indicating whether all given keys are present in cache.
:param keys: a list of keys or (key, type hint) tuples,
:return: boolean `True` when all keys are present, `False` otherwise.
"""
return cache_contains_keys(self._client, self._cache_id, keys)
@status_to_exception(CacheError)
def get_and_put(self, key, value, key_hint=None, value_hint=None) -> Any:
"""
Puts a value with a given key to cache, and returns the previous value
for that key, or null value if there was not such key.
:param key: key for the cache entry. Can be of any supported type,
:param value: value for the key,
:param key_hint: (optional) Ignite data type, for which the given key
should be converted,
:param value_hint: (optional) Ignite data type, for which the given
value should be converted.
:return: old value or None.
"""
if key_hint is None:
key_hint = AnyDataObject.map_python_type(key)
result = cache_get_and_put(
self.get_best_node(key, key_hint),
self._cache_id,
key, value,
key_hint, value_hint
)
result.value = self._process_binary(result.value)
return result
@status_to_exception(CacheError)
def get_and_put_if_absent(
self, key, value, key_hint=None, value_hint=None
):
"""
Puts a value with a given key to cache only if the key does not
already exist.
:param key: key for the cache entry. Can be of any supported type,
:param value: value for the key,
:param key_hint: (optional) Ignite data type, for which the given key
should be converted,
:param value_hint: (optional) Ignite data type, for which the given
value should be converted,
:return: old value or None.
"""
if key_hint is None:
key_hint = AnyDataObject.map_python_type(key)
result = cache_get_and_put_if_absent(
self.get_best_node(key, key_hint),
self._cache_id,
key, value,
key_hint, value_hint
)
result.value = self._process_binary(result.value)
return result
@status_to_exception(CacheError)
def put_if_absent(self, key, value, key_hint=None, value_hint=None):
"""
Puts a value with a given key to cache only if the key does not
already exist.
:param key: key for the cache entry. Can be of any supported type,
:param value: value for the key,
:param key_hint: (optional) Ignite data type, for which the given key
should be converted,
:param value_hint: (optional) Ignite data type, for which the given
value should be converted.
"""
if key_hint is None:
key_hint = AnyDataObject.map_python_type(key)
return cache_put_if_absent(
self.get_best_node(key, key_hint),
self._cache_id,
key, value,
key_hint, value_hint
)
@status_to_exception(CacheError)
def get_and_remove(self, key, key_hint=None) -> Any:
"""
Removes the cache entry with specified key, returning the value.
:param key: key for the cache entry. Can be of any supported type,
:param key_hint: (optional) Ignite data type, for which the given key
should be converted,
:return: old value or None.
"""
if key_hint is None:
key_hint = AnyDataObject.map_python_type(key)
result = cache_get_and_remove(
self.get_best_node(key, key_hint),
self._cache_id,
key,
key_hint
)
result.value = self._process_binary(result.value)
return result
@status_to_exception(CacheError)
def get_and_replace(
self, key, value, key_hint=None, value_hint=None
) -> Any:
"""
Puts a value with a given key to cache, returning previous value
for that key, if and only if there is a value currently mapped
for that key.
:param key: key for the cache entry. Can be of any supported type,
:param value: value for the key,
:param key_hint: (optional) Ignite data type, for which the given key
should be converted,
:param value_hint: (optional) Ignite data type, for which the given
value should be converted.
:return: old value or None.
"""
if key_hint is None:
key_hint = AnyDataObject.map_python_type(key)
result = cache_get_and_replace(
self.get_best_node(key, key_hint),
self._cache_id,
key, value,
key_hint, value_hint
)
result.value = self._process_binary(result.value)
return result
@status_to_exception(CacheError)
def remove_key(self, key, key_hint=None):
"""
Clears the cache key without notifying listeners or cache writers.
:param key: key for the cache entry,
:param key_hint: (optional) Ignite data type, for which the given key
should be converted,
"""
if key_hint is None:
key_hint = AnyDataObject.map_python_type(key)
return cache_remove_key(
self.get_best_node(key, key_hint), self._cache_id, key, key_hint
)
@status_to_exception(CacheError)
def remove_keys(self, keys: list):
"""
Removes cache entries by given list of keys, notifying listeners
and cache writers.
:param keys: list of keys or tuples of (key, key_hint) to remove.
"""
return cache_remove_keys(
self.get_best_node(), self._cache_id, keys
)
@status_to_exception(CacheError)
def remove_all(self):
"""
Removes all cache entries, notifying listeners and cache writers.
"""
return cache_remove_all(self.get_best_node(), self._cache_id)
@status_to_exception(CacheError)
def remove_if_equals(self, key, sample, key_hint=None, sample_hint=None):
"""
Removes an entry with a given key if provided value is equal to
actual value, notifying listeners and cache writers.
:param key: key for the cache entry,
:param sample: a sample to compare the stored value with,
:param key_hint: (optional) Ignite data type, for which the given key
should be converted,
:param sample_hint: (optional) Ignite data type, for whic
the given sample should be converted.
"""
if key_hint is None:
key_hint = AnyDataObject.map_python_type(key)
return cache_remove_if_equals(
self.get_best_node(key, key_hint),
self._cache_id,
key, sample,
key_hint, sample_hint
)
@status_to_exception(CacheError)
def replace_if_equals(
self, key, sample, value,
key_hint=None, sample_hint=None, value_hint=None
) -> Any:
"""
Puts a value with a given key to cache only if the key already exists
and value equals provided sample.
:param key: key for the cache entry,
:param sample: a sample to compare the stored value with,
:param value: new value for the given key,
:param key_hint: (optional) Ignite data type, for which the given key
should be converted,
:param sample_hint: (optional) Ignite data type, for whic
the given sample should be converted
:param value_hint: (optional) Ignite data type, for which the given
value should be converted,
:return: boolean `True` when key is present, `False` otherwise.
"""
if key_hint is None:
key_hint = AnyDataObject.map_python_type(key)
result = cache_replace_if_equals(
self.get_best_node(key, key_hint),
self._cache_id,
key, sample, value,
key_hint, sample_hint, value_hint
)
result.value = self._process_binary(result.value)
return result
@status_to_exception(CacheError)
def get_size(self, peek_modes=0):
"""
Gets the number of entries in cache.
:param peek_modes: (optional) limit count to near cache partition
(PeekModes.NEAR), primary cache (PeekModes.PRIMARY), or backup cache
(PeekModes.BACKUP). Defaults to all cache partitions (PeekModes.ALL),
:return: integer number of cache entries.
"""
return cache_get_size(
self.get_best_node(), self._cache_id, peek_modes
)
def scan(
self, page_size: int = 1, partitions: int = -1, local: bool = False
):
"""
Returns all key-value pairs from the cache, similar to `get_all`, but
with internal pagination, which is slower, but safer.
:param page_size: (optional) page size. Default size is 1 (slowest
and safest),
:param partitions: (optional) number of partitions to query
(negative to query entire cache),
:param local: (optional) pass True if this query should be executed
on local node only. Defaults to False,
:return: generator with key-value pairs.
"""
node = self.get_best_node()
result = scan(
node,
self._cache_id,
page_size,
partitions,
local
)
if result.status != 0:
raise CacheError(result.message)
cursor = result.value['cursor']
for k, v in result.value['data'].items():
k = self._process_binary(k)
v = self._process_binary(v)
yield k, v
while result.value['more']:
result = scan_cursor_get_page(node, cursor)
if result.status != 0:
raise CacheError(result.message)
for k, v in result.value['data'].items():
k = self._process_binary(k)
v = self._process_binary(v)
yield k, v
def select_row(
self, query_str: str, page_size: int = 1,
query_args: Optional[list] = None, distributed_joins: bool = False,
replicated_only: bool = False, local: bool = False, timeout: int = 0
):
"""
Executes a simplified SQL SELECT query over data stored in the cache.
The query returns the whole record (key and value).
:param query_str: SQL query string,
:param page_size: (optional) cursor page size. Default is 1, which
means that client makes one server call per row,
:param query_args: (optional) query arguments,
:param distributed_joins: (optional) distributed joins. Defaults
to False,
:param replicated_only: (optional) whether query contains only
replicated tables or not. Defaults to False,
:param local: (optional) pass True if this query should be executed
on local node only. Defaults to False,
:param timeout: (optional) non-negative timeout value in ms. Zero
disables timeout (default),
:return: generator with key-value pairs.
"""
node = self.get_best_node()
def generate_result(value):
cursor = value['cursor']
more = value['more']
for k, v in value['data'].items():
k = self._process_binary(k)
v = self._process_binary(v)
yield k, v
while more:
inner_result = sql_cursor_get_page(node, cursor)
if result.status != 0:
raise SQLError(result.message)
more = inner_result.value['more']
for k, v in inner_result.value['data'].items():
k = self._process_binary(k)
v = self._process_binary(v)
yield k, v
type_name = self.settings[
prop_codes.PROP_QUERY_ENTITIES
][0]['value_type_name']
if not type_name:
raise SQLError('Value type is unknown')
result = sql(
node,
self._cache_id,
type_name,
query_str,
page_size,
query_args,
distributed_joins,
replicated_only,
local,
timeout
)
if result.status != 0:
raise SQLError(result.message)
return generate_result(result.value)