style: add pre commit hook for style check and format (#29)
* fix: fetch data fail due to different endpoint in SessionDataSet; update Docker configuration and .NET version; enhance command-line options
* fix typo
* style: add pre commit hook for every pr or commit and format code style
* add license check ignore for pre commit check
* remove: delete obsolete iotdb symlink
* fix: add TMPDIR for pre-commit to avoid resolve permission issue
* fix: clean up dotnet temporary folder in pre-commit workflow
diff --git a/.github/workflows/pre-commit-format.yml b/.github/workflows/pre-commit-format.yml
new file mode 100644
index 0000000..786a5e5
--- /dev/null
+++ b/.github/workflows/pre-commit-format.yml
@@ -0,0 +1,48 @@
+name: Formatting
+
+on:
+ workflow_dispatch:
+ pull_request:
+ branches:
+ - '**'
+ merge_group:
+ branches: [ main ]
+ schedule:
+ - cron: "0 0 * * *"
+
+concurrency:
+ group: ${{ github.workflow }}-${{ github.ref }}
+ cancel-in-progress: true
+
+jobs:
+ formatting-checks:
+ runs-on: ubuntu-20.04
+
+ steps:
+ - uses: actions/checkout@v4
+
+ - name: Setup dotnet
+ uses: actions/setup-dotnet@v4
+ with:
+ dotnet-version: '9.0.x'
+
+ - name: Setup Python environment (for pre-commit)
+ uses: actions/setup-python@v5
+ with:
+ python-version: '3.10'
+
+ - name: Clean dotnet temporary folder
+ run: |
+ sudo rm -rf /tmp/.dotnet
+
+ - name: Install pre-commit and dependencies
+ run: |
+ pip install pre-commit
+ pre-commit install-hooks
+
+ - name: Run pre-commit checks
+ env:
+ TMPDIR: ${{ runner.temp }}
+ DOTNET_CLI_HOME: ${{ runner.temp }}
+ run: |
+ pre-commit run --all-files
diff --git a/.licenserc.yaml b/.licenserc.yaml
index afe9c0c..4bda2c2 100644
--- a/.licenserc.yaml
+++ b/.licenserc.yaml
@@ -32,5 +32,6 @@
- 'Apache.IoTDB.sln'
- '.gitignore'
- '.dockerignore'
+ - '.pre-commit-config.yaml'
comment: on-failure
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
new file mode 100644
index 0000000..630051f
--- /dev/null
+++ b/.pre-commit-config.yaml
@@ -0,0 +1,33 @@
+
+
+repos:
+ - repo: local
+ hooks:
+ - id: dotnet-format
+ name: dotnet format
+ entry: dotnet format --include
+ language: system
+ pass_filenames: true
+ files: '\.(cs|vb|fs|sln|csproj|fsproj)$'
+
+ - repo: https://github.com/pre-commit/pre-commit-hooks
+ rev: v4.6.0
+ hooks:
+ - id: trailing-whitespace
+ - id: end-of-file-fixer
+ - id: check-added-large-files
+ - id: check-yaml
+ - id: check-json
+ - id: check-xml
+
+ - repo: https://github.com/codespell-project/codespell
+ rev: v2.2.6
+ hooks:
+ - id: codespell
+ args: ["--ignore-words-list=fo,teh,ba"]
+
+exclude: |
+ (?x)^(
+ src/Apache\.IoTDB/Rpc/|
+ .*\.md$
+ )
diff --git a/Apache-IoTDB-Client-CSharp-UserCase/Program.cs b/Apache-IoTDB-Client-CSharp-UserCase/Program.cs
index 345e9a7..281104d 100644
--- a/Apache-IoTDB-Client-CSharp-UserCase/Program.cs
+++ b/Apache-IoTDB-Client-CSharp-UserCase/Program.cs
@@ -118,4 +118,3 @@
}
}
-
diff --git a/Apache.IoTDB.sln b/Apache.IoTDB.sln
index 7ddbed2..b6b5aaf 100644
--- a/Apache.IoTDB.sln
+++ b/Apache.IoTDB.sln
@@ -26,8 +26,6 @@
docs\time_profile_zh.pdf = docs\time_profile_zh.pdf
EndProjectSection
EndProject
-Project("{E53339B2-1760-4266-BCC7-CA923CBCF16C}") = "docker-compose", "docker-compose.dcproj", "{4D457769-80CB-401F-9155-C3125C04FACD}"
-EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Apache.IoTDB.Data", "src\Apache.IoTDB.Data\Apache.IoTDB.Data.csproj", "{4308656F-D174-49A6-ACE4-15894B017D55}"
EndProject
Global
diff --git a/docker-compose-2c2d.yml b/docker-compose-2c2d.yml
index 318e8e8..4772779 100644
--- a/docker-compose-2c2d.yml
+++ b/docker-compose-2c2d.yml
@@ -130,4 +130,4 @@
dockerfile: samples/Apache.IoTDB.Samples/Dockerfile
command: ["--multi", "localhost:6667", "localhost:6668"]
# command: ["sleep", "infinity"]
- network_mode: host
\ No newline at end of file
+ network_mode: host
diff --git a/docker-compose.yml b/docker-compose.yml
index 3fa34a0..2c379f9 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -15,7 +15,7 @@
networks:
iotdb-network:
ipv4_address: 172.18.0.2
-
+
iotdb:
image: apache/iotdb:2.0.1-beta-datanode
restart: always
@@ -38,7 +38,7 @@
- dn_rpc_address=iotdb
- dn_internal_address=iotdb
- dn_seed_config_node=iotdb-confignode-1:22277
-
+
iotdb-confignode-1:
image: apache/iotdb:2.0.1-beta-confignode
restart: always
@@ -58,6 +58,6 @@
- cn_seed_config_node=iotdb-confignode-1:22277
-networks:
+networks:
iotdb-network:
external: true
diff --git a/launchSettings.json b/launchSettings.json
index cd5bd5a..6f2540e 100644
--- a/launchSettings.json
+++ b/launchSettings.json
@@ -11,4 +11,4 @@
}
}
}
-}
\ No newline at end of file
+}
diff --git a/samples/Apache.IoTDB.Samples/Dockerfile b/samples/Apache.IoTDB.Samples/Dockerfile
index 26ee0a9..52b2dcf 100644
--- a/samples/Apache.IoTDB.Samples/Dockerfile
+++ b/samples/Apache.IoTDB.Samples/Dockerfile
@@ -36,4 +36,4 @@
FROM base AS final
WORKDIR /app
COPY --from=publish /app/publish .
-ENTRYPOINT ["dotnet", "Apache.IoTDB.Samples.dll"]
\ No newline at end of file
+ENTRYPOINT ["dotnet", "Apache.IoTDB.Samples.dll"]
diff --git a/samples/Apache.IoTDB.Samples/Program.cs b/samples/Apache.IoTDB.Samples/Program.cs
index bc557bb..b825063 100644
--- a/samples/Apache.IoTDB.Samples/Program.cs
+++ b/samples/Apache.IoTDB.Samples/Program.cs
@@ -35,48 +35,48 @@
() => "localhost",
description: "Use single endpoint (e.g. --single localhost)");
- var multiOption = new Option<List<string>>(
- "--multi",
- description: "Use multiple endpoints (e.g. --multi localhost:6667 localhost:6668)")
- {
- AllowMultipleArgumentsPerToken = true
- };
+ var multiOption = new Option<List<string>>(
+ "--multi",
+ description: "Use multiple endpoints (e.g. --multi localhost:6667 localhost:6668)")
+ {
+ AllowMultipleArgumentsPerToken = true
+ };
- var rootCommand = new RootCommand
+ var rootCommand = new RootCommand
{
singleOption,
multiOption
};
- rootCommand.SetHandler(async (string single, List<string> multi) =>
- {
- var utilsTest = new UtilsTest();
- utilsTest.TestParseEndPoint();
-
- SessionPoolTest sessionPoolTest;
-
- if (!string.IsNullOrEmpty(single) && (multi == null || multi.Count == 0))
+ rootCommand.SetHandler(async (string single, List<string> multi) =>
{
- sessionPoolTest = new SessionPoolTest(single);
- }
- else if (multi != null && multi.Count != 0)
- {
- sessionPoolTest = new SessionPoolTest(multi);
- }
- else
- {
- Console.WriteLine("Please specify either --single or --multi endpoints.");
- return;
- }
+ var utilsTest = new UtilsTest();
+ utilsTest.TestParseEndPoint();
- await sessionPoolTest.Test();
+ SessionPoolTest sessionPoolTest;
- var tableSessionPoolTest = new TableSessionPoolTest(sessionPoolTest);
- await tableSessionPoolTest.Test();
+ if (!string.IsNullOrEmpty(single) && (multi == null || multi.Count == 0))
+ {
+ sessionPoolTest = new SessionPoolTest(single);
+ }
+ else if (multi != null && multi.Count != 0)
+ {
+ sessionPoolTest = new SessionPoolTest(multi);
+ }
+ else
+ {
+ Console.WriteLine("Please specify either --single or --multi endpoints.");
+ return;
+ }
- }, singleOption, multiOption);
+ await sessionPoolTest.Test();
- await rootCommand.InvokeAsync(args);
+ var tableSessionPoolTest = new TableSessionPoolTest(sessionPoolTest);
+ await tableSessionPoolTest.Test();
+
+ }, singleOption, multiOption);
+
+ await rootCommand.InvokeAsync(args);
}
public static void OpenDebugMode(this SessionPool session)
diff --git a/samples/Apache.IoTDB.Samples/Properties/launchSettings.json b/samples/Apache.IoTDB.Samples/Properties/launchSettings.json
index 8511cd9..8097449 100644
--- a/samples/Apache.IoTDB.Samples/Properties/launchSettings.json
+++ b/samples/Apache.IoTDB.Samples/Properties/launchSettings.json
@@ -7,4 +7,4 @@
"commandName": "Docker"
}
}
-}
\ No newline at end of file
+}
diff --git a/samples/Apache.IoTDB.Samples/SessionPoolTest.AlignedRecord.cs b/samples/Apache.IoTDB.Samples/SessionPoolTest.AlignedRecord.cs
index e43ef35..cb4d8f6 100644
--- a/samples/Apache.IoTDB.Samples/SessionPoolTest.AlignedRecord.cs
+++ b/samples/Apache.IoTDB.Samples/SessionPoolTest.AlignedRecord.cs
@@ -496,4 +496,4 @@
Console.WriteLine("TestInsertAlignedStringRecordsOfOneDevice Passed!");
}
}
-}
\ No newline at end of file
+}
diff --git a/samples/Apache.IoTDB.Samples/SessionPoolTest.AlignedTablet.cs b/samples/Apache.IoTDB.Samples/SessionPoolTest.AlignedTablet.cs
index 1b63271..77050ad 100644
--- a/samples/Apache.IoTDB.Samples/SessionPoolTest.AlignedTablet.cs
+++ b/samples/Apache.IoTDB.Samples/SessionPoolTest.AlignedTablet.cs
@@ -192,4 +192,4 @@
Console.WriteLine("TestInsertAlignedTablets Passed!");
}
}
-}
\ No newline at end of file
+}
diff --git a/samples/Apache.IoTDB.Samples/SessionPoolTest.Record.cs b/samples/Apache.IoTDB.Samples/SessionPoolTest.Record.cs
index 3a7f7c3..4cf68f4 100644
--- a/samples/Apache.IoTDB.Samples/SessionPoolTest.Record.cs
+++ b/samples/Apache.IoTDB.Samples/SessionPoolTest.Record.cs
@@ -305,12 +305,12 @@
row = res.Next();
break;
}
-
+
Console.WriteLine($"{testDatabaseName}.{testDevice}.{row.Measurements[0]} {testMeasurements[3]}");
System.Diagnostics.Debug.Assert($"{testDatabaseName}.{testDevice}.{testMeasurements[3]}" == row.Measurements[0]);
System.Diagnostics.Debug.Assert($"{testDatabaseName}.{testDevice}.{testMeasurements[1]}" == row.Measurements[1]);
System.Diagnostics.Debug.Assert($"{testDatabaseName}.{testDevice}.{testMeasurements[2]}" == row.Measurements[2]);
-
+
status = await session_pool.DeleteDatabaseAsync(testDatabaseName);
System.Diagnostics.Debug.Assert(status == 0);
await session_pool.Close();
@@ -641,7 +641,7 @@
var rowRecord2 = new RowRecord(2, values2, measurements, dataTypes);
var rowRecord3 = new RowRecord(3, values3, measurements, dataTypes);
- var device_id = new List<string> { string.Format("{0}.{1}", testDatabaseName, testDevice),string.Format("{0}.{1}", testDatabaseName, testDevice),string.Format("{0}.{1}", testDatabaseName, testDevice) };
+ var device_id = new List<string> { string.Format("{0}.{1}", testDatabaseName, testDevice), string.Format("{0}.{1}", testDatabaseName, testDevice), string.Format("{0}.{1}", testDatabaseName, testDevice) };
var rowRecords = new List<RowRecord> { rowRecord1, rowRecord2, rowRecord3 };
status = await session_pool.InsertRecordsAsync(device_id, rowRecords);
@@ -664,4 +664,4 @@
Console.WriteLine("TestInsertRecordsWithAllType Passed!");
}
}
-}
\ No newline at end of file
+}
diff --git a/samples/Apache.IoTDB.Samples/SessionPoolTest.Tablet.cs b/samples/Apache.IoTDB.Samples/SessionPoolTest.Tablet.cs
index 299050e..bf2ae17 100644
--- a/samples/Apache.IoTDB.Samples/SessionPoolTest.Tablet.cs
+++ b/samples/Apache.IoTDB.Samples/SessionPoolTest.Tablet.cs
@@ -271,7 +271,7 @@
TSDataType.BLOB,
TSDataType.STRING
};
- var timestamps = new List<long> {1, 2};
+ var timestamps = new List<long> { 1, 2 };
var tablet = new Tablet(device_id, measurements, datatypes, values, timestamps);
status = await session_pool.InsertTabletAsync(tablet);
System.Diagnostics.Debug.Assert(status == 0);
diff --git a/samples/Apache.IoTDB.Samples/SessionPoolTest.Template.cs b/samples/Apache.IoTDB.Samples/SessionPoolTest.Template.cs
index 1d1d84d..866a7ab 100644
--- a/samples/Apache.IoTDB.Samples/SessionPoolTest.Template.cs
+++ b/samples/Apache.IoTDB.Samples/SessionPoolTest.Template.cs
@@ -101,4 +101,4 @@
}
}
-}
\ No newline at end of file
+}
diff --git a/samples/Apache.IoTDB.Samples/SessionPoolTest.TestNetwork.cs b/samples/Apache.IoTDB.Samples/SessionPoolTest.TestNetwork.cs
index 8588066..9f2a61c 100644
--- a/samples/Apache.IoTDB.Samples/SessionPoolTest.TestNetwork.cs
+++ b/samples/Apache.IoTDB.Samples/SessionPoolTest.TestNetwork.cs
@@ -351,4 +351,4 @@
}
}
-}
\ No newline at end of file
+}
diff --git a/samples/Apache.IoTDB.Samples/SessionPoolTest.TimeSeries.cs b/samples/Apache.IoTDB.Samples/SessionPoolTest.TimeSeries.cs
index 32be49a..049407c 100644
--- a/samples/Apache.IoTDB.Samples/SessionPoolTest.TimeSeries.cs
+++ b/samples/Apache.IoTDB.Samples/SessionPoolTest.TimeSeries.cs
@@ -194,4 +194,4 @@
}
}
-}
\ No newline at end of file
+}
diff --git a/samples/Apache.IoTDB.Samples/SessionPoolTest.cs b/samples/Apache.IoTDB.Samples/SessionPoolTest.cs
index f24a0a5..a189461 100644
--- a/samples/Apache.IoTDB.Samples/SessionPoolTest.cs
+++ b/samples/Apache.IoTDB.Samples/SessionPoolTest.cs
@@ -64,7 +64,8 @@
}
public async Task Test()
{
- if(nodeUrls.Count == 1){
+ if (nodeUrls.Count == 1)
+ {
await TestOpenWithNodeUrls();
await TestOpenWith2NodeUrls();
@@ -147,7 +148,8 @@
await TestNonSqlBy_ADO();
}
- else {
+ else
+ {
await TestMultiNodeDataFetch();
}
}
@@ -580,7 +582,8 @@
Console.WriteLine("LastDataQuery Passed");
}
- public async Task TestMultiNodeDataFetch(){
+ public async Task TestMultiNodeDataFetch()
+ {
System.Diagnostics.Debug.Assert(nodeUrls.Count > 1, "nodeUrls.Count should be greater than 1 in MultiNode Test");
var session_pool = new SessionPool.Builder()
.SetUsername(username)
@@ -596,7 +599,7 @@
var data_type_lst = new List<TSDataType> { TSDataType.BOOLEAN, TSDataType.FLOAT };
var encoding_lst = new List<TSEncoding> { TSEncoding.PLAIN, TSEncoding.PLAIN };
var compressor_lst = new List<Compressor> { Compressor.SNAPPY, Compressor.SNAPPY };
- var ts_path_lst = new List<string>() {
+ var ts_path_lst = new List<string>() {
string.Format("{0}.{1}.{2}", testDatabaseName, testDevice, testMeasurements[0]),
string.Format("{0}.{1}.{2}", testDatabaseName, testDevice, testMeasurements[1])
};
@@ -631,4 +634,4 @@
Console.WriteLine("MultiNodeDataFetch Passed");
}
}
-}
\ No newline at end of file
+}
diff --git a/samples/Apache.IoTDB.Samples/TableSessionPoolTest.cs b/samples/Apache.IoTDB.Samples/TableSessionPoolTest.cs
index c0ef0ec..36d2b88 100644
--- a/samples/Apache.IoTDB.Samples/TableSessionPoolTest.cs
+++ b/samples/Apache.IoTDB.Samples/TableSessionPoolTest.cs
@@ -26,157 +26,157 @@
public class TableSessionPoolTest
{
- private readonly SessionPoolTest sessionPoolTest;
+ private readonly SessionPoolTest sessionPoolTest;
- public TableSessionPoolTest(SessionPoolTest sessionPoolTest)
- {
- this.sessionPoolTest = sessionPoolTest;
- }
+ public TableSessionPoolTest(SessionPoolTest sessionPoolTest)
+ {
+ this.sessionPoolTest = sessionPoolTest;
+ }
- public async Task Test()
- {
- await TestCleanup();
+ public async Task Test()
+ {
+ await TestCleanup();
- await TestSelectAndInsert();
- await TestUseDatabase();
- // await TestCleanup();
- }
+ await TestSelectAndInsert();
+ await TestUseDatabase();
+ // await TestCleanup();
+ }
- public async Task TestSelectAndInsert()
- {
- var tableSessionPool = new TableSessionPool.Builder()
- .SetNodeUrls(sessionPoolTest.nodeUrls)
- .SetUsername(sessionPoolTest.username)
- .SetPassword(sessionPoolTest.password)
- .SetFetchSize(1024)
- .Build();
+ public async Task TestSelectAndInsert()
+ {
+ var tableSessionPool = new TableSessionPool.Builder()
+ .SetNodeUrls(sessionPoolTest.nodeUrls)
+ .SetUsername(sessionPoolTest.username)
+ .SetPassword(sessionPoolTest.password)
+ .SetFetchSize(1024)
+ .Build();
- await tableSessionPool.Open(false);
+ await tableSessionPool.Open(false);
- if (sessionPoolTest.debug) tableSessionPool.OpenDebugMode();
+ if (sessionPoolTest.debug) tableSessionPool.OpenDebugMode();
- await tableSessionPool.ExecuteNonQueryStatementAsync("CREATE DATABASE test1");
- await tableSessionPool.ExecuteNonQueryStatementAsync("CREATE DATABASE test2");
+ await tableSessionPool.ExecuteNonQueryStatementAsync("CREATE DATABASE test1");
+ await tableSessionPool.ExecuteNonQueryStatementAsync("CREATE DATABASE test2");
- await tableSessionPool.ExecuteNonQueryStatementAsync("use test2");
+ await tableSessionPool.ExecuteNonQueryStatementAsync("use test2");
- // or use full qualified table name
- await tableSessionPool.ExecuteNonQueryStatementAsync(
- "create table test1.table1(region_id STRING TAG, plant_id STRING TAG, device_id STRING TAG, model STRING ATTRIBUTE, temperature FLOAT FIELD, humidity DOUBLE FIELD) with (TTL=3600000)");
+ // or use full qualified table name
+ await tableSessionPool.ExecuteNonQueryStatementAsync(
+ "create table test1.table1(region_id STRING TAG, plant_id STRING TAG, device_id STRING TAG, model STRING ATTRIBUTE, temperature FLOAT FIELD, humidity DOUBLE FIELD) with (TTL=3600000)");
- await tableSessionPool.ExecuteNonQueryStatementAsync(
- "create table table2(region_id STRING TAG, plant_id STRING TAG, color STRING ATTRIBUTE, temperature FLOAT FIELD, speed DOUBLE FIELD) with (TTL=6600000)");
+ await tableSessionPool.ExecuteNonQueryStatementAsync(
+ "create table table2(region_id STRING TAG, plant_id STRING TAG, color STRING ATTRIBUTE, temperature FLOAT FIELD, speed DOUBLE FIELD) with (TTL=6600000)");
- // show tables from current database
- var res = await tableSessionPool.ExecuteQueryStatementAsync("SHOW TABLES");
- res.ShowTableNames();
- while (res.HasNext()) Console.WriteLine(res.Next());
- await res.Close();
+ // show tables from current database
+ var res = await tableSessionPool.ExecuteQueryStatementAsync("SHOW TABLES");
+ res.ShowTableNames();
+ while (res.HasNext()) Console.WriteLine(res.Next());
+ await res.Close();
- // show tables by specifying another database
- // using SHOW tables FROM
- res = await tableSessionPool.ExecuteQueryStatementAsync("SHOW TABLES FROM test1");
- res.ShowTableNames();
- while (res.HasNext()) Console.WriteLine(res.Next());
- await res.Close();
+ // show tables by specifying another database
+ // using SHOW tables FROM
+ res = await tableSessionPool.ExecuteQueryStatementAsync("SHOW TABLES FROM test1");
+ res.ShowTableNames();
+ while (res.HasNext()) Console.WriteLine(res.Next());
+ await res.Close();
- var tableName = "testTable1";
- List<string> columnNames =
- new List<string> {
+ var tableName = "testTable1";
+ List<string> columnNames =
+ new List<string> {
"region_id",
"plant_id",
"device_id",
"model",
"temperature",
"humidity" };
- List<TSDataType> dataTypes =
- new List<TSDataType>{
+ List<TSDataType> dataTypes =
+ new List<TSDataType>{
TSDataType.STRING,
TSDataType.STRING,
TSDataType.STRING,
TSDataType.STRING,
TSDataType.FLOAT,
TSDataType.DOUBLE};
- List<ColumnCategory> columnCategories =
- new List<ColumnCategory>{
+ List<ColumnCategory> columnCategories =
+ new List<ColumnCategory>{
ColumnCategory.TAG,
ColumnCategory.TAG,
ColumnCategory.TAG,
ColumnCategory.ATTRIBUTE,
ColumnCategory.FIELD,
ColumnCategory.FIELD};
- var values = new List<List<object>> { };
- var timestamps = new List<long> { };
- for (long timestamp = 0; timestamp < 100; timestamp++)
- {
- timestamps.Add(timestamp);
- values.Add(new List<object> { "1", "5", "3", "A", 1.23F + timestamp, 111.1 + timestamp });
+ var values = new List<List<object>> { };
+ var timestamps = new List<long> { };
+ for (long timestamp = 0; timestamp < 100; timestamp++)
+ {
+ timestamps.Add(timestamp);
+ values.Add(new List<object> { "1", "5", "3", "A", 1.23F + timestamp, 111.1 + timestamp });
+ }
+ var tablet = new Tablet(tableName, columnNames, columnCategories, dataTypes, values, timestamps);
+
+ await tableSessionPool.InsertAsync(tablet);
+
+
+ res = await tableSessionPool.ExecuteQueryStatementAsync("select * from testTable1 "
+ + "where region_id = '1' and plant_id in ('3', '5') and device_id = '3'");
+ res.ShowTableNames();
+ while (res.HasNext()) Console.WriteLine(res.Next());
+ await res.Close();
+
+ await tableSessionPool.Close();
}
- var tablet = new Tablet(tableName, columnNames, columnCategories, dataTypes, values, timestamps);
-
- await tableSessionPool.InsertAsync(tablet);
- res = await tableSessionPool.ExecuteQueryStatementAsync("select * from testTable1 "
- + "where region_id = '1' and plant_id in ('3', '5') and device_id = '3'");
- res.ShowTableNames();
- while (res.HasNext()) Console.WriteLine(res.Next());
- await res.Close();
+ public async Task TestUseDatabase()
+ {
+ var tableSessionPool = new TableSessionPool.Builder()
+ .SetNodeUrls(sessionPoolTest.nodeUrls)
+ .SetUsername(sessionPoolTest.username)
+ .SetPassword(sessionPoolTest.password)
+ .SetDatabase("test1")
+ .SetFetchSize(1024)
+ .Build();
- await tableSessionPool.Close();
- }
+ await tableSessionPool.Open(false);
+
+ if (sessionPoolTest.debug) tableSessionPool.OpenDebugMode();
- public async Task TestUseDatabase()
- {
- var tableSessionPool = new TableSessionPool.Builder()
- .SetNodeUrls(sessionPoolTest.nodeUrls)
- .SetUsername(sessionPoolTest.username)
- .SetPassword(sessionPoolTest.password)
- .SetDatabase("test1")
- .SetFetchSize(1024)
- .Build();
+ // show tables from current database
+ var res = await tableSessionPool.ExecuteQueryStatementAsync("SHOW TABLES");
+ res.ShowTableNames();
+ while (res.HasNext()) Console.WriteLine(res.Next());
+ await res.Close();
- await tableSessionPool.Open(false);
+ await tableSessionPool.ExecuteNonQueryStatementAsync("use test2");
- if (sessionPoolTest.debug) tableSessionPool.OpenDebugMode();
+ // show tables from current database
+ res = await tableSessionPool.ExecuteQueryStatementAsync("SHOW TABLES");
+ res.ShowTableNames();
+ while (res.HasNext()) Console.WriteLine(res.Next());
+ await res.Close();
+ await tableSessionPool.Close();
+ }
- // show tables from current database
- var res = await tableSessionPool.ExecuteQueryStatementAsync("SHOW TABLES");
- res.ShowTableNames();
- while (res.HasNext()) Console.WriteLine(res.Next());
- await res.Close();
+ public async Task TestCleanup()
+ {
+ var tableSessionPool = new TableSessionPool.Builder()
+ .SetNodeUrls(sessionPoolTest.nodeUrls)
+ .SetUsername(sessionPoolTest.username)
+ .SetPassword(sessionPoolTest.password)
+ .SetFetchSize(1024)
+ .Build();
- await tableSessionPool.ExecuteNonQueryStatementAsync("use test2");
+ await tableSessionPool.Open(false);
- // show tables from current database
- res = await tableSessionPool.ExecuteQueryStatementAsync("SHOW TABLES");
- res.ShowTableNames();
- while (res.HasNext()) Console.WriteLine(res.Next());
- await res.Close();
+ if (sessionPoolTest.debug) tableSessionPool.OpenDebugMode();
- await tableSessionPool.Close();
- }
+ await tableSessionPool.ExecuteNonQueryStatementAsync("drop database test1");
+ await tableSessionPool.ExecuteNonQueryStatementAsync("drop database test2");
- public async Task TestCleanup()
- {
- var tableSessionPool = new TableSessionPool.Builder()
- .SetNodeUrls(sessionPoolTest.nodeUrls)
- .SetUsername(sessionPoolTest.username)
- .SetPassword(sessionPoolTest.password)
- .SetFetchSize(1024)
- .Build();
-
- await tableSessionPool.Open(false);
-
- if (sessionPoolTest.debug) tableSessionPool.OpenDebugMode();
-
- await tableSessionPool.ExecuteNonQueryStatementAsync("drop database test1");
- await tableSessionPool.ExecuteNonQueryStatementAsync("drop database test2");
-
- await tableSessionPool.Close();
- }
+ await tableSessionPool.Close();
+ }
}
diff --git a/src/Apache.IoTDB.Data/DataReaderExtensions.cs b/src/Apache.IoTDB.Data/DataReaderExtensions.cs
index 75f1137..afb8eb7 100644
--- a/src/Apache.IoTDB.Data/DataReaderExtensions.cs
+++ b/src/Apache.IoTDB.Data/DataReaderExtensions.cs
@@ -32,7 +32,7 @@
{
public static SessionPool CreateSession(this IoTDBConnectionStringBuilder db)
{
- return new SessionPool(db.DataSource, db.Port, db.Username, db.Password, db.FetchSize, db.ZoneId, db.PoolSize,db.Compression,db.TimeOut);
+ return new SessionPool(db.DataSource, db.Port, db.Username, db.Password, db.FetchSize, db.ZoneId, db.PoolSize, db.Compression, db.TimeOut);
}
public static List<T> ToObject<T>(this IDataReader dataReader)
diff --git a/src/Apache.IoTDB.Data/IoTDBCommand.cs b/src/Apache.IoTDB.Data/IoTDBCommand.cs
index adc9159..8daf86a 100644
--- a/src/Apache.IoTDB.Data/IoTDBCommand.cs
+++ b/src/Apache.IoTDB.Data/IoTDBCommand.cs
@@ -314,7 +314,7 @@
Debug.WriteLine($"_commandText:{_commandText}");
#endif
int _affectRows = 0;
- SessionDataSet dataSet=null;
+ SessionDataSet dataSet = null;
bool isok = false;
Task<SessionDataSet> taskDataSet = null;
if (_parameters.IsValueCreated)
@@ -336,9 +336,9 @@
}
}
- if (isok && dataSet != null )
+ if (isok && dataSet != null)
{
- dataReader = new IoTDBDataReader(this, dataSet, closeConnection );
+ dataReader = new IoTDBDataReader(this, dataSet, closeConnection);
}
else if (taskDataSet.Status == TaskStatus.Running || !isok)
{
@@ -354,7 +354,7 @@
}
else
{
- IoTDBException.ThrowExceptionForRC(_commandText, new IoTDBErrorResult() { Code = -10007, Error = $"Unknow Exception" });
+ IoTDBException.ThrowExceptionForRC(_commandText, new IoTDBErrorResult() { Code = -10007, Error = $"Unknown Exception" });
}
}
catch when (unprepared)
@@ -367,7 +367,7 @@
private RowRecord BindParamters(IoTDBParameterCollection pms)
{
var measures = new List<string>();
- var values = new List<object> ();
+ var values = new List<object>();
for (int i = 0; i < pms.Count; i++)
@@ -375,11 +375,11 @@
var tp = pms[i];
measures.Add(tp.ParameterName);
- // _commandText = _commandText.Replace(tp.ParameterName, "?");
+ // _commandText = _commandText.Replace(tp.ParameterName, "?");
switch (TypeInfo.GetTypeCode(tp.Value?.GetType()))
{
case TypeCode.Boolean:
- values.Add ((tp.Value as bool?).GetValueOrDefault());
+ values.Add((tp.Value as bool?).GetValueOrDefault());
break;
case TypeCode.Char:
values.Add(tp.Value as string);
@@ -431,7 +431,7 @@
}
}
- return new RowRecord(DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),values,measures);
+ return new RowRecord(DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(), values, measures);
}
/// <summary>
@@ -523,8 +523,8 @@
throw new InvalidOperationException($"CallRequiresSetCommandText{nameof(ExecuteNonQuery)}");
}
var result = Task.Run(() => _IoTDB.ExecuteNonQueryStatementAsync(_commandText));
- var ok = result.Wait(TimeSpan.FromSeconds(CommandTimeout));
- if (!ok) throw new TimeoutException();
+ var ok = result.Wait(TimeSpan.FromSeconds(CommandTimeout));
+ if (!ok) throw new TimeoutException();
return result.Result;
}
diff --git a/src/Apache.IoTDB.Data/IoTDBConnection.cs b/src/Apache.IoTDB.Data/IoTDBConnection.cs
index 860f25e..96b3154 100644
--- a/src/Apache.IoTDB.Data/IoTDBConnection.cs
+++ b/src/Apache.IoTDB.Data/IoTDBConnection.cs
@@ -45,7 +45,7 @@
private string _connectionString;
private ConnectionState _state;
- internal SessionPool _IoTDB;
+ internal SessionPool _IoTDB;
@@ -125,7 +125,7 @@
throw new NotImplementedException();
}
}
- public string ClientVersion
+ public string ClientVersion
{
get
{
@@ -177,7 +177,7 @@
{
OpenAsync().GetAwaiter().GetResult();
}
- public override async Task OpenAsync(CancellationToken cancellationToken=default)
+ public override async Task OpenAsync(CancellationToken cancellationToken = default)
{
if (State == ConnectionState.Open)
@@ -190,7 +190,7 @@
}
await _IoTDB.Open(ConnectionStringBuilder.Compression, cancellationToken);
- if (!_IoTDB.IsOpen())
+ if (!_IoTDB.IsOpen())
{
IoTDBException.ThrowExceptionForRC(-1, "Can't open IoTDB server.");
}
@@ -214,7 +214,7 @@
#endif
{
if (State != ConnectionState.Closed)
- await _IoTDB.Close();
+ await _IoTDB.Close();
Transaction?.Dispose();
_nowdatabase = string.Empty;
foreach (var reference in _commands)
@@ -343,9 +343,9 @@
}
internal string _nowdatabase = string.Empty;
- internal bool SelectedDataBase => _nowdatabase != string.Empty ;
+ internal bool SelectedDataBase => _nowdatabase != string.Empty;
- public override string Database => throw new NotSupportedException();
+ public override string Database => throw new NotSupportedException();
/// <summary>
/// Changes the current database.
diff --git a/src/Apache.IoTDB.Data/IoTDBConnectionStringBuilder.cs b/src/Apache.IoTDB.Data/IoTDBConnectionStringBuilder.cs
index 8c71b5e..c2cac87 100644
--- a/src/Apache.IoTDB.Data/IoTDBConnectionStringBuilder.cs
+++ b/src/Apache.IoTDB.Data/IoTDBConnectionStringBuilder.cs
@@ -65,24 +65,24 @@
private string _userName = "root";
private string _password = "root";
private bool _enableRpcCompression = false;
- private int _fetchSize = 1800;
+ private int _fetchSize = 1800;
private string _zoneId = "UTC+08:00";
- private int _port = 6667;
- private int _poolSize =8;
- private int _timeOut=10000;
+ private int _port = 6667;
+ private int _poolSize = 8;
+ private int _timeOut = 10000;
static IoTDBConnectionStringBuilder()
{
var validKeywords = new string[9];
validKeywords[(int)Keywords.DataSource] = DataSourceKeyword;
- validKeywords[(int)Keywords.Username] = UserNameKeyword;
+ validKeywords[(int)Keywords.Username] = UserNameKeyword;
validKeywords[(int)Keywords.Password] = PasswordKeyword;
validKeywords[(int)Keywords.Port] = PortKeyword;
validKeywords[(int)Keywords.FetchSize] = FetchSizeKeyword;
validKeywords[(int)Keywords.Compression] = CompressionKeyword;
validKeywords[(int)Keywords.PoolSize] = PoolSizeKeyword;
validKeywords[(int)Keywords.ZoneId] = ZoneIdKeyword;
- validKeywords[(int)Keywords.TimeOut] =TimeOutKeyword;
+ validKeywords[(int)Keywords.TimeOut] = TimeOutKeyword;
_validKeywords = validKeywords;
_keywords = new Dictionary<string, Keywords>(9, StringComparer.OrdinalIgnoreCase)
@@ -129,13 +129,13 @@
get => _userName;
set => base[UserNameKeyword] = _userName = value;
}
-
+
public virtual string Password
{
get => _password;
set => base[PasswordKeyword] = _password = value;
}
- public virtual int Port
+ public virtual int Port
{
get => _port;
set => base[PortKeyword] = _port = value;
@@ -163,7 +163,7 @@
}
public virtual int TimeOut
- {
+ {
get => _timeOut;
set => base[PoolSizeKeyword] = _timeOut = value;
}
@@ -215,19 +215,19 @@
return;
}
-
+
switch (GetIndex(keyword))
{
case Keywords.DataSource:
DataSource = Convert.ToString(value, CultureInfo.InvariantCulture);
return;
case Keywords.Username:
- Username= Convert.ToString(value, CultureInfo.InvariantCulture);
+ Username = Convert.ToString(value, CultureInfo.InvariantCulture);
return;
case Keywords.Password:
Password = Convert.ToString(value, CultureInfo.InvariantCulture);
return;
-
+
case Keywords.Port:
Port = Convert.ToInt32(value, CultureInfo.InvariantCulture);
return;
@@ -354,7 +354,7 @@
private object GetAt(Keywords index)
{
-
+
switch (index)
{
case Keywords.DataSource:
@@ -363,7 +363,7 @@
return Password;
case Keywords.Username:
return Username;
-
+
case Keywords.Port:
return Port;
case Keywords.FetchSize:
@@ -401,7 +401,7 @@
_userName = "root";
return;
case Keywords.Port:
- _port=6667;
+ _port = 6667;
return;
case Keywords.FetchSize:
_fetchSize = 1800;
diff --git a/src/Apache.IoTDB.Data/IoTDBDataReader.cs b/src/Apache.IoTDB.Data/IoTDBDataReader.cs
index 88c4de2..da4f107 100644
--- a/src/Apache.IoTDB.Data/IoTDBDataReader.cs
+++ b/src/Apache.IoTDB.Data/IoTDBDataReader.cs
@@ -41,12 +41,12 @@
private bool _hasRows;
private readonly int _recordsAffected;
private bool _closed;
- private readonly List<string> _metas;
+ private readonly List<string> _metas;
private bool _closeConnection;
private int _fieldCount;
- RowRecord rowdata= null;
+ RowRecord rowdata = null;
SessionDataSet _dataSet;
@@ -57,7 +57,7 @@
_closeConnection = closeConnection;
_fieldCount = dataSet.ColumnNames.Count;
_hasRows = dataSet.RowCount > 0;
- _recordsAffected =dataSet.RowCount;
+ _recordsAffected = dataSet.RowCount;
_closed = _closeConnection;
_metas = dataSet.ColumnNames;
_dataSet = dataSet;
@@ -73,7 +73,7 @@
/// Gets the number of columns in the current row.
/// </summary>
/// <value>The number of columns in the current row.</value>
- public override int FieldCount => _fieldCount+1;
+ public override int FieldCount => _fieldCount + 1;
/// <summary>
/// Gets a value indicating whether the data reader contains any rows.
@@ -178,7 +178,7 @@
_dataSet.Close().GetAwaiter().GetResult();
_command.DataReader = null;
- if (_closeConnection )
+ if (_closeConnection)
{
_command.Connection.Close();
_closed = true;
@@ -194,7 +194,7 @@
/// <returns>The name of the column.</returns>
public override string GetName(int ordinal)
{
- return ordinal==0? "timestamp" : rowdata.Measurements[ordinal-1];
+ return ordinal == 0 ? "timestamp" : rowdata.Measurements[ordinal - 1];
}
/// <summary>
@@ -203,7 +203,7 @@
/// <param name="name">The name of the column.</param>
/// <returns>The zero-based column ordinal.</returns>
public override int GetOrdinal(string name)
- => "timestamp"==name?0: rowdata.Measurements.IndexOf( name)+1;
+ => "timestamp" == name ? 0 : rowdata.Measurements.IndexOf(name) + 1;
public override string GetDataTypeName(int ordinal)
{
@@ -218,7 +218,7 @@
public override Type GetFieldType(int ordinal)
{
- return ordinal==0?typeof(DateTime): rowdata.GetCrlType(ordinal-1);
+ return ordinal == 0 ? typeof(DateTime) : rowdata.GetCrlType(ordinal - 1);
}
/// <summary>
@@ -438,9 +438,9 @@
for (int i = 0; i < _fieldCount; i++)
{
var obj = rowdata.Values[i];
- if (obj != null )
+ if (obj != null)
{
- values[i+1] = obj;
+ values[i + 1] = obj;
count++;
}
}
@@ -458,8 +458,8 @@
{
rowdata = _dataSet.GetRow();
}
- var schemaTable = new DataTable("SchemaTable");
- if (_metas != null && rowdata !=null)
+ var schemaTable = new DataTable("SchemaTable");
+ if (_metas != null && rowdata != null)
{
var ColumnName = new DataColumn(SchemaTableColumn.ColumnName, typeof(string));
var ColumnOrdinal = new DataColumn(SchemaTableColumn.ColumnOrdinal, typeof(int));
@@ -534,11 +534,11 @@
schemaTable.Rows.Add(schemaRow1);
- for (var i = 1; i < rowdata.Measurements.Count+1; i++)
+ for (var i = 1; i < rowdata.Measurements.Count + 1; i++)
{
var schemaRow = schemaTable.NewRow();
- var columnName = rowdata.Measurements[i-1] ;
+ var columnName = rowdata.Measurements[i - 1];
schemaRow[ColumnName] = columnName;
schemaRow[ColumnOrdinal] = i;
@@ -554,7 +554,7 @@
schemaRow[DataTypeName] = GetDataTypeName(i);
schemaRow[IsExpression] = columnName == null;
schemaRow[IsLong] = DBNull.Value;
- schemaRow[IsKey]= false;
+ schemaRow[IsKey] = false;
schemaRow[AllowDBNull] = true;
schemaTable.Rows.Add(schemaRow);
}
diff --git a/src/Apache.IoTDB.Data/IoTDBException.cs b/src/Apache.IoTDB.Data/IoTDBException.cs
index 978c9e8..2f39125 100644
--- a/src/Apache.IoTDB.Data/IoTDBException.cs
+++ b/src/Apache.IoTDB.Data/IoTDBException.cs
@@ -47,7 +47,7 @@
public override string Message => _IoTDBError?.Error;
- public override int ErrorCode => (int) _IoTDBError?.Code;
+ public override int ErrorCode => (int)_IoTDBError?.Code;
/// <summary>
/// Throws an exception with a specific IoTDB error code value.
/// </summary>
@@ -58,29 +58,29 @@
/// </remarks>
public static void ThrowExceptionForRC(string _commandText, IoTDBErrorResult IoTDBError)
{
- var te = new IoTDBException(IoTDBError);
- te.Data.Add("commandText", _commandText);
- throw te;
+ var tmp_exception = new IoTDBException(IoTDBError);
+ tmp_exception.Data.Add("commandText", _commandText);
+ throw tmp_exception;
}
- public static void ThrowExceptionForRC( IoTDBErrorResult IoTDBError)
+ public static void ThrowExceptionForRC(IoTDBErrorResult IoTDBError)
{
- var te = new IoTDBException(IoTDBError);
- throw te;
+ var tmp_exception = new IoTDBException(IoTDBError);
+ throw tmp_exception;
}
public static void ThrowExceptionForRC(IntPtr _IoTDB)
{
- var te = new IoTDBException(new IoTDBErrorResult() { });
- throw te;
+ var tmp_exception = new IoTDBException(new IoTDBErrorResult() { });
+ throw tmp_exception;
}
public static void ThrowExceptionForRC(int code, string message, Exception ex)
{
- var te = new IoTDBException(new IoTDBErrorResult() { Code = code, Error = message }, ex);
- throw te;
+ var tmp_exception = new IoTDBException(new IoTDBErrorResult() { Code = code, Error = message }, ex);
+ throw tmp_exception;
}
public static void ThrowExceptionForRC(int code, string message)
{
- var te = new IoTDBException(new IoTDBErrorResult() { Code = code, Error = message });
- throw te;
+ var tmp_exception = new IoTDBException(new IoTDBErrorResult() { Code = code, Error = message });
+ throw tmp_exception;
}
}
}
diff --git a/src/Apache.IoTDB.Data/IoTDBParameter.cs b/src/Apache.IoTDB.Data/IoTDBParameter.cs
index 03b7667..f49ea99 100644
--- a/src/Apache.IoTDB.Data/IoTDBParameter.cs
+++ b/src/Apache.IoTDB.Data/IoTDBParameter.cs
@@ -17,7 +17,7 @@
* under the License.
*/
- using System;
+using System;
using System.Data;
using System.Data.Common;
@@ -42,7 +42,7 @@
public IoTDBParameter()
{
}
-
+
/// <summary>
/// Initializes a new instance of the <see cref="IoTDBParameter" /> class.
/// </summary>
@@ -101,7 +101,7 @@
/// </summary>
/// <value>The type of the parameter.</value>
/// <remarks>Due to IoTDB's dynamic type system, parameter values are not converted.</remarks>
-
+
public override DbType DbType { get; set; } = DbType.String;
/// <summary>
@@ -221,6 +221,6 @@
IoTDBType = TSDataType.NONE;
}
-
+
}
}
diff --git a/src/Apache.IoTDB.Data/IoTDBParameterCollection.cs b/src/Apache.IoTDB.Data/IoTDBParameterCollection.cs
index 201448d..830e1e6 100644
--- a/src/Apache.IoTDB.Data/IoTDBParameterCollection.cs
+++ b/src/Apache.IoTDB.Data/IoTDBParameterCollection.cs
@@ -183,7 +183,7 @@
/// <param name="parameterName">The name of the parameter.</param>
/// <param name="value">The value of the parameter. Can be null.</param>
/// <returns>The parameter that was added.</returns>
- public virtual IoTDBParameter AddWithValue( object value)
+ public virtual IoTDBParameter AddWithValue(object value)
{
var parameter = new IoTDBParameter(Guid.NewGuid().ToString(), value);
Add(parameter);
@@ -354,7 +354,7 @@
protected override void SetParameter(string parameterName, DbParameter value)
=> SetParameter(IndexOfChecked(parameterName), value);
-
+
private int IndexOfChecked(string parameterName)
{
diff --git a/src/Apache.IoTDB.Data/IoTDBResult.cs b/src/Apache.IoTDB.Data/IoTDBResult.cs
index 1da73f2..fae9066 100644
--- a/src/Apache.IoTDB.Data/IoTDBResult.cs
+++ b/src/Apache.IoTDB.Data/IoTDBResult.cs
@@ -26,10 +26,10 @@
public class IoTDBErrorResult
{
- public int Code { get; set; }
+ public int Code { get; set; }
/// <summary>
///
/// </summary>
- public string Error { get; set; }
+ public string Error { get; set; }
}
}
diff --git a/src/Apache.IoTDB.Data/IoTDBTransaction.cs b/src/Apache.IoTDB.Data/IoTDBTransaction.cs
index a0a4c79..34fc547 100644
--- a/src/Apache.IoTDB.Data/IoTDBTransaction.cs
+++ b/src/Apache.IoTDB.Data/IoTDBTransaction.cs
@@ -62,7 +62,7 @@
/// </summary>
/// <value>The isolation level for the transaction.</value>
public override IsolationLevel IsolationLevel => IsolationLevel.Unspecified;
-
+
/// <summary>
/// Applies the changes made in the transaction.
@@ -73,7 +73,7 @@
//{
// throw new InvalidOperationException(Resources.TransactionCompleted);
//}
-
+
//_connection.ExecuteNonQuery("COMMIT;");
Complete();
}
@@ -109,7 +109,7 @@
private void Complete()
{
- if (_connection!=null)_connection.Transaction = null;
+ if (_connection != null) _connection.Transaction = null;
_connection = null;
_completed = true;
}
diff --git a/src/Apache.IoTDB/Client.cs b/src/Apache.IoTDB/Client.cs
index dead7b6..03238d2 100644
--- a/src/Apache.IoTDB/Client.cs
+++ b/src/Apache.IoTDB/Client.cs
@@ -38,4 +38,4 @@
EndPoint = endpoint;
}
}
-}
\ No newline at end of file
+}
diff --git a/src/Apache.IoTDB/ConcurrentClientQueue.cs b/src/Apache.IoTDB/ConcurrentClientQueue.cs
index 0c54a12..e53d3f3 100644
--- a/src/Apache.IoTDB/ConcurrentClientQueue.cs
+++ b/src/Apache.IoTDB/ConcurrentClientQueue.cs
@@ -71,7 +71,8 @@
{
Client client = null;
Monitor.Enter(ClientQueue);
- while(true){
+ while (true)
+ {
bool timeout = false;
if (ClientQueue.IsEmpty)
{
@@ -79,7 +80,8 @@
}
ClientQueue.TryDequeue(out client);
- if(client != null || timeout){
+ if (client != null || timeout)
+ {
break;
}
}
@@ -91,4 +93,4 @@
return client;
}
}
-}
\ No newline at end of file
+}
diff --git a/src/Apache.IoTDB/DataStructure/BitMap.cs b/src/Apache.IoTDB/DataStructure/BitMap.cs
index d5a66c6..5a1b607 100644
--- a/src/Apache.IoTDB/DataStructure/BitMap.cs
+++ b/src/Apache.IoTDB/DataStructure/BitMap.cs
@@ -146,4 +146,4 @@
}
return true;
}
-}
\ No newline at end of file
+}
diff --git a/src/Apache.IoTDB/DataStructure/ByteBuffer.cs b/src/Apache.IoTDB/DataStructure/ByteBuffer.cs
index b97583f..a3e1823 100644
--- a/src/Apache.IoTDB/DataStructure/ByteBuffer.cs
+++ b/src/Apache.IoTDB/DataStructure/ByteBuffer.cs
@@ -135,10 +135,10 @@
public byte[] GetBinary()
{
- var length = GetInt();
- var buff = _buffer[_readPos..(_readPos + length)];
- _readPos += length;
- return buff;
+ var length = GetInt();
+ var buff = _buffer[_readPos..(_readPos + length)];
+ _readPos += length;
+ return buff;
}
public byte[] GetBuffer()
@@ -225,7 +225,7 @@
_writePos += strBuf.Length;
}
- public void AddBinary(byte[] value)
+ public void AddBinary(byte[] value)
{
AddInt(value.Length);
@@ -251,4 +251,4 @@
_writePos += 1;
}
}
-}
\ No newline at end of file
+}
diff --git a/src/Apache.IoTDB/DataStructure/RowRecord.cs b/src/Apache.IoTDB/DataStructure/RowRecord.cs
index 9674f49..c5853bf 100644
--- a/src/Apache.IoTDB/DataStructure/RowRecord.cs
+++ b/src/Apache.IoTDB/DataStructure/RowRecord.cs
@@ -32,18 +32,18 @@
public List<TSDataType> DataTypes { get; }
public RowRecord(DateTime timestamp, List<object> values, List<string> measurements, List<string> dataTypes)
- :this(new DateTimeOffset(timestamp.ToUniversalTime()).ToUnixTimeMilliseconds(), values,measurements, dataTypes)
+ : this(new DateTimeOffset(timestamp.ToUniversalTime()).ToUnixTimeMilliseconds(), values, measurements, dataTypes)
{
}
public RowRecord(DateTime timestamp, List<object> values, List<string> measurements, List<TSDataType> dataTypes)
- :this(new DateTimeOffset(timestamp.ToUniversalTime()).ToUnixTimeMilliseconds(), values,measurements, dataTypes)
+ : this(new DateTimeOffset(timestamp.ToUniversalTime()).ToUnixTimeMilliseconds(), values, measurements, dataTypes)
{
}
[Obsolete("Use the constructor with List<TSDataType> instead")]
public RowRecord(DateTime timestamp, List<object> values, List<string> measurements)
- :this(new DateTimeOffset(timestamp.ToUniversalTime()).ToUnixTimeMilliseconds(), values,measurements)
+ : this(new DateTimeOffset(timestamp.ToUniversalTime()).ToUnixTimeMilliseconds(), values, measurements)
{
}
[Obsolete("Use the constructor with List<TSDataType> instead")]
@@ -53,7 +53,8 @@
Values = values;
Measurements = measurements;
}
- public RowRecord(long timestamps, List<object> values, List<string> measurements, List<string> dataTypes){
+ public RowRecord(long timestamps, List<object> values, List<string> measurements, List<string> dataTypes)
+ {
Timestamps = timestamps;
Values = values;
Measurements = measurements;
@@ -132,27 +133,27 @@
foreach (var rowValue in Values)
{
str += "\t\t";
- if(rowValue is byte[] bytes)
+ if (rowValue is byte[] bytes)
{
- str += Utils.ByteArrayToHexString(bytes);
+ str += Utils.ByteArrayToHexString(bytes);
}
else
{
- str += rowValue.ToString();
+ str += rowValue.ToString();
}
}
return str;
}
-
+
public Type GetCrlType(int index)
{
- Type tSDataType = typeof(object);
+ Type tSDataType = typeof(object);
var valueType = Values[index];
switch (valueType)
{
case bool _:
- tSDataType = typeof( bool);
+ tSDataType = typeof(bool);
break;
case int _:
tSDataType = typeof(int);
@@ -190,43 +191,43 @@
switch (dataType)
{
case TSDataType.BOOLEAN:
- buffer.AddByte((byte) TSDataType.BOOLEAN);
+ buffer.AddByte((byte)TSDataType.BOOLEAN);
buffer.AddBool((bool)value);
break;
case TSDataType.INT32:
- buffer.AddByte((byte) TSDataType.INT32);
+ buffer.AddByte((byte)TSDataType.INT32);
buffer.AddInt((int)value);
break;
case TSDataType.INT64:
- buffer.AddByte((byte) TSDataType.INT64);
+ buffer.AddByte((byte)TSDataType.INT64);
buffer.AddLong((long)value);
break;
case TSDataType.FLOAT:
- buffer.AddByte((byte) TSDataType.FLOAT);
+ buffer.AddByte((byte)TSDataType.FLOAT);
buffer.AddFloat((float)value);
break;
case TSDataType.DOUBLE:
- buffer.AddByte((byte) TSDataType.DOUBLE);
+ buffer.AddByte((byte)TSDataType.DOUBLE);
buffer.AddDouble((double)value);
break;
case TSDataType.TEXT:
- buffer.AddByte((byte) TSDataType.TEXT);
+ buffer.AddByte((byte)TSDataType.TEXT);
buffer.AddStr((string)value);
break;
case TSDataType.TIMESTAMP:
- buffer.AddByte((byte) TSDataType.TIMESTAMP);
+ buffer.AddByte((byte)TSDataType.TIMESTAMP);
buffer.AddLong((long)value);
break;
case TSDataType.BLOB:
- buffer.AddByte((byte) TSDataType.BLOB);
+ buffer.AddByte((byte)TSDataType.BLOB);
buffer.AddBinary((byte[])value);
break;
case TSDataType.DATE:
- buffer.AddByte((byte) TSDataType.DATE);
+ buffer.AddByte((byte)TSDataType.DATE);
buffer.AddInt(Utils.ParseDateToInt((DateTime)value));
break;
case TSDataType.STRING:
- buffer.AddByte((byte) TSDataType.STRING);
+ buffer.AddByte((byte)TSDataType.STRING);
buffer.AddStr((string)value);
break;
default:
@@ -262,4 +263,4 @@
}
}
}
-}
\ No newline at end of file
+}
diff --git a/src/Apache.IoTDB/DataStructure/SessionDataSet.cs b/src/Apache.IoTDB/DataStructure/SessionDataSet.cs
index 7ba49ea..3d606f4 100644
--- a/src/Apache.IoTDB/DataStructure/SessionDataSet.cs
+++ b/src/Apache.IoTDB/DataStructure/SessionDataSet.cs
@@ -81,18 +81,25 @@
int deduplicateIdx = 0;
Dictionary<string, int> columnToFirstIndexMap = new Dictionary<string, int>();
- for(var i = 0; i < _columnSize; i++){
+ for (var i = 0; i < _columnSize; i++)
+ {
var columnName = _columnNames[i];
- if(_columnNameIndexMap.ContainsKey(columnName)){
+ if (_columnNameIndexMap.ContainsKey(columnName))
+ {
_duplicateLocation[i] = columnToFirstIndexMap[columnName];
- } else {
+ }
+ else
+ {
columnToFirstIndexMap[columnName] = i;
- if(resp.ColumnNameIndexMap != null) {
+ if (resp.ColumnNameIndexMap != null)
+ {
int valueIndex = resp.ColumnNameIndexMap[columnName];
_columnNameIndexMap[columnName] = valueIndex;
_valueBufferLst.Add(new ByteBuffer(_queryDataset.ValueList[valueIndex]));
_bitmapBufferLst.Add(new ByteBuffer(_queryDataset.BitmapList[valueIndex]));
- } else {
+ }
+ else
+ {
_columnNameIndexMap[columnName] = deduplicateIdx;
_valueBufferLst.Add(new ByteBuffer(_queryDataset.ValueList[deduplicateIdx]));
_bitmapBufferLst.Add(new ByteBuffer(_queryDataset.BitmapList[deduplicateIdx]));
@@ -182,14 +189,14 @@
private void ConstructOneRow()
{
- List<object> fieldLst = new List<Object>();
+ List<object> fieldList = new List<Object>();
for (int i = 0; i < _columnSize; i++)
{
if (_duplicateLocation.ContainsKey(i))
{
- var field = fieldLst[_duplicateLocation[i]];
- fieldLst.Add(field);
+ var field = fieldList[_duplicateLocation[i]];
+ fieldList.Add(field);
}
else
{
@@ -213,7 +220,7 @@
localField = columnValueBuffer.GetBool();
break;
case TSDataType.INT32:
- // case TSDataType.DATE:
+ // case TSDataType.DATE:
localField = columnValueBuffer.GetInt();
break;
case TSDataType.DATE:
@@ -231,7 +238,7 @@
break;
case TSDataType.TEXT:
case TSDataType.STRING:
- // case TSDataType.BLOB:
+ // case TSDataType.BLOB:
localField = columnValueBuffer.GetStr();
break;
case TSDataType.BLOB:
@@ -243,19 +250,19 @@
throw new TException(err_msg, null);
}
- fieldLst.Add(localField);
+ fieldList.Add(localField);
}
else
{
localField = null;
- fieldLst.Add(DBNull.Value);
+ fieldList.Add(DBNull.Value);
}
}
}
long timestamp = _timeBuffer.GetLong();
_rowIndex += 1;
- _cachedRowRecord = new RowRecord(timestamp, fieldLst, _columnNames);
+ _cachedRowRecord = new RowRecord(timestamp, fieldList, _columnNames);
}
private bool IsNull(int loc, int row_index)
@@ -359,4 +366,4 @@
GC.SuppressFinalize(this);
}
}
-}
\ No newline at end of file
+}
diff --git a/src/Apache.IoTDB/DataStructure/Tablet.cs b/src/Apache.IoTDB/DataStructure/Tablet.cs
index b6e2ca1..34007ee 100644
--- a/src/Apache.IoTDB/DataStructure/Tablet.cs
+++ b/src/Apache.IoTDB/DataStructure/Tablet.cs
@@ -38,7 +38,7 @@
*
* @deprecated
* Notice: The tablet should not have empty cell
- *
+ *
* From 0.13 IoTDB Server, tablet could have empty cell
*
*/
@@ -471,4 +471,4 @@
return buffer.GetBuffer();
}
}
-}
\ No newline at end of file
+}
diff --git a/src/Apache.IoTDB/IoTDBConstants.cs b/src/Apache.IoTDB/IoTDBConstants.cs
index 7dddd72..13fe80e 100644
--- a/src/Apache.IoTDB/IoTDBConstants.cs
+++ b/src/Apache.IoTDB/IoTDBConstants.cs
@@ -96,8 +96,9 @@
private TsFileConstant() { }
}
- public class IoTDBConstant {
+ public class IoTDBConstant
+ {
public static string TREE_SQL_DIALECT = "tree";
public static string TABLE_SQL_DIALECT = "table";
}
-}
\ No newline at end of file
+}
diff --git a/src/Apache.IoTDB/SessionPool.Builder.cs b/src/Apache.IoTDB/SessionPool.Builder.cs
index fcd10ce..69b8e42 100644
--- a/src/Apache.IoTDB/SessionPool.Builder.cs
+++ b/src/Apache.IoTDB/SessionPool.Builder.cs
@@ -24,116 +24,116 @@
public partial class SessionPool
{
- public class Builder
- {
- private string _host = "localhost";
- private int _port = 6667;
- private string _username = "root";
- private string _password = "root";
- private int _fetchSize = 1024;
- private string _zoneId = "UTC+08:00";
- private int _poolSize = 8;
- private bool _enableRpcCompression = false;
- private int _connectionTimeoutInMs = 500;
- private string _sqlDialect = IoTDBConstant.TREE_SQL_DIALECT;
- private string _database = "";
- private List<string> _nodeUrls = new List<string>();
-
- public Builder SetHost(string host)
+ public class Builder
{
- _host = host;
- return this;
- }
+ private string _host = "localhost";
+ private int _port = 6667;
+ private string _username = "root";
+ private string _password = "root";
+ private int _fetchSize = 1024;
+ private string _zoneId = "UTC+08:00";
+ private int _poolSize = 8;
+ private bool _enableRpcCompression = false;
+ private int _connectionTimeoutInMs = 500;
+ private string _sqlDialect = IoTDBConstant.TREE_SQL_DIALECT;
+ private string _database = "";
+ private List<string> _nodeUrls = new List<string>();
- public Builder SetPort(int port)
- {
- _port = port;
- return this;
- }
+ public Builder SetHost(string host)
+ {
+ _host = host;
+ return this;
+ }
- public Builder SetUsername(string username)
- {
- _username = username;
- return this;
- }
+ public Builder SetPort(int port)
+ {
+ _port = port;
+ return this;
+ }
- public Builder SetPassword(string password)
- {
- _password = password;
- return this;
- }
+ public Builder SetUsername(string username)
+ {
+ _username = username;
+ return this;
+ }
- public Builder SetFetchSize(int fetchSize)
- {
- _fetchSize = fetchSize;
- return this;
- }
+ public Builder SetPassword(string password)
+ {
+ _password = password;
+ return this;
+ }
- public Builder SetZoneId(string zoneId)
- {
- _zoneId = zoneId;
- return this;
- }
+ public Builder SetFetchSize(int fetchSize)
+ {
+ _fetchSize = fetchSize;
+ return this;
+ }
- public Builder SetPoolSize(int poolSize)
- {
- _poolSize = poolSize;
- return this;
- }
+ public Builder SetZoneId(string zoneId)
+ {
+ _zoneId = zoneId;
+ return this;
+ }
- public Builder SetEnableRpcCompression(bool enableRpcCompression)
- {
- _enableRpcCompression = enableRpcCompression;
- return this;
- }
+ public Builder SetPoolSize(int poolSize)
+ {
+ _poolSize = poolSize;
+ return this;
+ }
- public Builder SetConnectionTimeoutInMs(int timeout)
- {
- _connectionTimeoutInMs = timeout;
- return this;
- }
+ public Builder SetEnableRpcCompression(bool enableRpcCompression)
+ {
+ _enableRpcCompression = enableRpcCompression;
+ return this;
+ }
- public Builder SetNodeUrl(List<string> nodeUrls)
- {
- _nodeUrls = nodeUrls;
- return this;
- }
+ public Builder SetConnectionTimeoutInMs(int timeout)
+ {
+ _connectionTimeoutInMs = timeout;
+ return this;
+ }
- protected internal Builder SetSqlDialect(string sqlDialect)
- {
- _sqlDialect = sqlDialect;
- return this;
- }
+ public Builder SetNodeUrl(List<string> nodeUrls)
+ {
+ _nodeUrls = nodeUrls;
+ return this;
+ }
- protected internal Builder SetDatabase(string database)
- {
- _database = database;
- return this;
- }
+ protected internal Builder SetSqlDialect(string sqlDialect)
+ {
+ _sqlDialect = sqlDialect;
+ return this;
+ }
- public Builder()
- {
- _host = "localhost";
- _port = 6667;
- _username = "root";
- _password = "root";
- _fetchSize = 1024;
- _zoneId = "UTC+08:00";
- _poolSize = 8;
- _enableRpcCompression = false;
- _connectionTimeoutInMs = 500;
- _sqlDialect = IoTDBConstant.TREE_SQL_DIALECT;
- _database = "";
- }
+ protected internal Builder SetDatabase(string database)
+ {
+ _database = database;
+ return this;
+ }
- public SessionPool Build()
- {
- // if nodeUrls is not empty, use nodeUrls to create session pool
- if (_nodeUrls.Count > 0)
- {
- return new SessionPool(_nodeUrls, _username, _password, _fetchSize, _zoneId, _poolSize, _enableRpcCompression, _connectionTimeoutInMs, _sqlDialect, _database);
- }
- return new SessionPool(_host, _port, _username, _password, _fetchSize, _zoneId, _poolSize, _enableRpcCompression, _connectionTimeoutInMs, _sqlDialect, _database);
+ public Builder()
+ {
+ _host = "localhost";
+ _port = 6667;
+ _username = "root";
+ _password = "root";
+ _fetchSize = 1024;
+ _zoneId = "UTC+08:00";
+ _poolSize = 8;
+ _enableRpcCompression = false;
+ _connectionTimeoutInMs = 500;
+ _sqlDialect = IoTDBConstant.TREE_SQL_DIALECT;
+ _database = "";
+ }
+
+ public SessionPool Build()
+ {
+ // if nodeUrls is not empty, use nodeUrls to create session pool
+ if (_nodeUrls.Count > 0)
+ {
+ return new SessionPool(_nodeUrls, _username, _password, _fetchSize, _zoneId, _poolSize, _enableRpcCompression, _connectionTimeoutInMs, _sqlDialect, _database);
+ }
+ return new SessionPool(_host, _port, _username, _password, _fetchSize, _zoneId, _poolSize, _enableRpcCompression, _connectionTimeoutInMs, _sqlDialect, _database);
+ }
}
- }
}
diff --git a/src/Apache.IoTDB/SessionPool.cs b/src/Apache.IoTDB/SessionPool.cs
index c6bd70f..0fb9089 100644
--- a/src/Apache.IoTDB/SessionPool.cs
+++ b/src/Apache.IoTDB/SessionPool.cs
@@ -35,1750 +35,1749 @@
namespace Apache.IoTDB
{
- public partial class SessionPool : IDisposable
- {
- private static readonly TSProtocolVersion ProtocolVersion = TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3;
+ public partial class SessionPool : IDisposable
+ {
+ private static readonly TSProtocolVersion ProtocolVersion = TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3;
- private readonly string _username;
- private readonly string _password;
- private bool _enableRpcCompression;
- private string _zoneId;
- private readonly List<string> _nodeUrls = new();
- private readonly List<TEndPoint> _endPoints = new();
- private readonly string _host;
- private readonly int _port;
- private readonly int _fetchSize;
- /// <summary>
- /// _timeout is the amount of time a Session will wait for a send operation to complete successfully.
- /// </summary>
- private readonly int _timeout;
- private readonly int _poolSize = 4;
- private readonly string _sqlDialect = IoTDBConstant.TREE_SQL_DIALECT;
- private string _database;
- private readonly Utils _utilFunctions = new();
- private const int RetryNum = 3;
- private bool _debugMode;
- private bool _isClose = true;
- private ConcurrentClientQueue _clients;
- private ILogger _logger;
- public delegate Task<TResult> AsyncOperation<TResult>(Client client);
+ private readonly string _username;
+ private readonly string _password;
+ private bool _enableRpcCompression;
+ private string _zoneId;
+ private readonly List<string> _nodeUrls = new();
+ private readonly List<TEndPoint> _endPoints = new();
+ private readonly string _host;
+ private readonly int _port;
+ private readonly int _fetchSize;
+ /// <summary>
+ /// _timeout is the amount of time a Session will wait for a send operation to complete successfully.
+ /// </summary>
+ private readonly int _timeout;
+ private readonly int _poolSize = 4;
+ private readonly string _sqlDialect = IoTDBConstant.TREE_SQL_DIALECT;
+ private string _database;
+ private readonly Utils _utilFunctions = new();
+ private const int RetryNum = 3;
+ private bool _debugMode;
+ private bool _isClose = true;
+ private ConcurrentClientQueue _clients;
+ private ILogger _logger;
+ public delegate Task<TResult> AsyncOperation<TResult>(Client client);
- [Obsolete("This method is deprecated, please use new SessionPool.Builder().")]
- public SessionPool(string host, int port, int poolSize)
- : this(host, port, "root", "root", 1024, "UTC+08:00", poolSize, true, 60)
- {
- }
-
- [Obsolete(" This method is deprecated, please use new SessionPool.Builder().")]
- public SessionPool(string host, int port, string username, string password)
- : this(host, port, username, password, 1024, "UTC+08:00", 8, true, 60)
- {
- }
-
- public SessionPool(string host, int port, string username, string password, int fetchSize)
- : this(host, port, username, password, fetchSize, "UTC+08:00", 8, true, 60)
- {
-
- }
-
- public SessionPool(string host, int port) : this(host, port, "root", "root", 1024, "UTC+08:00", 8, true, 60)
- {
- }
- public SessionPool(string host, int port, string username, string password, int fetchSize, string zoneId, int poolSize, bool enableRpcCompression, int timeout)
- : this(host, port, username, password, fetchSize, zoneId, poolSize, enableRpcCompression, timeout, IoTDBConstant.TREE_SQL_DIALECT, "")
- {
- }
- protected internal SessionPool(string host, int port, string username, string password, int fetchSize, string zoneId, int poolSize, bool enableRpcCompression, int timeout, string sqlDialect, string database)
- {
- _host = host;
- _port = port;
- _username = username;
- _password = password;
- _zoneId = zoneId;
- _fetchSize = fetchSize;
- _debugMode = false;
- _poolSize = poolSize;
- _enableRpcCompression = enableRpcCompression;
- _timeout = timeout;
- _sqlDialect = sqlDialect;
- _database = database;
- }
- /// <summary>
- /// Initializes a new instance of the <see cref="SessionPool"/> class.
- /// </summary>
- /// <param name="nodeUrls">The list of node URLs to connect to, multiple ip:rpcPort eg.127.0.0.1:9001</param>
- /// <param name="poolSize">The size of the session pool.</param>
- public SessionPool(List<string> nodeUrls, int poolSize)
- : this(nodeUrls, "root", "root", 1024, "UTC+08:00", poolSize, true, 60)
- {
- }
- public SessionPool(List<string> nodeUrls, string username, string password)
- : this(nodeUrls, username, password, 1024, "UTC+08:00", 8, true, 60)
- {
- }
- public SessionPool(List<string> nodeUrls, string username, string password, int fetchSize)
- : this(nodeUrls, username, password, fetchSize, "UTC+08:00", 8, true, 60)
- {
- }
- public SessionPool(List<string> nodeUrls, string username, string password, int fetchSize, string zoneId)
- : this(nodeUrls, username, password, fetchSize, zoneId, 8, true, 60)
- {
- }
- public SessionPool(List<string> nodeUrls, string username, string password, int fetchSize, string zoneId, int poolSize, bool enableRpcCompression, int timeout)
- : this(nodeUrls, username, password, fetchSize, zoneId, poolSize, enableRpcCompression, timeout, IoTDBConstant.TREE_SQL_DIALECT, "")
- {
-
- }
- protected internal SessionPool(List<string> nodeUrls, string username, string password, int fetchSize, string zoneId, int poolSize, bool enableRpcCompression, int timeout, string sqlDialect, string database)
- {
- if (nodeUrls.Count == 0)
- {
- throw new ArgumentException("nodeUrls shouldn't be empty.");
- }
- _nodeUrls = nodeUrls;
- _endPoints = _utilFunctions.ParseSeedNodeUrls(nodeUrls);
- _username = username;
- _password = password;
- _zoneId = zoneId;
- _fetchSize = fetchSize;
- _debugMode = false;
- _poolSize = poolSize;
- _enableRpcCompression = enableRpcCompression;
- _timeout = timeout;
- _sqlDialect = sqlDialect;
- _database = database;
- }
- public async Task<TResult> ExecuteClientOperationAsync<TResult>(AsyncOperation<TResult> operation, string errMsg, bool retryOnFailure = true, bool putClientBack=true)
- {
- Client client = _clients.Take();
- try
- {
- var resp = await operation(client);
- return resp;
- }
- catch (TException ex)
- {
- if (retryOnFailure)
+ [Obsolete("This method is deprecated, please use new SessionPool.Builder().")]
+ public SessionPool(string host, int port, int poolSize)
+ : this(host, port, "root", "root", 1024, "UTC+08:00", poolSize, true, 60)
{
- try
- {
- client = await Reconnect(client);
- return await operation(client);
- }
- catch (TException retryEx)
- {
- throw new TException(errMsg, retryEx);
- }
}
- else
+
+ [Obsolete(" This method is deprecated, please use new SessionPool.Builder().")]
+ public SessionPool(string host, int port, string username, string password)
+ : this(host, port, username, password, 1024, "UTC+08:00", 8, true, 60)
{
- throw new TException(errMsg, ex);
}
- }
- catch (Exception ex)
- {
- if (retryOnFailure)
+
+ public SessionPool(string host, int port, string username, string password, int fetchSize)
+ : this(host, port, username, password, fetchSize, "UTC+08:00", 8, true, 60)
{
- try
- {
- client = await Reconnect(client);
- return await operation(client);
- }
- catch (TException retryEx)
- {
- throw new TException(errMsg, retryEx);
- }
+
}
- else
+
+ public SessionPool(string host, int port) : this(host, port, "root", "root", 1024, "UTC+08:00", 8, true, 60)
{
- throw new TException(errMsg, ex);
}
- }
- finally
- {
- if(putClientBack)
+ public SessionPool(string host, int port, string username, string password, int fetchSize, string zoneId, int poolSize, bool enableRpcCompression, int timeout)
+ : this(host, port, username, password, fetchSize, zoneId, poolSize, enableRpcCompression, timeout, IoTDBConstant.TREE_SQL_DIALECT, "")
{
- _clients.Add(client);
}
- }
- }
- /// <summary>
- /// Gets or sets the amount of time a Session will wait for a send operation to complete successfully.
- /// </summary>
- /// <remarks> The send time-out value, in milliseconds. The default is 10000.</remarks>
- public int TimeOut { get; set; } = 10000;
-
- ILoggerFactory factory;
- private bool disposedValue;
-
- public void OpenDebugMode(Action<ILoggingBuilder> configure)
- {
- _debugMode = true;
- factory = LoggerFactory.Create(configure);
- _logger = factory.CreateLogger(nameof(Apache.IoTDB));
- }
-
- public void CloseDebugMode()
- {
- _debugMode = false;
- }
-
- public async Task Open(bool enableRpcCompression, CancellationToken cancellationToken = default)
- {
- _enableRpcCompression = enableRpcCompression;
- await Open(cancellationToken);
- }
-
- public async Task Open(CancellationToken cancellationToken = default)
- {
- _clients = new ConcurrentClientQueue();
- _clients.Timeout = _timeout * 5;
-
- if (_nodeUrls.Count == 0)
- {
- for (var index = 0; index < _poolSize; index++)
+ protected internal SessionPool(string host, int port, string username, string password, int fetchSize, string zoneId, int poolSize, bool enableRpcCompression, int timeout, string sqlDialect, string database)
{
- try
- {
- _clients.Add(await CreateAndOpen(_host, _port, _enableRpcCompression, _timeout, _sqlDialect, _database, cancellationToken));
- }
- catch (Exception e)
- {
- if (_debugMode)
+ _host = host;
+ _port = port;
+ _username = username;
+ _password = password;
+ _zoneId = zoneId;
+ _fetchSize = fetchSize;
+ _debugMode = false;
+ _poolSize = poolSize;
+ _enableRpcCompression = enableRpcCompression;
+ _timeout = timeout;
+ _sqlDialect = sqlDialect;
+ _database = database;
+ }
+ /// <summary>
+ /// Initializes a new instance of the <see cref="SessionPool"/> class.
+ /// </summary>
+ /// <param name="nodeUrls">The list of node URLs to connect to, multiple ip:rpcPort eg.127.0.0.1:9001</param>
+ /// <param name="poolSize">The size of the session pool.</param>
+ public SessionPool(List<string> nodeUrls, int poolSize)
+ : this(nodeUrls, "root", "root", 1024, "UTC+08:00", poolSize, true, 60)
+ {
+ }
+ public SessionPool(List<string> nodeUrls, string username, string password)
+ : this(nodeUrls, username, password, 1024, "UTC+08:00", 8, true, 60)
+ {
+ }
+ public SessionPool(List<string> nodeUrls, string username, string password, int fetchSize)
+ : this(nodeUrls, username, password, fetchSize, "UTC+08:00", 8, true, 60)
+ {
+ }
+ public SessionPool(List<string> nodeUrls, string username, string password, int fetchSize, string zoneId)
+ : this(nodeUrls, username, password, fetchSize, zoneId, 8, true, 60)
+ {
+ }
+ public SessionPool(List<string> nodeUrls, string username, string password, int fetchSize, string zoneId, int poolSize, bool enableRpcCompression, int timeout)
+ : this(nodeUrls, username, password, fetchSize, zoneId, poolSize, enableRpcCompression, timeout, IoTDBConstant.TREE_SQL_DIALECT, "")
+ {
+
+ }
+ protected internal SessionPool(List<string> nodeUrls, string username, string password, int fetchSize, string zoneId, int poolSize, bool enableRpcCompression, int timeout, string sqlDialect, string database)
+ {
+ if (nodeUrls.Count == 0)
{
- _logger.LogWarning(e, "Currently connecting to {0}:{1} failed", _host, _port);
+ throw new ArgumentException("nodeUrls shouldn't be empty.");
}
- }
+ _nodeUrls = nodeUrls;
+ _endPoints = _utilFunctions.ParseSeedNodeUrls(nodeUrls);
+ _username = username;
+ _password = password;
+ _zoneId = zoneId;
+ _fetchSize = fetchSize;
+ _debugMode = false;
+ _poolSize = poolSize;
+ _enableRpcCompression = enableRpcCompression;
+ _timeout = timeout;
+ _sqlDialect = sqlDialect;
+ _database = database;
}
- }
- else
- {
- int startIndex = 0;
- for (var index = 0; index < _poolSize; index++)
+ public async Task<TResult> ExecuteClientOperationAsync<TResult>(AsyncOperation<TResult> operation, string errMsg, bool retryOnFailure = true, bool putClientBack = true)
{
- bool isConnected = false;
- for (int i = 0; i < _endPoints.Count; i++)
- {
- var endPointIndex = (startIndex + i) % _endPoints.Count;
- var endPoint = _endPoints[endPointIndex];
+ Client client = _clients.Take();
try
{
- var client = await CreateAndOpen(endPoint.Ip, endPoint.Port, _enableRpcCompression, _timeout, _sqlDialect, _database, cancellationToken);
- _clients.Add(client);
- isConnected = true;
- startIndex = (endPointIndex + 1) % _endPoints.Count;
- break;
+ var resp = await operation(client);
+ return resp;
}
- catch (Exception e)
+ catch (TException ex)
{
- if (_debugMode)
- {
- _logger.LogWarning(e, "Currently connecting to {0}:{1} failed", endPoint.Ip, endPoint.Port);
- }
+ if (retryOnFailure)
+ {
+ try
+ {
+ client = await Reconnect(client);
+ return await operation(client);
+ }
+ catch (TException retryEx)
+ {
+ throw new TException(errMsg, retryEx);
+ }
+ }
+ else
+ {
+ throw new TException(errMsg, ex);
+ }
}
- }
- if (!isConnected) // current client could not connect to any endpoint
- {
- throw new TException("Error occurs when opening session pool. Could not connect to any server", null);
- }
+ catch (Exception ex)
+ {
+ if (retryOnFailure)
+ {
+ try
+ {
+ client = await Reconnect(client);
+ return await operation(client);
+ }
+ catch (TException retryEx)
+ {
+ throw new TException(errMsg, retryEx);
+ }
+ }
+ else
+ {
+ throw new TException(errMsg, ex);
+ }
+ }
+ finally
+ {
+ if (putClientBack)
+ {
+ _clients.Add(client);
+ }
+ }
}
- }
+ /// <summary>
+ /// Gets or sets the amount of time a Session will wait for a send operation to complete successfully.
+ /// </summary>
+ /// <remarks> The send time-out value, in milliseconds. The default is 10000.</remarks>
+ public int TimeOut { get; set; } = 10000;
- if (_clients.ClientQueue.Count != _poolSize)
- {
- throw new TException(string.Format("Error occurs when opening session pool. Client pool size is not equal to the expected size. Client pool size: {0}, expected size: {1}, Please check the server status", _clients.ClientQueue.Count, _poolSize), null);
- }
- _isClose = false;
- }
+ ILoggerFactory factory;
+ private bool disposedValue;
-
- public async Task<Client> Reconnect(Client originalClient = null, CancellationToken cancellationToken = default)
- {
- originalClient?.Transport.Close();
-
- if (_nodeUrls.Count == 0)
- {
- for (int attempt = 1; attempt <= RetryNum; attempt++)
+ public void OpenDebugMode(Action<ILoggingBuilder> configure)
{
- try
- {
- var client = await CreateAndOpen(_host, _port, _enableRpcCompression, _timeout, _sqlDialect, _database, cancellationToken);
- return client;
- }
- catch (Exception e)
- {
- if (_debugMode)
- {
- _logger.LogWarning(e, "Attempt reconnecting to {0}:{1} failed", _host, _port);
- }
- }
+ _debugMode = true;
+ factory = LoggerFactory.Create(configure);
+ _logger = factory.CreateLogger(nameof(Apache.IoTDB));
}
- }
- else
- {
- int startIndex = _endPoints.FindIndex(x => x.Ip == originalClient.EndPoint.Ip && x.Port == originalClient.EndPoint.Port);
- if (startIndex == -1)
+
+ public void CloseDebugMode()
{
- throw new ArgumentException($"The original client is not in the list of endpoints. Original client: {originalClient.EndPoint.Ip}:{originalClient.EndPoint.Port}");
+ _debugMode = false;
}
- for (int attempt = 1; attempt <= RetryNum; attempt++)
+ public async Task Open(bool enableRpcCompression, CancellationToken cancellationToken = default)
{
- for (int i = 0; i < _endPoints.Count; i++)
- {
- int j = (startIndex + i) % _endPoints.Count;
- try
- {
- var client = await CreateAndOpen(_endPoints[j].Ip, _endPoints[j].Port, _enableRpcCompression, _timeout, _sqlDialect, _database, cancellationToken);
- return client;
- }
- catch (Exception e)
- {
- if (_debugMode)
- {
- _logger.LogWarning(e, "Attempt connecting to {0}:{1} failed", _endPoints[j].Ip, _endPoints[j].Port);
- }
- }
- }
+ _enableRpcCompression = enableRpcCompression;
+ await Open(cancellationToken);
}
- }
- throw new TException("Error occurs when reconnecting session pool. Could not connect to any server", null);
- }
-
-
- public bool IsOpen() => !_isClose;
-
- public async Task Close()
- {
- if (_isClose)
- {
- return;
- }
-
- foreach (var client in _clients.ClientQueue.AsEnumerable())
- {
- var closeSessionRequest = new TSCloseSessionReq(client.SessionId);
- try
+ public async Task Open(CancellationToken cancellationToken = default)
{
- await client.ServiceClient.closeSessionAsync(closeSessionRequest);
+ _clients = new ConcurrentClientQueue();
+ _clients.Timeout = _timeout * 5;
+
+ if (_nodeUrls.Count == 0)
+ {
+ for (var index = 0; index < _poolSize; index++)
+ {
+ try
+ {
+ _clients.Add(await CreateAndOpen(_host, _port, _enableRpcCompression, _timeout, _sqlDialect, _database, cancellationToken));
+ }
+ catch (Exception e)
+ {
+ if (_debugMode)
+ {
+ _logger.LogWarning(e, "Currently connecting to {0}:{1} failed", _host, _port);
+ }
+ }
+ }
+ }
+ else
+ {
+ int startIndex = 0;
+ for (var index = 0; index < _poolSize; index++)
+ {
+ bool isConnected = false;
+ for (int i = 0; i < _endPoints.Count; i++)
+ {
+ var endPointIndex = (startIndex + i) % _endPoints.Count;
+ var endPoint = _endPoints[endPointIndex];
+ try
+ {
+ var client = await CreateAndOpen(endPoint.Ip, endPoint.Port, _enableRpcCompression, _timeout, _sqlDialect, _database, cancellationToken);
+ _clients.Add(client);
+ isConnected = true;
+ startIndex = (endPointIndex + 1) % _endPoints.Count;
+ break;
+ }
+ catch (Exception e)
+ {
+ if (_debugMode)
+ {
+ _logger.LogWarning(e, "Currently connecting to {0}:{1} failed", endPoint.Ip, endPoint.Port);
+ }
+ }
+ }
+ if (!isConnected) // current client could not connect to any endpoint
+ {
+ throw new TException("Error occurs when opening session pool. Could not connect to any server", null);
+ }
+ }
+ }
+
+ if (_clients.ClientQueue.Count != _poolSize)
+ {
+ throw new TException(string.Format("Error occurs when opening session pool. Client pool size is not equal to the expected size. Client pool size: {0}, expected size: {1}, Please check the server status", _clients.ClientQueue.Count, _poolSize), null);
+ }
+ _isClose = false;
}
- catch (TException e)
+
+
+ public async Task<Client> Reconnect(Client originalClient = null, CancellationToken cancellationToken = default)
{
- throw new TException("Error occurs when closing session at server. Maybe server is down", e);
+ originalClient?.Transport.Close();
+
+ if (_nodeUrls.Count == 0)
+ {
+ for (int attempt = 1; attempt <= RetryNum; attempt++)
+ {
+ try
+ {
+ var client = await CreateAndOpen(_host, _port, _enableRpcCompression, _timeout, _sqlDialect, _database, cancellationToken);
+ return client;
+ }
+ catch (Exception e)
+ {
+ if (_debugMode)
+ {
+ _logger.LogWarning(e, "Attempt reconnecting to {0}:{1} failed", _host, _port);
+ }
+ }
+ }
+ }
+ else
+ {
+ int startIndex = _endPoints.FindIndex(x => x.Ip == originalClient.EndPoint.Ip && x.Port == originalClient.EndPoint.Port);
+ if (startIndex == -1)
+ {
+ throw new ArgumentException($"The original client is not in the list of endpoints. Original client: {originalClient.EndPoint.Ip}:{originalClient.EndPoint.Port}");
+ }
+
+ for (int attempt = 1; attempt <= RetryNum; attempt++)
+ {
+ for (int i = 0; i < _endPoints.Count; i++)
+ {
+ int j = (startIndex + i) % _endPoints.Count;
+ try
+ {
+ var client = await CreateAndOpen(_endPoints[j].Ip, _endPoints[j].Port, _enableRpcCompression, _timeout, _sqlDialect, _database, cancellationToken);
+ return client;
+ }
+ catch (Exception e)
+ {
+ if (_debugMode)
+ {
+ _logger.LogWarning(e, "Attempt connecting to {0}:{1} failed", _endPoints[j].Ip, _endPoints[j].Port);
+ }
+ }
+ }
+ }
+ }
+
+ throw new TException("Error occurs when reconnecting session pool. Could not connect to any server", null);
}
- finally
+
+ public bool IsOpen() => !_isClose;
+
+ public async Task Close()
{
- _isClose = true;
-
- client.Transport?.Close();
- }
- }
- }
-
- public async Task SetTimeZone(string zoneId)
- {
- _zoneId = zoneId;
-
- foreach (var client in _clients.ClientQueue.AsEnumerable())
- {
- var req = new TSSetTimeZoneReq(client.SessionId, zoneId);
- try
- {
- var resp = await client.ServiceClient.setTimeZoneAsync(req);
- if (_debugMode)
- {
- _logger.LogInformation("setting time zone_id as {0}, server message:{1}", zoneId, resp.Message);
- }
- }
- catch (TException e)
- {
- throw new TException("could not set time zone", e);
- }
- }
- }
-
- public async Task<string> GetTimeZone()
- {
- if (_zoneId != "")
- {
- return _zoneId;
- }
-
- var client = _clients.Take();
-
- try
- {
- var response = await client.ServiceClient.getTimeZoneAsync(client.SessionId);
-
- return response?.TimeZone;
- }
- catch (TException e)
- {
- throw new TException("could not get time zone", e);
- }
- finally
- {
- _clients.Add(client);
- }
- }
-
- private async Task<Client> CreateAndOpen(string host, int port, bool enableRpcCompression, int timeout, string sqlDialect, string database, CancellationToken cancellationToken = default)
- {
- var tcpClient = new TcpClient(host, port);
- tcpClient.SendTimeout = timeout;
- tcpClient.ReceiveTimeout = timeout;
- var transport = new TFramedTransport(new TSocketTransport(tcpClient, null));
-
- if (!transport.IsOpen)
- {
- await transport.OpenAsync(cancellationToken);
- }
-
- var client = enableRpcCompression ?
- new IClientRPCService.Client(new TCompactProtocol(transport)) :
- new IClientRPCService.Client(new TBinaryProtocol(transport));
-
- var openReq = new TSOpenSessionReq(ProtocolVersion, _zoneId, _username)
- {
- Password = _password,
- };
- if (openReq.Configuration == null)
- {
- openReq.Configuration = new Dictionary<string, string>();
- }
- openReq.Configuration.Add("sql_dialect", sqlDialect);
- if (!String.IsNullOrEmpty(database))
- {
- openReq.Configuration.Add("db", database);
- }
-
- try
- {
- var openResp = await client.openSessionAsync(openReq, cancellationToken);
-
- if (openResp.ServerProtocolVersion != ProtocolVersion)
- {
- throw new TException($"Protocol Differ, Client version is {ProtocolVersion} but Server version is {openResp.ServerProtocolVersion}", null);
- }
-
- if (openResp.ServerProtocolVersion == 0)
- {
- throw new TException("Protocol not supported", null);
- }
-
- var sessionId = openResp.SessionId;
- var statementId = await client.requestStatementIdAsync(sessionId, cancellationToken);
-
- var endpoint = new TEndPoint(host, port);
-
- var returnClient = new Client(
- client,
- sessionId,
- statementId,
- transport,
- endpoint);
-
- return returnClient;
- }
- catch (Exception)
- {
- transport.Close();
-
- throw;
- }
- }
-
- public async Task<int> CreateDatabase(string dbName)
- {
- return await ExecuteClientOperationAsync<int>(
- async client =>
- {
- var status = await client.ServiceClient.setStorageGroupAsync(client.SessionId, dbName);
-
- if (_debugMode)
+ if (_isClose)
{
- _logger.LogInformation("create database {0} successfully, server message is {1}", dbName, status.Message);
+ return;
}
- return _utilFunctions.VerifySuccess(status);
- },
- errMsg: "Error occurs when creating database"
- );
- }
-
- [Obsolete("This method is deprecated, please use createDatabase instead.")]
- public async Task<int> SetStorageGroup(string groupName)
- {
- return await ExecuteClientOperationAsync<int>(
- async client =>
- {
- var status = await client.ServiceClient.setStorageGroupAsync(client.SessionId, groupName);
- if (_debugMode)
+ foreach (var client in _clients.ClientQueue.AsEnumerable())
{
- _logger.LogInformation("set storage group {0} successfully, server message is {1}", groupName, status.Message);
- }
- return _utilFunctions.VerifySuccess(status);
- },
- errMsg: "Error occurs when setting storage group"
- );
- }
- public async Task<int> CreateTimeSeries(
- string tsPath,
- TSDataType dataType,
- TSEncoding encoding,
- Compressor compressor)
- {
- return await ExecuteClientOperationAsync<int>(
- async client =>
- {
- var req = new TSCreateTimeseriesReq(
- client.SessionId,
- tsPath,
- (int)dataType,
- (int)encoding,
- (int)compressor);
-
- var status = await client.ServiceClient.createTimeseriesAsync(req);
-
- if (_debugMode)
- {
- _logger.LogInformation("creating time series {0} successfully, server message is {1}", tsPath, status.Message);
- }
-
- return _utilFunctions.VerifySuccess(status);
- },
- errMsg: "Error occurs when creating time series"
- );
- }
- public async Task<int> CreateAlignedTimeseriesAsync(
- string prefixPath,
- List<string> measurements,
- List<TSDataType> dataTypeLst,
- List<TSEncoding> encodingLst,
- List<Compressor> compressorLst)
- {
- return await ExecuteClientOperationAsync<int>(
- async client =>
- {
- var dataTypes = dataTypeLst.ConvertAll(x => (int)x);
- var encodings = encodingLst.ConvertAll(x => (int)x);
- var compressors = compressorLst.ConvertAll(x => (int)x);
-
- var req = new TSCreateAlignedTimeseriesReq(
- client.SessionId,
- prefixPath,
- measurements,
- dataTypes,
- encodings,
- compressors);
-
- var status = await client.ServiceClient.createAlignedTimeseriesAsync(req);
-
- if (_debugMode)
- {
- _logger.LogInformation("creating aligned time series {0} successfully, server message is {1}", prefixPath, status.Message);
- }
-
- return _utilFunctions.VerifySuccess(status);
- },
- errMsg: "Error occurs when creating aligned time series"
- );
- }
- public async Task<int> DeleteDatabaseAsync(string dbName)
- {
- return await ExecuteClientOperationAsync<int>(
- async client =>
- {
- var status = await client.ServiceClient.deleteStorageGroupsAsync(client.SessionId, new List<string> { dbName });
-
- if (_debugMode)
- {
- _logger.LogInformation("delete database {0} successfully, server message is {1}", dbName, status.Message);
- }
-
- return _utilFunctions.VerifySuccess(status);
- },
- errMsg: "Error occurs when deleting database"
- );
- }
- [Obsolete("This method is deprecated, please use DeleteDatabaseAsync instead.")]
- public async Task<int> DeleteStorageGroupAsync(string groupName)
- {
- return await ExecuteClientOperationAsync<int>(
- async client =>
- {
- var status = await client.ServiceClient.deleteStorageGroupsAsync(client.SessionId, new List<string> { groupName });
-
- if (_debugMode)
- {
- _logger.LogInformation("delete storage group {0} successfully, server message is {1}", groupName, status.Message);
- }
-
- return _utilFunctions.VerifySuccess(status);
- },
- errMsg: "Error occurs when deleting storage group"
- );
- }
- public async Task<int> DeleteDatabasesAsync(List<string> dbNames)
- {
- return await ExecuteClientOperationAsync<int>(
- async client =>
- {
- var status = await client.ServiceClient.deleteStorageGroupsAsync(client.SessionId, dbNames);
-
- if (_debugMode)
- {
- _logger.LogInformation("delete database(s) {0} successfully, server message is {1}", dbNames, status.Message);
- }
-
- return _utilFunctions.VerifySuccess(status);
- },
- errMsg: "Error occurs when deleting database(s)"
- );
- }
- [Obsolete("This method is deprecated, please use DeleteDatabasesAsync instead.")]
- public async Task<int> DeleteStorageGroupsAsync(List<string> groupNames)
- {
- return await ExecuteClientOperationAsync<int>(
- async client =>
- {
- var status = await client.ServiceClient.deleteStorageGroupsAsync(client.SessionId, groupNames);
-
- if (_debugMode)
- {
- _logger.LogInformation("delete storage group(s) {0} successfully, server message is {1}", groupNames, status.Message);
- }
-
- return _utilFunctions.VerifySuccess(status);
- },
- errMsg: "Error occurs when deleting storage group(s)"
- );
- }
- public async Task<int> CreateMultiTimeSeriesAsync(
- List<string> tsPathLst,
- List<TSDataType> dataTypeLst,
- List<TSEncoding> encodingLst,
- List<Compressor> compressorLst)
- {
- return await ExecuteClientOperationAsync<int>(
- async client =>
- {
- var dataTypes = dataTypeLst.ConvertAll(x => (int)x);
- var encodings = encodingLst.ConvertAll(x => (int)x);
- var compressors = compressorLst.ConvertAll(x => (int)x);
-
- var req = new TSCreateMultiTimeseriesReq(client.SessionId, tsPathLst, dataTypes, encodings, compressors);
-
- var status = await client.ServiceClient.createMultiTimeseriesAsync(req);
-
- if (_debugMode)
- {
- _logger.LogInformation("creating multiple time series {0}, server message is {1}", tsPathLst, status.Message);
- }
-
- return _utilFunctions.VerifySuccess(status);
- },
- errMsg: "Error occurs when creating multiple time series"
- );
- }
- public async Task<int> DeleteTimeSeriesAsync(List<string> pathList)
- {
- return await ExecuteClientOperationAsync<int>(
- async client =>
- {
- var status = await client.ServiceClient.deleteTimeseriesAsync(client.SessionId, pathList);
-
- if (_debugMode)
- {
- _logger.LogInformation("deleting multiple time series {0}, server message is {1}", pathList, status.Message);
- }
-
- return _utilFunctions.VerifySuccess(status);
- },
- errMsg: "Error occurs when deleting multiple time series"
- );
- }
-
- public async Task<int> DeleteTimeSeriesAsync(string tsPath)
- {
- return await DeleteTimeSeriesAsync(new List<string> { tsPath });
- }
-
- public async Task<bool> CheckTimeSeriesExistsAsync(string tsPath)
- {
- try
- {
- var sql = "SHOW TIMESERIES " + tsPath;
- var sessionDataset = await ExecuteQueryStatementAsync(sql);
- bool timeSeriesExists = sessionDataset.HasNext();
- await sessionDataset.Close(); // be sure to close the sessionDataset to put the client back to the pool
- return timeSeriesExists;
- }
- catch (TException e)
- {
- throw new TException("could not check if certain time series exists", e);
- }
- }
- public async Task<int> DeleteDataAsync(List<string> tsPathLst, long startTime, long endTime)
- {
- return await ExecuteClientOperationAsync<int>(
- async client =>
- {
- var req = new TSDeleteDataReq(client.SessionId, tsPathLst, startTime, endTime);
-
- var status = await client.ServiceClient.deleteDataAsync(req);
-
- if (_debugMode)
- {
- _logger.LogInformation(
- "delete data from {0}, server message is {1}",
- tsPathLst,
- status.Message);
- }
-
- return _utilFunctions.VerifySuccess(status);
- },
- errMsg: "Error occurs when deleting data"
- );
- }
- public async Task<int> InsertRecordAsync(string deviceId, RowRecord record)
- {
- return await ExecuteClientOperationAsync<int>(
- async client =>
- {
- var req = new TSInsertRecordReq(client.SessionId, deviceId, record.Measurements, record.ToBytes(), record.Timestamps);
-
- var status = await client.ServiceClient.insertRecordAsync(req);
-
- if (_debugMode)
- {
- _logger.LogInformation("insert one record to device {0}, server message: {1}", deviceId, status.Message);
- }
-
- return _utilFunctions.VerifySuccess(status);
- },
- errMsg: "Error occurs when inserting record"
- );
- }
- public async Task<int> InsertAlignedRecordAsync(string deviceId, RowRecord record)
- {
- return await ExecuteClientOperationAsync<int>(
- async client =>
- {
- var req = new TSInsertRecordReq(client.SessionId, deviceId, record.Measurements, record.ToBytes(), record.Timestamps);
- req.IsAligned = true;
- // ASSERT that the insert plan is aligned
- System.Diagnostics.Debug.Assert(req.IsAligned == true);
-
- var status = await client.ServiceClient.insertRecordAsync(req);
-
- if (_debugMode)
- {
- _logger.LogInformation("insert one record to device {0}, server message: {1}", deviceId, status.Message);
- }
-
- return _utilFunctions.VerifySuccess(status);
- },
- errMsg: "Error occurs when inserting record"
- );
- }
-
- public TSInsertStringRecordReq GenInsertStrRecordReq(string deviceId, List<string> measurements,
- List<string> values, long timestamp, long sessionId, bool isAligned = false)
- {
- if (values.Count() != measurements.Count())
- {
- throw new ArgumentException("length of data types does not equal to length of values!");
- }
-
- return new TSInsertStringRecordReq(sessionId, deviceId, measurements, values, timestamp)
- {
- IsAligned = isAligned
- };
- }
- public TSInsertStringRecordsReq GenInsertStringRecordsReq(List<string> deviceIds, List<List<string>> measurementsList,
- List<List<string>> valuesList, List<long> timestamps, long sessionId, bool isAligned = false)
- {
- if (valuesList.Count() != measurementsList.Count())
- {
- throw new ArgumentException("length of data types does not equal to length of values!");
- }
-
- return new TSInsertStringRecordsReq(sessionId, deviceIds, measurementsList, valuesList, timestamps)
- {
- IsAligned = isAligned
- };
- }
-
- public TSInsertRecordsReq GenInsertRecordsReq(List<string> deviceId, List<RowRecord> rowRecords,
- long sessionId)
- {
- var measurementLst = rowRecords.Select(x => x.Measurements).ToList();
- var timestampLst = rowRecords.Select(x => x.Timestamps).ToList();
- var valuesLstInBytes = rowRecords.Select(row => row.ToBytes()).ToList();
-
- return new TSInsertRecordsReq(sessionId, deviceId, measurementLst, valuesLstInBytes, timestampLst);
- }
- public async Task<int> InsertStringRecordAsync(string deviceId, List<string> measurements, List<string> values,
- long timestamp)
- {
- return await ExecuteClientOperationAsync<int>(
- async client =>
- {
- var req = GenInsertStrRecordReq(deviceId, measurements, values, timestamp, client.SessionId);
-
- var status = await client.ServiceClient.insertStringRecordAsync(req);
-
- if (_debugMode)
- {
- _logger.LogInformation("insert one string record to device {0}, server message: {1}", deviceId, status.Message);
- }
-
- return _utilFunctions.VerifySuccess(status);
- },
- errMsg: "Error occurs when inserting a string record"
- );
- }
- public async Task<int> InsertAlignedStringRecordAsync(string deviceId, List<string> measurements, List<string> values,
- long timestamp)
- {
- return await ExecuteClientOperationAsync<int>(
- async client =>
- {
- var req = GenInsertStrRecordReq(deviceId, measurements, values, timestamp, client.SessionId, true);
-
- var status = await client.ServiceClient.insertStringRecordAsync(req);
-
- if (_debugMode)
- {
- _logger.LogInformation("insert one record to device {0}, server message: {1}", deviceId, status.Message);
- }
-
- return _utilFunctions.VerifySuccess(status);
- },
- errMsg: "Error occurs when inserting a string record"
- );
- }
- public async Task<int> InsertStringRecordsAsync(List<string> deviceIds, List<List<string>> measurements, List<List<string>> values,
- List<long> timestamps)
- {
- return await ExecuteClientOperationAsync<int>(
- async client =>
- {
- var req = GenInsertStringRecordsReq(deviceIds, measurements, values, timestamps, client.SessionId);
-
- var status = await client.ServiceClient.insertStringRecordsAsync(req);
-
- if (_debugMode)
- {
- _logger.LogInformation("insert string records to devices {0}, server message: {1}", deviceIds, status.Message);
- }
-
- return _utilFunctions.VerifySuccess(status);
- },
- errMsg: "Error occurs when inserting string records"
- );
- }
- public async Task<int> InsertAlignedStringRecordsAsync(List<string> deviceIds, List<List<string>> measurements, List<List<string>> values,
- List<long> timestamps)
- {
- return await ExecuteClientOperationAsync<int>(
- async client =>
- {
- var req = GenInsertStringRecordsReq(deviceIds, measurements, values, timestamps, client.SessionId, true);
-
- var status = await client.ServiceClient.insertStringRecordsAsync(req);
-
- if (_debugMode)
- {
- _logger.LogInformation("insert string records to devices {0}, server message: {1}", deviceIds, status.Message);
- }
-
- return _utilFunctions.VerifySuccess(status);
- },
- errMsg: "Error occurs when inserting string records"
- );
- }
- public async Task<int> InsertRecordsAsync(List<string> deviceId, List<RowRecord> rowRecords)
- {
- return await ExecuteClientOperationAsync<int>(
- async client =>
- {
- var req = GenInsertRecordsReq(deviceId, rowRecords, client.SessionId);
-
- var status = await client.ServiceClient.insertRecordsAsync(req);
-
- if (_debugMode)
- {
- _logger.LogInformation("insert multiple records to devices {0}, server message: {1}", deviceId, status.Message);
- }
-
- return _utilFunctions.VerifySuccess(status);
- },
- errMsg: "Error occurs when inserting records"
- );
- }
- public async Task<int> InsertAlignedRecordsAsync(List<string> deviceId, List<RowRecord> rowRecords)
- {
- return await ExecuteClientOperationAsync<int>(
- async client =>
- {
- var req = GenInsertRecordsReq(deviceId, rowRecords, client.SessionId);
- req.IsAligned = true;
- // ASSERT that the insert plan is aligned
- System.Diagnostics.Debug.Assert(req.IsAligned == true);
-
- var status = await client.ServiceClient.insertRecordsAsync(req);
-
- if (_debugMode)
- {
- _logger.LogInformation("insert multiple records to devices {0}, server message: {1}", deviceId, status.Message);
- }
-
- return _utilFunctions.VerifySuccess(status);
- },
- errMsg: "Error occurs when inserting records"
- );
- }
- public TSInsertTabletReq GenInsertTabletReq(Tablet tablet, long sessionId)
- {
- return new TSInsertTabletReq(
- sessionId,
- tablet.InsertTargetName,
- tablet.Measurements,
- tablet.GetBinaryValues(),
- tablet.GetBinaryTimestamps(),
- tablet.GetDataTypes(),
- tablet.RowNumber);
- }
- public async Task<int> InsertTabletAsync(Tablet tablet)
- {
- return await ExecuteClientOperationAsync<int>(
- async client =>
- {
- var req = GenInsertTabletReq(tablet, client.SessionId);
-
- var status = await client.ServiceClient.insertTabletAsync(req);
-
- if (_debugMode)
- {
- _logger.LogInformation("insert one tablet to device {0}, server message: {1}", tablet.InsertTargetName, status.Message);
- }
-
- return _utilFunctions.VerifySuccess(status);
- },
- errMsg: "Error occurs when inserting tablet"
- );
- }
- public async Task<int> InsertAlignedTabletAsync(Tablet tablet)
- {
- return await ExecuteClientOperationAsync<int>(
- async client =>
- {
- var req = GenInsertTabletReq(tablet, client.SessionId);
- req.IsAligned = true;
-
- var status = await client.ServiceClient.insertTabletAsync(req);
-
- if (_debugMode)
- {
- _logger.LogInformation("insert one aligned tablet to device {0}, server message: {1}", tablet.InsertTargetName, status.Message);
- }
-
- return _utilFunctions.VerifySuccess(status);
- },
- errMsg: "Error occurs when inserting aligned tablet"
- );
- }
-
- protected internal async Task<int> InsertRelationalTabletAsync(Tablet tablet)
- {
- return await ExecuteClientOperationAsync<int>(
- async client =>
- {
- var req = GenInsertTabletReq(tablet, client.SessionId);
- req.ColumnCategories = tablet.GetColumnColumnCategories();
- req.WriteToTable = true;
-
- var status = await client.ServiceClient.insertTabletAsync(req);
-
- if (_debugMode)
- {
- _logger.LogInformation("insert one tablet to table {0}, server message: {1}", tablet.InsertTargetName, status.Message);
- }
-
- return _utilFunctions.VerifySuccess(status);
- },
- errMsg: "Error occurs when inserting tablet"
- );
- }
-
- public TSInsertTabletsReq GenInsertTabletsReq(List<Tablet> tabletLst, long sessionId)
- {
- var deviceIdLst = new List<string>();
- var measurementsLst = new List<List<string>>();
- var valuesLst = new List<byte[]>();
- var timestampsLst = new List<byte[]>();
- var typeLst = new List<List<int>>();
- var sizeLst = new List<int>();
-
- foreach (var tablet in tabletLst)
- {
- var dataTypeValues = tablet.GetDataTypes();
- deviceIdLst.Add(tablet.InsertTargetName);
- measurementsLst.Add(tablet.Measurements);
- valuesLst.Add(tablet.GetBinaryValues());
- timestampsLst.Add(tablet.GetBinaryTimestamps());
- typeLst.Add(dataTypeValues);
- sizeLst.Add(tablet.RowNumber);
- }
-
- return new TSInsertTabletsReq(
- sessionId,
- deviceIdLst,
- measurementsLst,
- valuesLst,
- timestampsLst,
- typeLst,
- sizeLst);
- }
-
- public async Task<int> InsertTabletsAsync(List<Tablet> tabletLst)
- {
- return await ExecuteClientOperationAsync<int>(
- async client =>
- {
- var req = GenInsertTabletsReq(tabletLst, client.SessionId);
-
- var status = await client.ServiceClient.insertTabletsAsync(req);
-
- if (_debugMode)
- {
- _logger.LogInformation("insert multiple tablets, message: {0}", status.Message);
- }
-
- return _utilFunctions.VerifySuccess(status);
- },
- errMsg: "Error occurs when inserting tablets"
- );
- }
- public async Task<int> InsertAlignedTabletsAsync(List<Tablet> tabletLst)
- {
- return await ExecuteClientOperationAsync<int>(
- async client =>
- {
- var req = GenInsertTabletsReq(tabletLst, client.SessionId);
- req.IsAligned = true;
-
- var status = await client.ServiceClient.insertTabletsAsync(req);
-
- if (_debugMode)
- {
- _logger.LogInformation("insert multiple aligned tablets, message: {0}", status.Message);
- }
-
- return _utilFunctions.VerifySuccess(status);
- },
- errMsg: "Error occurs when inserting aligned tablets"
- );
- }
-
- public async Task<int> InsertRecordsOfOneDeviceAsync(string deviceId, List<RowRecord> rowRecords)
- {
- var sortedRowRecords = rowRecords.OrderBy(x => x.Timestamps).ToList();
- return await InsertRecordsOfOneDeviceSortedAsync(deviceId, sortedRowRecords);
- }
- public async Task<int> InsertAlignedRecordsOfOneDeviceAsync(string deviceId, List<RowRecord> rowRecords)
- {
- var sortedRowRecords = rowRecords.OrderBy(x => x.Timestamps).ToList();
- return await InsertAlignedRecordsOfOneDeviceSortedAsync(deviceId, sortedRowRecords);
- }
- public async Task<int> InsertStringRecordsOfOneDeviceAsync(string deviceId, List<long> timestamps,
- List<List<string>> measurementsList, List<List<string>> valuesList)
- {
- var joined = timestamps.Zip(measurementsList, (t, m) => new { t, m })
- .Zip(valuesList, (tm, v) => new { tm.t, tm.m, v })
- .OrderBy(x => x.t);
-
- var sortedTimestamps = joined.Select(x => x.t).ToList();
- var sortedMeasurementsList = joined.Select(x => x.m).ToList();
- var sortedValuesList = joined.Select(x => x.v).ToList();
-
- return await InsertStringRecordsOfOneDeviceSortedAsync(deviceId, sortedTimestamps, sortedMeasurementsList, sortedValuesList, false);
- }
- public async Task<int> InsertAlignedStringRecordsOfOneDeviceAsync(string deviceId, List<long> timestamps,
- List<List<string>> measurementsList, List<List<string>> valuesList)
- {
- var joined = timestamps.Zip(measurementsList, (t, m) => new { t, m })
- .Zip(valuesList, (tm, v) => new { tm.t, tm.m, v })
- .OrderBy(x => x.t);
-
- var sortedTimestamps = joined.Select(x => x.t).ToList();
- var sortedMeasurementsList = joined.Select(x => x.m).ToList();
- var sortedValuesList = joined.Select(x => x.v).ToList();
-
- return await InsertStringRecordsOfOneDeviceSortedAsync(deviceId, sortedTimestamps, sortedMeasurementsList, sortedValuesList, true);
- }
- public async Task<int> InsertStringRecordsOfOneDeviceSortedAsync(string deviceId, List<long> timestamps,
- List<List<string>> measurementsList, List<List<string>> valuesList, bool isAligned)
- {
- return await ExecuteClientOperationAsync<int>(
- async client =>
- {
- if (!_utilFunctions.IsSorted(timestamps))
- {
- throw new ArgumentException("insert string records of one device error: timestamp not sorted");
- }
-
- var req = GenInsertStringRecordsOfOneDeviceReq(deviceId, timestamps, measurementsList, valuesList, client.SessionId, isAligned);
-
- var status = await client.ServiceClient.insertStringRecordsOfOneDeviceAsync(req);
-
- if (_debugMode)
- {
- _logger.LogInformation("insert string records of one device, message: {0}", status.Message);
- }
-
- return _utilFunctions.VerifySuccess(status);
- },
- errMsg: "Error occurs when inserting string records of one device"
- );
- }
- private TSInsertStringRecordsOfOneDeviceReq GenInsertStringRecordsOfOneDeviceReq(string deviceId,
- List<long> timestamps, List<List<string>> measurementsList, List<List<string>> valuesList,
- long sessionId, bool isAligned = false)
- {
- return new TSInsertStringRecordsOfOneDeviceReq(
- sessionId,
- deviceId,
- measurementsList,
- valuesList,
- timestamps)
- {
- IsAligned = isAligned
- };
- }
- private TSInsertRecordsOfOneDeviceReq GenInsertRecordsOfOneDeviceRequest(
- string deviceId,
- List<RowRecord> records,
- long sessionId)
- {
- var values = records.Select(row => row.ToBytes());
- var measurementsLst = records.Select(x => x.Measurements).ToList();
- var timestampLst = records.Select(x => x.Timestamps).ToList();
-
- return new TSInsertRecordsOfOneDeviceReq(
- sessionId,
- deviceId,
- measurementsLst,
- values.ToList(),
- timestampLst);
- }
- public async Task<int> InsertRecordsOfOneDeviceSortedAsync(string deviceId, List<RowRecord> rowRecords)
- {
- return await ExecuteClientOperationAsync<int>(
- async client =>
- {
- var timestampLst = rowRecords.Select(x => x.Timestamps).ToList();
-
- if (!_utilFunctions.IsSorted(timestampLst))
- {
- throw new ArgumentException("insert records of one device error: timestamp not sorted");
- }
-
- var req = GenInsertRecordsOfOneDeviceRequest(deviceId, rowRecords, client.SessionId);
-
- var status = await client.ServiceClient.insertRecordsOfOneDeviceAsync(req);
-
- if (_debugMode)
- {
- _logger.LogInformation("insert records of one device, message: {0}", status.Message);
- }
-
- return _utilFunctions.VerifySuccess(status);
- },
- errMsg: "Error occurs when inserting records of one device"
- );
- }
- public async Task<int> InsertAlignedRecordsOfOneDeviceSortedAsync(string deviceId, List<RowRecord> rowRecords)
- {
- return await ExecuteClientOperationAsync<int>(
- async client =>
- {
- var timestampLst = rowRecords.Select(x => x.Timestamps).ToList();
-
- if (!_utilFunctions.IsSorted(timestampLst))
- {
- throw new ArgumentException("insert records of one device error: timestamp not sorted");
- }
-
- var req = GenInsertRecordsOfOneDeviceRequest(deviceId, rowRecords, client.SessionId);
- req.IsAligned = true;
-
- var status = await client.ServiceClient.insertRecordsOfOneDeviceAsync(req);
-
- if (_debugMode)
- {
- _logger.LogInformation("insert aligned records of one device, message: {0}", status.Message);
- }
-
- return _utilFunctions.VerifySuccess(status);
- },
- errMsg: "Error occurs when inserting aligned records of one device"
- );
- }
- public async Task<int> TestInsertRecordAsync(string deviceId, RowRecord record)
- {
- return await ExecuteClientOperationAsync<int>(
- async client =>
- {
- var req = new TSInsertRecordReq(
- client.SessionId,
- deviceId,
- record.Measurements,
- record.ToBytes(),
- record.Timestamps);
-
- var status = await client.ServiceClient.testInsertRecordAsync(req);
-
- if (_debugMode)
- {
- _logger.LogInformation("insert one record to device {0}, server message: {1}", deviceId, status.Message);
- }
-
- return _utilFunctions.VerifySuccess(status);
- },
- errMsg: "Error occurs when test inserting one record"
- );
- }
- public async Task<int> TestInsertRecordsAsync(List<string> deviceId, List<RowRecord> rowRecords)
- {
- return await ExecuteClientOperationAsync<int>(
- async client =>
- {
- var req = GenInsertRecordsReq(deviceId, rowRecords, client.SessionId);
-
- var status = await client.ServiceClient.testInsertRecordsAsync(req);
-
- if (_debugMode)
- {
- _logger.LogInformation("insert multiple records to devices {0}, server message: {1}", deviceId, status.Message);
- }
-
- return _utilFunctions.VerifySuccess(status);
- },
- errMsg: "Error occurs when test inserting multiple records"
- );
- }
- public async Task<int> TestInsertTabletAsync(Tablet tablet)
- {
- return await ExecuteClientOperationAsync<int>(
- async client =>
- {
- var req = GenInsertTabletReq(tablet, client.SessionId);
-
- var status = await client.ServiceClient.testInsertTabletAsync(req);
-
- if (_debugMode)
- {
- _logger.LogInformation("insert one tablet to device {0}, server message: {1}", tablet.InsertTargetName, status.Message);
- }
-
- return _utilFunctions.VerifySuccess(status);
- },
- errMsg: "Error occurs when test inserting one tablet"
- );
- }
- public async Task<int> TestInsertTabletsAsync(List<Tablet> tabletLst)
- {
- return await ExecuteClientOperationAsync<int>(
- async client =>
- {
- var req = GenInsertTabletsReq(tabletLst, client.SessionId);
-
- var status = await client.ServiceClient.testInsertTabletsAsync(req);
-
- if (_debugMode)
- {
- _logger.LogInformation("insert multiple tablets, message: {0}", status.Message);
- }
-
- return _utilFunctions.VerifySuccess(status);
- },
- errMsg: "Error occurs when test inserting multiple tablets"
- );
- }
-
- public async Task<SessionDataSet> ExecuteQueryStatementAsync(string sql)
- {
- // default timeout is 60s
- return await ExecuteQueryStatementAsync(sql, 60 * 1000);
- }
-
- public async Task<SessionDataSet> ExecuteQueryStatementAsync(string sql, long timeoutInMs)
- {
- return await ExecuteClientOperationAsync<SessionDataSet>(
- async client =>
- {
- var req = new TSExecuteStatementReq(client.SessionId, sql, client.StatementId)
- {
- FetchSize = _fetchSize,
- Timeout = timeoutInMs
- };
-
- var resp = await client.ServiceClient.executeQueryStatementAsync(req);
- var status = resp.Status;
-
- if (_utilFunctions.VerifySuccess(status) == -1)
- {
- throw new Exception(string.Format("execute query failed, sql: {0}, message: {1}", sql, status.Message));
- }
-
- return new SessionDataSet(sql, resp, client, _clients, client.StatementId)
- {
- FetchSize = _fetchSize,
- };
- },
- errMsg: "Error occurs when executing query statement",
- putClientBack: false
- );
- }
-
- public async Task<SessionDataSet> ExecuteStatementAsync(string sql, long timeout)
- {
- return await ExecuteClientOperationAsync<SessionDataSet>(
- async client =>
- {
- var req = new TSExecuteStatementReq(client.SessionId, sql, client.StatementId)
- {
- FetchSize = _fetchSize,
- Timeout = timeout
- };
-
- var resp = await client.ServiceClient.executeStatementAsync(req);
- var status = resp.Status;
-
- if (_utilFunctions.VerifySuccess(status) == -1)
- {
- throw new Exception(string.Format("execute query failed, sql: {0}, message: {1}", sql, status.Message));
- }
-
- return new SessionDataSet(sql, resp, client, _clients, client.StatementId)
- {
- FetchSize = _fetchSize,
- };
- },
- errMsg: "Error occurs when executing query statement",
- putClientBack: false
- );
- }
-
-
- public async Task<int> ExecuteNonQueryStatementAsync(string sql)
- {
- return await ExecuteClientOperationAsync<int>(
- async client =>
- {
- String previousDB = _database;
- var req = new TSExecuteStatementReq(client.SessionId, sql, client.StatementId);
-
- var resp = await client.ServiceClient.executeUpdateStatementAsync(req);
- var status = resp.Status;
-
- if (resp.__isset.database)
- {
- this._database = resp.Database;
- }
- if (_database != previousDB)
- {
- // all client should switch to the same database
- foreach (var c in _clients.ClientQueue)
- {
+ var closeSessionRequest = new TSCloseSessionReq(client.SessionId);
try
{
- if (c != client)
- {
- var switchReq = new TSExecuteStatementReq(c.SessionId, sql, c.StatementId);
- await c.ServiceClient.executeUpdateStatementAsync(switchReq);
- }
+ await client.ServiceClient.closeSessionAsync(closeSessionRequest);
}
- catch (Exception e)
+ catch (TException e)
{
- _logger.LogError("switch database from {0} to {1} failed for {2}, error: {3}", previousDB, _database, c.SessionId, e.Message);
+ throw new TException("Error occurs when closing session at server. Maybe server is down", e);
}
- }
- _logger.LogInformation("switch database from {0} to {1}", previousDB, _database);
+ finally
+ {
+ _isClose = true;
+
+ client.Transport?.Close();
+ }
}
+ }
- if (_debugMode)
- {
- _logger.LogInformation("execute non-query statement {0} message: {1}", sql, status.Message);
- }
-
- return _utilFunctions.VerifySuccess(status);
- },
- errMsg: "Error occurs when executing non-query statement"
- );
- }
- public async Task<SessionDataSet> ExecuteRawDataQuery(List<string> paths, long startTime, long endTime)
- {
- return await ExecuteClientOperationAsync<SessionDataSet>(
- async client =>
- {
- var req = new TSRawDataQueryReq(client.SessionId, paths, startTime, endTime, client.StatementId)
- {
- FetchSize = _fetchSize,
- EnableRedirectQuery = false
- };
-
- var resp = await client.ServiceClient.executeRawDataQueryAsync(req);
- var status = resp.Status;
-
- if (_utilFunctions.VerifySuccess(status) == -1)
- {
- throw new Exception(string.Format("execute raw data query failed, message: {0}", status.Message));
- }
-
- return new SessionDataSet("", resp, client, _clients, client.StatementId)
- {
- FetchSize = _fetchSize,
- };
- },
- errMsg: "Error occurs when executing raw data query",
- putClientBack: false
- );
- }
- public async Task<SessionDataSet> ExecuteLastDataQueryAsync(List<string> paths, long lastTime)
- {
- return await ExecuteClientOperationAsync<SessionDataSet>(
- async client =>
- {
- var req = new TSLastDataQueryReq(client.SessionId, paths, lastTime, client.StatementId)
- {
- FetchSize = _fetchSize,
- EnableRedirectQuery = false
- };
-
- var resp = await client.ServiceClient.executeLastDataQueryAsync(req);
- var status = resp.Status;
-
- if (_utilFunctions.VerifySuccess(status) == -1)
- {
- throw new Exception(string.Format("execute last data query failed, message: {0}", status.Message));
- }
-
- return new SessionDataSet("", resp, client, _clients, client.StatementId)
- {
- FetchSize = _fetchSize,
- };
- },
- errMsg: "Error occurs when executing last data query",
- putClientBack: false
- );
- }
- [Obsolete("This method is obsolete. Use SQL instead.", false)]
- public async Task<int> CreateSchemaTemplateAsync(Template template)
- {
- return await ExecuteClientOperationAsync<int>(
- async client =>
- {
- var req = new TSCreateSchemaTemplateReq(client.SessionId, template.Name, template.ToBytes());
-
- var status = await client.ServiceClient.createSchemaTemplateAsync(req);
-
- if (_debugMode)
- {
- _logger.LogInformation("create schema template {0} message: {1}", template.Name, status.Message);
- }
-
- return _utilFunctions.VerifySuccess(status);
- },
- errMsg: "Error occurs when creating schema template"
- );
- }
-
- [Obsolete("This method is obsolete. Use SQL instead.", false)]
- public async Task<int> DropSchemaTemplateAsync(string templateName)
- {
- return await ExecuteClientOperationAsync<int>(
- async client =>
- {
- var req = new TSDropSchemaTemplateReq(client.SessionId, templateName);
-
- var status = await client.ServiceClient.dropSchemaTemplateAsync(req);
-
- if (_debugMode)
- {
- _logger.LogInformation("drop schema template {0} message: {1}", templateName, status.Message);
- }
-
- return _utilFunctions.VerifySuccess(status);
- },
- errMsg: "Error occurs when dropping schema template"
- );
- }
- [Obsolete("This method is obsolete. Use SQL instead.", false)]
- public async Task<int> SetSchemaTemplateAsync(string templateName, string prefixPath)
- {
- return await ExecuteClientOperationAsync<int>(
- async client =>
- {
- var req = new TSSetSchemaTemplateReq(client.SessionId, templateName, prefixPath);
-
- var status = await client.ServiceClient.setSchemaTemplateAsync(req);
-
- if (_debugMode)
- {
- _logger.LogInformation("set schema template {0} message: {1}", templateName, status.Message);
- }
-
- return _utilFunctions.VerifySuccess(status);
- },
- errMsg: "Error occurs when setting schema template"
- );
- }
- [Obsolete("This method is obsolete. Use SQL instead.", false)]
- public async Task<int> UnsetSchemaTemplateAsync(string prefixPath, string templateName)
- {
- return await ExecuteClientOperationAsync<int>(
- async client =>
- {
- var req = new TSUnsetSchemaTemplateReq(client.SessionId, prefixPath, templateName);
-
- var status = await client.ServiceClient.unsetSchemaTemplateAsync(req);
-
- if (_debugMode)
- {
- _logger.LogInformation("unset schema template {0} message: {1}", templateName, status.Message);
- }
-
- return _utilFunctions.VerifySuccess(status);
- },
- errMsg: "Error occurs when unsetting schema template"
- );
- }
- [Obsolete("This method is obsolete. Use SQL instead.", false)]
- public async Task<int> DeleteNodeInTemplateAsync(string templateName, string path)
- {
- return await ExecuteClientOperationAsync<int>(
- async client =>
- {
- var req = new TSPruneSchemaTemplateReq(client.SessionId, templateName, path);
-
- var status = await client.ServiceClient.pruneSchemaTemplateAsync(req);
-
- if (_debugMode)
- {
- _logger.LogInformation("delete node in template {0} message: {1}", templateName, status.Message);
- }
-
- return _utilFunctions.VerifySuccess(status);
- },
- errMsg: "Error occurs when deleting node in template"
- );
- }
- [Obsolete("This method is obsolete. Use SQL instead.", false)]
- public async Task<int> CountMeasurementsInTemplateAsync(string name)
- {
- return await ExecuteClientOperationAsync<int>(
- async client =>
- {
- var req = new TSQueryTemplateReq(client.SessionId, name, (int)TemplateQueryType.COUNT_MEASUREMENTS);
-
- var resp = await client.ServiceClient.querySchemaTemplateAsync(req);
- var status = resp.Status;
-
- if (_debugMode)
- {
- _logger.LogInformation("count measurements in template {0} message: {1}", name, status.Message);
- }
-
- if (_utilFunctions.VerifySuccess(status) == -1)
- {
- throw new Exception(string.Format("count measurements in template failed, template name: {0}, message: {1}", name, status.Message));
- }
- return resp.Count;
- },
- errMsg: "Error occurs when counting measurements in template"
- );
- }
- [Obsolete("This method is obsolete. Use SQL instead.", false)]
- public async Task<bool> IsMeasurementInTemplateAsync(string templateName, string path)
- {
- return await ExecuteClientOperationAsync<bool>(
- async client =>
- {
- var req = new TSQueryTemplateReq(client.SessionId, templateName, (int)TemplateQueryType.IS_MEASUREMENT);
- req.Measurement = path;
-
- var resp = await client.ServiceClient.querySchemaTemplateAsync(req);
- var status = resp.Status;
-
- if (_debugMode)
- {
- _logger.LogInformation("is measurement in template {0} message: {1}", templateName, status.Message);
- }
-
- if (_utilFunctions.VerifySuccess(status) == -1)
- {
- throw new Exception(string.Format("is measurement in template failed, template name: {0}, message: {1}", templateName, status.Message));
- }
- return resp.Result;
- },
- errMsg: "Error occurs when checking measurement in template"
- );
- }
- [Obsolete("This method is obsolete. Use SQL instead.", false)]
- public async Task<bool> IsPathExistInTemplateAsync(string templateName, string path)
- {
- return await ExecuteClientOperationAsync<bool>(
- async client =>
- {
- var req = new TSQueryTemplateReq(client.SessionId, templateName, (int)TemplateQueryType.PATH_EXIST);
- req.Measurement = path;
-
- var resp = await client.ServiceClient.querySchemaTemplateAsync(req);
- var status = resp.Status;
-
- if (_debugMode)
- {
- _logger.LogInformation("is path exist in template {0} message: {1}", templateName, status.Message);
- }
-
- if (_utilFunctions.VerifySuccess(status) == -1)
- {
- throw new Exception(string.Format("is path exist in template failed, template name: {0}, message: {1}", templateName, status.Message));
- }
- return resp.Result;
- },
- errMsg: "Error occurs when checking path exist in template"
- );
- }
- [Obsolete("This method is obsolete. Use SQL instead.", false)]
- public async Task<List<string>> ShowMeasurementsInTemplateAsync(string templateName, string pattern = "")
- {
- return await ExecuteClientOperationAsync<List<string>>(
- async client =>
- {
- var req = new TSQueryTemplateReq(client.SessionId, templateName, (int)TemplateQueryType.SHOW_MEASUREMENTS);
- req.Measurement = pattern;
-
- var resp = await client.ServiceClient.querySchemaTemplateAsync(req);
- var status = resp.Status;
-
- if (_debugMode)
- {
- _logger.LogInformation("get measurements in template {0} message: {1}", templateName, status.Message);
- }
-
- if (_utilFunctions.VerifySuccess(status) == -1)
- {
- throw new Exception(string.Format("show measurements in template failed, template name: {0}, pattern: {1}, message: {2}", templateName, pattern, status.Message));
- }
- return resp.Measurements;
- },
- errMsg: "Error occurs when showing measurements in template"
- );
- }
- [Obsolete("This method is obsolete. Use SQL instead.", false)]
- public async Task<List<string>> ShowAllTemplatesAsync()
- {
- return await ExecuteClientOperationAsync<List<string>>(
- async client =>
- {
- var req = new TSQueryTemplateReq(client.SessionId, "", (int)TemplateQueryType.SHOW_TEMPLATES);
-
- var resp = await client.ServiceClient.querySchemaTemplateAsync(req);
- var status = resp.Status;
-
- if (_debugMode)
- {
- _logger.LogInformation("get all templates message: {0}", status.Message);
- }
-
- if (_utilFunctions.VerifySuccess(status) == -1)
- {
- throw new Exception(string.Format("show all templates failed, message: {0}", status.Message));
- }
- return resp.Measurements;
- },
- errMsg: "Error occurs when getting all templates"
- );
- }
-
- [Obsolete("This method is obsolete. Use SQL instead.", false)]
- public async Task<List<string>> ShowPathsTemplateSetOnAsync(string templateName)
- {
- return await ExecuteClientOperationAsync<List<string>>(
- async client =>
- {
- var req = new TSQueryTemplateReq(client.SessionId, templateName, (int)TemplateQueryType.SHOW_SET_TEMPLATES);
-
- var resp = await client.ServiceClient.querySchemaTemplateAsync(req);
- var status = resp.Status;
-
- if (_debugMode)
- {
- _logger.LogInformation("get paths template set on {0} message: {1}", templateName, status.Message);
- }
-
- if (_utilFunctions.VerifySuccess(status) == -1)
- {
- throw new Exception(string.Format("show paths template set on failed, template name: {0}, message: {1}", templateName, status.Message));
- }
- return resp.Measurements;
- },
- errMsg: "Error occurs when getting paths template set on"
- );
- }
- [Obsolete("This method is obsolete. Use SQL instead.", false)]
- public async Task<List<string>> ShowPathsTemplateUsingOnAsync(string templateName)
- {
- return await ExecuteClientOperationAsync<List<string>>(
- async client =>
- {
- var req = new TSQueryTemplateReq(client.SessionId, templateName, (int)TemplateQueryType.SHOW_USING_TEMPLATES);
-
- var resp = await client.ServiceClient.querySchemaTemplateAsync(req);
- var status = resp.Status;
-
- if (_debugMode)
- {
- _logger.LogInformation("get paths template using on {0} message: {1}", templateName, status.Message);
- }
-
- if (_utilFunctions.VerifySuccess(status) == -1)
- {
- throw new Exception(string.Format("show paths template using on failed, template name: {0}, message: {1}", templateName, status.Message));
- }
- return resp.Measurements;
- },
- errMsg: "Error occurs when getting paths template using on"
- );
- }
-
- protected virtual void Dispose(bool disposing)
- {
- if (!disposedValue)
- {
- if (disposing)
+ public async Task SetTimeZone(string zoneId)
{
+ _zoneId = zoneId;
+
+ foreach (var client in _clients.ClientQueue.AsEnumerable())
+ {
+ var req = new TSSetTimeZoneReq(client.SessionId, zoneId);
+ try
+ {
+ var resp = await client.ServiceClient.setTimeZoneAsync(req);
+ if (_debugMode)
+ {
+ _logger.LogInformation("setting time zone_id as {0}, server message:{1}", zoneId, resp.Message);
+ }
+ }
+ catch (TException e)
+ {
+ throw new TException("could not set time zone", e);
+ }
+ }
+ }
+
+ public async Task<string> GetTimeZone()
+ {
+ if (_zoneId != "")
+ {
+ return _zoneId;
+ }
+
+ var client = _clients.Take();
+
+ try
+ {
+ var response = await client.ServiceClient.getTimeZoneAsync(client.SessionId);
+
+ return response?.TimeZone;
+ }
+ catch (TException e)
+ {
+ throw new TException("could not get time zone", e);
+ }
+ finally
+ {
+ _clients.Add(client);
+ }
+ }
+
+ private async Task<Client> CreateAndOpen(string host, int port, bool enableRpcCompression, int timeout, string sqlDialect, string database, CancellationToken cancellationToken = default)
+ {
+ var tcpClient = new TcpClient(host, port);
+ tcpClient.SendTimeout = timeout;
+ tcpClient.ReceiveTimeout = timeout;
+ var transport = new TFramedTransport(new TSocketTransport(tcpClient, null));
+
+ if (!transport.IsOpen)
+ {
+ await transport.OpenAsync(cancellationToken);
+ }
+
+ var client = enableRpcCompression ?
+ new IClientRPCService.Client(new TCompactProtocol(transport)) :
+ new IClientRPCService.Client(new TBinaryProtocol(transport));
+
+ var openReq = new TSOpenSessionReq(ProtocolVersion, _zoneId, _username)
+ {
+ Password = _password,
+ };
+ if (openReq.Configuration == null)
+ {
+ openReq.Configuration = new Dictionary<string, string>();
+ }
+ openReq.Configuration.Add("sql_dialect", sqlDialect);
+ if (!String.IsNullOrEmpty(database))
+ {
+ openReq.Configuration.Add("db", database);
+ }
+
+ try
+ {
+ var openResp = await client.openSessionAsync(openReq, cancellationToken);
+
+ if (openResp.ServerProtocolVersion != ProtocolVersion)
+ {
+ throw new TException($"Protocol Differ, Client version is {ProtocolVersion} but Server version is {openResp.ServerProtocolVersion}", null);
+ }
+
+ if (openResp.ServerProtocolVersion == 0)
+ {
+ throw new TException("Protocol not supported", null);
+ }
+
+ var sessionId = openResp.SessionId;
+ var statementId = await client.requestStatementIdAsync(sessionId, cancellationToken);
+
+ var endpoint = new TEndPoint(host, port);
+
+ var returnClient = new Client(
+ client,
+ sessionId,
+ statementId,
+ transport,
+ endpoint);
+
+ return returnClient;
+ }
+ catch (Exception)
+ {
+ transport.Close();
+
+ throw;
+ }
+ }
+
+ public async Task<int> CreateDatabase(string dbName)
+ {
+ return await ExecuteClientOperationAsync<int>(
+ async client =>
+ {
+ var status = await client.ServiceClient.setStorageGroupAsync(client.SessionId, dbName);
+
+ if (_debugMode)
+ {
+ _logger.LogInformation("create database {0} successfully, server message is {1}", dbName, status.Message);
+ }
+
+ return _utilFunctions.VerifySuccess(status);
+ },
+ errMsg: "Error occurs when creating database"
+ );
+ }
+
+ [Obsolete("This method is deprecated, please use createDatabase instead.")]
+ public async Task<int> SetStorageGroup(string groupName)
+ {
+ return await ExecuteClientOperationAsync<int>(
+ async client =>
+ {
+ var status = await client.ServiceClient.setStorageGroupAsync(client.SessionId, groupName);
+ if (_debugMode)
+ {
+ _logger.LogInformation("set storage group {0} successfully, server message is {1}", groupName, status.Message);
+ }
+ return _utilFunctions.VerifySuccess(status);
+ },
+ errMsg: "Error occurs when setting storage group"
+ );
+ }
+ public async Task<int> CreateTimeSeries(
+ string tsPath,
+ TSDataType dataType,
+ TSEncoding encoding,
+ Compressor compressor)
+ {
+ return await ExecuteClientOperationAsync<int>(
+ async client =>
+ {
+ var req = new TSCreateTimeseriesReq(
+ client.SessionId,
+ tsPath,
+ (int)dataType,
+ (int)encoding,
+ (int)compressor);
+
+ var status = await client.ServiceClient.createTimeseriesAsync(req);
+
+ if (_debugMode)
+ {
+ _logger.LogInformation("creating time series {0} successfully, server message is {1}", tsPath, status.Message);
+ }
+
+ return _utilFunctions.VerifySuccess(status);
+ },
+ errMsg: "Error occurs when creating time series"
+ );
+ }
+ public async Task<int> CreateAlignedTimeseriesAsync(
+ string prefixPath,
+ List<string> measurements,
+ List<TSDataType> dataTypeLst,
+ List<TSEncoding> encodingLst,
+ List<Compressor> compressorLst)
+ {
+ return await ExecuteClientOperationAsync<int>(
+ async client =>
+ {
+ var dataTypes = dataTypeLst.ConvertAll(x => (int)x);
+ var encodings = encodingLst.ConvertAll(x => (int)x);
+ var compressors = compressorLst.ConvertAll(x => (int)x);
+
+ var req = new TSCreateAlignedTimeseriesReq(
+ client.SessionId,
+ prefixPath,
+ measurements,
+ dataTypes,
+ encodings,
+ compressors);
+
+ var status = await client.ServiceClient.createAlignedTimeseriesAsync(req);
+
+ if (_debugMode)
+ {
+ _logger.LogInformation("creating aligned time series {0} successfully, server message is {1}", prefixPath, status.Message);
+ }
+
+ return _utilFunctions.VerifySuccess(status);
+ },
+ errMsg: "Error occurs when creating aligned time series"
+ );
+ }
+ public async Task<int> DeleteDatabaseAsync(string dbName)
+ {
+ return await ExecuteClientOperationAsync<int>(
+ async client =>
+ {
+ var status = await client.ServiceClient.deleteStorageGroupsAsync(client.SessionId, new List<string> { dbName });
+
+ if (_debugMode)
+ {
+ _logger.LogInformation("delete database {0} successfully, server message is {1}", dbName, status.Message);
+ }
+
+ return _utilFunctions.VerifySuccess(status);
+ },
+ errMsg: "Error occurs when deleting database"
+ );
+ }
+ [Obsolete("This method is deprecated, please use DeleteDatabaseAsync instead.")]
+ public async Task<int> DeleteStorageGroupAsync(string groupName)
+ {
+ return await ExecuteClientOperationAsync<int>(
+ async client =>
+ {
+ var status = await client.ServiceClient.deleteStorageGroupsAsync(client.SessionId, new List<string> { groupName });
+
+ if (_debugMode)
+ {
+ _logger.LogInformation("delete storage group {0} successfully, server message is {1}", groupName, status.Message);
+ }
+
+ return _utilFunctions.VerifySuccess(status);
+ },
+ errMsg: "Error occurs when deleting storage group"
+ );
+ }
+ public async Task<int> DeleteDatabasesAsync(List<string> dbNames)
+ {
+ return await ExecuteClientOperationAsync<int>(
+ async client =>
+ {
+ var status = await client.ServiceClient.deleteStorageGroupsAsync(client.SessionId, dbNames);
+
+ if (_debugMode)
+ {
+ _logger.LogInformation("delete database(s) {0} successfully, server message is {1}", dbNames, status.Message);
+ }
+
+ return _utilFunctions.VerifySuccess(status);
+ },
+ errMsg: "Error occurs when deleting database(s)"
+ );
+ }
+ [Obsolete("This method is deprecated, please use DeleteDatabasesAsync instead.")]
+ public async Task<int> DeleteStorageGroupsAsync(List<string> groupNames)
+ {
+ return await ExecuteClientOperationAsync<int>(
+ async client =>
+ {
+ var status = await client.ServiceClient.deleteStorageGroupsAsync(client.SessionId, groupNames);
+
+ if (_debugMode)
+ {
+ _logger.LogInformation("delete storage group(s) {0} successfully, server message is {1}", groupNames, status.Message);
+ }
+
+ return _utilFunctions.VerifySuccess(status);
+ },
+ errMsg: "Error occurs when deleting storage group(s)"
+ );
+ }
+ public async Task<int> CreateMultiTimeSeriesAsync(
+ List<string> tsPathLst,
+ List<TSDataType> dataTypeLst,
+ List<TSEncoding> encodingLst,
+ List<Compressor> compressorLst)
+ {
+ return await ExecuteClientOperationAsync<int>(
+ async client =>
+ {
+ var dataTypes = dataTypeLst.ConvertAll(x => (int)x);
+ var encodings = encodingLst.ConvertAll(x => (int)x);
+ var compressors = compressorLst.ConvertAll(x => (int)x);
+
+ var req = new TSCreateMultiTimeseriesReq(client.SessionId, tsPathLst, dataTypes, encodings, compressors);
+
+ var status = await client.ServiceClient.createMultiTimeseriesAsync(req);
+
+ if (_debugMode)
+ {
+ _logger.LogInformation("creating multiple time series {0}, server message is {1}", tsPathLst, status.Message);
+ }
+
+ return _utilFunctions.VerifySuccess(status);
+ },
+ errMsg: "Error occurs when creating multiple time series"
+ );
+ }
+ public async Task<int> DeleteTimeSeriesAsync(List<string> pathList)
+ {
+ return await ExecuteClientOperationAsync<int>(
+ async client =>
+ {
+ var status = await client.ServiceClient.deleteTimeseriesAsync(client.SessionId, pathList);
+
+ if (_debugMode)
+ {
+ _logger.LogInformation("deleting multiple time series {0}, server message is {1}", pathList, status.Message);
+ }
+
+ return _utilFunctions.VerifySuccess(status);
+ },
+ errMsg: "Error occurs when deleting multiple time series"
+ );
+ }
+
+ public async Task<int> DeleteTimeSeriesAsync(string tsPath)
+ {
+ return await DeleteTimeSeriesAsync(new List<string> { tsPath });
+ }
+
+ public async Task<bool> CheckTimeSeriesExistsAsync(string tsPath)
+ {
+ try
+ {
+ var sql = "SHOW TIMESERIES " + tsPath;
+ var sessionDataset = await ExecuteQueryStatementAsync(sql);
+ bool timeSeriesExists = sessionDataset.HasNext();
+ await sessionDataset.Close(); // be sure to close the sessionDataset to put the client back to the pool
+ return timeSeriesExists;
+ }
+ catch (TException e)
+ {
+ throw new TException("could not check if certain time series exists", e);
+ }
+ }
+ public async Task<int> DeleteDataAsync(List<string> tsPathLst, long startTime, long endTime)
+ {
+ return await ExecuteClientOperationAsync<int>(
+ async client =>
+ {
+ var req = new TSDeleteDataReq(client.SessionId, tsPathLst, startTime, endTime);
+
+ var status = await client.ServiceClient.deleteDataAsync(req);
+
+ if (_debugMode)
+ {
+ _logger.LogInformation(
+ "delete data from {0}, server message is {1}",
+ tsPathLst,
+ status.Message);
+ }
+
+ return _utilFunctions.VerifySuccess(status);
+ },
+ errMsg: "Error occurs when deleting data"
+ );
+ }
+ public async Task<int> InsertRecordAsync(string deviceId, RowRecord record)
+ {
+ return await ExecuteClientOperationAsync<int>(
+ async client =>
+ {
+ var req = new TSInsertRecordReq(client.SessionId, deviceId, record.Measurements, record.ToBytes(), record.Timestamps);
+
+ var status = await client.ServiceClient.insertRecordAsync(req);
+
+ if (_debugMode)
+ {
+ _logger.LogInformation("insert one record to device {0}, server message: {1}", deviceId, status.Message);
+ }
+
+ return _utilFunctions.VerifySuccess(status);
+ },
+ errMsg: "Error occurs when inserting record"
+ );
+ }
+ public async Task<int> InsertAlignedRecordAsync(string deviceId, RowRecord record)
+ {
+ return await ExecuteClientOperationAsync<int>(
+ async client =>
+ {
+ var req = new TSInsertRecordReq(client.SessionId, deviceId, record.Measurements, record.ToBytes(), record.Timestamps);
+ req.IsAligned = true;
+ // ASSERT that the insert plan is aligned
+ System.Diagnostics.Debug.Assert(req.IsAligned == true);
+
+ var status = await client.ServiceClient.insertRecordAsync(req);
+
+ if (_debugMode)
+ {
+ _logger.LogInformation("insert one record to device {0}, server message: {1}", deviceId, status.Message);
+ }
+
+ return _utilFunctions.VerifySuccess(status);
+ },
+ errMsg: "Error occurs when inserting record"
+ );
+ }
+
+ public TSInsertStringRecordReq GenInsertStrRecordReq(string deviceId, List<string> measurements,
+ List<string> values, long timestamp, long sessionId, bool isAligned = false)
+ {
+ if (values.Count() != measurements.Count())
+ {
+ throw new ArgumentException("length of data types does not equal to length of values!");
+ }
+
+ return new TSInsertStringRecordReq(sessionId, deviceId, measurements, values, timestamp)
+ {
+ IsAligned = isAligned
+ };
+ }
+ public TSInsertStringRecordsReq GenInsertStringRecordsReq(List<string> deviceIds, List<List<string>> measurementsList,
+ List<List<string>> valuesList, List<long> timestamps, long sessionId, bool isAligned = false)
+ {
+ if (valuesList.Count() != measurementsList.Count())
+ {
+ throw new ArgumentException("length of data types does not equal to length of values!");
+ }
+
+ return new TSInsertStringRecordsReq(sessionId, deviceIds, measurementsList, valuesList, timestamps)
+ {
+ IsAligned = isAligned
+ };
+ }
+
+ public TSInsertRecordsReq GenInsertRecordsReq(List<string> deviceId, List<RowRecord> rowRecords,
+ long sessionId)
+ {
+ var measurementLst = rowRecords.Select(x => x.Measurements).ToList();
+ var timestampLst = rowRecords.Select(x => x.Timestamps).ToList();
+ var valuesLstInBytes = rowRecords.Select(row => row.ToBytes()).ToList();
+
+ return new TSInsertRecordsReq(sessionId, deviceId, measurementLst, valuesLstInBytes, timestampLst);
+ }
+ public async Task<int> InsertStringRecordAsync(string deviceId, List<string> measurements, List<string> values,
+ long timestamp)
+ {
+ return await ExecuteClientOperationAsync<int>(
+ async client =>
+ {
+ var req = GenInsertStrRecordReq(deviceId, measurements, values, timestamp, client.SessionId);
+
+ var status = await client.ServiceClient.insertStringRecordAsync(req);
+
+ if (_debugMode)
+ {
+ _logger.LogInformation("insert one string record to device {0}, server message: {1}", deviceId, status.Message);
+ }
+
+ return _utilFunctions.VerifySuccess(status);
+ },
+ errMsg: "Error occurs when inserting a string record"
+ );
+ }
+ public async Task<int> InsertAlignedStringRecordAsync(string deviceId, List<string> measurements, List<string> values,
+ long timestamp)
+ {
+ return await ExecuteClientOperationAsync<int>(
+ async client =>
+ {
+ var req = GenInsertStrRecordReq(deviceId, measurements, values, timestamp, client.SessionId, true);
+
+ var status = await client.ServiceClient.insertStringRecordAsync(req);
+
+ if (_debugMode)
+ {
+ _logger.LogInformation("insert one record to device {0}, server message: {1}", deviceId, status.Message);
+ }
+
+ return _utilFunctions.VerifySuccess(status);
+ },
+ errMsg: "Error occurs when inserting a string record"
+ );
+ }
+ public async Task<int> InsertStringRecordsAsync(List<string> deviceIds, List<List<string>> measurements, List<List<string>> values,
+ List<long> timestamps)
+ {
+ return await ExecuteClientOperationAsync<int>(
+ async client =>
+ {
+ var req = GenInsertStringRecordsReq(deviceIds, measurements, values, timestamps, client.SessionId);
+
+ var status = await client.ServiceClient.insertStringRecordsAsync(req);
+
+ if (_debugMode)
+ {
+ _logger.LogInformation("insert string records to devices {0}, server message: {1}", deviceIds, status.Message);
+ }
+
+ return _utilFunctions.VerifySuccess(status);
+ },
+ errMsg: "Error occurs when inserting string records"
+ );
+ }
+ public async Task<int> InsertAlignedStringRecordsAsync(List<string> deviceIds, List<List<string>> measurements, List<List<string>> values,
+ List<long> timestamps)
+ {
+ return await ExecuteClientOperationAsync<int>(
+ async client =>
+ {
+ var req = GenInsertStringRecordsReq(deviceIds, measurements, values, timestamps, client.SessionId, true);
+
+ var status = await client.ServiceClient.insertStringRecordsAsync(req);
+
+ if (_debugMode)
+ {
+ _logger.LogInformation("insert string records to devices {0}, server message: {1}", deviceIds, status.Message);
+ }
+
+ return _utilFunctions.VerifySuccess(status);
+ },
+ errMsg: "Error occurs when inserting string records"
+ );
+ }
+ public async Task<int> InsertRecordsAsync(List<string> deviceId, List<RowRecord> rowRecords)
+ {
+ return await ExecuteClientOperationAsync<int>(
+ async client =>
+ {
+ var req = GenInsertRecordsReq(deviceId, rowRecords, client.SessionId);
+
+ var status = await client.ServiceClient.insertRecordsAsync(req);
+
+ if (_debugMode)
+ {
+ _logger.LogInformation("insert multiple records to devices {0}, server message: {1}", deviceId, status.Message);
+ }
+
+ return _utilFunctions.VerifySuccess(status);
+ },
+ errMsg: "Error occurs when inserting records"
+ );
+ }
+ public async Task<int> InsertAlignedRecordsAsync(List<string> deviceId, List<RowRecord> rowRecords)
+ {
+ return await ExecuteClientOperationAsync<int>(
+ async client =>
+ {
+ var req = GenInsertRecordsReq(deviceId, rowRecords, client.SessionId);
+ req.IsAligned = true;
+ // ASSERT that the insert plan is aligned
+ System.Diagnostics.Debug.Assert(req.IsAligned == true);
+
+ var status = await client.ServiceClient.insertRecordsAsync(req);
+
+ if (_debugMode)
+ {
+ _logger.LogInformation("insert multiple records to devices {0}, server message: {1}", deviceId, status.Message);
+ }
+
+ return _utilFunctions.VerifySuccess(status);
+ },
+ errMsg: "Error occurs when inserting records"
+ );
+ }
+ public TSInsertTabletReq GenInsertTabletReq(Tablet tablet, long sessionId)
+ {
+ return new TSInsertTabletReq(
+ sessionId,
+ tablet.InsertTargetName,
+ tablet.Measurements,
+ tablet.GetBinaryValues(),
+ tablet.GetBinaryTimestamps(),
+ tablet.GetDataTypes(),
+ tablet.RowNumber);
+ }
+ public async Task<int> InsertTabletAsync(Tablet tablet)
+ {
+ return await ExecuteClientOperationAsync<int>(
+ async client =>
+ {
+ var req = GenInsertTabletReq(tablet, client.SessionId);
+
+ var status = await client.ServiceClient.insertTabletAsync(req);
+
+ if (_debugMode)
+ {
+ _logger.LogInformation("insert one tablet to device {0}, server message: {1}", tablet.InsertTargetName, status.Message);
+ }
+
+ return _utilFunctions.VerifySuccess(status);
+ },
+ errMsg: "Error occurs when inserting tablet"
+ );
+ }
+ public async Task<int> InsertAlignedTabletAsync(Tablet tablet)
+ {
+ return await ExecuteClientOperationAsync<int>(
+ async client =>
+ {
+ var req = GenInsertTabletReq(tablet, client.SessionId);
+ req.IsAligned = true;
+
+ var status = await client.ServiceClient.insertTabletAsync(req);
+
+ if (_debugMode)
+ {
+ _logger.LogInformation("insert one aligned tablet to device {0}, server message: {1}", tablet.InsertTargetName, status.Message);
+ }
+
+ return _utilFunctions.VerifySuccess(status);
+ },
+ errMsg: "Error occurs when inserting aligned tablet"
+ );
+ }
+
+ protected internal async Task<int> InsertRelationalTabletAsync(Tablet tablet)
+ {
+ return await ExecuteClientOperationAsync<int>(
+ async client =>
+ {
+ var req = GenInsertTabletReq(tablet, client.SessionId);
+ req.ColumnCategories = tablet.GetColumnColumnCategories();
+ req.WriteToTable = true;
+
+ var status = await client.ServiceClient.insertTabletAsync(req);
+
+ if (_debugMode)
+ {
+ _logger.LogInformation("insert one tablet to table {0}, server message: {1}", tablet.InsertTargetName, status.Message);
+ }
+
+ return _utilFunctions.VerifySuccess(status);
+ },
+ errMsg: "Error occurs when inserting tablet"
+ );
+ }
+
+ public TSInsertTabletsReq GenInsertTabletsReq(List<Tablet> tabletLst, long sessionId)
+ {
+ var deviceIdLst = new List<string>();
+ var measurementsLst = new List<List<string>>();
+ var valuesLst = new List<byte[]>();
+ var timestampsLst = new List<byte[]>();
+ var typeLst = new List<List<int>>();
+ var sizeLst = new List<int>();
+
+ foreach (var tablet in tabletLst)
+ {
+ var dataTypeValues = tablet.GetDataTypes();
+ deviceIdLst.Add(tablet.InsertTargetName);
+ measurementsLst.Add(tablet.Measurements);
+ valuesLst.Add(tablet.GetBinaryValues());
+ timestampsLst.Add(tablet.GetBinaryTimestamps());
+ typeLst.Add(dataTypeValues);
+ sizeLst.Add(tablet.RowNumber);
+ }
+
+ return new TSInsertTabletsReq(
+ sessionId,
+ deviceIdLst,
+ measurementsLst,
+ valuesLst,
+ timestampsLst,
+ typeLst,
+ sizeLst);
+ }
+
+ public async Task<int> InsertTabletsAsync(List<Tablet> tabletLst)
+ {
+ return await ExecuteClientOperationAsync<int>(
+ async client =>
+ {
+ var req = GenInsertTabletsReq(tabletLst, client.SessionId);
+
+ var status = await client.ServiceClient.insertTabletsAsync(req);
+
+ if (_debugMode)
+ {
+ _logger.LogInformation("insert multiple tablets, message: {0}", status.Message);
+ }
+
+ return _utilFunctions.VerifySuccess(status);
+ },
+ errMsg: "Error occurs when inserting tablets"
+ );
+ }
+ public async Task<int> InsertAlignedTabletsAsync(List<Tablet> tabletLst)
+ {
+ return await ExecuteClientOperationAsync<int>(
+ async client =>
+ {
+ var req = GenInsertTabletsReq(tabletLst, client.SessionId);
+ req.IsAligned = true;
+
+ var status = await client.ServiceClient.insertTabletsAsync(req);
+
+ if (_debugMode)
+ {
+ _logger.LogInformation("insert multiple aligned tablets, message: {0}", status.Message);
+ }
+
+ return _utilFunctions.VerifySuccess(status);
+ },
+ errMsg: "Error occurs when inserting aligned tablets"
+ );
+ }
+
+ public async Task<int> InsertRecordsOfOneDeviceAsync(string deviceId, List<RowRecord> rowRecords)
+ {
+ var sortedRowRecords = rowRecords.OrderBy(x => x.Timestamps).ToList();
+ return await InsertRecordsOfOneDeviceSortedAsync(deviceId, sortedRowRecords);
+ }
+ public async Task<int> InsertAlignedRecordsOfOneDeviceAsync(string deviceId, List<RowRecord> rowRecords)
+ {
+ var sortedRowRecords = rowRecords.OrderBy(x => x.Timestamps).ToList();
+ return await InsertAlignedRecordsOfOneDeviceSortedAsync(deviceId, sortedRowRecords);
+ }
+ public async Task<int> InsertStringRecordsOfOneDeviceAsync(string deviceId, List<long> timestamps,
+ List<List<string>> measurementsList, List<List<string>> valuesList)
+ {
+ var joined = timestamps.Zip(measurementsList, (t, m) => new { t, m })
+ .Zip(valuesList, (tm, v) => new { tm.t, tm.m, v })
+ .OrderBy(x => x.t);
+
+ var sortedTimestamps = joined.Select(x => x.t).ToList();
+ var sortedMeasurementsList = joined.Select(x => x.m).ToList();
+ var sortedValuesList = joined.Select(x => x.v).ToList();
+
+ return await InsertStringRecordsOfOneDeviceSortedAsync(deviceId, sortedTimestamps, sortedMeasurementsList, sortedValuesList, false);
+ }
+ public async Task<int> InsertAlignedStringRecordsOfOneDeviceAsync(string deviceId, List<long> timestamps,
+ List<List<string>> measurementsList, List<List<string>> valuesList)
+ {
+ var joined = timestamps.Zip(measurementsList, (t, m) => new { t, m })
+ .Zip(valuesList, (tm, v) => new { tm.t, tm.m, v })
+ .OrderBy(x => x.t);
+
+ var sortedTimestamps = joined.Select(x => x.t).ToList();
+ var sortedMeasurementsList = joined.Select(x => x.m).ToList();
+ var sortedValuesList = joined.Select(x => x.v).ToList();
+
+ return await InsertStringRecordsOfOneDeviceSortedAsync(deviceId, sortedTimestamps, sortedMeasurementsList, sortedValuesList, true);
+ }
+ public async Task<int> InsertStringRecordsOfOneDeviceSortedAsync(string deviceId, List<long> timestamps,
+ List<List<string>> measurementsList, List<List<string>> valuesList, bool isAligned)
+ {
+ return await ExecuteClientOperationAsync<int>(
+ async client =>
+ {
+ if (!_utilFunctions.IsSorted(timestamps))
+ {
+ throw new ArgumentException("insert string records of one device error: timestamp not sorted");
+ }
+
+ var req = GenInsertStringRecordsOfOneDeviceReq(deviceId, timestamps, measurementsList, valuesList, client.SessionId, isAligned);
+
+ var status = await client.ServiceClient.insertStringRecordsOfOneDeviceAsync(req);
+
+ if (_debugMode)
+ {
+ _logger.LogInformation("insert string records of one device, message: {0}", status.Message);
+ }
+
+ return _utilFunctions.VerifySuccess(status);
+ },
+ errMsg: "Error occurs when inserting string records of one device"
+ );
+ }
+ private TSInsertStringRecordsOfOneDeviceReq GenInsertStringRecordsOfOneDeviceReq(string deviceId,
+ List<long> timestamps, List<List<string>> measurementsList, List<List<string>> valuesList,
+ long sessionId, bool isAligned = false)
+ {
+ return new TSInsertStringRecordsOfOneDeviceReq(
+ sessionId,
+ deviceId,
+ measurementsList,
+ valuesList,
+ timestamps)
+ {
+ IsAligned = isAligned
+ };
+ }
+ private TSInsertRecordsOfOneDeviceReq GenInsertRecordsOfOneDeviceRequest(
+ string deviceId,
+ List<RowRecord> records,
+ long sessionId)
+ {
+ var values = records.Select(row => row.ToBytes());
+ var measurementsLst = records.Select(x => x.Measurements).ToList();
+ var timestampLst = records.Select(x => x.Timestamps).ToList();
+
+ return new TSInsertRecordsOfOneDeviceReq(
+ sessionId,
+ deviceId,
+ measurementsLst,
+ values.ToList(),
+ timestampLst);
+ }
+ public async Task<int> InsertRecordsOfOneDeviceSortedAsync(string deviceId, List<RowRecord> rowRecords)
+ {
+ return await ExecuteClientOperationAsync<int>(
+ async client =>
+ {
+ var timestampLst = rowRecords.Select(x => x.Timestamps).ToList();
+
+ if (!_utilFunctions.IsSorted(timestampLst))
+ {
+ throw new ArgumentException("insert records of one device error: timestamp not sorted");
+ }
+
+ var req = GenInsertRecordsOfOneDeviceRequest(deviceId, rowRecords, client.SessionId);
+
+ var status = await client.ServiceClient.insertRecordsOfOneDeviceAsync(req);
+
+ if (_debugMode)
+ {
+ _logger.LogInformation("insert records of one device, message: {0}", status.Message);
+ }
+
+ return _utilFunctions.VerifySuccess(status);
+ },
+ errMsg: "Error occurs when inserting records of one device"
+ );
+ }
+ public async Task<int> InsertAlignedRecordsOfOneDeviceSortedAsync(string deviceId, List<RowRecord> rowRecords)
+ {
+ return await ExecuteClientOperationAsync<int>(
+ async client =>
+ {
+ var timestampLst = rowRecords.Select(x => x.Timestamps).ToList();
+
+ if (!_utilFunctions.IsSorted(timestampLst))
+ {
+ throw new ArgumentException("insert records of one device error: timestamp not sorted");
+ }
+
+ var req = GenInsertRecordsOfOneDeviceRequest(deviceId, rowRecords, client.SessionId);
+ req.IsAligned = true;
+
+ var status = await client.ServiceClient.insertRecordsOfOneDeviceAsync(req);
+
+ if (_debugMode)
+ {
+ _logger.LogInformation("insert aligned records of one device, message: {0}", status.Message);
+ }
+
+ return _utilFunctions.VerifySuccess(status);
+ },
+ errMsg: "Error occurs when inserting aligned records of one device"
+ );
+ }
+ public async Task<int> TestInsertRecordAsync(string deviceId, RowRecord record)
+ {
+ return await ExecuteClientOperationAsync<int>(
+ async client =>
+ {
+ var req = new TSInsertRecordReq(
+ client.SessionId,
+ deviceId,
+ record.Measurements,
+ record.ToBytes(),
+ record.Timestamps);
+
+ var status = await client.ServiceClient.testInsertRecordAsync(req);
+
+ if (_debugMode)
+ {
+ _logger.LogInformation("insert one record to device {0}, server message: {1}", deviceId, status.Message);
+ }
+
+ return _utilFunctions.VerifySuccess(status);
+ },
+ errMsg: "Error occurs when test inserting one record"
+ );
+ }
+ public async Task<int> TestInsertRecordsAsync(List<string> deviceId, List<RowRecord> rowRecords)
+ {
+ return await ExecuteClientOperationAsync<int>(
+ async client =>
+ {
+ var req = GenInsertRecordsReq(deviceId, rowRecords, client.SessionId);
+
+ var status = await client.ServiceClient.testInsertRecordsAsync(req);
+
+ if (_debugMode)
+ {
+ _logger.LogInformation("insert multiple records to devices {0}, server message: {1}", deviceId, status.Message);
+ }
+
+ return _utilFunctions.VerifySuccess(status);
+ },
+ errMsg: "Error occurs when test inserting multiple records"
+ );
+ }
+ public async Task<int> TestInsertTabletAsync(Tablet tablet)
+ {
+ return await ExecuteClientOperationAsync<int>(
+ async client =>
+ {
+ var req = GenInsertTabletReq(tablet, client.SessionId);
+
+ var status = await client.ServiceClient.testInsertTabletAsync(req);
+
+ if (_debugMode)
+ {
+ _logger.LogInformation("insert one tablet to device {0}, server message: {1}", tablet.InsertTargetName, status.Message);
+ }
+
+ return _utilFunctions.VerifySuccess(status);
+ },
+ errMsg: "Error occurs when test inserting one tablet"
+ );
+ }
+ public async Task<int> TestInsertTabletsAsync(List<Tablet> tabletLst)
+ {
+ return await ExecuteClientOperationAsync<int>(
+ async client =>
+ {
+ var req = GenInsertTabletsReq(tabletLst, client.SessionId);
+
+ var status = await client.ServiceClient.testInsertTabletsAsync(req);
+
+ if (_debugMode)
+ {
+ _logger.LogInformation("insert multiple tablets, message: {0}", status.Message);
+ }
+
+ return _utilFunctions.VerifySuccess(status);
+ },
+ errMsg: "Error occurs when test inserting multiple tablets"
+ );
+ }
+
+ public async Task<SessionDataSet> ExecuteQueryStatementAsync(string sql)
+ {
+ // default timeout is 60s
+ return await ExecuteQueryStatementAsync(sql, 60 * 1000);
+ }
+
+ public async Task<SessionDataSet> ExecuteQueryStatementAsync(string sql, long timeoutInMs)
+ {
+ return await ExecuteClientOperationAsync<SessionDataSet>(
+ async client =>
+ {
+ var req = new TSExecuteStatementReq(client.SessionId, sql, client.StatementId)
+ {
+ FetchSize = _fetchSize,
+ Timeout = timeoutInMs
+ };
+
+ var resp = await client.ServiceClient.executeQueryStatementAsync(req);
+ var status = resp.Status;
+
+ if (_utilFunctions.VerifySuccess(status) == -1)
+ {
+ throw new Exception(string.Format("execute query failed, sql: {0}, message: {1}", sql, status.Message));
+ }
+
+ return new SessionDataSet(sql, resp, client, _clients, client.StatementId)
+ {
+ FetchSize = _fetchSize,
+ };
+ },
+ errMsg: "Error occurs when executing query statement",
+ putClientBack: false
+ );
+ }
+
+ public async Task<SessionDataSet> ExecuteStatementAsync(string sql, long timeout)
+ {
+ return await ExecuteClientOperationAsync<SessionDataSet>(
+ async client =>
+ {
+ var req = new TSExecuteStatementReq(client.SessionId, sql, client.StatementId)
+ {
+ FetchSize = _fetchSize,
+ Timeout = timeout
+ };
+
+ var resp = await client.ServiceClient.executeStatementAsync(req);
+ var status = resp.Status;
+
+ if (_utilFunctions.VerifySuccess(status) == -1)
+ {
+ throw new Exception(string.Format("execute query failed, sql: {0}, message: {1}", sql, status.Message));
+ }
+
+ return new SessionDataSet(sql, resp, client, _clients, client.StatementId)
+ {
+ FetchSize = _fetchSize,
+ };
+ },
+ errMsg: "Error occurs when executing query statement",
+ putClientBack: false
+ );
+ }
+
+
+ public async Task<int> ExecuteNonQueryStatementAsync(string sql)
+ {
+ return await ExecuteClientOperationAsync<int>(
+ async client =>
+ {
+ String previousDB = _database;
+ var req = new TSExecuteStatementReq(client.SessionId, sql, client.StatementId);
+
+ var resp = await client.ServiceClient.executeUpdateStatementAsync(req);
+ var status = resp.Status;
+
+ if (resp.__isset.database)
+ {
+ this._database = resp.Database;
+ }
+ if (_database != previousDB)
+ {
+ // all client should switch to the same database
+ foreach (var c in _clients.ClientQueue)
+ {
+ try
+ {
+ if (c != client)
+ {
+ var switchReq = new TSExecuteStatementReq(c.SessionId, sql, c.StatementId);
+ await c.ServiceClient.executeUpdateStatementAsync(switchReq);
+ }
+ }
+ catch (Exception e)
+ {
+ _logger.LogError("switch database from {0} to {1} failed for {2}, error: {3}", previousDB, _database, c.SessionId, e.Message);
+ }
+ }
+ _logger.LogInformation("switch database from {0} to {1}", previousDB, _database);
+ }
+
+ if (_debugMode)
+ {
+ _logger.LogInformation("execute non-query statement {0} message: {1}", sql, status.Message);
+ }
+
+ return _utilFunctions.VerifySuccess(status);
+ },
+ errMsg: "Error occurs when executing non-query statement"
+ );
+ }
+ public async Task<SessionDataSet> ExecuteRawDataQuery(List<string> paths, long startTime, long endTime)
+ {
+ return await ExecuteClientOperationAsync<SessionDataSet>(
+ async client =>
+ {
+ var req = new TSRawDataQueryReq(client.SessionId, paths, startTime, endTime, client.StatementId)
+ {
+ FetchSize = _fetchSize,
+ EnableRedirectQuery = false
+ };
+
+ var resp = await client.ServiceClient.executeRawDataQueryAsync(req);
+ var status = resp.Status;
+
+ if (_utilFunctions.VerifySuccess(status) == -1)
+ {
+ throw new Exception(string.Format("execute raw data query failed, message: {0}", status.Message));
+ }
+
+ return new SessionDataSet("", resp, client, _clients, client.StatementId)
+ {
+ FetchSize = _fetchSize,
+ };
+ },
+ errMsg: "Error occurs when executing raw data query",
+ putClientBack: false
+ );
+ }
+ public async Task<SessionDataSet> ExecuteLastDataQueryAsync(List<string> paths, long lastTime)
+ {
+ return await ExecuteClientOperationAsync<SessionDataSet>(
+ async client =>
+ {
+ var req = new TSLastDataQueryReq(client.SessionId, paths, lastTime, client.StatementId)
+ {
+ FetchSize = _fetchSize,
+ EnableRedirectQuery = false
+ };
+
+ var resp = await client.ServiceClient.executeLastDataQueryAsync(req);
+ var status = resp.Status;
+
+ if (_utilFunctions.VerifySuccess(status) == -1)
+ {
+ throw new Exception(string.Format("execute last data query failed, message: {0}", status.Message));
+ }
+
+ return new SessionDataSet("", resp, client, _clients, client.StatementId)
+ {
+ FetchSize = _fetchSize,
+ };
+ },
+ errMsg: "Error occurs when executing last data query",
+ putClientBack: false
+ );
+ }
+ [Obsolete("This method is obsolete. Use SQL instead.", false)]
+ public async Task<int> CreateSchemaTemplateAsync(Template template)
+ {
+ return await ExecuteClientOperationAsync<int>(
+ async client =>
+ {
+ var req = new TSCreateSchemaTemplateReq(client.SessionId, template.Name, template.ToBytes());
+
+ var status = await client.ServiceClient.createSchemaTemplateAsync(req);
+
+ if (_debugMode)
+ {
+ _logger.LogInformation("create schema template {0} message: {1}", template.Name, status.Message);
+ }
+
+ return _utilFunctions.VerifySuccess(status);
+ },
+ errMsg: "Error occurs when creating schema template"
+ );
+ }
+
+ [Obsolete("This method is obsolete. Use SQL instead.", false)]
+ public async Task<int> DropSchemaTemplateAsync(string templateName)
+ {
+ return await ExecuteClientOperationAsync<int>(
+ async client =>
+ {
+ var req = new TSDropSchemaTemplateReq(client.SessionId, templateName);
+
+ var status = await client.ServiceClient.dropSchemaTemplateAsync(req);
+
+ if (_debugMode)
+ {
+ _logger.LogInformation("drop schema template {0} message: {1}", templateName, status.Message);
+ }
+
+ return _utilFunctions.VerifySuccess(status);
+ },
+ errMsg: "Error occurs when dropping schema template"
+ );
+ }
+ [Obsolete("This method is obsolete. Use SQL instead.", false)]
+ public async Task<int> SetSchemaTemplateAsync(string templateName, string prefixPath)
+ {
+ return await ExecuteClientOperationAsync<int>(
+ async client =>
+ {
+ var req = new TSSetSchemaTemplateReq(client.SessionId, templateName, prefixPath);
+
+ var status = await client.ServiceClient.setSchemaTemplateAsync(req);
+
+ if (_debugMode)
+ {
+ _logger.LogInformation("set schema template {0} message: {1}", templateName, status.Message);
+ }
+
+ return _utilFunctions.VerifySuccess(status);
+ },
+ errMsg: "Error occurs when setting schema template"
+ );
+ }
+ [Obsolete("This method is obsolete. Use SQL instead.", false)]
+ public async Task<int> UnsetSchemaTemplateAsync(string prefixPath, string templateName)
+ {
+ return await ExecuteClientOperationAsync<int>(
+ async client =>
+ {
+ var req = new TSUnsetSchemaTemplateReq(client.SessionId, prefixPath, templateName);
+
+ var status = await client.ServiceClient.unsetSchemaTemplateAsync(req);
+
+ if (_debugMode)
+ {
+ _logger.LogInformation("unset schema template {0} message: {1}", templateName, status.Message);
+ }
+
+ return _utilFunctions.VerifySuccess(status);
+ },
+ errMsg: "Error occurs when unsetting schema template"
+ );
+ }
+ [Obsolete("This method is obsolete. Use SQL instead.", false)]
+ public async Task<int> DeleteNodeInTemplateAsync(string templateName, string path)
+ {
+ return await ExecuteClientOperationAsync<int>(
+ async client =>
+ {
+ var req = new TSPruneSchemaTemplateReq(client.SessionId, templateName, path);
+
+ var status = await client.ServiceClient.pruneSchemaTemplateAsync(req);
+
+ if (_debugMode)
+ {
+ _logger.LogInformation("delete node in template {0} message: {1}", templateName, status.Message);
+ }
+
+ return _utilFunctions.VerifySuccess(status);
+ },
+ errMsg: "Error occurs when deleting node in template"
+ );
+ }
+ [Obsolete("This method is obsolete. Use SQL instead.", false)]
+ public async Task<int> CountMeasurementsInTemplateAsync(string name)
+ {
+ return await ExecuteClientOperationAsync<int>(
+ async client =>
+ {
+ var req = new TSQueryTemplateReq(client.SessionId, name, (int)TemplateQueryType.COUNT_MEASUREMENTS);
+
+ var resp = await client.ServiceClient.querySchemaTemplateAsync(req);
+ var status = resp.Status;
+
+ if (_debugMode)
+ {
+ _logger.LogInformation("count measurements in template {0} message: {1}", name, status.Message);
+ }
+
+ if (_utilFunctions.VerifySuccess(status) == -1)
+ {
+ throw new Exception(string.Format("count measurements in template failed, template name: {0}, message: {1}", name, status.Message));
+ }
+ return resp.Count;
+ },
+ errMsg: "Error occurs when counting measurements in template"
+ );
+ }
+ [Obsolete("This method is obsolete. Use SQL instead.", false)]
+ public async Task<bool> IsMeasurementInTemplateAsync(string templateName, string path)
+ {
+ return await ExecuteClientOperationAsync<bool>(
+ async client =>
+ {
+ var req = new TSQueryTemplateReq(client.SessionId, templateName, (int)TemplateQueryType.IS_MEASUREMENT);
+ req.Measurement = path;
+
+ var resp = await client.ServiceClient.querySchemaTemplateAsync(req);
+ var status = resp.Status;
+
+ if (_debugMode)
+ {
+ _logger.LogInformation("is measurement in template {0} message: {1}", templateName, status.Message);
+ }
+
+ if (_utilFunctions.VerifySuccess(status) == -1)
+ {
+ throw new Exception(string.Format("is measurement in template failed, template name: {0}, message: {1}", templateName, status.Message));
+ }
+ return resp.Result;
+ },
+ errMsg: "Error occurs when checking measurement in template"
+ );
+ }
+ [Obsolete("This method is obsolete. Use SQL instead.", false)]
+ public async Task<bool> IsPathExistInTemplateAsync(string templateName, string path)
+ {
+ return await ExecuteClientOperationAsync<bool>(
+ async client =>
+ {
+ var req = new TSQueryTemplateReq(client.SessionId, templateName, (int)TemplateQueryType.PATH_EXIST);
+ req.Measurement = path;
+
+ var resp = await client.ServiceClient.querySchemaTemplateAsync(req);
+ var status = resp.Status;
+
+ if (_debugMode)
+ {
+ _logger.LogInformation("is path exist in template {0} message: {1}", templateName, status.Message);
+ }
+
+ if (_utilFunctions.VerifySuccess(status) == -1)
+ {
+ throw new Exception(string.Format("is path exist in template failed, template name: {0}, message: {1}", templateName, status.Message));
+ }
+ return resp.Result;
+ },
+ errMsg: "Error occurs when checking path exist in template"
+ );
+ }
+ [Obsolete("This method is obsolete. Use SQL instead.", false)]
+ public async Task<List<string>> ShowMeasurementsInTemplateAsync(string templateName, string pattern = "")
+ {
+ return await ExecuteClientOperationAsync<List<string>>(
+ async client =>
+ {
+ var req = new TSQueryTemplateReq(client.SessionId, templateName, (int)TemplateQueryType.SHOW_MEASUREMENTS);
+ req.Measurement = pattern;
+
+ var resp = await client.ServiceClient.querySchemaTemplateAsync(req);
+ var status = resp.Status;
+
+ if (_debugMode)
+ {
+ _logger.LogInformation("get measurements in template {0} message: {1}", templateName, status.Message);
+ }
+
+ if (_utilFunctions.VerifySuccess(status) == -1)
+ {
+ throw new Exception(string.Format("show measurements in template failed, template name: {0}, pattern: {1}, message: {2}", templateName, pattern, status.Message));
+ }
+ return resp.Measurements;
+ },
+ errMsg: "Error occurs when showing measurements in template"
+ );
+ }
+ [Obsolete("This method is obsolete. Use SQL instead.", false)]
+ public async Task<List<string>> ShowAllTemplatesAsync()
+ {
+ return await ExecuteClientOperationAsync<List<string>>(
+ async client =>
+ {
+ var req = new TSQueryTemplateReq(client.SessionId, "", (int)TemplateQueryType.SHOW_TEMPLATES);
+
+ var resp = await client.ServiceClient.querySchemaTemplateAsync(req);
+ var status = resp.Status;
+
+ if (_debugMode)
+ {
+ _logger.LogInformation("get all templates message: {0}", status.Message);
+ }
+
+ if (_utilFunctions.VerifySuccess(status) == -1)
+ {
+ throw new Exception(string.Format("show all templates failed, message: {0}", status.Message));
+ }
+ return resp.Measurements;
+ },
+ errMsg: "Error occurs when getting all templates"
+ );
+ }
+
+ [Obsolete("This method is obsolete. Use SQL instead.", false)]
+ public async Task<List<string>> ShowPathsTemplateSetOnAsync(string templateName)
+ {
+ return await ExecuteClientOperationAsync<List<string>>(
+ async client =>
+ {
+ var req = new TSQueryTemplateReq(client.SessionId, templateName, (int)TemplateQueryType.SHOW_SET_TEMPLATES);
+
+ var resp = await client.ServiceClient.querySchemaTemplateAsync(req);
+ var status = resp.Status;
+
+ if (_debugMode)
+ {
+ _logger.LogInformation("get paths template set on {0} message: {1}", templateName, status.Message);
+ }
+
+ if (_utilFunctions.VerifySuccess(status) == -1)
+ {
+ throw new Exception(string.Format("show paths template set on failed, template name: {0}, message: {1}", templateName, status.Message));
+ }
+ return resp.Measurements;
+ },
+ errMsg: "Error occurs when getting paths template set on"
+ );
+ }
+ [Obsolete("This method is obsolete. Use SQL instead.", false)]
+ public async Task<List<string>> ShowPathsTemplateUsingOnAsync(string templateName)
+ {
+ return await ExecuteClientOperationAsync<List<string>>(
+ async client =>
+ {
+ var req = new TSQueryTemplateReq(client.SessionId, templateName, (int)TemplateQueryType.SHOW_USING_TEMPLATES);
+
+ var resp = await client.ServiceClient.querySchemaTemplateAsync(req);
+ var status = resp.Status;
+
+ if (_debugMode)
+ {
+ _logger.LogInformation("get paths template using on {0} message: {1}", templateName, status.Message);
+ }
+
+ if (_utilFunctions.VerifySuccess(status) == -1)
+ {
+ throw new Exception(string.Format("show paths template using on failed, template name: {0}, message: {1}", templateName, status.Message));
+ }
+ return resp.Measurements;
+ },
+ errMsg: "Error occurs when getting paths template using on"
+ );
+ }
+
+ protected virtual void Dispose(bool disposing)
+ {
+ if (!disposedValue)
+ {
+ if (disposing)
+ {
#if NET461_OR_GREATER || NETSTANDARD2_0
#else
- _clients.ClientQueue.Clear();
+ _clients.ClientQueue.Clear();
#endif
+ }
+ _clients = null;
+ disposedValue = true;
+ }
}
- _clients = null;
- disposedValue = true;
- }
- }
- public void Dispose()
- {
- Dispose(disposing: true);
- GC.SuppressFinalize(this);
+ public void Dispose()
+ {
+ Dispose(disposing: true);
+ GC.SuppressFinalize(this);
+ }
}
- }
-}
\ No newline at end of file
+}
diff --git a/src/Apache.IoTDB/TableSessionPool.Builder.cs b/src/Apache.IoTDB/TableSessionPool.Builder.cs
index 0c32f4b..fdeb78e 100644
--- a/src/Apache.IoTDB/TableSessionPool.Builder.cs
+++ b/src/Apache.IoTDB/TableSessionPool.Builder.cs
@@ -26,121 +26,121 @@
public partial class TableSessionPool
{
- public class Builder
- {
- private string _host = "localhost";
- private int _port = 6667;
- private string _username = "root";
- private string _password = "root";
- private int _fetchSize = 1024;
- private string _zoneId = "UTC+08:00";
- private int _poolSize = 8;
- private bool _enableRpcCompression = false;
- private int _connectionTimeoutInMs = 500;
- private string _sqlDialect = IoTDBConstant.TREE_SQL_DIALECT;
- private string _database = "";
- private List<string> _nodeUrls = new List<string>();
-
- public Builder SetHost(string host)
+ public class Builder
{
- _host = host;
- return this;
- }
+ private string _host = "localhost";
+ private int _port = 6667;
+ private string _username = "root";
+ private string _password = "root";
+ private int _fetchSize = 1024;
+ private string _zoneId = "UTC+08:00";
+ private int _poolSize = 8;
+ private bool _enableRpcCompression = false;
+ private int _connectionTimeoutInMs = 500;
+ private string _sqlDialect = IoTDBConstant.TREE_SQL_DIALECT;
+ private string _database = "";
+ private List<string> _nodeUrls = new List<string>();
- public Builder SetPort(int port)
- {
- _port = port;
- return this;
- }
+ public Builder SetHost(string host)
+ {
+ _host = host;
+ return this;
+ }
- public Builder SetUsername(string username)
- {
- _username = username;
- return this;
- }
+ public Builder SetPort(int port)
+ {
+ _port = port;
+ return this;
+ }
- public Builder SetPassword(string password)
- {
- _password = password;
- return this;
- }
+ public Builder SetUsername(string username)
+ {
+ _username = username;
+ return this;
+ }
- public Builder SetFetchSize(int fetchSize)
- {
- _fetchSize = fetchSize;
- return this;
- }
+ public Builder SetPassword(string password)
+ {
+ _password = password;
+ return this;
+ }
- public Builder SetZoneId(string zoneId)
- {
- _zoneId = zoneId;
- return this;
- }
+ public Builder SetFetchSize(int fetchSize)
+ {
+ _fetchSize = fetchSize;
+ return this;
+ }
- public Builder SetPoolSize(int poolSize)
- {
- _poolSize = poolSize;
- return this;
- }
+ public Builder SetZoneId(string zoneId)
+ {
+ _zoneId = zoneId;
+ return this;
+ }
- public Builder SetEnableRpcCompression(bool enableRpcCompression)
- {
- _enableRpcCompression = enableRpcCompression;
- return this;
- }
+ public Builder SetPoolSize(int poolSize)
+ {
+ _poolSize = poolSize;
+ return this;
+ }
- public Builder SetConnectionTimeoutInMs(int timeout)
- {
- _connectionTimeoutInMs = timeout;
- return this;
- }
+ public Builder SetEnableRpcCompression(bool enableRpcCompression)
+ {
+ _enableRpcCompression = enableRpcCompression;
+ return this;
+ }
- public Builder SetNodeUrls(List<string> nodeUrls)
- {
- _nodeUrls = nodeUrls;
- return this;
- }
+ public Builder SetConnectionTimeoutInMs(int timeout)
+ {
+ _connectionTimeoutInMs = timeout;
+ return this;
+ }
- protected internal Builder SetSqlDialect(string sqlDialect)
- {
- _sqlDialect = sqlDialect;
- return this;
- }
+ public Builder SetNodeUrls(List<string> nodeUrls)
+ {
+ _nodeUrls = nodeUrls;
+ return this;
+ }
- public Builder SetDatabase(string database)
- {
- _database = database;
- return this;
- }
+ protected internal Builder SetSqlDialect(string sqlDialect)
+ {
+ _sqlDialect = sqlDialect;
+ return this;
+ }
- public Builder()
- {
- _host = "localhost";
- _port = 6667;
- _username = "root";
- _password = "root";
- _fetchSize = 1024;
- _zoneId = "UTC+08:00";
- _poolSize = 8;
- _enableRpcCompression = false;
- _connectionTimeoutInMs = 500;
- _sqlDialect = IoTDBConstant.TABLE_SQL_DIALECT;
- _database = "";
- }
+ public Builder SetDatabase(string database)
+ {
+ _database = database;
+ return this;
+ }
- public TableSessionPool Build()
- {
- SessionPool sessionPool;
- // if nodeUrls is not empty, use nodeUrls to create session pool
- if (_nodeUrls.Count > 0)
- {
- sessionPool = new SessionPool(_nodeUrls, _username, _password, _fetchSize, _zoneId, _poolSize, _enableRpcCompression, _connectionTimeoutInMs, _sqlDialect, _database);
- }
- else
- {
- sessionPool = new SessionPool(_host, _port, _username, _password, _fetchSize, _zoneId, _poolSize, _enableRpcCompression, _connectionTimeoutInMs, _sqlDialect, _database);
- }
- return new TableSessionPool(sessionPool);
+ public Builder()
+ {
+ _host = "localhost";
+ _port = 6667;
+ _username = "root";
+ _password = "root";
+ _fetchSize = 1024;
+ _zoneId = "UTC+08:00";
+ _poolSize = 8;
+ _enableRpcCompression = false;
+ _connectionTimeoutInMs = 500;
+ _sqlDialect = IoTDBConstant.TABLE_SQL_DIALECT;
+ _database = "";
+ }
+
+ public TableSessionPool Build()
+ {
+ SessionPool sessionPool;
+ // if nodeUrls is not empty, use nodeUrls to create session pool
+ if (_nodeUrls.Count > 0)
+ {
+ sessionPool = new SessionPool(_nodeUrls, _username, _password, _fetchSize, _zoneId, _poolSize, _enableRpcCompression, _connectionTimeoutInMs, _sqlDialect, _database);
+ }
+ else
+ {
+ sessionPool = new SessionPool(_host, _port, _username, _password, _fetchSize, _zoneId, _poolSize, _enableRpcCompression, _connectionTimeoutInMs, _sqlDialect, _database);
+ }
+ return new TableSessionPool(sessionPool);
+ }
}
- }
}
diff --git a/src/Apache.IoTDB/TableSessionPool.cs b/src/Apache.IoTDB/TableSessionPool.cs
index ddc977f..921aba1 100644
--- a/src/Apache.IoTDB/TableSessionPool.cs
+++ b/src/Apache.IoTDB/TableSessionPool.cs
@@ -28,50 +28,50 @@
public partial class TableSessionPool
{
- private SessionPool sessionPool;
+ private SessionPool sessionPool;
- TableSessionPool(SessionPool sessionPool)
- {
- this.sessionPool = sessionPool;
- }
+ TableSessionPool(SessionPool sessionPool)
+ {
+ this.sessionPool = sessionPool;
+ }
- public async Task Open(bool enableRpcCompression, CancellationToken cancellationToken = default)
- {
- await sessionPool.Open(enableRpcCompression, cancellationToken);
- }
+ public async Task Open(bool enableRpcCompression, CancellationToken cancellationToken = default)
+ {
+ await sessionPool.Open(enableRpcCompression, cancellationToken);
+ }
- public async Task Open(CancellationToken cancellationToken = default)
- {
- await sessionPool.Open(cancellationToken);
- }
+ public async Task Open(CancellationToken cancellationToken = default)
+ {
+ await sessionPool.Open(cancellationToken);
+ }
- public async Task<int> InsertAsync(Tablet tablet)
- {
- return await sessionPool.InsertRelationalTabletAsync(tablet);
- }
+ public async Task<int> InsertAsync(Tablet tablet)
+ {
+ return await sessionPool.InsertRelationalTabletAsync(tablet);
+ }
- public async Task<int> ExecuteNonQueryStatementAsync(string sql)
- {
- return await sessionPool.ExecuteNonQueryStatementAsync(sql);
- }
+ public async Task<int> ExecuteNonQueryStatementAsync(string sql)
+ {
+ return await sessionPool.ExecuteNonQueryStatementAsync(sql);
+ }
- public async Task<SessionDataSet> ExecuteQueryStatementAsync(string sql)
- {
- return await sessionPool.ExecuteQueryStatementAsync(sql);
- }
+ public async Task<SessionDataSet> ExecuteQueryStatementAsync(string sql)
+ {
+ return await sessionPool.ExecuteQueryStatementAsync(sql);
+ }
- public async Task<SessionDataSet> ExecuteQueryStatementAsync(string sql, long timeoutInMs)
- {
- return await sessionPool.ExecuteQueryStatementAsync(sql, timeoutInMs);
- }
+ public async Task<SessionDataSet> ExecuteQueryStatementAsync(string sql, long timeoutInMs)
+ {
+ return await sessionPool.ExecuteQueryStatementAsync(sql, timeoutInMs);
+ }
- public void OpenDebugMode(Action<ILoggingBuilder> configure)
- {
- sessionPool.OpenDebugMode(configure);
- }
+ public void OpenDebugMode(Action<ILoggingBuilder> configure)
+ {
+ sessionPool.OpenDebugMode(configure);
+ }
- public async Task Close()
- {
- await sessionPool.Close();
- }
+ public async Task Close()
+ {
+ await sessionPool.Close();
+ }
}
diff --git a/src/Apache.IoTDB/Template/InternalNode.cs b/src/Apache.IoTDB/Template/InternalNode.cs
index f08ecf6..50a4cc4 100644
--- a/src/Apache.IoTDB/Template/InternalNode.cs
+++ b/src/Apache.IoTDB/Template/InternalNode.cs
@@ -58,4 +58,4 @@
return this.shareTime;
}
}
-}
\ No newline at end of file
+}
diff --git a/src/Apache.IoTDB/Template/MeasurementNode.cs b/src/Apache.IoTDB/Template/MeasurementNode.cs
index 790ca0f..bed6604 100644
--- a/src/Apache.IoTDB/Template/MeasurementNode.cs
+++ b/src/Apache.IoTDB/Template/MeasurementNode.cs
@@ -73,4 +73,4 @@
}
-}
\ No newline at end of file
+}
diff --git a/src/Apache.IoTDB/Template/Template.cs b/src/Apache.IoTDB/Template/Template.cs
index 74830a2..b14e51f 100644
--- a/src/Apache.IoTDB/Template/Template.cs
+++ b/src/Apache.IoTDB/Template/Template.cs
@@ -141,4 +141,4 @@
}
}
-}
\ No newline at end of file
+}
diff --git a/src/Apache.IoTDB/Template/TemplateNode.cs b/src/Apache.IoTDB/Template/TemplateNode.cs
index 325e43c..c2614e5 100644
--- a/src/Apache.IoTDB/Template/TemplateNode.cs
+++ b/src/Apache.IoTDB/Template/TemplateNode.cs
@@ -57,4 +57,4 @@
return null;
}
}
-}
\ No newline at end of file
+}
diff --git a/src/Apache.IoTDB/Utils.cs b/src/Apache.IoTDB/Utils.cs
index d50a08d..627314c 100644
--- a/src/Apache.IoTDB/Utils.cs
+++ b/src/Apache.IoTDB/Utils.cs
@@ -106,15 +106,15 @@
public static int ParseDateToInt(DateTime? dateTime)
{
- if (dateTime == null)
- {
- throw new ArgumentException("Date expression is none or empty.");
- }
- if(dateTime.Value.Year<1000)
- {
- throw new ArgumentException("Year must be between 1000 and 9999.");
- }
- return dateTime.Value.Year * 10000 + dateTime.Value.Month * 100 + dateTime.Value.Day;
+ if (dateTime == null)
+ {
+ throw new ArgumentException("Date expression is none or empty.");
+ }
+ if (dateTime.Value.Year < 1000)
+ {
+ throw new ArgumentException("Year must be between 1000 and 9999.");
+ }
+ return dateTime.Value.Year * 10000 + dateTime.Value.Month * 100 + dateTime.Value.Day;
}
public static string ByteArrayToHexString(byte[] bytes)
@@ -122,4 +122,4 @@
return "0x" + BitConverter.ToString(bytes).Replace("-", "").ToLowerInvariant();
}
}
-}
\ No newline at end of file
+}
diff --git a/tests/Apache.IoTDB.Integration.Tests/Tests.cs b/tests/Apache.IoTDB.Integration.Tests/Tests.cs
index 19b3dec..b180ab6 100644
--- a/tests/Apache.IoTDB.Integration.Tests/Tests.cs
+++ b/tests/Apache.IoTDB.Integration.Tests/Tests.cs
@@ -34,4 +34,4 @@
Assert.Pass();
}
}
-}
\ No newline at end of file
+}
diff --git a/tests/Apache.IoTDB.Tests/Tests.cs b/tests/Apache.IoTDB.Tests/Tests.cs
index 6a78663..d796fa7 100644
--- a/tests/Apache.IoTDB.Tests/Tests.cs
+++ b/tests/Apache.IoTDB.Tests/Tests.cs
@@ -34,4 +34,4 @@
Assert.Pass();
}
}
-}
\ No newline at end of file
+}