| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| using System; |
| using System.Collections.Generic; |
| using System.Linq; |
| using System.Net.Sockets; |
| using System.Numerics; |
| using System.Threading; |
| using System.Threading.Tasks; |
| using Apache.IoTDB.DataStructure; |
| using Microsoft.Extensions.Configuration; |
| using Microsoft.Extensions.Logging; |
| using Thrift; |
| using Thrift.Protocol; |
| using Thrift.Transport; |
| using Thrift.Transport.Client; |
| |
| namespace Apache.IoTDB |
| { |
| |
| public partial class SessionPool : IDisposable |
| { |
| private static readonly TSProtocolVersion ProtocolVersion = TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3; |
| |
| private readonly string _username; |
| private readonly string _password; |
| private bool _enableRpcCompression; |
| private string _zoneId; |
| private readonly List<string> _nodeUrls = new(); |
| private readonly List<TEndPoint> _endPoints = new(); |
| private readonly string _host; |
| private readonly int _port; |
| private readonly int _fetchSize; |
| private readonly int _timeout; |
| private readonly int _poolSize = 4; |
| private readonly Utils _utilFunctions = new(); |
| private const int RetryNum = 3; |
| private bool _debugMode; |
| private bool _isClose = true; |
| private ConcurrentClientQueue _clients; |
| private ILogger _logger; |
| public delegate Task<TResult> AsyncOperation<TResult>(Client client); |
| |
| |
| public SessionPool(string host, int port, int poolSize) |
| : this(host, port, "root", "root", 1024, "UTC+08:00", poolSize, true, 60) |
| { |
| } |
| |
| public SessionPool(string host, int port, string username, string password) |
| : this(host, port, username, password, 1024, "UTC+08:00", 8, true, 60) |
| { |
| } |
| |
| public SessionPool(string host, int port, string username, string password, int fetchSize) |
| : this(host, port, username, password, fetchSize, "UTC+08:00", 8, true, 60) |
| { |
| |
| } |
| |
| public SessionPool(string host, int port) : this(host, port, "root", "root", 1024, "UTC+08:00", 8, true, 60) |
| { |
| } |
| public SessionPool(string host, int port, string username, string password, int fetchSize, string zoneId, int poolSize, bool enableRpcCompression, int timeout) |
| { |
| _host = host; |
| _port = port; |
| _username = username; |
| _password = password; |
| _zoneId = zoneId; |
| _fetchSize = fetchSize; |
| _debugMode = false; |
| _poolSize = poolSize; |
| _enableRpcCompression = enableRpcCompression; |
| _timeout = timeout; |
| } |
| /// <summary> |
| /// Initializes a new instance of the <see cref="SessionPool"/> class. |
| /// </summary> |
| /// <param name="nodeUrls">The list of node URLs to connect to, multiple ip:rpcPort eg.127.0.0.1:9001</param> |
| /// <param name="poolSize">The size of the session pool.</param> |
| public SessionPool(List<string> nodeUrls, int poolSize) |
| : this(nodeUrls, "root", "root", 1024, "UTC+08:00", poolSize, true, 60) |
| { |
| } |
| public SessionPool(List<string> nodeUrls, string username, string password) |
| : this(nodeUrls, username, password, 1024, "UTC+08:00", 8, true, 60) |
| { |
| } |
| public SessionPool(List<string> nodeUrls, string username, string password, int fetchSize) |
| : this(nodeUrls, username, password, fetchSize, "UTC+08:00", 8, true, 60) |
| { |
| } |
| public SessionPool(List<string> nodeUrls, string username, string password, int fetchSize, string zoneId) |
| : this(nodeUrls, username, password, fetchSize, zoneId, 8, true, 60) |
| { |
| } |
| public SessionPool(List<string> nodeUrls, string username, string password, int fetchSize, string zoneId, int poolSize, bool enableRpcCompression, int timeout) |
| { |
| if (nodeUrls.Count == 0) |
| { |
| throw new ArgumentException("nodeUrls shouldn't be empty."); |
| } |
| _nodeUrls = nodeUrls; |
| _endPoints = _utilFunctions.ParseSeedNodeUrls(nodeUrls); |
| _username = username; |
| _password = password; |
| _zoneId = zoneId; |
| _fetchSize = fetchSize; |
| _debugMode = false; |
| _poolSize = poolSize; |
| _enableRpcCompression = enableRpcCompression; |
| _timeout = timeout; |
| } |
| public async Task<TResult> ExecuteClientOperationAsync<TResult>(AsyncOperation<TResult> operation, string errMsg, bool retryOnFailure = true, bool putClientBack = true) |
| { |
| Client client = _clients.Take(); |
| try |
| { |
| var resp = await operation(client); |
| return resp; |
| } |
| catch (TException ex) |
| { |
| if (retryOnFailure) |
| { |
| try |
| { |
| client = await Reconnect(client); |
| return await operation(client); |
| } |
| catch (TException retryEx) |
| { |
| throw new TException(errMsg, retryEx); |
| } |
| } |
| else |
| { |
| throw new TException(errMsg, ex); |
| } |
| } |
| catch (Exception ex) |
| { |
| if (retryOnFailure) |
| { |
| try |
| { |
| client = await Reconnect(client); |
| return await operation(client); |
| } |
| catch (TException retryEx) |
| { |
| throw new TException(errMsg, retryEx); |
| } |
| } |
| else |
| { |
| throw new TException(errMsg, ex); |
| } |
| } |
| finally |
| { |
| if (putClientBack) |
| { |
| _clients.Add(client); |
| } |
| } |
| } |
| /// <summary> |
| /// Gets or sets the amount of time a Session will wait for a send operation to complete successfully. |
| /// </summary> |
| /// <remarks> The send time-out value, in milliseconds. The default is 10000.</remarks> |
| public int TimeOut { get; set; } = 10000; |
| |
| ILoggerFactory factory; |
| private bool disposedValue; |
| |
| public void OpenDebugMode(Action<ILoggingBuilder> configure) |
| { |
| _debugMode = true; |
| factory = LoggerFactory.Create(configure); |
| _logger = factory.CreateLogger(nameof(Apache.IoTDB)); |
| } |
| |
| public void CloseDebugMode() |
| { |
| _debugMode = false; |
| } |
| |
| public async Task Open(bool enableRpcCompression, CancellationToken cancellationToken = default) |
| { |
| _enableRpcCompression = enableRpcCompression; |
| await Open(cancellationToken); |
| } |
| |
| public async Task Open(CancellationToken cancellationToken = default) |
| { |
| _clients = new ConcurrentClientQueue(); |
| _clients.Timeout = _timeout * 5; |
| |
| if (_nodeUrls.Count == 0) |
| { |
| for (var index = 0; index < _poolSize; index++) |
| { |
| try |
| { |
| _clients.Add(await CreateAndOpen(_host, _port, _enableRpcCompression, _timeout, cancellationToken)); |
| } |
| catch (Exception e) |
| { |
| if (_debugMode) |
| { |
| _logger.LogWarning(e, "Currently connecting to {0}:{1} failed", _host, _port); |
| } |
| } |
| } |
| } |
| else |
| { |
| int startIndex = 0; |
| for (var index = 0; index < _poolSize; index++) |
| { |
| bool isConnected = false; |
| for (int i = 0; i < _endPoints.Count; i++) |
| { |
| var endPointIndex = (startIndex + i) % _endPoints.Count; |
| var endPoint = _endPoints[endPointIndex]; |
| try |
| { |
| var client = await CreateAndOpen(endPoint.Ip, endPoint.Port, _enableRpcCompression, _timeout, cancellationToken); |
| _clients.Add(client); |
| isConnected = true; |
| startIndex = (endPointIndex + 1) % _endPoints.Count; |
| break; |
| } |
| catch (Exception e) |
| { |
| if (_debugMode) |
| { |
| _logger.LogWarning(e, "Currently connecting to {0}:{1} failed", endPoint.Ip, endPoint.Port); |
| } |
| } |
| } |
| if (!isConnected) // current client could not connect to any endpoint |
| { |
| throw new TException("Error occurs when opening session pool. Could not connect to any server", null); |
| } |
| } |
| } |
| |
| if (_clients.ClientQueue.Count != _poolSize) |
| { |
| throw new TException(string.Format("Error occurs when opening session pool. Client pool size is not equal to the expected size. Client pool size: {0}, expected size: {1}, Please check the server status", _clients.ClientQueue.Count, _poolSize), null); |
| } |
| _isClose = false; |
| } |
| |
| |
| public async Task<Client> Reconnect(Client originalClient = null, CancellationToken cancellationToken = default) |
| { |
| originalClient?.Transport.Close(); |
| |
| if (_nodeUrls.Count == 0) |
| { |
| for (int attempt = 1; attempt <= RetryNum; attempt++) |
| { |
| try |
| { |
| var client = await CreateAndOpen(_host, _port, _enableRpcCompression, _timeout, cancellationToken); |
| return client; |
| } |
| catch (Exception e) |
| { |
| if (_debugMode) |
| { |
| _logger.LogWarning(e, "Attempt reconnecting to {0}:{1} failed", _host, _port); |
| } |
| } |
| } |
| } |
| else |
| { |
| int startIndex = _endPoints.FindIndex(x => x.Ip == originalClient.EndPoint.Ip && x.Port == originalClient.EndPoint.Port); |
| if (startIndex == -1) |
| { |
| throw new ArgumentException($"The original client is not in the list of endpoints. Original client: {originalClient.EndPoint.Ip}:{originalClient.EndPoint.Port}"); |
| } |
| |
| for (int attempt = 1; attempt <= RetryNum; attempt++) |
| { |
| for (int i = 0; i < _endPoints.Count; i++) |
| { |
| int j = (startIndex + i) % _endPoints.Count; |
| try |
| { |
| var client = await CreateAndOpen(_endPoints[j].Ip, _endPoints[j].Port, _enableRpcCompression, _timeout, cancellationToken); |
| return client; |
| } |
| catch (Exception e) |
| { |
| if (_debugMode) |
| { |
| _logger.LogWarning(e, "Attempt connecting to {0}:{1} failed", _endPoints[j].Ip, _endPoints[j].Port); |
| } |
| } |
| } |
| } |
| } |
| |
| throw new TException("Error occurs when reconnecting session pool. Could not connect to any server", null); |
| } |
| |
| |
| public bool IsOpen() => !_isClose; |
| |
| public async Task Close() |
| { |
| if (_isClose) |
| { |
| return; |
| } |
| |
| foreach (var client in _clients.ClientQueue.AsEnumerable()) |
| { |
| var closeSessionRequest = new TSCloseSessionReq(client.SessionId); |
| try |
| { |
| await client.ServiceClient.closeSessionAsync(closeSessionRequest); |
| } |
| catch (TException e) |
| { |
| throw new TException("Error occurs when closing session at server. Maybe server is down", e); |
| } |
| finally |
| { |
| _isClose = true; |
| |
| client.Transport?.Close(); |
| } |
| } |
| } |
| |
| public async Task SetTimeZone(string zoneId) |
| { |
| _zoneId = zoneId; |
| |
| foreach (var client in _clients.ClientQueue.AsEnumerable()) |
| { |
| var req = new TSSetTimeZoneReq(client.SessionId, zoneId); |
| try |
| { |
| var resp = await client.ServiceClient.setTimeZoneAsync(req); |
| if (_debugMode) |
| { |
| _logger.LogInformation("setting time zone_id as {0}, server message:{1}", zoneId, resp.Message); |
| } |
| } |
| catch (TException e) |
| { |
| throw new TException("could not set time zone", e); |
| } |
| } |
| } |
| |
| public async Task<string> GetTimeZone() |
| { |
| if (_zoneId != "") |
| { |
| return _zoneId; |
| } |
| |
| var client = _clients.Take(); |
| |
| try |
| { |
| var response = await client.ServiceClient.getTimeZoneAsync(client.SessionId); |
| |
| return response?.TimeZone; |
| } |
| catch (TException e) |
| { |
| throw new TException("could not get time zone", e); |
| } |
| finally |
| { |
| _clients.Add(client); |
| } |
| } |
| |
| private async Task<Client> CreateAndOpen(string host, int port, bool enableRpcCompression, int timeout, CancellationToken cancellationToken = default) |
| { |
| var tcpClient = new TcpClient(host, port); |
| tcpClient.SendTimeout = timeout; |
| tcpClient.ReceiveTimeout = timeout; |
| var transport = new TFramedTransport(new TSocketTransport(tcpClient, null)); |
| |
| if (!transport.IsOpen) |
| { |
| await transport.OpenAsync(cancellationToken); |
| } |
| |
| var client = enableRpcCompression ? |
| new IClientRPCService.Client(new TCompactProtocol(transport)) : |
| new IClientRPCService.Client(new TBinaryProtocol(transport)); |
| |
| var openReq = new TSOpenSessionReq(ProtocolVersion, _zoneId, _username) |
| { |
| Password = _password, |
| }; |
| |
| try |
| { |
| var openResp = await client.openSessionAsync(openReq, cancellationToken); |
| |
| if (openResp.ServerProtocolVersion != ProtocolVersion) |
| { |
| throw new TException($"Protocol Differ, Client version is {ProtocolVersion} but Server version is {openResp.ServerProtocolVersion}", null); |
| } |
| |
| if (openResp.ServerProtocolVersion == 0) |
| { |
| throw new TException("Protocol not supported", null); |
| } |
| |
| var sessionId = openResp.SessionId; |
| var statementId = await client.requestStatementIdAsync(sessionId, cancellationToken); |
| |
| var endpoint = new TEndPoint(host, port); |
| |
| var returnClient = new Client( |
| client, |
| sessionId, |
| statementId, |
| transport, |
| endpoint); |
| |
| return returnClient; |
| } |
| catch (Exception) |
| { |
| transport.Close(); |
| |
| throw; |
| } |
| } |
| |
| public async Task<int> CreateDatabase(string dbName) |
| { |
| return await ExecuteClientOperationAsync<int>( |
| async client => |
| { |
| var status = await client.ServiceClient.setStorageGroupAsync(client.SessionId, dbName); |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("create database {0} successfully, server message is {1}", dbName, status.Message); |
| } |
| |
| return _utilFunctions.VerifySuccess(status); |
| }, |
| errMsg: "Error occurs when creating database" |
| ); |
| } |
| |
| [Obsolete("This method is deprecated, please use createDatabase instead.")] |
| public async Task<int> SetStorageGroup(string groupName) |
| { |
| return await ExecuteClientOperationAsync<int>( |
| async client => |
| { |
| var status = await client.ServiceClient.setStorageGroupAsync(client.SessionId, groupName); |
| if (_debugMode) |
| { |
| _logger.LogInformation("set storage group {0} successfully, server message is {1}", groupName, status.Message); |
| } |
| return _utilFunctions.VerifySuccess(status); |
| }, |
| errMsg: "Error occurs when setting storage group" |
| ); |
| } |
| public async Task<int> CreateTimeSeries( |
| string tsPath, |
| TSDataType dataType, |
| TSEncoding encoding, |
| Compressor compressor) |
| { |
| return await ExecuteClientOperationAsync<int>( |
| async client => |
| { |
| var req = new TSCreateTimeseriesReq( |
| client.SessionId, |
| tsPath, |
| (int)dataType, |
| (int)encoding, |
| (int)compressor); |
| |
| var status = await client.ServiceClient.createTimeseriesAsync(req); |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("creating time series {0} successfully, server message is {1}", tsPath, status.Message); |
| } |
| |
| return _utilFunctions.VerifySuccess(status); |
| }, |
| errMsg: "Error occurs when creating time series" |
| ); |
| } |
| public async Task<int> CreateAlignedTimeseriesAsync( |
| string prefixPath, |
| List<string> measurements, |
| List<TSDataType> dataTypeLst, |
| List<TSEncoding> encodingLst, |
| List<Compressor> compressorLst) |
| { |
| return await ExecuteClientOperationAsync<int>( |
| async client => |
| { |
| var dataTypes = dataTypeLst.ConvertAll(x => (int)x); |
| var encodings = encodingLst.ConvertAll(x => (int)x); |
| var compressors = compressorLst.ConvertAll(x => (int)x); |
| |
| var req = new TSCreateAlignedTimeseriesReq( |
| client.SessionId, |
| prefixPath, |
| measurements, |
| dataTypes, |
| encodings, |
| compressors); |
| |
| var status = await client.ServiceClient.createAlignedTimeseriesAsync(req); |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("creating aligned time series {0} successfully, server message is {1}", prefixPath, status.Message); |
| } |
| |
| return _utilFunctions.VerifySuccess(status); |
| }, |
| errMsg: "Error occurs when creating aligned time series" |
| ); |
| } |
| public async Task<int> DeleteDatabaseAsync(string dbName) |
| { |
| return await ExecuteClientOperationAsync<int>( |
| async client => |
| { |
| var status = await client.ServiceClient.deleteStorageGroupsAsync(client.SessionId, new List<string> { dbName }); |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("delete database {0} successfully, server message is {1}", dbName, status.Message); |
| } |
| |
| return _utilFunctions.VerifySuccess(status); |
| }, |
| errMsg: "Error occurs when deleting database" |
| ); |
| } |
| [Obsolete("This method is deprecated, please use DeleteDatabaseAsync instead.")] |
| public async Task<int> DeleteStorageGroupAsync(string groupName) |
| { |
| return await ExecuteClientOperationAsync<int>( |
| async client => |
| { |
| var status = await client.ServiceClient.deleteStorageGroupsAsync(client.SessionId, new List<string> { groupName }); |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("delete storage group {0} successfully, server message is {1}", groupName, status.Message); |
| } |
| |
| return _utilFunctions.VerifySuccess(status); |
| }, |
| errMsg: "Error occurs when deleting storage group" |
| ); |
| } |
| public async Task<int> DeleteDatabasesAsync(List<string> dbNames) |
| { |
| return await ExecuteClientOperationAsync<int>( |
| async client => |
| { |
| var status = await client.ServiceClient.deleteStorageGroupsAsync(client.SessionId, dbNames); |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("delete database(s) {0} successfully, server message is {1}", dbNames, status.Message); |
| } |
| |
| return _utilFunctions.VerifySuccess(status); |
| }, |
| errMsg: "Error occurs when deleting database(s)" |
| ); |
| } |
| [Obsolete("This method is deprecated, please use DeleteDatabasesAsync instead.")] |
| public async Task<int> DeleteStorageGroupsAsync(List<string> groupNames) |
| { |
| return await ExecuteClientOperationAsync<int>( |
| async client => |
| { |
| var status = await client.ServiceClient.deleteStorageGroupsAsync(client.SessionId, groupNames); |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("delete storage group(s) {0} successfully, server message is {1}", groupNames, status.Message); |
| } |
| |
| return _utilFunctions.VerifySuccess(status); |
| }, |
| errMsg: "Error occurs when deleting storage group(s)" |
| ); |
| } |
| public async Task<int> CreateMultiTimeSeriesAsync( |
| List<string> tsPathLst, |
| List<TSDataType> dataTypeLst, |
| List<TSEncoding> encodingLst, |
| List<Compressor> compressorLst) |
| { |
| return await ExecuteClientOperationAsync<int>( |
| async client => |
| { |
| var dataTypes = dataTypeLst.ConvertAll(x => (int)x); |
| var encodings = encodingLst.ConvertAll(x => (int)x); |
| var compressors = compressorLst.ConvertAll(x => (int)x); |
| |
| var req = new TSCreateMultiTimeseriesReq(client.SessionId, tsPathLst, dataTypes, encodings, compressors); |
| |
| var status = await client.ServiceClient.createMultiTimeseriesAsync(req); |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("creating multiple time series {0}, server message is {1}", tsPathLst, status.Message); |
| } |
| |
| return _utilFunctions.VerifySuccess(status); |
| }, |
| errMsg: "Error occurs when creating multiple time series" |
| ); |
| } |
| public async Task<int> DeleteTimeSeriesAsync(List<string> pathList) |
| { |
| return await ExecuteClientOperationAsync<int>( |
| async client => |
| { |
| var status = await client.ServiceClient.deleteTimeseriesAsync(client.SessionId, pathList); |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("deleting multiple time series {0}, server message is {1}", pathList, status.Message); |
| } |
| |
| return _utilFunctions.VerifySuccess(status); |
| }, |
| errMsg: "Error occurs when deleting multiple time series" |
| ); |
| } |
| |
| public async Task<int> DeleteTimeSeriesAsync(string tsPath) |
| { |
| return await DeleteTimeSeriesAsync(new List<string> { tsPath }); |
| } |
| |
| public async Task<bool> CheckTimeSeriesExistsAsync(string tsPath) |
| { |
| try |
| { |
| var sql = "SHOW TIMESERIES " + tsPath; |
| var sessionDataset = await ExecuteQueryStatementAsync(sql); |
| bool timeSeriesExists = sessionDataset.HasNext(); |
| await sessionDataset.Close(); // be sure to close the sessionDataset to put the client back to the pool |
| return timeSeriesExists; |
| } |
| catch (TException e) |
| { |
| throw new TException("could not check if certain time series exists", e); |
| } |
| } |
| public async Task<int> DeleteDataAsync(List<string> tsPathLst, long startTime, long endTime) |
| { |
| return await ExecuteClientOperationAsync<int>( |
| async client => |
| { |
| var req = new TSDeleteDataReq(client.SessionId, tsPathLst, startTime, endTime); |
| |
| var status = await client.ServiceClient.deleteDataAsync(req); |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation( |
| "delete data from {0}, server message is {1}", |
| tsPathLst, |
| status.Message); |
| } |
| |
| return _utilFunctions.VerifySuccess(status); |
| }, |
| errMsg: "Error occurs when deleting data" |
| ); |
| } |
| public async Task<int> InsertRecordAsync(string deviceId, RowRecord record) |
| { |
| return await ExecuteClientOperationAsync<int>( |
| async client => |
| { |
| var req = new TSInsertRecordReq(client.SessionId, deviceId, record.Measurements, record.ToBytes(), record.Timestamps); |
| |
| var status = await client.ServiceClient.insertRecordAsync(req); |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("insert one record to device {0}, server message: {1}", deviceId, status.Message); |
| } |
| |
| return _utilFunctions.VerifySuccess(status); |
| }, |
| errMsg: "Error occurs when inserting record" |
| ); |
| } |
| public async Task<int> InsertAlignedRecordAsync(string deviceId, RowRecord record) |
| { |
| return await ExecuteClientOperationAsync<int>( |
| async client => |
| { |
| var req = new TSInsertRecordReq(client.SessionId, deviceId, record.Measurements, record.ToBytes(), record.Timestamps); |
| req.IsAligned = true; |
| // ASSERT that the insert plan is aligned |
| System.Diagnostics.Debug.Assert(req.IsAligned == true); |
| |
| var status = await client.ServiceClient.insertRecordAsync(req); |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("insert one record to device {0}, server message: {1}", deviceId, status.Message); |
| } |
| |
| return _utilFunctions.VerifySuccess(status); |
| }, |
| errMsg: "Error occurs when inserting record" |
| ); |
| } |
| |
| public TSInsertStringRecordReq GenInsertStrRecordReq(string deviceId, List<string> measurements, |
| List<string> values, long timestamp, long sessionId, bool isAligned = false) |
| { |
| if (values.Count() != measurements.Count()) |
| { |
| throw new ArgumentException("length of data types does not equal to length of values!"); |
| } |
| |
| return new TSInsertStringRecordReq(sessionId, deviceId, measurements, values, timestamp) |
| { |
| IsAligned = isAligned |
| }; |
| } |
| public TSInsertStringRecordsReq GenInsertStringRecordsReq(List<string> deviceIds, List<List<string>> measurementsList, |
| List<List<string>> valuesList, List<long> timestamps, long sessionId, bool isAligned = false) |
| { |
| if (valuesList.Count() != measurementsList.Count()) |
| { |
| throw new ArgumentException("length of data types does not equal to length of values!"); |
| } |
| |
| return new TSInsertStringRecordsReq(sessionId, deviceIds, measurementsList, valuesList, timestamps) |
| { |
| IsAligned = isAligned |
| }; |
| } |
| |
| public TSInsertRecordsReq GenInsertRecordsReq(List<string> deviceId, List<RowRecord> rowRecords, |
| long sessionId) |
| { |
| var measurementLst = rowRecords.Select(x => x.Measurements).ToList(); |
| var timestampLst = rowRecords.Select(x => x.Timestamps).ToList(); |
| var valuesLstInBytes = rowRecords.Select(row => row.ToBytes()).ToList(); |
| |
| return new TSInsertRecordsReq(sessionId, deviceId, measurementLst, valuesLstInBytes, timestampLst); |
| } |
| public async Task<int> InsertStringRecordAsync(string deviceId, List<string> measurements, List<string> values, |
| long timestamp) |
| { |
| return await ExecuteClientOperationAsync<int>( |
| async client => |
| { |
| var req = GenInsertStrRecordReq(deviceId, measurements, values, timestamp, client.SessionId); |
| |
| var status = await client.ServiceClient.insertStringRecordAsync(req); |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("insert one string record to device {0}, server message: {1}", deviceId, status.Message); |
| } |
| |
| return _utilFunctions.VerifySuccess(status); |
| }, |
| errMsg: "Error occurs when inserting a string record" |
| ); |
| } |
| public async Task<int> InsertAlignedStringRecordAsync(string deviceId, List<string> measurements, List<string> values, |
| long timestamp) |
| { |
| return await ExecuteClientOperationAsync<int>( |
| async client => |
| { |
| var req = GenInsertStrRecordReq(deviceId, measurements, values, timestamp, client.SessionId, true); |
| |
| var status = await client.ServiceClient.insertStringRecordAsync(req); |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("insert one record to device {0}, server message: {1}", deviceId, status.Message); |
| } |
| |
| return _utilFunctions.VerifySuccess(status); |
| }, |
| errMsg: "Error occurs when inserting a string record" |
| ); |
| } |
| public async Task<int> InsertStringRecordsAsync(List<string> deviceIds, List<List<string>> measurements, List<List<string>> values, |
| List<long> timestamps) |
| { |
| return await ExecuteClientOperationAsync<int>( |
| async client => |
| { |
| var req = GenInsertStringRecordsReq(deviceIds, measurements, values, timestamps, client.SessionId); |
| |
| var status = await client.ServiceClient.insertStringRecordsAsync(req); |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("insert string records to devices {0}, server message: {1}", deviceIds, status.Message); |
| } |
| |
| return _utilFunctions.VerifySuccess(status); |
| }, |
| errMsg: "Error occurs when inserting string records" |
| ); |
| } |
| public async Task<int> InsertAlignedStringRecordsAsync(List<string> deviceIds, List<List<string>> measurements, List<List<string>> values, |
| List<long> timestamps) |
| { |
| return await ExecuteClientOperationAsync<int>( |
| async client => |
| { |
| var req = GenInsertStringRecordsReq(deviceIds, measurements, values, timestamps, client.SessionId, true); |
| |
| var status = await client.ServiceClient.insertStringRecordsAsync(req); |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("insert string records to devices {0}, server message: {1}", deviceIds, status.Message); |
| } |
| |
| return _utilFunctions.VerifySuccess(status); |
| }, |
| errMsg: "Error occurs when inserting string records" |
| ); |
| } |
| public async Task<int> InsertRecordsAsync(List<string> deviceId, List<RowRecord> rowRecords) |
| { |
| return await ExecuteClientOperationAsync<int>( |
| async client => |
| { |
| var req = GenInsertRecordsReq(deviceId, rowRecords, client.SessionId); |
| |
| var status = await client.ServiceClient.insertRecordsAsync(req); |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("insert multiple records to devices {0}, server message: {1}", deviceId, status.Message); |
| } |
| |
| return _utilFunctions.VerifySuccess(status); |
| }, |
| errMsg: "Error occurs when inserting records" |
| ); |
| } |
| public async Task<int> InsertAlignedRecordsAsync(List<string> deviceId, List<RowRecord> rowRecords) |
| { |
| return await ExecuteClientOperationAsync<int>( |
| async client => |
| { |
| var req = GenInsertRecordsReq(deviceId, rowRecords, client.SessionId); |
| req.IsAligned = true; |
| // ASSERT that the insert plan is aligned |
| System.Diagnostics.Debug.Assert(req.IsAligned == true); |
| |
| var status = await client.ServiceClient.insertRecordsAsync(req); |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("insert multiple records to devices {0}, server message: {1}", deviceId, status.Message); |
| } |
| |
| return _utilFunctions.VerifySuccess(status); |
| }, |
| errMsg: "Error occurs when inserting records" |
| ); |
| } |
| public TSInsertTabletReq GenInsertTabletReq(Tablet tablet, long sessionId) |
| { |
| return new TSInsertTabletReq( |
| sessionId, |
| tablet.DeviceId, |
| tablet.Measurements, |
| tablet.GetBinaryValues(), |
| tablet.GetBinaryTimestamps(), |
| tablet.GetDataTypes(), |
| tablet.RowNumber); |
| } |
| public async Task<int> InsertTabletAsync(Tablet tablet) |
| { |
| return await ExecuteClientOperationAsync<int>( |
| async client => |
| { |
| var req = GenInsertTabletReq(tablet, client.SessionId); |
| |
| var status = await client.ServiceClient.insertTabletAsync(req); |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("insert one tablet to device {0}, server message: {1}", tablet.DeviceId, status.Message); |
| } |
| |
| return _utilFunctions.VerifySuccess(status); |
| }, |
| errMsg: "Error occurs when inserting tablet" |
| ); |
| } |
| public async Task<int> InsertAlignedTabletAsync(Tablet tablet) |
| { |
| return await ExecuteClientOperationAsync<int>( |
| async client => |
| { |
| var req = GenInsertTabletReq(tablet, client.SessionId); |
| req.IsAligned = true; |
| |
| var status = await client.ServiceClient.insertTabletAsync(req); |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("insert one aligned tablet to device {0}, server message: {1}", tablet.DeviceId, status.Message); |
| } |
| |
| return _utilFunctions.VerifySuccess(status); |
| }, |
| errMsg: "Error occurs when inserting aligned tablet" |
| ); |
| } |
| |
| public TSInsertTabletsReq GenInsertTabletsReq(List<Tablet> tabletLst, long sessionId) |
| { |
| var deviceIdLst = new List<string>(); |
| var measurementsLst = new List<List<string>>(); |
| var valuesLst = new List<byte[]>(); |
| var timestampsLst = new List<byte[]>(); |
| var typeLst = new List<List<int>>(); |
| var sizeLst = new List<int>(); |
| |
| foreach (var tablet in tabletLst) |
| { |
| var dataTypeValues = tablet.GetDataTypes(); |
| deviceIdLst.Add(tablet.DeviceId); |
| measurementsLst.Add(tablet.Measurements); |
| valuesLst.Add(tablet.GetBinaryValues()); |
| timestampsLst.Add(tablet.GetBinaryTimestamps()); |
| typeLst.Add(dataTypeValues); |
| sizeLst.Add(tablet.RowNumber); |
| } |
| |
| return new TSInsertTabletsReq( |
| sessionId, |
| deviceIdLst, |
| measurementsLst, |
| valuesLst, |
| timestampsLst, |
| typeLst, |
| sizeLst); |
| } |
| |
| public async Task<int> InsertTabletsAsync(List<Tablet> tabletLst) |
| { |
| return await ExecuteClientOperationAsync<int>( |
| async client => |
| { |
| var req = GenInsertTabletsReq(tabletLst, client.SessionId); |
| |
| var status = await client.ServiceClient.insertTabletsAsync(req); |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("insert multiple tablets, message: {0}", status.Message); |
| } |
| |
| return _utilFunctions.VerifySuccess(status); |
| }, |
| errMsg: "Error occurs when inserting tablets" |
| ); |
| } |
| public async Task<int> InsertAlignedTabletsAsync(List<Tablet> tabletLst) |
| { |
| return await ExecuteClientOperationAsync<int>( |
| async client => |
| { |
| var req = GenInsertTabletsReq(tabletLst, client.SessionId); |
| req.IsAligned = true; |
| |
| var status = await client.ServiceClient.insertTabletsAsync(req); |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("insert multiple aligned tablets, message: {0}", status.Message); |
| } |
| |
| return _utilFunctions.VerifySuccess(status); |
| }, |
| errMsg: "Error occurs when inserting aligned tablets" |
| ); |
| } |
| |
| public async Task<int> InsertRecordsOfOneDeviceAsync(string deviceId, List<RowRecord> rowRecords) |
| { |
| var sortedRowRecords = rowRecords.OrderBy(x => x.Timestamps).ToList(); |
| return await InsertRecordsOfOneDeviceSortedAsync(deviceId, sortedRowRecords); |
| } |
| public async Task<int> InsertAlignedRecordsOfOneDeviceAsync(string deviceId, List<RowRecord> rowRecords) |
| { |
| var sortedRowRecords = rowRecords.OrderBy(x => x.Timestamps).ToList(); |
| return await InsertAlignedRecordsOfOneDeviceSortedAsync(deviceId, sortedRowRecords); |
| } |
| public async Task<int> InsertStringRecordsOfOneDeviceAsync(string deviceId, List<long> timestamps, |
| List<List<string>> measurementsList, List<List<string>> valuesList) |
| { |
| var joined = timestamps.Zip(measurementsList, (t, m) => new { t, m }) |
| .Zip(valuesList, (tm, v) => new { tm.t, tm.m, v }) |
| .OrderBy(x => x.t); |
| |
| var sortedTimestamps = joined.Select(x => x.t).ToList(); |
| var sortedMeasurementsList = joined.Select(x => x.m).ToList(); |
| var sortedValuesList = joined.Select(x => x.v).ToList(); |
| |
| return await InsertStringRecordsOfOneDeviceSortedAsync(deviceId, sortedTimestamps, sortedMeasurementsList, sortedValuesList, false); |
| } |
| public async Task<int> InsertAlignedStringRecordsOfOneDeviceAsync(string deviceId, List<long> timestamps, |
| List<List<string>> measurementsList, List<List<string>> valuesList) |
| { |
| var joined = timestamps.Zip(measurementsList, (t, m) => new { t, m }) |
| .Zip(valuesList, (tm, v) => new { tm.t, tm.m, v }) |
| .OrderBy(x => x.t); |
| |
| var sortedTimestamps = joined.Select(x => x.t).ToList(); |
| var sortedMeasurementsList = joined.Select(x => x.m).ToList(); |
| var sortedValuesList = joined.Select(x => x.v).ToList(); |
| |
| return await InsertStringRecordsOfOneDeviceSortedAsync(deviceId, sortedTimestamps, sortedMeasurementsList, sortedValuesList, true); |
| } |
| public async Task<int> InsertStringRecordsOfOneDeviceSortedAsync(string deviceId, List<long> timestamps, |
| List<List<string>> measurementsList, List<List<string>> valuesList, bool isAligned) |
| { |
| return await ExecuteClientOperationAsync<int>( |
| async client => |
| { |
| if (!_utilFunctions.IsSorted(timestamps)) |
| { |
| throw new ArgumentException("insert string records of one device error: timestamp not sorted"); |
| } |
| |
| var req = GenInsertStringRecordsOfOneDeviceReq(deviceId, timestamps, measurementsList, valuesList, client.SessionId, isAligned); |
| |
| var status = await client.ServiceClient.insertStringRecordsOfOneDeviceAsync(req); |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("insert string records of one device, message: {0}", status.Message); |
| } |
| |
| return _utilFunctions.VerifySuccess(status); |
| }, |
| errMsg: "Error occurs when inserting string records of one device" |
| ); |
| } |
| private TSInsertStringRecordsOfOneDeviceReq GenInsertStringRecordsOfOneDeviceReq(string deviceId, |
| List<long> timestamps, List<List<string>> measurementsList, List<List<string>> valuesList, |
| long sessionId, bool isAligned = false) |
| { |
| return new TSInsertStringRecordsOfOneDeviceReq( |
| sessionId, |
| deviceId, |
| measurementsList, |
| valuesList, |
| timestamps) |
| { |
| IsAligned = isAligned |
| }; |
| } |
| private TSInsertRecordsOfOneDeviceReq GenInsertRecordsOfOneDeviceRequest( |
| string deviceId, |
| List<RowRecord> records, |
| long sessionId) |
| { |
| var values = records.Select(row => row.ToBytes()); |
| var measurementsLst = records.Select(x => x.Measurements).ToList(); |
| var timestampLst = records.Select(x => x.Timestamps).ToList(); |
| |
| return new TSInsertRecordsOfOneDeviceReq( |
| sessionId, |
| deviceId, |
| measurementsLst, |
| values.ToList(), |
| timestampLst); |
| } |
| public async Task<int> InsertRecordsOfOneDeviceSortedAsync(string deviceId, List<RowRecord> rowRecords) |
| { |
| return await ExecuteClientOperationAsync<int>( |
| async client => |
| { |
| var timestampLst = rowRecords.Select(x => x.Timestamps).ToList(); |
| |
| if (!_utilFunctions.IsSorted(timestampLst)) |
| { |
| throw new ArgumentException("insert records of one device error: timestamp not sorted"); |
| } |
| |
| var req = GenInsertRecordsOfOneDeviceRequest(deviceId, rowRecords, client.SessionId); |
| |
| var status = await client.ServiceClient.insertRecordsOfOneDeviceAsync(req); |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("insert records of one device, message: {0}", status.Message); |
| } |
| |
| return _utilFunctions.VerifySuccess(status); |
| }, |
| errMsg: "Error occurs when inserting records of one device" |
| ); |
| } |
| public async Task<int> InsertAlignedRecordsOfOneDeviceSortedAsync(string deviceId, List<RowRecord> rowRecords) |
| { |
| return await ExecuteClientOperationAsync<int>( |
| async client => |
| { |
| var timestampLst = rowRecords.Select(x => x.Timestamps).ToList(); |
| |
| if (!_utilFunctions.IsSorted(timestampLst)) |
| { |
| throw new ArgumentException("insert records of one device error: timestamp not sorted"); |
| } |
| |
| var req = GenInsertRecordsOfOneDeviceRequest(deviceId, rowRecords, client.SessionId); |
| req.IsAligned = true; |
| |
| var status = await client.ServiceClient.insertRecordsOfOneDeviceAsync(req); |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("insert aligned records of one device, message: {0}", status.Message); |
| } |
| |
| return _utilFunctions.VerifySuccess(status); |
| }, |
| errMsg: "Error occurs when inserting aligned records of one device" |
| ); |
| } |
| public async Task<int> TestInsertRecordAsync(string deviceId, RowRecord record) |
| { |
| return await ExecuteClientOperationAsync<int>( |
| async client => |
| { |
| var req = new TSInsertRecordReq( |
| client.SessionId, |
| deviceId, |
| record.Measurements, |
| record.ToBytes(), |
| record.Timestamps); |
| |
| var status = await client.ServiceClient.testInsertRecordAsync(req); |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("insert one record to device {0}, server message: {1}", deviceId, status.Message); |
| } |
| |
| return _utilFunctions.VerifySuccess(status); |
| }, |
| errMsg: "Error occurs when test inserting one record" |
| ); |
| } |
| public async Task<int> TestInsertRecordsAsync(List<string> deviceId, List<RowRecord> rowRecords) |
| { |
| return await ExecuteClientOperationAsync<int>( |
| async client => |
| { |
| var req = GenInsertRecordsReq(deviceId, rowRecords, client.SessionId); |
| |
| var status = await client.ServiceClient.testInsertRecordsAsync(req); |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("insert multiple records to devices {0}, server message: {1}", deviceId, status.Message); |
| } |
| |
| return _utilFunctions.VerifySuccess(status); |
| }, |
| errMsg: "Error occurs when test inserting multiple records" |
| ); |
| } |
| public async Task<int> TestInsertTabletAsync(Tablet tablet) |
| { |
| return await ExecuteClientOperationAsync<int>( |
| async client => |
| { |
| var req = GenInsertTabletReq(tablet, client.SessionId); |
| |
| var status = await client.ServiceClient.testInsertTabletAsync(req); |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("insert one tablet to device {0}, server message: {1}", tablet.DeviceId, status.Message); |
| } |
| |
| return _utilFunctions.VerifySuccess(status); |
| }, |
| errMsg: "Error occurs when test inserting one tablet" |
| ); |
| } |
| public async Task<int> TestInsertTabletsAsync(List<Tablet> tabletLst) |
| { |
| return await ExecuteClientOperationAsync<int>( |
| async client => |
| { |
| var req = GenInsertTabletsReq(tabletLst, client.SessionId); |
| |
| var status = await client.ServiceClient.testInsertTabletsAsync(req); |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("insert multiple tablets, message: {0}", status.Message); |
| } |
| |
| return _utilFunctions.VerifySuccess(status); |
| }, |
| errMsg: "Error occurs when test inserting multiple tablets" |
| ); |
| } |
| |
| public async Task<SessionDataSet> ExecuteQueryStatementAsync(string sql) |
| { |
| return await ExecuteClientOperationAsync<SessionDataSet>( |
| async client => |
| { |
| var req = new TSExecuteStatementReq(client.SessionId, sql, client.StatementId) |
| { |
| FetchSize = _fetchSize |
| }; |
| |
| var resp = await client.ServiceClient.executeQueryStatementAsync(req); |
| var status = resp.Status; |
| |
| if (_utilFunctions.VerifySuccess(status) == -1) |
| { |
| throw new Exception(string.Format("execute query failed, sql: {0}, message: {1}", sql, status.Message)); |
| } |
| |
| return new SessionDataSet(sql, resp, client, _clients, client.StatementId) |
| { |
| FetchSize = _fetchSize, |
| }; |
| }, |
| errMsg: "Error occurs when executing query statement", |
| putClientBack: false |
| ); |
| } |
| public async Task<SessionDataSet> ExecuteStatementAsync(string sql, long timeout) |
| { |
| return await ExecuteClientOperationAsync<SessionDataSet>( |
| async client => |
| { |
| var req = new TSExecuteStatementReq(client.SessionId, sql, client.StatementId) |
| { |
| FetchSize = _fetchSize, |
| Timeout = timeout |
| }; |
| |
| var resp = await client.ServiceClient.executeStatementAsync(req); |
| var status = resp.Status; |
| |
| if (_utilFunctions.VerifySuccess(status) == -1) |
| { |
| throw new Exception(string.Format("execute query failed, sql: {0}, message: {1}", sql, status.Message)); |
| } |
| |
| return new SessionDataSet(sql, resp, client, _clients, client.StatementId) |
| { |
| FetchSize = _fetchSize, |
| }; |
| }, |
| errMsg: "Error occurs when executing query statement", |
| putClientBack: false |
| ); |
| } |
| |
| |
| public async Task<int> ExecuteNonQueryStatementAsync(string sql) |
| { |
| return await ExecuteClientOperationAsync<int>( |
| async client => |
| { |
| var req = new TSExecuteStatementReq(client.SessionId, sql, client.StatementId); |
| |
| var resp = await client.ServiceClient.executeUpdateStatementAsync(req); |
| var status = resp.Status; |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("execute non-query statement {0} message: {1}", sql, status.Message); |
| } |
| |
| return _utilFunctions.VerifySuccess(status); |
| }, |
| errMsg: "Error occurs when executing non-query statement" |
| ); |
| } |
| public async Task<SessionDataSet> ExecuteRawDataQuery(List<string> paths, long startTime, long endTime) |
| { |
| return await ExecuteClientOperationAsync<SessionDataSet>( |
| async client => |
| { |
| var req = new TSRawDataQueryReq(client.SessionId, paths, startTime, endTime, client.StatementId) |
| { |
| FetchSize = _fetchSize, |
| EnableRedirectQuery = false |
| }; |
| |
| var resp = await client.ServiceClient.executeRawDataQueryAsync(req); |
| var status = resp.Status; |
| |
| if (_utilFunctions.VerifySuccess(status) == -1) |
| { |
| throw new Exception(string.Format("execute raw data query failed, message: {0}", status.Message)); |
| } |
| |
| return new SessionDataSet("", resp, client, _clients, client.StatementId) |
| { |
| FetchSize = _fetchSize, |
| }; |
| }, |
| errMsg: "Error occurs when executing raw data query", |
| putClientBack: false |
| ); |
| } |
| public async Task<SessionDataSet> ExecuteLastDataQueryAsync(List<string> paths, long lastTime) |
| { |
| return await ExecuteClientOperationAsync<SessionDataSet>( |
| async client => |
| { |
| var req = new TSLastDataQueryReq(client.SessionId, paths, lastTime, client.StatementId) |
| { |
| FetchSize = _fetchSize, |
| EnableRedirectQuery = false |
| }; |
| |
| var resp = await client.ServiceClient.executeLastDataQueryAsync(req); |
| var status = resp.Status; |
| |
| if (_utilFunctions.VerifySuccess(status) == -1) |
| { |
| throw new Exception(string.Format("execute last data query failed, message: {0}", status.Message)); |
| } |
| |
| return new SessionDataSet("", resp, client, _clients, client.StatementId) |
| { |
| FetchSize = _fetchSize, |
| }; |
| }, |
| errMsg: "Error occurs when executing last data query", |
| putClientBack: false |
| ); |
| } |
| [Obsolete("This method is obsolete. Use SQL instead.", false)] |
| public async Task<int> CreateSchemaTemplateAsync(Template template) |
| { |
| return await ExecuteClientOperationAsync<int>( |
| async client => |
| { |
| var req = new TSCreateSchemaTemplateReq(client.SessionId, template.Name, template.ToBytes()); |
| |
| var status = await client.ServiceClient.createSchemaTemplateAsync(req); |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("create schema template {0} message: {1}", template.Name, status.Message); |
| } |
| |
| return _utilFunctions.VerifySuccess(status); |
| }, |
| errMsg: "Error occurs when creating schema template" |
| ); |
| } |
| |
| [Obsolete("This method is obsolete. Use SQL instead.", false)] |
| public async Task<int> DropSchemaTemplateAsync(string templateName) |
| { |
| return await ExecuteClientOperationAsync<int>( |
| async client => |
| { |
| var req = new TSDropSchemaTemplateReq(client.SessionId, templateName); |
| |
| var status = await client.ServiceClient.dropSchemaTemplateAsync(req); |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("drop schema template {0} message: {1}", templateName, status.Message); |
| } |
| |
| return _utilFunctions.VerifySuccess(status); |
| }, |
| errMsg: "Error occurs when dropping schema template" |
| ); |
| } |
| [Obsolete("This method is obsolete. Use SQL instead.", false)] |
| public async Task<int> SetSchemaTemplateAsync(string templateName, string prefixPath) |
| { |
| return await ExecuteClientOperationAsync<int>( |
| async client => |
| { |
| var req = new TSSetSchemaTemplateReq(client.SessionId, templateName, prefixPath); |
| |
| var status = await client.ServiceClient.setSchemaTemplateAsync(req); |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("set schema template {0} message: {1}", templateName, status.Message); |
| } |
| |
| return _utilFunctions.VerifySuccess(status); |
| }, |
| errMsg: "Error occurs when setting schema template" |
| ); |
| } |
| [Obsolete("This method is obsolete. Use SQL instead.", false)] |
| public async Task<int> UnsetSchemaTemplateAsync(string prefixPath, string templateName) |
| { |
| return await ExecuteClientOperationAsync<int>( |
| async client => |
| { |
| var req = new TSUnsetSchemaTemplateReq(client.SessionId, prefixPath, templateName); |
| |
| var status = await client.ServiceClient.unsetSchemaTemplateAsync(req); |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("unset schema template {0} message: {1}", templateName, status.Message); |
| } |
| |
| return _utilFunctions.VerifySuccess(status); |
| }, |
| errMsg: "Error occurs when unsetting schema template" |
| ); |
| } |
| [Obsolete("This method is obsolete. Use SQL instead.", false)] |
| public async Task<int> DeleteNodeInTemplateAsync(string templateName, string path) |
| { |
| return await ExecuteClientOperationAsync<int>( |
| async client => |
| { |
| var req = new TSPruneSchemaTemplateReq(client.SessionId, templateName, path); |
| |
| var status = await client.ServiceClient.pruneSchemaTemplateAsync(req); |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("delete node in template {0} message: {1}", templateName, status.Message); |
| } |
| |
| return _utilFunctions.VerifySuccess(status); |
| }, |
| errMsg: "Error occurs when deleting node in template" |
| ); |
| } |
| [Obsolete("This method is obsolete. Use SQL instead.", false)] |
| public async Task<int> CountMeasurementsInTemplateAsync(string name) |
| { |
| return await ExecuteClientOperationAsync<int>( |
| async client => |
| { |
| var req = new TSQueryTemplateReq(client.SessionId, name, (int)TemplateQueryType.COUNT_MEASUREMENTS); |
| |
| var resp = await client.ServiceClient.querySchemaTemplateAsync(req); |
| var status = resp.Status; |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("count measurements in template {0} message: {1}", name, status.Message); |
| } |
| |
| if (_utilFunctions.VerifySuccess(status) == -1) |
| { |
| throw new Exception(string.Format("count measurements in template failed, template name: {0}, message: {1}", name, status.Message)); |
| } |
| return resp.Count; |
| }, |
| errMsg: "Error occurs when counting measurements in template" |
| ); |
| } |
| [Obsolete("This method is obsolete. Use SQL instead.", false)] |
| public async Task<bool> IsMeasurementInTemplateAsync(string templateName, string path) |
| { |
| return await ExecuteClientOperationAsync<bool>( |
| async client => |
| { |
| var req = new TSQueryTemplateReq(client.SessionId, templateName, (int)TemplateQueryType.IS_MEASUREMENT); |
| req.Measurement = path; |
| |
| var resp = await client.ServiceClient.querySchemaTemplateAsync(req); |
| var status = resp.Status; |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("is measurement in template {0} message: {1}", templateName, status.Message); |
| } |
| |
| if (_utilFunctions.VerifySuccess(status) == -1) |
| { |
| throw new Exception(string.Format("is measurement in template failed, template name: {0}, message: {1}", templateName, status.Message)); |
| } |
| return resp.Result; |
| }, |
| errMsg: "Error occurs when checking measurement in template" |
| ); |
| } |
| [Obsolete("This method is obsolete. Use SQL instead.", false)] |
| public async Task<bool> IsPathExistInTemplateAsync(string templateName, string path) |
| { |
| return await ExecuteClientOperationAsync<bool>( |
| async client => |
| { |
| var req = new TSQueryTemplateReq(client.SessionId, templateName, (int)TemplateQueryType.PATH_EXIST); |
| req.Measurement = path; |
| |
| var resp = await client.ServiceClient.querySchemaTemplateAsync(req); |
| var status = resp.Status; |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("is path exist in template {0} message: {1}", templateName, status.Message); |
| } |
| |
| if (_utilFunctions.VerifySuccess(status) == -1) |
| { |
| throw new Exception(string.Format("is path exist in template failed, template name: {0}, message: {1}", templateName, status.Message)); |
| } |
| return resp.Result; |
| }, |
| errMsg: "Error occurs when checking path exist in template" |
| ); |
| } |
| [Obsolete("This method is obsolete. Use SQL instead.", false)] |
| public async Task<List<string>> ShowMeasurementsInTemplateAsync(string templateName, string pattern = "") |
| { |
| return await ExecuteClientOperationAsync<List<string>>( |
| async client => |
| { |
| var req = new TSQueryTemplateReq(client.SessionId, templateName, (int)TemplateQueryType.SHOW_MEASUREMENTS); |
| req.Measurement = pattern; |
| |
| var resp = await client.ServiceClient.querySchemaTemplateAsync(req); |
| var status = resp.Status; |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("get measurements in template {0} message: {1}", templateName, status.Message); |
| } |
| |
| if (_utilFunctions.VerifySuccess(status) == -1) |
| { |
| throw new Exception(string.Format("show measurements in template failed, template name: {0}, pattern: {1}, message: {2}", templateName, pattern, status.Message)); |
| } |
| return resp.Measurements; |
| }, |
| errMsg: "Error occurs when showing measurements in template" |
| ); |
| } |
| [Obsolete("This method is obsolete. Use SQL instead.", false)] |
| public async Task<List<string>> ShowAllTemplatesAsync() |
| { |
| return await ExecuteClientOperationAsync<List<string>>( |
| async client => |
| { |
| var req = new TSQueryTemplateReq(client.SessionId, "", (int)TemplateQueryType.SHOW_TEMPLATES); |
| |
| var resp = await client.ServiceClient.querySchemaTemplateAsync(req); |
| var status = resp.Status; |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("get all templates message: {0}", status.Message); |
| } |
| |
| if (_utilFunctions.VerifySuccess(status) == -1) |
| { |
| throw new Exception(string.Format("show all templates failed, message: {0}", status.Message)); |
| } |
| return resp.Measurements; |
| }, |
| errMsg: "Error occurs when getting all templates" |
| ); |
| } |
| |
| [Obsolete("This method is obsolete. Use SQL instead.", false)] |
| public async Task<List<string>> ShowPathsTemplateSetOnAsync(string templateName) |
| { |
| return await ExecuteClientOperationAsync<List<string>>( |
| async client => |
| { |
| var req = new TSQueryTemplateReq(client.SessionId, templateName, (int)TemplateQueryType.SHOW_SET_TEMPLATES); |
| |
| var resp = await client.ServiceClient.querySchemaTemplateAsync(req); |
| var status = resp.Status; |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("get paths template set on {0} message: {1}", templateName, status.Message); |
| } |
| |
| if (_utilFunctions.VerifySuccess(status) == -1) |
| { |
| throw new Exception(string.Format("show paths template set on failed, template name: {0}, message: {1}", templateName, status.Message)); |
| } |
| return resp.Measurements; |
| }, |
| errMsg: "Error occurs when getting paths template set on" |
| ); |
| } |
| [Obsolete("This method is obsolete. Use SQL instead.", false)] |
| public async Task<List<string>> ShowPathsTemplateUsingOnAsync(string templateName) |
| { |
| return await ExecuteClientOperationAsync<List<string>>( |
| async client => |
| { |
| var req = new TSQueryTemplateReq(client.SessionId, templateName, (int)TemplateQueryType.SHOW_USING_TEMPLATES); |
| |
| var resp = await client.ServiceClient.querySchemaTemplateAsync(req); |
| var status = resp.Status; |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("get paths template using on {0} message: {1}", templateName, status.Message); |
| } |
| |
| if (_utilFunctions.VerifySuccess(status) == -1) |
| { |
| throw new Exception(string.Format("show paths template using on failed, template name: {0}, message: {1}", templateName, status.Message)); |
| } |
| return resp.Measurements; |
| }, |
| errMsg: "Error occurs when getting paths template using on" |
| ); |
| } |
| |
| protected virtual void Dispose(bool disposing) |
| { |
| if (!disposedValue) |
| { |
| if (disposing) |
| { |
| #if NET461_OR_GREATER || NETSTANDARD2_0 |
| #else |
| _clients.ClientQueue.Clear(); |
| #endif |
| } |
| _clients = null; |
| disposedValue = true; |
| } |
| } |
| |
| public void Dispose() |
| { |
| Dispose(disposing: true); |
| GC.SuppressFinalize(this); |
| } |
| } |
| } |