Go 原生接口

Go 原生 API 支持通过 Session SessionPool 两种方式与数据库进行交互。由于 Session 非线程安全,因此强烈推荐使用 SessionPool 编程。在多线程并发的情形下,SessionPool 能够合理地管理和分配连接资源,以提升系统性能与资源利用效率。

本文将围绕 SessionPool 的使用进行说明,涵盖从环境准备、核心操作步骤到全量接口的完整内容。

1. 环境准备

1.1 前置依赖

  • golang >= 1.13
  • make >= 3.0
  • curl >= 7.1.1
  • thrift: 0.15.0
  • Linux、Macos 或其他类 unix 系统
  • Windows+bash (下载 IoTDB Go client 需要 git ,通过 WSL、cygwin、Git Bash 任意一种方式均可)

1.2 安装方法

  • 使用 go mod
# 切换到 GOPATH 的 HOME 路径,启用 Go Modules 功能
export GO111MODULE=on

# 配置 GOPROXY 环境变量
export GOPROXY=https://goproxy.io

# 创建命名的文件夹或目录,并切换当前目录
mkdir session_example && cd session_example

# 保存文件,自动跳转到新的地址
curl -o session_example.go -L https://github.com/apache/iotdb-client-go/raw/main/example/session_example.go

# 初始化 go module 环境
go mod init session_example

# 下载依赖包
go mod tidy

# 编译并运行程序
go run session_example.go
  • 使用 GOPATH
# get thrift 0.13.0
go get github.com/apache/thrift@0.13.0

# 递归创建目录
mkdir -p $GOPATH/src/iotdb-client-go-example/session_example

# 切换到当前目录
cd $GOPATH/src/iotdb-client-go-example/session_example

# 保存文件,自动跳转到新的地址
curl -o session_example.go -L https://github.com/apache/iotdb-client-go/raw/main/example/session_example.go

# 初始化 go module 环境
go mod init

# 下载依赖包
go mod tidy

# 编译并运行程序
go run session_example.go

2. 核心步骤

使用 Go 原生接口操作 IoTDB 的三个核心步骤如下:

  1. 创建连接池实例:初始化一个SessionPool对象,配置连接参数和池大小。
  2. 执行数据库操作:从连接池中GetSession(),执行数据写入或查询等操作,完成后必须PutBack(session)
  3. 关闭连接池资源:程序结束时调用sessionPool.Close(),释放所有连接。

下面的章节用于说明开发的核心流程,并未演示所有的参数和接口,如需了解全部功能及参数请参见: 全量接口说明 或 查阅: SessionPool 示例源码

2.1 创建连接池实例

  • 单实例
config := &client.PoolConfig{
    Host:     host,
    Port:     port,
    UserName: user,
    Password: password,
}
sessionPool = client.NewSessionPool(config, 3, 60000, 60000, false)
defer sessionPool.Close()
  • 分布式或双活
config := &client.PoolConfig{
                UserName: user,
                Password: password,
                NodeUrls: strings.Split("127.0.0.1:6667,127.0.0.1:6668", ","),
        }
sessionPool = client.NewSessionPool(config, 3, 60000, 60000, false)
defer sessionPool.Close()

2.2 数据库操作

2.2.1 数据写入

session, err := sessionPool.GetSession()
defer sessionPool.PutBack(session)
status, err := session.InsertTablet(tablet, false)
tablet.Reset()
checkError(status, err)

2.2.2 数据查询

var timeout int64 = 1000
session, err := sessionPool.GetSession()
defer sessionPool.PutBack(session)
if err != nil {
    log.Print(err)
    return
}
sessionDataSet, err := session.ExecuteQueryStatement(sql, &timeout)
if err == nil {
    defer sessionDataSet.Close()
    printDataSet(sessionDataSet)
} else {
    log.Println(err)
}

2.3 使用示例

import (
    "flag"
    "fmt"
    "log"
    "math/rand"
    "strings"
    "time"

    "github.com/apache/iotdb-client-go/v2/client"
    "github.com/apache/iotdb-client-go/v2/common"
)

var (
    host     string
    port     string
    user     string
    password string
)
var sessionPool client.SessionPool

func main() {
    flag.StringVar(&host, "host", "127.0.0.1", "--host=192.168.1.100")
    flag.StringVar(&port, "port", "6667", "--port=6667")
    flag.StringVar(&user, "user", "root", "--user=root")
    flag.StringVar(&password, "password", "root", "--password=root")
    flag.Parse()
    
    //1.创建连接池
    config := &client.PoolConfig{
        Host:     host,
        Port:     port,
        UserName: user,
        Password: password,
    }
    sessionPool = client.NewSessionPool(config, 3, 60000, 60000, false)

    defer sessionPool.Close()
    
    //2.创建存储组
    setStorageGroup("root.sg1") 
    
    //3. 创建时间序列
    createTimeseries("root.sg1.dev1.temperature")
    
    //4.数据写入
    insertTablet()

    //5. 数据查询
    executeQueryStatement("select temperature from root.sg1.dev1") 
    
    //6. 删除
    deleteTimeseries("root.sg1.dev1.temperature")
    deleteStorageGroup("root.sg1")

}

// 设置存储组
func setStorageGroup(sg string) {
    session, err := sessionPool.GetSession()
    defer sessionPool.PutBack(session)
    if err == nil {
        session.SetStorageGroup(sg)
    }
}

// 删除存储组
func deleteStorageGroup(sg string) {
    session, err := sessionPool.GetSession()
    defer sessionPool.PutBack(session)
    if err == nil {
        checkError(session.DeleteStorageGroup(sg))
    }
}

// 创建时间序列
func createTimeseries(path string) {
    var (
        dataType   = client.FLOAT
        encoding   = client.PLAIN
        compressor = client.SNAPPY
    )
    session, err := sessionPool.GetSession()
    defer sessionPool.PutBack(session)
    if err == nil {
        checkError(session.CreateTimeseries(path, dataType, encoding, compressor, nil, nil))
    }
}

// 删除时间序列
func deleteTimeseries(paths ...string) {
    session, err := sessionPool.GetSession()
    defer sessionPool.PutBack(session)
    if err == nil {
        checkError(session.DeleteTimeseries(paths))
    }
}

// 插入Tablet数据
func insertTablet() {
    session, err := sessionPool.GetSession()
    defer sessionPool.PutBack(session)
    if err == nil {
        if tablet, err := createTablet(12); err == nil {
            status, err := session.InsertTablet(tablet, false)
            tablet.Reset()
            checkError(status, err)
        } else {
            log.Fatal(err)
        }
    }
}

//创建Tablet
func createTablet(rowCount int) (*client.Tablet, error) {
    tablet, err := client.NewTablet("root.sg1.dev1", []*client.MeasurementSchema{
        {
            Measurement: "temperature",
            DataType:    client.FLOAT,
        },
    }, rowCount)

    if err != nil {
        return nil, err
    }
    ts := time.Now().UTC().UnixNano() / 1000000
    for row := 0; row < int(rowCount); row++ {
        ts++
        tablet.SetTimestamp(ts, row)
        tablet.SetValueAt(rand.Float32(), 0, row)
        tablet.RowSize++
    }
    return tablet, nil
}

// 执行查询语句
func executeQueryStatement(sql string) {
    var timeout int64 = 1000
    session, err := sessionPool.GetSession()
    defer sessionPool.PutBack(session)
    if err != nil {
        log.Print(err)
        return
    }
    sessionDataSet, err := session.ExecuteQueryStatement(sql, &timeout)
    if err == nil {
        defer sessionDataSet.Close()
        printDataSet(sessionDataSet)
    } else {
        log.Println(err)
    }
}

// 打印查询结果
func printDataSet(sds *client.SessionDataSet) {
    columnNames := sds.GetColumnNames()
    for _, value := range columnNames {
        fmt.Printf("%s\t", value)
    }
    fmt.Println()

    for next, err := sds.Next(); err == nil && next; next, err = sds.Next() {
        for _, columnName := range columnNames {
            isNull, _ := sds.IsNull(columnName)

            if isNull {
                fmt.Printf("%v\t\t", "null")
            } else {
                v, _ := sds.GetString(columnName)
                fmt.Printf("%v\t\t", v)
            }
        }
        fmt.Println()
    }
}

// 检查错误
func checkError(status *common.TSStatus, err error) {
    if err != nil {
        log.Fatal(err)
    }

    if status != nil {
        if err = client.VerifySuccess(status); err != nil {
            log.Println(err)
        }
    }
}

3. 全量接口

3.1 SessionPool 管理接口

接口名称功能描述参数说明
NewSessionPool(config *PoolConfig, maxSize, connTimeoutMs, waitTimeoutMs int, enableComp bool) SessionPool创建并返回一个Session连接池实例。config: 连接池配置
maxSize: 最大连接数(≤0时取CPU数*5)
connTimeoutMs: TCP连接超时(ms)
waitTimeoutMs: 获取Session等待超时(ms)
enableComp: 是否启用压缩
GetSession() (Session, error)从池中获取一个可用Session。若池满则阻塞等待,超时返回错误。​必须与PutBack配对使用​。
PutBack(session Session)将使用完毕的Session归还到连接池中。session: 从GetSession获取的实例
Close()关闭连接池,释放所有活跃连接。程序退出前必须调用。

3.2 数据写入接口

以下接口需通过获取的 Session 进行调用

接口名称功能描述参数说明
InsertRecord(deviceId string, measurements []string, dataTypes []TSDataType, values []interface{}, timestamp int64) (r common.TSStatus, err error)插入单条记录。deviceId: 设备ID
measurements: 测点列表
dataTypes: 数据类型列表
values: 值列表
timestamp: 时间戳
InsertAlignedRecord(deviceId string, measurements []string, dataTypes []TSDataType, values []interface{}, timestamp int64) (r common.TSStatus, err error)插入单条对齐记录。deviceId: 设备ID
measurements: 测点列表
dataTypes: 数据类型列表
values: 值列表
timestamp: 时间戳
InsertStringRecord(deviceId string, measurements []string, values []string, timestamp int64) (r common.TSStatus, err error)插入字符串格式的单条记录。deviceId: 设备ID
measurements: 测点列表
values: 字符串类型的值列表
timestamp: 时间戳
InsertRecords(deviceIds []string, measurements [][]string, dataTypes [][]TSDataType, values [][]interface{}, timestamps []int64) (r common.TSStatus, err error)插入多条记录。deviceIds: 设备ID列表
measurements:二维测点列表
dataTypes: 二维数据类型列表
values: 二维值列表
timestamps: 时间戳列表
InsertAlignedRecords(deviceIds []string, measurements [][]string, dataTypes [][]TSDataType, values [][]interface{}, timestamps []int64) (r common.TSStatus, err error)插入多个对齐设备的多条记录。deviceIds: 设备ID列表
measurements:二维测点列表
dataTypes: 二维数据类型列表
values: 二维值列表
timestamps: 时间戳列表
InsertTablet(tablet Tablet, sorted bool) (r common.TSStatus, err error)插入单个设备的多条数据。tablet: 要插入的Tablet数据
sorted: 数据是否已排序
InsertAlignedTablet(tablet Tablet, sorted bool) (r common.TSStatus, err error)插入单个对齐设备的多条数据。tablet: 要插入的Tablet数据
sorted: 数据是否已排序
InsertTablets``(tablets []Tablet, sorted bool) (r common.TSStatus, err error)批量插入多个 Tablet 数据。tablets: 要插入的多个Tablet 数据
sorted: 数据是否已排序
InsertAlignedTablets(tablets []Tablet, sorted bool) (r common.TSStatus, err error)批量插入多个对齐设备的数据。tablets: 要插入的多个Tablet 数据
sorted: 数据是否已排序

3.3 SQL与查询接口

以下接口需通过获取的 Session 进行调用

接口名称功能描述参数说明
ExecuteStatement(sql string)(SessionDataSet, error)执行SQL(主要查询),返回SessionDataSetsql:要执行的SQL查询语句
ExecuteQueryStatement(sql string, timeoutMs int64) (SessionDataSet, error)执行查询SQL,可指定超时,返回SessionDataSetsql:要执行的SQL查询语句
timeoutMs: 查询超时时间(毫秒)
ExecuteNonQueryStatementExecuteNonQueryStatement(sql string) (r common.TSStatus, err error)执行不返回结果集的SQL(如INSERT, CREATE, DELETE)。sql:要执行的SQL语句
ExecuteRawDataQuery(paths []string, startTime int64, endTime int64) (*SessionDataSet, error)查询指定时间序列在时间范围内的原始数据。paths: 查询路径列表
startTime: 起始时间戳
endTime: 结束时间戳
ExecuteAggregationQuery(paths []string, aggregations []common.TAggregationType, startTime, endTime, interval, timeoutMs int64) (SessionDataSet, error)执行聚合查询(COUNT, AVG等)。paths: 查询路径列表
aggregations: 聚合类型列表
startTime, endTime, interval: 起始时间、结束时间和间隔时间
timeoutMs: 查询超时时间
ExecuteBatchStatement(sqls []string) (r common.TSStatus, err error)批量执行多条SQL语句。sqls:要执行的SQL语句

3.4 元数据操作接口

以下接口需通过获取的 Session 进行调用

接口名称功能描述参数说明
SetStorageGroup(storageGroupId string) (r common.TSStatus, err error)创建数据库(存储组)。storageGroupId:数据库(存储组)名称
DeleteStorageGroup(storageGroupId string) (r common.TSStatus, err error)删除一个数据库(存储组)。storageGroupId:要删除的数据库(存储组)名称
DeleteStorageGroups(storageGroupIds ...string) (r common.TSStatus, err error)删除多个数据库(存储组)。storageGroupIds:要删除的数据库(存储组)名称列表
CreateTimeseries(path string, dataType TSDataType, encoding TSEncoding, compressor TSCompressionType, attributes map[string]string, tags map[string]string) (r common.TSStatus, err error)创建非对齐时间序列。path: 时间序列路径
dataType: 数据类型
encoding: 编码方式
compressor: 压缩算法
attributes: (可选)序列属性
tags: (可选)序列标签
CreateAlignedTimeseries(prefixPath string, measurements []string, dataTypes []TSDataType, encodings []TSEncoding, compressors []TSCompressionType, measurementAlias []string) (r common.TSStatus, err error)创建一组对齐时间序列。prefixPath: 时间序列路径前缀
measurements: 测点名称列表
dataTypes, encodings, compressors: 每个测点对应的数据类型、编码和压缩算法列表
measurementAlias: (可选)每个测点的别名列表
DeleteTimeseries(paths []string) (r common.TSStatus, err error)删除多条时间序列(含数据)。paths:要删除的时间序列路径列表
DeleteData(paths []string, startTime int64, endTime int64) (r common.TSStatus, err error)删除指定时间序列在时间段内的数据(保留元数据)。paths: 要删除的时间序列路径列表
startTime: 起始时间戳
endTime: 结束时间戳。
SetTimeZone(timeZone string) (r common.TSStatus, err error)设置当前会话时区。timeZone: 时区字符串,例如 ”UTC”, ”Asia/Shanghai”, ”GMT+8”
GetTimeZone() (string, error)获取当前会话时区。

3.5 关键配置结构 (PoolConfig)

字段类型必填描述
Hoststring与NodeUrls二选一单节点主机地址。
Portstring与NodeUrls二选一单节点端口。
NodeUrls[]string与Host/Port二选一集群节点地址列表,格式为”host:port”
UserNamestring用户名。
Passwordstring密码。
FetchSizeint32查询结果集获取大小,默认1024。
TimeZonestring会话时区,如”Asia/Shanghai”,默认使用服务端时区。
Databasestring表模型适用,用于设置会话默认数据库。