Session原生API

IoTDB 原生 API 中的 Session 是实现与数据库交互的核心接口,它集成了丰富的方法,支持数据写入、查询以及元数据操作等功能。通过实例化 Session,能够建立与 IoTDB 服务器的连接,在该连接所构建的环境中执行各类数据库操作。Session为非线程安全,不能被多线程同时调用。

SessionPool 是 Session 的连接池,推荐使用SessionPool编程。在多线程并发的情形下,SessionPool 能够合理地管理和分配连接资源,以提升系统性能与资源利用效率。

1 步骤概览

  1. 创建连接池实例:初始化一个SessionPool对象,用于管理多个Session实例。
  2. 执行操作:直接从SessionPool中获取Session实例,并执行数据库操作,无需每次都打开和关闭连接。
  3. 关闭连接池资源:在不再需要进行数据库操作时,关闭SessionPool,释放所有相关资源。

2 详细步骤

本章节用于说明开发的核心流程,并未演示所有的参数和接口,如需了解全部功能及参数请参见: 全量接口说明 或 查阅: 源码

2.1 创建maven项目

创建一个maven项目,并在pom.xml文件中添加以下依赖(JDK >= 1.8, Maven >= 3.6)

<dependencies>
    <dependency>
      <groupId>org.apache.iotdb</groupId>
      <artifactId>iotdb-session</artifactId>
        <!-- 版本号与数据库版本号相同 -->
      <version>${project.version}</version>
    </dependency>
</dependencies>

2.2 创建连接池实例

import java.util.ArrayList;
import java.util.List;
import org.apache.iotdb.session.pool.SessionPool;

public class IoTDBSessionPoolExample {
    private static SessionPool sessionPool;

    public static void main(String[] args) {
        // Using nodeUrls ensures that when one node goes down, other nodes are automatically connected to retry
        List<String> nodeUrls = new ArrayList<>();
        nodeUrls.add("127.0.0.1:6667");
        nodeUrls.add("127.0.0.1:6668");
        sessionPool =
                new SessionPool.Builder()
                        .nodeUrls(nodeUrls)
                        .user("root")
                        .password("root")
                        .maxSize(3)
                        .build();
    }
}

2.3 执行数据库操作

2.3.1 数据写入

在工业场景中,数据写入可分为以下几类:多行数据写入、单设备多行数据写入,下面按不同场景对写入接口进行介绍。

多行数据写入接口

接口说明:支持一次写入多行数据,每一行对应一个设备一个时间戳的多个测点值。

接口列表:

接口名称功能描述
insertRecords(List<String> deviceIds, List<Long> times, List<List<String>> measurementsList, List<List<TSDataType>> typesList, List<List<Object>> valuesList)插入多行数据,适用于不同测点独立采集的场景

代码案例:

import java.util.ArrayList;
import java.util.List;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.pool.SessionPool;
import org.apache.tsfile.enums.TSDataType;

public class SessionPoolExample {
 private static SessionPool sessionPool;
     public static void main(String[] args) throws IoTDBConnectionException, StatementExecutionException {
          // 1. init SessionPool
        constructSessionPool();
          // 2. execute insert data
        insertRecordsExample();
          // 3. close SessionPool
        closeSessionPool();
  }

  private static void constructSessionPool() {
        // Using nodeUrls ensures that when one node goes down, other nodes are automatically connected to retry
        List<String> nodeUrls = new ArrayList<>();
        nodeUrls.add("127.0.0.1:6667");
        nodeUrls.add("127.0.0.1:6668");
        sessionPool =
                new SessionPool.Builder()
                        .nodeUrls(nodeUrls)
                        .user("root")
                        .password("root")
                        .maxSize(3)
                        .build();
    }

    public static void insertRecordsExample() throws IoTDBConnectionException, StatementExecutionException {
        String deviceId = "root.sg1.d1";
        List<String> measurements = new ArrayList<>();
        measurements.add("s1");
        measurements.add("s2");
        measurements.add("s3");
        List<String> deviceIds = new ArrayList<>();
        List<List<String>> measurementsList = new ArrayList<>();
        List<List<Object>> valuesList = new ArrayList<>();
        List<Long> timestamps = new ArrayList<>();
        List<List<TSDataType>> typesList = new ArrayList<>();

        for (long time = 0; time < 500; time++) {
            List<Object> values = new ArrayList<>();
            List<TSDataType> types = new ArrayList<>();
            values.add(1L);
            values.add(2L);
            values.add(3L);
            types.add(TSDataType.INT64);
            types.add(TSDataType.INT64);
            types.add(TSDataType.INT64);

            deviceIds.add(deviceId);
            measurementsList.add(measurements);
            valuesList.add(values);
            typesList.add(types);
            timestamps.add(time);
            if (time != 0 && time % 100 == 0) {
                try {
                    sessionPool.insertRecords(deviceIds, timestamps, measurementsList, typesList, valuesList);
                } catch (IoTDBConnectionException | StatementExecutionException e) {
                    // solve exception
                }
                deviceIds.clear();
                measurementsList.clear();
                valuesList.clear();
                typesList.clear();
                timestamps.clear();
            }
        }
        try {
            sessionPool.insertRecords(deviceIds, timestamps, measurementsList, typesList, valuesList);
        } catch (IoTDBConnectionException | StatementExecutionException e) {
            // solve exception
        }
    }
    
      public static void closeSessionPool(){
        sessionPool.close();
    }
}
单设备多行数据写入接口

接口说明:支持一次写入单个设备的多行数据,每一行对应一个时间戳的多个测点值。

接口列表:

接口名称功能描述
insertTablet(Tablet tablet)插入单个设备的多行数据,适用于不同测点独立采集的场景

代码案例:

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.pool.SessionPool;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.write.record.Tablet;
import org.apache.tsfile.write.schema.MeasurementSchema;

public class SessionPoolExample {
      private static SessionPool sessionPool;
      public static void main(String[] args) throws IoTDBConnectionException, StatementExecutionException {
        // 1. init SessionPool
        constructSessionPool();
        // 2. execute insert data
        insertTabletExample();
        // 3. close SessionPool
        closeSessionPool();
          }

        private static void constructSessionPool() {
        // Using nodeUrls ensures that when one node goes down, other nodes are automatically connected to retry
        List<String> nodeUrls = new ArrayList<>();
        nodeUrls.add("127.0.0.1:6667");
        nodeUrls.add("127.0.0.1:6668");
        sessionPool =
                new SessionPool.Builder()
                        .nodeUrls(nodeUrls)
                        .user("root")
                        .password("root")
                        .maxSize(3)
                        .build();
    }

    private static void insertTabletExample() throws IoTDBConnectionException, StatementExecutionException {
        /*
         * A Tablet example:
         *      device1
         * time s1, s2, s3
         * 1,   1,  1,  1
         * 2,   2,  2,  2
         * 3,   3,  3,  3
         */
        // The schema of measurements of one device
        // only measurementId and data type in MeasurementSchema take effects in Tablet
        List<MeasurementSchema> schemaList = new ArrayList<>();
        schemaList.add(new MeasurementSchema("s1", TSDataType.INT64));
        schemaList.add(new MeasurementSchema("s2", TSDataType.INT64));
        schemaList.add(new MeasurementSchema("s3", TSDataType.INT64));

        Tablet tablet = new Tablet("root.sg.d1", schemaList, 100);

        // Method 1 to add tablet data
        long timestamp = System.currentTimeMillis();

        Random random = new Random();
        for (long row = 0; row < 100; row++) {
            int rowIndex = tablet.rowSize++;
            tablet.addTimestamp(rowIndex, timestamp);
            for (int s = 0; s < 3; s++) {
                long value = random.nextLong();
                tablet.addValue(schemaList.get(s).getMeasurementId(), rowIndex, value);
            }
            if (tablet.rowSize == tablet.getMaxRowNumber()) {
                sessionPool.insertTablet(tablet);
                tablet.reset();
            }
            timestamp++;
        }
        if (tablet.rowSize != 0) {
            sessionPool.insertTablet(tablet);
            tablet.reset();
        }
    }

    public static void closeSessionPool(){
        sessionPool.close();
    }
}

2.3.2 SQL操作

SQL操作分为查询和非查询两类操作,对应的接口为executeQueryexecuteNonQuery操作,其区别为前者执行的是具体的查询语句,会返回一个结果集,后者是执行的是增、删、改操作,不返回结果集。

import java.util.ArrayList;
import java.util.List;
import org.apache.iotdb.isession.pool.SessionDataSetWrapper;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.pool.SessionPool;

public class SessionPoolExample {
  private static SessionPool sessionPool;
  public static void main(String[] args) throws IoTDBConnectionException, StatementExecutionException {
        // 1. init SessionPool
        constructSessionPool(); 
        // 2. executes a non-query SQL statement, such as a DDL or DML command.
        executeQueryExample();
        // 3. executes a query SQL statement and returns the result set.
        executeNonQueryExample();
        // 4. close SessionPool
        closeSessionPool();
    }

    private static void executeNonQueryExample() throws IoTDBConnectionException, StatementExecutionException {
        // 1. create a nonAligned time series
        sessionPool.executeNonQueryStatement("create timeseries root.test.d1.s1 with dataType = int32");
        // 2. set ttl
        sessionPool.executeNonQueryStatement("set TTL to root.test.** 10000");
        // 3. delete time series
        sessionPool.executeNonQueryStatement("delete timeseries root.test.d1.s1");
    }
    
    private static void executeQueryExample() throws IoTDBConnectionException, StatementExecutionException {
        // 1. execute normal query
        try(SessionDataSetWrapper wrapper = sessionPool.executeQueryStatement("select s1 from root.sg1.d1 limit 10")) {
            // get DataIterator like JDBC
            DataIterator dataIterator = wrapper.iterator();
            System.out.println(wrapper.getColumnNames());
            System.out.println(wrapper.getColumnTypes());
            while (dataIterator.next()) {
                StringBuilder builder = new StringBuilder();
                for (String columnName : wrapper.getColumnNames()) {
                    builder.append(dataIterator.getString(columnName) + " ");
                }
                System.out.println(builder);
            }
        }
        // 2. execute aggregate query
        try(SessionDataSetWrapper wrapper = sessionPool.executeQueryStatement("select count(s1) from root.sg1.d1 group by ([0, 40), 5ms) ")) {
            // get DataIterator like JDBC
            DataIterator dataIterator = wrapper.iterator();
            System.out.println(wrapper.getColumnNames());
            System.out.println(wrapper.getColumnTypes());
            while (dataIterator.next()) {
                StringBuilder builder = new StringBuilder();
                for (String columnName : wrapper.getColumnNames()) {
                    builder.append(dataIterator.getString(columnName) + " ");
                }
                System.out.println(builder);
            }
        }
    }
    
    private static void constructSessionPool() {
    // Using nodeUrls ensures that when one node goes down, other nodes are automatically connected to retry
    List<String> nodeUrls = new ArrayList<>();
    nodeUrls.add("127.0.0.1:6667");
    nodeUrls.add("127.0.0.1:6668");
    sessionPool =
            new SessionPool.Builder()
                    .nodeUrls(nodeUrls)
                    .user("root")
                    .password("root")
                    .maxSize(3)
                    .build();
    }

    public static void closeSessionPool(){
        sessionPool.close();
    }
}

更多关于结果集及其方法 SessionDataSet.DataIterator 的使用可参考如下示例(其中,getBlob 和 getDate 两个接口从 V1.3.6 起支持):

import org.apache.iotdb.isession.SessionDataSet;
import org.apache.iotdb.isession.pool.SessionDataSetWrapper;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.pool.SessionPool;

import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.utils.DateUtils;
import org.apache.tsfile.write.record.Tablet;
import org.apache.tsfile.write.schema.MeasurementSchema;
import org.junit.Assert;

import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SessionExample {
  private static SessionPool sessionPool;

  public static void main(String[] args)
      throws IoTDBConnectionException, StatementExecutionException {
    // 1. init SessionPool
    constructSessionPool();
    // 2. executes a query SQL statement, such as a DDL or DML command.
    executeQueryExample();
    // 3. close SessionPool
    closeSessionPool();
  }

  private static void executeQueryExample()
      throws IoTDBConnectionException, StatementExecutionException {
    Tablet tablet =
        new Tablet(
            "root.sg.d1",
            Arrays.asList(
                new MeasurementSchema("s1", TSDataType.INT32),
                new MeasurementSchema("s2", TSDataType.INT64),
                new MeasurementSchema("s3", TSDataType.FLOAT),
                new MeasurementSchema("s4", TSDataType.DOUBLE),
                new MeasurementSchema("s5", TSDataType.TEXT),
                new MeasurementSchema("s6", TSDataType.BOOLEAN),
                new MeasurementSchema("s7", TSDataType.TIMESTAMP),
                new MeasurementSchema("s8", TSDataType.BLOB),
                new MeasurementSchema("s9", TSDataType.STRING),
                new MeasurementSchema("s10", TSDataType.DATE),
                new MeasurementSchema("s11", TSDataType.TIMESTAMP)),
            10);
    tablet.addTimestamp(0, 0L);
    tablet.addValue("s1", 0, 1);
    tablet.addValue("s2", 0, 1L);
    tablet.addValue("s3", 0, 0f);
    tablet.addValue("s4", 0, 0d);
    tablet.addValue("s5", 0, "text_value");
    tablet.addValue("s6", 0, true);
    tablet.addValue("s7", 0, 1L);
    tablet.addValue("s8", 0, new Binary(new byte[] {1}));
    tablet.addValue("s9", 0, "string_value");
    tablet.addValue("s10", 0, DateUtils.parseIntToLocalDate(20250403));
    tablet.initBitMaps();
    tablet.bitMaps[10].mark(0);
    tablet.rowSize = 1;
    sessionPool.insertAlignedTablet(tablet);

    try (SessionDataSetWrapper dataSet =
        sessionPool.executeQueryStatement("select * from root.sg.d1")) {
      SessionDataSet.DataIterator iterator = dataSet.iterator();
      int count = 0;
      while (iterator.next()) {
        count++;
        Assert.assertFalse(iterator.isNull("root.sg.d1.s1"));
        Assert.assertEquals(1, iterator.getInt("root.sg.d1.s1"));
        Assert.assertFalse(iterator.isNull("root.sg.d1.s2"));
        Assert.assertEquals(1L, iterator.getLong("root.sg.d1.s2"));
        Assert.assertFalse(iterator.isNull("root.sg.d1.s3"));
        Assert.assertEquals(0, iterator.getFloat("root.sg.d1.s3"), 0.01);
        Assert.assertFalse(iterator.isNull("root.sg.d1.s4"));
        Assert.assertEquals(0, iterator.getDouble("root.sg.d1.s4"), 0.01);
        Assert.assertFalse(iterator.isNull("root.sg.d1.s5"));
        Assert.assertEquals("text_value", iterator.getString("root.sg.d1.s5"));
        Assert.assertFalse(iterator.isNull("root.sg.d1.s6"));
        Assert.assertTrue(iterator.getBoolean("root.sg.d1.s6"));
        Assert.assertFalse(iterator.isNull("root.sg.d1.s7"));
        Assert.assertEquals(new Timestamp(1), iterator.getTimestamp("root.sg.d1.s7"));
        Assert.assertFalse(iterator.isNull("root.sg.d1.s8"));
        Assert.assertEquals(new Binary(new byte[] {1}), iterator.getBlob("root.sg.d1.s8"));
        Assert.assertFalse(iterator.isNull("root.sg.d1.s9"));
        Assert.assertEquals("string_value", iterator.getString("root.sg.d1.s9"));
        Assert.assertFalse(iterator.isNull("root.sg.d1.s10"));
        Assert.assertEquals(
            DateUtils.parseIntToLocalDate(20250403), iterator.getDate("root.sg.d1.s10"));
        Assert.assertTrue(iterator.isNull("root.sg.d1.s11"));
        Assert.assertNull(iterator.getTimestamp("root.sg.d1.s11"));

        Assert.assertEquals(new Timestamp(0), iterator.getTimestamp("Time"));
        Assert.assertFalse(iterator.isNull("Time"));
      }
      Assert.assertEquals(tablet.rowSize, count);
    }
  }

  private static void constructSessionPool() {
    List<String> nodeUrls = new ArrayList<>();
    nodeUrls.add("127.0.0.1:6667");
    sessionPool =
        new SessionPool.Builder()
            .nodeUrls(nodeUrls)
            .user("root")
            .password("root")
            .maxSize(3)
            .build();
  }

  public static void closeSessionPool() {
    sessionPool.close();
  }
}

3 全量接口说明

3.1 参数列表

Session具有如下的字段,可以通过构造函数或Session.Builder方式设置如下参数

字段名类型说明
nodeUrlsList<String>数据库节点的 URL 列表,支持多节点连接
usernameString用户名
passwordString密码
fetchSizeint查询结果的默认批量返回大小
useSSLboolean是否启用 SSL
trustStoreString信任库路径
trustStorePwdString信任库密码
queryTimeoutInMslong查询的超时时间,单位毫秒
enableRPCCompressionboolean是否启用 RPC 压缩
connectionTimeoutInMsint连接超时时间,单位毫秒
zoneIdZoneId会话的时区设置
thriftDefaultBufferSizeintThrift 默认缓冲区大小
thriftMaxFrameSizeintThrift 最大帧大小
defaultEndPointTEndPoint默认的数据库端点信息
defaultSessionConnectionSessionConnection默认的会话连接对象
isClosedboolean当前会话是否已关闭
enableRedirectionboolean是否启用重定向功能
enableRecordsAutoConvertTabletboolean是否启用记录自动转换为 Tablet 的功能
deviceIdToEndpointMap<String, TEndPoint>设备 ID 和数据库端点的映射关系
endPointToSessionConnectionMap<TEndPoint, SessionConnection>数据库端点和会话连接的映射关系
executorServiceScheduledExecutorService用于定期更新节点列表的线程池
availableNodesINodeSupplier可用节点的供应器
enableQueryRedirectionboolean是否启用查询重定向功能
versionVersion客户端的版本号,用于与服务端的兼容性判断
enableAutoFetchboolean是否启用自动获取功能
maxRetryCountint最大重试次数
retryIntervalInMslong重试的间隔时间,单位毫秒

3.2 接口列表

3.2.1 元数据管理
方法名功能描述参数解释
createDatabase(String database)创建数据库database: 数据库名称
deleteDatabase(String database)删除指定数据库database: 要删除的数据库名称
deleteDatabases(List<String> databases)批量删除数据库databases: 要删除的数据库名称列表
createTimeseries(String path, TSDataType dataType, TSEncoding encoding, CompressionType compressor)创建单个时间序列path: 时间序列路径,dataType: 数据类型,encoding: 编码类型,compressor: 压缩类型
createAlignedTimeseries(...)创建对齐时间序列设备ID、测点列表、数据类型列表、编码列表、压缩类型列表
createMultiTimeseries(...)批量创建时间序列多个路径、数据类型、编码、压缩类型、属性、标签、别名等
deleteTimeseries(String path)删除时间序列path: 要删除的时间序列路径
deleteTimeseries(List<String> paths)批量删除时间序列paths: 要删除的时间序列路径列表
setSchemaTemplate(String templateName, String prefixPath)设置模式模板templateName: 模板名称,prefixPath: 应用模板的路径
createSchemaTemplate(Template template)创建模式模板template: 模板对象
dropSchemaTemplate(String templateName)删除模式模板templateName: 要删除的模板名称
addAlignedMeasurementsInTemplate(...)添加对齐测点到模板模板名称、测点路径列表、数据类型、编码类型、压缩类型
addUnalignedMeasurementsInTemplate(...)添加非对齐测点到模板同上
deleteNodeInTemplate(String templateName, String path)删除模板中的节点templateName: 模板名称,path: 要删除的路径
countMeasurementsInTemplate(String name)统计模板中测点数量name: 模板名称
isMeasurementInTemplate(String templateName, String path)检查模板中是否存在某测点templateName: 模板名称,path: 测点路径
isPathExistInTemplate(String templateName, String path)检查模板中路径是否存在同上
showMeasurementsInTemplate(String templateName)显示模板中的测点templateName: 模板名称
showMeasurementsInTemplate(String templateName, String pattern)按模式显示模板中的测点templateName: 模板名称,pattern: 匹配模式
showAllTemplates()显示所有模板无参数
showPathsTemplateSetOn(String templateName)显示模板应用的路径templateName: 模板名称
showPathsTemplateUsingOn(String templateName)显示模板实际使用的路径同上
unsetSchemaTemplate(String prefixPath, String templateName)取消路径的模板设置prefixPath: 路径,templateName: 模板名称
3.2.2 数据写入
方法名功能描述参数解释
insertRecord(String deviceId, long time, List<String> measurements, List<TSDataType> types, Object... values)插入单条记录deviceId: 设备ID,time: 时间戳,measurements: 测点列表,types: 数据类型列表,values: 值列表
insertRecord(String deviceId, long time, List<String> measurements, List<String> values)插入单条记录deviceId: 设备ID,time: 时间戳,measurements: 测点列表,values: 值列表
insertRecords(List<String> deviceIds, List<Long> times, List<List<String>> measurementsList, List<List<Object>> valuesList)插入多条记录deviceIds: 设备ID列表,times: 时间戳列表,measurementsList: 测点列表列表,valuesList: 值列表
insertRecords(List<String> deviceIds, List<Long> times, List<List<String>> measurementsList, List<List<TSDataType>> typesList, List<List<Object>> valuesList)插入多条记录同上,增加 typesList: 数据类型列表
insertRecordsOfOneDevice(String deviceId, List<Long> times, List<List<String>> measurementsList, List<List<TSDataType>> typesList, List<List<Object>> valuesList)插入单设备的多条记录deviceId: 设备ID,times: 时间戳列表,measurementsList: 测点列表列表,typesList: 类型列表,valuesList: 值列表
insertRecordsOfOneDevice(String deviceId, List<Long> times, List<List<String>> measurementsList, List<List<TSDataType>> typesList, List<List<Object>> valuesList, boolean haveSorted)插入排序后的单设备多条记录同上,增加 haveSorted: 数据是否已排序
insertStringRecordsOfOneDevice(String deviceId, List<Long> times, List<List<String>> measurementsList, List<List<String>> valuesList)插入字符串格式的单设备记录deviceId: 设备ID,times: 时间戳列表,measurementsList: 测点列表,valuesList: 值列表
insertStringRecordsOfOneDevice(String deviceId, List<Long> times, List<List<String>> measurementsList, List<List<String>> valuesList, boolean haveSorted)插入排序的字符串格式单设备记录同上,增加 haveSorted: 数据是否已排序
insertAlignedRecord(String deviceId, long time, List<String> measurements, List<TSDataType> types, List<Object> values)插入单条对齐记录deviceId: 设备ID,time: 时间戳,measurements: 测点列表,types: 类型列表,values: 值列表
insertAlignedRecord(String deviceId, long time, List<String> measurements, List<String> values)插入字符串格式的单条对齐记录deviceId: 设备ID,time: 时间戳,measurements: 测点列表,values: 值列表
insertAlignedRecords(List<String> deviceIds, List<Long> times, List<List<String>> measurementsList, List<List<Object>> valuesList)插入多条对齐记录deviceIds: 设备ID列表,times: 时间戳列表,measurementsList: 测点列表,valuesList: 值列表
insertAlignedRecords(List<String> deviceIds, List<Long> times, List<List<String>> measurementsList, List<List<TSDataType>> typesList, List<List<Object>> valuesList)插入多条对齐记录同上,增加 typesList: 数据类型列表
insertAlignedRecordsOfOneDevice(String deviceId, List<Long> times, List<List<String>> measurementsList, List<List<TSDataType>> typesList, List<List<Object>> valuesList)插入单设备的多条对齐记录同上
insertAlignedRecordsOfOneDevice(String deviceId, List<Long> times, List<List<String>> measurementsList, List<List<TSDataType>> typesList, List<List<Object>> valuesList, boolean haveSorted)插入排序的单设备多条对齐记录同上,增加 haveSorted: 数据是否已排序
insertAlignedStringRecordsOfOneDevice(String deviceId, List<Long> times, List<List<String>> measurementsList, List<List<String>> valuesList)插入字符串格式的单设备对齐记录deviceId: 设备ID,times: 时间戳列表,measurementsList: 测点列表,valuesList: 值列表
insertAlignedStringRecordsOfOneDevice(String deviceId, List<Long> times, List<List<String>> measurementsList, List<List<String>> valuesList, boolean haveSorted)插入排序的字符串格式单设备对齐记录同上,增加 haveSorted: 数据是否已排序
insertTablet(Tablet tablet)插入单个Tablet数据tablet: 要插入的Tablet数据
insertTablet(Tablet tablet, boolean sorted)插入排序的Tablet数据同上,增加 sorted: 数据是否已排序
insertAlignedTablet(Tablet tablet)插入对齐的Tablet数据tablet: 要插入的Tablet数据
insertAlignedTablet(Tablet tablet, boolean sorted)插入排序的对齐Tablet数据同上,增加 sorted: 数据是否已排序
insertTablets(Map<String, Tablet> tablets)批量插入多个Tablet数据tablets: 设备ID到Tablet的映射表
insertTablets(Map<String, Tablet> tablets, boolean sorted)批量插入排序的多个Tablet数据同上,增加 sorted: 数据是否已排序
insertAlignedTablets(Map<String, Tablet> tablets)批量插入多个对齐Tablet数据tablets: 设备ID到Tablet的映射表
insertAlignedTablets(Map<String, Tablet> tablets, boolean sorted)批量插入排序的多个对齐Tablet数据同上,增加 sorted: 数据是否已排序
3.2.3 数据删除
方法名功能描述参数解释
deleteTimeseries(String path)删除单个时间序列path: 时间序列路径
deleteTimeseries(List<String> paths)批量删除时间序列paths: 时间序列路径列表
deleteData(String path, long endTime)删除指定路径的历史数据path: 路径,endTime: 结束时间戳
deleteData(List<String> paths, long endTime)批量删除路径的历史数据paths: 路径列表,endTime: 结束时间戳
deleteData(List<String> paths, long startTime, long endTime)删除路径时间范围内的历史数据同上,增加 startTime: 起始时间戳
3.2.4 数据查询
方法名功能描述参数解释
executeQueryStatement(String sql)执行查询语句sql: 查询SQL语句
executeQueryStatement(String sql, long timeoutInMs)执行带超时的查询语句sql: 查询SQL语句,timeoutInMs: 查询超时时间(毫秒),默认取服务器配置即60s
executeRawDataQuery(List<String> paths, long startTime, long endTime)查询指定路径的原始数据paths: 查询路径列表,startTime: 起始时间戳,endTime: 结束时间戳
executeRawDataQuery(List<String> paths, long startTime, long endTime, long timeOut)查询指定路径的原始数据(带超时)同上,增加 timeOut: 超时时间
executeLastDataQuery(List<String> paths)查询最新数据paths: 查询路径列表
executeLastDataQuery(List<String> paths, long lastTime)查询指定时间的最新数据paths: 查询路径列表,lastTime: 指定的时间戳
executeLastDataQuery(List<String> paths, long lastTime, long timeOut)查询指定时间的最新数据(带超时)同上,增加 timeOut: 超时时间
executeLastDataQueryForOneDevice(String db, String device, List<String> sensors, boolean isLegalPathNodes)查询单个设备的最新数据db: 数据库名,device: 设备名,sensors: 传感器列表,isLegalPathNodes: 是否合法路径节点
executeAggregationQuery(List<String> paths, List<TAggregationType> aggregations)执行聚合查询paths: 查询路径列表,aggregations: 聚合类型列表
executeAggregationQuery(List<String> paths, List<TAggregationType> aggregations, long startTime, long endTime)执行带时间范围的聚合查询同上,增加 startTime: 起始时间戳,endTime: 结束时间戳
executeAggregationQuery(List<String> paths, List<TAggregationType> aggregations, long startTime, long endTime, long interval)执行带时间间隔的聚合查询同上,增加 interval: 时间间隔
executeAggregationQuery(List<String> paths, List<TAggregationType> aggregations, long startTime, long endTime, long interval, long slidingStep)执行滑动窗口聚合查询同上,增加 slidingStep: 滑动步长
fetchAllConnections()获取所有活动连接信息无参数
3.2.5 系统状态与备份
方法名功能描述参数解释
getBackupConfiguration()获取备份配置信息无参数
fetchAllConnections()获取所有活动的连接信息无参数
getSystemStatus()获取系统状态已废弃,默认返回 SystemStatus.NORMAL