ceresdb-client 是 CeresDB python 客户端.
借助于 PyO3,python 客户端的实现实际上是基于 rust 客户端 的封装。
本手册将会介绍 python client 的一些基本用法,其中涉及到的完整示例,可以查看该示例代码.
pip install ceresdb-client
首先介绍下如何初始化客户端,代码示例如下:
import asyncio import datetime from ceresdb_client import Builder, RpcContext, PointBuilder, ValueBuilder, WriteRequest, SqlQueryRequest, Mode, RpcConfig rpc_config = RpcConfig() rpc_config = RpcConfig() rpc_config.thread_num = 1 rpc_config.default_write_timeout_ms = 1000 builder = Builder('127.0.0.1:8831', Mode.Direct) builder.set_rpc_config(rpc_config) builder.set_default_database('public') client = builder.build()
代码的最开始部分是依赖库的导入,在后面的示例中将省略这部分。
客户端初始化需要至少两个参数:
Endpoint: 服务端地址,由 ip 和端口组成,例如 127.0.0.1:8831;Mode: 客户端和服务端通信模式,有两种模式可供选择: Direct 和 Proxy。这里重点介绍下通信模式 Mode, 当客户端可以访问所有的服务器的时候,建议采用 Direct 模式,以减少转发开销;但是如果客户端访问服务器必须要经过网关,那么只能选择 Proxy 模式。
至于 default_database,会在执行 RPC 请求时未通过 RpcContext 设置 database 的情况下,将被作为目标 database 使用。
最后,通过配置 RpcConfig, 可以管理客户端使用的资源和调整其性能,所有的配置参数可以参考这里.
CeresDB 是一个 schema-less 的时序数据引擎,你可以不必创建 schema 就立刻写入数据(CeresDB 会根据你的第一次写入帮你创建一个默认的 schema)。 当然你也可以自行创建一个 schema 来更精细化的管理表,比如添加索引。
初始化客户端后,建表示例如下:
def async_query(client, ctx, req): await client.sql_query(ctx, req) create_table_sql = 'CREATE TABLE IF NOT EXISTS demo ( \ name string TAG, \ value double, \ t timestamp NOT NULL, \ TIMESTAMP KEY(t)) ENGINE=Analytic with (enable_ttl=false)' req = SqlQueryRequest(['demo'], create_table_sql) rpc_ctx = RpcContext() rpc_ctx.database = 'public' rpc_ctx.timeout_ms = 100 event_loop = asyncio.get_event_loop() event_loop.run_until_complete(async_query(client, rpc_ctx, req))
RpcContext 可以用来指定目标 database (可以覆盖在初始化的时候设置的 default_space) 和超时参数。
可以使用 PointBuilder 来构建一个 point(实际上就是数据集的一行),多个 point 构成一个写入请求。
示例如下:
async def async_write(client, ctx, req): return await client.write(ctx, req) point_builder = PointBuilder('demo') point_builder.set_timestamp(int(round(datetime.datetime.now().timestamp()))) point_builder.set_tag("name", ValueBuilder().string("test_tag1")) point_builder.set_field("value", ValueBuilder().double(0.4242)) point = point_builder.build() write_request = WriteRequest() write_request.add_point(point) event_loop = asyncio.get_event_loop() event_loop.run_until_complete(async_write(client, ctx, req))
通过 sql_query 接口, 可以方便地从服务端查询数据:
req = SqlQueryRequest(['demo'], 'select * from demo') event_loop = asyncio.get_event_loop() resp = event_loop.run_until_complete(async_query(client, ctx, req))
如示例所展示, 构建 SqlQueryRequest 需要两个参数:
当前为了查询的性能,第一个参数是必须的。
查询到数据后,逐行逐列处理数据的示例如下:
for row_idx in range(0, resp.row_num()): row_tokens = [] row = resp.get_row(row_idx) for col_idx in range(0, row.num_cols()): col = row.column_by_idx(col_idx) row_tokens.append(f"{col.name()}:{col.value()}#{col.data_type()}") print(f"row#{col_idx}: {','.join(row_tokens)}")
和创建表类似,我们可以使用 sql 来删除表:
drop_table_sql = 'DROP TABLE demo' req = SqlQueryRequest(['demo'], drop_table_sql) event_loop = asyncio.get_event_loop() event_loop.run_until_complete(async_query(client, rpc_ctx, req))