[IOTDB-4009] Add aligned timeseries APIs for go client (#47)

diff --git a/client/session.go b/client/session.go
index 187ad87..d591446 100644
--- a/client/session.go
+++ b/client/session.go
@@ -418,13 +418,14 @@
 func (s *Session) genTSInsertRecordReq(deviceId string, time int64,
 	measurements []string,
 	types []TSDataType,
-	values []interface{}) (*rpc.TSInsertRecordReq, error) {
+	values []interface{},
+	isAligned bool) (*rpc.TSInsertRecordReq, error) {
 	request := &rpc.TSInsertRecordReq{}
 	request.SessionId = s.sessionId
 	request.PrefixPath = deviceId
 	request.Timestamp = time
 	request.Measurements = measurements
-
+	request.IsAligned = &isAligned
 	if bys, err := valuesToBytes(types, values); err == nil {
 		request.Values = bys
 	} else {
@@ -434,7 +435,24 @@
 }
 
 func (s *Session) InsertRecord(deviceId string, measurements []string, dataTypes []TSDataType, values []interface{}, timestamp int64) (r *rpc.TSStatus, err error) {
-	request, err := s.genTSInsertRecordReq(deviceId, timestamp, measurements, dataTypes, values)
+	request, err := s.genTSInsertRecordReq(deviceId, timestamp, measurements, dataTypes, values, false)
+	if err != nil {
+		return nil, err
+	}
+	r, err = s.client.InsertRecord(context.Background(), request)
+
+	if err != nil && r == nil {
+		if s.reconnect() {
+			request.SessionId = s.sessionId
+			r, err = s.client.InsertRecord(context.Background(), request)
+		}
+	}
+
+	return r, err
+}
+
+func (s *Session) InsertAlignedRecord(deviceId string, measurements []string, dataTypes []TSDataType, values []interface{}, timestamp int64) (r *rpc.TSStatus, err error) {
+	request, err := s.genTSInsertRecordReq(deviceId, timestamp, measurements, dataTypes, values, true)
 	if err != nil {
 		return nil, err
 	}
@@ -455,6 +473,7 @@
 	measurementsSlice [][]string
 	dataTypesSlice    [][]TSDataType
 	valuesSlice       [][]interface{}
+	isAligned         bool
 }
 
 func (d *deviceData) Len() int {
@@ -518,6 +537,49 @@
 	return r, err
 }
 
+func (s *Session) InsertAlignedRecordsOfOneDevice(deviceId string, timestamps []int64, measurementsSlice [][]string, dataTypesSlice [][]TSDataType, valuesSlice [][]interface{}, sorted bool) (r *rpc.TSStatus, err error) {
+	length := len(timestamps)
+	if len(measurementsSlice) != length || len(dataTypesSlice) != length || len(valuesSlice) != length {
+		return nil, errors.New("timestamps, measurementsSlice and valuesSlice's size should be equal")
+	}
+
+	if !sorted {
+		sort.Sort(&deviceData{
+			timestamps:        timestamps,
+			measurementsSlice: measurementsSlice,
+			dataTypesSlice:    dataTypesSlice,
+			valuesSlice:       valuesSlice,
+		})
+	}
+
+	valuesList := make([][]byte, length)
+	for i := 0; i < length; i++ {
+		if valuesList[i], err = valuesToBytes(dataTypesSlice[i], valuesSlice[i]); err != nil {
+			return nil, err
+		}
+	}
+	var isAligned = true
+	request := &rpc.TSInsertRecordsOfOneDeviceReq{
+		SessionId:        s.sessionId,
+		PrefixPath:       deviceId,
+		Timestamps:       timestamps,
+		MeasurementsList: measurementsSlice,
+		ValuesList:       valuesList,
+		IsAligned:        &isAligned,
+	}
+
+	r, err = s.client.InsertRecordsOfOneDevice(context.Background(), request)
+
+	if err != nil && r == nil {
+		if s.reconnect() {
+			request.SessionId = s.sessionId
+			r, err = s.client.InsertRecordsOfOneDevice(context.Background(), request)
+		}
+	}
+
+	return r, err
+}
+
 /*
  *insert multiple rows of data, records are independent to each other, in other words, there's no relationship
  *between those records
@@ -532,7 +594,24 @@
  */
 func (s *Session) InsertRecords(deviceIds []string, measurements [][]string, dataTypes [][]TSDataType, values [][]interface{},
 	timestamps []int64) (r *rpc.TSStatus, err error) {
-	request, err := s.genInsertRecordsReq(deviceIds, measurements, dataTypes, values, timestamps)
+	request, err := s.genInsertRecordsReq(deviceIds, measurements, dataTypes, values, timestamps, false)
+	if err != nil {
+		return nil, err
+	} else {
+		r, err = s.client.InsertRecords(context.Background(), request)
+		if err != nil && r == nil {
+			if s.reconnect() {
+				request.SessionId = s.sessionId
+				r, err = s.client.InsertRecords(context.Background(), request)
+			}
+		}
+		return r, err
+	}
+}
+
+func (s *Session) InsertAlignedRecords(deviceIds []string, measurements [][]string, dataTypes [][]TSDataType, values [][]interface{},
+	timestamps []int64) (r *rpc.TSStatus, err error) {
+	request, err := s.genInsertRecordsReq(deviceIds, measurements, dataTypes, values, timestamps, true)
 	if err != nil {
 		return nil, err
 	} else {
@@ -560,7 +639,29 @@
 			}
 		}
 	}
-	request, err := s.genInsertTabletsReq(tablets)
+	request, err := s.genInsertTabletsReq(tablets, false)
+	if err != nil {
+		return nil, err
+	}
+	r, err = s.client.InsertTablets(context.Background(), request)
+	if err != nil && r == nil {
+		if s.reconnect() {
+			request.SessionId = s.sessionId
+			r, err = s.client.InsertTablets(context.Background(), request)
+		}
+	}
+	return r, err
+}
+
+func (s *Session) InsertAlignedTablets(tablets []*Tablet, sorted bool) (r *rpc.TSStatus, err error) {
+	if !sorted {
+		for _, t := range tablets {
+			if err := t.Sort(); err != nil {
+				return nil, err
+			}
+		}
+	}
+	request, err := s.genInsertTabletsReq(tablets, true)
 	if err != nil {
 		return nil, err
 	}
@@ -633,7 +734,7 @@
 	return NewSessionDataSet(sql, resp.Columns, resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.client, s.sessionId, resp.QueryDataSet, resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, s.config.FetchSize, nil)
 }
 
-func (s *Session) genInsertTabletsReq(tablets []*Tablet) (*rpc.TSInsertTabletsReq, error) {
+func (s *Session) genInsertTabletsReq(tablets []*Tablet, isAligned bool) (*rpc.TSInsertTabletsReq, error) {
 	var (
 		length           = len(tablets)
 		deviceIds        = make([]string, length)
@@ -665,12 +766,13 @@
 		ValuesList:       valuesList,
 		TimestampsList:   timestampsList,
 		SizeList:         sizeList,
+		IsAligned:        &isAligned,
 	}
 	return &request, nil
 }
 
 func (s *Session) genInsertRecordsReq(deviceIds []string, measurements [][]string, dataTypes [][]TSDataType, values [][]interface{},
-	timestamps []int64) (*rpc.TSInsertRecordsReq, error) {
+	timestamps []int64, isAligned bool) (*rpc.TSInsertRecordsReq, error) {
 	length := len(deviceIds)
 	if length != len(timestamps) || length != len(measurements) || length != len(values) {
 		return nil, errLength
@@ -680,6 +782,7 @@
 		PrefixPaths:      deviceIds,
 		MeasurementsList: measurements,
 		Timestamps:       timestamps,
+		IsAligned:        &isAligned,
 	}
 	v := make([][]byte, length)
 	for i := 0; i < len(measurements); i++ {
@@ -760,7 +863,7 @@
 			return nil, err
 		}
 	}
-	request, err := s.genTSInsertTabletReq(tablet)
+	request, err := s.genTSInsertTabletReq(tablet, false)
 	if err != nil {
 		return nil, err
 	}
@@ -777,7 +880,30 @@
 	return r, err
 }
 
-func (s *Session) genTSInsertTabletReq(tablet *Tablet) (*rpc.TSInsertTabletReq, error) {
+func (s *Session) InsertAlignedTablet(tablet *Tablet, sorted bool) (r *rpc.TSStatus, err error) {
+	if !sorted {
+		if err := tablet.Sort(); err != nil {
+			return nil, err
+		}
+	}
+	request, err := s.genTSInsertTabletReq(tablet, true)
+	if err != nil {
+		return nil, err
+	}
+
+	r, err = s.client.InsertTablet(context.Background(), request)
+
+	if err != nil && r == nil {
+		if s.reconnect() {
+			request.SessionId = s.sessionId
+			r, err = s.client.InsertTablet(context.Background(), request)
+		}
+	}
+
+	return r, err
+}
+
+func (s *Session) genTSInsertTabletReq(tablet *Tablet, isAligned bool) (*rpc.TSInsertTabletReq, error) {
 	if values, err := tablet.getValuesBytes(); err == nil {
 		request := &rpc.TSInsertTabletReq{
 			SessionId:    s.sessionId,
@@ -787,6 +913,7 @@
 			Timestamps:   tablet.GetTimestampBytes(),
 			Types:        tablet.getDataTypes(),
 			Size:         int32(tablet.rowCount),
+			IsAligned:    &isAligned,
 		}
 		return request, nil
 	} else {
diff --git a/example/session_example.go b/example/session_example.go
index c10154b..2398b94 100644
--- a/example/session_example.go
+++ b/example/session_example.go
@@ -87,7 +87,7 @@
 	} else {
 		log.Fatal(err)
 	}
-
+	deleteTimeseries("root.ln.device1.restart_count", "root.ln.device1.price", "root.ln.device1.tick_count", "root.ln.device1.temperature", "root.ln.device1.description", "root.ln.device1.status")
 	insertTablets()
 	deleteTimeseries("root.ln.device1.restart_count", "root.ln.device1.price", "root.ln.device1.tick_count", "root.ln.device1.temperature", "root.ln.device1.description", "root.ln.device1.status")
 
@@ -110,6 +110,23 @@
 	//0.12.x and newer
 	insertRecordsOfOneDevice()
 	deleteTimeseries("root.sg1.dev0.*")
+
+	insertAlignedRecord()
+	deleteTimeseries("root.al1.dev1.*")
+
+	insertAlignedRecords()
+	deleteTimeseries("root.al1.**")
+
+	insertAlignedRecordsOfOneDevice()
+	deleteTimeseries("root.al1.dev4.*")
+
+	deleteStorageGroup("root.ln")
+	insertAlignedTablet()
+	deleteTimeseries("root.ln.device1.*")
+
+	deleteStorageGroup("root.ln")
+	insertAlignedTablets()
+	deleteTimeseries("root.ln.device1.*")
 }
 
 func printDevice1(sds *client.SessionDataSet) {
@@ -309,6 +326,25 @@
 	checkError(session.InsertRecord(deviceId, measurements, dataTypes, values, timestamp))
 }
 
+func insertAlignedRecord() {
+	var (
+		deviceId           = "root.al1.dev1"
+		measurements       = []string{"status"}
+		values             = []interface{}{"123"}
+		dataTypes          = []client.TSDataType{client.TEXT}
+		timestamp    int64 = 12
+	)
+	checkError(session.InsertAlignedRecord(deviceId, measurements, dataTypes, values, timestamp))
+	sessionDataSet, err := session.ExecuteStatement("show devices")
+	if err == nil {
+		printDataSet0(sessionDataSet)
+		sessionDataSet.Close()
+	} else {
+		log.Println(err)
+	}
+	fmt.Println()
+}
+
 func insertRecords() {
 	var (
 		deviceId     = []string{"root.sg1.dev1"}
@@ -320,6 +356,25 @@
 	checkError(session.InsertRecords(deviceId, measurements, dataTypes, values, timestamp))
 }
 
+func insertAlignedRecords() {
+	var (
+		deviceIds    = []string{"root.al1.dev2", "root.al1.dev3"}
+		measurements = [][]string{{"status"}, {"temperature"}}
+		dataTypes    = [][]client.TSDataType{{client.TEXT}, {client.TEXT}}
+		values       = [][]interface{}{{"33"}, {"44"}}
+		timestamps   = []int64{12, 13}
+	)
+	checkError(session.InsertAlignedRecords(deviceIds, measurements, dataTypes, values, timestamps))
+	sessionDataSet, err := session.ExecuteStatement("show devices")
+	if err == nil {
+		printDataSet0(sessionDataSet)
+		sessionDataSet.Close()
+	} else {
+		log.Println(err)
+	}
+	fmt.Println()
+}
+
 func insertRecordsOfOneDevice() {
 	ts := time.Now().UTC().UnixNano() / 1000000
 	var (
@@ -341,6 +396,42 @@
 	checkError(session.InsertRecordsOfOneDevice(deviceId, timestamps, measurementsSlice, dataTypes, values, false))
 }
 
+func insertAlignedRecordsOfOneDevice() {
+	ts := time.Now().UTC().UnixNano() / 1000000
+	var (
+		deviceId          = "root.al1.dev4"
+		measurementsSlice = [][]string{
+			{"restart_count", "tick_count", "price"},
+			{"temperature", "description", "status"},
+		}
+		dataTypes = [][]client.TSDataType{
+			{client.INT32, client.INT64, client.DOUBLE},
+			{client.FLOAT, client.TEXT, client.BOOLEAN},
+		}
+		values = [][]interface{}{
+			{int32(1), int64(2018), float64(1988.1)},
+			{float32(12.1), "Test Device 1", false},
+		}
+		timestamps = []int64{ts, ts - 1}
+	)
+	checkError(session.InsertAlignedRecordsOfOneDevice(deviceId, timestamps, measurementsSlice, dataTypes, values, false))
+	sessionDataSet, err := session.ExecuteStatement("show devices")
+	if err == nil {
+		printDataSet0(sessionDataSet)
+		sessionDataSet.Close()
+	} else {
+		log.Println(err)
+	}
+	sessionDataSetNew, err := session.ExecuteStatement("select restart_count,tick_count,price,temperature,description,status from  root.al1.dev4")
+	if err == nil {
+		printDataSet0(sessionDataSetNew)
+		sessionDataSetNew.Close()
+	} else {
+		log.Println(err)
+	}
+	fmt.Println()
+}
+
 func deleteData() {
 	var (
 		paths           = []string{"root.sg1.dev1.status"}
@@ -359,6 +450,22 @@
 	}
 }
 
+func insertAlignedTablet() {
+	if tablet, err := createTablet(12); err == nil {
+		status, err := session.InsertAlignedTablet(tablet, false)
+		checkError(status, err)
+	} else {
+		log.Fatal(err)
+	}
+	var timeout int64 = 1000
+	if ds, err := session.ExecuteQueryStatement("select * from root.ln.device1", &timeout); err == nil {
+		printDevice1(ds)
+		ds.Close()
+	} else {
+		log.Fatal(err)
+	}
+}
+
 func createTablet(rowCount int) (*client.Tablet, error) {
 	tablet, err := client.NewTablet("root.ln.device1", []*client.MeasurementSchema{
 		{
@@ -426,6 +533,20 @@
 	checkError(session.InsertTablets(tablets, false))
 }
 
+func insertAlignedTablets() {
+	tablet1, err := createTablet(8)
+	if err != nil {
+		log.Fatal(err)
+	}
+	tablet2, err := createTablet(4)
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	tablets := []*client.Tablet{tablet1, tablet2}
+	checkError(session.InsertAlignedTablets(tablets, false))
+}
+
 func setTimeZone() {
 	var timeZone = "GMT"
 	session.SetTimeZone(timeZone)
@@ -477,6 +598,18 @@
 	var sqls = []string{"insert into root.ln.wf02.wt02(time,s5) values(1,true)",
 		"insert into root.ln.wf02.wt02(time,s5) values(2,true)"}
 	checkError(session.ExecuteBatchStatement(sqls))
+	var (
+		paths     []string = []string{"root.ln.wf02.wt02.s5"}
+		startTime int64    = 1
+		endTime   int64    = 200
+	)
+	sessionDataSet, err := session.ExecuteRawDataQuery(paths, startTime, endTime)
+	if err == nil {
+		printDataSet2(sessionDataSet)
+		sessionDataSet.Close()
+	} else {
+		log.Println(err)
+	}
 }
 
 func checkError(status *rpc.TSStatus, err error) {
diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go
index 645ac60..171d0d6 100644
--- a/test/e2e/e2e_test.go
+++ b/test/e2e/e2e_test.go
@@ -20,6 +20,9 @@
 package e2e
 
 import (
+	"fmt"
+	"log"
+	"math/rand"
 	"testing"
 	"time"
 
@@ -110,3 +113,169 @@
 	assert.NoError(ds.Scan(&status))
 	assert.Equal(status, "Working")
 }
+
+func (s *e2eTestSuite) Test_InsertAlignedRecord() {
+	var (
+		deviceId     = "root.tsg2.dev1"
+		measurements = []string{"status"}
+		dataTypes    = []client.TSDataType{client.TEXT}
+		values       = []interface{}{"Working"}
+		timestamp    = time.Now().UTC().UnixNano() / 1000000
+	)
+	s.checkError(s.session.InsertAlignedRecord(deviceId, measurements, dataTypes, values, timestamp))
+
+	ds, err := s.session.ExecuteQueryStatement("select status from root.tsg2.dev1", nil)
+	assert := s.Require()
+	assert.NoError(err)
+	defer ds.Close()
+	assert.True(ds.Next())
+	var status string
+	assert.NoError(ds.Scan(&status))
+	assert.Equal(status, "Working")
+}
+
+func (s *e2eTestSuite) Test_InsertAlignedRecords() {
+	var (
+		deviceIds    = []string{"root.al1.dev2", "root.al1.dev3"}
+		measurements = [][]string{{"status"}, {"temperature"}}
+		dataTypes    = [][]client.TSDataType{{client.TEXT}, {client.TEXT}}
+		values       = [][]interface{}{{"33"}, {"44"}}
+		timestamps   = []int64{12, 13}
+	)
+	s.checkError(s.session.InsertAlignedRecords(deviceIds, measurements, dataTypes, values, timestamps))
+	ds, err := s.session.ExecuteQueryStatement("select temperature from root.al1.dev3", nil)
+	assert := s.Require()
+	assert.NoError(err)
+	defer ds.Close()
+	assert.True(ds.Next())
+	var temperature string
+	assert.NoError(ds.Scan(&temperature))
+	assert.Equal(temperature, "44")
+}
+
+func (s *e2eTestSuite) Test_InsertAlignedRecordsOfOneDevice() {
+	ts := time.Now().UTC().UnixNano() / 1000000
+	var (
+		deviceId          = "root.al1.dev4"
+		measurementsSlice = [][]string{
+			{"restart_count", "tick_count", "price"},
+			{"temperature", "description", "status"},
+		}
+		dataTypes = [][]client.TSDataType{
+			{client.INT32, client.INT64, client.DOUBLE},
+			{client.FLOAT, client.TEXT, client.BOOLEAN},
+		}
+		values = [][]interface{}{
+			{int32(1), int64(2018), float64(1988.1)},
+			{float32(12.1), "Test Device 1", false},
+		}
+		timestamps = []int64{ts, ts - 1}
+	)
+	s.checkError(s.session.InsertAlignedRecordsOfOneDevice(deviceId, timestamps, measurementsSlice, dataTypes, values, false))
+	ds, err := s.session.ExecuteStatement("select temperature from root.al1.dev4")
+	assert := s.Require()
+	assert.NoError(err)
+	defer ds.Close()
+	assert.True(ds.Next())
+	var status string
+	assert.NoError(ds.Scan(&status))
+	assert.Equal(status, "12.1")
+}
+func (s *e2eTestSuite) Test_InsertAlignedTablet() {
+	var timeseries = []string{"root.ln.device1.**"}
+	s.session.DeleteTimeseries(timeseries)
+	if tablet, err := createTablet(12); err == nil {
+		status, err := s.session.InsertAlignedTablet(tablet, false)
+		s.checkError(status, err)
+	} else {
+		log.Fatal(err)
+	}
+	var timeout int64 = 1000
+	ds, err := s.session.ExecuteQueryStatement("select count(status) from root.ln.device1", &timeout)
+	assert := s.Require()
+	assert.NoError(err)
+	defer ds.Close()
+	assert.True(ds.Next())
+	var status string
+	assert.NoError(ds.Scan(&status))
+	assert.Equal(status, "12")
+	s.session.DeleteStorageGroup("root.ln.**")
+}
+func createTablet(rowCount int) (*client.Tablet, error) {
+	tablet, err := client.NewTablet("root.ln.device1", []*client.MeasurementSchema{
+		{
+			Measurement: "restart_count",
+			DataType:    client.INT32,
+			Encoding:    client.RLE,
+			Compressor:  client.SNAPPY,
+		}, {
+			Measurement: "price",
+			DataType:    client.DOUBLE,
+			Encoding:    client.GORILLA,
+			Compressor:  client.SNAPPY,
+		}, {
+			Measurement: "tick_count",
+			DataType:    client.INT64,
+			Encoding:    client.RLE,
+			Compressor:  client.SNAPPY,
+		}, {
+			Measurement: "temperature",
+			DataType:    client.FLOAT,
+			Encoding:    client.GORILLA,
+			Compressor:  client.SNAPPY,
+		}, {
+			Measurement: "description",
+			DataType:    client.TEXT,
+			Encoding:    client.PLAIN,
+			Compressor:  client.SNAPPY,
+		},
+		{
+			Measurement: "status",
+			DataType:    client.BOOLEAN,
+			Encoding:    client.RLE,
+			Compressor:  client.SNAPPY,
+		},
+	}, rowCount)
+
+	if err != nil {
+		return nil, err
+	}
+	ts := time.Now().UTC().UnixNano() / 1000000
+	for row := 0; row < int(rowCount); row++ {
+		ts++
+		tablet.SetTimestamp(ts, row)
+		tablet.SetValueAt(rand.Int31(), 0, row)
+		tablet.SetValueAt(rand.Float64(), 1, row)
+		tablet.SetValueAt(rand.Int63(), 2, row)
+		tablet.SetValueAt(rand.Float32(), 3, row)
+		tablet.SetValueAt(fmt.Sprintf("Test Device %d", row+1), 4, row)
+		tablet.SetValueAt(bool(ts%2 == 0), 5, row)
+	}
+	return tablet, nil
+}
+
+func (s *e2eTestSuite) Test_InsertAlignedTablets() {
+	var timeseries = []string{"root.ln.device1.**"}
+	s.session.DeleteTimeseries(timeseries)
+	tablet1, err := createTablet(8)
+	if err != nil {
+		log.Fatal(err)
+	}
+	tablet2, err := createTablet(4)
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	tablets := []*client.Tablet{tablet1, tablet2}
+	s.checkError(s.session.InsertAlignedTablets(tablets, false))
+
+	ds, err := s.session.ExecuteQueryStatement("select count(status) from root.ln.device1", nil)
+	assert := s.Require()
+	assert.NoError(err)
+	defer ds.Close()
+	assert.True(ds.Next())
+	var status string
+	assert.NoError(ds.Scan(&status))
+	assert.Equal(status, "8")
+	s.session.DeleteStorageGroup("root.ln.**")
+}