The Go Native API supports interaction with the database through both Session and SessionPool. Because a Session is not thread-safe, using SessionPool is strongly recommended. In concurrent scenarios, SessionPool manages and allocates connection resources, which improves system performance and resource utilization.
This article focuses on the usage of SessionPool, covering the complete process from environment preparation and core operation steps to the full set of interfaces.
Using go mod
# Switch to the HOME path of GOPATH and enable the Go Modules feature
export GO111MODULE=on
# Configure the GOPROXY environment variable
export GOPROXY=https://goproxy.io
# Create a named folder or directory and switch to it
mkdir session_example && cd session_example
# Save the file, which will automatically redirect to the new address
curl -o session_example.go -L https://github.com/apache/iotdb-client-go/raw/main/example/session_example.go
# Initialize the go module environment
go mod init session_example
# Download dependency packages
go mod tidy
# Compile and run the program
go run session_example.go
Using GOPATH
# Get thrift 0.13.0
go get github.com/apache/thrift@0.13.0
# Recursively create the directory
mkdir -p $GOPATH/src/iotdb-client-go-example/session_example
# Switch to the current directory
cd $GOPATH/src/iotdb-client-go-example/session_example
# Save the file, which will automatically redirect to the new address
curl -o session_example.go -L https://github.com/apache/iotdb-client-go/raw/main/example/session_example.go
# Initialize the go module environment
go mod init
# Download dependency packages
go mod tidy
# Compile and run the program
go run session_example.go
The three core steps for using the Go native interface to operate IoTDB are as follows:
1. Create a SessionPool object, configuring connection parameters and pool size.
2. Call GetSession() to obtain a Session from the pool, perform operations such as data writing or querying, and always call PutBack(session) upon completion.
3. Call sessionPool.Close() at the end of the program to release all connections.

The following sections illustrate the core development workflow and do not demonstrate all parameters and interfaces. For the complete functionality and parameters, refer to the Full Interface Description or the SessionPool Example Source Code.
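The borrow, use, and return cycle can be tried without a running IoTDB instance. The block below is only a sketch under stated assumptions: fakeSession, fakePool, and borrowN are hypothetical stand-ins built on a buffered channel, and the real client.SessionPool may work differently internally. It shows why every GetSession needs a matching PutBack.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// fakeSession is a hypothetical stand-in for client.Session.
type fakeSession struct{ id int }

// fakePool is a hypothetical stand-in for client.SessionPool.
// Idle sessions wait in a buffered channel.
type fakePool struct {
	idle        chan fakeSession
	waitTimeout time.Duration
}

var errWaitTimeout = errors.New("timed out waiting for a session")

// newFakePool pre-fills the pool with size sessions; waitMs plays the
// role of the session acquisition wait timeout.
func newFakePool(size int, waitMs int) *fakePool {
	p := &fakePool{
		idle:        make(chan fakeSession, size),
		waitTimeout: time.Duration(waitMs) * time.Millisecond,
	}
	for i := 0; i < size; i++ {
		p.idle <- fakeSession{id: i}
	}
	return p
}

// GetSession blocks until a session is idle or the wait timeout elapses.
func (p *fakePool) GetSession() (fakeSession, error) {
	select {
	case s := <-p.idle:
		return s, nil
	case <-time.After(p.waitTimeout):
		return fakeSession{}, errWaitTimeout
	}
}

// PutBack makes the session available to the next caller.
func (p *fakePool) PutBack(s fakeSession) { p.idle <- s }

// borrowN requests n sessions one after another. When release is true,
// each session is put back before the next request.
func borrowN(p *fakePool, n int, release bool) error {
	for i := 0; i < n; i++ {
		s, err := p.GetSession()
		if err != nil {
			return err
		}
		if release {
			p.PutBack(s)
		}
	}
	return nil
}

func main() {
	// Returning each session lets a pool of 2 serve 5 requests.
	fmt.Println(borrowN(newFakePool(2, 10), 5, true))
	// Never returning them exhausts the pool on the third request.
	fmt.Println(borrowN(newFakePool(2, 10), 3, false))
}
```

In the stand-in, a forgotten PutBack turns into a timeout for a later caller, which is the failure mode the second step is meant to prevent.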
Single Instance
config := &client.PoolConfig{
	Host:     host,
	Port:     port,
	UserName: user,
	Password: password,
}
// Arguments: maxSize=3, connTimeoutMs=60000, waitTimeoutMs=60000, enableComp=false
sessionPool = client.NewSessionPool(config, 3, 60000, 60000, false)
defer sessionPool.Close()
Distributed or Active-Active
config := &client.PoolConfig{
	UserName: user,
	Password: password,
	// Each entry is the "host:port" address of one cluster node
	NodeUrls: strings.Split("127.0.0.1:6667,127.0.0.1:6668", ","),
}
sessionPool = client.NewSessionPool(config, 3, 60000, 60000, false)
defer sessionPool.Close()
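When the node list comes from a flag or an environment variable rather than a literal, it can help to check it before building the PoolConfig. The following is a minimal sketch, assuming each entry must be a "host:port" pair; parseNodeURLs is a hypothetical helper and not part of the client library.

```go
package main

import (
	"fmt"
	"net"
	"strconv"
	"strings"
)

// parseNodeURLs (hypothetical helper) splits a comma-separated list and
// keeps only entries that look like "host:port" with a valid port.
func parseNodeURLs(list string) ([]string, error) {
	var urls []string
	for _, raw := range strings.Split(list, ",") {
		addr := strings.TrimSpace(raw)
		if addr == "" {
			continue // tolerate stray commas and blanks
		}
		host, port, err := net.SplitHostPort(addr)
		if err != nil {
			return nil, fmt.Errorf("invalid node address %q: %w", addr, err)
		}
		if host == "" {
			return nil, fmt.Errorf("invalid node address %q: empty host", addr)
		}
		if n, err := strconv.Atoi(port); err != nil || n < 1 || n > 65535 {
			return nil, fmt.Errorf("invalid node address %q: bad port", addr)
		}
		urls = append(urls, addr)
	}
	if len(urls) == 0 {
		return nil, fmt.Errorf("no node addresses in %q", list)
	}
	return urls, nil
}

func main() {
	urls, err := parseNodeURLs("127.0.0.1:6667, 127.0.0.1:6668")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	// The result could then be assigned to PoolConfig.NodeUrls.
	fmt.Println(urls)
}
```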
session, err := sessionPool.GetSession()
defer sessionPool.PutBack(session)
if err != nil {
	log.Print(err)
	return
}
status, err := session.InsertTablet(tablet, false)
tablet.Reset()
checkError(status, err)
var timeout int64 = 1000
session, err := sessionPool.GetSession()
defer sessionPool.PutBack(session)
if err != nil {
	log.Print(err)
	return
}
sessionDataSet, err := session.ExecuteQueryStatement(sql, &timeout)
if err == nil {
	defer sessionDataSet.Close()
	printDataSet(sessionDataSet)
} else {
	log.Println(err)
}
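The query snippet passes its timeout as a pointer to an int64 number of milliseconds. If the rest of a program works with time.Duration, a small conversion keeps the units explicit. This is a sketch only; timeoutMillis is a hypothetical helper, not a library function.

```go
package main

import (
	"fmt"
	"time"
)

// timeoutMillis (hypothetical helper) converts a Duration into a pointer
// to whole milliseconds, the form the query call above expects.
func timeoutMillis(d time.Duration) *int64 {
	ms := d.Milliseconds()
	return &ms
}

func main() {
	timeout := timeoutMillis(1500 * time.Millisecond)
	fmt.Println(*timeout) // prints 1500
	// With a real session this would be passed as:
	//   session.ExecuteQueryStatement(sql, timeout)
}
```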
package main

import (
	"flag"
	"fmt"
	"log"
	"math/rand"
	"time"

	"github.com/apache/iotdb-client-go/v2/client"
	"github.com/apache/iotdb-client-go/v2/common"
)

var (
	host     string
	port     string
	user     string
	password string
)

var sessionPool client.SessionPool

func main() {
	flag.StringVar(&host, "host", "127.0.0.1", "--host=192.168.1.100")
	flag.StringVar(&port, "port", "6667", "--port=6667")
	flag.StringVar(&user, "user", "root", "--user=root")
	flag.StringVar(&password, "password", "root", "--password=root")
	flag.Parse()

	// 1. Create connection pool
	config := &client.PoolConfig{
		Host:     host,
		Port:     port,
		UserName: user,
		Password: password,
	}
	sessionPool = client.NewSessionPool(config, 3, 60000, 60000, false)
	defer sessionPool.Close()

	// 2. Create storage group
	setStorageGroup("root.sg1")
	// 3. Create time series
	createTimeseries("root.sg1.dev1.temperature")
	// 4. Data insertion
	insertTablet()
	// 5. Data query
	executeQueryStatement("select temperature from root.sg1.dev1")
	// 6. Deletion
	deleteTimeseries("root.sg1.dev1.temperature")
	deleteStorageGroup("root.sg1")
}

// Set storage group
func setStorageGroup(sg string) {
	session, err := sessionPool.GetSession()
	defer sessionPool.PutBack(session)
	if err == nil {
		session.SetStorageGroup(sg)
	}
}

// Delete storage group
func deleteStorageGroup(sg string) {
	session, err := sessionPool.GetSession()
	defer sessionPool.PutBack(session)
	if err == nil {
		checkError(session.DeleteStorageGroup(sg))
	}
}

// Create time series
func createTimeseries(path string) {
	var (
		dataType   = client.FLOAT
		encoding   = client.PLAIN
		compressor = client.SNAPPY
	)
	session, err := sessionPool.GetSession()
	defer sessionPool.PutBack(session)
	if err == nil {
		checkError(session.CreateTimeseries(path, dataType, encoding, compressor, nil, nil))
	}
}

// Delete time series
func deleteTimeseries(paths ...string) {
	session, err := sessionPool.GetSession()
	defer sessionPool.PutBack(session)
	if err == nil {
		checkError(session.DeleteTimeseries(paths))
	}
}

// Insert Tablet data
func insertTablet() {
	session, err := sessionPool.GetSession()
	defer sessionPool.PutBack(session)
	if err == nil {
		if tablet, err := createTablet(12); err == nil {
			status, err := session.InsertTablet(tablet, false)
			tablet.Reset()
			checkError(status, err)
		} else {
			log.Fatal(err)
		}
	}
}

// Create Tablet
func createTablet(rowCount int) (*client.Tablet, error) {
	tablet, err := client.NewTablet("root.sg1.dev1", []*client.MeasurementSchema{
		{
			Measurement: "temperature",
			DataType:    client.FLOAT,
		},
	}, rowCount)
	if err != nil {
		return nil, err
	}
	// Current time in milliseconds; each row gets the next millisecond
	ts := time.Now().UTC().UnixNano() / 1000000
	for row := 0; row < int(rowCount); row++ {
		ts++
		tablet.SetTimestamp(ts, row)
		tablet.SetValueAt(rand.Float32(), 0, row)
		tablet.RowSize++
	}
	return tablet, nil
}

// Execute query statement
func executeQueryStatement(sql string) {
	var timeout int64 = 1000
	session, err := sessionPool.GetSession()
	defer sessionPool.PutBack(session)
	if err != nil {
		log.Print(err)
		return
	}
	sessionDataSet, err := session.ExecuteQueryStatement(sql, &timeout)
	if err == nil {
		defer sessionDataSet.Close()
		printDataSet(sessionDataSet)
	} else {
		log.Println(err)
	}
}

// Print query results
func printDataSet(sds *client.SessionDataSet) {
	columnNames := sds.GetColumnNames()
	for _, value := range columnNames {
		fmt.Printf("%s\t", value)
	}
	fmt.Println()
	for next, err := sds.Next(); err == nil && next; next, err = sds.Next() {
		for _, columnName := range columnNames {
			isNull, _ := sds.IsNull(columnName)
			if isNull {
				fmt.Printf("%v\t\t", "null")
			} else {
				v, _ := sds.GetString(columnName)
				fmt.Printf("%v\t\t", v)
			}
		}
		fmt.Println()
	}
}

// Check error
func checkError(status *common.TSStatus, err error) {
	if err != nil {
		log.Fatal(err)
	}
	if status != nil {
		if err = client.VerifySuccess(status); err != nil {
			log.Println(err)
		}
	}
}
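In the example, createTablet derives a millisecond timestamp by dividing UnixNano() by 1000000 and then advances it by one for every row, so the first row carries the base time plus 1 ms. The sketch below isolates that arithmetic so it can be checked on its own; nanosToMillis and rowTimestamps are hypothetical names introduced for illustration.

```go
package main

import (
	"fmt"
	"time"
)

// nanosToMillis converts a Unix time in nanoseconds to milliseconds,
// the same division createTablet performs.
func nanosToMillis(ns int64) int64 {
	return ns / int64(time.Millisecond)
}

// rowTimestamps returns one timestamp per row. Like the loop in
// createTablet, it increments before assigning, so row 0 gets baseMs+1.
func rowTimestamps(baseMs int64, rows int) []int64 {
	ts := make([]int64, rows)
	for row := range ts {
		ts[row] = baseMs + int64(row) + 1
	}
	return ts
}

func main() {
	base := nanosToMillis(time.Now().UTC().UnixNano())
	fmt.Println(rowTimestamps(base, 3))
}
```

Because the timestamps are strictly increasing, the rows of such a tablet are already in time order.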
| Interface Name | Function Description | Parameter Description |
|---|---|---|
| NewSessionPool(config *PoolConfig, maxSize, connTimeoutMs, waitTimeoutMs int, enableComp bool) SessionPool | Creates and returns a Session connection pool instance. | config: Pool configuration; maxSize: Maximum connections (≤0 uses CPU count * 5); connTimeoutMs: TCP connection timeout (ms); waitTimeoutMs: Session acquisition wait timeout (ms); enableComp: Whether to enable compression |
| GetSession() (Session, error) | Gets an available Session from the pool. Blocks if the pool is full, returns error on timeout. Must be paired with PutBack. | None |
| PutBack(session Session) | Returns a used Session back to the connection pool. | session: The instance obtained from GetSession |
| Close() | Closes the connection pool, releasing all active connections. Must be called before program exit. | None |
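The maxSize rule in the table (a value of 0 or less falls back to CPU count * 5) can be written out as a few lines of Go. This is a sketch of the documented rule only, not the library's code; effectivePoolSize is a hypothetical name, and the CPU count is passed in as a parameter so the result is predictable.

```go
package main

import (
	"fmt"
	"runtime"
)

// effectivePoolSize (hypothetical) mirrors the documented default:
// a non-positive maxSize is replaced by cpus * 5.
func effectivePoolSize(maxSize, cpus int) int {
	if maxSize <= 0 {
		return cpus * 5
	}
	return maxSize
}

func main() {
	cpus := runtime.NumCPU()
	fmt.Println(effectivePoolSize(3, cpus)) // prints 3
	fmt.Println(effectivePoolSize(0, cpus)) // depends on the machine
}
```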
The following interfaces are called via the obtained Session.
| Interface Name | Function Description | Parameter Description |
|---|---|---|
| InsertRecord(deviceId string, measurements []string, dataTypes []TSDataType, values []interface{}, timestamp int64) (r *common.TSStatus, err error) | Inserts a single record. | deviceId: Device ID; measurements: Measurement list; dataTypes: Data type list; values: Value list; timestamp: Timestamp |
| InsertAlignedRecord(deviceId string, measurements []string, dataTypes []TSDataType, values []interface{}, timestamp int64) (r *common.TSStatus, err error) | Inserts a single aligned record. | deviceId: Device ID; measurements: Measurement list; dataTypes: Data type list; values: Value list; timestamp: Timestamp |
| InsertStringRecord(deviceId string, measurements []string, values []string, timestamp int64) (r *common.TSStatus, err error) | Inserts a single record in string format. | deviceId: Device ID; measurements: Measurement list; values: String-type value list; timestamp: Timestamp |
| InsertRecords(deviceIds []string, measurements [][]string, dataTypes [][]TSDataType, values [][]interface{}, timestamps []int64) (r *common.TSStatus, err error) | Inserts multiple records for multiple devices. | deviceIds: Device ID list; measurements: 2D measurement list; dataTypes: 2D data type list; values: 2D value list; timestamps: Timestamp list |
| InsertAlignedRecords(deviceIds []string, measurements [][]string, dataTypes [][]TSDataType, values [][]interface{}, timestamps []int64) (r *common.TSStatus, err error) | Inserts multiple records for multiple aligned devices. | deviceIds: Device ID list; measurements: 2D measurement list; dataTypes: 2D data type list; values: 2D value list; timestamps: Timestamp list |
| InsertTablet(tablet *Tablet, sorted bool) (r *common.TSStatus, err error) | Inserts multiple rows of data for a single device. | tablet: The Tablet data to insert; sorted: Whether the data is sorted |
| InsertAlignedTablet(tablet *Tablet, sorted bool) (r *common.TSStatus, err error) | Inserts multiple rows of data for a single aligned device. | tablet: The Tablet data to insert; sorted: Whether the data is sorted |
| InsertTablets(tablets []*Tablet, sorted bool) (r *common.TSStatus, err error) | Batch inserts multiple Tablet data. | tablets: Multiple Tablet data to insert; sorted: Whether the data is sorted |
| InsertAlignedTablets(tablets []*Tablet, sorted bool) (r *common.TSStatus, err error) | Batch inserts multiple aligned devices' data. | tablets: Multiple Tablet data to insert; sorted: Whether the data is sorted |
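The Tablet insertion interfaces take a sorted flag described as "whether the data is sorted". A caller that does not know the order of its rows could compute the flag instead of guessing. The sketch below assumes "sorted" means timestamps in non-decreasing order, which the table does not spell out; timestampsAscending is a hypothetical helper.

```go
package main

import "fmt"

// timestampsAscending (hypothetical) reports whether every timestamp is
// greater than or equal to the one before it. Treating this as the
// meaning of the sorted flag is an assumption.
func timestampsAscending(ts []int64) bool {
	for i := 1; i < len(ts); i++ {
		if ts[i] < ts[i-1] {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(timestampsAscending([]int64{1, 2, 3})) // prints true
	fmt.Println(timestampsAscending([]int64{1, 3, 2})) // prints false
}
```

Passing false, as the examples in this article do, is the conservative choice when the order is unknown.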
The following interfaces are called via the obtained Session.
| Interface Name | Function Description | Parameter Description |
|---|---|---|
| ExecuteStatement(sql string) (*SessionDataSet, error) | Executes SQL (primarily for queries), returns a SessionDataSet. | sql: The SQL query statement to execute |
| ExecuteQueryStatement(sql string, timeoutMs *int64) (*SessionDataSet, error) | Executes a query SQL with optional timeout, returns a SessionDataSet. | sql: The SQL query statement to execute; timeoutMs: Query timeout (milliseconds) |
| ExecuteNonQueryStatement(sql string) (r *common.TSStatus, err error) | Executes SQL that does not return a result set (e.g., INSERT, CREATE, DELETE). | sql: The SQL statement to execute |
| ExecuteRawDataQuery(paths []string, startTime int64, endTime int64) (*SessionDataSet, error) | Queries raw data for specified time series within a time range. | paths: Query path list; startTime: Start timestamp; endTime: End timestamp |
| ExecuteAggregationQuery(paths []string, aggregations []common.TAggregationType, startTime, endTime, interval, timeoutMs int64) (*SessionDataSet, error) | Executes an aggregation query (COUNT, AVG, etc.). | paths: Query path list; aggregations: Aggregation type list; startTime, endTime, interval: Start time, end time, and interval; timeoutMs: Query timeout (milliseconds) |
| ExecuteBatchStatement(sqls []string) (r *common.TSStatus, err error) | Executes multiple SQL statements in batch. | sqls: The SQL statements to execute |
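ExecuteBatchStatement takes a slice of SQL strings, so the statements have to be assembled first. The sketch below builds INSERT statements for the device and measurement used in this article. The helper names are hypothetical, the statement form is an assumption about the SQL dialect in use, and device and measurement names are assumed to come from trusted code rather than user input.

```go
package main

import (
	"fmt"
	"strconv"
)

// insertSQL (hypothetical) renders one INSERT statement for a single
// measurement value at a millisecond timestamp.
func insertSQL(device, measurement string, ts int64, value float64) string {
	return fmt.Sprintf("INSERT INTO %s(timestamp, %s) VALUES (%d, %s)",
		device, measurement, ts, strconv.FormatFloat(value, 'f', -1, 64))
}

// insertBatch (hypothetical) renders one statement per value, spacing
// the timestamps one millisecond apart starting at startMs.
func insertBatch(device, measurement string, startMs int64, values []float64) []string {
	sqls := make([]string, 0, len(values))
	for i, v := range values {
		sqls = append(sqls, insertSQL(device, measurement, startMs+int64(i), v))
	}
	return sqls
}

func main() {
	sqls := insertBatch("root.sg1.dev1", "temperature", 1, []float64{21.5, 22})
	for _, s := range sqls {
		fmt.Println(s)
	}
	// With a real session the slice would be passed as:
	//   session.ExecuteBatchStatement(sqls)
}
```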
The following interfaces are called via the obtained Session.
| Interface Name | Function Description | Parameter Description |
|---|---|---|
| SetStorageGroup(storageGroupId string) (r *common.TSStatus, err error) | Creates a database (storage group). | storageGroupId: Database (storage group) name |
| DeleteStorageGroup(storageGroupId string) (r *common.TSStatus, err error) | Deletes a database (storage group). | storageGroupId: The database (storage group) name to delete |
| DeleteStorageGroups(storageGroupIds ...string) (r *common.TSStatus, err error) | Deletes multiple databases (storage groups). | storageGroupIds: The list of database (storage group) names to delete |
| CreateTimeseries(path string, dataType TSDataType, encoding TSEncoding, compressor TSCompressionType, attributes map[string]string, tags map[string]string) (r *common.TSStatus, err error) | Creates a non-aligned time series. | path: Time series path; dataType: Data type; encoding: Encoding method; compressor: Compression algorithm; attributes: (Optional) Series attributes; tags: (Optional) Series tags |
| CreateAlignedTimeseries(prefixPath string, measurements []string, dataTypes []TSDataType, encodings []TSEncoding, compressors []TSCompressionType, measurementAlias []string) (r *common.TSStatus, err error) | Creates a group of aligned time series. | prefixPath: Time series path prefix; measurements: Measurement name list; dataTypes, encodings, compressors: Data type, encoding, and compressor list for each measurement; measurementAlias: (Optional) Alias list for each measurement |
| DeleteTimeseries(paths []string) (r *common.TSStatus, err error) | Deletes multiple time series (including their data). | paths: The list of time series paths to delete |
| DeleteData(paths []string, startTime int64, endTime int64) (r *common.TSStatus, err error) | Deletes data within a time period for specified time series (metadata is preserved). | paths: The list of time series paths; startTime: Start timestamp; endTime: End timestamp |
| SetTimeZone(timeZone string) (r *common.TSStatus, err error) | Sets the time zone for the current session. | timeZone: Time zone string, e.g., "UTC", "Asia/Shanghai", "GMT+8" |
| GetTimeZone() (string, error) | Gets the time zone of the current session. | None |
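Several of these interfaces take a full time series path such as root.sg1.dev1.temperature, while the Tablet and record interfaces take the device ID and the measurement name separately. The sketch below splits a path at its last dot to relate the two forms. splitSeriesPath is a hypothetical helper, and it assumes plain node names: quoted nodes that themselves contain dots are not handled.

```go
package main

import (
	"fmt"
	"strings"
)

// splitSeriesPath (hypothetical) separates a full series path into the
// device part and the final measurement name.
func splitSeriesPath(path string) (device, measurement string, err error) {
	i := strings.LastIndex(path, ".")
	if i <= 0 || i == len(path)-1 {
		return "", "", fmt.Errorf("path %q has no device and measurement parts", path)
	}
	return path[:i], path[i+1:], nil
}

func main() {
	device, measurement, err := splitSeriesPath("root.sg1.dev1.temperature")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(device)      // prints root.sg1.dev1
	fmt.Println(measurement) // prints temperature
}
```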
| Field | Type | Required | Description |
|---|---|---|---|
| Host | string | Choose one with NodeUrls | Single-node host address. |
| Port | string | Choose one with NodeUrls | Single-node port. |
| NodeUrls | []string | Choose one with Host/Port | Cluster node address list, format: "host:port". |
| UserName | string | Yes | Username. |
| Password | string | Yes | Password. |
| FetchSize | int32 | No | Query result set fetch size, default 1024. |
| TimeZone | string | No | Session time zone, e.g., "Asia/Shanghai". Default uses server time zone. |
| Database | string | No | For table model; used to set the session's default database. |
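The "Required" column implies a small validation rule: either Host and Port are set, or NodeUrls is, and UserName and Password are always needed. The sketch below expresses that rule against a local stand-in struct. poolConfig and validate are hypothetical and are not the library's types, and rejecting a configuration that sets both addressing styles is a stricter reading than the table itself states.

```go
package main

import (
	"errors"
	"fmt"
)

// poolConfig is a hypothetical stand-in that mirrors only the
// addressing and credential fields from the table.
type poolConfig struct {
	Host     string
	Port     string
	NodeUrls []string
	UserName string
	Password string
}

// validate (hypothetical) applies the "choose one" and "required" rules.
func validate(c poolConfig) error {
	single := c.Host != "" || c.Port != ""
	cluster := len(c.NodeUrls) > 0
	switch {
	case single && cluster:
		return errors.New("set either Host/Port or NodeUrls, not both")
	case !single && !cluster:
		return errors.New("set Host/Port or NodeUrls")
	case single && (c.Host == "" || c.Port == ""):
		return errors.New("Host and Port must be set together")
	}
	if c.UserName == "" || c.Password == "" {
		return errors.New("UserName and Password are required")
	}
	return nil
}

func main() {
	ok := poolConfig{Host: "127.0.0.1", Port: "6667", UserName: "root", Password: "root"}
	fmt.Println(validate(ok)) // prints <nil>

	missing := poolConfig{UserName: "root", Password: "root"}
	fmt.Println(validate(missing))
}
```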