session pool supports multiple nodes (#78)
Co-authored-by: fuliwen <fuliw@yonyou.com>
diff --git a/README.md b/README.md
index cc2a26e..831c60b 100644
--- a/README.md
+++ b/README.md
@@ -86,6 +86,7 @@
The PutBack method must be called after use
### New sessionPool
+standalone
```golang
@@ -98,6 +99,18 @@
sessionPool = client.NewSessionPool(config, 3, 60000, 60000, false)
```
+cluster or doubleLive
+
+```golang
+
+config := &client.PoolConfig{
+ UserName: user,
+ Password: password,
+ NodeUrls: strings.Split("127.0.0.1:6667,127.0.0.1:6668", ","),
+ }
+sessionPool = client.NewSessionPool(config, 3, 60000, 60000, false)
+
+```
### Get session through sessionPool, putback after use
diff --git a/README_ZH.md b/README_ZH.md
index 13d4ff8..3e88438 100644
--- a/README_ZH.md
+++ b/README_ZH.md
@@ -69,6 +69,7 @@
### 创建sessionPool
+单实例
```golang
config := &client.PoolConfig{
@@ -81,6 +82,20 @@
```
+分布式或双活
+
+```golang
+
+config := &client.PoolConfig{
+ UserName: user,
+ Password: password,
+ NodeUrls: strings.Split("127.0.0.1:6667,127.0.0.1:6668", ","),
+ }
+sessionPool = client.NewSessionPool(config, 3, 60000, 60000, false)
+
+```
+
+
### 使用sessionPool获取session,使用完手动调用PutBack
例1:设置存储组
diff --git a/client/session.go b/client/session.go
index c2a29e7..f26699a 100644
--- a/client/session.go
+++ b/client/session.go
@@ -118,11 +118,12 @@
}
type ClusterConfig struct {
- NodeUrls []string //ip:port
- UserName string
- Password string
- FetchSize int32
- TimeZone string
+ NodeUrls []string //ip:port
+ UserName string
+ Password string
+ FetchSize int32
+ TimeZone string
+ ConnectRetryMax int
}
type ClusterSession struct {
@@ -975,12 +976,12 @@
return Session{config: config}
}
-func NewClusterSession(ClusterConfig *ClusterConfig) Session {
+func NewClusterSession(clusterConfig *ClusterConfig) Session {
session := Session{}
node := endPoint{}
- for i := 0; i < len(ClusterConfig.NodeUrls); i++ {
- node.Host = strings.Split(ClusterConfig.NodeUrls[i], ":")[0]
- node.Port = strings.Split(ClusterConfig.NodeUrls[i], ":")[1]
+ for i := 0; i < len(clusterConfig.NodeUrls); i++ {
+ node.Host = strings.Split(clusterConfig.NodeUrls[i], ":")[0]
+ node.Port = strings.Split(clusterConfig.NodeUrls[i], ":")[1]
endPointList.PushBack(node)
}
var err error
@@ -996,7 +997,7 @@
log.Println(err)
} else {
session.config = getConfig(e.Value.(endPoint).Host, e.Value.(endPoint).Port,
- ClusterConfig.UserName, ClusterConfig.Password, ClusterConfig.FetchSize, ClusterConfig.TimeZone)
+ clusterConfig.UserName, clusterConfig.Password, clusterConfig.FetchSize, clusterConfig.TimeZone, clusterConfig.ConnectRetryMax)
break
}
}
@@ -1052,14 +1053,15 @@
}
-func getConfig(host string, port string, userName string, passWord string, fetchSize int32, timeZone string) *Config {
+func getConfig(host string, port string, userName string, passWord string, fetchSize int32, timeZone string, connectRetryMax int) *Config {
return &Config{
- Host: host,
- Port: port,
- UserName: userName,
- Password: passWord,
- FetchSize: fetchSize,
- TimeZone: timeZone,
+ Host: host,
+ Port: port,
+ UserName: userName,
+ Password: passWord,
+ FetchSize: fetchSize,
+ TimeZone: timeZone,
+ ConnectRetryMax: connectRetryMax,
}
}
diff --git a/client/sessionpool.go b/client/sessionpool.go
index dbcb7bb..156ce2a 100644
--- a/client/sessionpool.go
+++ b/client/sessionpool.go
@@ -78,7 +78,7 @@
if ok {
return session, nil
} else {
- log.Println("sessionpool has closed")
+ log.Println("sessionPool has closed")
return session, errPoolClosed
}
default:
@@ -93,11 +93,19 @@
}
}
-func (spool *SessionPool) ConstructSession(config *PoolConfig) (Session, error) {
- session := NewSession(getSessionConfig(config))
- if err := session.Open(spool.enableCompression, spool.connectionTimeoutInMs); err != nil {
- log.Print(err)
- return session, err
+func (spool *SessionPool) ConstructSession(config *PoolConfig) (session Session, err error) {
+ if len(config.NodeUrls) > 0 {
+ session = NewClusterSession(getClusterSessionConfig(config))
+ if err := session.OpenCluster(spool.enableCompression); err != nil {
+ log.Print(err)
+ return session, err
+ }
+ } else {
+ session = NewSession(getSessionConfig(config))
+ if err := session.Open(spool.enableCompression, spool.connectionTimeoutInMs); err != nil {
+ log.Print(err)
+ return session, err
+ }
}
return session, nil
}
@@ -114,6 +122,17 @@
}
}
+func getClusterSessionConfig(config *PoolConfig) *ClusterConfig {
+ return &ClusterConfig{
+ NodeUrls: config.NodeUrls,
+ UserName: config.UserName,
+ Password: config.Password,
+ FetchSize: config.FetchSize,
+ TimeZone: config.TimeZone,
+ ConnectRetryMax: config.ConnectRetryMax,
+ }
+}
+
func (spool *SessionPool) PutBack(session Session) {
if session.trans.IsOpen() {
spool.ch <- session
diff --git a/example/session_pool/session_pool_example.go b/example/session_pool/session_pool_example.go
index 26a7fb7..c43a4fb 100644
--- a/example/session_pool/session_pool_example.go
+++ b/example/session_pool/session_pool_example.go
@@ -24,6 +24,7 @@
"fmt"
"log"
"math/rand"
+ "strings"
"sync"
"time"
@@ -60,8 +61,8 @@
wg.Add(1)
go func() {
defer wg.Done()
- setStorageGroup(fmt.Sprintf("root.ln%d", j))
- deleteStorageGroup(fmt.Sprintf("root.ln%d", j))
+ setStorageGroup(fmt.Sprintf("root.ln-%d", j))
+ deleteStorageGroup(fmt.Sprintf("root.ln-%d", j))
}()
@@ -134,17 +135,6 @@
insertAlignedTablets()
deleteTimeseries("root.ln.device1.*")
executeQueryStatement("show timeseries root.**")
- for i := 0; i < 10000; i++ {
- var j = i
- wg.Add(1)
- go func() {
- defer wg.Done()
- setStorageGroup(fmt.Sprintf("root.ln%d", j))
- deleteStorageGroup(fmt.Sprintf("root.ln%d", j))
-
- }()
-
- }
wg.Wait()
}
@@ -773,3 +763,22 @@
}
}
}
+
+// If your IotDB is a cluster version or doubleLive, you can use the following code for session pool connection
+func useSessionPool() {
+
+ config := &client.PoolConfig{
+ UserName: user,
+ Password: password,
+ NodeUrls: strings.Split("127.0.0.1:6667,127.0.0.1:6668", ","),
+ }
+ sessionPool = client.NewSessionPool(config, 3, 60000, 60000, false)
+ defer sessionPool.Close()
+ session, err := sessionPool.GetSession()
+ defer sessionPool.PutBack(session)
+ if err != nil {
+ log.Print(err)
+ return
+ }
+
+}