blob: 267bc5bd657ee5adfab6ae6fbe9278daa5d77f40 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from pyignite.connection import AioConnection, Connection
from pyignite.datatypes import AnyDataArray, AnyDataObject, Bool, Int, Long, Map, Null, String, StructArray
from pyignite.datatypes.sql import StatementType
from pyignite.queries import Query, query_perform
from pyignite.queries.op_codes import (
OP_QUERY_SCAN, OP_QUERY_SCAN_CURSOR_GET_PAGE, OP_QUERY_SQL, OP_QUERY_SQL_CURSOR_GET_PAGE, OP_QUERY_SQL_FIELDS,
OP_QUERY_SQL_FIELDS_CURSOR_GET_PAGE, OP_RESOURCE_CLOSE
)
from pyignite.utils import deprecated
from .result import APIResult
from ..queries.query import CacheInfo
from ..queries.response import SQLResponse
def scan(conn: 'Connection', cache_info: CacheInfo, page_size: int, partitions: int = -1, local: bool = False,
query_id: int = None) -> APIResult:
"""
Performs scan query.
:param conn: connection to Ignite server,
:param cache_info: cache meta info.
:param page_size: cursor page size,
:param partitions: (optional) number of partitions to query
(negative to query entire cache),
:param local: (optional) pass True if this query should be executed
on local node only. Defaults to False,
:param query_id: (optional) a value generated by client and returned as-is
in response.query_id. When the parameter is omitted, a random value
is generated,
:return: API result data object. Contains zero status and a value
of type dict with results on success, non-zero status and an error
description otherwise.
Value dict is of following format:
* `cursor`: int, cursor ID,
* `data`: dict, result rows as key-value pairs,
* `more`: bool, True if more data is available for subsequent
‘scan_cursor_get_page’ calls.
"""
return __scan(conn, cache_info, page_size, partitions, local, query_id)
async def scan_async(conn: 'AioConnection', cache_info: CacheInfo, page_size: int, partitions: int = -1,
local: bool = False, query_id: int = None) -> APIResult:
"""
Async version of scan.
"""
return await __scan(conn, cache_info, page_size, partitions, local, query_id)
def __query_result_post_process(result):
if result.status == 0:
result.value = dict(result.value)
return result
def __scan(conn, cache_info, page_size, partitions, local, query_id):
query_struct = Query(
OP_QUERY_SCAN,
[
('cache_info', CacheInfo),
('filter', Null),
('page_size', Int),
('partitions', Int),
('local', Bool),
],
query_id=query_id,
)
return query_perform(
query_struct, conn,
query_params={
'cache_info': cache_info,
'filter': None,
'page_size': page_size,
'partitions': partitions,
'local': 1 if local else 0,
},
response_config=[
('cursor', Long),
('data', Map),
('more', Bool),
],
post_process_fun=__query_result_post_process
)
def scan_cursor_get_page(conn: 'Connection', cursor: int, query_id: int = None) -> APIResult:
"""
Fetches the next scan query cursor page by cursor ID that is obtained
from `scan` function.
:param conn: connection to Ignite server,
:param cursor: cursor ID,
:param query_id: (optional) a value generated by client and returned as-is
in response.query_id. When the parameter is omitted, a random value
is generated,
:return: API result data object. Contains zero status and a value
of type dict with results on success, non-zero status and an error
description otherwise.
Value dict is of following format:
* `data`: dict, result rows as key-value pairs,
* `more`: bool, True if more data is available for subsequent
‘scan_cursor_get_page’ calls.
"""
return __scan_cursor_get_page(conn, cursor, query_id)
async def scan_cursor_get_page_async(conn: 'AioConnection', cursor: int, query_id: int = None) -> APIResult:
return await __scan_cursor_get_page(conn, cursor, query_id)
def __scan_cursor_get_page(conn, cursor, query_id):
query_struct = Query(
OP_QUERY_SCAN_CURSOR_GET_PAGE,
[
('cursor', Long),
],
query_id=query_id,
)
return query_perform(
query_struct, conn,
query_params={
'cursor': cursor,
},
response_config=[
('data', Map),
('more', Bool),
],
post_process_fun=__query_result_post_process
)
@deprecated(version='1.2.0', reason="This API is deprecated and will be removed in the following major release. "
"Use sql_fields instead")
def sql(
conn: 'Connection', cache_info: CacheInfo,
table_name: str, query_str: str, page_size: int, query_args=None,
distributed_joins: bool = False, replicated_only: bool = False,
local: bool = False, timeout: int = 0, query_id: int = None
) -> APIResult:
"""
Executes an SQL query over data stored in the cluster. The query returns
the whole record (key and value).
:param conn: connection to Ignite server,
:param cache_info: Cache meta info,
:param table_name: name of a type or SQL table,
:param query_str: SQL query string,
:param page_size: cursor page size,
:param query_args: (optional) query arguments,
:param distributed_joins: (optional) distributed joins. Defaults to False,
:param replicated_only: (optional) whether query contains only replicated
tables or not. Defaults to False,
:param local: (optional) pass True if this query should be executed
on local node only. Defaults to False,
:param timeout: (optional) non-negative timeout value in ms. Zero disables
timeout (default),
:param query_id: (optional) a value generated by client and returned as-is
in response.query_id. When the parameter is omitted, a random value
is generated,
:return: API result data object. Contains zero status and a value
of type dict with results on success, non-zero status and an error
description otherwise.
Value dict is of following format:
* `cursor`: int, cursor ID,
* `data`: dict, result rows as key-value pairs,
* `more`: bool, True if more data is available for subsequent
‘sql_get_page’ calls.
"""
if query_args is None:
query_args = []
query_struct = Query(
OP_QUERY_SQL,
[
('cache_info', CacheInfo),
('table_name', String),
('query_str', String),
('query_args', AnyDataArray()),
('distributed_joins', Bool),
('local', Bool),
('replicated_only', Bool),
('page_size', Int),
('timeout', Long),
],
query_id=query_id,
)
result = query_struct.perform(
conn,
query_params={
'cache_info': cache_info,
'table_name': table_name,
'query_str': query_str,
'query_args': query_args,
'distributed_joins': 1 if distributed_joins else 0,
'local': 1 if local else 0,
'replicated_only': 1 if replicated_only else 0,
'page_size': page_size,
'timeout': timeout,
},
response_config=[
('cursor', Long),
('data', Map),
('more', Bool),
],
)
if result.status == 0:
result.value = dict(result.value)
return result
@deprecated(version='1.2.0', reason="This API is deprecated and will be removed in the following major release. "
"Use sql_fields instead")
def sql_cursor_get_page(
conn: 'Connection', cursor: int, query_id: int = None,
) -> APIResult:
"""
Retrieves the next SQL query cursor page by cursor ID from `sql`.
:param conn: connection to Ignite server,
:param cursor: cursor ID,
:param query_id: (optional) a value generated by client and returned as-is
in response.query_id. When the parameter is omitted, a random value
is generated,
:return: API result data object. Contains zero status and a value
of type dict with results on success, non-zero status and an error
description otherwise.
Value dict is of following format:
* `data`: dict, result rows as key-value pairs,
* `more`: bool, True if more data is available for subsequent
‘sql_cursor_get_page’ calls.
"""
query_struct = Query(
OP_QUERY_SQL_CURSOR_GET_PAGE,
[
('cursor', Long),
],
query_id=query_id,
)
result = query_struct.perform(
conn,
query_params={
'cursor': cursor,
},
response_config=[
('data', Map),
('more', Bool),
],
)
if result.status == 0:
result.value = dict(result.value)
return result
def sql_fields(
conn: 'Connection', cache_info: CacheInfo,
query_str: str, page_size: int, query_args=None, schema: str = None,
statement_type: int = StatementType.ANY, distributed_joins: bool = False,
local: bool = False, replicated_only: bool = False,
enforce_join_order: bool = False, collocated: bool = False,
lazy: bool = False, include_field_names: bool = False, max_rows: int = -1,
timeout: int = 0, query_id: int = None
) -> APIResult:
"""
Performs SQL fields query.
:param conn: connection to Ignite server,
:param cache_info: cache meta info.
:param query_str: SQL query string,
:param page_size: cursor page size,
:param query_args: (optional) query arguments. List of values or
(value, type hint) tuples,
:param schema: schema for the query.
:param statement_type: (optional) statement type. Can be:
* StatementType.ALL − any type (default),
* StatementType.SELECT − select,
* StatementType.UPDATE − update.
:param distributed_joins: (optional) distributed joins.
:param local: (optional) pass True if this query should be executed
on local node only.
:param replicated_only: (optional) whether query contains only
replicated tables or not.
:param enforce_join_order: (optional) enforce join order.
:param collocated: (optional) whether your data is co-located or not.
:param lazy: (optional) lazy query execution.
:param include_field_names: (optional) include field names in result.
:param max_rows: (optional) query-wide maximum of rows.
:param timeout: (optional) non-negative timeout value in ms. Zero disables
timeout.
:param query_id: (optional) a value generated by client and returned as-is
in response.query_id. When the parameter is omitted, a random value
is generated,
:return: API result data object. Contains zero status and a value
of type dict with results on success, non-zero status and an error
description otherwise.
Value dict is of following format:
* `cursor`: int, cursor ID,
* `data`: list, result values,
* `more`: bool, True if more data is available for subsequent
‘sql_fields_cursor_get_page’ calls.
"""
return __sql_fields(conn, cache_info, query_str, page_size, query_args, schema, statement_type, distributed_joins,
local, replicated_only, enforce_join_order, collocated, lazy, include_field_names, max_rows,
timeout, query_id)
async def sql_fields_async(
conn: 'AioConnection', cache_info: CacheInfo,
query_str: str, page_size: int, query_args=None, schema: str = None,
statement_type: int = StatementType.ANY, distributed_joins: bool = False,
local: bool = False, replicated_only: bool = False,
enforce_join_order: bool = False, collocated: bool = False,
lazy: bool = False, include_field_names: bool = False, max_rows: int = -1,
timeout: int = 0, query_id: int = None
) -> APIResult:
"""
Async version of sql_fields.
"""
return await __sql_fields(conn, cache_info, query_str, page_size, query_args, schema, statement_type,
distributed_joins, local, replicated_only, enforce_join_order, collocated, lazy,
include_field_names, max_rows, timeout, query_id)
def __sql_fields(
conn, cache_info, query_str, page_size, query_args, schema, statement_type, distributed_joins, local,
replicated_only, enforce_join_order, collocated, lazy, include_field_names, max_rows, timeout, query_id
):
if query_args is None:
query_args = []
query_struct = Query(
OP_QUERY_SQL_FIELDS,
[
('cache_info', CacheInfo),
('schema', String),
('page_size', Int),
('max_rows', Int),
('query_str', String),
('query_args', AnyDataArray()),
('statement_type', StatementType),
('distributed_joins', Bool),
('local', Bool),
('replicated_only', Bool),
('enforce_join_order', Bool),
('collocated', Bool),
('lazy', Bool),
('timeout', Long),
('include_field_names', Bool),
],
query_id=query_id,
response_type=SQLResponse
)
return query_perform(
query_struct, conn,
query_params={
'cache_info': cache_info,
'schema': schema,
'page_size': page_size,
'max_rows': max_rows,
'query_str': query_str,
'query_args': query_args,
'statement_type': statement_type,
'distributed_joins': distributed_joins,
'local': local,
'replicated_only': replicated_only,
'enforce_join_order': enforce_join_order,
'collocated': collocated,
'lazy': lazy,
'timeout': timeout,
'include_field_names': include_field_names,
},
include_field_names=include_field_names,
has_cursor=True,
)
def sql_fields_cursor_get_page(conn: 'Connection', cursor: int, field_count: int, query_id: int = None) -> APIResult:
"""
Retrieves the next query result page by cursor ID from `sql_fields`.
:param conn: connection to Ignite server,
:param cursor: cursor ID,
:param field_count: a number of fields in a row,
:param query_id: (optional) a value generated by client and returned as-is
in response.query_id. When the parameter is omitted, a random value
is generated,
:return: API result data object. Contains zero status and a value
of type dict with results on success, non-zero status and an error
description otherwise.
Value dict is of following format:
* `data`: list, result values,
* `more`: bool, True if more data is available for subsequent
‘sql_fields_cursor_get_page’ calls.
"""
return __sql_fields_cursor_get_page(conn, cursor, field_count, query_id)
async def sql_fields_cursor_get_page_async(conn: 'AioConnection', cursor: int, field_count: int,
query_id: int = None) -> APIResult:
"""
Async version sql_fields_cursor_get_page.
"""
return await __sql_fields_cursor_get_page(conn, cursor, field_count, query_id)
def __sql_fields_cursor_get_page(conn, cursor, field_count, query_id):
query_struct = Query(
OP_QUERY_SQL_FIELDS_CURSOR_GET_PAGE,
[
('cursor', Long),
],
query_id=query_id,
)
return query_perform(
query_struct, conn,
query_params={
'cursor': cursor,
},
response_config=[
('data', StructArray([(f'field_{i}', AnyDataObject) for i in range(field_count)])),
('more', Bool),
],
post_process_fun=__post_process_sql_fields_cursor
)
def __post_process_sql_fields_cursor(result):
if result.status != 0:
return result
value = result.value
result.value = {
'data': [],
'more': value['more']
}
for row_dict in value['data']:
result.value['data'].append(list(row_dict.values()))
return result
def resource_close(conn: 'Connection', cursor: int, query_id: int = None) -> APIResult:
"""
Closes a resource, such as query cursor.
:param conn: connection to Ignite server,
:param cursor: cursor ID,
:param query_id: (optional) a value generated by client and returned as-is
in response.query_id. When the parameter is omitted, a random value
is generated,
:return: API result data object. Contains zero status on success,
non-zero status and an error description otherwise.
"""
return __resource_close(conn, cursor, query_id)
async def resource_close_async(conn: 'AioConnection', cursor: int, query_id: int = None) -> APIResult:
return await __resource_close(conn, cursor, query_id)
def __resource_close(conn, cursor, query_id):
query_struct = Query(
OP_RESOURCE_CLOSE,
[
('cursor', Long),
],
query_id=query_id,
)
return query_perform(
query_struct, conn,
query_params={
'cursor': cursor,
}
)