在根目录下运行:
mvn clean install -pl iotdb-client/session -am -Dmaven.test.skip=true
<dependencies> <dependency> <groupId>org.apache.iotdb</groupId> <artifactId>iotdb-session</artifactId> <version>${project.version}</version> </dependency> </dependencies>
example/session/src/main/java/org/apache/iotdb/SyntaxConventionRelatedExample.java下面将给出 Session 对应的接口的简要介绍和对应参数:
// 全部使用默认配置 session = new Session.Builder.build(); // 指定一个可连接节点 session = new Session.Builder() .host(String host) .port(int port) .build(); // 指定多个可连接节点 session = new Session.Builder() .nodeUrls(List<String> nodeUrls) .build(); // 其他配置项 session = new Session.Builder() .fetchSize(int fetchSize) .username(String username) .password(String password) .thriftDefaultBufferSize(int thriftDefaultBufferSize) .thriftMaxFrameSize(int thriftMaxFrameSize) .enableRedirection(boolean enableRedirection) .version(Version version) .build();
其中,version 表示客户端使用的 SQL 语义版本,用于升级 0.13 时兼容 0.12 的 SQL 语义,可能取值有:V_0_12、V_0_13、V_1_0。
void open()
void open(boolean enableRPCCompression)
注意: 客户端的 RPC 压缩开启状态需和服务端一致
void close()
void setStorageGroup(String storageGroupId)
void deleteStorageGroup(String storageGroup) void deleteStorageGroups(List<String> storageGroups)
void createTimeseries(String path, TSDataType dataType, TSEncoding encoding, CompressionType compressor, Map<String, String> props, Map<String, String> tags, Map<String, String> attributes, String measurementAlias) void createMultiTimeseries(List<String> paths, List<TSDataType> dataTypes, List<TSEncoding> encodings, List<CompressionType> compressors, List<Map<String, String>> propsList, List<Map<String, String>> tagsList, List<Map<String, String>> attributesList, List<String> measurementAliasList)
void createAlignedTimeseries(String prefixPath, List<String> measurements,
List<TSDataType> dataTypes, List<TSEncoding> encodings,
List <CompressionType> compressors, List<String> measurementAliasList);
注意:目前暂不支持使用传感器别名。
void deleteTimeseries(String path) void deleteTimeseries(List<String> paths)
boolean checkTimeseriesExists(String path)
public void createSchemaTemplate(Template template); Class Template { private String name; private boolean directShareTime; Map<String, Node> children; public Template(String name, boolean isShareTime); public void addToTemplate(Node node); public void deleteFromTemplate(String name); public void setShareTime(boolean shareTime); } Abstract Class Node { private String name; public void addChild(Node node); public void deleteChild(Node node); } Class MeasurementNode extends Node { TSDataType dataType; TSEncoding encoding; CompressionType compressor; public MeasurementNode(String name, TSDataType dataType, TSEncoding encoding, CompressionType compressor); }
通过上述类的实例描述模板时,Template 内应当仅能包含单层的 MeasurementNode,具体可以参见如下示例:
MeasurementNode nodeX = new MeasurementNode("x", TSDataType.FLOAT, TSEncoding.RLE, CompressionType.SNAPPY); MeasurementNode nodeY = new MeasurementNode("y", TSDataType.FLOAT, TSEncoding.RLE, CompressionType.SNAPPY); MeasurementNode nodeSpeed = new MeasurementNode("speed", TSDataType.DOUBLE, TSEncoding.GORILLA, CompressionType.SNAPPY); // This is the template we suggest to implement Template flatTemplate = new Template("flatTemplate"); template.addToTemplate(nodeX); template.addToTemplate(nodeY); template.addToTemplate(nodeSpeed); createSchemaTemplate(flatTemplate);
void createTimeseriesUsingSchemaTemplate(List<String> devicePathList)
void setSchemaTemplate(String templateName, String prefixPath)
/** @return All template names. */ public List<String> showAllTemplates(); /** @return All paths have been set to designated template. */ public List<String> showPathsTemplateSetOn(String templateName); /** @return All paths are using designated template. */ public List<String> showPathsTemplateUsingOn(String templateName)
void unsetSchemaTemplate(String prefixPath, String templateName); public void dropSchemaTemplate(String templateName);
注意:目前不支持从曾经在‘prefixPath’路径及其后代节点使用模板插入数据后(即使数据已被删除)卸载模板。
推荐使用 insertTablet 帮助提高写入效率
void insertTablet(Tablet tablet) public class Tablet { /** deviceId of this tablet */ public String prefixPath; /** the list of measurement schemas for creating the tablet */ private List<MeasurementSchema> schemas; /** timestamps in this tablet */ public long[] timestamps; /** each object is a primitive type array, which represents values of one measurement */ public Object[] values; /** each bitmap represents the existence of each value in the current column. */ public BitMap[] bitMaps; /** the number of rows to include in this tablet */ public int rowSize; /** the maximum number of rows for this tablet */ private int maxRowNumber; /** whether this tablet store data of aligned timeseries or not */ private boolean isAligned; }
void insertTablets(Map<String, Tablet> tablets)
插入一个 Record,一个 Record 是一个设备一个时间戳下多个测点的数据。这里的 value 是 Object 类型,相当于提供了一个公用接口,后面可以通过 TSDataType 将 value 强转为原类型
其中,Object 类型与 TSDataType 类型的对应关系如下表所示:
| TSDataType | Object |
|---|---|
| BOOLEAN | Boolean |
| INT32 | Integer |
| DATE | LocalDate |
| INT64 | Long |
| TIMESTAMP | Long |
| FLOAT | Float |
| DOUBLE | Double |
| TEXT | String, Binary |
| STRING | String, Binary |
| BLOB | Binary |
void insertRecord(String prefixPath, long time, List<String> measurements, List<TSDataType> types, List<Object> values)
void insertRecords(List<String> deviceIds, List<Long> times, List<List<String>> measurementsList, List<List<TSDataType>> typesList, List<List<Object>> valuesList)
void insertRecordsOfOneDevice(String deviceId, List<Long> times, List<List<String>> measurementsList, List<List<TSDataType>> typesList, List<List<Object>> valuesList)
当数据均是 String 类型时,我们可以使用如下接口,根据 value 的值进行类型推断。例如:value 为 “true” ,就可以自动推断为布尔类型。value 为 “3.2” ,就可以自动推断为数值类型。服务器需要做类型推断,可能会有额外耗时,速度较无需类型推断的写入慢
void insertRecord(String prefixPath, long time, List<String> measurements, List<String> values)
void insertRecords(List<String> deviceIds, List<Long> times, List<List<String>> measurementsList, List<List<String>> valuesList)
void insertStringRecordsOfOneDevice(String deviceId, List<Long> times, List<List<String>> measurementsList, List<List<String>> valuesList)
对齐时间序列的写入使用 insertAlignedXXX 接口,其余与上述接口类似:
void deleteData(String path, long endTime) void deleteData(List<String> paths, long endTime)
SessionDataSet executeRawDataQuery(List<String> paths, long startTime, long endTime);
最新点查询:
SessionDataSet executeLastDataQuery(List<String> paths, long lastTime);
isLegalPathNodes置为true以避免路径校验带来的性能损失。SessionDataSet executeLastDataQueryForOneDevice( String db, String device, List<String> sensors, boolean isLegalPathNodes);
聚合查询:
SessionDataSet executeAggregationQuery(List<String> paths, List<Aggregation> aggregations); SessionDataSet executeAggregationQuery( List<String> paths, List<Aggregation> aggregations, long startTime, long endTime); SessionDataSet executeAggregationQuery( List<String> paths, List<Aggregation> aggregations, long startTime, long endTime, long interval); SessionDataSet executeAggregationQuery( List<String> paths, List<Aggregation> aggregations, long startTime, long endTime, long interval, long slidingStep);
SessionDataSet executeQueryStatement(String sql)
void executeNonQueryStatement(String sql)
不实际写入数据,只将数据传输到 server 即返回
void testInsertRecord(String deviceId, long time, List<String> measurements, List<String> values) void testInsertRecord(String deviceId, long time, List<String> measurements, List<TSDataType> types, List<Object> values)
void testInsertRecords(List<String> deviceIds, List<Long> times, List<List<String>> measurementsList, List<List<String>> valuesList) void testInsertRecords(List<String> deviceIds, List<Long> times, List<List<String>> measurementsList, List<List<TSDataType>> typesList, List<List<Object>> valuesList)
void testInsertTablet(Tablet tablet)
void testInsertTablets(Map<String, Tablet> tablets)
浏览上述接口的详细信息,请参阅代码 session/src/main/java/org/apache/iotdb/session/Session.java
使用上述接口的示例代码在 example/session/src/main/java/org/apache/iotdb/SessionExample.java
使用对齐时间序列和元数据模板的示例可以参见 example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java
我们提供了一个针对原生接口的连接池 (SessionPool),使用该接口时,你只需要指定连接池的大小,就可以在使用时从池中获取连接。 如果超过 60s 都没得到一个连接的话,那么会打印一条警告日志,但是程序仍将继续等待。
当一个连接被用完后,他会自动返回池中等待下次被使用; 当一个连接损坏后,他会从池中被删除,并重建一个连接重新执行用户的操作; 你还可以像创建 Session 那样在创建 SessionPool 时指定多个可连接节点的 url,以保证分布式集群中客户端的高可用性。
对于查询操作:
SessionDataSet的封装类SessionDataSetWrapper;closeResultSet;closeResultSet.SessionDataSetWrapper 的 getColumnNames() 方法得到结果集列名使用示例可以参见 session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
或 example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java
集群信息相关的接口允许用户获取如数据分区情况、节点是否当机等信息。 要使用该 API,需要增加依赖:
<dependencies> <dependency> <groupId>org.apache.iotdb</groupId> <artifactId>iotdb-thrift-cluster</artifactId> <version>${project.version}</version> </dependency> </dependencies>
建立连接与关闭连接的示例:
import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.transport.TSocket; import org.apache.thrift.transport.TTransport; import org.apache.thrift.transport.TTransportException; import org.apache.iotdb.rpc.RpcTransportFactory; public class CluserInfoClient { TTransport transport; ClusterInfoService.Client client; public void connect() { transport = RpcTransportFactory.INSTANCE.getTransport( new TSocket( // the RPC address IoTDBDescriptor.getInstance().getConfig().getRpcAddress(), // the RPC port ClusterDescriptor.getInstance().getConfig().getClusterRpcPort())); try { transport.open(); } catch (TTransportException e) { Assert.fail(e.getMessage()); } //get the client client = new ClusterInfoService.Client(new TBinaryProtocol(transport)); } public void close() { transport.close(); } }
API 列表:
list<Node> getRing();
/** * @param path input path (should contains a database name as its prefix) * @return the data partition info. If the time range only covers one data partition, the the size * of the list is one. */ list<DataPartitionEntry> getDataPartition(1:string path, 2:long startTime, 3:long endTime);
/** * @param path input path (should contains a database name as its prefix) * @return metadata partition information */ list<Node> getMetaPartition(1:string path);
/** * @return key: node, value: live or not */ map<Node, bool> getAllNodeStatus();
/** * @return A multi-line string with each line representing the total time consumption, invocation * number, and average time consumption. */ string getInstrumentingInfo();