fix: fetch data fail due to different endpoint in SessionDataSet (#28) (#30)
* fix: fetch data fail due to different endpoint in SessionDataSet; update Docker configuration and .NET version; enhance command-line options
* fix typo
* remove some comments
diff --git a/.github/workflows/dotnet.yml b/.github/workflows/dotnet.yml
index 80c0491..3f0367f 100644
--- a/.github/workflows/dotnet.yml
+++ b/.github/workflows/dotnet.yml
@@ -19,7 +19,7 @@
- name: Setup .NET
uses: actions/setup-dotnet@v4
with:
- dotnet-version: '6.0.x'
+ dotnet-version: '9.0.x'
- name: Restore dependencies
run: dotnet restore "src/Apache.IoTDB/Apache.IoTDB.csproj"
- name: Check License Header
diff --git a/.github/workflows/e2e-multinode.yml b/.github/workflows/e2e-multinode.yml
new file mode 100644
index 0000000..60d0a03
--- /dev/null
+++ b/.github/workflows/e2e-multinode.yml
@@ -0,0 +1,25 @@
+name: E2E Tests in MultiNode IoTDB
+
+on:
+ push:
+ branches: [ main, dev/* ]
+ pull_request:
+ branches: [ main ]
+
+jobs:
+
+ build:
+ name: e2e test in MultiNode IoTDB
+ runs-on: ubuntu-latest
+ steps:
+
+ - name: Check out code into the CSharp module directory
+ uses: actions/checkout@v4
+
+ - name: Set Docker & Run Test
+ run: |
+ docker compose -f docker-compose-2c2d.yml up --build --abort-on-container-exit --remove-orphans
+
+ - name: Clean IoTDB & Shut Down Docker
+ run: |
+ docker compose -f docker-compose-2c2d.yml down
diff --git a/docker-compose-2c2d.yml b/docker-compose-2c2d.yml
new file mode 100644
index 0000000..318e8e8
--- /dev/null
+++ b/docker-compose-2c2d.yml
@@ -0,0 +1,133 @@
+version: "3"
+services:
+
+ # ConfigNode 1
+ confignode-1:
+ image: apache/iotdb:2.0.1-beta-standalone
+ command: ["bash", "-c", "entrypoint.sh confignode"]
+ restart: always
+ healthcheck:
+ test: ["CMD", "ls", "/iotdb/data"]
+ interval: 3s
+ timeout: 5s
+ retries: 30
+ start_period: 30s
+ environment:
+ - cn_internal_address=127.0.0.1
+ - cn_internal_port=10710
+ - cn_consensus_port=10720
+ - cn_seed_config_node=127.0.0.1:10710
+ - schema_replication_factor=2
+ - data_replication_factor=2
+ privileged: true
+ volumes:
+ - ./iotdb/confignode-1/data:/iotdb/data
+ - ./iotdb/confignode-1/logs:/iotdb/logs
+ network_mode: host
+
+ # ConfigNode 2
+ confignode-2:
+ image: apache/iotdb:2.0.1-beta-standalone
+ command: ["bash", "-c", "entrypoint.sh confignode"]
+ restart: always
+ healthcheck:
+ test: ["CMD", "ls", "/iotdb/data"]
+ interval: 3s
+ timeout: 5s
+ retries: 30
+ start_period: 30s
+ environment:
+ - cn_internal_address=127.0.0.1
+ - cn_internal_port=10711
+ - cn_consensus_port=10721
+ - cn_seed_config_node=127.0.0.1:10710
+ - schema_replication_factor=2
+ - data_replication_factor=2
+ privileged: true
+ volumes:
+ - ./iotdb/confignode-2/data:/iotdb/data
+ - ./iotdb/confignode-2/logs:/iotdb/logs
+ network_mode: host
+
+ # DataNode 1
+ datanode-1:
+ image: apache/iotdb:2.0.1-beta-standalone
+ command: ["bash", "-c", "entrypoint.sh datanode"]
+ restart: always
+ healthcheck:
+ test: ["CMD", "ls", "/iotdb/data/datanode/system"]
+ interval: 10s
+ timeout: 60s
+ retries: 30
+ start_period: 30s
+ depends_on:
+ confignode-1:
+ condition: service_healthy
+ environment:
+ - dn_rpc_address=127.0.0.1
+ - dn_internal_address=127.0.0.1
+ - dn_seed_config_node=127.0.0.1:10710
+ - dn_rpc_port=6667
+ - dn_internal_port=10730
+ - dn_mpp_data_exchange_port=10740
+ - dn_schema_region_consensus_port=10750
+ - dn_data_region_consensus_port=10760
+ - schema_replication_factor=2
+ - data_replication_factor=2
+ privileged: true
+ volumes:
+ - ./iotdb/datanode-1/data:/iotdb/data
+ - ./iotdb/datanode-1/logs:/iotdb/logs
+ network_mode: host
+
+ # DataNode 2
+ datanode-2:
+ image: apache/iotdb:2.0.1-beta-standalone
+ command: ["bash", "-c", "entrypoint.sh datanode"]
+ restart: always
+ healthcheck:
+ test: ["CMD", "ls", "/iotdb/data/datanode/system"]
+ interval: 10s
+ timeout: 60s
+ retries: 30
+ start_period: 30s
+ depends_on:
+ confignode-1:
+ condition: service_healthy
+ confignode-2:
+ condition: service_healthy
+ environment:
+ - dn_rpc_address=127.0.0.1
+ - dn_internal_address=127.0.0.1
+ - dn_seed_config_node=127.0.0.1:10710
+ - dn_rpc_port=6668
+ - dn_internal_port=10731
+ - dn_mpp_data_exchange_port=10741
+ - dn_schema_region_consensus_port=10751
+ - dn_data_region_consensus_port=10761
+ - schema_replication_factor=2
+ - data_replication_factor=2
+ privileged: true
+ volumes:
+ - ./iotdb/datanode-2/data:/iotdb/data
+ - ./iotdb/datanode-2/logs:/iotdb/logs
+ network_mode: host
+
+ # C# Client
+ apache.iotdb.samples:
+ image: ${DOCKER_REGISTRY-}apacheiotdbsamples
+ depends_on:
+ confignode-1:
+ condition: service_healthy
+ confignode-2:
+ condition: service_healthy
+ datanode-1:
+ condition: service_healthy
+ datanode-2:
+ condition: service_healthy
+ build:
+ context: .
+ dockerfile: samples/Apache.IoTDB.Samples/Dockerfile
+ command: ["--multi", "localhost:6667", "localhost:6668"]
+ # command: ["sleep", "infinity"]
+ network_mode: host
\ No newline at end of file
diff --git a/docker-compose.dcproj b/docker-compose.dcproj
deleted file mode 100644
index 322d139..0000000
--- a/docker-compose.dcproj
+++ /dev/null
@@ -1,15 +0,0 @@
-<?xml version="1.0" encoding="utf-8"?>
-<Project ToolsVersion="15.0" Sdk="Microsoft.Docker.Sdk">
- <PropertyGroup Label="Globals">
- <ProjectVersion>2.1</ProjectVersion>
- <DockerTargetOS>Linux</DockerTargetOS>
- <ProjectGuid>4d457769-80cb-401f-9155-c3125c04facd</ProjectGuid>
- </PropertyGroup>
- <ItemGroup>
- <None Include="docker-compose.override.yml">
- <DependentUpon>docker-compose.yml</DependentUpon>
- </None>
- <None Include="docker-compose.yml" />
- <None Include=".dockerignore" />
- </ItemGroup>
-</Project>
\ No newline at end of file
diff --git a/docker-compose.override.yml b/docker-compose.override.yml
deleted file mode 100644
index 8e89b07..0000000
--- a/docker-compose.override.yml
+++ /dev/null
@@ -1 +0,0 @@
-version: '3.4'
diff --git a/docker-compose.yml b/docker-compose.yml
index df2b2c8..78ddd7f 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -11,6 +11,7 @@
build:
context: .
dockerfile: samples/Apache.IoTDB.Samples/Dockerfile
+ command: ["--single", "iotdb"]
networks:
iotdb-network:
ipv4_address: 172.18.0.2
diff --git a/samples/Apache.IoTDB.Samples/Apache.IoTDB.Samples.csproj b/samples/Apache.IoTDB.Samples/Apache.IoTDB.Samples.csproj
index 7dd6f32..bae70a7 100644
--- a/samples/Apache.IoTDB.Samples/Apache.IoTDB.Samples.csproj
+++ b/samples/Apache.IoTDB.Samples/Apache.IoTDB.Samples.csproj
@@ -2,7 +2,7 @@
<PropertyGroup>
<OutputType>Exe</OutputType>
- <TargetFramework>net6.0</TargetFramework>
+ <TargetFramework>net9.0</TargetFramework>
<DockerDefaultTargetOS>Linux</DockerDefaultTargetOS>
<DockerfileContext>..\..</DockerfileContext>
</PropertyGroup>
@@ -19,6 +19,7 @@
<PackageReference Include="ConsoleTableExt" Version="3.2.0" />
<PackageReference Include="Microsoft.VisualStudio.Azure.Containers.Tools.Targets" Version="1.15.1" />
<PackageReference Include="NLog.Extensions.Logging" Version="5.0.1" />
+ <PackageReference Include="System.CommandLine" Version="2.0.0-beta4.22272.1" />
</ItemGroup>
<ItemGroup>
diff --git a/samples/Apache.IoTDB.Samples/Dockerfile b/samples/Apache.IoTDB.Samples/Dockerfile
index 77c600b..26ee0a9 100644
--- a/samples/Apache.IoTDB.Samples/Dockerfile
+++ b/samples/Apache.IoTDB.Samples/Dockerfile
@@ -17,10 +17,10 @@
#See https://aka.ms/containerfastmode to understand how Visual Studio uses this Dockerfile to build your images for faster debugging.
-FROM mcr.microsoft.com/dotnet/runtime:6.0 AS base
+FROM mcr.microsoft.com/dotnet/runtime:9.0 AS base
WORKDIR /app
-FROM mcr.microsoft.com/dotnet/sdk:6.0 AS build
+FROM mcr.microsoft.com/dotnet/sdk:9.0 AS build
WORKDIR /src
COPY ["samples/Apache.IoTDB.Samples/Apache.IoTDB.Samples.csproj", "samples/Apache.IoTDB.Samples/"]
COPY ["src/Apache.IoTDB/Apache.IoTDB.csproj", "src/Apache.IoTDB/"]
diff --git a/samples/Apache.IoTDB.Samples/Program.cs b/samples/Apache.IoTDB.Samples/Program.cs
index 6b054e7..aee4081 100644
--- a/samples/Apache.IoTDB.Samples/Program.cs
+++ b/samples/Apache.IoTDB.Samples/Program.cs
@@ -19,8 +19,10 @@
using Microsoft.Extensions.Logging;
using NLog.Extensions.Logging;
-using System;
using System.Threading.Tasks;
+using System.CommandLine;
+using System.Collections.Generic;
+using System;
namespace Apache.IoTDB.Samples
{
@@ -28,10 +30,51 @@
{
public static async Task Main(string[] args)
{
+ var singleOption = new Option<string>(
+ "--single",
+ () => "localhost",
+ description: "Use single endpoint (e.g. --single localhost)");
+
+ var multiOption = new Option<List<string>>(
+ "--multi",
+ description: "Use multiple endpoints (e.g. --multi localhost:6667 localhost:6668)")
+ {
+ AllowMultipleArgumentsPerToken = true
+ };
+
+ var rootCommand = new RootCommand
+ {
+ singleOption,
+ multiOption
+ };
+
+ rootCommand.SetHandler(async (string single, List<string> multi) =>
+ {
var utilsTest = new UtilsTest();
utilsTest.TestParseEndPoint();
- var sessionPoolTest = new SessionPoolTest("iotdb");
+
+ SessionPoolTest sessionPoolTest;
+
+ if (!string.IsNullOrEmpty(single) && (multi == null || multi.Count == 0))
+ {
+ sessionPoolTest = new SessionPoolTest(single);
+ }
+ else if (multi != null && multi.Count != 0)
+ {
+ sessionPoolTest = new SessionPoolTest(multi);
+ }
+ else
+ {
+ Console.WriteLine("Please specify either --single or --multi endpoints.");
+ return;
+ }
+
await sessionPoolTest.Test();
+
+
+ }, singleOption, multiOption);
+
+ await rootCommand.InvokeAsync(args);
}
public static void OpenDebugMode(this SessionPool session)
diff --git a/samples/Apache.IoTDB.Samples/SessionPoolTest.cs b/samples/Apache.IoTDB.Samples/SessionPoolTest.cs
index 754c632..03d211e 100644
--- a/samples/Apache.IoTDB.Samples/SessionPoolTest.cs
+++ b/samples/Apache.IoTDB.Samples/SessionPoolTest.cs
@@ -52,97 +52,104 @@
public List<string> test_measurements = new List<string>(
measurement_count.ConvertAll(x => test_measurement + x.ToString()).ToArray()
);
-
-
public SessionPoolTest(string _host = "localhost")
{
host = _host;
node_urls.Add(host + ":" + port);
}
+ public SessionPoolTest(List<string> _nodeUrls)
+ {
+ nodeUrls = _nodeUrls;
+ }
public async Task Test()
{
- await TestOpenWithNodeUrls();
+ if(nodeUrls.Count == 1){
+ await TestOpenWithNodeUrls();
- await TestOpenWith2NodeUrls();
+ await TestOpenWith2NodeUrls();
- await TestOpenWithNodeUrlsAndInsertOneRecord();
+ await TestOpenWithNodeUrlsAndInsertOneRecord();
- await TestInsertOneRecord();
+ await TestInsertOneRecord();
- await TestInsertAlignedRecord();
+ await TestInsertAlignedRecord();
- await TestInsertAlignedRecords();
+ await TestInsertAlignedRecords();
- await TestInsertAlignedStringRecords();
+ await TestInsertAlignedStringRecords();
- await TestInsertAlignedStringRecordsOfOneDevice();
+ await TestInsertAlignedStringRecordsOfOneDevice();
- await TestInsertStringRecord();
+ await TestInsertStringRecord();
- await TestInsertAlignedStringRecord();
+ await TestInsertAlignedStringRecord();
- await TestInsertStringRecords();
+ await TestInsertStringRecords();
- await TestInsertStringRecordsOfOneDevice();
+ await TestInsertStringRecordsOfOneDevice();
- await TestInsertAlignedRecordsOfOneDevice();
+ await TestInsertAlignedRecordsOfOneDevice();
- await TestInsertAlignedTablet();
+ await TestInsertAlignedTablet();
- await TestInsertAlignedTablets();
+ await TestInsertAlignedTablets();
- await TestInsertRecord();
+ await TestInsertRecord();
- await TestCreateMultiTimeSeries();
+ await TestCreateMultiTimeSeries();
- await TestInsertStrRecord();
+ await TestInsertStrRecord();
- await TestInsertRecords();
+ await TestInsertRecords();
- await TestInsertRecordsWithAllType();
+ await TestInsertRecordsWithAllType();
- await TestInsertRecordsOfOneDevice();
+ await TestInsertRecordsOfOneDevice();
- await TestInsertTablet();
+ await TestInsertTablet();
- await TestInsertTabletWithAllType();
+ await TestInsertTabletWithAllType();
- await TestInsertTabletWithNullValue();
+ await TestInsertTabletWithNullValue();
- await TestInsertTablets();
+ await TestInsertTablets();
- await TestSetAndUnsetSchemaTemplate();
+ await TestSetAndUnsetSchemaTemplate();
- await TestCreateAlignedTimeseries();
+ await TestCreateAlignedTimeseries();
- await TestCreateAndDropSchemaTemplate();
+ await TestCreateAndDropSchemaTemplate();
- await TestGetTimeZone();
+ await TestGetTimeZone();
- await TestCreateAndDeleteDatabase();
+ await TestCreateAndDeleteDatabase();
- await TestCreateTimeSeries();
+ await TestCreateTimeSeries();
- await TestDeleteTimeSeries();
+ await TestDeleteTimeSeries();
- await TestDeleteDatabase();
+ await TestDeleteDatabase();
- await TestCheckTimeSeriesExists();
+ await TestCheckTimeSeriesExists();
- await TestSetTimeZone();
+ await TestSetTimeZone();
- await TestDeleteData();
+ await TestDeleteData();
- await TestNonSql();
+ await TestNonSql();
- await TestRawDataQuery();
+ await TestRawDataQuery();
- await TestLastDataQuery();
+ await TestLastDataQuery();
- await TestSqlQuery();
+ await TestSqlQuery();
- await TestNonSqlBy_ADO();
+ await TestNonSqlBy_ADO();
+ }
+ else {
+ await TestMultiNodeDataFetch();
+ }
}
public async Task TestOpenWithNodeUrls()
{
@@ -454,7 +461,7 @@
await res.Close();
Console.WriteLine("SHOW DEVICES sql passed!");
- res = await session_pool.ExecuteQueryStatementAsync("COUNT TIMESERIES root");
+ res = await session_pool.ExecuteQueryStatementAsync($"COUNT TIMESERIES {testDatabaseName}");
res.ShowTableNames();
while (res.HasNext()) Console.WriteLine(res.Next());
@@ -572,5 +579,56 @@
await session_pool.Close();
Console.WriteLine("LastDataQuery Passed");
}
+
+ public async Task TestMultiNodeDataFetch(){
+ System.Diagnostics.Debug.Assert(nodeUrls.Count > 1, "nodeUrls.Count should be greater than 1 in MultiNode Test");
+ var session_pool = new SessionPool.Builder()
+ .SetUsername(username)
+ .SetPassword(password)
+ .SetNodeUrl(nodeUrls)
+ .SetPoolSize(4)
+ .Build();
+ await session_pool.Open(false);
+ if (debug) session_pool.OpenDebugMode();
+ var status = await session_pool.DeleteDatabaseAsync(testDatabaseName);
+ var device_id = string.Format("{0}.{1}", testDatabaseName, testDevice);
+ var measurements = new List<string> { testMeasurements[0], testMeasurements[1] };
+ var data_type_lst = new List<TSDataType> { TSDataType.BOOLEAN, TSDataType.FLOAT };
+ var encoding_lst = new List<TSEncoding> { TSEncoding.PLAIN, TSEncoding.PLAIN };
+ var compressor_lst = new List<Compressor> { Compressor.SNAPPY, Compressor.SNAPPY };
+ var ts_path_lst = new List<string>() {
+ string.Format("{0}.{1}.{2}", testDatabaseName, testDevice, testMeasurements[0]),
+ string.Format("{0}.{1}.{2}", testDatabaseName, testDevice, testMeasurements[1])
+ };
+ status = await session_pool.CreateMultiTimeSeriesAsync(ts_path_lst, data_type_lst, encoding_lst, compressor_lst);
+
+ var records = new List<RowRecord>();
+ var values = new List<object>() { true, 20.0f };
+ var device_id_lst = new List<string>() { };
+ for (int i = 1; i <= fetchSize * processedSize * 4 + 783; i++)
+ {
+ var record = new RowRecord(i, values, measurements);
+ records.Add(record);
+ device_id_lst.Add(device_id);
+ }
+
+ // insert data
+ status = await session_pool.InsertRecordsAsync(device_id_lst, records);
+ System.Diagnostics.Debug.Assert(status == 0);
+ // fetch data
+ var paths = new List<string>() { string.Format("{0}.{1}", device_id, testMeasurements[0]), string.Format("{0}.{1}", device_id, testMeasurements[1]) };
+ var res = await session_pool.ExecuteQueryStatementAsync("select * from " + string.Format("{0}.{1}", testDatabaseName, testDevice));
+ res.ShowTableNames();
+ var count = 0;
+ while (res.HasNext())
+ {
+ var record = res.Next();
+ count++;
+ }
+ Console.WriteLine(count + " " + (fetchSize * processedSize * 4 + 783));
+ System.Diagnostics.Debug.Assert(count == fetchSize * processedSize * 4 + 783);
+ await res.Close();
+ Console.WriteLine("MultiNodeDataFetch Passed");
+ }
}
}
\ No newline at end of file
diff --git a/src/Apache.IoTDB/DataStructure/SessionDataSet.cs b/src/Apache.IoTDB/DataStructure/SessionDataSet.cs
index 32191a2..7ba49ea 100644
--- a/src/Apache.IoTDB/DataStructure/SessionDataSet.cs
+++ b/src/Apache.IoTDB/DataStructure/SessionDataSet.cs
@@ -40,6 +40,7 @@
private List<ByteBuffer> _valueBufferLst, _bitmapBufferLst;
private ByteBuffer _timeBuffer;
private readonly ConcurrentClientQueue _clientQueue;
+ private Client _client;
private int _rowIndex;
private bool _hasCatchedResult;
private RowRecord _cachedRowRecord;
@@ -52,9 +53,10 @@
private int DefaultTimeout => 10000;
public int FetchSize { get; set; }
public int RowCount { get; set; }
- public SessionDataSet(string sql, TSExecuteStatementResp resp, ConcurrentClientQueue clientQueue, long statementId)
+ public SessionDataSet(string sql, TSExecuteStatementResp resp, Client client, ConcurrentClientQueue clientQueue, long statementId)
{
_clientQueue = clientQueue;
+ _client = client;
_sql = sql;
_queryDataset = resp.QueryDataSet;
_queryId = resp.QueryId;
@@ -266,14 +268,13 @@
private bool FetchResults()
{
_rowIndex = 0;
- var myClient = _clientQueue.Take();
- var req = new TSFetchResultsReq(myClient.SessionId, _sql, FetchSize, _queryId, true)
+ var req = new TSFetchResultsReq(_client.SessionId, _sql, FetchSize, _queryId, true)
{
Timeout = DefaultTimeout
};
try
{
- var task = myClient.ServiceClient.fetchResultsAsync(req);
+ var task = _client.ServiceClient.fetchResultsAsync(req);
var resp = task.ConfigureAwait(false).GetAwaiter().GetResult();
@@ -302,18 +303,13 @@
{
throw new TException("Cannot fetch result from server, because of network connection", e);
}
- finally
- {
- _clientQueue.Add(myClient);
- }
}
public async Task Close()
{
if (!_isClosed)
{
- var myClient = _clientQueue.Take();
- var req = new TSCloseOperationReq(myClient.SessionId)
+ var req = new TSCloseOperationReq(_client.SessionId)
{
QueryId = _queryId,
StatementId = _statementId
@@ -321,7 +317,7 @@
try
{
- var status = await myClient.ServiceClient.closeOperationAsync(req);
+ var status = await _client.ServiceClient.closeOperationAsync(req);
}
catch (TException e)
{
@@ -329,7 +325,8 @@
}
finally
{
- _clientQueue.Add(myClient);
+ _clientQueue.Add(_client);
+ _client = null;
}
}
}
diff --git a/src/Apache.IoTDB/SessionPool.cs b/src/Apache.IoTDB/SessionPool.cs
index 6dfda4c..f5337e4 100644
--- a/src/Apache.IoTDB/SessionPool.cs
+++ b/src/Apache.IoTDB/SessionPool.cs
@@ -129,7 +129,7 @@
_enableRpcCompression = enableRpcCompression;
_timeout = timeout;
}
- public async Task<TResult> ExecuteClientOperationAsync<TResult>(AsyncOperation<TResult> operation, string errMsg, bool retryOnFailure = true)
+ public async Task<TResult> ExecuteClientOperationAsync<TResult>(AsyncOperation<TResult> operation, string errMsg, bool retryOnFailure = true, bool putClientBack = true)
{
Client client = _clients.Take();
try
@@ -173,7 +173,10 @@
}
finally
{
- _clients.Add(client);
+ if(putClientBack)
+ {
+ _clients.Add(client);
+ }
}
}
/// <summary>
@@ -350,8 +353,7 @@
}
}
- public async Task SetTimeZone(string zoneId)
- {
+ public async Task SetTimeZone(string zoneId){
_zoneId = zoneId;
foreach (var client in _clients.ClientQueue.AsEnumerable())
@@ -672,12 +674,13 @@
public async Task<bool> CheckTimeSeriesExistsAsync(string tsPath)
{
- // TBD by dalong
try
{
var sql = "SHOW TIMESERIES " + tsPath;
var sessionDataset = await ExecuteQueryStatementAsync(sql);
- return sessionDataset.HasNext();
+ bool timeSeriesExists = sessionDataset.HasNext();
+ await sessionDataset.Close(); // be sure to close the sessionDataset to put the client back to the pool
+ return timeSeriesExists;
}
catch (TException e)
{
@@ -1270,12 +1273,13 @@
throw new Exception(string.Format("execute query failed, sql: {0}, message: {1}", sql, status.Message));
}
- return new SessionDataSet(sql, resp, _clients, client.StatementId)
+ return new SessionDataSet(sql, resp, client, _clients, client.StatementId)
{
FetchSize = _fetchSize,
};
},
- errMsg: "Error occurs when executing query statement"
+ errMsg: "Error occurs when executing query statement",
+ putClientBack: false
);
}
public async Task<SessionDataSet> ExecuteStatementAsync(string sql, long timeout)
@@ -1297,12 +1301,13 @@
throw new Exception(string.Format("execute query failed, sql: {0}, message: {1}", sql, status.Message));
}
- return new SessionDataSet(sql, resp, _clients, client.StatementId)
+ return new SessionDataSet(sql, resp, client, _clients, client.StatementId)
{
FetchSize = _fetchSize,
};
},
- errMsg: "Error occurs when executing query statement"
+ errMsg: "Error occurs when executing query statement",
+ putClientBack: false
);
}
@@ -1346,12 +1351,13 @@
throw new Exception(string.Format("execute raw data query failed, message: {0}", status.Message));
}
- return new SessionDataSet("", resp, _clients, client.StatementId)
+ return new SessionDataSet("", resp, client, _clients, client.StatementId)
{
FetchSize = _fetchSize,
};
},
- errMsg: "Error occurs when executing raw data query"
+ errMsg: "Error occurs when executing raw data query",
+ putClientBack: false
);
}
public async Task<SessionDataSet> ExecuteLastDataQueryAsync(List<string> paths, long lastTime)
@@ -1373,12 +1379,13 @@
throw new Exception(string.Format("execute last data query failed, message: {0}", status.Message));
}
- return new SessionDataSet("", resp, _clients, client.StatementId)
+ return new SessionDataSet("", resp, client, _clients, client.StatementId)
{
FetchSize = _fetchSize,
};
},
- errMsg: "Error occurs when executing last data query"
+ errMsg: "Error occurs when executing last data query",
+ putClientBack: false
);
}
[Obsolete("This method is obsolete. Use SQL instead.", false)]