IGNITE-14911 Unify timeouts, add support for datetime.timedelta for expiry_policy - Fixes #44.
diff --git a/docs/async_examples.rst b/docs/async_examples.rst index af61a75..4ce65ce 100644 --- a/docs/async_examples.rst +++ b/docs/async_examples.rst
@@ -63,12 +63,12 @@ .. literalinclude:: ../examples/expiry_policy.py :language: python :dedent: 12 - :lines: 72-75 + :lines: 73-76 .. literalinclude:: ../examples/expiry_policy.py :language: python :dedent: 12 - :lines: 81-89 + :lines: 82-90 Secondly, expiry policy can be set for all cache operations, which are done under decorator. To create it use :py:meth:`~pyignite.cache.BaseCache.with_expire_policy` @@ -76,7 +76,7 @@ .. literalinclude:: ../examples/expiry_policy.py :language: python :dedent: 12 - :lines: 96-105 + :lines: 97-106 Transactions ------------
diff --git a/docs/examples.rst b/docs/examples.rst index e01f112..4ca0910 100644 --- a/docs/examples.rst +++ b/docs/examples.rst
@@ -97,12 +97,12 @@ .. literalinclude:: ../examples/expiry_policy.py :language: python :dedent: 12 - :lines: 31-34 + :lines: 32-35 .. literalinclude:: ../examples/expiry_policy.py :language: python :dedent: 12 - :lines: 40-46 + :lines: 41-47 Secondly, expiry policy can be set for all cache operations, which are done under decorator. To create it use :py:meth:`~pyignite.cache.BaseCache.with_expire_policy` @@ -110,7 +110,7 @@ .. literalinclude:: ../examples/expiry_policy.py :language: python :dedent: 12 - :lines: 53-60 + :lines: 54-61 Scan ====
diff --git a/examples/expiry_policy.py b/examples/expiry_policy.py index 2002da1..3dbe54b 100644 --- a/examples/expiry_policy.py +++ b/examples/expiry_policy.py
@@ -14,6 +14,7 @@ # limitations under the License. import asyncio import time +from datetime import timedelta from pyignite import Client, AioClient from pyignite.datatypes import ExpiryPolicy @@ -30,7 +31,7 @@ try: ttl_cache = client.create_cache({ PROP_NAME: 'test', - PROP_EXPIRY_POLICY: ExpiryPolicy(create=1.0) + PROP_EXPIRY_POLICY: ExpiryPolicy(create=timedelta(seconds=1.0)) }) except NotSupportedByClusterError: print("'ExpiryPolicy' API is not supported by cluster. Finishing...") @@ -50,7 +51,7 @@ print("Create simple Cache and set TTL through `with_expire_policy`") simple_cache = client.create_cache('test') try: - ttl_cache = simple_cache.with_expire_policy(access=1.0) + ttl_cache = simple_cache.with_expire_policy(access=timedelta(seconds=1.0)) ttl_cache.put(1, 1) time.sleep(0.5) print(f"key = {1}, value = {ttl_cache.get(1)}") @@ -71,7 +72,7 @@ try: ttl_cache = await client.create_cache({ PROP_NAME: 'test', - PROP_EXPIRY_POLICY: ExpiryPolicy(create=1.0) + PROP_EXPIRY_POLICY: ExpiryPolicy(create=timedelta(seconds=1.0)) }) except NotSupportedByClusterError: print("'ExpiryPolicy' API is not supported by cluster. Finishing...") @@ -93,7 +94,7 @@ print("Create simple Cache and set TTL through `with_expire_policy`") simple_cache = await client.create_cache('test') try: - ttl_cache = simple_cache.with_expire_policy(access=1.0) + ttl_cache = simple_cache.with_expire_policy(access=timedelta(seconds=1.0)) await ttl_cache.put(1, 1) await asyncio.sleep(0.5) value = await ttl_cache.get(1)
diff --git a/examples/transactions.py b/examples/transactions.py index ef9b08c..53da05f 100644 --- a/examples/transactions.py +++ b/examples/transactions.py
@@ -62,7 +62,7 @@ # rollback transaction on timeout. try: - async with client.tx_start(timeout=1.0, label='long-tx') as tx: + async with client.tx_start(timeout=1000, label='long-tx') as tx: await cache.put(key, 'fail') await asyncio.sleep(2.0) await tx.commit() @@ -114,7 +114,7 @@ # rollback transaction on timeout. try: - with client.tx_start(timeout=1.0, label='long-tx') as tx: + with client.tx_start(timeout=1000, label='long-tx') as tx: cache.put(key, 'fail') time.sleep(2.0) tx.commit()
diff --git a/pyignite/aio_client.py b/pyignite/aio_client.py index 2bc850b..0bb2b8c 100644 --- a/pyignite/aio_client.py +++ b/pyignite/aio_client.py
@@ -489,15 +489,15 @@ def tx_start(self, concurrency: TransactionConcurrency = TransactionConcurrency.PESSIMISTIC, isolation: TransactionIsolation = TransactionIsolation.REPEATABLE_READ, - timeout: Union[int, float] = 0, label: Optional[str] = None) -> 'AioTransaction': + timeout: int = 0, label: Optional[str] = None) -> 'AioTransaction': """ Start async thin client transaction. **Supported only python 3.7+** :param concurrency: (optional) transaction concurrency, see - :py:class:`~pyignite.datatypes.transactions.TransactionConcurrency` + :py:class:`~pyignite.datatypes.transactions.TransactionConcurrency`, :param isolation: (optional) transaction isolation level, see - :py:class:`~pyignite.datatypes.transactions.TransactionIsolation` - :param timeout: (optional) transaction timeout in seconds if float, in millis if int + :py:class:`~pyignite.datatypes.transactions.TransactionIsolation`, + :param timeout: (optional) transaction timeout in milliseconds, :param label: (optional) transaction label. :return: :py:class:`~pyignite.transaction.AioTransaction` instance. """
diff --git a/pyignite/cache.py b/pyignite/cache.py index 79fa0f5..51f07c9 100644 --- a/pyignite/cache.py +++ b/pyignite/cache.py
@@ -12,7 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +import datetime from typing import Any, Iterable, Optional, Tuple, Union from .api.tx_api import get_tx_connection @@ -136,16 +136,16 @@ def with_expire_policy( self, expiry_policy: Optional[ExpiryPolicy] = None, - create: Union[int, float] = ExpiryPolicy.UNCHANGED, - update: Union[int, float] = ExpiryPolicy.UNCHANGED, - access: Union[int, float] = ExpiryPolicy.UNCHANGED + create: Union[int, datetime.timedelta] = ExpiryPolicy.UNCHANGED, + update: Union[int, datetime.timedelta] = ExpiryPolicy.UNCHANGED, + access: Union[int, datetime.timedelta] = ExpiryPolicy.UNCHANGED ): """ :param expiry_policy: optional :class:`~pyignite.datatypes.expiry_policy.ExpiryPolicy` - object. If it is set, other params will be ignored. - :param create: create TTL in seconds (float) or milliseconds (int), - :param update: Create TTL in seconds (float) or milliseconds (int), - :param access: Create TTL in seconds (float) or milliseconds (int). + object. If it is set, other params will be ignored, + :param create: TTL for create in milliseconds or :py:class:`~time.timedelta`, + :param update: TTL for update in milliseconds or :py:class:`~time.timedelta`, + :param access: TTL for access in milliseconds or :py:class:`~time.timedelta`, :return: cache decorator with expiry policy set. """ if not self.client.protocol_context.is_expiry_policy_supported():
diff --git a/pyignite/client.py b/pyignite/client.py index f848bcc..6a499a3 100644 --- a/pyignite/client.py +++ b/pyignite/client.py
@@ -744,15 +744,15 @@ def tx_start(self, concurrency: TransactionConcurrency = TransactionConcurrency.PESSIMISTIC, isolation: TransactionIsolation = TransactionIsolation.REPEATABLE_READ, - timeout: Union[int, float] = 0, label: Optional[str] = None) -> 'Transaction': + timeout: int = 0, label: Optional[str] = None) -> 'Transaction': """ Start thin client transaction. :param concurrency: (optional) transaction concurrency, see - :py:class:`~pyignite.datatypes.transactions.TransactionConcurrency` + :py:class:`~pyignite.datatypes.transactions.TransactionConcurrency`, :param isolation: (optional) transaction isolation level, see - :py:class:`~pyignite.datatypes.transactions.TransactionIsolation` - :param timeout: (optional) transaction timeout in seconds if float, in millis if int + :py:class:`~pyignite.datatypes.transactions.TransactionIsolation`, + :param timeout: (optional) transaction timeout in milliseconds, :param label: (optional) transaction label. :return: :py:class:`~pyignite.transaction.Transaction` instance. """
diff --git a/pyignite/datatypes/cache_properties.py b/pyignite/datatypes/cache_properties.py index 49327a3..0d7f402 100644 --- a/pyignite/datatypes/cache_properties.py +++ b/pyignite/datatypes/cache_properties.py
@@ -14,6 +14,8 @@ # limitations under the License. import ctypes +import math +from typing import Union from . import ExpiryPolicy from .prop_codes import * @@ -137,6 +139,20 @@ return cls.from_python(stream, value) +class TimeoutProp(PropBase): + prop_data_class = Long + + @classmethod + def from_python(cls, stream, value: int): + if not isinstance(value, int) or value < 0: + raise ValueError(f'Timeout value should be a positive integer, {value} passed instead') + return super().from_python(stream, value) + + @classmethod + async def from_python_async(cls, stream, value): + return cls.from_python(stream, value) + + class PropName(PropBase): prop_code = PROP_NAME prop_data_class = String @@ -227,9 +243,8 @@ prop_data_class = Long -class PropRebalanceTimeout(PropBase): +class PropRebalanceTimeout(TimeoutProp): prop_code = PROP_REBALANCE_TIMEOUT - prop_data_class = Long class PropRebalanceBatchSize(PropBase): @@ -262,9 +277,8 @@ prop_data_class = CacheKeyConfiguration -class PropDefaultLockTimeout(PropBase): +class PropDefaultLockTimeout(TimeoutProp): prop_code = PROP_DEFAULT_LOCK_TIMEOUT - prop_data_class = Long class PropMaxConcurrentAsyncOperation(PropBase):
diff --git a/pyignite/datatypes/expiry_policy.py b/pyignite/datatypes/expiry_policy.py index d729da5..95e37db 100644 --- a/pyignite/datatypes/expiry_policy.py +++ b/pyignite/datatypes/expiry_policy.py
@@ -13,6 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. import ctypes +import math +from datetime import timedelta from io import SEEK_CUR from typing import Union @@ -22,13 +24,16 @@ def _positive(_, attrib, value): + if isinstance(value, timedelta): + value = value.total_seconds() * 1000 + if value < 0 and value not in [ExpiryPolicy.UNCHANGED, ExpiryPolicy.ETERNAL]: raise ValueError(f"'{attrib.name}' value must not be negative") def _write_duration(stream, value): - if isinstance(value, float): - value = int(value * 1000) + if isinstance(value, timedelta): + value = math.floor(value.total_seconds() * 1000) stream.write(value.to_bytes(8, byteorder=PROTOCOL_BYTE_ORDER, signed=True)) @@ -44,17 +49,17 @@ #: Set TTL eternal. ETERNAL = -1 - #: Set TTL for create in seconds(float) or millis(int) - create = attr.ib(kw_only=True, default=UNCHANGED, - validator=[attr.validators.instance_of((int, float)), _positive]) + #: Set TTL for create in milliseconds or :py:class:`~time.timedelta` + create = attr.ib(kw_only=True, default=UNCHANGED, type=Union[int, timedelta], + validator=[attr.validators.instance_of((int, timedelta)), _positive]) - #: Set TTL for update in seconds(float) or millis(int) - update = attr.ib(kw_only=True, default=UNCHANGED, type=Union[int, float], - validator=[attr.validators.instance_of((int, float)), _positive]) + #: Set TTL for update in milliseconds or :py:class:`~time.timedelta` + update = attr.ib(kw_only=True, default=UNCHANGED, type=Union[int, timedelta], + validator=[attr.validators.instance_of((int, timedelta)), _positive]) - #: Set TTL for access in seconds(float) or millis(int) - access = attr.ib(kw_only=True, default=UNCHANGED, type=Union[int, float], - validator=[attr.validators.instance_of((int, float)), _positive]) + #: Set TTL for access in milliseconds or :py:class:`~time.timedelta` + access = attr.ib(kw_only=True, default=UNCHANGED, type=Union[int, timedelta], + validator=[attr.validators.instance_of((int, timedelta)), _positive]) class _CType(ctypes.LittleEndianStructure): _pack_ = 1
diff --git a/pyignite/transaction.py b/pyignite/transaction.py index 5bafa6b..eb77f8d 100644 --- a/pyignite/transaction.py +++ b/pyignite/transaction.py
@@ -13,8 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -import math -from typing import Union +from enum import IntEnum +from typing import Union, Type from pyignite.api.tx_api import tx_end, tx_start, tx_end_async, tx_start_async from pyignite.datatypes import TransactionIsolation, TransactionConcurrency @@ -22,21 +22,41 @@ from pyignite.utils import status_to_exception -def _convert_to_millis(timeout: Union[int, float]) -> int: - if isinstance(timeout, float): - return math.floor(timeout * 1000) - return timeout +def _validate_int_enum_param(value: Union[int, IntEnum], cls: Type[IntEnum]): + if value not in cls: + raise ValueError(f'{value} not in {cls}') + return value -class Transaction: +def _validate_timeout(value): + if not isinstance(value, int) or value < 0: + raise ValueError(f'Timeout value should be a positive integer, {value} passed instead') + return value + + +def _validate_label(value): + if value and not isinstance(value, str): + raise ValueError(f'Label should be str, {type(value)} passed instead') + return value + + +class _BaseTransaction: + def __init__(self, client, concurrency=TransactionConcurrency.PESSIMISTIC, + isolation=TransactionIsolation.REPEATABLE_READ, timeout=0, label=None): + self.client = client + self.concurrency = _validate_int_enum_param(concurrency, TransactionConcurrency) + self.isolation = _validate_int_enum_param(isolation, TransactionIsolation) + self.timeout = _validate_timeout(timeout) + self.label, self.closed = _validate_label(label), False + + +class Transaction(_BaseTransaction): """ Thin client transaction. """ def __init__(self, client, concurrency=TransactionConcurrency.PESSIMISTIC, isolation=TransactionIsolation.REPEATABLE_READ, timeout=0, label=None): - self.client, self.concurrency = client, concurrency - self.isolation, self.timeout = isolation, _convert_to_millis(timeout) - self.label, self.closed = label, False + super().__init__(client, concurrency, isolation, timeout, label) self.tx_id = self.__start_tx() def commit(self) -> None: @@ -77,15 +97,13 @@ return tx_end(self.tx_id, committed) -class AioTransaction: +class AioTransaction(_BaseTransaction): """ Async thin client transaction. """ def __init__(self, client, concurrency=TransactionConcurrency.PESSIMISTIC, isolation=TransactionIsolation.REPEATABLE_READ, timeout=0, label=None): - self.client, self.concurrency = client, concurrency - self.isolation, self.timeout = isolation, _convert_to_millis(timeout) - self.label, self.closed = label, False + super().__init__(client, concurrency, isolation, timeout, label) def __await__(self): return (yield from self.__aenter__().__await__())
diff --git a/tests/common/test_expiry_policy.py b/tests/common/test_expiry_policy.py index 9dc4152..939a380 100644 --- a/tests/common/test_expiry_policy.py +++ b/tests/common/test_expiry_policy.py
@@ -14,6 +14,7 @@ # limitations under the License. import asyncio import time +from datetime import timedelta import pytest @@ -23,11 +24,11 @@ @pytest.mark.skip_if_no_expiry_policy def test_expiry_policy(cache): - ttl, num_retries = 0.6, 10 + ttl, num_retries = timedelta(seconds=0.6), 10 cache_eternal = cache.with_expire_policy(create=ExpiryPolicy.ETERNAL) - cache_created = cache.with_expire_policy(create=0.6) - cache_updated = cache.with_expire_policy(update=0.6) - cache_accessed = cache.with_expire_policy(access=0.6) + cache_created = cache.with_expire_policy(create=ttl) + cache_updated = cache.with_expire_policy(update=ttl) + cache_accessed = cache.with_expire_policy(access=ttl) for _ in range(num_retries): cache.clear() @@ -39,11 +40,11 @@ cache_updated.put(2, 2) cache_accessed.put(3, 3) - time.sleep(ttl * 2 / 3) + time.sleep(ttl.total_seconds() * 2 / 3) result = [cache.contains_key(k) for k in range(4)] - if time.time() - start >= ttl: + if time.time() - start >= ttl.total_seconds(): continue assert all(result) @@ -55,20 +56,20 @@ cache_updated.put(2, 3) # Check that update policy works. cache_accessed.get(3) # Check that access policy works. - time.sleep(ttl * 2 / 3) + time.sleep(ttl.total_seconds() * 2 / 3) result = [cache.contains_key(k) for k in range(4)] - if time.time() - start >= ttl: + if time.time() - start >= ttl.total_seconds(): continue assert result == [True, False, True, True] - time.sleep(ttl * 2 / 3) + time.sleep(ttl.total_seconds() * 2 / 3) cache_updated.get(2) # Check that access doesn't matter for updated policy. - time.sleep(ttl * 2 / 3) + time.sleep(ttl.total_seconds() * 2 / 3) result = [cache.contains_key(k) for k in range(0, 4)] assert result == [True, False, False, False] @@ -77,11 +78,11 @@ @pytest.mark.asyncio @pytest.mark.skip_if_no_expiry_policy async def test_expiry_policy_async(async_cache): - ttl, num_retries = 0.6, 10 + ttl, num_retries = timedelta(seconds=0.6), 10 cache_eternal = async_cache.with_expire_policy(create=ExpiryPolicy.ETERNAL) - cache_created = async_cache.with_expire_policy(create=0.6) - cache_updated = async_cache.with_expire_policy(update=0.6) - cache_accessed = async_cache.with_expire_policy(access=0.6) + cache_created = async_cache.with_expire_policy(create=ttl) + cache_updated = async_cache.with_expire_policy(update=ttl) + cache_accessed = async_cache.with_expire_policy(access=ttl) for _ in range(num_retries): await async_cache.clear() @@ -95,11 +96,11 @@ cache_accessed.put(3, 3) ) - await asyncio.sleep(ttl * 2 / 3) + await asyncio.sleep(ttl.total_seconds() * 2 / 3) result = await asyncio.gather(*[async_cache.contains_key(k) for k in range(4)]) - if time.time() - start >= ttl: + if time.time() - start >= ttl.total_seconds(): continue assert all(result) @@ -113,20 +114,20 @@ cache_accessed.get(3) # Check that access policy works. ) - await asyncio.sleep(ttl * 2 / 3) + await asyncio.sleep(ttl.total_seconds() * 2 / 3) result = await asyncio.gather(*[async_cache.contains_key(k) for k in range(4)]) - if time.time() - start >= ttl: + if time.time() - start >= ttl.total_seconds(): continue assert result == [True, False, True, True] - await asyncio.sleep(ttl * 2 / 3) + await asyncio.sleep(ttl.total_seconds() * 2 / 3) await cache_updated.get(2) # Check that access doesn't matter for updated policy. - await asyncio.sleep(ttl * 2 / 3) + await asyncio.sleep(ttl.total_seconds() * 2 / 3) result = await asyncio.gather(*[async_cache.contains_key(k) for k in range(4)]) assert result == [True, False, False, False] @@ -169,3 +170,17 @@ assert settings[PROP_EXPIRY_POLICY] == expiry_policy finally: await cache.destroy() + + +@pytest.mark.skip_if_no_expiry_policy +@pytest.mark.parametrize( + 'params', + [ + {'create': timedelta(seconds=-1), 'update': timedelta(seconds=-1), 'delete': timedelta(seconds=-1)}, + {'create': 0.6}, + {'create': -3} + ] +) +def test_expiry_policy_param_validation(params): + with pytest.raises((TypeError, ValueError)): + ExpiryPolicy(**params)
diff --git a/tests/common/test_transactions.py b/tests/common/test_transactions.py index 57874b6..e879f60 100644 --- a/tests/common/test_transactions.py +++ b/tests/common/test_transactions.py
@@ -25,6 +25,7 @@ from pyignite.datatypes.cache_config import CacheAtomicityMode from pyignite.datatypes.prop_codes import PROP_NAME, PROP_CACHE_ATOMICITY_MODE from pyignite.exceptions import CacheError +from pyignite.transaction import Transaction, AioTransaction @pytest.fixture @@ -137,7 +138,7 @@ def test_transactions_timeout(client, tx_cache): - with client.tx_start(timeout=2.0, label='tx-sync') as tx: + with client.tx_start(timeout=2000, label='tx-sync') as tx: tx_cache.put(1, 1) time.sleep(3.0) with pytest.raises(CacheError) as to_error: @@ -160,7 +161,7 @@ await tx.commit() - task = asyncio.gather(*[update(i, 2.0) for i in range(20)], return_exceptions=True) + task = asyncio.gather(*[update(i, 2000) for i in range(20)], return_exceptions=True) await asyncio.sleep(5.0) assert task.done() # Check that all transactions completed or rolled-back on timeout for i, ex in enumerate(task.result()): @@ -231,3 +232,21 @@ await asyncio.gather(*[update(i) for i in range(20)], return_exceptions=True) assert await async_tx_cache.get_all(list(range(20))) == {i: f'test-{i}' for i in range(20) if i % 2 == 0} + + +@pytest.mark.parametrize( + "params", + [ + {'isolation': 25}, + {'concurrency': 45}, + {'timeout': 2.0}, + {'timeout': -10}, + {'label': 100500} + ] +) +def test_tx_parameter_validation(params): + with pytest.raises((TypeError, ValueError)): + Transaction(None, **params) + + with pytest.raises((TypeError, ValueError)): + AioTransaction(None, **params)