IGNITE-11303: Partition Awareness for Python Thin
diff --git a/.gitignore b/.gitignore
index d9268c3..a779771 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,4 +1,7 @@
+.idea
.eggs
.pytest_cache
.tox
pyignite.egg-info
+ignite-log-*
+__pycache__
\ No newline at end of file
diff --git a/.travis.yml b/.travis.yml
index 230a9f1..f884bdb 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -14,16 +14,20 @@
# limitations under the License.
sudo: required
-services:
- - docker
+
+addons:
+ apt:
+ packages:
+ - openjdk-8-jdk
env:
- COMPOSE_VERSION: 1.27.4
+ - IGNITE_VERSION=2.9.1 IGNITE_HOME=/opt/ignite
before_install:
- - curl -L https://github.com/docker/compose/releases/download/${COMPOSE_VERSION}/docker-compose-`uname -s`-`uname -m` > docker-compose
- - chmod +x docker-compose
- - sudo mv docker-compose /usr/local/bin
+ - curl -L https://apache-mirror.rbc.ru/pub/apache/ignite/${IGNITE_VERSION}/apache-ignite-slim-${IGNITE_VERSION}-bin.zip > ignite.zip
+ - unzip ignite.zip -d /opt
+ - mv /opt/apache-ignite-slim-${IGNITE_VERSION}-bin /opt/ignite
+ - mv /opt/ignite/libs/optional/ignite-log4j2 /opt/ignite/libs/
language: python
python:
diff --git a/docker-compose.yml b/docker-compose.yml
deleted file mode 100644
index 2517d25..0000000
--- a/docker-compose.yml
+++ /dev/null
@@ -1,34 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-services:
- ignite:
- image: apacheignite/ignite:latest
- ports:
- - 10800:10800
- restart: always
- network_mode: host
-
- ignite-ssl:
- image: apacheignite/ignite:latest
- ports:
- - 10800:10800
- restart: always
- network_mode: host
- volumes:
- - ./tests/config:/config
- environment:
- CONFIG_URI: /config/ssl.xml
- PYTHON_TEST_CONFIG_PATH: /config
diff --git a/docs/datatypes/parsers.rst b/docs/datatypes/parsers.rst
index a717f4c..71f9aac 100644
--- a/docs/datatypes/parsers.rst
+++ b/docs/datatypes/parsers.rst
@@ -47,94 +47,94 @@
of interoperability, you may have to sneak one or the other class, along
with your data, in to some API function as a *type conversion hint*.
-+-----------+--------------------+-------------------------------+-----------------------------------------------------------------+
-|`type_code`|Apache Ignite |Python type |Parser/constructor |
-| |docs reference |or class |class |
-+===========+====================+===============================+=================================================================+
-|*Primitive data types* |
-+-----------+--------------------+-------------------------------+-----------------------------------------------------------------+
-|0x01 |Byte_ |int |:class:`~pyignite.datatypes.primitive_objects.ByteObject` |
-+-----------+--------------------+-------------------------------+-----------------------------------------------------------------+
-|0x02 |Short_ |int |:class:`~pyignite.datatypes.primitive_objects.ShortObject` |
-+-----------+--------------------+-------------------------------+-----------------------------------------------------------------+
-|0x03 |Int_ |int |:class:`~pyignite.datatypes.primitive_objects.IntObject` |
-+-----------+--------------------+-------------------------------+-----------------------------------------------------------------+
-|0x04 |Long_ |int |:class:`~pyignite.datatypes.primitive_objects.LongObject` |
-+-----------+--------------------+-------------------------------+-----------------------------------------------------------------+
-|0x05 |Float_ |float |:class:`~pyignite.datatypes.primitive_objects.FloatObject` |
-+-----------+--------------------+-------------------------------+-----------------------------------------------------------------+
-|0x06 |Double_ |float |:class:`~pyignite.datatypes.primitive_objects.DoubleObject` |
-+-----------+--------------------+-------------------------------+-----------------------------------------------------------------+
-|0x07 |Char_ |str |:class:`~pyignite.datatypes.primitive_objects.CharObject` |
-+-----------+--------------------+-------------------------------+-----------------------------------------------------------------+
-|0x08 |Bool_ |bool |:class:`~pyignite.datatypes.primitive_objects.BoolObject` |
-+-----------+--------------------+-------------------------------+-----------------------------------------------------------------+
-|0x65 |Null_ |NoneType |:class:`~pyignite.datatypes.null_object.Null` |
-+-----------+--------------------+-------------------------------+-----------------------------------------------------------------+
-|*Standard objects* |
-+-----------+--------------------+-------------------------------+-----------------------------------------------------------------+
-|0x09 |String_ |Str |:class:`~pyignite.datatypes.standard.String` |
-+-----------+--------------------+-------------------------------+-----------------------------------------------------------------+
-|0x0a |UUID_ |uuid.UUID |:class:`~pyignite.datatypes.standard.UUIDObject` |
-+-----------+--------------------+-------------------------------+-----------------------------------------------------------------+
-|0x21 |Timestamp_ |tuple |:class:`~pyignite.datatypes.standard.TimestampObject` |
-+-----------+--------------------+-------------------------------+-----------------------------------------------------------------+
-|0x0b |Date_ |datetime.datetime |:class:`~pyignite.datatypes.standard.DateObject` |
-+-----------+--------------------+-------------------------------+-----------------------------------------------------------------+
-|0x24 |Time_ |datetime.timedelta |:class:`~pyignite.datatypes.standard.TimeObject` |
-+-----------+--------------------+-------------------------------+-----------------------------------------------------------------+
-|0x1e |Decimal_ |decimal.Decimal |:class:`~pyignite.datatypes.standard.DecimalObject` |
-+-----------+--------------------+-------------------------------+-----------------------------------------------------------------+
-|0x1c |Enum_ |tuple |:class:`~pyignite.datatypes.standard.EnumObject` |
-+-----------+--------------------+-------------------------------+-----------------------------------------------------------------+
-|0x67 |`Binary enum`_ |tuple |:class:`~pyignite.datatypes.standard.BinaryEnumObject` |
-+-----------+--------------------+-------------------------------+-----------------------------------------------------------------+
-|*Arrays of primitives* |
-+-----------+--------------------+-------------------------------+-----------------------------------------------------------------+
-|0x0c |`Byte array`_ |iterable/list |:class:`~pyignite.datatypes.primitive_arrays.ByteArrayObject` |
-+-----------+--------------------+-------------------------------+-----------------------------------------------------------------+
-|0x0d |`Short array`_ |iterable/list |:class:`~pyignite.datatypes.primitive_arrays.ShortArrayObject` |
-+-----------+--------------------+-------------------------------+-----------------------------------------------------------------+
-|0x0e |`Int array`_ |iterable/list |:class:`~pyignite.datatypes.primitive_arrays.IntArrayObject` |
-+-----------+--------------------+-------------------------------+-----------------------------------------------------------------+
-|0x0f |`Long array`_ |iterable/list |:class:`~pyignite.datatypes.primitive_arrays.LongArrayObject` |
-+-----------+--------------------+-------------------------------+-----------------------------------------------------------------+
-|0x10 |`Float array`_ |iterable/list |:class:`~pyignite.datatypes.primitive_arrays.FloatArrayObject` |
-+-----------+--------------------+-------------------------------+-----------------------------------------------------------------+
-|0x11 |`Double array`_ |iterable/list |:class:`~pyignite.datatypes.primitive_arrays.DoubleArrayObject` |
-+-----------+--------------------+-------------------------------+-----------------------------------------------------------------+
-|0x12 |`Char array`_ |iterable/list |:class:`~pyignite.datatypes.primitive_arrays.CharArrayObject` |
-+-----------+--------------------+-------------------------------+-----------------------------------------------------------------+
-|0x13 |`Bool array`_ |iterable/list |:class:`~pyignite.datatypes.primitive_arrays.BoolArrayObject` |
-+-----------+--------------------+-------------------------------+-----------------------------------------------------------------+
-|*Arrays of standard objects* |
-+-----------+--------------------+-------------------------------+-----------------------------------------------------------------+
-|0x14 |`String array`_ |iterable/list |:class:`~pyignite.datatypes.standard.StringArrayObject` |
-+-----------+--------------------+-------------------------------+-----------------------------------------------------------------+
-|0x15 |`UUID array`_ |iterable/list |:class:`~pyignite.datatypes.standard.UUIDArrayObject` |
-+-----------+--------------------+-------------------------------+-----------------------------------------------------------------+
-|0x22 |`Timestamp array`_ |iterable/list |:class:`~pyignite.datatypes.standard.TimestampArrayObject` |
-+-----------+--------------------+-------------------------------+-----------------------------------------------------------------+
-|0x16 |`Date array`_ |iterable/list |:class:`~pyignite.datatypes.standard.DateArrayObject` |
-+-----------+--------------------+-------------------------------+-----------------------------------------------------------------+
-|0x23 |`Time array`_ |iterable/list |:class:`~pyignite.datatypes.standard.TimeArrayObject` |
-+-----------+--------------------+-------------------------------+-----------------------------------------------------------------+
-|0x1f |`Decimal array`_ |iterable/list |:class:`~pyignite.datatypes.standard.DecimalArrayObject` |
-+-----------+--------------------+-------------------------------+-----------------------------------------------------------------+
-|*Object collections, special types, and complex object* |
-+-----------+--------------------+-------------------------------+-----------------------------------------------------------------+
-|0x17 |`Object array`_ |iterable/list |:class:`~pyignite.datatypes.complex.ObjectArrayObject` |
-+-----------+--------------------+-------------------------------+-----------------------------------------------------------------+
-|0x18 |`Collection`_ |tuple |:class:`~pyignite.datatypes.complex.CollectionObject` |
-+-----------+--------------------+-------------------------------+-----------------------------------------------------------------+
-|0x19 |`Map`_ |dict, collections.OrderedDict |:class:`~pyignite.datatypes.complex.MapObject` |
-+-----------+--------------------+-------------------------------+-----------------------------------------------------------------+
-|0x1d |`Enum array`_ |iterable/list |:class:`~pyignite.datatypes.standard.EnumArrayObject` |
-+-----------+--------------------+-------------------------------+-----------------------------------------------------------------+
-|0x67 |`Complex object`_ |object |:class:`~pyignite.datatypes.complex.BinaryObject` |
-+-----------+--------------------+-------------------------------+-----------------------------------------------------------------+
-|0x1b |`Wrapped data`_ |tuple |:class:`~pyignite.datatypes.complex.WrappedDataObject` |
-+-----------+--------------------+-------------------------------+-----------------------------------------------------------------+
++-----------+--------------------+-------------------------------+------------------------------------------------------------------+
+|`type_code`|Apache Ignite |Python type |Parser/constructor |
+| |docs reference |or class |class |
++===========+====================+===============================+==================================================================+
+|*Primitive data types* |
++-----------+--------------------+-------------------------------+------------------------------------------------------------------+
+|0x01 |Byte_ |int |:class:`~pyignite.datatypes.primitive_objects.ByteObject` |
++-----------+--------------------+-------------------------------+------------------------------------------------------------------+
+|0x02 |Short_ |int |:class:`~pyignite.datatypes.primitive_objects.ShortObject` |
++-----------+--------------------+-------------------------------+------------------------------------------------------------------+
+|0x03 |Int_ |int |:class:`~pyignite.datatypes.primitive_objects.IntObject` |
++-----------+--------------------+-------------------------------+------------------------------------------------------------------+
+|0x04 |Long_ |int |:class:`~pyignite.datatypes.primitive_objects.LongObject` |
++-----------+--------------------+-------------------------------+------------------------------------------------------------------+
+|0x05 |Float_ |float |:class:`~pyignite.datatypes.primitive_objects.FloatObject` |
++-----------+--------------------+-------------------------------+------------------------------------------------------------------+
+|0x06 |Double_ |float |:class:`~pyignite.datatypes.primitive_objects.DoubleObject` |
++-----------+--------------------+-------------------------------+------------------------------------------------------------------+
+|0x07 |Char_ |str |:class:`~pyignite.datatypes.primitive_objects.CharObject` |
++-----------+--------------------+-------------------------------+------------------------------------------------------------------+
+|0x08 |Bool_ |bool |:class:`~pyignite.datatypes.primitive_objects.BoolObject` |
++-----------+--------------------+-------------------------------+------------------------------------------------------------------+
+|0x65 |Null_ |NoneType |:class:`~pyignite.datatypes.null_object.Null` |
++-----------+--------------------+-------------------------------+------------------------------------------------------------------+
+|*Standard objects* |
++-----------+--------------------+-------------------------------+------------------------------------------------------------------+
+|0x09 |String_ |Str |:class:`~pyignite.datatypes.standard.String` |
++-----------+--------------------+-------------------------------+------------------------------------------------------------------+
+|0x0a |UUID_ |uuid.UUID |:class:`~pyignite.datatypes.standard.UUIDObject` |
++-----------+--------------------+-------------------------------+------------------------------------------------------------------+
+|0x21 |Timestamp_ |tuple |:class:`~pyignite.datatypes.standard.TimestampObject` |
++-----------+--------------------+-------------------------------+------------------------------------------------------------------+
+|0x0b |Date_ |datetime.datetime |:class:`~pyignite.datatypes.standard.DateObject` |
++-----------+--------------------+-------------------------------+------------------------------------------------------------------+
+|0x24 |Time_ |datetime.timedelta |:class:`~pyignite.datatypes.standard.TimeObject` |
++-----------+--------------------+-------------------------------+------------------------------------------------------------------+
+|0x1e |Decimal_ |decimal.Decimal |:class:`~pyignite.datatypes.standard.DecimalObject` |
++-----------+--------------------+-------------------------------+------------------------------------------------------------------+
+|0x1c |Enum_ |tuple |:class:`~pyignite.datatypes.standard.EnumObject` |
++-----------+--------------------+-------------------------------+------------------------------------------------------------------+
+|0x67 |`Binary enum`_ |tuple |:class:`~pyignite.datatypes.standard.BinaryEnumObject` |
++-----------+--------------------+-------------------------------+------------------------------------------------------------------+
+|*Arrays of primitives* |
++-----------+--------------------+-------------------------------+------------------------------------------------------------------+
+|0x0c |`Byte array`_ |iterable/bytearray |:class:`~pyignite.datatypes.primitive_arrays.ByteArrayObject` |
++-----------+--------------------+-------------------------------+------------------------------------------------------------------+
+|0x0d |`Short array`_ |iterable/list |:class:`~pyignite.datatypes.primitive_arrays.ShortArrayObject` |
++-----------+--------------------+-------------------------------+------------------------------------------------------------------+
+|0x0e |`Int array`_ |iterable/list |:class:`~pyignite.datatypes.primitive_arrays.IntArrayObject` |
++-----------+--------------------+-------------------------------+------------------------------------------------------------------+
+|0x0f |`Long array`_ |iterable/list |:class:`~pyignite.datatypes.primitive_arrays.LongArrayObject` |
++-----------+--------------------+-------------------------------+------------------------------------------------------------------+
+|0x10 |`Float array`_ |iterable/list |:class:`~pyignite.datatypes.primitive_arrays.FloatArrayObject` |
++-----------+--------------------+-------------------------------+------------------------------------------------------------------+
+|0x11 |`Double array`_ |iterable/list |:class:`~pyignite.datatypes.primitive_arrays.DoubleArrayObject` |
++-----------+--------------------+-------------------------------+------------------------------------------------------------------+
+|0x12 |`Char array`_ |iterable/list |:class:`~pyignite.datatypes.primitive_arrays.CharArrayObject` |
++-----------+--------------------+-------------------------------+------------------------------------------------------------------+
+|0x13 |`Bool array`_ |iterable/list |:class:`~pyignite.datatypes.primitive_arrays.BoolArrayObject` |
++-----------+--------------------+-------------------------------+------------------------------------------------------------------+
+|*Arrays of standard objects* |
++-----------+--------------------+-------------------------------+------------------------------------------------------------------+
+|0x14 |`String array`_ |iterable/list |:class:`~pyignite.datatypes.standard.StringArrayObject` |
++-----------+--------------------+-------------------------------+------------------------------------------------------------------+
+|0x15 |`UUID array`_ |iterable/list |:class:`~pyignite.datatypes.standard.UUIDArrayObject` |
++-----------+--------------------+-------------------------------+------------------------------------------------------------------+
+|0x22 |`Timestamp array`_ |iterable/list |:class:`~pyignite.datatypes.standard.TimestampArrayObject` |
++-----------+--------------------+-------------------------------+------------------------------------------------------------------+
+|0x16 |`Date array`_ |iterable/list |:class:`~pyignite.datatypes.standard.DateArrayObject` |
++-----------+--------------------+-------------------------------+------------------------------------------------------------------+
+|0x23 |`Time array`_ |iterable/list |:class:`~pyignite.datatypes.standard.TimeArrayObject` |
++-----------+--------------------+-------------------------------+------------------------------------------------------------------+
+|0x1f |`Decimal array`_ |iterable/list |:class:`~pyignite.datatypes.standard.DecimalArrayObject` |
++-----------+--------------------+-------------------------------+------------------------------------------------------------------+
+|*Object collections, special types, and complex object* |
++-----------+--------------------+-------------------------------+------------------------------------------------------------------+
+|0x17 |`Object array`_ |tuple[int, iterable/list] |:class:`~pyignite.datatypes.complex.ObjectArrayObject` |
++-----------+--------------------+-------------------------------+------------------------------------------------------------------+
+|0x18 |`Collection`_ |tuple[int, iterable/list] |:class:`~pyignite.datatypes.complex.CollectionObject` |
++-----------+--------------------+-------------------------------+------------------------------------------------------------------+
+|0x19 |`Map`_ |tuple[int, dict/OrderedDict] |:class:`~pyignite.datatypes.complex.MapObject` |
++-----------+--------------------+-------------------------------+------------------------------------------------------------------+
+|0x1d |`Enum array`_ |iterable/list |:class:`~pyignite.datatypes.standard.EnumArrayObject` |
++-----------+--------------------+-------------------------------+------------------------------------------------------------------+
+|0x67 |`Complex object`_ |object |:class:`~pyignite.datatypes.complex.BinaryObject` |
++-----------+--------------------+-------------------------------+------------------------------------------------------------------+
+|0x1b |`Wrapped data`_ |tuple[int, bytes] |:class:`~pyignite.datatypes.complex.WrappedDataObject` |
++-----------+--------------------+-------------------------------+------------------------------------------------------------------+
.. _Byte: https://apacheignite.readme.io/docs/binary-client-protocol-data-format#section-byte
.. _Short: https://apacheignite.readme.io/docs/binary-client-protocol-data-format#section-short
diff --git a/docs/examples.rst b/docs/examples.rst
index 3d8d2d9..39deef3 100644
--- a/docs/examples.rst
+++ b/docs/examples.rst
@@ -121,6 +121,62 @@
.. _sql_examples:
+Object collections
+------------------
+
+File: `get_and_put_complex.py`_.
+
+Ignite collection types are represented in `pyignite` as two-tuples.
+First comes collection type ID or deserialization hint, which is specific for
+each of the collection type. Second comes the data value.
+
+.. literalinclude:: ../examples/get_and_put_complex.py
+ :language: python
+ :lines: 19-21
+
+Map
+===
+
+For Python prior to 3.6, it might be important to distinguish between ordered
+(`collections.OrderedDict`) and unordered (`dict`) dictionary types, so you
+could use :py:attr:`~pyignite.datatypes.complex.Map.LINKED_HASH_MAP`
+for the former and :py:attr:`~pyignite.datatypes.complex.Map.HASH_MAP`
+for the latter.
+
+Since CPython 3.6 all dictionaries became de facto ordered. You can always use
+`LINKED_HASH_MAP` as a safe default.
+
+.. literalinclude:: ../examples/get_and_put_complex.py
+ :language: python
+ :lines: 29-41
+
+Collection
+==========
+
+See :class:`~pyignite.datatypes.complex.CollectionObject` and Ignite
+documentation on `Collection`_ type for the description of various Java
+collection types. Note that not all of them have a direct Python
+representative. For example, Python do not have ordered sets (it is indeed
+recommended to use `OrderedDict`'s keys and disregard its values).
+
+As for the `pyignite`, the rules are simple: pass any iterable as a data,
+and you always get `list` back.
+
+.. literalinclude:: ../examples/get_and_put_complex.py
+ :language: python
+ :lines: 43-57
+
+Object array
+============
+
+:class:`~pyignite.datatypes.complex.ObjectArrayObject` has a very limited
+functionality in `pyignite`, since no type checks can be enforced on its
+contents. But it still can be used for interoperability with Java.
+
+.. literalinclude:: ../examples/get_and_put_complex.py
+ :language: python
+ :lines: 59-68
+
SQL
---
File: `sql.py`_.
@@ -241,7 +297,23 @@
`attrs`_ package internally for creating nice `__init__()` and `__repr__()`
methods.
-You can reuse the autogenerated class for subsequent writes:
+In this case the autogenerated dataclass's name `Person` is exactly matches
+the type name of the Complex object it represents (the content of the
+:py:attr:`~pyignite.datatypes.base.IgniteDataTypeProps.type_name` property).
+But when Complex object's class name contains characters, that can not be used
+in a Python identifier, for example:
+
+- `.`, when fully qualified Java class names are used,
+- `$`, a common case for Scala classes,
+- `+`, internal class name separator in C#,
+
+then `pyignite` can not maintain this match. In such cases `pyignite` tries
+to sanitize a type name to derive a “good” dataclass name from it.
+
+If your code needs consistent naming between the server and the client, make
+sure that your Ignite cluster is configured to use `simple class names`_.
+
+Anyway, you can reuse the autogenerated dataclass for subsequent writes:
.. literalinclude:: ../examples/binary_basics.py
:language: python
@@ -445,27 +517,24 @@
(`OSError` or `SocketError`), but keeps its constructor's parameters intact
and tries to reconnect transparently.
-When there's no way for :class:`~pyignite.client.Client` to reconnect, it
-raises a special :class:`~pyignite.exceptions.ReconnectError` exception.
+When :class:`~pyignite.client.Client` detects that all nodes in the list are
+failed without the possibility of restoring connection, it raises a special
+:class:`~pyignite.exceptions.ReconnectError` exception.
-The following example features a simple node list traversal failover mechanism.
Gather 3 Ignite nodes on `localhost` into one cluster and run:
.. literalinclude:: ../examples/failover.py
:language: python
- :lines: 16-49
+ :lines: 16-51
Then try shutting down and restarting nodes, and see what happens.
.. literalinclude:: ../examples/failover.py
:language: python
- :lines: 51-61
+ :lines: 53-65
Client reconnection do not require an explicit user action, like calling
-a special method or resetting a parameter. Note, however, that reconnection
-is lazy: it happens only if (and when) it is needed. In this example,
-the automatic reconnection happens, when the script checks upon the last
-saved value:
+a special method or resetting a parameter.
.. literalinclude:: ../examples/failover.py
:language: python
@@ -475,29 +544,6 @@
`pyignite` user to just try the supposed data operations and catch
the resulting exception.
-:py:meth:`~pyignite.connection.Connection.connect` method accepts any
-iterable, not just list. It means that you can implement any reconnection
-policy (round-robin, nodes prioritization, pause on reconnect or graceful
-backoff) with a generator.
-
-`pyignite` comes with a sample
-:class:`~pyignite.connection.generators.RoundRobin` generator. In the above
-example try to replace
-
-.. literalinclude:: ../examples/failover.py
- :language: python
- :lines: 29
-
-with
-
-.. code-block:: python3
-
- client.connect(RoundRobin(nodes, max_reconnects=20))
-
-The client will try to reconnect to node 1 after node 3 is crashed, then to
-node 2, et c. At least one node should be active for the
-:class:`~pyignite.connection.generators.RoundRobin` to work properly.
-
SSL/TLS
-------
@@ -604,21 +650,24 @@
# pyignite.exceptions.HandshakeError: Handshake error: Unauthenticated sessions are prohibited.
-.. _get_and_put.py: https://github.com/apache/ignite/tree/master/modules/platforms/python/examples/get_and_put.py
-.. _type_hints.py: https://github.com/apache/ignite/tree/master/modules/platforms/python/examples/type_hints.py
-.. _failover.py: https://github.com/apache/ignite/tree/master/modules/platforms/python/examples/failover.py
-.. _scans.py: https://github.com/apache/ignite/tree/master/modules/platforms/python/examples/scans.py
-.. _sql.py: https://github.com/apache/ignite/tree/master/modules/platforms/python/examples/sql.py
-.. _binary_basics.py: https://github.com/apache/ignite/tree/master/modules/platforms/python/examples/binary_basics.py
-.. _read_binary.py: https://github.com/apache/ignite/tree/master/modules/platforms/python/examples/read_binary.py
-.. _create_binary.py: https://github.com/apache/ignite/tree/master/modules/platforms/python/examples/create_binary.py
-.. _migrate_binary.py: https://github.com/apache/ignite/tree/master/modules/platforms/python/examples/migrate_binary.py
-.. _Getting Started: https://apacheignite-sql.readme.io/docs/getting-started
-.. _Ignite GitHub repository: https://github.com/apache/ignite/blob/master/examples/sql/world.sql
-.. _Complex object: https://apacheignite.readme.io/docs/binary-client-protocol-data-format#section-complex-object
+.. _get_and_put.py: https://github.com/apache/ignite-python-thin-client/blob/master/examples/get_and_put.py
+.. _type_hints.py: https://github.com/apache/ignite-python-thin-client/blob/master/examples/type_hints.py
+.. _failover.py: https://github.com/apache/ignite-python-thin-client/blob/master/examples/failover.py
+.. _scans.py: https://github.com/apache/ignite-python-thin-client/blob/master/examples/scans.py
+.. _sql.py: https://github.com/apache/ignite-python-thin-client/blob/master/examples/sql.py
+.. _binary_basics.py: https://github.com/apache/ignite-python-thin-client/blob/master/examples/binary_basics.py
+.. _read_binary.py: https://github.com/apache/ignite-python-thin-client/blob/master/examples/read_binary.py
+.. _create_binary.py: https://github.com/apache/ignite-python-thin-client/blob/master/examples/create_binary.py
+.. _migrate_binary.py: https://github.com/apache/ignite-python-thin-client/blob/master/examples/migrate_binary.py
+.. _Getting Started: https://ignite.apache.org/docs/latest/thin-clients/python-thin-client
+.. _PyIgnite GitHub repository: https://github.com/apache/ignite-python-thin-client/blob/master
+.. _Complex object: https://ignite.apache.org/docs/latest/binary-client-protocol/data-format#complex-object
.. _Java keytool: https://docs.oracle.com/javase/8/docs/technotes/tools/unix/keytool.html
-.. _Securing Connection Between Nodes: https://apacheignite.readme.io/docs/ssltls#section-securing-connection-between-nodes
+.. _Securing Connection Between Nodes: https://ignite.apache.org/docs/latest/security/ssl-tls#ssltls-for-nodes
.. _ClientConnectorConfiguration: https://ignite.apache.org/releases/latest/javadoc/org/apache/ignite/configuration/ClientConnectorConfiguration.html
.. _openssl: https://www.openssl.org/docs/manmaster/man1/openssl.html
-.. _Authentication: https://apacheignite.readme.io/docs/advanced-security#section-authentication
+.. _Authentication: https://ignite.apache.org/docs/latest/security/authentication
.. _attrs: https://pypi.org/project/attrs/
+.. _get_and_put_complex.py: https://github.com/apache/ignite-python-thin-client/blob/master/examples/get_and_put.py
+.. _Collection: https://ignite.apache.org/docs/latest/binary-client-protocol/data-format#collection
+.. _simple class names: https://ignite.apache.org/docs/latest/data-modeling/binary-marshaller#binary-name-mapper-and-binary-id-mapper
diff --git a/docs/readme.rst b/docs/readme.rst
index f91274e..81298ae 100644
--- a/docs/readme.rst
+++ b/docs/readme.rst
@@ -141,6 +141,10 @@
``--timeout`` − timeout (in seconds) for each socket operation, including
`connect`. Accepts integer or float value. Default is None (blocking mode),
+``--partition-aware`` − experimental; off by default; turns on the partition
+awareness: a way for the thin client to calculate a data placement for the
+given key.
+
``--username`` and ``--password`` − credentials to authenticate to Ignite
cluster. Used in conjunction with `authenticationEnabled` property in cluster
configuration.
diff --git a/examples/failover.py b/examples/failover.py
index 3a5fcce..7911ce0 100644
--- a/examples/failover.py
+++ b/examples/failover.py
@@ -27,35 +27,39 @@
client = Client(timeout=4.0)
client.connect(nodes)
-print('Connected to {}'.format(client))
+print('Connected')
my_cache = client.get_or_create_cache({
PROP_NAME: 'my_cache',
- PROP_CACHE_MODE: CacheMode.REPLICATED,
+ PROP_CACHE_MODE: CacheMode.PARTITIONED,
+ PROP_BACKUPS_NUMBER: 2,
})
my_cache.put('test_key', 0)
+test_value = 0
# abstract main loop
while True:
try:
# do the work
- test_value = my_cache.get('test_key')
+ test_value = my_cache.get('test_key') or 0
my_cache.put('test_key', test_value + 1)
except (OSError, SocketError) as e:
# recover from error (repeat last command, check data
# consistency or just continue − depends on the task)
print('Error: {}'.format(e))
- print('Last value: {}'.format(my_cache.get('test_key')))
- print('Reconnected to {}'.format(client))
+ print('Last value: {}'.format(test_value))
+ print('Reconnecting')
-# Connected to 127.0.0.1:10800
-# Error: [Errno 104] Connection reset by peer
-# Last value: 6999
-# Reconnected to 127.0.0.1:10801
-# Error: Socket connection broken.
-# Last value: 12302
-# Reconnected to 127.0.0.1:10802
-# Error: [Errno 111] Client refused
+# Connected
+# Error: Connection broken.
+# Last value: 2650
+# Reconnecting
+# Error: Connection broken.
+# Last value: 10204
+# Reconnecting
+# Error: Connection broken.
+# Last value: 18932
+# Reconnecting
# Traceback (most recent call last):
-# ...
-# pyignite.exceptions.ReconnectError: Can not reconnect: out of nodes
+# ...
+# pyignite.exceptions.ReconnectError: Can not reconnect: out of nodes.
diff --git a/examples/get_and_put_complex.py b/examples/get_and_put_complex.py
new file mode 100644
index 0000000..2444612
--- /dev/null
+++ b/examples/get_and_put_complex.py
@@ -0,0 +1,68 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from collections import OrderedDict
+
+from pyignite import Client
+from pyignite.datatypes import (
+ CollectionObject, MapObject, ObjectArrayObject,
+)
+
+
+client = Client()
+client.connect('127.0.0.1', 10800)
+
+my_cache = client.get_or_create_cache('my cache')
+
+value = OrderedDict([(1, 'test'), ('key', 2.0)])
+
+# saving ordered dictionary
+type_id = MapObject.LINKED_HASH_MAP
+my_cache.put('my dict', (type_id, value))
+result = my_cache.get('my dict')
+print(result) # (2, OrderedDict([(1, 'test'), ('key', 2.0)]))
+
+# saving unordered dictionary
+type_id = MapObject.HASH_MAP
+my_cache.put('my dict', (type_id, value))
+result = my_cache.get('my dict')
+print(result) # (1, {'key': 2.0, 1: 'test'})
+
+type_id = CollectionObject.LINKED_LIST
+value = [1, '2', 3.0]
+
+my_cache.put('my list', (type_id, value))
+
+result = my_cache.get('my list')
+print(result) # (2, [1, '2', 3.0])
+
+type_id = CollectionObject.HASH_SET
+value = [4, 4, 'test', 5.6]
+
+my_cache.put('my set', (type_id, value))
+
+result = my_cache.get('my set')
+print(result) # (3, [5.6, 4, 'test'])
+
+type_id = ObjectArrayObject.OBJECT
+value = [7, '8', 9.0]
+
+my_cache.put(
+ 'my array of objects',
+ (type_id, value),
+ value_hint=ObjectArrayObject # this hint is mandatory!
+)
+result = my_cache.get('my array of objects')
+print(result) # (-1, [7, '8', 9.0])
diff --git a/pyignite/api/__init__.py b/pyignite/api/__init__.py
index 01437f0..7dbef0a 100644
--- a/pyignite/api/__init__.py
+++ b/pyignite/api/__init__.py
@@ -23,6 +23,9 @@
stable end user API see :mod:`pyignite.client` module.
"""
+from .affinity import (
+ cache_get_node_partitions,
+)
from .cache_config import (
cache_create,
cache_get_names,
@@ -54,6 +57,7 @@
cache_remove_keys,
cache_remove_all,
cache_get_size,
+ cache_local_peek,
)
from .sql import (
scan,
diff --git a/pyignite/api/affinity.py b/pyignite/api/affinity.py
new file mode 100644
index 0000000..d28cfb8
--- /dev/null
+++ b/pyignite/api/affinity.py
@@ -0,0 +1,135 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from typing import Iterable, Union
+
+from pyignite.datatypes import Bool, Int, Long, UUIDObject
+from pyignite.datatypes.internal import StructArray, Conditional, Struct
+from pyignite.queries import Query
+from pyignite.queries.op_codes import OP_CACHE_PARTITIONS
+from pyignite.utils import is_iterable
+from .result import APIResult
+
+
+cache_ids = StructArray([
+ ('cache_id', Int),
+])
+
+cache_config = StructArray([
+ ('key_type_id', Int),
+ ('affinity_key_field_id', Int),
+])
+
+node_partitions = StructArray([
+ ('partition_id', Int),
+])
+
+node_mapping = StructArray([
+ ('node_uuid', UUIDObject),
+ ('node_partitions', node_partitions)
+])
+
+cache_mapping = StructArray([
+ ('cache_id', Int),
+ ('cache_config', cache_config),
+])
+
+empty_cache_mapping = StructArray([
+ ('cache_id', Int)
+])
+
+empty_node_mapping = Struct([])
+
+partition_mapping = StructArray([
+ ('is_applicable', Bool),
+
+ ('cache_mapping', Conditional(lambda ctx: ctx['is_applicable'] == b'\x01',
+ lambda ctx: ctx['is_applicable'] is True,
+ cache_mapping, empty_cache_mapping)),
+
+ ('node_mapping', Conditional(lambda ctx: ctx['is_applicable'] == b'\x01',
+ lambda ctx: ctx['is_applicable'] is True,
+ node_mapping, empty_node_mapping)),
+])
+
+
+def cache_get_node_partitions(
+ conn: 'Connection', caches: Union[int, Iterable[int]],
+ query_id: int = None,
+) -> APIResult:
+ """
+ Gets partition mapping for an Ignite cache or a number of caches. See
+ “IEP-23: Best Effort Affinity for thin clients”.
+
+ :param conn: connection to Ignite server,
+ :param caches: cache ID(s) the mapping is provided for,
+ :param query_id: (optional) a value generated by client and returned as-is
+ in response.query_id. When the parameter is omitted, a random value
+ is generated,
+ :return: API result data object.
+ """
+ query_struct = Query(
+ OP_CACHE_PARTITIONS,
+ [
+ ('cache_ids', cache_ids),
+ ],
+ query_id=query_id
+ )
+ if not is_iterable(caches):
+ caches = [caches]
+
+ result = query_struct.perform(
+ conn,
+ query_params={
+ 'cache_ids': [{'cache_id': cache} for cache in caches],
+ },
+ response_config=[
+ ('version_major', Long),
+ ('version_minor', Int),
+ ('partition_mapping', partition_mapping),
+ ],
+ )
+ if result.status == 0:
+ # tidying up the result
+ value = {
+ 'version': (
+ result.value['version_major'],
+ result.value['version_minor']
+ ),
+ 'partition_mapping': [],
+ }
+ for i, partition_map in enumerate(result.value['partition_mapping']):
+ cache_id = partition_map['cache_mapping'][0]['cache_id']
+ value['partition_mapping'].insert(
+ i,
+ {
+ 'cache_id': cache_id,
+ 'is_applicable': partition_map['is_applicable'],
+ }
+ )
+ if partition_map['is_applicable']:
+ value['partition_mapping'][i]['cache_config'] = {
+ a['key_type_id']: a['affinity_key_field_id']
+ for a in partition_map['cache_mapping'][0]['cache_config']
+ }
+ value['partition_mapping'][i]['node_mapping'] = {
+ p['node_uuid']: [
+ x['partition_id'] for x in p['node_partitions']
+ ]
+ for p in partition_map['node_mapping']
+ }
+ result.value = value
+
+ return result
diff --git a/pyignite/api/binary.py b/pyignite/api/binary.py
index f0a5831..97f9fbd 100644
--- a/pyignite/api/binary.py
+++ b/pyignite/api/binary.py
@@ -20,7 +20,7 @@
body_struct, enum_struct, schema_struct, binary_fields_struct,
)
from pyignite.datatypes import String, Int, Bool
-from pyignite.queries import Query, Response
+from pyignite.queries import Query, get_response_class
from pyignite.queries.op_codes import *
from pyignite.utils import int_overflow, entity_id
from .result import APIResult
@@ -53,7 +53,7 @@
})
connection.send(send_buffer)
- response_head_struct = Response([
+ response_head_struct = get_response_class(connection)([
('type_exists', Bool),
])
response_head_type, recv_buffer = response_head_struct.parse(connection)
diff --git a/pyignite/api/key_value.py b/pyignite/api/key_value.py
index 56f5378..25601e9 100644
--- a/pyignite/api/key_value.py
+++ b/pyignite/api/key_value.py
@@ -13,20 +13,21 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from typing import Iterable, Union
+from typing import Any, Iterable, Optional, Union
from pyignite.queries.op_codes import *
from pyignite.datatypes import (
Map, Bool, Byte, Int, Long, AnyDataArray, AnyDataObject,
)
from pyignite.datatypes.key_value import PeekModes
-from pyignite.queries import Query, Response
+from pyignite.queries import Query
from pyignite.utils import cache_id
def cache_put(
- connection: 'Connection', cache: Union[str, int], key, value,
- key_hint=None, value_hint=None, binary=False, query_id=None,
+ connection: 'Connection', cache: Union[str, int], key: Any, value: Any,
+ key_hint: 'IgniteDataType' = None, value_hint: 'IgniteDataType' = None,
+ binary: bool = False, query_id: Optional[int] = None,
) -> 'APIResult':
"""
Puts a value with a given key to cache (overwriting existing value if any).
@@ -67,8 +68,9 @@
def cache_get(
- connection: 'Connection', cache: Union[str, int], key,
- key_hint=None, binary=False, query_id=None,
+ connection: 'Connection', cache: Union[str, int], key: Any,
+ key_hint: 'IgniteDataType' = None,
+ binary: bool = False, query_id: Optional[int] = None,
) -> 'APIResult':
"""
Retrieves a value from cache by key.
@@ -115,7 +117,7 @@
def cache_get_all(
connection: 'Connection', cache: Union[str, int], keys: Iterable,
- binary=False, query_id=None,
+ binary: bool = False, query_id: Optional[int] = None,
) -> 'APIResult':
"""
Retrieves multiple key-value pairs from cache.
@@ -160,7 +162,7 @@
def cache_put_all(
connection: 'Connection', cache: Union[str, int], pairs: dict,
- binary=False, query_id=None,
+ binary: bool = False, query_id: Optional[int] = None,
) -> 'APIResult':
"""
Puts multiple key-value pairs to cache (overwriting existing associations
@@ -200,8 +202,9 @@
def cache_contains_key(
- connection: 'Connection', cache: Union[str, int], key,
- key_hint=None, binary=False, query_id=None,
+ connection: 'Connection', cache: Union[str, int], key: Any,
+ key_hint: 'IgniteDataType' = None,
+ binary: bool = False, query_id: Optional[int] = None,
) -> 'APIResult':
"""
Returns a value indicating whether given key is present in cache.
@@ -248,7 +251,7 @@
def cache_contains_keys(
connection: 'Connection', cache: Union[str, int], keys: Iterable,
- binary=False, query_id=None,
+ binary: bool = False, query_id: Optional[int] = None,
) -> 'APIResult':
"""
Returns a value indicating whether all given keys are present in cache.
@@ -292,8 +295,9 @@
def cache_get_and_put(
- connection: 'Connection', cache: Union[str, int], key, value,
- key_hint=None, value_hint=None, binary=False, query_id=None,
+ connection: 'Connection', cache: Union[str, int], key: Any, value: Any,
+ key_hint: 'IgniteDataType' = None, value_hint: 'IgniteDataType' = None,
+ binary: bool = False, query_id: Optional[int] = None,
) -> 'APIResult':
"""
Puts a value with a given key to cache, and returns the previous value
@@ -345,8 +349,9 @@
def cache_get_and_replace(
- connection: 'Connection', cache: Union[str, int], key, value,
- key_hint=None, value_hint=None, binary=False, query_id=None,
+ connection: 'Connection', cache: Union[str, int], key: Any, value: Any,
+ key_hint: 'IgniteDataType' = None, value_hint: 'IgniteDataType' = None,
+ binary: bool = False, query_id: Optional[int] = None,
) -> 'APIResult':
"""
Puts a value with a given key to cache, returning previous value
@@ -397,8 +402,9 @@
def cache_get_and_remove(
- connection: 'Connection', cache: Union[str, int], key,
- key_hint=None, binary=False, query_id=None,
+ connection: 'Connection', cache: Union[str, int], key: Any,
+ key_hint: 'IgniteDataType' = None,
+ binary: bool = False, query_id: Optional[int] = None,
) -> 'APIResult':
"""
Removes the cache entry with specified key, returning the value.
@@ -442,8 +448,9 @@
def cache_put_if_absent(
- connection: 'Connection', cache: Union[str, int], key, value,
- key_hint=None, value_hint=None, binary=False, query_id=None,
+ connection: 'Connection', cache: Union[str, int], key: Any, value: Any,
+ key_hint: 'IgniteDataType' = None, value_hint: 'IgniteDataType' = None,
+ binary: bool = False, query_id: Optional[int] = None,
) -> 'APIResult':
"""
Puts a value with a given key to cache only if the key
@@ -494,8 +501,9 @@
def cache_get_and_put_if_absent(
- connection: 'Connection', cache: Union[str, int], key, value,
- key_hint=None, value_hint=None, binary=False, query_id=None,
+ connection: 'Connection', cache: Union[str, int], key: Any, value: Any,
+ key_hint: 'IgniteDataType' = None, value_hint: 'IgniteDataType' = None,
+ binary: bool = False, query_id: Optional[int] = None,
) -> 'APIResult':
"""
Puts a value with a given key to cache only if the key does not
@@ -546,8 +554,9 @@
def cache_replace(
- connection: 'Connection', cache: Union[str, int], key, value,
- key_hint=None, value_hint=None, binary=False, query_id=None,
+ connection: 'Connection', cache: Union[str, int], key: Any, value: Any,
+ key_hint: 'IgniteDataType' = None, value_hint: 'IgniteDataType' = None,
+ binary: bool = False, query_id: Optional[int] = None,
) -> 'APIResult':
"""
Puts a value with a given key to cache only if the key already exist.
@@ -598,9 +607,10 @@
def cache_replace_if_equals(
- connection: 'Connection', cache: Union[str, int], key, sample, value,
- key_hint=None, sample_hint=None, value_hint=None,
- binary=False, query_id=None,
+ connection: 'Connection', cache: Union[str, int],
+ key: Any, sample: Any, value: Any, key_hint: 'IgniteDatatType' = None,
+ sample_hint: 'IgniteDataType' = None, value_hint: 'IgniteDataType' = None,
+ binary: bool = False, query_id: Optional[int] = None,
) -> 'APIResult':
"""
Puts a value with a given key to cache only if the key already exists
@@ -657,8 +667,8 @@
def cache_clear(
- connection: 'Connection', cache: Union[str, int], binary=False,
- query_id=None,
+ connection: 'Connection', cache: Union[str, int],
+ binary: bool = False, query_id: Optional[int] = None,
) -> 'APIResult':
"""
Clears the cache without notifying listeners or cache writers.
@@ -692,8 +702,9 @@
def cache_clear_key(
- connection: 'Connection', cache: Union[str, int], key,
- key_hint: object=None, binary=False, query_id=None,
+ connection: 'Connection', cache: Union[str, int], key: Any,
+ key_hint: 'IgniteDataType' = None,
+ binary: bool = False, query_id: Optional[int] = None,
) -> 'APIResult':
"""
Clears the cache key without notifying listeners or cache writers.
@@ -733,7 +744,7 @@
def cache_clear_keys(
connection: 'Connection', cache: Union[str, int], keys: list,
- binary=False, query_id=None,
+ binary: bool = False, query_id: Optional[int] = None,
) -> 'APIResult':
"""
Clears the cache keys without notifying listeners or cache writers.
@@ -770,8 +781,9 @@
def cache_remove_key(
- connection: 'Connection', cache: Union[str, int], key,
- key_hint: object=None, binary=False, query_id=None,
+ connection: 'Connection', cache: Union[str, int], key: Any,
+ key_hint: 'IgniteDataType' = None,
+ binary: bool = False, query_id: Optional[int] = None,
) -> 'APIResult':
"""
Clears the cache key without notifying listeners or cache writers.
@@ -817,9 +829,9 @@
def cache_remove_if_equals(
- connection: 'Connection', cache: Union[str, int], key, sample,
- key_hint=None, sample_hint=None,
- binary=False, query_id=None,
+ connection: 'Connection', cache: Union[str, int], key: Any, sample: Any,
+ key_hint: 'IgniteDataType' = None, sample_hint: 'IgniteDataType' = None,
+ binary: bool = False, query_id: Optional[int] = None,
) -> 'APIResult':
"""
Removes an entry with a given key if provided value is equal to
@@ -872,7 +884,7 @@
def cache_remove_keys(
connection: 'Connection', cache: Union[str, int], keys: Iterable,
- binary=False, query_id=None,
+ binary: bool = False, query_id: Optional[int] = None,
) -> 'APIResult':
"""
Removes entries with given keys, notifying listeners and cache writers.
@@ -909,8 +921,8 @@
def cache_remove_all(
- connection: 'Connection', cache: Union[str, int], binary=False,
- query_id=None,
+ connection: 'Connection', cache: Union[str, int],
+ binary: bool = False, query_id: Optional[int] = None,
) -> 'APIResult':
"""
Removes all entries from cache, notifying listeners and cache writers.
@@ -944,8 +956,8 @@
def cache_get_size(
- connection: 'Connection', cache: Union[str, int], peek_modes=0,
- binary=False, query_id=None,
+ connection: 'Connection', cache: Union[str, int], peek_modes: int = 0,
+ binary: bool = False, query_id: Optional[int] = None,
) -> 'APIResult':
"""
Gets the number of entries in cache.
@@ -965,10 +977,7 @@
otherwise.
"""
if not isinstance(peek_modes, (list, tuple)):
- if peek_modes == 0:
- peek_modes = []
- else:
- peek_modes = [peek_modes]
+ peek_modes = [peek_modes] if peek_modes else []
query_struct = Query(
OP_CACHE_GET_SIZE,
@@ -993,3 +1002,61 @@
if result.status == 0:
result.value = result.value['count']
return result
+
+
+def cache_local_peek(
+ conn: 'Connection', cache: Union[str, int],
+ key: Any, key_hint: 'IgniteDataType' = None, peek_modes: int = 0,
+ binary: bool = False, query_id: Optional[int] = None,
+) -> 'APIResult':
+ """
+ Peeks at in-memory cached value using default optional peek mode.
+
+ This method will not load value from any persistent store or from a remote
+ node.
+
+ :param conn: connection: connection to Ignite server,
+ :param cache: name or ID of the cache,
+ :param key: entry key,
+ :param key_hint: (optional) Ignite data type, for which the given key
+ should be converted,
+ :param peek_modes: (optional) limit count to near cache partition
+ (PeekModes.NEAR), primary cache (PeekModes.PRIMARY), or backup cache
+ (PeekModes.BACKUP). Defaults to all cache partitions (PeekModes.ALL),
+ :param binary: (optional) pass True to keep the value in binary form.
+ False by default,
+ :param query_id: (optional) a value generated by client and returned as-is
+ in response.query_id. When the parameter is omitted, a random value
+ is generated,
+ :return: API result data object. Contains zero status and a peeked value
+ (null if not found).
+ """
+ if not isinstance(peek_modes, (list, tuple)):
+ peek_modes = [peek_modes] if peek_modes else []
+
+ query_struct = Query(
+ OP_CACHE_LOCAL_PEEK,
+ [
+ ('hash_code', Int),
+ ('flag', Byte),
+ ('key', key_hint or AnyDataObject),
+ ('peek_modes', PeekModes),
+ ],
+ query_id=query_id,
+ )
+ result = query_struct.perform(
+ conn,
+ query_params={
+ 'hash_code': cache_id(cache),
+ 'flag': 1 if binary else 0,
+ 'key': key,
+ 'peek_modes': peek_modes,
+ },
+ response_config=[
+ ('value', AnyDataObject),
+ ],
+ )
+ if result.status != 0:
+ return result
+ result.value = result.value['value']
+ return result
diff --git a/pyignite/api/result.py b/pyignite/api/result.py
index 864ef61..f60a437 100644
--- a/pyignite/api/result.py
+++ b/pyignite/api/result.py
@@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from pyignite.queries.op_codes import OP_SUCCESS
from pyignite.datatypes import String
@@ -32,7 +33,7 @@
value = None
def __init__(self, response: 'Response'):
- self.status = response.status_code
+ self.status = getattr(response, 'status_code', OP_SUCCESS)
self.query_id = response.query_id
if hasattr(response, 'error_message'):
self.message = String.to_python(response.error_message)
diff --git a/pyignite/api/sql.py b/pyignite/api/sql.py
index 1a18496..ebb3e30 100644
--- a/pyignite/api/sql.py
+++ b/pyignite/api/sql.py
@@ -20,25 +20,27 @@
from typing import Union
+from pyignite.constants import *
from pyignite.datatypes import (
AnyDataArray, AnyDataObject, Bool, Byte, Int, Long, Map, Null, String,
StructArray,
)
from pyignite.datatypes.sql import StatementType
-from pyignite.queries import Query, Response, SQLResponse
+from pyignite.queries import Query
from pyignite.queries.op_codes import *
from pyignite.utils import cache_id
from .result import APIResult
def scan(
- connection: 'Connection', cache: Union[str, int], page_size: int,
- partitions: int=-1, local: bool=False, binary: bool=False, query_id=None,
+ conn: 'Connection', cache: Union[str, int], page_size: int,
+ partitions: int = -1, local: bool = False, binary: bool = False,
+ query_id: int = None,
) -> APIResult:
"""
Performs scan query.
- :param connection: connection to Ignite server,
+ :param conn: connection to Ignite server,
:param cache: name or ID of the cache,
:param page_size: cursor page size,
:param partitions: (optional) number of partitions to query
@@ -75,7 +77,7 @@
query_id=query_id,
)
result = query_struct.perform(
- connection,
+ conn,
query_params={
'hash_code': cache_id(cache),
'flag': 1 if binary else 0,
@@ -96,13 +98,13 @@
def scan_cursor_get_page(
- connection: 'Connection', cursor: int, query_id=None,
+ conn: 'Connection', cursor: int, query_id: int = None,
) -> APIResult:
"""
Fetches the next scan query cursor page by cursor ID that is obtained
from `scan` function.
- :param connection: connection to Ignite server,
+ :param conn: connection to Ignite server,
:param cursor: cursor ID,
:param query_id: (optional) a value generated by client and returned as-is
in response.query_id. When the parameter is omitted, a random value
@@ -126,7 +128,7 @@
query_id=query_id,
)
result = query_struct.perform(
- connection,
+ conn,
query_params={
'cursor': cursor,
},
@@ -141,16 +143,17 @@
def sql(
- connection: 'Connection', cache: Union[str, int],
+ conn: 'Connection', cache: Union[str, int],
table_name: str, query_str: str, page_size: int, query_args=None,
- distributed_joins: bool=False, replicated_only: bool=False,
- local: bool=False, timeout: int=0, binary: bool=False, query_id=None
+ distributed_joins: bool = False, replicated_only: bool = False,
+ local: bool = False, timeout: int = 0, binary: bool = False,
+ query_id: int = None
) -> APIResult:
"""
Executes an SQL query over data stored in the cluster. The query returns
the whole record (key and value).
- :param connection: connection to Ignite server,
+ :param conn: connection to Ignite server,
:param cache: name or ID of the cache,
:param table_name: name of a type or SQL table,
:param query_str: SQL query string,
@@ -200,7 +203,7 @@
query_id=query_id,
)
result = query_struct.perform(
- connection,
+ conn,
query_params={
'hash_code': cache_id(cache),
'flag': 1 if binary else 0,
@@ -225,12 +228,12 @@
def sql_cursor_get_page(
- connection: 'Connection', cursor: int, query_id=None,
+ conn: 'Connection', cursor: int, query_id: int = None,
) -> APIResult:
"""
Retrieves the next SQL query cursor page by cursor ID from `sql`.
- :param connection: connection to Ignite server,
+ :param conn: connection to Ignite server,
:param cursor: cursor ID,
:param query_id: (optional) a value generated by client and returned as-is
in response.query_id. When the parameter is omitted, a random value
@@ -254,7 +257,7 @@
query_id=query_id,
)
result = query_struct.perform(
- connection,
+ conn,
query_params={
'cursor': cursor,
},
@@ -269,18 +272,18 @@
def sql_fields(
- connection: 'Connection', cache: Union[str, int],
- query_str: str, page_size: int, query_args=None, schema: str=None,
- statement_type: int=StatementType.ANY, distributed_joins: bool=False,
- local: bool=False, replicated_only: bool=False,
- enforce_join_order: bool=False, collocated: bool=False, lazy: bool=False,
- include_field_names: bool=False, max_rows: int=-1, timeout: int=0,
- binary: bool=False, query_id=None
+ conn: 'Connection', cache: Union[str, int],
+ query_str: str, page_size: int, query_args=None, schema: str = None,
+ statement_type: int = StatementType.ANY, distributed_joins: bool = False,
+ local: bool = False, replicated_only: bool = False,
+ enforce_join_order: bool = False, collocated: bool = False,
+ lazy: bool = False, include_field_names: bool = False, max_rows: int = -1,
+ timeout: int = 0, binary: bool = False, query_id: int = None
) -> APIResult:
"""
Performs SQL fields query.
- :param connection: connection to Ignite server,
+ :param conn: connection to Ignite server,
:param cache: name or ID of the cache,
:param query_str: SQL query string,
:param page_size: cursor page size,
@@ -351,48 +354,39 @@
query_id=query_id,
)
- _, send_buffer = query_struct.from_python({
- 'hash_code': cache_id(cache),
- 'flag': 1 if binary else 0,
- 'schema': schema,
- 'page_size': page_size,
- 'max_rows': max_rows,
- 'query_str': query_str,
- 'query_args': query_args,
- 'statement_type': statement_type,
- 'distributed_joins': distributed_joins,
- 'local': local,
- 'replicated_only': replicated_only,
- 'enforce_join_order': enforce_join_order,
- 'collocated': collocated,
- 'lazy': lazy,
- 'timeout': timeout,
- 'include_field_names': include_field_names,
- })
-
- connection.send(send_buffer)
-
- response_struct = SQLResponse(
+ return query_struct.perform(
+ conn,
+ query_params={
+ 'hash_code': cache_id(cache),
+ 'flag': 1 if binary else 0,
+ 'schema': schema,
+ 'page_size': page_size,
+ 'max_rows': max_rows,
+ 'query_str': query_str,
+ 'query_args': query_args,
+ 'statement_type': statement_type,
+ 'distributed_joins': distributed_joins,
+ 'local': local,
+ 'replicated_only': replicated_only,
+ 'enforce_join_order': enforce_join_order,
+ 'collocated': collocated,
+ 'lazy': lazy,
+ 'timeout': timeout,
+ 'include_field_names': include_field_names,
+ },
+ sql=True,
include_field_names=include_field_names,
has_cursor=True,
)
- response_class, recv_buffer = response_struct.parse(connection)
- response = response_class.from_buffer_copy(recv_buffer)
-
- result = APIResult(response)
- if result.status != 0:
- return result
- result.value = response_struct.to_python(response)
- return result
def sql_fields_cursor_get_page(
- connection: 'Connection', cursor: int, field_count: int, query_id=None,
+ conn: 'Connection', cursor: int, field_count: int, query_id: int = None,
) -> APIResult:
"""
Retrieves the next query result page by cursor ID from `sql_fields`.
- :param connection: connection to Ignite server,
+ :param conn: connection to Ignite server,
:param cursor: cursor ID,
:param field_count: a number of fields in a row,
:param query_id: (optional) a value generated by client and returned as-is
@@ -416,26 +410,20 @@
],
query_id=query_id,
)
-
- _, send_buffer = query_struct.from_python({
- 'cursor': cursor,
- })
-
- connection.send(send_buffer)
-
- response_struct = Response([
- ('data', StructArray([
- ('field_{}'.format(i), AnyDataObject) for i in range(field_count)
- ])),
- ('more', Bool),
- ])
- response_class, recv_buffer = response_struct.parse(connection)
- response = response_class.from_buffer_copy(recv_buffer)
-
- result = APIResult(response)
+ result = query_struct.perform(
+ conn,
+ query_params={
+ 'cursor': cursor,
+ },
+ response_config=[
+ ('data', StructArray([(f'field_{i}', AnyDataObject) for i in range(field_count)])),
+ ('more', Bool),
+ ]
+ )
if result.status != 0:
return result
- value = response_struct.to_python(response)
+
+ value = result.value
result.value = {
'data': [],
'more': value['more']
@@ -446,12 +434,12 @@
def resource_close(
- connection: 'Connection', cursor: int, query_id=None
+ conn: 'Connection', cursor: int, query_id: int = None
) -> APIResult:
"""
Closes a resource, such as query cursor.
- :param connection: connection to Ignite server,
+ :param conn: connection to Ignite server,
:param cursor: cursor ID,
:param query_id: (optional) a value generated by client and returned as-is
in response.query_id. When the parameter is omitted, a random value
@@ -468,7 +456,7 @@
query_id=query_id,
)
return query_struct.perform(
- connection,
+ conn,
query_params={
'cursor': cursor,
},
diff --git a/pyignite/binary.py b/pyignite/binary.py
index e726730..99f2f02 100644
--- a/pyignite/binary.py
+++ b/pyignite/binary.py
@@ -26,13 +26,16 @@
"""
from collections import OrderedDict
+import ctypes
from typing import Any
import attr
+from pyignite.constants import *
from .datatypes import *
+from .datatypes.base import IgniteDataTypeProps
from .exceptions import ParseError
-from .utils import entity_id, schema_id
+from .utils import entity_id, hashcode, schema_id
ALLOWED_FIELD_TYPES = [
@@ -48,22 +51,12 @@
]
-class GenericObjectPropsMixin:
+class GenericObjectProps(IgniteDataTypeProps):
"""
This class is mixed both to metaclass and to resulting class to make class
properties universally available. You should not subclass it directly.
"""
@property
- def type_name(self) -> str:
- """ Binary object type name. """
- return self._type_name
-
- @property
- def type_id(self) -> int:
- """ Binary object type ID. """
- return entity_id(self._type_name)
-
- @property
def schema(self) -> OrderedDict:
""" Binary object schema. """
return self._schema.copy()
@@ -76,20 +69,21 @@
def __new__(cls, *args, **kwargs) -> Any:
# allow all items in Binary Object schema to be populated as optional
# arguments to `__init__()` with sensible defaults.
- if cls is not GenericObjectMeta:
- attributes = {
- k: attr.ib(
- type=getattr(v, 'pythonic', type(None)),
- default=getattr(v, 'default', None),
- ) for k, v in cls.schema.items()
- }
- attributes.update({'version': attr.ib(type=int, default=1)})
- cls = attr.s(cls, these=attributes)
+ attributes = {}
+ for k, v in cls.schema.items():
+ attributes[k] = attr.ib(type=getattr(v, 'pythonic', type(None)), default=getattr(v, 'default', None))
+
+ attributes.update({'version': attr.ib(type=int, default=1)})
+ cls = attr.s(cls, these=attributes)
# skip parameters
return super().__new__(cls)
-class GenericObjectMeta(type, GenericObjectPropsMixin):
+class GenericObjectPropsMeta(type, GenericObjectProps):
+ pass
+
+
+class GenericObjectMeta(GenericObjectPropsMeta):
"""
Complex (or Binary) Object metaclass. It is aimed to help user create
classes, which objects could serve as a pythonic representation of the
@@ -103,10 +97,95 @@
mcs: Any, name: str, base_classes: tuple, namespace: dict, **kwargs
) -> Any:
""" Sort out class creation arguments. """
- return super().__new__(
- mcs, name, (GenericObjectPropsMixin, )+base_classes, namespace
+
+ result = super().__new__(
+ mcs, name, (GenericObjectProps, )+base_classes, namespace
)
+ def _build(self, client: 'Client' = None) -> int:
+ """
+ Method for building binary representation of the Generic object
+ and calculating a hashcode from it.
+
+ :param self: Generic object instance,
+ :param client: (optional) connection to Ignite cluster,
+ """
+ if client is None:
+ compact_footer = True
+ else:
+ compact_footer = client.compact_footer
+
+ # prepare header
+ header_class = BinaryObject.build_header()
+ header = header_class()
+ header.type_code = int.from_bytes(
+ BinaryObject.type_code,
+ byteorder=PROTOCOL_BYTE_ORDER
+ )
+ header.flags = BinaryObject.USER_TYPE | BinaryObject.HAS_SCHEMA
+ if compact_footer:
+ header.flags |= BinaryObject.COMPACT_FOOTER
+ header.version = self.version
+ header.type_id = self.type_id
+ header.schema_id = self.schema_id
+
+ # create fields and calculate offsets
+ offsets = [ctypes.sizeof(header_class)]
+ field_buffer = bytearray()
+ schema_items = list(self.schema.items())
+ for field_name, field_type in schema_items:
+ partial_buffer = field_type.from_python(
+ getattr(
+ self, field_name, getattr(field_type, 'default', None)
+ )
+ )
+ offsets.append(max(offsets) + len(partial_buffer))
+ field_buffer += partial_buffer
+
+ offsets = offsets[:-1]
+
+ # create footer
+ if max(offsets, default=0) < 255:
+ header.flags |= BinaryObject.OFFSET_ONE_BYTE
+ elif max(offsets) < 65535:
+ header.flags |= BinaryObject.OFFSET_TWO_BYTES
+ schema_class = BinaryObject.schema_type(header.flags) * len(offsets)
+ schema = schema_class()
+ if compact_footer:
+ for i, offset in enumerate(offsets):
+ schema[i] = offset
+ else:
+ for i, offset in enumerate(offsets):
+ schema[i].field_id = entity_id(schema_items[i][0])
+ schema[i].offset = offset
+
+ # calculate size and hash code
+ header.schema_offset = (
+ ctypes.sizeof(header_class)
+ + len(field_buffer)
+ )
+ header.length = header.schema_offset + ctypes.sizeof(schema_class)
+ header.hash_code = hashcode(field_buffer + bytes(schema))
+
+ # reuse the results
+ self._buffer = bytes(header) + field_buffer + bytes(schema)
+ self._hashcode = header.hash_code
+
+ def _setattr(self, attr_name: str, attr_value: Any):
+ # reset binary representation, if any field is changed
+ if attr_name in self._schema.keys():
+ self._buffer = None
+ self._hashcode = None
+
+ # `super()` is really need these parameters
+ super(result, self).__setattr__(attr_name, attr_value)
+
+ setattr(result, _build.__name__, _build)
+ setattr(result, '__setattr__', _setattr)
+ setattr(result, '_buffer', None)
+ setattr(result, '_hashcode', None)
+ return result
+
@staticmethod
def _validate_schema(schema: dict):
for field_type in schema.values():
@@ -117,7 +196,7 @@
def __init__(
cls, name: str, base_classes: tuple, namespace: dict,
- type_name: str=None, schema: OrderedDict=None, **kwargs
+ type_name: str = None, schema: OrderedDict = None, **kwargs
):
"""
Initializes binary object class.
diff --git a/pyignite/cache.py b/pyignite/cache.py
index 6cd7377..64093e8 100644
--- a/pyignite/cache.py
+++ b/pyignite/cache.py
@@ -13,13 +13,21 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from typing import Any, Iterable, Optional, Union
+import time
+from typing import Any, Dict, Iterable, Optional, Tuple, Union
+from .constants import *
+from .binary import GenericObjectMeta
from .datatypes import prop_codes
+from .datatypes.internal import AnyDataObject
from .exceptions import (
CacheCreationError, CacheError, ParameterError, SQLError,
+ connection_errors,
)
-from .utils import cache_id, is_wrapped, status_to_exception, unwrap_binary
+from .utils import (
+ cache_id, get_field_by_id, is_wrapped,
+ status_to_exception, unsigned, unwrap_binary,
+)
from .api.cache_config import (
cache_create, cache_create_with_config,
cache_get_or_create, cache_get_or_create_with_config,
@@ -35,6 +43,7 @@
cache_remove_if_equals, cache_replace_if_equals, cache_get_size,
)
from .api.sql import scan, scan_cursor_get_page, sql, sql_cursor_get_page
+from .api.affinity import cache_get_node_partitions
PROP_CODES = set([
@@ -63,6 +72,8 @@
:py:meth:`~pyignite.client.Client.get_cache` methods instead. See
:ref:`this example <create_cache>` on how to do it.
"""
+
+ affinity = None
_cache_id = None
_name = None
_client = None
@@ -70,7 +81,7 @@
@staticmethod
def _validate_settings(
- settings: Union[str, dict]=None, get_only: bool=False,
+ settings: Union[str, dict] = None, get_only: bool = False,
):
if any([
not settings,
@@ -89,8 +100,8 @@
raise ParameterError('Only cache name allowed as a parameter')
def __init__(
- self, client: 'Client', settings: Union[str, dict]=None,
- with_get: bool=False, get_only: bool=False,
+ self, client: 'Client', settings: Union[str, dict] = None,
+ with_get: bool = False, get_only: bool = False,
):
"""
Initialize cache object.
@@ -113,11 +124,26 @@
if not get_only:
func = CACHE_CREATE_FUNCS[type(settings) is dict][with_get]
- result = func(client, settings)
+ result = func(client.random_node, settings)
if result.status != 0:
raise CacheCreationError(result.message)
self._cache_id = cache_id(self._name)
+ self.affinity = {
+ 'version': (0, 0),
+ }
+
+ def get_protocol_version(self) -> Optional[Tuple]:
+ """
+ Returns the tuple of major, minor, and revision numbers of the used
+ thin protocol version, or None, if no connection to the Ignite cluster
+ was not yet established.
+
+ This method is not a part of the public API. Unless you wish to
+ extend the `pyignite` capabilities (with additional testing, logging,
+ examining connections, et c.) you probably should not use it.
+ """
+ return self.client.protocol_version
@property
def settings(self) -> Optional[dict]:
@@ -130,7 +156,10 @@
:return: dict of cache properties and their values.
"""
if self._settings is None:
- config_result = cache_get_configuration(self._client, self._cache_id)
+ config_result = cache_get_configuration(
+ self.get_best_node(),
+ self._cache_id
+ )
if config_result.status == 0:
self._settings = config_result.value
else:
@@ -185,10 +214,124 @@
"""
Destroys cache with a given name.
"""
- return cache_destroy(self._client, self._cache_id)
+ return cache_destroy(self.get_best_node(), self._cache_id)
@status_to_exception(CacheError)
- def get(self, key, key_hint: object=None) -> Any:
+ def _get_affinity(self, conn: 'Connection') -> Dict:
+ """
+ Queries server for affinity mappings. Retries in case
+ of an intermittent error (most probably “Getting affinity for topology
+ version earlier than affinity is calculated”).
+
+ :param conn: connection to Igneite server,
+ :return: OP_CACHE_PARTITIONS operation result value.
+ """
+ for _ in range(AFFINITY_RETRIES or 1):
+ result = cache_get_node_partitions(conn, self._cache_id)
+ if result.status == 0 and result.value['partition_mapping']:
+ break
+ time.sleep(AFFINITY_DELAY)
+
+ return result
+
+ def get_best_node(
+ self, key: Any = None, key_hint: 'IgniteDataType' = None,
+ ) -> 'Connection':
+ """
+ Returns the node from the list of the nodes, opened by client, that
+ most probably contains the needed key-value pair. See IEP-23.
+
+ This method is not a part of the public API. Unless you wish to
+ extend the `pyignite` capabilities (with additional testing, logging,
+ examining connections, et c.) you probably should not use it.
+
+ :param key: (optional) pythonic key,
+ :param key_hint: (optional) Ignite data type, for which the given key
+ should be converted,
+ :return: Ignite connection object.
+ """
+ conn = self._client.random_node
+
+ if self.client.partition_aware and key is not None:
+ if key_hint is None:
+ key_hint = AnyDataObject.map_python_type(key)
+
+ if self.affinity['version'] < self._client.affinity_version:
+ # update partition mapping
+ while True:
+ try:
+ self.affinity = self._get_affinity(conn)
+ break
+ except connection_errors:
+ # retry if connection failed
+ pass
+ except CacheError:
+ # server did not create mapping in time
+ return conn
+
+ # flatten it a bit
+ try:
+ self.affinity.update(self.affinity['partition_mapping'][0])
+ except IndexError:
+ return conn
+ del self.affinity['partition_mapping']
+
+ # calculate the number of partitions
+ parts = 0
+ if 'node_mapping' in self.affinity:
+ for p in self.affinity['node_mapping'].values():
+ parts += len(p)
+
+ self.affinity['number_of_partitions'] = parts
+ else:
+ # get number of partitions
+ parts = self.affinity.get('number_of_partitions')
+
+ if not parts:
+ return conn
+
+ if self.affinity['is_applicable']:
+ affinity_key_id = self.affinity['cache_config'].get(
+ key_hint.type_id,
+ None
+ )
+ if affinity_key_id and isinstance(key, GenericObjectMeta):
+ key, key_hint = get_field_by_id(key, affinity_key_id)
+
+ # calculate partition for key or affinity key
+ # (algorithm is taken from `RendezvousAffinityFunction.java`)
+ base_value = key_hint.hashcode(key, self._client)
+ mask = parts - 1
+
+ if parts & mask == 0:
+ part = (base_value ^ (unsigned(base_value) >> 16)) & mask
+ else:
+ part = abs(base_value // parts)
+
+ assert 0 <= part < parts, 'Partition calculation has failed'
+
+ # search for connection
+ try:
+ node_uuid, best_conn = None, None
+ for u, p in self.affinity['node_mapping'].items():
+ if part in p:
+ node_uuid = u
+ break
+
+ if node_uuid:
+ for n in conn.client._nodes:
+ if n.uuid == node_uuid:
+ best_conn = n
+ break
+ if best_conn and best_conn.alive:
+ conn = best_conn
+ except KeyError:
+ pass
+
+ return conn
+
+ @status_to_exception(CacheError)
+ def get(self, key, key_hint: object = None) -> Any:
"""
Retrieves a value from cache by key.
@@ -197,12 +340,22 @@
should be converted,
:return: value retrieved.
"""
- result = cache_get(self._client, self._cache_id, key, key_hint=key_hint)
+ if key_hint is None:
+ key_hint = AnyDataObject.map_python_type(key)
+
+ result = cache_get(
+ self.get_best_node(key, key_hint),
+ self._cache_id,
+ key,
+ key_hint=key_hint
+ )
result.value = self._process_binary(result.value)
return result
@status_to_exception(CacheError)
- def put(self, key, value, key_hint: object=None, value_hint: object=None):
+ def put(
+ self, key, value, key_hint: object = None, value_hint: object = None
+ ):
"""
Puts a value with a given key to cache (overwriting existing value
if any).
@@ -214,8 +367,12 @@
:param value_hint: (optional) Ignite data type, for which the given
value should be converted.
"""
+ if key_hint is None:
+ key_hint = AnyDataObject.map_python_type(key)
+
return cache_put(
- self._client, self._cache_id, key, value,
+ self.get_best_node(key, key_hint),
+ self._cache_id, key, value,
key_hint=key_hint, value_hint=value_hint
)
@@ -227,7 +384,7 @@
:param keys: list of keys or tuples of (key, key_hint),
:return: a dict of key-value pairs.
"""
- result = cache_get_all(self._client, self._cache_id, keys)
+ result = cache_get_all(self.get_best_node(), self._cache_id, keys)
if result.value:
for key, value in result.value.items():
result.value[key] = self._process_binary(value)
@@ -243,11 +400,11 @@
to save. Each key or value can be an item of representable
Python type or a tuple of (item, hint),
"""
- return cache_put_all(self._client, self._cache_id, pairs)
+ return cache_put_all(self.get_best_node(), self._cache_id, pairs)
@status_to_exception(CacheError)
def replace(
- self, key, value, key_hint: object=None, value_hint: object=None
+ self, key, value, key_hint: object = None, value_hint: object = None
):
"""
Puts a value with a given key to cache only if the key already exist.
@@ -259,28 +416,33 @@
:param value_hint: (optional) Ignite data type, for which the given
value should be converted.
"""
+ if key_hint is None:
+ key_hint = AnyDataObject.map_python_type(key)
+
result = cache_replace(
- self._client, self._cache_id, key, value,
+ self.get_best_node(key, key_hint),
+ self._cache_id, key, value,
key_hint=key_hint, value_hint=value_hint
)
result.value = self._process_binary(result.value)
return result
@status_to_exception(CacheError)
- def clear(self, keys: Optional[list]=None):
+ def clear(self, keys: Optional[list] = None):
"""
Clears the cache without notifying listeners or cache writers.
:param keys: (optional) list of cache keys or (key, key type
hint) tuples to clear (default: clear all).
"""
+ conn = self.get_best_node()
if keys:
- return cache_clear_keys(self._client, self._cache_id, keys)
+ return cache_clear_keys(conn, self._cache_id, keys)
else:
- return cache_clear(self._client, self._cache_id)
+ return cache_clear(conn, self._cache_id)
@status_to_exception(CacheError)
- def clear_key(self, key, key_hint: object=None):
+ def clear_key(self, key, key_hint: object = None):
"""
Clears the cache key without notifying listeners or cache writers.
@@ -288,8 +450,14 @@
:param key_hint: (optional) Ignite data type, for which the given key
should be converted,
"""
+ if key_hint is None:
+ key_hint = AnyDataObject.map_python_type(key)
+
return cache_clear_key(
- self._client, self._cache_id, key, key_hint=key_hint
+ self.get_best_node(key, key_hint),
+ self._cache_id,
+ key,
+ key_hint=key_hint
)
@status_to_exception(CacheError)
@@ -302,8 +470,14 @@
should be converted,
:return: boolean `True` when key is present, `False` otherwise.
"""
+ if key_hint is None:
+ key_hint = AnyDataObject.map_python_type(key)
+
return cache_contains_key(
- self._client, self._cache_id, key, key_hint=key_hint
+ self.get_best_node(key, key_hint),
+ self._cache_id,
+ key,
+ key_hint=key_hint
)
@status_to_exception(CacheError)
@@ -330,8 +504,14 @@
value should be converted.
:return: old value or None.
"""
+ if key_hint is None:
+ key_hint = AnyDataObject.map_python_type(key)
+
result = cache_get_and_put(
- self._client, self._cache_id, key, value, key_hint, value_hint
+ self.get_best_node(key, key_hint),
+ self._cache_id,
+ key, value,
+ key_hint, value_hint
)
result.value = self._process_binary(result.value)
return result
@@ -352,8 +532,14 @@
value should be converted,
:return: old value or None.
"""
+ if key_hint is None:
+ key_hint = AnyDataObject.map_python_type(key)
+
result = cache_get_and_put_if_absent(
- self._client, self._cache_id, key, value, key_hint, value_hint
+ self.get_best_node(key, key_hint),
+ self._cache_id,
+ key, value,
+ key_hint, value_hint
)
result.value = self._process_binary(result.value)
return result
@@ -371,8 +557,14 @@
:param value_hint: (optional) Ignite data type, for which the given
value should be converted.
"""
+ if key_hint is None:
+ key_hint = AnyDataObject.map_python_type(key)
+
return cache_put_if_absent(
- self._client, self._cache_id, key, value, key_hint, value_hint
+ self.get_best_node(key, key_hint),
+ self._cache_id,
+ key, value,
+ key_hint, value_hint
)
@status_to_exception(CacheError)
@@ -385,8 +577,14 @@
should be converted,
:return: old value or None.
"""
+ if key_hint is None:
+ key_hint = AnyDataObject.map_python_type(key)
+
result = cache_get_and_remove(
- self._client, self._cache_id, key, key_hint
+ self.get_best_node(key, key_hint),
+ self._cache_id,
+ key,
+ key_hint
)
result.value = self._process_binary(result.value)
return result
@@ -408,8 +606,14 @@
value should be converted.
:return: old value or None.
"""
+ if key_hint is None:
+ key_hint = AnyDataObject.map_python_type(key)
+
result = cache_get_and_replace(
- self._client, self._cache_id, key, value, key_hint, value_hint
+ self.get_best_node(key, key_hint),
+ self._cache_id,
+ key, value,
+ key_hint, value_hint
)
result.value = self._process_binary(result.value)
return result
@@ -423,7 +627,12 @@
:param key_hint: (optional) Ignite data type, for which the given key
should be converted,
"""
- return cache_remove_key(self._client, self._cache_id, key, key_hint)
+ if key_hint is None:
+ key_hint = AnyDataObject.map_python_type(key)
+
+ return cache_remove_key(
+ self.get_best_node(key, key_hint), self._cache_id, key, key_hint
+ )
@status_to_exception(CacheError)
def remove_keys(self, keys: list):
@@ -433,14 +642,16 @@
:param keys: list of keys or tuples of (key, key_hint) to remove.
"""
- return cache_remove_keys(self._client, self._cache_id, keys)
+ return cache_remove_keys(
+ self.get_best_node(), self._cache_id, keys
+ )
@status_to_exception(CacheError)
def remove_all(self):
"""
Removes all cache entries, notifying listeners and cache writers.
"""
- return cache_remove_all(self._client, self._cache_id)
+ return cache_remove_all(self.get_best_node(), self._cache_id)
@status_to_exception(CacheError)
def remove_if_equals(self, key, sample, key_hint=None, sample_hint=None):
@@ -455,8 +666,14 @@
:param sample_hint: (optional) Ignite data type, for whic
the given sample should be converted.
"""
+ if key_hint is None:
+ key_hint = AnyDataObject.map_python_type(key)
+
return cache_remove_if_equals(
- self._client, self._cache_id, key, sample, key_hint, sample_hint
+ self.get_best_node(key, key_hint),
+ self._cache_id,
+ key, sample,
+ key_hint, sample_hint
)
@status_to_exception(CacheError)
@@ -479,8 +696,13 @@
value should be converted,
:return: boolean `True` when key is present, `False` otherwise.
"""
+ if key_hint is None:
+ key_hint = AnyDataObject.map_python_type(key)
+
result = cache_replace_if_equals(
- self._client, self._cache_id, key, sample, value,
+ self.get_best_node(key, key_hint),
+ self._cache_id,
+ key, sample, value,
key_hint, sample_hint, value_hint
)
result.value = self._process_binary(result.value)
@@ -496,9 +718,13 @@
(PeekModes.BACKUP). Defaults to all cache partitions (PeekModes.ALL),
:return: integer number of cache entries.
"""
- return cache_get_size(self._client, self._cache_id, peek_modes)
+ return cache_get_size(
+ self.get_best_node(), self._cache_id, peek_modes
+ )
- def scan(self, page_size: int=1, partitions: int=-1, local: bool=False):
+ def scan(
+ self, page_size: int = 1, partitions: int = -1, local: bool = False
+ ):
"""
Returns all key-value pairs from the cache, similar to `get_all`, but
with internal pagination, which is slower, but safer.
@@ -511,7 +737,15 @@
on local node only. Defaults to False,
:return: generator with key-value pairs.
"""
- result = scan(self._client, self._cache_id, page_size, partitions, local)
+ node = self.get_best_node()
+
+ result = scan(
+ node,
+ self._cache_id,
+ page_size,
+ partitions,
+ local
+ )
if result.status != 0:
raise CacheError(result.message)
@@ -522,7 +756,7 @@
yield k, v
while result.value['more']:
- result = scan_cursor_get_page(self._client, cursor)
+ result = scan_cursor_get_page(node, cursor)
if result.status != 0:
raise CacheError(result.message)
@@ -532,9 +766,9 @@
yield k, v
def select_row(
- self, query_str: str, page_size: int=1,
- query_args: Optional[list]=None, distributed_joins: bool=False,
- replicated_only: bool=False, local: bool=False, timeout: int=0
+ self, query_str: str, page_size: int = 1,
+ query_args: Optional[list] = None, distributed_joins: bool = False,
+ replicated_only: bool = False, local: bool = False, timeout: int = 0
):
"""
Executes a simplified SQL SELECT query over data stored in the cache.
@@ -554,6 +788,8 @@
disables timeout (default),
:return: generator with key-value pairs.
"""
+ node = self.get_best_node()
+
def generate_result(value):
cursor = value['cursor']
more = value['more']
@@ -563,7 +799,7 @@
yield k, v
while more:
- inner_result = sql_cursor_get_page(self._client, cursor)
+ inner_result = sql_cursor_get_page(node, cursor)
if result.status != 0:
raise SQLError(result.message)
more = inner_result.value['more']
@@ -578,7 +814,7 @@
if not type_name:
raise SQLError('Value type is unknown')
result = sql(
- self._client,
+ node,
self._cache_id,
type_name,
query_str,
diff --git a/pyignite/client.py b/pyignite/client.py
index d5a9464..3202b78 100644
--- a/pyignite/client.py
+++ b/pyignite/client.py
@@ -41,7 +41,10 @@
"""
from collections import defaultdict, OrderedDict
-from typing import Iterable, Type, Union
+import random
+import re
+from itertools import chain
+from typing import Dict, Iterable, List, Optional, Tuple, Type, Union
from .api.binary import get_binary_type, put_binary_type
from .api.cache_config import cache_get_names
@@ -51,15 +54,20 @@
from .constants import *
from .datatypes import BinaryObject
from .datatypes.internal import tc_map
-from .exceptions import BinaryTypeError, CacheError, SQLError
-from .utils import entity_id, schema_id, status_to_exception
+from .exceptions import (
+ BinaryTypeError, CacheError, ReconnectError, SQLError, connection_errors,
+)
+from .utils import (
+ capitalize, entity_id, schema_id, process_delimiter,
+ status_to_exception, is_iterable,
+)
from .binary import GenericObjectMeta
__all__ = ['Client']
-class Client(Connection):
+class Client:
"""
This is a main `pyignite` class, that is build upon the
:class:`~pyignite.connection.Connection`. In addition to the attributes,
@@ -72,14 +80,22 @@
"""
_registry = defaultdict(dict)
- _compact_footer = None
+ _compact_footer: bool = None
+ _connection_args: Dict = None
+ _current_node: int = None
+ _nodes: List[Connection] = None
- def _transfer_params(self, to: 'Client'):
- super()._transfer_params(to)
- to._registry = self._registry
- to._compact_footer = self._compact_footer
+ # used for Complex object data class names sanitizing
+ _identifier = re.compile(r'[^0-9a-zA-Z_.+$]', re.UNICODE)
+ _ident_start = re.compile(r'^[^a-zA-Z_]+', re.UNICODE)
- def __init__(self, compact_footer: bool=None, *args, **kwargs):
+ affinity_version: Optional[Tuple] = None
+ protocol_version: Optional[Tuple] = None
+
+ def __init__(
+ self, compact_footer: bool = None, partition_aware: bool = False,
+ **kwargs
+ ):
"""
Initialize client.
@@ -88,9 +104,154 @@
Default is to use the same approach the server is using (None).
Apache Ignite binary protocol documentation on this topic:
https://apacheignite.readme.io/docs/binary-client-protocol-data-format#section-schema
+ :param partition_aware: (optional) try to calculate the exact data
+ placement from the key before to issue the key operation to the
+ server node:
+ https://cwiki.apache.org/confluence/display/IGNITE/IEP-23%3A+Best+Effort+Affinity+for+thin+clients
+ The feature is in experimental status, so the parameter is `False`
+ by default. This will be changed later.
"""
self._compact_footer = compact_footer
- super().__init__(*args, **kwargs)
+ self._connection_args = kwargs
+ self._nodes = []
+ self._current_node = 0
+ self._partition_aware = partition_aware
+ self.affinity_version = (0, 0)
+
+ def get_protocol_version(self) -> Optional[Tuple]:
+ """
+ Returns the tuple of major, minor, and revision numbers of the used
+ thin protocol version, or None, if no connection to the Ignite cluster
+ was not yet established.
+
+ This method is not a part of the public API. Unless you wish to
+ extend the `pyignite` capabilities (with additional testing, logging,
+ examining connections, et c.) you probably should not use it.
+ """
+ return self.protocol_version
+
+ @property
+ def partition_aware(self):
+ return self._partition_aware and self.partition_awareness_supported_by_protocol
+
+ @property
+ def partition_awareness_supported_by_protocol(self):
+ # TODO: Need to re-factor this. I believe, we need separate class or
+ # set of functions to work with protocol versions without manually
+ # comparing versions with just some random tuples
+ return self.protocol_version is not None and self.protocol_version >= (1, 4, 0)
+
+ def connect(self, *args):
+ """
+ Connect to Ignite cluster node(s).
+
+ :param args: (optional) host(s) and port(s) to connect to.
+ """
+ if len(args) == 0:
+ # no parameters − use default Ignite host and port
+ nodes = [(IGNITE_DEFAULT_HOST, IGNITE_DEFAULT_PORT)]
+ elif len(args) == 1 and is_iterable(args[0]):
+ # iterable of host-port pairs is given
+ nodes = args[0]
+ elif (
+ len(args) == 2
+ and isinstance(args[0], str)
+ and isinstance(args[1], int)
+ ):
+ # host and port are given
+ nodes = [args]
+ else:
+ raise ConnectionError('Connection parameters are not valid.')
+
+ # the following code is quite twisted, because the protocol version
+ # is initially unknown
+
+ # TODO: open first node in foreground, others − in background
+ for i, node in enumerate(nodes):
+ host, port = node
+ conn = Connection(self, **self._connection_args)
+ conn.host = host
+ conn.port = port
+
+ try:
+ if self.protocol_version is None or self.partition_aware:
+ # open connection before adding to the pool
+ conn.connect(host, port)
+
+ # now we have the protocol version
+ if not self.partition_aware:
+ # do not try to open more nodes
+ self._current_node = i
+ else:
+ # take a chance to schedule the reconnection
+ # for all the failed connections, that was probed
+ # before this
+ for failed_node in self._nodes[:i]:
+ failed_node.reconnect()
+
+ except connection_errors:
+ conn._fail()
+ if self.partition_aware:
+ # schedule the reconnection
+ conn.reconnect()
+
+ self._nodes.append(conn)
+
+ if self.protocol_version is None:
+ raise ReconnectError('Can not connect.')
+
+ def close(self):
+ for conn in self._nodes:
+ conn.close()
+ self._nodes.clear()
+
+ @property
+ def random_node(self) -> Connection:
+ """
+ Returns random usable node.
+
+ This method is not a part of the public API. Unless you wish to
+ extend the `pyignite` capabilities (with additional testing, logging,
+ examining connections, et c.) you probably should not use it.
+ """
+ if self.partition_aware:
+ # if partition awareness is used just pick a random connected node
+ try:
+ return random.choice(
+ list(n for n in self._nodes if n.alive)
+ )
+ except IndexError:
+ # cannot choose from an empty sequence
+ raise ReconnectError('Can not reconnect: out of nodes.') from None
+ else:
+ # if partition awareness is not used then just return the current
+ # node if it's alive or the next usable node if connection with the
+ # current is broken
+ node = self._nodes[self._current_node]
+ if node.alive:
+ return node
+
+ # close current (supposedly failed) node
+ self._nodes[self._current_node].close()
+
+ # advance the node index
+ self._current_node += 1
+ if self._current_node >= len(self._nodes):
+ self._current_node = 0
+
+ # prepare the list of node indexes to try to connect to
+ num_nodes = len(self._nodes)
+ for i in chain(range(self._current_node, num_nodes), range(self._current_node)):
+ node = self._nodes[i]
+ try:
+ node.connect(node.host, node.port)
+ except connection_errors:
+ pass
+ else:
+ return node
+
+ # no nodes left
+ raise ReconnectError('Can not reconnect: out of nodes.')
@status_to_exception(BinaryTypeError)
def get_binary_type(self, binary_type: Union[str, int]) -> dict:
@@ -135,7 +296,9 @@
)
return converted_schema
- result = get_binary_type(self, binary_type)
+ conn = self.random_node
+
+ result = get_binary_type(conn, binary_type)
if result.status != 0 or not result.value['type_exists']:
return result
@@ -178,8 +341,8 @@
@status_to_exception(BinaryTypeError)
def put_binary_type(
- self, type_name: str, affinity_key_field: str=None,
- is_enum=False, schema: dict=None
+ self, type_name: str, affinity_key_field: str = None,
+ is_enum=False, schema: dict = None
):
"""
Registers binary type information in cluster. Do not update binary
@@ -197,11 +360,11 @@
Binary type with no fields is OK.
"""
return put_binary_type(
- self, type_name, affinity_key_field, is_enum, schema
+ self.random_node, type_name, affinity_key_field, is_enum, schema
)
@staticmethod
- def _create_dataclass(type_name: str, schema: OrderedDict=None) -> Type:
+ def _create_dataclass(type_name: str, schema: OrderedDict = None) -> Type:
"""
Creates default (generic) class for Ignite Complex object.
@@ -224,13 +387,42 @@
for schema in type_info['schemas']:
if not self._registry[type_id].get(schema_id(schema), None):
data_class = self._create_dataclass(
- type_info['type_name'],
+ self._create_type_name(type_info['type_name']),
schema,
)
self._registry[type_id][schema_id(schema)] = data_class
+ @classmethod
+ def _create_type_name(cls, type_name: str) -> str:
+ """
+ Creates Python data class name from Ignite binary type name.
+
+ Handles all the special cases found in
+ `java.org.apache.ignite.binary.BinaryBasicNameMapper.simpleName()`.
+ Tries to adhere to PEP8 along the way.
+ """
+
+ # general sanitizing
+ type_name = cls._identifier.sub('', type_name)
+
+ # - name ending with '$' (Scala)
+ # - name + '$' + some digits (anonymous class)
+ # - '$$Lambda$' in the middle
+ type_name = process_delimiter(type_name, '$')
+
+ # .NET outer/inner class delimiter
+ type_name = process_delimiter(type_name, '+')
+
+ # Java fully qualified class name
+ type_name = process_delimiter(type_name, '.')
+
+ # start chars sanitizing
+ type_name = capitalize(cls._ident_start.sub('', type_name))
+
+ return type_name
+
def register_binary_type(
- self, data_class: Type, affinity_key_field: str=None,
+ self, data_class: Type, affinity_key_field: str = None,
):
"""
Register the given class as a representation of a certain Complex
@@ -250,8 +442,8 @@
self._registry[data_class.type_id][data_class.schema_id] = data_class
def query_binary_type(
- self, binary_type: Union[int, str], schema: Union[int, dict]=None,
- sync: bool=True
+ self, binary_type: Union[int, str], schema: Union[int, dict] = None,
+ sync: bool = True
):
"""
Queries the registry of Complex object classes.
@@ -324,16 +516,16 @@
:return: list of cache names.
"""
- return cache_get_names(self)
+ return cache_get_names(self.random_node)
def sql(
- self, query_str: str, page_size: int=1, query_args: Iterable=None,
- schema: Union[int, str]='PUBLIC',
- statement_type: int=0, distributed_joins: bool=False,
- local: bool=False, replicated_only: bool=False,
- enforce_join_order: bool=False, collocated: bool=False,
- lazy: bool=False, include_field_names: bool=False,
- max_rows: int=-1, timeout: int=0,
+ self, query_str: str, page_size: int = 1, query_args: Iterable = None,
+ schema: Union[int, str] = 'PUBLIC',
+ statement_type: int = 0, distributed_joins: bool = False,
+ local: bool = False, replicated_only: bool = False,
+ enforce_join_order: bool = False, collocated: bool = False,
+ lazy: bool = False, include_field_names: bool = False,
+ max_rows: int = -1, timeout: int = 0,
):
"""
Runs an SQL query and returns its result.
@@ -384,7 +576,7 @@
while more:
inner_result = sql_fields_cursor_get_page(
- self, cursor, field_count
+ conn, cursor, field_count
)
if inner_result.status != 0:
raise SQLError(result.message)
@@ -392,9 +584,11 @@
for line in inner_result.value['data']:
yield line
+ conn = self.random_node
+
schema = self.get_or_create_cache(schema)
result = sql_fields(
- self, schema.cache_id, query_str,
+ conn, schema.cache_id, query_str,
page_size, query_args, schema.name,
statement_type, distributed_joins, local, replicated_only,
enforce_join_order, collocated, lazy, include_field_names,
diff --git a/pyignite/connection/__init__.py b/pyignite/connection/__init__.py
index 1f6f0c0..cf40718 100644
--- a/pyignite/connection/__init__.py
+++ b/pyignite/connection/__init__.py
@@ -33,15 +33,20 @@
as well as Ignite protocol handshaking.
"""
+from collections import OrderedDict
import socket
+from threading import Lock
+from typing import Union
from pyignite.constants import *
from pyignite.exceptions import (
- HandshakeError, ParameterError, ReconnectError, SocketError,
+ HandshakeError, ParameterError, SocketError, connection_errors,
)
+from pyignite.datatypes import Byte, Int, Short, String, UUIDObject
+from pyignite.datatypes.internal import Struct
+from pyignite.utils import DaemonicTimer
-from pyignite.utils import is_iterable
-from .handshake import HandshakeRequest, read_response
+from .handshake import HandshakeRequest
from .ssl import wrap
@@ -60,18 +65,22 @@
"""
_socket = None
- nodes = None
+ _failed = None
+ _in_use = None
+
+ client = None
host = None
port = None
timeout = None
prefetch = None
username = None
password = None
+ ssl_params = {}
+ uuid = None
@staticmethod
- def _check_kwargs(kwargs):
+ def _check_ssl_params(params):
expected_args = [
- 'timeout',
'use_ssl',
'ssl_version',
'ssl_ciphers',
@@ -80,22 +89,24 @@
'ssl_keyfile_password',
'ssl_certfile',
'ssl_ca_certfile',
- 'username',
- 'password',
]
- for kw in kwargs:
- if kw not in expected_args:
+ for param in params:
+ if param not in expected_args:
raise ParameterError((
'Unexpected parameter for connection initialization: `{}`'
- ).format(kw))
+ ).format(param))
- def __init__(self, prefetch: bytes=b'', **kwargs):
+ def __init__(
+ self, client: 'Client', prefetch: bytes = b'', timeout: int = None,
+ username: str = None, password: str = None, **ssl_params
+ ):
"""
Initialize connection.
For the use of the SSL-related parameters see
https://docs.python.org/3/library/ssl.html#ssl-certificates.
+ :param client: Ignite client object,
:param prefetch: (optional) initialize the read-ahead data buffer.
Empty by default,
:param timeout: (optional) sets timeout (in seconds) for each socket
@@ -131,47 +142,159 @@
cluster,
:param password: (optional) password to authenticate to Ignite cluster.
"""
+ self.client = client
self.prefetch = prefetch
- self._check_kwargs(kwargs)
- self.timeout = kwargs.pop('timeout', None)
- self.username = kwargs.pop('username', None)
- self.password = kwargs.pop('password', None)
- if all([self.username, self.password, 'use_ssl' not in kwargs]):
- kwargs['use_ssl'] = True
- self.init_kwargs = kwargs
-
- read_response = read_response
- _wrap = wrap
+ self.timeout = timeout
+ self.username = username
+ self.password = password
+ self._check_ssl_params(ssl_params)
+ if self.username and self.password and 'use_ssl' not in ssl_params:
+ ssl_params['use_ssl'] = True
+ self.ssl_params = ssl_params
+ self._failed = False
+ self._in_use = Lock()
@property
def socket(self) -> socket.socket:
- """
- Network socket.
- """
- if self._socket is None:
- self._reconnect()
+ """ Network socket. """
return self._socket
- def __repr__(self) -> str:
- if self.host and self.port:
- return '{}:{}'.format(self.host, self.port)
- else:
- return '<not connected>'
+ @property
+ def closed(self) -> bool:
+ """ Tells if socket is closed. """
+ return self._socket is None
- def _connect(self, host: str, port: int):
+ @property
+ def failed(self) -> bool:
+ """ Tells if connection is failed. """
+ return self._failed
+
+ @property
+ def alive(self) -> bool:
+ """ Tells if connection is up and no failure detected. """
+ return not (self._failed or self.closed)
+
+ def __repr__(self) -> str:
+ return '{}:{}'.format(self.host or '?', self.port or '?')
+
+ _wrap = wrap
+
+ def get_protocol_version(self):
"""
- Actually connect socket.
+ Returns the tuple of major, minor, and revision numbers of the used
+ thin protocol version, or None, if no connection to the Ignite cluster
+ was yet established.
"""
+ return self.client.protocol_version
+
+ def _fail(self):
+ """ set client to failed state. """
+ self._failed = True
+ self._in_use.release()
+
+ def read_response(self) -> Union[dict, OrderedDict]:
+ """
+ Processes server's response to the handshake request.
+
+ :return: handshake data.
+ """
+ response_start = Struct([
+ ('length', Int),
+ ('op_code', Byte),
+ ])
+ start_class, start_buffer = response_start.parse(self)
+ start = start_class.from_buffer_copy(start_buffer)
+ data = response_start.to_python(start)
+ response_end = None
+ if data['op_code'] == 0:
+ response_end = Struct([
+ ('version_major', Short),
+ ('version_minor', Short),
+ ('version_patch', Short),
+ ('message', String),
+ ])
+ elif self.get_protocol_version() >= (1, 4, 0):
+ response_end = Struct([
+ ('node_uuid', UUIDObject),
+ ])
+ if response_end:
+ end_class, end_buffer = response_end.parse(self)
+ end = end_class.from_buffer_copy(end_buffer)
+ data.update(response_end.to_python(end))
+ return data
+
+ def connect(
+ self, host: str = None, port: int = None
+ ) -> Union[dict, OrderedDict]:
+ """
+ Connect to the given server node with protocol version fallback.
+
+ :param host: Ignite server node's host name or IP,
+ :param port: Ignite server node's port number.
+ """
+ detecting_protocol = False
+
+ # go non-blocking for faster reconnect
+ if not self._in_use.acquire(blocking=False):
+ raise ConnectionError('Connection is in use.')
+
+ # choose highest version first
+ if self.client.protocol_version is None:
+ detecting_protocol = True
+ self.client.protocol_version = max(PROTOCOLS)
+
+ try:
+ result = self._connect_version(host, port)
+ except HandshakeError as e:
+ if e.expected_version in PROTOCOLS:
+ self.client.protocol_version = e.expected_version
+ result = self._connect_version(host, port)
+ else:
+ raise e
+ except connection_errors:
+ # restore undefined protocol version
+ if detecting_protocol:
+ self.client.protocol_version = None
+ raise
+
+ # connection is ready for end user
+ self.uuid = result.get('node_uuid', None) # version-specific (1.4+)
+
+ self._failed = False
+ return result
+
+ def _connect_version(
+ self, host: str = None, port: int = None,
+ ) -> Union[dict, OrderedDict]:
+ """
+ Connect to the given server node using protocol version
+ defined on client.
+
+ :param host: Ignite server node's host name or IP,
+ :param port: Ignite server node's port number.
+ """
+
+ host = host or IGNITE_DEFAULT_HOST
+ port = port or IGNITE_DEFAULT_PORT
+
self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._socket.settimeout(self.timeout)
self._socket = self._wrap(self.socket)
self._socket.connect((host, port))
- hs_request = HandshakeRequest(self.username, self.password)
+ protocol_version = self.client.protocol_version
+
+ hs_request = HandshakeRequest(
+ protocol_version,
+ self.username,
+ self.password
+ )
self.send(hs_request)
hs_response = self.read_response()
if hs_response['op_code'] == 0:
- self.close()
+ # disconnect but keep in use
+ self.close(release=False)
+
error_text = 'Handshake error: {}'.format(hs_response['message'])
# if handshake fails for any reason other than protocol mismatch
# (i.e. authentication error), server version is 0.0.0
@@ -185,74 +308,78 @@
'{version_major}.{version_minor}.{version_patch}. Client '
'provides {client_major}.{client_minor}.{client_patch}.'
).format(
- client_major=PROTOCOL_VERSION_MAJOR,
- client_minor=PROTOCOL_VERSION_MINOR,
- client_patch=PROTOCOL_VERSION_PATCH,
+ client_major=protocol_version[0],
+ client_minor=protocol_version[1],
+ client_patch=protocol_version[2],
**hs_response
)
- raise HandshakeError(error_text)
+ raise HandshakeError((
+ hs_response['version_major'],
+ hs_response['version_minor'],
+ hs_response['version_patch'],
+ ), error_text)
self.host, self.port = host, port
+ return hs_response
- def connect(self, *args):
+ def reconnect(self, seq_no=0):
"""
- Connect to the server. Connection parameters may be either one node
- (host and port), or list (or other iterable) of nodes.
-
- :param host: Ignite server host,
- :param port: Ignite server port,
- :param nodes: iterable of (host, port) tuples.
+ Tries to reconnect synchronously, then in background.
"""
- self.nodes = iter([])
- if len(args) == 0:
- host, port = IGNITE_DEFAULT_HOST, IGNITE_DEFAULT_PORT
- elif len(args) == 1 and is_iterable(args[0]):
- self.nodes = iter(args[0])
- host, port = next(self.nodes)
- elif (
- len(args) == 2
- and isinstance(args[0], str)
- and isinstance(args[1], int)
- ):
- host, port = args
- else:
- raise ConnectionError('Connection parameters are not valid.')
- self._connect(host, port)
+ # stop trying to reconnect
+ if seq_no >= len(RECONNECT_BACKOFF_SEQUENCE):
+ self._failed = False
+
+ self._reconnect()
+
+ if self.failed:
+ DaemonicTimer(
+ RECONNECT_BACKOFF_SEQUENCE[seq_no],
+ self.reconnect,
+ kwargs={'seq_no': seq_no + 1},
+ ).start()
def _reconnect(self):
- """
- Restore the connection using the next node in `nodes` iterable.
- """
- for host, port in self.nodes:
- try:
- self._connect(host, port)
- return
- except OSError:
- pass
- self.host = self.port = self.nodes = None
- # exception chaining gives a misleading traceback here
- raise ReconnectError('Can not reconnect: out of nodes') from None
+ # do not reconnect if connection is already working
+ # or was closed on purpose
+ if not self.failed:
+ return
+
+ # return connection to initial state regardless of use lock
+ self.close(release=False)
+ try:
+ self._in_use.release()
+ except RuntimeError:
+ pass
+
+ # connect and silence the connection errors
+ try:
+ self.connect(self.host, self.port)
+ except connection_errors:
+ pass
def _transfer_params(self, to: 'Connection'):
"""
Transfer non-SSL parameters to target connection object.
- :param target: connection object to transfer parameters to.
+ :param to: connection object to transfer parameters to.
"""
to.username = self.username
to.password = self.password
- to.nodes = self.nodes
+ to.client = self.client
+ to.host = self.host
+ to.port = self.port
- def clone(self, prefetch: bytes=b'') -> 'Connection':
+ def clone(self, prefetch: bytes = b'') -> 'Connection':
"""
Clones this connection in its current state.
:return: `Connection` object.
"""
- clone = self.__class__(**self.init_kwargs)
+ clone = self.__class__(self.client, **self.ssl_params)
self._transfer_params(to=clone)
- if self.port and self.host:
- clone._connect(self.host, self.port)
+ if self.alive:
+ clone.connect(self.host, self.port)
clone.prefetch = prefetch
return clone
@@ -263,6 +390,9 @@
:param data: bytes to send,
:param flags: (optional) OS-specific flags.
"""
+ if self.closed:
+ raise SocketError('Attempt to use closed connection.')
+
kwargs = {}
if flags is not None:
kwargs['flags'] = flags
@@ -271,13 +401,18 @@
while total_bytes_sent < len(data):
try:
- bytes_sent = self.socket.send(data[total_bytes_sent:], **kwargs)
- except OSError:
- self._socket = self.host = self.port = None
+ bytes_sent = self.socket.send(
+ data[total_bytes_sent:],
+ **kwargs
+ )
+ except connection_errors:
+ self._fail()
+ self.reconnect()
raise
if bytes_sent == 0:
- self.socket.close()
- raise SocketError('Socket connection broken.')
+ self._fail()
+ self.reconnect()
+ raise SocketError('Connection broken.')
total_bytes_sent += bytes_sent
def recv(self, buffersize, flags=None) -> bytes:
@@ -288,14 +423,18 @@
:param flags: (optional) OS-specific flags,
:return: data received.
"""
+ if self.closed:
+ raise SocketError('Attempt to use closed connection.')
+
pref_size = len(self.prefetch)
if buffersize > pref_size:
result = self.prefetch
self.prefetch = b''
try:
result += self._recv(buffersize-pref_size, flags)
- except (SocketError, OSError):
- self._socket = self.host = self.port = None
+ except connection_errors:
+ self._fail()
+ self.reconnect()
raise
return result
else:
@@ -316,18 +455,28 @@
while bytes_rcvd < buffersize:
chunk = self.socket.recv(buffersize-bytes_rcvd, **kwargs)
if chunk == b'':
- self.socket.close()
- raise SocketError('Socket connection broken.')
+ raise SocketError('Connection broken.')
chunks.append(chunk)
bytes_rcvd += len(chunk)
return b''.join(chunks)
- def close(self):
+ def close(self, release=True):
"""
- Mark socket closed. This is recommended but not required, since
- sockets are automatically closed when they are garbage-collected.
+ Try to mark socket closed, then unlink it. This is recommended but
+ not required, since sockets are automatically closed when
+ garbage-collected.
"""
- self._socket.shutdown(socket.SHUT_RDWR)
- self._socket.close()
- self._socket = self.host = self.port = None
+ if release:
+ try:
+ self._in_use.release()
+ except RuntimeError:
+ pass
+
+ if self._socket:
+ try:
+ self._socket.shutdown(socket.SHUT_RDWR)
+ self._socket.close()
+ except connection_errors:
+ pass
+ self._socket = None
diff --git a/pyignite/connection/generators.py b/pyignite/connection/generators.py
deleted file mode 100644
index d76db0e..0000000
--- a/pyignite/connection/generators.py
+++ /dev/null
@@ -1,48 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-class RoundRobin:
- """
- Round-robin generator for use with `Client.connect()`. Cycles a node
- list until a maximum number of reconnects is reached (if set).
- """
-
- def __init__(self, nodes: list, max_reconnects: int=None):
- """
- :param nodes: list of two-tuples of (host, port) format,
- :param max_reconnects: (optional) maximum number of reconnect attempts.
- defaults to None (cycle nodes infinitely).
- """
- self.nodes = nodes
- self.max_reconnects = max_reconnects
- self.node_index = 0
- self.reconnects = 0
-
- def __iter__(self) -> 'RoundRobin':
- return self
-
- def __next__(self) -> tuple:
- if self.max_reconnects is not None:
- if self.reconnects >= self.max_reconnects:
- raise StopIteration
- else:
- self.reconnects += 1
-
- if self.node_index >= len(self.nodes):
- self.node_index = 0
- node = self.nodes[self.node_index]
- self.node_index += 1
- return node
diff --git a/pyignite/connection/handshake.py b/pyignite/connection/handshake.py
index 13d57fe..2e0264f 100644
--- a/pyignite/connection/handshake.py
+++ b/pyignite/connection/handshake.py
@@ -13,9 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from typing import Optional
+from typing import Optional, Tuple
-from pyignite.constants import *
from pyignite.datatypes import Byte, Int, Short, String
from pyignite.datatypes.internal import Struct
@@ -27,9 +26,11 @@
handshake_struct = None
username = None
password = None
+ protocol_version = None
def __init__(
- self, username: Optional[str]=None, password: Optional[str]=None
+ self, protocol_version: Tuple[int, int, int],
+ username: Optional[str] = None, password: Optional[str] = None
):
fields = [
('length', Int),
@@ -39,6 +40,7 @@
('version_patch', Short),
('client_code', Byte),
]
+ self.protocol_version = protocol_version
if username and password:
self.username = username
self.password = password
@@ -52,9 +54,9 @@
handshake_data = {
'length': 8,
'op_code': OP_HANDSHAKE,
- 'version_major': PROTOCOL_VERSION_MAJOR,
- 'version_minor': PROTOCOL_VERSION_MINOR,
- 'version_patch': PROTOCOL_VERSION_PATCH,
+ 'version_major': self.protocol_version[0],
+ 'version_minor': self.protocol_version[1],
+ 'version_patch': self.protocol_version[2],
'client_code': 2, # fixed value defined by protocol
}
if self.username and self.password:
@@ -68,24 +70,3 @@
len(self.password),
])
return self.handshake_struct.from_python(handshake_data)
-
-
-def read_response(client):
- response_start = Struct([
- ('length', Int),
- ('op_code', Byte),
- ])
- start_class, start_buffer = response_start.parse(client)
- start = start_class.from_buffer_copy(start_buffer)
- data = response_start.to_python(start)
- if data['op_code'] == 0:
- response_end = Struct([
- ('version_major', Short),
- ('version_minor', Short),
- ('version_patch', Short),
- ('message', String),
- ])
- end_class, end_buffer = response_end.parse(client)
- end = end_class.from_buffer_copy(end_buffer)
- data.update(response_end.to_python(end))
- return data
diff --git a/pyignite/connection/ssl.py b/pyignite/connection/ssl.py
index 044b103..9773860 100644
--- a/pyignite/connection/ssl.py
+++ b/pyignite/connection/ssl.py
@@ -19,24 +19,24 @@
from pyignite.constants import *
-def wrap(client, _socket):
+def wrap(conn: 'Connection', _socket):
""" Wrap socket in SSL wrapper. """
- if client.init_kwargs.get('use_ssl', None):
- keyfile = client.init_kwargs.get('ssl_keyfile', None)
- certfile = client.init_kwargs.get('ssl_certfile', None)
+ if conn.ssl_params.get('use_ssl', None):
+ keyfile = conn.ssl_params.get('ssl_keyfile', None)
+ certfile = conn.ssl_params.get('ssl_certfile', None)
if keyfile and not certfile:
raise ValueError("certfile must be specified")
- password = client.init_kwargs.get('ssl_keyfile_password', None)
- ssl_version = client.init_kwargs.get('ssl_version', SSL_DEFAULT_VERSION)
- ciphers = client.init_kwargs.get('ssl_ciphers', SSL_DEFAULT_CIPHERS)
- cert_reqs = client.init_kwargs.get('ssl_cert_reqs', ssl.CERT_NONE)
- ca_certs = client.init_kwargs.get('ssl_ca_certfile', None)
+ password = conn.ssl_params.get('ssl_keyfile_password', None)
+ ssl_version = conn.ssl_params.get('ssl_version', SSL_DEFAULT_VERSION)
+ ciphers = conn.ssl_params.get('ssl_ciphers', SSL_DEFAULT_CIPHERS)
+ cert_reqs = conn.ssl_params.get('ssl_cert_reqs', ssl.CERT_NONE)
+ ca_certs = conn.ssl_params.get('ssl_ca_certfile', None)
context = SSLContext(ssl_version)
context.verify_mode = cert_reqs
-
+
if ca_certs:
context.load_verify_locations(ca_certs)
if certfile:
diff --git a/pyignite/constants.py b/pyignite/constants.py
index 78c9379..fc840d6 100644
--- a/pyignite/constants.py
+++ b/pyignite/constants.py
@@ -21,16 +21,23 @@
__all__ = [
- 'PROTOCOL_VERSION_MAJOR', 'PROTOCOL_VERSION_MINOR',
- 'PROTOCOL_VERSION_PATCH', 'MAX_LONG', 'MIN_LONG', 'MAX_INT', 'MIN_INT',
+ 'PROTOCOLS', 'MAX_LONG', 'MIN_LONG', 'MAX_INT', 'MIN_INT',
'PROTOCOL_BYTE_ORDER', 'PROTOCOL_STRING_ENCODING',
'PROTOCOL_CHAR_ENCODING', 'SSL_DEFAULT_VERSION', 'SSL_DEFAULT_CIPHERS',
'FNV1_OFFSET_BASIS', 'FNV1_PRIME',
'IGNITE_DEFAULT_HOST', 'IGNITE_DEFAULT_PORT',
+ 'RHF_ERROR', 'RHF_TOPOLOGY_CHANGED', 'AFFINITY_DELAY', 'AFFINITY_RETRIES',
+ 'RECONNECT_BACKOFF_SEQUENCE',
]
+PROTOCOLS = {
+ (1, 4, 0),
+ (1, 3, 0),
+ (1, 2, 0),
+}
+
PROTOCOL_VERSION_MAJOR = 1
-PROTOCOL_VERSION_MINOR = 2
+PROTOCOL_VERSION_MINOR = 4
PROTOCOL_VERSION_PATCH = 0
MAX_LONG = 9223372036854775807
@@ -50,3 +57,12 @@
IGNITE_DEFAULT_HOST = 'localhost'
IGNITE_DEFAULT_PORT = 10800
+
+# response header flags
+RHF_ERROR = 1
+RHF_TOPOLOGY_CHANGED = 2
+
+AFFINITY_DELAY = 0.01
+AFFINITY_RETRIES = 32
+
+RECONNECT_BACKOFF_SEQUENCE = [0, 1, 1, 2, 3, 5, 8, 13]
diff --git a/pyignite/datatypes/base.py b/pyignite/datatypes/base.py
index a0522c0..25b5b1e 100644
--- a/pyignite/datatypes/base.py
+++ b/pyignite/datatypes/base.py
@@ -13,10 +13,36 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from abc import ABC
+class IgniteDataTypeProps:
+ """
+ Add `type_name` and `type_id` properties for all classes and objects
+ of Ignite type hierarchy.
+ """
+ @property
+ def type_name(self) -> str:
+ """ Binary object type name. """
+ return getattr(self, '_type_name', None)
+
+ @property
+ def type_id(self) -> int:
+ """ Binary object type ID. """
+ from pyignite.utils import entity_id
+
+ return getattr(
+ self,
+ '_type_id',
+ entity_id(getattr(self, '_type_name', None))
+ )
-class IgniteDataType(ABC):
+class IgniteDataTypeMeta(type, IgniteDataTypeProps):
+ """
+ Class variant of Ignate data type properties.
+ """
+ pass
+
+
+class IgniteDataType(metaclass=IgniteDataTypeMeta):
"""
This is a base class for all Ignite data types, a.k.a. parser/constructor
classes, both object and payload varieties.
diff --git a/pyignite/datatypes/complex.py b/pyignite/datatypes/complex.py
index 87e5130..d9ce36a 100644
--- a/pyignite/datatypes/complex.py
+++ b/pyignite/datatypes/complex.py
@@ -16,13 +16,15 @@
from collections import OrderedDict
import ctypes
import inspect
+from typing import Iterable, Dict
from pyignite.constants import *
from pyignite.exceptions import ParseError
-from pyignite.utils import entity_id, hashcode, is_hinted
from .base import IgniteDataType
from .internal import AnyDataObject, infer_from_python
from .type_codes import *
+from .type_ids import *
+from .type_names import *
__all__ = [
@@ -33,11 +35,21 @@
class ObjectArrayObject(IgniteDataType):
"""
- Array of objects of any type. Its Python representation is
- tuple(type_id, iterable of any type).
+ Array of Ignite objects of any consistent type. Its Python representation
+ is tuple(type_id, iterable of any type). The only type ID that makes sense
+ in Python client is :py:attr:`~OBJECT`, that corresponds directly to
+ the root object type in Java type hierarchy (`java.lang.Object`).
"""
+ OBJECT = -1
+
+ _type_name = NAME_OBJ_ARR
+ _type_id = TYPE_OBJ_ARR
type_code = TC_OBJECT_ARRAY
- type_or_id_name = 'type_id'
+
+ @staticmethod
+ def hashcode(value: Iterable) -> int:
+ # Arrays are not supported as keys at the moment.
+ return 0
@classmethod
def build_header(cls):
@@ -86,7 +98,7 @@
*args, **kwargs
)
)
- return getattr(ctype_object, cls.type_or_id_name), result
+ return ctype_object.type_id, result
@classmethod
def from_python(cls, value):
@@ -103,12 +115,12 @@
value = [value]
length = 1
header.length = length
- setattr(header, cls.type_or_id_name, type_or_id)
- buffer = bytes(header)
+ header.type_id = type_or_id
+ buffer = bytearray(header)
for x in value:
buffer += infer_from_python(x)
- return buffer
+ return bytes(buffer)
class WrappedDataObject(IgniteDataType):
@@ -167,19 +179,53 @@
raise ParseError('Send unwrapped data.')
-class CollectionObject(ObjectArrayObject):
+class CollectionObject(IgniteDataType):
"""
- Just like object array, but contains deserialization type hint instead of
- type id. This hint is also useless in Python, because the list type along
- covers all the use cases.
+ Similar to object array, but contains platform-agnostic deserialization
+ type hint instead of type ID.
- Also represented as tuple(type_id, iterable of any type) in Python.
+ Represented as tuple(hint, iterable of any type) in Python. Hints are:
+
+ * :py:attr:`~pyignite.datatypes.complex.CollectionObject.USER_SET` −
+ a set of unique Ignite thin data objects. The exact Java type of a set
+ is undefined,
+ * :py:attr:`~pyignite.datatypes.complex.CollectionObject.USER_COL` −
+ a collection of Ignite thin data objects. The exact Java type
+ of a collection is undefined,
+ * :py:attr:`~pyignite.datatypes.complex.CollectionObject.ARR_LIST` −
+ represents the `java.util.ArrayList` type,
+ * :py:attr:`~pyignite.datatypes.complex.CollectionObject.LINKED_LIST` −
+ represents the `java.util.LinkedList` type,
+ * :py:attr:`~pyignite.datatypes.complex.CollectionObject.HASH_SET`−
+ represents the `java.util.HashSet` type,
+ * :py:attr:`~pyignite.datatypes.complex.CollectionObject.LINKED_HASH_SET` −
+ represents the `java.util.LinkedHashSet` type,
+ * :py:attr:`~pyignite.datatypes.complex.CollectionObject.SINGLETON_LIST` −
+ represents the return type of the `java.util.Collection.singletonList`
+ method.
+
+ It is safe to say that `USER_SET` (`set` in Python) and `USER_COL` (`list`)
+ can cover all the imaginable use cases from Python perspective.
"""
+ USER_SET = -1
+ USER_COL = 0
+ ARR_LIST = 1
+ LINKED_LIST = 2
+ HASH_SET = 3
+ LINKED_HASH_SET = 4
+ SINGLETON_LIST = 5
+
+ _type_name = NAME_COL
+ _type_id = TYPE_COL
type_code = TC_COLLECTION
- type_or_id_name = 'type'
pythonic = list
default = []
+ @staticmethod
+ def hashcode(value: Iterable) -> int:
+ # Collections are not supported as keys at the moment.
+ return 0
+
@classmethod
def build_header(cls):
return type(
@@ -195,6 +241,62 @@
}
)
+ @classmethod
+ def parse(cls, client: 'Client'):
+ header_class = cls.build_header()
+ buffer = client.recv(ctypes.sizeof(header_class))
+ header = header_class.from_buffer_copy(buffer)
+ fields = []
+
+ for i in range(header.length):
+ c_type, buffer_fragment = AnyDataObject.parse(client)
+ buffer += buffer_fragment
+ fields.append(('element_{}'.format(i), c_type))
+
+ final_class = type(
+ cls.__name__,
+ (header_class,),
+ {
+ '_pack_': 1,
+ '_fields_': fields,
+ }
+ )
+ return final_class, buffer
+
+ @classmethod
+ def to_python(cls, ctype_object, *args, **kwargs):
+ result = []
+ for i in range(ctype_object.length):
+ result.append(
+ AnyDataObject.to_python(
+ getattr(ctype_object, 'element_{}'.format(i)),
+ *args, **kwargs
+ )
+ )
+ return ctype_object.type, result
+
+ @classmethod
+ def from_python(cls, value):
+ type_or_id, value = value
+ header_class = cls.build_header()
+ header = header_class()
+ header.type_code = int.from_bytes(
+ cls.type_code,
+ byteorder=PROTOCOL_BYTE_ORDER
+ )
+ try:
+ length = len(value)
+ except TypeError:
+ value = [value]
+ length = 1
+ header.length = length
+ header.type = type_or_id
+ buffer = bytearray(header)
+
+ for x in value:
+ buffer += infer_from_python(x)
+ return bytes(buffer)
+
class Map(IgniteDataType):
"""
@@ -203,9 +305,16 @@
Ignite does not track the order of key-value pairs in its caches, hence
the ordinary Python dict type, not the collections.OrderedDict.
"""
+ _type_name = NAME_MAP
+ _type_id = TYPE_MAP
HASH_MAP = 1
LINKED_HASH_MAP = 2
+ @staticmethod
+ def hashcode(value: Dict) -> int:
+ # Maps are not supported as keys at the moment.
+ return 0
+
@classmethod
def build_header(cls):
return type(
@@ -271,22 +380,25 @@
)
if hasattr(header, 'type'):
header.type = type_id
- buffer = bytes(header)
+ buffer = bytearray(header)
for k, v in value.items():
buffer += infer_from_python(k)
buffer += infer_from_python(v)
- return buffer
+ return bytes(buffer)
class MapObject(Map):
"""
- This is a dictionary type. Type conversion hint can be a `HASH_MAP`
- (ordinary dict) or `LINKED_HASH_MAP` (collections.OrderedDict).
+ This is a dictionary type.
- Keys and values in map are independent data objects, but `count`
- counts pairs. Very annoying.
+ Represented as tuple(type_id, value).
+
+ Type ID can be a :py:attr:`~HASH_MAP` (corresponds to an ordinary `dict`
+ in Python) or a :py:attr:`~LINKED_HASH_MAP` (`collections.OrderedDict`).
"""
+ _type_name = NAME_MAP
+ _type_id = TYPE_MAP
type_code = TC_MAP
pythonic = dict
default = {}
@@ -319,6 +431,7 @@
class BinaryObject(IgniteDataType):
+ _type_id = TYPE_BINARY_OBJ
type_code = TC_COMPLEX_OBJECT
USER_TYPE = 0x0001
@@ -328,6 +441,46 @@
OFFSET_TWO_BYTES = 0x0010
COMPACT_FOOTER = 0x0020
+ @staticmethod
+ def find_client():
+ """
+ A nice hack. Extracts the nearest `Client` instance from the
+ call stack.
+ """
+ from pyignite import Client
+ from pyignite.connection import Connection
+
+ frame = None
+ try:
+ for rec in inspect.stack()[2:]:
+ frame = rec[0]
+ code = frame.f_code
+ for varname in code.co_varnames:
+ suspect = frame.f_locals[varname]
+ if isinstance(suspect, Client):
+ return suspect
+ if isinstance(suspect, Connection):
+ return suspect.client
+ finally:
+ del frame
+
+ @staticmethod
+ def hashcode(
+ value: object, client: 'Client' = None, *args, **kwargs
+ ) -> int:
+ # binary objects's hashcode implementation is special in the sense
+ # that you need to fully serialize the object to calculate
+ # its hashcode
+ if value._hashcode is None:
+
+ # …and for to serialize it you need a Client instance
+ if client is None:
+ client = BinaryObject.find_client()
+
+ value._build(client)
+
+ return value._hashcode
+
@classmethod
def build_header(cls):
return type(
@@ -373,11 +526,12 @@
)
@staticmethod
- def get_dataclass(client: 'Client', header) -> OrderedDict:
+ def get_dataclass(conn: 'Connection', header) -> OrderedDict:
# get field names from outer space
- temp_conn = client.clone()
- result = temp_conn.query_binary_type(header.type_id, header.schema_id)
- temp_conn.close()
+ result = conn.client.query_binary_type(
+ header.type_id,
+ header.schema_id
+ )
if not result:
raise ParseError('Binary type is not registered')
return result
@@ -417,7 +571,7 @@
return final_class, buffer
@classmethod
- def to_python(cls, ctype_object, client: 'Client'=None, *args, **kwargs):
+ def to_python(cls, ctype_object, client: 'Client' = None, *args, **kwargs):
if not client:
raise ParseError(
@@ -443,84 +597,19 @@
@classmethod
def from_python(cls, value: object):
- def find_client():
- """
- A nice hack. Extracts the nearest `Client` instance from the
- call stack.
- """
- from pyignite import Client
+ if getattr(value, '_buffer', None) is None:
+ client = cls.find_client()
- frame = None
- try:
- for rec in inspect.stack()[2:]:
- frame = rec[0]
- code = frame.f_code
- for varname in code.co_varnames:
- suspect = frame.f_locals[varname]
- if isinstance(suspect, Client):
- return suspect
- finally:
- del frame
-
- compact_footer = True # this is actually used
- client = find_client()
- if client:
# if no client can be found, the class of the `value` is discarded
# and the new dataclass is automatically registered later on
- client.register_binary_type(value.__class__)
- compact_footer = client.compact_footer
- else:
- raise Warning(
- 'Can not register binary type {}'.format(value.type_name)
- )
-
- # prepare header
- header_class = cls.build_header()
- header = header_class()
- header.type_code = int.from_bytes(
- cls.type_code,
- byteorder=PROTOCOL_BYTE_ORDER
- )
-
- header.flags = cls.USER_TYPE | cls.HAS_SCHEMA
- if compact_footer:
- header.flags |= cls.COMPACT_FOOTER
- header.version = value.version
- header.type_id = value.type_id
- header.schema_id = value.schema_id
-
- # create fields and calculate offsets
- field_buffer = b''
- offsets = [ctypes.sizeof(header_class)]
- schema_items = list(value.schema.items())
- for field_name, field_type in schema_items:
- partial_buffer = field_type.from_python(
- getattr(
- value, field_name, getattr(field_type, 'default', None)
+ if client:
+ client.register_binary_type(value.__class__)
+ else:
+ raise Warning(
+ 'Can not register binary type {}'.format(value.type_name)
)
- )
- offsets.append(max(offsets) + len(partial_buffer))
- field_buffer += partial_buffer
- offsets = offsets[:-1]
+ # build binary representation
+ value._build(client)
- # create footer
- if max(offsets, default=0) < 255:
- header.flags |= cls.OFFSET_ONE_BYTE
- elif max(offsets) < 65535:
- header.flags |= cls.OFFSET_TWO_BYTES
- schema_class = cls.schema_type(header.flags) * len(offsets)
- schema = schema_class()
- if compact_footer:
- for i, offset in enumerate(offsets):
- schema[i] = offset
- else:
- for i, offset in enumerate(offsets):
- schema[i].field_id = entity_id(schema_items[i][0])
- schema[i].offset = offset
- # calculate size and hash code
- header.schema_offset = ctypes.sizeof(header_class) + len(field_buffer)
- header.length = header.schema_offset + ctypes.sizeof(schema_class)
- header.hash_code = hashcode(field_buffer + bytes(schema))
-
- return bytes(header) + field_buffer + bytes(schema)
+ return value._buffer
diff --git a/pyignite/datatypes/internal.py b/pyignite/datatypes/internal.py
index 844e0ef..9fd5d64 100644
--- a/pyignite/datatypes/internal.py
+++ b/pyignite/datatypes/internal.py
@@ -17,7 +17,7 @@
import ctypes
import decimal
from datetime import date, datetime, timedelta
-from typing import Any, Tuple
+from typing import Any, Tuple, Union, Callable
import uuid
import attr
@@ -28,10 +28,13 @@
from .type_codes import *
-__all__ = ['AnyDataArray', 'AnyDataObject', 'Struct', 'StructArray', 'tc_map']
+__all__ = [
+ 'AnyDataArray', 'AnyDataObject', 'Struct', 'StructArray', 'tc_map',
+ 'infer_from_python',
+]
-def tc_map(key: bytes, _memo_map: dict={}):
+def tc_map(key: bytes, _memo_map: dict = {}):
"""
Returns a default parser/generator class for the given type code.
@@ -108,6 +111,20 @@
return _memo_map[key]
+class Conditional:
+
+ def __init__(self, predicate1: Callable[[any], bool], predicate2: Callable[[any], bool], var1, var2):
+ self.predicate1 = predicate1
+ self.predicate2 = predicate2
+ self.var1 = var1
+ self.var2 = var2
+
+ def parse(self, client: 'Client', context):
+ return self.var1.parse(client) if self.predicate1(context) else self.var2.parse(client)
+
+ def to_python(self, ctype_object, context, *args, **kwargs):
+ return self.var1.to_python(ctype_object, *args, **kwargs) if self.predicate2(context) else self.var2.to_python(ctype_object, *args, **kwargs)
+
@attr.s
class StructArray:
""" `counter_type` counter, followed by count*following structure. """
@@ -167,7 +184,7 @@
header_class = self.build_header_class()
header = header_class()
header.length = length
- buffer = bytes(header)
+ buffer = bytearray(header)
for i, v in enumerate(value):
for default_key, default_value in self.defaults.items():
@@ -175,7 +192,7 @@
for name, el_class in self.following:
buffer += el_class.from_python(v[name])
- return buffer
+ return bytes(buffer)
@attr.s
@@ -185,16 +202,22 @@
dict_type = attr.ib(default=OrderedDict)
defaults = attr.ib(type=dict, default={})
- def parse(self, client: 'Client') -> Tuple[type, bytes]:
+ def parse(
+ self, client: 'Client'
+ ) -> Tuple[ctypes.BigEndianStructure, bytes]:
buffer = b''
fields = []
+ values = {}
for name, c_type in self.fields:
- c_type, buffer_fragment = c_type.parse(client)
+ is_cond = isinstance(c_type, Conditional)
+ c_type, buffer_fragment = c_type.parse(client, values) if is_cond else c_type.parse(client)
buffer += buffer_fragment
fields.append((name, c_type))
+ values[name] = buffer_fragment
+
data_class = type(
'Struct',
(ctypes.LittleEndianStructure,),
@@ -206,11 +229,18 @@
return data_class, buffer
- def to_python(self, ctype_object, *args, **kwargs) -> Any:
+ def to_python(
+ self, ctype_object, *args, **kwargs
+ ) -> Union[dict, OrderedDict]:
result = self.dict_type()
for name, c_type in self.fields:
+ is_cond = isinstance(c_type, Conditional)
result[name] = c_type.to_python(
getattr(ctype_object, name),
+ result,
+ *args, **kwargs
+ ) if is_cond else c_type.to_python(
+ getattr(ctype_object, name),
*args, **kwargs
)
return result
@@ -296,7 +326,7 @@
"""
from pyignite.datatypes import (
LongObject, DoubleObject, String, BoolObject, Null, UUIDObject,
- DateObject, TimeObject, DecimalObject,
+ DateObject, TimeObject, DecimalObject, ByteArrayObject,
)
cls._python_map = {
@@ -304,6 +334,7 @@
float: DoubleObject,
str: String,
bytes: String,
+ bytearray: ByteArrayObject,
bool: BoolObject,
type(None): Null,
uuid.UUID: UUIDObject,
@@ -340,7 +371,7 @@
@classmethod
def map_python_type(cls, value):
from pyignite.datatypes import (
- MapObject, ObjectArrayObject, BinaryObject,
+ MapObject, CollectionObject, BinaryObject,
)
if cls._python_map is None:
@@ -349,12 +380,12 @@
cls._init_python_array_map()
value_type = type(value)
- if is_iterable(value) and value_type is not str:
+ if is_iterable(value) and value_type not in (str, bytearray, bytes):
value_subtype = cls.get_subtype(value)
if value_subtype in cls._python_array_map:
return cls._python_array_map[value_subtype]
- # a little heuristics (order may be important)
+ # a little heuristics (order is important)
if all([
value_subtype is None,
len(value) == 2,
@@ -369,7 +400,9 @@
isinstance(value[0], int),
is_iterable(value[1]),
]):
- return ObjectArrayObject
+ return CollectionObject
+
+ # no default for ObjectArrayObject, sorry
raise TypeError(
'Type `array of {}` is invalid'.format(value_subtype)
@@ -465,8 +498,8 @@
value = [value]
length = 1
header.length = length
- buffer = bytes(header)
+ buffer = bytearray(header)
for x in value:
buffer += infer_from_python(x)
- return buffer
+ return bytes(buffer)
diff --git a/pyignite/datatypes/key_value.py b/pyignite/datatypes/key_value.py
index 0f21ac6..ee2ae7b 100644
--- a/pyignite/datatypes/key_value.py
+++ b/pyignite/datatypes/key_value.py
@@ -18,7 +18,9 @@
class PeekModes(ByteArray):
- ALL = 0
- NEAR = 1
- PRIMARY = 2
- BACKUP = 3
+ ALL = 1
+ NEAR = 2
+ PRIMARY = 4
+ BACKUP = 8
+ ONHEAP = 16
+ OFFHEAP = 32
diff --git a/pyignite/datatypes/null_object.py b/pyignite/datatypes/null_object.py
index a648e30..19b41c7 100644
--- a/pyignite/datatypes/null_object.py
+++ b/pyignite/datatypes/null_object.py
@@ -20,6 +20,7 @@
"""
import ctypes
+from typing import Any
from .base import IgniteDataType
from .type_codes import TC_NULL
@@ -33,6 +34,11 @@
pythonic = type(None)
_object_c_type = None
+ @staticmethod
+ def hashcode(value: Any) -> int:
+ # Null object can not be a cache key.
+ return 0
+
@classmethod
def build_c_type(cls):
if cls._object_c_type is None:
diff --git a/pyignite/datatypes/primitive.py b/pyignite/datatypes/primitive.py
index d1e9f4e..23d070d 100644
--- a/pyignite/datatypes/primitive.py
+++ b/pyignite/datatypes/primitive.py
@@ -17,6 +17,8 @@
from pyignite.constants import *
from .base import IgniteDataType
+from .type_ids import *
+from .type_names import *
__all__ = [
@@ -38,7 +40,8 @@
- Char,
- Bool.
"""
-
+ _type_name = None
+ _type_id = None
c_type = None
@classmethod
@@ -55,30 +58,44 @@
class Byte(Primitive):
+ _type_name = NAME_BYTE
+ _type_id = TYPE_BYTE
c_type = ctypes.c_byte
class Short(Primitive):
+ _type_name = NAME_SHORT
+ _type_id = TYPE_SHORT
c_type = ctypes.c_short
class Int(Primitive):
+ _type_name = NAME_INT
+ _type_id = TYPE_INT
c_type = ctypes.c_int
class Long(Primitive):
+ _type_name = NAME_LONG
+ _type_id = TYPE_LONG
c_type = ctypes.c_longlong
class Float(Primitive):
+ _type_name = NAME_FLOAT
+ _type_id = TYPE_FLOAT
c_type = ctypes.c_float
class Double(Primitive):
+ _type_name = NAME_DOUBLE
+ _type_id = TYPE_DOUBLE
c_type = ctypes.c_double
class Char(Primitive):
+ _type_name = NAME_CHAR
+ _type_id = TYPE_CHAR
c_type = ctypes.c_short
@classmethod
@@ -103,4 +120,6 @@
class Bool(Primitive):
+ _type_name = NAME_BOOLEAN
+ _type_id = TYPE_BOOLEAN
c_type = ctypes.c_bool
diff --git a/pyignite/datatypes/primitive_arrays.py b/pyignite/datatypes/primitive_arrays.py
index 6a93191..bca4fd9 100644
--- a/pyignite/datatypes/primitive_arrays.py
+++ b/pyignite/datatypes/primitive_arrays.py
@@ -14,11 +14,14 @@
# limitations under the License.
import ctypes
+from typing import Any
from pyignite.constants import *
from .base import IgniteDataType
from .primitive import *
from .type_codes import *
+from .type_ids import *
+from .type_names import *
__all__ = [
@@ -33,9 +36,16 @@
"""
Base class for array of primitives. Payload-only.
"""
+ _type_name = None
+ _type_id = None
primitive_type = None
type_code = None
+ @staticmethod
+ def hashcode(value: Any) -> int:
+ # Arrays are not supported as keys at the moment.
+ return 0
+
@classmethod
def build_header_class(cls):
return type(
@@ -87,49 +97,79 @@
)
length = len(value)
header.length = length
- buffer = bytes(header)
+ buffer = bytearray(header)
for x in value:
buffer += cls.primitive_type.from_python(x)
- return buffer
+ return bytes(buffer)
class ByteArray(PrimitiveArray):
+ _type_name = NAME_BYTE_ARR
+ _type_id = TYPE_BYTE_ARR
primitive_type = Byte
type_code = TC_BYTE_ARRAY
+ @classmethod
+ def to_python(cls, ctype_object, *args, **kwargs):
+ return bytearray(ctype_object.data)
+
+ @classmethod
+ def from_python(cls, value):
+ header_class = cls.build_header_class()
+ header = header_class()
+
+ # no need to iterate on bytes or bytearray
+ # to create ByteArray data buffer
+ header.length = len(value)
+ return bytes(bytearray(header) + bytearray(value))
+
class ShortArray(PrimitiveArray):
+ _type_name = NAME_SHORT_ARR
+ _type_id = TYPE_SHORT_ARR
primitive_type = Short
type_code = TC_SHORT_ARRAY
class IntArray(PrimitiveArray):
+ _type_name = NAME_INT_ARR
+ _type_id = TYPE_INT_ARR
primitive_type = Int
type_code = TC_INT_ARRAY
class LongArray(PrimitiveArray):
+ _type_name = NAME_LONG_ARR
+ _type_id = TYPE_LONG_ARR
primitive_type = Long
type_code = TC_LONG_ARRAY
class FloatArray(PrimitiveArray):
+ _type_name = NAME_FLOAT_ARR
+ _type_id = TYPE_FLOAT_ARR
primitive_type = Float
type_code = TC_FLOAT_ARRAY
class DoubleArray(PrimitiveArray):
+ _type_name = NAME_DOUBLE_ARR
+ _type_id = TYPE_DOUBLE_ARR
primitive_type = Double
type_code = TC_DOUBLE_ARRAY
class CharArray(PrimitiveArray):
+ _type_name = NAME_CHAR_ARR
+ _type_id = TYPE_CHAR_ARR
primitive_type = Char
type_code = TC_CHAR_ARRAY
class BoolArray(PrimitiveArray):
+ _type_name = NAME_BOOLEAN_ARR
+ _type_id = TYPE_BOOLEAN_ARR
primitive_type = Bool
type_code = TC_BOOL_ARRAY
@@ -138,6 +178,8 @@
"""
Base class for primitive array object. Type code plus payload.
"""
+ _type_name = None
+ _type_id = None
pythonic = list
default = []
@@ -157,36 +199,83 @@
class ByteArrayObject(PrimitiveArrayObject):
+ _type_name = NAME_BYTE_ARR
+ _type_id = TYPE_BYTE_ARR
primitive_type = Byte
type_code = TC_BYTE_ARRAY
+ @classmethod
+ def to_python(cls, ctype_object, *args, **kwargs):
+ return ByteArray.to_python(ctype_object, *args, **kwargs)
+
+ @classmethod
+ def from_python(cls, value):
+ header_class = cls.build_header_class()
+ header = header_class()
+ header.type_code = int.from_bytes(
+ cls.type_code,
+ byteorder=PROTOCOL_BYTE_ORDER
+ )
+
+ # no need to iterate on bytes or bytearray
+ # to create ByteArrayObject data buffer
+ header.length = len(value)
+ try:
+ # `value` is a `bytearray` or a sequence of integer values
+ # in range 0 to 255
+ value_buffer = bytearray(value)
+ except ValueError:
+ # `value` is a sequence of integers in range -128 to 127
+ value_buffer = bytearray()
+ for ch in value:
+ if -128 <= ch <= 255:
+ value_buffer.append(ctypes.c_ubyte(ch).value)
+ else:
+ raise ValueError(
+ 'byte must be in range(-128, 256)!'
+ ) from None
+
+ return bytes(bytearray(header) + value_buffer)
+
class ShortArrayObject(PrimitiveArrayObject):
+ _type_name = NAME_SHORT_ARR
+ _type_id = TYPE_SHORT_ARR
primitive_type = Short
type_code = TC_SHORT_ARRAY
class IntArrayObject(PrimitiveArrayObject):
+ _type_name = NAME_INT_ARR
+ _type_id = TYPE_INT_ARR
primitive_type = Int
type_code = TC_INT_ARRAY
class LongArrayObject(PrimitiveArrayObject):
+ _type_name = NAME_LONG_ARR
+ _type_id = TYPE_LONG_ARR
primitive_type = Long
type_code = TC_LONG_ARRAY
class FloatArrayObject(PrimitiveArrayObject):
+ _type_name = NAME_FLOAT_ARR
+ _type_id = TYPE_FLOAT_ARR
primitive_type = Float
type_code = TC_FLOAT_ARRAY
class DoubleArrayObject(PrimitiveArrayObject):
+ _type_name = NAME_DOUBLE_ARR
+ _type_id = TYPE_DOUBLE_ARR
primitive_type = Double
type_code = TC_DOUBLE_ARRAY
class CharArrayObject(PrimitiveArrayObject):
+ _type_name = NAME_CHAR_ARR
+ _type_id = TYPE_CHAR_ARR
primitive_type = Char
type_code = TC_CHAR_ARRAY
@@ -204,5 +293,7 @@
class BoolArrayObject(PrimitiveArrayObject):
+ _type_name = NAME_BOOLEAN_ARR
+ _type_id = TYPE_BOOLEAN_ARR
primitive_type = Bool
type_code = TC_BOOL_ARRAY
diff --git a/pyignite/datatypes/primitive_objects.py b/pyignite/datatypes/primitive_objects.py
index 105acee..0bd0ec6 100644
--- a/pyignite/datatypes/primitive_objects.py
+++ b/pyignite/datatypes/primitive_objects.py
@@ -16,8 +16,11 @@
import ctypes
from pyignite.constants import *
+from pyignite.utils import unsigned
from .base import IgniteDataType
from .type_codes import *
+from .type_ids import *
+from .type_names import *
__all__ = [
@@ -33,10 +36,11 @@
Primitive data objects are built of primitive data prepended by
the corresponding type code.
"""
-
+ _type_name = None
+ _type_id = None
+ _object_c_type = None
c_type = None
type_code = None
- _object_c_type = None
@classmethod
def build_c_type(cls):
@@ -77,46 +81,89 @@
class ByteObject(DataObject):
+ _type_name = NAME_BYTE
+ _type_id = TYPE_BYTE
c_type = ctypes.c_byte
type_code = TC_BYTE
pythonic = int
default = 0
+ @staticmethod
+ def hashcode(value: int, *args, **kwargs) -> int:
+ return value
+
class ShortObject(DataObject):
+ _type_name = NAME_SHORT
+ _type_id = TYPE_SHORT
c_type = ctypes.c_short
type_code = TC_SHORT
pythonic = int
default = 0
+ @staticmethod
+ def hashcode(value: int, *args, **kwargs) -> int:
+ return value
+
class IntObject(DataObject):
+ _type_name = NAME_INT
+ _type_id = TYPE_INT
c_type = ctypes.c_int
type_code = TC_INT
pythonic = int
default = 0
+ @staticmethod
+ def hashcode(value: int, *args, **kwargs) -> int:
+ return value
+
class LongObject(DataObject):
+ _type_name = NAME_LONG
+ _type_id = TYPE_LONG
c_type = ctypes.c_longlong
type_code = TC_LONG
pythonic = int
default = 0
+ @staticmethod
+ def hashcode(value: int, *args, **kwargs) -> int:
+ return value ^ (unsigned(value, ctypes.c_ulonglong) >> 32)
+
class FloatObject(DataObject):
+ _type_name = NAME_FLOAT
+ _type_id = TYPE_FLOAT
c_type = ctypes.c_float
type_code = TC_FLOAT
pythonic = float
default = 0.0
+ @staticmethod
+ def hashcode(value: float, *args, **kwargs) -> int:
+ return ctypes.cast(
+ ctypes.pointer(ctypes.c_float(value)),
+ ctypes.POINTER(ctypes.c_int)
+ ).contents.value
+
class DoubleObject(DataObject):
+ _type_name = NAME_DOUBLE
+ _type_id = TYPE_DOUBLE
c_type = ctypes.c_double
type_code = TC_DOUBLE
pythonic = float
default = 0.0
+ @staticmethod
+ def hashcode(value: float, *args, **kwargs) -> int:
+ bits = ctypes.cast(
+ ctypes.pointer(ctypes.c_double(value)),
+ ctypes.POINTER(ctypes.c_longlong)
+ ).contents.value
+ return (bits & 0xffffffff) ^ (unsigned(bits, ctypes.c_longlong) >> 32)
+
class CharObject(DataObject):
"""
@@ -125,11 +172,17 @@
to/from UTF-8 to keep the coding hassle to minimum. Bear in mind
though: decoded character may take 1..4 bytes in UTF-8.
"""
+ _type_name = NAME_CHAR
+ _type_id = TYPE_CHAR
c_type = ctypes.c_short
type_code = TC_CHAR
pythonic = str
default = ' '
+ @staticmethod
+ def hashcode(value: str, *args, **kwargs) -> int:
+ return ord(value)
+
@classmethod
def to_python(cls, ctype_object, *args, **kwargs):
return ctype_object.value.to_bytes(
@@ -152,7 +205,13 @@
class BoolObject(DataObject):
+ _type_name = NAME_BOOLEAN
+ _type_id = TYPE_BOOLEAN
c_type = ctypes.c_bool
type_code = TC_BOOL
pythonic = bool
default = False
+
+ @staticmethod
+ def hashcode(value: bool, *args, **kwargs) -> int:
+ return 1231 if value else 1237
diff --git a/pyignite/datatypes/standard.py b/pyignite/datatypes/standard.py
index 8808da2..c65cae4 100644
--- a/pyignite/datatypes/standard.py
+++ b/pyignite/datatypes/standard.py
@@ -17,11 +17,15 @@
from datetime import date, datetime, time, timedelta
import decimal
from math import ceil
+from typing import Any, Tuple
import uuid
from pyignite.constants import *
+from pyignite.utils import datetime_hashcode, decimal_hashcode, hashcode
from .base import IgniteDataType
from .type_codes import *
+from .type_ids import *
+from .type_names import *
from .null_object import Null
@@ -41,6 +45,8 @@
class StandardObject(IgniteDataType):
+ _type_name = None
+ _type_id = None
type_code = None
@classmethod
@@ -64,9 +70,15 @@
Pascal-style string: `c_int` counter, followed by count*bytes.
UTF-8-encoded, so that one character may take 1 to 4 bytes.
"""
+ _type_name = NAME_STRING
+ _type_id = TYPE_STRING
type_code = TC_STRING
pythonic = str
+ @staticmethod
+ def hashcode(value: str, *args, **kwargs) -> int:
+ return hashcode(value)
+
@classmethod
def build_c_type(cls, length: int):
return type(
@@ -127,10 +139,16 @@
class DecimalObject(IgniteDataType):
+ _type_name = NAME_DECIMAL
+ _type_id = TYPE_DECIMAL
type_code = TC_DECIMAL
pythonic = decimal.Decimal
default = decimal.Decimal('0.00')
+ @staticmethod
+ def hashcode(value: decimal.Decimal, *args, **kwargs) -> int:
+ return decimal_hashcode(value)
+
@classmethod
def build_c_header(cls):
return type(
@@ -251,11 +269,22 @@
and :py:meth:`~pyignite.datatypes.standard.UUIDObject.from_python` methods
is changed for compatibility with `java.util.UUID`.
"""
- type_code = TC_UUID
+ _type_name = NAME_UUID
+ _type_id = TYPE_UUID
_object_c_type = None
+ type_code = TC_UUID
UUID_BYTE_ORDER = (7, 6, 5, 4, 3, 2, 1, 0, 15, 14, 13, 12, 11, 10, 9, 8)
+ UUID_BYTE_ORDER = (7, 6, 5, 4, 3, 2, 1, 0, 15, 14, 13, 12, 11, 10, 9, 8)
+
+ @staticmethod
+ def hashcode(value: 'UUID', *args, **kwargs) -> int:
+ msb = value.int >> 64
+ lsb = value.int & 0xffffffffffffffff
+ hilo = msb ^ lsb
+ return (hilo >> 32) ^ (hilo & 0xffffffff)
+
@classmethod
def build_c_type(cls):
if cls._object_c_type is None:
@@ -308,10 +337,16 @@
`epoch` and `fraction` stored separately and represented as
tuple(datetime.datetime, integer).
"""
+ _type_name = NAME_TIMESTAMP
+ _type_id = TYPE_TIMESTAMP
+ _object_c_type = None
type_code = TC_TIMESTAMP
pythonic = tuple
default = (datetime(1970, 1, 1), 0)
- _object_c_type = None
+
+ @staticmethod
+ def hashcode(value: Tuple[datetime, int], *args, **kwargs) -> int:
+ return datetime_hashcode(int(value[0].timestamp() * 1000))
@classmethod
def build_c_type(cls):
@@ -364,10 +399,16 @@
Represented as a naive datetime.datetime in Python.
"""
+ _type_name = NAME_DATE
+ _type_id = TYPE_DATE
+ _object_c_type = None
type_code = TC_DATE
pythonic = datetime
default = datetime(1970, 1, 1)
- _object_c_type = None
+
+ @staticmethod
+ def hashcode(value: datetime, *args, **kwargs) -> int:
+ return datetime_hashcode(int(value.timestamp() * 1000))
@classmethod
def build_c_type(cls):
@@ -416,10 +457,16 @@
Represented as a datetime.timedelta in Python.
"""
+ _type_name = NAME_TIME
+ _type_id = TYPE_TIME
+ _object_c_type = None
type_code = TC_TIME
pythonic = timedelta
default = timedelta()
- _object_c_type = None
+
+ @staticmethod
+ def hashcode(value: timedelta, *args, **kwargs) -> int:
+ return datetime_hashcode(int(value.total_seconds() * 1000))
@classmethod
def build_c_type(cls):
@@ -468,8 +515,10 @@
(using language-specific type serialization is a good way to kill the
interoperability though), so it represented by tuple(int, int) in Python.
"""
- type_code = TC_ENUM
+ _type_name = 'Enum'
+ _type_id = TYPE_ENUM
_object_c_type = None
+ type_code = TC_ENUM
@classmethod
def build_c_type(cls):
@@ -518,6 +567,8 @@
"""
Another way of representing the enum type. Same, but different.
"""
+ _type_name = 'Enum'
+ _type_id = TYPE_BINARY_ENUM
type_code = TC_BINARY_ENUM
@@ -525,9 +576,16 @@
"""
Base class for array of primitives. Payload-only.
"""
+ _type_name = None
+ _type_id = None
standard_type = None
type_code = None
+ @staticmethod
+ def hashcode(value: Any) -> int:
+ # Arrays are not supported as keys at the moment.
+ return 0
+
@classmethod
def build_header_class(cls):
return type(
@@ -585,11 +643,11 @@
)
length = len(value)
header.length = length
- buffer = bytes(header)
+ buffer = bytearray(header)
for x in value:
buffer += cls.standard_type.from_python(x)
- return buffer
+ return bytes(buffer)
class StringArray(StandardArray):
@@ -599,34 +657,50 @@
List(str) in Python.
"""
+ _type_name = NAME_STRING_ARR
+ _type_id = TYPE_STRING_ARR
standard_type = String
class DecimalArray(StandardArray):
+ _type_name = NAME_DECIMAL_ARR
+ _type_id = TYPE_DECIMAL_ARR
standard_type = DecimalObject
class UUIDArray(StandardArray):
+ _type_name = NAME_UUID_ARR
+ _type_id = TYPE_UUID_ARR
standard_type = UUIDObject
class TimestampArray(StandardArray):
+ _type_name = NAME_TIMESTAMP_ARR
+ _type_id = TYPE_TIMESTAMP_ARR
standard_type = TimestampObject
class DateArray(StandardArray):
+ _type_name = NAME_DATE_ARR
+ _type_id = TYPE_DATE_ARR
standard_type = DateObject
class TimeArray(StandardArray):
+ _type_name = NAME_TIME_ARR
+ _type_id = TYPE_TIME_ARR
standard_type = TimeObject
class EnumArray(StandardArray):
+ _type_name = 'Enum[]'
+ _type_id = TYPE_ENUM_ARR
standard_type = EnumObject
class StandardArrayObject(StandardArray):
+ _type_name = None
+ _type_id = None
pythonic = list
default = []
@@ -647,18 +721,24 @@
class StringArrayObject(StandardArrayObject):
""" List of strings. """
+ _type_name = NAME_STRING_ARR
+ _type_id = TYPE_STRING_ARR
standard_type = String
type_code = TC_STRING_ARRAY
class DecimalArrayObject(StandardArrayObject):
""" List of decimal.Decimal objects. """
+ _type_name = NAME_DECIMAL_ARR
+ _type_id = TYPE_DECIMAL_ARR
standard_type = DecimalObject
type_code = TC_DECIMAL_ARRAY
class UUIDArrayObject(StandardArrayObject):
- """ Translated into Python as a list(uuid.UUID)"""
+ """ Translated into Python as a list(uuid.UUID). """
+ _type_name = NAME_UUID_ARR
+ _type_id = TYPE_UUID_ARR
standard_type = UUIDObject
type_code = TC_UUID_ARRAY
@@ -667,18 +747,24 @@
"""
Translated into Python as a list of (datetime.datetime, integer) tuples.
"""
+ _type_name = NAME_TIMESTAMP_ARR
+ _type_id = TYPE_TIMESTAMP_ARR
standard_type = TimestampObject
type_code = TC_TIMESTAMP_ARRAY
class DateArrayObject(StandardArrayObject):
""" List of datetime.datetime type values. """
+ _type_name = NAME_DATE_ARR
+ _type_id = TYPE_DATE_ARR
standard_type = DateObject
type_code = TC_DATE_ARRAY
class TimeArrayObject(StandardArrayObject):
""" List of datetime.timedelta type values. """
+ _type_name = NAME_TIME_ARR
+ _type_id = TYPE_TIME_ARR
standard_type = TimeObject
type_code = TC_TIME_ARRAY
@@ -688,6 +774,8 @@
Array of (int, int) tuples, plus it holds a `type_id` in its header.
The only `type_id` value of -1 (user type) works from Python perspective.
"""
+ _type_name = 'Enum[]'
+ _type_id = TYPE_ENUM_ARR
standard_type = EnumObject
type_code = TC_ENUM_ARRAY
@@ -719,11 +807,11 @@
length = len(value)
header.length = length
header.type_id = type_id
- buffer = bytes(header)
+ buffer = bytearray(header)
for x in value:
buffer += cls.standard_type.from_python(x)
- return buffer
+ return bytes(buffer)
@classmethod
def to_python(cls, ctype_object, *args, **kwargs):
diff --git a/pyignite/datatypes/type_ids.py b/pyignite/datatypes/type_ids.py
new file mode 100644
index 0000000..be2d9c3
--- /dev/null
+++ b/pyignite/datatypes/type_ids.py
@@ -0,0 +1,52 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+TYPE_BYTE = 1
+TYPE_SHORT = 2
+TYPE_INT = 3
+TYPE_LONG = 4
+TYPE_FLOAT = 5
+TYPE_DOUBLE = 6
+TYPE_CHAR = 7
+TYPE_BOOLEAN = 8
+TYPE_STRING = 9
+TYPE_UUID = 10
+TYPE_DATE = 11
+TYPE_BYTE_ARR = 12
+TYPE_SHORT_ARR = 13
+TYPE_INT_ARR = 14
+TYPE_LONG_ARR = 15
+TYPE_FLOAT_ARR = 16
+TYPE_DOUBLE_ARR = 17
+TYPE_CHAR_ARR = 18
+TYPE_BOOLEAN_ARR = 19
+TYPE_STRING_ARR = 20
+TYPE_UUID_ARR = 21
+TYPE_DATE_ARR = 22
+TYPE_OBJ_ARR = 23
+TYPE_COL = 24
+TYPE_MAP = 25
+TYPE_BINARY_OBJ = 27
+TYPE_ENUM = 28
+TYPE_ENUM_ARR = 29
+TYPE_DECIMAL = 30
+TYPE_DECIMAL_ARR = 31
+TYPE_CLASS = 32
+TYPE_TIMESTAMP = 33
+TYPE_TIMESTAMP_ARR = 34
+TYPE_PROXY = 35
+TYPE_TIME = 36
+TYPE_TIME_ARR = 37
+TYPE_BINARY_ENUM = 38
diff --git a/pyignite/datatypes/type_names.py b/pyignite/datatypes/type_names.py
new file mode 100644
index 0000000..08ce75d
--- /dev/null
+++ b/pyignite/datatypes/type_names.py
@@ -0,0 +1,46 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+NAME_BYTE = 'java.lang.Byte'
+NAME_SHORT = 'java.lang.Short'
+NAME_INT = 'java.lang.Integer'
+NAME_LONG = 'java.lang.Long'
+NAME_FLOAT = 'java.lang.Float'
+NAME_DOUBLE = 'java.land.Double'
+NAME_CHAR = 'java.lang.Character'
+NAME_BOOLEAN = 'java.lang.Boolean'
+NAME_STRING = 'java.lang.String'
+NAME_UUID = 'java.util.UUID'
+NAME_DATE = 'java.util.Date'
+NAME_BYTE_ARR = 'class [B'
+NAME_SHORT_ARR = 'class [S'
+NAME_INT_ARR = 'class [I'
+NAME_LONG_ARR = 'class [J'
+NAME_FLOAT_ARR = 'class [F'
+NAME_DOUBLE_ARR = 'class [D'
+NAME_CHAR_ARR = 'class [C'
+NAME_BOOLEAN_ARR = 'class [Z'
+NAME_STRING_ARR = 'class [Ljava.lang.String;'
+NAME_UUID_ARR = 'class [Ljava.util.UUID;'
+NAME_DATE_ARR = 'class [Ljava.util.Date;'
+NAME_OBJ_ARR = 'class [Ljava.lang.Object;'
+NAME_COL = 'java.util.Collection'
+NAME_MAP = 'java.util.Map'
+NAME_DECIMAL = 'java.math.BigDecimal'
+NAME_DECIMAL_ARR = 'class [Ljava.math.BigDecimal;'
+NAME_TIMESTAMP = 'java.sql.Timestamp'
+NAME_TIMESTAMP_ARR = 'class [Ljava.sql.Timestamp;'
+NAME_TIME = 'java.sql.Time'
+NAME_TIME_ARR = 'class [Ljava.sql.Time;'
diff --git a/pyignite/exceptions.py b/pyignite/exceptions.py
index 2bc5996..1b41d32 100644
--- a/pyignite/exceptions.py
+++ b/pyignite/exceptions.py
@@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from typing import Tuple
from socket import error as SocketError
@@ -30,7 +31,10 @@
as defined in
https://apacheignite.readme.io/docs/binary-client-protocol#section-handshake
"""
- pass
+
+ def __init__(self, expected_version: Tuple[int, int, int], message: str):
+ self.expected_version = expected_version
+ self.message = message
class ReconnectError(Exception):
@@ -78,3 +82,6 @@
An error in SQL query.
"""
pass
+
+
+connection_errors = (IOError, OSError)
diff --git a/pyignite/queries/__init__.py b/pyignite/queries/__init__.py
index 2c2d254..3029f87 100644
--- a/pyignite/queries/__init__.py
+++ b/pyignite/queries/__init__.py
@@ -21,319 +21,4 @@
:mod:`pyignite.datatypes` binary parser/generator classes.
"""
-from collections import OrderedDict
-import ctypes
-from random import randint
-
-import attr
-
-from pyignite.api.result import APIResult
-from pyignite.constants import *
-from pyignite.datatypes import (
- AnyDataObject, Bool, Int, Long, String, StringArray, Struct,
-)
-from .op_codes import *
-
-
-@attr.s
-class Response:
- following = attr.ib(type=list, factory=list)
- _response_header = None
-
- def __attrs_post_init__(self):
- # replace None with empty list
- self.following = self.following or []
-
- @classmethod
- def build_header(cls):
- if cls._response_header is None:
- cls._response_header = type(
- 'ResponseHeader',
- (ctypes.LittleEndianStructure,),
- {
- '_pack_': 1,
- '_fields_': [
- ('length', ctypes.c_int),
- ('query_id', ctypes.c_longlong),
- ('status_code', ctypes.c_int),
- ],
- },
- )
- return cls._response_header
-
- def parse(self, client: 'Client'):
- header_class = self.build_header()
- buffer = client.recv(ctypes.sizeof(header_class))
- header = header_class.from_buffer_copy(buffer)
- fields = []
-
- if header.status_code == OP_SUCCESS:
- for name, ignite_type in self.following:
- c_type, buffer_fragment = ignite_type.parse(client)
- buffer += buffer_fragment
- fields.append((name, c_type))
- else:
- c_type, buffer_fragment = String.parse(client)
- buffer += buffer_fragment
- fields.append(('error_message', c_type))
-
- response_class = type(
- 'Response',
- (header_class,),
- {
- '_pack_': 1,
- '_fields_': fields,
- }
- )
- return response_class, buffer
-
- def to_python(self, ctype_object, *args, **kwargs):
- result = OrderedDict()
-
- for name, c_type in self.following:
- result[name] = c_type.to_python(
- getattr(ctype_object, name),
- *args, **kwargs
- )
-
- return result if result else None
-
-
-@attr.s
-class SQLResponse(Response):
- """
- The response class of SQL functions is special in the way the row-column
- data is counted in it. Basically, Ignite thin client API is following a
- “counter right before the counted objects” rule in most of its parts.
- SQL ops are breaking this rule.
- """
- include_field_names = attr.ib(type=bool, default=False)
- has_cursor = attr.ib(type=bool, default=False)
-
- def fields_or_field_count(self):
- if self.include_field_names:
- return 'fields', StringArray
- return 'field_count', Int
-
- def parse(self, client: 'Client'):
- header_class = self.build_header()
- buffer = client.recv(ctypes.sizeof(header_class))
- header = header_class.from_buffer_copy(buffer)
- fields = []
-
- if header.status_code == OP_SUCCESS:
- following = [
- self.fields_or_field_count(),
- ('row_count', Int),
- ]
- if self.has_cursor:
- following.insert(0, ('cursor', Long))
- body_struct = Struct(following)
- body_class, body_buffer = body_struct.parse(client)
- body = body_class.from_buffer_copy(body_buffer)
-
- if self.include_field_names:
- field_count = body.fields.length
- else:
- field_count = body.field_count
-
- data_fields = []
- data_buffer = b''
- for i in range(body.row_count):
- row_fields = []
- row_buffer = b''
- for j in range(field_count):
- field_class, field_buffer = AnyDataObject.parse(client)
- row_fields.append(('column_{}'.format(j), field_class))
- row_buffer += field_buffer
-
- row_class = type(
- 'SQLResponseRow',
- (ctypes.LittleEndianStructure,),
- {
- '_pack_': 1,
- '_fields_': row_fields,
- }
- )
- data_fields.append(('row_{}'.format(i), row_class))
- data_buffer += row_buffer
-
- data_class = type(
- 'SQLResponseData',
- (ctypes.LittleEndianStructure,),
- {
- '_pack_': 1,
- '_fields_': data_fields,
- }
- )
- fields += body_class._fields_ + [
- ('data', data_class),
- ('more', ctypes.c_bool),
- ]
- buffer += body_buffer + data_buffer
- else:
- c_type, buffer_fragment = String.parse(client)
- buffer += buffer_fragment
- fields.append(('error_message', c_type))
-
- final_class = type(
- 'SQLResponse',
- (header_class,),
- {
- '_pack_': 1,
- '_fields_': fields,
- }
- )
- buffer += client.recv(ctypes.sizeof(final_class) - len(buffer))
- return final_class, buffer
-
- def to_python(self, ctype_object, *args, **kwargs):
- if ctype_object.status_code == 0:
- result = {
- 'more': Bool.to_python(
- ctype_object.more, *args, **kwargs
- ),
- 'data': [],
- }
- if hasattr(ctype_object, 'fields'):
- result['fields'] = StringArray.to_python(
- ctype_object.fields, *args, **kwargs
- )
- else:
- result['field_count'] = Int.to_python(
- ctype_object.field_count, *args, **kwargs
- )
- if hasattr(ctype_object, 'cursor'):
- result['cursor'] = Long.to_python(
- ctype_object.cursor, *args, **kwargs
- )
- for row_item in ctype_object.data._fields_:
- row_name = row_item[0]
- row_object = getattr(ctype_object.data, row_name)
- row = []
- for col_item in row_object._fields_:
- col_name = col_item[0]
- col_object = getattr(row_object, col_name)
- row.append(
- AnyDataObject.to_python(col_object, *args, **kwargs)
- )
- result['data'].append(row)
- return result
-
-
-@attr.s
-class Query:
- op_code = attr.ib(type=int)
- following = attr.ib(type=list, factory=list)
- query_id = attr.ib(type=int, default=None)
- _query_c_type = None
-
- @classmethod
- def build_c_type(cls):
- if cls._query_c_type is None:
- cls._query_c_type = type(
- cls.__name__,
- (ctypes.LittleEndianStructure,),
- {
- '_pack_': 1,
- '_fields_': [
- ('length', ctypes.c_int),
- ('op_code', ctypes.c_short),
- ('query_id', ctypes.c_longlong),
- ],
- },
- )
- return cls._query_c_type
-
- def from_python(self, values: dict=None):
- if values is None:
- values = {}
- buffer = b''
-
- header_class = self.build_c_type()
- header = header_class()
- header.op_code = self.op_code
- if self.query_id is None:
- header.query_id = randint(MIN_LONG, MAX_LONG)
-
- for name, c_type in self.following:
- buffer += c_type.from_python(values[name])
-
- header.length = (
- len(buffer)
- + ctypes.sizeof(header_class)
- - ctypes.sizeof(ctypes.c_int)
- )
- return header.query_id, bytes(header) + buffer
-
- def perform(
- self, conn: 'Connection', query_params: dict=None,
- response_config: list=None,
- ) -> APIResult:
- """
- Perform query and process result.
-
- :param conn: connection to Ignite server,
- :param query_params: (optional) dict of named query parameters.
- Defaults to no parameters,
- :param response_config: (optional) response configuration − list of
- (name, type_hint) tuples. Defaults to empty return value,
- :return: instance of :class:`~pyignite.api.result.APIResult` with raw
- value (may undergo further processing in API functions).
- """
- _, send_buffer = self.from_python(query_params)
- conn.send(send_buffer)
- response_struct = Response(response_config)
- response_ctype, recv_buffer = response_struct.parse(conn)
- response = response_ctype.from_buffer_copy(recv_buffer)
- result = APIResult(response)
- if result.status == 0:
- result.value = response_struct.to_python(response)
- return result
-
-
-class ConfigQuery(Query):
- """
- This is a special query, used for creating caches with configuration.
- """
- _query_c_type = None
-
- @classmethod
- def build_c_type(cls):
- if cls._query_c_type is None:
- cls._query_c_type = type(
- cls.__name__,
- (ctypes.LittleEndianStructure,),
- {
- '_pack_': 1,
- '_fields_': [
- ('length', ctypes.c_int),
- ('op_code', ctypes.c_short),
- ('query_id', ctypes.c_longlong),
- ('config_length', ctypes.c_int),
- ],
- },
- )
- return cls._query_c_type
-
- def from_python(self, values: dict = None):
- if values is None:
- values = {}
- buffer = b''
-
- header_class = self.build_c_type()
- header = header_class()
- header.op_code = self.op_code
- if self.query_id is None:
- header.query_id = randint(MIN_LONG, MAX_LONG)
-
- for name, c_type in self.following:
- buffer += c_type.from_python(values[name])
-
- header.length = (
- len(buffer)
- + ctypes.sizeof(header_class)
- - ctypes.sizeof(ctypes.c_int)
- )
- header.config_length = header.length - ctypes.sizeof(header_class)
- return header.query_id, bytes(header) + buffer
+from .query import Query, ConfigQuery, get_response_class
diff --git a/pyignite/queries/op_codes.py b/pyignite/queries/op_codes.py
index 1396e83..7372713 100644
--- a/pyignite/queries/op_codes.py
+++ b/pyignite/queries/op_codes.py
@@ -43,6 +43,7 @@
OP_CACHE_REMOVE_KEYS = 1018
OP_CACHE_REMOVE_ALL = 1019
OP_CACHE_GET_SIZE = 1020
+OP_CACHE_LOCAL_PEEK = 1021
OP_CACHE_GET_NAMES = 1050
OP_CACHE_CREATE_WITH_NAME = 1051
@@ -51,6 +52,7 @@
OP_CACHE_GET_OR_CREATE_WITH_CONFIGURATION = 1054
OP_CACHE_GET_CONFIGURATION = 1055
OP_CACHE_DESTROY = 1056
+OP_CACHE_PARTITIONS = 1101
OP_QUERY_SCAN = 2000
OP_QUERY_SCAN_CURSOR_GET_PAGE = 2001
diff --git a/pyignite/queries/query.py b/pyignite/queries/query.py
new file mode 100644
index 0000000..0e7cfa3
--- /dev/null
+++ b/pyignite/queries/query.py
@@ -0,0 +1,164 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import ctypes
+from random import randint
+
+import attr
+
+from pyignite.api.result import APIResult
+from pyignite.constants import *
+from pyignite.queries import response
+
+
+def get_response_class(obj: object, sql: bool = False):
+ """
+ Response class factory.
+
+ :param obj: cache, connection or client object,
+ :param sql: (optional) return normal (default) or SQL response class,
+ :return: response class.
+ """
+ template = 'SQLResponse{}{}{}' if sql else 'Response{}{}{}'
+ return getattr(response, template.format(*obj.get_protocol_version()))
+
+
+@attr.s
+class Query:
+ op_code = attr.ib(type=int)
+ following = attr.ib(type=list, factory=list)
+ query_id = attr.ib(type=int, default=None)
+ _query_c_type = None
+
+ @classmethod
+ def build_c_type(cls):
+ if cls._query_c_type is None:
+ cls._query_c_type = type(
+ cls.__name__,
+ (ctypes.LittleEndianStructure,),
+ {
+ '_pack_': 1,
+ '_fields_': [
+ ('length', ctypes.c_int),
+ ('op_code', ctypes.c_short),
+ ('query_id', ctypes.c_longlong),
+ ],
+ },
+ )
+ return cls._query_c_type
+
+ def from_python(self, values: dict = None):
+ if values is None:
+ values = {}
+ buffer = b''
+
+ header_class = self.build_c_type()
+ header = header_class()
+ header.op_code = self.op_code
+ if self.query_id is None:
+ header.query_id = randint(MIN_LONG, MAX_LONG)
+
+ for name, c_type in self.following:
+ buffer += c_type.from_python(values[name])
+
+ header.length = (
+ len(buffer)
+ + ctypes.sizeof(header_class)
+ - ctypes.sizeof(ctypes.c_int)
+ )
+ return header.query_id, bytes(header) + buffer
+
+ def perform(
+ self, conn: 'Connection', query_params: dict = None,
+ response_config: list = None, sql: bool = False, **kwargs,
+ ) -> APIResult:
+ """
+ Perform query and process result.
+
+ :param conn: connection to Ignite server,
+ :param query_params: (optional) dict of named query parameters.
+ Defaults to no parameters,
+ :param response_config: (optional) response configuration − list of
+ (name, type_hint) tuples. Defaults to empty return value,
+ :param sql: (optional) use normal (default) or SQL response class,
+ :return: instance of :class:`~pyignite.api.result.APIResult` with raw
+ value (may undergo further processing in API functions).
+ """
+ _, send_buffer = self.from_python(query_params)
+ conn.send(send_buffer)
+ response_class = get_response_class(conn, sql)
+ response_struct = response_class(response_config, **kwargs)
+ response_ctype, recv_buffer = response_struct.parse(conn)
+ response = response_ctype.from_buffer_copy(recv_buffer)
+
+ # this test depends on protocol version
+ if getattr(response, 'flags', False) & RHF_TOPOLOGY_CHANGED:
+ # update latest affinity version
+ conn.client.affinity_version = (
+ response.affinity_version, response.affinity_minor
+ )
+
+ # build result
+ result = APIResult(response)
+ if result.status == 0:
+ result.value = response_struct.to_python(response)
+ return result
+
+
+class ConfigQuery(Query):
+ """
+ This is a special query, used for creating caches with configuration.
+ """
+ _query_c_type = None
+
+ @classmethod
+ def build_c_type(cls):
+ if cls._query_c_type is None:
+ cls._query_c_type = type(
+ cls.__name__,
+ (ctypes.LittleEndianStructure,),
+ {
+ '_pack_': 1,
+ '_fields_': [
+ ('length', ctypes.c_int),
+ ('op_code', ctypes.c_short),
+ ('query_id', ctypes.c_longlong),
+ ('config_length', ctypes.c_int),
+ ],
+ },
+ )
+ return cls._query_c_type
+
+ def from_python(self, values: dict = None):
+ if values is None:
+ values = {}
+ buffer = b''
+
+ header_class = self.build_c_type()
+ header = header_class()
+ header.op_code = self.op_code
+ if self.query_id is None:
+ header.query_id = randint(MIN_LONG, MAX_LONG)
+
+ for name, c_type in self.following:
+ buffer += c_type.from_python(values[name])
+
+ header.length = (
+ len(buffer)
+ + ctypes.sizeof(header_class)
+ - ctypes.sizeof(ctypes.c_int)
+ )
+ header.config_length = header.length - ctypes.sizeof(header_class)
+ return header.query_id, bytes(header) + buffer
diff --git a/pyignite/queries/response.py b/pyignite/queries/response.py
new file mode 100644
index 0000000..5fb4879
--- /dev/null
+++ b/pyignite/queries/response.py
@@ -0,0 +1,428 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from collections import OrderedDict
+import ctypes
+
+import attr
+
+from pyignite.constants import *
+from pyignite.datatypes import (
+ AnyDataObject, Bool, Int, Long, String, StringArray, Struct,
+)
+from .op_codes import *
+
+
+@attr.s
+class Response140:
+ following = attr.ib(type=list, factory=list)
+ _response_header = None
+
+ def __attrs_post_init__(self):
+ # replace None with empty list
+ self.following = self.following or []
+
+ @classmethod
+ def build_header(cls):
+ if cls._response_header is None:
+ cls._response_header = type(
+ 'ResponseHeader',
+ (ctypes.LittleEndianStructure,),
+ {
+ '_pack_': 1,
+ '_fields_': [
+ ('length', ctypes.c_int),
+ ('query_id', ctypes.c_longlong),
+ ('flags', ctypes.c_short),
+ ],
+ },
+ )
+ return cls._response_header
+
+ def parse(self, conn: 'Connection'):
+ header_class = self.build_header()
+ buffer = conn.recv(ctypes.sizeof(header_class))
+ header = header_class.from_buffer_copy(buffer)
+ fields = []
+
+ if header.flags & RHF_TOPOLOGY_CHANGED:
+ fields = [
+ ('affinity_version', ctypes.c_longlong),
+ ('affinity_minor', ctypes.c_int),
+ ]
+
+ if header.flags & RHF_ERROR:
+ fields.append(('status_code', ctypes.c_int))
+ buffer += conn.recv(
+ sum([ctypes.sizeof(field[1]) for field in fields])
+ )
+ msg_type, buffer_fragment = String.parse(conn)
+ buffer += buffer_fragment
+ fields.append(('error_message', msg_type))
+
+ else:
+ buffer += conn.recv(
+ sum([ctypes.sizeof(field[1]) for field in fields])
+ )
+ for name, ignite_type in self.following:
+ c_type, buffer_fragment = ignite_type.parse(conn)
+ buffer += buffer_fragment
+ fields.append((name, c_type))
+
+ response_class = type(
+ 'Response',
+ (header_class,),
+ {
+ '_pack_': 1,
+ '_fields_': fields,
+ }
+ )
+ return response_class, buffer
+
+ def to_python(self, ctype_object, *args, **kwargs):
+ result = OrderedDict()
+
+ for name, c_type in self.following:
+ result[name] = c_type.to_python(
+ getattr(ctype_object, name),
+ *args, **kwargs
+ )
+
+ return result if result else None
+
+
+@attr.s
+class SQLResponse140(Response140):
+ """
+ The response class of SQL functions is special in the way the row-column
+ data is counted in it. Basically, Ignite thin client API is following a
+ “counter right before the counted objects” rule in most of its parts.
+ SQL ops are breaking this rule.
+ """
+ include_field_names = attr.ib(type=bool, default=False)
+ has_cursor = attr.ib(type=bool, default=False)
+
+ def fields_or_field_count(self):
+ if self.include_field_names:
+ return 'fields', StringArray
+ return 'field_count', Int
+
+ def parse(self, conn: 'Connection'):
+ header_class = self.build_header()
+ buffer = conn.recv(ctypes.sizeof(header_class))
+ header = header_class.from_buffer_copy(buffer)
+ fields = []
+
+ if header.flags & RHF_TOPOLOGY_CHANGED:
+ fields = [
+ ('affinity_version', ctypes.c_longlong),
+ ('affinity_minor', ctypes.c_int),
+ ]
+
+ if header.flags & RHF_ERROR:
+ fields.append(('status_code', ctypes.c_int))
+ buffer += conn.recv(
+ sum([ctypes.sizeof(field[1]) for field in fields])
+ )
+ msg_type, buffer_fragment = String.parse(conn)
+ buffer += buffer_fragment
+ fields.append(('error_message', msg_type))
+ else:
+ buffer += conn.recv(
+ sum([ctypes.sizeof(field[1]) for field in fields])
+ )
+ following = [
+ self.fields_or_field_count(),
+ ('row_count', Int),
+ ]
+ if self.has_cursor:
+ following.insert(0, ('cursor', Long))
+ body_struct = Struct(following)
+ body_class, body_buffer = body_struct.parse(conn)
+ body = body_class.from_buffer_copy(body_buffer)
+
+ if self.include_field_names:
+ field_count = body.fields.length
+ else:
+ field_count = body.field_count
+
+ data_fields = []
+ data_buffer = b''
+ for i in range(body.row_count):
+ row_fields = []
+ row_buffer = b''
+ for j in range(field_count):
+ field_class, field_buffer = AnyDataObject.parse(conn)
+ row_fields.append(('column_{}'.format(j), field_class))
+ row_buffer += field_buffer
+
+ row_class = type(
+ 'SQLResponseRow',
+ (ctypes.LittleEndianStructure,),
+ {
+ '_pack_': 1,
+ '_fields_': row_fields,
+ }
+ )
+ data_fields.append(('row_{}'.format(i), row_class))
+ data_buffer += row_buffer
+
+ data_class = type(
+ 'SQLResponseData',
+ (ctypes.LittleEndianStructure,),
+ {
+ '_pack_': 1,
+ '_fields_': data_fields,
+ }
+ )
+ fields += body_class._fields_ + [
+ ('data', data_class),
+ ('more', ctypes.c_bool),
+ ]
+ buffer += body_buffer + data_buffer
+
+ final_class = type(
+ 'SQLResponse',
+ (header_class,),
+ {
+ '_pack_': 1,
+ '_fields_': fields,
+ }
+ )
+ buffer += conn.recv(ctypes.sizeof(final_class) - len(buffer))
+ return final_class, buffer
+
+ def to_python(self, ctype_object, *args, **kwargs):
+ if not hasattr(ctype_object, 'status_code'):
+ result = {
+ 'more': Bool.to_python(
+ ctype_object.more, *args, **kwargs
+ ),
+ 'data': [],
+ }
+ if hasattr(ctype_object, 'fields'):
+ result['fields'] = StringArray.to_python(
+ ctype_object.fields, *args, **kwargs
+ )
+ else:
+ result['field_count'] = Int.to_python(
+ ctype_object.field_count, *args, **kwargs
+ )
+ if hasattr(ctype_object, 'cursor'):
+ result['cursor'] = Long.to_python(
+ ctype_object.cursor, *args, **kwargs
+ )
+ for row_item in ctype_object.data._fields_:
+ row_name = row_item[0]
+ row_object = getattr(ctype_object.data, row_name)
+ row = []
+ for col_item in row_object._fields_:
+ col_name = col_item[0]
+ col_object = getattr(row_object, col_name)
+ row.append(
+ AnyDataObject.to_python(col_object, *args, **kwargs)
+ )
+ result['data'].append(row)
+ return result
+
+
+@attr.s
+class Response130:
+ following = attr.ib(type=list, factory=list)
+ _response_header = None
+
+ def __attrs_post_init__(self):
+ # replace None with empty list
+ self.following = self.following or []
+
+ @classmethod
+ def build_header(cls):
+ if cls._response_header is None:
+ cls._response_header = type(
+ 'ResponseHeader',
+ (ctypes.LittleEndianStructure,),
+ {
+ '_pack_': 1,
+ '_fields_': [
+ ('length', ctypes.c_int),
+ ('query_id', ctypes.c_longlong),
+ ('status_code', ctypes.c_int),
+ ],
+ },
+ )
+ return cls._response_header
+
+ def parse(self, client: 'Client'):
+ header_class = self.build_header()
+ buffer = client.recv(ctypes.sizeof(header_class))
+ header = header_class.from_buffer_copy(buffer)
+ fields = []
+
+ if header.status_code == OP_SUCCESS:
+ for name, ignite_type in self.following:
+ c_type, buffer_fragment = ignite_type.parse(client)
+ buffer += buffer_fragment
+ fields.append((name, c_type))
+ else:
+ c_type, buffer_fragment = String.parse(client)
+ buffer += buffer_fragment
+ fields.append(('error_message', c_type))
+
+ response_class = type(
+ 'Response',
+ (header_class,),
+ {
+ '_pack_': 1,
+ '_fields_': fields,
+ }
+ )
+ return response_class, buffer
+
+ def to_python(self, ctype_object, *args, **kwargs):
+ result = OrderedDict()
+
+ for name, c_type in self.following:
+ result[name] = c_type.to_python(
+ getattr(ctype_object, name),
+ *args, **kwargs
+ )
+
+ return result if result else None
+
+
+@attr.s
+class SQLResponse130(Response130):
+ """
+ The response class of SQL functions is special in the way the row-column
+ data is counted in it. Basically, Ignite thin client API is following a
+ “counter right before the counted objects” rule in most of its parts.
+ SQL ops are breaking this rule.
+ """
+ include_field_names = attr.ib(type=bool, default=False)
+ has_cursor = attr.ib(type=bool, default=False)
+
+ def fields_or_field_count(self):
+ if self.include_field_names:
+ return 'fields', StringArray
+ return 'field_count', Int
+
+ def parse(self, client: 'Client'):
+ header_class = self.build_header()
+ buffer = client.recv(ctypes.sizeof(header_class))
+ header = header_class.from_buffer_copy(buffer)
+ fields = []
+
+ if header.status_code == OP_SUCCESS:
+ following = [
+ self.fields_or_field_count(),
+ ('row_count', Int),
+ ]
+ if self.has_cursor:
+ following.insert(0, ('cursor', Long))
+ body_struct = Struct(following)
+ body_class, body_buffer = body_struct.parse(client)
+ body = body_class.from_buffer_copy(body_buffer)
+
+ if self.include_field_names:
+ field_count = body.fields.length
+ else:
+ field_count = body.field_count
+
+ data_fields = []
+ data_buffer = b''
+ for i in range(body.row_count):
+ row_fields = []
+ row_buffer = b''
+ for j in range(field_count):
+ field_class, field_buffer = AnyDataObject.parse(client)
+ row_fields.append(('column_{}'.format(j), field_class))
+ row_buffer += field_buffer
+
+ row_class = type(
+ 'SQLResponseRow',
+ (ctypes.LittleEndianStructure,),
+ {
+ '_pack_': 1,
+ '_fields_': row_fields,
+ }
+ )
+ data_fields.append(('row_{}'.format(i), row_class))
+ data_buffer += row_buffer
+
+ data_class = type(
+ 'SQLResponseData',
+ (ctypes.LittleEndianStructure,),
+ {
+ '_pack_': 1,
+ '_fields_': data_fields,
+ }
+ )
+ fields += body_class._fields_ + [
+ ('data', data_class),
+ ('more', ctypes.c_bool),
+ ]
+ buffer += body_buffer + data_buffer
+ else:
+ c_type, buffer_fragment = String.parse(client)
+ buffer += buffer_fragment
+ fields.append(('error_message', c_type))
+
+ final_class = type(
+ 'SQLResponse',
+ (header_class,),
+ {
+ '_pack_': 1,
+ '_fields_': fields,
+ }
+ )
+ buffer += client.recv(ctypes.sizeof(final_class) - len(buffer))
+ return final_class, buffer
+
+ def to_python(self, ctype_object, *args, **kwargs):
+ if ctype_object.status_code == 0:
+ result = {
+ 'more': Bool.to_python(
+ ctype_object.more, *args, **kwargs
+ ),
+ 'data': [],
+ }
+ if hasattr(ctype_object, 'fields'):
+ result['fields'] = StringArray.to_python(
+ ctype_object.fields, *args, **kwargs
+ )
+ else:
+ result['field_count'] = Int.to_python(
+ ctype_object.field_count, *args, **kwargs
+ )
+ if hasattr(ctype_object, 'cursor'):
+ result['cursor'] = Long.to_python(
+ ctype_object.cursor, *args, **kwargs
+ )
+ for row_item in ctype_object.data._fields_:
+ row_name = row_item[0]
+ row_object = getattr(ctype_object.data, row_name)
+ row = []
+ for col_item in row_object._fields_:
+ col_name = col_item[0]
+ col_object = getattr(row_object, col_name)
+ row.append(
+ AnyDataObject.to_python(col_object, *args, **kwargs)
+ )
+ result['data'].append(row)
+ return result
+
+
+Response120 = Response130
+SQLResponse120 = SQLResponse130
diff --git a/pyignite/utils.py b/pyignite/utils.py
index 1d4298e..ca9725d 100644
--- a/pyignite/utils.py
+++ b/pyignite/utils.py
@@ -13,14 +13,27 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import ctypes
+import decimal
+
from functools import wraps
-from typing import Any, Type, Union
+from threading import Event, Thread
+from typing import Any, Callable, Optional, Type, Tuple, Union
from pyignite.datatypes.base import IgniteDataType
from .constants import *
-def is_iterable(value):
+LONG_MASK = 0xffffffff
+DIGITS_PER_INT = 9
+
+
+def is_pow2(value: int) -> bool:
+ """ Check if value is power of two. """
+ return value > 0 and ((value & (value - 1)) == 0)
+
+
+def is_iterable(value: Any) -> bool:
""" Check if value is iterable. """
try:
iter(value)
@@ -71,7 +84,7 @@
return ((value ^ 0x80000000) & 0xffffffff) - 0x80000000
-def unwrap_binary(client: 'Client', wrapped: tuple):
+def unwrap_binary(client: 'Client', wrapped: tuple) -> object:
"""
Unwrap wrapped BinaryObject and convert it to Python data.
@@ -82,13 +95,15 @@
from pyignite.datatypes.complex import BinaryObject
blob, offset = wrapped
- client_clone = client.clone(prefetch=blob)
- client_clone.pos = offset
- data_class, data_bytes = BinaryObject.parse(client_clone)
- return BinaryObject.to_python(
+ conn_clone = client.random_node.clone(prefetch=blob)
+ conn_clone.pos = offset
+ data_class, data_bytes = BinaryObject.parse(conn_clone)
+ result = BinaryObject.to_python(
data_class.from_buffer_copy(data_bytes),
client,
)
+ conn_clone.close()
+ return result
def hashcode(string: Union[str, bytes]) -> int:
@@ -118,13 +133,15 @@
return cache if type(cache) is int else hashcode(cache)
-def entity_id(cache: Union[str, int]) -> int:
+def entity_id(cache: Union[str, int]) -> Optional[int]:
"""
Create a type ID from type name or field ID from field name.
:param cache: entity name or ID,
:return: entity ID.
"""
+ if cache is None:
+ return None
return cache if type(cache) is int else hashcode(cache.lower())
@@ -153,6 +170,56 @@
return s_id
+def decimal_hashcode(value: decimal.Decimal) -> int:
+ """
+ This is a translation of `java.math.BigDecimal` class `hashCode()` method
+ to Python.
+
+ :param value: pythonic decimal value,
+ :return: hashcode.
+ """
+ sign, digits, scale = value.normalize().as_tuple()
+ sign = -1 if sign else 1
+ value = int(''.join([str(d) for d in digits]))
+
+ if value < MAX_LONG:
+ # this is the case when Java BigDecimal digits are stored
+ # compactly, in the internal 64-bit integer field
+ int_hash = (
+ (unsigned(value, ctypes.c_ulonglong) >> 32) * 31
+ + (value & LONG_MASK)
+ ) & LONG_MASK
+ else:
+ # digits are not fit in the 64-bit long, so they get split internally
+ # to an array of values within 32-bit integer range each (it is really
+ # a part of `java.math.BigInteger` class internals)
+ magnitude = []
+ order = 0
+ while True:
+ elem = value >> order
+ if elem > 1:
+ magnitude.insert(0, ctypes.c_int(elem).value)
+ order += 32
+ else:
+ break
+
+ int_hash = 0
+ for v in magnitude:
+ int_hash = (31 * int_hash + (v & LONG_MASK)) & LONG_MASK
+
+ return ctypes.c_int(31 * int_hash * sign - scale).value
+
+
+def datetime_hashcode(value: int) -> int:
+ """
+ Calculates hashcode from UNIX epoch.
+
+ :param value: UNIX time,
+ :return: Java hashcode.
+ """
+ return (value & LONG_MASK) ^ (unsigned(value, ctypes.c_ulonglong) >> 32)
+
+
def status_to_exception(exc: Type[Exception]):
"""
Converts erroneous status code with error message to an exception
@@ -170,3 +237,62 @@
return result.value
return ste_wrapper
return ste_decorator
+
+
+def get_field_by_id(
+ obj: 'GenericObjectMeta', field_id: int
+) -> Tuple[Any, IgniteDataType]:
+ """
+ Returns a complex object's field value, given the field's entity ID.
+
+ :param obj: complex object,
+ :param field_id: field ID,
+ :return: complex object field's value and type.
+ """
+ for fname, ftype in obj._schema.items():
+ if entity_id(fname) == field_id:
+ return getattr(obj, fname, getattr(ftype, 'default')), ftype
+
+
+def unsigned(value: int, c_type: ctypes._SimpleCData = ctypes.c_uint) -> int:
+ """ Convert signed integer value to unsigned. """
+ return c_type(value).value
+
+
+class DaemonicTimer(Thread):
+ """
+ Same as normal `threading.Timer`, but do not delay the program exit.
+ """
+
+ def __init__(self, interval, function, args=None, kwargs=None):
+ Thread.__init__(self, daemon=True)
+ self.interval = interval
+ self.function = function
+ self.args = args if args is not None else []
+ self.kwargs = kwargs if kwargs is not None else {}
+ self.finished = Event()
+
+ def cancel(self):
+ """Stop the timer if it hasn't finished yet."""
+ self.finished.set()
+
+ def run(self):
+ self.finished.wait(self.interval)
+ if not self.finished.is_set():
+ self.function(*self.args, **self.kwargs)
+ self.finished.set()
+
+
+def capitalize(string: str) -> str:
+ """
+ Capitalizing the string, assuming the first character is a letter.
+ Does not touch any other character, unlike the `string.capitalize()`.
+ """
+ return string[:1].upper() + string[1:]
+
+
+def process_delimiter(name: str, delimiter: str) -> str:
+ """
+ Splits the name by delimiter, capitalize each part, merge.
+ """
+ return ''.join([capitalize(x) for x in name.split(delimiter)])
diff --git a/requirements/tests.txt b/requirements/tests.txt
index c107c8b..327f501 100644
--- a/requirements/tests.txt
+++ b/requirements/tests.txt
@@ -3,3 +3,4 @@
pytest==3.6.1
pytest-cov==2.5.1
teamcity-messages==1.21
+psutil==5.6.5
diff --git a/tests/config/ignite-config-base.xml b/tests/config/ignite-config-base.xml
new file mode 100644
index 0000000..7487618
--- /dev/null
+++ b/tests/config/ignite-config-base.xml
@@ -0,0 +1,78 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:util="http://www.springframework.org/schema/util"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans
+ http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://www.springframework.org/schema/util
+ http://www.springframework.org/schema/util/spring-util.xsd">
+
+ <bean id="propertyConfigurer" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
+ <property name="systemPropertiesModeName" value="SYSTEM_PROPERTIES_MODE_FALLBACK"/>
+ <property name="searchSystemEnvironment" value="true"/>
+ </bean>
+
+ <bean id="grid.cfg" abstract="true" class="org.apache.ignite.configuration.IgniteConfiguration">
+ <property name="localHost" value="127.0.0.1"/>
+
+ <property name="discoverySpi">
+ <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
+ <property name="localAddress" value="127.0.0.1"/>
+ <property name="localPort" value="48500"/>
+ <property name="ipFinder">
+ <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
+ <property name="addresses">
+ <list>
+ <value>127.0.0.1:48500..48503</value>
+ </list>
+ </property>
+ </bean>
+ </property>
+ <property name="socketTimeout" value="300"/>
+ </bean>
+ </property>
+
+ <property name="communicationSpi">
+ <bean class="org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi">
+ <property name="localAddress" value="127.0.0.1"/>
+ <property name="localPort" value="48100"/>
+ </bean>
+ </property>
+
+ <property name="cacheConfiguration">
+ <list>
+ <bean class="org.apache.ignite.configuration.CacheConfiguration">
+ <property name="name" value="custom-affinity"/>
+ <property name="affinity">
+ <bean class="org.apache.ignite.internal.processors.affinity.LocalAffinityFunction"/>
+ </property>
+ </bean>
+ </list>
+ </property>
+
+ <property name="gridLogger">
+ <bean class="org.apache.ignite.logger.log4j2.Log4J2Logger">
+ <constructor-arg type="java.lang.String" value="config/log4j.xml"/>
+ </bean>
+ </property>
+ </bean>
+</beans>
diff --git a/tests/config/ssl.xml b/tests/config/ignite-config-ssl.xml
similarity index 76%
rename from tests/config/ssl.xml
rename to tests/config/ignite-config-ssl.xml
index 8d74cbb..827405c 100644
--- a/tests/config/ssl.xml
+++ b/tests/config/ignite-config-ssl.xml
@@ -22,21 +22,16 @@
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
+ <import resource="ignite-config-base.xml"/>
- <!--
- Initialize property configurer so we can reference environment variables.
- -->
- <bean id="propertyConfigurer" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
- <property name="systemPropertiesModeName" value="SYSTEM_PROPERTIES_MODE_FALLBACK"/>
- <property name="searchSystemEnvironment" value="true"/>
- </bean>
-
- <bean id="test.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
+ <bean parent="grid.cfg">
<property name="connectorConfiguration"><null/></property>
<property name="clientConnectorConfiguration">
<bean class="org.apache.ignite.configuration.ClientConnectorConfiguration">
- <property name="portRange" value="10"/>
+ <property name="host" value="127.0.0.1"/>
+ <property name="port" value="${IGNITE_CLIENT_PORT}"/>
+ <property name="portRange" value="0"/>
<property name="sslEnabled" value="true"/>
<property name="useIgniteSslContextFactory" value="false"/>
<property name="sslClientAuth" value="true"/>
@@ -44,9 +39,9 @@
<!-- Provide Ssl context. -->
<property name="sslContextFactory">
<bean class="org.apache.ignite.ssl.SslContextFactory">
- <property name="keyStoreFilePath" value="${PYTHON_TEST_CONFIG_PATH}/ssl/server.jks"/>
+ <property name="keyStoreFilePath" value="config/ssl/server.jks"/>
<property name="keyStorePassword" value="123456"/>
- <property name="trustStoreFilePath" value="${PYTHON_TEST_CONFIG_PATH}/ssl/trust.jks"/>
+ <property name="trustStoreFilePath" value="config/ssl/trust.jks"/>
<property name="trustStorePassword" value="123456"/>
</bean>
</property>
diff --git a/tests/config/ignite-config.xml b/tests/config/ignite-config.xml
new file mode 100644
index 0000000..09fba2c
--- /dev/null
+++ b/tests/config/ignite-config.xml
@@ -0,0 +1,39 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:util="http://www.springframework.org/schema/util"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans
+ http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://www.springframework.org/schema/util
+ http://www.springframework.org/schema/util/spring-util.xsd">
+ <import resource="ignite-config-base.xml"/>
+
+ <bean parent="grid.cfg">
+ <property name="clientConnectorConfiguration">
+ <bean class="org.apache.ignite.configuration.ClientConnectorConfiguration">
+ <property name="host" value="127.0.0.1"/>
+ <property name="port" value="${IGNITE_CLIENT_PORT}"/>
+ <property name="portRange" value="0"/>
+ </bean>
+ </property>
+ </bean>
+</beans>
diff --git a/tests/config/log4j.xml b/tests/config/log4j.xml
new file mode 100644
index 0000000..f5562d0
--- /dev/null
+++ b/tests/config/log4j.xml
@@ -0,0 +1,42 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<Configuration>
+ <Appenders>
+ <Console name="CONSOLE" target="SYSTEM_OUT">
+ <PatternLayout pattern="[%d{ISO8601}][%-5p][%t][%c{1}] %m%n"/>
+ </Console>
+ <RollingFile name="FILE" append="true"
+ filePattern="logs/ignite-log-${env:IGNITE_INSTANCE_INDEX}-%i.log.gz"
+ fileName="logs/ignite-log-${env:IGNITE_INSTANCE_INDEX}.txt">
+ <PatternLayout pattern="%m%n"/>
+ <Policies>
+ <SizeBasedTriggeringPolicy size="10MB" />
+ </Policies>
+ <DefaultRolloverStrategy max="10"/>
+ </RollingFile>
+ </Appenders>
+ <Loggers>
+ <Logger name="org.apache.ignite.internal.processors.odbc.ClientListenerNioListener" level="debug"/>
+ <Root level="info">
+ <AppenderRef ref="CONSOLE"/>
+ <AppenderRef ref="FILE"/>
+ </Root>
+ </Loggers>
+</Configuration>
diff --git a/tests/conftest.py b/tests/conftest.py
index 8ebd5b8..9974b16 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -21,10 +21,11 @@
from pyignite import Client
from pyignite.constants import *
-from pyignite.api import cache_create, cache_get_names, cache_destroy
+from pyignite.api import cache_create, cache_destroy
+from tests.util import _start_ignite, start_ignite_gen, get_request_grid_idx
-class UseSSLParser(argparse.Action):
+class BoolParser(argparse.Action):
def __call__(self, parser, namespace, values, option_string=None):
values = True if values is None else bool(strtobool(values))
@@ -64,14 +65,111 @@
)
+@pytest.fixture(scope='session', autouse=True)
+def server1(request):
+ yield from start_ignite_server_gen(1, request)
+
+
+@pytest.fixture(scope='session', autouse=True)
+def server2(request):
+ yield from start_ignite_server_gen(2, request)
+
+
+@pytest.fixture(scope='session', autouse=True)
+def server3(request):
+ yield from start_ignite_server_gen(3, request)
+
+
+@pytest.fixture(scope='module')
+def start_ignite_server(use_ssl):
+ def start(idx=1):
+ return _start_ignite(idx, use_ssl=use_ssl)
+
+ return start
+
+
+def start_ignite_server_gen(idx, request):
+ use_ssl = request.config.getoption("--use-ssl")
+ yield from start_ignite_gen(idx, use_ssl)
+
+
@pytest.fixture(scope='module')
def client(
- ignite_host, ignite_port, timeout, use_ssl, ssl_keyfile, ssl_keyfile_password, ssl_certfile,
- ssl_ca_certfile, ssl_cert_reqs, ssl_ciphers, ssl_version,
+ node, timeout, partition_aware, use_ssl, ssl_keyfile, ssl_keyfile_password,
+ ssl_certfile, ssl_ca_certfile, ssl_cert_reqs, ssl_ciphers, ssl_version,
+ username, password,
+):
+ yield from client0(node, timeout, partition_aware, use_ssl, ssl_keyfile, ssl_keyfile_password, ssl_certfile,
+ ssl_ca_certfile, ssl_cert_reqs, ssl_ciphers, ssl_version, username, password)
+
+
+@pytest.fixture(scope='module')
+def client_partition_aware(
+ node, timeout, use_ssl, ssl_keyfile, ssl_keyfile_password, ssl_certfile,
+ ssl_ca_certfile, ssl_cert_reqs, ssl_ciphers, ssl_version, username,
+ password
+):
+ yield from client0(node, timeout, True, use_ssl, ssl_keyfile, ssl_keyfile_password, ssl_certfile, ssl_ca_certfile,
+ ssl_cert_reqs, ssl_ciphers, ssl_version, username, password)
+
+
+@pytest.fixture(scope='module')
+def client_partition_aware_single_server(
+ node, timeout, use_ssl, ssl_keyfile, ssl_keyfile_password, ssl_certfile,
+ ssl_ca_certfile, ssl_cert_reqs, ssl_ciphers, ssl_version, username,
+ password
+):
+ node = node[:1]
+ yield from client(node, timeout, True, use_ssl, ssl_keyfile, ssl_keyfile_password, ssl_certfile, ssl_ca_certfile,
+ ssl_cert_reqs, ssl_ciphers, ssl_version, username, password)
+
+
+@pytest.fixture
+def cache(client):
+ cache_name = 'my_bucket'
+ conn = client.random_node
+
+ cache_create(conn, cache_name)
+ yield cache_name
+ cache_destroy(conn, cache_name)
+
+
+@pytest.fixture(autouse=True)
+def log_init():
+ # Init log call timestamp
+ get_request_grid_idx()
+
+
+@pytest.fixture(scope='module')
+def start_client(use_ssl, ssl_keyfile, ssl_keyfile_password, ssl_certfile, ssl_ca_certfile, ssl_cert_reqs, ssl_ciphers,
+ ssl_version,username, password):
+ def start(**kwargs):
+ cli_kw = kwargs.copy()
+ cli_kw.update({
+ 'use_ssl': use_ssl,
+ 'ssl_keyfile': ssl_keyfile,
+ 'ssl_keyfile_password': ssl_keyfile_password,
+ 'ssl_certfile': ssl_certfile,
+ 'ssl_ca_certfile': ssl_ca_certfile,
+ 'ssl_cert_reqs': ssl_cert_reqs,
+ 'ssl_ciphers': ssl_ciphers,
+ 'ssl_version': ssl_version,
+ 'username': username,
+ 'password': password
+ })
+ return Client(**cli_kw)
+
+ return start
+
+
+def client0(
+ node, timeout, partition_aware, use_ssl, ssl_keyfile, ssl_keyfile_password,
+ ssl_certfile, ssl_ca_certfile, ssl_cert_reqs, ssl_ciphers, ssl_version,
username, password,
):
client = Client(
timeout=timeout,
+ partition_aware=partition_aware,
use_ssl=use_ssl,
ssl_keyfile=ssl_keyfile,
ssl_keyfile_password=ssl_keyfile_password,
@@ -83,22 +181,17 @@
username=username,
password=password,
)
- client.connect(ignite_host, ignite_port)
+ nodes = []
+ for n in node:
+ host, port = n.split(':')
+ port = int(port)
+ nodes.append((host, port))
+ client.connect(nodes)
yield client
- for cache_name in cache_get_names(client).value:
- cache_destroy(client, cache_name)
client.close()
@pytest.fixture
-def cache(client):
- cache_name = 'my_bucket'
- cache_create(client, cache_name)
- yield cache_name
- cache_destroy(client, cache_name)
-
-
-@pytest.fixture
def examples(request):
return request.config.getoption("--examples")
@@ -112,17 +205,13 @@
def pytest_addoption(parser):
parser.addoption(
- '--ignite-host',
+ '--node',
action='append',
- default=[IGNITE_DEFAULT_HOST],
- help='Ignite binary protocol test server host (default: localhost)'
- )
- parser.addoption(
- '--ignite-port',
- action='append',
- default=[IGNITE_DEFAULT_PORT],
- type=int,
- help='Ignite binary protocol test server port (default: 10800)'
+ default=None,
+ help=(
+ 'Ignite binary protocol test server connection string '
+ '(default: "localhost:10801")'
+ )
)
parser.addoption(
'--timeout',
@@ -135,8 +224,15 @@
)
)
parser.addoption(
+ '--partition-aware',
+ action=BoolParser,
+ nargs='?',
+ default=False,
+ help='Turn on the best effort affinity feature'
+ )
+ parser.addoption(
'--use-ssl',
- action=UseSSLParser,
+ action=BoolParser,
nargs='?',
default=False,
help='Use SSL encryption'
@@ -214,9 +310,11 @@
def pytest_generate_tests(metafunc):
session_parameters = {
- 'ignite_host': IGNITE_DEFAULT_HOST,
- 'ignite_port': IGNITE_DEFAULT_PORT,
+ 'node': ['{host}:{port}'.format(host='127.0.0.1', port=10801),
+ '{host}:{port}'.format(host='127.0.0.1', port=10802),
+ '{host}:{port}'.format(host='127.0.0.1', port=10803)],
'timeout': None,
+ 'partition_aware': False,
'use_ssl': False,
'ssl_keyfile': None,
'ssl_keyfile_password': None,
@@ -232,9 +330,10 @@
for param_name in session_parameters:
if param_name in metafunc.fixturenames:
param = metafunc.config.getoption(param_name)
+ # TODO: This does not work for bool
if param is None:
param = session_parameters[param_name]
- if type(param) is not list:
+ if param_name == 'node' or type(param) is not list:
param = [param]
metafunc.parametrize(param_name, param, scope='session')
diff --git a/tests/test_affinity.py b/tests/test_affinity.py
new file mode 100644
index 0000000..a55251b
--- /dev/null
+++ b/tests/test_affinity.py
@@ -0,0 +1,229 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from datetime import datetime, timedelta
+import decimal
+from uuid import UUID, uuid4
+
+import pytest
+
+from pyignite import GenericObjectMeta
+from pyignite.api import *
+from pyignite.constants import *
+from pyignite.datatypes import *
+from pyignite.datatypes.cache_config import CacheMode
+from pyignite.datatypes.prop_codes import *
+
+
+def test_get_node_partitions(client_partition_aware):
+
+ conn = client_partition_aware.random_node
+
+ cache_1 = client_partition_aware.get_or_create_cache('test_cache_1')
+ cache_2 = client_partition_aware.get_or_create_cache({
+ PROP_NAME: 'test_cache_2',
+ PROP_CACHE_KEY_CONFIGURATION: [
+ {
+ 'type_name': ByteArray.type_name,
+ 'affinity_key_field_name': 'byte_affinity',
+ }
+ ],
+ })
+ cache_3 = client_partition_aware.get_or_create_cache('test_cache_3')
+ cache_4 = client_partition_aware.get_or_create_cache('test_cache_4')
+ cache_5 = client_partition_aware.get_or_create_cache('test_cache_5')
+
+ result = cache_get_node_partitions(
+ conn,
+ [cache_1.cache_id, cache_2.cache_id]
+ )
+ assert result.status == 0, result.message
+
+
+@pytest.mark.parametrize(
+ 'key, key_hint', [
+ # integers
+ (42, None),
+ (43, ByteObject),
+ (-44, ByteObject),
+ (45, IntObject),
+ (-46, IntObject),
+ (47, ShortObject),
+ (-48, ShortObject),
+ (49, LongObject),
+ (MAX_INT-50, LongObject),
+ (MAX_INT+51, LongObject),
+
+ # floating point
+ (5.2, None),
+ (5.354, FloatObject),
+ (-5.556, FloatObject),
+ (-57.58, DoubleObject),
+
+ # boolean
+ (True, None),
+ (True, BoolObject),
+ (False, BoolObject),
+
+ # char
+ ('A', CharObject),
+ ('Z', CharObject),
+ ('â…“', CharObject),
+ ('á', CharObject),
+ ('Ñ‹', CharObject),
+ ('ã‚«', CharObject),
+ ('Ø', CharObject),
+ ('ß', CharObject),
+
+ # string
+ ('This is a test string', None),
+ ('Кириллица', None),
+ ('Little Mary had a lamb', String),
+
+ # UUID
+ (UUID('12345678123456789876543298765432'), None),
+ (UUID('74274274274274274274274274274274'), UUIDObject),
+ (uuid4(), None),
+
+ # decimal (long internal representation in Java)
+ (decimal.Decimal('-234.567'), None),
+ (decimal.Decimal('200.0'), None),
+ (decimal.Decimal('123.456'), DecimalObject),
+ (decimal.Decimal('1.0'), None),
+ (decimal.Decimal('0.02'), None),
+
+ # decimal (BigInteger internal representation in Java)
+ (decimal.Decimal('12345671234567123.45671234567'), None),
+ (decimal.Decimal('-845678456.7845678456784567845'), None),
+
+ # date and time
+ (datetime(1980, 1, 1), None),
+ ((datetime(1980, 1, 1), 999), TimestampObject),
+ (timedelta(days=99), TimeObject),
+
+ ],
+)
+def test_affinity(client_partition_aware, key, key_hint):
+
+ cache_1 = client_partition_aware.get_or_create_cache({
+ PROP_NAME: 'test_cache_1',
+ PROP_CACHE_MODE: CacheMode.PARTITIONED,
+ })
+ value = 42
+ cache_1.put(key, value, key_hint=key_hint)
+
+ best_node = cache_1.get_best_node(key, key_hint=key_hint)
+
+ for node in filter(lambda n: n.alive, client_partition_aware._nodes):
+ result = cache_local_peek(
+ node, cache_1.cache_id, key, key_hint=key_hint,
+ )
+ if node is best_node:
+ assert result.value == value, (
+ 'Affinity calculation error for {}'.format(key)
+ )
+ else:
+ assert result.value is None, (
+ 'Affinity calculation error for {}'.format(key)
+ )
+
+ cache_1.destroy()
+
+
+def test_affinity_for_generic_object(client_partition_aware):
+
+ cache_1 = client_partition_aware.get_or_create_cache({
+ PROP_NAME: 'test_cache_1',
+ PROP_CACHE_MODE: CacheMode.PARTITIONED,
+ })
+
+ class KeyClass(
+ metaclass=GenericObjectMeta,
+ schema={
+ 'NO': IntObject,
+ 'NAME': String,
+ },
+ ):
+ pass
+
+ key = KeyClass()
+ key.NO = 1
+ key.NAME = 'test_string'
+
+ cache_1.put(key, 42, key_hint=BinaryObject)
+
+ best_node = cache_1.get_best_node(key, key_hint=BinaryObject)
+
+ for node in filter(lambda n: n.alive, client_partition_aware._nodes):
+ result = cache_local_peek(
+ node, cache_1.cache_id, key, key_hint=BinaryObject,
+ )
+ if node is best_node:
+ assert result.value == 42, (
+ 'Affinity calculation error for {}'.format(key)
+ )
+ else:
+ assert result.value is None, (
+ 'Affinity calculation error for {}'.format(key)
+ )
+
+ cache_1.destroy()
+
+
+def test_affinity_for_generic_object_without_type_hints(client_partition_aware):
+
+ if not client_partition_aware.partition_awareness_supported_by_protocol:
+ pytest.skip(
+ 'Best effort affinity is not supported by the protocol {}.'.format(
+ client_partition_aware.protocol_version
+ )
+ )
+
+ cache_1 = client_partition_aware.get_or_create_cache({
+ PROP_NAME: 'test_cache_1',
+ PROP_CACHE_MODE: CacheMode.PARTITIONED,
+ })
+
+ class KeyClass(
+ metaclass=GenericObjectMeta,
+ schema={
+ 'NO': IntObject,
+ 'NAME': String,
+ },
+ ):
+ pass
+
+ key = KeyClass()
+ key.NO = 2
+ key.NAME = 'another_test_string'
+
+ cache_1.put(key, 42)
+
+ best_node = cache_1.get_best_node(key)
+
+ for node in filter(lambda n: n.alive, client_partition_aware._nodes):
+ result = cache_local_peek(
+ node, cache_1.cache_id, key
+ )
+ if node is best_node:
+ assert result.value == 42, (
+ 'Affinity calculation error for {}'.format(key)
+ )
+ else:
+ assert result.value is None, (
+ 'Affinity calculation error for {}'.format(key)
+ )
+
+ cache_1.destroy()
diff --git a/tests/test_affinity_bad_servers.py b/tests/test_affinity_bad_servers.py
new file mode 100644
index 0000000..dce09de
--- /dev/null
+++ b/tests/test_affinity_bad_servers.py
@@ -0,0 +1,63 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pytest
+
+from pyignite.exceptions import ReconnectError
+from tests.util import *
+
+
+def test_client_with_multiple_bad_servers(start_client):
+ client = start_client(partition_aware=True)
+ with pytest.raises(ReconnectError) as e_info:
+ client.connect([("127.0.0.1", 10900), ("127.0.0.1", 10901)])
+ assert str(e_info.value) == "Can not connect."
+
+
+def test_client_with_failed_server(request, start_ignite_server, start_client):
+ srv = start_ignite_server(4)
+ try:
+ client = start_client()
+ client.connect([("127.0.0.1", 10804)])
+ cache = client.get_or_create_cache(request.node.name)
+ cache.put(1, 1)
+ kill_process_tree(srv.pid)
+ with pytest.raises(ConnectionResetError):
+ cache.get(1)
+ finally:
+ kill_process_tree(srv.pid)
+
+
+def test_client_with_recovered_server(request, start_ignite_server, start_client):
+ srv = start_ignite_server(4)
+ try:
+ client = start_client()
+ client.connect([("127.0.0.1", 10804)])
+ cache = client.get_or_create_cache(request.node.name)
+ cache.put(1, 1)
+
+ # Kill and restart server
+ kill_process_tree(srv.pid)
+ srv = start_ignite_server(4)
+
+ # First request fails
+ with pytest.raises(Exception):
+ cache.put(1, 2)
+
+ # Retry succeeds
+ cache.put(1, 2)
+ assert cache.get(1) == 2
+ finally:
+ kill_process_tree(srv.pid)
diff --git a/tests/test_affinity_request_routing.py b/tests/test_affinity_request_routing.py
new file mode 100644
index 0000000..eb46ab6
--- /dev/null
+++ b/tests/test_affinity_request_routing.py
@@ -0,0 +1,179 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from collections import OrderedDict
+
+import pytest
+
+from pyignite import *
+from pyignite.datatypes import *
+from pyignite.datatypes.cache_config import CacheMode
+from pyignite.datatypes.prop_codes import *
+from tests.util import *
+
+
+@pytest.mark.parametrize("key,grid_idx", [(1, 3), (2, 1), (3, 1), (4, 3), (5, 1), (6, 3), (11, 2), (13, 2), (19, 2)])
+@pytest.mark.parametrize("backups", [0, 1, 2, 3])
+def test_cache_operation_on_primitive_key_routes_request_to_primary_node(
+ request, key, grid_idx, backups, client_partition_aware):
+
+ cache = client_partition_aware.get_or_create_cache({
+ PROP_NAME: request.node.name + str(backups),
+ PROP_BACKUPS_NUMBER: backups,
+ })
+
+ # Warm up affinity map
+ cache.put(key, key)
+ get_request_grid_idx()
+
+ # Test
+ cache.get(key)
+ assert get_request_grid_idx() == grid_idx
+
+ cache.put(key, key)
+ assert get_request_grid_idx("Put") == grid_idx
+
+ cache.replace(key, key + 1)
+ assert get_request_grid_idx("Replace") == grid_idx
+
+ cache.clear_key(key)
+ assert get_request_grid_idx("ClearKey") == grid_idx
+
+ cache.contains_key(key)
+ assert get_request_grid_idx("ContainsKey") == grid_idx
+
+ cache.get_and_put(key, 3)
+ assert get_request_grid_idx("GetAndPut") == grid_idx
+
+ cache.get_and_put_if_absent(key, 4)
+ assert get_request_grid_idx("GetAndPutIfAbsent") == grid_idx
+
+ cache.put_if_absent(key, 5)
+ assert get_request_grid_idx("PutIfAbsent") == grid_idx
+
+ cache.get_and_remove(key)
+ assert get_request_grid_idx("GetAndRemove") == grid_idx
+
+ cache.get_and_replace(key, 6)
+ assert get_request_grid_idx("GetAndReplace") == grid_idx
+
+ cache.remove_key(key)
+ assert get_request_grid_idx("RemoveKey") == grid_idx
+
+ cache.remove_if_equals(key, -1)
+ assert get_request_grid_idx("RemoveIfEquals") == grid_idx
+
+ cache.replace(key, -1)
+ assert get_request_grid_idx("Replace") == grid_idx
+
+ cache.replace_if_equals(key, 10, -10)
+ assert get_request_grid_idx("ReplaceIfEquals") == grid_idx
+
+
+@pytest.mark.skip(reason="Custom key objects are not supported yet")
+def test_cache_operation_on_complex_key_routes_request_to_primary_node():
+ pass
+
+
+@pytest.mark.parametrize("key,grid_idx", [(1, 2), (2, 1), (3, 1), (4, 2), (5, 2), (6, 3)])
+@pytest.mark.skip(reason="Custom key objects are not supported yet")
+def test_cache_operation_on_custom_affinity_key_routes_request_to_primary_node(
+ request, client_partition_aware, key, grid_idx):
+ class AffinityTestType1(
+ metaclass=GenericObjectMeta,
+ type_name='AffinityTestType1',
+ schema=OrderedDict([
+ ('test_str', String),
+ ('test_int', LongObject)
+ ])
+ ):
+ pass
+
+ cache_config = {
+ PROP_NAME: request.node.name,
+ PROP_CACHE_KEY_CONFIGURATION: [
+ {
+ 'type_name': 'AffinityTestType1',
+ 'affinity_key_field_name': 'test_int',
+ },
+ ],
+ }
+ cache = client_partition_aware.create_cache(cache_config)
+
+ # noinspection PyArgumentList
+ key_obj = AffinityTestType1(
+ test_str="abc",
+ test_int=key
+ )
+
+ cache.put(key_obj, 1)
+ cache.put(key_obj, 2)
+
+ assert get_request_grid_idx("Put") == grid_idx
+
+
+@pytest.mark.skip("https://issues.apache.org/jira/browse/IGNITE-13967")
+def test_cache_operation_routed_to_new_cluster_node(request, start_ignite_server, start_client):
+ client = start_client(partition_aware=True)
+ client.connect([("127.0.0.1", 10801), ("127.0.0.1", 10802), ("127.0.0.1", 10803), ("127.0.0.1", 10804)])
+ cache = client.get_or_create_cache(request.node.name)
+ key = 12
+ cache.put(key, key)
+ cache.put(key, key)
+ assert get_request_grid_idx("Put") == 3
+
+ srv = start_ignite_server(4)
+ try:
+ # Wait for rebalance and partition map exchange
+ def check_grid_idx():
+ cache.get(key)
+ return get_request_grid_idx() == 4
+ wait_for_condition(check_grid_idx)
+
+ # Response is correct and comes from the new node
+ res = cache.get_and_remove(key)
+ assert res == key
+ assert get_request_grid_idx("GetAndRemove") == 4
+ finally:
+ kill_process_tree(srv.pid)
+
+
+def test_unsupported_affinity_cache_operation_routed_to_random_node(client_partition_aware):
+ verify_random_node(client_partition_aware.get_cache("custom-affinity"))
+
+
+def test_replicated_cache_operation_routed_to_random_node(request, client_partition_aware):
+ cache = client_partition_aware.get_or_create_cache({
+ PROP_NAME: request.node.name,
+ PROP_CACHE_MODE: CacheMode.REPLICATED,
+ })
+
+ verify_random_node(cache)
+
+
+def verify_random_node(cache):
+ key = 1
+ cache.put(key, key)
+
+ idx1 = get_request_grid_idx("Put")
+ idx2 = idx1
+
+ # Try 10 times - random node may end up being the same
+ for _ in range(1, 10):
+ cache.put(key, key)
+ idx2 = get_request_grid_idx("Put")
+ if idx2 != idx1:
+ break
+ assert idx1 != idx2
diff --git a/tests/test_affinity_single_connection.py b/tests/test_affinity_single_connection.py
new file mode 100644
index 0000000..c40393c
--- /dev/null
+++ b/tests/test_affinity_single_connection.py
@@ -0,0 +1,102 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pytest
+
+from tests.util import get_request_grid_idx
+
+
+def test_all_cache_operations_with_partition_aware_client_on_single_server(request, client_partition_aware_single_server):
+ cache = client_partition_aware_single_server.get_or_create_cache(request.node.name)
+ key = 1
+ key2 = 2
+
+ # Put/Get
+ cache.put(key, key)
+ assert cache.get(key) == key
+
+ # Replace
+ res = cache.replace(key, key2)
+ assert res
+ assert cache.get(key) == key2
+
+ # Clear
+ cache.put(key2, key2)
+ cache.clear_key(key2)
+ assert cache.get(key2) is None
+
+ # ContainsKey
+ assert cache.contains_key(key)
+ assert not cache.contains_key(key2)
+
+ # GetAndPut
+ cache.put(key, key)
+ res = cache.get_and_put(key, key2)
+ assert res == key
+ assert cache.get(key) == key2
+
+ # GetAndPutIfAbsent
+ cache.clear_key(key)
+ res = cache.get_and_put_if_absent(key, key)
+ res2 = cache.get_and_put_if_absent(key, key2)
+ assert res is None
+ assert res2 == key
+ assert cache.get(key) == key
+
+ # PutIfAbsent
+ cache.clear_key(key)
+ res = cache.put_if_absent(key, key)
+ res2 = cache.put_if_absent(key, key2)
+ assert res
+ assert not res2
+ assert cache.get(key) == key
+
+ # GetAndRemove
+ cache.put(key, key)
+ res = cache.get_and_remove(key)
+ assert res == key
+ assert cache.get(key) is None
+
+ # GetAndReplace
+ cache.put(key, key)
+ res = cache.get_and_replace(key, key2)
+ assert res == key
+ assert cache.get(key) == key2
+
+ # RemoveKey
+ cache.put(key, key)
+ cache.remove_key(key)
+ assert cache.get(key) is None
+
+ # RemoveIfEquals
+ cache.put(key, key)
+ res = cache.remove_if_equals(key, key2)
+ res2 = cache.remove_if_equals(key, key)
+ assert not res
+ assert res2
+ assert cache.get(key) is None
+
+ # Replace
+ cache.put(key, key)
+ cache.replace(key, key2)
+ assert cache.get(key) == key2
+
+ # ReplaceIfEquals
+ cache.put(key, key)
+ res = cache.replace_if_equals(key, key2, key2)
+ res2 = cache.replace_if_equals(key, key, key2)
+ assert not res
+ assert res2
+ assert cache.get(key) == key2
diff --git a/tests/test_binary.py b/tests/test_binary.py
index 29ccf68..1c051f0 100644
--- a/tests/test_binary.py
+++ b/tests/test_binary.py
@@ -278,3 +278,29 @@
assert not hasattr(result, 'test_bool')
migrate_cache.destroy()
+
+
+def test_complex_object_names(client):
+ """
+ Test the ability to work with Complex types, which names contains symbols
+ not suitable for use in Python identifiers.
+ """
+ type_name = 'Non.Pythonic#type-name$'
+ key = 'key'
+ data = 'test'
+
+ class NonPythonicallyNamedType(
+ metaclass=GenericObjectMeta,
+ type_name=type_name,
+ schema=OrderedDict([
+ ('field', String),
+ ])
+ ):
+ pass
+
+ cache = client.get_or_create_cache('test_name_cache')
+ cache.put(key, NonPythonicallyNamedType(field=data))
+
+ obj = cache.get(key)
+ assert obj.type_name == type_name, 'Complex type name mismatch'
+ assert obj.field == data, 'Complex object data failure'
diff --git a/tests/test_cache_class.py b/tests/test_cache_class.py
index 22865be..1df0d44 100644
--- a/tests/test_cache_class.py
+++ b/tests/test_cache_class.py
@@ -23,7 +23,7 @@
BoolObject, DecimalObject, FloatObject, IntObject, String,
)
from pyignite.datatypes.prop_codes import *
-from pyignite.exceptions import CacheError
+from pyignite.exceptions import CacheError, ParameterError
def test_cache_create(client):
@@ -178,7 +178,7 @@
@pytest.mark.parametrize('page_size', range(1, 17, 5))
-def test_cache_scan(client, page_size):
+def test_cache_scan(request, client, page_size):
test_data = {
1: 'This is a test',
2: 'One more test',
@@ -197,7 +197,7 @@
15: 'sollicitudin iaculis',
}
- cache = client.get_or_create_cache('my_oop_cache')
+ cache = client.get_or_create_cache(request.node.name)
cache.put_all(test_data)
gen = cache.scan(page_size=page_size)
@@ -219,3 +219,18 @@
cache.put('my_key', 43)
value = cache.get_and_put_if_absent('my_key', 42)
assert value is 43
+
+
+def test_cache_get_when_cache_does_not_exist(client):
+ cache = client.get_cache('missing-cache')
+ with pytest.raises(CacheError) as e_info:
+ cache.put(1, 1)
+ assert str(e_info.value) == "Cache does not exist [cacheId= 1665146971]"
+
+
+def test_cache_create_with_none_name(client):
+ with pytest.raises(ParameterError) as e_info:
+ client.create_cache(None)
+ assert str(e_info.value) == "You should supply at least cache name"
+
+
diff --git a/tests/test_cache_config.py b/tests/test_cache_config.py
index 2f01618..b708b0c 100644
--- a/tests/test_cache_config.py
+++ b/tests/test_cache_config.py
@@ -19,10 +19,12 @@
def test_get_configuration(client):
- result = cache_get_or_create(client, 'my_unique_cache')
+ conn = client.random_node
+
+ result = cache_get_or_create(conn, 'my_unique_cache')
assert result.status == 0
- result = cache_get_configuration(client, 'my_unique_cache')
+ result = cache_get_configuration(conn, 'my_unique_cache')
assert result.status == 0
assert result.value[PROP_NAME] == 'my_unique_cache'
@@ -30,8 +32,9 @@
def test_create_with_config(client):
cache_name = 'my_very_unique_name'
+ conn = client.random_node
- result = cache_create_with_config(client, {
+ result = cache_create_with_config(conn, {
PROP_NAME: cache_name,
PROP_CACHE_KEY_CONFIGURATION: [
{
@@ -42,10 +45,10 @@
})
assert result.status == 0
- result = cache_get_names(client)
+ result = cache_get_names(conn)
assert cache_name in result.value
- result = cache_create_with_config(client, {
+ result = cache_create_with_config(conn, {
PROP_NAME: cache_name,
})
assert result.status != 0
@@ -54,8 +57,9 @@
def test_get_or_create_with_config(client):
cache_name = 'my_very_unique_name'
+ conn = client.random_node
- result = cache_get_or_create_with_config(client, {
+ result = cache_get_or_create_with_config(conn, {
PROP_NAME: cache_name,
PROP_CACHE_KEY_CONFIGURATION: [
{
@@ -66,10 +70,10 @@
})
assert result.status == 0
- result = cache_get_names(client)
+ result = cache_get_names(conn)
assert cache_name in result.value
- result = cache_get_or_create_with_config(client, {
+ result = cache_get_or_create_with_config(conn, {
PROP_NAME: cache_name,
})
assert result.status == 0
diff --git a/tests/test_datatypes.py b/tests/test_datatypes.py
index b68ba8c..ae66c38 100644
--- a/tests/test_datatypes.py
+++ b/tests/test_datatypes.py
@@ -13,6 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from collections import OrderedDict
+import ctypes
from datetime import datetime, timedelta
import decimal
import pytest
@@ -20,6 +22,7 @@
from pyignite.api.key_value import cache_get, cache_put
from pyignite.datatypes import *
+from pyignite.utils import unsigned
@pytest.mark.parametrize(
@@ -47,7 +50,9 @@
# arrays of integers
([1, 2, 3, 5], None),
- ([1, 2, 3, 5], ByteArrayObject),
+ (b'buzz', ByteArrayObject),
+ (bytearray([7, 8, 8, 11]), None),
+ (bytearray([7, 8, 8, 11]), ByteArrayObject),
([1, 2, 3, 5], ShortArrayObject),
([1, 2, 3, 5], IntArrayObject),
@@ -114,27 +119,57 @@
((-1, [(6001, 1), (6002, 2), (6003, 3)]), BinaryEnumArrayObject),
# object array
- ((-1, [1, 2, decimal.Decimal('3')]), None),
+ ((ObjectArrayObject.OBJECT, [1, 2, decimal.Decimal('3')]), ObjectArrayObject),
# collection
- ((3, [1, 2, 3]), CollectionObject),
+ ((CollectionObject.LINKED_LIST, [1, 2, 3]), None),
# map
- ((1, {'key': 4, 5: 6.0}), None),
- ((2, {'key': 4, 5: 6.0}), None),
+ ((MapObject.HASH_MAP, {'key': 4, 5: 6.0}), None),
+ ((MapObject.LINKED_HASH_MAP, OrderedDict([('key', 4), (5, 6.0)])), None),
]
)
def test_put_get_data(client, cache, value, value_hint):
- result = cache_put(client, cache, 'my_key', value, value_hint=value_hint)
+ conn = client.random_node
+
+ result = cache_put(conn, cache, 'my_key', value, value_hint=value_hint)
assert result.status == 0
- result = cache_get(client, cache, 'my_key')
+ result = cache_get(conn, cache, 'my_key')
assert result.status == 0
assert result.value == value
@pytest.mark.parametrize(
+ 'value',
+ [
+ [1, 2, 3, 5],
+ (7, 8, 13, 18),
+ (-128, -1, 0, 1, 127, 255),
+ ]
+)
+def test_bytearray_from_list_or_tuple(client, cache, value):
+ """
+ ByteArrayObject's pythonic type is `bytearray`, but it should also accept
+ lists or tuples as a content.
+ """
+
+ conn = client.random_node
+
+ result = cache_put(
+ conn, cache, 'my_key', value, value_hint=ByteArrayObject
+ )
+ assert result.status == 0
+
+ result = cache_get(conn, cache, 'my_key')
+ assert result.status == 0
+ assert result.value == bytearray([
+ unsigned(ch, ctypes.c_ubyte) for ch in value
+ ])
+
+
+@pytest.mark.parametrize(
'uuid_string',
[
'd57babad-7bc1-4c82-9f9c-e72841b92a85',
diff --git a/tests/test_get_names.py b/tests/test_get_names.py
index 0e50f3d..2d6c0bc 100644
--- a/tests/test_get_names.py
+++ b/tests/test_get_names.py
@@ -18,11 +18,13 @@
def test_get_names(client):
+ conn = client.random_node
+
bucket_names = ['my_bucket', 'my_bucket_2', 'my_bucket_3']
for name in bucket_names:
- cache_create(client, name)
+ cache_create(conn, name)
- result = cache_get_names(client)
+ result = cache_get_names(conn)
assert result.status == 0
assert type(result.value) == list
assert len(result.value) >= len(bucket_names)
diff --git a/tests/test_handshake.py b/tests/test_handshake.py
deleted file mode 100644
index d655d94..0000000
--- a/tests/test_handshake.py
+++ /dev/null
@@ -1,64 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import socket
-
-from pyignite import Client
-from pyignite.connection.handshake import HandshakeRequest, read_response
-
-
-def test_handshake(
- monkeypatch,
- ignite_host, ignite_port, use_ssl, ssl_keyfile, ssl_keyfile_password, ssl_certfile,
- ssl_ca_certfile, ssl_cert_reqs, ssl_ciphers, ssl_version,
- username, password,
-):
- client = Client(
- use_ssl=use_ssl,
- ssl_keyfile=ssl_keyfile,
- ssl_keyfile_password=ssl_keyfile_password,
- ssl_certfile=ssl_certfile,
- ssl_ca_certfile=ssl_ca_certfile,
- ssl_cert_reqs=ssl_cert_reqs,
- ssl_ciphers=ssl_ciphers,
- ssl_version=ssl_version,
- username=username,
- password=password,
- )
- client._socket = client._wrap(
- socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- )
- client.socket.connect((ignite_host, ignite_port))
- hs_request = HandshakeRequest(username, password)
- client.send(hs_request)
- hs_response = read_response(client)
- assert hs_response['op_code'] != 0
-
- client.close()
-
- # intentionally pass wrong protocol version
- from pyignite.connection import handshake
- monkeypatch.setattr(handshake, 'PROTOCOL_VERSION_MAJOR', 10)
-
- client._socket = client._wrap(
- socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- )
- client.socket.connect((ignite_host, ignite_port))
- hs_request = HandshakeRequest(username, password)
- client.send(hs_request)
- hs_response = read_response(client)
- assert hs_response['op_code'] == 0
-
- client.close()
diff --git a/tests/test_key_value.py b/tests/test_key_value.py
index 6b4fb0e..a7edce1 100644
--- a/tests/test_key_value.py
+++ b/tests/test_key_value.py
@@ -23,30 +23,36 @@
def test_put_get(client, cache):
- result = cache_put(client, cache, 'my_key', 5)
+ conn = client.random_node
+
+ result = cache_put(conn, cache, 'my_key', 5)
assert result.status == 0
- result = cache_get(client, cache, 'my_key')
+ result = cache_get(conn, cache, 'my_key')
assert result.status == 0
assert result.value == 5
def test_get_all(client, cache):
- result = cache_get_all(client, cache, ['key_1', 2, (3, IntObject)])
+ conn = client.random_node
+
+ result = cache_get_all(conn, cache, ['key_1', 2, (3, IntObject)])
assert result.status == 0
assert result.value == {}
- cache_put(client, cache, 'key_1', 4)
- cache_put(client, cache, 3, 18, key_hint=IntObject)
+ cache_put(conn, cache, 'key_1', 4)
+ cache_put(conn, cache, 3, 18, key_hint=IntObject)
- result = cache_get_all(client, cache, ['key_1', 2, (3, IntObject)])
+ result = cache_get_all(conn, cache, ['key_1', 2, (3, IntObject)])
assert result.status == 0
assert result.value == {'key_1': 4, 3: 18}
def test_put_all(client, cache):
+ conn = client.random_node
+
test_dict = {
1: 2,
'key_1': 4,
@@ -54,10 +60,10 @@
}
test_keys = ['key_1', 1, 3]
- result = cache_put_all(client, cache, test_dict)
+ result = cache_put_all(conn, cache, test_dict)
assert result.status == 0
- result = cache_get_all(client, cache, test_keys)
+ result = cache_get_all(conn, cache, test_keys)
assert result.status == 0
assert len(test_dict) == 3
@@ -67,266 +73,300 @@
def test_contains_key(client, cache):
- cache_put(client, cache, 'test_key', 42)
+ conn = client.random_node
- result = cache_contains_key(client, cache, 'test_key')
+ cache_put(conn, cache, 'test_key', 42)
+
+ result = cache_contains_key(conn, cache, 'test_key')
assert result.value is True
- result = cache_contains_key(client, cache, 'non-existant-key')
+ result = cache_contains_key(conn, cache, 'non-existant-key')
assert result.value is False
def test_contains_keys(client, cache):
- cache_put(client, cache, 5, 6)
- cache_put(client, cache, 'test_key', 42)
+ conn = client.random_node
- result = cache_contains_keys(client, cache, [5, 'test_key'])
+ cache_put(conn, cache, 5, 6)
+ cache_put(conn, cache, 'test_key', 42)
+
+ result = cache_contains_keys(conn, cache, [5, 'test_key'])
assert result.value is True
- result = cache_contains_keys(client, cache, [5, 'non-existent-key'])
+ result = cache_contains_keys(conn, cache, [5, 'non-existent-key'])
assert result.value is False
def test_get_and_put(client, cache):
- result = cache_get_and_put(client, cache, 'test_key', 42)
+ conn = client.random_node
+
+ result = cache_get_and_put(conn, cache, 'test_key', 42)
assert result.status == 0
assert result.value is None
- result = cache_get(client, cache, 'test_key')
+ result = cache_get(conn, cache, 'test_key')
assert result.status == 0
assert result.value is 42
- result = cache_get_and_put(client, cache, 'test_key', 1234)
+ result = cache_get_and_put(conn, cache, 'test_key', 1234)
assert result.status == 0
assert result.value == 42
def test_get_and_replace(client, cache):
- result = cache_get_and_replace(client, cache, 'test_key', 42)
+ conn = client.random_node
+
+ result = cache_get_and_replace(conn, cache, 'test_key', 42)
assert result.status == 0
assert result.value is None
- result = cache_get(client, cache, 'test_key')
+ result = cache_get(conn, cache, 'test_key')
assert result.status == 0
assert result.value is None
- cache_put(client, cache, 'test_key', 42)
+ cache_put(conn, cache, 'test_key', 42)
- result = cache_get_and_replace(client, cache, 'test_key', 1234)
+ result = cache_get_and_replace(conn, cache, 'test_key', 1234)
assert result.status == 0
assert result.value == 42
def test_get_and_remove(client, cache):
- result = cache_get_and_remove(client, cache, 'test_key')
+ conn = client.random_node
+
+ result = cache_get_and_remove(conn, cache, 'test_key')
assert result.status == 0
assert result.value is None
- cache_put(client, cache, 'test_key', 42)
+ cache_put(conn, cache, 'test_key', 42)
- result = cache_get_and_remove(client, cache, 'test_key')
+ result = cache_get_and_remove(conn, cache, 'test_key')
assert result.status == 0
assert result.value == 42
def test_put_if_absent(client, cache):
- result = cache_put_if_absent(client, cache, 'test_key', 42)
+ conn = client.random_node
+
+ result = cache_put_if_absent(conn, cache, 'test_key', 42)
assert result.status == 0
assert result.value is True
- result = cache_put_if_absent(client, cache, 'test_key', 1234)
+ result = cache_put_if_absent(conn, cache, 'test_key', 1234)
assert result.status == 0
assert result.value is False
def test_get_and_put_if_absent(client, cache):
- result = cache_get_and_put_if_absent(client, cache, 'test_key', 42)
+ conn = client.random_node
+
+ result = cache_get_and_put_if_absent(conn, cache, 'test_key', 42)
assert result.status == 0
assert result.value is None
- result = cache_get_and_put_if_absent(client, cache, 'test_key', 1234)
+ result = cache_get_and_put_if_absent(conn, cache, 'test_key', 1234)
assert result.status == 0
assert result.value == 42
- result = cache_get_and_put_if_absent(client, cache, 'test_key', 5678)
+ result = cache_get_and_put_if_absent(conn, cache, 'test_key', 5678)
assert result.status == 0
assert result.value == 42
def test_replace(client, cache):
- result = cache_replace(client, cache, 'test_key', 42)
+ conn = client.random_node
+
+ result = cache_replace(conn, cache, 'test_key', 42)
assert result.status == 0
assert result.value is False
- cache_put(client, cache, 'test_key', 1234)
+ cache_put(conn, cache, 'test_key', 1234)
- result = cache_replace(client, cache, 'test_key', 42)
+ result = cache_replace(conn, cache, 'test_key', 42)
assert result.status == 0
assert result.value is True
- result = cache_get(client, cache, 'test_key')
+ result = cache_get(conn, cache, 'test_key')
assert result.status == 0
assert result.value == 42
def test_replace_if_equals(client, cache):
- result = cache_replace_if_equals(client, cache, 'my_test', 42, 1234)
+ conn = client.random_node
+
+ result = cache_replace_if_equals(conn, cache, 'my_test', 42, 1234)
assert result.status == 0
assert result.value is False
- cache_put(client, cache, 'my_test', 42)
+ cache_put(conn, cache, 'my_test', 42)
- result = cache_replace_if_equals(client, cache, 'my_test', 42, 1234)
+ result = cache_replace_if_equals(conn, cache, 'my_test', 42, 1234)
assert result.status == 0
assert result.value is True
- result = cache_get(client, cache, 'my_test')
+ result = cache_get(conn, cache, 'my_test')
assert result.status == 0
assert result.value == 1234
def test_clear(client, cache):
- result = cache_put(client, cache, 'my_test', 42)
+ conn = client.random_node
+
+ result = cache_put(conn, cache, 'my_test', 42)
assert result.status == 0
- result = cache_clear(client, cache)
+ result = cache_clear(conn, cache)
assert result.status == 0
- result = cache_get(client, cache, 'my_test')
+ result = cache_get(conn, cache, 'my_test')
assert result.status == 0
assert result.value is None
def test_clear_key(client, cache):
- result = cache_put(client, cache, 'my_test', 42)
+ conn = client.random_node
+
+ result = cache_put(conn, cache, 'my_test', 42)
assert result.status == 0
- result = cache_put(client, cache, 'another_test', 24)
+ result = cache_put(conn, cache, 'another_test', 24)
assert result.status == 0
- result = cache_clear_key(client, cache, 'my_test')
+ result = cache_clear_key(conn, cache, 'my_test')
assert result.status == 0
- result = cache_get(client, cache, 'my_test')
+ result = cache_get(conn, cache, 'my_test')
assert result.status == 0
assert result.value is None
- result = cache_get(client, cache, 'another_test')
+ result = cache_get(conn, cache, 'another_test')
assert result.status == 0
assert result.value == 24
def test_clear_keys(client, cache):
- result = cache_put(client, cache, 'my_test_key', 42)
+ conn = client.random_node
+
+ result = cache_put(conn, cache, 'my_test_key', 42)
assert result.status == 0
- result = cache_put(client, cache, 'another_test', 24)
+ result = cache_put(conn, cache, 'another_test', 24)
assert result.status == 0
- result = cache_clear_keys(client, cache, [
+ result = cache_clear_keys(conn, cache, [
'my_test_key',
'nonexistent_key',
])
assert result.status == 0
- result = cache_get(client, cache, 'my_test_key')
+ result = cache_get(conn, cache, 'my_test_key')
assert result.status == 0
assert result.value is None
- result = cache_get(client, cache, 'another_test')
+ result = cache_get(conn, cache, 'another_test')
assert result.status == 0
assert result.value == 24
def test_remove_key(client, cache):
- result = cache_put(client, cache, 'my_test_key', 42)
+ conn = client.random_node
+
+ result = cache_put(conn, cache, 'my_test_key', 42)
assert result.status == 0
- result = cache_remove_key(client, cache, 'my_test_key')
+ result = cache_remove_key(conn, cache, 'my_test_key')
assert result.status == 0
assert result.value is True
- result = cache_remove_key(client, cache, 'non_existent_key')
+ result = cache_remove_key(conn, cache, 'non_existent_key')
assert result.status == 0
assert result.value is False
def test_remove_if_equals(client, cache):
- result = cache_put(client, cache, 'my_test', 42)
+ conn = client.random_node
+
+ result = cache_put(conn, cache, 'my_test', 42)
assert result.status == 0
- result = cache_remove_if_equals(client, cache, 'my_test', 1234)
+ result = cache_remove_if_equals(conn, cache, 'my_test', 1234)
assert result.status == 0
assert result.value is False
- result = cache_remove_if_equals(client, cache, 'my_test', 42)
+ result = cache_remove_if_equals(conn, cache, 'my_test', 42)
assert result.status == 0
assert result.value is True
- result = cache_get(client, cache, 'my_test')
+ result = cache_get(conn, cache, 'my_test')
assert result.status == 0
assert result.value is None
def test_remove_keys(client, cache):
- result = cache_put(client, cache, 'my_test', 42)
+ conn = client.random_node
+
+ result = cache_put(conn, cache, 'my_test', 42)
assert result.status == 0
- result = cache_put(client, cache, 'another_test', 24)
+ result = cache_put(conn, cache, 'another_test', 24)
assert result.status == 0
- result = cache_remove_keys(client, cache, ['my_test', 'non_existent'])
+ result = cache_remove_keys(conn, cache, ['my_test', 'non_existent'])
assert result.status == 0
- result = cache_get(client, cache, 'my_test')
+ result = cache_get(conn, cache, 'my_test')
assert result.status == 0
assert result.value is None
- result = cache_get(client, cache, 'another_test')
+ result = cache_get(conn, cache, 'another_test')
assert result.status == 0
assert result.value == 24
def test_remove_all(client, cache):
- result = cache_put(client, cache, 'my_test', 42)
+ conn = client.random_node
+
+ result = cache_put(conn, cache, 'my_test', 42)
assert result.status == 0
- result = cache_put(client, cache, 'another_test', 24)
+ result = cache_put(conn, cache, 'another_test', 24)
assert result.status == 0
- result = cache_remove_all(client, cache)
+ result = cache_remove_all(conn, cache)
assert result.status == 0
- result = cache_get(client, cache, 'my_test')
+ result = cache_get(conn, cache, 'my_test')
assert result.status == 0
assert result.value is None
- result = cache_get(client, cache, 'another_test')
+ result = cache_get(conn, cache, 'another_test')
assert result.status == 0
assert result.value is None
def test_cache_get_size(client, cache):
- result = cache_put(client, cache, 'my_test', 42)
+ conn = client.random_node
+
+ result = cache_put(conn, cache, 'my_test', 42)
assert result.status == 0
- result = cache_get_size(client, cache)
+ result = cache_get_size(conn, cache)
assert result.status == 0
assert result.value == 1
diff --git a/tests/test_scan.py b/tests/test_scan.py
index 77e9613..2f0e056 100644
--- a/tests/test_scan.py
+++ b/tests/test_scan.py
@@ -20,47 +20,49 @@
def test_scan(client, cache):
+ conn = client.random_node
page_size = 10
- result = cache_put_all(client, cache, {
+ result = cache_put_all(conn, cache, {
'key_{}'.format(v): v for v in range(page_size * 2)
})
assert result.status == 0
- result = scan(client, cache, page_size)
+ result = scan(conn, cache, page_size)
assert result.status == 0
assert len(result.value['data']) == page_size
assert result.value['more'] is True
cursor = result.value['cursor']
- result = scan_cursor_get_page(client, cursor)
+ result = scan_cursor_get_page(conn, cursor)
assert result.status == 0
assert len(result.value['data']) == page_size
assert result.value['more'] is False
- result = scan_cursor_get_page(client, cursor)
+ result = scan_cursor_get_page(conn, cursor)
assert result.status != 0
def test_close_resource(client, cache):
+ conn = client.random_node
page_size = 10
- result = cache_put_all(client, cache, {
+ result = cache_put_all(conn, cache, {
'key_{}'.format(v): v for v in range(page_size * 2)
})
assert result.status == 0
- result = scan(client, cache, page_size)
+ result = scan(conn, cache, page_size)
assert result.status == 0
assert len(result.value['data']) == page_size
assert result.value['more'] is True
cursor = result.value['cursor']
- result = resource_close(client, cursor)
+ result = resource_close(conn, cursor)
assert result.status == 0
- result = scan_cursor_get_page(client, cursor)
+ result = scan_cursor_get_page(conn, cursor)
assert result.status != 0
diff --git a/tests/test_sql.py b/tests/test_sql.py
index d983a20..87383d3 100644
--- a/tests/test_sql.py
+++ b/tests/test_sql.py
@@ -47,11 +47,13 @@
def test_sql(client):
+ conn = client.random_node
+
# cleanup
client.sql(drop_query)
result = sql_fields(
- client,
+ conn,
'PUBLIC',
create_query,
page_size,
@@ -62,7 +64,7 @@
for i, data_line in enumerate(initial_data, start=1):
fname, lname, grade = data_line
result = sql_fields(
- client,
+ conn,
'PUBLIC',
insert_query,
page_size,
@@ -71,12 +73,12 @@
)
assert result.status == 0, result.message
- result = cache_get_configuration(client, 'SQL_PUBLIC_STUDENT')
+ result = cache_get_configuration(conn, 'SQL_PUBLIC_STUDENT')
assert result.status == 0, result.message
binary_type_name = result.value[PROP_QUERY_ENTITIES][0]['value_type_name']
result = sql(
- client,
+ conn,
'SQL_PUBLIC_STUDENT',
binary_type_name,
'TRUE',
@@ -93,7 +95,7 @@
cursor = result.value['cursor']
while result.value['more']:
- result = sql_cursor_get_page(client, cursor)
+ result = sql_cursor_get_page(conn, cursor)
assert result.status == 0, result.message
for wrapped_object in result.value['data'].values():
@@ -101,17 +103,19 @@
assert data.type_id == entity_id(binary_type_name)
# repeat cleanup
- result = sql_fields(client, 'PUBLIC', drop_query, page_size)
+ result = sql_fields(conn, 'PUBLIC', drop_query, page_size)
assert result.status == 0
def test_sql_fields(client):
+ conn = client.random_node
+
# cleanup
client.sql(drop_query)
result = sql_fields(
- client,
+ conn,
'PUBLIC',
create_query,
page_size,
@@ -122,7 +126,7 @@
for i, data_line in enumerate(initial_data, start=1):
fname, lname, grade = data_line
result = sql_fields(
- client,
+ conn,
'PUBLIC',
insert_query,
page_size,
@@ -132,7 +136,7 @@
assert result.status == 0, result.message
result = sql_fields(
- client,
+ conn,
'PUBLIC',
select_query,
page_size,
@@ -144,13 +148,13 @@
cursor = result.value['cursor']
- result = sql_fields_cursor_get_page(client, cursor, field_count=4)
+ result = sql_fields_cursor_get_page(conn, cursor, field_count=4)
assert result.status == 0
assert len(result.value['data']) == len(initial_data) - page_size
assert result.value['more'] is False
# repeat cleanup
- result = sql_fields(client, 'PUBLIC', drop_query, page_size)
+ result = sql_fields(conn, 'PUBLIC', drop_query, page_size)
assert result.status == 0
diff --git a/tests/util.py b/tests/util.py
new file mode 100644
index 0000000..1d6acd6
--- /dev/null
+++ b/tests/util.py
@@ -0,0 +1,179 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import glob
+import os
+import psutil
+import re
+import signal
+import subprocess
+import time
+
+
+def wait_for_condition(condition, interval=0.1, timeout=10, error=None):
+ start = time.time()
+ res = condition()
+
+ while not res and time.time() - start < timeout:
+ time.sleep(interval)
+ res = condition()
+
+ if res:
+ return True
+
+ if error is not None:
+ raise Exception(error)
+
+ return False
+
+
+def is_windows():
+ return os.name == "nt"
+
+
+def get_test_dir():
+ return os.path.dirname(os.path.realpath(__file__))
+
+
+def get_ignite_dirs():
+ ignite_home = os.getenv("IGNITE_HOME")
+ if ignite_home is not None:
+ yield ignite_home
+
+ proj_dir = os.path.abspath(os.path.join(get_test_dir(), "..", ".."))
+ yield os.path.join(proj_dir, "ignite")
+ yield os.path.join(proj_dir, "incubator_ignite")
+
+
+def get_ignite_runner():
+ ext = ".bat" if is_windows() else ".sh"
+ for ignite_dir in get_ignite_dirs():
+ runner = os.path.join(ignite_dir, "bin", "ignite" + ext)
+ print("Probing Ignite runner at '{0}'...".format(runner))
+ if os.path.exists(runner):
+ return runner
+
+ raise Exception(f"Ignite not found. IGNITE_HOME {os.getenv('IGNITE_HOME')}")
+
+
+def get_ignite_config_path(use_ssl=False):
+ if use_ssl:
+ file_name = "ignite-config-ssl.xml"
+ else:
+ file_name = "ignite-config.xml"
+
+ return os.path.join(get_test_dir(), "config", file_name)
+
+
+def check_server_started(idx=1):
+ log_file = os.path.join(get_test_dir(), "logs", f"ignite-log-{idx}.txt")
+ if not os.path.exists(log_file):
+ return False
+
+ pattern = re.compile('^Topology snapshot.*')
+
+ with open(log_file) as f:
+ for line in f.readlines():
+ if pattern.match(line):
+ return True
+
+ return False
+
+
+def kill_process_tree(pid):
+ if is_windows():
+ subprocess.call(['taskkill', '/F', '/T', '/PID', str(pid)])
+ else:
+ children = psutil.Process(pid).children(recursive=True)
+ for child in children:
+ os.kill(child.pid, signal.SIGKILL)
+ os.kill(pid, signal.SIGKILL)
+
+
+def _start_ignite(idx=1, debug=False, use_ssl=False):
+ clear_logs(idx)
+
+ runner = get_ignite_runner()
+
+ env = os.environ.copy()
+ env['IGNITE_INSTANCE_INDEX'] = str(idx)
+ env['IGNITE_CLIENT_PORT'] = str(10800 + idx)
+
+ if debug:
+ env["JVM_OPTS"] = "-Djava.net.preferIPv4Stack=true -Xdebug -Xnoagent -Djava.compiler=NONE " \
+ "-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005 "
+
+ ignite_cmd = [runner, get_ignite_config_path(use_ssl)]
+ print("Starting Ignite server node:", ignite_cmd)
+
+ srv = subprocess.Popen(ignite_cmd, env=env, cwd=get_test_dir())
+
+ started = wait_for_condition(lambda: check_server_started(idx), timeout=30)
+ if started:
+ return srv
+
+ kill_process_tree(srv.pid)
+ raise Exception("Failed to start Ignite: timeout while trying to connect")
+
+
+def start_ignite_gen(idx=1, use_ssl=False):
+ srv = _start_ignite(idx, use_ssl=use_ssl)
+ yield srv
+ kill_process_tree(srv.pid)
+
+
+def get_log_files(idx=1):
+ logs_pattern = os.path.join(get_test_dir(), "logs", "ignite-log-{0}*.txt".format(idx))
+ return glob.glob(logs_pattern)
+
+
+def clear_logs(idx=1):
+ for f in get_log_files(idx):
+ os.remove(f)
+
+
+def read_log_file(file, idx):
+ i = -1
+ with open(file) as f:
+ lines = f.readlines()
+ for line in lines:
+ i += 1
+
+ if i < read_log_file.last_line[idx]:
+ continue
+
+ if i > read_log_file.last_line[idx]:
+ read_log_file.last_line[idx] = i
+
+ # Example: Client request received [reqId=1, addr=/127.0.0.1:51694,
+ # req=org.apache.ignite.internal.processors.platform.client.cache.ClientCachePutRequest@1f33101e]
+ res = re.match("Client request received .*?req=org.apache.ignite.internal.processors."
+ "platform.client.cache.ClientCache([a-zA-Z]+)Request@", line)
+
+ if res is not None:
+ yield res.group(1)
+
+
+def get_request_grid_idx(message="Get"):
+ res = -1
+ for i in range(1, 5):
+ for log_file in get_log_files(i):
+ for log in read_log_file(log_file, i):
+ if log == message:
+ res = i # Do not exit early to advance all log positions
+ return res
+
+
+read_log_file.last_line = [0, 0, 0, 0, 0]
\ No newline at end of file
diff --git a/tox.ini b/tox.ini
index 6e70234..69db226 100644
--- a/tox.ini
+++ b/tox.ini
@@ -15,23 +15,22 @@
[tox]
skipsdist = True
-envlist = py{36,37,38}-{no-ssl,ssl,ssl-password}-docker
+envlist = py{36,37,38}-{no-ssl,ssl,ssl-password}
[travis]
python =
- 3.6: py36-{no-ssl,ssl,ssl-password}-docker
- 3.7: py37-{no-ssl,ssl,ssl-password}-docker
- 3.8: py38-{no-ssl,ssl,ssl-password}-docker
+ 3.6: py36-{no-ssl,ssl,ssl-password}
+ 3.7: py37-{no-ssl,ssl,ssl-password}
+ 3.8: py38-{no-ssl,ssl,ssl-password}
[testenv]
-passenv = TEAMCITY_VERSION
+passenv = TEAMCITY_VERSION IGNITE_HOME
envdir = {homedir}/.virtualenvs/pyignite-{envname}
deps =
-r ./requirements/install.txt
-r ./requirements/tests.txt
recreate = True
usedevelop = True
-allowlist_externals = docker-compose
commands =
pytest {env:PYTESTARGS:} {posargs}
@@ -41,45 +40,17 @@
[ssl]
setenv:
- PYTEST_ADDOPTS = --examples --use-ssl=True --ssl-certfile={toxinidir}/tests/config/ssl/client_full.pem
+ PYTEST_ADDOPTS = --examples --use-ssl=True --ssl-certfile={toxinidir}/tests/config/ssl/client_full.pem --ssl-version=TLSV1_2
[ssl-password]
setenv:
- PYTEST_ADDOPTS = --examples --use-ssl=True --ssl-certfile={toxinidir}/tests/config/ssl/client_with_pass_full.pem --ssl-keyfile-password=654321
-
-[docker]
-commands_pre =
- docker-compose down
- docker-compose up -d ignite
-commands_post =
- docker-compose down
-
-[docker-ssl]
-commands_pre =
- docker-compose down
- docker-compose up -d ignite-ssl
-commands_post = {[docker]commands_post}
+ PYTEST_ADDOPTS = --examples --use-ssl=True --ssl-certfile={toxinidir}/tests/config/ssl/client_with_pass_full.pem --ssl-keyfile-password=654321 --ssl-version=TLSV1_2
[testenv:py{36,37,38}-no-ssl]
setenv: {[no-ssl]setenv}
-[testenv:py{36,37,38}-no-ssl-docker]
-commands_pre = {[docker]commands_pre}
-setenv: {[no-ssl]setenv}
-commands_post = {[docker]commands_post}
-
[testenv:py{36,37,38}-ssl]
setenv: {[ssl]setenv}
-[testenv:py{36,37,38}-ssl-docker]
-commands_pre = {[docker-ssl]commands_pre}
-setenv: {[ssl]setenv}
-commands_post = {[docker]commands_post}
-
[testenv:py{36,37,38}-ssl-password]
setenv: {[ssl-password]setenv}
-
-[testenv:py{36,37,38}-ssl-password-docker]
-commands_pre = {[docker-ssl]commands_pre}
-setenv: {[ssl-password]setenv}
-commands_post = {[docker]commands_post}