本功能支持用户以 OPC UA 协议从 IoTDB 中订阅数据,订阅数据的通信模式支持 Client/Server 和 Pub/Sub 两种。
注意:本功能并非从外部 OPC Server 中采集数据写入 IoTDB
启动 OPC UA 协议的语法:
create pipe p1 with source (...) with processor (...) with sink ('sink' = 'opc-ua-sink', 'sink.opcua.tcp.port' = '12686', 'sink.opcua.https.port' = '8443', 'sink.user' = 'root', 'sink.password' = 'root', 'sink.opcua.security.dir' = '...' )
| 参数 | 描述 | 取值范围 | 是否必填 | 默认值 |
|---|---|---|---|---|
| sink | OPC UA SINK | String: opc-ua-sink | 必填 | |
| sink.opcua.model | OPC UA 使用的模式 | String: client-server / pub-sub | 选填 | pub-sub |
| sink.opcua.tcp.port | OPC UA 的 TCP 端口 | Integer: [0, 65536] | 选填 | 12686 |
| sink.opcua.https.port | OPC UA 的 HTTPS 端口 | Integer: [0, 65536] | 选填 | 8443 |
| sink.opcua.security.dir | OPC UA 的密钥及证书目录 | String: Path,支持绝对及相对目录 | 选填 | iotdb 相关 DataNode 的 conf 目录下的 opc_security 文件夹 /<httpsPort:tcpPort>。如无 iotdb 的 conf 目录(例如 IDEA 中启动 DataNode),则为用户主目录下的 iotdb_opc_security 文件夹 / <httpsPort:tcpPort> |
| sink.opcua.enable-anonymous-access | OPC UA 是否允许匿名访问 | Boolean | 选填 | true |
| sink.user | 用户,这里指 OPC UA 的允许用户 | String | 选填 | root |
| sink.password | 密码,这里指 OPC UA 的允许密码 | String | 选填 | root |
create pipe p1 with sink ('sink' = 'opc-ua-sink', 'sink.user' = 'root', 'sink.password' = 'root'); start pipe p1;
在这种模式下,IoTDB 的流处理引擎通过 OPC UA Sink 与 OPC UA 服务器(Server)建立连接。OPC UA 服务器在其地址空间(Address Space) 中维护数据,IoTDB可以请求并获取这些数据。同时,其他OPC UA客户端(Client)也能访问服务器上的数据。
此处以UAExpert客户端为例,下载 UAExpert 客户端:https://www.unified-automation.com/downloads/opc-ua-clients.html
安装 UAExpert,填写自身的证书等信息。
create pipe p1 with sink ('sink'='opc-ua-sink');
insert into root.test.db(time, s2) values(now(), 2)
此处自动创建元数据开启。
在这种模式下,IoTDB的流处理引擎通过 OPC UA Sink 向OPC UA 服务器(Server)发送数据变更事件。这些事件被发布到服务器的消息队列中,并通过事件节点 (Event Node) 进行管理。其他OPC UA客户端(Client)可以订阅这些事件节点,以便在数据变更时接收通知。
特性:
每个测点会被 OPC UA 包装成一个事件节点(EventNode)。
相关字段及其对应含义如下:
| 字段 | 含义 | 类型(Milo) | 示例 |
|---|---|---|---|
| Time | 时间戳 | DateTime | 1698907326198 |
| SourceName | 测点对应完整路径 | String | root.test.opc.sensor0 |
| SourceNode | 测点数据类型 | NodeId | Int32 |
| Message | 数据 | LocalizedText | 3.0 |
Event 仅会发送给所有已经监听的客户端,客户端未连接则会忽略该 Event。
如果数据被删除,信息则无法推送给客户端。
该代码位于 iotdb-example 包下的 opc-ua-sink 文件夹中
代码中包含:
使用步骤为:
insert into root.a.b(time, c, d) values(now(), 1, 2);
此处自动创建元数据开启。
create pipe p1 with sink ('sink'='opc-ua-sink', 'sink.opcua.model'='pub-sub'); start pipe p1;
此时能看到服务器的 conf 目录下创建了 opc 证书相关的目录。
打开 Client,此时建立双向信任成功, Client 能够连接到服务器。
向服务器中写入数据,此时 Client 中能够打印出收到的数据。
单机与集群:建议使用1C1D单机版,如果集群中有多个 DataNode,可能数据会分散发送在各个 DataNode 上,无法收听到全量数据。
无需操作根目录下证书:在证书操作过程中,无需操作 IoTDB security 根目录下的 iotdb-server.pfx 证书和 client security 目录下的 example-client.pfx 目录。Client 和 Server 双向连接时,会将根目录下的证书发给对方,对方如果第一次看见此证书,就会放入 reject dir,如果该证书在 trusted/certs 里面,则能够信任对方。
建议使用 Java 17+:在 JVM 8 的版本中,可能会存在密钥长度限制,报 Illegal key size 错误。对于特定版本(如 jdk.1.8u151+),可以在 ClientExampleRunner 的 create client 里加入 Security.setProperty("crypto.policy", "unlimited"); 解决,也可以下载无限制的包 local_policy.jar 与 US_export_policy 解决替换 JDK/jre/lib/security 目录下的包解决,下载网址:https://www.oracle.com/java/technologies/javase-jce8-downloads.html。