blob: da1533a2e7ddbc333bf3e79c97f12ddb84ad0fd7 [file] [log] [blame] [view]
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
# Go 原生接口
## 1. 使用方法
### 1.1 依赖
* golang >= 1.13
* make >= 3.0
* curl >= 7.1.1
* thrift 0.15.0
* Linux、Macos 或其他类 unix 系统
* Windows+bash (下载 IoTDB Go client 需要 git ,通过 WSL、cygwin、Git Bash 任意一种方式均可)
### 1.2 安装方法
* 通过 go mod
```sh
# 切换到 GOPATH 的 HOME 路径,启用 Go Modules 功能
export GO111MODULE=on
# 配置 GOPROXY 环境变量
export GOPROXY=https://goproxy.io
# 创建命名的文件夹或目录,并切换当前目录
mkdir session_example && cd session_example
# 保存文件,自动跳转到新的地址
curl -o session_example.go -L https://github.com/apache/iotdb-client-go/raw/main/example/session_example.go
# 初始化 go module 环境
go mod init session_example
# 下载依赖包
go mod tidy
# 编译并运行程序
go run session_example.go
```
* 通过 GOPATH
```sh
# get thrift 0.13.0
go get github.com/apache/thrift@0.13.0
# 递归创建目录
mkdir -p $GOPATH/src/iotdb-client-go-example/session_example
# 切换到当前目录
cd $GOPATH/src/iotdb-client-go-example/session_example
# 保存文件,自动跳转到新的地址
curl -o session_example.go -L https://github.com/apache/iotdb-client-go/raw/main/example/session_example.go
# 初始化 go module 环境
go mod init
# 下载依赖包
go mod tidy
# 编译并运行程序
go run session_example.go
```
## 2. ITableSession 接口
### 2.1 功能描述
ITableSession 接口定义了与 IoTDB 交互的基本操作,可以执行数据插入、查询操作以及关闭会话等,非线程安全。
### 2.2 方法列表
以下是 ITableSession 接口中定义的方法及其详细说明:
| **方法名** | **描述** | **参数** | **返回值** | **返回异常** |
| -------------------------------------------------------------- | ---------------------------------------------------------------------- | --------------------------------------------------------------------------- | ---------------------------------------------------- | ----------------------------------------------- |
| `Insert(tablet *Tablet)` | 将一个包含时间序列数据的Tablet 插入到数据库中| `tablet`: 要插入的`Tablet`| `TSStatus`:执行结果的状态,由 common 包定义。 | `errer`:操作过程中的错误(如连接失败)。 |
| `xecuteNonQueryStatement(sql string)`| 执行非查询 SQL 语句,如 DDL (数据定义语言)或 DML (数据操作语言)命令 | `sql`: 要执行的 SQL 语句。| 同上 | 同上|
| `ExecuteQueryStatement (sql string, timeoutInMs *int64)` | 执行查询 SQL 语句,并返回查询结果集 | `sql`: 要执行的查询 SQL 语句。`timeoutInMs`: 查询超时时间(毫秒) | `SessionDataSet`:查询结果数据集。 | `errer`:操作过程中的错误(如连接失败)。 |
| `Close()` | 关闭会话,释放所持有的资源 | 无 | 无 | `errer`:关闭连接过程中的错误 |
### 2.3 接口展示
1. ITableSession
```go
// ITableSession defines an interface for interacting with IoTDB tables.
// It supports operations such as data insertion, executing queries, and closing the session.
// Implementations of this interface are expected to manage connections and ensure
// proper resource cleanup.
//
// Each method may return an error to indicate issues such as connection errors
// or execution failures.
//
// Since this interface includes a Close method, it is recommended to use
// defer to ensure the session is properly closed.
type ITableSession interface {
// Insert inserts a Tablet into the database.
//
// Parameters:
// - tablet: A pointer to a Tablet containing time-series data to be inserted.
//
// Returns:
// - r: A pointer to TSStatus indicating the execution result.
// - err: An error if an issue occurs during the operation, such as a connection error or execution failure.
Insert(tablet *Tablet) (r *common.TSStatus, err error)
// ExecuteNonQueryStatement executes a non-query SQL statement, such as a DDL or DML command.
//
// Parameters:
// - sql: The SQL statement to execute.
//
// Returns:
// - r: A pointer to TSStatus indicating the execution result.
// - err: An error if an issue occurs during the operation, such as a connection error or execution failure.
ExecuteNonQueryStatement(sql string) (r *common.TSStatus, err error)
// ExecuteQueryStatement executes a query SQL statement and returns the result set.
//
// Parameters:
// - sql: The SQL query statement to execute.
// - timeoutInMs: A pointer to the timeout duration in milliseconds for the query execution.
//
// Returns:
// - result: A pointer to SessionDataSet containing the query results.
// - err: An error if an issue occurs during the operation, such as a connection error or execution failure.
ExecuteQueryStatement(sql string, timeoutInMs *int64) (*SessionDataSet, error)
// Close closes the session, releasing any held resources.
//
// Returns:
// - err: An error if there is an issue with closing the IoTDB connection.
Close() (err error)
}
```
2. 构造 TableSession
* Config 中不需要手动设置 sqlDialect,使用时只需要使用对应的 NewSession 函数
```Go
type Config struct {
Host string
Port string
UserName string
Password string
FetchSize int32
TimeZone string
ConnectRetryMax int
sqlDialect string
Version Version
Database string
}
type ClusterConfig struct {
NodeUrls []string //ip:port
UserName string
Password string
FetchSize int32
TimeZone string
ConnectRetryMax int
sqlDialect string
Database string
}
// NewTableSession creates a new TableSession instance using the provided configuration.
//
// Parameters:
// - config: The configuration for the session.
// - enableRPCCompression: A boolean indicating whether RPC compression is enabled.
// - connectionTimeoutInMs: The timeout in milliseconds for establishing a connection.
//
// Returns:
// - An ITableSession instance if the session is successfully created.
// - An error if there is an issue during session initialization.
func NewTableSession(config *Config, enableRPCCompression bool, connectionTimeoutInMs int) (ITableSession, error)
// NewClusterTableSession creates a new TableSession instance for a cluster setup.
//
// Parameters:
// - clusterConfig: The configuration for the cluster session.
// - enableRPCCompression: A boolean indicating whether RPC compression is enabled.
//
// Returns:
// - An ITableSession instance if the session is successfully created.
// - An error if there is an issue during session initialization.
func NewClusterTableSession(clusterConfig *ClusterConfig, enableRPCCompression bool) (ITableSession, error)
```
> 注意:
>
> 通过 *NewTableSession 或 NewClusterTableSession* 得到的 TableSession,连接已经建立,不需要额外的 open 操作。
### 2.4 示例代码
```Go
package main
import (
"flag"
"github.com/apache/iotdb-client-go/client"
"github.com/apache/iotdb-client-go/common"
"log"
"math/rand"
"strconv"
"time"
)
func main() {
flag.Parse()
config := &client.Config{
Host: "127.0.0.1",
Port: "6667",
UserName: "root",
Password: "TimechoDB@2021", //V2.0.6.x 之前默认密码是root
Database: "test_session",
}
session, err := client.NewTableSession(config, false, 0)
if err != nil {
log.Fatal(err)
}
defer session.Close()
checkError(session.ExecuteNonQueryStatement("create database test_db"))
checkError(session.ExecuteNonQueryStatement("use test_db"))
checkError(session.ExecuteNonQueryStatement("create table t1 (id1 string tag, id2 string tag, s1 text field, s2 text field)"))
insertRelationalTablet(session)
showTables(session)
query(session)
}
func getTextValueFromDataSet(dataSet *client.SessionDataSet, columnName string) string {
if dataSet.IsNull(columnName) {
return "null"
} else {
return dataSet.GetText(columnName)
}
}
func insertRelationalTablet(session client.ITableSession) {
tablet, err := client.NewRelationalTablet("t1", []*client.MeasurementSchema{
{
Measurement: "id1",
DataType: client.STRING,
},
{
Measurement: "id2",
DataType: client.STRING,
},
{
Measurement: "s1",
DataType: client.TEXT,
},
{
Measurement: "s2",
DataType: client.TEXT,
},
}, []client.ColumnCategory{client.TAG, client.TAG, client.FIELD, client.FIELD}, 1024)
if err != nil {
log.Fatal("Failed to create relational tablet {}", err)
}
ts := time.Now().UTC().UnixNano() / 1000000
for row := 0; row < 16; row++ {
ts++
tablet.SetTimestamp(ts, row)
tablet.SetValueAt("id1_field_"+strconv.Itoa(row), 0, row)
tablet.SetValueAt("id2_field_"+strconv.Itoa(row), 1, row)
tablet.SetValueAt("s1_value_"+strconv.Itoa(row), 2, row)
tablet.SetValueAt("s2_value_"+strconv.Itoa(row), 3, row)
tablet.RowSize++
}
checkError(session.Insert(tablet))
tablet.Reset()
for row := 0; row < 16; row++ {
ts++
tablet.SetTimestamp(ts, row)
tablet.SetValueAt("id1_field_1", 0, row)
tablet.SetValueAt("id2_field_1", 1, row)
tablet.SetValueAt("s1_value_"+strconv.Itoa(row), 2, row)
tablet.SetValueAt("s2_value_"+strconv.Itoa(row), 3, row)
nullValueColumn := rand.Intn(4)
tablet.SetValueAt(nil, nullValueColumn, row)
tablet.RowSize++
}
checkError(session.Insert(tablet))
}
func showTables(session client.ITableSession) {
timeout := int64(2000)
dataSet, err := session.ExecuteQueryStatement("show tables", &timeout)
if err != nil {
log.Fatal(err)
}
for {
hasNext, err := dataSet.Next()
if err != nil {
log.Fatal(err)
}
if !hasNext {
break
}
log.Printf("tableName is", dataSet.GetText("TableName"))
}
}
func query(session client.ITableSession) {
timeout := int64(2000)
dataSet, err := session.ExecuteQueryStatement("select * from t1", &timeout)
if err != nil {
log.Fatal(err)
}
for {
hasNext, err := dataSet.Next()
if err != nil {
log.Fatal(err)
}
if !hasNext {
break
}
log.Printf("%v %v %v %v", getTextValueFromDataSet(dataSet, "id1"), getTextValueFromDataSet(dataSet, "id2"), getTextValueFromDataSet(dataSet, "s1"), getTextValueFromDataSet(dataSet, "s2"))
}
}
func checkError(status *common.TSStatus, err error) {
if err != nil {
log.Fatal(err)
}
if status != nil {
if err = client.VerifySuccess(status); err != nil {
log.Println(err)
}
}
}
```
## 3. TableSessionPool 接口
### 3.1 功能描述
TableSessionPool 是一个用于管理 ITableSession 实例的池。这个池可以帮助我们高效地重用连接,并且在不需要时正确地清理资源, 该接口定义了如何从池中获取会话以及如何关闭池的基本操作。
### 3.2 方法列表
| **方法名** | **描述** | **返回值** | **返回异常** |
| -------------------- | -------------------------------------------------------------------- | --------------------------- | --------------------------------- |
| `GetSession()` | 从池中获取一个 ITableSession 实例,用于与 IoTDB 交互。 | `ITableSession `实例| `error`:获取失败的错误原因 |
| `Close()` | 关闭会话池,释放任何持有的资源。关闭后,不能再从池中获取新的会话。 | 无 | 无 |
### 3.3 接口展示
1. TableSessionPool
```Go
// TableSessionPool manages a pool of ITableSession instances, enabling efficient
// reuse and management of resources. It provides methods to acquire a session
// from the pool and to close the pool, releasing all held resources.
//
// This implementation ensures proper lifecycle management of sessions,
// including efficient reuse and cleanup of resources.
// GetSession acquires an ITableSession instance from the pool.
//
// Returns:
// - A usable ITableSession instance for interacting with IoTDB.
// - An error if a session cannot be acquired.
func (spool *TableSessionPool) GetSession() (ITableSession, error) {
return spool.sessionPool.getTableSession()
}
// Close closes the TableSessionPool, releasing all held resources.
// Once closed, no further sessions can be acquired from the pool.
func (spool *TableSessionPool) Close()
```
2. 构造 TableSessionPool
```Go
type PoolConfig struct {
Host string
Port string
NodeUrls []string
UserName string
Password string
FetchSize int32
TimeZone string
ConnectRetryMax int
Database string
sqlDialect string
}
// NewTableSessionPool creates a new TableSessionPool with the specified configuration.
//
// Parameters:
// - conf: PoolConfig defining the configuration for the pool.
// - maxSize: The maximum number of sessions the pool can hold.
// - connectionTimeoutInMs: Timeout for establishing a connection in milliseconds.
// - waitToGetSessionTimeoutInMs: Timeout for waiting to acquire a session in milliseconds.
// - enableCompression: A boolean indicating whether to enable compression.
//
// Returns:
// - A TableSessionPool instance.
func NewTableSessionPool(conf *PoolConfig, maxSize, connectionTimeoutInMs, waitToGetSessionTimeoutInMs int,
enableCompression bool) TableSessionPool
```
> 注意:
>
> * 通过 TableSessionPool 得到的 TableSession,如果已经在创建 TableSessionPool 指定了 Database,使用时可以不再指定 Database。
> * 如果使用过程中通过 use database 指定了其他 database,在 close 放回 TableSessionPool 时会自动恢复为 TableSessionPool 所用的 database。
### 3.4 示例代码
```go
package main
import (
"github.com/apache/iotdb-client-go/client"
"github.com/apache/iotdb-client-go/common"
"log"
"strconv"
"sync"
"sync/atomic"
"time"
)
func main() {
sessionPoolWithSpecificDatabaseExample()
sessionPoolWithoutSpecificDatabaseExample()
putBackToSessionPoolExample()
}
func putBackToSessionPoolExample() {
// should create database test_db before executing
config := &client.PoolConfig{
Host: "127.0.0.1",
Port: "6667",
UserName: "root",
Password: "TimechoDB@2021", //V2.0.6.x 之前默认密码是root
Database: "test_db",
}
sessionPool := client.NewTableSessionPool(config, 3, 60000, 4000, false)
defer sessionPool.Close()
num := 4
successGetSessionNum := int32(0)
var wg sync.WaitGroup
wg.Add(num)
for i := 0; i < num; i++ {
dbName := "db" + strconv.Itoa(i)
go func() {
defer wg.Done()
session, err := sessionPool.GetSession()
if err != nil {
log.Println("Failed to create database "+dbName+"because ", err)
return
}
atomic.AddInt32(&successGetSessionNum, 1)
defer func() {
time.Sleep(6 * time.Second)
// put back to session pool
session.Close()
}()
checkError(session.ExecuteNonQueryStatement("create database " + dbName))
checkError(session.ExecuteNonQueryStatement("use " + dbName))
checkError(session.ExecuteNonQueryStatement("create table table_of_" + dbName + " (id1 string tag, id2 string tag, s1 text field, s2 text field)"))
}()
}
wg.Wait()
log.Println("success num is", successGetSessionNum)
log.Println("All session's database have been reset.")
// the using database will automatically reset to session pool's database after the session closed
wg.Add(5)
for i := 0; i < 5; i++ {
go func() {
defer wg.Done()
session, err := sessionPool.GetSession()
if err != nil {
log.Println("Failed to get session because ", err)
}
defer session.Close()
timeout := int64(3000)
dataSet, err := session.ExecuteQueryStatement("show tables", &timeout)
for {
hasNext, err := dataSet.Next()
if err != nil {
log.Fatal(err)
}
if !hasNext {
break
}
log.Println("table is", dataSet.GetText("TableName"))
}
}()
}
wg.Wait()
}
func sessionPoolWithSpecificDatabaseExample() {
// should create database test_db before executing
config := &client.PoolConfig{
Host: "127.0.0.1",
Port: "6667",
UserName: "root",
Password: "TimechoDB@2021", //V2.0.6.x 之前默认密码是root
Database: "test_db",
}
sessionPool := client.NewTableSessionPool(config, 3, 60000, 8000, false)
defer sessionPool.Close()
num := 10
var wg sync.WaitGroup
wg.Add(num)
for i := 0; i < num; i++ {
tableName := "t" + strconv.Itoa(i)
go func() {
defer wg.Done()
session, err := sessionPool.GetSession()
if err != nil {
log.Println("Failed to create table "+tableName+"because ", err)
return
}
defer session.Close()
checkError(session.ExecuteNonQueryStatement("create table " + tableName + " (id1 string tag, id2 string tag, s1 text field, s2 text field)"))
}()
}
wg.Wait()
}
func sessionPoolWithoutSpecificDatabaseExample() {
config := &client.PoolConfig{
Host: "127.0.0.1",
Port: "6667",
UserName: "root",
Password: "TimechoDB@2021", //V2.0.6.x 之前默认密码是root
}
sessionPool := client.NewTableSessionPool(config, 3, 60000, 8000, false)
defer sessionPool.Close()
num := 10
var wg sync.WaitGroup
wg.Add(num)
for i := 0; i < num; i++ {
dbName := "db" + strconv.Itoa(i)
go func() {
defer wg.Done()
session, err := sessionPool.GetSession()
if err != nil {
log.Println("Failed to create database ", dbName, err)
return
}
defer session.Close()
checkError(session.ExecuteNonQueryStatement("create database " + dbName))
checkError(session.ExecuteNonQueryStatement("use " + dbName))
checkError(session.ExecuteNonQueryStatement("create table t1 (id1 string tag, id2 string tag, s1 text field, s2 text field)"))
}()
}
wg.Wait()
}
func checkError(status *common.TSStatus, err error) {
if err != nil {
log.Fatal(err)
}
if status != nil {
if err = client.VerifySuccess(status); err != nil {
log.Println(err)
}
}
}
```