| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| import logging |
| import struct |
| import time |
| from thrift.protocol import TBinaryProtocol, TCompactProtocol |
| from thrift.transport import TSocket, TTransport |
| |
| from iotdb.utils.SessionDataSet import SessionDataSet |
| from .template.Template import Template |
| from .template.TemplateQueryType import TemplateQueryType |
| from .thrift.rpc.IClientRPCService import ( |
| Client, |
| TSCreateTimeseriesReq, |
| TSCreateAlignedTimeseriesReq, |
| TSInsertRecordReq, |
| TSInsertStringRecordReq, |
| TSInsertTabletReq, |
| TSExecuteStatementReq, |
| TSOpenSessionReq, |
| TSCreateMultiTimeseriesReq, |
| TSCloseSessionReq, |
| TSInsertTabletsReq, |
| TSInsertRecordsReq, |
| TSInsertRecordsOfOneDeviceReq, |
| TSCreateSchemaTemplateReq, |
| TSDropSchemaTemplateReq, |
| TSAppendSchemaTemplateReq, |
| TSPruneSchemaTemplateReq, |
| TSSetSchemaTemplateReq, |
| TSUnsetSchemaTemplateReq, |
| TSQueryTemplateReq, |
| ) |
| from .thrift.rpc.ttypes import ( |
| TSDeleteDataReq, |
| TSProtocolVersion, |
| TSSetTimeZoneReq, |
| TSRawDataQueryReq, |
| TSLastDataQueryReq, |
| TSInsertStringRecordsOfOneDeviceReq, |
| ) |
| # for debug |
| # from IoTDBConstants import * |
| # from SessionDataSet import SessionDataSet |
| # |
| # from thrift.protocol import TBinaryProtocol, TCompactProtocol |
| # from thrift.transport import TSocket, TTransport |
| # |
| # from iotdb.rpc.IClientRPCService import Client, TSCreateTimeseriesReq, TSInsertRecordReq, TSInsertTabletReq, \ |
| # TSExecuteStatementReq, TSOpenSessionReq, TSQueryDataSet, TSFetchResultsReq, TSCloseOperationReq, \ |
| # TSCreateMultiTimeseriesReq, TSCloseSessionReq, TSInsertTabletsReq, TSInsertRecordsReq |
| # from iotdb.rpc.ttypes import TSDeleteDataReq, TSProtocolVersion, TSSetTimeZoneReq |
| from .utils.IoTDBConstants import TSDataType |
| |
| logger = logging.getLogger("IoTDB") |
| |
| |
| class Session(object): |
| SUCCESS_CODE = 200 |
| DEFAULT_FETCH_SIZE = 10000 |
| DEFAULT_USER = "root" |
| DEFAULT_PASSWORD = "root" |
| DEFAULT_ZONE_ID = time.strftime("%z") |
| |
| def __init__( |
| self, |
| host, |
| port, |
| user=DEFAULT_USER, |
| password=DEFAULT_PASSWORD, |
| fetch_size=DEFAULT_FETCH_SIZE, |
| zone_id=DEFAULT_ZONE_ID, |
| ): |
| self.__host = host |
| self.__port = port |
| self.__user = user |
| self.__password = password |
| self.__fetch_size = fetch_size |
| self.__is_close = True |
| self.__transport = None |
| self.__client = None |
| self.protocol_version = TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3 |
| self.__session_id = None |
| self.__statement_id = None |
| self.__zone_id = zone_id |
| |
| def open(self, enable_rpc_compression): |
| if not self.__is_close: |
| return |
| self.__transport = TTransport.TFramedTransport( |
| TSocket.TSocket(self.__host, self.__port) |
| ) |
| |
| if not self.__transport.isOpen(): |
| try: |
| self.__transport.open() |
| except TTransport.TTransportException as e: |
| logger.exception("TTransportException!", exc_info=e) |
| |
| if enable_rpc_compression: |
| self.__client = Client(TCompactProtocol.TCompactProtocol(self.__transport)) |
| else: |
| self.__client = Client(TBinaryProtocol.TBinaryProtocol(self.__transport)) |
| |
| open_req = TSOpenSessionReq( |
| client_protocol=self.protocol_version, |
| username=self.__user, |
| password=self.__password, |
| zoneId=self.__zone_id, |
| configuration={"version": "V_0_13"}, |
| ) |
| |
| try: |
| open_resp = self.__client.openSession(open_req) |
| |
| if self.protocol_version != open_resp.serverProtocolVersion: |
| logger.exception( |
| "Protocol differ, Client version is {}, but Server version is {}".format( |
| self.protocol_version, open_resp.serverProtocolVersion |
| ) |
| ) |
| # version is less than 0.10 |
| if open_resp.serverProtocolVersion == 0: |
| raise TTransport.TException(message="Protocol not supported.") |
| |
| self.__session_id = open_resp.sessionId |
| self.__statement_id = self.__client.requestStatementId(self.__session_id) |
| |
| except Exception as e: |
| self.__transport.close() |
| logger.exception("session closed because: ", exc_info=e) |
| |
| if self.__zone_id is not None: |
| self.set_time_zone(self.__zone_id) |
| else: |
| self.__zone_id = self.get_time_zone() |
| |
| self.__is_close = False |
| |
| def is_open(self): |
| return not self.__is_close |
| |
| def close(self): |
| if self.__is_close: |
| return |
| req = TSCloseSessionReq(self.__session_id) |
| try: |
| self.__client.closeSession(req) |
| except TTransport.TException as e: |
| logger.exception( |
| "Error occurs when closing session at server. Maybe server is down. Error message: ", |
| exc_info=e, |
| ) |
| finally: |
| self.__is_close = True |
| if self.__transport is not None: |
| self.__transport.close() |
| |
| def set_storage_group(self, group_name): |
| """ |
| set one storage group |
| :param group_name: String, storage group name (starts from root) |
| """ |
| status = self.__client.setStorageGroup(self.__session_id, group_name) |
| logger.debug( |
| "setting storage group {} message: {}".format(group_name, status.message) |
| ) |
| |
| return Session.verify_success(status) |
| |
| def delete_storage_group(self, storage_group): |
| """ |
| delete one storage group. |
| :param storage_group: String, path of the target storage group. |
| """ |
| groups = [storage_group] |
| return self.delete_storage_groups(groups) |
| |
| def delete_storage_groups(self, storage_group_lst): |
| """ |
| delete multiple storage groups. |
| :param storage_group_lst: List, paths of the target storage groups. |
| """ |
| status = self.__client.deleteStorageGroups(self.__session_id, storage_group_lst) |
| logger.debug( |
| "delete storage group(s) {} message: {}".format( |
| storage_group_lst, status.message |
| ) |
| ) |
| |
| return Session.verify_success(status) |
| |
| def create_time_series( |
| self, |
| ts_path, |
| data_type, |
| encoding, |
| compressor, |
| props=None, |
| tags=None, |
| attributes=None, |
| alias=None, |
| ): |
| """ |
| create single time series |
| :param ts_path: String, complete time series path (starts from root) |
| :param data_type: TSDataType, data type for this time series |
| :param encoding: TSEncoding, encoding for this time series |
| :param compressor: Compressor, compressing type for this time series |
| :param props: Dictionary, properties for time series |
| :param tags: Dictionary, tag map for time series |
| :param attributes: Dictionary, attribute map for time series |
| :param alias: String, measurement alias for time series |
| """ |
| data_type = data_type.value |
| encoding = encoding.value |
| compressor = compressor.value |
| request = TSCreateTimeseriesReq( |
| self.__session_id, |
| ts_path, |
| data_type, |
| encoding, |
| compressor, |
| props, |
| tags, |
| attributes, |
| alias, |
| ) |
| status = self.__client.createTimeseries(request) |
| logger.debug( |
| "creating time series {} message: {}".format(ts_path, status.message) |
| ) |
| |
| return Session.verify_success(status) |
| |
| def create_aligned_time_series( |
| self, device_id, measurements_lst, data_type_lst, encoding_lst, compressor_lst |
| ): |
| """ |
| create aligned time series |
| :param device_id: String, device id for timeseries (starts from root) |
| :param measurements_lst: List of String, measurement ids for time series |
| :param data_type_lst: List of TSDataType, data types for time series |
| :param encoding_lst: List of TSEncoding, encodings for time series |
| :param compressor_lst: List of Compressor, compressing types for time series |
| """ |
| data_type_lst = [data_type.value for data_type in data_type_lst] |
| encoding_lst = [encoding.value for encoding in encoding_lst] |
| compressor_lst = [compressor.value for compressor in compressor_lst] |
| |
| request = TSCreateAlignedTimeseriesReq( |
| self.__session_id, |
| device_id, |
| measurements_lst, |
| data_type_lst, |
| encoding_lst, |
| compressor_lst, |
| ) |
| status = self.__client.createAlignedTimeseries(request) |
| logger.debug( |
| "creating aligned time series of device {} message: {}".format( |
| measurements_lst, status.message |
| ) |
| ) |
| |
| return Session.verify_success(status) |
| |
| def create_multi_time_series( |
| self, |
| ts_path_lst, |
| data_type_lst, |
| encoding_lst, |
| compressor_lst, |
| props_lst=None, |
| tags_lst=None, |
| attributes_lst=None, |
| alias_lst=None, |
| ): |
| """ |
| create multiple time series |
| :param ts_path_lst: List of String, complete time series paths (starts from root) |
| :param data_type_lst: List of TSDataType, data types for time series |
| :param encoding_lst: List of TSEncoding, encodings for time series |
| :param compressor_lst: List of Compressor, compressing types for time series |
| :param props_lst: List of Props Dictionary, properties for time series |
| :param tags_lst: List of tag Dictionary, tag maps for time series |
| :param attributes_lst: List of attribute Dictionary, attribute maps for time series |
| :param alias_lst: List of alias, measurement alias for time series |
| """ |
| data_type_lst = [data_type.value for data_type in data_type_lst] |
| encoding_lst = [encoding.value for encoding in encoding_lst] |
| compressor_lst = [compressor.value for compressor in compressor_lst] |
| |
| request = TSCreateMultiTimeseriesReq( |
| self.__session_id, |
| ts_path_lst, |
| data_type_lst, |
| encoding_lst, |
| compressor_lst, |
| props_lst, |
| tags_lst, |
| attributes_lst, |
| alias_lst, |
| ) |
| status = self.__client.createMultiTimeseries(request) |
| logger.debug( |
| "creating multiple time series {} message: {}".format( |
| ts_path_lst, status.message |
| ) |
| ) |
| |
| return Session.verify_success(status) |
| |
| def delete_time_series(self, paths_list): |
| """ |
| delete multiple time series, including data and schema |
| :param paths_list: List of time series path, which should be complete (starts from root) |
| """ |
| status = self.__client.deleteTimeseries(self.__session_id, paths_list) |
| logger.debug( |
| "deleting multiple time series {} message: {}".format( |
| paths_list, status.message |
| ) |
| ) |
| |
| return Session.verify_success(status) |
| |
| def check_time_series_exists(self, path): |
| """ |
| check whether a specific time series exists |
| :param path: String, complete path of time series for checking |
| :return Boolean value indicates whether it exists. |
| """ |
| data_set = self.execute_query_statement("SHOW TIMESERIES {}".format(path)) |
| result = data_set.has_next() |
| data_set.close_operation_handle() |
| return result |
| |
| def delete_data(self, paths_list, timestamp): |
| """ |
| delete all data <= time in multiple time series |
| :param paths_list: time series list that the data in. |
| :param timestamp: data with time stamp less than or equal to time will be deleted. |
| """ |
| request = TSDeleteDataReq(self.__session_id, paths_list, timestamp) |
| try: |
| status = self.__client.deleteData(request) |
| logger.debug( |
| "delete data from {}, message: {}".format(paths_list, status.message) |
| ) |
| except TTransport.TException as e: |
| logger.exception("data deletion fails because: ", e) |
| |
| def insert_str_record(self, device_id, timestamp, measurements, string_values): |
| """special case for inserting one row of String (TEXT) value""" |
| if type(string_values) == str: |
| string_values = [string_values] |
| if type(measurements) == str: |
| measurements = [measurements] |
| data_types = [TSDataType.TEXT.value for _ in string_values] |
| request = self.gen_insert_str_record_req( |
| device_id, timestamp, measurements, data_types, string_values |
| ) |
| status = self.__client.insertStringRecord(request) |
| logger.debug( |
| "insert one record to device {} message: {}".format( |
| device_id, status.message |
| ) |
| ) |
| |
| return Session.verify_success(status) |
| |
| def insert_aligned_str_record( |
| self, device_id, timestamp, measurements, string_values |
| ): |
| """special case for inserting one row of String (TEXT) value""" |
| if type(string_values) == str: |
| string_values = [string_values] |
| if type(measurements) == str: |
| measurements = [measurements] |
| data_types = [TSDataType.TEXT.value for _ in string_values] |
| request = self.gen_insert_str_record_req( |
| device_id, timestamp, measurements, data_types, string_values, True |
| ) |
| status = self.__client.insertStringRecord(request) |
| logger.debug( |
| "insert one record to device {} message: {}".format( |
| device_id, status.message |
| ) |
| ) |
| |
| return Session.verify_success(status) |
| |
| def insert_record(self, device_id, timestamp, measurements, data_types, values): |
| """ |
| insert one row of record into database, if you want improve your performance, please use insertTablet method |
| for example a record at time=10086 with three measurements is: |
| timestamp, m1, m2, m3 |
| 10086, 125.3, True, text1 |
| :param device_id: String, time series path for device |
| :param timestamp: Integer, indicate the timestamp of the row of data |
| :param measurements: List of String, sensor names |
| :param data_types: List of TSDataType, indicate the data type for each sensor |
| :param values: List, values to be inserted, for each sensor |
| """ |
| data_types = [data_type.value for data_type in data_types] |
| request = self.gen_insert_record_req( |
| device_id, timestamp, measurements, data_types, values |
| ) |
| status = self.__client.insertRecord(request) |
| logger.debug( |
| "insert one record to device {} message: {}".format( |
| device_id, status.message |
| ) |
| ) |
| |
| return Session.verify_success(status) |
| |
| def insert_records( |
| self, device_ids, times, measurements_lst, types_lst, values_lst |
| ): |
| """ |
| insert multiple rows of data, records are independent to each other, in other words, there's no relationship |
| between those records |
| :param device_ids: List of String, time series paths for device |
| :param times: List of Integer, timestamps for records |
| :param measurements_lst: 2-D List of String, each element of outer list indicates measurements of a device |
| :param types_lst: 2-D List of TSDataType, each element of outer list indicates sensor data types of a device |
| :param values_lst: 2-D List, values to be inserted, for each device |
| """ |
| type_values_lst = [] |
| for types in types_lst: |
| data_types = [data_type.value for data_type in types] |
| type_values_lst.append(data_types) |
| request = self.gen_insert_records_req( |
| device_ids, times, measurements_lst, type_values_lst, values_lst |
| ) |
| status = self.__client.insertRecords(request) |
| logger.debug( |
| "insert multiple records to devices {} message: {}".format( |
| device_ids, status.message |
| ) |
| ) |
| |
| return Session.verify_success(status) |
| |
| def insert_aligned_record( |
| self, device_id, timestamp, measurements, data_types, values |
| ): |
| """ |
| insert one row of aligned record into database, if you want improve your performance, please use insertTablet method |
| for example a record at time=10086 with three measurements is: |
| timestamp, m1, m2, m3 |
| 10086, 125.3, True, text1 |
| :param device_id: String, time series path for device |
| :param timestamp: Integer, indicate the timestamp of the row of data |
| :param measurements: List of String, sensor names |
| :param data_types: List of TSDataType, indicate the data type for each sensor |
| :param values: List, values to be inserted, for each sensor |
| """ |
| data_types = [data_type.value for data_type in data_types] |
| request = self.gen_insert_record_req( |
| device_id, timestamp, measurements, data_types, values, True |
| ) |
| status = self.__client.insertRecord(request) |
| logger.debug( |
| "insert one record to device {} message: {}".format( |
| device_id, status.message |
| ) |
| ) |
| |
| return Session.verify_success(status) |
| |
| def insert_aligned_records( |
| self, device_ids, times, measurements_lst, types_lst, values_lst |
| ): |
| """ |
| insert multiple aligned rows of data, records are independent to each other, in other words, there's no relationship |
| between those records |
| :param device_ids: List of String, time series paths for device |
| :param times: List of Integer, timestamps for records |
| :param measurements_lst: 2-D List of String, each element of outer list indicates measurements of a device |
| :param types_lst: 2-D List of TSDataType, each element of outer list indicates sensor data types of a device |
| :param values_lst: 2-D List, values to be inserted, for each device |
| """ |
| type_values_lst = [] |
| for types in types_lst: |
| data_types = [data_type.value for data_type in types] |
| type_values_lst.append(data_types) |
| request = self.gen_insert_records_req( |
| device_ids, times, measurements_lst, type_values_lst, values_lst, True |
| ) |
| status = self.__client.insertRecords(request) |
| logger.debug( |
| "insert multiple records to devices {} message: {}".format( |
| device_ids, status.message |
| ) |
| ) |
| |
| return Session.verify_success(status) |
| |
| def test_insert_record( |
| self, device_id, timestamp, measurements, data_types, values |
| ): |
| """ |
| this method NOT insert data into database and the server just return after accept the request, this method |
| should be used to test other time cost in client |
| :param device_id: String, time series path for device |
| :param timestamp: Integer, indicate the timestamp of the row of data |
| :param measurements: List of String, sensor names |
| :param data_types: List of TSDataType, indicate the data type for each sensor |
| :param values: List, values to be inserted, for each sensor |
| """ |
| data_types = [data_type.value for data_type in data_types] |
| request = self.gen_insert_record_req( |
| device_id, timestamp, measurements, data_types, values |
| ) |
| status = self.__client.testInsertRecord(request) |
| logger.debug( |
| "testing! insert one record to device {} message: {}".format( |
| device_id, status.message |
| ) |
| ) |
| |
| return Session.verify_success(status) |
| |
| def test_insert_records( |
| self, device_ids, times, measurements_lst, types_lst, values_lst |
| ): |
| """ |
| this method NOT insert data into database and the server just return after accept the request, this method |
| should be used to test other time cost in client |
| :param device_ids: List of String, time series paths for device |
| :param times: List of Integer, timestamps for records |
| :param measurements_lst: 2-D List of String, each element of outer list indicates measurements of a device |
| :param types_lst: 2-D List of TSDataType, each element of outer list indicates sensor data types of a device |
| :param values_lst: 2-D List, values to be inserted, for each device |
| """ |
| type_values_lst = [] |
| for types in types_lst: |
| data_types = [data_type.value for data_type in types] |
| type_values_lst.append(data_types) |
| request = self.gen_insert_records_req( |
| device_ids, times, measurements_lst, type_values_lst, values_lst |
| ) |
| status = self.__client.testInsertRecords(request) |
| logger.debug( |
| "testing! insert multiple records, message: {}".format(status.message) |
| ) |
| |
| return Session.verify_success(status) |
| |
| def gen_insert_record_req( |
| self, device_id, timestamp, measurements, data_types, values, is_aligned=False |
| ): |
| if (len(values) != len(data_types)) or (len(values) != len(measurements)): |
| raise RuntimeError( |
| "length of data types does not equal to length of values!" |
| ) |
| values_in_bytes = Session.value_to_bytes(data_types, values) |
| return TSInsertRecordReq( |
| self.__session_id, |
| device_id, |
| measurements, |
| values_in_bytes, |
| timestamp, |
| is_aligned, |
| ) |
| |
| def gen_insert_str_record_req( |
| self, device_id, timestamp, measurements, data_types, values, is_aligned=False |
| ): |
| if (len(values) != len(data_types)) or (len(values) != len(measurements)): |
| raise RuntimeError( |
| "length of data types does not equal to length of values!" |
| ) |
| return TSInsertStringRecordReq( |
| self.__session_id, device_id, measurements, values, timestamp, is_aligned |
| ) |
| |
| def gen_insert_records_req( |
| self, |
| device_ids, |
| times, |
| measurements_lst, |
| types_lst, |
| values_lst, |
| is_aligned=False, |
| ): |
| if ( |
| (len(device_ids) != len(measurements_lst)) |
| or (len(times) != len(types_lst)) |
| or (len(device_ids) != len(times)) |
| or (len(times) != len(values_lst)) |
| ): |
| raise RuntimeError( |
| "deviceIds, times, measurementsList and valuesList's size should be equal" |
| ) |
| |
| value_lst = [] |
| for values, data_types, measurements in zip( |
| values_lst, types_lst, measurements_lst |
| ): |
| if (len(values) != len(data_types)) or (len(values) != len(measurements)): |
| raise RuntimeError( |
| "deviceIds, times, measurementsList and valuesList's size should be equal" |
| ) |
| values_in_bytes = Session.value_to_bytes(data_types, values) |
| value_lst.append(values_in_bytes) |
| |
| return TSInsertRecordsReq( |
| self.__session_id, |
| device_ids, |
| measurements_lst, |
| value_lst, |
| times, |
| is_aligned, |
| ) |
| |
| def insert_tablet(self, tablet): |
| """ |
| insert one tablet, in a tablet, for each timestamp, the number of measurements is same |
| for example three records in the same device can form a tablet: |
| timestamps, m1, m2, m3 |
| 1, 125.3, True, text1 |
| 2, 111.6, False, text2 |
| 3, 688.6, True, text3 |
| Notice: From 0.13.0, the tablet can contain empty cell |
| The tablet itself is sorted (see docs of Tablet.py) |
| :param tablet: a tablet specified above |
| """ |
| status = self.__client.insertTablet(self.gen_insert_tablet_req(tablet)) |
| logger.debug( |
| "insert one tablet to device {} message: {}".format( |
| tablet.get_device_id(), status.message |
| ) |
| ) |
| |
| return Session.verify_success(status) |
| |
| def insert_tablets(self, tablet_lst): |
| """ |
| insert multiple tablets, tablets are independent to each other |
| :param tablet_lst: List of tablets |
| """ |
| status = self.__client.insertTablets(self.gen_insert_tablets_req(tablet_lst)) |
| logger.debug("insert multiple tablets, message: {}".format(status.message)) |
| |
| return Session.verify_success(status) |
| |
| def insert_aligned_tablet(self, tablet): |
| """ |
| insert one aligned tablet, in a tablet, for each timestamp, the number of measurements is same |
| for example three records in the same device can form a tablet: |
| timestamps, m1, m2, m3 |
| 1, 125.3, True, text1 |
| 2, 111.6, False, text2 |
| 3, 688.6, True, text3 |
| Notice: From 0.13.0, the tablet can contain empty cell |
| The tablet itself is sorted (see docs of Tablet.py) |
| :param tablet: a tablet specified above |
| """ |
| status = self.__client.insertTablet(self.gen_insert_tablet_req(tablet, True)) |
| logger.debug( |
| "insert one tablet to device {} message: {}".format( |
| tablet.get_device_id(), status.message |
| ) |
| ) |
| |
| return Session.verify_success(status) |
| |
| def insert_aligned_tablets(self, tablet_lst): |
| """ |
| insert multiple aligned tablets, tablets are independent to each other |
| :param tablet_lst: List of tablets |
| """ |
| status = self.__client.insertTablets( |
| self.gen_insert_tablets_req(tablet_lst, True) |
| ) |
| logger.debug("insert multiple tablets, message: {}".format(status.message)) |
| |
| return Session.verify_success(status) |
| |
| def insert_records_of_one_device( |
| self, device_id, times_list, measurements_list, types_list, values_list |
| ): |
| # sort by timestamp |
| sorted_zipped = sorted( |
| zip(times_list, measurements_list, types_list, values_list) |
| ) |
| result = zip(*sorted_zipped) |
| times_list, measurements_list, types_list, values_list = [ |
| list(x) for x in result |
| ] |
| |
| return self.insert_records_of_one_device_sorted( |
| device_id, times_list, measurements_list, types_list, values_list |
| ) |
| |
| def insert_records_of_one_device_sorted( |
| self, device_id, times_list, measurements_list, types_list, values_list |
| ): |
| """ |
| Insert multiple rows, which can reduce the overhead of network. This method is just like jdbc |
| executeBatch, we pack some insert request in batch and send them to server. If you want improve |
| your performance, please see insertTablet method |
| |
| :param device_id: device id |
| :param times_list: timestamps list |
| :param measurements_list: measurements list |
| :param types_list: types list |
| :param values_list: values list |
| :param have_sorted: have these list been sorted by timestamp |
| """ |
| # check parameter |
| size = len(times_list) |
| if ( |
| size != len(measurements_list) |
| or size != len(types_list) |
| or size != len(values_list) |
| ): |
| raise RuntimeError( |
| "insert records of one device error: types, times, measurementsList and valuesList's size should be equal" |
| ) |
| |
| # check sorted |
| if not Session.check_sorted(times_list): |
| raise RuntimeError( |
| "insert records of one device error: timestamp not sorted" |
| ) |
| |
| request = self.gen_insert_records_of_one_device_request( |
| device_id, times_list, measurements_list, values_list, types_list |
| ) |
| |
| # send request |
| status = self.__client.insertRecordsOfOneDevice(request) |
| logger.debug("insert records of one device, message: {}".format(status.message)) |
| |
| return Session.verify_success(status) |
| |
| def insert_aligned_records_of_one_device( |
| self, device_id, times_list, measurements_list, types_list, values_list |
| ): |
| # sort by timestamp |
| sorted_zipped = sorted( |
| zip(times_list, measurements_list, types_list, values_list) |
| ) |
| result = zip(*sorted_zipped) |
| times_list, measurements_list, types_list, values_list = [ |
| list(x) for x in result |
| ] |
| |
| return self.insert_aligned_records_of_one_device_sorted( |
| device_id, times_list, measurements_list, types_list, values_list |
| ) |
| |
| def insert_aligned_records_of_one_device_sorted( |
| self, device_id, times_list, measurements_list, types_list, values_list |
| ): |
| """ |
| Insert multiple aligned rows, which can reduce the overhead of network. This method is just like jdbc |
| executeBatch, we pack some insert request in batch and send them to server. If you want to improve |
| your performance, please see insertTablet method |
| |
| :param device_id: device id |
| :param times_list: timestamps list |
| :param measurements_list: measurements list |
| :param types_list: types list |
| :param values_list: values list |
| """ |
| # check parameter |
| size = len(times_list) |
| if ( |
| size != len(measurements_list) |
| or size != len(types_list) |
| or size != len(values_list) |
| ): |
| raise RuntimeError( |
| "insert records of one device error: types, times, measurementsList and valuesList's size should be equal" |
| ) |
| |
| # check sorted |
| if not Session.check_sorted(times_list): |
| raise RuntimeError( |
| "insert records of one device error: timestamp not sorted" |
| ) |
| |
| request = self.gen_insert_records_of_one_device_request( |
| device_id, times_list, measurements_list, values_list, types_list, True |
| ) |
| |
| # send request |
| status = self.__client.insertRecordsOfOneDevice(request) |
| logger.debug("insert records of one device, message: {}".format(status.message)) |
| |
| return Session.verify_success(status) |
| |
| def gen_insert_records_of_one_device_request( |
| self, |
| device_id, |
| times_list, |
| measurements_list, |
| values_list, |
| types_list, |
| is_aligned=False, |
| ): |
| binary_value_list = [] |
| for values, data_types, measurements in zip( |
| values_list, types_list, measurements_list |
| ): |
| data_types = [data_type.value for data_type in data_types] |
| if (len(values) != len(data_types)) or (len(values) != len(measurements)): |
| raise RuntimeError( |
| "insert records of one device error: deviceIds, times, measurementsList and valuesList's size should be equal" |
| ) |
| values_in_bytes = Session.value_to_bytes(data_types, values) |
| binary_value_list.append(values_in_bytes) |
| |
| return TSInsertRecordsOfOneDeviceReq( |
| self.__session_id, |
| device_id, |
| measurements_list, |
| binary_value_list, |
| times_list, |
| is_aligned, |
| ) |
| |
| def test_insert_tablet(self, tablet): |
| """ |
| this method NOT insert data into database and the server just return after accept the request, this method |
| should be used to test other time cost in client |
| :param tablet: a tablet of data |
| """ |
| status = self.__client.testInsertTablet(self.gen_insert_tablet_req(tablet)) |
| logger.debug( |
| "testing! insert one tablet to device {} message: {}".format( |
| tablet.get_device_id(), status.message |
| ) |
| ) |
| |
| return Session.verify_success(status) |
| |
| def test_insert_tablets(self, tablet_list): |
| """ |
| this method NOT insert data into database and the server just return after accept the request, this method |
| should be used to test other time cost in client |
| :param tablet_list: List of tablets |
| """ |
| status = self.__client.testInsertTablets( |
| self.gen_insert_tablets_req(tablet_list) |
| ) |
| logger.debug( |
| "testing! insert multiple tablets, message: {}".format(status.message) |
| ) |
| |
| return Session.verify_success(status) |
| |
| def gen_insert_tablet_req(self, tablet, is_aligned=False): |
| data_type_values = [data_type.value for data_type in tablet.get_data_types()] |
| return TSInsertTabletReq( |
| self.__session_id, |
| tablet.get_device_id(), |
| tablet.get_measurements(), |
| tablet.get_binary_values(), |
| tablet.get_binary_timestamps(), |
| data_type_values, |
| tablet.get_row_number(), |
| is_aligned, |
| ) |
| |
| def gen_insert_tablets_req(self, tablet_lst, is_aligned=False): |
| device_id_lst = [] |
| measurements_lst = [] |
| values_lst = [] |
| timestamps_lst = [] |
| type_lst = [] |
| size_lst = [] |
| for tablet in tablet_lst: |
| data_type_values = [ |
| data_type.value for data_type in tablet.get_data_types() |
| ] |
| device_id_lst.append(tablet.get_device_id()) |
| measurements_lst.append(tablet.get_measurements()) |
| values_lst.append(tablet.get_binary_values()) |
| timestamps_lst.append(tablet.get_binary_timestamps()) |
| type_lst.append(data_type_values) |
| size_lst.append(tablet.get_row_number()) |
| return TSInsertTabletsReq( |
| self.__session_id, |
| device_id_lst, |
| measurements_lst, |
| values_lst, |
| timestamps_lst, |
| type_lst, |
| size_lst, |
| is_aligned, |
| ) |
| |
| def execute_query_statement(self, sql, timeout=0): |
| """ |
| execute query sql statement and returns SessionDataSet |
| :param sql: String, query sql statement |
| :return: SessionDataSet, contains query results and relevant info (see SessionDataSet.py) |
| """ |
| request = TSExecuteStatementReq( |
| self.__session_id, sql, self.__statement_id, self.__fetch_size, timeout |
| ) |
| resp = self.__client.executeQueryStatement(request) |
| return SessionDataSet( |
| sql, |
| resp.columns, |
| resp.dataTypeList, |
| resp.columnNameIndexMap, |
| resp.queryId, |
| self.__client, |
| self.__statement_id, |
| self.__session_id, |
| resp.queryDataSet, |
| resp.ignoreTimeStamp, |
| ) |
| |
| def execute_non_query_statement(self, sql): |
| """ |
| execute non-query sql statement |
| :param sql: String, non-query sql statement |
| """ |
| request = TSExecuteStatementReq(self.__session_id, sql, self.__statement_id) |
| try: |
| resp = self.__client.executeUpdateStatement(request) |
| status = resp.status |
| logger.debug( |
| "execute non-query statement {} message: {}".format(sql, status.message) |
| ) |
| return Session.verify_success(status) |
| except TTransport.TException as e: |
| raise RuntimeError("execution of non-query statement fails because: ", e) |
| |
| @staticmethod |
| def value_to_bytes(data_types, values): |
| format_str_list = [">"] |
| values_tobe_packed = [] |
| for data_type, value in zip(data_types, values): |
| if data_type == TSDataType.BOOLEAN.value: |
| format_str_list.append("c") |
| format_str_list.append("?") |
| values_tobe_packed.append(bytes([TSDataType.BOOLEAN.value])) |
| values_tobe_packed.append(value) |
| elif data_type == TSDataType.INT32.value: |
| format_str_list.append("c") |
| format_str_list.append("i") |
| values_tobe_packed.append(bytes([TSDataType.INT32.value])) |
| values_tobe_packed.append(value) |
| elif data_type == TSDataType.INT64.value: |
| format_str_list.append("c") |
| format_str_list.append("q") |
| values_tobe_packed.append(bytes([TSDataType.INT64.value])) |
| values_tobe_packed.append(value) |
| elif data_type == TSDataType.FLOAT.value: |
| format_str_list.append("c") |
| format_str_list.append("f") |
| values_tobe_packed.append(bytes([TSDataType.FLOAT.value])) |
| values_tobe_packed.append(value) |
| elif data_type == TSDataType.DOUBLE.value: |
| format_str_list.append("c") |
| format_str_list.append("d") |
| values_tobe_packed.append(bytes([TSDataType.DOUBLE.value])) |
| values_tobe_packed.append(value) |
| elif data_type == TSDataType.TEXT.value: |
| value_bytes = bytes(value, "utf-8") |
| format_str_list.append("c") |
| format_str_list.append("i") |
| format_str_list.append(str(len(value_bytes))) |
| format_str_list.append("s") |
| values_tobe_packed.append(bytes([TSDataType.TEXT.value])) |
| values_tobe_packed.append(len(value_bytes)) |
| values_tobe_packed.append(value_bytes) |
| else: |
| raise RuntimeError("Unsupported data type:" + str(data_type)) |
| format_str = "".join(format_str_list) |
| return struct.pack(format_str, *values_tobe_packed) |
| |
| def get_time_zone(self): |
| if self.__zone_id is not None: |
| return self.__zone_id |
| try: |
| resp = self.__client.getTimeZone(self.__session_id) |
| except TTransport.TException as e: |
| raise RuntimeError("Could not get time zone because: ", e) |
| return resp.timeZone |
| |
| def set_time_zone(self, zone_id): |
| request = TSSetTimeZoneReq(self.__session_id, zone_id) |
| try: |
| status = self.__client.setTimeZone(request) |
| logger.debug( |
| "setting time zone_id as {}, message: {}".format( |
| zone_id, status.message |
| ) |
| ) |
| except TTransport.TException as e: |
| raise RuntimeError("Could not set time zone because: ", e) |
| self.__zone_id = zone_id |
| |
| @staticmethod |
| def check_sorted(timestamps): |
| for i in range(1, len(timestamps)): |
| if timestamps[i] < timestamps[i - 1]: |
| return False |
| return True |
| |
| @staticmethod |
| def verify_success(status): |
| """ |
| verify success of operation |
| :param status: execution result status |
| """ |
| if status.code == Session.SUCCESS_CODE: |
| return 0 |
| |
| logger.error("error status is %s", status) |
| return -1 |
| |
| def execute_raw_data_query( |
| self, paths: list, start_time: int, end_time: int |
| ) -> SessionDataSet: |
| """ |
| execute query statement and returns SessionDataSet |
| :param paths: String path list |
| :param start_time: Query start time |
| :param end_time: Query end time |
| :return: SessionDataSet, contains query results and relevant info (see SessionDataSet.py) |
| """ |
| request = TSRawDataQueryReq( |
| self.__session_id, |
| paths, |
| self.__fetch_size, |
| startTime=start_time, |
| endTime=end_time, |
| statementId=self.__statement_id, |
| enableRedirectQuery=False, |
| ) |
| resp = self.__client.executeRawDataQuery(request) |
| return SessionDataSet( |
| "", |
| resp.columns, |
| resp.dataTypeList, |
| resp.columnNameIndexMap, |
| resp.queryId, |
| self.__client, |
| self.__statement_id, |
| self.__session_id, |
| resp.queryDataSet, |
| resp.ignoreTimeStamp, |
| ) |
| |
| def execute_last_data_query(self, paths: list, last_time: int) -> SessionDataSet: |
| """ |
| execute query statement and returns SessionDataSet |
| :param paths: String path list |
| :param last_time: Query last time |
| :return: SessionDataSet, contains query results and relevant info (see SessionDataSet.py) |
| """ |
| request = TSLastDataQueryReq( |
| self.__session_id, |
| paths, |
| self.__fetch_size, |
| last_time, |
| self.__statement_id, |
| enableRedirectQuery=False, |
| ) |
| |
| resp = self.__client.executeLastDataQuery(request) |
| return SessionDataSet( |
| "", |
| resp.columns, |
| resp.dataTypeList, |
| resp.columnNameIndexMap, |
| resp.queryId, |
| self.__client, |
| self.__statement_id, |
| self.__session_id, |
| resp.queryDataSet, |
| resp.ignoreTimeStamp, |
| ) |
| |
| def insert_string_records_of_one_device( |
| self, |
| device_id: str, |
| times: list, |
| measurements_list: list, |
| values_list: list, |
| have_sorted: bool = False, |
| ): |
| """ |
| insert multiple row of string record into database: |
| timestamp, m1, m2, m3 |
| 0, text1, text2, text3 |
| :param device_id: String, device id |
| :param times: Timestamp list |
| :param measurements_list: Measurements list |
| :param values_list: Value list |
| :param have_sorted: have these list been sorted by timestamp |
| """ |
| if (len(times) != len(measurements_list)) or (len(times) != len(values_list)): |
| raise RuntimeError( |
| "insert records of one device error: times, measurementsList and valuesList's size should be equal!" |
| ) |
| request = self.gen_insert_string_records_of_one_device_request( |
| device_id, times, measurements_list, values_list, have_sorted, False |
| ) |
| status = self.__client.insertStringRecordsOfOneDevice(request) |
| logger.debug( |
| "insert one device {} message: {}".format(device_id, status.message) |
| ) |
| |
| return Session.verify_success(status) |
| |
| def insert_aligned_string_records_of_one_device( |
| self, |
| device_id: str, |
| times: list, |
| measurements_list: list, |
| values: list, |
| have_sorted: bool = False, |
| ): |
| if (len(times) != len(measurements_list)) or (len(times) != len(values)): |
| raise RuntimeError( |
| "insert records of one device error: times, measurementsList and valuesList's size should be equal!" |
| ) |
| request = self.gen_insert_string_records_of_one_device_request( |
| device_id, times, measurements_list, values, have_sorted, True |
| ) |
| status = self.__client.insertStringRecordsOfOneDevice(request) |
| logger.debug( |
| "insert one device {} message: {}".format(device_id, status.message) |
| ) |
| |
| return Session.verify_success(status) |
| |
| def gen_insert_string_records_of_one_device_request( |
| self, |
| device_id, |
| times, |
| measurements_list, |
| values_list, |
| have_sorted, |
| is_aligned=False, |
| ): |
| if (len(times) != len(measurements_list)) or (len(times) != len(values_list)): |
| raise RuntimeError( |
| "insert records of one device error: times, measurementsList and valuesList's size should be equal!" |
| ) |
| if not Session.check_sorted(times): |
| # sort by timestamp |
| sorted_zipped = sorted(zip(times, measurements_list, values_list)) |
| result = zip(*sorted_zipped) |
| times_list, measurements_list, values_list = [list(x) for x in result] |
| request = TSInsertStringRecordsOfOneDeviceReq( |
| self.__session_id, |
| device_id, |
| measurements_list, |
| values_list, |
| times, |
| is_aligned, |
| ) |
| return request |
| |
| def create_schema_template(self, template: Template): |
| """ |
| create schema template, users using this method should use the template class as an argument |
| :param template: The template contains multiple child node(see Template.py) |
| """ |
| bytes_array = template.serialize |
| request = TSCreateSchemaTemplateReq( |
| self.__session_id, template.get_name(), bytes_array |
| ) |
| status = self.__client.createSchemaTemplate(request) |
| logger.debug( |
| "create one template {} template name: {}".format( |
| self.__session_id, template.get_name() |
| ) |
| ) |
| return Session.verify_success(status) |
| |
| def drop_schema_template(self, template_name: str): |
| """ |
| drop schema template, this method should be used to the template unset anything |
| :param template_name: template name |
| """ |
| request = TSDropSchemaTemplateReq(self.__session_id, template_name) |
| status = self.__client.dropSchemaTemplate(request) |
| logger.debug( |
| "drop one template {} template name: {}".format( |
| self.__session_id, template_name |
| ) |
| ) |
| return Session.verify_success(status) |
| |
| def execute_statement(self, sql: str, timeout=0): |
| request = TSExecuteStatementReq( |
| self.__session_id, sql, self.__statement_id, self.__fetch_size, timeout |
| ) |
| try: |
| resp = self.__client.executeStatement(request) |
| status = resp.status |
| logger.debug("execute statement {} message: {}".format(sql, status.message)) |
| if Session.verify_success(status) == 0: |
| if resp.columns: |
| return SessionDataSet( |
| sql, |
| resp.columns, |
| resp.dataTypeList, |
| resp.columnNameIndexMap, |
| resp.queryId, |
| self.__client, |
| self.__statement_id, |
| self.__session_id, |
| resp.queryDataSet, |
| resp.ignoreTimeStamp, |
| ) |
| else: |
| return None |
| else: |
| raise RuntimeError( |
| "execution of statement fails because: {}", status.message |
| ) |
| except TTransport.TException as e: |
| raise RuntimeError("execution of statement fails because: ", e) |
| |
| def add_measurements_in_template( |
| self, |
| template_name: str, |
| measurements_path: list, |
| data_types: list, |
| encodings: list, |
| compressors: list, |
| is_aligned: bool = False, |
| ): |
| """ |
| add measurements in the template, the template must already create. This function adds some measurements node. |
| :param template_name: template name, string list, like ["name_x", "name_y", "name_z"] |
| :param measurements_path: when ths is_aligned is True, recommend the name like a.b, like [python.x, python.y, iotdb.z] |
| :param data_types: using TSDataType(see IoTDBConstants.py) |
| :param encodings: using TSEncoding(see IoTDBConstants.py) |
| :param compressors: using Compressor(see IoTDBConstants.py) |
| :param is_aligned: True is aligned, False is unaligned |
| """ |
| request = TSAppendSchemaTemplateReq( |
| self.__session_id, |
| template_name, |
| is_aligned, |
| measurements_path, |
| list(map(lambda x: x.value, data_types)), |
| list(map(lambda x: x.value, encodings)), |
| list(map(lambda x: x.value, compressors)), |
| ) |
| status = self.__client.appendSchemaTemplate(request) |
| logger.debug( |
| "append unaligned template {} template name: {}".format( |
| self.__session_id, template_name |
| ) |
| ) |
| return Session.verify_success(status) |
| |
| def delete_node_in_template(self, template_name: str, path: str): |
| """ |
| delete a node in the template, this node must be already in the template |
| :param template_name: template name |
| :param path: measurements path |
| """ |
| request = TSPruneSchemaTemplateReq(self.__session_id, template_name, path) |
| status = self.__client.pruneSchemaTemplate(request) |
| logger.debug( |
| "append unaligned template {} template name: {}".format( |
| self.__session_id, template_name |
| ) |
| ) |
| return Session.verify_success(status) |
| |
| def set_schema_template(self, template_name, prefix_path): |
| """ |
| set template in prefix path, template already exit, prefix path is not measurements |
| :param template_name: template name |
| :param prefix_path: prefix path |
| """ |
| request = TSSetSchemaTemplateReq(self.__session_id, template_name, prefix_path) |
| status = self.__client.setSchemaTemplate(request) |
| logger.debug( |
| "set schema template to path{} template name: {}, path:{}".format( |
| self.__session_id, template_name, prefix_path |
| ) |
| ) |
| return Session.verify_success(status) |
| |
| def unset_schema_template(self, template_name, prefix_path): |
| """ |
| unset schema template from prefix path, this method unsetting the template from entities, |
| which have already inserted records using the template, is not supported. |
| :param template_name: template name |
| :param prefix_path: |
| """ |
| request = TSUnsetSchemaTemplateReq( |
| self.__session_id, prefix_path, template_name |
| ) |
| status = self.__client.unsetSchemaTemplate(request) |
| logger.debug( |
| "set schema template to path{} template name: {}, path:{}".format( |
| self.__session_id, template_name, prefix_path |
| ) |
| ) |
| return Session.verify_success(status) |
| |
| def count_measurements_in_template(self, template_name: str): |
| """ |
| drop schema template, this method should be used to the template unset anything |
| :param template_name: template name |
| """ |
| request = TSQueryTemplateReq( |
| self.__session_id, |
| template_name, |
| TemplateQueryType.COUNT_MEASUREMENTS.value, |
| ) |
| response = self.__client.querySchemaTemplate(request) |
| logger.debug( |
| "count measurements template {}, template name is {}, count is {}".format( |
| self.__session_id, template_name, response.measurements |
| ) |
| ) |
| return response.count |
| |
| def is_measurement_in_template(self, template_name: str, path: str): |
| """ |
| judge the node in the template is measurement or not, this node must in the template |
| :param template_name: template name |
| :param path: |
| """ |
| request = TSQueryTemplateReq( |
| self.__session_id, |
| template_name, |
| TemplateQueryType.IS_MEASUREMENT.value, |
| path, |
| ) |
| response = self.__client.querySchemaTemplate(request) |
| logger.debug( |
| "judge the path is measurement or not in template {}, template name is {}, result is {}".format( |
| self.__session_id, template_name, response.result |
| ) |
| ) |
| return response.result |
| |
| def is_path_exist_in_template(self, template_name: str, path: str): |
| """ |
| judge whether the node is a measurement or not in the template, this node must be in the template |
| :param template_name: template name |
| :param path: |
| """ |
| request = TSQueryTemplateReq( |
| self.__session_id, template_name, TemplateQueryType.PATH_EXIST.value, path |
| ) |
| response = self.__client.querySchemaTemplate(request) |
| logger.debug( |
| "judge the path is in template or not {}, template name is {}, result is {}".format( |
| self.__session_id, template_name, response.result |
| ) |
| ) |
| return response.result |
| |
| def show_measurements_in_template(self, template_name: str, pattern: str = ""): |
| """ |
| show all measurements under the pattern in template |
| :param template_name: template name |
| :param pattern: parent path, if default, show all measurements |
| """ |
| request = TSQueryTemplateReq( |
| self.__session_id, |
| template_name, |
| TemplateQueryType.SHOW_MEASUREMENTS.value, |
| pattern, |
| ) |
| response = self.__client.querySchemaTemplate(request) |
| logger.debug( |
| "show measurements in template {}, template name is {}, result is {}".format( |
| self.__session_id, template_name, response.measurements |
| ) |
| ) |
| return response.measurements |
| |
| def show_all_templates(self): |
| """ |
| show all schema templates |
| """ |
| request = TSQueryTemplateReq( |
| self.__session_id, |
| "", |
| TemplateQueryType.SHOW_TEMPLATES.value, |
| ) |
| response = self.__client.querySchemaTemplate(request) |
| logger.debug( |
| "show all template {}, measurements is {}".format( |
| self.__session_id, response.measurements |
| ) |
| ) |
| return response.measurements |
| |
| def show_paths_template_set_on(self, template_name): |
| """ |
| show the path prefix where a schema template is set |
| :param template_name: |
| """ |
| request = TSQueryTemplateReq( |
| self.__session_id, template_name, TemplateQueryType.SHOW_SET_TEMPLATES.value |
| ) |
| response = self.__client.querySchemaTemplate(request) |
| logger.debug( |
| "show paths template set {}, on {}".format( |
| self.__session_id, response.measurements |
| ) |
| ) |
| return response.measurements |
| |
| def show_paths_template_using_on(self, template_name): |
| """ |
| show the path prefix where a schema template is used |
| :param template_name: |
| """ |
| request = TSQueryTemplateReq( |
| self.__session_id, |
| template_name, |
| TemplateQueryType.SHOW_USING_TEMPLATES.value, |
| ) |
| response = self.__client.querySchemaTemplate(request) |
| logger.debug( |
| "show paths template using {}, on {}".format( |
| self.__session_id, response.measurements |
| ) |
| ) |
| return response.measurements |