OPC UA 协议

OPC UA 订阅数据

本功能支持用户以 OPC UA 协议从 IoTDB 中订阅数据,订阅数据的通信模式支持 Client/Server 和 Pub/Sub 两种。

注意:本功能并非从外部 OPC Server 中采集数据写入 IoTDB

OPC 服务启动方式

语法

启动 OPC UA 协议的语法:

create pipe p1 
    with source (...) 
    with processor (...) 
    with sink ('sink' = 'opc-ua-sink', 
               'sink.opcua.tcp.port' = '12686', 
               'sink.opcua.https.port' = '8443', 
               'sink.user' = 'root', 
               'sink.password' = 'root', 
               'sink.opcua.security.dir' = '...'
              )

参数

参数描述取值范围是否必填默认值
sinkOPC UA SINKString: opc-ua-sink必填
sink.opcua.modelOPC UA 使用的模式String: client-server / pub-sub选填pub-sub
sink.opcua.tcp.portOPC UA 的 TCP 端口Integer: [0, 65536]选填12686
sink.opcua.https.portOPC UA 的 HTTPS 端口Integer: [0, 65536]选填8443
sink.opcua.security.dirOPC UA 的密钥及证书目录String: Path,支持绝对及相对目录选填iotdb 相关 DataNode 的 conf 目录下的 opc_security 文件夹 /<httpsPort:tcpPort>
如无 iotdb 的 conf 目录(例如 IDEA 中启动 DataNode),则为用户主目录下的 iotdb_opc_security 文件夹 /<httpsPort:tcpPort>
sink.opcua.enable-anonymous-accessOPC UA 是否允许匿名访问Boolean选填true
sink.user用户,这里指 OPC UA 的允许用户String选填root
sink.password密码,这里指 OPC UA 的允许密码String选填root

示例

create pipe p1 
    with sink ('sink' = 'opc-ua-sink'
               'sink.user' = 'root', 
               'sink.password' = 'root');
start pipe p1;

使用限制

  1. 启动协议之后需要写入数据,才能建立连接,且仅能订阅建立连接之后的数据。
  2. 推荐在单机模式下使用。在分布式模式下,每一个 IoTDB DataNode 都作为一个独立的 OPC Server 提供数据,需要单独订阅。

两种通信模式示例

Client / Server 模式

在这种模式下,IoTDB 的流处理引擎通过 OPC UA Sink 与 OPC UA 服务器(Server)建立连接。OPC UA 服务器在其地址空间(Address Space) 中维护数据,IoTDB可以请求并获取这些数据。同时,其他OPC UA客户端(Client)也能访问服务器上的数据。

  • 特性:
    • OPC UA 将从 Sink 收到的设备信息,按照树形模型整理到 Objects folder 下的文件夹中。
    • 每个测点都被记录为一个变量节点,并记录当前数据库中的最新值。
    • OPC UA 无法删除数据或者改变数据类型的设置

准备工作

  1. 此处以UAExpert客户端为例,下载 UAExpert 客户端:https://www.unified-automation.com/downloads/opc-ua-clients.html

  2. 安装 UAExpert,填写自身的证书等信息。

快速开始

  1. 使用如下 sql,创建并启动 client-server 模式的 OPC UA Sink。详细语法参见上文:IoTDB OPC Server语法
create pipe p1 with sink ('sink'='opc-ua-sink');
  1. 写入部分数据。
insert into root.test.db(time, s2) values(now(), 2)

​ 此处自动创建元数据开启。

  1. 在 UAExpert 中配置 iotdb 的连接,其中 password 填写为上述参数配置中 sink.password 中设定的密码(此处以默认密码root为例):
  1. 信任服务器的证书后,在左侧 Objects folder 即可看到写入的数据。
  1. 可以将左侧节点拖动到中间,并展示该节点的最新值:

Pub / Sub 模式

在这种模式下,IoTDB的流处理引擎通过 OPC UA Sink 向OPC UA 服务器(Server)发送数据变更事件。这些事件被发布到服务器的消息队列中,并通过事件节点 (Event Node) 进行管理。其他OPC UA客户端(Client)可以订阅这些事件节点,以便在数据变更时接收通知。

  • 特性:

    • 每个测点会被 OPC UA 包装成一个事件节点(EventNode)。

    • 相关字段及其对应含义如下:

      字段含义类型(Milo)示例
      Time时间戳DateTime1698907326198
      SourceName测点对应完整路径Stringroot.test.opc.sensor0
      SourceNode测点数据类型NodeIdInt32
      Message数据LocalizedText3.0
    • Event 仅会发送给所有已经监听的客户端,客户端未连接则会忽略该 Event。

    • 如果数据被删除,信息则无法推送给客户端。

准备工作

该代码位于 iotdb-example 包下的 opc-ua-sink 文件夹

代码中包含:

  • 主类(ClientTest)
  • Client 证书相关的逻辑(IoTDBKeyStoreLoaderClient)
  • Client 的配置及启动逻辑(ClientExampleRunner)
  • ClientTest 的父类(ClientExample)

快速开始

使用步骤为:

  1. 打开 IoTDB 并写入部分数据。
insert into root.a.b(time, c, d) values(now(), 1, 2);

​ 此处自动创建元数据开启。

  1. 使用如下 sql,创建并启动 Pub-Sub 模式的 OPC UA Sink。详细语法参见上文:IoTDB OPC Server语法
create pipe p1 with sink ('sink'='opc-ua-sink', 
                          'sink.opcua.model'='pub-sub');
start pipe p1;

​ 此时能看到服务器的 conf 目录下创建了 opc 证书相关的目录。

  1. 直接运行 Client 连接,此时 Client 证书被服务器拒收。
  1. 进入服务器的 sink.opcua.security.dir 目录下,进入 pki 的 rejected 目录,此时 Client 的证书应该已经在该目录下生成。
  1. 将客户端的证书移入(不是复制) 同目录下 trusted 目录的 certs 文件夹中。
  1. 再次打开 Client 连接,此时服务器的证书应该被 Client 拒收。
  1. 进入客户端的 <java.io.tmpdir>/client/security 目录下,进入 pki 的 rejected 目录,将服务器的证书移入(不是复制)trusted 目录。
  1. 打开 Client,此时建立双向信任成功, Client 能够连接到服务器。

  2. 向服务器中写入数据,此时 Client 中能够打印出收到的数据。

注意事项

  1. 单机与集群:建议使用1C1D单机版,如果集群中有多个 DataNode,可能数据会分散发送在各个 DataNode 上,无法收听到全量数据。

  2. 无需操作根目录下证书:在证书操作过程中,无需操作 IoTDB security 根目录下的 iotdb-server.pfx 证书和 client security 目录下的 example-client.pfx 目录。Client 和 Server 双向连接时,会将根目录下的证书发给对方,对方如果第一次看见此证书,就会放入 reject dir,如果该证书在 trusted/certs 里面,则能够信任对方。

  3. 建议使用 Java 17+:在 JVM 8 的版本中,可能会存在密钥长度限制,报 Illegal key size 错误。对于特定版本(如 jdk.1.8u151+),可以在 ClientExampleRunner 的 create client 里加入 Security.setProperty("crypto.policy", "unlimited"); 解决,也可以下载无限制的包 local_policy.jarUS_export_policy 解决替换 JDK/jre/lib/security 目录下的包解决,下载网址:https://www.oracle.com/java/technologies/javase-jce8-downloads.html。