在根目录下运行:
mvn clean install -pl session -am -Dmaven.test.skip=true
<dependencies> <dependency> <groupId>org.apache.iotdb</groupId> <artifactId>iotdb-session</artifactId> <version>0.13.0-SNAPSHOT</version> </dependency> </dependencies>
下面将给出 Session 对应的接口的简要介绍和对应参数:
// 全部使用默认配置 session = new Session.Builder.build(); // 指定一个可连接节点 session = new Session.Builder() .host(String host) .port(int port) .build(); // 指定多个可连接节点 session = new Session.Builder() .nodeUrls(List<String> nodeUrls) .build(); // 其他配置项 session = new Session.Builder() .fetchSize(int fetchSize) .username(String username) .password(String password) .thriftDefaultBufferSize(int thriftDefaultBufferSize) .thriftMaxFrameSize(int thriftMaxFrameSize) .enableCacheLeader(boolean enableCacheLeader) .build();
Session.open()
Session.close()
void setStorageGroup(String storageGroupId)
void deleteStorageGroup(String storageGroup) void deleteStorageGroups(List<String> storageGroups)
void createTimeseries(String path, TSDataType dataType, TSEncoding encoding, CompressionType compressor, Map<String, String> props, Map<String, String> tags, Map<String, String> attributes, String measurementAlias) void createMultiTimeseries(List<String> paths, List<TSDataType> dataTypes, List<TSEncoding> encodings, List<CompressionType> compressors, List<Map<String, String>> propsList, List<Map<String, String>> tagsList, List<Map<String, String>> attributesList, List<String> measurementAliasList)
void createAlignedTimeseries(String prefixPath, List<String> measurements,
List<TSDataType> dataTypes, List<TSEncoding> encodings,
CompressionType compressor, List<String> measurementAliasList);
注意:目前暂不支持使用传感器别名。
void deleteTimeseries(String path) void deleteTimeseries(List<String> paths)
void deleteData(String path, long endTime) void deleteData(List<String> paths, long endTime)
void insertRecord(String prefixPath, long time, List<String> measurements, List<String> values)
void insertTablet(Tablet tablet)
void insertTablets(Map<String, Tablet> tablets)
void insertRecords(List<String> deviceIds, List<Long> times, List<List<String>> measurementsList, List<List<String>> valuesList)
void insertRecord(String prefixPath, long time, List<String> measurements, List<TSDataType> types, List<Object> values)
void insertRecords(List<String> deviceIds, List<Long> times, List<List<String>> measurementsList, List<List<TSDataType>> typesList, List<List<Object>> valuesList)
void insertRecordsOfOneDevice(String deviceId, List<Long> times, List<List<String>> measurementsList, List<List<TSDataType>> typesList, List<List<Object>> valuesList)
SessionDataSet executeRawDataQuery(List<String> paths, long startTime, long endTime)
SessionDataSet executeQueryStatement(String sql)
void executeNonQueryStatement(String sql)
public void createSchemaTemplate(Template template); Class Template { private String name; private boolean directShareTime; Map<String, Node> children; public Template(String name, boolean isShareTime); public void addToTemplate(Node node); public void deleteFromTemplate(String name); public void setShareTime(boolean shareTime); } Abstract Class Node { private String name; public void addChild(Node node); public void deleteChild(Node node); } Class InternalNode extends Node { boolean shareTime; Map<String, Node> children; public void setShareTime(boolean shareTime); public InternalNode(String name, boolean isShareTime); } Class MeasurementNode extends Node { TSDataType dataType; TSEncoding encoding; CompressionType compressor; public MeasurementNode(String name, TSDataType dataType, TSEncoding encoding, CompressionType compressor); }
通过这种方式创建物理量模板的代码示例如下:
// Leaf measurements of the template tree
MeasurementNode nodeX =
    new MeasurementNode("x", TSDataType.FLOAT, TSEncoding.RLE, CompressionType.SNAPPY);
MeasurementNode nodeY =
    new MeasurementNode("y", TSDataType.FLOAT, TSEncoding.RLE, CompressionType.SNAPPY);
MeasurementNode nodeSpeed =
    new MeasurementNode("speed", TSDataType.DOUBLE, TSEncoding.GORILLA, CompressionType.SNAPPY);

// Internal nodes; the boolean is the share-time flag passed to the constructor
InternalNode internalGPS = new InternalNode("GPS", true);
InternalNode internalVehicle = new InternalNode("vehicle", false);

// Assemble the tree: GPS holds x and y, vehicle holds GPS and speed
internalGPS.addChild(nodeX);
internalGPS.addChild(nodeY);
// Pass the node variable declared above (there is no variable named GPS)
internalVehicle.addChild(internalGPS);
internalVehicle.addChild(nodeSpeed);

// Register the nodes in a template and create it
Template template = new Template("treeTemplateExample");
template.addToTemplate(internalGPS);
template.addToTemplate(internalVehicle);
template.addToTemplate(nodeSpeed);

createSchemaTemplate(template);
// 为指定模板新增一组对齐的物理量,若其父节点在模板中已经存在,且不要求对齐,则报错 public void addAlignedMeasurementsInTemplate(String templateName, String[] measurementsPath, TSDataType[] dataTypes, TSEncoding[] encodings, CompressionType[] compressors); // 为指定模板新增一个对齐物理量, 若其父节点在模板中已经存在,且不要求对齐,则报错 public void addAlignedMeasurementInTemplate(String templateName, String measurementPath, TSDataType dataType, TSEncoding encoding, CompressionType compressor); // 为指定模板新增一个不对齐物理量, 若其父节在模板中已经存在,且要求对齐,则报错 public void addUnalignedMeasurementInTemplate(String templateName, String measurementPath, TSDataType dataType, TSEncoding encoding, CompressionType compressor); // 为指定模板新增一组不对齐的物理量, 若其父节在模板中已经存在,且要求对齐,则报错 public void addUnalignedMeasurementsIntemplate(String templateName, String[] measurementPaths, TSDataType[] dataTypes, TSEncoding[] encodings, CompressionType[] compressors); // 从指定模板中删除一个节点及其子树 public void deleteNodeInTemplate(String templateName, String path);
// 查询返回目前模板中所有物理量的数量 public int countMeasurementsInTemplate(String templateName); // 检查模板内指定路径是否为物理量 public boolean isMeasurementInTemplate(String templateName, String path); // 检查在指定模板内是否存在某路径 public boolean isPathExistInTemplate(String templateName, String path); // 返回指定模板内所有物理量的路径 public List<String> showMeasurementsInTemplate(String templateName); // 返回指定模板内某前缀路径下的所有物理量的路径 public List<String> showMeasurementsInTemplate(String templateName, String pattern);
void setSchemaTemplate(String templateName, String prefixPath)
void unsetSchemaTemplate(String prefixPath, String templateName)
注意:如果曾经在‘prefixPath’路径或其后代节点上使用模板插入过数据(即使数据已被删除),目前不支持卸载该模板。
void testInsertRecords(List<String> deviceIds, List<Long> times, List<List<String>> measurementsList, List<List<String>> valuesList)
或
void testInsertRecords(List<String> deviceIds, List<Long> times, List<List<String>> measurementsList, List<List<TSDataType>> typesList, List<List<Object>> valuesList)
void testInsertRecord(String deviceId, long time, List<String> measurements, List<String> values)
或
void testInsertRecord(String deviceId, long time, List<String> measurements, List<TSDataType> types, List<Object> values)
void testInsertTablet(Tablet tablet)
我们提供了一个针对原生接口的连接池 (SessionPool),使用该接口时,你只需要指定连接池的大小,就可以在使用时从池中获取连接。 如果超过 60s 都没得到一个连接的话,那么会打印一条警告日志,但是程序仍将继续等待。
当一个连接被用完后,它会自动返回池中等待下次被使用; 当一个连接损坏后,它会从池中被删除,并重建一个连接重新执行用户的操作。
对于查询操作:
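1. 使用 SessionPool 进行查询时,得到的结果集是 SessionDataSet 的封装类 SessionDataSetWrapper;
2. 若对于一个查询的结果集,用户并没有遍历完且不再想继续遍历时,需要手动调用释放连接的操作 closeResultSet;
3. 若对一个查询的结果集遍历时出现异常,也需要手动调用释放连接的操作 closeResultSet;
4. 可以调用 SessionDataSetWrapper 的 getColumnNames() 方法得到结果集列名。

使用示例可以参见 session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java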
或 example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java
使用对齐时间序列和物理量模板的示例可以参见 example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java。
浏览上述接口的详细信息,请参阅代码 session/src/main/java/org/apache/iotdb/session/Session.java
使用上述接口的示例代码在 example/session/src/main/java/org/apache/iotdb/SessionExample.java
集群信息相关的接口允许用户获取如数据分区情况、节点是否宕机等信息。 要使用该 API,需要增加依赖:
<dependencies> <dependency> <groupId>org.apache.iotdb</groupId> <artifactId>iotdb-thrift-cluster</artifactId> <version>0.13.0-SNAPSHOT</version> </dependency> </dependencies>
建立连接与关闭连接的示例:
import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.transport.TSocket; import org.apache.thrift.transport.TTransport; import org.apache.thrift.transport.TTransportException; import org.apache.iotdb.rpc.RpcTransportFactory; public class CluserInfoClient { TTransport transport; ClusterInfoService.Client client; public void connect() { transport = RpcTransportFactory.INSTANCE.getTransport( new TSocket( // the RPC address IoTDBDescriptor.getInstance().getConfig().getRpcAddress(), // the RPC port ClusterDescriptor.getInstance().getConfig().getClusterRpcPort())); try { transport.open(); } catch (TTransportException e) { Assert.fail(e.getMessage()); } //get the client client = new ClusterInfoService.Client(new TBinaryProtocol(transport)); } public void close() { transport.close(); } }
API 列表:
list<Node> getRing();
/** * @param path input path (should contain a Storage group name as its prefix) * @return the data partition info. If the time range only covers one data partition, the size * of the list is one. */ list<DataPartitionEntry> getDataPartition(1:string path, 2:long startTime, 3:long endTime);
/** * @param path input path (should contain a Storage group name as its prefix) * @return metadata partition information */ list<Node> getMetaPartition(1:string path);
/** * @return key: node, value: live or not */ map<Node, bool> getAllNodeStatus();
/** * @return A multi-line string with each line representing the total time consumption, invocation * number, and average time consumption. */ string getInstrumentingInfo();