Fix SessionDataSet: row count, async dispose, and null measurement issues (#53, #54, #55) (#56)
* Fix SessionDataSet: row count, async dispose, and null measurement issues (#53, #54, #55)
修复 SessionDataSet 行数统计、异步释放及空值测点问题
- Fix CurrentBatchRowCount() always returning 0 by eagerly constructing
the first TsBlock in RpcDataSet constructor when initial data is available
修复 CurrentBatchRowCount() 始终返回 0 的问题,在构造函数中预先反序列化首个 TsBlock
- Add IAsyncDisposable to SessionDataSet and RpcDataSet, providing
DisposeAsync() that properly awaits Close() to avoid sync-over-async deadlocks
为 SessionDataSet 和 RpcDataSet 添加 IAsyncDisposable 接口,支持 await using 语法
- Fix GetRow() including null-valued columns in RowRecord by using
IsNull() check before calling type-specific getters, instead of relying
on value type null checks which always pass for int/bool/float/etc.
修复 GetRow() 中值类型默认值绕过 null 检查导致空值列被错误包含的问题
* Fix SessionDataSet.Close() idempotency and add regression tests
修复 SessionDataSet.Close() 幂等性问题并添加回归测试
- Set _isClosed = true in Close() finally block to prevent
NullReferenceException on repeated Close/Dispose/DisposeAsync calls
在 Close() 的 finally 块中设置 _isClosed = true,防止重复调用导致空引用异常
- Add RpcDataSetTests covering:
- CurrentBatchRowCount returns correct size before first row read
- CurrentBatchRowCount returns 0 when no data
- GetRow excludes null-valued measurements for value types (BOOLEAN, INT32, DOUBLE)
- DataTypes/Measurements/Values lists stay consistent
添加 RpcDataSet 回归测试覆盖行数统计和空值测点排除
diff --git a/src/Apache.IoTDB/DataStructure/RpcDataSet.cs b/src/Apache.IoTDB/DataStructure/RpcDataSet.cs
index 350c417..1cc7c79 100644
--- a/src/Apache.IoTDB/DataStructure/RpcDataSet.cs
+++ b/src/Apache.IoTDB/DataStructure/RpcDataSet.cs
@@ -26,7 +26,7 @@
namespace Apache.IoTDB.DataStructure
{
- public class RpcDataSet : System.IDisposable
+ public class RpcDataSet : System.IDisposable, System.IAsyncDisposable
{
private const string TimestampColumnName = "Time";
private const string DefaultTimeFormat = "yyyy-MM-dd HH:mm:ss.fff";
@@ -140,6 +140,11 @@
_tsBlockSize = 0;
_tsBlockIndex = -1;
+ if (HasCachedByteBuffer())
+ {
+ ConstructOneTsBlock();
+ }
+
_zoneId = FindTimeZoneSafe(zoneId);
if (columnIndex2TsBlockColumnIndexList.Count != _columnNameList.Count)
@@ -172,6 +177,22 @@
GC.SuppressFinalize(this);
}
+ public async ValueTask DisposeAsync()
+ {
+ if (!disposedValue)
+ {
+ try
+ {
+ await Close().ConfigureAwait(false);
+ }
+ catch
+ {
+ }
+ disposedValue = true;
+ }
+ GC.SuppressFinalize(this);
+ }
+
public async Task Close()
{
if (_isClosed) return;
@@ -634,11 +655,9 @@
long timestamp = 0;
foreach (string columnName in columns)
{
- object localfield;
string typeStr = _columnTypeList[i];
TSDataType dataType = Client.GetDataTypeByStr(typeStr);
- // Identify the real time column by tsBlock index, not by data type
int tsBlockColumnIndex = GetTsBlockColumnIndexForColumnName(columnName);
if (tsBlockColumnIndex == -1)
{
@@ -647,6 +666,13 @@
continue;
}
+ if (IsNull(tsBlockColumnIndex, _tsBlockIndex))
+ {
+ i += 1;
+ continue;
+ }
+
+ object localfield;
switch (dataType)
{
case TSDataType.BOOLEAN:
@@ -682,12 +708,9 @@
string err_msg = "value format not supported";
throw new TException(err_msg, null);
}
- if (localfield != null)
- {
- fieldList.Add(localfield);
- measurementList.Add(columnName);
- dataTypeList.Add(dataType);
- }
+ fieldList.Add(localfield);
+ measurementList.Add(columnName);
+ dataTypeList.Add(dataType);
i += 1;
}
return new RowRecord(timestamp, fieldList, measurementList, dataTypeList);
diff --git a/src/Apache.IoTDB/DataStructure/SessionDataSet.cs b/src/Apache.IoTDB/DataStructure/SessionDataSet.cs
index dc21928..bc276af 100644
--- a/src/Apache.IoTDB/DataStructure/SessionDataSet.cs
+++ b/src/Apache.IoTDB/DataStructure/SessionDataSet.cs
@@ -25,7 +25,7 @@
namespace Apache.IoTDB.DataStructure
{
- public class SessionDataSet : System.IDisposable
+ public class SessionDataSet : System.IDisposable, System.IAsyncDisposable
{
private readonly long _queryId;
private readonly long _statementId;
@@ -154,6 +154,7 @@
}
finally
{
+ _isClosed = true;
await _rpcDataSet.Close();
_clientQueue.Add(_client);
_client = null;
@@ -184,5 +185,21 @@
Dispose(disposing: true);
GC.SuppressFinalize(this);
}
+
+ public async ValueTask DisposeAsync()
+ {
+ if (!disposedValue)
+ {
+ try
+ {
+ await this.Close().ConfigureAwait(false);
+ }
+ catch
+ {
+ }
+ disposedValue = true;
+ }
+ GC.SuppressFinalize(this);
+ }
}
}
diff --git a/tests/Apache.IoTDB.Tests/RpcDataSetTests.cs b/tests/Apache.IoTDB.Tests/RpcDataSetTests.cs
new file mode 100644
index 0000000..cce3bb4
--- /dev/null
+++ b/tests/Apache.IoTDB.Tests/RpcDataSetTests.cs
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using Apache.IoTDB.DataStructure;
+using NUnit.Framework;
+
+namespace Apache.IoTDB.Tests
+{
+ [TestFixture]
+ public class RpcDataSetTests
+ {
+ /// <summary>
+ /// Builds a serialized TsBlock with 2 rows and 3 value columns (INT32, BOOLEAN, DOUBLE).
+ /// Row 0: time=1000, int32=42, boolean=null, double=3.14
+ /// Row 1: time=2000, int32=null, boolean=true, double=null
+ /// </summary>
+ private static byte[] BuildTestTsBlockBytes()
+ {
+ var buf = new ByteBuffer(256);
+
+ // 1. value column count
+ buf.AddInt(3);
+
+ // 2. value column data types: INT32(1), BOOLEAN(0), DOUBLE(4)
+ buf.AddByte((byte)TSDataType.INT32);
+ buf.AddByte((byte)TSDataType.BOOLEAN);
+ buf.AddByte((byte)TSDataType.DOUBLE);
+
+ // 3. position count
+ buf.AddInt(2);
+
+ // 4. column encodings: time=Int64Array(2), int32=Int32Array(1), boolean=ByteArray(0), double=Int64Array(2)
+ buf.AddByte((byte)ColumnEncoding.Int64Array);
+ buf.AddByte((byte)ColumnEncoding.Int32Array);
+ buf.AddByte((byte)ColumnEncoding.ByteArray);
+ buf.AddByte((byte)ColumnEncoding.Int64Array);
+
+ // 5. Time column (Int64Array): no nulls, 2 values
+ buf.AddByte(0); // mayHaveNull = false
+ buf.AddLong(1000L);
+ buf.AddLong(2000L);
+
+ // 6. INT32 column (Int32Array): row1 is null
+ buf.AddByte(1); // mayHaveNull = true
+ // null indicators packed: [false, true] → bit7=0, bit6=1 → 0x40
+ buf.AddByte(0x40);
+ // only non-null value (row 0)
+ buf.AddInt(42);
+
+ // 7. BOOLEAN column (ByteArray): row0 is null
+ buf.AddByte(1); // mayHaveNull = true
+ // null indicators packed: [true, false] → bit7=1, bit6=0 → 0x80
+ buf.AddByte(0x80);
+ // boolean values packed (all positions): [false, true] → bit7=0, bit6=1 → 0x40
+ buf.AddByte(0x40);
+
+ // 8. DOUBLE column (Int64Array): row1 is null
+ buf.AddByte(1); // mayHaveNull = true
+ // null indicators packed: [false, true] → 0x40
+ buf.AddByte(0x40);
+ // only non-null value (row 0)
+ buf.AddDouble(3.14);
+
+ return buf.GetBuffer();
+ }
+
+ private RpcDataSet CreateTestDataSet(List<byte[]> queryResult = null)
+ {
+ var columnNames = new List<string> { "s_int32", "s_boolean", "s_double" };
+ var columnTypes = new List<string> { "INT32", "BOOLEAN", "DOUBLE" };
+ var columnNameIndex = new Dictionary<string, int>
+ {
+ { "s_int32", 0 },
+ { "s_boolean", 1 },
+ { "s_double", 2 }
+ };
+ var columnIndex2TsBlockColumnIndexList = new List<int> { 0, 1, 2 };
+
+ queryResult ??= new List<byte[]> { BuildTestTsBlockBytes() };
+
+ return new RpcDataSet(
+ sql: "SELECT * FROM root.test",
+ columnNameList: columnNames,
+ columnTypeList: columnTypes,
+ columnNameIndex: columnNameIndex,
+ ignoreTimestamp: false,
+ moreData: false,
+ queryId: 1,
+ statementId: 1,
+ client: null,
+ sessionId: 1,
+ queryResult: queryResult,
+ fetchSize: 1024,
+ timeout: 10000,
+ zoneId: "UTC",
+ columnIndex2TsBlockColumnIndexList: columnIndex2TsBlockColumnIndexList
+ );
+ }
+
+ [Test]
+ public void CurrentBatchRowCount_ReturnsCorrectSize_BeforeFirstNext()
+ {
+ var dataSet = CreateTestDataSet();
+
+ Assert.That(dataSet._tsBlockSize, Is.EqualTo(2),
+ "CurrentBatchRowCount should return the TsBlock row count immediately after construction.");
+ }
+
+ [Test]
+ public void CurrentBatchRowCount_ReturnsZero_WhenNoData()
+ {
+ var dataSet = CreateTestDataSet(queryResult: new List<byte[]>());
+
+ Assert.That(dataSet._tsBlockSize, Is.EqualTo(0),
+ "CurrentBatchRowCount should return 0 when no query results are provided.");
+ }
+
+ [Test]
+ public void GetRow_ExcludesNullValuedMeasurements_ForValueTypes()
+ {
+ var dataSet = CreateTestDataSet();
+
+ // Advance to row 0: int32=42, boolean=null, double=3.14
+ dataSet.Next();
+ var row0 = dataSet.GetRow();
+
+ Assert.That(row0.Timestamps, Is.EqualTo(1000L));
+ Assert.That(row0.Measurements, Does.Contain("s_int32"));
+ Assert.That(row0.Measurements, Does.Not.Contain("s_boolean"),
+ "Null BOOLEAN measurement should be excluded from row.");
+ Assert.That(row0.Measurements, Does.Contain("s_double"));
+ Assert.That(row0.Values.Count, Is.EqualTo(2), "Row 0 should have 2 non-null values.");
+ }
+
+ [Test]
+ public void GetRow_ExcludesNullValuedMeasurements_ForInt32AndDouble()
+ {
+ var dataSet = CreateTestDataSet();
+
+ // Advance to row 0 then row 1
+ dataSet.Next();
+ dataSet.Next();
+ var row1 = dataSet.GetRow();
+
+ Assert.That(row1.Timestamps, Is.EqualTo(2000L));
+ Assert.That(row1.Measurements, Does.Not.Contain("s_int32"),
+ "Null INT32 measurement should be excluded from row.");
+ Assert.That(row1.Measurements, Does.Contain("s_boolean"));
+ Assert.That(row1.Measurements, Does.Not.Contain("s_double"),
+ "Null DOUBLE measurement should be excluded from row.");
+ Assert.That(row1.Values.Count, Is.EqualTo(1), "Row 1 should have 1 non-null value.");
+ Assert.That(row1.Values[0], Is.EqualTo(true));
+ }
+
+ [Test]
+ public void GetRow_DataTypesMatchMeasurements()
+ {
+ var dataSet = CreateTestDataSet();
+
+ dataSet.Next();
+ var row0 = dataSet.GetRow();
+
+ Assert.That(row0.DataTypes.Count, Is.EqualTo(row0.Measurements.Count),
+ "DataTypes count should match Measurements count.");
+ Assert.That(row0.DataTypes.Count, Is.EqualTo(row0.Values.Count),
+ "DataTypes count should match Values count.");
+ }
+ }
+}