Support connecting multiple nodes of the cluster (#25)

diff --git a/client/session.go b/client/session.go
index 9c89662..c5b4192 100644
--- a/client/session.go
+++ b/client/session.go
@@ -21,13 +21,16 @@
 
 import (
 	"bytes"
+	"container/list"
 	"context"
 	"encoding/binary"
 	"errors"
 	"fmt"
+	"log"
 	"net"
 	"reflect"
 	"sort"
+	"strings"
 	"time"
 
 	"github.com/apache/iotdb-client-go/rpc"
@@ -58,6 +61,14 @@
 	requestStatementId int64
 }
 
+type endPoint struct {
+	Host string
+	Port string
+}
+
+var endPointList = list.New()
+var session Session
+
 func (s *Session) Open(enableRPCCompression bool, connectionTimeoutInMs int) error {
 	if s.config.FetchSize <= 0 {
 		s.config.FetchSize = DefaultFetchSize
@@ -70,7 +81,7 @@
 	var err error
 
 	s.trans, err = thrift.NewTSocketConf(net.JoinHostPort(s.config.Host, s.config.Port), &thrift.TConfiguration{
-         ConnectTimeout: time.Duration(connectionTimeoutInMs), // Use 0 for no timeout
+		ConnectTimeout: time.Duration(connectionTimeoutInMs), // Use 0 for no timeout
 	})
 	if err != nil {
 		return err
@@ -107,6 +118,59 @@
 	return err
 }
 
+type ClusterConfig struct {
+	NodeUrls  []string //ip:port
+	UserName  string
+	Password  string
+	FetchSize int32
+	TimeZone  string
+}
+
+type ClusterSession struct {
+	config             *ClusterConfig
+	client             *rpc.TSIServiceClient
+	sessionId          int64
+	trans              thrift.TTransport
+	requestStatementId int64
+}
+
+func (s *Session) OpenCluster(enableRPCCompression bool) error {
+	if s.config.FetchSize <= 0 {
+		s.config.FetchSize = DefaultFetchSize
+	}
+	if s.config.TimeZone == "" {
+		s.config.TimeZone = DefaultTimeZone
+	}
+
+	var protocolFactory thrift.TProtocolFactory
+	var err error
+
+	if enableRPCCompression {
+		protocolFactory = thrift.NewTCompactProtocolFactory()
+	} else {
+		protocolFactory = thrift.NewTBinaryProtocolFactoryDefault()
+	}
+	iprot := protocolFactory.GetProtocol(s.trans)
+	oprot := protocolFactory.GetProtocol(s.trans)
+	s.client = rpc.NewTSIServiceClient(thrift.NewTStandardClient(iprot, oprot))
+	req := rpc.TSOpenSessionReq{ClientProtocol: rpc.TSProtocolVersion_IOTDB_SERVICE_PROTOCOL_V3, ZoneId: s.config.TimeZone, Username: &s.config.UserName,
+		Password: &s.config.Password}
+	fmt.Println(req)
+	resp, err := s.client.OpenSession(context.Background(), &req)
+	if err != nil {
+		return err
+	}
+	s.sessionId = resp.GetSessionId()
+	s.requestStatementId, err = s.client.RequestStatementId(context.Background(), s.sessionId)
+	if err != nil {
+		return err
+	}
+
+	s.SetTimeZone(s.config.TimeZone)
+	s.config.TimeZone, err = s.GetTimeZone()
+	return err
+}
+
 func (s *Session) Close() (r *rpc.TSStatus, err error) {
 	req := rpc.NewTSCloseSessionReq()
 	req.SessionId = s.sessionId
@@ -126,6 +190,11 @@
  */
 func (s *Session) SetStorageGroup(storageGroupId string) (r *rpc.TSStatus, err error) {
 	r, err = s.client.SetStorageGroup(context.Background(), s.sessionId, storageGroupId)
+	if err != nil && r == nil {
+		if reconnect() {
+			r, err = session.client.SetStorageGroup(context.Background(), s.sessionId, storageGroupId)
+		}
+	}
 	return r, err
 }
 
@@ -138,6 +207,11 @@
  */
 func (s *Session) DeleteStorageGroup(storageGroupId string) (r *rpc.TSStatus, err error) {
 	r, err = s.client.DeleteStorageGroups(context.Background(), s.sessionId, []string{storageGroupId})
+	if err != nil && r == nil {
+		if reconnect() {
+			r, err = session.client.DeleteStorageGroups(context.Background(), s.sessionId, []string{storageGroupId})
+		}
+	}
 	return r, err
 }
 
@@ -150,6 +224,11 @@
  */
 func (s *Session) DeleteStorageGroups(storageGroupIds ...string) (r *rpc.TSStatus, err error) {
 	r, err = s.client.DeleteStorageGroups(context.Background(), s.sessionId, storageGroupIds)
+	if err != nil && r == nil {
+		if reconnect() {
+			r, err = session.client.DeleteStorageGroups(context.Background(), s.sessionId, storageGroupIds)
+		}
+	}
 	return r, err
 }
 
@@ -167,6 +246,11 @@
 	request := rpc.TSCreateTimeseriesReq{SessionId: s.sessionId, Path: path, DataType: int32(dataType), Encoding: int32(encoding),
 		Compressor: int32(compressor), Attributes: attributes, Tags: tags}
 	status, err := s.client.CreateTimeseries(context.Background(), &request)
+	if err != nil && status == nil {
+		if reconnect() {
+			status, err = session.client.CreateTimeseries(context.Background(), &request)
+		}
+	}
 	return status, err
 }
 
@@ -200,6 +284,12 @@
 		Encodings: destEncodings, Compressors: destCompressions}
 	r, err = s.client.CreateMultiTimeseries(context.Background(), &request)
 
+	if err != nil && r == nil {
+		if reconnect() {
+			r, err = session.client.CreateMultiTimeseries(context.Background(), &request)
+		}
+	}
+
 	return r, err
 }
 
@@ -212,6 +302,11 @@
  */
 func (s *Session) DeleteTimeseries(paths []string) (r *rpc.TSStatus, err error) {
 	r, err = s.client.DeleteTimeseries(context.Background(), s.sessionId, paths)
+	if err != nil && r == nil {
+		if reconnect() {
+			r, err = session.client.DeleteTimeseries(context.Background(), s.sessionId, paths)
+		}
+	}
 	return r, err
 }
 
@@ -227,6 +322,11 @@
 func (s *Session) DeleteData(paths []string, startTime int64, endTime int64) (r *rpc.TSStatus, err error) {
 	request := rpc.TSDeleteDataReq{SessionId: s.sessionId, Paths: paths, StartTime: startTime, EndTime: endTime}
 	r, err = s.client.DeleteData(context.Background(), &request)
+	if err != nil && r == nil {
+		if reconnect() {
+			r, err = session.client.DeleteData(context.Background(), &request)
+		}
+	}
 	return r, err
 }
 
@@ -244,6 +344,11 @@
 	request := rpc.TSInsertStringRecordReq{SessionId: s.sessionId, DeviceId: deviceId, Measurements: measurements,
 		Values: values, Timestamp: timestamp}
 	r, err = s.client.InsertStringRecord(context.Background(), &request)
+	if err != nil && r == nil {
+		if reconnect() {
+			r, err = session.client.InsertStringRecord(context.Background(), &request)
+		}
+	}
 	return r, err
 }
 
@@ -270,6 +375,13 @@
 		FetchSize:   &s.config.FetchSize,
 	}
 	resp, err := s.client.ExecuteStatement(context.Background(), &request)
+
+	if err != nil && resp == nil {
+		if reconnect() {
+			resp, err = session.client.ExecuteStatement(context.Background(), &request)
+		}
+	}
+
 	return s.genDataSet(sql, resp), err
 }
 
@@ -283,6 +395,14 @@
 			return nil, statusErr
 		}
 	} else {
+		if reconnect() {
+			resp, err = session.client.ExecuteQueryStatement(context.Background(), &request)
+			if statusErr := VerifySuccess(resp.Status); statusErr == nil {
+				return NewSessionDataSet(sql, resp.Columns, resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.client, s.sessionId, resp.QueryDataSet, resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, s.config.FetchSize, timeoutMs), err
+			} else {
+				return nil, statusErr
+			}
+		}
 		return nil, err
 	}
 }
@@ -311,6 +431,13 @@
 		return nil, err
 	}
 	r, err = s.client.InsertRecord(context.Background(), request)
+
+	if err != nil && r == nil {
+		if reconnect() {
+			r, err = session.client.InsertRecord(context.Background(), request)
+		}
+	}
+
 	return r, err
 }
 
@@ -369,7 +496,16 @@
 		MeasurementsList: measurementsSlice,
 		ValuesList:       valuesList,
 	}
-	return s.client.InsertRecordsOfOneDevice(context.Background(), request)
+
+	r, err = s.client.InsertRecordsOfOneDevice(context.Background(), request)
+
+	if err != nil && r == nil {
+		if reconnect() {
+			r, err = session.client.InsertRecordsOfOneDevice(context.Background(), request)
+		}
+	}
+
+	return r, err
 }
 
 /*
@@ -390,7 +526,13 @@
 	if err != nil {
 		return nil, err
 	} else {
-		return s.client.InsertRecords(context.Background(), request)
+		r, err = s.client.InsertRecords(context.Background(), request)
+		if err != nil && r == nil {
+			if reconnect() {
+				r, err = session.client.InsertRecords(context.Background(), request)
+			}
+		}
+		return r, err
 	}
 }
 
@@ -411,7 +553,13 @@
 	if err != nil {
 		return nil, err
 	}
-	return s.client.InsertTablets(context.Background(), request)
+	r, err = s.client.InsertTablets(context.Background(), request)
+	if err != nil && r == nil {
+		if reconnect() {
+			r, err = session.client.InsertTablets(context.Background(), request)
+		}
+	}
+	return r, err
 }
 
 func (s *Session) ExecuteBatchStatement(inserts []string) (r *rpc.TSStatus, err error) {
@@ -419,7 +567,13 @@
 		SessionId:  s.sessionId,
 		Statements: inserts,
 	}
-	return s.client.ExecuteBatchStatement(context.Background(), &request)
+	r, err = s.client.ExecuteBatchStatement(context.Background(), &request)
+	if err != nil && r == nil {
+		if reconnect() {
+			r, err = session.client.ExecuteBatchStatement(context.Background(), &request)
+		}
+	}
+	return r, err
 }
 
 func (s *Session) ExecuteRawDataQuery(paths []string, startTime int64, endTime int64) (*SessionDataSet, error) {
@@ -432,6 +586,13 @@
 		StatementId: s.requestStatementId,
 	}
 	resp, err := s.client.ExecuteRawDataQuery(context.Background(), &request)
+
+	if err != nil && resp == nil {
+		if reconnect() {
+			resp, err = session.client.ExecuteRawDataQuery(context.Background(), &request)
+		}
+	}
+
 	return s.genDataSet("", resp), err
 }
 
@@ -443,6 +604,13 @@
 		FetchSize:   &s.config.FetchSize,
 	}
 	resp, err := s.client.ExecuteUpdateStatement(context.Background(), &request)
+
+	if err != nil && resp == nil {
+		if reconnect() {
+			resp, err = session.client.ExecuteUpdateStatement(context.Background(), &request)
+		}
+	}
+
 	return s.genDataSet(sql, resp), err
 }
 
@@ -581,7 +749,16 @@
 	if err != nil {
 		return nil, err
 	}
-	return s.client.InsertTablet(context.Background(), request)
+
+	r, err = s.client.InsertTablet(context.Background(), request)
+
+	if err != nil && r == nil {
+		if reconnect() {
+			r, err = session.client.InsertTablet(context.Background(), request)
+		}
+	}
+
+	return r, err
 }
 
 func (s *Session) genTSInsertTabletReq(tablet *Tablet) (*rpc.TSInsertTabletReq, error) {
@@ -605,6 +782,114 @@
 	return s.sessionId
 }
 
-func NewSession(config *Config) *Session {
-	return &Session{config: config}
+func NewSession(config *Config) Session {
+	endPoint := endPoint{}
+	endPoint.Host = config.Host
+	endPoint.Port = config.Port
+	endPointList.PushBack(endPoint)
+	return Session{config: config}
+}
+
+func NewClusterSession(ClusterConfig *ClusterConfig) Session {
+
+	node := endPoint{}
+	for i := 0; i < len(ClusterConfig.NodeUrls); i++ {
+		node.Host = strings.Split(ClusterConfig.NodeUrls[i], ":")[0]
+		node.Port = strings.Split(ClusterConfig.NodeUrls[i], ":")[1]
+		endPointList.PushBack(node)
+	}
+	var err error
+	for e := endPointList.Front(); e != nil; e = e.Next() {
+		session.trans, err = thrift.NewTSocketConf(net.JoinHostPort(e.Value.(endPoint).Host, e.Value.(endPoint).Port), &thrift.TConfiguration{
+			ConnectTimeout: time.Duration(0), // Use 0 for no timeout
+		})
+		if err == nil {
+			session.trans = thrift.NewTFramedTransport(session.trans)
+			if !session.trans.IsOpen() {
+				err = session.trans.Open()
+				if err != nil {
+					log.Println(err)
+				} else {
+					session.config = getConfig(e.Value.(endPoint).Host, e.Value.(endPoint).Port,
+						ClusterConfig.UserName, ClusterConfig.Password, ClusterConfig.FetchSize, ClusterConfig.TimeZone)
+					break
+				}
+			}
+		}
+	}
+	if err != nil {
+		log.Fatal("No Server Can Connect")
+	}
+	return session
+}
+
+func initClusterConn(node endPoint) error {
+	var err error
+
+	session.trans, err = thrift.NewTSocketConf(net.JoinHostPort(node.Host, node.Port), &thrift.TConfiguration{
+		ConnectTimeout: time.Duration(0), // Use 0 for no timeout
+	})
+	if err == nil {
+		session.trans = thrift.NewTFramedTransport(session.trans)
+		if !session.trans.IsOpen() {
+			err = session.trans.Open()
+			if err != nil {
+				return err
+			}
+		}
+	}
+	var protocolFactory thrift.TProtocolFactory
+	protocolFactory = thrift.NewTBinaryProtocolFactoryDefault()
+	iprot := protocolFactory.GetProtocol(session.trans)
+	oprot := protocolFactory.GetProtocol(session.trans)
+	session.client = rpc.NewTSIServiceClient(thrift.NewTStandardClient(iprot, oprot))
+	req := rpc.TSOpenSessionReq{ClientProtocol: rpc.TSProtocolVersion_IOTDB_SERVICE_PROTOCOL_V3, ZoneId: session.config.TimeZone, Username: &session.config.UserName,
+		Password: &session.config.Password}
+	fmt.Println(req)
+	resp, err := session.client.OpenSession(context.Background(), &req)
+	if err != nil {
+		return err
+	}
+	session.sessionId = resp.GetSessionId()
+	session.requestStatementId, err = session.client.RequestStatementId(context.Background(), session.sessionId)
+	if err != nil {
+		return err
+	}
+
+	session.SetTimeZone(session.config.TimeZone)
+	session.config.TimeZone, err = session.GetTimeZone()
+	return err
+
+}
+
+func getConfig(host string, port string, userName string, passWord string, fetchSize int32, timeZone string) *Config {
+	return &Config{
+		Host:      host,
+		Port:      port,
+		UserName:  userName,
+		Password:  passWord,
+		FetchSize: fetchSize,
+		TimeZone:  timeZone,
+	}
+}
+
+func reconnect() bool {
+	var err error
+	var connectedSuccess = false
+
+	for i := 0; i < 3; i++ {
+		for e := endPointList.Front(); e != nil; e = e.Next() {
+			err = initClusterConn(e.Value.(endPoint))
+			if err == nil {
+				connectedSuccess = true
+				break
+			} else {
+				log.Println("Connection refused:", e.Value.(endPoint))
+			}
+		}
+		if connectedSuccess {
+			break
+		}
+	}
+	return connectedSuccess
 }
diff --git a/example/session_example.go b/example/session_example.go
index 234d8d3..c10154b 100644
--- a/example/session_example.go
+++ b/example/session_example.go
@@ -24,6 +24,7 @@
 	"fmt"
 	"log"
 	"math/rand"
+	"strings"
 	"time"
 
 	"github.com/apache/iotdb-client-go/client"
@@ -36,7 +37,7 @@
 	user     string
 	password string
 )
-var session *client.Session
+var session client.Session
 
 func main() {
 	flag.StringVar(&host, "host", "127.0.0.1", "--host=192.168.1.100")
@@ -489,3 +490,16 @@
 		}
 	}
 }
+
+// If your IotDB is a cluster version, you can use the following code for multi node connection
+func connectCluster() {
+	config := &client.ClusterConfig{
+		NodeUrls: strings.Split("127.0.0.1:6667,127.0.0.1:6668,127.0.0.1:6669", ","),
+		UserName: "root",
+		Password: "root",
+	}
+	session = client.NewClusterSession(config)
+	if err := session.OpenCluster(false); err != nil {
+		log.Fatal(err)
+	}
+}
diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go
index 163bb34..645ac60 100644
--- a/test/e2e/e2e_test.go
+++ b/test/e2e/e2e_test.go
@@ -30,7 +30,7 @@
 
 type e2eTestSuite struct {
 	suite.Suite
-	session *client.Session
+	session client.Session
 }
 
 func TestE2ETestSuite(t *testing.T) {