_username
:用户名_password
:密码_zoneId
:时区_host
:主机ip_port
:端口号_fetchSize
:单次请求数据大小_poolSize
:线程池大小(默认为4)该数据结构对客户端连接进行了封装,维护一个客户端实例TSIService.Client、线程码SessionId与状态码StatementId以及分帧传输流TFramedTransport。
public Client(TSIService.Client client, long sessionId, long statementId, TFramedTransport transport) { ServiceClient = client; SessionId = sessionId; StatementId = statementId; Transport = transport; }
该数据结构封装了一个高效的线程安全队列,用于维护客户端与服务器的多个Client连接。
public void Add(Client client) { Monitor.Enter(ClientQueue); ClientQueue.Enqueue(client); Monitor.Pulse(ClientQueue); Monitor.Exit(ClientQueue); }
通过System.Threading.Monitor类实现多线程对ConcurrentQueue的同步访问,以确保数据的安全性。
当请求发生时,系统会在ConcurrentClientQueue中寻找一个空闲的Client连接,即调用 ConcurrentClientQueue 的**Take()**函数,代码如下:
public Client Take() { Monitor.Enter(ClientQueue); if (ClientQueue.IsEmpty) { Monitor.Wait(ClientQueue); } ClientQueue.TryDequeue(out var client); Monitor.Exit(ClientQueue); return client; }
如果请求时ConcurrentClientQueue中没有空闲Client连接时,系统会调用 Monitor 类中的 Wait() 方法让线程等待,直到队列不为空时,弹出空闲Client连接。
获取到空闲Client连接后,系统便在此连接上进行数据操作,示例如下:
public async Task<int> InsertRecordAsync(string deviceId, RowRecord record) { var client = _clients.Take(); // 获取空闲的Client连接 var req = new TSInsertRecordReq(client.SessionId, deviceId, record.Measurements, record.ToBytes(), record.Timestamps); try { var status = await client.ServiceClient.insertRecordAsync(req); if (_debugMode) { _logger.Info("insert one record to device {0}, server message: {1}", deviceId, status.Message); } return _utilFunctions.verify_success(status, SuccessCode); } catch (TException e) { throw new TException("Record insertion failed", e); } finally { _clients.Add(client); } }
当操作结束后,系统会回收该空闲连接,通过 ConcurrentClientQueue.Add() 函数将该连接重新加入队列,并在在添加后会通过 Pulse() 方法通知其他处于等待状态的线程。考虑到操作过程中可能出现异常,所有操作都被放在try-catch块中,即使捕获到了异常也会将该Client连接放回队列中,防止连接丢失。
测试环境:
- 操作系统:macOS
- 处理器:2.3GHz 八核 Intel Core i9
- IoTDB版本:0.12.0
测试环境:
- 本地:
- 操作系统:macOS
- 处理器:2.3GHz 八核 Intel Core i9
- 服务器:
- IoTDB版本:0.12.1