IGNITE-12467 Implement transactions, rewrite async connections using protocol and transport - Fixes #40.
diff --git a/docs/source/pyignite.datatypes.transactions.rst b/docs/source/pyignite.datatypes.transactions.rst
new file mode 100644
index 0000000..9b38468
--- /dev/null
+++ b/docs/source/pyignite.datatypes.transactions.rst
@@ -0,0 +1,21 @@
+.. Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+.. http://www.apache.org/licenses/LICENSE-2.0
+
+.. Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+pyignite.datatypes.transactions module
+=======================================
+
+.. automodule:: pyignite.datatypes.transactions
+ :members:
+ :show-inheritance:
\ No newline at end of file
diff --git a/docs/source/pyignite.rst b/docs/source/pyignite.rst
index c2a36fe..2e52500 100644
--- a/docs/source/pyignite.rst
+++ b/docs/source/pyignite.rst
@@ -41,6 +41,7 @@
pyignite.aio_client
pyignite.cluster
pyignite.aio_cluster
+ pyignite.transaction
pyignite.cursors
pyignite.exceptions
diff --git a/docs/source/pyignite.transaction.rst b/docs/source/pyignite.transaction.rst
new file mode 100644
index 0000000..7c6b016
--- /dev/null
+++ b/docs/source/pyignite.transaction.rst
@@ -0,0 +1,22 @@
+.. Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+.. http://www.apache.org/licenses/LICENSE-2.0
+
+.. Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+pyignite.transaction module
+=========================
+
+.. automodule:: pyignite.transaction
+ :members:
+ :undoc-members:
+ :show-inheritance:
diff --git a/pyignite/aio_cache.py b/pyignite/aio_cache.py
index b6b534b..7a92a9a 100644
--- a/pyignite/aio_cache.py
+++ b/pyignite/aio_cache.py
@@ -15,6 +15,7 @@
import asyncio
from typing import Any, Iterable, Optional, Union
+from .api.tx_api import get_tx_connection
from .datatypes import ExpiryPolicy
from .datatypes.internal import AnyDataObject
from .exceptions import CacheCreationError, CacheError, ParameterError
@@ -91,6 +92,9 @@
super().__init__(client, name, expiry_policy)
async def _get_best_node(self, key=None, key_hint=None):
+ tx_conn = get_tx_connection()
+ if tx_conn:
+ return tx_conn
return await self.client.get_best_node(self, key, key_hint)
async def settings(self) -> Optional[dict]:
diff --git a/pyignite/aio_client.py b/pyignite/aio_client.py
index 8c2ca56..26d243d 100644
--- a/pyignite/aio_client.py
+++ b/pyignite/aio_client.py
@@ -14,8 +14,9 @@
# limitations under the License.
import asyncio
import random
+import sys
from itertools import chain
-from typing import Iterable, Type, Union, Any, Dict
+from typing import Iterable, Type, Union, Any, Dict, Optional
from .aio_cluster import AioCluster
from .api import cache_get_node_partitions_async
@@ -27,10 +28,11 @@
from .aio_cache import AioCache, get_cache, create_cache, get_or_create_cache
from .connection import AioConnection
from .constants import AFFINITY_RETRIES, AFFINITY_DELAY
-from .datatypes import BinaryObject
-from .exceptions import BinaryTypeError, CacheError, ReconnectError, connection_errors
-from .queries.query import CacheInfo
+from .datatypes import BinaryObject, TransactionConcurrency, TransactionIsolation
+from .exceptions import BinaryTypeError, CacheError, ReconnectError, connection_errors, NotSupportedError
+from .queries.cache_info import CacheInfo
from .stream import AioBinaryStream, READ_BACKWARD
+from .transaction import AioTransaction
from .utils import cache_id, entity_id, status_to_exception
@@ -471,9 +473,9 @@
elif isinstance(cache, AioCache):
c_info = cache.cache_info
else:
- c_info = None
+ c_info = CacheInfo(protocol_context=self.protocol_context)
- if c_info:
+ if c_info.cache_id:
schema = None
return AioSqlFieldsCursor(self, c_info, query_str, page_size, query_args, schema, statement_type,
@@ -487,3 +489,21 @@
:return: :py:class:`~pyignite.aio_cluster.AioCluster` instance.
"""
return AioCluster(self)
+
+ def tx_start(self, concurrency: TransactionConcurrency = TransactionConcurrency.PESSIMISTIC,
+ isolation: TransactionIsolation = TransactionIsolation.REPEATABLE_READ,
+ timeout: Union[int, float] = 0, label: Optional[str] = None) -> 'AioTransaction':
+ """
+ Start async thin client transaction.
+
+ :param concurrency: (optional) transaction concurrency, see
+ :py:class:`~pyignite.datatypes.transactions.TransactionConcurrency`
+ :param isolation: (optional) transaction isolation level, see
+ :py:class:`~pyignite.datatypes.transactions.TransactionIsolation`
+ :param timeout: (optional) transaction timeout in seconds if float, in millis if int
+ :param label: (optional) transaction label.
+ :return: :py:class:`~pyignite.transaction.AioTransaction` instance.
+ """
+ if sys.version_info < (3, 7):
+ raise NotSupportedError(f"Transactions are not supported in async client on current python {sys.version}")
+ return AioTransaction(self, concurrency, isolation, timeout, label)
diff --git a/pyignite/api/affinity.py b/pyignite/api/affinity.py
index ddf1e7a..30e93ff 100644
--- a/pyignite/api/affinity.py
+++ b/pyignite/api/affinity.py
@@ -68,27 +68,23 @@
])
-def cache_get_node_partitions(conn: 'Connection', caches: Union[int, Iterable[int]], query_id: int = None) -> APIResult:
+def cache_get_node_partitions(conn: 'Connection', caches: Union[int, Iterable[int]]) -> APIResult:
"""
Gets partition mapping for an Ignite cache or a number of caches. See
“IEP-23: Best Effort Affinity for thin clients”.
:param conn: connection to Ignite server,
- :param caches: cache ID(s) the mapping is provided for,
- :param query_id: (optional) a value generated by client and returned as-is
- in response.query_id. When the parameter is omitted, a random value
- is generated,
+ :param caches: cache ID(s) the mapping is provided for
:return: API result data object.
"""
- return __cache_get_node_partitions(conn, caches, query_id)
+ return __cache_get_node_partitions(conn, caches)
-async def cache_get_node_partitions_async(conn: 'AioConnection', caches: Union[int, Iterable[int]],
- query_id: int = None) -> APIResult:
+async def cache_get_node_partitions_async(conn: 'AioConnection', caches: Union[int, Iterable[int]]) -> APIResult:
"""
Async version of cache_get_node_partitions.
"""
- return await __cache_get_node_partitions(conn, caches, query_id)
+ return await __cache_get_node_partitions(conn, caches)
def __post_process_partitions(result):
@@ -135,13 +131,12 @@
return result
-def __cache_get_node_partitions(conn, caches, query_id):
+def __cache_get_node_partitions(conn, caches):
query_struct = Query(
OP_CACHE_PARTITIONS,
[
('cache_ids', cache_ids),
- ],
- query_id=query_id
+ ]
)
if not is_iterable(caches):
caches = [caches]
diff --git a/pyignite/api/binary.py b/pyignite/api/binary.py
index 345e8e8..b49ab8b 100644
--- a/pyignite/api/binary.py
+++ b/pyignite/api/binary.py
@@ -26,34 +26,30 @@
from ..queries.response import BinaryTypeResponse
-def get_binary_type(conn: 'Connection', binary_type: Union[str, int], query_id=None) -> APIResult:
+def get_binary_type(conn: 'Connection', binary_type: Union[str, int]) -> APIResult:
"""
Gets the binary type information by type ID.
:param conn: connection to Ignite server,
:param binary_type: binary type name or ID,
- :param query_id: (optional) a value generated by client and returned as-is
- in response.query_id. When the parameter is omitted, a random value
- is generated,
:return: API result data object.
"""
- return __get_binary_type(conn, binary_type, query_id)
+ return __get_binary_type(conn, binary_type)
-async def get_binary_type_async(conn: 'AioConnection', binary_type: Union[str, int], query_id=None) -> APIResult:
+async def get_binary_type_async(conn: 'AioConnection', binary_type: Union[str, int]) -> APIResult:
"""
Async version of get_binary_type.
"""
- return await __get_binary_type(conn, binary_type, query_id)
+ return await __get_binary_type(conn, binary_type)
-def __get_binary_type(conn, binary_type, query_id):
+def __get_binary_type(conn, binary_type):
query_struct = Query(
OP_GET_BINARY_TYPE,
[
('type_id', Int),
],
- query_id=query_id,
response_type=BinaryTypeResponse
)
@@ -63,7 +59,7 @@
def put_binary_type(connection: 'Connection', type_name: str, affinity_key_field: str = None,
- is_enum=False, schema: dict = None, query_id=None) -> APIResult:
+ is_enum=False, schema: dict = None) -> APIResult:
"""
Registers binary type information in cluster.
@@ -76,12 +72,9 @@
parameter names as keys and an integers as values. When register binary
type, pass a dict of field names: field types. Binary type with no fields
is OK,
- :param query_id: (optional) a value generated by client and returned as-is
- in response.query_id. When the parameter is omitted, a random value
- is generated,
:return: API result data object.
"""
- return __put_binary_type(connection, type_name, affinity_key_field, is_enum, schema, query_id)
+ return __put_binary_type(connection, type_name, affinity_key_field, is_enum, schema)
async def put_binary_type_async(connection: 'AioConnection', type_name: str, affinity_key_field: str = None,
@@ -89,7 +82,7 @@
"""
Async version of put_binary_type.
"""
- return await __put_binary_type(connection, type_name, affinity_key_field, is_enum, schema, query_id)
+ return await __put_binary_type(connection, type_name, affinity_key_field, is_enum, schema)
def __post_process_put_binary(type_id):
@@ -103,7 +96,7 @@
return internal
-def __put_binary_type(connection, type_name, affinity_key_field, is_enum, schema, query_id):
+def __put_binary_type(connection, type_name, affinity_key_field, is_enum, schema):
# prepare data
if schema is None:
schema = {}
@@ -158,8 +151,7 @@
('is_enum', Bool),
('enums', enum_struct),
('schema', schema_struct),
- ],
- query_id=query_id,
+ ]
)
else:
query_struct = Query(
@@ -171,8 +163,7 @@
('binary_fields', binary_fields_struct),
('is_enum', Bool),
('schema', schema_struct),
- ],
- query_id=query_id,
+ ]
)
return query_perform(query_struct, connection, query_params=data,
post_process_fun=__post_process_put_binary(type_id))
diff --git a/pyignite/api/cache_config.py b/pyignite/api/cache_config.py
index 7f2869b..d4a5f81 100644
--- a/pyignite/api/cache_config.py
+++ b/pyignite/api/cache_config.py
@@ -39,7 +39,7 @@
from .result import APIResult
from ..datatypes.prop_codes import PROP_EXPIRY_POLICY
from ..exceptions import NotSupportedByClusterError
-from ..queries.query import CacheInfo
+from ..queries.cache_info import CacheInfo
def compact_cache_config(cache_config: dict) -> dict:
@@ -60,27 +60,23 @@
return result
-def cache_get_configuration(connection: 'Connection', cache_info: CacheInfo, query_id=None) -> 'APIResult':
+def cache_get_configuration(connection: 'Connection', cache_info: CacheInfo) -> 'APIResult':
"""
Gets configuration for the given cache.
:param connection: connection to Ignite server,
:param cache_info: cache meta info,
- :param query_id: (optional) a value generated by client and returned as-is
- in response.query_id. When the parameter is omitted, a random value
- is generated,
:return: API result data object. Result value is OrderedDict with
the cache configuration parameters.
"""
- return __cache_get_configuration(connection, cache_info, query_id)
+ return __cache_get_configuration(connection, cache_info)
-async def cache_get_configuration_async(
- connection: 'AioConnection', cache_info: CacheInfo, query_id=None) -> 'APIResult':
+async def cache_get_configuration_async(connection: 'AioConnection', cache_info: CacheInfo) -> 'APIResult':
"""
Async version of cache_get_configuration.
"""
- return await __cache_get_configuration(connection, cache_info, query_id)
+ return await __cache_get_configuration(connection, cache_info)
def __post_process_cache_config(result):
@@ -89,13 +85,12 @@
return result
-def __cache_get_configuration(connection, cache_info, query_id):
+def __cache_get_configuration(connection, cache_info):
query_struct = Query(
OP_CACHE_GET_CONFIGURATION,
[
('cache_info', CacheInfo)
- ],
- query_id=query_id,
+ ]
)
return query_perform(query_struct, connection,
query_params={
@@ -108,106 +103,94 @@
)
-def cache_create(connection: 'Connection', name: str, query_id=None) -> 'APIResult':
+def cache_create(connection: 'Connection', name: str) -> 'APIResult':
"""
Creates a cache with a given name. Returns error if a cache with specified
name already exists.
:param connection: connection to Ignite server,
:param name: cache name,
- :param query_id: (optional) a value generated by client and returned as-is
- in response.query_id. When the parameter is omitted, a random value
- is generated,
:return: API result data object. Contains zero status if a cache is
created successfully, non-zero status and an error description otherwise.
"""
- return __cache_create_with_name(OP_CACHE_CREATE_WITH_NAME, connection, name, query_id)
+ return __cache_create_with_name(OP_CACHE_CREATE_WITH_NAME, connection, name)
-async def cache_create_async(connection: 'AioConnection', name: str, query_id=None) -> 'APIResult':
+async def cache_create_async(connection: 'AioConnection', name: str) -> 'APIResult':
"""
Async version of cache_create.
"""
- return await __cache_create_with_name(OP_CACHE_CREATE_WITH_NAME, connection, name, query_id)
+ return await __cache_create_with_name(OP_CACHE_CREATE_WITH_NAME, connection, name)
-def cache_get_or_create(connection: 'Connection', name: str, query_id=None) -> 'APIResult':
+def cache_get_or_create(connection: 'Connection', name: str) -> 'APIResult':
"""
Creates a cache with a given name. Does nothing if the cache exists.
:param connection: connection to Ignite server,
:param name: cache name,
- :param query_id: (optional) a value generated by client and returned as-is
- in response.query_id. When the parameter is omitted, a random value
- is generated,
:return: API result data object. Contains zero status if a cache is
created successfully, non-zero status and an error description otherwise.
"""
- return __cache_create_with_name(OP_CACHE_GET_OR_CREATE_WITH_NAME, connection, name, query_id)
+ return __cache_create_with_name(OP_CACHE_GET_OR_CREATE_WITH_NAME, connection, name)
-async def cache_get_or_create_async(connection: 'AioConnection', name: str, query_id=None) -> 'APIResult':
+async def cache_get_or_create_async(connection: 'AioConnection', name: str) -> 'APIResult':
"""
Async version of cache_get_or_create.
"""
- return await __cache_create_with_name(OP_CACHE_GET_OR_CREATE_WITH_NAME, connection, name, query_id)
+ return await __cache_create_with_name(OP_CACHE_GET_OR_CREATE_WITH_NAME, connection, name)
-def __cache_create_with_name(op_code, conn, name, query_id):
- query_struct = Query(op_code, [('cache_name', String)], query_id=query_id)
+def __cache_create_with_name(op_code, conn, name):
+ query_struct = Query(op_code, [('cache_name', String)])
return query_perform(query_struct, conn, query_params={'cache_name': name})
-def cache_destroy(connection: 'Connection', cache: Union[str, int], query_id=None) -> 'APIResult':
+def cache_destroy(connection: 'Connection', cache: Union[str, int]) -> 'APIResult':
"""
Destroys cache with a given name.
:param connection: connection to Ignite server,
:param cache: name or ID of the cache,
- :param query_id: (optional) a value generated by client and returned as-is
- in response.query_id. When the parameter is omitted, a random value
- is generated,
:return: API result data object.
"""
- return __cache_destroy(connection, cache, query_id)
+ return __cache_destroy(connection, cache)
-async def cache_destroy_async(connection: 'AioConnection', cache: Union[str, int], query_id=None) -> 'APIResult':
+async def cache_destroy_async(connection: 'AioConnection', cache: Union[str, int]) -> 'APIResult':
"""
Async version of cache_destroy.
"""
- return await __cache_destroy(connection, cache, query_id)
+ return await __cache_destroy(connection, cache)
-def __cache_destroy(connection, cache, query_id):
- query_struct = Query(OP_CACHE_DESTROY, [('cache_id', Int)], query_id=query_id)
+def __cache_destroy(connection, cache):
+ query_struct = Query(OP_CACHE_DESTROY, [('cache_id', Int)])
return query_perform(query_struct, connection, query_params={'cache_id': cache_id(cache)})
-def cache_get_names(connection: 'Connection', query_id=None) -> 'APIResult':
+def cache_get_names(connection: 'Connection') -> 'APIResult':
"""
Gets existing cache names.
:param connection: connection to Ignite server,
- :param query_id: (optional) a value generated by client and returned as-is
- in response.query_id. When the parameter is omitted, a random value
- is generated,
:return: API result data object. Contains zero status and a list of cache
names, non-zero status and an error description otherwise.
"""
- return __cache_get_names(connection, query_id)
+ return __cache_get_names(connection)
-async def cache_get_names_async(connection: 'AioConnection', query_id=None) -> 'APIResult':
+async def cache_get_names_async(connection: 'AioConnection') -> 'APIResult':
"""
Async version of cache_get_names.
"""
- return await __cache_get_names(connection, query_id)
+ return await __cache_get_names(connection)
def __post_process_cache_names(result):
@@ -216,14 +199,14 @@
return result
-def __cache_get_names(connection, query_id):
- query_struct = Query(OP_CACHE_GET_NAMES, query_id=query_id)
+def __cache_get_names(connection):
+ query_struct = Query(OP_CACHE_GET_NAMES)
return query_perform(query_struct, connection,
response_config=[('cache_names', StringArray)],
post_process_fun=__post_process_cache_names)
-def cache_create_with_config(connection: 'Connection', cache_props: dict, query_id=None) -> 'APIResult':
+def cache_create_with_config(connection: 'Connection', cache_props: dict) -> 'APIResult':
"""
Creates cache with provided configuration. An error is returned
if the name is already in use.
@@ -232,23 +215,20 @@
:param cache_props: cache configuration properties to create cache with
in form of dictionary {property code: python value}.
You must supply at least name (PROP_NAME),
- :param query_id: (optional) a value generated by client and returned as-is
- in response.query_id. When the parameter is omitted, a random value
- is generated,
:return: API result data object. Contains zero status if cache was created,
non-zero status and an error description otherwise.
"""
- return __cache_create_with_config(OP_CACHE_CREATE_WITH_CONFIGURATION, connection, cache_props, query_id)
+ return __cache_create_with_config(OP_CACHE_CREATE_WITH_CONFIGURATION, connection, cache_props)
-async def cache_create_with_config_async(connection: 'AioConnection', cache_props: dict, query_id=None) -> 'APIResult':
+async def cache_create_with_config_async(connection: 'AioConnection', cache_props: dict) -> 'APIResult':
"""
Async version of cache_create_with_config.
"""
- return await __cache_create_with_config(OP_CACHE_CREATE_WITH_CONFIGURATION, connection, cache_props, query_id)
+ return await __cache_create_with_config(OP_CACHE_CREATE_WITH_CONFIGURATION, connection, cache_props)
-def cache_get_or_create_with_config(connection: 'Connection', cache_props: dict, query_id=None) -> 'APIResult':
+def cache_get_or_create_with_config(connection: 'Connection', cache_props: dict) -> 'APIResult':
"""
Creates cache with provided configuration. Does nothing if the name
is already in use.
@@ -257,25 +237,20 @@
:param cache_props: cache configuration properties to create cache with
in form of dictionary {property code: python value}.
You must supply at least name (PROP_NAME),
- :param query_id: (optional) a value generated by client and returned as-is
- in response.query_id. When the parameter is omitted, a random value
- is generated,
:return: API result data object. Contains zero status if cache was created,
non-zero status and an error description otherwise.
"""
- return __cache_create_with_config(OP_CACHE_GET_OR_CREATE_WITH_CONFIGURATION, connection, cache_props, query_id)
+ return __cache_create_with_config(OP_CACHE_GET_OR_CREATE_WITH_CONFIGURATION, connection, cache_props)
-async def cache_get_or_create_with_config_async(connection: 'AioConnection', cache_props: dict,
- query_id=None) -> 'APIResult':
+async def cache_get_or_create_with_config_async(connection: 'AioConnection', cache_props: dict) -> 'APIResult':
"""
Async version of cache_get_or_create_with_config.
"""
- return await __cache_create_with_config(OP_CACHE_GET_OR_CREATE_WITH_CONFIGURATION, connection, cache_props,
- query_id)
+ return await __cache_create_with_config(OP_CACHE_GET_OR_CREATE_WITH_CONFIGURATION, connection, cache_props)
-def __cache_create_with_config(op_code, connection, cache_props, query_id):
+def __cache_create_with_config(op_code, connection, cache_props):
prop_types, prop_values = {}, {}
is_expiry_policy_supported = connection.protocol_context.is_expiry_policy_supported()
for i, prop_item in enumerate(cache_props.items()):
@@ -289,5 +264,5 @@
prop_values['param_count'] = len(cache_props)
following = [('param_count', Short)] + list(prop_types.items())
- query_struct = ConfigQuery(op_code, following, query_id=query_id)
+ query_struct = ConfigQuery(op_code, following)
return query_perform(query_struct, connection, query_params=prop_values)
diff --git a/pyignite/api/cluster.py b/pyignite/api/cluster.py
index e134239..50c71bd 100644
--- a/pyignite/api/cluster.py
+++ b/pyignite/api/cluster.py
@@ -20,25 +20,22 @@
from pyignite.queries.op_codes import OP_CLUSTER_GET_STATE, OP_CLUSTER_CHANGE_STATE
-def cluster_get_state(connection: 'Connection', query_id=None) -> 'APIResult':
+def cluster_get_state(connection: 'Connection') -> 'APIResult':
"""
Get cluster state.
:param connection: Connection to use,
- :param query_id: (optional) a value generated by client and returned as-is
- in response.query_id. When the parameter is omitted, a random value
- is generated,
:return: API result data object. Contains zero status and a state
retrieved on success, non-zero status and an error description on failure.
"""
- return __cluster_get_state(connection, query_id)
+ return __cluster_get_state(connection)
-async def cluster_get_state_async(connection: 'AioConnection', query_id=None) -> 'APIResult':
+async def cluster_get_state_async(connection: 'AioConnection') -> 'APIResult':
"""
Async version of cluster_get_state
"""
- return await __cluster_get_state(connection, query_id)
+ return await __cluster_get_state(connection)
def __post_process_get_state(result):
@@ -47,11 +44,11 @@
return result
-def __cluster_get_state(connection, query_id):
+def __cluster_get_state(connection):
if not connection.protocol_context.is_cluster_api_supported():
raise NotSupportedByClusterError('Cluster API is not supported by the cluster')
- query_struct = Query(OP_CLUSTER_GET_STATE, query_id=query_id)
+ query_struct = Query(OP_CLUSTER_GET_STATE)
return query_perform(
query_struct, connection,
response_config=[('state', Byte)],
@@ -59,26 +56,23 @@
)
-def cluster_set_state(connection: 'Connection', state: int, query_id=None) -> 'APIResult':
+def cluster_set_state(connection: 'Connection', state: int) -> 'APIResult':
"""
Set cluster state.
:param connection: Connection to use,
:param state: State to set,
- :param query_id: (optional) a value generated by client and returned as-is
- in response.query_id. When the parameter is omitted, a random value
- is generated,
:return: API result data object. Contains zero status if a value
is written, non-zero status and an error description otherwise.
"""
- return __cluster_set_state(connection, state, query_id)
+ return __cluster_set_state(connection, state)
-async def cluster_set_state_async(connection: 'AioConnection', state: int, query_id=None) -> 'APIResult':
+async def cluster_set_state_async(connection: 'AioConnection', state: int) -> 'APIResult':
"""
Async version of cluster_get_state
"""
- return await __cluster_set_state(connection, state, query_id)
+ return await __cluster_set_state(connection, state)
def __post_process_set_state(result):
@@ -87,7 +81,7 @@
return result
-def __cluster_set_state(connection, state, query_id):
+def __cluster_set_state(connection, state):
if not connection.protocol_context.is_cluster_api_supported():
raise NotSupportedByClusterError('Cluster API is not supported by the cluster')
@@ -95,8 +89,7 @@
OP_CLUSTER_CHANGE_STATE,
[
('state', Byte)
- ],
- query_id=query_id
+ ]
)
return query_perform(
query_struct, connection,
diff --git a/pyignite/api/key_value.py b/pyignite/api/key_value.py
index 5038051..5b3f72c 100644
--- a/pyignite/api/key_value.py
+++ b/pyignite/api/key_value.py
@@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from typing import Any, Iterable, Optional, Union
+from typing import Any, Iterable, Union
from pyignite.connection import AioConnection, Connection
from pyignite.queries.op_codes import (
@@ -28,12 +28,11 @@
from pyignite.queries import Query, query_perform
from .result import APIResult
-from ..queries.query import CacheInfo
+from ..queries.cache_info import CacheInfo
def cache_put(connection: 'Connection', cache_info: CacheInfo, key: Any, value: Any,
- key_hint: 'IgniteDataType' = None, value_hint: 'IgniteDataType' = None,
- query_id: Optional[int] = None) -> 'APIResult':
+ key_hint: 'IgniteDataType' = None, value_hint: 'IgniteDataType' = None) -> 'APIResult':
"""
Puts a value with a given key to cache (overwriting existing value if any).
@@ -45,33 +44,28 @@
should be converted,
:param value_hint: (optional) Ignite data type, for which the given value
should be converted.
- :param query_id: (optional) a value generated by client and returned as-is
- in response.query_id. When the parameter is omitted, a random value
- is generated,
:return: API result data object. Contains zero status if a value
is written, non-zero status and an error description otherwise.
"""
- return __cache_put(connection, cache_info, key, value, key_hint, value_hint, query_id)
+ return __cache_put(connection, cache_info, key, value, key_hint, value_hint)
async def cache_put_async(connection: 'AioConnection', cache_info: CacheInfo, key: Any, value: Any,
- key_hint: 'IgniteDataType' = None, value_hint: 'IgniteDataType' = None,
- query_id: Optional[int] = None) -> 'APIResult':
+ key_hint: 'IgniteDataType' = None, value_hint: 'IgniteDataType' = None) -> 'APIResult':
"""
Async version of cache_put
"""
- return await __cache_put(connection, cache_info, key, value, key_hint, value_hint, query_id)
+ return await __cache_put(connection, cache_info, key, value, key_hint, value_hint)
-def __cache_put(connection, cache_info, key, value, key_hint, value_hint, query_id):
+def __cache_put(connection, cache_info, key, value, key_hint, value_hint):
query_struct = Query(
OP_CACHE_PUT,
[
('cache_info', CacheInfo),
('key', key_hint or AnyDataObject),
('value', value_hint or AnyDataObject),
- ],
- query_id=query_id,
+ ]
)
return query_perform(
query_struct, connection,
@@ -83,8 +77,8 @@
)
-def cache_get(connection: 'Connection', cache_info: CacheInfo, key: Any, key_hint: 'IgniteDataType' = None,
- query_id: Optional[int] = None) -> 'APIResult':
+def cache_get(connection: 'Connection', cache_info: CacheInfo, key: Any,
+ key_hint: 'IgniteDataType' = None) -> 'APIResult':
"""
Retrieves a value from cache by key.
@@ -92,32 +86,28 @@
:param cache_info: cache meta info,
:param key: key for the cache entry. Can be of any supported type,
:param key_hint: (optional) Ignite data type, for which the given key
- should be converted,
- :param query_id: (optional) a value generated by client and returned as-is
- in response.query_id. When the parameter is omitted, a random value
- is generated,
+ should be converted
:return: API result data object. Contains zero status and a value
retrieved on success, non-zero status and an error description on failure.
"""
- return __cache_get(connection, cache_info, key, key_hint, query_id)
+ return __cache_get(connection, cache_info, key, key_hint)
async def cache_get_async(connection: 'AioConnection', cache_info: CacheInfo, key: Any,
- key_hint: 'IgniteDataType' = None, query_id: Optional[int] = None) -> 'APIResult':
+ key_hint: 'IgniteDataType' = None) -> 'APIResult':
"""
Async version of cache_get
"""
- return await __cache_get(connection, cache_info, key, key_hint, query_id)
+ return await __cache_get(connection, cache_info, key, key_hint)
-def __cache_get(connection, cache_info, key, key_hint, query_id):
+def __cache_get(connection, cache_info, key, key_hint):
query_struct = Query(
OP_CACHE_GET,
[
('cache_info', CacheInfo),
('key', key_hint or AnyDataObject),
- ],
- query_id=query_id,
+ ]
)
return query_perform(
query_struct, connection,
@@ -132,40 +122,34 @@
)
-def cache_get_all(connection: 'Connection', cache_info: CacheInfo, keys: Iterable,
- query_id: Optional[int] = None) -> 'APIResult':
+def cache_get_all(connection: 'Connection', cache_info: CacheInfo, keys: Iterable) -> 'APIResult':
"""
Retrieves multiple key-value pairs from cache.
:param connection: connection to Ignite server,
:param cache_info: cache meta info,
:param keys: list of keys or tuples of (key, key_hint),
- :param query_id: (optional) a value generated by client and returned as-is
- in response.query_id. When the parameter is omitted, a random value
- is generated,
:return: API result data object. Contains zero status and a dict, made of
retrieved key-value pairs, non-zero status and an error description
on failure.
"""
- return __cache_get_all(connection, cache_info, keys, query_id)
+ return __cache_get_all(connection, cache_info, keys)
-async def cache_get_all_async(connection: 'AioConnection', cache_info: CacheInfo, keys: Iterable,
- query_id: Optional[int] = None) -> 'APIResult':
+async def cache_get_all_async(connection: 'AioConnection', cache_info: CacheInfo, keys: Iterable) -> 'APIResult':
"""
Async version of cache_get_all.
"""
- return await __cache_get_all(connection, cache_info, keys, query_id)
+ return await __cache_get_all(connection, cache_info, keys)
-def __cache_get_all(connection, cache_info, keys, query_id):
+def __cache_get_all(connection, cache_info, keys):
query_struct = Query(
OP_CACHE_GET_ALL,
[
('cache_info', CacheInfo),
('keys', AnyDataArray()),
- ],
- query_id=query_id,
+ ]
)
return query_perform(
query_struct, connection,
@@ -180,8 +164,7 @@
)
-def cache_put_all(connection: 'Connection', cache_info: CacheInfo, pairs: dict,
- query_id: Optional[int] = None) -> 'APIResult':
+def cache_put_all(connection: 'Connection', cache_info: CacheInfo, pairs: dict) -> 'APIResult':
"""
Puts multiple key-value pairs to cache (overwriting existing associations
if any).
@@ -191,31 +174,26 @@
:param pairs: dictionary type parameters, contains key-value pairs to save.
Each key or value can be an item of representable Python type or a tuple
of (item, hint),
- :param query_id: (optional) a value generated by client and returned as-is
- in response.query_id. When the parameter is omitted, a random value
- is generated,
:return: API result data object. Contains zero status if key-value pairs
are written, non-zero status and an error description otherwise.
"""
- return __cache_put_all(connection, cache_info, pairs, query_id)
+ return __cache_put_all(connection, cache_info, pairs)
-async def cache_put_all_async(connection: 'AioConnection', cache_info: CacheInfo, pairs: dict,
- query_id: Optional[int] = None) -> 'APIResult':
+async def cache_put_all_async(connection: 'AioConnection', cache_info: CacheInfo, pairs: dict) -> 'APIResult':
"""
Async version of cache_put_all.
"""
- return await __cache_put_all(connection, cache_info, pairs, query_id)
+ return await __cache_put_all(connection, cache_info, pairs)
-def __cache_put_all(connection, cache_info, pairs, query_id):
+def __cache_put_all(connection, cache_info, pairs):
query_struct = Query(
OP_CACHE_PUT_ALL,
[
('cache_info', CacheInfo),
('data', Map),
- ],
- query_id=query_id,
+ ]
)
return query_perform(
query_struct, connection,
@@ -226,8 +204,8 @@
)
-def cache_contains_key(connection: 'Connection', cache_info: CacheInfo, key: Any, key_hint: 'IgniteDataType' = None,
- query_id: Optional[int] = None) -> 'APIResult':
+def cache_contains_key(connection: 'Connection', cache_info: CacheInfo, key: Any,
+ key_hint: 'IgniteDataType' = None) -> 'APIResult':
"""
Returns a value indicating whether given key is present in cache.
@@ -235,33 +213,29 @@
:param cache_info: cache meta info,
:param key: key for the cache entry. Can be of any supported type,
:param key_hint: (optional) Ignite data type, for which the given key
- should be converted,
- :param query_id: a value generated by client and returned as-is
- in response.query_id. When the parameter is omitted, a random value
- is generated,
+ should be converted
:return: API result data object. Contains zero status and a bool value
retrieved on success: `True` when key is present, `False` otherwise,
non-zero status and an error description on failure.
"""
- return __cache_contains_key(connection, cache_info, key, key_hint, query_id)
+ return __cache_contains_key(connection, cache_info, key, key_hint)
async def cache_contains_key_async(connection: 'AioConnection', cache_info: CacheInfo, key: Any,
- key_hint: 'IgniteDataType' = None, query_id: Optional[int] = None) -> 'APIResult':
+ key_hint: 'IgniteDataType' = None) -> 'APIResult':
"""
Async version of cache_contains_key.
"""
- return await __cache_contains_key(connection, cache_info, key, key_hint, query_id)
+ return await __cache_contains_key(connection, cache_info, key, key_hint)
-def __cache_contains_key(connection, cache_info, key, key_hint, query_id):
+def __cache_contains_key(connection, cache_info, key, key_hint):
query_struct = Query(
OP_CACHE_CONTAINS_KEY,
[
('cache_info', CacheInfo),
('key', key_hint or AnyDataObject),
- ],
- query_id=query_id,
+ ]
)
return query_perform(
query_struct, connection,
@@ -277,39 +251,34 @@
def cache_contains_keys(connection: 'Connection', cache_info: CacheInfo, keys: Iterable,
- query_id: Optional[int] = None) -> 'APIResult':
+ ) -> 'APIResult':
"""
Returns a value indicating whether all given keys are present in cache.
:param connection: connection to Ignite server,
:param cache_info: cache meta info,
:param keys: a list of keys or (key, type hint) tuples,
- :param query_id: a value generated by client and returned as-is
- in response.query_id. When the parameter is omitted, a random value
- is generated,
:return: API result data object. Contains zero status and a bool value
retrieved on success: `True` when all keys are present, `False` otherwise,
non-zero status and an error description on failure.
"""
- return __cache_contains_keys(connection, cache_info, keys, query_id)
+ return __cache_contains_keys(connection, cache_info, keys)
-async def cache_contains_keys_async(connection: 'AioConnection', cache_info: CacheInfo, keys: Iterable,
- query_id: Optional[int] = None) -> 'APIResult':
+async def cache_contains_keys_async(connection: 'AioConnection', cache_info: CacheInfo, keys: Iterable) -> 'APIResult':
"""
Async version of cache_contains_keys.
"""
- return await __cache_contains_keys(connection, cache_info, keys, query_id)
+ return await __cache_contains_keys(connection, cache_info, keys)
-def __cache_contains_keys(connection, cache_info, keys, query_id):
+def __cache_contains_keys(connection, cache_info, keys):
query_struct = Query(
OP_CACHE_CONTAINS_KEYS,
[
('cache_info', CacheInfo),
('keys', AnyDataArray()),
- ],
- query_id=query_id,
+ ]
)
return query_perform(
query_struct, connection,
@@ -325,8 +294,7 @@
def cache_get_and_put(connection: 'Connection', cache_info: CacheInfo, key: Any, value: Any,
- key_hint: 'IgniteDataType' = None, value_hint: 'IgniteDataType' = None,
- query_id: Optional[int] = None) -> 'APIResult':
+ key_hint: 'IgniteDataType' = None, value_hint: 'IgniteDataType' = None) -> 'APIResult':
"""
Puts a value with a given key to cache_info, and returns the previous value
for that key, or null value if there was not such key.
@@ -339,26 +307,24 @@
should be converted,
:param value_hint: (optional) Ignite data type, for which the given value
should be converted.
- :param query_id: a value generated by client and returned as-is
- in response.query_id. When the parameter is omitted, a random value
- is generated,
:return: API result data object. Contains zero status and an old value
or None if a value is written, non-zero status and an error description
in case of error.
"""
- return __cache_get_and_put(connection, cache_info, key, value, key_hint, value_hint, query_id)
+ return __cache_get_and_put(connection, cache_info, key, value, key_hint, value_hint)
-async def cache_get_and_put_async(connection: 'AioConnection', cache_info: CacheInfo, key: Any, value: Any,
- key_hint: 'IgniteDataType' = None, value_hint: 'IgniteDataType' = None,
- query_id: Optional[int] = None) -> 'APIResult':
+async def cache_get_and_put_async(
+ connection: 'AioConnection', cache_info: CacheInfo, key: Any, value: Any,
+ key_hint: 'IgniteDataType' = None, value_hint: 'IgniteDataType' = None
+) -> 'APIResult':
"""
Async version of cache_get_and_put.
"""
- return await __cache_get_and_put(connection, cache_info, key, value, key_hint, value_hint, query_id)
+ return await __cache_get_and_put(connection, cache_info, key, value, key_hint, value_hint)
-def __cache_get_and_put(connection, cache_info, key, value, key_hint, value_hint, query_id):
+def __cache_get_and_put(connection, cache_info, key, value, key_hint, value_hint):
query_struct = Query(
OP_CACHE_GET_AND_PUT,
[
@@ -366,7 +332,6 @@
('key', key_hint or AnyDataObject),
('value', value_hint or AnyDataObject),
],
- query_id=query_id,
)
return query_perform(
query_struct, connection,
@@ -383,8 +348,7 @@
def cache_get_and_replace(connection: 'Connection', cache_info: CacheInfo, key: Any, value: Any,
- key_hint: 'IgniteDataType' = None, value_hint: 'IgniteDataType' = None,
- query_id: Optional[int] = None) -> 'APIResult':
+ key_hint: 'IgniteDataType' = None, value_hint: 'IgniteDataType' = None) -> 'APIResult':
"""
Puts a value with a given key to cache, returning previous value
for that key, if and only if there is a value currently mapped
@@ -398,32 +362,29 @@
should be converted,
:param value_hint: (optional) Ignite data type, for which the given value
should be converted.
- :param query_id: a value generated by client and returned as-is
- in response.query_id. When the parameter is omitted, a random value
- is generated,
:return: API result data object. Contains zero status and an old value
or None on success, non-zero status and an error description otherwise.
"""
- return __cache_get_and_replace(connection, cache_info, key, key_hint, value, value_hint, query_id)
+ return __cache_get_and_replace(connection, cache_info, key, key_hint, value, value_hint)
-async def cache_get_and_replace_async(connection: 'AioConnection', cache_info: CacheInfo, key: Any, value: Any,
- key_hint: 'IgniteDataType' = None, value_hint: 'IgniteDataType' = None,
- query_id: Optional[int] = None) -> 'APIResult':
+async def cache_get_and_replace_async(
+ connection: 'AioConnection', cache_info: CacheInfo, key: Any, value: Any,
+ key_hint: 'IgniteDataType' = None, value_hint: 'IgniteDataType' = None
+) -> 'APIResult':
"""
Async version of cache_get_and_replace.
"""
- return await __cache_get_and_replace(connection, cache_info, key, key_hint, value, value_hint, query_id)
+ return await __cache_get_and_replace(connection, cache_info, key, key_hint, value, value_hint)
-def __cache_get_and_replace(connection, cache_info, key, key_hint, value, value_hint, query_id):
+def __cache_get_and_replace(connection, cache_info, key, key_hint, value, value_hint):
query_struct = Query(
OP_CACHE_GET_AND_REPLACE, [
('cache_info', CacheInfo),
('key', key_hint or AnyDataObject),
('value', value_hint or AnyDataObject),
- ],
- query_id=query_id,
+ ]
)
return query_perform(
query_struct, connection,
@@ -439,8 +400,9 @@
)
-def cache_get_and_remove(connection: 'Connection', cache_info: CacheInfo, key: Any, key_hint: 'IgniteDataType' = None,
- query_id: Optional[int] = None) -> 'APIResult':
+def cache_get_and_remove(
+ connection: 'Connection', cache_info: CacheInfo, key: Any, key_hint: 'IgniteDataType' = None
+) -> 'APIResult':
"""
Removes the cache entry with specified key, returning the value.
@@ -449,28 +411,24 @@
:param key: key for the cache entry. Can be of any supported type,
:param key_hint: (optional) Ignite data type, for which the given key
should be converted,
- :param query_id: a value generated by client and returned as-is
- in response.query_id. When the parameter is omitted, a random value
- is generated,
:return: API result data object. Contains zero status and an old value
or None, non-zero status and an error description otherwise.
"""
- return __cache_get_and_remove(connection, cache_info, key, key_hint, query_id)
+ return __cache_get_and_remove(connection, cache_info, key, key_hint)
async def cache_get_and_remove_async(
- connection: 'AioConnection', cache_info: CacheInfo, key: Any, key_hint: 'IgniteDataType' = None,
- query_id: Optional[int] = None) -> 'APIResult':
- return await __cache_get_and_remove(connection, cache_info, key, key_hint, query_id)
+ connection: 'AioConnection', cache_info: CacheInfo, key: Any, key_hint: 'IgniteDataType' = None
+) -> 'APIResult':
+ return await __cache_get_and_remove(connection, cache_info, key, key_hint)
-def __cache_get_and_remove(connection, cache_info, key, key_hint, query_id):
+def __cache_get_and_remove(connection, cache_info, key, key_hint):
query_struct = Query(
OP_CACHE_GET_AND_REMOVE, [
('cache_info', CacheInfo),
('key', key_hint or AnyDataObject),
- ],
- query_id=query_id,
+ ]
)
return query_perform(
query_struct, connection,
@@ -486,8 +444,7 @@
def cache_put_if_absent(connection: 'Connection', cache_info: CacheInfo, key: Any, value: Any,
- key_hint: 'IgniteDataType' = None, value_hint: 'IgniteDataType' = None,
- query_id: Optional[int] = None) -> 'APIResult':
+ key_hint: 'IgniteDataType' = None, value_hint: 'IgniteDataType' = None) -> 'APIResult':
"""
Puts a value with a given key to cache only if the key
does not already exist.
@@ -500,33 +457,30 @@
should be converted,
:param value_hint: (optional) Ignite data type, for which the given value
should be converted.
- :param query_id: (optional) a value generated by client and returned as-is
- in response.query_id. When the parameter is omitted, a random value
- is generated,
:return: API result data object. Contains zero status on success,
non-zero status and an error description otherwise.
"""
- return __cache_put_if_absent(connection, cache_info, key, value, key_hint, value_hint, query_id)
+ return __cache_put_if_absent(connection, cache_info, key, value, key_hint, value_hint)
-async def cache_put_if_absent_async(connection: 'AioConnection', cache_info: CacheInfo, key: Any, value: Any,
- key_hint: 'IgniteDataType' = None, value_hint: 'IgniteDataType' = None,
- query_id: Optional[int] = None) -> 'APIResult':
+async def cache_put_if_absent_async(
+ connection: 'AioConnection', cache_info: CacheInfo, key: Any, value: Any,
+ key_hint: 'IgniteDataType' = None, value_hint: 'IgniteDataType' = None
+) -> 'APIResult':
"""
Async version of cache_put_if_absent.
"""
- return await __cache_put_if_absent(connection, cache_info, key, value, key_hint, value_hint, query_id)
+ return await __cache_put_if_absent(connection, cache_info, key, value, key_hint, value_hint)
-def __cache_put_if_absent(connection, cache_info, key, value, key_hint, value_hint, query_id):
+def __cache_put_if_absent(connection, cache_info, key, value, key_hint, value_hint):
query_struct = Query(
OP_CACHE_PUT_IF_ABSENT,
[
('cache_info', CacheInfo),
('key', key_hint or AnyDataObject),
('value', value_hint or AnyDataObject),
- ],
- query_id=query_id,
+ ]
)
return query_perform(
query_struct, connection,
@@ -543,8 +497,7 @@
def cache_get_and_put_if_absent(connection: 'Connection', cache_info: CacheInfo, key: Any, value: Any,
- key_hint: 'IgniteDataType' = None, value_hint: 'IgniteDataType' = None,
- query_id: Optional[int] = None) -> 'APIResult':
+ key_hint: 'IgniteDataType' = None, value_hint: 'IgniteDataType' = None) -> 'APIResult':
"""
Puts a value with a given key to cache only if the key does not
already exist.
@@ -557,33 +510,30 @@
should be converted,
:param value_hint: (optional) Ignite data type, for which the given value
should be converted.
- :param query_id: (optional) a value generated by client and returned as-is
- in response.query_id. When the parameter is omitted, a random value
- is generated,
:return: API result data object. Contains zero status and an old value
or None on success, non-zero status and an error description otherwise.
"""
- return __cache_get_and_put_if_absent(connection, cache_info, key, value, key_hint, value_hint, query_id)
+ return __cache_get_and_put_if_absent(connection, cache_info, key, value, key_hint, value_hint)
-async def cache_get_and_put_if_absent_async(connection: 'AioConnection', cache_info: CacheInfo, key: Any, value: Any,
- key_hint: 'IgniteDataType' = None, value_hint: 'IgniteDataType' = None,
- query_id: Optional[int] = None) -> 'APIResult':
+async def cache_get_and_put_if_absent_async(
+ connection: 'AioConnection', cache_info: CacheInfo, key: Any, value: Any,
+ key_hint: 'IgniteDataType' = None, value_hint: 'IgniteDataType' = None
+) -> 'APIResult':
"""
Async version of cache_get_and_put_if_absent.
"""
- return await __cache_get_and_put_if_absent(connection, cache_info, key, value, key_hint, value_hint, query_id)
+ return await __cache_get_and_put_if_absent(connection, cache_info, key, value, key_hint, value_hint)
-def __cache_get_and_put_if_absent(connection, cache_info, key, value, key_hint, value_hint, query_id):
+def __cache_get_and_put_if_absent(connection, cache_info, key, value, key_hint, value_hint):
query_struct = Query(
OP_CACHE_GET_AND_PUT_IF_ABSENT,
[
('cache_info', CacheInfo),
('key', key_hint or AnyDataObject),
('value', value_hint or AnyDataObject),
- ],
- query_id=query_id,
+ ]
)
return query_perform(
query_struct, connection,
@@ -600,8 +550,7 @@
def cache_replace(connection: 'Connection', cache_info: CacheInfo, key: Any, value: Any,
- key_hint: 'IgniteDataType' = None, value_hint: 'IgniteDataType' = None,
- query_id: Optional[int] = None) -> 'APIResult':
+ key_hint: 'IgniteDataType' = None, value_hint: 'IgniteDataType' = None) -> 'APIResult':
"""
Puts a value with a given key to cache only if the key already exist.
@@ -613,34 +562,31 @@
should be converted,
:param value_hint: (optional) Ignite data type, for which the given value
should be converted.
- :param query_id: a value generated by client and returned as-is
- in response.query_id. When the parameter is omitted, a random value
- is generated,
:return: API result data object. Contains zero status and a boolean
success code, or non-zero status and an error description if something
has gone wrong.
"""
- return __cache_replace(connection, cache_info, key, value, key_hint, value_hint, query_id)
+ return __cache_replace(connection, cache_info, key, value, key_hint, value_hint)
-async def cache_replace_async(connection: 'AioConnection', cache_info: CacheInfo, key: Any, value: Any,
- key_hint: 'IgniteDataType' = None, value_hint: 'IgniteDataType' = None,
- query_id: Optional[int] = None) -> 'APIResult':
+async def cache_replace_async(
+ connection: 'AioConnection', cache_info: CacheInfo, key: Any, value: Any,
+ key_hint: 'IgniteDataType' = None, value_hint: 'IgniteDataType' = None
+) -> 'APIResult':
"""
Async version of cache_replace.
"""
- return await __cache_replace(connection, cache_info, key, value, key_hint, value_hint, query_id)
+ return await __cache_replace(connection, cache_info, key, value, key_hint, value_hint)
-def __cache_replace(connection, cache_info, key, value, key_hint, value_hint, query_id):
+def __cache_replace(connection, cache_info, key, value, key_hint, value_hint):
query_struct = Query(
OP_CACHE_REPLACE,
[
('cache_info', CacheInfo),
('key', key_hint or AnyDataObject),
('value', value_hint or AnyDataObject),
- ],
- query_id=query_id,
+ ]
)
return query_perform(
query_struct, connection,
@@ -658,7 +604,7 @@
def cache_replace_if_equals(connection: 'Connection', cache_info: CacheInfo, key: Any, sample: Any, value: Any,
key_hint: 'IgniteDataType' = None, sample_hint: 'IgniteDataType' = None,
- value_hint: 'IgniteDataType' = None, query_id: Optional[int] = None) -> 'APIResult':
+ value_hint: 'IgniteDataType' = None) -> 'APIResult':
"""
Puts a value with a given key to cache only if the key already exists
and value equals provided sample.
@@ -674,29 +620,26 @@
the given sample should be converted
:param value_hint: (optional) Ignite data type, for which the given value
should be converted,
- :param query_id: (optional) a value generated by client and returned
- as-is in response.query_id. When the parameter is omitted, a random
- value is generated,
:return: API result data object. Contains zero status and a boolean
success code, or non-zero status and an error description if something
has gone wrong.
"""
return __cache_replace_if_equals(connection, cache_info, key, sample, value, key_hint,
- sample_hint, value_hint, query_id)
+ sample_hint, value_hint)
async def cache_replace_if_equals_async(
connection: 'AioConnection', cache_info: CacheInfo, key: Any, sample: Any, value: Any,
- key_hint: 'IgniteDataType' = None, sample_hint: 'IgniteDataType' = None, value_hint: 'IgniteDataType' = None,
- query_id: Optional[int] = None) -> 'APIResult':
+ key_hint: 'IgniteDataType' = None, sample_hint: 'IgniteDataType' = None, value_hint: 'IgniteDataType' = None
+) -> 'APIResult':
"""
Async version of cache_replace_if_equals.
"""
return await __cache_replace_if_equals(connection, cache_info, key, sample, value, key_hint,
- sample_hint, value_hint, query_id)
+ sample_hint, value_hint)
-def __cache_replace_if_equals(connection, cache_info, key, sample, value, key_hint, sample_hint, value_hint, query_id):
+def __cache_replace_if_equals(connection, cache_info, key, sample, value, key_hint, sample_hint, value_hint):
query_struct = Query(
OP_CACHE_REPLACE_IF_EQUALS,
[
@@ -704,8 +647,7 @@
('key', key_hint or AnyDataObject),
('sample', sample_hint or AnyDataObject),
('value', value_hint or AnyDataObject),
- ],
- query_id=query_id,
+ ]
)
return query_perform(
query_struct, connection,
@@ -722,36 +664,31 @@
)
-def cache_clear(connection: 'Connection', cache_info: CacheInfo, query_id: Optional[int] = None) -> 'APIResult':
+def cache_clear(connection: 'Connection', cache_info: CacheInfo) -> 'APIResult':
"""
Clears the cache without notifying listeners or cache writers.
:param connection: connection to Ignite server,
:param cache_info: cache meta info,
- :param query_id: (optional) a value generated by client and returned
- as-is in response.query_id. When the parameter is omitted, a random
- value is generated,
:return: API result data object. Contains zero status on success,
non-zero status and an error description otherwise.
"""
- return __cache_clear(connection, cache_info, query_id)
+ return __cache_clear(connection, cache_info)
-async def cache_clear_async(
- connection: 'AioConnection', cache_info: CacheInfo, query_id: Optional[int] = None) -> 'APIResult':
+async def cache_clear_async(connection: 'AioConnection', cache_info: CacheInfo) -> 'APIResult':
"""
Async version of cache_clear.
"""
- return await __cache_clear(connection, cache_info, query_id)
+ return await __cache_clear(connection, cache_info)
-def __cache_clear(connection, cache_info, query_id):
+def __cache_clear(connection, cache_info):
query_struct = Query(
OP_CACHE_CLEAR,
[
('cache_info', CacheInfo),
- ],
- query_id=query_id,
+ ]
)
return query_perform(
query_struct, connection,
@@ -761,8 +698,9 @@
)
-def cache_clear_key(connection: 'Connection', cache_info: CacheInfo, key: Any, key_hint: 'IgniteDataType' = None,
- query_id: Optional[int] = None) -> 'APIResult':
+def cache_clear_key(
+ connection: 'Connection', cache_info: CacheInfo, key: Any, key_hint: 'IgniteDataType' = None
+) -> 'APIResult':
"""
Clears the cache key without notifying listeners or cache writers.
@@ -771,31 +709,28 @@
:param key: key for the cache entry,
:param key_hint: (optional) Ignite data type, for which the given key
should be converted,
- :param query_id: (optional) a value generated by client and returned
- as-is in response.query_id. When the parameter is omitted, a random
- value is generated,
:return: API result data object. Contains zero status on success,
non-zero status and an error description otherwise.
"""
- return __cache_clear_key(connection, cache_info, key, key_hint, query_id)
+ return __cache_clear_key(connection, cache_info, key, key_hint)
-async def cache_clear_key_async(connection: 'AioConnection', cache_info: CacheInfo, key: Any,
- key_hint: 'IgniteDataType' = None, query_id: Optional[int] = None) -> 'APIResult':
+async def cache_clear_key_async(
+ connection: 'AioConnection', cache_info: CacheInfo, key: Any, key_hint: 'IgniteDataType' = None
+) -> 'APIResult':
"""
Async version of cache_clear_key.
"""
- return await __cache_clear_key(connection, cache_info, key, key_hint, query_id)
+ return await __cache_clear_key(connection, cache_info, key, key_hint)
-def __cache_clear_key(connection, cache_info, key, key_hint, query_id):
+def __cache_clear_key(connection, cache_info, key, key_hint):
query_struct = Query(
OP_CACHE_CLEAR_KEY,
[
('cache_info', CacheInfo),
('key', key_hint or AnyDataObject),
- ],
- query_id=query_id,
+ ]
)
return query_perform(
query_struct, connection,
@@ -806,40 +741,33 @@
)
-def cache_clear_keys(
- connection: 'Connection', cache_info: CacheInfo, keys: Iterable, query_id: Optional[int] = None) -> 'APIResult':
+def cache_clear_keys(connection: 'Connection', cache_info: CacheInfo, keys: Iterable) -> 'APIResult':
"""
Clears the cache keys without notifying listeners or cache writers.
:param connection: connection to Ignite server,
:param cache_info: cache meta info,
:param keys: list of keys or tuples of (key, key_hint),
- :param query_id: (optional) a value generated by client and returned as-is
- in response.query_id. When the parameter is omitted, a random value
- is generated,
:return: API result data object. Contains zero status on success,
non-zero status and an error description otherwise.
"""
- return __cache_clear_keys(connection, cache_info, keys, query_id)
+ return __cache_clear_keys(connection, cache_info, keys)
-async def cache_clear_keys_async(
- connection: 'AioConnection', cache_info: CacheInfo, keys: Iterable, query_id: Optional[int] = None
-) -> 'APIResult':
+async def cache_clear_keys_async(connection: 'AioConnection', cache_info: CacheInfo, keys: Iterable) -> 'APIResult':
"""
Async version of cache_clear_keys.
"""
- return await __cache_clear_keys(connection, cache_info, keys, query_id)
+ return await __cache_clear_keys(connection, cache_info, keys)
-def __cache_clear_keys(connection, cache_info, keys, query_id):
+def __cache_clear_keys(connection, cache_info, keys):
query_struct = Query(
OP_CACHE_CLEAR_KEYS,
[
('cache_info', CacheInfo),
('keys', AnyDataArray()),
- ],
- query_id=query_id,
+ ]
)
return query_perform(
query_struct, connection,
@@ -850,8 +778,9 @@
)
-def cache_remove_key(connection: 'Connection', cache_info: CacheInfo, key: Any, key_hint: 'IgniteDataType' = None,
- query_id: Optional[int] = None) -> 'APIResult':
+def cache_remove_key(
+ connection: 'Connection', cache_info: CacheInfo, key: Any, key_hint: 'IgniteDataType' = None
+) -> 'APIResult':
"""
Clears the cache key without notifying listeners or cache writers.
@@ -859,33 +788,30 @@
:param cache_info: cache meta info,
:param key: key for the cache entry,
:param key_hint: (optional) Ignite data type, for which the given key
- should be converted,
- :param query_id: (optional) a value generated by client and returned
- as-is in response.query_id. When the parameter is omitted, a random
- value is generated,
+ should be converted
:return: API result data object. Contains zero status and a boolean
success code, or non-zero status and an error description if something
has gone wrong.
"""
- return __cache_remove_key(connection, cache_info, key, key_hint, query_id)
+ return __cache_remove_key(connection, cache_info, key, key_hint)
-async def cache_remove_key_async(connection: 'AioConnection', cache_info: CacheInfo, key: Any,
- key_hint: 'IgniteDataType' = None, query_id: Optional[int] = None) -> 'APIResult':
+async def cache_remove_key_async(
+ connection: 'AioConnection', cache_info: CacheInfo, key: Any, key_hint: 'IgniteDataType' = None
+) -> 'APIResult':
"""
Async version of cache_remove_key.
"""
- return await __cache_remove_key(connection, cache_info, key, key_hint, query_id)
+ return await __cache_remove_key(connection, cache_info, key, key_hint)
-def __cache_remove_key(connection, cache_info, key, key_hint, query_id):
+def __cache_remove_key(connection, cache_info, key, key_hint):
query_struct = Query(
OP_CACHE_REMOVE_KEY,
[
('cache_info', CacheInfo),
('key', key_hint or AnyDataObject),
- ],
- query_id=query_id,
+ ]
)
return query_perform(
query_struct, connection,
@@ -901,8 +827,7 @@
def cache_remove_if_equals(connection: 'Connection', cache_info: CacheInfo, key: Any, sample: Any,
- key_hint: 'IgniteDataType' = None, sample_hint: 'IgniteDataType' = None,
- query_id: Optional[int] = None) -> 'APIResult':
+ key_hint: 'IgniteDataType' = None, sample_hint: 'IgniteDataType' = None) -> 'APIResult':
"""
Removes an entry with a given key if provided value is equal to
actual value, notifying listeners and cache writers.
@@ -914,35 +839,32 @@
:param key_hint: (optional) Ignite data type, for which the given key
should be converted,
:param sample_hint: (optional) Ignite data type, for whic
- the given sample should be converted
- :param query_id: (optional) a value generated by client and returned
- as-is in response.query_id. When the parameter is omitted, a random
- value is generated,
+ the given sample should be converted,
:return: API result data object. Contains zero status and a boolean
success code, or non-zero status and an error description if something
has gone wrong.
"""
- return __cache_remove_if_equals(connection, cache_info, key, sample, key_hint, sample_hint, query_id)
+ return __cache_remove_if_equals(connection, cache_info, key, sample, key_hint, sample_hint)
async def cache_remove_if_equals_async(
- connection: 'AioConnection', cache_info: CacheInfo, key: Any, sample: Any, key_hint: 'IgniteDataType' = None,
- sample_hint: 'IgniteDataType' = None, query_id: Optional[int] = None) -> 'APIResult':
+ connection: 'AioConnection', cache_info: CacheInfo, key: Any, sample: Any,
+ key_hint: 'IgniteDataType' = None, sample_hint: 'IgniteDataType' = None
+) -> 'APIResult':
"""
Async version of cache_remove_if_equals.
"""
- return await __cache_remove_if_equals(connection, cache_info, key, sample, key_hint, sample_hint, query_id)
+ return await __cache_remove_if_equals(connection, cache_info, key, sample, key_hint, sample_hint)
-def __cache_remove_if_equals(connection, cache_info, key, sample, key_hint, sample_hint, query_id):
+def __cache_remove_if_equals(connection, cache_info, key, sample, key_hint, sample_hint):
query_struct = Query(
OP_CACHE_REMOVE_IF_EQUALS,
[
('cache_info', CacheInfo),
('key', key_hint or AnyDataObject),
('sample', sample_hint or AnyDataObject),
- ],
- query_id=query_id,
+ ]
)
return query_perform(
query_struct, connection,
@@ -958,40 +880,33 @@
)
-def cache_remove_keys(
- connection: 'Connection', cache_info: CacheInfo, keys: Iterable, query_id: Optional[int] = None) -> 'APIResult':
+def cache_remove_keys(connection: 'Connection', cache_info: CacheInfo, keys: Iterable) -> 'APIResult':
"""
Removes entries with given keys, notifying listeners and cache writers.
:param connection: connection to Ignite server,
:param cache_info: cache meta info,
:param keys: list of keys or tuples of (key, key_hint),
- :param query_id: (optional) a value generated by client and returned as-is
- in response.query_id. When the parameter is omitted, a random value
- is generated,
:return: API result data object. Contains zero status on success,
non-zero status and an error description otherwise.
"""
- return __cache_remove_keys(connection, cache_info, keys, query_id)
+ return __cache_remove_keys(connection, cache_info, keys)
-async def cache_remove_keys_async(
- connection: 'AioConnection', cache_info: CacheInfo, keys: Iterable, query_id: Optional[int] = None
-) -> 'APIResult':
+async def cache_remove_keys_async(connection: 'AioConnection', cache_info: CacheInfo, keys: Iterable) -> 'APIResult':
"""
Async version of cache_remove_keys.
"""
- return await __cache_remove_keys(connection, cache_info, keys, query_id)
+ return await __cache_remove_keys(connection, cache_info, keys)
-def __cache_remove_keys(connection, cache_info, keys, query_id):
+def __cache_remove_keys(connection, cache_info, keys):
query_struct = Query(
OP_CACHE_REMOVE_KEYS,
[
('cache_info', CacheInfo),
('keys', AnyDataArray()),
- ],
- query_id=query_id,
+ ]
)
return query_perform(
query_struct, connection,
@@ -1002,36 +917,31 @@
)
-def cache_remove_all(connection: 'Connection', cache_info: CacheInfo, query_id: Optional[int] = None) -> 'APIResult':
+def cache_remove_all(connection: 'Connection', cache_info: CacheInfo) -> 'APIResult':
"""
Removes all entries from cache_info, notifying listeners and cache writers.
:param connection: connection to Ignite server,
:param cache_info: cache meta info,
- :param query_id: (optional) a value generated by client and returned as-is
- in response.query_id. When the parameter is omitted, a random value
- is generated,
:return: API result data object. Contains zero status on success,
non-zero status and an error description otherwise.
"""
- return __cache_remove_all(connection, cache_info, query_id)
+ return __cache_remove_all(connection, cache_info)
-async def cache_remove_all_async(
- connection: 'AioConnection', cache_info: CacheInfo, query_id: Optional[int] = None) -> 'APIResult':
+async def cache_remove_all_async(connection: 'AioConnection', cache_info: CacheInfo) -> 'APIResult':
"""
Async version of cache_remove_all.
"""
- return await __cache_remove_all(connection, cache_info, query_id)
+ return await __cache_remove_all(connection, cache_info)
-def __cache_remove_all(connection, cache_info, query_id):
+def __cache_remove_all(connection, cache_info):
query_struct = Query(
OP_CACHE_REMOVE_ALL,
[
('cache_info', CacheInfo),
- ],
- query_id=query_id,
+ ]
)
return query_perform(
query_struct, connection,
@@ -1041,8 +951,9 @@
)
-def cache_get_size(connection: 'Connection', cache_info: CacheInfo, peek_modes: Union[int, list, tuple] = None,
- query_id: Optional[int] = None) -> 'APIResult':
+def cache_get_size(
+ connection: 'Connection', cache_info: CacheInfo, peek_modes: Union[int, list, tuple] = None
+) -> 'APIResult':
"""
Gets the number of entries in cache.
@@ -1051,24 +962,20 @@
:param peek_modes: (optional) limit count to near cache partition
(PeekModes.NEAR), primary cache (PeekModes.PRIMARY), or backup cache
(PeekModes.BACKUP). Defaults to pimary cache partitions (PeekModes.PRIMARY),
- :param query_id: (optional) a value generated by client and returned as-is
- in response.query_id. When the parameter is omitted, a random value
- is generated,
:return: API result data object. Contains zero status and a number of
cache entries on success, non-zero status and an error description
otherwise.
"""
- return __cache_get_size(connection, cache_info, peek_modes, query_id)
+ return __cache_get_size(connection, cache_info, peek_modes)
async def cache_get_size_async(
- connection: 'AioConnection', cache_info: CacheInfo, peek_modes: Union[int, list, tuple] = None,
- query_id: Optional[int] = None
+ connection: 'AioConnection', cache_info: CacheInfo, peek_modes: Union[int, list, tuple] = None
) -> 'APIResult':
- return await __cache_get_size(connection, cache_info, peek_modes, query_id)
+ return await __cache_get_size(connection, cache_info, peek_modes)
-def __cache_get_size(connection, cache_info, peek_modes, query_id):
+def __cache_get_size(connection, cache_info, peek_modes):
if peek_modes is None:
peek_modes = []
elif not isinstance(peek_modes, (list, tuple)):
@@ -1079,8 +986,7 @@
[
('cache_info', CacheInfo),
('peek_modes', ByteArray),
- ],
- query_id=query_id,
+ ]
)
return query_perform(
query_struct, connection,
@@ -1095,8 +1001,10 @@
)
-def cache_local_peek(conn: 'Connection', cache_info: CacheInfo, key: Any, key_hint: 'IgniteDataType' = None,
- peek_modes: Union[int, list, tuple] = None, query_id: Optional[int] = None) -> 'APIResult':
+def cache_local_peek(
+ conn: 'Connection', cache_info: CacheInfo, key: Any, key_hint: 'IgniteDataType' = None,
+ peek_modes: Union[int, list, tuple] = None
+) -> 'APIResult':
"""
Peeks at in-memory cached value using default optional peek mode.
@@ -1111,26 +1019,23 @@
:param peek_modes: (optional) limit count to near cache partition
(PeekModes.NEAR), primary cache (PeekModes.PRIMARY), or backup cache
(PeekModes.BACKUP). Defaults to primary cache partitions (PeekModes.PRIMARY),
- :param query_id: (optional) a value generated by client and returned as-is
- in response.query_id. When the parameter is omitted, a random value
- is generated,
:return: API result data object. Contains zero status and a peeked value
(null if not found).
"""
- return __cache_local_peek(conn, cache_info, key, key_hint, peek_modes, query_id)
+ return __cache_local_peek(conn, cache_info, key, key_hint, peek_modes)
async def cache_local_peek_async(
conn: 'AioConnection', cache_info: CacheInfo, key: Any, key_hint: 'IgniteDataType' = None,
- peek_modes: Union[int, list, tuple] = None, query_id: Optional[int] = None
+ peek_modes: Union[int, list, tuple] = None,
) -> 'APIResult':
"""
Async version of cache_local_peek.
"""
- return await __cache_local_peek(conn, cache_info, key, key_hint, peek_modes, query_id)
+ return await __cache_local_peek(conn, cache_info, key, key_hint, peek_modes)
-def __cache_local_peek(conn, cache_info, key, key_hint, peek_modes, query_id):
+def __cache_local_peek(conn, cache_info, key, key_hint, peek_modes):
if peek_modes is None:
peek_modes = []
elif not isinstance(peek_modes, (list, tuple)):
@@ -1142,8 +1047,7 @@
('cache_info', CacheInfo),
('key', key_hint or AnyDataObject),
('peek_modes', ByteArray),
- ],
- query_id=query_id,
+ ]
)
return query_perform(
query_struct, conn,
diff --git a/pyignite/api/sql.py b/pyignite/api/sql.py
index 267bc5b..0f41194 100644
--- a/pyignite/api/sql.py
+++ b/pyignite/api/sql.py
@@ -23,12 +23,12 @@
)
from pyignite.utils import deprecated
from .result import APIResult
-from ..queries.query import CacheInfo
+from ..queries.cache_info import CacheInfo
from ..queries.response import SQLResponse
-def scan(conn: 'Connection', cache_info: CacheInfo, page_size: int, partitions: int = -1, local: bool = False,
- query_id: int = None) -> APIResult:
+def scan(conn: 'Connection', cache_info: CacheInfo, page_size: int, partitions: int = -1,
+ local: bool = False) -> APIResult:
"""
Performs scan query.
@@ -39,9 +39,6 @@
(negative to query entire cache),
:param local: (optional) pass True if this query should be executed
on local node only. Defaults to False,
- :param query_id: (optional) a value generated by client and returned as-is
- in response.query_id. When the parameter is omitted, a random value
- is generated,
:return: API result data object. Contains zero status and a value
of type dict with results on success, non-zero status and an error
description otherwise.
@@ -53,15 +50,15 @@
* `more`: bool, True if more data is available for subsequent
‘scan_cursor_get_page’ calls.
"""
- return __scan(conn, cache_info, page_size, partitions, local, query_id)
+ return __scan(conn, cache_info, page_size, partitions, local)
async def scan_async(conn: 'AioConnection', cache_info: CacheInfo, page_size: int, partitions: int = -1,
- local: bool = False, query_id: int = None) -> APIResult:
+ local: bool = False) -> APIResult:
"""
Async version of scan.
"""
- return await __scan(conn, cache_info, page_size, partitions, local, query_id)
+ return await __scan(conn, cache_info, page_size, partitions, local)
def __query_result_post_process(result):
@@ -70,7 +67,7 @@
return result
-def __scan(conn, cache_info, page_size, partitions, local, query_id):
+def __scan(conn, cache_info, page_size, partitions, local):
query_struct = Query(
OP_QUERY_SCAN,
[
@@ -79,8 +76,7 @@
('page_size', Int),
('partitions', Int),
('local', Bool),
- ],
- query_id=query_id,
+ ]
)
return query_perform(
query_struct, conn,
@@ -100,16 +96,13 @@
)
-def scan_cursor_get_page(conn: 'Connection', cursor: int, query_id: int = None) -> APIResult:
+def scan_cursor_get_page(conn: 'Connection', cursor: int) -> APIResult:
"""
Fetches the next scan query cursor page by cursor ID that is obtained
from `scan` function.
:param conn: connection to Ignite server,
:param cursor: cursor ID,
- :param query_id: (optional) a value generated by client and returned as-is
- in response.query_id. When the parameter is omitted, a random value
- is generated,
:return: API result data object. Contains zero status and a value
of type dict with results on success, non-zero status and an error
description otherwise.
@@ -120,20 +113,19 @@
* `more`: bool, True if more data is available for subsequent
‘scan_cursor_get_page’ calls.
"""
- return __scan_cursor_get_page(conn, cursor, query_id)
+ return __scan_cursor_get_page(conn, cursor)
-async def scan_cursor_get_page_async(conn: 'AioConnection', cursor: int, query_id: int = None) -> APIResult:
- return await __scan_cursor_get_page(conn, cursor, query_id)
+async def scan_cursor_get_page_async(conn: 'AioConnection', cursor: int) -> APIResult:
+ return await __scan_cursor_get_page(conn, cursor)
-def __scan_cursor_get_page(conn, cursor, query_id):
+def __scan_cursor_get_page(conn, cursor):
query_struct = Query(
OP_QUERY_SCAN_CURSOR_GET_PAGE,
[
('cursor', Long),
- ],
- query_id=query_id,
+ ]
)
return query_perform(
query_struct, conn,
@@ -154,7 +146,7 @@
conn: 'Connection', cache_info: CacheInfo,
table_name: str, query_str: str, page_size: int, query_args=None,
distributed_joins: bool = False, replicated_only: bool = False,
- local: bool = False, timeout: int = 0, query_id: int = None
+ local: bool = False, timeout: int = 0
) -> APIResult:
"""
Executes an SQL query over data stored in the cluster. The query returns
@@ -173,9 +165,6 @@
on local node only. Defaults to False,
:param timeout: (optional) non-negative timeout value in ms. Zero disables
timeout (default),
- :param query_id: (optional) a value generated by client and returned as-is
- in response.query_id. When the parameter is omitted, a random value
- is generated,
:return: API result data object. Contains zero status and a value
of type dict with results on success, non-zero status and an error
description otherwise.
@@ -203,8 +192,7 @@
('replicated_only', Bool),
('page_size', Int),
('timeout', Long),
- ],
- query_id=query_id,
+ ]
)
result = query_struct.perform(
conn,
@@ -232,17 +220,12 @@
@deprecated(version='1.2.0', reason="This API is deprecated and will be removed in the following major release. "
"Use sql_fields instead")
-def sql_cursor_get_page(
- conn: 'Connection', cursor: int, query_id: int = None,
-) -> APIResult:
+def sql_cursor_get_page(conn: 'Connection', cursor: int) -> APIResult:
"""
Retrieves the next SQL query cursor page by cursor ID from `sql`.
:param conn: connection to Ignite server,
:param cursor: cursor ID,
- :param query_id: (optional) a value generated by client and returned as-is
- in response.query_id. When the parameter is omitted, a random value
- is generated,
:return: API result data object. Contains zero status and a value
of type dict with results on success, non-zero status and an error
description otherwise.
@@ -258,8 +241,7 @@
OP_QUERY_SQL_CURSOR_GET_PAGE,
[
('cursor', Long),
- ],
- query_id=query_id,
+ ]
)
result = query_struct.perform(
conn,
@@ -283,7 +265,7 @@
local: bool = False, replicated_only: bool = False,
enforce_join_order: bool = False, collocated: bool = False,
lazy: bool = False, include_field_names: bool = False, max_rows: int = -1,
- timeout: int = 0, query_id: int = None
+ timeout: int = 0
) -> APIResult:
"""
Performs SQL fields query.
@@ -313,9 +295,6 @@
:param max_rows: (optional) query-wide maximum of rows.
:param timeout: (optional) non-negative timeout value in ms. Zero disables
timeout.
- :param query_id: (optional) a value generated by client and returned as-is
- in response.query_id. When the parameter is omitted, a random value
- is generated,
:return: API result data object. Contains zero status and a value
of type dict with results on success, non-zero status and an error
description otherwise.
@@ -329,7 +308,7 @@
"""
return __sql_fields(conn, cache_info, query_str, page_size, query_args, schema, statement_type, distributed_joins,
local, replicated_only, enforce_join_order, collocated, lazy, include_field_names, max_rows,
- timeout, query_id)
+ timeout)
async def sql_fields_async(
@@ -339,19 +318,19 @@
local: bool = False, replicated_only: bool = False,
enforce_join_order: bool = False, collocated: bool = False,
lazy: bool = False, include_field_names: bool = False, max_rows: int = -1,
- timeout: int = 0, query_id: int = None
+ timeout: int = 0
) -> APIResult:
"""
Async version of sql_fields.
"""
return await __sql_fields(conn, cache_info, query_str, page_size, query_args, schema, statement_type,
distributed_joins, local, replicated_only, enforce_join_order, collocated, lazy,
- include_field_names, max_rows, timeout, query_id)
+ include_field_names, max_rows, timeout)
def __sql_fields(
conn, cache_info, query_str, page_size, query_args, schema, statement_type, distributed_joins, local,
- replicated_only, enforce_join_order, collocated, lazy, include_field_names, max_rows, timeout, query_id
+ replicated_only, enforce_join_order, collocated, lazy, include_field_names, max_rows, timeout
):
if query_args is None:
query_args = []
@@ -375,7 +354,6 @@
('timeout', Long),
('include_field_names', Bool),
],
- query_id=query_id,
response_type=SQLResponse
)
@@ -403,16 +381,13 @@
)
-def sql_fields_cursor_get_page(conn: 'Connection', cursor: int, field_count: int, query_id: int = None) -> APIResult:
+def sql_fields_cursor_get_page(conn: 'Connection', cursor: int, field_count: int) -> APIResult:
"""
Retrieves the next query result page by cursor ID from `sql_fields`.
:param conn: connection to Ignite server,
:param cursor: cursor ID,
:param field_count: a number of fields in a row,
- :param query_id: (optional) a value generated by client and returned as-is
- in response.query_id. When the parameter is omitted, a random value
- is generated,
:return: API result data object. Contains zero status and a value
of type dict with results on success, non-zero status and an error
description otherwise.
@@ -423,24 +398,22 @@
* `more`: bool, True if more data is available for subsequent
‘sql_fields_cursor_get_page’ calls.
"""
- return __sql_fields_cursor_get_page(conn, cursor, field_count, query_id)
+ return __sql_fields_cursor_get_page(conn, cursor, field_count)
-async def sql_fields_cursor_get_page_async(conn: 'AioConnection', cursor: int, field_count: int,
- query_id: int = None) -> APIResult:
+async def sql_fields_cursor_get_page_async(conn: 'AioConnection', cursor: int, field_count: int) -> APIResult:
"""
Async version sql_fields_cursor_get_page.
"""
- return await __sql_fields_cursor_get_page(conn, cursor, field_count, query_id)
+ return await __sql_fields_cursor_get_page(conn, cursor, field_count)
-def __sql_fields_cursor_get_page(conn, cursor, field_count, query_id):
+def __sql_fields_cursor_get_page(conn, cursor, field_count):
query_struct = Query(
OP_QUERY_SQL_FIELDS_CURSOR_GET_PAGE,
[
('cursor', Long),
- ],
- query_id=query_id,
+ ]
)
return query_perform(
query_struct, conn,
@@ -469,32 +442,28 @@
return result
-def resource_close(conn: 'Connection', cursor: int, query_id: int = None) -> APIResult:
+def resource_close(conn: 'Connection', cursor: int) -> APIResult:
"""
Closes a resource, such as query cursor.
:param conn: connection to Ignite server,
:param cursor: cursor ID,
- :param query_id: (optional) a value generated by client and returned as-is
- in response.query_id. When the parameter is omitted, a random value
- is generated,
:return: API result data object. Contains zero status on success,
non-zero status and an error description otherwise.
"""
- return __resource_close(conn, cursor, query_id)
+ return __resource_close(conn, cursor)
-async def resource_close_async(conn: 'AioConnection', cursor: int, query_id: int = None) -> APIResult:
- return await __resource_close(conn, cursor, query_id)
+async def resource_close_async(conn: 'AioConnection', cursor: int) -> APIResult:
+ return await __resource_close(conn, cursor)
-def __resource_close(conn, cursor, query_id):
+def __resource_close(conn, cursor):
query_struct = Query(
OP_RESOURCE_CLOSE,
[
('cursor', Long),
- ],
- query_id=query_id,
+ ]
)
return query_perform(
query_struct, conn,
diff --git a/pyignite/api/tx_api.py b/pyignite/api/tx_api.py
new file mode 100644
index 0000000..ee8de07
--- /dev/null
+++ b/pyignite/api/tx_api.py
@@ -0,0 +1,124 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import contextvars
+
+import attr
+
+from pyignite.datatypes import Byte, String, Long, Int, Bool
+from pyignite.exceptions import CacheError
+from pyignite.queries import Query, query_perform
+from pyignite.queries.op_codes import OP_TX_START, OP_TX_END
+
+__CURRENT_TX = contextvars.ContextVar('current_tx', default=None)
+
+
+def get_tx_id():
+ ctx = __CURRENT_TX.get() if __CURRENT_TX else None
+ return ctx.tx_id if ctx else None
+
+
+def get_tx_connection():
+ ctx = __CURRENT_TX.get() if __CURRENT_TX else None
+ return ctx.conn if ctx else None
+
+
+@attr.s
+class TransactionContext:
+ tx_id = attr.ib(type=int, default=None)
+ conn = attr.ib(default=None)
+
+
+def tx_start(conn, concurrency, isolation, timeout: int = 0, label: str = None):
+ result = __tx_start(conn, concurrency, isolation, timeout, label)
+ return __tx_start_post_process(result, conn)
+
+
+async def tx_start_async(conn, concurrency, isolation, timeout: int = 0, label: str = None):
+ result = await __tx_start(conn, concurrency, isolation, timeout, label)
+ return __tx_start_post_process(result, conn)
+
+
+def __tx_start(conn, concurrency, isolation, timeout, label):
+ query_struct = Query(
+ OP_TX_START,
+ [
+ ('concurrency', Byte),
+ ('isolation', Byte),
+ ('timeout', Long),
+ ('label', String)
+ ]
+ )
+ return query_perform(
+ query_struct, conn,
+ query_params={
+ 'concurrency': concurrency,
+ 'isolation': isolation,
+ 'timeout': timeout,
+ 'label': label
+ },
+ response_config=[
+ ('tx_id', Int)
+ ]
+ )
+
+
+def tx_end(tx_id, committed):
+ ctx = __CURRENT_TX.get()
+
+ if not ctx or ctx.tx_id != tx_id:
+ raise CacheError("Cannot commit transaction from different thread or coroutine")
+
+ try:
+ return __tx_end(ctx.conn, tx_id, committed)
+ finally:
+ __CURRENT_TX.set(None)
+
+
+async def tx_end_async(tx_id, committed):
+ ctx = __CURRENT_TX.get()
+
+ if not ctx or ctx.tx_id != tx_id:
+ raise CacheError("Cannot commit transaction from different thread or coroutine")
+
+ try:
+ return await __tx_end(ctx.conn, tx_id, committed)
+ finally:
+ __CURRENT_TX.set(None)
+
+
+def __tx_end(conn, tx_id, committed):
+ query_struct = Query(
+ OP_TX_END,
+ [
+ ('tx_id', Int),
+ ('committed', Bool)
+ ],
+ )
+ return query_perform(
+ query_struct, conn,
+ query_params={
+ 'tx_id': tx_id,
+ 'committed': committed
+ }
+ )
+
+
+def __tx_start_post_process(result, conn):
+ if result.status == 0:
+ tx_id = result.value['tx_id']
+ __CURRENT_TX.set(TransactionContext(tx_id, conn))
+ result.value = tx_id
+ return result
diff --git a/pyignite/cache.py b/pyignite/cache.py
index c0aaaec..79fa0f5 100644
--- a/pyignite/cache.py
+++ b/pyignite/cache.py
@@ -15,10 +15,11 @@
from typing import Any, Iterable, Optional, Tuple, Union
+from .api.tx_api import get_tx_connection
from .datatypes import prop_codes, ExpiryPolicy
from .datatypes.internal import AnyDataObject
from .exceptions import CacheCreationError, CacheError, ParameterError, SQLError, NotSupportedByClusterError
-from .queries.query import CacheInfo
+from .queries.cache_info import CacheInfo
from .utils import cache_id, status_to_exception
from .api.cache_config import (
cache_create, cache_create_with_config, cache_get_or_create, cache_get_or_create_with_config, cache_destroy,
@@ -177,7 +178,8 @@
super().__init__(client, name, expiry_policy)
def _get_best_node(self, key=None, key_hint=None):
- return self.client.get_best_node(self, key, key_hint)
+ tx_conn = get_tx_connection()
+ return tx_conn if tx_conn else self.client.get_best_node(self, key, key_hint)
@property
def settings(self) -> Optional[dict]:
diff --git a/pyignite/client.py b/pyignite/client.py
index 01ee373..b411a2b 100644
--- a/pyignite/client.py
+++ b/pyignite/client.py
@@ -44,7 +44,7 @@
import random
import re
from itertools import chain
-from typing import Iterable, Type, Union, Any, Dict
+from typing import Iterable, Type, Union, Any, Dict, Optional
from .api import cache_get_node_partitions
from .api.binary import get_binary_type, put_binary_type
@@ -54,12 +54,13 @@
from .cache import Cache, create_cache, get_cache, get_or_create_cache, BaseCache
from .connection import Connection
from .constants import IGNITE_DEFAULT_HOST, IGNITE_DEFAULT_PORT, PROTOCOL_BYTE_ORDER, AFFINITY_RETRIES, AFFINITY_DELAY
-from .datatypes import BinaryObject, AnyDataObject
+from .datatypes import BinaryObject, AnyDataObject, TransactionConcurrency, TransactionIsolation
from .datatypes.base import IgniteDataType
from .datatypes.internal import tc_map
from .exceptions import BinaryTypeError, CacheError, ReconnectError, connection_errors
-from .queries.query import CacheInfo
+from .queries.cache_info import CacheInfo
from .stream import BinaryStream, READ_BACKWARD
+from .transaction import Transaction
from .utils import (
cache_id, capitalize, entity_id, schema_id, process_delimiter, status_to_exception, is_iterable,
get_field_by_id, unsigned
@@ -734,9 +735,9 @@
elif isinstance(cache, Cache):
c_info = cache.cache_info
else:
- c_info = None
+ c_info = CacheInfo(protocol_context=self.protocol_context)
- if c_info:
+ if c_info.cache_id:
schema = None
return SqlFieldsCursor(self, c_info, query_str, page_size, query_args, schema, statement_type,
@@ -750,3 +751,19 @@
:return: :py:class:`~pyignite.cluster.Cluster` instance.
"""
return Cluster(self)
+
+ def tx_start(self, concurrency: TransactionConcurrency = TransactionConcurrency.PESSIMISTIC,
+ isolation: TransactionIsolation = TransactionIsolation.REPEATABLE_READ,
+ timeout: Union[int, float] = 0, label: Optional[str] = None) -> 'Transaction':
+ """
+ Start thin client transaction.
+
+ :param concurrency: (optional) transaction concurrency, see
+ :py:class:`~pyignite.datatypes.transactions.TransactionConcurrency`
+ :param isolation: (optional) transaction isolation level, see
+ :py:class:`~pyignite.datatypes.transactions.TransactionIsolation`
+ :param timeout: (optional) transaction timeout in seconds if float, in millis if int
+ :param label: (optional) transaction label.
+ :return: :py:class:`~pyignite.transaction.Transaction` instance.
+ """
+ return Transaction(self, concurrency, isolation, timeout, label)
diff --git a/pyignite/connection/aio_connection.py b/pyignite/connection/aio_connection.py
index 020f8d4..86993ba 100644
--- a/pyignite/connection/aio_connection.py
+++ b/pyignite/connection/aio_connection.py
@@ -29,9 +29,7 @@
# limitations under the License.
import asyncio
-from asyncio import Lock
from collections import OrderedDict
-from io import BytesIO
from typing import Union
from pyignite.constants import PROTOCOLS, PROTOCOL_BYTE_ORDER
@@ -39,10 +37,66 @@
from .bitmask_feature import BitmaskFeature
from .connection import BaseConnection
-from .handshake import HandshakeRequest, HandshakeResponse
+from .handshake import HandshakeRequest, HandshakeResponse, OP_HANDSHAKE
from .protocol_context import ProtocolContext
from .ssl import create_ssl_context
-from ..stream import AioBinaryStream
+from ..stream.binary_stream import BinaryStreamBase
+
+
+class BaseProtocol(asyncio.Protocol):
+ def __init__(self, conn, handshake_fut):
+ super().__init__()
+ self._buffer = bytearray()
+ self._conn = conn
+ self._handshake_fut = handshake_fut
+
+ def connection_lost(self, exc):
+ self.__process_connection_error(exc if exc else SocketError("Connection closed"))
+
+ def connection_made(self, transport: asyncio.WriteTransport) -> None:
+ try:
+ self.__send_handshake(transport, self._conn)
+ except Exception as e:
+ self._handshake_fut.set_exception(e)
+
+ def data_received(self, data: bytes) -> None:
+ self._buffer += data
+ while self.__has_full_response():
+ packet_sz = self.__packet_size(self._buffer)
+ packet = self._buffer[0:packet_sz]
+ if not self._handshake_fut.done():
+ hs_response = self.__parse_handshake(packet, self._conn.client)
+ self._handshake_fut.set_result(hs_response)
+ else:
+ self._conn.on_message(packet)
+ self._buffer = self._buffer[packet_sz:len(self._buffer)]
+
+ def __has_full_response(self):
+ if len(self._buffer) > 4:
+ response_len = int.from_bytes(self._buffer[0:4], byteorder=PROTOCOL_BYTE_ORDER, signed=True)
+ return response_len + 4 <= len(self._buffer)
+
+ @staticmethod
+ def __packet_size(buffer):
+ return int.from_bytes(buffer[0:4], byteorder=PROTOCOL_BYTE_ORDER, signed=True) + 4
+
+ def __process_connection_error(self, exc):
+ connected = self._handshake_fut.done()
+ if not connected:
+ self._handshake_fut.set_exception(exc)
+ self._conn.on_connection_lost(exc, connected)
+
+ @staticmethod
+ def __send_handshake(transport, conn):
+ hs_request = HandshakeRequest(conn.protocol_context, conn.username, conn.password)
+ with BinaryStreamBase(client=conn.client) as stream:
+ hs_request.from_python(stream)
+ transport.write(stream.getvalue())
+
+ @staticmethod
+ def __parse_handshake(data, client):
+ with BinaryStreamBase(client, data) as stream:
+ return HandshakeResponse.parse(stream, client.protocol_context)
class AioConnection(BaseConnection):
@@ -94,21 +148,22 @@
:param password: (optional) password to authenticate to Ignite cluster.
"""
super().__init__(client, host, port, username, password, **ssl_params)
- self._mux = Lock()
- self._reader = None
- self._writer = None
+ self._pending_reqs = {}
+ self._transport = None
+ self._loop = asyncio.get_event_loop()
+ self._closed = False
@property
def closed(self) -> bool:
""" Tells if socket is closed. """
- return self._writer is None
+ return self._closed or not self._transport or self._transport.is_closing()
async def connect(self) -> Union[dict, OrderedDict]:
"""
Connect to the given server node with protocol version fallback.
"""
- async with self._mux:
- return await self._connect()
+ self._closed = False
+ return await self._connect()
async def _connect(self) -> Union[dict, OrderedDict]:
detecting_protocol = False
@@ -139,6 +194,20 @@
self.failed = False
return result
+ def on_connection_lost(self, error, reconnect=False):
+ self.failed = True
+ for _, fut in self._pending_reqs.items():
+ fut.set_exception(error)
+ self._pending_reqs.clear()
+ if reconnect and not self._closed:
+ self._loop.create_task(self._reconnect())
+
+ def on_message(self, data):
+ req_id = int.from_bytes(data[4:12], byteorder=PROTOCOL_BYTE_ORDER, signed=True)
+ if req_id in self._pending_reqs:
+ self._pending_reqs[req_id].set_result(data)
+ del self._pending_reqs[req_id]
+
async def _connect_version(self) -> Union[dict, OrderedDict]:
"""
Connect to the given server node using protocol version
@@ -146,122 +215,56 @@
"""
ssl_context = create_ssl_context(self.ssl_params)
- self._reader, self._writer = await asyncio.open_connection(self.host, self.port, ssl=ssl_context)
+ handshake_fut = self._loop.create_future()
+ self._transport, _ = await self._loop.create_connection(lambda: BaseProtocol(self, handshake_fut),
+ host=self.host, port=self.port, ssl=ssl_context)
+ hs_response = await handshake_fut
- protocol_context = self.client.protocol_context
+ if hs_response.op_code == 0:
+ self._close_transport()
+ self._process_handshake_error(hs_response)
- hs_request = HandshakeRequest(
- protocol_context,
- self.username,
- self.password
- )
-
- with AioBinaryStream(self.client) as stream:
- await hs_request.from_python_async(stream)
- await self._send(stream.getvalue(), reconnect=False)
-
- with AioBinaryStream(self.client, await self._recv(reconnect=False)) as stream:
- hs_response = await HandshakeResponse.parse_async(stream, self.protocol_context)
-
- if hs_response.op_code == 0:
- self._close()
- self._process_handshake_error(hs_response)
-
- return hs_response
+ return hs_response
async def reconnect(self):
- async with self._mux:
- await self._reconnect()
+ await self._reconnect()
async def _reconnect(self):
if self.alive:
return
- self._close()
-
+ self._close_transport()
# connect and silence the connection errors
try:
await self._connect()
except connection_errors:
pass
- async def request(self, data: Union[bytes, bytearray]) -> bytearray:
+ async def request(self, query_id, data: Union[bytes, bytearray]) -> bytearray:
"""
Perform request.
-
+ :param query_id: id of query.
:param data: bytes to send.
"""
- async with self._mux:
- await self._send(data)
- return await self._recv()
-
- async def _send(self, data: Union[bytes, bytearray], reconnect=True):
- if self.closed:
+ if not self.alive:
raise SocketError('Attempt to use closed connection.')
- try:
- self._writer.write(data)
- await self._writer.drain()
- except connection_errors:
- self.failed = True
- if reconnect:
- await self._reconnect()
- raise
+ return await self._send(query_id, data)
- async def _recv(self, reconnect=True) -> bytearray:
- if self.closed:
- raise SocketError('Attempt to use closed connection.')
-
- data = bytearray(1024)
- buffer = memoryview(data)
- bytes_total_received, bytes_to_receive = 0, 0
- while True:
- try:
- chunk = await self._reader.read(len(buffer))
- bytes_received = len(chunk)
- if bytes_received == 0:
- raise SocketError('Connection broken.')
-
- buffer[0:bytes_received] = chunk
- bytes_total_received += bytes_received
- except connection_errors:
- self.failed = True
- if reconnect:
- await self._reconnect()
- raise
-
- if bytes_total_received < 4:
- continue
- elif bytes_to_receive == 0:
- response_len = int.from_bytes(data[0:4], PROTOCOL_BYTE_ORDER)
- bytes_to_receive = response_len
-
- if response_len + 4 > len(data):
- buffer.release()
- data.extend(bytearray(response_len + 4 - len(data)))
- buffer = memoryview(data)[bytes_total_received:]
- continue
-
- if bytes_total_received >= bytes_to_receive:
- buffer.release()
- break
-
- buffer = buffer[bytes_received:]
-
- return data
+ async def _send(self, query_id, data):
+ fut = self._loop.create_future()
+ self._pending_reqs[query_id] = fut
+ self._transport.write(data)
+ return await fut
async def close(self):
- async with self._mux:
- self._close()
+ self._closed = True
+ self._close_transport()
- def _close(self):
+ def _close_transport(self):
"""
Close connection.
"""
- if self._writer:
- try:
- self._writer.close()
- except connection_errors:
- pass
-
- self._writer, self._reader = None, None
+ if self._transport:
+ self._transport.close()
+ self._transport = None
diff --git a/pyignite/connection/protocol_context.py b/pyignite/connection/protocol_context.py
index be23e56..0f43aa4 100644
--- a/pyignite/connection/protocol_context.py
+++ b/pyignite/connection/protocol_context.py
@@ -87,6 +87,12 @@
"""
return self.version >= (1, 4, 0)
+ def is_transactions_supported(self) -> bool:
+ """
+ Check whether transactions supported by the current protocol.
+ """
+ return self.version >= (1, 6, 0)
+
def is_feature_flags_supported(self) -> bool:
"""
Check whether feature flags supported by the current protocol.
diff --git a/pyignite/datatypes/__init__.py b/pyignite/datatypes/__init__.py
index 4f78dce..0ebe56a 100644
--- a/pyignite/datatypes/__init__.py
+++ b/pyignite/datatypes/__init__.py
@@ -27,3 +27,4 @@
from .standard import *
from .cluster_state import ClusterState
from .expiry_policy import ExpiryPolicy
+from .transactions import TransactionIsolation, TransactionConcurrency
diff --git a/pyignite/datatypes/cache_config.py b/pyignite/datatypes/cache_config.py
index a2b4322..4ac28e4 100644
--- a/pyignite/datatypes/cache_config.py
+++ b/pyignite/datatypes/cache_config.py
@@ -21,6 +21,7 @@
__all__ = [
'get_cache_config_struct', 'CacheMode', 'PartitionLossPolicy',
'RebalanceMode', 'WriteSynchronizationMode', 'IndexType',
+ 'CacheAtomicityMode'
]
diff --git a/pyignite/datatypes/transactions.py b/pyignite/datatypes/transactions.py
new file mode 100644
index 0000000..83e6c06
--- /dev/null
+++ b/pyignite/datatypes/transactions.py
@@ -0,0 +1,42 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from enum import IntEnum
+
+
+class TransactionConcurrency(IntEnum):
+ """
+ Defines different cache transaction concurrency control.
+ """
+
+ #: Optimistic concurrency control.
+ OPTIMISTIC = 0
+
+ #: Pessimistic concurrency control.
+ PESSIMISTIC = 1
+
+
+class TransactionIsolation(IntEnum):
+ """
+ Defines different cache transaction isolation levels.
+ """
+
+ #: Read committed isolation level.Read committed isolation level.
+ READ_COMMITTED = 0
+
+ #: Repeatable read isolation level.
+ REPEATABLE_READ = 1
+
+ #: Serializable isolation level.
+ SERIALIZABLE = 2
diff --git a/pyignite/exceptions.py b/pyignite/exceptions.py
index 215ccd0..fdf1261 100644
--- a/pyignite/exceptions.py
+++ b/pyignite/exceptions.py
@@ -103,10 +103,18 @@
class NotSupportedByClusterError(Exception):
"""
- This exception is raised, whenever cluster is not supported specific
+ This exception is raised, whenever cluster does not supported specific
operation probably because it is outdated.
"""
pass
+class NotSupportedError(Exception):
+ """
+ This exception is raised, whenever client does not support specific
+ operation.
+ """
+ pass
+
+
connection_errors = (IOError, OSError, EOFError)
diff --git a/pyignite/queries/cache_info.py b/pyignite/queries/cache_info.py
new file mode 100644
index 0000000..6caf3ce
--- /dev/null
+++ b/pyignite/queries/cache_info.py
@@ -0,0 +1,61 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import attr
+
+from pyignite.api.tx_api import get_tx_id
+from pyignite.connection.protocol_context import ProtocolContext
+from pyignite.constants import PROTOCOL_BYTE_ORDER
+from pyignite.datatypes import ExpiryPolicy
+from pyignite.exceptions import NotSupportedByClusterError
+
+
+@attr.s
+class CacheInfo:
+ cache_id = attr.ib(kw_only=True, type=int, default=0)
+ expiry_policy = attr.ib(kw_only=True, type=ExpiryPolicy, default=None)
+ protocol_context = attr.ib(kw_only=True, type=ProtocolContext)
+
+ TRANSACTIONS_MASK = 0x02
+ EXPIRY_POLICY_MASK = 0x04
+
+ @classmethod
+ async def from_python_async(cls, stream, value):
+ return cls.from_python(stream, value)
+
+ @classmethod
+ def from_python(cls, stream, value):
+ cache_id = value.cache_id if value else 0
+ expiry_policy = value.expiry_policy if value else None
+ flags = 0
+
+ stream.write(cache_id.to_bytes(4, byteorder=PROTOCOL_BYTE_ORDER, signed=True))
+
+ if expiry_policy:
+ if not value.protocol_context.is_expiry_policy_supported():
+ raise NotSupportedByClusterError("'ExpiryPolicy' API is not supported by the cluster")
+ flags |= cls.EXPIRY_POLICY_MASK
+
+ tx_id = get_tx_id()
+ if value.protocol_context.is_transactions_supported() and tx_id:
+ flags |= cls.TRANSACTIONS_MASK
+
+ stream.write(flags.to_bytes(1, byteorder=PROTOCOL_BYTE_ORDER))
+
+ if expiry_policy:
+ ExpiryPolicy.write_policy(stream, expiry_policy)
+
+ if flags & cls.TRANSACTIONS_MASK:
+ stream.write(tx_id.to_bytes(4, byteorder=PROTOCOL_BYTE_ORDER, signed=True))
diff --git a/pyignite/queries/op_codes.py b/pyignite/queries/op_codes.py
index c152f7c..cf19b11 100644
--- a/pyignite/queries/op_codes.py
+++ b/pyignite/queries/op_codes.py
@@ -66,5 +66,8 @@
OP_GET_BINARY_TYPE = 3002
OP_PUT_BINARY_TYPE = 3003
+OP_TX_START = 4000
+OP_TX_END = 4001
+
OP_CLUSTER_GET_STATE = 5000
OP_CLUSTER_CHANGE_STATE = 5001
diff --git a/pyignite/queries/query.py b/pyignite/queries/query.py
index d971eef..4bcab9f 100644
--- a/pyignite/queries/query.py
+++ b/pyignite/queries/query.py
@@ -15,16 +15,12 @@
import ctypes
from io import SEEK_CUR
-from random import randint
import attr
from pyignite.api.result import APIResult
from pyignite.connection import Connection, AioConnection
-from pyignite.connection.protocol_context import ProtocolContext
-from pyignite.constants import MIN_LONG, MAX_LONG, RHF_TOPOLOGY_CHANGED, PROTOCOL_BYTE_ORDER
-from pyignite.datatypes import ExpiryPolicy
-from pyignite.exceptions import NotSupportedByClusterError
+from pyignite.constants import MAX_LONG, RHF_TOPOLOGY_CHANGED
from pyignite.queries.response import Response
from pyignite.stream import AioBinaryStream, BinaryStream, READ_BACKWARD
@@ -47,42 +43,29 @@
return _internal()
-@attr.s
-class CacheInfo:
- cache_id = attr.ib(kw_only=True, type=int)
- expiry_policy = attr.ib(kw_only=True, type=ExpiryPolicy, default=None)
- protocol_context = attr.ib(kw_only=True, type=ProtocolContext)
+_QUERY_COUNTER = 0
- @classmethod
- async def from_python_async(cls, stream, value):
- return cls.from_python(stream, value)
- @classmethod
- def from_python(cls, stream, value):
- cache_id = value.cache_id if value else 0
- expiry_policy = value.expiry_policy if value else None
- flags = 0
-
- stream.write(cache_id.to_bytes(4, byteorder=PROTOCOL_BYTE_ORDER, signed=True))
-
- if expiry_policy:
- if not value.protocol_context.is_expiry_policy_supported():
- raise NotSupportedByClusterError("'ExpiryPolicy' API is not supported by the cluster")
- flags |= 0x04
-
- stream.write(flags.to_bytes(1, byteorder=PROTOCOL_BYTE_ORDER))
- if expiry_policy:
- ExpiryPolicy.write_policy(stream, expiry_policy)
+def _get_query_id():
+ global _QUERY_COUNTER
+ if _QUERY_COUNTER >= MAX_LONG:
+ return 0
+ _QUERY_COUNTER += 1
+ return _QUERY_COUNTER
@attr.s
class Query:
op_code = attr.ib(type=int)
following = attr.ib(type=list, factory=list)
- query_id = attr.ib(type=int, default=None)
+ query_id = attr.ib(type=int)
response_type = attr.ib(type=type(Response), default=Response)
_query_c_type = None
+ @query_id.default
+ def _set_query_id(self):
+ return _get_query_id()
+
@classmethod
def build_c_type(cls):
if cls._query_c_type is None:
@@ -119,14 +102,14 @@
self.__write_header(stream, header, init_pos)
def _build_header(self, stream):
+ global _QUERY_COUNTER
header_class = self.build_c_type()
header_len = ctypes.sizeof(header_class)
stream.seek(header_len, SEEK_CUR)
header = header_class()
header.op_code = self.op_code
- if self.query_id is None:
- header.query_id = randint(MIN_LONG, MAX_LONG)
+ header.query_id = self.query_id
return header
@@ -185,7 +168,7 @@
"""
with AioBinaryStream(conn.client) as stream:
await self.from_python_async(stream, query_params)
- data = await conn.request(stream.getvalue())
+ data = await conn.request(self.query_id, stream.getvalue())
response_struct = self.response_type(protocol_context=conn.protocol_context,
following=response_config, **kwargs)
diff --git a/pyignite/transaction.py b/pyignite/transaction.py
new file mode 100644
index 0000000..5bafa6b
--- /dev/null
+++ b/pyignite/transaction.py
@@ -0,0 +1,130 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import math
+from typing import Union
+
+from pyignite.api.tx_api import tx_end, tx_start, tx_end_async, tx_start_async
+from pyignite.datatypes import TransactionIsolation, TransactionConcurrency
+from pyignite.exceptions import CacheError
+from pyignite.utils import status_to_exception
+
+
+def _convert_to_millis(timeout: Union[int, float]) -> int:
+ if isinstance(timeout, float):
+ return math.floor(timeout * 1000)
+ return timeout
+
+
+class Transaction:
+ """
+ Thin client transaction.
+ """
+ def __init__(self, client, concurrency=TransactionConcurrency.PESSIMISTIC,
+ isolation=TransactionIsolation.REPEATABLE_READ, timeout=0, label=None):
+ self.client, self.concurrency = client, concurrency
+ self.isolation, self.timeout = isolation, _convert_to_millis(timeout)
+ self.label, self.closed = label, False
+ self.tx_id = self.__start_tx()
+
+ def commit(self) -> None:
+ """
+ Commit transaction.
+ """
+ if not self.closed:
+ self.closed = True
+ return self.__end_tx(True)
+
+ def rollback(self) -> None:
+ """
+ Rollback transaction.
+ """
+ self.close()
+
+ def close(self) -> None:
+ """
+ Close transaction.
+ """
+ if not self.closed:
+ self.closed = True
+ return self.__end_tx(False)
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self.close()
+
+ @status_to_exception(CacheError)
+ def __start_tx(self):
+ conn = self.client.random_node
+ return tx_start(conn, self.concurrency, self.isolation, self.timeout, self.label)
+
+ @status_to_exception(CacheError)
+ def __end_tx(self, committed):
+ return tx_end(self.tx_id, committed)
+
+
+class AioTransaction:
+ """
+ Async thin client transaction.
+ """
+ def __init__(self, client, concurrency=TransactionConcurrency.PESSIMISTIC,
+ isolation=TransactionIsolation.REPEATABLE_READ, timeout=0, label=None):
+ self.client, self.concurrency = client, concurrency
+ self.isolation, self.timeout = isolation, _convert_to_millis(timeout)
+ self.label, self.closed = label, False
+
+ def __await__(self):
+ return (yield from self.__aenter__().__await__())
+
+ async def commit(self) -> None:
+ """
+ Commit transaction.
+ """
+ if not self.closed:
+ self.closed = True
+ return await self.__end_tx(True)
+
+ async def rollback(self) -> None:
+ """
+ Rollback transaction.
+ """
+ await self.close()
+
+ async def close(self) -> None:
+ """
+ Close transaction.
+ """
+ if not self.closed:
+ self.closed = True
+ return await self.__end_tx(False)
+
+ async def __aenter__(self):
+ self.tx_id = await self.__start_tx()
+ self.closed = False
+ return self
+
+ async def __aexit__(self, exc_type, exc_val, exc_tb):
+ await self.close()
+
+ @status_to_exception(CacheError)
+ async def __start_tx(self):
+ conn = await self.client.random_node()
+ return await tx_start_async(conn, self.concurrency, self.isolation, self.timeout, self.label)
+
+ @status_to_exception(CacheError)
+ async def __end_tx(self, committed):
+ return await tx_end_async(self.tx_id, committed)
diff --git a/requirements/install.txt b/requirements/install.txt
index feb4eb6..aa8290f 100644
--- a/requirements/install.txt
+++ b/requirements/install.txt
@@ -1,3 +1,4 @@
# these pip packages are necessary for the pyignite to run
attrs>=20.3.0
+contextvars>=2.4;python_version<"3.7"
diff --git a/tests/affinity/conftest.py b/tests/affinity/conftest.py
index da645c1..eca31b2 100644
--- a/tests/affinity/conftest.py
+++ b/tests/affinity/conftest.py
@@ -54,7 +54,7 @@
@pytest.fixture
-async def async_client(connection_param):
+async def async_client(connection_param, event_loop):
client = AioClient(partition_aware=True)
try:
await client.connect(connection_param)
diff --git a/tests/affinity/test_affinity_request_routing.py b/tests/affinity/test_affinity_request_routing.py
index 90c71b2..0d0ec24 100644
--- a/tests/affinity/test_affinity_request_routing.py
+++ b/tests/affinity/test_affinity_request_routing.py
@@ -52,7 +52,7 @@
async def patched_send_async(self, *args, **kwargs):
"""Patched send function that push to queue idx of server to which request is routed."""
- buf = args[0]
+ buf = args[1]
if buf and len(buf) >= 6:
op_code = int.from_bytes(buf[4:6], byteorder=PROTOCOL_BYTE_ORDER)
# Filter only caches operation.
@@ -229,7 +229,7 @@
@pytest.fixture
-async def async_client_routed():
+async def async_client_routed(event_loop):
client = AioClient(partition_aware=True)
try:
await client.connect(client_routed_connection_string)
diff --git a/tests/affinity/test_affinity_single_connection.py b/tests/affinity/test_affinity_single_connection.py
index c3d2473..c679bdd 100644
--- a/tests/affinity/test_affinity_single_connection.py
+++ b/tests/affinity/test_affinity_single_connection.py
@@ -29,7 +29,7 @@
@pytest.fixture
-async def async_client():
+async def async_client(event_loop):
client = AioClient(partition_aware=True)
try:
await client.connect('127.0.0.1', 10801)
diff --git a/tests/common/test_transactions.py b/tests/common/test_transactions.py
new file mode 100644
index 0000000..0cfa46a
--- /dev/null
+++ b/tests/common/test_transactions.py
@@ -0,0 +1,231 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import asyncio
+import itertools
+import sys
+import time
+
+import pytest
+
+from pyignite import AioClient, Client
+from pyignite.datatypes import TransactionIsolation, TransactionConcurrency
+from pyignite.datatypes.cache_config import CacheAtomicityMode
+from pyignite.datatypes.prop_codes import PROP_NAME, PROP_CACHE_ATOMICITY_MODE
+from pyignite.exceptions import CacheError
+
+
+@pytest.fixture
+def connection_param():
+ return [('127.0.0.1', 10800 + i) for i in range(1, 4)]
+
+
+@pytest.fixture(params=['with-partition-awareness', 'without-partition-awareness'])
+async def async_client(request, connection_param, event_loop):
+ client = AioClient(partition_aware=request.param == 'with-partition-awareness')
+ try:
+ await client.connect(connection_param)
+ if not client.protocol_context.is_transactions_supported():
+ pytest.skip(f'skipped {request.node.name}, transaction api is not supported.')
+ elif sys.version_info < (3, 7):
+ pytest.skip(f'skipped {request.node.name}, transaction api is not supported'
+ f'for async client on python {sys.version}')
+ else:
+ yield client
+ finally:
+ await client.close()
+
+
+@pytest.fixture(params=['with-partition-awareness', 'without-partition-awareness'])
+def client(request, connection_param):
+ client = Client(partition_aware=request.param == 'with-partition-awareness')
+ try:
+ client.connect(connection_param)
+ if not client.protocol_context.is_transactions_supported():
+ pytest.skip(f'skipped {request.node.name}, transaction api is not supported.')
+ else:
+ yield client
+ finally:
+ client.close()
+
+
+@pytest.fixture
+def tx_cache(client):
+ cache = client.get_or_create_cache({
+ PROP_NAME: 'tx_cache',
+ PROP_CACHE_ATOMICITY_MODE: CacheAtomicityMode.TRANSACTIONAL
+ })
+ yield cache
+ cache.destroy()
+
+
+@pytest.fixture
+async def async_tx_cache(async_client):
+ cache = await async_client.get_or_create_cache({
+ PROP_NAME: 'tx_cache',
+ PROP_CACHE_ATOMICITY_MODE: CacheAtomicityMode.TRANSACTIONAL
+ })
+ yield cache
+ await cache.destroy()
+
+
+@pytest.mark.parametrize(
+ ['iso_level', 'concurrency'],
+ itertools.product(
+ [iso_level for iso_level in TransactionIsolation],
+ [concurrency for concurrency in TransactionConcurrency]
+ )
+)
+def test_simple_transaction(client, tx_cache, iso_level, concurrency):
+ with client.tx_start(isolation=iso_level, concurrency=concurrency) as tx:
+ tx_cache.put(1, 1)
+ tx.commit()
+
+ assert tx_cache.get(1) == 1
+
+ with client.tx_start(isolation=iso_level, concurrency=concurrency) as tx:
+ tx_cache.put(1, 10)
+ tx.rollback()
+
+ assert tx_cache.get(1) == 1
+
+ with client.tx_start(isolation=iso_level, concurrency=concurrency) as tx:
+ tx_cache.put(1, 10)
+
+ assert tx_cache.get(1) == 1
+
+
+@pytest.mark.parametrize(
+ ['iso_level', 'concurrency'],
+ itertools.product(
+ [iso_level for iso_level in TransactionIsolation],
+ [concurrency for concurrency in TransactionConcurrency]
+ )
+)
+@pytest.mark.asyncio
+async def test_simple_transaction_async(async_client, async_tx_cache, iso_level, concurrency):
+ async with async_client.tx_start(isolation=iso_level, concurrency=concurrency) as tx:
+ await async_tx_cache.put(1, 1)
+ await tx.commit()
+
+ assert await async_tx_cache.get(1) == 1
+
+ async with async_client.tx_start(isolation=iso_level, concurrency=concurrency) as tx:
+ await async_tx_cache.put(1, 10)
+ await tx.rollback()
+
+ assert await async_tx_cache.get(1) == 1
+
+ async with async_client.tx_start(isolation=iso_level, concurrency=concurrency) as tx:
+ async_tx_cache.put(1, 10)
+
+ assert await async_tx_cache.get(1) == 1
+
+
+def test_transactions_timeout(client, tx_cache):
+ with client.tx_start(timeout=2.0, label='tx-sync') as tx:
+ tx_cache.put(1, 1)
+ time.sleep(3.0)
+ with pytest.raises(CacheError) as to_error:
+ tx.commit()
+ assert 'tx-sync' in str(to_error) and 'timed out' in str(to_error)
+
+
+@pytest.mark.asyncio
+async def test_transactions_timeout_async(async_client, async_tx_cache):
+ async def update(i, timeout):
+ async with async_client.tx_start(
+ label=f'tx-{i}', timeout=timeout, isolation=TransactionIsolation.READ_COMMITTED,
+ concurrency=TransactionConcurrency.PESSIMISTIC
+ ) as tx:
+ k1, k2 = (1, 2) if i % 2 == 0 else (2, 1)
+ v = f'value-{i}'
+
+ await async_tx_cache.put(k1, v)
+ await async_tx_cache.put(k2, v)
+
+ await tx.commit()
+
+ task = asyncio.gather(*[update(i, 2.0) for i in range(20)], return_exceptions=True)
+ await asyncio.sleep(5.0)
+ assert task.done() # Check that all transactions completed or rolled-back on timeout
+ for i, ex in enumerate(task.result()):
+ if ex:
+ assert 'TransactionTimeoutException' in str(ex) or \
+ 'Cache transaction timed out' # check that transaction was rolled back.
+ assert f'tx-{i}' in str(ex) # check that tx label presents in error
+
+
+@pytest.mark.asyncio
+@pytest.mark.parametrize('iso_level', [iso_level for iso_level in TransactionIsolation])
+async def test_concurrent_pessimistic_transactions_same_key(async_client, async_tx_cache, iso_level):
+ async def update(i):
+ async with async_client.tx_start(
+ label=f'tx_lbl_{i}', isolation=iso_level, concurrency=TransactionConcurrency.PESSIMISTIC
+ ) as tx:
+ await async_tx_cache.put(1, f'test-{i}')
+ await tx.commit()
+
+ res = await asyncio.gather(*[update(i) for i in range(20)], return_exceptions=True)
+ assert not any(res) # Checks that all transactions proceeds
+
+
+@pytest.mark.asyncio
+async def test_concurrent_optimistic_transactions_no_deadlock(async_client, async_tx_cache, event_loop):
+ """
+ Check that optimistic transactions are deadlock safe.
+ """
+ async def update(i):
+ async with async_client.tx_start(
+ label=f'tx-{i}', isolation=TransactionIsolation.SERIALIZABLE,
+ concurrency=TransactionConcurrency.OPTIMISTIC
+ ) as tx:
+ k1, k2 = (1, 2) if i % 2 == 0 else (2, 1)
+ v = f'value-{i}'
+
+ await async_tx_cache.put(k1, v)
+ await async_tx_cache.put(k2, v)
+
+ await tx.commit()
+
+ task = asyncio.gather(*[update(i) for i in range(20)], return_exceptions=True)
+ await asyncio.sleep(2.0)
+ assert task.done() # Check that there are not any deadlock.
+ assert not all(task.result()) # Check that some (or all) transactions proceeds.
+ for i, ex in enumerate(task.result()):
+ if ex:
+ assert 'lock conflict' in str(ex) # check optimistic prepare phase failed
+ assert f'tx-{i}' in str(ex) # check that tx label presents in error
+
+
+@pytest.mark.asyncio
+@pytest.mark.parametrize(
+ ['iso_level', 'concurrency'],
+ itertools.product(
+ [iso_level for iso_level in TransactionIsolation],
+ [concurrency for concurrency in TransactionConcurrency]
+ )
+)
+async def test_concurrent_transactions(async_client, async_tx_cache, iso_level, concurrency):
+ async def update(i):
+ async with async_client.tx_start(isolation=iso_level, concurrency=concurrency) as tx:
+ await async_tx_cache.put(i, f'test-{i}')
+ if i % 2 == 0:
+ await tx.commit()
+ else:
+ await tx.rollback()
+
+ await asyncio.gather(*[update(i) for i in range(20)], return_exceptions=True)
+ assert await async_tx_cache.get_all(list(range(20))) == {i: f'test-{i}' for i in range(20) if i % 2 == 0}
diff --git a/tests/config/ignite-config.xml.jinja2 b/tests/config/ignite-config.xml.jinja2
index 325a581..22b103e 100644
--- a/tests/config/ignite-config.xml.jinja2
+++ b/tests/config/ignite-config.xml.jinja2
@@ -60,6 +60,7 @@
<property name="host" value="127.0.0.1"/>
<property name="port" value="{{ ignite_client_port }}"/>
<property name="portRange" value="0"/>
+ <property name="threadPoolSize" value="100"/>
{% if use_ssl %}
<property name="sslEnabled" value="true"/>
<property name="useIgniteSslContextFactory" value="false"/>
diff --git a/tests/custom/test_cluster.py b/tests/custom/test_cluster.py
index f1ffcfd..e94853a 100644
--- a/tests/custom/test_cluster.py
+++ b/tests/custom/test_cluster.py
@@ -47,10 +47,9 @@
@pytest.fixture(autouse=True)
def cluster_api_supported(request, server1):
client = Client()
- client.connect('127.0.0.1', 10801)
-
- if not client.protocol_context.is_cluster_api_supported():
- pytest.skip(f'skipped {request.node.name}, ExpiryPolicy APIis not supported.')
+ with client.connect('127.0.0.1', 10801):
+ if not client.protocol_context.is_cluster_api_supported():
+ pytest.skip(f'skipped {request.node.name}, ExpiryPolicy APIis not supported.')
def test_cluster_set_active(with_persistence):