CeresDBxClient 是 CeresDB 的高性能 Java 版客户端,CeresDB 定位高性能的、分布式的、Schema-less 的 HTAP (Hybrid Timeseries/Analytic Processing) 型时间序列数据库。
┌─────────────────────┐
│ CeresDBxClient │
└─────────────────────┘
│
▼
┌─────────────────────┐
│ WriteClient │───┐
└─────────────────────┘ │
│ Async to retry and merge responses
│ │
┌────Split requests │
│ │
│ ┌─────────────────────┐ │ ┌─────────────────────┐ ┌─────────────────────┐
└─▶│ RouterClient │◀─┴──▶│ RouterCache │◀─────▶│ RouterFor │
└─────────────────────┘ └─────────────────────┘ └─────────────────────┘
▲ │
│ │
▼ │
┌─────────────────────┐ │
│ RpcClient │◀──────────────────────────────────────────────┘
└─────────────────────┘
▲
│
▼
┌─────────────────────┐
│ Default gRPC impl │
└─────────────────────┘
▲
│
┌───────────────────┴ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
│ │
▼ ▼ ▼
┌─────────────────────┐ ┌─────────────────────┐ ┌─────────────────────┐
│ CeresDBx Node1 │ │ CeresDBx Node2 │ │ ... │
└─────────────────────┘ └─────────────────────┘ └─────────────────────┘
┌─────────────────────┐
│ CeresDBxClient │
└─────────────────────┘
│
▼
┌─────────────────────┐
│ QueryClient │───┐
└─────────────────────┘ │
│ │Async to retry
│ │
┌────────────┘ │
│ │
│ ┌─────────────────────┐ │ ┌─────────────────────┐ ┌─────────────────────┐
└─▶│ RouterClient │◀─┴──▶│ RouterCache │◀─────▶│ RouterFor │
└─────────────────────┘ └─────────────────────┘ └─────────────────────┘
▲ │
│ │
▼ │
┌─────────────────────┐ │
│ RpcClient │◀──────────────────────────────────────────────┘
└─────────────────────┘
▲
│
▼
┌─────────────────────┐
│ Default gRPC impl │
└─────────────────────┘
▲
│
┌───────────────────┴ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
│ │
▼ ▼ ▼
┌─────────────────────┐ ┌─────────────────────┐ ┌─────────────────────┐
│ CeresDBx Node1 │ │ CeresDBx Node2 │ │ ... │
└─────────────────────┘ └─────────────────────┘ └─────────────────────┘
编译需要 Java8 及以上
CeresDB 是一个 Schema-less 的时序数据引擎,你可以不必创建 schema 就立刻写入数据(CeresDB 会根据你的第一次写入帮你创建一个默认的 schema)。 当然你也可以自行创建一个 schema 来更精细化的管理的表(比如索引等)
下面的建表语句(使用 SDK 的 SQL API)包含了 CeresDB 支持的所有字段类型:
SqlResult result = client.management().executeSql("CREATE TABLE MY_FIRST_TABL(" + "ts TIMESTAMP NOT NULL," + "c1 STRING TAG NOT NULL," + "c2 STRING TAG NOT NULL," + "c3 DOUBLE NULL," + "c4 STRING NULL," + "c5 INT64 NULL," + "c6 FLOAT NULL," + "c7 INT32 NULL," + "c8 INT16 NULL," + "c9 INT8 NULL," + "c10 BOOLEAN NULL," "c11 UINT64 NULL," "c12 UINT32 NULL," "c13 UINT16 NULL," "c14 UINT8 NULL," "c15 TIMESTAMP NULL," "c16 VARBINARY NULL," "TIMESTAMP KEY(ts)) ENGINE=Analytic" );
更多建表语句例子见 这里
// CeresDBx options final CeresDBxOptions opts = CeresDBxOptions.newBuilder("127.0.0.1", 8081) // .tenant("test", "sub_test", "test_token") // 租户信息 .writeMaxRetries(1) // 写入失败重试次数上限(只有部分错误 code 才会重试,比如路由表失效) .readMaxRetries(1) // 查询失败重试次数上限(只有部分错误 code 才会重试,比如路由表失效) .build(); final CeresDBxClient client = new CeresDBxClient(); if (!client.init(this.opts)) { throw new IllegalStateException("Fail to start CeresDBxClient"); } final long t0 = System.currentTimeMillis(); final long t1 = t0 + 1000; final long t2 = t1 + 1000; final Rows data = Series.newBuilder("machine_metric") .tag("city", "Hangzhou") .tag("ip", "127.0.01") .toRowsBuilder() // 下面针对 cpu、mem 两列,一次写入了三行数据(3 个时间戳),CeresDB 鼓励这种实践,SDK 可以通过高效的压缩来减少网络传输,并且对 server 端写入非常友好 .field(t0, "cpu", FieldValue.withDouble(0.23)) // 第 1 行第 1 列 .field(t0, "mem", FieldValue.withDouble(0.55)) // 第 1 行第 2 列 .field(t1, "cpu", FieldValue.withDouble(0.25)) // 第 2 行第 1 列 .field(t1, "mem", FieldValue.withDouble(0.56)) // 第 2 行第 2 列 .field(t2, "cpu", FieldValue.withDouble(0.21)) // 第 3 行第 1 列 .field(t2, "mem", FieldValue.withDouble(0.52)) // 第 3 行第 2 列 .build(); final CompletableFuture<Result<WriteOk, Err>> wf = client.write(data); // 这里用 `future.get` 只是方便演示,推荐借助 CompletableFuture 强大的 API 实现异步编程 final Result<WriteOk, Err> wr = wf.get(); Assert.assertTrue(wr.isOk()); Assert.assertEquals(3, wr.getOk().getSuccess()); // `Result` 类参考了 Rust 语言,提供了丰富的 mapXXX、andThen 类 function 方便对结果值进行转换,提高编程效率,欢迎参考 API 文档使用 Assert.assertEquals(3, wr.mapOr(0, WriteOk::getSuccess())); Assert.assertEquals(0, wr.getOk().getFailed()); Assert.assertEquals(0, wr.mapOr(-1, WriteOk::getFailed)); final QueryRequest queryRequest = QueryRequest.newBuilder() .forMetrics("machine_metric") // 表名可选填,不填的话 SQL Parser 会自动解析 ql 涉及到的表名并完成自动路由 .ql("select timestamp, cpu, mem from machine_metric") // .build(); final CompletableFuture<Result<QueryOk, Err>> qf = client.query(queryRequest); // 这里用 `future.get` 只是方便演示,推荐借助 CompletableFuture 强大的 API 实现异步编程 final Result<QueryOk, Err> qr = qf.get(); Assert.assertTrue(qr.isOk()); final QueryOk queryOk = qr.getOk(); final List<Record> records = queryOk.mapToRecord().collect(Collectors.toList())