blob: 631afaa050229e18fa31dea7ab90250ed80114f5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package client
import (
"context"
"encoding/binary"
"errors"
"fmt"
"math"
"time"
"github.com/apache/iotdb-client-go/rpc"
)
const (
startIndex = 2
flag = 0x80
)
var (
errClosed error = errors.New("DataSet is Closed")
tsTypeMap map[string]TSDataType = map[string]TSDataType{
"BOOLEAN": BOOLEAN,
"INT32": INT32,
"INT64": INT64,
"FLOAT": FLOAT,
"DOUBLE": DOUBLE,
"TEXT": TEXT,
}
)
type IoTDBRpcDataSet struct {
columnCount int
sessionId int64
queryId int64
lastReadWasNull bool
rowsIndex int
queryDataSet *rpc.TSQueryDataSet
sql string
fetchSize int32
columnNameList []string
columnTypeList []TSDataType
columnOrdinalMap map[string]int32
columnTypeDeduplicatedList []TSDataType
currentBitmap []byte
time []byte
values [][]byte
client *rpc.TSIServiceClient
emptyResultSet bool
ignoreTimeStamp bool
closed bool
timeoutMs *int64
}
func (s *IoTDBRpcDataSet) getColumnIndex(columnName string) int32 {
if s.closed {
return -1
}
return s.columnOrdinalMap[columnName] - startIndex
}
func (s *IoTDBRpcDataSet) getColumnType(columnName string) TSDataType {
if s.closed {
return UNKNOW
}
return s.columnTypeDeduplicatedList[s.getColumnIndex(columnName)]
}
func (s *IoTDBRpcDataSet) isNull(columnIndex int, rowIndex int) bool {
if s.closed {
return true
}
bitmap := s.currentBitmap[columnIndex]
shift := rowIndex % 8
return ((flag >> shift) & (bitmap & 0xff)) == 0
}
func (s *IoTDBRpcDataSet) constructOneRow() error {
if s.closed {
return errClosed
}
// simulating buffer, read 8 bytes from data set and discard first 8 bytes which have been read.
s.time = s.queryDataSet.Time[:8]
s.queryDataSet.Time = s.queryDataSet.Time[8:]
for i := 0; i < len(s.queryDataSet.BitmapList); i++ {
bitmapBuffer := s.queryDataSet.BitmapList[i]
if s.rowsIndex%8 == 0 {
s.currentBitmap[i] = bitmapBuffer[0]
s.queryDataSet.BitmapList[i] = bitmapBuffer[1:]
}
if !s.isNull(i, s.rowsIndex) {
valueBuffer := s.queryDataSet.ValueList[i]
dataType := s.columnTypeDeduplicatedList[i]
switch dataType {
case BOOLEAN:
s.values[i] = valueBuffer[:1]
s.queryDataSet.ValueList[i] = valueBuffer[1:]
case INT32:
s.values[i] = valueBuffer[:4]
s.queryDataSet.ValueList[i] = valueBuffer[4:]
case INT64:
s.values[i] = valueBuffer[:8]
s.queryDataSet.ValueList[i] = valueBuffer[8:]
case FLOAT:
s.values[i] = valueBuffer[:4]
s.queryDataSet.ValueList[i] = valueBuffer[4:]
case DOUBLE:
s.values[i] = valueBuffer[:8]
s.queryDataSet.ValueList[i] = valueBuffer[8:]
case TEXT:
length := bytesToInt32(valueBuffer[:4])
s.values[i] = valueBuffer[4 : 4+length]
s.queryDataSet.ValueList[i] = valueBuffer[4+length:]
default:
return fmt.Errorf("unsupported data type %d", dataType)
}
}
}
s.rowsIndex++
return nil
}
func (s *IoTDBRpcDataSet) GetTimestamp() int64 {
if s.closed {
return -1
}
return bytesToInt64(s.time)
}
func (s *IoTDBRpcDataSet) getText(columnName string) string {
if s.closed {
return ""
}
if columnName == TimestampColumnName {
return time.Unix(0, bytesToInt64(s.time)*1000000).Format(time.RFC3339)
}
columnIndex := s.getColumnIndex(columnName)
if columnIndex < 0 || int(columnIndex) >= len(s.values) || s.isNull(int(columnIndex), s.rowsIndex-1) {
s.lastReadWasNull = true
return ""
}
s.lastReadWasNull = false
return s.getString(int(columnIndex), s.columnTypeDeduplicatedList[columnIndex])
}
func (s *IoTDBRpcDataSet) getString(columnIndex int, dataType TSDataType) string {
if s.closed {
return ""
}
valueBytes := s.values[columnIndex]
switch dataType {
case BOOLEAN:
if valueBytes[0] != 0 {
return "true"
}
return "false"
case INT32:
return int32ToString(bytesToInt32(valueBytes))
case INT64:
return int64ToString(bytesToInt64(valueBytes))
case FLOAT:
bits := binary.BigEndian.Uint32(valueBytes)
return float32ToString(math.Float32frombits(bits))
case DOUBLE:
bits := binary.BigEndian.Uint64(valueBytes)
return float64ToString(math.Float64frombits(bits))
case TEXT:
return string(valueBytes)
default:
return ""
}
}
func (s *IoTDBRpcDataSet) getValue(columnName string) interface{} {
if s.closed {
return nil
}
columnIndex := int(s.getColumnIndex(columnName))
if s.isNull(columnIndex, s.rowsIndex-1) {
return nil
}
dataType := s.getColumnType(columnName)
valueBytes := s.values[columnIndex]
switch dataType {
case BOOLEAN:
return bool(valueBytes[0] != 0)
case INT32:
return bytesToInt32(valueBytes)
case INT64:
return bytesToInt64(valueBytes)
case FLOAT:
bits := binary.BigEndian.Uint32(valueBytes)
return math.Float32frombits(bits)
case DOUBLE:
bits := binary.BigEndian.Uint64(valueBytes)
return math.Float64frombits(bits)
case TEXT:
return string(valueBytes)
default:
return nil
}
}
func (s *IoTDBRpcDataSet) getRowRecord() (*RowRecord, error) {
if s.closed {
return nil, errClosed
}
fields := make([]*Field, s.columnCount)
for i := 0; i < s.columnCount; i++ {
columnName := s.columnNameList[i]
field := Field{
name: columnName,
dataType: s.getColumnType(columnName),
value: s.getValue(columnName),
}
fields[i] = &field
}
return &RowRecord{
timestamp: s.GetTimestamp(),
fields: fields,
}, nil
}
func (s *IoTDBRpcDataSet) getBool(columnName string) bool {
if s.closed {
return false
}
columnIndex := s.getColumnIndex(columnName)
if !s.isNull(int(columnIndex), s.rowsIndex-1) {
return s.values[columnIndex][0] != 0
}
s.lastReadWasNull = true
return false
}
func (s *IoTDBRpcDataSet) scan(dest ...interface{}) error {
if s.closed {
return errClosed
}
count := s.columnCount
if count > len(dest) {
count = len(dest)
}
for i := 0; i < count; i++ {
columnName := s.columnNameList[i]
columnIndex := int(s.getColumnIndex(columnName))
if s.isNull(columnIndex, s.rowsIndex-1) {
continue
}
dataType := s.getColumnType(columnName)
d := dest[i]
valueBytes := s.values[columnIndex]
switch dataType {
case BOOLEAN:
switch t := d.(type) {
case *bool:
*t = bool(valueBytes[0] != 0)
case *string:
if valueBytes[0] != 0 {
*t = "true"
} else {
*t = "false"
}
default:
return fmt.Errorf("dest[%d] types must be *bool or *string", i)
}
case INT32:
switch t := d.(type) {
case *int32:
*t = bytesToInt32(valueBytes)
case *string:
*t = int32ToString(bytesToInt32(valueBytes))
default:
return fmt.Errorf("dest[%d] types must be *int32 or *string", i)
}
case INT64:
switch t := d.(type) {
case *int64:
*t = bytesToInt64(valueBytes)
case *string:
*t = int64ToString(bytesToInt64(valueBytes))
default:
return fmt.Errorf("dest[%d] types must be *int64 or *string", i)
}
case FLOAT:
switch t := d.(type) {
case *float32:
bits := binary.BigEndian.Uint32(valueBytes)
*t = math.Float32frombits(bits)
case *string:
bits := binary.BigEndian.Uint32(valueBytes)
*t = float32ToString(math.Float32frombits(bits))
default:
return fmt.Errorf("dest[%d] types must be *float32 or *string", i)
}
case DOUBLE:
switch t := d.(type) {
case *float64:
bits := binary.BigEndian.Uint64(valueBytes)
*t = math.Float64frombits(bits)
case *string:
bits := binary.BigEndian.Uint64(valueBytes)
*t = float64ToString(math.Float64frombits(bits))
default:
return fmt.Errorf("dest[%d] types must be *float64 or *string", i)
}
case TEXT:
switch t := d.(type) {
case *string:
*t = string(valueBytes)
default:
return fmt.Errorf("dest[%d] types must be *string", i)
}
default:
return nil
}
}
return nil
}
func (s *IoTDBRpcDataSet) getFloat(columnName string) float32 {
if s.closed {
return 0
}
columnIndex := s.getColumnIndex(columnName)
if !s.isNull(int(columnIndex), s.rowsIndex-1) {
s.lastReadWasNull = false
bits := binary.BigEndian.Uint32(s.values[columnIndex])
return math.Float32frombits(bits)
}
s.lastReadWasNull = true
return 0
}
func (s *IoTDBRpcDataSet) getDouble(columnName string) float64 {
if s.closed {
return 0
}
columnIndex := s.getColumnIndex(columnName)
if !s.isNull(int(columnIndex), s.rowsIndex-1) {
s.lastReadWasNull = false
bits := binary.BigEndian.Uint64(s.values[columnIndex])
return math.Float64frombits(bits)
}
s.lastReadWasNull = true
return 0
}
func (s *IoTDBRpcDataSet) getInt32(columnName string) int32 {
if s.closed {
return 0
}
columnIndex := s.getColumnIndex(columnName)
if !s.isNull(int(columnIndex), s.rowsIndex-1) {
s.lastReadWasNull = false
return bytesToInt32(s.values[columnIndex])
}
s.lastReadWasNull = true
return 0
}
func (s *IoTDBRpcDataSet) getInt64(columnName string) int64 {
if s.closed {
return 0
}
if columnName == TimestampColumnName {
return bytesToInt64(s.time)
}
columnIndex := s.getColumnIndex(columnName)
bys := s.values[columnIndex]
if !s.isNull(int(columnIndex), s.rowsIndex-1) {
s.lastReadWasNull = false
return bytesToInt64(bys)
}
s.lastReadWasNull = true
return 0
}
func (s *IoTDBRpcDataSet) hasCachedResults() bool {
if s.closed {
return false
}
return (s.queryDataSet != nil && len(s.queryDataSet.Time) > 0)
}
func (s *IoTDBRpcDataSet) next() (bool, error) {
if s.closed {
return false, errClosed
}
if s.hasCachedResults() {
s.constructOneRow()
return true, nil
}
if s.emptyResultSet {
return false, nil
}
r, err := s.fetchResults()
if err == nil && r {
s.constructOneRow()
return true, nil
}
return false, nil
}
func (s *IoTDBRpcDataSet) fetchResults() (bool, error) {
if s.closed {
return false, errClosed
}
s.rowsIndex = 0
req := rpc.TSFetchResultsReq{
SessionId: s.sessionId,
Statement: s.sql,
FetchSize: s.fetchSize,
QueryId: s.queryId,
IsAlign: true,
Timeout: s.timeoutMs,
}
resp, err := s.client.FetchResults(context.Background(), &req)
if err != nil {
return false, err
}
if err = VerifySuccess(resp.Status); err != nil {
return false, err
}
if !resp.HasResultSet {
s.emptyResultSet = true
} else {
s.queryDataSet = resp.GetQueryDataSet()
}
return resp.HasResultSet, nil
}
func (s *IoTDBRpcDataSet) IsClosed() bool {
return s.closed
}
func (s *IoTDBRpcDataSet) Close() (err error) {
if s.IsClosed() {
return nil
}
if s.client != nil {
closeRequest := &rpc.TSCloseOperationReq{
SessionId: s.sessionId,
QueryId: &s.queryId,
}
var status *rpc.TSStatus
status, err = s.client.CloseOperation(context.Background(), closeRequest)
if err == nil {
err = VerifySuccess(status)
}
}
s.columnCount = 0
s.sessionId = -1
s.queryId = -1
s.rowsIndex = -1
s.queryDataSet = nil
s.sql = ""
s.fetchSize = 0
s.columnNameList = nil
s.columnTypeList = nil
s.columnOrdinalMap = nil
s.columnTypeDeduplicatedList = nil
s.currentBitmap = nil
s.time = nil
s.values = nil
s.client = nil
s.emptyResultSet = true
s.closed = true
return err
}
func NewIoTDBRpcDataSet(sql string, columnNameList []string, columnTypes []string,
columnNameIndex map[string]int32,
queryId int64, client *rpc.TSIServiceClient, sessionId int64, queryDataSet *rpc.TSQueryDataSet,
ignoreTimeStamp bool, fetchSize int32, timeoutMs *int64) *IoTDBRpcDataSet {
ds := &IoTDBRpcDataSet{
sql: sql,
columnNameList: columnNameList,
ignoreTimeStamp: ignoreTimeStamp,
queryId: queryId,
client: client,
sessionId: sessionId,
queryDataSet: queryDataSet,
fetchSize: fetchSize,
currentBitmap: make([]byte, len(columnNameList)),
values: make([][]byte, len(columnTypes)),
columnCount: len(columnNameList),
closed: false,
timeoutMs: timeoutMs,
}
ds.columnTypeList = make([]TSDataType, 0)
// deduplicate and map
ds.columnOrdinalMap = make(map[string]int32)
if !ignoreTimeStamp {
ds.columnOrdinalMap[TimestampColumnName] = 1
}
if columnNameIndex != nil {
ds.columnTypeDeduplicatedList = make([]TSDataType, len(columnNameIndex))
for i, name := range columnNameList {
columnTypeString := columnTypes[i]
columnDataType := tsTypeMap[columnTypeString]
ds.columnTypeList = append(ds.columnTypeList, columnDataType)
if _, exists := ds.columnOrdinalMap[name]; !exists {
index := columnNameIndex[name]
ds.columnOrdinalMap[name] = index + startIndex
ds.columnTypeDeduplicatedList[index] = tsTypeMap[columnTypeString]
}
}
} else {
ds.columnTypeDeduplicatedList = make([]TSDataType, ds.columnCount)
index := startIndex
for i := 0; i < len(columnNameList); i++ {
name := columnNameList[i]
dataType := tsTypeMap[columnTypes[i]]
ds.columnTypeList = append(ds.columnTypeList, dataType)
ds.columnTypeDeduplicatedList[i] = dataType
if _, exists := ds.columnOrdinalMap[name]; !exists {
ds.columnOrdinalMap[name] = int32(index)
index++
}
}
}
return ds
}