| using System; |
| using System.Collections.Generic; |
| using System.Linq; |
| using System.Threading.Tasks; |
| using Thrift; |
| |
| namespace Apache.IoTDB.DataStructure |
| { |
| public class SessionDataSet |
| { |
| private readonly long _queryId; |
| private readonly string _sql; |
| private readonly List<string> _columnNames; |
| private readonly Dictionary<string, int> _columnNameIndexMap; |
| private readonly Dictionary<int, int> _duplicateLocation; |
| private readonly List<string> _columnTypeLst; |
| private TSQueryDataSet _queryDataset; |
| private readonly byte[] _currentBitmap; |
| private readonly int _columnSize; |
| private List<ByteBuffer> _valueBufferLst, _bitmapBufferLst; |
| private ByteBuffer _timeBuffer; |
| private readonly ConcurrentClientQueue _clientQueue; |
| private int _rowIndex; |
| private bool _hasCatchedResult; |
| private RowRecord _cachedRowRecord; |
| private readonly bool _isClosed = false; |
| |
| |
| private string TimestampStr => "Time"; |
| private int StartIndex => 2; |
| private int Flag => 0x80; |
| private int DefaultTimeout => 10000; |
| |
| public int FetchSize { get; set; } |
| |
| public SessionDataSet(string sql, TSExecuteStatementResp resp, ConcurrentClientQueue clientQueue) |
| { |
| _clientQueue = clientQueue; |
| _sql = sql; |
| _queryDataset = resp.QueryDataSet; |
| _queryId = resp.QueryId; |
| _columnSize = resp.Columns.Count; |
| _currentBitmap = new byte[_columnSize]; |
| _columnNames = new List<string>(); |
| _timeBuffer = new ByteBuffer(_queryDataset.Time); |
| _columnNameIndexMap = new Dictionary<string, int>(); |
| _columnTypeLst = new List<string>(); |
| _duplicateLocation = new Dictionary<int, int>(); |
| _valueBufferLst = new List<ByteBuffer>(); |
| _bitmapBufferLst = new List<ByteBuffer>(); |
| // some internal variable |
| _hasCatchedResult = false; |
| _rowIndex = 0; |
| if (resp.ColumnNameIndexMap != null) |
| { |
| for (var index = 0; index < resp.Columns.Count; index++) |
| { |
| _columnNames.Add(""); |
| _columnTypeLst.Add(""); |
| } |
| |
| for (var index = 0; index < resp.Columns.Count; index++) |
| { |
| var name = resp.Columns[index]; |
| _columnNames[resp.ColumnNameIndexMap[name]] = name; |
| _columnTypeLst[resp.ColumnNameIndexMap[name]] = resp.DataTypeList[index]; |
| } |
| } |
| else |
| { |
| _columnNames = resp.Columns; |
| _columnTypeLst = resp.DataTypeList; |
| } |
| |
| for (int index = 0; index < _columnNames.Count; index++) |
| { |
| var columnName = _columnNames[index]; |
| if (_columnNameIndexMap.ContainsKey(columnName)) |
| { |
| _duplicateLocation[index] = _columnNameIndexMap[columnName]; |
| } |
| else |
| { |
| _columnNameIndexMap[columnName] = index; |
| } |
| |
| _valueBufferLst.Add(new ByteBuffer(_queryDataset.ValueList[index])); |
| _bitmapBufferLst.Add(new ByteBuffer(_queryDataset.BitmapList[index])); |
| } |
| } |
| |
| private List<string> GetColumnNames() |
| { |
| var nameLst = new List<string> {"timestamp"}; |
| nameLst.AddRange(_columnNames); |
| return nameLst; |
| } |
| |
| public void ShowTableNames() |
| { |
| var str = GetColumnNames() |
| .Aggregate("", (current, name) => current + (name + "\t\t")); |
| |
| Console.WriteLine(str); |
| } |
| |
| public bool HasNext() |
| { |
| if (_hasCatchedResult) |
| { |
| return true; |
| } |
| |
| // we have consumed all current data, fetch some more |
| if (!_timeBuffer.HasRemaining()) |
| { |
| if (!FetchResults()) |
| { |
| return false; |
| } |
| } |
| |
| ConstructOneRow(); |
| _hasCatchedResult = true; |
| return true; |
| } |
| |
| public RowRecord Next() |
| { |
| if (!_hasCatchedResult) |
| { |
| if (!HasNext()) |
| { |
| return null; |
| } |
| } |
| |
| _hasCatchedResult = false; |
| return _cachedRowRecord; |
| } |
| |
| private TSDataType GetDataTypeFromStr(string str) |
| { |
| return str switch |
| { |
| "BOOLEAN" => TSDataType.BOOLEAN, |
| "INT32" => TSDataType.INT32, |
| "INT64" => TSDataType.INT64, |
| "FLOAT" => TSDataType.FLOAT, |
| "DOUBLE" => TSDataType.DOUBLE, |
| "TEXT" => TSDataType.TEXT, |
| "NULLTYPE" => TSDataType.NONE, |
| _ => TSDataType.TEXT |
| }; |
| } |
| |
| private void ConstructOneRow() |
| { |
| List<object> fieldLst = new List<Object>(); |
| |
| for (int i = 0; i < _columnSize; i++) |
| { |
| if (_duplicateLocation.ContainsKey(i)) |
| { |
| var field = fieldLst[_duplicateLocation[i]]; |
| fieldLst.Add(field); |
| } |
| else |
| { |
| var columnValueBuffer = _valueBufferLst[i]; |
| var columnBitmapBuffer = _bitmapBufferLst[i]; |
| |
| if (_rowIndex % 8 == 0) |
| { |
| _currentBitmap[i] = columnBitmapBuffer.GetByte(); |
| } |
| |
| object localField; |
| if (!IsNull(i, _rowIndex)) |
| { |
| var columnDataType = GetDataTypeFromStr(_columnTypeLst[i]); |
| |
| |
| switch (columnDataType) |
| { |
| case TSDataType.BOOLEAN: |
| localField = columnValueBuffer.GetBool(); |
| break; |
| case TSDataType.INT32: |
| localField = columnValueBuffer.GetInt(); |
| break; |
| case TSDataType.INT64: |
| localField = columnValueBuffer.GetLong(); |
| break; |
| case TSDataType.FLOAT: |
| localField = columnValueBuffer.GetFloat(); |
| break; |
| case TSDataType.DOUBLE: |
| localField = columnValueBuffer.GetDouble(); |
| break; |
| case TSDataType.TEXT: |
| localField = columnValueBuffer.GetStr(); |
| break; |
| default: |
| string err_msg = "value format not supported"; |
| throw new TException(err_msg, null); |
| } |
| |
| fieldLst.Add(localField); |
| } |
| else |
| { |
| localField = null; |
| fieldLst.Add("NULL"); |
| } |
| } |
| } |
| |
| long timestamp = _timeBuffer.GetLong(); |
| _rowIndex += 1; |
| _cachedRowRecord = new RowRecord(timestamp, fieldLst, _columnNames); |
| } |
| |
| private bool IsNull(int loc, int row_index) |
| { |
| byte bitmap = _currentBitmap[loc]; |
| int shift = row_index % 8; |
| return ((Flag >> shift) & bitmap) == 0; |
| } |
| |
| private bool FetchResults() |
| { |
| _rowIndex = 0; |
| var myClient = _clientQueue.Take(); |
| var req = new TSFetchResultsReq(myClient.SessionId, _sql, FetchSize, _queryId, true) |
| { |
| Timeout = DefaultTimeout |
| }; |
| try |
| { |
| var task = myClient.ServiceClient.fetchResultsAsync(req); |
| task.Wait(); |
| var resp = task.Result; |
| |
| if (resp.HasResultSet) |
| { |
| _queryDataset = resp.QueryDataSet; |
| // reset buffer |
| _timeBuffer = new ByteBuffer(resp.QueryDataSet.Time); |
| _valueBufferLst = new List<ByteBuffer>(); |
| _bitmapBufferLst = new List<ByteBuffer>(); |
| for (int index = 0; index < _queryDataset.ValueList.Count; index++) |
| { |
| _valueBufferLst.Add(new ByteBuffer(_queryDataset.ValueList[index])); |
| _bitmapBufferLst.Add(new ByteBuffer(_queryDataset.BitmapList[index])); |
| } |
| |
| // reset row index |
| _rowIndex = 0; |
| } |
| |
| return resp.HasResultSet; |
| } |
| catch (TException e) |
| { |
| throw new TException("Cannot fetch result from server, because of network connection", e); |
| } |
| finally |
| { |
| _clientQueue.Add(myClient); |
| } |
| } |
| |
| public async Task Close() |
| { |
| if (!_isClosed) |
| { |
| var myClient = _clientQueue.Take(); |
| var req = new TSCloseOperationReq(myClient.SessionId) |
| { |
| QueryId = _queryId |
| }; |
| |
| try |
| { |
| await myClient.ServiceClient.closeOperationAsync(req); |
| } |
| catch (TException e) |
| { |
| throw new TException("Operation Handle Close Failed", e); |
| } |
| finally |
| { |
| _clientQueue.Add(myClient); |
| } |
| } |
| } |
| } |
| } |