You have to install thrift (>=0.13) before using the package.
First, install the package: pip3 install "apache-iotdb>=2.0"
An example of using the package to read and write data is available here: Session Example
An example of aligned timeseries: Aligned Timeseries Session Example
(you need to add import iotdb at the top of the file)
Or:
from iotdb.Session import Session

ip = "127.0.0.1"
port_ = "6667"
username_ = "root"
password_ = "TimechoDB@2021"  # Before V2.0.6.x the default password is root
session = Session(ip, port_, username_, password_)
session.open(False)
zone = session.get_time_zone()
session.close()
session = Session(
    ip="127.0.0.1",
    port="6667",
    user="root",
    password="TimechoDB@2021",  # Before V2.0.6.x the default password is root
    fetch_size=1024,
    zone_id="UTC+8",
    enable_redirection=True,
)

session = Session.init_from_node_urls(
    node_urls=["127.0.0.1:6667", "127.0.0.1:6668", "127.0.0.1:6669"],
    user="root",
    password="TimechoDB@2021",  # Before V2.0.6.x the default password is root
    fetch_size=1024,
    zone_id="UTC+8",
    enable_redirection=True,
)
session.open(enable_rpc_compression=False)
Notice: the RPC compression setting of the client must match that of the IoTDB server.
session.close()
Using SessionPool to manage sessions means you do not have to handle session reuse yourself. When the number of sessions in use reaches the maximum capacity of the pool, further requests for a session block, and the maximum wait time can be set through a parameter. After using a session, return it to the SessionPool with the put_back method.
pool_config = PoolConfig(
    host=ip,
    port=port,
    user_name=username,
    password=password,
    fetch_size=1024,
    time_zone="UTC+8",
    max_retry=3,
)
max_pool_size = 5
wait_timeout_in_ms = 3000

# Create the connection pool
session_pool = SessionPool(pool_config, max_pool_size, wait_timeout_in_ms)

pool_config = PoolConfig(
    node_urls=["127.0.0.1:6667", "127.0.0.1:6668", "127.0.0.1:6669"],
    user_name=username,
    password=password,
    fetch_size=1024,
    time_zone="UTC+8",
    max_retry=3,
)
max_pool_size = 5
wait_timeout_in_ms = 3000
session = session_pool.get_session()
session.set_storage_group(STORAGE_GROUP_NAME)
session.create_time_series(
    TIMESERIES_PATH, TSDataType.BOOLEAN, TSEncoding.PLAIN, Compressor.SNAPPY
)
# After usage, return the session using put_back
session_pool.put_back(session)
# When closing the session pool, all managed sessions are closed as well
session_pool.close()
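The blocking behaviour of a bounded pool can be illustrated with a small standard-library sketch. TinyPool below is a hypothetical stand-in, not the iotdb SessionPool: it pre-creates placeholder strings instead of real sessions, and raising TimeoutError on an exhausted pool is an assumption made for illustration only.

```python
import queue


class TinyPool:
    """Hypothetical bounded pool; not the iotdb SessionPool implementation."""

    def __init__(self, max_pool_size, wait_timeout_in_ms):
        self._wait_seconds = wait_timeout_in_ms / 1000
        self._idle = queue.Queue(maxsize=max_pool_size)
        for i in range(max_pool_size):
            # Placeholder strings stand in for real session objects.
            self._idle.put(f"session-{i}")

    def get_session(self):
        try:
            # Blocks until a session is returned or the timeout elapses.
            return self._idle.get(timeout=self._wait_seconds)
        except queue.Empty:
            raise TimeoutError("no session became available in time") from None

    def put_back(self, session):
        self._idle.put_nowait(session)


pool = TinyPool(max_pool_size=2, wait_timeout_in_ms=50)
first = pool.get_session()
second = pool.get_session()

try:
    pool.get_session()  # pool is exhausted: waits 50 ms, then fails
    timed_out = False
except TimeoutError:
    timed_out = True

pool.put_back(first)
third = pool.get_session()  # succeeds because a session was returned
```

The sketch shows why every borrowed session should be returned: a session that is never put back permanently reduces the capacity available to other callers.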
In the conf/iotdb-system.properties configuration file, locate or add the following configuration items:
enable_thrift_ssl=true
key_store_path=/path/to/your/server_keystore.jks
key_store_pwd=your_keystore_password

On the client side, set use_ssl to True to enable SSL, and specify the certificate path with the ca_certs parameter.
use_ssl = True
ca_certs = "/path/to/your/server.crt"  # or ca_certs = "/path/to/your/ca_cert.pem"
Example Code: Using SSL to Connect to IoTDB
from iotdb.SessionPool import PoolConfig, SessionPool
from iotdb.Session import Session

ip = "127.0.0.1"
port_ = "6667"
username_ = "root"
password_ = "TimechoDB@2021"  # Before V2.0.6.x the default password is root
# Enable SSL
use_ssl = True
# Path to the certificate
ca_certs = "/path/server.crt"


def get_data():
    # Query through a single SSL-enabled session
    session = Session(
        ip, port_, username_, password_, use_ssl=use_ssl, ca_certs=ca_certs
    )
    session.open(False)
    result = session.execute_query_statement("select * from root.eg.etth")
    df = result.todf()
    df.rename(columns={"Time": "date"}, inplace=True)
    session.close()
    return df


def get_data2():
    # Query through an SSL-enabled session pool
    pool_config = PoolConfig(
        host=ip,
        port=port_,
        user_name=username_,
        password=password_,
        fetch_size=1024,
        time_zone="UTC+8",
        max_retry=3,
        use_ssl=use_ssl,
        ca_certs=ca_certs,
    )
    max_pool_size = 5
    wait_timeout_in_ms = 3000
    session_pool = SessionPool(pool_config, max_pool_size, wait_timeout_in_ms)
    session = session_pool.get_session()
    result = session.execute_query_statement("select * from root.eg.etth")
    df = result.todf()
    df.rename(columns={"Time": "date"}, inplace=True)
    session_pool.put_back(session)
    session_pool.close()
    return df


if __name__ == "__main__":
    df = get_data()
session.set_storage_group(group_name)
session.delete_storage_group(group_name) session.delete_storage_groups(group_name_lst)
session.create_time_series(ts_path, data_type, encoding, compressor, props=None, tags=None, attributes=None, alias=None) session.create_multi_time_series( ts_path_lst, data_type_lst, encoding_lst, compressor_lst, props_lst=None, tags_lst=None, attributes_lst=None, alias_lst=None )
session.create_aligned_time_series( device_id, measurements_lst, data_type_lst, encoding_lst, compressor_lst )
Attention: Aliases of measurements are not currently supported.
session.delete_time_series(paths_list)
session.check_time_series_exists(path)
It is recommended to use insert_tablet to improve write efficiency.
There are two implementations of Tablet in the Python API.
# Insert one tablet
values_ = [
    [False, 10, 11, 1.1, 10011.1, "test01"],
    [True, 100, 11111, 1.25, 101.0, "test02"],
    [False, 100, 1, 188.1, 688.25, "test03"],
    [True, 0, 0, 0, 6.25, "test04"],
]
timestamps_ = [1, 2, 3, 4]
tablet_ = Tablet(
    device_id, measurements_, data_types_, values_, timestamps_
)
session.insert_tablet(tablet_)

# Insert one tablet that contains None values
values_ = [
    [None, 10, 11, 1.1, 10011.1, "test01"],
    [True, None, 11111, 1.25, 101.0, "test02"],
    [False, 100, None, 188.1, 688.25, "test03"],
    [True, 0, 0, 0, None, None],
]
timestamps_ = [16, 17, 18, 19]
tablet_ = Tablet(
    device_id, measurements_, data_types_, values_, timestamps_
)
session.insert_tablet(tablet_)
Compared with Tablet, NumpyTablet uses numpy.ndarray to store data. Because it has a smaller memory footprint and a lower serialization cost, insert performance is better.
Notice
import numpy as np

from iotdb.utils.BitMap import BitMap
from iotdb.utils.IoTDBConstants import TSDataType
from iotdb.utils.NumpyTablet import NumpyTablet

data_types_ = [
    TSDataType.BOOLEAN,
    TSDataType.INT32,
    TSDataType.INT64,
    TSDataType.FLOAT,
    TSDataType.DOUBLE,
    TSDataType.TEXT,
]
# One array per measurement (column-oriented)
np_values_ = [
    np.array([False, True, False, True], TSDataType.BOOLEAN.np_dtype()),
    np.array([10, 100, 100, 0], TSDataType.INT32.np_dtype()),
    np.array([11, 11111, 1, 0], TSDataType.INT64.np_dtype()),
    np.array([1.1, 1.25, 188.1, 0], TSDataType.FLOAT.np_dtype()),
    np.array([10011.1, 101.0, 688.25, 6.25], TSDataType.DOUBLE.np_dtype()),
    np.array(["test01", "test02", "test03", "test04"], TSDataType.TEXT.np_dtype()),
]
np_timestamps_ = np.array([1, 2, 3, 4], TSDataType.INT64.np_dtype())
np_tablet_ = NumpyTablet(
    device_id, measurements_, data_types_, np_values_, np_timestamps_
)
session.insert_tablet(np_tablet_)

# Insert one numpy tablet with None values into the database.
np_values_ = [
    np.array([False, True, False, True], TSDataType.BOOLEAN.np_dtype()),
    np.array([10, 100, 100, 0], TSDataType.INT32.np_dtype()),
    np.array([11, 11111, 1, 0], TSDataType.INT64.np_dtype()),
    np.array([1.1, 1.25, 188.1, 0], TSDataType.FLOAT.np_dtype()),
    np.array([10011.1, 101.0, 688.25, 6.25], TSDataType.DOUBLE.np_dtype()),
    np.array(["test01", "test02", "test03", "test04"], TSDataType.TEXT.np_dtype()),
]
np_timestamps_ = np.array([98, 99, 100, 101], TSDataType.INT64.np_dtype())
# One bitmap per measurement; a marked position means the value is None
np_bitmaps_ = []
for i in range(len(measurements_)):
    np_bitmaps_.append(BitMap(len(np_timestamps_)))
np_bitmaps_[0].mark(0)
np_bitmaps_[1].mark(1)
np_bitmaps_[2].mark(2)
np_bitmaps_[4].mark(3)
np_bitmaps_[5].mark(3)
np_tablet_with_none = NumpyTablet(
    device_id, measurements_, data_types_, np_values_, np_timestamps_, np_bitmaps_
)
session.insert_tablet(np_tablet_with_none)
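The two tablet layouts differ in orientation: Tablet takes one list per row, with None for missing values, while NumpyTablet takes one array per measurement plus a bitmap that marks the missing positions. The following standard-library sketch shows how the marked positions relate to the None entries of a row-oriented tablet. The helper rows_to_columns is hypothetical and not part of the iotdb package.

```python
def rows_to_columns(rows):
    """Split row-oriented values into columns and per-column null positions."""
    # zip(*rows) transposes the row lists into one tuple per measurement.
    columns = [list(col) for col in zip(*rows)]
    null_positions = [
        [row_idx for row_idx, value in enumerate(col) if value is None]
        for col in columns
    ]
    return columns, null_positions


rows = [
    [None, 10, 11, 1.1, 10011.1, "test01"],
    [True, None, 11111, 1.25, 101.0, "test02"],
    [False, 100, None, 188.1, 688.25, "test03"],
    [True, 0, 0, 0, None, None],
]
columns, null_positions = rows_to_columns(rows)
print(null_positions)  # [[0], [1], [2], [], [3], [3]]
```

Each inner list corresponds to the mark() calls made on the bitmap of one measurement: position 0 for the first measurement, position 1 for the second, and so on.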
session.insert_tablets(tablet_lst)
session.insert_record(device_id, timestamp, measurements_, data_types_, values_)
session.insert_records( device_ids_, time_list_, measurements_list_, data_type_list_, values_list_ )
session.insert_records_of_one_device(device_id, time_list, measurements_list, data_types_list, values_list)
When the data is of String type, the following interface can be used to infer the data type from the value itself. For example, the value "true" is inferred as a boolean type, and the value "3.2" is inferred as a float type. Because no type information is supplied, the server has to perform the type inference, which may take some time.
session.insert_str_record(device_id, timestamp, measurements, string_values)
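The idea of value-based type inference can be sketched in a few lines of plain Python. This is only an illustration of the principle: the function infer_type is hypothetical, the inference actually happens on the server, and the server's rules may differ. In particular, mapping integers to INT64 here is an assumption.

```python
def infer_type(text):
    """Guess a type name from a string value (illustrative rules only)."""
    lowered = text.strip().lower()
    if lowered in ("true", "false"):
        return "BOOLEAN"
    try:
        int(lowered)
        return "INT64"  # assumption: the real server mapping may differ
    except ValueError:
        pass
    try:
        float(lowered)
        return "FLOAT"
    except ValueError:
        return "TEXT"


for value in ["true", "3.2", "42", "hello"]:
    print(value, "->", infer_type(value))
```

When the types are known in advance, passing them explicitly with insert_record avoids this extra work.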
Inserting into aligned timeseries uses interfaces named insert_aligned_XXX; otherwise they work like the interfaces above.
session.execute_query_statement(sql)
session.execute_non_query_statement(sql)
session.execute_statement(sql)
The steps for creating a metadata template are as follows:
template = Template(name=template_name, share_time=True)

m_node_x = MeasurementNode("x", TSDataType.FLOAT, TSEncoding.RLE, Compressor.SNAPPY)
m_node_y = MeasurementNode("y", TSDataType.FLOAT, TSEncoding.RLE, Compressor.SNAPPY)
m_node_z = MeasurementNode("z", TSDataType.FLOAT, TSEncoding.RLE, Compressor.SNAPPY)

template.add_template(m_node_x)
template.add_template(m_node_y)
template.add_template(m_node_z)

session.create_schema_template(template)
To modify the measurements in a template, the template must already exist. The following functions add or delete measurement nodes.
session.add_measurements_in_template(template_name, measurements_path, data_types, encodings, compressors, is_aligned)
session.delete_node_in_template(template_name, path)
session.set_schema_template(template_name, prefix_path)
session.unset_schema_template(template_name, prefix_path)
session.show_all_templates()
session.count_measurements_in_template(template_name)
session.count_measurements_in_template(template_name, path)
session.is_path_exist_in_template(template_name, path)
session.show_measurements_in_template(template_name)
session.show_paths_template_set_on(template_name)
session.show_paths_template_using_on(template_name)
Delete an existing metadata template. Dropping a template that is already set is not supported.
session.drop_schema_template("template_python")
To transform a query result into a Pandas DataFrame, SessionDataSet provides the method .todf(), which consumes the dataset and returns it as a DataFrame.
Example:
from iotdb.Session import Session

ip = "127.0.0.1"
port_ = "6667"
username_ = "root"
password_ = "TimechoDB@2021"  # Before V2.0.6.x the default password is root
session = Session(ip, port_, username_, password_)
session.open(False)
result = session.execute_query_statement("SELECT * FROM root.*")

# Transform to a Pandas DataFrame
df = result.todf()

session.close()

# Now you can work with the dataframe
df = ...
The Test Support is based on the lib testcontainers (https://testcontainers-python.readthedocs.io/en/latest/index.html) which you need to install in your project if you want to use the feature.
To start (and stop) an IoTDB Database in a Docker container simply do:
import unittest

from iotdb.IoTDBContainer import IoTDBContainer
from iotdb.Session import Session


class MyTestCase(unittest.TestCase):
    def test_something(self):
        with IoTDBContainer() as c:
            # Before V2.0.6.x the default password is root
            session = Session("localhost", c.get_exposed_port(6667), "root", "TimechoDB@2021")
            session.open(False)
            result = session.execute_query_statement("SHOW TIMESERIES")
            print(result)
            session.close()
By default it loads the image apache/iotdb:latest. If you want a specific version, pass it explicitly, e.g. IoTDBContainer("apache/iotdb:0.12.0") to run version 0.12.0.
IoTDB DBAPI implements the Python DB API 2.0 specification (https://peps.python.org/pep-0249/), which defines a common interface for accessing databases in Python.
The initialized parameters are consistent with the session part (except for the sqlalchemy_mode).
from iotdb.dbapi import connect

ip = "127.0.0.1"
port_ = "6667"
username_ = "root"
password_ = "TimechoDB@2021"  # Before V2.0.6.x the default password is root
conn = connect(ip, port_, username_, password_, fetch_size=1024, zone_id="UTC+8", sqlalchemy_mode=False)
cursor = conn.cursor()

cursor.execute("SELECT ** FROM root")
for row in cursor.fetchall():
    print(row)
IoTDB DBAPI supports pyformat style parameters
cursor.execute(
    "SELECT ** FROM root WHERE time < %(time)s",
    {"time": "2017-11-01T00:08:00.000"},
)
for row in cursor.fetchall():
    print(row)

seq_of_parameters = [
    {"timestamp": 1, "temperature": 1},
    {"timestamp": 2, "temperature": 2},
    {"timestamp": 3, "temperature": 3},
    {"timestamp": 4, "temperature": 4},
    {"timestamp": 5, "temperature": 5},
]
sql = "insert into root.cursor(timestamp,temperature) values(%(timestamp)s,%(temperature)s)"
cursor.executemany(sql, seq_of_parameters)
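The pyformat style names each placeholder as %(name)s and looks the value up in a dictionary. The sketch below uses plain Python string interpolation to show which statement each parameter set corresponds to. It illustrates the placeholder naming only: how the IoTDB DBAPI quotes or escapes values internally is not shown here, and SQL should not be built this way from untrusted input.

```python
sql = "insert into root.cursor(timestamp,temperature) values(%(timestamp)s,%(temperature)s)"
seq_of_parameters = [
    {"timestamp": 1, "temperature": 1},
    {"timestamp": 2, "temperature": 2},
]

# One rendered statement per parameter dictionary, as executemany would run them.
rendered = [sql % params for params in seq_of_parameters]
for statement in rendered:
    print(statement)
```

A key that appears in the SQL text but is missing from the dictionary raises KeyError, which is a quick way to catch misspelled placeholder names.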
cursor.close()
conn.close()
The SQLAlchemy dialect of IoTDB is written to adapt to Apache Superset. This part is still being improved. Please do not use it in the production environment!
SQLAlchemy uses a relational data model, which describes the relationships between entities through tables. IoTDB uses a hierarchical data model, which organizes data in a tree structure. To adapt IoTDB to the SQLAlchemy dialect, the IoTDB data model has to be reorganized and converted into the SQLAlchemy data model.
The metadata in IoTDB are: Database, Path, Entity, and Measurement.
The metadata in SQLAlchemy are: Schema, Table, and Column.
The mapping relationship between them is:
| The metadata in the SQLAlchemy | The metadata in the IoTDB |
|---|---|
| Schema | Database |
| Table | Path ( from database to entity ) + Entity |
| Column | Measurement |
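As a minimal sketch of this mapping, the function below splits a full IoTDB timeseries path into the three relational names, given the database it belongs to. The function to_relational and the sample path are hypothetical and only illustrate the table above; the dialect's own implementation is not shown here.

```python
def to_relational(full_path, database):
    """Map an IoTDB timeseries path to schema, table and column names."""
    prefix = database + "."
    if not full_path.startswith(prefix):
        raise ValueError(f"{full_path!r} is not under database {database!r}")
    remainder = full_path[len(prefix):]
    # The last path segment is the measurement; the rest is path + entity.
    table, _, column = remainder.rpartition(".")
    if not table:
        raise ValueError("expected at least an entity and a measurement")
    return {"schema": database, "table": table, "column": column}


mapping = to_relational("root.factory.room2.device1.temperature", "root.factory")
print(mapping)
# {'schema': 'root.factory', 'table': 'room2.device1', 'column': 'temperature'}
```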
The following figure shows the relationship between the two more intuitively:
| data type in IoTDB | data type in SQLAlchemy |
|---|---|
| BOOLEAN | Boolean |
| INT32 | Integer |
| INT64 | BigInteger |
| FLOAT | Float |
| DOUBLE | Float |
| TEXT | Text |
| LONG | BigInteger |
from sqlalchemy import create_engine

engine = create_engine("iotdb://root:root@127.0.0.1:6667")
connect = engine.connect()
result = connect.execute("SELECT ** FROM root")
for row in result.fetchall():
    print(row)
from sqlalchemy import create_engine, Column, Float, BigInteger, MetaData
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

metadata = MetaData(
    schema='root.factory'
)
Base = declarative_base(metadata=metadata)


class Device(Base):
    __tablename__ = "room2.device1"
    Time = Column(BigInteger, primary_key=True)
    temperature = Column(Float)
    status = Column(Float)


# Before V2.0.6.x the default password is root.
# The "@" in the password is URL-encoded as %40 so the URL parses unambiguously.
engine = create_engine("iotdb://root:TimechoDB%402021@127.0.0.1:6667")

DbSession = sessionmaker(bind=engine)
session = DbSession()

res = session.query(Device.status).filter(Device.temperature > 1)

for row in res:
    print(row)
This is an example of how to connect to IoTDB with Python, using the Thrift RPC interfaces. Things are almost the same on Windows and Linux, but pay attention to differences such as the path separator.
Python3.7 or later is preferred.
You have to install Thrift (0.11.0 or later) to compile our Thrift file into Python code. The official installation tutorial is linked below; when you are done, you should have a thrift executable.
http://thrift.apache.org/docs/install/
Before starting you need to install requirements_dev.txt in your python environment, e.g. by calling
pip install -r requirements_dev.txt
In the root of IoTDB's source code folder, run mvn clean generate-sources -pl iotdb-client/client-py -am.
This will automatically delete and repopulate the folder iotdb/thrift with the generated Thrift files. This folder is ignored by git and should never be pushed to git!
Notice: Do not upload iotdb/thrift to the git repo.
We packed the Thrift interface into client-py/src/iotdb/Session.py (similar to its Java counterpart) and also provide an example file, client-py/src/SessionExample.py, that shows how to use the session module. Please read it carefully.
Or, another simple example:
from iotdb.Session import Session

ip = "127.0.0.1"
port_ = "6667"
username_ = "root"
password_ = "TimechoDB@2021"  # Before V2.0.6.x the default password is root
session = Session(ip, port_, username_, password_)
session.open(False)
zone = session.get_time_zone()
session.close()
Please add your custom tests to the tests folder.
To run all defined tests, type pytest . in the root folder.
Notice: Some tests need Docker to be running on your system, because a test instance is started in a Docker container using testcontainers.
black and flake8 are installed for autoformatting and linting. Run them with black . and flake8 . respectively.
To do a release, first ensure that you have the right set of generated Thrift files. Then run linting and autoformatting, and make sure that all tests pass (via pytest .). After that you are ready to do a release!
First, install all necessary dev dependencies via pip install -r requirements_dev.txt.
There is a convenient script release.sh to do all steps for a release. Namely, these are