┌─────────────────────┐
│ CeresDBClient │
└─────────────────────┘
│
▼
┌─────────────────────┐
│ QueryClient │───┐
└─────────────────────┘ │
│ │Async to retry
│ │
┌────────────┘ │
│ │
│ ┌─────────────────────┐ │ ┌─────────────────────┐ ┌─────────────────────┐
└─▶│ RouterClient │◀─┴──▶│ RouterCache │◀─────▶│ RouterFor │
└─────────────────────┘ └─────────────────────┘ └─────────────────────┘
▲ │
│ │
▼ │
┌─────────────────────┐ │
│ RpcClient │◀──────────────────────────────────────────────┘
└─────────────────────┘
▲
│
▼
┌─────────────────────┐
│ Default GRPC impl │
└─────────────────────┘
▲
│
┌───────────────────┴ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
│ │
▼ ▼ ▼
┌─────────────────────┐ ┌─────────────────────┐ ┌─────────────────────┐
│ CeresDB Node1 │ │ CeresDB Node2 │ │ ... │
└─────────────────────┘ └─────────────────────┘ └─────────────────────┘
/** * According to the conditions, query data from the database. * * @param req the query request * @param ctx the invoke context * @return query result */ CompletableFuture<Result<QueryOk, Err>> query(QueryRequest req, Context ctx);
| name | desc |
|---|---|
QueryRequest req | 查询条件,包含 metrics 和 ql 字段,metrics 为建议字段,填写话会有更高效的路由, 不填写的话会自动解析 ql 语句以便进行路由(需要引入 ceresdb-sql 模块); ql 为查询语言的文本表示。 |
Context ctx | 调用上下文,实现一些特殊需求,ctx 中的内容会写入 gRPC 的 headers metadata |
CompletableFuture<Result<QueryOk, Err>>: 返回结果一个 future,因为 query 是纯异步 API,整个链路上没有一处是阻塞的。 Result<QueryOk, Err>: Result 的灵感来自 Rust 中的 Result,其中 QueryOk 和 Err 同时只能一个有值。 QueryOk: 查询成功的结果展示,包含一个 rowCount 用于反映这次查询的数据条数;还有一个 Stream<byte[]> rows 表示此次查询的数据流。 可以调用 QueryOk#mapToRecord 将每一行转成 Record 或是 QueryOk#mapToArray 将每一行转成一个 Object[]。 实际上,也可以让服务端返回 json,在发起查询调用时在 Context 中设置数据协议,即 ctx.with("data_protocol", "json"),设置后服务端将返回 UTF8 的 byte[],用户基于 QueryOk#map 自行将 byte[] 转成 String 再解析 json 即可。 代码示例: // Record final Stream<Record> records = queryOk.mapToRecord(); // 其中 parseUser 先基于 bytes 构建 String 再 parseJson 到 pojo 对象 final Stream<User> users = queryOk.map(bytes -> parseUser(bytes)); Err: 查询失败结果展示,包含错误状态码、错误文本信息、抛错的服务器地址。
Record:
public interface Record extends IndexedRecord { /** * Return the value of a field given its name. */ Object get(final String field); default <T> T get(final String field, final Class<T> expectType) { return expectType.cast(get(field)); } /** * Get a boolean value for the given {@code field}. */ default Boolean getBoolean(final String field) { return get(field, Boolean.class); } default Integer getUInt16(final String field) { return getInteger(field); } default Integer getUInt8(final String field) { return getInteger(field); } default Integer getInt16(final String field) { return getInteger(field); } default Integer getInt8(final String field) { return getInteger(field); } /** * Get a integer value for the given {@code field}. */ default Integer getInteger(final String field) { return get(field, Integer.class); } default Long getTimestamp(final String field) { return getLong(field); } default Long getUInt64(final String field) { return getLong(field); } default Long getUInt32(final String field) { return getLong(field); } default Long getInt64(final String field) { return getLong(field); } /** * Get a long value for the given {@code field}. */ default Long getLong(final String field) { return get(field, Long.class); } /** * Get a float value for the given {@code field}. */ default Float getFloat(final String field) { return get(field, Float.class); } /** * Get a double value for the given {@code field}. */ default Double getDouble(final String field) { return get(field, Double.class); } /** * Get a string value for the given {@code field}. */ default String getString(final String field) { return get(field, String.class); } /** * Get a bytes value for the given {@code field}. */ default byte[] getBytes(final String field) { return get(field, byte[].class); } /** * Return true if record has field with name. */ boolean hasField(final String field); /** * Return the field count of this record. */ int getFieldCount(); /** * Return all field descriptors in the record. */ List<FieldDescriptor> getFieldDescriptors(); }