[IOTDB-4009] Add aligned timeseries APIs for go client (#47)
diff --git a/client/session.go b/client/session.go
index 187ad87..d591446 100644
--- a/client/session.go
+++ b/client/session.go
@@ -418,13 +418,14 @@
func (s *Session) genTSInsertRecordReq(deviceId string, time int64,
measurements []string,
types []TSDataType,
- values []interface{}) (*rpc.TSInsertRecordReq, error) {
+ values []interface{},
+ isAligned bool) (*rpc.TSInsertRecordReq, error) {
request := &rpc.TSInsertRecordReq{}
request.SessionId = s.sessionId
request.PrefixPath = deviceId
request.Timestamp = time
request.Measurements = measurements
-
+ request.IsAligned = &isAligned
if bys, err := valuesToBytes(types, values); err == nil {
request.Values = bys
} else {
@@ -434,7 +435,24 @@
}
func (s *Session) InsertRecord(deviceId string, measurements []string, dataTypes []TSDataType, values []interface{}, timestamp int64) (r *rpc.TSStatus, err error) {
- request, err := s.genTSInsertRecordReq(deviceId, timestamp, measurements, dataTypes, values)
+ request, err := s.genTSInsertRecordReq(deviceId, timestamp, measurements, dataTypes, values, false)
+ if err != nil {
+ return nil, err
+ }
+ r, err = s.client.InsertRecord(context.Background(), request)
+
+ if err != nil && r == nil {
+ if s.reconnect() {
+ request.SessionId = s.sessionId
+ r, err = s.client.InsertRecord(context.Background(), request)
+ }
+ }
+
+ return r, err
+}
+
+func (s *Session) InsertAlignedRecord(deviceId string, measurements []string, dataTypes []TSDataType, values []interface{}, timestamp int64) (r *rpc.TSStatus, err error) {
+ request, err := s.genTSInsertRecordReq(deviceId, timestamp, measurements, dataTypes, values, true)
if err != nil {
return nil, err
}
@@ -455,6 +473,7 @@
measurementsSlice [][]string
dataTypesSlice [][]TSDataType
valuesSlice [][]interface{}
+ isAligned bool
}
func (d *deviceData) Len() int {
@@ -518,6 +537,49 @@
return r, err
}
+func (s *Session) InsertAlignedRecordsOfOneDevice(deviceId string, timestamps []int64, measurementsSlice [][]string, dataTypesSlice [][]TSDataType, valuesSlice [][]interface{}, sorted bool) (r *rpc.TSStatus, err error) {
+ length := len(timestamps)
+ if len(measurementsSlice) != length || len(dataTypesSlice) != length || len(valuesSlice) != length {
+ return nil, errors.New("timestamps, measurementsSlice and valuesSlice's size should be equal")
+ }
+
+ if !sorted {
+ sort.Sort(&deviceData{
+ timestamps: timestamps,
+ measurementsSlice: measurementsSlice,
+ dataTypesSlice: dataTypesSlice,
+ valuesSlice: valuesSlice,
+ })
+ }
+
+ valuesList := make([][]byte, length)
+ for i := 0; i < length; i++ {
+ if valuesList[i], err = valuesToBytes(dataTypesSlice[i], valuesSlice[i]); err != nil {
+ return nil, err
+ }
+ }
+ var isAligned = true
+ request := &rpc.TSInsertRecordsOfOneDeviceReq{
+ SessionId: s.sessionId,
+ PrefixPath: deviceId,
+ Timestamps: timestamps,
+ MeasurementsList: measurementsSlice,
+ ValuesList: valuesList,
+ IsAligned: &isAligned,
+ }
+
+ r, err = s.client.InsertRecordsOfOneDevice(context.Background(), request)
+
+ if err != nil && r == nil {
+ if s.reconnect() {
+ request.SessionId = s.sessionId
+ r, err = s.client.InsertRecordsOfOneDevice(context.Background(), request)
+ }
+ }
+
+ return r, err
+}
+
/*
*insert multiple rows of data, records are independent to each other, in other words, there's no relationship
*between those records
@@ -532,7 +594,24 @@
*/
func (s *Session) InsertRecords(deviceIds []string, measurements [][]string, dataTypes [][]TSDataType, values [][]interface{},
timestamps []int64) (r *rpc.TSStatus, err error) {
- request, err := s.genInsertRecordsReq(deviceIds, measurements, dataTypes, values, timestamps)
+ request, err := s.genInsertRecordsReq(deviceIds, measurements, dataTypes, values, timestamps, false)
+ if err != nil {
+ return nil, err
+ } else {
+ r, err = s.client.InsertRecords(context.Background(), request)
+ if err != nil && r == nil {
+ if s.reconnect() {
+ request.SessionId = s.sessionId
+ r, err = s.client.InsertRecords(context.Background(), request)
+ }
+ }
+ return r, err
+ }
+}
+
+func (s *Session) InsertAlignedRecords(deviceIds []string, measurements [][]string, dataTypes [][]TSDataType, values [][]interface{},
+ timestamps []int64) (r *rpc.TSStatus, err error) {
+ request, err := s.genInsertRecordsReq(deviceIds, measurements, dataTypes, values, timestamps, true)
if err != nil {
return nil, err
} else {
@@ -560,7 +639,29 @@
}
}
}
- request, err := s.genInsertTabletsReq(tablets)
+ request, err := s.genInsertTabletsReq(tablets, false)
+ if err != nil {
+ return nil, err
+ }
+ r, err = s.client.InsertTablets(context.Background(), request)
+ if err != nil && r == nil {
+ if s.reconnect() {
+ request.SessionId = s.sessionId
+ r, err = s.client.InsertTablets(context.Background(), request)
+ }
+ }
+ return r, err
+}
+
+func (s *Session) InsertAlignedTablets(tablets []*Tablet, sorted bool) (r *rpc.TSStatus, err error) {
+ if !sorted {
+ for _, t := range tablets {
+ if err := t.Sort(); err != nil {
+ return nil, err
+ }
+ }
+ }
+ request, err := s.genInsertTabletsReq(tablets, true)
if err != nil {
return nil, err
}
@@ -633,7 +734,7 @@
return NewSessionDataSet(sql, resp.Columns, resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.client, s.sessionId, resp.QueryDataSet, resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, s.config.FetchSize, nil)
}
-func (s *Session) genInsertTabletsReq(tablets []*Tablet) (*rpc.TSInsertTabletsReq, error) {
+func (s *Session) genInsertTabletsReq(tablets []*Tablet, isAligned bool) (*rpc.TSInsertTabletsReq, error) {
var (
length = len(tablets)
deviceIds = make([]string, length)
@@ -665,12 +766,13 @@
ValuesList: valuesList,
TimestampsList: timestampsList,
SizeList: sizeList,
+ IsAligned: &isAligned,
}
return &request, nil
}
func (s *Session) genInsertRecordsReq(deviceIds []string, measurements [][]string, dataTypes [][]TSDataType, values [][]interface{},
- timestamps []int64) (*rpc.TSInsertRecordsReq, error) {
+ timestamps []int64, isAligned bool) (*rpc.TSInsertRecordsReq, error) {
length := len(deviceIds)
if length != len(timestamps) || length != len(measurements) || length != len(values) {
return nil, errLength
@@ -680,6 +782,7 @@
PrefixPaths: deviceIds,
MeasurementsList: measurements,
Timestamps: timestamps,
+ IsAligned: &isAligned,
}
v := make([][]byte, length)
for i := 0; i < len(measurements); i++ {
@@ -760,7 +863,7 @@
return nil, err
}
}
- request, err := s.genTSInsertTabletReq(tablet)
+ request, err := s.genTSInsertTabletReq(tablet, false)
if err != nil {
return nil, err
}
@@ -777,7 +880,30 @@
return r, err
}
-func (s *Session) genTSInsertTabletReq(tablet *Tablet) (*rpc.TSInsertTabletReq, error) {
+func (s *Session) InsertAlignedTablet(tablet *Tablet, sorted bool) (r *rpc.TSStatus, err error) {
+ if !sorted {
+ if err := tablet.Sort(); err != nil {
+ return nil, err
+ }
+ }
+ request, err := s.genTSInsertTabletReq(tablet, true)
+ if err != nil {
+ return nil, err
+ }
+
+ r, err = s.client.InsertTablet(context.Background(), request)
+
+ if err != nil && r == nil {
+ if s.reconnect() {
+ request.SessionId = s.sessionId
+ r, err = s.client.InsertTablet(context.Background(), request)
+ }
+ }
+
+ return r, err
+}
+
+func (s *Session) genTSInsertTabletReq(tablet *Tablet, isAligned bool) (*rpc.TSInsertTabletReq, error) {
if values, err := tablet.getValuesBytes(); err == nil {
request := &rpc.TSInsertTabletReq{
SessionId: s.sessionId,
@@ -787,6 +913,7 @@
Timestamps: tablet.GetTimestampBytes(),
Types: tablet.getDataTypes(),
Size: int32(tablet.rowCount),
+ IsAligned: &isAligned,
}
return request, nil
} else {
diff --git a/example/session_example.go b/example/session_example.go
index c10154b..2398b94 100644
--- a/example/session_example.go
+++ b/example/session_example.go
@@ -87,7 +87,7 @@
} else {
log.Fatal(err)
}
-
+ deleteTimeseries("root.ln.device1.restart_count", "root.ln.device1.price", "root.ln.device1.tick_count", "root.ln.device1.temperature", "root.ln.device1.description", "root.ln.device1.status")
insertTablets()
deleteTimeseries("root.ln.device1.restart_count", "root.ln.device1.price", "root.ln.device1.tick_count", "root.ln.device1.temperature", "root.ln.device1.description", "root.ln.device1.status")
@@ -110,6 +110,23 @@
//0.12.x and newer
insertRecordsOfOneDevice()
deleteTimeseries("root.sg1.dev0.*")
+
+ insertAlignedRecord()
+ deleteTimeseries("root.al1.dev1.*")
+
+ insertAlignedRecords()
+ deleteTimeseries("root.al1.**")
+
+ insertAlignedRecordsOfOneDevice()
+ deleteTimeseries("root.al1.dev4.*")
+
+ deleteStorageGroup("root.ln")
+ insertAlignedTablet()
+ deleteTimeseries("root.ln.device1.*")
+
+ deleteStorageGroup("root.ln")
+ insertAlignedTablets()
+ deleteTimeseries("root.ln.device1.*")
}
func printDevice1(sds *client.SessionDataSet) {
@@ -309,6 +326,25 @@
checkError(session.InsertRecord(deviceId, measurements, dataTypes, values, timestamp))
}
+func insertAlignedRecord() {
+ var (
+ deviceId = "root.al1.dev1"
+ measurements = []string{"status"}
+ values = []interface{}{"123"}
+ dataTypes = []client.TSDataType{client.TEXT}
+ timestamp int64 = 12
+ )
+ checkError(session.InsertAlignedRecord(deviceId, measurements, dataTypes, values, timestamp))
+ sessionDataSet, err := session.ExecuteStatement("show devices")
+ if err == nil {
+ printDataSet0(sessionDataSet)
+ sessionDataSet.Close()
+ } else {
+ log.Println(err)
+ }
+ fmt.Println()
+}
+
func insertRecords() {
var (
deviceId = []string{"root.sg1.dev1"}
@@ -320,6 +356,25 @@
checkError(session.InsertRecords(deviceId, measurements, dataTypes, values, timestamp))
}
+func insertAlignedRecords() {
+ var (
+ deviceIds = []string{"root.al1.dev2", "root.al1.dev3"}
+ measurements = [][]string{{"status"}, {"temperature"}}
+ dataTypes = [][]client.TSDataType{{client.TEXT}, {client.TEXT}}
+ values = [][]interface{}{{"33"}, {"44"}}
+ timestamps = []int64{12, 13}
+ )
+ checkError(session.InsertAlignedRecords(deviceIds, measurements, dataTypes, values, timestamps))
+ sessionDataSet, err := session.ExecuteStatement("show devices")
+ if err == nil {
+ printDataSet0(sessionDataSet)
+ sessionDataSet.Close()
+ } else {
+ log.Println(err)
+ }
+ fmt.Println()
+}
+
func insertRecordsOfOneDevice() {
ts := time.Now().UTC().UnixNano() / 1000000
var (
@@ -341,6 +396,42 @@
checkError(session.InsertRecordsOfOneDevice(deviceId, timestamps, measurementsSlice, dataTypes, values, false))
}
+func insertAlignedRecordsOfOneDevice() {
+ ts := time.Now().UTC().UnixNano() / 1000000
+ var (
+ deviceId = "root.al1.dev4"
+ measurementsSlice = [][]string{
+ {"restart_count", "tick_count", "price"},
+ {"temperature", "description", "status"},
+ }
+ dataTypes = [][]client.TSDataType{
+ {client.INT32, client.INT64, client.DOUBLE},
+ {client.FLOAT, client.TEXT, client.BOOLEAN},
+ }
+ values = [][]interface{}{
+ {int32(1), int64(2018), float64(1988.1)},
+ {float32(12.1), "Test Device 1", false},
+ }
+ timestamps = []int64{ts, ts - 1}
+ )
+ checkError(session.InsertAlignedRecordsOfOneDevice(deviceId, timestamps, measurementsSlice, dataTypes, values, false))
+ sessionDataSet, err := session.ExecuteStatement("show devices")
+ if err == nil {
+ printDataSet0(sessionDataSet)
+ sessionDataSet.Close()
+ } else {
+ log.Println(err)
+ }
+ sessionDataSetNew, err := session.ExecuteStatement("select restart_count,tick_count,price,temperature,description,status from root.al1.dev4")
+ if err == nil {
+ printDataSet0(sessionDataSetNew)
+ sessionDataSetNew.Close()
+ } else {
+ log.Println(err)
+ }
+ fmt.Println()
+}
+
func deleteData() {
var (
paths = []string{"root.sg1.dev1.status"}
@@ -359,6 +450,22 @@
}
}
+func insertAlignedTablet() {
+ if tablet, err := createTablet(12); err == nil {
+ status, err := session.InsertAlignedTablet(tablet, false)
+ checkError(status, err)
+ } else {
+ log.Fatal(err)
+ }
+ var timeout int64 = 1000
+ if ds, err := session.ExecuteQueryStatement("select * from root.ln.device1", &timeout); err == nil {
+ printDevice1(ds)
+ ds.Close()
+ } else {
+ log.Fatal(err)
+ }
+}
+
func createTablet(rowCount int) (*client.Tablet, error) {
tablet, err := client.NewTablet("root.ln.device1", []*client.MeasurementSchema{
{
@@ -426,6 +533,20 @@
checkError(session.InsertTablets(tablets, false))
}
+func insertAlignedTablets() {
+ tablet1, err := createTablet(8)
+ if err != nil {
+ log.Fatal(err)
+ }
+ tablet2, err := createTablet(4)
+ if err != nil {
+ log.Fatal(err)
+ }
+
+ tablets := []*client.Tablet{tablet1, tablet2}
+ checkError(session.InsertAlignedTablets(tablets, false))
+}
+
func setTimeZone() {
var timeZone = "GMT"
session.SetTimeZone(timeZone)
@@ -477,6 +598,18 @@
var sqls = []string{"insert into root.ln.wf02.wt02(time,s5) values(1,true)",
"insert into root.ln.wf02.wt02(time,s5) values(2,true)"}
checkError(session.ExecuteBatchStatement(sqls))
+ var (
+ paths []string = []string{"root.ln.wf02.wt02.s5"}
+ startTime int64 = 1
+ endTime int64 = 200
+ )
+ sessionDataSet, err := session.ExecuteRawDataQuery(paths, startTime, endTime)
+ if err == nil {
+ printDataSet2(sessionDataSet)
+ sessionDataSet.Close()
+ } else {
+ log.Println(err)
+ }
}
func checkError(status *rpc.TSStatus, err error) {
diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go
index 645ac60..171d0d6 100644
--- a/test/e2e/e2e_test.go
+++ b/test/e2e/e2e_test.go
@@ -20,6 +20,9 @@
package e2e
import (
+ "fmt"
+ "log"
+ "math/rand"
"testing"
"time"
@@ -110,3 +113,169 @@
assert.NoError(ds.Scan(&status))
assert.Equal(status, "Working")
}
+
+func (s *e2eTestSuite) Test_InsertAlignedRecord() {
+ var (
+ deviceId = "root.tsg2.dev1"
+ measurements = []string{"status"}
+ dataTypes = []client.TSDataType{client.TEXT}
+ values = []interface{}{"Working"}
+ timestamp = time.Now().UTC().UnixNano() / 1000000
+ )
+ s.checkError(s.session.InsertAlignedRecord(deviceId, measurements, dataTypes, values, timestamp))
+
+ ds, err := s.session.ExecuteQueryStatement("select status from root.tsg2.dev1", nil)
+ assert := s.Require()
+ assert.NoError(err)
+ defer ds.Close()
+ assert.True(ds.Next())
+ var status string
+ assert.NoError(ds.Scan(&status))
+ assert.Equal(status, "Working")
+}
+
+func (s *e2eTestSuite) Test_InsertAlignedRecords() {
+ var (
+ deviceIds = []string{"root.al1.dev2", "root.al1.dev3"}
+ measurements = [][]string{{"status"}, {"temperature"}}
+ dataTypes = [][]client.TSDataType{{client.TEXT}, {client.TEXT}}
+ values = [][]interface{}{{"33"}, {"44"}}
+ timestamps = []int64{12, 13}
+ )
+ s.checkError(s.session.InsertAlignedRecords(deviceIds, measurements, dataTypes, values, timestamps))
+ ds, err := s.session.ExecuteQueryStatement("select temperature from root.al1.dev3", nil)
+ assert := s.Require()
+ assert.NoError(err)
+ defer ds.Close()
+ assert.True(ds.Next())
+ var temperature string
+ assert.NoError(ds.Scan(&temperature))
+ assert.Equal(temperature, "44")
+}
+
+func (s *e2eTestSuite) Test_InsertAlignedRecordsOfOneDevice() {
+ ts := time.Now().UTC().UnixNano() / 1000000
+ var (
+ deviceId = "root.al1.dev4"
+ measurementsSlice = [][]string{
+ {"restart_count", "tick_count", "price"},
+ {"temperature", "description", "status"},
+ }
+ dataTypes = [][]client.TSDataType{
+ {client.INT32, client.INT64, client.DOUBLE},
+ {client.FLOAT, client.TEXT, client.BOOLEAN},
+ }
+ values = [][]interface{}{
+ {int32(1), int64(2018), float64(1988.1)},
+ {float32(12.1), "Test Device 1", false},
+ }
+ timestamps = []int64{ts, ts - 1}
+ )
+ s.checkError(s.session.InsertAlignedRecordsOfOneDevice(deviceId, timestamps, measurementsSlice, dataTypes, values, false))
+ ds, err := s.session.ExecuteStatement("select temperature from root.al1.dev4")
+ assert := s.Require()
+ assert.NoError(err)
+ defer ds.Close()
+ assert.True(ds.Next())
+ var status string
+ assert.NoError(ds.Scan(&status))
+ assert.Equal(status, "12.1")
+}
+func (s *e2eTestSuite) Test_InsertAlignedTablet() {
+ var timeseries = []string{"root.ln.device1.**"}
+ s.session.DeleteTimeseries(timeseries)
+ if tablet, err := createTablet(12); err == nil {
+ status, err := s.session.InsertAlignedTablet(tablet, false)
+ s.checkError(status, err)
+ } else {
+ log.Fatal(err)
+ }
+ var timeout int64 = 1000
+ ds, err := s.session.ExecuteQueryStatement("select count(status) from root.ln.device1", &timeout)
+ assert := s.Require()
+ assert.NoError(err)
+ defer ds.Close()
+ assert.True(ds.Next())
+ var status string
+ assert.NoError(ds.Scan(&status))
+ assert.Equal(status, "12")
+ s.session.DeleteStorageGroup("root.ln.**")
+}
+func createTablet(rowCount int) (*client.Tablet, error) {
+ tablet, err := client.NewTablet("root.ln.device1", []*client.MeasurementSchema{
+ {
+ Measurement: "restart_count",
+ DataType: client.INT32,
+ Encoding: client.RLE,
+ Compressor: client.SNAPPY,
+ }, {
+ Measurement: "price",
+ DataType: client.DOUBLE,
+ Encoding: client.GORILLA,
+ Compressor: client.SNAPPY,
+ }, {
+ Measurement: "tick_count",
+ DataType: client.INT64,
+ Encoding: client.RLE,
+ Compressor: client.SNAPPY,
+ }, {
+ Measurement: "temperature",
+ DataType: client.FLOAT,
+ Encoding: client.GORILLA,
+ Compressor: client.SNAPPY,
+ }, {
+ Measurement: "description",
+ DataType: client.TEXT,
+ Encoding: client.PLAIN,
+ Compressor: client.SNAPPY,
+ },
+ {
+ Measurement: "status",
+ DataType: client.BOOLEAN,
+ Encoding: client.RLE,
+ Compressor: client.SNAPPY,
+ },
+ }, rowCount)
+
+ if err != nil {
+ return nil, err
+ }
+ ts := time.Now().UTC().UnixNano() / 1000000
+ for row := 0; row < int(rowCount); row++ {
+ ts++
+ tablet.SetTimestamp(ts, row)
+ tablet.SetValueAt(rand.Int31(), 0, row)
+ tablet.SetValueAt(rand.Float64(), 1, row)
+ tablet.SetValueAt(rand.Int63(), 2, row)
+ tablet.SetValueAt(rand.Float32(), 3, row)
+ tablet.SetValueAt(fmt.Sprintf("Test Device %d", row+1), 4, row)
+ tablet.SetValueAt(bool(ts%2 == 0), 5, row)
+ }
+ return tablet, nil
+}
+
+func (s *e2eTestSuite) Test_InsertAlignedTablets() {
+ var timeseries = []string{"root.ln.device1.**"}
+ s.session.DeleteTimeseries(timeseries)
+ tablet1, err := createTablet(8)
+ if err != nil {
+ log.Fatal(err)
+ }
+ tablet2, err := createTablet(4)
+ if err != nil {
+ log.Fatal(err)
+ }
+
+ tablets := []*client.Tablet{tablet1, tablet2}
+ s.checkError(s.session.InsertAlignedTablets(tablets, false))
+
+ ds, err := s.session.ExecuteQueryStatement("select count(status) from root.ln.device1", nil)
+ assert := s.Require()
+ assert.NoError(err)
+ defer ds.Close()
+ assert.True(ds.Next())
+ var status string
+ assert.NoError(ds.Scan(&status))
+ assert.Equal(status, "8")
+ s.session.DeleteStorageGroup("root.ln.**")
+}