blob: a3d89d717464be8eaee0e5701099efb7266f7a23 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import logging
import random
import sys
import struct
import warnings
from thrift.protocol import TBinaryProtocol, TCompactProtocol
from thrift.transport import TSocket, TTransport
from tzlocal import get_localzone_name
from iotdb.utils.SessionDataSet import SessionDataSet
from .template.Template import Template
from .template.TemplateQueryType import TemplateQueryType
from .thrift.common.ttypes import TEndPoint
from .thrift.rpc.IClientRPCService import (
Client,
TSCreateTimeseriesReq,
TSCreateAlignedTimeseriesReq,
TSInsertRecordReq,
TSInsertStringRecordReq,
TSInsertTabletReq,
TSExecuteStatementReq,
TSOpenSessionReq,
TSCreateMultiTimeseriesReq,
TSCloseSessionReq,
TSInsertTabletsReq,
TSInsertRecordsReq,
TSInsertRecordsOfOneDeviceReq,
TSCreateSchemaTemplateReq,
TSDropSchemaTemplateReq,
TSAppendSchemaTemplateReq,
TSPruneSchemaTemplateReq,
TSSetSchemaTemplateReq,
TSUnsetSchemaTemplateReq,
TSQueryTemplateReq,
)
from .thrift.rpc.ttypes import (
TSDeleteDataReq,
TSProtocolVersion,
TSSetTimeZoneReq,
TSRawDataQueryReq,
TSLastDataQueryReq,
TSInsertStringRecordsOfOneDeviceReq,
)
from .tsfile.utils.date_utils import parse_date_to_int
from .utils import rpc_utils
from .utils.exception import IoTDBConnectionException, RedirectException
logger = logging.getLogger("IoTDB")
warnings.simplefilter("always", DeprecationWarning)
class Session(object):
DEFAULT_FETCH_SIZE = 5000
DEFAULT_USER = "root"
DEFAULT_PASSWORD = "root"
DEFAULT_ZONE_ID = get_localzone_name()
RETRY_NUM = 3
SQL_DIALECT = "tree"
def __init__(
self,
host,
port,
user=DEFAULT_USER,
password=DEFAULT_PASSWORD,
fetch_size=DEFAULT_FETCH_SIZE,
zone_id=DEFAULT_ZONE_ID,
enable_redirection=True,
use_ssl=False,
ca_certs=None,
connection_timeout_in_ms=None,
):
self.__host = host
self.__port = port
self.__hosts = None
self.__ports = None
self.__default_endpoint = TEndPoint(self.__host, self.__port)
self.__user = user
self.__password = password
if fetch_size > 0:
self.__fetch_size = fetch_size
else:
logger.warning(
f"fetch_size {fetch_size} is illegal, use default fetch_size {self.DEFAULT_FETCH_SIZE}"
)
self.__fetch_size = self.DEFAULT_FETCH_SIZE
self.__is_close = True
self.__client = None
self.__default_connection = None
self.protocol_version = TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3
self.__session_id = None
self.__statement_id = None
self.__zone_id = zone_id
self.__enable_rpc_compression = None
self.__enable_redirection = enable_redirection
self.__device_id_to_endpoint = None
self.__endpoint_to_connection = None
self.sql_dialect = self.SQL_DIALECT
self.database = None
self.__use_ssl = use_ssl
self.__ca_certs = ca_certs
self.__connection_timeout_in_ms = connection_timeout_in_ms
self.__time_precision = "ms"
@classmethod
def init_from_node_urls(
cls,
node_urls,
user=DEFAULT_USER,
password=DEFAULT_PASSWORD,
fetch_size=DEFAULT_FETCH_SIZE,
zone_id=DEFAULT_ZONE_ID,
enable_redirection=True,
use_ssl=False,
ca_certs=None,
connection_timeout_in_ms=None,
):
if node_urls is None:
raise RuntimeError("node urls is empty")
session = Session(
None,
None,
user,
password,
fetch_size,
zone_id,
enable_redirection,
use_ssl=use_ssl,
ca_certs=ca_certs,
connection_timeout_in_ms=connection_timeout_in_ms,
)
session.__hosts = []
session.__ports = []
for node_url in node_urls:
split = node_url.split(":")
session.__hosts.append(split[0])
session.__ports.append(int(split[1]))
session.__host = session.__hosts[0]
session.__port = session.__ports[0]
session.__default_endpoint = TEndPoint(session.__host, session.__port)
return session
def open(self, enable_rpc_compression=False):
if not self.__is_close:
return
self.__enable_rpc_compression = enable_rpc_compression
if self.__hosts is None:
self.__default_connection = self.init_connection(self.__default_endpoint)
else:
for i in range(0, len(self.__hosts)):
self.__default_endpoint = TEndPoint(self.__hosts[i], self.__ports[i])
try:
self.__default_connection = self.init_connection(
self.__default_endpoint
)
except Exception as e:
if not self.reconnect():
if str(e).startswith("Could not connect to any of"):
error_msg = (
"Cluster has no nodes to connect because: "
+ self.connection_error_msg()
)
else:
error_msg = str(e)
raise IoTDBConnectionException(error_msg) from None
break
self.__client = self.__default_connection.client
self.__session_id = self.__default_connection.session_id
self.__statement_id = self.__default_connection.statement_id
self.__is_close = False
if self.__enable_redirection:
self.__device_id_to_endpoint = {}
self.__endpoint_to_connection = {
str(self.__default_endpoint): self.__default_connection
}
def init_connection(self, endpoint):
transport = self.__get_transport(endpoint)
if self.__enable_rpc_compression:
client = Client(TCompactProtocol.TCompactProtocolAccelerated(transport))
else:
client = Client(TBinaryProtocol.TBinaryProtocolAccelerated(transport))
configuration = {"version": "V_1_0", "sql_dialect": self.sql_dialect}
if self.database is not None:
configuration["db"] = self.database
open_req = TSOpenSessionReq(
client_protocol=self.protocol_version,
username=self.__user,
password=self.__password,
zoneId=self.__zone_id,
configuration=configuration,
)
try:
open_resp = client.openSession(open_req)
rpc_utils.verify_success(open_resp.status)
if open_resp.configuration is not None:
if "timestamp_precision" in open_resp.configuration:
self.__time_precision = open_resp.configuration[
"timestamp_precision"
]
if self.protocol_version != open_resp.serverProtocolVersion:
logger.exception(
"Protocol differ, Client version is {}, but Server version is {}".format(
self.protocol_version, open_resp.serverProtocolVersion
)
)
# version is less than 0.10
if open_resp.serverProtocolVersion == 0:
raise TTransport.TException(message="Protocol not supported.")
session_id = open_resp.sessionId
statement_id = client.requestStatementId(session_id)
except Exception as e:
transport.close()
raise IoTDBConnectionException(e) from None
if self.__zone_id is not None:
request = TSSetTimeZoneReq(session_id, self.__zone_id)
try:
client.setTimeZone(request)
except TTransport.TException as e:
raise IoTDBConnectionException(
"Could not set time zone because: ", e
) from None
else:
self.__zone_id = self.get_time_zone()
return SessionConnection(client, transport, session_id, statement_id)
def __get_transport(self, endpoint):
if self.__use_ssl:
import ssl
from thrift.transport import TSSLSocket
if sys.version_info >= (3, 10):
context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
else:
context = ssl.SSLContext(ssl.PROTOCOL_TLS)
context.verify_mode = ssl.CERT_REQUIRED
context.check_hostname = True
context.load_verify_locations(cafile=self.__ca_certs)
socket = TSSLSocket.TSSLSocket(
host=endpoint.ip, port=endpoint.port, ssl_context=context
)
else:
socket = TSocket.TSocket(endpoint.ip, endpoint.port)
socket.setTimeout(self.__connection_timeout_in_ms)
transport = TTransport.TFramedTransport(socket)
if not transport.isOpen():
try:
transport.open()
except TTransport.TTransportException as e:
raise IoTDBConnectionException(e) from None
return transport
def is_open(self):
return not self.__is_close
def close(self):
if self.__is_close:
return
try:
if self.__enable_redirection:
for connection in self.__endpoint_to_connection.values():
req = TSCloseSessionReq(connection.session_id)
connection.close_connection(req)
else:
req = TSCloseSessionReq(self.__session_id)
self.__default_connection.close_connection(req)
finally:
self.__is_close = True
def set_storage_group(self, group_name):
"""
create one database
:param group_name: String, database name (starts from root)
"""
try:
return rpc_utils.verify_success(
self.__client.setStorageGroup(self.__session_id, group_name)
)
except TTransport.TException as e:
if self.reconnect():
try:
return rpc_utils.verify_success(
self.__client.setStorageGroup(self.__session_id, group_name)
)
except TTransport.TException as e1:
raise IoTDBConnectionException(e1) from None
else:
raise IoTDBConnectionException(self.connection_error_msg()) from None
def delete_storage_group(self, storage_group):
"""
delete one database.
:param storage_group: String, path of the target database.
"""
groups = [storage_group]
return self.delete_storage_groups(groups)
def delete_storage_groups(self, storage_group_lst):
"""
delete multiple databases.
:param storage_group_lst: List, paths of the target databases.
"""
try:
return rpc_utils.verify_success(
self.__client.deleteStorageGroups(self.__session_id, storage_group_lst)
)
except TTransport.TException as e:
if self.reconnect():
try:
return rpc_utils.verify_success(
self.__client.deleteStorageGroups(
self.__session_id, storage_group_lst
)
)
except TTransport.TException as e1:
raise IoTDBConnectionException(e1) from None
else:
raise IoTDBConnectionException(self.connection_error_msg()) from None
def create_time_series(
self,
ts_path,
data_type,
encoding,
compressor,
props=None,
tags=None,
attributes=None,
alias=None,
):
"""
create single time series
:param ts_path: String, complete time series path (starts from root)
:param data_type: TSDataType, data type for this time series
:param encoding: TSEncoding, encoding for this time series
:param compressor: Compressor, compressing type for this time series
:param props: Dictionary, properties for time series
:param tags: Dictionary, tag map for time series
:param attributes: Dictionary, attribute map for time series
:param alias: String, measurement alias for time series
"""
request = TSCreateTimeseriesReq(
self.__session_id,
ts_path,
data_type,
encoding,
compressor,
props,
tags,
attributes,
alias,
)
try:
return rpc_utils.verify_success(self.__client.createTimeseries(request))
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
return rpc_utils.verify_success(
self.__client.createTimeseries(request)
)
except TTransport.TException as e1:
raise IoTDBConnectionException(e1) from None
else:
raise IoTDBConnectionException(self.connection_error_msg()) from None
def create_aligned_time_series(
self, device_id, measurements_lst, data_type_lst, encoding_lst, compressor_lst
):
"""
create aligned time series
:param device_id: String, device id for timeseries (starts from root)
:param measurements_lst: List of String, measurement ids for time series
:param data_type_lst: List of TSDataType, data types for time series
:param encoding_lst: List of TSEncoding, encodings for time series
:param compressor_lst: List of Compressor, compressing types for time series
"""
request = TSCreateAlignedTimeseriesReq(
self.__session_id,
device_id,
measurements_lst,
data_type_lst,
encoding_lst,
compressor_lst,
)
try:
return rpc_utils.verify_success(
self.__client.createAlignedTimeseries(request)
)
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
return rpc_utils.verify_success(
self.__client.createAlignedTimeseries(request)
)
except TTransport.TException as e1:
raise IoTDBConnectionException(e1) from None
else:
raise IoTDBConnectionException(self.connection_error_msg()) from None
def create_multi_time_series(
self,
ts_path_lst,
data_type_lst,
encoding_lst,
compressor_lst,
props_lst=None,
tags_lst=None,
attributes_lst=None,
alias_lst=None,
):
"""
create multiple time series
:param ts_path_lst: List of String, complete time series paths (starts from root)
:param data_type_lst: List of TSDataType, data types for time series
:param encoding_lst: List of TSEncoding, encodings for time series
:param compressor_lst: List of Compressor, compressing types for time series
:param props_lst: List of Props Dictionary, properties for time series
:param tags_lst: List of tag Dictionary, tag maps for time series
:param attributes_lst: List of attribute Dictionary, attribute maps for time series
:param alias_lst: List of alias, measurement alias for time series
"""
request = TSCreateMultiTimeseriesReq(
self.__session_id,
ts_path_lst,
data_type_lst,
encoding_lst,
compressor_lst,
props_lst,
tags_lst,
attributes_lst,
alias_lst,
)
try:
return rpc_utils.verify_success(
self.__client.createMultiTimeseries(request)
)
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
return rpc_utils.verify_success(
self.__client.createMultiTimeseries(request)
)
except TTransport.TException as e1:
raise IoTDBConnectionException(e1) from None
else:
raise IoTDBConnectionException(self.connection_error_msg()) from None
def delete_time_series(self, paths_list):
"""
delete multiple time series, including data and schema
:param paths_list: List of time series path, which should be complete (starts from root)
"""
try:
return rpc_utils.verify_success(
self.__client.deleteTimeseries(self.__session_id, paths_list)
)
except TTransport.TException as e:
if self.reconnect():
try:
return rpc_utils.verify_success(
self.__client.deleteTimeseries(self.__session_id, paths_list)
)
except TTransport.TException as e1:
raise IoTDBConnectionException(e1) from None
else:
raise IoTDBConnectionException(self.connection_error_msg()) from None
def check_time_series_exists(self, path):
"""
check whether a specific time series exists
:param path: String, complete path of time series for checking
:return Boolean value indicates whether it exists.
"""
data_set = self.execute_query_statement("SHOW TIMESERIES {}".format(path))
result = data_set.has_next()
data_set.close_operation_handle()
return result
def delete_data(self, paths_list, end_time):
"""
delete all data <= end_time in multiple time series
:param paths_list: time series list that the data in.
:param end_time: data with time stamp less than or equal to time will be deleted.
"""
request = TSDeleteDataReq(
self.__session_id, paths_list, -9223372036854775808, end_time
)
try:
return rpc_utils.verify_success(self.__client.deleteData(request))
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
return rpc_utils.verify_success(self.__client.deleteData(request))
except TTransport.TException as e1:
raise IoTDBConnectionException(e1) from None
else:
raise IoTDBConnectionException(self.connection_error_msg()) from None
def delete_data_in_range(self, paths_list, start_time, end_time):
"""
delete data >= start_time and data <= end_time in multiple timeseries
:param paths_list: time series list that the data in.
:param start_time: delete range start time.
:param end_time: delete range end time.
"""
request = TSDeleteDataReq(self.__session_id, paths_list, start_time, end_time)
try:
return rpc_utils.verify_success(self.__client.deleteData(request))
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
return rpc_utils.verify_success(self.__client.deleteData(request))
except TTransport.TException as e1:
raise IoTDBConnectionException(e1) from None
else:
raise IoTDBConnectionException(self.connection_error_msg()) from None
def insert_str_record(self, device_id, timestamp, measurements, string_values):
"""special case for inserting one row of String (TEXT) value"""
if type(string_values) == str:
string_values = [string_values]
if type(measurements) == str:
measurements = [measurements]
if self.__has_none_value(string_values):
filtered_measurements, filtered_values = zip(
*[(m, v) for m, v in zip(measurements, string_values) if v is not None]
)
measurements = list(filtered_measurements)
values = list(filtered_values)
if len(measurements) == 0 or len(values) == 0:
logger.info("All inserting values are none!")
return
request = self.gen_insert_str_record_req(
device_id, timestamp, measurements, string_values
)
try:
connection = self.get_connection(device_id)
request.sessionId = connection.session_id
return rpc_utils.verify_success_with_redirection(
connection.client.insertStringRecord(request)
)
except RedirectException as e:
return self.handle_redirection(device_id, e.redirect_node)
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
return rpc_utils.verify_success(
self.__client.insertStringRecord(request)
)
except TTransport.TException as e1:
raise IoTDBConnectionException(e1) from None
else:
raise IoTDBConnectionException(self.connection_error_msg()) from None
def insert_aligned_str_record(
self, device_id, timestamp, measurements, string_values
):
"""special case for inserting one row of String (TEXT) value"""
if type(string_values) == str:
string_values = [string_values]
if type(measurements) == str:
measurements = [measurements]
if self.__has_none_value(string_values):
filtered_measurements, filtered_values = zip(
*[(m, v) for m, v in zip(measurements, string_values) if v is not None]
)
measurements = list(filtered_measurements)
values = list(filtered_values)
if len(measurements) == 0 or len(values) == 0:
logger.info("All inserting values are none!")
return
request = self.gen_insert_str_record_req(
device_id, timestamp, measurements, string_values, True
)
try:
connection = self.get_connection(device_id)
request.sessionId = connection.session_id
return rpc_utils.verify_success_with_redirection(
connection.client.insertStringRecord(request)
)
except RedirectException as e:
return self.handle_redirection(device_id, e.redirect_node)
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
return rpc_utils.verify_success(
self.__client.insertStringRecord(request)
)
except TTransport.TException as e1:
raise IoTDBConnectionException(e1) from None
else:
raise IoTDBConnectionException(self.connection_error_msg()) from None
def insert_record(self, device_id, timestamp, measurements, data_types, values):
"""
insert one row of record into database, if you want improve your performance, please use insertTablet method
for example a record at time=10086 with three measurements is:
timestamp, m1, m2, m3
10086, 125.3, True, text1
:param device_id: String, time series path for device
:param timestamp: Integer, indicate the timestamp of the row of data
:param measurements: List of String, sensor names
:param data_types: List of TSDataType, indicate the data type for each sensor
:param values: List, values to be inserted, for each sensor
"""
if self.__has_none_value(values):
filtered_measurements, filtered_data_types, filtered_values = zip(
*[
(m, d, v)
for m, d, v in zip(measurements, data_types, values)
if v is not None
]
)
measurements = list(filtered_measurements)
data_types = list(filtered_data_types)
values = list(filtered_values)
if len(measurements) == 0 or len(data_types) == 0 or len(values) == 0:
logger.info("All inserting values are none!")
return
request = self.gen_insert_record_req(
device_id, timestamp, measurements, data_types, values
)
try:
connection = self.get_connection(device_id)
request.sessionId = connection.session_id
return rpc_utils.verify_success_with_redirection(
connection.client.insertRecord(request)
)
except RedirectException as e:
return self.handle_redirection(device_id, e.redirect_node)
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
return rpc_utils.verify_success(self.__client.insertRecord(request))
except TTransport.TException as e1:
raise IoTDBConnectionException(e1) from None
else:
raise IoTDBConnectionException(self.connection_error_msg()) from None
def insert_records(
self, device_ids: list, times, measurements_lst, types_lst, values_lst
):
"""
insert multiple rows of data, records are independent to each other, in other words, there's no relationship
between those records
:param device_ids: List of String, time series paths for device
:param times: List of Integer, timestamps for records
:param measurements_lst: 2-D List of String, each element of outer list indicates measurements of a device
:param types_lst: 2-D List of TSDataType, each element of outer list indicates sensor data types of a device
:param values_lst: 2-D List, values to be inserted, for each device
"""
if self.__has_none_value(values_lst):
(
device_ids,
times,
measurements_lst,
types_lst,
values_lst,
) = self.__filter_lists_by_values(
device_ids, times, measurements_lst, types_lst, values_lst
)
if len(device_ids) == 0:
logger.info("All inserting values are none!")
return
if self.__enable_redirection:
request_group = {}
for i in range(len(device_ids)):
connection = self.get_connection(device_ids[i])
request = request_group.setdefault(
connection.client,
TSInsertRecordsReq(connection.session_id, [], [], [], []),
)
request.prefixPaths.append(device_ids[i])
request.timestamps.append(times[i])
request.measurementsList.append(measurements_lst[i])
request.valuesList.append(
Session.value_to_bytes(types_lst[i], values_lst[i])
)
for client, request in request_group.items():
try:
rpc_utils.verify_success_with_redirection_for_multi_devices(
client.insertRecords(request), request.prefixPaths
)
except RedirectException as e:
for device, endpoint in e.device_to_endpoint.items():
self.handle_redirection(device, endpoint)
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
rpc_utils.verify_success(
self.__client.insertRecords(request)
)
except TTransport.TException as e1:
raise IoTDBConnectionException(e1) from None
else:
raise IoTDBConnectionException(
self.connection_error_msg()
) from None
return 0
else:
request = self.gen_insert_records_req(
device_ids, times, measurements_lst, types_lst, values_lst
)
try:
return rpc_utils.verify_success(self.__client.insertRecords(request))
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
return rpc_utils.verify_success(
self.__client.insertRecords(request)
)
except TTransport.TException as e1:
raise IoTDBConnectionException(e1) from None
else:
raise IoTDBConnectionException(
self.connection_error_msg()
) from None
def insert_aligned_record(
self, device_id, timestamp, measurements, data_types, values
):
"""
insert one row of aligned record into database, if you want improve your performance, please use insertTablet method
for example a record at time=10086 with three measurements is:
timestamp, m1, m2, m3
10086, 125.3, True, text1
:param device_id: String, time series path for device
:param timestamp: Integer, indicate the timestamp of the row of data
:param measurements: List of String, sensor names
:param data_types: List of TSDataType, indicate the data type for each sensor
:param values: List, values to be inserted, for each sensor
"""
if self.__has_none_value(values):
filtered_measurements, filtered_data_types, filtered_values = zip(
*[
(m, d, v)
for m, d, v in zip(measurements, data_types, values)
if v is not None
]
)
measurements = list(filtered_measurements)
data_types = list(filtered_data_types)
values = list(filtered_values)
if len(measurements) == 0 or len(data_types) == 0 or len(values) == 0:
logger.info("All inserting values are none!")
return
request = self.gen_insert_record_req(
device_id, timestamp, measurements, data_types, values, True
)
try:
connection = self.get_connection(device_id)
request.sessionId = connection.session_id
return rpc_utils.verify_success_with_redirection(
connection.client.insertRecord(request)
)
except RedirectException as e:
return self.handle_redirection(device_id, e.redirect_node)
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
return rpc_utils.verify_success(self.__client.insertRecord(request))
except TTransport.TException as e1:
raise IoTDBConnectionException(e1) from None
else:
raise IoTDBConnectionException(self.connection_error_msg()) from None
def insert_aligned_records(
self, device_ids, times, measurements_lst, types_lst, values_lst
):
"""
insert multiple aligned rows of data, records are independent to each other, in other words, there's no relationship
between those records
:param device_ids: List of String, time series paths for device
:param times: List of Integer, timestamps for records
:param measurements_lst: 2-D List of String, each element of outer list indicates measurements of a device
:param types_lst: 2-D List of TSDataType, each element of outer list indicates sensor data types of a device
:param values_lst: 2-D List, values to be inserted, for each device
"""
if self.__has_none_value(values_lst):
(
device_ids,
times,
measurements_lst,
types_lst,
values_lst,
) = self.__filter_lists_by_values(
device_ids, times, measurements_lst, types_lst, values_lst
)
if len(device_ids) == 0:
logger.info("All inserting values are none!")
return
if self.__enable_redirection:
request_group = {}
for i in range(len(device_ids)):
connection = self.get_connection(device_ids[i])
request = request_group.setdefault(
connection.client,
TSInsertRecordsReq(connection.session_id, [], [], [], [], True),
)
request.prefixPaths.append(device_ids[i])
request.timestamps.append(times[i])
request.measurementsList.append(measurements_lst[i])
request.valuesList.append(
Session.value_to_bytes(types_lst[i], values_lst[i])
)
for client, request in request_group.items():
try:
rpc_utils.verify_success_with_redirection_for_multi_devices(
client.insertRecords(request), request.prefixPaths
)
except RedirectException as e:
for device, endpoint in e.device_to_endpoint.items():
self.handle_redirection(device, endpoint)
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
rpc_utils.verify_success(
self.__client.insertRecords(request)
)
except TTransport.TException as e1:
raise IoTDBConnectionException(e1) from None
else:
raise IoTDBConnectionException(
self.connection_error_msg()
) from None
return 0
else:
request = self.gen_insert_records_req(
device_ids, times, measurements_lst, types_lst, values_lst, True
)
try:
return rpc_utils.verify_success(self.__client.insertRecords(request))
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
return rpc_utils.verify_success(
self.__client.insertRecords(request)
)
except TTransport.TException as e1:
raise IoTDBConnectionException(e1) from None
else:
raise IoTDBConnectionException(
self.connection_error_msg()
) from None
def test_insert_record(
self, device_id, timestamp, measurements, data_types, values
):
"""
this method NOT insert data into database and the server just return after accept the request, this method
should be used to test other time cost in client
:param device_id: String, time series path for device
:param timestamp: Integer, indicate the timestamp of the row of data
:param measurements: List of String, sensor names
:param data_types: List of TSDataType, indicate the data type for each sensor
:param values: List, values to be inserted, for each sensor
"""
request = self.gen_insert_record_req(
device_id, timestamp, measurements, data_types, values
)
try:
return rpc_utils.verify_success(self.__client.testInsertRecord(request))
except TTransport.TException as e:
if self.reconnect():
try:
return rpc_utils.verify_success(
self.__client.testInsertRecord(request)
)
except TTransport.TException as e1:
raise IoTDBConnectionException(e1) from None
else:
raise IoTDBConnectionException(self.connection_error_msg()) from None
def test_insert_records(
self, device_ids, times, measurements_lst, types_lst, values_lst
):
"""
this method NOT insert data into database and the server just return after accept the request, this method
should be used to test other time cost in client
:param device_ids: List of String, time series paths for device
:param times: List of Integer, timestamps for records
:param measurements_lst: 2-D List of String, each element of outer list indicates measurements of a device
:param types_lst: 2-D List of TSDataType, each element of outer list indicates sensor data types of a device
:param values_lst: 2-D List, values to be inserted, for each device
"""
request = self.gen_insert_records_req(
device_ids, times, measurements_lst, types_lst, values_lst
)
try:
return rpc_utils.verify_success(self.__client.testInsertRecords(request))
except TTransport.TException as e:
if self.reconnect():
try:
return rpc_utils.verify_success(
self.__client.testInsertRecords(request)
)
except TTransport.TException as e1:
raise IoTDBConnectionException(e1) from None
else:
raise IoTDBConnectionException(self.connection_error_msg()) from None
def gen_insert_record_req(
self, device_id, timestamp, measurements, data_types, values, is_aligned=False
):
if (len(values) != len(data_types)) or (len(values) != len(measurements)):
raise RuntimeError(
"length of data types does not equal to length of values!"
)
values_in_bytes = Session.value_to_bytes(data_types, values)
return TSInsertRecordReq(
self.__session_id,
device_id,
measurements,
values_in_bytes,
timestamp,
is_aligned,
)
def gen_insert_str_record_req(
self, device_id, timestamp, measurements, values, is_aligned=False
):
if len(values) != len(measurements):
raise RuntimeError(
"length of measurements does not equal to length of values!"
)
return TSInsertStringRecordReq(
self.__session_id, device_id, measurements, values, timestamp, is_aligned
)
def gen_insert_records_req(
self,
device_ids,
times,
measurements_lst,
types_lst,
values_lst,
is_aligned=False,
):
if (
(len(device_ids) != len(measurements_lst))
or (len(times) != len(types_lst))
or (len(device_ids) != len(times))
or (len(times) != len(values_lst))
):
raise RuntimeError(
"deviceIds, times, measurementsList and valuesList's size should be equal"
)
value_lst = []
for values, data_types, measurements in zip(
values_lst, types_lst, measurements_lst
):
if (len(values) != len(data_types)) or (len(values) != len(measurements)):
raise RuntimeError(
"deviceIds, times, measurementsList and valuesList's size should be equal"
)
values_in_bytes = Session.value_to_bytes(data_types, values)
value_lst.append(values_in_bytes)
return TSInsertRecordsReq(
self.__session_id,
device_ids,
measurements_lst,
value_lst,
times,
is_aligned,
)
def insert_tablet(self, tablet):
"""
insert one tablet, in a tablet, for each timestamp, the number of measurements is same
for example three records in the same device can form a tablet:
timestamps, m1, m2, m3
1, 125.3, True, text1
2, 111.6, False, text2
3, 688.6, True, text3
Notice: From 0.13.0, the tablet can contain empty cell
The tablet itself is sorted (see docs of Tablet.py)
:param tablet: a tablet specified above
"""
request = self.gen_insert_tablet_req(tablet)
try:
connection = self.get_connection(tablet.get_insert_target_name())
request.sessionId = connection.session_id
return rpc_utils.verify_success_with_redirection(
connection.client.insertTablet(request)
)
except RedirectException as e:
return self.handle_redirection(
tablet.get_insert_target_name(), e.redirect_node
)
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
return rpc_utils.verify_success(self.__client.insertTablet(request))
except TTransport.TException as e1:
raise IoTDBConnectionException(e1) from None
else:
raise IoTDBConnectionException(self.connection_error_msg()) from None
def insert_tablets(self, tablet_lst):
"""
insert multiple tablets, tablets are independent to each other
:param tablet_lst: List of tablets
"""
if self.__enable_redirection:
request_group = {}
for i in range(len(tablet_lst)):
connection = self.get_connection(tablet_lst[i].get_insert_target_name())
request = request_group.setdefault(
connection.client,
TSInsertTabletsReq(
connection.session_id, [], [], [], [], [], [], False
),
)
request.prefixPaths.append(tablet_lst[i].get_insert_target_name())
request.timestampsList.append(tablet_lst[i].get_binary_timestamps())
request.measurementsList.append(tablet_lst[i].get_measurements())
request.valuesList.append(tablet_lst[i].get_binary_values())
request.sizeList.append(tablet_lst[i].get_row_number())
request.typesList.append(tablet_lst[i].get_data_types())
for client, request in request_group.items():
try:
rpc_utils.verify_success_with_redirection_for_multi_devices(
client.insertTablets(request), request.prefixPaths
)
except RedirectException as e:
for device, endpoint in e.device_to_endpoint.items():
self.handle_redirection(device, endpoint)
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
rpc_utils.verify_success(
self.__client.insertTablets(request)
)
except TTransport.TException as e1:
raise IoTDBConnectionException(e1) from None
else:
raise IoTDBConnectionException(
self.connection_error_msg()
) from None
return 0
else:
request = self.gen_insert_tablets_req(tablet_lst)
try:
return rpc_utils.verify_success(self.__client.insertTablets(request))
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
return rpc_utils.verify_success(
self.__client.insertTablets(request)
)
except TTransport.TException as e1:
raise IoTDBConnectionException(e1) from None
else:
raise IoTDBConnectionException(
self.connection_error_msg()
) from None
def insert_aligned_tablet(self, tablet):
"""
insert one aligned tablet, in a tablet, for each timestamp, the number of measurements is same
for example three records in the same device can form a tablet:
timestamps, m1, m2, m3
1, 125.3, True, text1
2, 111.6, False, text2
3, 688.6, True, text3
Notice: From 0.13.0, the tablet can contain empty cell
The tablet itself is sorted (see docs of Tablet.py)
:param tablet: a tablet specified above
"""
request = self.gen_insert_tablet_req(tablet, True)
try:
connection = self.get_connection(tablet.get_insert_target_name())
request.sessionId = connection.session_id
return rpc_utils.verify_success_with_redirection(
connection.client.insertTablet(request)
)
except RedirectException as e:
return self.handle_redirection(
tablet.get_insert_target_name(), e.redirect_node
)
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
return rpc_utils.verify_success(self.__client.insertTablet(request))
except TTransport.TException as e1:
raise IoTDBConnectionException(e1) from None
else:
raise IoTDBConnectionException(self.connection_error_msg()) from None
def insert_aligned_tablets(self, tablet_lst):
"""
insert multiple aligned tablets, tablets are independent to each other
:param tablet_lst: List of tablets
"""
if self.__enable_redirection:
request_group = {}
for i in range(len(tablet_lst)):
connection = self.get_connection(tablet_lst[i].get_insert_target_name())
request = request_group.setdefault(
connection.client,
TSInsertTabletsReq(
connection.session_id, [], [], [], [], [], [], True
),
)
request.prefixPaths.append(tablet_lst[i].get_insert_target_name())
request.timestampsList.append(tablet_lst[i].get_binary_timestamps())
request.measurementsList.append(tablet_lst[i].get_measurements())
request.valuesList.append(tablet_lst[i].get_binary_values())
request.sizeList.append(tablet_lst[i].get_row_number())
request.typesList.append(tablet_lst[i].get_data_types())
for client, request in request_group.items():
try:
rpc_utils.verify_success_with_redirection_for_multi_devices(
client.insertTablets(request), request.prefixPaths
)
except RedirectException as e:
for device, endpoint in e.device_to_endpoint.items():
self.handle_redirection(device, endpoint)
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
rpc_utils.verify_success(
self.__client.insertTablets(request)
)
except TTransport.TException as e1:
raise IoTDBConnectionException(e1) from None
else:
raise IoTDBConnectionException(
self.connection_error_msg()
) from None
return 0
else:
request = self.gen_insert_tablets_req(tablet_lst, True)
try:
return rpc_utils.verify_success(self.__client.insertTablets(request))
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
return rpc_utils.verify_success(
self.__client.insertTablets(request)
)
except TTransport.TException as e1:
raise IoTDBConnectionException(e1) from None
else:
raise IoTDBConnectionException(
self.connection_error_msg()
) from None
def insert_relational_tablet(self, tablet):
"""
insert one tablet, for example three column in the table1 can form a tablet:
timestamps, id1, attr1, m1
1, id:1, attr:1, 1.0
2, id:1, attr:1, 2.0
3, id:2, attr:2, 3.0
:param tablet: a tablet specified above
"""
request = self.gen_insert_relational_tablet_req(tablet)
try:
connection = self.get_connection(tablet.get_insert_target_name())
request.sessionId = connection.session_id
return rpc_utils.verify_success_with_redirection(
connection.client.insertTablet(request)
)
except RedirectException as e:
return self.handle_redirection(
tablet.get_insert_target_name(), e.redirect_node
)
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
return rpc_utils.verify_success(self.__client.insertTablet(request))
except TTransport.TException as e1:
raise IoTDBConnectionException(e1) from None
else:
raise IoTDBConnectionException(self.connection_error_msg()) from None
def insert_records_of_one_device(
self, device_id, times_list, measurements_list, types_list, values_list
):
# sort by timestamp
sorted_zipped = sorted(
zip(times_list, measurements_list, types_list, values_list)
)
result = zip(*sorted_zipped)
times_list, measurements_list, types_list, values_list = [
list(x) for x in result
]
return self.insert_records_of_one_device_sorted(
device_id, times_list, measurements_list, types_list, values_list
)
def insert_records_of_one_device_sorted(
self, device_id, times_list, measurements_list, types_list, values_list
):
"""
Insert multiple rows, which can reduce the overhead of network. This method is just like jdbc
executeBatch, we pack some insert request in batch and send them to server. If you want improve
your performance, please see insertTablet method
:param device_id: device id
:param times_list: timestamps list
:param measurements_list: measurements list
:param types_list: types list
:param values_list: values list
"""
# check parameter
size = len(times_list)
if (
size != len(measurements_list)
or size != len(types_list)
or size != len(values_list)
):
raise RuntimeError(
"insert records of one device error: types, times, measurementsList and valuesList's size should be equal"
)
# check sorted
if not Session.check_sorted(times_list):
raise RuntimeError(
"insert records of one device error: timestamp not sorted"
)
request = self.gen_insert_records_of_one_device_request(
device_id, times_list, measurements_list, values_list, types_list
)
try:
connection = self.get_connection(device_id)
request.sessionId = connection.session_id
return rpc_utils.verify_success_with_redirection(
connection.client.insertRecordsOfOneDevice(request)
)
except RedirectException as e:
return self.handle_redirection(device_id, e.redirect_node)
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
return rpc_utils.verify_success(
self.__client.insertRecordsOfOneDevice(request)
)
except TTransport.TException as e1:
raise IoTDBConnectionException(e1) from None
else:
raise IoTDBConnectionException(self.connection_error_msg()) from None
def insert_aligned_records_of_one_device(
self, device_id, times_list, measurements_list, types_list, values_list
):
# sort by timestamp
sorted_zipped = sorted(
zip(times_list, measurements_list, types_list, values_list)
)
result = zip(*sorted_zipped)
times_list, measurements_list, types_list, values_list = [
list(x) for x in result
]
return self.insert_aligned_records_of_one_device_sorted(
device_id, times_list, measurements_list, types_list, values_list
)
def insert_aligned_records_of_one_device_sorted(
self, device_id, times_list, measurements_list, types_list, values_list
):
"""
Insert multiple aligned rows, which can reduce the overhead of network. This method is just like jdbc
executeBatch, we pack some insert request in batch and send them to server. If you want to improve
your performance, please see insertTablet method
:param device_id: device id
:param times_list: timestamps list
:param measurements_list: measurements list
:param types_list: types list
:param values_list: values list
"""
# check parameter
size = len(times_list)
if (
size != len(measurements_list)
or size != len(types_list)
or size != len(values_list)
):
raise RuntimeError(
"insert records of one device error: types, times, measurementsList and valuesList's size should be equal"
)
# check sorted
if not Session.check_sorted(times_list):
raise RuntimeError(
"insert records of one device error: timestamp not sorted"
)
request = self.gen_insert_records_of_one_device_request(
device_id, times_list, measurements_list, values_list, types_list, True
)
# send request
try:
connection = self.get_connection(device_id)
request.sessionId = connection.session_id
return rpc_utils.verify_success_with_redirection(
connection.client.insertRecordsOfOneDevice(request)
)
except RedirectException as e:
return self.handle_redirection(device_id, e.redirect_node)
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
return rpc_utils.verify_success(
self.__client.insertRecordsOfOneDevice(request)
)
except TTransport.TException as e1:
raise IoTDBConnectionException(e1) from None
else:
raise IoTDBConnectionException(self.connection_error_msg()) from None
def gen_insert_records_of_one_device_request(
self,
device_id,
times_list,
measurements_list,
values_list,
types_list,
is_aligned=False,
):
binary_value_list = []
for values, data_types, measurements in zip(
values_list, types_list, measurements_list
):
if (len(values) != len(data_types)) or (len(values) != len(measurements)):
raise RuntimeError(
"insert records of one device error: deviceIds, times, measurementsList and valuesList's size should be equal"
)
values_in_bytes = Session.value_to_bytes(data_types, values)
binary_value_list.append(values_in_bytes)
return TSInsertRecordsOfOneDeviceReq(
self.__session_id,
device_id,
measurements_list,
binary_value_list,
times_list,
is_aligned,
)
def test_insert_tablet(self, tablet):
"""
this method NOT insert data into database and the server just return after accept the request, this method
should be used to test other time cost in client
:param tablet: a tablet of data
"""
request = self.gen_insert_tablet_req(tablet)
try:
return rpc_utils.verify_success(self.__client.testInsertTablet(request))
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
return rpc_utils.verify_success(
self.__client.testInsertTablet(request)
)
except TTransport.TException as e1:
raise IoTDBConnectionException(e1) from None
else:
raise IoTDBConnectionException(self.connection_error_msg()) from None
def test_insert_tablets(self, tablet_list):
"""
this method NOT insert data into database and the server just return after accept the request, this method
should be used to test other time cost in client
:param tablet_list: List of tablets
"""
request = self.gen_insert_tablets_req(tablet_list)
try:
return rpc_utils.verify_success(self.__client.testInsertTablets(request))
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
return rpc_utils.verify_success(
self.__client.testInsertTablets(request)
)
except TTransport.TException as e1:
raise IoTDBConnectionException(e1) from None
else:
raise IoTDBConnectionException(self.connection_error_msg()) from None
def gen_insert_tablet_req(self, tablet, is_aligned=False):
return TSInsertTabletReq(
self.__session_id,
tablet.get_insert_target_name(),
tablet.get_measurements(),
tablet.get_binary_values(),
tablet.get_binary_timestamps(),
tablet.get_data_types(),
tablet.get_row_number(),
is_aligned,
)
def gen_insert_relational_tablet_req(self, tablet, is_aligned=False):
return TSInsertTabletReq(
self.__session_id,
tablet.get_insert_target_name(),
tablet.get_measurements(),
tablet.get_binary_values(),
tablet.get_binary_timestamps(),
tablet.get_data_types(),
tablet.get_row_number(),
is_aligned,
True,
tablet.get_column_categories(),
)
def gen_insert_tablets_req(self, tablet_lst, is_aligned=False):
device_id_lst = []
measurements_lst = []
values_lst = []
timestamps_lst = []
type_lst = []
size_lst = []
for tablet in tablet_lst:
device_id_lst.append(tablet.get_insert_target_name())
measurements_lst.append(tablet.get_measurements())
values_lst.append(tablet.get_binary_values())
timestamps_lst.append(tablet.get_binary_timestamps())
type_lst.append(tablet.get_data_types())
size_lst.append(tablet.get_row_number())
return TSInsertTabletsReq(
self.__session_id,
device_id_lst,
measurements_lst,
values_lst,
timestamps_lst,
type_lst,
size_lst,
is_aligned,
)
def execute_query_statement(self, sql, timeout=0):
"""
execute query sql statement and returns SessionDataSet
:param sql: String, query sql statement
:param timeout:
:return: SessionDataSet, contains query results and relevant info (see SessionDataSet.py):
"""
request = TSExecuteStatementReq(
self.__session_id, sql, self.__statement_id, self.__fetch_size, timeout
)
try:
resp = self.__client.executeQueryStatementV2(request)
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
request.statementId = self.__statement_id
resp = self.__client.executeQueryStatementV2(request)
except TTransport.TException as e1:
raise IoTDBConnectionException(e1) from None
else:
raise IoTDBConnectionException(self.connection_error_msg()) from None
rpc_utils.verify_success(resp.status)
return SessionDataSet(
sql,
resp.columns,
resp.dataTypeList,
resp.columnNameIndexMap,
resp.queryId,
self.__session_id,
self.__client,
self.__statement_id,
resp.queryResult,
resp.ignoreTimeStamp,
timeout,
resp.moreData,
self.__fetch_size,
self.__zone_id,
self.__time_precision,
resp.columnIndex2TsBlockColumnIndexList,
)
def execute_non_query_statement(self, sql):
"""
execute non-query sql statement
:param sql: String, non-query sql statement
"""
request = TSExecuteStatementReq(self.__session_id, sql, self.__statement_id)
try:
resp = self.__client.executeUpdateStatementV2(request)
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
request.statementId = self.__statement_id
resp = self.__client.executeUpdateStatementV2(request)
except TTransport.TException as e1:
raise IoTDBConnectionException(e1) from None
else:
raise IoTDBConnectionException(self.connection_error_msg()) from None
previous_db = self.database
if resp.database is not None:
self.database = resp.database
if previous_db != self.database and self.__endpoint_to_connection is not None:
iterator = iter(self.__endpoint_to_connection.items())
for entry in list(iterator):
endpoint, connection = entry
if connection != self.__default_connection:
try:
connection.change_database(sql)
except Exception as e:
self.__endpoint_to_connection.pop(endpoint)
return rpc_utils.verify_success(resp.status)
def execute_statement(self, sql: str, timeout=0):
request = TSExecuteStatementReq(
self.__session_id, sql, self.__statement_id, timeout
)
try:
resp = self.__client.executeStatementV2(request)
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
request.statementId = self.__statement_id
resp = self.__client.executeStatementV2(request)
except TTransport.TException as e1:
raise IoTDBConnectionException(e1) from None
else:
raise IoTDBConnectionException(self.connection_error_msg()) from None
rpc_utils.verify_success(resp.status)
if resp.columns:
return SessionDataSet(
sql,
resp.columns,
resp.dataTypeList,
resp.columnNameIndexMap,
resp.queryId,
self.__session_id,
self.__client,
self.__statement_id,
resp.queryResult,
resp.ignoreTimeStamp,
timeout,
resp.moreData,
self.__fetch_size,
self.__zone_id,
self.__time_precision,
resp.columnIndex2TsBlockColumnIndexList,
)
else:
return None
@staticmethod
def value_to_bytes(data_types, values):
format_str_list = [">"]
values_tobe_packed = []
for data_type, value in zip(data_types, values):
# BOOLEAN
if data_type == 0:
format_str_list.append("c?")
values_tobe_packed.append(b"\x00")
values_tobe_packed.append(value)
# INT32
elif data_type == 1:
format_str_list.append("ci")
values_tobe_packed.append(b"\x01")
values_tobe_packed.append(value)
# INT64
elif data_type == 2:
format_str_list.append("cq")
values_tobe_packed.append(b"\x02")
values_tobe_packed.append(value)
# FLOAT
elif data_type == 3:
format_str_list.append("cf")
values_tobe_packed.append(b"\x03")
values_tobe_packed.append(value)
# DOUBLE
elif data_type == 4:
format_str_list.append("cd")
values_tobe_packed.append(b"\x04")
values_tobe_packed.append(value)
# TEXT
elif data_type == 5:
if isinstance(value, str):
value_bytes = bytes(value, "utf-8")
else:
value_bytes = value
format_str_list.append("ci")
format_str_list.append(str(len(value_bytes)))
format_str_list.append("s")
values_tobe_packed.append(b"\x05")
values_tobe_packed.append(len(value_bytes))
values_tobe_packed.append(value_bytes)
# TIMESTAMP
elif data_type == 8:
format_str_list.append("cq")
values_tobe_packed.append(b"\x08")
values_tobe_packed.append(value)
# DATE
elif data_type == 9:
format_str_list.append("ci")
values_tobe_packed.append(b"\x09")
values_tobe_packed.append(parse_date_to_int(value))
# BLOB
elif data_type == 10:
format_str_list.append("ci")
format_str_list.append(str(len(value)))
format_str_list.append("s")
values_tobe_packed.append(b"\x0a")
values_tobe_packed.append(len(value))
values_tobe_packed.append(value)
# STRING
elif data_type == 11:
if isinstance(value, str):
value_bytes = bytes(value, "utf-8")
else:
value_bytes = value
format_str_list.append("ci")
format_str_list.append(str(len(value_bytes)))
format_str_list.append("s")
values_tobe_packed.append(b"\x0b")
values_tobe_packed.append(len(value_bytes))
values_tobe_packed.append(value_bytes)
else:
raise RuntimeError("Unsupported data type:" + str(data_type))
format_str = "".join(format_str_list)
return struct.pack(format_str, *values_tobe_packed)
def get_time_zone(self):
if self.__zone_id is not None:
return self.__zone_id
try:
resp = self.__client.getTimeZone(self.__session_id)
except TTransport.TException as e:
raise IoTDBConnectionException(
"Could not get time zone because: ", e
) from None
return resp.timeZone
def set_time_zone(self, zone_id):
request = TSSetTimeZoneReq(self.__session_id, zone_id)
try:
status = self.__client.setTimeZone(request)
logger.debug(
"setting time zone_id as {}, message: {}".format(
zone_id, status.message
)
)
except TTransport.TException as e:
raise IoTDBConnectionException(
"Could not set time zone because: ", e
) from None
self.__zone_id = zone_id
@staticmethod
def check_sorted(timestamps):
for i in range(1, len(timestamps)):
if timestamps[i] < timestamps[i - 1]:
return False
return True
def execute_raw_data_query(
self, paths: list, start_time: int, end_time: int
) -> SessionDataSet:
"""
execute query statement and returns SessionDataSet
:param paths: String path list
:param start_time: Query start time
:param end_time: Query end time
:return: SessionDataSet, contains query results and relevant info (see SessionDataSet.py)
"""
request = TSRawDataQueryReq(
self.__session_id,
paths,
self.__fetch_size,
startTime=start_time,
endTime=end_time,
statementId=self.__statement_id,
enableRedirectQuery=False,
)
try:
resp = self.__client.executeRawDataQueryV2(request)
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
request.statementId = self.__statement_id
resp = self.__client.executeRawDataQueryV2(request)
except TTransport.TException as e1:
raise IoTDBConnectionException(e1) from None
else:
raise IoTDBConnectionException(self.connection_error_msg()) from None
rpc_utils.verify_success(resp.status)
return SessionDataSet(
"",
resp.columns,
resp.dataTypeList,
resp.columnNameIndexMap,
resp.queryId,
self.__session_id,
self.__client,
self.__statement_id,
resp.queryResult,
resp.ignoreTimeStamp,
0,
resp.moreData,
self.__fetch_size,
self.__zone_id,
self.__time_precision,
resp.columnIndex2TsBlockColumnIndexList,
)
def execute_last_data_query(self, paths: list, last_time: int) -> SessionDataSet:
"""
execute query statement and returns SessionDataSet
:param paths: String path list
:param last_time: Query last time
:return: SessionDataSet, contains query results and relevant info (see SessionDataSet.py)
"""
request = TSLastDataQueryReq(
self.__session_id,
paths,
self.__fetch_size,
last_time,
self.__statement_id,
enableRedirectQuery=False,
)
try:
resp = self.__client.executeLastDataQueryV2(request)
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
request.statementId = self.__statement_id
resp = self.__client.executeLastDataQueryV2(request)
except TTransport.TException as e1:
raise IoTDBConnectionException(e1) from None
else:
raise IoTDBConnectionException(self.connection_error_msg()) from None
rpc_utils.verify_success(resp.status)
return SessionDataSet(
"",
resp.columns,
resp.dataTypeList,
resp.columnNameIndexMap,
resp.queryId,
self.__statement_id,
self.__client,
self.__session_id,
resp.queryResult,
resp.ignoreTimeStamp,
0,
resp.moreData,
self.__fetch_size,
self.__zone_id,
self.__time_precision,
resp.columnIndex2TsBlockColumnIndexList,
)
def insert_string_records_of_one_device(
self,
device_id: str,
times: list,
measurements_list: list,
values_list: list,
have_sorted: bool = False,
):
"""
insert multiple row of string record into database:
timestamp, m1, m2, m3
0, text1, text2, text3
:param device_id: String, device id
:param times: Timestamp list
:param measurements_list: Measurements list
:param values_list: Value list
:param have_sorted: have these list been sorted by timestamp
"""
if (len(times) != len(measurements_list)) or (len(times) != len(values_list)):
raise RuntimeError(
"insert records of one device error: times, measurementsList and valuesList's size should be equal!"
)
request = self.gen_insert_string_records_of_one_device_request(
device_id, times, measurements_list, values_list, have_sorted, False
)
try:
connection = self.get_connection(device_id)
request.sessionId = connection.session_id
return rpc_utils.verify_success_with_redirection(
connection.client.insertStringRecordsOfOneDevice(request)
)
except RedirectException as e:
return self.handle_redirection(device_id, e.redirect_node)
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
return rpc_utils.verify_success(
self.__client.insertStringRecordsOfOneDevice(request)
)
except TTransport.TException as e1:
raise IoTDBConnectionException(e1) from None
else:
raise IoTDBConnectionException(self.connection_error_msg()) from None
def insert_aligned_string_records_of_one_device(
self,
device_id: str,
times: list,
measurements_list: list,
values: list,
have_sorted: bool = False,
):
if (len(times) != len(measurements_list)) or (len(times) != len(values)):
raise RuntimeError(
"insert records of one device error: times, measurementsList and valuesList's size should be equal!"
)
request = self.gen_insert_string_records_of_one_device_request(
device_id, times, measurements_list, values, have_sorted, True
)
try:
connection = self.get_connection(device_id)
request.sessionId = connection.session_id
return rpc_utils.verify_success_with_redirection(
connection.client.insertStringRecordsOfOneDevice(request)
)
except RedirectException as e:
return self.handle_redirection(device_id, e.redirect_node)
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
return rpc_utils.verify_success(
self.__client.insertStringRecordsOfOneDevice(request)
)
except TTransport.TException as e1:
raise IoTDBConnectionException(e1) from None
else:
raise IoTDBConnectionException(self.connection_error_msg()) from None
def reconnect(self):
if self.__hosts is None:
return False
connected = False
for i in range(1, self.RETRY_NUM + 1):
if (
self.__default_connection is not None
and self.__default_connection.transport is not None
):
self.__default_connection.transport.close()
curr_host_index = random.randint(0, len(self.__hosts) - 1)
try_host_num = 0
j = curr_host_index
while j < len(self.__hosts):
if try_host_num == len(self.__hosts):
break
self.__default_endpoint = TEndPoint(self.__hosts[j], self.__ports[j])
if j == len(self.__hosts) - 1:
j = -1
try_host_num += 1
try:
self.__default_connection = self.init_connection(
self.__default_endpoint
)
self.__client = self.__default_connection.client
self.__session_id = self.__default_connection.session_id
self.__statement_id = self.__default_connection.statement_id
connected = True
if self.__enable_redirection:
self.__endpoint_to_connection = {
str(self.__default_endpoint): self.__default_connection
}
except IoTDBConnectionException:
pass
j += 1
continue
break
if connected:
break
return connected
def connection_error_msg(self):
if self.__hosts is None:
msg = "Could not connect to [('%s', %s)]" % (self.__host, self.__port)
else:
node_list = []
for i in range(len(self.__hosts)):
node_list.append("('%s', %s)" % (self.__hosts[i], self.__ports[i]))
msg = "Could not connect to any of [%s]" % ", ".join(node_list)
return msg
def get_connection(self, device_id):
if (
self.__enable_redirection
and len(self.__device_id_to_endpoint) != 0
and device_id in self.__device_id_to_endpoint
):
endpoint = self.__device_id_to_endpoint[device_id]
if str(endpoint) in self.__endpoint_to_connection:
return self.__endpoint_to_connection[str(endpoint)]
return self.__default_connection
def handle_redirection(self, device_id, endpoint: TEndPoint):
if self.__enable_redirection:
if endpoint.ip == "0.0.0.0":
return 0
if (
device_id not in self.__device_id_to_endpoint
or self.__device_id_to_endpoint[device_id] != endpoint
):
self.__device_id_to_endpoint[device_id] = endpoint
if str(endpoint) in self.__endpoint_to_connection:
connection = self.__endpoint_to_connection[str(endpoint)]
else:
try:
connection = self.init_connection(endpoint)
except Exception:
connection = None
self.__endpoint_to_connection[str(endpoint)] = connection
if connection is None:
self.__device_id_to_endpoint.pop(device_id)
return 0
def gen_insert_string_records_of_one_device_request(
self,
device_id,
times,
measurements_list,
values_list,
have_sorted,
is_aligned=False,
):
if (len(times) != len(measurements_list)) or (len(times) != len(values_list)):
raise RuntimeError(
"insert records of one device error: times, measurementsList and valuesList's size should be equal!"
)
if not Session.check_sorted(times):
# sort by timestamp
sorted_zipped = sorted(zip(times, measurements_list, values_list))
result = zip(*sorted_zipped)
times_list, measurements_list, values_list = [list(x) for x in result]
request = TSInsertStringRecordsOfOneDeviceReq(
self.__session_id,
device_id,
measurements_list,
values_list,
times,
is_aligned,
)
return request
def __has_none_value(self, values_list) -> bool:
for item in values_list:
if isinstance(item, list):
if self.__has_none_value(item):
return True
elif item is None:
return True
return False
@staticmethod
def __filter_lists_by_values(
device_lst, time_lst, measurements_lst, types_lst, values_lst
):
filtered_devices = []
filtered_times = []
filtered_measurements = []
filtered_types = []
filtered_values = []
for device, time_, measurements, types, values in zip(
device_lst, time_lst, measurements_lst, types_lst, values_lst
):
filtered_row = [
(m, t, v)
for m, t, v in zip(measurements, types, values)
if v is not None
]
if filtered_row:
f_measurements, f_types, f_values = zip(*filtered_row)
filtered_measurements.append(list(f_measurements))
filtered_types.append(list(f_types))
filtered_values.append(list(f_values))
filtered_devices.append(device)
filtered_times.append(time_)
return (
filtered_devices,
filtered_times,
filtered_measurements,
filtered_types,
filtered_values,
)
def create_schema_template(self, template: Template):
warnings.warn(
"The APIs about template are deprecated and will be removed in future versions. Use sql instead.",
DeprecationWarning,
stacklevel=2,
)
"""
create device template, users using this method should use the template class as an argument
:param template: The template contains multiple child node(see Template.py)
"""
bytes_array = template.serialize
request = TSCreateSchemaTemplateReq(
self.__session_id, template.get_name(), bytes_array
)
try:
return rpc_utils.verify_success(self.__client.createSchemaTemplate(request))
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
return rpc_utils.verify_success(
self.__client.createSchemaTemplate(request)
)
except TTransport.TException as e1:
raise IoTDBConnectionException(e1) from None
else:
raise IoTDBConnectionException(self.connection_error_msg()) from None
def drop_schema_template(self, template_name: str):
warnings.warn(
"The APIs about template are deprecated and will be removed in future versions. Use sql instead.",
DeprecationWarning,
stacklevel=2,
)
"""
drop device template, this method should be used to the template unset anything
:param template_name: template name
"""
request = TSDropSchemaTemplateReq(self.__session_id, template_name)
try:
return rpc_utils.verify_success(self.__client.dropSchemaTemplate(request))
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
return rpc_utils.verify_success(
self.__client.dropSchemaTemplate(request)
)
except TTransport.TException as e1:
raise IoTDBConnectionException(e1) from None
else:
raise IoTDBConnectionException(self.connection_error_msg()) from None
def add_measurements_in_template(
self,
template_name: str,
measurements_path: list,
data_types: list,
encodings: list,
compressors: list,
is_aligned: bool = False,
):
warnings.warn(
"The APIs about template are deprecated and will be removed in future versions. Use sql instead.",
DeprecationWarning,
stacklevel=2,
)
"""
add measurements in the template, the template must already create. This function adds some measurements' node.
:param template_name: template name, string list, like ["name_x", "name_y", "name_z"]
:param measurements_path: when ths is_aligned is True, recommend the name like a.b,
like [python.x, python.y, iotdb.z]
:param data_types: using TSDataType(see IoTDBConstants.py)
:param encodings: using TSEncoding(see IoTDBConstants.py)
:param compressors: using Compressor(see IoTDBConstants.py)
:param is_aligned: True is aligned, False is unaligned
"""
request = TSAppendSchemaTemplateReq(
self.__session_id,
template_name,
is_aligned,
measurements_path,
data_types,
encodings,
compressors,
)
try:
return rpc_utils.verify_success(self.__client.appendSchemaTemplate(request))
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
return rpc_utils.verify_success(
self.__client.appendSchemaTemplate(request)
)
except TTransport.TException as e1:
raise IoTDBConnectionException(e1) from None
else:
raise IoTDBConnectionException(self.connection_error_msg()) from None
def delete_node_in_template(self, template_name: str, path: str):
warnings.warn(
"The APIs about template are deprecated and will be removed in future versions. Use sql instead.",
DeprecationWarning,
stacklevel=2,
)
"""
delete a node in the template, this node must be already in the template
:param template_name: template name
:param path: measurements path
"""
request = TSPruneSchemaTemplateReq(self.__session_id, template_name, path)
try:
return rpc_utils.verify_success(self.__client.pruneSchemaTemplate(request))
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
return rpc_utils.verify_success(
self.__client.pruneSchemaTemplate(request)
)
except TTransport.TException as e1:
raise IoTDBConnectionException(e1) from None
else:
raise IoTDBConnectionException(self.connection_error_msg()) from None
def set_schema_template(self, template_name, prefix_path):
warnings.warn(
"The APIs about template are deprecated and will be removed in future versions. Use sql instead.",
DeprecationWarning,
stacklevel=2,
)
"""
set template in prefix path, template already exit, prefix path is not measurements
:param template_name: template name
:param prefix_path: prefix path
"""
request = TSSetSchemaTemplateReq(self.__session_id, template_name, prefix_path)
try:
return rpc_utils.verify_success(self.__client.setSchemaTemplate(request))
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
return rpc_utils.verify_success(
self.__client.setSchemaTemplate(request)
)
except TTransport.TException as e1:
raise IoTDBConnectionException(e1) from None
else:
raise IoTDBConnectionException(self.connection_error_msg()) from None
def unset_schema_template(self, template_name, prefix_path):
warnings.warn(
"The APIs about template are deprecated and will be removed in future versions. Use sql instead.",
DeprecationWarning,
stacklevel=2,
)
"""
unset device template from prefix path, this method unsetting the template from entities,
which have already inserted records using the template, is not supported.
:param template_name: template name
:param prefix_path:
"""
request = TSUnsetSchemaTemplateReq(
self.__session_id, prefix_path, template_name
)
try:
return rpc_utils.verify_success(self.__client.unsetSchemaTemplate(request))
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
return rpc_utils.verify_success(
self.__client.unsetSchemaTemplate(request)
)
except TTransport.TException as e1:
raise IoTDBConnectionException(e1) from None
else:
raise IoTDBConnectionException(self.connection_error_msg()) from None
def count_measurements_in_template(self, template_name: str):
warnings.warn(
"The APIs about template are deprecated and will be removed in future versions. Use sql instead.",
DeprecationWarning,
stacklevel=2,
)
"""
drop device template, this method should be used to the template unset anything
:param template_name: template name
"""
request = TSQueryTemplateReq(
self.__session_id,
template_name,
TemplateQueryType.COUNT_MEASUREMENTS.value,
)
try:
response = self.__client.querySchemaTemplate(request)
return response.count
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
response = self.__client.querySchemaTemplate(request)
rpc_utils.verify_success(response.status)
return response.count
except TTransport.TException as e1:
raise IoTDBConnectionException(e1) from None
else:
raise IoTDBConnectionException(self.connection_error_msg()) from None
def is_measurement_in_template(self, template_name: str, path: str):
warnings.warn(
"The APIs about template are deprecated and will be removed in future versions. Use sql instead.",
DeprecationWarning,
stacklevel=2,
)
"""
judge the node in the template is measurement or not, this node must in the template
:param template_name: template name
:param path:
"""
request = TSQueryTemplateReq(
self.__session_id,
template_name,
TemplateQueryType.IS_MEASUREMENT.value,
path,
)
try:
response = self.__client.querySchemaTemplate(request)
rpc_utils.verify_success(response.status)
return response.result
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
response = self.__client.querySchemaTemplate(request)
rpc_utils.verify_success(response.status)
return response.result
except TTransport.TException as e1:
raise IoTDBConnectionException(e1) from None
else:
raise IoTDBConnectionException(self.connection_error_msg()) from None
def is_path_exist_in_template(self, template_name: str, path: str):
warnings.warn(
"The APIs about template are deprecated and will be removed in future versions. Use sql instead.",
DeprecationWarning,
stacklevel=2,
)
"""
judge whether the node is a measurement or not in the template, this node must be in the template
:param template_name: template name
:param path:
"""
request = TSQueryTemplateReq(
self.__session_id, template_name, TemplateQueryType.PATH_EXIST.value, path
)
try:
response = self.__client.querySchemaTemplate(request)
rpc_utils.verify_success(response.status)
return response.result
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
response = self.__client.querySchemaTemplate(request)
rpc_utils.verify_success(response.status)
return response.result
except TTransport.TException as e1:
raise IoTDBConnectionException(e1) from None
else:
raise IoTDBConnectionException(self.connection_error_msg()) from None
def show_measurements_in_template(self, template_name: str, pattern: str = ""):
warnings.warn(
"The APIs about template are deprecated and will be removed in future versions. Use sql instead.",
DeprecationWarning,
stacklevel=2,
)
"""
show all measurements under the pattern in template
:param template_name: template name
:param pattern: parent path, if defaulted, show all measurements
"""
request = TSQueryTemplateReq(
self.__session_id,
template_name,
TemplateQueryType.SHOW_MEASUREMENTS.value,
pattern,
)
try:
response = self.__client.querySchemaTemplate(request)
rpc_utils.verify_success(response.status)
return response.measurements
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
response = self.__client.querySchemaTemplate(request)
rpc_utils.verify_success(response.status)
return response.measurements
except TTransport.TException as e1:
raise IoTDBConnectionException(e1) from None
else:
raise IoTDBConnectionException(self.connection_error_msg()) from None
def show_all_templates(self):
warnings.warn(
"The APIs about template are deprecated and will be removed in future versions. Use sql instead.",
DeprecationWarning,
stacklevel=2,
)
"""
show all device templates
"""
request = TSQueryTemplateReq(
self.__session_id,
"",
TemplateQueryType.SHOW_TEMPLATES.value,
)
try:
response = self.__client.querySchemaTemplate(request)
rpc_utils.verify_success(response.status)
return response.measurements
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
response = self.__client.querySchemaTemplate(request)
rpc_utils.verify_success(response.status)
return response.measurements
except TTransport.TException as e1:
raise IoTDBConnectionException(e1) from None
else:
raise IoTDBConnectionException(self.connection_error_msg()) from None
def show_paths_template_set_on(self, template_name):
warnings.warn(
"The APIs about template are deprecated and will be removed in future versions. Use sql instead.",
DeprecationWarning,
stacklevel=2,
)
"""
show the path prefix where a device template is set
:param template_name:
"""
request = TSQueryTemplateReq(
self.__session_id, template_name, TemplateQueryType.SHOW_SET_TEMPLATES.value
)
try:
response = self.__client.querySchemaTemplate(request)
rpc_utils.verify_success(response.status)
return response.measurements
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
response = self.__client.querySchemaTemplate(request)
rpc_utils.verify_success(response.status)
return response.measurements
except TTransport.TException as e1:
raise IoTDBConnectionException(e1) from None
else:
raise IoTDBConnectionException(self.connection_error_msg()) from None
def show_paths_template_using_on(self, template_name):
warnings.warn(
"The APIs about template are deprecated and will be removed in future versions. Use sql instead.",
DeprecationWarning,
stacklevel=2,
)
"""
show the path prefix where a device template is used
:param template_name:
"""
request = TSQueryTemplateReq(
self.__session_id,
template_name,
TemplateQueryType.SHOW_USING_TEMPLATES.value,
)
try:
response = self.__client.querySchemaTemplate(request)
rpc_utils.verify_success(response.status)
return response.measurements
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
response = self.__client.querySchemaTemplate(request)
rpc_utils.verify_success(response.status)
return response.measurements
except TTransport.TException as e1:
raise IoTDBConnectionException(e1) from None
else:
raise IoTDBConnectionException(self.connection_error_msg()) from None
class SessionConnection(object):
def __init__(
self,
client,
transport,
session_id,
statement_id,
):
self.client = client
self.transport = transport
self.session_id = session_id
self.statement_id = statement_id
def change_database(self, sql):
try:
self.client.executeUpdateStatementV2(
TSExecuteStatementReq(self.session_id, sql, self.statement_id)
)
except TTransport.TException as e:
raise IoTDBConnectionException(
"failed to change database",
e,
) from None
def close_connection(self, req):
try:
self.client.closeSession(req)
except TTransport.TException as e:
raise IoTDBConnectionException(
"Error occurs when closing session at server. Maybe server is down. Error message: ",
e,
) from None
finally:
if self.transport is not None:
self.transport.close()