| using System; |
| using System.Collections.Generic; |
| using System.Linq; |
| using System.Net.Sockets; |
| using System.Numerics;
|
| using System.Threading; |
| using System.Threading.Tasks; |
| using Apache.IoTDB.DataStructure; |
| using Microsoft.Extensions.Configuration;
|
| using Microsoft.Extensions.Logging;
|
| using Thrift; |
| using Thrift.Protocol; |
| using Thrift.Transport; |
| using Thrift.Transport.Client; |
| |
| namespace Apache.IoTDB |
| { |
|
|
| public class SessionPool:IDisposable |
| { |
| private static int SuccessCode => 200; |
| private static readonly TSProtocolVersion ProtocolVersion = TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3; |
| |
| private readonly string _username; |
| private readonly string _password; |
| private bool _enableRpcCompression; |
| private string _zoneId; |
| private readonly string _host; |
| private readonly int _port; |
| private readonly int _fetchSize; |
| private readonly int _timeout; |
| private readonly int _poolSize = 4; |
| private readonly Utils _utilFunctions = new Utils(); |
| private bool _debugMode; |
| private bool _isClose = true; |
| private ConcurrentClientQueue _clients; |
| private ILogger _logger; |
| |
| public SessionPool(string host, int port, int poolSize) |
| : this(host, port, "root", "root", poolSize, "UTC+08:00", 8, true, 60) |
| { |
| }
|
| |
| public SessionPool(string host, int port, string username, string password) |
| : this(host, port, username, password, 1024, "UTC+08:00", 8, true, 60) |
| { |
| } |
| |
| public SessionPool(string host, int port, string username, string password, int fetchSize)
|
| : this(host, port, username, password, fetchSize, "UTC+08:00", 8, true, 60)
|
| {
|
|
|
| } |
| |
| public SessionPool(string host, int port) : this(host, port, "root", "root", 1024, "UTC+08:00", 8, true, 60) |
| { |
| }
|
| public SessionPool(string host, int port, string username, string password, int fetchSize, string zoneId, int poolSize, bool enableRpcCompression, int timeout) |
| { |
| _host = host; |
| _port = port; |
| _username = username; |
| _password = password; |
| _zoneId = zoneId; |
| _fetchSize = fetchSize; |
| _debugMode = false; |
| _poolSize = poolSize; |
| _enableRpcCompression = enableRpcCompression; |
| _timeout = timeout; |
| }
|
|
|
| /// <summary>
|
| /// Gets or sets the amount of time a Session will wait for a send operation to complete successfully.
|
| /// </summary>
|
| /// <remarks> The send time-out value, in milliseconds. The default is 10000.</remarks>
|
| public int TimeOut { get; set; } = 10000;
|
|
|
| ILoggerFactory factory;
|
| private bool disposedValue;
|
|
|
| public void OpenDebugMode(Action<ILoggingBuilder> configure) |
| { |
| _debugMode = true;
|
| factory= LoggerFactory.Create(configure); |
| _logger = factory.CreateLogger(nameof(Apache.IoTDB)); |
| } |
| |
| public void CloseDebugMode() |
| { |
| _debugMode = false; |
| }
|
|
|
| public async Task Open(bool enableRpcCompression, CancellationToken cancellationToken = default) |
| {
|
| _enableRpcCompression = enableRpcCompression;
|
| await Open(cancellationToken);
|
| }
|
|
|
| public async Task Open(CancellationToken cancellationToken = default) |
| { |
| _clients = new ConcurrentClientQueue();
|
| _clients.Timeout = _timeout*5;
|
| for (var index = 0; index < _poolSize; index++) |
| { |
| _clients.Add(await CreateAndOpen(_enableRpcCompression,_timeout, cancellationToken)); |
| } |
| } |
| |
| public bool IsOpen() => !_isClose; |
| |
| public async Task Close() |
| { |
| if (_isClose) |
| { |
| return; |
| } |
| |
| foreach (var client in _clients.ClientQueue.AsEnumerable()) |
| { |
| var closeSessionRequest = new TSCloseSessionReq(client.SessionId); |
| try |
| { |
| await client.ServiceClient.closeSessionAsync(closeSessionRequest); |
| } |
| catch (TException e) |
| { |
| throw new TException("Error occurs when closing session at server. Maybe server is down", e); |
| } |
| finally |
| { |
| _isClose = true; |
| |
| client.Transport?.Close(); |
| } |
| } |
| } |
| |
| public async Task SetTimeZone(string zoneId) |
| { |
| _zoneId = zoneId; |
| |
| foreach (var client in _clients.ClientQueue.AsEnumerable()) |
| { |
| var req = new TSSetTimeZoneReq(client.SessionId, zoneId); |
| try |
| { |
| var resp = await client.ServiceClient.setTimeZoneAsync(req); |
| if (_debugMode) |
| { |
| _logger.LogInformation("setting time zone_id as {0}, server message:{1}", zoneId, resp.Message); |
| } |
| } |
| catch (TException e) |
| { |
| throw new TException("could not set time zone", e); |
| } |
| } |
| } |
| |
| public async Task<string> GetTimeZone() |
| { |
| if (_zoneId != "") |
| { |
| return _zoneId; |
| } |
| |
| var client = _clients.Take(); |
| |
| try |
| { |
| var response = await client.ServiceClient.getTimeZoneAsync(client.SessionId); |
| |
| return response?.TimeZone; |
| } |
| catch (TException e) |
| { |
| throw new TException("could not get time zone", e); |
| } |
| finally |
| { |
| _clients.Add(client); |
| } |
| } |
| |
| private async Task<Client> CreateAndOpen(bool enableRpcCompression,int timeout, CancellationToken cancellationToken = default) |
| { |
| var tcpClient = new TcpClient(_host, _port); |
| tcpClient.SendTimeout = timeout; |
| tcpClient.ReceiveTimeout = timeout; |
| var transport = new TFramedTransport(new TSocketTransport(tcpClient, null)); |
| |
| if (!transport.IsOpen) |
| { |
| await transport.OpenAsync(cancellationToken); |
| } |
| |
| var client = enableRpcCompression ? |
| new TSIService.Client(new TCompactProtocol(transport)) : |
| new TSIService.Client(new TBinaryProtocol(transport)); |
| |
| var openReq = new TSOpenSessionReq(ProtocolVersion, _zoneId) |
| { |
| Username = _username, |
| Password = _password |
| }; |
| |
| try |
| { |
| var openResp = await client.openSessionAsync(openReq, cancellationToken); |
| |
| if (openResp.ServerProtocolVersion != ProtocolVersion) |
| { |
| throw new TException($"Protocol Differ, Client version is {ProtocolVersion} but Server version is {openResp.ServerProtocolVersion}", null); |
| } |
| |
| if (openResp.ServerProtocolVersion == 0) |
| { |
| throw new TException("Protocol not supported", null); |
| } |
| |
| var sessionId = openResp.SessionId; |
| var statementId = await client.requestStatementIdAsync(sessionId, cancellationToken); |
| |
| _isClose = false; |
| |
| var returnClient = new Client( |
| client, |
| sessionId, |
| statementId, |
| transport); |
| |
| return returnClient; |
| } |
| catch (Exception) |
| { |
| transport.Close(); |
| |
| throw; |
| } |
| } |
| |
| public async Task<int> SetStorageGroup(string groupName) |
| { |
| var client = _clients.Take(); |
| |
| try |
| { |
| var status = await client.ServiceClient.setStorageGroupAsync(client.SessionId, groupName); |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("set storage group {0} successfully, server message is {1}", groupName, status.Message); |
| } |
| |
| return _utilFunctions.VerifySuccess(status, SuccessCode); |
| } |
| catch (TException e) |
| { |
| // try to reconnect |
| await Open(_enableRpcCompression); |
| client = _clients.Take(); |
| try |
| { |
| var status = await client.ServiceClient.setStorageGroupAsync(client.SessionId, groupName); |
| if (_debugMode) |
| { |
| _logger.LogInformation("set storage group {0} successfully, server message is {1}", groupName, status.Message); |
| } |
| return _utilFunctions.VerifySuccess(status, SuccessCode); |
| } |
| catch (TException ex) |
| { |
| throw new TException("Error occurs when setting storage group", ex); |
| } |
| } |
| finally |
| { |
| _clients.Add(client); |
| } |
| } |
| |
| public async Task<int> CreateTimeSeries( |
| string tsPath, |
| TSDataType dataType, |
| TSEncoding encoding, |
| Compressor compressor) |
| { |
| var client = _clients.Take(); |
| var req = new TSCreateTimeseriesReq( |
| client.SessionId, |
| tsPath, |
| (int)dataType, |
| (int)encoding, |
| (int)compressor); |
| try |
| { |
| var status = await client.ServiceClient.createTimeseriesAsync(req); |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("creating time series {0} successfully, server message is {1}", tsPath, status.Message); |
| } |
| |
| return _utilFunctions.VerifySuccess(status, SuccessCode); |
| } |
| catch (TException e) |
| { |
| await Open(_enableRpcCompression); |
| client = _clients.Take(); |
| req.SessionId = client.SessionId; |
| try |
| { |
| var status = await client.ServiceClient.createTimeseriesAsync(req); |
| if (_debugMode) |
| { |
| _logger.LogInformation("creating time series {0} successfully, server message is {1}", tsPath, status.Message); |
| } |
| return _utilFunctions.VerifySuccess(status, SuccessCode); |
| } |
| catch (TException ex) |
| { |
| throw new TException("Error occurs when creating time series", ex); |
| } |
| } |
| finally |
| { |
| _clients.Add(client); |
| } |
| |
| } |
| |
| public async Task<int> CreateAlignedTimeseriesAsync( |
| string prefixPath, |
| List<string> measurements, |
| List<TSDataType> dataTypeLst, |
| List<TSEncoding> encodingLst, |
| List<Compressor> compressorLst) |
| { |
| var client = _clients.Take(); |
| var dataTypes = dataTypeLst.ConvertAll(x => (int)x); |
| var encodings = encodingLst.ConvertAll(x => (int)x); |
| var compressors = compressorLst.ConvertAll(x => (int)x); |
| |
| var req = new TSCreateAlignedTimeseriesReq( |
| client.SessionId, |
| prefixPath, |
| measurements, |
| dataTypes, |
| encodings, |
| compressors); |
| try |
| { |
| var status = await client.ServiceClient.createAlignedTimeseriesAsync(req); |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("creating aligned time series {0} successfully, server message is {1}", prefixPath, status.Message); |
| } |
| |
| return _utilFunctions.VerifySuccess(status, SuccessCode); |
| } |
| catch (TException e) |
| { |
| await Open(_enableRpcCompression); |
| client = _clients.Take(); |
| req.SessionId = client.SessionId; |
| try |
| { |
| var status = await client.ServiceClient.createAlignedTimeseriesAsync(req); |
| if (_debugMode) |
| { |
| _logger.LogInformation("creating aligned time series {0} successfully, server message is {1}", prefixPath, status.Message); |
| } |
| return _utilFunctions.VerifySuccess(status, SuccessCode); |
| } |
| catch (TException ex) |
| { |
| throw new TException("Error occurs when creating aligned time series", ex); |
| } |
| } |
| finally |
| { |
| _clients.Add(client); |
| } |
| } |
| |
| public async Task<int> DeleteStorageGroupAsync(string groupName) |
| { |
| var client = _clients.Take(); |
| try |
| { |
| var status = await client.ServiceClient.deleteStorageGroupsAsync( |
| client.SessionId, |
| new List<string> { groupName }); |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation($"delete storage group {groupName} successfully, server message is {status?.Message}"); |
| } |
| |
| return _utilFunctions.VerifySuccess(status, SuccessCode); |
| } |
| catch (TException e) |
| { |
| await Open(_enableRpcCompression); |
| client = _clients.Take(); |
| try |
| { |
| var status = await client.ServiceClient.deleteStorageGroupsAsync( |
| client.SessionId, |
| new List<string> { groupName }); |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation($"delete storage group {groupName} successfully, server message is {status?.Message}"); |
| } |
| |
| return _utilFunctions.VerifySuccess(status, SuccessCode); |
| } |
| catch (TException ex) |
| { |
| throw new TException("Error occurs when deleting storage group", ex); |
| } |
| } |
| finally |
| { |
| _clients.Add(client); |
| } |
| } |
| |
| public async Task<int> DeleteStorageGroupsAsync(List<string> groupNames) |
| { |
| var client = _clients.Take(); |
| |
| try |
| { |
| var status = await client.ServiceClient.deleteStorageGroupsAsync(client.SessionId, groupNames); |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation( |
| "delete storage group(s) {0} successfully, server message is {1}", |
| groupNames, |
| status.Message); |
| } |
| |
| return _utilFunctions.VerifySuccess(status, SuccessCode); |
| } |
| catch (TException e) |
| { |
| await Open(_enableRpcCompression); |
| client = _clients.Take(); |
| try |
| { |
| var status = await client.ServiceClient.deleteStorageGroupsAsync(client.SessionId, groupNames); |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation( |
| "delete storage group(s) {0} successfully, server message is {1}", |
| groupNames, |
| status.Message); |
| } |
| |
| return _utilFunctions.VerifySuccess(status, SuccessCode); |
| } |
| catch (TException ex) |
| { |
| throw new TException("Error occurs when deleting storage group(s)", ex); |
| } |
| } |
| finally |
| { |
| _clients.Add(client); |
| } |
| } |
| |
| public async Task<int> CreateMultiTimeSeriesAsync( |
| List<string> tsPathLst, |
| List<TSDataType> dataTypeLst, |
| List<TSEncoding> encodingLst, |
| List<Compressor> compressorLst) |
| { |
| var client = _clients.Take(); |
| var dataTypes = dataTypeLst.ConvertAll(x => (int)x); |
| var encodings = encodingLst.ConvertAll(x => (int)x); |
| var compressors = compressorLst.ConvertAll(x => (int)x); |
| |
| var req = new TSCreateMultiTimeseriesReq(client.SessionId, tsPathLst, dataTypes, encodings, compressors); |
| |
| try |
| { |
| var status = await client.ServiceClient.createMultiTimeseriesAsync(req); |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("creating multiple time series {0}, server message is {1}", tsPathLst, status.Message); |
| } |
| |
| return _utilFunctions.VerifySuccess(status, SuccessCode); |
| } |
| catch (TException e) |
| { |
| await Open(_enableRpcCompression); |
| client = _clients.Take(); |
| req.SessionId = client.SessionId; |
| try |
| { |
| var status = await client.ServiceClient.createMultiTimeseriesAsync(req); |
| if (_debugMode) |
| { |
| _logger.LogInformation("creating multiple time series {0}, server message is {1}", tsPathLst, status.Message); |
| } |
| return _utilFunctions.VerifySuccess(status, SuccessCode); |
| } |
| catch (TException ex) |
| { |
| throw new TException("Error occurs when creating multiple time series", ex); |
| } |
| } |
| finally |
| { |
| _clients.Add(client); |
| } |
| } |
| |
| public async Task<int> DeleteTimeSeriesAsync(List<string> pathList) |
| { |
| var client = _clients.Take(); |
| |
| try |
| { |
| var status = await client.ServiceClient.deleteTimeseriesAsync(client.SessionId, pathList); |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("deleting multiple time series {0}, server message is {1}", pathList, status.Message); |
| } |
| |
| return _utilFunctions.VerifySuccess(status, SuccessCode); |
| } |
| catch (TException e) |
| { |
| await Open(_enableRpcCompression); |
| client = _clients.Take(); |
| try |
| { |
| var status = await client.ServiceClient.deleteTimeseriesAsync(client.SessionId, pathList); |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("deleting multiple time series {0}, server message is {1}", pathList, status.Message); |
| } |
| |
| return _utilFunctions.VerifySuccess(status, SuccessCode); |
| } |
| catch (TException ex) |
| { |
| throw new TException("Error occurs when deleting multiple time series", ex); |
| } |
| } |
| finally |
| { |
| _clients.Add(client); |
| } |
| } |
| |
| public async Task<int> DeleteTimeSeriesAsync(string tsPath) |
| { |
| return await DeleteTimeSeriesAsync(new List<string> { tsPath }); |
| } |
| |
| public async Task<bool> CheckTimeSeriesExistsAsync(string tsPath) |
| { |
| // TBD by dalong |
| try |
| { |
| var sql = "SHOW TIMESERIES " + tsPath; |
| var sessionDataset = await ExecuteQueryStatementAsync(sql); |
| return sessionDataset.HasNext(); |
| } |
| catch (TException e) |
| { |
| throw new TException("could not check if certain time series exists", e); |
| } |
| } |
| |
| public async Task<int> DeleteDataAsync(List<string> tsPathLst, long startTime, long endTime) |
| { |
| var client = _clients.Take(); |
| var req = new TSDeleteDataReq(client.SessionId, tsPathLst, startTime, endTime); |
| |
| try |
| { |
| var status = await client.ServiceClient.deleteDataAsync(req); |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation( |
| "delete data from {0}, server message is {1}", |
| tsPathLst, |
| status.Message); |
| } |
| |
| return _utilFunctions.VerifySuccess(status, SuccessCode); |
| } |
| catch (TException e) |
| { |
| await Open(_enableRpcCompression); |
| client = _clients.Take(); |
| req.SessionId = client.SessionId; |
| try |
| { |
| var status = await client.ServiceClient.deleteDataAsync(req); |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation( |
| "delete data from {0}, server message is {1}", |
| tsPathLst, |
| status.Message); |
| } |
| |
| return _utilFunctions.VerifySuccess(status, SuccessCode); |
| } |
| catch (TException ex) |
| { |
| throw new TException("Error occurs when deleting data", ex); |
| } |
| } |
| finally |
| { |
| _clients.Add(client); |
| } |
| } |
| |
| public async Task<int> InsertRecordAsync(string deviceId, RowRecord record) |
| { |
| // TBD by Luzhan |
| var client = _clients.Take(); |
| var req = new TSInsertRecordReq(client.SessionId, deviceId, record.Measurements, record.ToBytes(), |
| record.Timestamps); |
| try |
| { |
| var status = await client.ServiceClient.insertRecordAsync(req); |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("insert one record to device {0}, server message: {1}", deviceId, status.Message); |
| } |
| |
| return _utilFunctions.VerifySuccess(status, SuccessCode); |
| } |
| catch (TException e) |
| { |
| await Open(_enableRpcCompression); |
| client = _clients.Take(); |
| req.SessionId = client.SessionId; |
| try |
| { |
| var status = await client.ServiceClient.insertRecordAsync(req); |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("insert one record to device {0}, server message: {1}", deviceId, status.Message); |
| } |
| |
| return _utilFunctions.VerifySuccess(status, SuccessCode); |
| } |
| catch (TException ex) |
| { |
| throw new TException("Error occurs when inserting record", ex); |
| } |
| } |
| finally |
| { |
| _clients.Add(client); |
| } |
| } |
| public async Task<int> InsertAlignedRecordAsync(string deviceId, RowRecord record) |
| { |
| var client = _clients.Take(); |
| var req = new TSInsertRecordReq(client.SessionId, deviceId, record.Measurements, record.ToBytes(), |
| record.Timestamps); |
| req.IsAligned = true; |
| // ASSERT that the insert plan is aligned |
| System.Diagnostics.Debug.Assert(req.IsAligned == true); |
| try |
| { |
| var status = await client.ServiceClient.insertRecordAsync(req); |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("insert one record to device {0}, server message: {1}", deviceId, status.Message); |
| } |
| |
| return _utilFunctions.VerifySuccess(status, SuccessCode); |
| } |
| catch (TException e) |
| { |
| await Open(_enableRpcCompression); |
| client = _clients.Take(); |
| req.SessionId = client.SessionId; |
| try |
| { |
| var status = await client.ServiceClient.insertRecordAsync(req); |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("insert one record to device {0}, server message: {1}", deviceId, status.Message); |
| } |
| |
| return _utilFunctions.VerifySuccess(status, SuccessCode); |
| } |
| catch (TException ex) |
| { |
| throw new TException("Error occurs when inserting record", ex); |
| } |
| } |
| finally |
| { |
| _clients.Add(client); |
| } |
| } |
| |
| public TSInsertStringRecordReq GenInsertStrRecordReq(string deviceId, List<string> measurements, |
| List<string> values, long timestamp, long sessionId) |
| { |
| if (values.Count() != measurements.Count()) |
| { |
| throw new TException("length of data types does not equal to length of values!", null); |
| } |
| |
| return new TSInsertStringRecordReq(sessionId, deviceId, measurements, values, timestamp); |
| } |
| |
| public TSInsertRecordsReq GenInsertRecordsReq(List<string> deviceId, List<RowRecord> rowRecords, |
| long sessionId) |
| { |
| //TODO |
| var measurementLst = rowRecords.Select(x => x.Measurements).ToList(); |
| var timestampLst = rowRecords.Select(x => x.Timestamps).ToList(); |
| var valuesLstInBytes = rowRecords.Select(row => row.ToBytes()).ToList(); |
| |
| return new TSInsertRecordsReq(sessionId, deviceId, measurementLst, valuesLstInBytes, timestampLst); |
| } |
| |
| public async Task<int> InsertRecordsAsync(List<string> deviceId, List<RowRecord> rowRecords) |
| { |
| var client = _clients.Take(); |
| |
| var request = GenInsertRecordsReq(deviceId, rowRecords, client.SessionId); |
| |
| try |
| { |
| var status = await client.ServiceClient.insertRecordsAsync(request); |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("insert multiple records to devices {0}, server message: {1}", deviceId, status.Message); |
| } |
| |
| return _utilFunctions.VerifySuccess(status, SuccessCode); |
| } |
| catch (TException e) |
| { |
| await Open(_enableRpcCompression); |
| client = _clients.Take(); |
| request.SessionId = client.SessionId; |
| try |
| { |
| var status = await client.ServiceClient.insertRecordsAsync(request); |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("insert multiple records to devices {0}, server message: {1}", deviceId, status.Message); |
| } |
| |
| return _utilFunctions.VerifySuccess(status, SuccessCode); |
| } |
| catch (TException ex) |
| { |
| throw new TException("Error occurs when inserting records", ex); |
| } |
| } |
| finally |
| { |
| _clients.Add(client); |
| } |
| } |
| public async Task<int> InsertAlignedRecordsAsync(List<string> deviceId, List<RowRecord> rowRecords) |
| { |
| var client = _clients.Take(); |
| |
| var request = GenInsertRecordsReq(deviceId, rowRecords, client.SessionId); |
| request.IsAligned = true; |
| // ASSERT that the insert plan is aligned |
| System.Diagnostics.Debug.Assert(request.IsAligned == true); |
| |
| try |
| { |
| var status = await client.ServiceClient.insertRecordsAsync(request); |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("insert multiple records to devices {0}, server message: {1}", deviceId, status.Message); |
| } |
| |
| return _utilFunctions.VerifySuccess(status, SuccessCode); |
| } |
| catch (TException e) |
| { |
| await Open(_enableRpcCompression); |
| client = _clients.Take(); |
| request.SessionId = client.SessionId; |
| try |
| { |
| var status = await client.ServiceClient.insertRecordsAsync(request); |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("insert multiple records to devices {0}, server message: {1}", deviceId, status.Message); |
| } |
| |
| return _utilFunctions.VerifySuccess(status, SuccessCode); |
| } |
| catch (TException ex) |
| { |
| throw new TException("Error occurs when inserting records", ex); |
| } |
| } |
| finally |
| { |
| _clients.Add(client); |
| } |
| } |
| public TSInsertTabletReq GenInsertTabletReq(Tablet tablet, long sessionId) |
| { |
| return new TSInsertTabletReq( |
| sessionId, |
| tablet.DeviceId, |
| tablet.Measurements, |
| tablet.GetBinaryValues(), |
| tablet.GetBinaryTimestamps(), |
| tablet.GetDataTypes(), |
| tablet.RowNumber); |
| } |
| |
| public async Task<int> InsertTabletAsync(Tablet tablet) |
| { |
| var client = _clients.Take(); |
| var req = GenInsertTabletReq(tablet, client.SessionId); |
| |
| try |
| { |
| var status = await client.ServiceClient.insertTabletAsync(req); |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("insert one tablet to device {0}, server message: {1}", tablet.DeviceId, status.Message); |
| } |
| |
| return _utilFunctions.VerifySuccess(status, SuccessCode); |
| } |
| catch (TException e) |
| { |
| await Open(_enableRpcCompression); |
| client = _clients.Take(); |
| req.SessionId = client.SessionId; |
| try |
| { |
| var status = await client.ServiceClient.insertTabletAsync(req); |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("insert one tablet to device {0}, server message: {1}", tablet.DeviceId, status.Message); |
| } |
| |
| return _utilFunctions.VerifySuccess(status, SuccessCode); |
| } |
| catch (TException ex) |
| { |
| throw new TException("Error occurs when inserting tablet", ex); |
| } |
| } |
| finally |
| { |
| _clients.Add(client); |
| } |
| } |
| public async Task<int> InsertAlignedTabletAsync(Tablet tablet) |
| { |
| var client = _clients.Take(); |
| var req = GenInsertTabletReq(tablet, client.SessionId); |
| req.IsAligned = true; |
| |
| try |
| { |
| var status = await client.ServiceClient.insertTabletAsync(req); |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("insert one aligned tablet to device {0}, server message: {1}", tablet.DeviceId, status.Message); |
| } |
| |
| return _utilFunctions.VerifySuccess(status, SuccessCode); |
| } |
| catch (TException e) |
| { |
| await Open(_enableRpcCompression); |
| client = _clients.Take(); |
| req.SessionId = client.SessionId; |
| try |
| { |
| var status = await client.ServiceClient.insertTabletAsync(req); |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("insert one aligned tablet to device {0}, server message: {1}", tablet.DeviceId, status.Message); |
| } |
| |
| return _utilFunctions.VerifySuccess(status, SuccessCode); |
| } |
| catch (TException ex) |
| { |
| throw new TException("Error occurs when inserting tablet", ex); |
| } |
| } |
| finally |
| { |
| _clients.Add(client); |
| } |
| } |
| |
| |
| public TSInsertTabletsReq GenInsertTabletsReq(List<Tablet> tabletLst, long sessionId) |
| { |
| var deviceIdLst = new List<string>(); |
| var measurementsLst = new List<List<string>>(); |
| var valuesLst = new List<byte[]>(); |
| var timestampsLst = new List<byte[]>(); |
| var typeLst = new List<List<int>>(); |
| var sizeLst = new List<int>(); |
| |
| foreach (var tablet in tabletLst) |
| { |
| var dataTypeValues = tablet.GetDataTypes(); |
| deviceIdLst.Add(tablet.DeviceId); |
| measurementsLst.Add(tablet.Measurements); |
| valuesLst.Add(tablet.GetBinaryValues()); |
| timestampsLst.Add(tablet.GetBinaryTimestamps()); |
| typeLst.Add(dataTypeValues); |
| sizeLst.Add(tablet.RowNumber); |
| } |
| |
| return new TSInsertTabletsReq( |
| sessionId, |
| deviceIdLst, |
| measurementsLst, |
| valuesLst, |
| timestampsLst, |
| typeLst, |
| sizeLst); |
| } |
| |
| public async Task<int> InsertTabletsAsync(List<Tablet> tabletLst) |
| { |
| var client = _clients.Take(); |
| var req = GenInsertTabletsReq(tabletLst, client.SessionId); |
| |
| try |
| { |
| var status = await client.ServiceClient.insertTabletsAsync(req); |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("insert multiple tablets, message: {0}", status.Message); |
| } |
| |
| return _utilFunctions.VerifySuccess(status, SuccessCode); |
| } |
| catch (TException e) |
| { |
| await Open(_enableRpcCompression); |
| client = _clients.Take(); |
| req.SessionId = client.SessionId; |
| try |
| { |
| var status = await client.ServiceClient.insertTabletsAsync(req); |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("insert multiple tablets, message: {0}", status.Message); |
| } |
| |
| return _utilFunctions.VerifySuccess(status, SuccessCode); |
| } |
| catch (TException ex) |
| { |
| throw new TException("Error occurs when inserting tablets", ex); |
| } |
| } |
| finally |
| { |
| _clients.Add(client); |
| } |
| } |
| |
| public async Task<int> InsertAlignedTabletsAsync(List<Tablet> tabletLst) |
| { |
| var client = _clients.Take(); |
| var req = GenInsertTabletsReq(tabletLst, client.SessionId); |
| req.IsAligned = true; |
| |
| try |
| { |
| var status = await client.ServiceClient.insertTabletsAsync(req); |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("insert multiple aligned tablets, message: {0}", status.Message); |
| } |
| |
| return _utilFunctions.VerifySuccess(status, SuccessCode); |
| } |
| catch (TException e) |
| { |
| await Open(_enableRpcCompression); |
| client = _clients.Take(); |
| req.SessionId = client.SessionId; |
| try |
| { |
| var status = await client.ServiceClient.insertTabletsAsync(req); |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("insert multiple aligned tablets, message: {0}", status.Message); |
| } |
| |
| return _utilFunctions.VerifySuccess(status, SuccessCode); |
| } |
| catch (TException ex) |
| { |
| throw new TException("Error occurs when inserting tablets", ex); |
| } |
| } |
| finally |
| { |
| _clients.Add(client); |
| } |
| } |
| |
| public async Task<int> InsertRecordsOfOneDeviceAsync(string deviceId, List<RowRecord> rowRecords) |
| { |
| var sortedRowRecords = rowRecords.OrderBy(x => x.Timestamps).ToList(); |
| return await InsertRecordsOfOneDeviceSortedAsync(deviceId, sortedRowRecords); |
| } |
| public async Task<int> InsertAlignedRecordsOfOneDeviceAsync(string deviceId, List<RowRecord> rowRecords) |
| { |
| var sortedRowRecords = rowRecords.OrderBy(x => x.Timestamps).ToList(); |
| return await InsertAlignedRecordsOfOneDeviceSortedAsync(deviceId, sortedRowRecords); |
| } |
| |
| private TSInsertRecordsOfOneDeviceReq GenInsertRecordsOfOneDeviceRequest( |
| string deviceId, |
| List<RowRecord> records, |
| long sessionId) |
| { |
| var values = records.Select(row => row.ToBytes()); |
| var measurementsLst = records.Select(x => x.Measurements).ToList(); |
| var timestampLst = records.Select(x => x.Timestamps).ToList(); |
| |
| return new TSInsertRecordsOfOneDeviceReq( |
| sessionId, |
| deviceId, |
| measurementsLst, |
| values.ToList(), |
| timestampLst); |
| } |
| |
| public async Task<int> InsertRecordsOfOneDeviceSortedAsync(string deviceId, List<RowRecord> rowRecords) |
| { |
| var client = _clients.Take(); |
| |
| var timestampLst = rowRecords.Select(x => x.Timestamps).ToList(); |
| |
| if (!_utilFunctions.IsSorted(timestampLst)) |
| { |
| throw new TException("insert records of one device error: timestamp not sorted", null); |
| } |
| |
| var req = GenInsertRecordsOfOneDeviceRequest(deviceId, rowRecords, client.SessionId); |
| |
| try |
| { |
| var status = await client.ServiceClient.insertRecordsOfOneDeviceAsync(req); |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("insert records of one device, message: {0}", status.Message); |
| } |
| |
| return _utilFunctions.VerifySuccess(status, SuccessCode); |
| } |
| catch (TException e) |
| { |
| await Open(_enableRpcCompression); |
| client = _clients.Take(); |
| req.SessionId = client.SessionId; |
| try |
| { |
| var status = await client.ServiceClient.insertRecordsOfOneDeviceAsync(req); |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("insert records of one device, message: {0}", status.Message); |
| } |
| |
| return _utilFunctions.VerifySuccess(status, SuccessCode); |
| } |
| catch (TException ex) |
| { |
| throw new TException("Error occurs when inserting records of one device", ex); |
| } |
| } |
| finally |
| { |
| _clients.Add(client); |
| } |
| } |
| public async Task<int> InsertAlignedRecordsOfOneDeviceSortedAsync(string deviceId, List<RowRecord> rowRecords) |
| { |
| var client = _clients.Take(); |
| |
| var timestampLst = rowRecords.Select(x => x.Timestamps).ToList(); |
| |
| if (!_utilFunctions.IsSorted(timestampLst)) |
| { |
| throw new TException("insert records of one device error: timestamp not sorted", null); |
| } |
| |
| var req = GenInsertRecordsOfOneDeviceRequest(deviceId, rowRecords, client.SessionId); |
| req.IsAligned = true; |
| |
| try |
| { |
| var status = await client.ServiceClient.insertRecordsOfOneDeviceAsync(req); |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("insert aligned records of one device, message: {0}", status.Message); |
| } |
| |
| return _utilFunctions.VerifySuccess(status, SuccessCode); |
| } |
| catch (TException e) |
| { |
| await Open(_enableRpcCompression); |
| client = _clients.Take(); |
| req.SessionId = client.SessionId; |
| try |
| { |
| var status = await client.ServiceClient.insertRecordsOfOneDeviceAsync(req); |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("insert aligned records of one device, message: {0}", status.Message); |
| } |
| |
| return _utilFunctions.VerifySuccess(status, SuccessCode); |
| } |
| catch (TException ex) |
| { |
| throw new TException("Error occurs when inserting aligned records of one device", ex); |
| } |
| } |
| finally |
| { |
| _clients.Add(client); |
| } |
| } |
| |
| public async Task<int> TestInsertRecordAsync(string deviceId, RowRecord record) |
| { |
| var client = _clients.Take(); |
| |
| var req = new TSInsertRecordReq( |
| client.SessionId, |
| deviceId, |
| record.Measurements, |
| record.ToBytes(), |
| record.Timestamps); |
| |
| try |
| { |
| var status = await client.ServiceClient.testInsertRecordAsync(req); |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("insert one record to device {0}, server message: {1}", deviceId, status.Message); |
| } |
| |
| return _utilFunctions.VerifySuccess(status, SuccessCode); |
| } |
| catch (TException e) |
| { |
| await Open(_enableRpcCompression); |
| client = _clients.Take(); |
| req.SessionId = client.SessionId; |
| try |
| { |
| var status = await client.ServiceClient.testInsertRecordAsync(req); |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("insert one record to device {0}, server message: {1}", deviceId, status.Message); |
| } |
| |
| return _utilFunctions.VerifySuccess(status, SuccessCode); |
| } |
| catch (TException ex) |
| { |
| throw new TException("Error occurs when test inserting one record", ex); |
| } |
| } |
| finally |
| { |
| _clients.Add(client); |
| } |
| } |
| |
| public async Task<int> TestInsertRecordsAsync(List<string> deviceId, List<RowRecord> rowRecords) |
| { |
| var client = _clients.Take(); |
| var req = GenInsertRecordsReq(deviceId, rowRecords, client.SessionId); |
| |
| try |
| { |
| var status = await client.ServiceClient.testInsertRecordsAsync(req); |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("insert multiple records to devices {0}, server message: {1}", deviceId, status.Message); |
| } |
| |
| return _utilFunctions.VerifySuccess(status, SuccessCode); |
| } |
| catch (TException e) |
| { |
| await Open(_enableRpcCompression); |
| client = _clients.Take(); |
| req.SessionId = client.SessionId; |
| try |
| { |
| var status = await client.ServiceClient.testInsertRecordsAsync(req); |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("insert multiple records to devices {0}, server message: {1}", deviceId, status.Message); |
| } |
| |
| return _utilFunctions.VerifySuccess(status, SuccessCode); |
| } |
| catch (TException ex) |
| { |
| throw new TException("Error occurs when test inserting multiple records", ex); |
| } |
| } |
| finally |
| { |
| _clients.Add(client); |
| } |
| } |
| |
| public async Task<int> TestInsertTabletAsync(Tablet tablet) |
| { |
| var client = _clients.Take(); |
| |
| var req = GenInsertTabletReq(tablet, client.SessionId); |
| |
| try |
| { |
| var status = await client.ServiceClient.testInsertTabletAsync(req); |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("insert one tablet to device {0}, server message: {1}", tablet.DeviceId, |
| status.Message); |
| } |
| |
| return _utilFunctions.VerifySuccess(status, SuccessCode); |
| } |
| catch (TException e) |
| { |
| await Open(_enableRpcCompression); |
| client = _clients.Take(); |
| req.SessionId = client.SessionId; |
| try |
| { |
| var status = await client.ServiceClient.testInsertTabletAsync(req); |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("insert one tablet to device {0}, server message: {1}", tablet.DeviceId, |
| status.Message); |
| } |
| |
| return _utilFunctions.VerifySuccess(status, SuccessCode); |
| } |
| catch (TException ex) |
| { |
| throw new TException("Error occurs when test inserting one tablet", ex); |
| } |
| } |
| finally |
| { |
| _clients.Add(client); |
| } |
| } |
| |
| public async Task<int> TestInsertTabletsAsync(List<Tablet> tabletLst) |
| { |
| var client = _clients.Take(); |
| |
| var req = GenInsertTabletsReq(tabletLst, client.SessionId); |
| |
| try |
| { |
| var status = await client.ServiceClient.testInsertTabletsAsync(req); |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("insert multiple tablets, message: {0}", status.Message); |
| } |
| |
| return _utilFunctions.VerifySuccess(status, SuccessCode); |
| } |
| catch (TException e) |
| { |
| await Open(_enableRpcCompression); |
| client = _clients.Take(); |
| req.SessionId = client.SessionId; |
| try |
| { |
| var status = await client.ServiceClient.testInsertTabletsAsync(req); |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("insert multiple tablets, message: {0}", status.Message); |
| } |
| |
| return _utilFunctions.VerifySuccess(status, SuccessCode); |
| } |
| catch (TException ex) |
| { |
| throw new TException("Error occurs when test inserting multiple tablets", ex); |
| } |
| } |
| finally |
| { |
| _clients.Add(client); |
| } |
| } |
| |
| public async Task<SessionDataSet> ExecuteQueryStatementAsync(string sql) |
| { |
| TSExecuteStatementResp resp; |
| TSStatus status; |
| var client = _clients.Take(); |
| var req = new TSExecuteStatementReq(client.SessionId, sql, client.StatementId) |
| { |
| FetchSize = _fetchSize |
| }; |
| try |
| { |
| resp = await client.ServiceClient.executeQueryStatementAsync(req); |
| status = resp.Status; |
| } |
| catch (TException e) |
| { |
| _clients.Add(client); |
| |
| await Open(_enableRpcCompression); |
| client = _clients.Take(); |
| req.SessionId = client.SessionId; |
| req.StatementId = client.StatementId; |
| try |
| { |
| resp = await client.ServiceClient.executeQueryStatementAsync(req); |
| status = resp.Status; |
| } |
| catch (TException ex) |
| { |
| _clients.Add(client); |
| throw new TException("Error occurs when executing query statement", ex); |
| } |
| } |
| |
| if (_utilFunctions.VerifySuccess(status, SuccessCode) == -1) |
| { |
| _clients.Add(client); |
| |
| throw new TException("execute query failed", null); |
| } |
| |
| _clients.Add(client); |
| |
| var sessionDataset = new SessionDataSet(sql, resp, _clients) |
| { |
| FetchSize = _fetchSize |
| }; |
| |
| return sessionDataset; |
| } |
| |
| public async Task<int> ExecuteNonQueryStatementAsync(string sql) |
| { |
| var client = _clients.Take(); |
| var req = new TSExecuteStatementReq(client.SessionId, sql, client.StatementId); |
| |
| try |
| { |
| var resp = await client.ServiceClient.executeUpdateStatementAsync(req); |
| var status = resp.Status; |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("execute non-query statement {0} message: {1}", sql, status.Message); |
| } |
| |
| return _utilFunctions.VerifySuccess(status, SuccessCode); |
| } |
| catch (TException e) |
| { |
| await Open(_enableRpcCompression); |
| client = _clients.Take(); |
| req.SessionId = client.SessionId; |
| req.StatementId = client.StatementId; |
| try |
| { |
| var resp = await client.ServiceClient.executeUpdateStatementAsync(req); |
| var status = resp.Status; |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("execute non-query statement {0} message: {1}", sql, status.Message); |
| } |
| |
| return _utilFunctions.VerifySuccess(status, SuccessCode); |
| } |
| catch (TException ex) |
| { |
| throw new TException("Error occurs when executing non-query statement", ex); |
| } |
| } |
| finally |
| { |
| _clients.Add(client); |
| } |
| } |
| |
| public async Task<int> CreateSchemaTemplateAsync(Template template) |
| { |
| var client = _clients.Take(); |
| var req = new TSCreateSchemaTemplateReq(client.SessionId, template.Name, template.ToBytes()); |
| try |
| { |
| var status = await client.ServiceClient.createSchemaTemplateAsync(req); |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("create schema template {0} message: {1}", template.Name, status.Message); |
| } |
| |
| return _utilFunctions.VerifySuccess(status, SuccessCode); |
| } |
| catch (TException e) |
| { |
| await Open(_enableRpcCompression); |
| client = _clients.Take(); |
| req.SessionId = client.SessionId; |
| try |
| { |
| var status = await client.ServiceClient.createSchemaTemplateAsync(req); |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("create schema template {0} message: {1}", template.Name, status.Message); |
| } |
| |
| return _utilFunctions.VerifySuccess(status, SuccessCode); |
| } |
| catch (TException ex) |
| { |
| throw new TException("Error occurs when creating schema template", ex); |
| } |
| } |
| finally |
| { |
| _clients.Add(client); |
| } |
| } |
| |
| public async Task<int> DropSchemaTemplateAsync(string templateName) |
| { |
| var client = _clients.Take(); |
| var req = new TSDropSchemaTemplateReq(client.SessionId, templateName); |
| try |
| { |
| var status = await client.ServiceClient.dropSchemaTemplateAsync(req); |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("drop schema template {0} message: {1}", templateName, status.Message); |
| } |
| |
| return _utilFunctions.VerifySuccess(status, SuccessCode); |
| } |
| catch (TException e) |
| { |
| await Open(_enableRpcCompression); |
| client = _clients.Take(); |
| req.SessionId = client.SessionId; |
| try |
| { |
| var status = await client.ServiceClient.dropSchemaTemplateAsync(req); |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("drop schema template {0} message: {1}", templateName, status.Message); |
| } |
| |
| return _utilFunctions.VerifySuccess(status, SuccessCode); |
| } |
| catch (TException ex) |
| { |
| throw new TException("Error occurs when dropping schema template", ex); |
| } |
| } |
| finally |
| { |
| _clients.Add(client); |
| } |
| } |
| |
| public async Task<int> SetSchemaTemplateAsync(string templateName, string prefixPath) |
| { |
| var client = _clients.Take(); |
| var req = new TSSetSchemaTemplateReq(client.SessionId, templateName, prefixPath); |
| try |
| { |
| var status = await client.ServiceClient.setSchemaTemplateAsync(req); |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("set schema template {0} message: {1}", templateName, status.Message); |
| } |
| |
| return _utilFunctions.VerifySuccess(status, SuccessCode); |
| } |
| catch (TException e) |
| { |
| await Open(_enableRpcCompression); |
| client = _clients.Take(); |
| req.SessionId = client.SessionId; |
| try |
| { |
| var status = await client.ServiceClient.setSchemaTemplateAsync(req); |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("set schema template {0} message: {1}", templateName, status.Message); |
| } |
| |
| return _utilFunctions.VerifySuccess(status, SuccessCode); |
| } |
| catch (TException ex) |
| { |
| throw new TException("Error occurs when setting schema template", ex); |
| } |
| } |
| finally |
| { |
| _clients.Add(client); |
| } |
| } |
| |
| public async Task<int> UnsetSchemaTemplateAsync(string prefixPath, string templateName) |
| { |
| var client = _clients.Take(); |
| var req = new TSUnsetSchemaTemplateReq(client.SessionId, prefixPath, templateName); |
| try |
| { |
| var status = await client.ServiceClient.unsetSchemaTemplateAsync(req); |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("unset schema template {0} message: {1}", templateName, status.Message); |
| } |
| |
| return _utilFunctions.VerifySuccess(status, SuccessCode); |
| } |
| catch (TException e) |
| { |
| await Open(_enableRpcCompression); |
| client = _clients.Take(); |
| req.SessionId = client.SessionId; |
| try |
| { |
| var status = await client.ServiceClient.unsetSchemaTemplateAsync(req); |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("unset schema template {0} message: {1}", templateName, status.Message); |
| } |
| |
| return _utilFunctions.VerifySuccess(status, SuccessCode); |
| } |
| catch (TException ex) |
| { |
| throw new TException("Error occurs when unsetting schema template", ex); |
| } |
| } |
| finally |
| { |
| _clients.Add(client); |
| } |
| } |
| public async Task<int> AddAlignedMeasurementsInTemplateAsync(string templateName, List<MeasurementNode> measurementNodes) |
| { |
| var client = _clients.Take(); |
| var measurements = measurementNodes.ConvertAll(m => m.Name); |
| var dataTypes = measurementNodes.ConvertAll(m => (int)m.DataType); |
| var encodings = measurementNodes.ConvertAll(m => (int)m.Encoding); |
| var compressors = measurementNodes.ConvertAll(m => (int)m.Compressor); |
| var req = new TSAppendSchemaTemplateReq(client.SessionId, templateName, true, measurements, dataTypes, encodings, compressors); |
| try |
| { |
| var status = await client.ServiceClient.appendSchemaTemplateAsync(req); |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("add aligned measurements in template {0} message: {1}", templateName, status.Message); |
| } |
| |
| return _utilFunctions.VerifySuccess(status, SuccessCode); |
| } |
| catch (TException e) |
| { |
| await Open(_enableRpcCompression); |
| client = _clients.Take(); |
| req.SessionId = client.SessionId; |
| try |
| { |
| var status = await client.ServiceClient.appendSchemaTemplateAsync(req); |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("add aligned measurements in template {0} message: {1}", templateName, status.Message); |
| } |
| |
| return _utilFunctions.VerifySuccess(status, SuccessCode); |
| } |
| catch (TException ex) |
| { |
| throw new TException("Error occurs when adding aligned measurements in template", ex); |
| } |
| } |
| finally |
| { |
| _clients.Add(client); |
| } |
| } |
| |
| public async Task<int> AddUnalignedMeasurementsInTemplateAsync(string templateName, List<MeasurementNode> measurementNodes) |
| { |
| var client = _clients.Take(); |
| var measurements = measurementNodes.ConvertAll(m => m.Name); |
| var dataTypes = measurementNodes.ConvertAll(m => (int)m.DataType); |
| var encodings = measurementNodes.ConvertAll(m => (int)m.Encoding); |
| var compressors = measurementNodes.ConvertAll(m => (int)m.Compressor); |
| var req = new TSAppendSchemaTemplateReq(client.SessionId, templateName, false, measurements, dataTypes, encodings, compressors); |
| try |
| { |
| var status = await client.ServiceClient.appendSchemaTemplateAsync(req); |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("add unaligned measurements in template {0} message: {1}", templateName, status.Message); |
| } |
| |
| return _utilFunctions.VerifySuccess(status, SuccessCode); |
| } |
| catch (TException e) |
| { |
| await Open(_enableRpcCompression); |
| client = _clients.Take(); |
| req.SessionId = client.SessionId; |
| try |
| { |
| var status = await client.ServiceClient.appendSchemaTemplateAsync(req); |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("add unaligned measurements in template {0} message: {1}", templateName, status.Message); |
| } |
| |
| return _utilFunctions.VerifySuccess(status, SuccessCode); |
| } |
| catch (TException ex) |
| { |
| throw new TException("Error occurs when adding unaligned measurements in template", ex); |
| } |
| } |
| finally |
| { |
| _clients.Add(client); |
| } |
| } |
| public async Task<int> DeleteNodeInTemplateAsync(string templateName, string path) |
| { |
| var client = _clients.Take(); |
| var req = new TSPruneSchemaTemplateReq(client.SessionId, templateName, path); |
| try |
| { |
| var status = await client.ServiceClient.pruneSchemaTemplateAsync(req); |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("delete node in template {0} message: {1}", templateName, status.Message); |
| } |
| |
| return _utilFunctions.VerifySuccess(status, SuccessCode); |
| } |
| catch (TException e) |
| { |
| await Open(_enableRpcCompression); |
| client = _clients.Take(); |
| req.SessionId = client.SessionId; |
| try |
| { |
| var status = await client.ServiceClient.pruneSchemaTemplateAsync(req); |
| |
| if (_debugMode) |
| { |
| _logger.LogInformation("delete node in template {0} message: {1}", templateName, status.Message); |
| } |
| |
| return _utilFunctions.VerifySuccess(status, SuccessCode); |
| } |
| catch (TException ex) |
| { |
| throw new TException("Error occurs when deleting node in template", ex); |
| } |
| } |
| finally |
| { |
| _clients.Add(client); |
| } |
| } |
| |
| public async Task<int> CountMeasurementsInTemplateAsync(string name) |
| { |
| var client = _clients.Take(); |
| var req = new TSQueryTemplateReq(client.SessionId, name, (int)TemplateQueryType.COUNT_MEASUREMENTS); |
| try |
| { |
| var resp = await client.ServiceClient.querySchemaTemplateAsync(req); |
| var status = resp.Status; |
| if (_debugMode) |
| { |
| _logger.LogInformation("count measurements in template {0} message: {1}", name, status.Message); |
| } |
| |
| _utilFunctions.VerifySuccess(status, SuccessCode); |
| return resp.Count; |
| } |
| catch (TException e) |
| { |
| await Open(_enableRpcCompression); |
| client = _clients.Take(); |
| req.SessionId = client.SessionId; |
| try |
| { |
| var resp = await client.ServiceClient.querySchemaTemplateAsync(req); |
| var status = resp.Status; |
| if (_debugMode) |
| { |
| _logger.LogInformation("count measurements in template {0} message: {1}", name, status.Message); |
| } |
| |
| _utilFunctions.VerifySuccess(status, SuccessCode); |
| return resp.Count; |
| } |
| catch (TException ex) |
| { |
| throw new TException("Error occurs when counting measurements in template", ex); |
| } |
| } |
| finally |
| { |
| _clients.Add(client); |
| } |
| } |
| public async Task<bool> IsMeasurementInTemplateAsync(string templateName, string path) |
| { |
| var client = _clients.Take(); |
| var req = new TSQueryTemplateReq(client.SessionId, templateName, (int)TemplateQueryType.IS_MEASUREMENT); |
| req.Measurement = path; |
| try |
| { |
| var resp = await client.ServiceClient.querySchemaTemplateAsync(req); |
| var status = resp.Status; |
| if (_debugMode) |
| { |
| _logger.LogInformation("is measurement in template {0} message: {1}", templateName, status.Message); |
| } |
| |
| _utilFunctions.VerifySuccess(status, SuccessCode); |
| return resp.Result; |
| } |
| catch (TException e) |
| { |
| await Open(_enableRpcCompression); |
| client = _clients.Take(); |
| req.SessionId = client.SessionId; |
| try |
| { |
| var resp = await client.ServiceClient.querySchemaTemplateAsync(req); |
| var status = resp.Status; |
| if (_debugMode) |
| { |
| _logger.LogInformation("is measurement in template {0} message: {1}", templateName, status.Message); |
| } |
| |
| _utilFunctions.VerifySuccess(status, SuccessCode); |
| return resp.Result; |
| } |
| catch (TException ex) |
| { |
| throw new TException("Error occurs when checking measurement in template", ex); |
| } |
| } |
| finally |
| { |
| _clients.Add(client); |
| } |
| } |
| |
| public async Task<bool> IsPathExistInTemplate(string templateName, string path) |
| { |
| var client = _clients.Take(); |
| var req = new TSQueryTemplateReq(client.SessionId, templateName, (int)TemplateQueryType.PATH_EXIST); |
| req.Measurement = path; |
| try |
| { |
| var resp = await client.ServiceClient.querySchemaTemplateAsync(req); |
| var status = resp.Status; |
| if (_debugMode) |
| { |
| _logger.LogInformation("is path exist in template {0} message: {1}", templateName, status.Message); |
| } |
| |
| _utilFunctions.VerifySuccess(status, SuccessCode); |
| return resp.Result; |
| } |
| catch (TException e) |
| { |
| await Open(_enableRpcCompression); |
| client = _clients.Take(); |
| req.SessionId = client.SessionId; |
| try |
| { |
| var resp = await client.ServiceClient.querySchemaTemplateAsync(req); |
| var status = resp.Status; |
| if (_debugMode) |
| { |
| _logger.LogInformation("is path exist in template {0} message: {1}", templateName, status.Message); |
| } |
| |
| _utilFunctions.VerifySuccess(status, SuccessCode); |
| return resp.Result; |
| } |
| catch (TException ex) |
| { |
| throw new TException("Error occurs when checking path exist in template", ex); |
| } |
| } |
| finally |
| { |
| _clients.Add(client); |
| } |
| } |
| |
| public async Task<List<string>> ShowMeasurementsInTemplateAsync(string templateName, string pattern = "") |
| { |
| var client = _clients.Take(); |
| var req = new TSQueryTemplateReq(client.SessionId, templateName, (int)TemplateQueryType.SHOW_MEASUREMENTS); |
| req.Measurement = pattern; |
| try |
| { |
| var resp = await client.ServiceClient.querySchemaTemplateAsync(req); |
| var status = resp.Status; |
| if (_debugMode) |
| { |
| _logger.LogInformation("get measurements in template {0} message: {1}", templateName, status.Message); |
| } |
| |
| _utilFunctions.VerifySuccess(status, SuccessCode); |
| return resp.Measurements; |
| } |
| catch (TException e) |
| { |
| await Open(_enableRpcCompression); |
| client = _clients.Take(); |
| req.SessionId = client.SessionId; |
| try |
| { |
| var resp = await client.ServiceClient.querySchemaTemplateAsync(req); |
| var status = resp.Status; |
| if (_debugMode) |
| { |
| _logger.LogInformation("get measurements in template {0} message: {1}", templateName, status.Message); |
| } |
| |
| _utilFunctions.VerifySuccess(status, SuccessCode); |
| return resp.Measurements; |
| } |
| catch (TException ex) |
| { |
| throw new TException("Error occurs when getting measurements in template", ex); |
| } |
| } |
| finally |
| { |
| _clients.Add(client); |
| } |
| } |
| public async Task<List<string>> ShowAllTemplatesAsync() |
| { |
| var client = _clients.Take(); |
| var req = new TSQueryTemplateReq(client.SessionId, "", (int)TemplateQueryType.SHOW_TEMPLATES); |
| try |
| { |
| var resp = await client.ServiceClient.querySchemaTemplateAsync(req); |
| var status = resp.Status; |
| if (_debugMode) |
| { |
| _logger.LogInformation("get all templates message: {0}", status.Message); |
| } |
| |
| _utilFunctions.VerifySuccess(status, SuccessCode); |
| return resp.Measurements; |
| } |
| catch (TException e) |
| { |
| await Open(_enableRpcCompression); |
| client = _clients.Take(); |
| req.SessionId = client.SessionId; |
| try |
| { |
| var resp = await client.ServiceClient.querySchemaTemplateAsync(req); |
| var status = resp.Status; |
| if (_debugMode) |
| { |
| _logger.LogInformation("get all templates message: {0}", status.Message); |
| } |
| |
| _utilFunctions.VerifySuccess(status, SuccessCode); |
| return resp.Measurements; |
| } |
| catch (TException ex) |
| { |
| throw new TException("Error occurs when getting all templates", ex); |
| } |
| } |
| finally |
| { |
| _clients.Add(client); |
| } |
| } |
| public async Task<List<string>> ShowPathsTemplateSetOnAsync(string templateName) |
| { |
| var client = _clients.Take(); |
| var req = new TSQueryTemplateReq(client.SessionId, templateName, (int)TemplateQueryType.SHOW_SET_TEMPLATES); |
| try |
| { |
| var resp = await client.ServiceClient.querySchemaTemplateAsync(req); |
| var status = resp.Status; |
| if (_debugMode) |
| { |
| _logger.LogInformation("get paths template set on {0} message: {1}", templateName, status.Message); |
| } |
| |
| _utilFunctions.VerifySuccess(status, SuccessCode); |
| return resp.Measurements; |
| } |
| catch (TException e) |
| { |
| await Open(_enableRpcCompression); |
| client = _clients.Take(); |
| req.SessionId = client.SessionId; |
| try |
| { |
| var resp = await client.ServiceClient.querySchemaTemplateAsync(req); |
| var status = resp.Status; |
| if (_debugMode) |
| { |
| _logger.LogInformation("get paths template set on {0} message: {1}", templateName, status.Message); |
| } |
| |
| _utilFunctions.VerifySuccess(status, SuccessCode); |
| return resp.Measurements; |
| } |
| catch (TException ex) |
| { |
| throw new TException("Error occurs when getting paths template set on", ex); |
| } |
| } |
| finally |
| { |
| _clients.Add(client); |
| } |
| } |
| public async Task<List<string>> ShowPathsTemplateUsingOnAsync(string templateName) |
| { |
| var client = _clients.Take(); |
| var req = new TSQueryTemplateReq(client.SessionId, templateName, (int)TemplateQueryType.SHOW_USING_TEMPLATES); |
| try |
| { |
| var resp = await client.ServiceClient.querySchemaTemplateAsync(req); |
| var status = resp.Status; |
| if (_debugMode) |
| { |
| _logger.LogInformation("get paths template using on {0} message: {1}", templateName, status.Message); |
| } |
| |
| _utilFunctions.VerifySuccess(status, SuccessCode); |
| return resp.Measurements; |
| } |
| catch (TException e) |
| { |
| await Open(_enableRpcCompression); |
| client = _clients.Take(); |
| req.SessionId = client.SessionId; |
| try |
| { |
| var resp = await client.ServiceClient.querySchemaTemplateAsync(req); |
| var status = resp.Status; |
| if (_debugMode) |
| { |
| _logger.LogInformation("get paths template using on {0} message: {1}", templateName, status.Message); |
| } |
| |
| _utilFunctions.VerifySuccess(status, SuccessCode); |
| return resp.Measurements; |
| } |
| catch (TException ex) |
| { |
| throw new TException("Error occurs when getting paths template using on", ex); |
| } |
| } |
| finally |
| { |
| _clients.Add(client); |
| } |
| }
|
|
|
| protected virtual void Dispose(bool disposing)
|
| {
|
| if (!disposedValue)
|
| {
|
| if (disposing)
|
| {
|
| #if NET461_OR_GREATER || NETSTANDARD2_0
|
| #else
|
| _clients.ClientQueue.Clear();
|
| #endif
|
| }
|
| _clients = null;
|
| disposedValue = true;
|
| }
|
| }
|
|
|
| public void Dispose()
|
| {
|
| Dispose(disposing: true);
|
| GC.SuppressFinalize(this);
|
| }
|
| } |
| } |