在根目录下运行:
mvn clean install -pl session -am -Dmaven.test.skip=true
<dependencies> <dependency> <groupId>org.apache.iotdb</groupId> <artifactId>iotdb-session</artifactId> <version>0.13.0-SNAPSHOT</version> </dependency> </dependencies>
checkTimeseriesExists 接口,由于内部调用了 IoTDB-SQL 接口,因此需要和 SQL 语法规范保持一致,并且针对 JAVA 字符串进行反转义。下面将给出 Session 对应的接口的简要介绍和对应参数:
// 全部使用默认配置 session = new Session.Builder.build(); // 指定一个可连接节点 session = new Session.Builder() .host(String host) .port(int port) .build(); // 指定多个可连接节点 session = new Session.Builder() .nodeUrls(List<String> nodeUrls) .build(); // 其他配置项 session = new Session.Builder() .fetchSize(int fetchSize) .username(String username) .password(String password) .thriftDefaultBufferSize(int thriftDefaultBufferSize) .thriftMaxFrameSize(int thriftMaxFrameSize) .enableCacheLeader(boolean enableCacheLeader) .version(Version version) .build();
其中,version 表示客户端使用的 SQL 语义版本,用于升级 0.13 时兼容 0.12 的 SQL 语义,可能取值有:V_0_12、V_0_13。
void open()
void open(boolean enableRPCCompression)
注意: 客户端的 RPC 压缩开启状态需和服务端一致
void close()
void setStorageGroup(String storageGroupId)
void deleteStorageGroup(String storageGroup) void deleteStorageGroups(List<String> storageGroups)
void createTimeseries(String path, TSDataType dataType, TSEncoding encoding, CompressionType compressor, Map<String, String> props, Map<String, String> tags, Map<String, String> attributes, String measurementAlias) void createMultiTimeseries(List<String> paths, List<TSDataType> dataTypes, List<TSEncoding> encodings, List<CompressionType> compressors, List<Map<String, String>> propsList, List<Map<String, String>> tagsList, List<Map<String, String>> attributesList, List<String> measurementAliasList)
void createAlignedTimeseries(String prefixPath, List<String> measurements,
List<TSDataType> dataTypes, List<TSEncoding> encodings,
CompressionType compressor, List<String> measurementAliasList);
注意:目前暂不支持使用传感器别名。
void deleteTimeseries(String path) void deleteTimeseries(List<String> paths)
boolean checkTimeseriesExists(String path)
Template、MeasurementNode的对象,描述模板内物理量结构与类型、编码方式、压缩方式等信息,并通过以下接口创建模板public void createSchemaTemplate(Template template); Class Template { private String name; private boolean directShareTime; Map<String, Node> children; public Template(String name, boolean isShareTime); public void addToTemplate(Node node); public void deleteFromTemplate(String name); public void setShareTime(boolean shareTime); } Abstract Class Node { private String name; public void addChild(Node node); public void deleteChild(Node node); } Class MeasurementNode extends Node { TSDataType dataType; TSEncoding encoding; CompressionType compressor; public MeasurementNode(String name, TSDataType dataType, TSEncoding encoding, CompressionType compressor); }
通过上述类的实例描述模板时,Template内应当仅能包含单层的MeasurementNode,具体可以参见如下示例:
MeasurementNode nodeX = new MeasurementNode("x", TSDataType.FLOAT, TSEncoding.RLE, CompressionType.SNAPPY); MeasurementNode nodeY = new MeasurementNode("y", TSDataType.FLOAT, TSEncoding.RLE, CompressionType.SNAPPY); MeasurementNode nodeSpeed = new MeasurementNode("speed", TSDataType.DOUBLE, TSEncoding.GORILLA, CompressionType.SNAPPY); // This is the template we suggest to implement Template flatTemplate = new Template("flatTemplate"); template.addToTemplate(nodeX); template.addToTemplate(nodeY); template.addToTemplate(nodeSpeed); createSchemaTemplate(flatTemplate);
// 为指定模板新增一组对齐的物理量,若其父节点在模板中已经存在,且不要求对齐,则报错 public void addAlignedMeasurementsInTemplate(String templateName, String[] measurementsPath, TSDataType[] dataTypes, TSEncoding[] encodings, CompressionType[] compressors); // 为指定模板新增一个对齐物理量, 若其父节点在模板中已经存在,且不要求对齐,则报错 public void addAlignedMeasurementInTemplate(String templateName, String measurementPath, TSDataType dataType, TSEncoding encoding, CompressionType compressor); // 为指定模板新增一个不对齐物理量, 若其父节在模板中已经存在,且要求对齐,则报错 public void addUnalignedMeasurementInTemplate(String templateName, String measurementPath, TSDataType dataType, TSEncoding encoding, CompressionType compressor); // 为指定模板新增一组不对齐的物理量, 若其父节在模板中已经存在,且要求对齐,则报错 public void addUnalignedMeasurementsIntemplate(String templateName, String[] measurementPaths, TSDataType[] dataTypes, TSEncoding[] encodings, CompressionType[] compressors); // 从指定模板中删除一个节点 public void deleteNodeInTemplate(String templateName, String path);
// 查询返回目前模板中所有物理量的数量 public int countMeasurementsInTemplate(String templateName); // 检查模板内指定路径是否为物理量 public boolean isMeasurementInTemplate(String templateName, String path); // 检查在指定模板内是否存在某路径 public boolean isPathExistInTemplate(String templateName, String path); // 返回指定模板内所有物理量的路径 public List<String> showMeasurementsInTemplate(String templateName); // 返回指定模板内某前缀路径下的所有物理量的路径 public List<String> showMeasurementsInTemplate(String templateName, String pattern);
templateName的元数据模板挂载到‘prefixPath’路径下,在执行这一步之前,你需要创建名为templateName的元数据模板void setSchemaTemplate(String templateName, String prefixPath)
/** @return All template names. */ public List<String> showAllTemplates(); /** @return All paths have been set to designated template. */ public List<String> showPathsTemplateSetOn(String templateName); /** @return All paths are using designated template. */ public List<String> showPathsTemplateUsingOn(String templateName)
void unsetSchemaTemplate(String prefixPath, String templateName); public void dropSchemaTemplate(String templateName);
setSchemaTemplate 。而只有在已有数据点插入模板对应的物理量,或使用以下接口激活模板,模板才会被设置为激活状态,进而被 show timeseries 等查询访问到。public void createTimeseriesOfTemplateOnPath(String path);
templateName的元数据模板。你需要保证给定的路径prefixPath下需要有名为templateName的元数据模板。prefixPath路径及其后代节点使用模板插入数据后,或者使用了激活模板命令,那么在卸载模板之前,还要对所有已激活模板的节点使用以下接口解除模板:public void deactivateTemplateOn(String templateName, String prefixPath);
prefixPath如果含有通配符(*或**)则按 PathPattern 匹配目标路径,否则仅表达其字面量对应的路径。推荐使用 insertTablet 帮助提高写入效率
void insertTablet(Tablet tablet) public class Tablet { /** deviceId of this tablet */ public String prefixPath; /** the list of measurement schemas for creating the tablet */ private List<MeasurementSchema> schemas; /** timestamps in this tablet */ public long[] timestamps; /** each object is a primitive type array, which represents values of one measurement */ public Object[] values; /** each bitmap represents the existence of each value in the current column. */ public BitMap[] bitMaps; /** the number of rows to include in this tablet */ public int rowSize; /** the maximum number of rows for this tablet */ private int maxRowNumber; /** whether this tablet store data of aligned timeseries or not */ private boolean isAligned; }
void insertTablets(Map<String, Tablet> tablets)
插入一个 Record,一个 Record 是一个设备一个时间戳下多个测点的数据。这里的 value 是 Object 类型,相当于提供了一个公用接口,后面可以通过 TSDataType 将 value 强转为原类型
其中,Object 类型与 TSDataType 类型的对应关系如下表所示:
| TSDataType | Object |
|---|---|
| BOOLEAN | Boolean |
| INT32 | Integer |
| INT64 | Long |
| FLOAT | Float |
| DOUBLE | Double |
| TEXT | String, Binary |
void insertRecord(String prefixPath, long time, List<String> measurements, List<TSDataType> types, List<Object> values)
void insertRecords(List<String> deviceIds, List<Long> times, List<List<String>> measurementsList, List<List<TSDataType>> typesList, List<List<Object>> valuesList)
void insertRecordsOfOneDevice(String deviceId, List<Long> times, List<List<String>> measurementsList, List<List<TSDataType>> typesList, List<List<Object>> valuesList)
当数据均是 String 类型时,我们可以使用如下接口,根据 value 的值进行类型推断。例如:value 为 “true” ,就可以自动推断为布尔类型。value 为 “3.2” ,就可以自动推断为数值类型。服务器需要做类型推断,可能会有额外耗时,速度较无需类型推断的写入慢
void insertRecord(String prefixPath, long time, List<String> measurements, List<String> values)
void insertRecords(List<String> deviceIds, List<Long> times, List<List<String>> measurementsList, List<List<String>> valuesList)
void insertStringRecordsOfOneDevice(String deviceId, List<Long> times, List<List<String>> measurementsList, List<List<String>> valuesList)
对齐时间序列的写入使用 insertAlignedXXX 接口,其余与上述接口类似:
void deleteData(String path, long endTime) void deleteData(List<String> paths, long endTime)
SessionDataSet executeRawDataQuery(List<String> paths, long startTime, long endTime)
SessionDataSet executeLastDataQuery(List<String> paths, long LastTime)
SessionDataSet executeQueryStatement(String sql)
void executeNonQueryStatement(String sql)
不实际写入数据,只将数据传输到 server 即返回
void testInsertRecord(String deviceId, long time, List<String> measurements, List<String> values) void testInsertRecord(String deviceId, long time, List<String> measurements, List<TSDataType> types, List<Object> values)
void testInsertRecords(List<String> deviceIds, List<Long> times, List<List<String>> measurementsList, List<List<String>> valuesList) void testInsertRecords(List<String> deviceIds, List<Long> times, List<List<String>> measurementsList, List<List<TSDataType>> typesList, List<List<Object>> valuesList)
void testInsertTablet(Tablet tablet)
void testInsertTablets(Map<String, Tablet> tablets)
浏览上述接口的详细信息,请参阅代码 session/src/main/java/org/apache/iotdb/session/Session.java
使用上述接口的示例代码在 example/session/src/main/java/org/apache/iotdb/SessionExample.java
使用对齐时间序列和元数据模板的示例可以参见 example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java
我们提供了一个针对原生接口的连接池 (SessionPool),使用该接口时,你只需要指定连接池的大小,就可以在使用时从池中获取连接。 如果超过 60s 都没得到一个连接的话,那么会打印一条警告日志,但是程序仍将继续等待。
当一个连接被用完后,他会自动返回池中等待下次被使用; 当一个连接损坏后,他会从池中被删除,并重建一个连接重新执行用户的操作; 你还可以像创建 Session 那样在创建 SessionPool 时指定多个可连接节点的 url,以保证分布式集群中客户端的高可用性。
对于查询操作:
SessionDataSet的封装类SessionDataSetWrapper;closeResultSet;closeResultSet.SessionDataSetWrapper 的 getColumnNames() 方法得到结果集列名使用示例可以参见 session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
或 example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java
集群信息相关的接口允许用户获取如数据分区情况、节点是否当机等信息。 要使用该 API,需要增加依赖:
<dependencies> <dependency> <groupId>org.apache.iotdb</groupId> <artifactId>iotdb-thrift-cluster</artifactId> <version>0.13.0-SNAPSHOT</version> </dependency> </dependencies>
建立连接与关闭连接的示例:
import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.transport.TSocket; import org.apache.thrift.transport.TTransport; import org.apache.thrift.transport.TTransportException; import org.apache.iotdb.rpc.RpcTransportFactory; public class CluserInfoClient { TTransport transport; ClusterInfoService.Client client; public void connect() { transport = RpcTransportFactory.INSTANCE.getTransport( new TSocket( // the RPC address IoTDBDescriptor.getInstance().getConfig().getRpcAddress(), // the RPC port ClusterDescriptor.getInstance().getConfig().getClusterRpcPort())); try { transport.open(); } catch (TTransportException e) { Assert.fail(e.getMessage()); } //get the client client = new ClusterInfoService.Client(new TBinaryProtocol(transport)); } public void close() { transport.close(); } }
API 列表:
list<Node> getRing();
/** * @param path input path (should contains a Storage group name as its prefix) * @return the data partition info. If the time range only covers one data partition, the the size * of the list is one. */ list<DataPartitionEntry> getDataPartition(1:string path, 2:long startTime, 3:long endTime);
/** * @param path input path (should contains a Storage group name as its prefix) * @return metadata partition information */ list<Node> getMetaPartition(1:string path);
/** * @return key: node, value: live or not */ map<Node, bool> getAllNodeStatus();
/** * @return A multi-line string with each line representing the total time consumption, invocation * number, and average time consumption. */ string getInstrumentingInfo();