blob: e56ce95f901965efba0810e55f69d2b44763703e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package e2e
import (
"log"
"strconv"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/stretchr/testify/suite"
"github.com/apache/iotdb-client-go/v2/client"
)
var (
nodeUrls = "iotdb:6668,iotdb:6667,iotdb:6669"
host = "iotdb"
port = "6667"
username = "root"
password = "root"
database = "test_db"
)
type e2eTableTestSuite struct {
suite.Suite
session client.ITableSession
}
func TestE2ETableTestSuite(t *testing.T) {
suite.Run(t, &e2eTableTestSuite{})
}
func (s *e2eTableTestSuite) SetupSuite() {
clusterConfig := client.ClusterConfig{
NodeUrls: strings.Split(nodeUrls, ","),
UserName: username,
Password: password,
}
session, err := client.NewClusterTableSession(&clusterConfig, false)
s.Require().NoError(err)
s.session = session
}
func (s *e2eTableTestSuite) TearDownSuite() {
s.session.Close()
}
func (s *e2eTableTestSuite) SetupTest() {
s.checkError(s.session.ExecuteNonQueryStatement("create database " + database))
s.checkError(s.session.ExecuteNonQueryStatement("use " + database))
for i := 0; i < 10; i++ {
s.checkError(s.session.ExecuteNonQueryStatement("create database db" + strconv.Itoa(i)))
}
}
func (s *e2eTableTestSuite) TearDownTest() {
s.checkError(s.session.ExecuteNonQueryStatement("drop database " + database))
for i := 0; i < 10; i++ {
s.checkError(s.session.ExecuteNonQueryStatement("drop database db" + strconv.Itoa(i)))
}
}
func (s *e2eTableTestSuite) Test_CreateTableSession() {
assert := s.Require()
config := &client.Config{
Host: host,
Port: port,
UserName: username,
Password: password,
}
session, err := client.NewTableSession(config, false, 3000)
assert.NoError(err)
defer session.Close()
s.checkError(session.ExecuteNonQueryStatement("use " + database))
}
func (s *e2eTableTestSuite) Test_CreateTableSessionWithDatabase() {
assert := s.Require()
config := &client.Config{
Host: host,
Port: port,
UserName: username,
Password: password,
Database: database,
}
session, err := client.NewTableSession(config, false, 3000)
defer session.Close()
assert.NoError(err)
timeoutInMs := int64(1000)
_, err = session.ExecuteQueryStatement("show tables", &timeoutInMs)
assert.NoError(err)
}
func (s *e2eTableTestSuite) Test_GetSessionFromTableSessionPool() {
assert := s.Require()
poolConfig := &client.PoolConfig{
Host: host,
Port: port,
UserName: username,
Password: password,
}
sessionPool := client.NewTableSessionPool(poolConfig, 3, 10000, 3000, false)
defer sessionPool.Close()
session1, err := sessionPool.GetSession()
assert.NoError(err)
s.checkError(session1.ExecuteNonQueryStatement("use " + database))
session1.Close()
// test get session timeout
var wg sync.WaitGroup
getNum := 4
wg.Add(getNum)
successNum := int32(0)
for i := 0; i < getNum; i++ {
go func() {
defer wg.Done()
session, getSessionErr := sessionPool.GetSession()
// timeout after 3s
if getSessionErr != nil {
return
}
atomic.AddInt32(&successNum, 1)
defer func() {
time.Sleep(time.Second * 4)
session.Close()
}()
}()
}
wg.Wait()
assert.Equal(int32(3), successNum)
// test get session
getNum = 10
wg.Add(getNum)
successNum = int32(0)
for i := 0; i < getNum; i++ {
go func() {
defer wg.Done()
session, getSessionErr := sessionPool.GetSession()
if getSessionErr != nil {
return
}
atomic.AddInt32(&successNum, 1)
defer session.Close()
s.checkError(session.ExecuteNonQueryStatement("use " + database))
}()
}
wg.Wait()
assert.Equal(int32(10), successNum)
}
func (s *e2eTableTestSuite) Test_GetSessionFromSessionPoolWithSpecificDatabase() {
assert := s.Require()
poolConfig := &client.PoolConfig{
Host: host,
Port: port,
UserName: username,
Password: password,
Database: database,
}
sessionPool := client.NewTableSessionPool(poolConfig, 3, 10000, 3000, false)
defer sessionPool.Close()
session1, err := sessionPool.GetSession()
assert.NoError(err)
s.checkError(session1.ExecuteNonQueryStatement("create table table_in_" + database + " (tag1 string tag, tag2 string tag, s1 text field, s2 text field)"))
session1.Close()
var wg sync.WaitGroup
getNum := 10
wg.Add(getNum)
successNum := int32(0)
for i := 0; i < getNum; i++ {
currentDbName := "db" + strconv.Itoa(i)
go func() {
defer wg.Done()
session, getSessionErr := sessionPool.GetSession()
if getSessionErr != nil {
return
}
defer session.Close()
timeoutInMs := int64(3000)
dataSet, queryErr := session.ExecuteQueryStatement("show tables", &timeoutInMs)
defer dataSet.Close()
assert.NoError(queryErr)
assert.True(dataSet.Next())
value, err := dataSet.GetString("TableName")
assert.NoError(err)
assert.Equal("table_in_"+database, value)
// modify using database
s.checkError(session.ExecuteNonQueryStatement("use " + currentDbName))
atomic.AddInt32(&successNum, 1)
}()
}
wg.Wait()
assert.Equal(int32(10), successNum)
// database in session should be reset to test_db
wg.Add(getNum)
for i := 0; i < getNum; i++ {
go func() {
defer wg.Done()
session, getSessionErr := sessionPool.GetSession()
// timeout after 3s
if getSessionErr != nil {
return
}
defer session.Close()
atomic.AddInt32(&successNum, 1)
timeoutInMs := int64(3000)
dataSet, queryErr := session.ExecuteQueryStatement("show tables", &timeoutInMs)
defer dataSet.Close()
assert.NoError(queryErr)
assert.True(dataSet.Next())
value, err := dataSet.GetString("TableName")
assert.NoError(err)
assert.Equal("table_in_"+database, value)
}()
}
wg.Wait()
}
func (s *e2eTableTestSuite) Test_ErrInTableSessionPool() {
assert := s.Require()
poolConfig := &client.PoolConfig{
Host: host,
Port: port,
UserName: username,
Password: password,
Database: database,
}
sessionPool := client.NewTableSessionPool(poolConfig, 3, 10000, 3000, false)
defer sessionPool.Close()
session1, err := sessionPool.GetSession()
assert.NoError(err)
s.checkError(session1.ExecuteNonQueryStatement("create table test_timeout(tag1 string tag, tag2 string tag, s1 text field, s2 text field)"))
timeoutInMs := int64(1)
dataSet, err := session1.ExecuteQueryStatement("select * from test_timeout", &timeoutInMs)
if err == nil {
dataSet.Close()
err = session1.Close()
assert.NoError(err)
return
}
err = session1.Close()
assert.NoError(err)
// test repeated close
err = session1.Close()
assert.NoError(err)
timeoutInMs = int64(60000)
session1, err = sessionPool.GetSession()
assert.NoError(err)
defer session1.Close()
dataSet, err = session1.ExecuteQueryStatement("show tables", &timeoutInMs)
assert.NoError(err)
dataSet.Close()
session2, err := sessionPool.GetSession()
assert.NoError(err)
defer session2.Close()
dataSet, err = session2.ExecuteQueryStatement("show tables", &timeoutInMs)
assert.NoError(err)
dataSet.Close()
session3, err := sessionPool.GetSession()
assert.NoError(err)
defer session3.Close()
dataSet, err = session3.ExecuteQueryStatement("show tables", &timeoutInMs)
assert.NoError(err)
dataSet.Close()
_, err = sessionPool.GetSession()
assert.NotNil(err)
}
func (s *e2eTableTestSuite) Test_InsertTabletAndQuery() {
assert := s.Require()
s.checkError(s.session.ExecuteNonQueryStatement("create table t1 (tag1 string tag, tag2 string tag, s1 text field, s2 text field)"))
timeoutInMs := int64(10000)
// show tables
dataSet, err := s.session.ExecuteQueryStatement("show tables", &timeoutInMs)
assert.NoError(err)
hasNext, err := dataSet.Next()
assert.NoError(err)
assert.True(hasNext)
value, err := dataSet.GetString("TableName")
assert.NoError(err)
assert.Equal("t1", value)
dataSet.Close()
// insert relational tablet
tablet, err := client.NewRelationalTablet("t1", []*client.MeasurementSchema{
{
Measurement: "tag1",
DataType: client.STRING,
},
{
Measurement: "tag2",
DataType: client.STRING,
},
{
Measurement: "s1",
DataType: client.TEXT,
},
{
Measurement: "s2",
DataType: client.TEXT,
},
}, []client.ColumnCategory{client.TAG, client.TAG, client.FIELD, client.FIELD}, 1024)
assert.NoError(err)
values := [][]interface{}{
{"tag1_value_1", "tag2_value_1", "s1_value_1", "s2_value_1"},
{"tag1_value_1", "tag2_value_1", "s1_value_2", "s2_value_2"},
{"tag1_value_1", "tag2_value_1", nil, "s2_value_2"},
{"tag1_value_2", "tag2_value_2", "s1_value_1", "s2_value_1"},
{"tag1_value_2", "tag2_value_2", "s1_value_1", "s2_value_1"},
{"tag1_value_3", "tag2_value_3", "s1_value_1", "s2_value_1"},
{"tag1_value_3", "tag2_value_3", "s1_value_2", nil},
{"tag1_value_3", "tag2_value_3", "s1_value_3", "s2_value_3"},
}
ts := int64(0)
for row := 0; row < 8; row++ {
tablet.SetTimestamp(ts, row)
assert.NoError(tablet.SetValueAt(values[row][0], 0, row))
assert.NoError(tablet.SetValueAt(values[row][1], 1, row))
assert.NoError(tablet.SetValueAt(values[row][2], 2, row))
assert.NoError(tablet.SetValueAt(values[row][3], 3, row))
ts++
tablet.RowSize++
}
s.checkError(s.session.Insert(tablet))
// query
dataSet, err = s.session.ExecuteQueryStatement("select * from t1 order by time asc", &timeoutInMs)
assert.NoError(err)
count := int64(0)
for {
hasNext, err := dataSet.Next()
assert.NoError(err)
if !hasNext {
break
}
value, err := dataSet.GetLong("time")
assert.NoError(err)
assert.Equal(count, value)
assert.Equal(values[count][0], getValueFromDataSet(dataSet, "tag1"))
assert.Equal(values[count][1], getValueFromDataSet(dataSet, "tag2"))
assert.Equal(values[count][2], getValueFromDataSet(dataSet, "s1"))
assert.Equal(values[count][3], getValueFromDataSet(dataSet, "s2"))
count++
}
assert.Equal(int64(8), count)
dataSet.Close()
// query
dataSet, err = s.session.ExecuteQueryStatement("select s1, s1 from t1 order by time asc", &timeoutInMs)
assert.NoError(err)
count = int64(0)
for {
hasNext, err := dataSet.Next()
assert.NoError(err)
if !hasNext {
break
}
assert.Equal(values[count][2], getValueFromDataSetByIndex(dataSet, 1))
assert.Equal(values[count][2], getValueFromDataSetByIndex(dataSet, 2))
count++
}
assert.Equal(int64(8), count)
dataSet.Close()
// query
dataSet, err = s.session.ExecuteQueryStatement("select s1, s2 as s1 from t1 order by time asc", &timeoutInMs)
defer dataSet.Close()
assert.NoError(err)
count = int64(0)
for {
hasNext, err := dataSet.Next()
assert.NoError(err)
if !hasNext {
break
}
assert.Equal(values[count][2], getValueFromDataSetByIndex(dataSet, 1))
assert.Equal(values[count][3], getValueFromDataSetByIndex(dataSet, 2))
count++
}
assert.Equal(int64(8), count)
}
func getValueFromDataSet(dataSet *client.SessionDataSet, columnName string) interface{} {
if isNull, err := dataSet.IsNull(columnName); err != nil {
log.Fatal(err)
} else if isNull {
return nil
}
v, err := dataSet.GetString(columnName)
if err != nil {
log.Fatal(err)
}
return v
}
func getValueFromDataSetByIndex(dataSet *client.SessionDataSet, columnIndex int32) interface{} {
if isNull, err := dataSet.IsNullByIndex(columnIndex); err != nil {
log.Fatal(err)
} else if isNull {
return nil
}
v, err := dataSet.GetStringByIndex(columnIndex)
if err != nil {
log.Fatal(err)
}
return v
}
func (s *e2eTableTestSuite) checkError(err error) {
s.Require().NoError(err)
}