<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
# Go Native API
The Go Native API interacts with the database through either `Session` or `SessionPool`. Because a `Session` is not safe for concurrent use, `SessionPool` is strongly recommended. In concurrent programs, `SessionPool` manages and allocates connections across goroutines, which improves performance and resource utilization.
This article focuses on the usage of `SessionPool`, covering the complete process from environment preparation and core operation steps to the full set of interfaces.
## 1. Environment Preparation
### 1.1 Prerequisites
- golang >= 1.13
- make >= 3.0
- curl >= 7.1.1
- thrift: 0.15.0
- Linux, macOS, or other Unix-like systems
- Windows + bash (Git is needed to download the IoTDB Go client; any one of WSL, Cygwin, or Git Bash is acceptable)
### 1.2 Installation Methods
- **Using go mod**
```bash
# Enable Go modules
export GO111MODULE=on
# Configure the GOPROXY environment variable
export GOPROXY=https://goproxy.io
# Create a working directory and switch to it
mkdir session_example && cd session_example
# Download the example file (-L follows redirects)
curl -o session_example.go -L https://github.com/apache/iotdb-client-go/raw/main/example/session_example.go
# Initialize the go module environment
go mod init session_example
# Download dependency packages
go mod tidy
# Compile and run the program
go run session_example.go
```
- **Using GOPATH**
```bash
# Get thrift 0.15.0 (the version listed in the prerequisites)
go get github.com/apache/thrift@v0.15.0
# Recursively create the directory
mkdir -p $GOPATH/src/iotdb-client-go-example/session_example
# Switch to the new directory
cd $GOPATH/src/iotdb-client-go-example/session_example
# Download the example file (-L follows redirects)
curl -o session_example.go -L https://github.com/apache/iotdb-client-go/raw/main/example/session_example.go
# Initialize the go module environment
go mod init
# Download dependency packages
go mod tidy
# Compile and run the program
go run session_example.go
```
## 2. Core Steps
The three core steps for using the Go native interface to operate IoTDB are as follows:
1. **Create a connection pool instance**: Initialize a `SessionPool` object, configuring connection parameters and pool size.
2. **Execute database operations**: Call `GetSession()` to obtain a session from the pool, perform operations such as writes or queries, and always return the session with `PutBack(session)` when finished.
3. **Close connection pool resources**: Call `sessionPool.Close()` at the end of the program to release all connections.
The following sections illustrate the core development workflow and do not demonstrate all parameters and interfaces. For the complete functionality and parameters, please refer to: **[Full Interface Description](../API/Programming-Go-Native-API.md#_3-full-interface-list)** or check: **[SessionPool Example Source Code](https://github.com/apache/iotdb-client-go/blob/main/example/session_pool/session_pool_example.go)**
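To make the acquire, use, and return contract concrete without a running server, the sketch below models a pool with a buffered channel. `toyPool`, `toySession`, `get`, and `putBack` are hypothetical names invented for this illustration; this is not how the client library is implemented. It only shows why every `GetSession()` needs a matching `PutBack`: once all sessions are checked out, the next caller waits until the timeout expires.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// toySession stands in for a client session (hypothetical type).
type toySession struct{ id int }

// toyPool is a simplified stand-in for a session pool: a buffered channel
// holding a fixed number of sessions. It is NOT the library's implementation.
type toyPool struct {
	sessions chan toySession
	wait     time.Duration
}

var errWaitTimeout = errors.New("timed out waiting for a free session")

// newToyPool creates a pool pre-filled with size sessions.
func newToyPool(size int, wait time.Duration) *toyPool {
	p := &toyPool{sessions: make(chan toySession, size), wait: wait}
	for i := 0; i < size; i++ {
		p.sessions <- toySession{id: i}
	}
	return p
}

// get blocks until a session is free or the wait timeout expires.
func (p *toyPool) get() (toySession, error) {
	select {
	case s := <-p.sessions:
		return s, nil
	case <-time.After(p.wait):
		return toySession{}, errWaitTimeout
	}
}

// putBack returns a session so that another caller can acquire it.
func (p *toyPool) putBack(s toySession) { p.sessions <- s }

func main() {
	pool := newToyPool(2, 50*time.Millisecond)

	a, _ := pool.get()
	b, _ := pool.get()

	// Both sessions are checked out, so this call times out.
	_, err := pool.get()
	fmt.Println("third get without putBack:", err)

	// After one session is returned, acquisition succeeds again.
	pool.putBack(a)
	c, err := pool.get()
	fmt.Println("get after putBack:", c.id, err)

	pool.putBack(b)
	pool.putBack(c)
}
```

In real code the same discipline is usually expressed with `defer sessionPool.PutBack(session)` right after `GetSession()`, as the snippets in the following sections do.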
### 2.1 Create Connection Pool Instance
- **Single Instance**
```go
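config := &client.PoolConfig{
	Host:     host,
	Port:     port,
	UserName: user,
	Password: password,
}
// Arguments: max pool size 3, connection timeout 60000 ms,
// session wait timeout 60000 ms, compression disabled.
sessionPool = client.NewSessionPool(config, 3, 60000, 60000, false)
defer sessionPool.Close()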
```
- **Distributed or Active-Active**
```go
config := &client.PoolConfig{
	UserName: user,
	Password: password,
	// Each entry is one node in "host:port" form.
	NodeUrls: strings.Split("127.0.0.1:6667,127.0.0.1:6668", ","),
}
sessionPool = client.NewSessionPool(config, 3, 60000, 60000, false)
defer sessionPool.Close()
```
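When the node list comes from a flag or an environment variable, it can help to validate it before building the pool configuration. The following is a minimal sketch using only the standard library; `parseNodeUrls` is a hypothetical helper, not part of the client, and the checks it applies (non-empty host, port in 1 to 65535) are assumptions about what a sensible entry looks like.

```go
package main

import (
	"fmt"
	"net"
	"strconv"
	"strings"
)

// parseNodeUrls is a hypothetical helper: it splits a comma-separated list
// such as "127.0.0.1:6667,127.0.0.1:6668" and checks each "host:port" entry.
func parseNodeUrls(raw string) ([]string, error) {
	var urls []string
	for _, part := range strings.Split(raw, ",") {
		part = strings.TrimSpace(part)
		if part == "" {
			continue // tolerate trailing commas and blank entries
		}
		host, port, err := net.SplitHostPort(part)
		if err != nil {
			return nil, fmt.Errorf("invalid node url %q: %v", part, err)
		}
		if host == "" {
			return nil, fmt.Errorf("invalid node url %q: empty host", part)
		}
		if n, err := strconv.Atoi(port); err != nil || n < 1 || n > 65535 {
			return nil, fmt.Errorf("invalid node url %q: bad port", part)
		}
		urls = append(urls, part)
	}
	if len(urls) == 0 {
		return nil, fmt.Errorf("no node urls found in %q", raw)
	}
	return urls, nil
}

func main() {
	urls, err := parseNodeUrls("127.0.0.1:6667, 127.0.0.1:6668")
	fmt.Println(urls, err)
}
```

The returned slice has the same shape as the `strings.Split(...)` result used for `NodeUrls` above, so it could be assigned to that field directly.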
### 2.2 Database Operations
#### 2.2.1 Data Insertion
```go
session, err := sessionPool.GetSession()
defer sessionPool.PutBack(session)
if err != nil {
	log.Print(err)
	return
}
status, err := session.InsertTablet(tablet, false)
tablet.Reset()
checkError(status, err)
```
#### 2.2.2 Data Query
```go
var timeout int64 = 1000
session, err := sessionPool.GetSession()
defer sessionPool.PutBack(session)
if err != nil {
	log.Print(err)
	return
}
sessionDataSet, err := session.ExecuteQueryStatement(sql, &timeout)
if err == nil {
	defer sessionDataSet.Close()
	printDataSet(sessionDataSet)
} else {
	log.Println(err)
}
```
### 2.3 Usage Example
```go
package main

import (
	"flag"
	"fmt"
	"log"
	"math/rand"
	"time"

	"github.com/apache/iotdb-client-go/v2/client"
	"github.com/apache/iotdb-client-go/v2/common"
)

var (
	host     string
	port     string
	user     string
	password string
)

var sessionPool client.SessionPool

func main() {
	flag.StringVar(&host, "host", "127.0.0.1", "--host=192.168.1.100")
	flag.StringVar(&port, "port", "6667", "--port=6667")
	flag.StringVar(&user, "user", "root", "--user=root")
	flag.StringVar(&password, "password", "root", "--password=root")
	flag.Parse()

	// 1. Create connection pool
	config := &client.PoolConfig{
		Host:     host,
		Port:     port,
		UserName: user,
		Password: password,
	}
	sessionPool = client.NewSessionPool(config, 3, 60000, 60000, false)
	defer sessionPool.Close()

	// 2. Create storage group
	setStorageGroup("root.sg1")
	// 3. Create time series
	createTimeseries("root.sg1.dev1.temperature")
	// 4. Data insertion
	insertTablet()
	// 5. Data query
	executeQueryStatement("select temperature from root.sg1.dev1")
	// 6. Deletion
	deleteTimeseries("root.sg1.dev1.temperature")
	deleteStorageGroup("root.sg1")
}

// Set storage group
func setStorageGroup(sg string) {
	session, err := sessionPool.GetSession()
	defer sessionPool.PutBack(session)
	if err == nil {
		checkError(session.SetStorageGroup(sg))
	}
}

// Delete storage group
func deleteStorageGroup(sg string) {
	session, err := sessionPool.GetSession()
	defer sessionPool.PutBack(session)
	if err == nil {
		checkError(session.DeleteStorageGroup(sg))
	}
}

// Create time series
func createTimeseries(path string) {
	var (
		dataType   = client.FLOAT
		encoding   = client.PLAIN
		compressor = client.SNAPPY
	)
	session, err := sessionPool.GetSession()
	defer sessionPool.PutBack(session)
	if err == nil {
		checkError(session.CreateTimeseries(path, dataType, encoding, compressor, nil, nil))
	}
}

// Delete time series
func deleteTimeseries(paths ...string) {
	session, err := sessionPool.GetSession()
	defer sessionPool.PutBack(session)
	if err == nil {
		checkError(session.DeleteTimeseries(paths))
	}
}

// Insert Tablet data
func insertTablet() {
	session, err := sessionPool.GetSession()
	defer sessionPool.PutBack(session)
	if err == nil {
		if tablet, err := createTablet(12); err == nil {
			status, err := session.InsertTablet(tablet, false)
			tablet.Reset()
			checkError(status, err)
		} else {
			log.Fatal(err)
		}
	}
}

// Create Tablet
func createTablet(rowCount int) (*client.Tablet, error) {
	tablet, err := client.NewTablet("root.sg1.dev1", []*client.MeasurementSchema{
		{
			Measurement: "temperature",
			DataType:    client.FLOAT,
		},
	}, rowCount)
	if err != nil {
		return nil, err
	}
	// Current time in milliseconds; each row gets the next millisecond.
	ts := time.Now().UTC().UnixNano() / 1000000
	for row := 0; row < rowCount; row++ {
		ts++
		tablet.SetTimestamp(ts, row)
		tablet.SetValueAt(rand.Float32(), 0, row)
		tablet.RowSize++
	}
	return tablet, nil
}

// Execute query statement
func executeQueryStatement(sql string) {
	var timeout int64 = 1000
	session, err := sessionPool.GetSession()
	defer sessionPool.PutBack(session)
	if err != nil {
		log.Print(err)
		return
	}
	sessionDataSet, err := session.ExecuteQueryStatement(sql, &timeout)
	if err == nil {
		defer sessionDataSet.Close()
		printDataSet(sessionDataSet)
	} else {
		log.Println(err)
	}
}

// Print query results
func printDataSet(sds *client.SessionDataSet) {
	columnNames := sds.GetColumnNames()
	for _, value := range columnNames {
		fmt.Printf("%s\t", value)
	}
	fmt.Println()

	for next, err := sds.Next(); err == nil && next; next, err = sds.Next() {
		for _, columnName := range columnNames {
			isNull, _ := sds.IsNull(columnName)
			if isNull {
				fmt.Printf("%v\t\t", "null")
			} else {
				v, _ := sds.GetString(columnName)
				fmt.Printf("%v\t\t", v)
			}
		}
		fmt.Println()
	}
}

// Check error
func checkError(status *common.TSStatus, err error) {
	if err != nil {
		log.Fatal(err)
	}
	if status != nil {
		if err = client.VerifySuccess(status); err != nil {
			log.Println(err)
		}
	}
}
```
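The timestamp handling in `createTablet` can be looked at in isolation. The sketch below is a standalone illustration, not library code: `rowTimestamps` is a hypothetical helper that converts a nanosecond Unix time to milliseconds and assigns each row the next millisecond, mirroring the `ts++` that precedes `SetTimestamp` in the example. Because `UnixNano()` is independent of the time zone, the `.UTC()` call in the example does not change the resulting value.

```go
package main

import (
	"fmt"
	"time"
)

// rowTimestamps is a hypothetical helper. Given a start time in Unix
// nanoseconds, it returns n consecutive millisecond timestamps; the first
// one is 1 ms after the start, as in createTablet.
func rowTimestamps(startNanos int64, n int) []int64 {
	ts := startNanos / int64(time.Millisecond) // nanoseconds -> milliseconds
	out := make([]int64, 0, n)
	for row := 0; row < n; row++ {
		ts++
		out = append(out, ts)
	}
	return out
}

func main() {
	fmt.Println(rowTimestamps(time.Now().UnixNano(), 3))
}
```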
## 3. Full Interface List
### 3.1 SessionPool Management Interfaces
| Interface Name | Function Description | Parameter Description |
|:-------------------------------------------------------------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------|:-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `NewSessionPool(config *PoolConfig, maxSize, connTimeoutMs, waitTimeoutMs int, enableComp bool) SessionPool` | Creates and returns a Session connection pool instance. | `config`: Pool configuration<br> `maxSize`: Maximum connections (≤0 uses CPU count * 5)<br>`connTimeoutMs`: TCP connection timeout (ms)<br>`waitTimeoutMs`: Session acquisition wait timeout (ms)<br>`enableComp`: Whether to enable compression |
| `GetSession() (Session, error)` | Gets an available Session from the pool. Blocks if the pool is full, returns error on timeout. Must be paired with `PutBack`. | None |
| `PutBack(session Session)` | Returns a used Session back to the connection pool. | `session`: The instance obtained from `GetSession` |
| `Close()` | Closes the connection pool, releasing all active connections. Must be called before program exit. | None |
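The `maxSize` rule described in the table can be written out as a small function. This is a sketch of the documented behavior only; `effectivePoolSize` and `defaultMultiple` are names chosen for illustration, and the CPU count is passed in as a parameter so the rule is easy to check.

```go
package main

import (
	"fmt"
	"runtime"
)

// defaultMultiple reflects the "CPU count * 5" rule from the table above.
const defaultMultiple = 5

// effectivePoolSize is a hypothetical helper that returns the pool size
// implied by maxSize: values <= 0 fall back to numCPU * defaultMultiple.
func effectivePoolSize(maxSize, numCPU int) int {
	if maxSize <= 0 {
		return numCPU * defaultMultiple
	}
	return maxSize
}

func main() {
	fmt.Println(effectivePoolSize(3, runtime.NumCPU())) // explicit size is kept
	fmt.Println(effectivePoolSize(0, runtime.NumCPU())) // falls back to the default
}
```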
### 3.2 Data Insertion Interfaces
*The following interfaces are called via the obtained Session.*
| Interface Name | Function Description | Parameter Description |
|:---|:---|:---|
| `InsertRecord(deviceId string, measurements []string, dataTypes []TSDataType, values []interface{}, timestamp int64) (r *common.TSStatus, err error)` | Inserts a single record. | `deviceId`: Device ID<br>`measurements`: Measurement list<br>`dataTypes`: Data type list<br>`values`: Value list<br>`timestamp`: Timestamp |
| `InsertAlignedRecord(deviceId string, measurements []string, dataTypes []TSDataType, values []interface{}, timestamp int64) (r *common.TSStatus, err error)` | Inserts a single aligned record. | `deviceId`: Device ID<br>`measurements`: Measurement list<br>`dataTypes`: Data type list<br>`values`: Value list<br>`timestamp`: Timestamp |
| `InsertStringRecord(deviceId string, measurements []string, values []string, timestamp int64) (r *common.TSStatus, err error)` | Inserts a single record in string format. | `deviceId`: Device ID<br>`measurements`: Measurement list<br>`values`: String-type value list<br>`timestamp`: Timestamp |
| `InsertRecords(deviceIds []string, measurements [][]string, dataTypes [][]TSDataType, values [][]interface{}, timestamps []int64) (r *common.TSStatus, err error)` | Inserts multiple records for multiple devices. | `deviceIds`: Device ID list<br>`measurements`: 2D measurement list<br>`dataTypes`: 2D data type list<br>`values`: 2D value list<br>`timestamps`: Timestamp list |
| `InsertAlignedRecords(deviceIds []string, measurements [][]string, dataTypes [][]TSDataType, values [][]interface{}, timestamps []int64) (r *common.TSStatus, err error)` | Inserts multiple records for multiple aligned devices. | `deviceIds`: Device ID list<br>`measurements`: 2D measurement list<br>`dataTypes`: 2D data type list<br>`values`: 2D value list<br>`timestamps`: Timestamp list |
| `InsertTablet(tablet *Tablet, sorted bool) (r *common.TSStatus, err error)` | Inserts multiple rows of data for a single device. | `tablet`: The Tablet data to insert<br>`sorted`: Whether the data is sorted |
| `InsertAlignedTablet(tablet *Tablet, sorted bool) (r *common.TSStatus, err error)` | Inserts multiple rows of data for a single aligned device. | `tablet`: The Tablet data to insert<br>`sorted`: Whether the data is sorted |
| `InsertTablets(tablets []*Tablet, sorted bool) (r *common.TSStatus, err error)` | Inserts multiple Tablets in one batch. | `tablets`: The Tablets to insert<br>`sorted`: Whether the data is sorted |
| `InsertAlignedTablets(tablets []*Tablet, sorted bool) (r *common.TSStatus, err error)` | Inserts data for multiple aligned devices in one batch. | `tablets`: The Tablets to insert<br>`sorted`: Whether the data is sorted |
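The `sorted` parameter of the Tablet interfaces states whether the rows are already ordered. Assuming that "sorted" means ascending by timestamp (the table does not spell this out), a caller could compute the flag instead of hard-coding it. `isSortedByTime` below is a hypothetical helper built on the standard `sort` package.

```go
package main

import (
	"fmt"
	"sort"
)

// isSortedByTime reports whether timestamps are in non-decreasing order.
// Hypothetical helper: its result could be passed as the `sorted` argument,
// under the assumption that `sorted` refers to ascending timestamp order.
func isSortedByTime(timestamps []int64) bool {
	return sort.SliceIsSorted(timestamps, func(i, j int) bool {
		return timestamps[i] < timestamps[j]
	})
}

func main() {
	fmt.Println(isSortedByTime([]int64{1, 2, 2, 5}))
	fmt.Println(isSortedByTime([]int64{3, 1, 2}))
}
```

Passing `false` when in doubt is the conservative choice, since it does not claim an ordering the data may not have.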
### 3.3 SQL and Query Interfaces
*The following interfaces are called via the obtained Session.*
| Interface Name | Function Description | Parameter Description |
|:---|:---|:---|
| `ExecuteStatement(sql string) (*SessionDataSet, error)` | Executes SQL (primarily for queries), returns a SessionDataSet. | `sql`: The SQL query statement to execute |
| `ExecuteQueryStatement(sql string, timeoutMs *int64) (*SessionDataSet, error)` | Executes a query SQL with optional timeout, returns a SessionDataSet. | `sql`: The SQL query statement to execute<br>`timeoutMs`: Query timeout (milliseconds) |
| `ExecuteNonQueryStatement(sql string) (r *common.TSStatus, err error)` | Executes SQL that does not return a result set (e.g., INSERT, CREATE, DELETE). | `sql`: The SQL statement to execute |
| `ExecuteRawDataQuery(paths []string, startTime int64, endTime int64) (*SessionDataSet, error)` | Queries raw data for specified time series within a time range. | `paths`: Query path list<br>`startTime`: Start timestamp<br>`endTime`: End timestamp |
| `ExecuteAggregationQuery(paths []string, aggregations []common.TAggregationType, startTime, endTime, interval, timeoutMs int64) (*SessionDataSet, error)` | Executes an aggregation query (COUNT, AVG, etc.). | `paths`: Query path list<br>`aggregations`: Aggregation type list<br>`startTime, endTime, interval`: Start time, end time, and interval<br>`timeoutMs`: Query timeout (milliseconds) |
| `ExecuteBatchStatement(sqls []string) (r *common.TSStatus, err error)` | Executes multiple SQL statements in batch. | `sqls`: The SQL statements to execute |
### 3.4 Metadata Operation Interfaces
*The following interfaces are called via the obtained Session.*
| Interface Name | Function Description | Parameter Description |
|:---|:---|:---|
| `SetStorageGroup(storageGroupId string) (r *common.TSStatus, err error)` | Creates a database (storage group). | `storageGroupId`: Database (storage group) name |
| `DeleteStorageGroup(storageGroupId string) (r *common.TSStatus, err error)` | Deletes a database (storage group). | `storageGroupId`: The database (storage group) name to delete |
| `DeleteStorageGroups(storageGroupIds ...string) (r *common.TSStatus, err error)` | Deletes multiple databases (storage groups). | `storageGroupIds`: The list of database (storage group) names to delete |
| `CreateTimeseries(path string, dataType TSDataType, encoding TSEncoding, compressor TSCompressionType, attributes map[string]string, tags map[string]string) (r *common.TSStatus, err error)` | Creates a non-aligned time series. | `path`: Time series path<br>`dataType`: Data type<br>`encoding`: Encoding method<br>`compressor`: Compression algorithm<br>`attributes`: (Optional) Series attributes<br>`tags`: (Optional) Series tags |
| `CreateAlignedTimeseries(prefixPath string, measurements []string, dataTypes []TSDataType, encodings []TSEncoding, compressors []TSCompressionType, measurementAlias []string) (r *common.TSStatus, err error)` | Creates a group of aligned time series. | `prefixPath`: Time series path prefix<br>`measurements`: Measurement name list<br>`dataTypes, encodings, compressors`: Data type, encoding, and compressor list for each measurement<br>`measurementAlias`: (Optional) Alias list for each measurement |
| `DeleteTimeseries(paths []string) (r *common.TSStatus, err error)` | Deletes multiple time series (including their data). | `paths`: The list of time series paths to delete |
| `DeleteData(paths []string, startTime int64, endTime int64) (r *common.TSStatus, err error)` | Deletes data within a time period for specified time series (metadata is preserved). | `paths`: The list of time series paths<br>`startTime`: Start timestamp<br>`endTime`: End timestamp |
| `SetTimeZone(timeZone string) (r *common.TSStatus, err error)` | Sets the time zone for the current session. | `timeZone`: Time zone string, e.g., "UTC", "Asia/Shanghai", "GMT+8" |
| `GetTimeZone() (string, error)` | Gets the time zone of the current session. | None |
### 3.5 Key Configuration Structure (PoolConfig)
| Field | Type | Required | Description |
|:------------|:-----------|:------------------------------|:-------------------------------------------------------------------------|
| `Host`      | `string`   | Yes, unless `NodeUrls` is set | Single-node host address.                                                |
| `Port`      | `string`   | Yes, unless `NodeUrls` is set | Single-node port.                                                        |
| `NodeUrls`  | `[]string` | Yes, unless `Host`/`Port` set | Cluster node address list, format: `"host:port"`.                        |
| `UserName` | `string` | Yes | Username. |
| `Password` | `string` | Yes | Password. |
| `FetchSize` | `int32` | No | Query result set fetch size, default 1024. |
| `TimeZone`  | `string`   | No                            | Session time zone, e.g., "Asia/Shanghai". Defaults to the server time zone. |
| `Database` | `string` | No | For table model; used to set the session's default database. |
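The "either `Host`/`Port` or `NodeUrls`" rule and the required fields from the table can be checked before a pool is created. The sketch below uses a local `poolConfig` struct that mirrors a subset of the table's fields; it is a hypothetical stand-in, not the client's `PoolConfig`, and rejecting a configuration that sets both address styles is an assumption of this sketch rather than documented library behavior.

```go
package main

import (
	"errors"
	"fmt"
)

// poolConfig is a local, hypothetical mirror of some fields in the table
// above; it is not the client library's PoolConfig type.
type poolConfig struct {
	Host      string
	Port      string
	NodeUrls  []string
	UserName  string
	Password  string
	FetchSize int32
}

// defaultFetchSize is the default stated in the table.
const defaultFetchSize int32 = 1024

// validate enforces the address rule and required fields, and fills in
// the fetch size default when none is given.
func (c *poolConfig) validate() error {
	single := c.Host != "" && c.Port != ""
	cluster := len(c.NodeUrls) > 0
	switch {
	case single && cluster:
		return errors.New("set either Host/Port or NodeUrls, not both")
	case !single && !cluster:
		return errors.New("set Host and Port, or NodeUrls")
	}
	if c.UserName == "" || c.Password == "" {
		return errors.New("UserName and Password are required")
	}
	if c.FetchSize <= 0 {
		c.FetchSize = defaultFetchSize
	}
	return nil
}

func main() {
	cfg := &poolConfig{Host: "127.0.0.1", Port: "6667", UserName: "root", Password: "root"}
	fmt.Println(cfg.validate(), cfg.FetchSize)
}
```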