blob: fe9e49b9b2f33fd982c93baf12099376e99e6c62 [file] [log] [blame] [view]
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
# Go 原生接口
Go 原生 API 支持通过 `Session `和 `SessionPool `两种方式与数据库进行交互。由于 `Session `非线程安全,因此强烈推荐使用 `SessionPool `编程。在多线程并发的情形下,`SessionPool `能够合理地管理和分配连接资源,以提升系统性能与资源利用效率。
本文将围绕 `SessionPool` 的使用进行说明,涵盖从环境准备、核心操作步骤到全量接口的完整内容。
## 1. 环境准备
### 1.1 前置依赖
* golang >= 1.13
* make >= 3.0
* curl >= 7.1.1
* thrift: 0.15.0
* Linux、Macos 或其他类 unix 系统
* Windows+bash (下载 IoTDB Go client 需要 git ,通过 WSL、cygwin、Git Bash 任意一种方式均可)
### 1.2 安装方法
* 使用 go mod
```Bash
# 切换到 GOPATH 的 HOME 路径,启用 Go Modules 功能
export GO111MODULE=on
# 配置 GOPROXY 环境变量
export GOPROXY=https://goproxy.io
# 创建命名的文件夹或目录,并切换当前目录
mkdir session_example && cd session_example
# 保存文件,自动跳转到新的地址
curl -o session_example.go -L https://github.com/apache/iotdb-client-go/raw/main/example/session_example.go
# 初始化 go module 环境
go mod init session_example
# 下载依赖包
go mod tidy
# 编译并运行程序
go run session_example.go
```
* 使用 GOPATH
```Bash
# get thrift 0.13.0
go get github.com/apache/thrift@0.13.0
# 递归创建目录
mkdir -p $GOPATH/src/iotdb-client-go-example/session_example
# 切换到当前目录
cd $GOPATH/src/iotdb-client-go-example/session_example
# 保存文件,自动跳转到新的地址
curl -o session_example.go -L https://github.com/apache/iotdb-client-go/raw/main/example/session_example.go
# 初始化 go module 环境
go mod init
# 下载依赖包
go mod tidy
# 编译并运行程序
go run session_example.go
```
## 2. 核心步骤
使用 Go 原生接口操作 IoTDB 的三个核心步骤如下:
1. 创建连接池实例:初始化一个`SessionPool`对象,配置连接参数和池大小。
2. 执行数据库操作:从连接池中`GetSession()`,执行数据写入或查询等操作,完成后必须`PutBack(session)`。
3. 关闭连接池资源:程序结束时调用`sessionPool.Close()`,释放所有连接。
下面的章节用于说明开发的核心流程,并未演示所有的参数和接口,如需了解全部功能及参数请参见: [全量接口说明](../API/Programming-Go-Native-API.md#_3-全量接口) 或 查阅: [SessionPool 示例源码](https://github.com/apache/iotdb-client-go/blob/main/example/session_pool/session_pool_example.go)
### 2.1 创建连接池实例
* 单实例
```Go
config := &client.PoolConfig{
Host: host,
Port: port,
UserName: user,
Password: password,
}
sessionPool = client.NewSessionPool(config, 3, 60000, 60000, false)
defer​ ​sessionPool.Close()
```
* 分布式或双活
```Go
config := &client.PoolConfig{
UserName: user,
Password: password,
NodeUrls: strings.Split("127.0.0.1:6667,127.0.0.1:6668", ","),
}
sessionPool = client.NewSessionPool(config, 3, 60000, 60000, false)
defer​ ​sessionPool.Close()
```
### 2.2 数据库操作
#### 2.2.1 数据写入
```Go
session, err := sessionPool.GetSession()
defer sessionPool.PutBack(session)
status, err := session.InsertTablet(tablet, false)
tablet.Reset()
checkError(status, err)
```
#### 2.2.2 数据查询
```Go
var timeout int64 = 1000
session, err := sessionPool.GetSession()
defer sessionPool.PutBack(session)
if err != nil {
log.Print(err)
return
}
sessionDataSet, err := session.ExecuteQueryStatement(sql, &timeout)
if err == nil {
defer sessionDataSet.Close()
printDataSet(sessionDataSet)
} else {
log.Println(err)
}
```
### 2.3 使用示例
```Go
import (
"flag"
"fmt"
"log"
"math/rand"
"strings"
"time"
"github.com/apache/iotdb-client-go/v2/client"
"github.com/apache/iotdb-client-go/v2/common"
)
var (
host string
port string
user string
password string
)
var sessionPool client.SessionPool
func main() {
flag.StringVar(&host, "host", "127.0.0.1", "--host=192.168.1.100")
flag.StringVar(&port, "port", "6667", "--port=6667")
flag.StringVar(&user, "user", "root", "--user=root")
flag.StringVar(&password, "password", "root", "--password=root")
flag.Parse()
//1.创建连接池
config := &client.PoolConfig{
Host: host,
Port: port,
UserName: user,
Password: password,
}
sessionPool = client.NewSessionPool(config, 3, 60000, 60000, false)
defer sessionPool.Close()
//2.创建存储组
setStorageGroup("root.sg1")
//3. 创建时间序列
createTimeseries("root.sg1.dev1.temperature")
//4.数据写入
insertTablet()
//5. 数据查询
executeQueryStatement("select temperature from root.sg1.dev1")
//6. 删除
deleteTimeseries("root.sg1.dev1.temperature")
deleteStorageGroup("root.sg1")
}
// 设置存储组
func setStorageGroup(sg string) {
session, err := sessionPool.GetSession()
defer sessionPool.PutBack(session)
if err == nil {
session.SetStorageGroup(sg)
}
}
// 删除存储组
func deleteStorageGroup(sg string) {
session, err := sessionPool.GetSession()
defer sessionPool.PutBack(session)
if err == nil {
checkError(session.DeleteStorageGroup(sg))
}
}
// 创建时间序列
func createTimeseries(path string) {
var (
dataType = client.FLOAT
encoding = client.PLAIN
compressor = client.SNAPPY
)
session, err := sessionPool.GetSession()
defer sessionPool.PutBack(session)
if err == nil {
checkError(session.CreateTimeseries(path, dataType, encoding, compressor, nil, nil))
}
}
// 删除时间序列
func deleteTimeseries(paths ...string) {
session, err := sessionPool.GetSession()
defer sessionPool.PutBack(session)
if err == nil {
checkError(session.DeleteTimeseries(paths))
}
}
// 插入Tablet数据
func insertTablet() {
session, err := sessionPool.GetSession()
defer sessionPool.PutBack(session)
if err == nil {
if tablet, err := createTablet(12); err == nil {
status, err := session.InsertTablet(tablet, false)
tablet.Reset()
checkError(status, err)
} else {
log.Fatal(err)
}
}
}
//创建Tablet
func createTablet(rowCount int) (*client.Tablet, error) {
tablet, err := client.NewTablet("root.sg1.dev1", []*client.MeasurementSchema{
{
Measurement: "temperature",
DataType: client.FLOAT,
},
}, rowCount)
if err != nil {
return nil, err
}
ts := time.Now().UTC().UnixNano() / 1000000
for row := 0; row < int(rowCount); row++ {
ts++
tablet.SetTimestamp(ts, row)
tablet.SetValueAt(rand.Float32(), 0, row)
tablet.RowSize++
}
return tablet, nil
}
// 执行查询语句
func executeQueryStatement(sql string) {
var timeout int64 = 1000
session, err := sessionPool.GetSession()
defer sessionPool.PutBack(session)
if err != nil {
log.Print(err)
return
}
sessionDataSet, err := session.ExecuteQueryStatement(sql, &timeout)
if err == nil {
defer sessionDataSet.Close()
printDataSet(sessionDataSet)
} else {
log.Println(err)
}
}
// 打印查询结果
func printDataSet(sds *client.SessionDataSet) {
columnNames := sds.GetColumnNames()
for _, value := range columnNames {
fmt.Printf("%s\t", value)
}
fmt.Println()
for next, err := sds.Next(); err == nil && next; next, err = sds.Next() {
for _, columnName := range columnNames {
isNull, _ := sds.IsNull(columnName)
if isNull {
fmt.Printf("%v\t\t", "null")
} else {
v, _ := sds.GetString(columnName)
fmt.Printf("%v\t\t", v)
}
}
fmt.Println()
}
}
// 检查错误
func checkError(status *common.TSStatus, err error) {
if err != nil {
log.Fatal(err)
}
if status != nil {
if err = client.VerifySuccess(status); err != nil {
log.Println(err)
}
}
}
```
## 3. 全量接口
### 3.1 SessionPool 管理接口
| 接口名称 | 功能描述 | 参数说明 |
|--------------------------------------------------------------------------------------------------------------|--------------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------|
| `NewSessionPool(config *PoolConfig, maxSize, connTimeoutMs, waitTimeoutMs int, enableComp bool) SessionPool` | 创建并返回一个Session连接池实例。 | `config`: 连接池配置 <br>`maxSize`: 最大连接数(≤0时取CPU数\*5) <br>`connTimeoutMs`: TCP连接超时(ms) <br>`waitTimeoutMs`: 获取Session等待超时(ms) <br>`enableComp`: 是否启用压缩 |
| `GetSession() (Session, error)` | 从池中获取一个可用Session。若池满则阻塞等待,超时返回错误。​**必须与PutBack配对使用**​。 | 无 |
| `PutBack(session Session)` | 将使用完毕的Session归还到连接池中。 | `session`: 从GetSession获取的实例 |
| `Close()` | 关闭连接池,释放所有活跃连接。程序退出前必须调用。 | 无 |
### 3.2 数据写入接口
以下接口需通过获取的 Session 进行调用
| 接口名称 | 功能描述 | 参数说明 |
|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------| ------------------------------ | ---------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `InsertRecord(deviceId string, measurements []string, dataTypes []TSDataType, values []interface{}, timestamp int64) (r common.TSStatus, err error)` | 插入单条记录。| `deviceId`: 设备ID <br>`measurements`: 测点列表 <br>`dataTypes`: 数据类型列表<br>`values`: 值列表<br>`timestamp`: 时间戳 |
| `InsertAlignedRecord(deviceId string, measurements []string, dataTypes []TSDataType, values []interface{}, timestamp int64) (r common.TSStatus, err error)` | 插入单条对齐记录。 | `deviceId`: 设备ID<br>`measurements`: 测点列表<br>`dataTypes`: 数据类型列表<br>`values`: 值列表<br>`timestamp`: 时间戳 |
| `InsertStringRecord(deviceId string, measurements []string, values []string, timestamp int64) (r common.TSStatus, err error)` | 插入字符串格式的单条记录。 | `deviceId`: 设备ID<br>`measurements`: 测点列表<br>`values`: 字符串类型的值列表<br>`timestamp`: 时间戳 |
| `InsertRecords(deviceIds []string, measurements [][]string, dataTypes [][]TSDataType, values [][]interface{}, timestamps []int64) (r common.TSStatus, err error)` | 插入多条记录。| `deviceIds`: 设备ID列表<br>`measurements`:二维测点列表<br>`dataTypes`: 二维数据类型列表<br>`values`: 二维值列表<br>`timestamps`: 时间戳列表 |
| `InsertAlignedRecords(deviceIds []string, measurements [][]string, dataTypes [][]TSDataType, values [][]interface{}, timestamps []int64) (r common.TSStatus, err error)` | 插入多个对齐设备的多条记录。 | `deviceIds`: 设备ID列表<br>`measurements`:二维测点列表<br>`dataTypes`: 二维数据类型列表<br>`values`: 二维值列表<br>`timestamps`: 时间戳列表 |
| `InsertTablet(tablet Tablet, sorted bool) (r common.TSStatus, err error)` | 插入单个设备的多条数据。 | `tablet`: 要插入的`Tablet`数据<br>`sorted`: 数据是否已排序 |
| `InsertAlignedTablet(tablet Tablet, sorted bool) (r common.TSStatus, err error)` | 插入单个对齐设备的多条数据。 | `tablet`: 要插入的`Tablet`数据<br>`sorted`: 数据是否已排序 |
| `InsertTablets``(tablets []Tablet, sorted bool) (r common.TSStatus, err error)` | 批量插入多个 Tablet 数据。 | `tablets`: 要插入的多个`Tablet `数据<br>`sorted`: 数据是否已排序 |
| `InsertAlignedTablets(tablets []Tablet, sorted bool) (r common.TSStatus, err error)` | 批量插入多个对齐设备的数据。 | `tablets`: 要插入的多个`Tablet `数据<br>`sorted`: 数据是否已排序 |
### 3.3 SQL与查询接口
以下接口需通过获取的 Session 进行调用
| 接口名称 | 功能描述 | 参数说明 |
|----------------------------------------------------------------------------------------------------------------------------------------------------------|---------------------------------------|----------------------------------------------------------------------------------------------------------------------|
| `ExecuteStatement(sql string)`*`(`*`SessionDataSet, error)` | 执行SQL(主要查询),返回`SessionDataSet`。 | `sql`:要执行的SQL查询语句 |
| `ExecuteQueryStatement(sql string, timeoutMs int64) (SessionDataSet, error)` | 执行查询SQL,可指定超时,返回`SessionDataSet`。 | `sql`:要执行的SQL查询语句<br>`timeoutMs`: 查询超时时间(毫秒) |
| `ExecuteNonQueryStatementExecuteNonQueryStatement(sql string) (r common.TSStatus, err error)` | 执行不返回结果集的SQL(如INSERT, CREATE, DELETE)。 | `sql`:要执行的SQL语句 |
| `ExecuteRawDataQuery(paths []string, startTime int64, endTime int64) (*SessionDataSet, error)` | 查询指定时间序列在时间范围内的原始数据。 | `paths`: 查询路径列表<br>`startTime`: 起始时间戳<br>`endTime`: 结束时间戳 |
| `ExecuteAggregationQuery(paths []string, aggregations []common.TAggregationType, startTime, endTime, interval, timeoutMs int64) (SessionDataSet, error)` | 执行聚合查询(COUNT, AVG等)。 | `paths`: 查询路径列表<br>`aggregations`: 聚合类型列表<br>`startTime, endTime, interval`: 起始时间、结束时间和间隔时间<br>`timeoutMs`: 查询超时时间 |
| `ExecuteBatchStatement(sqls []string) (r common.TSStatus, err error)` | 批量执行多条SQL语句。 | `sqls`:要执行的SQL语句 |
### 3.4 元数据操作接口
以下接口需通过获取的 Session 进行调用
| 接口名称 | 功能描述 | 参数说明 |
| -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |---------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------|
| `SetStorageGroup(storageGroupId string) (r common.TSStatus, err error)` | 创建数据库(存储组)。 | `storageGroupId`:数据库(存储组)名称 |
| `DeleteStorageGroup(storageGroupId string) (r common.TSStatus, err error)` | 删除一个数据库(存储组)。 | `storageGroupId`:要删除的数据库(存储组)名称 |
| `DeleteStorageGroups(storageGroupIds ...string) (r common.TSStatus, err error)` | 删除多个数据库(存储组)。 | `storageGroupIds`:要删除的数据库(存储组)名称列表 |
| `CreateTimeseries(path string, dataType TSDataType, encoding TSEncoding, compressor TSCompressionType, attributes map[string]string, tags map[string]string) (r common.TSStatus, err error)` | 创建非对齐时间序列。 | `path`: 时间序列路径<br>`dataType`: 数据类型<br>`encoding`: 编码方式<br>`compressor`: 压缩算法<br>`attributes`: (可选)序列属性<br>`tags`: (可选)序列标签 |
| `CreateAlignedTimeseries(prefixPath string, measurements []string, dataTypes []TSDataType, encodings []TSEncoding, compressors []TSCompressionType, measurementAlias []string) (r common.TSStatus, err error)` | 创建一组对齐时间序列。 | `prefixPath`: 时间序列路径前缀<br>`measurements`: 测点名称列表<br>`dataTypes, encodings, compressors`: 每个测点对应的数据类型、编码和压缩算法列表<br>`measurementAlias`: (可选)每个测点的别名列表 |
| `DeleteTimeseries(paths []string) (r common.TSStatus, err error)` | 删除多条时间序列(含数据)。 | `paths`:要删除的时间序列路径列表 |
| `DeleteData(paths []string, startTime int64, endTime int64) (r common.TSStatus, err error)` | 删除指定时间序列在时间段内的数据(保留元数据)。 | `paths`: 要删除的时间序列路径列表<br>`startTime`: 起始时间戳<br>`endTime`: 结束时间戳。 |
| `SetTimeZone(timeZone string) (r common.TSStatus, err error)` | 设置当前会话时区。 | `timeZone`: 时区字符串,例如 ”UTC”, ”Asia/Shanghai”, ”GMT+8” |
| `GetTimeZone() (string, error)` | 获取当前会话时区。 | 无 |
### 3.5 关键配置结构 (PoolConfig)
| 字段 | 类型 | 必填 | 描述 |
| ----------------- | ---------------- | ------------------- | ----------------------------------------------------------- |
| `Host` | `string` | 与NodeUrls二选一 | 单节点主机地址。 |
| `Port` | `string` | 与NodeUrls二选一 | 单节点端口。 |
| `NodeUrls` | `[]string` | 与Host/Port二选一 | 集群节点地址列表,格式为`”host:port”`。 |
| `UserName` | `string` | 是 | 用户名。 |
| `Password` | `string` | 是 | 密码。 |
| `FetchSize` | `int32` | 否 | 查询结果集获取大小,默认1024。 |
| `TimeZone` | `string` | 否 | 会话时区,如`”Asia/Shanghai”`,默认使用服务端时区。 |
| `Database` | `string` | 否 | 表模型适用,用于设置会话默认数据库。 |