IGNITE-12467 Implement transactions, rewrite async connections using protocol and transport - Fixes #40.
diff --git a/docs/source/pyignite.datatypes.transactions.rst b/docs/source/pyignite.datatypes.transactions.rst
new file mode 100644
index 0000000..9b38468
--- /dev/null
+++ b/docs/source/pyignite.datatypes.transactions.rst
@@ -0,0 +1,21 @@
+..  Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+..      http://www.apache.org/licenses/LICENSE-2.0
+
+..  Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+pyignite.datatypes.transactions module
+=======================================
+
+.. automodule:: pyignite.datatypes.transactions
+    :members:
+    :show-inheritance:
\ No newline at end of file
diff --git a/docs/source/pyignite.rst b/docs/source/pyignite.rst
index c2a36fe..2e52500 100644
--- a/docs/source/pyignite.rst
+++ b/docs/source/pyignite.rst
@@ -41,6 +41,7 @@
     pyignite.aio_client
     pyignite.cluster
     pyignite.aio_cluster
+    pyignite.transaction
     pyignite.cursors
     pyignite.exceptions
 
diff --git a/docs/source/pyignite.transaction.rst b/docs/source/pyignite.transaction.rst
new file mode 100644
index 0000000..7c6b016
--- /dev/null
+++ b/docs/source/pyignite.transaction.rst
@@ -0,0 +1,22 @@
+..  Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+..      http://www.apache.org/licenses/LICENSE-2.0
+
+..  Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+pyignite.transaction module
+===========================
+
+.. automodule:: pyignite.transaction
+    :members:
+    :undoc-members:
+    :show-inheritance:
diff --git a/pyignite/aio_cache.py b/pyignite/aio_cache.py
index b6b534b..7a92a9a 100644
--- a/pyignite/aio_cache.py
+++ b/pyignite/aio_cache.py
@@ -15,6 +15,7 @@
 import asyncio
 from typing import Any, Iterable, Optional, Union
 
+from .api.tx_api import get_tx_connection
 from .datatypes import ExpiryPolicy
 from .datatypes.internal import AnyDataObject
 from .exceptions import CacheCreationError, CacheError, ParameterError
@@ -91,6 +92,9 @@
         super().__init__(client, name, expiry_policy)
 
     async def _get_best_node(self, key=None, key_hint=None):
+        tx_conn = get_tx_connection()
+        if tx_conn:
+            return tx_conn
         return await self.client.get_best_node(self, key, key_hint)
 
     async def settings(self) -> Optional[dict]:
diff --git a/pyignite/aio_client.py b/pyignite/aio_client.py
index 8c2ca56..26d243d 100644
--- a/pyignite/aio_client.py
+++ b/pyignite/aio_client.py
@@ -14,8 +14,9 @@
 # limitations under the License.
 import asyncio
 import random
+import sys
 from itertools import chain
-from typing import Iterable, Type, Union, Any, Dict
+from typing import Iterable, Type, Union, Any, Dict, Optional
 
 from .aio_cluster import AioCluster
 from .api import cache_get_node_partitions_async
@@ -27,10 +28,11 @@
 from .aio_cache import AioCache, get_cache, create_cache, get_or_create_cache
 from .connection import AioConnection
 from .constants import AFFINITY_RETRIES, AFFINITY_DELAY
-from .datatypes import BinaryObject
-from .exceptions import BinaryTypeError, CacheError, ReconnectError, connection_errors
-from .queries.query import CacheInfo
+from .datatypes import BinaryObject, TransactionConcurrency, TransactionIsolation
+from .exceptions import BinaryTypeError, CacheError, ReconnectError, connection_errors, NotSupportedError
+from .queries.cache_info import CacheInfo
 from .stream import AioBinaryStream, READ_BACKWARD
+from .transaction import AioTransaction
 from .utils import cache_id, entity_id, status_to_exception
 
 
@@ -471,9 +473,9 @@
         elif isinstance(cache, AioCache):
             c_info = cache.cache_info
         else:
-            c_info = None
+            c_info = CacheInfo(protocol_context=self.protocol_context)
 
-        if c_info:
+        if c_info.cache_id:
             schema = None
 
         return AioSqlFieldsCursor(self, c_info, query_str, page_size, query_args, schema, statement_type,
@@ -487,3 +489,21 @@
         :return: :py:class:`~pyignite.aio_cluster.AioCluster` instance.
         """
         return AioCluster(self)
+
+    def tx_start(self, concurrency: TransactionConcurrency = TransactionConcurrency.PESSIMISTIC,
+                 isolation: TransactionIsolation = TransactionIsolation.REPEATABLE_READ,
+                 timeout: Union[int, float] = 0, label: Optional[str] = None) -> 'AioTransaction':
+        """
+        Start async thin client transaction.
+
+        :param concurrency: (optional) transaction concurrency, see
+                :py:class:`~pyignite.datatypes.transactions.TransactionConcurrency`
+        :param isolation: (optional) transaction isolation level, see
+                :py:class:`~pyignite.datatypes.transactions.TransactionIsolation`
+    :param timeout: (optional) transaction timeout in seconds if float, in milliseconds if int
+        :param label: (optional) transaction label.
+        :return: :py:class:`~pyignite.transaction.AioTransaction` instance.
+        """
+        if sys.version_info < (3, 7):
+            raise NotSupportedError(f"Transactions are not supported in async client on current python {sys.version}")
+        return AioTransaction(self, concurrency, isolation, timeout, label)
diff --git a/pyignite/api/affinity.py b/pyignite/api/affinity.py
index ddf1e7a..30e93ff 100644
--- a/pyignite/api/affinity.py
+++ b/pyignite/api/affinity.py
@@ -68,27 +68,23 @@
 ])
 
 
-def cache_get_node_partitions(conn: 'Connection', caches: Union[int, Iterable[int]], query_id: int = None) -> APIResult:
+def cache_get_node_partitions(conn: 'Connection', caches: Union[int, Iterable[int]]) -> APIResult:
     """
     Gets partition mapping for an Ignite cache or a number of caches. See
     “IEP-23: Best Effort Affinity for thin clients”.
 
     :param conn: connection to Ignite server,
-    :param caches: cache ID(s) the mapping is provided for,
-    :param query_id: (optional) a value generated by client and returned as-is
-     in response.query_id. When the parameter is omitted, a random value
-     is generated,
+    :param caches: cache ID(s) the mapping is provided for
     :return: API result data object.
     """
-    return __cache_get_node_partitions(conn, caches, query_id)
+    return __cache_get_node_partitions(conn, caches)
 
 
-async def cache_get_node_partitions_async(conn: 'AioConnection', caches: Union[int, Iterable[int]],
-                                          query_id: int = None) -> APIResult:
+async def cache_get_node_partitions_async(conn: 'AioConnection', caches: Union[int, Iterable[int]]) -> APIResult:
     """
     Async version of cache_get_node_partitions.
     """
-    return await __cache_get_node_partitions(conn, caches, query_id)
+    return await __cache_get_node_partitions(conn, caches)
 
 
 def __post_process_partitions(result):
@@ -135,13 +131,12 @@
     return result
 
 
-def __cache_get_node_partitions(conn, caches, query_id):
+def __cache_get_node_partitions(conn, caches):
     query_struct = Query(
         OP_CACHE_PARTITIONS,
         [
             ('cache_ids', cache_ids),
-        ],
-        query_id=query_id
+        ]
     )
     if not is_iterable(caches):
         caches = [caches]
diff --git a/pyignite/api/binary.py b/pyignite/api/binary.py
index 345e8e8..b49ab8b 100644
--- a/pyignite/api/binary.py
+++ b/pyignite/api/binary.py
@@ -26,34 +26,30 @@
 from ..queries.response import BinaryTypeResponse
 
 
-def get_binary_type(conn: 'Connection', binary_type: Union[str, int], query_id=None) -> APIResult:
+def get_binary_type(conn: 'Connection', binary_type: Union[str, int]) -> APIResult:
     """
     Gets the binary type information by type ID.
 
     :param conn: connection to Ignite server,
     :param binary_type: binary type name or ID,
-    :param query_id: (optional) a value generated by client and returned as-is
-     in response.query_id. When the parameter is omitted, a random value
-     is generated,
     :return: API result data object.
     """
-    return __get_binary_type(conn, binary_type, query_id)
+    return __get_binary_type(conn, binary_type)
 
 
-async def get_binary_type_async(conn: 'AioConnection', binary_type: Union[str, int], query_id=None) -> APIResult:
+async def get_binary_type_async(conn: 'AioConnection', binary_type: Union[str, int]) -> APIResult:
     """
     Async version of get_binary_type.
     """
-    return await __get_binary_type(conn, binary_type, query_id)
+    return await __get_binary_type(conn, binary_type)
 
 
-def __get_binary_type(conn, binary_type, query_id):
+def __get_binary_type(conn, binary_type):
     query_struct = Query(
         OP_GET_BINARY_TYPE,
         [
             ('type_id', Int),
         ],
-        query_id=query_id,
         response_type=BinaryTypeResponse
     )
 
@@ -63,7 +59,7 @@
 
 
 def put_binary_type(connection: 'Connection', type_name: str, affinity_key_field: str = None,
-                    is_enum=False, schema: dict = None, query_id=None) -> APIResult:
+                    is_enum=False, schema: dict = None) -> APIResult:
     """
     Registers binary type information in cluster.
 
@@ -76,12 +72,9 @@
      parameter names as keys and an integers as values. When register binary
      type, pass a dict of field names: field types. Binary type with no fields
      is OK,
-    :param query_id: (optional) a value generated by client and returned as-is
-     in response.query_id. When the parameter is omitted, a random value
-     is generated,
     :return: API result data object.
     """
-    return __put_binary_type(connection, type_name, affinity_key_field, is_enum, schema, query_id)
+    return __put_binary_type(connection, type_name, affinity_key_field, is_enum, schema)
 
 
 async def put_binary_type_async(connection: 'AioConnection', type_name: str, affinity_key_field: str = None,
@@ -89,7 +82,7 @@
     """
     Async version of put_binary_type.
     """
-    return await __put_binary_type(connection, type_name, affinity_key_field, is_enum, schema, query_id)
+    return await __put_binary_type(connection, type_name, affinity_key_field, is_enum, schema)
 
 
 def __post_process_put_binary(type_id):
@@ -103,7 +96,7 @@
     return internal
 
 
-def __put_binary_type(connection, type_name, affinity_key_field, is_enum, schema, query_id):
+def __put_binary_type(connection, type_name, affinity_key_field, is_enum, schema):
     # prepare data
     if schema is None:
         schema = {}
@@ -158,8 +151,7 @@
                 ('is_enum', Bool),
                 ('enums', enum_struct),
                 ('schema', schema_struct),
-            ],
-            query_id=query_id,
+            ]
         )
     else:
         query_struct = Query(
@@ -171,8 +163,7 @@
                 ('binary_fields', binary_fields_struct),
                 ('is_enum', Bool),
                 ('schema', schema_struct),
-            ],
-            query_id=query_id,
+            ]
         )
     return query_perform(query_struct, connection, query_params=data,
                          post_process_fun=__post_process_put_binary(type_id))
diff --git a/pyignite/api/cache_config.py b/pyignite/api/cache_config.py
index 7f2869b..d4a5f81 100644
--- a/pyignite/api/cache_config.py
+++ b/pyignite/api/cache_config.py
@@ -39,7 +39,7 @@
 from .result import APIResult
 from ..datatypes.prop_codes import PROP_EXPIRY_POLICY
 from ..exceptions import NotSupportedByClusterError
-from ..queries.query import CacheInfo
+from ..queries.cache_info import CacheInfo
 
 
 def compact_cache_config(cache_config: dict) -> dict:
@@ -60,27 +60,23 @@
     return result
 
 
-def cache_get_configuration(connection: 'Connection', cache_info: CacheInfo, query_id=None) -> 'APIResult':
+def cache_get_configuration(connection: 'Connection', cache_info: CacheInfo) -> 'APIResult':
     """
     Gets configuration for the given cache.
 
     :param connection: connection to Ignite server,
     :param cache_info: cache meta info,
-    :param query_id: (optional) a value generated by client and returned as-is
-     in response.query_id. When the parameter is omitted, a random value
-     is generated,
     :return: API result data object. Result value is OrderedDict with
      the cache configuration parameters.
     """
-    return __cache_get_configuration(connection, cache_info, query_id)
+    return __cache_get_configuration(connection, cache_info)
 
 
-async def cache_get_configuration_async(
-        connection: 'AioConnection', cache_info: CacheInfo, query_id=None) -> 'APIResult':
+async def cache_get_configuration_async(connection: 'AioConnection', cache_info: CacheInfo) -> 'APIResult':
     """
     Async version of cache_get_configuration.
     """
-    return await __cache_get_configuration(connection, cache_info, query_id)
+    return await __cache_get_configuration(connection, cache_info)
 
 
 def __post_process_cache_config(result):
@@ -89,13 +85,12 @@
     return result
 
 
-def __cache_get_configuration(connection, cache_info, query_id):
+def __cache_get_configuration(connection, cache_info):
     query_struct = Query(
         OP_CACHE_GET_CONFIGURATION,
         [
             ('cache_info', CacheInfo)
-        ],
-        query_id=query_id,
+        ]
     )
     return query_perform(query_struct, connection,
                          query_params={
@@ -108,106 +103,94 @@
                          )
 
 
-def cache_create(connection: 'Connection', name: str, query_id=None) -> 'APIResult':
+def cache_create(connection: 'Connection', name: str) -> 'APIResult':
     """
     Creates a cache with a given name. Returns error if a cache with specified
     name already exists.
 
     :param connection: connection to Ignite server,
     :param name: cache name,
-    :param query_id: (optional) a value generated by client and returned as-is
-     in response.query_id. When the parameter is omitted, a random value
-     is generated,
     :return: API result data object. Contains zero status if a cache is
      created successfully, non-zero status and an error description otherwise.
     """
 
-    return __cache_create_with_name(OP_CACHE_CREATE_WITH_NAME, connection, name, query_id)
+    return __cache_create_with_name(OP_CACHE_CREATE_WITH_NAME, connection, name)
 
 
-async def cache_create_async(connection: 'AioConnection', name: str, query_id=None) -> 'APIResult':
+async def cache_create_async(connection: 'AioConnection', name: str) -> 'APIResult':
     """
     Async version of cache_create.
     """
 
-    return await __cache_create_with_name(OP_CACHE_CREATE_WITH_NAME, connection, name, query_id)
+    return await __cache_create_with_name(OP_CACHE_CREATE_WITH_NAME, connection, name)
 
 
-def cache_get_or_create(connection: 'Connection', name: str, query_id=None) -> 'APIResult':
+def cache_get_or_create(connection: 'Connection', name: str) -> 'APIResult':
     """
     Creates a cache with a given name. Does nothing if the cache exists.
 
     :param connection: connection to Ignite server,
     :param name: cache name,
-    :param query_id: (optional) a value generated by client and returned as-is
-     in response.query_id. When the parameter is omitted, a random value
-     is generated,
     :return: API result data object. Contains zero status if a cache is
      created successfully, non-zero status and an error description otherwise.
     """
 
-    return __cache_create_with_name(OP_CACHE_GET_OR_CREATE_WITH_NAME, connection, name, query_id)
+    return __cache_create_with_name(OP_CACHE_GET_OR_CREATE_WITH_NAME, connection, name)
 
 
-async def cache_get_or_create_async(connection: 'AioConnection', name: str, query_id=None) -> 'APIResult':
+async def cache_get_or_create_async(connection: 'AioConnection', name: str) -> 'APIResult':
     """
     Async version of cache_get_or_create.
     """
-    return await __cache_create_with_name(OP_CACHE_GET_OR_CREATE_WITH_NAME, connection, name, query_id)
+    return await __cache_create_with_name(OP_CACHE_GET_OR_CREATE_WITH_NAME, connection, name)
 
 
-def __cache_create_with_name(op_code, conn, name, query_id):
-    query_struct = Query(op_code, [('cache_name', String)], query_id=query_id)
+def __cache_create_with_name(op_code, conn, name):
+    query_struct = Query(op_code, [('cache_name', String)])
     return query_perform(query_struct, conn, query_params={'cache_name': name})
 
 
-def cache_destroy(connection: 'Connection', cache: Union[str, int], query_id=None) -> 'APIResult':
+def cache_destroy(connection: 'Connection', cache: Union[str, int]) -> 'APIResult':
     """
     Destroys cache with a given name.
 
     :param connection: connection to Ignite server,
     :param cache: name or ID of the cache,
-    :param query_id: (optional) a value generated by client and returned as-is
-     in response.query_id. When the parameter is omitted, a random value
-     is generated,
     :return: API result data object.
     """
-    return __cache_destroy(connection, cache, query_id)
+    return __cache_destroy(connection, cache)
 
 
-async def cache_destroy_async(connection: 'AioConnection', cache: Union[str, int], query_id=None) -> 'APIResult':
+async def cache_destroy_async(connection: 'AioConnection', cache: Union[str, int]) -> 'APIResult':
     """
     Async version of cache_destroy.
     """
-    return await __cache_destroy(connection, cache, query_id)
+    return await __cache_destroy(connection, cache)
 
 
-def __cache_destroy(connection, cache, query_id):
-    query_struct = Query(OP_CACHE_DESTROY, [('cache_id', Int)], query_id=query_id)
+def __cache_destroy(connection, cache):
+    query_struct = Query(OP_CACHE_DESTROY, [('cache_id', Int)])
 
     return query_perform(query_struct, connection, query_params={'cache_id': cache_id(cache)})
 
 
-def cache_get_names(connection: 'Connection', query_id=None) -> 'APIResult':
+def cache_get_names(connection: 'Connection') -> 'APIResult':
     """
     Gets existing cache names.
 
     :param connection: connection to Ignite server,
-    :param query_id: (optional) a value generated by client and returned as-is
-     in response.query_id. When the parameter is omitted, a random value
-     is generated,
     :return: API result data object. Contains zero status and a list of cache
      names, non-zero status and an error description otherwise.
     """
 
-    return __cache_get_names(connection, query_id)
+    return __cache_get_names(connection)
 
 
-async def cache_get_names_async(connection: 'AioConnection', query_id=None) -> 'APIResult':
+async def cache_get_names_async(connection: 'AioConnection') -> 'APIResult':
     """
     Async version of cache_get_names.
     """
-    return await __cache_get_names(connection, query_id)
+    return await __cache_get_names(connection)
 
 
 def __post_process_cache_names(result):
@@ -216,14 +199,14 @@
     return result
 
 
-def __cache_get_names(connection, query_id):
-    query_struct = Query(OP_CACHE_GET_NAMES, query_id=query_id)
+def __cache_get_names(connection):
+    query_struct = Query(OP_CACHE_GET_NAMES)
     return query_perform(query_struct, connection,
                          response_config=[('cache_names', StringArray)],
                          post_process_fun=__post_process_cache_names)
 
 
-def cache_create_with_config(connection: 'Connection', cache_props: dict, query_id=None) -> 'APIResult':
+def cache_create_with_config(connection: 'Connection', cache_props: dict) -> 'APIResult':
     """
     Creates cache with provided configuration. An error is returned
     if the name is already in use.
@@ -232,23 +215,20 @@
     :param cache_props: cache configuration properties to create cache with
      in form of dictionary {property code: python value}.
      You must supply at least name (PROP_NAME),
-    :param query_id: (optional) a value generated by client and returned as-is
-     in response.query_id. When the parameter is omitted, a random value
-     is generated,
     :return: API result data object. Contains zero status if cache was created,
      non-zero status and an error description otherwise.
     """
-    return __cache_create_with_config(OP_CACHE_CREATE_WITH_CONFIGURATION, connection, cache_props, query_id)
+    return __cache_create_with_config(OP_CACHE_CREATE_WITH_CONFIGURATION, connection, cache_props)
 
 
-async def cache_create_with_config_async(connection: 'AioConnection', cache_props: dict, query_id=None) -> 'APIResult':
+async def cache_create_with_config_async(connection: 'AioConnection', cache_props: dict) -> 'APIResult':
     """
     Async version of cache_create_with_config.
     """
-    return await __cache_create_with_config(OP_CACHE_CREATE_WITH_CONFIGURATION, connection, cache_props, query_id)
+    return await __cache_create_with_config(OP_CACHE_CREATE_WITH_CONFIGURATION, connection, cache_props)
 
 
-def cache_get_or_create_with_config(connection: 'Connection', cache_props: dict, query_id=None) -> 'APIResult':
+def cache_get_or_create_with_config(connection: 'Connection', cache_props: dict) -> 'APIResult':
     """
     Creates cache with provided configuration. Does nothing if the name
     is already in use.
@@ -257,25 +237,20 @@
     :param cache_props: cache configuration properties to create cache with
      in form of dictionary {property code: python value}.
      You must supply at least name (PROP_NAME),
-    :param query_id: (optional) a value generated by client and returned as-is
-     in response.query_id. When the parameter is omitted, a random value
-     is generated,
     :return: API result data object. Contains zero status if cache was created,
      non-zero status and an error description otherwise.
     """
-    return __cache_create_with_config(OP_CACHE_GET_OR_CREATE_WITH_CONFIGURATION, connection, cache_props, query_id)
+    return __cache_create_with_config(OP_CACHE_GET_OR_CREATE_WITH_CONFIGURATION, connection, cache_props)
 
 
-async def cache_get_or_create_with_config_async(connection: 'AioConnection', cache_props: dict,
-                                                query_id=None) -> 'APIResult':
+async def cache_get_or_create_with_config_async(connection: 'AioConnection', cache_props: dict) -> 'APIResult':
     """
     Async version of cache_get_or_create_with_config.
     """
-    return await __cache_create_with_config(OP_CACHE_GET_OR_CREATE_WITH_CONFIGURATION, connection, cache_props,
-                                            query_id)
+    return await __cache_create_with_config(OP_CACHE_GET_OR_CREATE_WITH_CONFIGURATION, connection, cache_props)
 
 
-def __cache_create_with_config(op_code, connection, cache_props, query_id):
+def __cache_create_with_config(op_code, connection, cache_props):
     prop_types, prop_values = {}, {}
     is_expiry_policy_supported = connection.protocol_context.is_expiry_policy_supported()
     for i, prop_item in enumerate(cache_props.items()):
@@ -289,5 +264,5 @@
     prop_values['param_count'] = len(cache_props)
 
     following = [('param_count', Short)] + list(prop_types.items())
-    query_struct = ConfigQuery(op_code, following, query_id=query_id)
+    query_struct = ConfigQuery(op_code, following)
     return query_perform(query_struct, connection, query_params=prop_values)
diff --git a/pyignite/api/cluster.py b/pyignite/api/cluster.py
index e134239..50c71bd 100644
--- a/pyignite/api/cluster.py
+++ b/pyignite/api/cluster.py
@@ -20,25 +20,22 @@
 from pyignite.queries.op_codes import OP_CLUSTER_GET_STATE, OP_CLUSTER_CHANGE_STATE
 
 
-def cluster_get_state(connection: 'Connection', query_id=None) -> 'APIResult':
+def cluster_get_state(connection: 'Connection') -> 'APIResult':
     """
     Get cluster state.
 
     :param connection: Connection to use,
-    :param query_id: (optional) a value generated by client and returned as-is
-     in response.query_id. When the parameter is omitted, a random value
-     is generated,
     :return: API result data object. Contains zero status and a state
      retrieved on success, non-zero status and an error description on failure.
     """
-    return __cluster_get_state(connection, query_id)
+    return __cluster_get_state(connection)
 
 
-async def cluster_get_state_async(connection: 'AioConnection', query_id=None) -> 'APIResult':
+async def cluster_get_state_async(connection: 'AioConnection') -> 'APIResult':
     """
     Async version of cluster_get_state
     """
-    return await __cluster_get_state(connection, query_id)
+    return await __cluster_get_state(connection)
 
 
 def __post_process_get_state(result):
@@ -47,11 +44,11 @@
     return result
 
 
-def __cluster_get_state(connection, query_id):
+def __cluster_get_state(connection):
     if not connection.protocol_context.is_cluster_api_supported():
         raise NotSupportedByClusterError('Cluster API is not supported by the cluster')
 
-    query_struct = Query(OP_CLUSTER_GET_STATE, query_id=query_id)
+    query_struct = Query(OP_CLUSTER_GET_STATE)
     return query_perform(
         query_struct, connection,
         response_config=[('state', Byte)],
@@ -59,26 +56,23 @@
     )
 
 
-def cluster_set_state(connection: 'Connection', state: int, query_id=None) -> 'APIResult':
+def cluster_set_state(connection: 'Connection', state: int) -> 'APIResult':
     """
     Set cluster state.
 
     :param connection: Connection to use,
     :param state: State to set,
-    :param query_id: (optional) a value generated by client and returned as-is
-     in response.query_id. When the parameter is omitted, a random value
-     is generated,
     :return: API result data object. Contains zero status if a value
      is written, non-zero status and an error description otherwise.
     """
-    return __cluster_set_state(connection, state, query_id)
+    return __cluster_set_state(connection, state)
 
 
-async def cluster_set_state_async(connection: 'AioConnection', state: int, query_id=None) -> 'APIResult':
+async def cluster_set_state_async(connection: 'AioConnection', state: int) -> 'APIResult':
     """
     Async version of cluster_get_state
     """
-    return await __cluster_set_state(connection, state, query_id)
+    return await __cluster_set_state(connection, state)
 
 
 def __post_process_set_state(result):
@@ -87,7 +81,7 @@
     return result
 
 
-def __cluster_set_state(connection, state, query_id):
+def __cluster_set_state(connection, state):
     if not connection.protocol_context.is_cluster_api_supported():
         raise NotSupportedByClusterError('Cluster API is not supported by the cluster')
 
@@ -95,8 +89,7 @@
         OP_CLUSTER_CHANGE_STATE,
         [
             ('state', Byte)
-        ],
-        query_id=query_id
+        ]
     )
     return query_perform(
         query_struct, connection,
diff --git a/pyignite/api/key_value.py b/pyignite/api/key_value.py
index 5038051..5b3f72c 100644
--- a/pyignite/api/key_value.py
+++ b/pyignite/api/key_value.py
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from typing import Any, Iterable, Optional, Union
+from typing import Any, Iterable, Union
 
 from pyignite.connection import AioConnection, Connection
 from pyignite.queries.op_codes import (
@@ -28,12 +28,11 @@
 from pyignite.queries import Query, query_perform
 
 from .result import APIResult
-from ..queries.query import CacheInfo
+from ..queries.cache_info import CacheInfo
 
 
 def cache_put(connection: 'Connection', cache_info: CacheInfo, key: Any, value: Any,
-              key_hint: 'IgniteDataType' = None, value_hint: 'IgniteDataType' = None,
-              query_id: Optional[int] = None) -> 'APIResult':
+              key_hint: 'IgniteDataType' = None, value_hint: 'IgniteDataType' = None) -> 'APIResult':
     """
     Puts a value with a given key to cache (overwriting existing value if any).
 
@@ -45,33 +44,28 @@
      should be converted,
     :param value_hint: (optional) Ignite data type, for which the given value
      should be converted.
-    :param query_id: (optional) a value generated by client and returned as-is
-     in response.query_id. When the parameter is omitted, a random value
-     is generated,
     :return: API result data object. Contains zero status if a value
      is written, non-zero status and an error description otherwise.
     """
-    return __cache_put(connection, cache_info, key, value, key_hint, value_hint, query_id)
+    return __cache_put(connection, cache_info, key, value, key_hint, value_hint)
 
 
 async def cache_put_async(connection: 'AioConnection', cache_info: CacheInfo, key: Any, value: Any,
-                          key_hint: 'IgniteDataType' = None, value_hint: 'IgniteDataType' = None,
-                          query_id: Optional[int] = None) -> 'APIResult':
+                          key_hint: 'IgniteDataType' = None, value_hint: 'IgniteDataType' = None) -> 'APIResult':
     """
     Async version of cache_put
     """
-    return await __cache_put(connection, cache_info, key, value, key_hint, value_hint, query_id)
+    return await __cache_put(connection, cache_info, key, value, key_hint, value_hint)
 
 
-def __cache_put(connection, cache_info, key, value, key_hint, value_hint, query_id):
+def __cache_put(connection, cache_info, key, value, key_hint, value_hint):
     query_struct = Query(
         OP_CACHE_PUT,
         [
             ('cache_info', CacheInfo),
             ('key', key_hint or AnyDataObject),
             ('value', value_hint or AnyDataObject),
-        ],
-        query_id=query_id,
+        ]
     )
     return query_perform(
         query_struct, connection,
@@ -83,8 +77,8 @@
     )
 
 
-def cache_get(connection: 'Connection', cache_info: CacheInfo, key: Any, key_hint: 'IgniteDataType' = None,
-              query_id: Optional[int] = None) -> 'APIResult':
+def cache_get(connection: 'Connection', cache_info: CacheInfo, key: Any,
+              key_hint: 'IgniteDataType' = None) -> 'APIResult':
     """
     Retrieves a value from cache by key.
 
@@ -92,32 +86,28 @@
     :param cache_info: cache meta info,
     :param key: key for the cache entry. Can be of any supported type,
     :param key_hint: (optional) Ignite data type, for which the given key
-     should be converted,
-    :param query_id: (optional) a value generated by client and returned as-is
-     in response.query_id. When the parameter is omitted, a random value
-     is generated,
+     should be converted,
     :return: API result data object. Contains zero status and a value
      retrieved on success, non-zero status and an error description on failure.
     """
-    return __cache_get(connection, cache_info, key, key_hint, query_id)
+    return __cache_get(connection, cache_info, key, key_hint)
 
 
 async def cache_get_async(connection: 'AioConnection', cache_info: CacheInfo, key: Any,
-                          key_hint: 'IgniteDataType' = None, query_id: Optional[int] = None) -> 'APIResult':
+                          key_hint: 'IgniteDataType' = None) -> 'APIResult':
     """
     Async version of cache_get
     """
-    return await __cache_get(connection, cache_info, key, key_hint, query_id)
+    return await __cache_get(connection, cache_info, key, key_hint)
 
 
-def __cache_get(connection, cache_info, key, key_hint, query_id):
+def __cache_get(connection, cache_info, key, key_hint):
     query_struct = Query(
         OP_CACHE_GET,
         [
             ('cache_info', CacheInfo),
             ('key', key_hint or AnyDataObject),
-        ],
-        query_id=query_id,
+        ]
     )
     return query_perform(
         query_struct, connection,
@@ -132,40 +122,34 @@
     )
 
 
-def cache_get_all(connection: 'Connection', cache_info: CacheInfo, keys: Iterable,
-                  query_id: Optional[int] = None) -> 'APIResult':
+def cache_get_all(connection: 'Connection', cache_info: CacheInfo, keys: Iterable) -> 'APIResult':
     """
     Retrieves multiple key-value pairs from cache.
 
     :param connection: connection to Ignite server,
     :param cache_info: cache meta info,
     :param keys: list of keys or tuples of (key, key_hint),
-    :param query_id: (optional) a value generated by client and returned as-is
-     in response.query_id. When the parameter is omitted, a random value
-     is generated,
     :return: API result data object. Contains zero status and a dict, made of
      retrieved key-value pairs, non-zero status and an error description
      on failure.
     """
-    return __cache_get_all(connection, cache_info, keys, query_id)
+    return __cache_get_all(connection, cache_info, keys)
 
 
-async def cache_get_all_async(connection: 'AioConnection', cache_info: CacheInfo, keys: Iterable,
-                              query_id: Optional[int] = None) -> 'APIResult':
+async def cache_get_all_async(connection: 'AioConnection', cache_info: CacheInfo, keys: Iterable) -> 'APIResult':
     """
     Async version of cache_get_all.
     """
-    return await __cache_get_all(connection, cache_info, keys, query_id)
+    return await __cache_get_all(connection, cache_info, keys)
 
 
-def __cache_get_all(connection, cache_info, keys, query_id):
+def __cache_get_all(connection, cache_info, keys):
     query_struct = Query(
         OP_CACHE_GET_ALL,
         [
             ('cache_info', CacheInfo),
             ('keys', AnyDataArray()),
-        ],
-        query_id=query_id,
+        ]
     )
     return query_perform(
         query_struct, connection,
@@ -180,8 +164,7 @@
     )
 
 
-def cache_put_all(connection: 'Connection', cache_info: CacheInfo, pairs: dict,
-                  query_id: Optional[int] = None) -> 'APIResult':
+def cache_put_all(connection: 'Connection', cache_info: CacheInfo, pairs: dict) -> 'APIResult':
     """
     Puts multiple key-value pairs to cache (overwriting existing associations
     if any).
@@ -191,31 +174,26 @@
     :param pairs: dictionary type parameters, contains key-value pairs to save.
      Each key or value can be an item of representable Python type or a tuple
      of (item, hint),
-    :param query_id: (optional) a value generated by client and returned as-is
-     in response.query_id. When the parameter is omitted, a random value
-     is generated,
     :return: API result data object. Contains zero status if key-value pairs
      are written, non-zero status and an error description otherwise.
     """
-    return __cache_put_all(connection, cache_info, pairs, query_id)
+    return __cache_put_all(connection, cache_info, pairs)
 
 
-async def cache_put_all_async(connection: 'AioConnection', cache_info: CacheInfo, pairs: dict,
-                              query_id: Optional[int] = None) -> 'APIResult':
+async def cache_put_all_async(connection: 'AioConnection', cache_info: CacheInfo, pairs: dict) -> 'APIResult':
     """
     Async version of cache_put_all.
     """
-    return await __cache_put_all(connection, cache_info, pairs, query_id)
+    return await __cache_put_all(connection, cache_info, pairs)
 
 
-def __cache_put_all(connection, cache_info, pairs, query_id):
+def __cache_put_all(connection, cache_info, pairs):
     query_struct = Query(
         OP_CACHE_PUT_ALL,
         [
             ('cache_info', CacheInfo),
             ('data', Map),
-        ],
-        query_id=query_id,
+        ]
     )
     return query_perform(
         query_struct, connection,
@@ -226,8 +204,8 @@
     )
 
 
-def cache_contains_key(connection: 'Connection', cache_info: CacheInfo, key: Any, key_hint: 'IgniteDataType' = None,
-                       query_id: Optional[int] = None) -> 'APIResult':
+def cache_contains_key(connection: 'Connection', cache_info: CacheInfo, key: Any,
+                       key_hint: 'IgniteDataType' = None) -> 'APIResult':
     """
     Returns a value indicating whether given key is present in cache.
 
@@ -235,33 +213,29 @@
     :param cache_info: cache meta info,
     :param key: key for the cache entry. Can be of any supported type,
     :param key_hint: (optional) Ignite data type, for which the given key
-     should be converted,
-    :param query_id: a value generated by client and returned as-is
-     in response.query_id. When the parameter is omitted, a random value
-     is generated,
+     should be converted,
     :return: API result data object. Contains zero status and a bool value
      retrieved on success: `True` when key is present, `False` otherwise,
      non-zero status and an error description on failure.
     """
-    return __cache_contains_key(connection, cache_info, key, key_hint, query_id)
+    return __cache_contains_key(connection, cache_info, key, key_hint)
 
 
 async def cache_contains_key_async(connection: 'AioConnection', cache_info: CacheInfo, key: Any,
-                                   key_hint: 'IgniteDataType' = None, query_id: Optional[int] = None) -> 'APIResult':
+                                   key_hint: 'IgniteDataType' = None) -> 'APIResult':
     """
     Async version of cache_contains_key.
     """
-    return await __cache_contains_key(connection, cache_info, key, key_hint, query_id)
+    return await __cache_contains_key(connection, cache_info, key, key_hint)
 
 
-def __cache_contains_key(connection, cache_info, key, key_hint, query_id):
+def __cache_contains_key(connection, cache_info, key, key_hint):
     query_struct = Query(
         OP_CACHE_CONTAINS_KEY,
         [
             ('cache_info', CacheInfo),
             ('key', key_hint or AnyDataObject),
-        ],
-        query_id=query_id,
+        ]
     )
     return query_perform(
         query_struct, connection,
@@ -277,39 +251,34 @@
 
 
 def cache_contains_keys(connection: 'Connection', cache_info: CacheInfo, keys: Iterable,
-                        query_id: Optional[int] = None) -> 'APIResult':
+                        ) -> 'APIResult':
     """
     Returns a value indicating whether all given keys are present in cache.
 
     :param connection: connection to Ignite server,
     :param cache_info: cache meta info,
     :param keys: a list of keys or (key, type hint) tuples,
-    :param query_id: a value generated by client and returned as-is
-     in response.query_id. When the parameter is omitted, a random value
-     is generated,
     :return: API result data object. Contains zero status and a bool value
      retrieved on success: `True` when all keys are present, `False` otherwise,
      non-zero status and an error description on failure.
     """
-    return __cache_contains_keys(connection, cache_info, keys, query_id)
+    return __cache_contains_keys(connection, cache_info, keys)
 
 
-async def cache_contains_keys_async(connection: 'AioConnection', cache_info: CacheInfo, keys: Iterable,
-                                    query_id: Optional[int] = None) -> 'APIResult':
+async def cache_contains_keys_async(connection: 'AioConnection', cache_info: CacheInfo, keys: Iterable) -> 'APIResult':
     """
     Async version of cache_contains_keys.
     """
-    return await __cache_contains_keys(connection, cache_info, keys, query_id)
+    return await __cache_contains_keys(connection, cache_info, keys)
 
 
-def __cache_contains_keys(connection, cache_info, keys, query_id):
+def __cache_contains_keys(connection, cache_info, keys):
     query_struct = Query(
         OP_CACHE_CONTAINS_KEYS,
         [
             ('cache_info', CacheInfo),
             ('keys', AnyDataArray()),
-        ],
-        query_id=query_id,
+        ]
     )
     return query_perform(
         query_struct, connection,
@@ -325,8 +294,7 @@
 
 
 def cache_get_and_put(connection: 'Connection', cache_info: CacheInfo, key: Any, value: Any,
-                      key_hint: 'IgniteDataType' = None, value_hint: 'IgniteDataType' = None,
-                      query_id: Optional[int] = None) -> 'APIResult':
+                      key_hint: 'IgniteDataType' = None, value_hint: 'IgniteDataType' = None) -> 'APIResult':
     """
     Puts a value with a given key to cache_info, and returns the previous value
     for that key, or null value if there was not such key.
@@ -339,26 +307,24 @@
      should be converted,
     :param value_hint: (optional) Ignite data type, for which the given value
      should be converted.
-    :param query_id: a value generated by client and returned as-is
-     in response.query_id. When the parameter is omitted, a random value
-     is generated,
     :return: API result data object. Contains zero status and an old value
      or None if a value is written, non-zero status and an error description
      in case of error.
     """
-    return __cache_get_and_put(connection, cache_info, key, value, key_hint, value_hint, query_id)
+    return __cache_get_and_put(connection, cache_info, key, value, key_hint, value_hint)
 
 
-async def cache_get_and_put_async(connection: 'AioConnection', cache_info: CacheInfo, key: Any, value: Any,
-                                  key_hint: 'IgniteDataType' = None, value_hint: 'IgniteDataType' = None,
-                                  query_id: Optional[int] = None) -> 'APIResult':
+async def cache_get_and_put_async(
+        connection: 'AioConnection', cache_info: CacheInfo, key: Any, value: Any,
+        key_hint: 'IgniteDataType' = None, value_hint: 'IgniteDataType' = None
+) -> 'APIResult':
     """
     Async version of cache_get_and_put.
     """
-    return await __cache_get_and_put(connection, cache_info, key, value, key_hint, value_hint, query_id)
+    return await __cache_get_and_put(connection, cache_info, key, value, key_hint, value_hint)
 
 
-def __cache_get_and_put(connection, cache_info, key, value, key_hint, value_hint, query_id):
+def __cache_get_and_put(connection, cache_info, key, value, key_hint, value_hint):
     query_struct = Query(
         OP_CACHE_GET_AND_PUT,
         [
@@ -366,7 +332,6 @@
             ('key', key_hint or AnyDataObject),
             ('value', value_hint or AnyDataObject),
         ],
-        query_id=query_id,
     )
     return query_perform(
         query_struct, connection,
@@ -383,8 +348,7 @@
 
 
 def cache_get_and_replace(connection: 'Connection', cache_info: CacheInfo, key: Any, value: Any,
-                          key_hint: 'IgniteDataType' = None, value_hint: 'IgniteDataType' = None,
-                          query_id: Optional[int] = None) -> 'APIResult':
+                          key_hint: 'IgniteDataType' = None, value_hint: 'IgniteDataType' = None) -> 'APIResult':
     """
     Puts a value with a given key to cache, returning previous value
     for that key, if and only if there is a value currently mapped
@@ -398,32 +362,29 @@
      should be converted,
     :param value_hint: (optional) Ignite data type, for which the given value
      should be converted.
-    :param query_id: a value generated by client and returned as-is
-     in response.query_id. When the parameter is omitted, a random value
-     is generated,
     :return: API result data object. Contains zero status and an old value
      or None on success, non-zero status and an error description otherwise.
     """
-    return __cache_get_and_replace(connection, cache_info, key, key_hint, value, value_hint, query_id)
+    return __cache_get_and_replace(connection, cache_info, key, key_hint, value, value_hint)
 
 
-async def cache_get_and_replace_async(connection: 'AioConnection', cache_info: CacheInfo, key: Any, value: Any,
-                                      key_hint: 'IgniteDataType' = None, value_hint: 'IgniteDataType' = None,
-                                      query_id: Optional[int] = None) -> 'APIResult':
+async def cache_get_and_replace_async(
+        connection: 'AioConnection', cache_info: CacheInfo, key: Any, value: Any,
+        key_hint: 'IgniteDataType' = None, value_hint: 'IgniteDataType' = None
+) -> 'APIResult':
     """
     Async version of cache_get_and_replace.
     """
-    return await __cache_get_and_replace(connection, cache_info, key, key_hint, value, value_hint, query_id)
+    return await __cache_get_and_replace(connection, cache_info, key, key_hint, value, value_hint)
 
 
-def __cache_get_and_replace(connection, cache_info, key, key_hint, value, value_hint, query_id):
+def __cache_get_and_replace(connection, cache_info, key, key_hint, value, value_hint):
     query_struct = Query(
         OP_CACHE_GET_AND_REPLACE, [
             ('cache_info', CacheInfo),
             ('key', key_hint or AnyDataObject),
             ('value', value_hint or AnyDataObject),
-        ],
-        query_id=query_id,
+        ]
     )
     return query_perform(
         query_struct, connection,
@@ -439,8 +400,9 @@
     )
 
 
-def cache_get_and_remove(connection: 'Connection', cache_info: CacheInfo, key: Any, key_hint: 'IgniteDataType' = None,
-                         query_id: Optional[int] = None) -> 'APIResult':
+def cache_get_and_remove(
+        connection: 'Connection', cache_info: CacheInfo, key: Any, key_hint: 'IgniteDataType' = None
+) -> 'APIResult':
     """
     Removes the cache entry with specified key, returning the value.
 
@@ -449,28 +411,24 @@
     :param key: key for the cache entry. Can be of any supported type,
     :param key_hint: (optional) Ignite data type, for which the given key
      should be converted,
-    :param query_id: a value generated by client and returned as-is
-     in response.query_id. When the parameter is omitted, a random value
-     is generated,
     :return: API result data object. Contains zero status and an old value
      or None, non-zero status and an error description otherwise.
     """
-    return __cache_get_and_remove(connection, cache_info, key, key_hint, query_id)
+    return __cache_get_and_remove(connection, cache_info, key, key_hint)
 
 
 async def cache_get_and_remove_async(
-        connection: 'AioConnection', cache_info: CacheInfo, key: Any, key_hint: 'IgniteDataType' = None,
-        query_id: Optional[int] = None) -> 'APIResult':
-    return await __cache_get_and_remove(connection, cache_info, key, key_hint, query_id)
+        connection: 'AioConnection', cache_info: CacheInfo, key: Any, key_hint: 'IgniteDataType' = None
+) -> 'APIResult':
+    return await __cache_get_and_remove(connection, cache_info, key, key_hint)
 
 
-def __cache_get_and_remove(connection, cache_info, key, key_hint, query_id):
+def __cache_get_and_remove(connection, cache_info, key, key_hint):
     query_struct = Query(
         OP_CACHE_GET_AND_REMOVE, [
             ('cache_info', CacheInfo),
             ('key', key_hint or AnyDataObject),
-        ],
-        query_id=query_id,
+        ]
     )
     return query_perform(
         query_struct, connection,
@@ -486,8 +444,7 @@
 
 
 def cache_put_if_absent(connection: 'Connection', cache_info: CacheInfo, key: Any, value: Any,
-                        key_hint: 'IgniteDataType' = None, value_hint: 'IgniteDataType' = None,
-                        query_id: Optional[int] = None) -> 'APIResult':
+                        key_hint: 'IgniteDataType' = None, value_hint: 'IgniteDataType' = None) -> 'APIResult':
     """
     Puts a value with a given key to cache only if the key
     does not already exist.
@@ -500,33 +457,30 @@
      should be converted,
     :param value_hint: (optional) Ignite data type, for which the given value
      should be converted.
-    :param query_id: (optional) a value generated by client and returned as-is
-     in response.query_id. When the parameter is omitted, a random value
-     is generated,
     :return: API result data object. Contains zero status on success,
      non-zero status and an error description otherwise.
     """
-    return __cache_put_if_absent(connection, cache_info, key, value, key_hint, value_hint, query_id)
+    return __cache_put_if_absent(connection, cache_info, key, value, key_hint, value_hint)
 
 
-async def cache_put_if_absent_async(connection: 'AioConnection', cache_info: CacheInfo, key: Any, value: Any,
-                                    key_hint: 'IgniteDataType' = None, value_hint: 'IgniteDataType' = None,
-                                    query_id: Optional[int] = None) -> 'APIResult':
+async def cache_put_if_absent_async(
+        connection: 'AioConnection', cache_info: CacheInfo, key: Any, value: Any,
+        key_hint: 'IgniteDataType' = None, value_hint: 'IgniteDataType' = None
+) -> 'APIResult':
     """
     Async version of cache_put_if_absent.
     """
-    return await __cache_put_if_absent(connection, cache_info, key, value, key_hint, value_hint, query_id)
+    return await __cache_put_if_absent(connection, cache_info, key, value, key_hint, value_hint)
 
 
-def __cache_put_if_absent(connection, cache_info, key, value, key_hint, value_hint, query_id):
+def __cache_put_if_absent(connection, cache_info, key, value, key_hint, value_hint):
     query_struct = Query(
         OP_CACHE_PUT_IF_ABSENT,
         [
             ('cache_info', CacheInfo),
             ('key', key_hint or AnyDataObject),
             ('value', value_hint or AnyDataObject),
-        ],
-        query_id=query_id,
+        ]
     )
     return query_perform(
         query_struct, connection,
@@ -543,8 +497,7 @@
 
 
 def cache_get_and_put_if_absent(connection: 'Connection', cache_info: CacheInfo, key: Any, value: Any,
-                                key_hint: 'IgniteDataType' = None, value_hint: 'IgniteDataType' = None,
-                                query_id: Optional[int] = None) -> 'APIResult':
+                                key_hint: 'IgniteDataType' = None, value_hint: 'IgniteDataType' = None) -> 'APIResult':
     """
     Puts a value with a given key to cache only if the key does not
     already exist.
@@ -557,33 +510,30 @@
      should be converted,
     :param value_hint: (optional) Ignite data type, for which the given value
      should be converted.
-    :param query_id: (optional) a value generated by client and returned as-is
-     in response.query_id. When the parameter is omitted, a random value
-     is generated,
     :return: API result data object. Contains zero status and an old value
      or None on success, non-zero status and an error description otherwise.
     """
-    return __cache_get_and_put_if_absent(connection, cache_info, key, value, key_hint, value_hint, query_id)
+    return __cache_get_and_put_if_absent(connection, cache_info, key, value, key_hint, value_hint)
 
 
-async def cache_get_and_put_if_absent_async(connection: 'AioConnection', cache_info: CacheInfo, key: Any, value: Any,
-                                            key_hint: 'IgniteDataType' = None, value_hint: 'IgniteDataType' = None,
-                                            query_id: Optional[int] = None) -> 'APIResult':
+async def cache_get_and_put_if_absent_async(
+        connection: 'AioConnection', cache_info: CacheInfo, key: Any, value: Any,
+        key_hint: 'IgniteDataType' = None, value_hint: 'IgniteDataType' = None
+) -> 'APIResult':
     """
     Async version of cache_get_and_put_if_absent.
     """
-    return await __cache_get_and_put_if_absent(connection, cache_info, key, value, key_hint, value_hint, query_id)
+    return await __cache_get_and_put_if_absent(connection, cache_info, key, value, key_hint, value_hint)
 
 
-def __cache_get_and_put_if_absent(connection, cache_info, key, value, key_hint, value_hint, query_id):
+def __cache_get_and_put_if_absent(connection, cache_info, key, value, key_hint, value_hint):
     query_struct = Query(
         OP_CACHE_GET_AND_PUT_IF_ABSENT,
         [
             ('cache_info', CacheInfo),
             ('key', key_hint or AnyDataObject),
             ('value', value_hint or AnyDataObject),
-        ],
-        query_id=query_id,
+        ]
     )
     return query_perform(
         query_struct, connection,
@@ -600,8 +550,7 @@
 
 
 def cache_replace(connection: 'Connection', cache_info: CacheInfo, key: Any, value: Any,
-                  key_hint: 'IgniteDataType' = None, value_hint: 'IgniteDataType' = None,
-                  query_id: Optional[int] = None) -> 'APIResult':
+                  key_hint: 'IgniteDataType' = None, value_hint: 'IgniteDataType' = None) -> 'APIResult':
     """
     Puts a value with a given key to cache only if the key already exist.
 
@@ -613,34 +562,31 @@
      should be converted,
     :param value_hint: (optional) Ignite data type, for which the given value
      should be converted.
-    :param query_id: a value generated by client and returned as-is
-     in response.query_id. When the parameter is omitted, a random value
-     is generated,
     :return: API result data object. Contains zero status and a boolean
      success code, or non-zero status and an error description if something
      has gone wrong.
     """
-    return __cache_replace(connection, cache_info, key, value, key_hint, value_hint, query_id)
+    return __cache_replace(connection, cache_info, key, value, key_hint, value_hint)
 
 
-async def cache_replace_async(connection: 'AioConnection', cache_info: CacheInfo, key: Any, value: Any,
-                              key_hint: 'IgniteDataType' = None, value_hint: 'IgniteDataType' = None,
-                              query_id: Optional[int] = None) -> 'APIResult':
+async def cache_replace_async(
+        connection: 'AioConnection', cache_info: CacheInfo, key: Any, value: Any,
+        key_hint: 'IgniteDataType' = None, value_hint: 'IgniteDataType' = None
+) -> 'APIResult':
     """
     Async version of cache_replace.
     """
-    return await __cache_replace(connection, cache_info, key, value, key_hint, value_hint, query_id)
+    return await __cache_replace(connection, cache_info, key, value, key_hint, value_hint)
 
 
-def __cache_replace(connection, cache_info, key, value, key_hint, value_hint, query_id):
+def __cache_replace(connection, cache_info, key, value, key_hint, value_hint):
     query_struct = Query(
         OP_CACHE_REPLACE,
         [
             ('cache_info', CacheInfo),
             ('key', key_hint or AnyDataObject),
             ('value', value_hint or AnyDataObject),
-        ],
-        query_id=query_id,
+        ]
     )
     return query_perform(
         query_struct, connection,
@@ -658,7 +604,7 @@
 
 def cache_replace_if_equals(connection: 'Connection', cache_info: CacheInfo, key: Any, sample: Any, value: Any,
                             key_hint: 'IgniteDataType' = None, sample_hint: 'IgniteDataType' = None,
-                            value_hint: 'IgniteDataType' = None, query_id: Optional[int] = None) -> 'APIResult':
+                            value_hint: 'IgniteDataType' = None) -> 'APIResult':
     """
     Puts a value with a given key to cache only if the key already exists
     and value equals provided sample.
@@ -674,29 +620,26 @@
      the given sample should be converted
     :param value_hint: (optional) Ignite data type, for which the given value
      should be converted,
-    :param query_id: (optional) a value generated by client and returned
-     as-is in response.query_id. When the parameter is omitted, a random
-     value is generated,
     :return: API result data object. Contains zero status and a boolean
      success code, or non-zero status and an error description if something
      has gone wrong.
     """
     return __cache_replace_if_equals(connection, cache_info, key, sample, value, key_hint,
-                                     sample_hint, value_hint, query_id)
+                                     sample_hint, value_hint)
 
 
 async def cache_replace_if_equals_async(
         connection: 'AioConnection', cache_info: CacheInfo, key: Any, sample: Any, value: Any,
-        key_hint: 'IgniteDataType' = None, sample_hint: 'IgniteDataType' = None, value_hint: 'IgniteDataType' = None,
-        query_id: Optional[int] = None) -> 'APIResult':
+        key_hint: 'IgniteDataType' = None, sample_hint: 'IgniteDataType' = None, value_hint: 'IgniteDataType' = None
+) -> 'APIResult':
     """
     Async version of cache_replace_if_equals.
     """
     return await __cache_replace_if_equals(connection, cache_info, key, sample, value, key_hint,
-                                           sample_hint, value_hint, query_id)
+                                           sample_hint, value_hint)
 
 
-def __cache_replace_if_equals(connection, cache_info, key, sample, value, key_hint, sample_hint, value_hint, query_id):
+def __cache_replace_if_equals(connection, cache_info, key, sample, value, key_hint, sample_hint, value_hint):
     query_struct = Query(
         OP_CACHE_REPLACE_IF_EQUALS,
         [
@@ -704,8 +647,7 @@
             ('key', key_hint or AnyDataObject),
             ('sample', sample_hint or AnyDataObject),
             ('value', value_hint or AnyDataObject),
-        ],
-        query_id=query_id,
+        ]
     )
     return query_perform(
         query_struct, connection,
@@ -722,36 +664,31 @@
     )
 
 
-def cache_clear(connection: 'Connection', cache_info: CacheInfo, query_id: Optional[int] = None) -> 'APIResult':
+def cache_clear(connection: 'Connection', cache_info: CacheInfo) -> 'APIResult':
     """
     Clears the cache without notifying listeners or cache writers.
 
     :param connection: connection to Ignite server,
     :param cache_info: cache meta info,
-    :param query_id: (optional) a value generated by client and returned
-     as-is in response.query_id. When the parameter is omitted, a random
-     value is generated,
     :return: API result data object. Contains zero status on success,
      non-zero status and an error description otherwise.
     """
-    return __cache_clear(connection, cache_info, query_id)
+    return __cache_clear(connection, cache_info)
 
 
-async def cache_clear_async(
-        connection: 'AioConnection', cache_info: CacheInfo, query_id: Optional[int] = None) -> 'APIResult':
+async def cache_clear_async(connection: 'AioConnection', cache_info: CacheInfo) -> 'APIResult':
     """
     Async version of cache_clear.
     """
-    return await __cache_clear(connection, cache_info, query_id)
+    return await __cache_clear(connection, cache_info)
 
 
-def __cache_clear(connection, cache_info, query_id):
+def __cache_clear(connection, cache_info):
     query_struct = Query(
         OP_CACHE_CLEAR,
         [
             ('cache_info', CacheInfo),
-        ],
-        query_id=query_id,
+        ]
     )
     return query_perform(
         query_struct, connection,
@@ -761,8 +698,9 @@
     )
 
 
-def cache_clear_key(connection: 'Connection', cache_info: CacheInfo, key: Any, key_hint: 'IgniteDataType' = None,
-                    query_id: Optional[int] = None) -> 'APIResult':
+def cache_clear_key(
+        connection: 'Connection', cache_info: CacheInfo, key: Any, key_hint: 'IgniteDataType' = None
+) -> 'APIResult':
     """
     Clears the cache key without notifying listeners or cache writers.
 
@@ -771,31 +709,28 @@
     :param key:  key for the cache entry,
     :param key_hint: (optional) Ignite data type, for which the given key
      should be converted,
-    :param query_id: (optional) a value generated by client and returned
-     as-is in response.query_id. When the parameter is omitted, a random
-     value is generated,
     :return: API result data object. Contains zero status on success,
      non-zero status and an error description otherwise.
     """
-    return __cache_clear_key(connection, cache_info, key, key_hint, query_id)
+    return __cache_clear_key(connection, cache_info, key, key_hint)
 
 
-async def cache_clear_key_async(connection: 'AioConnection', cache_info: CacheInfo, key: Any,
-                                key_hint: 'IgniteDataType' = None, query_id: Optional[int] = None) -> 'APIResult':
+async def cache_clear_key_async(
+        connection: 'AioConnection', cache_info: CacheInfo, key: Any, key_hint: 'IgniteDataType' = None
+) -> 'APIResult':
     """
     Async version of cache_clear_key.
     """
-    return await __cache_clear_key(connection, cache_info, key, key_hint, query_id)
+    return await __cache_clear_key(connection, cache_info, key, key_hint)
 
 
-def __cache_clear_key(connection, cache_info, key, key_hint, query_id):
+def __cache_clear_key(connection, cache_info, key, key_hint):
     query_struct = Query(
         OP_CACHE_CLEAR_KEY,
         [
             ('cache_info', CacheInfo),
             ('key', key_hint or AnyDataObject),
-        ],
-        query_id=query_id,
+        ]
     )
     return query_perform(
         query_struct, connection,
@@ -806,40 +741,33 @@
     )
 
 
-def cache_clear_keys(
-        connection: 'Connection', cache_info: CacheInfo, keys: Iterable, query_id: Optional[int] = None) -> 'APIResult':
+def cache_clear_keys(connection: 'Connection', cache_info: CacheInfo, keys: Iterable) -> 'APIResult':
     """
     Clears the cache keys without notifying listeners or cache writers.
 
     :param connection: connection to Ignite server,
     :param cache_info: cache meta info,
     :param keys: list of keys or tuples of (key, key_hint),
-    :param query_id: (optional) a value generated by client and returned as-is
-     in response.query_id. When the parameter is omitted, a random value
-     is generated,
     :return: API result data object. Contains zero status on success,
      non-zero status and an error description otherwise.
     """
-    return __cache_clear_keys(connection, cache_info, keys, query_id)
+    return __cache_clear_keys(connection, cache_info, keys)
 
 
-async def cache_clear_keys_async(
-        connection: 'AioConnection', cache_info: CacheInfo, keys: Iterable, query_id: Optional[int] = None
-) -> 'APIResult':
+async def cache_clear_keys_async(connection: 'AioConnection', cache_info: CacheInfo, keys: Iterable) -> 'APIResult':
     """
     Async version of cache_clear_keys.
     """
-    return await __cache_clear_keys(connection, cache_info, keys, query_id)
+    return await __cache_clear_keys(connection, cache_info, keys)
 
 
-def __cache_clear_keys(connection, cache_info, keys, query_id):
+def __cache_clear_keys(connection, cache_info, keys):
     query_struct = Query(
         OP_CACHE_CLEAR_KEYS,
         [
             ('cache_info', CacheInfo),
             ('keys', AnyDataArray()),
-        ],
-        query_id=query_id,
+        ]
     )
     return query_perform(
         query_struct, connection,
@@ -850,8 +778,9 @@
     )
 
 
-def cache_remove_key(connection: 'Connection', cache_info: CacheInfo, key: Any, key_hint: 'IgniteDataType' = None,
-                     query_id: Optional[int] = None) -> 'APIResult':
+def cache_remove_key(
+        connection: 'Connection', cache_info: CacheInfo, key: Any, key_hint: 'IgniteDataType' = None
+) -> 'APIResult':
     """
     Clears the cache key without notifying listeners or cache writers.
 
@@ -859,33 +788,30 @@
     :param cache_info: cache meta info,
     :param key:  key for the cache entry,
     :param key_hint: (optional) Ignite data type, for which the given key
-     should be converted,
-    :param query_id: (optional) a value generated by client and returned
-     as-is in response.query_id. When the parameter is omitted, a random
-     value is generated,
+     should be converted,
     :return: API result data object. Contains zero status and a boolean
      success code, or non-zero status and an error description if something
      has gone wrong.
     """
-    return __cache_remove_key(connection, cache_info, key, key_hint, query_id)
+    return __cache_remove_key(connection, cache_info, key, key_hint)
 
 
-async def cache_remove_key_async(connection: 'AioConnection', cache_info: CacheInfo, key: Any,
-                                 key_hint: 'IgniteDataType' = None, query_id: Optional[int] = None) -> 'APIResult':
+async def cache_remove_key_async(
+        connection: 'AioConnection', cache_info: CacheInfo, key: Any, key_hint: 'IgniteDataType' = None
+) -> 'APIResult':
     """
     Async version of cache_remove_key.
     """
-    return await __cache_remove_key(connection, cache_info, key, key_hint, query_id)
+    return await __cache_remove_key(connection, cache_info, key, key_hint)
 
 
-def __cache_remove_key(connection, cache_info, key, key_hint, query_id):
+def __cache_remove_key(connection, cache_info, key, key_hint):
     query_struct = Query(
         OP_CACHE_REMOVE_KEY,
         [
             ('cache_info', CacheInfo),
             ('key', key_hint or AnyDataObject),
-        ],
-        query_id=query_id,
+        ]
     )
     return query_perform(
         query_struct, connection,
@@ -901,8 +827,7 @@
 
 
 def cache_remove_if_equals(connection: 'Connection', cache_info: CacheInfo, key: Any, sample: Any,
-                           key_hint: 'IgniteDataType' = None, sample_hint: 'IgniteDataType' = None,
-                           query_id: Optional[int] = None) -> 'APIResult':
+                           key_hint: 'IgniteDataType' = None, sample_hint: 'IgniteDataType' = None) -> 'APIResult':
     """
     Removes an entry with a given key if provided value is equal to
     actual value, notifying listeners and cache writers.
@@ -914,35 +839,32 @@
     :param key_hint: (optional) Ignite data type, for which the given key
      should be converted,
-    :param sample_hint: (optional) Ignite data type, for whic
+    :param sample_hint: (optional) Ignite data type, for which
-     the given sample should be converted
-    :param query_id: (optional) a value generated by client and returned
-     as-is in response.query_id. When the parameter is omitted, a random
-     value is generated,
+     the given sample should be converted,
     :return: API result data object. Contains zero status and a boolean
      success code, or non-zero status and an error description if something
      has gone wrong.
     """
-    return __cache_remove_if_equals(connection, cache_info, key, sample, key_hint, sample_hint, query_id)
+    return __cache_remove_if_equals(connection, cache_info, key, sample, key_hint, sample_hint)
 
 
 async def cache_remove_if_equals_async(
-        connection: 'AioConnection', cache_info: CacheInfo, key: Any, sample: Any, key_hint: 'IgniteDataType' = None,
-        sample_hint: 'IgniteDataType' = None, query_id: Optional[int] = None) -> 'APIResult':
+        connection: 'AioConnection', cache_info: CacheInfo, key: Any, sample: Any,
+        key_hint: 'IgniteDataType' = None, sample_hint: 'IgniteDataType' = None
+) -> 'APIResult':
     """
     Async version of cache_remove_if_equals.
     """
-    return await __cache_remove_if_equals(connection, cache_info, key, sample, key_hint, sample_hint, query_id)
+    return await __cache_remove_if_equals(connection, cache_info, key, sample, key_hint, sample_hint)
 
 
-def __cache_remove_if_equals(connection, cache_info, key, sample, key_hint, sample_hint, query_id):
+def __cache_remove_if_equals(connection, cache_info, key, sample, key_hint, sample_hint):
     query_struct = Query(
         OP_CACHE_REMOVE_IF_EQUALS,
         [
             ('cache_info', CacheInfo),
             ('key', key_hint or AnyDataObject),
             ('sample', sample_hint or AnyDataObject),
-        ],
-        query_id=query_id,
+        ]
     )
     return query_perform(
         query_struct, connection,
@@ -958,40 +880,33 @@
     )
 
 
-def cache_remove_keys(
-        connection: 'Connection', cache_info: CacheInfo, keys: Iterable, query_id: Optional[int] = None) -> 'APIResult':
+def cache_remove_keys(connection: 'Connection', cache_info: CacheInfo, keys: Iterable) -> 'APIResult':
     """
     Removes entries with given keys, notifying listeners and cache writers.
 
     :param connection: connection to Ignite server,
     :param cache_info: cache meta info,
     :param keys: list of keys or tuples of (key, key_hint),
-    :param query_id: (optional) a value generated by client and returned as-is
-     in response.query_id. When the parameter is omitted, a random value
-     is generated,
     :return: API result data object. Contains zero status on success,
      non-zero status and an error description otherwise.
     """
-    return __cache_remove_keys(connection, cache_info, keys, query_id)
+    return __cache_remove_keys(connection, cache_info, keys)
 
 
-async def cache_remove_keys_async(
-        connection: 'AioConnection', cache_info: CacheInfo, keys: Iterable, query_id: Optional[int] = None
-) -> 'APIResult':
+async def cache_remove_keys_async(connection: 'AioConnection', cache_info: CacheInfo, keys: Iterable) -> 'APIResult':
     """
     Async version of cache_remove_keys.
     """
-    return await __cache_remove_keys(connection, cache_info, keys, query_id)
+    return await __cache_remove_keys(connection, cache_info, keys)
 
 
-def __cache_remove_keys(connection, cache_info, keys, query_id):
+def __cache_remove_keys(connection, cache_info, keys):
     query_struct = Query(
         OP_CACHE_REMOVE_KEYS,
         [
             ('cache_info', CacheInfo),
             ('keys', AnyDataArray()),
-        ],
-        query_id=query_id,
+        ]
     )
     return query_perform(
         query_struct, connection,
@@ -1002,36 +917,31 @@
     )
 
 
-def cache_remove_all(connection: 'Connection', cache_info: CacheInfo, query_id: Optional[int] = None) -> 'APIResult':
+def cache_remove_all(connection: 'Connection', cache_info: CacheInfo) -> 'APIResult':
     """
     Removes all entries from cache_info, notifying listeners and cache writers.
 
     :param connection: connection to Ignite server,
     :param cache_info: cache meta info,
-    :param query_id: (optional) a value generated by client and returned as-is
-     in response.query_id. When the parameter is omitted, a random value
-     is generated,
     :return: API result data object. Contains zero status on success,
      non-zero status and an error description otherwise.
     """
-    return __cache_remove_all(connection, cache_info, query_id)
+    return __cache_remove_all(connection, cache_info)
 
 
-async def cache_remove_all_async(
-        connection: 'AioConnection', cache_info: CacheInfo, query_id: Optional[int] = None) -> 'APIResult':
+async def cache_remove_all_async(connection: 'AioConnection', cache_info: CacheInfo) -> 'APIResult':
     """
     Async version of cache_remove_all.
     """
-    return await __cache_remove_all(connection, cache_info, query_id)
+    return await __cache_remove_all(connection, cache_info)
 
 
-def __cache_remove_all(connection, cache_info, query_id):
+def __cache_remove_all(connection, cache_info):
     query_struct = Query(
         OP_CACHE_REMOVE_ALL,
         [
             ('cache_info', CacheInfo),
-        ],
-        query_id=query_id,
+        ]
     )
     return query_perform(
         query_struct, connection,
@@ -1041,8 +951,9 @@
     )
 
 
-def cache_get_size(connection: 'Connection', cache_info: CacheInfo, peek_modes: Union[int, list, tuple] = None,
-                   query_id: Optional[int] = None) -> 'APIResult':
+def cache_get_size(
+        connection: 'Connection', cache_info: CacheInfo, peek_modes: Union[int, list, tuple] = None
+) -> 'APIResult':
     """
     Gets the number of entries in cache.
 
@@ -1051,24 +962,20 @@
     :param peek_modes: (optional) limit count to near cache partition
      (PeekModes.NEAR), primary cache (PeekModes.PRIMARY), or backup cache
-     (PeekModes.BACKUP). Defaults to pimary cache partitions (PeekModes.PRIMARY),
+     (PeekModes.BACKUP). Defaults to primary cache partitions (PeekModes.PRIMARY),
-    :param query_id: (optional) a value generated by client and returned as-is
-     in response.query_id. When the parameter is omitted, a random value
-     is generated,
     :return: API result data object. Contains zero status and a number of
      cache entries on success, non-zero status and an error description
      otherwise.
     """
-    return __cache_get_size(connection, cache_info, peek_modes, query_id)
+    return __cache_get_size(connection, cache_info, peek_modes)
 
 
 async def cache_get_size_async(
-        connection: 'AioConnection', cache_info: CacheInfo, peek_modes: Union[int, list, tuple] = None,
-        query_id: Optional[int] = None
+        connection: 'AioConnection', cache_info: CacheInfo, peek_modes: Union[int, list, tuple] = None
 ) -> 'APIResult':
-    return await __cache_get_size(connection, cache_info, peek_modes, query_id)
+    return await __cache_get_size(connection, cache_info, peek_modes)
 
 
-def __cache_get_size(connection, cache_info, peek_modes, query_id):
+def __cache_get_size(connection, cache_info, peek_modes):
     if peek_modes is None:
         peek_modes = []
     elif not isinstance(peek_modes, (list, tuple)):
@@ -1079,8 +986,7 @@
         [
             ('cache_info', CacheInfo),
             ('peek_modes', ByteArray),
-        ],
-        query_id=query_id,
+        ]
     )
     return query_perform(
         query_struct, connection,
@@ -1095,8 +1001,10 @@
     )
 
 
-def cache_local_peek(conn: 'Connection', cache_info: CacheInfo, key: Any, key_hint: 'IgniteDataType' = None,
-                     peek_modes: Union[int, list, tuple] = None, query_id: Optional[int] = None) -> 'APIResult':
+def cache_local_peek(
+        conn: 'Connection', cache_info: CacheInfo, key: Any, key_hint: 'IgniteDataType' = None,
+        peek_modes: Union[int, list, tuple] = None
+) -> 'APIResult':
     """
     Peeks at in-memory cached value using default optional peek mode.
 
@@ -1111,26 +1019,23 @@
     :param peek_modes: (optional) limit count to near cache partition
      (PeekModes.NEAR), primary cache (PeekModes.PRIMARY), or backup cache
      (PeekModes.BACKUP). Defaults to primary cache partitions (PeekModes.PRIMARY),
-    :param query_id: (optional) a value generated by client and returned as-is
-     in response.query_id. When the parameter is omitted, a random value
-     is generated,
     :return: API result data object. Contains zero status and a peeked value
      (null if not found).
     """
-    return __cache_local_peek(conn, cache_info, key, key_hint, peek_modes, query_id)
+    return __cache_local_peek(conn, cache_info, key, key_hint, peek_modes)
 
 
 async def cache_local_peek_async(
         conn: 'AioConnection', cache_info: CacheInfo, key: Any, key_hint: 'IgniteDataType' = None,
-        peek_modes: Union[int, list, tuple] = None, query_id: Optional[int] = None
+        peek_modes: Union[int, list, tuple] = None,
 ) -> 'APIResult':
     """
     Async version of cache_local_peek.
     """
-    return await __cache_local_peek(conn, cache_info, key, key_hint, peek_modes, query_id)
+    return await __cache_local_peek(conn, cache_info, key, key_hint, peek_modes)
 
 
-def __cache_local_peek(conn, cache_info, key, key_hint, peek_modes, query_id):
+def __cache_local_peek(conn, cache_info, key, key_hint, peek_modes):
     if peek_modes is None:
         peek_modes = []
     elif not isinstance(peek_modes, (list, tuple)):
@@ -1142,8 +1047,7 @@
             ('cache_info', CacheInfo),
             ('key', key_hint or AnyDataObject),
             ('peek_modes', ByteArray),
-        ],
-        query_id=query_id,
+        ]
     )
     return query_perform(
         query_struct, conn,
diff --git a/pyignite/api/sql.py b/pyignite/api/sql.py
index 267bc5b..0f41194 100644
--- a/pyignite/api/sql.py
+++ b/pyignite/api/sql.py
@@ -23,12 +23,12 @@
 )
 from pyignite.utils import deprecated
 from .result import APIResult
-from ..queries.query import CacheInfo
+from ..queries.cache_info import CacheInfo
 from ..queries.response import SQLResponse
 
 
-def scan(conn: 'Connection', cache_info: CacheInfo, page_size: int, partitions: int = -1, local: bool = False,
-         query_id: int = None) -> APIResult:
+def scan(conn: 'Connection', cache_info: CacheInfo, page_size: int, partitions: int = -1,
+         local: bool = False) -> APIResult:
     """
     Performs scan query.
 
@@ -39,9 +39,6 @@
      (negative to query entire cache),
     :param local: (optional) pass True if this query should be executed
      on local node only. Defaults to False,
-    :param query_id: (optional) a value generated by client and returned as-is
-     in response.query_id. When the parameter is omitted, a random value
-     is generated,
     :return: API result data object. Contains zero status and a value
      of type dict with results on success, non-zero status and an error
      description otherwise.
@@ -53,15 +50,15 @@
      * `more`: bool, True if more data is available for subsequent
        ‘scan_cursor_get_page’ calls.
     """
-    return __scan(conn, cache_info, page_size, partitions, local, query_id)
+    return __scan(conn, cache_info, page_size, partitions, local)
 
 
 async def scan_async(conn: 'AioConnection', cache_info: CacheInfo, page_size: int, partitions: int = -1,
-                     local: bool = False, query_id: int = None) -> APIResult:
+                     local: bool = False) -> APIResult:
     """
     Async version of scan.
     """
-    return await __scan(conn, cache_info, page_size, partitions, local, query_id)
+    return await __scan(conn, cache_info, page_size, partitions, local)
 
 
 def __query_result_post_process(result):
@@ -70,7 +67,7 @@
     return result
 
 
-def __scan(conn, cache_info, page_size, partitions, local, query_id):
+def __scan(conn, cache_info, page_size, partitions, local):
     query_struct = Query(
         OP_QUERY_SCAN,
         [
@@ -79,8 +76,7 @@
             ('page_size', Int),
             ('partitions', Int),
             ('local', Bool),
-        ],
-        query_id=query_id,
+        ]
     )
     return query_perform(
         query_struct, conn,
@@ -100,16 +96,13 @@
     )
 
 
-def scan_cursor_get_page(conn: 'Connection', cursor: int, query_id: int = None) -> APIResult:
+def scan_cursor_get_page(conn: 'Connection', cursor: int) -> APIResult:
     """
     Fetches the next scan query cursor page by cursor ID that is obtained
     from `scan` function.
 
     :param conn: connection to Ignite server,
     :param cursor: cursor ID,
-    :param query_id: (optional) a value generated by client and returned as-is
-     in response.query_id. When the parameter is omitted, a random value
-     is generated,
     :return: API result data object. Contains zero status and a value
      of type dict with results on success, non-zero status and an error
      description otherwise.
@@ -120,20 +113,19 @@
      * `more`: bool, True if more data is available for subsequent
        ‘scan_cursor_get_page’ calls.
     """
-    return __scan_cursor_get_page(conn, cursor, query_id)
+    return __scan_cursor_get_page(conn, cursor)
 
 
-async def scan_cursor_get_page_async(conn: 'AioConnection', cursor: int, query_id: int = None) -> APIResult:
-    return await __scan_cursor_get_page(conn, cursor, query_id)
+async def scan_cursor_get_page_async(conn: 'AioConnection', cursor: int) -> APIResult:
+    return await __scan_cursor_get_page(conn, cursor)
 
 
-def __scan_cursor_get_page(conn, cursor, query_id):
+def __scan_cursor_get_page(conn, cursor):
     query_struct = Query(
         OP_QUERY_SCAN_CURSOR_GET_PAGE,
         [
             ('cursor', Long),
-        ],
-        query_id=query_id,
+        ]
     )
     return query_perform(
         query_struct, conn,
@@ -154,7 +146,7 @@
     conn: 'Connection', cache_info: CacheInfo,
     table_name: str, query_str: str, page_size: int, query_args=None,
     distributed_joins: bool = False, replicated_only: bool = False,
-    local: bool = False, timeout: int = 0, query_id: int = None
+    local: bool = False, timeout: int = 0
 ) -> APIResult:
     """
     Executes an SQL query over data stored in the cluster. The query returns
@@ -173,9 +165,6 @@
      on local node only. Defaults to False,
     :param timeout: (optional) non-negative timeout value in ms. Zero disables
      timeout (default),
-    :param query_id: (optional) a value generated by client and returned as-is
-     in response.query_id. When the parameter is omitted, a random value
-     is generated,
     :return: API result data object. Contains zero status and a value
      of type dict with results on success, non-zero status and an error
      description otherwise.
@@ -203,8 +192,7 @@
             ('replicated_only', Bool),
             ('page_size', Int),
             ('timeout', Long),
-        ],
-        query_id=query_id,
+        ]
     )
     result = query_struct.perform(
         conn,
@@ -232,17 +220,12 @@
 
 @deprecated(version='1.2.0', reason="This API is deprecated and will be removed in the following major release. "
                                     "Use sql_fields instead")
-def sql_cursor_get_page(
-    conn: 'Connection', cursor: int, query_id: int = None,
-) -> APIResult:
+def sql_cursor_get_page(conn: 'Connection', cursor: int) -> APIResult:
     """
     Retrieves the next SQL query cursor page by cursor ID from `sql`.
 
     :param conn: connection to Ignite server,
     :param cursor: cursor ID,
-    :param query_id: (optional) a value generated by client and returned as-is
-     in response.query_id. When the parameter is omitted, a random value
-     is generated,
     :return: API result data object. Contains zero status and a value
      of type dict with results on success, non-zero status and an error
      description otherwise.
@@ -258,8 +241,7 @@
         OP_QUERY_SQL_CURSOR_GET_PAGE,
         [
             ('cursor', Long),
-        ],
-        query_id=query_id,
+        ]
     )
     result = query_struct.perform(
         conn,
@@ -283,7 +265,7 @@
     local: bool = False, replicated_only: bool = False,
     enforce_join_order: bool = False, collocated: bool = False,
     lazy: bool = False, include_field_names: bool = False, max_rows: int = -1,
-    timeout: int = 0, query_id: int = None
+    timeout: int = 0
 ) -> APIResult:
     """
     Performs SQL fields query.
@@ -313,9 +295,6 @@
     :param max_rows: (optional) query-wide maximum of rows.
     :param timeout: (optional) non-negative timeout value in ms. Zero disables
      timeout.
-    :param query_id: (optional) a value generated by client and returned as-is
-     in response.query_id. When the parameter is omitted, a random value
-     is generated,
     :return: API result data object. Contains zero status and a value
      of type dict with results on success, non-zero status and an error
      description otherwise.
@@ -329,7 +308,7 @@
     """
     return __sql_fields(conn, cache_info, query_str, page_size, query_args, schema, statement_type, distributed_joins,
                         local, replicated_only, enforce_join_order, collocated, lazy, include_field_names, max_rows,
-                        timeout, query_id)
+                        timeout)
 
 
 async def sql_fields_async(
@@ -339,19 +318,19 @@
         local: bool = False, replicated_only: bool = False,
         enforce_join_order: bool = False, collocated: bool = False,
         lazy: bool = False, include_field_names: bool = False, max_rows: int = -1,
-        timeout: int = 0, query_id: int = None
+        timeout: int = 0
 ) -> APIResult:
     """
     Async version of sql_fields.
     """
     return await __sql_fields(conn, cache_info, query_str, page_size, query_args, schema, statement_type,
                               distributed_joins, local, replicated_only, enforce_join_order, collocated, lazy,
-                              include_field_names, max_rows, timeout, query_id)
+                              include_field_names, max_rows, timeout)
 
 
 def __sql_fields(
         conn, cache_info, query_str, page_size, query_args, schema, statement_type, distributed_joins, local,
-        replicated_only, enforce_join_order, collocated, lazy, include_field_names, max_rows, timeout, query_id
+        replicated_only, enforce_join_order, collocated, lazy, include_field_names, max_rows, timeout
 ):
     if query_args is None:
         query_args = []
@@ -375,7 +354,6 @@
             ('timeout', Long),
             ('include_field_names', Bool),
         ],
-        query_id=query_id,
         response_type=SQLResponse
     )
 
@@ -403,16 +381,13 @@
     )
 
 
-def sql_fields_cursor_get_page(conn: 'Connection', cursor: int, field_count: int, query_id: int = None) -> APIResult:
+def sql_fields_cursor_get_page(conn: 'Connection', cursor: int, field_count: int) -> APIResult:
     """
     Retrieves the next query result page by cursor ID from `sql_fields`.
 
     :param conn: connection to Ignite server,
     :param cursor: cursor ID,
     :param field_count: a number of fields in a row,
-    :param query_id: (optional) a value generated by client and returned as-is
-     in response.query_id. When the parameter is omitted, a random value
-     is generated,
     :return: API result data object. Contains zero status and a value
      of type dict with results on success, non-zero status and an error
      description otherwise.
@@ -423,24 +398,22 @@
      * `more`: bool, True if more data is available for subsequent
        ‘sql_fields_cursor_get_page’ calls.
     """
-    return __sql_fields_cursor_get_page(conn, cursor, field_count, query_id)
+    return __sql_fields_cursor_get_page(conn, cursor, field_count)
 
 
-async def sql_fields_cursor_get_page_async(conn: 'AioConnection', cursor: int, field_count: int,
-                                           query_id: int = None) -> APIResult:
+async def sql_fields_cursor_get_page_async(conn: 'AioConnection', cursor: int, field_count: int) -> APIResult:
     """
     Async version sql_fields_cursor_get_page.
     """
-    return await __sql_fields_cursor_get_page(conn, cursor, field_count, query_id)
+    return await __sql_fields_cursor_get_page(conn, cursor, field_count)
 
 
-def __sql_fields_cursor_get_page(conn, cursor, field_count, query_id):
+def __sql_fields_cursor_get_page(conn, cursor, field_count):
     query_struct = Query(
         OP_QUERY_SQL_FIELDS_CURSOR_GET_PAGE,
         [
             ('cursor', Long),
-        ],
-        query_id=query_id,
+        ]
     )
     return query_perform(
         query_struct, conn,
@@ -469,32 +442,28 @@
     return result
 
 
-def resource_close(conn: 'Connection', cursor: int, query_id: int = None) -> APIResult:
+def resource_close(conn: 'Connection', cursor: int) -> APIResult:
     """
     Closes a resource, such as query cursor.
 
     :param conn: connection to Ignite server,
     :param cursor: cursor ID,
-    :param query_id: (optional) a value generated by client and returned as-is
-     in response.query_id. When the parameter is omitted, a random value
-     is generated,
     :return: API result data object. Contains zero status on success,
      non-zero status and an error description otherwise.
     """
-    return __resource_close(conn, cursor, query_id)
+    return __resource_close(conn, cursor)
 
 
-async def resource_close_async(conn: 'AioConnection', cursor: int, query_id: int = None) -> APIResult:
-    return await __resource_close(conn, cursor, query_id)
+async def resource_close_async(conn: 'AioConnection', cursor: int) -> APIResult:
+    return await __resource_close(conn, cursor)
 
 
-def __resource_close(conn, cursor, query_id):
+def __resource_close(conn, cursor):
     query_struct = Query(
         OP_RESOURCE_CLOSE,
         [
             ('cursor', Long),
-        ],
-        query_id=query_id,
+        ]
     )
     return query_perform(
         query_struct, conn,
diff --git a/pyignite/api/tx_api.py b/pyignite/api/tx_api.py
new file mode 100644
index 0000000..ee8de07
--- /dev/null
+++ b/pyignite/api/tx_api.py
@@ -0,0 +1,124 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import contextvars
+
+import attr
+
+from pyignite.datatypes import Byte, String, Long, Int, Bool
+from pyignite.exceptions import CacheError
+from pyignite.queries import Query, query_perform
+from pyignite.queries.op_codes import OP_TX_START, OP_TX_END
+
+__CURRENT_TX = contextvars.ContextVar('current_tx', default=None)
+
+
+def get_tx_id():
+    ctx = __CURRENT_TX.get() if __CURRENT_TX else None
+    return ctx.tx_id if ctx else None
+
+
+def get_tx_connection():
+    ctx = __CURRENT_TX.get() if __CURRENT_TX else None
+    return ctx.conn if ctx else None
+
+
+@attr.s
+class TransactionContext:
+    tx_id = attr.ib(type=int, default=None)
+    conn = attr.ib(default=None)
+
+
+def tx_start(conn, concurrency, isolation, timeout: int = 0, label: str = None):
+    result = __tx_start(conn, concurrency, isolation, timeout, label)
+    return __tx_start_post_process(result, conn)
+
+
+async def tx_start_async(conn, concurrency, isolation, timeout: int = 0, label: str = None):
+    result = await __tx_start(conn, concurrency, isolation, timeout, label)
+    return __tx_start_post_process(result, conn)
+
+
+def __tx_start(conn, concurrency, isolation, timeout, label):
+    query_struct = Query(
+        OP_TX_START,
+        [
+            ('concurrency', Byte),
+            ('isolation', Byte),
+            ('timeout', Long),
+            ('label', String)
+        ]
+    )
+    return query_perform(
+        query_struct, conn,
+        query_params={
+            'concurrency': concurrency,
+            'isolation': isolation,
+            'timeout': timeout,
+            'label': label
+        },
+        response_config=[
+            ('tx_id', Int)
+        ]
+    )
+
+
+def tx_end(tx_id, committed):
+    ctx = __CURRENT_TX.get()
+
+    if not ctx or ctx.tx_id != tx_id:
+        raise CacheError("Cannot commit transaction from different thread or coroutine")
+
+    try:
+        return __tx_end(ctx.conn, tx_id, committed)
+    finally:
+        __CURRENT_TX.set(None)
+
+
+async def tx_end_async(tx_id, committed):
+    ctx = __CURRENT_TX.get()
+
+    if not ctx or ctx.tx_id != tx_id:
+        raise CacheError("Cannot commit transaction from different thread or coroutine")
+
+    try:
+        return await __tx_end(ctx.conn, tx_id, committed)
+    finally:
+        __CURRENT_TX.set(None)
+
+
+def __tx_end(conn, tx_id, committed):
+    query_struct = Query(
+        OP_TX_END,
+        [
+            ('tx_id', Int),
+            ('committed', Bool)
+        ],
+    )
+    return query_perform(
+        query_struct, conn,
+        query_params={
+            'tx_id': tx_id,
+            'committed': committed
+        }
+    )
+
+
+def __tx_start_post_process(result, conn):
+    if result.status == 0:
+        tx_id = result.value['tx_id']
+        __CURRENT_TX.set(TransactionContext(tx_id, conn))
+        result.value = tx_id
+    return result
diff --git a/pyignite/cache.py b/pyignite/cache.py
index c0aaaec..79fa0f5 100644
--- a/pyignite/cache.py
+++ b/pyignite/cache.py
@@ -15,10 +15,11 @@
 
 from typing import Any, Iterable, Optional, Tuple, Union
 
+from .api.tx_api import get_tx_connection
 from .datatypes import prop_codes, ExpiryPolicy
 from .datatypes.internal import AnyDataObject
 from .exceptions import CacheCreationError, CacheError, ParameterError, SQLError, NotSupportedByClusterError
-from .queries.query import CacheInfo
+from .queries.cache_info import CacheInfo
 from .utils import cache_id, status_to_exception
 from .api.cache_config import (
     cache_create, cache_create_with_config, cache_get_or_create, cache_get_or_create_with_config, cache_destroy,
@@ -177,7 +178,8 @@
         super().__init__(client, name, expiry_policy)
 
     def _get_best_node(self, key=None, key_hint=None):
-        return self.client.get_best_node(self, key, key_hint)
+        tx_conn = get_tx_connection()
+        return tx_conn if tx_conn else self.client.get_best_node(self, key, key_hint)
 
     @property
     def settings(self) -> Optional[dict]:
diff --git a/pyignite/client.py b/pyignite/client.py
index 01ee373..b411a2b 100644
--- a/pyignite/client.py
+++ b/pyignite/client.py
@@ -44,7 +44,7 @@
 import random
 import re
 from itertools import chain
-from typing import Iterable, Type, Union, Any, Dict
+from typing import Iterable, Type, Union, Any, Dict, Optional
 
 from .api import cache_get_node_partitions
 from .api.binary import get_binary_type, put_binary_type
@@ -54,12 +54,13 @@
 from .cache import Cache, create_cache, get_cache, get_or_create_cache, BaseCache
 from .connection import Connection
 from .constants import IGNITE_DEFAULT_HOST, IGNITE_DEFAULT_PORT, PROTOCOL_BYTE_ORDER, AFFINITY_RETRIES, AFFINITY_DELAY
-from .datatypes import BinaryObject, AnyDataObject
+from .datatypes import BinaryObject, AnyDataObject, TransactionConcurrency, TransactionIsolation
 from .datatypes.base import IgniteDataType
 from .datatypes.internal import tc_map
 from .exceptions import BinaryTypeError, CacheError, ReconnectError, connection_errors
-from .queries.query import CacheInfo
+from .queries.cache_info import CacheInfo
 from .stream import BinaryStream, READ_BACKWARD
+from .transaction import Transaction
 from .utils import (
     cache_id, capitalize, entity_id, schema_id, process_delimiter, status_to_exception, is_iterable,
     get_field_by_id, unsigned
@@ -734,9 +735,9 @@
         elif isinstance(cache, Cache):
             c_info = cache.cache_info
         else:
-            c_info = None
+            c_info = CacheInfo(protocol_context=self.protocol_context)
 
-        if c_info:
+        if c_info.cache_id:
             schema = None
 
         return SqlFieldsCursor(self, c_info, query_str, page_size, query_args, schema, statement_type,
@@ -750,3 +751,19 @@
         :return: :py:class:`~pyignite.cluster.Cluster` instance.
         """
         return Cluster(self)
+
+    def tx_start(self, concurrency: TransactionConcurrency = TransactionConcurrency.PESSIMISTIC,
+                 isolation: TransactionIsolation = TransactionIsolation.REPEATABLE_READ,
+                 timeout: Union[int, float] = 0, label: Optional[str] = None) -> 'Transaction':
+        """
+        Start thin client transaction.
+
+        :param concurrency: (optional) transaction concurrency, see
+                :py:class:`~pyignite.datatypes.transactions.TransactionConcurrency`
+        :param isolation: (optional) transaction isolation level, see
+                :py:class:`~pyignite.datatypes.transactions.TransactionIsolation`
+        :param timeout: (optional) transaction timeout in seconds if float, in millis if int
+        :param label: (optional) transaction label.
+        :return: :py:class:`~pyignite.transaction.Transaction` instance.
+        """
+        return Transaction(self, concurrency, isolation, timeout, label)
diff --git a/pyignite/connection/aio_connection.py b/pyignite/connection/aio_connection.py
index 020f8d4..86993ba 100644
--- a/pyignite/connection/aio_connection.py
+++ b/pyignite/connection/aio_connection.py
@@ -29,9 +29,7 @@
 # limitations under the License.
 
 import asyncio
-from asyncio import Lock
 from collections import OrderedDict
-from io import BytesIO
 from typing import Union
 
 from pyignite.constants import PROTOCOLS, PROTOCOL_BYTE_ORDER
@@ -39,10 +37,66 @@
 from .bitmask_feature import BitmaskFeature
 from .connection import BaseConnection
 
-from .handshake import HandshakeRequest, HandshakeResponse
+from .handshake import HandshakeRequest, HandshakeResponse, OP_HANDSHAKE
 from .protocol_context import ProtocolContext
 from .ssl import create_ssl_context
-from ..stream import AioBinaryStream
+from ..stream.binary_stream import BinaryStreamBase
+
+
+class BaseProtocol(asyncio.Protocol):
+    def __init__(self, conn, handshake_fut):
+        super().__init__()
+        self._buffer = bytearray()
+        self._conn = conn
+        self._handshake_fut = handshake_fut
+
+    def connection_lost(self, exc):
+        self.__process_connection_error(exc if exc else SocketError("Connection closed"))
+
+    def connection_made(self, transport: asyncio.WriteTransport) -> None:
+        try:
+            self.__send_handshake(transport, self._conn)
+        except Exception as e:
+            self._handshake_fut.set_exception(e)
+
+    def data_received(self, data: bytes) -> None:
+        self._buffer += data
+        while self.__has_full_response():
+            packet_sz = self.__packet_size(self._buffer)
+            packet = self._buffer[0:packet_sz]
+            if not self._handshake_fut.done():
+                hs_response = self.__parse_handshake(packet, self._conn.client)
+                self._handshake_fut.set_result(hs_response)
+            else:
+                self._conn.on_message(packet)
+            self._buffer = self._buffer[packet_sz:len(self._buffer)]
+
+    def __has_full_response(self):
+        if len(self._buffer) > 4:
+            response_len = int.from_bytes(self._buffer[0:4], byteorder=PROTOCOL_BYTE_ORDER, signed=True)
+            return response_len + 4 <= len(self._buffer)
+
+    @staticmethod
+    def __packet_size(buffer):
+        return int.from_bytes(buffer[0:4], byteorder=PROTOCOL_BYTE_ORDER, signed=True) + 4
+
+    def __process_connection_error(self, exc):
+        connected = self._handshake_fut.done()
+        if not connected:
+            self._handshake_fut.set_exception(exc)
+        self._conn.on_connection_lost(exc, connected)
+
+    @staticmethod
+    def __send_handshake(transport, conn):
+        hs_request = HandshakeRequest(conn.protocol_context, conn.username, conn.password)
+        with BinaryStreamBase(client=conn.client) as stream:
+            hs_request.from_python(stream)
+            transport.write(stream.getvalue())
+
+    @staticmethod
+    def __parse_handshake(data, client):
+        with BinaryStreamBase(client, data) as stream:
+            return HandshakeResponse.parse(stream, client.protocol_context)
 
 
 class AioConnection(BaseConnection):
@@ -94,21 +148,22 @@
         :param password: (optional) password to authenticate to Ignite cluster.
         """
         super().__init__(client, host, port, username, password, **ssl_params)
-        self._mux = Lock()
-        self._reader = None
-        self._writer = None
+        self._pending_reqs = {}
+        self._transport = None
+        self._loop = asyncio.get_event_loop()
+        self._closed = False
 
     @property
     def closed(self) -> bool:
         """ Tells if socket is closed. """
-        return self._writer is None
+        return self._closed or not self._transport or self._transport.is_closing()
 
     async def connect(self) -> Union[dict, OrderedDict]:
         """
         Connect to the given server node with protocol version fallback.
         """
-        async with self._mux:
-            return await self._connect()
+        self._closed = False
+        return await self._connect()
 
     async def _connect(self) -> Union[dict, OrderedDict]:
         detecting_protocol = False
@@ -139,6 +194,20 @@
         self.failed = False
         return result
 
+    def on_connection_lost(self, error, reconnect=False):
+        self.failed = True
+        for _, fut in self._pending_reqs.items():
+            fut.set_exception(error)
+        self._pending_reqs.clear()
+        if reconnect and not self._closed:
+            self._loop.create_task(self._reconnect())
+
+    def on_message(self, data):
+        req_id = int.from_bytes(data[4:12], byteorder=PROTOCOL_BYTE_ORDER, signed=True)
+        if req_id in self._pending_reqs:
+            self._pending_reqs[req_id].set_result(data)
+            del self._pending_reqs[req_id]
+
     async def _connect_version(self) -> Union[dict, OrderedDict]:
         """
         Connect to the given server node using protocol version
@@ -146,122 +215,56 @@
         """
 
         ssl_context = create_ssl_context(self.ssl_params)
-        self._reader, self._writer = await asyncio.open_connection(self.host, self.port, ssl=ssl_context)
+        handshake_fut = self._loop.create_future()
+        self._transport, _ = await self._loop.create_connection(lambda: BaseProtocol(self, handshake_fut),
+                                                                host=self.host, port=self.port, ssl=ssl_context)
+        hs_response = await handshake_fut
 
-        protocol_context = self.client.protocol_context
+        if hs_response.op_code == 0:
+            self._close_transport()
+            self._process_handshake_error(hs_response)
 
-        hs_request = HandshakeRequest(
-            protocol_context,
-            self.username,
-            self.password
-        )
-
-        with AioBinaryStream(self.client) as stream:
-            await hs_request.from_python_async(stream)
-            await self._send(stream.getvalue(), reconnect=False)
-
-        with AioBinaryStream(self.client, await self._recv(reconnect=False)) as stream:
-            hs_response = await HandshakeResponse.parse_async(stream, self.protocol_context)
-
-            if hs_response.op_code == 0:
-                self._close()
-                self._process_handshake_error(hs_response)
-
-            return hs_response
+        return hs_response
 
     async def reconnect(self):
-        async with self._mux:
-            await self._reconnect()
+        await self._reconnect()
 
     async def _reconnect(self):
         if self.alive:
             return
 
-        self._close()
-
+        self._close_transport()
         # connect and silence the connection errors
         try:
             await self._connect()
         except connection_errors:
             pass
 
-    async def request(self, data: Union[bytes, bytearray]) -> bytearray:
+    async def request(self, query_id, data: Union[bytes, bytearray]) -> bytearray:
         """
         Perform request.
-
+        :param query_id: id of query.
         :param data: bytes to send.
         """
-        async with self._mux:
-            await self._send(data)
-            return await self._recv()
-
-    async def _send(self, data: Union[bytes, bytearray], reconnect=True):
-        if self.closed:
+        if not self.alive:
             raise SocketError('Attempt to use closed connection.')
 
-        try:
-            self._writer.write(data)
-            await self._writer.drain()
-        except connection_errors:
-            self.failed = True
-            if reconnect:
-                await self._reconnect()
-            raise
+        return await self._send(query_id, data)
 
-    async def _recv(self, reconnect=True) -> bytearray:
-        if self.closed:
-            raise SocketError('Attempt to use closed connection.')
-
-        data = bytearray(1024)
-        buffer = memoryview(data)
-        bytes_total_received, bytes_to_receive = 0, 0
-        while True:
-            try:
-                chunk = await self._reader.read(len(buffer))
-                bytes_received = len(chunk)
-                if bytes_received == 0:
-                    raise SocketError('Connection broken.')
-
-                buffer[0:bytes_received] = chunk
-                bytes_total_received += bytes_received
-            except connection_errors:
-                self.failed = True
-                if reconnect:
-                    await self._reconnect()
-                raise
-
-            if bytes_total_received < 4:
-                continue
-            elif bytes_to_receive == 0:
-                response_len = int.from_bytes(data[0:4], PROTOCOL_BYTE_ORDER)
-                bytes_to_receive = response_len
-
-                if response_len + 4 > len(data):
-                    buffer.release()
-                    data.extend(bytearray(response_len + 4 - len(data)))
-                    buffer = memoryview(data)[bytes_total_received:]
-                    continue
-
-            if bytes_total_received >= bytes_to_receive:
-                buffer.release()
-                break
-
-            buffer = buffer[bytes_received:]
-
-        return data
+    async def _send(self, query_id, data):
+        fut = self._loop.create_future()
+        self._pending_reqs[query_id] = fut
+        self._transport.write(data)
+        return await fut
 
     async def close(self):
-        async with self._mux:
-            self._close()
+        self._closed = True
+        self._close_transport()
 
-    def _close(self):
+    def _close_transport(self):
         """
         Close connection.
         """
-        if self._writer:
-            try:
-                self._writer.close()
-            except connection_errors:
-                pass
-
-            self._writer, self._reader = None, None
+        if self._transport:
+            self._transport.close()
+            self._transport = None
diff --git a/pyignite/connection/protocol_context.py b/pyignite/connection/protocol_context.py
index be23e56..0f43aa4 100644
--- a/pyignite/connection/protocol_context.py
+++ b/pyignite/connection/protocol_context.py
@@ -87,6 +87,12 @@
         """
         return self.version >= (1, 4, 0)
 
+    def is_transactions_supported(self) -> bool:
+        """
+        Check whether transactions supported by the current protocol.
+        """
+        return self.version >= (1, 6, 0)
+
     def is_feature_flags_supported(self) -> bool:
         """
         Check whether feature flags supported by the current protocol.
diff --git a/pyignite/datatypes/__init__.py b/pyignite/datatypes/__init__.py
index 4f78dce..0ebe56a 100644
--- a/pyignite/datatypes/__init__.py
+++ b/pyignite/datatypes/__init__.py
@@ -27,3 +27,4 @@
 from .standard import *
 from .cluster_state import ClusterState
 from .expiry_policy import ExpiryPolicy
+from .transactions import TransactionIsolation, TransactionConcurrency
diff --git a/pyignite/datatypes/cache_config.py b/pyignite/datatypes/cache_config.py
index a2b4322..4ac28e4 100644
--- a/pyignite/datatypes/cache_config.py
+++ b/pyignite/datatypes/cache_config.py
@@ -21,6 +21,7 @@
 __all__ = [
     'get_cache_config_struct', 'CacheMode', 'PartitionLossPolicy',
     'RebalanceMode', 'WriteSynchronizationMode', 'IndexType',
+    'CacheAtomicityMode'
 ]
 
 
diff --git a/pyignite/datatypes/transactions.py b/pyignite/datatypes/transactions.py
new file mode 100644
index 0000000..83e6c06
--- /dev/null
+++ b/pyignite/datatypes/transactions.py
@@ -0,0 +1,42 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from enum import IntEnum
+
+
+class TransactionConcurrency(IntEnum):
+    """
+    Defines different cache transaction concurrency control.
+    """
+
+    #: Optimistic concurrency control.
+    OPTIMISTIC = 0
+
+    #: Pessimistic concurrency control.
+    PESSIMISTIC = 1
+
+
+class TransactionIsolation(IntEnum):
+    """
+    Defines different cache transaction isolation levels.
+    """
+
+    #: Read committed isolation level.
+    READ_COMMITTED = 0
+
+    #: Repeatable read isolation level.
+    REPEATABLE_READ = 1
+
+    #: Serializable isolation level.
+    SERIALIZABLE = 2
diff --git a/pyignite/exceptions.py b/pyignite/exceptions.py
index 215ccd0..fdf1261 100644
--- a/pyignite/exceptions.py
+++ b/pyignite/exceptions.py
@@ -103,10 +103,18 @@
 
 class NotSupportedByClusterError(Exception):
     """
-    This exception is raised, whenever cluster is not supported specific
+    This exception is raised, whenever cluster does not support specific
     operation probably because it is outdated.
     """
     pass
 
 
+class NotSupportedError(Exception):
+    """
+    This exception is raised, whenever client does not support specific
+    operation.
+    """
+    pass
+
+
 connection_errors = (IOError, OSError, EOFError)
diff --git a/pyignite/queries/cache_info.py b/pyignite/queries/cache_info.py
new file mode 100644
index 0000000..6caf3ce
--- /dev/null
+++ b/pyignite/queries/cache_info.py
@@ -0,0 +1,61 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import attr
+
+from pyignite.api.tx_api import get_tx_id
+from pyignite.connection.protocol_context import ProtocolContext
+from pyignite.constants import PROTOCOL_BYTE_ORDER
+from pyignite.datatypes import ExpiryPolicy
+from pyignite.exceptions import NotSupportedByClusterError
+
+
+@attr.s
+class CacheInfo:
+    cache_id = attr.ib(kw_only=True, type=int, default=0)
+    expiry_policy = attr.ib(kw_only=True, type=ExpiryPolicy, default=None)
+    protocol_context = attr.ib(kw_only=True, type=ProtocolContext)
+
+    TRANSACTIONS_MASK = 0x02
+    EXPIRY_POLICY_MASK = 0x04
+
+    @classmethod
+    async def from_python_async(cls, stream, value):
+        return cls.from_python(stream, value)
+
+    @classmethod
+    def from_python(cls, stream, value):
+        cache_id = value.cache_id if value else 0
+        expiry_policy = value.expiry_policy if value else None
+        flags = 0
+
+        stream.write(cache_id.to_bytes(4, byteorder=PROTOCOL_BYTE_ORDER, signed=True))
+
+        if expiry_policy:
+            if not value.protocol_context.is_expiry_policy_supported():
+                raise NotSupportedByClusterError("'ExpiryPolicy' API is not supported by the cluster")
+            flags |= cls.EXPIRY_POLICY_MASK
+
+        tx_id = get_tx_id()
+        if value and value.protocol_context.is_transactions_supported() and tx_id:
+            flags |= cls.TRANSACTIONS_MASK
+
+        stream.write(flags.to_bytes(1, byteorder=PROTOCOL_BYTE_ORDER))
+
+        if expiry_policy:
+            ExpiryPolicy.write_policy(stream, expiry_policy)
+
+        if flags & cls.TRANSACTIONS_MASK:
+            stream.write(tx_id.to_bytes(4, byteorder=PROTOCOL_BYTE_ORDER, signed=True))
diff --git a/pyignite/queries/op_codes.py b/pyignite/queries/op_codes.py
index c152f7c..cf19b11 100644
--- a/pyignite/queries/op_codes.py
+++ b/pyignite/queries/op_codes.py
@@ -66,5 +66,8 @@
 OP_GET_BINARY_TYPE = 3002
 OP_PUT_BINARY_TYPE = 3003
 
+OP_TX_START = 4000
+OP_TX_END = 4001
+
 OP_CLUSTER_GET_STATE = 5000
 OP_CLUSTER_CHANGE_STATE = 5001
diff --git a/pyignite/queries/query.py b/pyignite/queries/query.py
index d971eef..4bcab9f 100644
--- a/pyignite/queries/query.py
+++ b/pyignite/queries/query.py
@@ -15,16 +15,12 @@
 
 import ctypes
 from io import SEEK_CUR
-from random import randint
 
 import attr
 
 from pyignite.api.result import APIResult
 from pyignite.connection import Connection, AioConnection
-from pyignite.connection.protocol_context import ProtocolContext
-from pyignite.constants import MIN_LONG, MAX_LONG, RHF_TOPOLOGY_CHANGED, PROTOCOL_BYTE_ORDER
-from pyignite.datatypes import ExpiryPolicy
-from pyignite.exceptions import NotSupportedByClusterError
+from pyignite.constants import MAX_LONG, RHF_TOPOLOGY_CHANGED
 from pyignite.queries.response import Response
 from pyignite.stream import AioBinaryStream, BinaryStream, READ_BACKWARD
 
@@ -47,42 +43,29 @@
     return _internal()
 
 
-@attr.s
-class CacheInfo:
-    cache_id = attr.ib(kw_only=True, type=int)
-    expiry_policy = attr.ib(kw_only=True, type=ExpiryPolicy, default=None)
-    protocol_context = attr.ib(kw_only=True, type=ProtocolContext)
+_QUERY_COUNTER = 0
 
-    @classmethod
-    async def from_python_async(cls, stream, value):
-        return cls.from_python(stream, value)
 
-    @classmethod
-    def from_python(cls, stream, value):
-        cache_id = value.cache_id if value else 0
-        expiry_policy = value.expiry_policy if value else None
-        flags = 0
-
-        stream.write(cache_id.to_bytes(4, byteorder=PROTOCOL_BYTE_ORDER, signed=True))
-
-        if expiry_policy:
-            if not value.protocol_context.is_expiry_policy_supported():
-                raise NotSupportedByClusterError("'ExpiryPolicy' API is not supported by the cluster")
-            flags |= 0x04
-
-        stream.write(flags.to_bytes(1, byteorder=PROTOCOL_BYTE_ORDER))
-        if expiry_policy:
-            ExpiryPolicy.write_policy(stream, expiry_policy)
+def _get_query_id():
+    global _QUERY_COUNTER
+    if _QUERY_COUNTER >= MAX_LONG:
+        _QUERY_COUNTER = 0
+    _QUERY_COUNTER += 1
+    return _QUERY_COUNTER
 
 
 @attr.s
 class Query:
     op_code = attr.ib(type=int)
     following = attr.ib(type=list, factory=list)
-    query_id = attr.ib(type=int, default=None)
+    query_id = attr.ib(type=int)
     response_type = attr.ib(type=type(Response), default=Response)
     _query_c_type = None
 
+    @query_id.default
+    def _set_query_id(self):
+        return _get_query_id()
+
     @classmethod
     def build_c_type(cls):
         if cls._query_c_type is None:
@@ -119,14 +102,14 @@
         self.__write_header(stream, header, init_pos)
 
     def _build_header(self, stream):
+        global _QUERY_COUNTER
         header_class = self.build_c_type()
         header_len = ctypes.sizeof(header_class)
         stream.seek(header_len, SEEK_CUR)
 
         header = header_class()
         header.op_code = self.op_code
-        if self.query_id is None:
-            header.query_id = randint(MIN_LONG, MAX_LONG)
+        header.query_id = self.query_id
 
         return header
 
@@ -185,7 +168,7 @@
         """
         with AioBinaryStream(conn.client) as stream:
             await self.from_python_async(stream, query_params)
-            data = await conn.request(stream.getvalue())
+            data = await conn.request(self.query_id, stream.getvalue())
 
         response_struct = self.response_type(protocol_context=conn.protocol_context,
                                              following=response_config, **kwargs)
diff --git a/pyignite/transaction.py b/pyignite/transaction.py
new file mode 100644
index 0000000..5bafa6b
--- /dev/null
+++ b/pyignite/transaction.py
@@ -0,0 +1,130 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import math
+from typing import Union
+
+from pyignite.api.tx_api import tx_end, tx_start, tx_end_async, tx_start_async
+from pyignite.datatypes import TransactionIsolation, TransactionConcurrency
+from pyignite.exceptions import CacheError
+from pyignite.utils import status_to_exception
+
+
+def _convert_to_millis(timeout: Union[int, float]) -> int:
+    if isinstance(timeout, float):
+        return math.floor(timeout * 1000)
+    return timeout
+
+
+class Transaction:
+    """
+    Thin client transaction.
+    """
+    def __init__(self, client, concurrency=TransactionConcurrency.PESSIMISTIC,
+                 isolation=TransactionIsolation.REPEATABLE_READ, timeout=0, label=None):
+        self.client, self.concurrency = client, concurrency
+        self.isolation, self.timeout = isolation, _convert_to_millis(timeout)
+        self.label, self.closed = label, False
+        self.tx_id = self.__start_tx()
+
+    def commit(self) -> None:
+        """
+        Commit transaction.
+        """
+        if not self.closed:
+            self.closed = True
+            return self.__end_tx(True)
+
+    def rollback(self) -> None:
+        """
+        Rollback transaction.
+        """
+        self.close()
+
+    def close(self) -> None:
+        """
+        Close transaction.
+        """
+        if not self.closed:
+            self.closed = True
+            return self.__end_tx(False)
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self.close()
+
+    @status_to_exception(CacheError)
+    def __start_tx(self):
+        conn = self.client.random_node
+        return tx_start(conn, self.concurrency, self.isolation, self.timeout, self.label)
+
+    @status_to_exception(CacheError)
+    def __end_tx(self, committed):
+        return tx_end(self.tx_id, committed)
+
+
+class AioTransaction:
+    """
+    Async thin client transaction.
+    """
+    def __init__(self, client, concurrency=TransactionConcurrency.PESSIMISTIC,
+                 isolation=TransactionIsolation.REPEATABLE_READ, timeout=0, label=None):
+        self.client, self.concurrency = client, concurrency
+        self.isolation, self.timeout = isolation, _convert_to_millis(timeout)
+        self.label, self.closed = label, False
+
+    def __await__(self):
+        return (yield from self.__aenter__().__await__())
+
+    async def commit(self) -> None:
+        """
+        Commit transaction.
+        """
+        if not self.closed:
+            self.closed = True
+            return await self.__end_tx(True)
+
+    async def rollback(self) -> None:
+        """
+        Rollback transaction.
+        """
+        await self.close()
+
+    async def close(self) -> None:
+        """
+        Close transaction.
+        """
+        if not self.closed:
+            self.closed = True
+            return await self.__end_tx(False)
+
+    async def __aenter__(self):
+        self.tx_id = await self.__start_tx()
+        self.closed = False
+        return self
+
+    async def __aexit__(self, exc_type, exc_val, exc_tb):
+        await self.close()
+
+    @status_to_exception(CacheError)
+    async def __start_tx(self):
+        conn = await self.client.random_node()
+        return await tx_start_async(conn, self.concurrency, self.isolation, self.timeout, self.label)
+
+    @status_to_exception(CacheError)
+    async def __end_tx(self, committed):
+        return await tx_end_async(self.tx_id, committed)
diff --git a/requirements/install.txt b/requirements/install.txt
index feb4eb6..aa8290f 100644
--- a/requirements/install.txt
+++ b/requirements/install.txt
@@ -1,3 +1,4 @@
 # these pip packages are necessary for the pyignite to run
 
 attrs>=20.3.0
+contextvars>=2.4;python_version<"3.7"
diff --git a/tests/affinity/conftest.py b/tests/affinity/conftest.py
index da645c1..eca31b2 100644
--- a/tests/affinity/conftest.py
+++ b/tests/affinity/conftest.py
@@ -54,7 +54,7 @@
 
 
 @pytest.fixture
-async def async_client(connection_param):
+async def async_client(connection_param, event_loop):
     client = AioClient(partition_aware=True)
     try:
         await client.connect(connection_param)
diff --git a/tests/affinity/test_affinity_request_routing.py b/tests/affinity/test_affinity_request_routing.py
index 90c71b2..0d0ec24 100644
--- a/tests/affinity/test_affinity_request_routing.py
+++ b/tests/affinity/test_affinity_request_routing.py
@@ -52,7 +52,7 @@
 
 async def patched_send_async(self, *args, **kwargs):
     """Patched send function that push to queue idx of server to which request is routed."""
-    buf = args[0]
+    buf = args[1]
     if buf and len(buf) >= 6:
         op_code = int.from_bytes(buf[4:6], byteorder=PROTOCOL_BYTE_ORDER)
         # Filter only caches operation.
@@ -229,7 +229,7 @@
 
 
 @pytest.fixture
-async def async_client_routed():
+async def async_client_routed(event_loop):
     client = AioClient(partition_aware=True)
     try:
         await client.connect(client_routed_connection_string)
diff --git a/tests/affinity/test_affinity_single_connection.py b/tests/affinity/test_affinity_single_connection.py
index c3d2473..c679bdd 100644
--- a/tests/affinity/test_affinity_single_connection.py
+++ b/tests/affinity/test_affinity_single_connection.py
@@ -29,7 +29,7 @@
 
 
 @pytest.fixture
-async def async_client():
+async def async_client(event_loop):
     client = AioClient(partition_aware=True)
     try:
         await client.connect('127.0.0.1', 10801)
diff --git a/tests/common/test_transactions.py b/tests/common/test_transactions.py
new file mode 100644
index 0000000..0cfa46a
--- /dev/null
+++ b/tests/common/test_transactions.py
@@ -0,0 +1,231 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import asyncio
+import itertools
+import sys
+import time
+
+import pytest
+
+from pyignite import AioClient, Client
+from pyignite.datatypes import TransactionIsolation, TransactionConcurrency
+from pyignite.datatypes.cache_config import CacheAtomicityMode
+from pyignite.datatypes.prop_codes import PROP_NAME, PROP_CACHE_ATOMICITY_MODE
+from pyignite.exceptions import CacheError
+
+
+@pytest.fixture
+def connection_param():
+    return [('127.0.0.1', 10800 + i) for i in range(1, 4)]
+
+
+@pytest.fixture(params=['with-partition-awareness', 'without-partition-awareness'])
+async def async_client(request, connection_param, event_loop):
+    client = AioClient(partition_aware=request.param == 'with-partition-awareness')
+    try:
+        await client.connect(connection_param)
+        if not client.protocol_context.is_transactions_supported():
+            pytest.skip(f'skipped {request.node.name}, transaction api is not supported.')
+        elif sys.version_info < (3, 7):
+            pytest.skip(f'skipped {request.node.name}, transaction api is not supported'
+                        f'for async client on python {sys.version}')
+        else:
+            yield client
+    finally:
+        await client.close()
+
+
+@pytest.fixture(params=['with-partition-awareness', 'without-partition-awareness'])
+def client(request, connection_param):
+    client = Client(partition_aware=request.param == 'with-partition-awareness')
+    try:
+        client.connect(connection_param)
+        if not client.protocol_context.is_transactions_supported():
+            pytest.skip(f'skipped {request.node.name}, transaction api is not supported.')
+        else:
+            yield client
+    finally:
+        client.close()
+
+
+@pytest.fixture
+def tx_cache(client):
+    cache = client.get_or_create_cache({
+        PROP_NAME: 'tx_cache',
+        PROP_CACHE_ATOMICITY_MODE: CacheAtomicityMode.TRANSACTIONAL
+    })
+    yield cache
+    cache.destroy()
+
+
+@pytest.fixture
+async def async_tx_cache(async_client):
+    cache = await async_client.get_or_create_cache({
+        PROP_NAME: 'tx_cache',
+        PROP_CACHE_ATOMICITY_MODE: CacheAtomicityMode.TRANSACTIONAL
+    })
+    yield cache
+    await cache.destroy()
+
+
+@pytest.mark.parametrize(
+    ['iso_level', 'concurrency'],
+    itertools.product(
+        [iso_level for iso_level in TransactionIsolation],
+        [concurrency for concurrency in TransactionConcurrency]
+    )
+)
+def test_simple_transaction(client, tx_cache, iso_level, concurrency):
+    with client.tx_start(isolation=iso_level, concurrency=concurrency) as tx:
+        tx_cache.put(1, 1)
+        tx.commit()
+
+    assert tx_cache.get(1) == 1
+
+    with client.tx_start(isolation=iso_level, concurrency=concurrency) as tx:
+        tx_cache.put(1, 10)
+        tx.rollback()
+
+    assert tx_cache.get(1) == 1
+
+    with client.tx_start(isolation=iso_level, concurrency=concurrency) as tx:
+        tx_cache.put(1, 10)
+
+    assert tx_cache.get(1) == 1
+
+
+@pytest.mark.parametrize(
+    ['iso_level', 'concurrency'],
+    itertools.product(
+        list(TransactionIsolation),  # enums are directly iterable; no identity comprehension needed
+        list(TransactionConcurrency)
+    )
+)
+@pytest.mark.asyncio
+async def test_simple_transaction_async(async_client, async_tx_cache, iso_level, concurrency):
+    async with async_client.tx_start(isolation=iso_level, concurrency=concurrency) as tx:
+        await async_tx_cache.put(1, 1)
+        await tx.commit()  # explicit commit: the update becomes visible
+
+    assert await async_tx_cache.get(1) == 1
+
+    async with async_client.tx_start(isolation=iso_level, concurrency=concurrency) as tx:
+        await async_tx_cache.put(1, 10)
+        await tx.rollback()  # explicit rollback: the update is discarded
+
+    assert await async_tx_cache.get(1) == 1
+
+    async with async_client.tx_start(isolation=iso_level, concurrency=concurrency) as tx:
+        await async_tx_cache.put(1, 10)  # fix: `await` was missing, so the coroutine never ran and implicit rollback was untested
+
+    assert await async_tx_cache.get(1) == 1
+
+
+def test_transactions_timeout(client, tx_cache):
+    with client.tx_start(timeout=2.0, label='tx-sync') as tx:
+        tx_cache.put(1, 1)
+        time.sleep(3.0)  # outlive the tx timeout so commit() must fail
+        with pytest.raises(CacheError) as to_error:
+            tx.commit()
+        assert 'tx-sync' in str(to_error) and 'timed out' in str(to_error)  # fix: assert was inside the raises block after commit(), hence unreachable
+
+
+@pytest.mark.asyncio
+async def test_transactions_timeout_async(async_client, async_tx_cache):
+    async def update(i, timeout):
+        async with async_client.tx_start(
+                label=f'tx-{i}', timeout=timeout, isolation=TransactionIsolation.READ_COMMITTED,
+                concurrency=TransactionConcurrency.PESSIMISTIC
+        ) as tx:
+            k1, k2 = (1, 2) if i % 2 == 0 else (2, 1)  # alternating lock order provokes lock contention between tasks
+            v = f'value-{i}'
+
+            await async_tx_cache.put(k1, v)
+            await async_tx_cache.put(k2, v)
+
+            await tx.commit()
+
+    task = asyncio.gather(*[update(i, 2.0) for i in range(20)], return_exceptions=True)
+    await asyncio.sleep(5.0)
+    assert task.done()  # Check that all transactions completed or rolled-back on timeout
+    for i, ex in enumerate(task.result()):
+        if ex:
+            assert 'TransactionTimeoutException' in str(ex) or \
+                'Cache transaction timed out' in str(ex)  # fix: bare string literal was always truthy, making the assert a no-op
+            assert f'tx-{i}' in str(ex)  # check that tx label presents in error
+
+
+@pytest.mark.asyncio
+@pytest.mark.parametrize('iso_level', list(TransactionIsolation))
+async def test_concurrent_pessimistic_transactions_same_key(async_client, async_tx_cache, iso_level):
+    async def update(i):
+        async with async_client.tx_start(
+                label=f'tx_lbl_{i}', isolation=iso_level, concurrency=TransactionConcurrency.PESSIMISTIC
+        ) as tx:
+            await async_tx_cache.put(1, f'test-{i}')
+            await tx.commit()
+
+    res = await asyncio.gather(*[update(i) for i in range(20)], return_exceptions=True)
+    assert not any(res)  # pessimistic txs on the same key serialize: every update must succeed (None result)
+
+
+@pytest.mark.asyncio
+async def test_concurrent_optimistic_transactions_no_deadlock(async_client, async_tx_cache, event_loop):
+    """
+    Check that optimistic transactions are deadlock safe.
+    """
+    async def update(i):
+        async with async_client.tx_start(
+                label=f'tx-{i}', isolation=TransactionIsolation.SERIALIZABLE,
+                concurrency=TransactionConcurrency.OPTIMISTIC
+        ) as tx:
+            k1, k2 = (1, 2) if i % 2 == 0 else (2, 1)  # opposite lock orders would deadlock a pessimistic tx
+            v = f'value-{i}'
+
+            await async_tx_cache.put(k1, v)
+            await async_tx_cache.put(k2, v)
+
+            await tx.commit()
+
+    task = asyncio.gather(*[update(i) for i in range(20)], return_exceptions=True)
+    await asyncio.sleep(2.0)
+    assert task.done()  # finished within the grace period, i.e. no deadlock occurred
+    assert not all(task.result())  # at least one transaction succeeded (success yields a None result)
+    for i, ex in enumerate(task.result()):
+        if ex:
+            assert 'lock conflict' in str(ex)  # optimistic prepare phase detected the conflict
+            assert f'tx-{i}' in str(ex)  # tx label must be present in the error
+
+
+@pytest.mark.asyncio
+@pytest.mark.parametrize(
+    ['iso_level', 'concurrency'],
+    itertools.product(
+        list(TransactionIsolation),  # enums are directly iterable; no identity comprehension needed
+        list(TransactionConcurrency)
+    )
+)
+async def test_concurrent_transactions(async_client, async_tx_cache, iso_level, concurrency):
+    async def update(i):
+        async with async_client.tx_start(isolation=iso_level, concurrency=concurrency) as tx:
+            await async_tx_cache.put(i, f'test-{i}')
+            if i % 2 == 0:
+                await tx.commit()  # even keys: committed, must survive
+            else:
+                await tx.rollback()  # odd keys: rolled back, must be absent
+
+    await asyncio.gather(*[update(i) for i in range(20)], return_exceptions=True)
+    assert await async_tx_cache.get_all(list(range(20))) == {i: f'test-{i}' for i in range(20) if i % 2 == 0}
diff --git a/tests/config/ignite-config.xml.jinja2 b/tests/config/ignite-config.xml.jinja2
index 325a581..22b103e 100644
--- a/tests/config/ignite-config.xml.jinja2
+++ b/tests/config/ignite-config.xml.jinja2
@@ -60,6 +60,7 @@
                 <property name="host" value="127.0.0.1"/>
                 <property name="port" value="{{ ignite_client_port }}"/>
                 <property name="portRange" value="0"/>
+                <property name="threadPoolSize" value="100"/>
                 {% if use_ssl %}
                     <property name="sslEnabled" value="true"/>
                     <property name="useIgniteSslContextFactory" value="false"/>
diff --git a/tests/custom/test_cluster.py b/tests/custom/test_cluster.py
index f1ffcfd..e94853a 100644
--- a/tests/custom/test_cluster.py
+++ b/tests/custom/test_cluster.py
@@ -47,10 +47,9 @@
 @pytest.fixture(autouse=True)
 def cluster_api_supported(request, server1):
     client = Client()
-    client.connect('127.0.0.1', 10801)
-
-    if not client.protocol_context.is_cluster_api_supported():
-        pytest.skip(f'skipped {request.node.name}, ExpiryPolicy APIis not supported.')
+    with client.connect('127.0.0.1', 10801):
+        if not client.protocol_context.is_cluster_api_supported():
+            pytest.skip(f'skipped {request.node.name}, Cluster API is not supported.')
 
 
 def test_cluster_set_active(with_persistence):