blob: 95e6b29de7b96687ef8df647e4cb4e5a4c845849 [file] [log] [blame]
import{_ as o,r as e,o as c,c as l,b as n,d as s,a as p,e as a}from"./app-Bx8hKGcu.js";const i={},u=a(`<h1 id="flink-sql-iotdb-connector" tabindex="-1"><a class="header-anchor" href="#flink-sql-iotdb-connector"><span>flink-sql-iotdb-connector</span></a></h1><p>flink-sql-iotdb-connector 将 Flink SQL 或者 Flink Table 与 IoTDB 无缝衔接了起来,使得在 Flink 的任务中可以对 IoTDB 进行实时读写,具体可以应用到如下场景中:</p><ol><li>实时数据同步:将数据从一个数据库实时同步到另一个数据库。</li><li>实时数据管道:构建实时数据处理管道,处理和分析数据库中的数据。</li><li>实时数据分析:实时分析数据库中的数据,提供实时的业务洞察。</li><li>实时应用:将数据库中的数据实时应用于实时应用程序,如实时报表、实时推荐等。</li><li>实时监控:实时监控数据库中的数据,检测异常和错误。</li></ol><h2 id="读写模式" tabindex="-1"><a class="header-anchor" href="#读写模式"><span>读写模式</span></a></h2><table><thead><tr><th>读模式(Source)</th><th>写模式(Sink)</th></tr></thead><tbody><tr><td>Bounded Scan, Lookup, CDC</td><td>Streaming Sink, Batch Sink</td></tr></tbody></table><h3 id="读模式-source" tabindex="-1"><a class="header-anchor" href="#读模式-source"><span>读模式(Source)</span></a></h3><ul><li><p><strong>Bounded Scan:</strong> bounded scan 的主要实现方式是通过指定 <code>时间序列</code> 以及 <code>查询条件的上下界(可选)</code>来进行查询,并且查询结果通常为多行数据。这种查询无法获取到查询之后更新的数据。</p></li><li><p><strong>Lookup:</strong> lookup 查询模式与 scan 查询模式不同,bounded scan 是对一个时间范围内的数据进行查询,而 <code>lookup</code> 查询只会对一个精确的时间点进行查询,所以查询结果只有一行数据。另外只有 <code>lookup join</code> 的右表才能使用 lookup 查询模式。</p></li><li><p><strong>CDC:</strong> 主要用于 Flink 的 <code>ETL</code> 任务当中。当 IoTDB 中的数据发生变化时,flink 会通过我们提供的 <code>CDC connector</code> 感知到,我们可以将感知到的变化数据转发给其他的外部数据源,以此达到 ETL 的目的。</p></li></ul><h3 id="写模式-sink" tabindex="-1"><a class="header-anchor" href="#写模式-sink"><span>写模式(Sink)</span></a></h3><ul><li><p><strong>Streaming sink:</strong> 用于 Flink 的 streaming mode 中,会将 Flink 中 Dynamic Table 的增删改记录实时的同步到 IoTDB 中。</p></li><li><p><strong>Batch sink:</strong> 用于 Flink 的 batch mode 中,用于将 Flink 的批量计算结果一次性写入 IoTDB 中。</p></li></ul><h2 id="使用方式" tabindex="-1"><a class="header-anchor" href="#使用方式"><span>使用方式</span></a></h2><p>我们提供的 flink-sql-iotdb-connector 总共提供两种使用方式,一种是在项目开发过程中通过 Maven 
的方式引用,另外一种是在 Flink 的 sql-client 中使用。我们将分别介绍这两种使用方式。</p><blockquote><p>📌注:flink 版本要求 1.17.0 及以上</p></blockquote><h3 id="maven" tabindex="-1"><a class="header-anchor" href="#maven"><span>Maven</span></a></h3><p>我们只需要在项目的 pom 文件中添加以下依赖即可:</p><div class="language-xml line-numbers-mode" data-ext="xml" data-title="xml"><pre class="language-xml"><code><span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>dependency</span><span class="token punctuation">&gt;</span></span>
<span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>groupId</span><span class="token punctuation">&gt;</span></span>org.apache.iotdb<span class="token tag"><span class="token tag"><span class="token punctuation">&lt;/</span>groupId</span><span class="token punctuation">&gt;</span></span>
<span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>artifactId</span><span class="token punctuation">&gt;</span></span>flink-sql-iotdb-connector<span class="token tag"><span class="token tag"><span class="token punctuation">&lt;/</span>artifactId</span><span class="token punctuation">&gt;</span></span>
<span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>version</span><span class="token punctuation">&gt;</span></span>\${iotdb.version}<span class="token tag"><span class="token tag"><span class="token punctuation">&lt;/</span>version</span><span class="token punctuation">&gt;</span></span>
<span class="token tag"><span class="token tag"><span class="token punctuation">&lt;/</span>dependency</span><span class="token punctuation">&gt;</span></span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><h3 id="sql-client" tabindex="-1"><a class="header-anchor" href="#sql-client"><span>sql-client</span></a></h3><p>如果需要在 sql-client 中使用 flink-sql-iotdb-connector,先通过以下步骤来配置环境:</p>`,17),d={href:"https://iotdb.apache.org/Download/",target:"_blank",rel:"noopener noreferrer"},k=n("li",null,[n("p",null,[s("将 jar 包拷贝到 "),n("code",null,"$FLINK_HOME/lib"),s(" 目录下。")])],-1),r=n("li",null,[n("p",null,"启动 Flink 集群。")],-1),m=n("li",null,[n("p",null,"启动 sql-client。")],-1),v=a(`<p>此时就可以在 sql-client 中使用 flink-sql-iotdb-connector 了。</p><h2 id="表结构规范" tabindex="-1"><a class="header-anchor" href="#表结构规范"><span>表结构规范</span></a></h2><p>无论使用哪种类型的连接器,都需要满足以下的表结构规范:</p><ul><li>所有使用 <code>IoTDB connector</code> 的表,第一列的列名必须是 <code>Time_</code>,而且数据类型必须是 <code>BIGINT</code> 类型。</li><li>除了 <code>Time_</code> 列以外的列名必须以 <code>root.</code> 开头。另外列名中的任意节点不能是纯数字,如果有纯数字,或者其他非法字符,必须使用反引号括起来。比如:路径 root.sg.d0.123 是一个非法路径,但是 root.sg.d0.\`123\` 就是一个合法路径。</li><li>无论使用 <code>pattern</code> 或者 <code>sql</code> 从 IoTDB 中查询数据,查询结果的时间序列名需要包含 Flink 中除了 <code>Time_</code> 以外的所有列名。如果查询结果中没有相应的列名,则该列将用 null 去填充。</li><li>flink-sql-iotdb-connector 中支持的数据类型有:<code>INT</code>, <code>BIGINT</code>, <code>FLOAT</code>, <code>DOUBLE</code>, <code>BOOLEAN</code>, <code>STRING</code>。Flink Table 中每一列的数据类型与其 IoTDB 中对应的时间序列类型都要匹配上,否则将会报错,并退出 Flink 任务。</li></ul><p>以下用几个例子来说明 IoTDB 中的时间序列与 Flink Table 中列的对应关系。</p><h2 id="读模式-source-1" tabindex="-1"><a class="header-anchor" href="#读模式-source-1"><span>读模式(Source)</span></a></h2><h3 id="scan-table-bounded" tabindex="-1"><a class="header-anchor" href="#scan-table-bounded"><span>Scan Table (Bounded)</span></a></h3><h4 id="参数" tabindex="-1"><a class="header-anchor" 
href="#参数"><span>参数</span></a></h4><table><thead><tr><th>参数</th><th>必填</th><th>默认</th><th>类型</th><th>描述</th></tr></thead><tbody><tr><td>nodeUrls</td><td>否</td><td>127.0.0.1:6667</td><td>String</td><td>用来指定 IoTDB 的 datanode 地址,如果 IoTDB 是用集群模式搭建的话,可以指定多个地址,每个节点用逗号隔开。</td></tr><tr><td>user</td><td>否</td><td>root</td><td>String</td><td>IoTDB 用户名</td></tr><tr><td>password</td><td>否</td><td>root</td><td>String</td><td>IoTDB 密码</td></tr><tr><td>scan.bounded.lower-bound</td><td>否</td><td>-1L</td><td>Long</td><td>bounded 的 scan 查询时的时间戳下界(包括),参数大于<code>0</code>时有效。</td></tr><tr><td>scan.bounded.upper-bound</td><td>否</td><td>-1L</td><td>Long</td><td>bounded 的 scan 查询时的时间戳上界(包括),参数大于<code>0</code>时有效。</td></tr><tr><td>sql</td><td>是</td><td>无</td><td>String</td><td>用于在 IoTDB 端做查询。</td></tr></tbody></table><h4 id="示例" tabindex="-1"><a class="header-anchor" href="#示例"><span>示例</span></a></h4><p>该示例演示了如何在一个 Flink Table Job 中从 IoTDB 中通过<code>scan table</code>的方式读取数据:<br> 当前 IoTDB 中的数据如下:</p><div class="language-text line-numbers-mode" data-ext="text" data-title="text"><pre class="language-text"><code>IoTDB&gt; select ** from root;
+-----------------------------+-------------+-------------+-------------+
| Time|root.sg.d0.s0|root.sg.d1.s0|root.sg.d1.s1|
+-----------------------------+-------------+-------------+-------------+
|1970-01-01T08:00:00.001+08:00| 1.0833644| 2.34874| 1.2414109|
|1970-01-01T08:00:00.002+08:00| 4.929185| 3.1885583| 4.6980085|
|1970-01-01T08:00:00.003+08:00| 3.5206156| 3.5600138| 4.8080945|
|1970-01-01T08:00:00.004+08:00| 1.3449302| 2.8781595| 3.3195343|
|1970-01-01T08:00:00.005+08:00| 3.3079383| 3.3840187| 3.7278645|
+-----------------------------+-------------+-------------+-------------+
Total line number = 5
It costs 0.028s
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><div class="language-java line-numbers-mode" data-ext="java" data-title="java"><pre class="language-java"><code><span class="token keyword">import</span> <span class="token import"><span class="token namespace">org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>flink<span class="token punctuation">.</span>table<span class="token punctuation">.</span>api<span class="token punctuation">.</span></span><span class="token operator">*</span></span><span class="token punctuation">;</span>
<span class="token keyword">public</span> <span class="token keyword">class</span> <span class="token class-name">BoundedScanTest</span> <span class="token punctuation">{</span>
<span class="token keyword">public</span> <span class="token keyword">static</span> <span class="token keyword">void</span> <span class="token function">main</span><span class="token punctuation">(</span><span class="token class-name">String</span><span class="token punctuation">[</span><span class="token punctuation">]</span> args<span class="token punctuation">)</span> <span class="token keyword">throws</span> <span class="token class-name">Exception</span> <span class="token punctuation">{</span>
<span class="token comment">// setup table environment</span>
<span class="token class-name">EnvironmentSettings</span> settings <span class="token operator">=</span> <span class="token class-name">EnvironmentSettings</span>
<span class="token punctuation">.</span><span class="token function">newInstance</span><span class="token punctuation">(</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">inStreamingMode</span><span class="token punctuation">(</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">build</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token class-name">TableEnvironment</span> tableEnv <span class="token operator">=</span> <span class="token class-name">TableEnvironment</span><span class="token punctuation">.</span><span class="token function">create</span><span class="token punctuation">(</span>settings<span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token comment">// setup schema</span>
<span class="token class-name">Schema</span> iotdbTableSchema <span class="token operator">=</span>
<span class="token class-name">Schema</span><span class="token punctuation">.</span><span class="token function">newBuilder</span><span class="token punctuation">(</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">column</span><span class="token punctuation">(</span><span class="token string">&quot;Time_&quot;</span><span class="token punctuation">,</span> <span class="token class-name">DataTypes</span><span class="token punctuation">.</span><span class="token function">BIGINT</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">column</span><span class="token punctuation">(</span><span class="token string">&quot;root.sg.d0.s0&quot;</span><span class="token punctuation">,</span> <span class="token class-name">DataTypes</span><span class="token punctuation">.</span><span class="token function">FLOAT</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">column</span><span class="token punctuation">(</span><span class="token string">&quot;root.sg.d1.s0&quot;</span><span class="token punctuation">,</span> <span class="token class-name">DataTypes</span><span class="token punctuation">.</span><span class="token function">FLOAT</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">column</span><span class="token punctuation">(</span><span class="token string">&quot;root.sg.d1.s1&quot;</span><span class="token punctuation">,</span> <span class="token class-name">DataTypes</span><span class="token punctuation">.</span><span class="token function">FLOAT</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">build</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token comment">// register table</span>
<span class="token class-name">TableDescriptor</span> iotdbDescriptor <span class="token operator">=</span>
<span class="token class-name">TableDescriptor</span><span class="token punctuation">.</span><span class="token function">forConnector</span><span class="token punctuation">(</span><span class="token string">&quot;IoTDB&quot;</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">schema</span><span class="token punctuation">(</span>iotdbTableSchema<span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">option</span><span class="token punctuation">(</span><span class="token string">&quot;nodeUrls&quot;</span><span class="token punctuation">,</span> <span class="token string">&quot;127.0.0.1:6667&quot;</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">option</span><span class="token punctuation">(</span><span class="token string">&quot;sql&quot;</span><span class="token punctuation">,</span> <span class="token string">&quot;select ** from root&quot;</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">build</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
tableEnv<span class="token punctuation">.</span><span class="token function">createTemporaryTable</span><span class="token punctuation">(</span><span class="token string">&quot;iotdbTable&quot;</span><span class="token punctuation">,</span> iotdbDescriptor<span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token comment">// output table</span>
tableEnv<span class="token punctuation">.</span><span class="token function">from</span><span class="token punctuation">(</span><span class="token string">&quot;iotdbTable&quot;</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token function">execute</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token function">print</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token punctuation">}</span>
<span class="token punctuation">}</span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><p>执行完以上任务后,Flink 的控制台中输出的表如下:</p><div class="language-text line-numbers-mode" data-ext="text" data-title="text"><pre class="language-text"><code>+----+----------------------+--------------------------------+--------------------------------+--------------------------------+
| op | Time_ | root.sg.d0.s0 | root.sg.d1.s0 | root.sg.d1.s1 |
+----+----------------------+--------------------------------+--------------------------------+--------------------------------+
| +I | 1 | 1.0833644 | 2.34874 | 1.2414109 |
| +I | 2 | 4.929185 | 3.1885583 | 4.6980085 |
| +I | 3 | 3.5206156 | 3.5600138 | 4.8080945 |
| +I | 4 | 1.3449302 | 2.8781595 | 3.3195343 |
| +I | 5 | 3.3079383 | 3.3840187 | 3.7278645 |
+----+----------------------+--------------------------------+--------------------------------+--------------------------------+
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><h3 id="lookup-point" tabindex="-1"><a class="header-anchor" href="#lookup-point"><span>Lookup Point</span></a></h3><h4 id="参数-1" tabindex="-1"><a class="header-anchor" href="#参数-1"><span>参数</span></a></h4><table><thead><tr><th>参数</th><th>必填</th><th>默认</th><th>类型</th><th>描述</th></tr></thead><tbody><tr><td>nodeUrls</td><td>否</td><td>127.0.0.1:6667</td><td>String</td><td>用来指定 IoTDB 的 datanode 地址,如果 IoTDB 是用集群模式搭建的话,可以指定多个地址,每个节点用逗号隔开。</td></tr><tr><td>user</td><td>否</td><td>root</td><td>String</td><td>IoTDB 用户名</td></tr><tr><td>password</td><td>否</td><td>root</td><td>String</td><td>IoTDB 密码</td></tr><tr><td>lookup.cache.max-rows</td><td>否</td><td>-1</td><td>Integer</td><td>lookup 查询时,缓存表的最大行数,参数大于<code>0</code>时生效。</td></tr><tr><td>lookup.cache.ttl-sec</td><td>否</td><td>-1</td><td>Integer</td><td>lookup 查询时,单点数据的丢弃时间,单位为<code>秒</code>。</td></tr><tr><td>sql</td><td>是</td><td>无</td><td>String</td><td>用于在 IoTDB 端做查询。</td></tr></tbody></table><h4 id="示例-1" tabindex="-1"><a class="header-anchor" href="#示例-1"><span>示例</span></a></h4><p>该示例演示了如何将 IoTDB 中的<code>device</code>作为维度表进行<code>lookup</code>查询:</p><ul><li>使用 <code>datagen connector</code> 生成两个字段作为 <code>Lookup Join</code> 的左表。第一个字段为自增字段,用来表示时间戳。第二个字段为随机字段,用来表示一个<br> measurement 产生的时间序列。</li><li>通过 <code>IoTDB connector</code> 注册一个表作为 <code>Lookup Join</code> 的右表。</li><li>将两个表 join 起来。</li></ul><p>当前 IoTDB 中的数据如下:</p><div class="language-text line-numbers-mode" data-ext="text" data-title="text"><pre class="language-text"><code>IoTDB&gt; select ** from root;
+-----------------------------+-------------+-------------+-------------+
| Time|root.sg.d0.s0|root.sg.d1.s0|root.sg.d1.s1|
+-----------------------------+-------------+-------------+-------------+
|1970-01-01T08:00:00.001+08:00| 1.0833644| 2.34874| 1.2414109|
|1970-01-01T08:00:00.002+08:00| 4.929185| 3.1885583| 4.6980085|
|1970-01-01T08:00:00.003+08:00| 3.5206156| 3.5600138| 4.8080945|
|1970-01-01T08:00:00.004+08:00| 1.3449302| 2.8781595| 3.3195343|
|1970-01-01T08:00:00.005+08:00| 3.3079383| 3.3840187| 3.7278645|
+-----------------------------+-------------+-------------+-------------+
Total line number = 5
It costs 0.028s
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><div class="language-java line-numbers-mode" data-ext="java" data-title="java"><pre class="language-java"><code><span class="token keyword">import</span> <span class="token import"><span class="token namespace">org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>flink<span class="token punctuation">.</span>table<span class="token punctuation">.</span>api<span class="token punctuation">.</span></span><span class="token class-name">DataTypes</span></span><span class="token punctuation">;</span>
<span class="token keyword">import</span> <span class="token import"><span class="token namespace">org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>flink<span class="token punctuation">.</span>table<span class="token punctuation">.</span>api<span class="token punctuation">.</span></span><span class="token class-name">EnvironmentSettings</span></span><span class="token punctuation">;</span>
<span class="token keyword">import</span> <span class="token import"><span class="token namespace">org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>flink<span class="token punctuation">.</span>table<span class="token punctuation">.</span>api<span class="token punctuation">.</span></span><span class="token class-name">Schema</span></span><span class="token punctuation">;</span>
<span class="token keyword">import</span> <span class="token import"><span class="token namespace">org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>flink<span class="token punctuation">.</span>table<span class="token punctuation">.</span>api<span class="token punctuation">.</span></span><span class="token class-name">TableDescriptor</span></span><span class="token punctuation">;</span>
<span class="token keyword">import</span> <span class="token import"><span class="token namespace">org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>flink<span class="token punctuation">.</span>table<span class="token punctuation">.</span>api<span class="token punctuation">.</span></span><span class="token class-name">TableEnvironment</span></span><span class="token punctuation">;</span>
<span class="token keyword">public</span> <span class="token keyword">class</span> <span class="token class-name">LookupTest</span> <span class="token punctuation">{</span>
<span class="token keyword">public</span> <span class="token keyword">static</span> <span class="token keyword">void</span> <span class="token function">main</span><span class="token punctuation">(</span><span class="token class-name">String</span><span class="token punctuation">[</span><span class="token punctuation">]</span> args<span class="token punctuation">)</span> <span class="token punctuation">{</span>
<span class="token comment">// setup environment</span>
<span class="token class-name">EnvironmentSettings</span> settings <span class="token operator">=</span> <span class="token class-name">EnvironmentSettings</span>
<span class="token punctuation">.</span><span class="token function">newInstance</span><span class="token punctuation">(</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">inStreamingMode</span><span class="token punctuation">(</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">build</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token class-name">TableEnvironment</span> tableEnv <span class="token operator">=</span> <span class="token class-name">TableEnvironment</span><span class="token punctuation">.</span><span class="token function">create</span><span class="token punctuation">(</span>settings<span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token comment">// register left table</span>
<span class="token class-name">Schema</span> dataGenTableSchema <span class="token operator">=</span>
<span class="token class-name">Schema</span><span class="token punctuation">.</span><span class="token function">newBuilder</span><span class="token punctuation">(</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">column</span><span class="token punctuation">(</span><span class="token string">&quot;Time_&quot;</span><span class="token punctuation">,</span> <span class="token class-name">DataTypes</span><span class="token punctuation">.</span><span class="token function">BIGINT</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">column</span><span class="token punctuation">(</span><span class="token string">&quot;s0&quot;</span><span class="token punctuation">,</span> <span class="token class-name">DataTypes</span><span class="token punctuation">.</span><span class="token function">INT</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">build</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token class-name">TableDescriptor</span> datagenDescriptor <span class="token operator">=</span>
<span class="token class-name">TableDescriptor</span><span class="token punctuation">.</span><span class="token function">forConnector</span><span class="token punctuation">(</span><span class="token string">&quot;datagen&quot;</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">schema</span><span class="token punctuation">(</span>dataGenTableSchema<span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">option</span><span class="token punctuation">(</span><span class="token string">&quot;fields.Time_.kind&quot;</span><span class="token punctuation">,</span> <span class="token string">&quot;sequence&quot;</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">option</span><span class="token punctuation">(</span><span class="token string">&quot;fields.Time_.start&quot;</span><span class="token punctuation">,</span> <span class="token string">&quot;1&quot;</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">option</span><span class="token punctuation">(</span><span class="token string">&quot;fields.Time_.end&quot;</span><span class="token punctuation">,</span> <span class="token string">&quot;5&quot;</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">option</span><span class="token punctuation">(</span><span class="token string">&quot;fields.s0.min&quot;</span><span class="token punctuation">,</span> <span class="token string">&quot;1&quot;</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">option</span><span class="token punctuation">(</span><span class="token string">&quot;fields.s0.max&quot;</span><span class="token punctuation">,</span> <span class="token string">&quot;1&quot;</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">build</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
tableEnv<span class="token punctuation">.</span><span class="token function">createTemporaryTable</span><span class="token punctuation">(</span><span class="token string">&quot;leftTable&quot;</span><span class="token punctuation">,</span> datagenDescriptor<span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token comment">// register right table</span>
<span class="token class-name">Schema</span> iotdbTableSchema <span class="token operator">=</span>
<span class="token class-name">Schema</span><span class="token punctuation">.</span><span class="token function">newBuilder</span><span class="token punctuation">(</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">column</span><span class="token punctuation">(</span><span class="token string">&quot;Time_&quot;</span><span class="token punctuation">,</span> <span class="token class-name">DataTypes</span><span class="token punctuation">.</span><span class="token function">BIGINT</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">column</span><span class="token punctuation">(</span><span class="token string">&quot;root.sg.d0.s0&quot;</span><span class="token punctuation">,</span> <span class="token class-name">DataTypes</span><span class="token punctuation">.</span><span class="token function">FLOAT</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">column</span><span class="token punctuation">(</span><span class="token string">&quot;root.sg.d1.s0&quot;</span><span class="token punctuation">,</span> <span class="token class-name">DataTypes</span><span class="token punctuation">.</span><span class="token function">FLOAT</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">column</span><span class="token punctuation">(</span><span class="token string">&quot;root.sg.d1.s1&quot;</span><span class="token punctuation">,</span> <span class="token class-name">DataTypes</span><span class="token punctuation">.</span><span class="token function">FLOAT</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">build</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token class-name">TableDescriptor</span> iotdbDescriptor <span class="token operator">=</span>
<span class="token class-name">TableDescriptor</span><span class="token punctuation">.</span><span class="token function">forConnector</span><span class="token punctuation">(</span><span class="token string">&quot;IoTDB&quot;</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">schema</span><span class="token punctuation">(</span>iotdbTableSchema<span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">option</span><span class="token punctuation">(</span><span class="token string">&quot;sql&quot;</span><span class="token punctuation">,</span> <span class="token string">&quot;select ** from root&quot;</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">build</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
tableEnv<span class="token punctuation">.</span><span class="token function">createTemporaryTable</span><span class="token punctuation">(</span><span class="token string">&quot;rightTable&quot;</span><span class="token punctuation">,</span> iotdbDescriptor<span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token comment">// join</span>
<span class="token class-name">String</span> sql <span class="token operator">=</span>
<span class="token string">&quot;SELECT l.Time_, l.s0,r.\`root.sg.d0.s0\`, r.\`root.sg.d1.s0\`, r.\`root.sg.d1.s1\` &quot;</span>
<span class="token operator">+</span> <span class="token string">&quot;FROM (select *,PROCTIME() as proc_time from leftTable) AS l &quot;</span>
<span class="token operator">+</span> <span class="token string">&quot;JOIN rightTable FOR SYSTEM_TIME AS OF l.proc_time AS r &quot;</span>
<span class="token operator">+</span> <span class="token string">&quot;ON l.Time_ = r.Time_&quot;</span><span class="token punctuation">;</span>
<span class="token comment">// output table</span>
tableEnv<span class="token punctuation">.</span><span class="token function">sqlQuery</span><span class="token punctuation">(</span>sql<span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token function">execute</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token function">print</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token punctuation">}</span>
<span class="token punctuation">}</span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><p>执行完以上任务后,Flink 的控制台中输出的表如下:</p><div class="language-text 
line-numbers-mode" data-ext="text" data-title="text"><pre class="language-text"><code>+----+----------------------+-------------+---------------+----------------------+--------------------------------+
| op | Time_ | s0 | root.sg.d0.s0 | root.sg.d1.s0 | root.sg.d1.s1 |
+----+----------------------+-------------+---------------+----------------------+--------------------------------+
| +I | 5 | 1 | 3.3079383 | 3.3840187 | 3.7278645 |
| +I | 2 | 1 | 4.929185 | 3.1885583 | 4.6980085 |
| +I | 1 | 1 | 1.0833644 | 2.34874 | 1.2414109 |
| +I | 4 | 1 | 1.3449302 | 2.8781595 | 3.3195343 |
| +I | 3 | 1 | 3.5206156 | 3.5600138 | 4.8080945 |
+----+----------------------+-------------+---------------+----------------------+--------------------------------+
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><h3 id="cdc" tabindex="-1"><a class="header-anchor" href="#cdc"><span>CDC</span></a></h3><h4 id="参数-2" tabindex="-1"><a class="header-anchor" href="#参数-2"><span>参数</span></a></h4>`,28),b=n("thead",null,[n("tr",null,[n("th",null,"参数"),n("th",null,"必填"),n("th",null,"默认"),n("th",null,"类型"),n("th",null,"描述")])],-1),g=n("tr",null,[n("td",null,"nodeUrls"),n("td",null,"否"),n("td",null,"127.0.0.1:6667"),n("td",null,"String"),n("td",null,"用来指定 IoTDB 的 datanode 地址,如果 IoTDB 是用集群模式搭建的话,可以指定多个地址,每个节点用逗号隔开。")],-1),h=n("tr",null,[n("td",null,"user"),n("td",null,"否"),n("td",null,"root"),n("td",null,"String"),n("td",null,"IoTDB 用户名")],-1),T=n("tr",null,[n("td",null,"password"),n("td",null,"否"),n("td",null,"root"),n("td",null,"String"),n("td",null,"IoTDB 密码")],-1),f=n("tr",null,[n("td",null,"mode"),n("td",null,"是"),n("td",null,"BOUNDED"),n("td",null,"ENUM"),n("td",null,[n("strong",null,[s("必须将此参数设置为 "),n("code",null,"CDC"),s(" 才能启动")])])],-1),q=n("tr",null,[n("td",null,"sql"),n("td",null,"是"),n("td",null,"无"),n("td",null,"String"),n("td",null,"用于在 IoTDB 端做查询。")],-1),D=n("tr",null,[n("td",null,"cdc.port"),n("td",null,"否"),n("td",null,"8080"),n("td",null,"Integer"),n("td",null,"在 IoTDB 端提供 CDC 服务的端口号。")],-1),I={href:"http://cdc.task.name",target:"_blank",rel:"noopener noreferrer"},y=n("td",null,"是",-1),S=n("td",null,"无",-1),B=n("td",null,"String",-1),x=n("td",null,"当 mode 参数设置为 CDC 时是必填项。用于在 IoTDB 端创建 Pipe 任务。",-1),w=n("tr",null,[n("td",null,"cdc.pattern"),n("td",null,"是"),n("td",null,"无"),n("td",null,"String"),n("td",null,"当 mode 参数设置为 CDC 时是必填项。用于在 IoTDB 端作为发送数据的过滤条件。")],-1),_=a(`<h4 id="示例-2" tabindex="-1"><a class="header-anchor" 
href="#示例-2"><span>示例</span></a></h4><p>该示例演示了如何通过 <code>CDC Connector</code> 去获取 IoTDB 中指定路径下的变化数据:</p><ul><li>通过 <code>CDC Connector</code> 创建一张 <code>CDC</code> 表。</li><li>将 <code>CDC</code> 表打印出来。</li></ul><div class="language-java line-numbers-mode" data-ext="java" data-title="java"><pre class="language-java"><code><span class="token keyword">import</span> <span class="token import"><span class="token namespace">org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>flink<span class="token punctuation">.</span>table<span class="token punctuation">.</span>api<span class="token punctuation">.</span></span><span class="token operator">*</span></span><span class="token punctuation">;</span>
<span class="token keyword">public</span> <span class="token keyword">class</span> <span class="token class-name">CDCTest</span> <span class="token punctuation">{</span>
<span class="token keyword">public</span> <span class="token keyword">static</span> <span class="token keyword">void</span> <span class="token function">main</span><span class="token punctuation">(</span><span class="token class-name">String</span><span class="token punctuation">[</span><span class="token punctuation">]</span> args<span class="token punctuation">)</span> <span class="token punctuation">{</span>
<span class="token comment">// setup environment</span>
<span class="token class-name">EnvironmentSettings</span> settings <span class="token operator">=</span> <span class="token class-name">EnvironmentSettings</span>
<span class="token punctuation">.</span><span class="token function">newInstance</span><span class="token punctuation">(</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">inStreamingMode</span><span class="token punctuation">(</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">build</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token class-name">TableEnvironment</span> tableEnv <span class="token operator">=</span> <span class="token class-name">TableEnvironment</span><span class="token punctuation">.</span><span class="token function">create</span><span class="token punctuation">(</span>settings<span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token comment">// setup schema</span>
<span class="token class-name">Schema</span> iotdbTableSchema <span class="token operator">=</span> <span class="token class-name">Schema</span>
<span class="token punctuation">.</span><span class="token function">newBuilder</span><span class="token punctuation">(</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">column</span><span class="token punctuation">(</span><span class="token string">&quot;Time_&quot;</span><span class="token punctuation">,</span> <span class="token class-name">DataTypes</span><span class="token punctuation">.</span><span class="token function">BIGINT</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">column</span><span class="token punctuation">(</span><span class="token string">&quot;root.sg.d0.s0&quot;</span><span class="token punctuation">,</span> <span class="token class-name">DataTypes</span><span class="token punctuation">.</span><span class="token function">FLOAT</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">column</span><span class="token punctuation">(</span><span class="token string">&quot;root.sg.d1.s0&quot;</span><span class="token punctuation">,</span> <span class="token class-name">DataTypes</span><span class="token punctuation">.</span><span class="token function">FLOAT</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">column</span><span class="token punctuation">(</span><span class="token string">&quot;root.sg.d1.s1&quot;</span><span class="token punctuation">,</span> <span class="token class-name">DataTypes</span><span class="token punctuation">.</span><span class="token function">FLOAT</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">build</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token comment">// register table</span>
<span class="token class-name">TableDescriptor</span> iotdbDescriptor <span class="token operator">=</span> <span class="token class-name">TableDescriptor</span>
<span class="token punctuation">.</span><span class="token function">forConnector</span><span class="token punctuation">(</span><span class="token string">&quot;IoTDB&quot;</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">schema</span><span class="token punctuation">(</span>iotdbTableSchema<span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">option</span><span class="token punctuation">(</span><span class="token string">&quot;mode&quot;</span><span class="token punctuation">,</span> <span class="token string">&quot;CDC&quot;</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">option</span><span class="token punctuation">(</span><span class="token string">&quot;cdc.task.name&quot;</span><span class="token punctuation">,</span> <span class="token string">&quot;test&quot;</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">option</span><span class="token punctuation">(</span><span class="token string">&quot;cdc.pattern&quot;</span><span class="token punctuation">,</span> <span class="token string">&quot;root.sg&quot;</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">build</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
tableEnv<span class="token punctuation">.</span><span class="token function">createTemporaryTable</span><span class="token punctuation">(</span><span class="token string">&quot;iotdbTable&quot;</span><span class="token punctuation">,</span> iotdbDescriptor<span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token comment">// output table</span>
tableEnv<span class="token punctuation">.</span><span class="token function">from</span><span class="token punctuation">(</span><span class="token string">&quot;iotdbTable&quot;</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token function">execute</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token function">print</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token punctuation">}</span>
<span class="token punctuation">}</span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><p>运行以上的 Flink CDC 任务,然后在 IoTDB-cli 中执行以下 SQL:</p><div class="language-sql line-numbers-mode" data-ext="sql" data-title="sql"><pre class="language-sql"><code><span class="token keyword">insert</span> <span class="token keyword">into</span> root<span class="token punctuation">.</span>sg<span class="token punctuation">.</span>d1<span class="token punctuation">(</span><span class="token keyword">timestamp</span><span class="token punctuation">,</span>s0<span class="token punctuation">,</span>s1<span class="token punctuation">)</span> <span class="token keyword">values</span><span class="token punctuation">(</span><span class="token number">6</span><span class="token punctuation">,</span><span class="token number">1.0</span><span class="token punctuation">,</span><span class="token number">1.0</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token keyword">insert</span> <span class="token keyword">into</span> root<span class="token punctuation">.</span>sg<span class="token punctuation">.</span>d1<span class="token punctuation">(</span><span class="token keyword">timestamp</span><span class="token punctuation">,</span>s0<span class="token punctuation">,</span>s1<span class="token punctuation">)</span> <span class="token keyword">values</span><span class="token punctuation">(</span><span class="token number">7</span><span class="token punctuation">,</span><span class="token number">1.0</span><span class="token punctuation">,</span><span class="token number">1.0</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token keyword">insert</span> <span class="token keyword">into</span> root<span class="token punctuation">.</span>sg<span class="token punctuation">.</span>d1<span class="token punctuation">(</span><span class="token keyword">timestamp</span><span class="token punctuation">,</span>s0<span class="token punctuation">,</span>s1<span class="token punctuation">)</span> <span class="token keyword">values</span><span class="token punctuation">(</span><span class="token number">6</span><span class="token punctuation">,</span><span class="token number">2.0</span><span class="token punctuation">,</span><span class="token number">1.0</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token keyword">insert</span> <span class="token keyword">into</span> root<span class="token punctuation">.</span>sg<span class="token punctuation">.</span>d0<span class="token punctuation">(</span><span class="token keyword">timestamp</span><span class="token punctuation">,</span>s0<span class="token punctuation">)</span> <span class="token keyword">values</span><span class="token punctuation">(</span><span class="token number">7</span><span class="token punctuation">,</span><span class="token number">2.0</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><p>然后,Flink 的控制台中将打印以下数据:</p><div class="language-text line-numbers-mode" data-ext="text" data-title="text"><pre class="language-text"><code>+----+----------------------+--------------------------------+--------------------------------+--------------------------------+
| op | Time_ | root.sg.d0.s0 | root.sg.d1.s0 | root.sg.d1.s1 |
+----+----------------------+--------------------------------+--------------------------------+--------------------------------+
| +I | 7 | &lt;NULL&gt; | 1.0 | 1.0 |
| +I | 6 | &lt;NULL&gt; | 1.0 | 1.0 |
| +I | 6 | &lt;NULL&gt; | 2.0 | 1.0 |
| +I | 7 | 2.0 | &lt;NULL&gt; | &lt;NULL&gt; |
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><h2 id="写模式-sink-1" tabindex="-1"><a class="header-anchor" href="#写模式-sink-1"><span>写模式(Sink)</span></a></h2><h3 id="streaming-sink" tabindex="-1"><a class="header-anchor" href="#streaming-sink"><span>Streaming Sink</span></a></h3><h4 id="参数-3" tabindex="-1"><a class="header-anchor" href="#参数-3"><span>参数</span></a></h4><table><thead><tr><th>参数</th><th>必填</th><th>默认</th><th>类型</th><th>描述</th></tr></thead><tbody><tr><td>nodeUrls</td><td>否</td><td>127.0.0.1:6667</td><td>String</td><td>用来指定 IoTDB 的 datanode 地址,如果 IoTDB 是用集群模式搭建的话,可以指定多个地址,每个节点用逗号隔开。</td></tr><tr><td>user</td><td>否</td><td>root</td><td>String</td><td>IoTDB 用户名</td></tr><tr><td>password</td><td>否</td><td>root</td><td>String</td><td>IoTDB 密码</td></tr><tr><td>aligned</td><td>否</td><td>false</td><td>Boolean</td><td>向 IoTDB 写入数据时是否调用<code>aligned</code>接口。</td></tr></tbody></table><h4 id="示例-3" tabindex="-1"><a class="header-anchor" href="#示例-3"><span>示例</span></a></h4><p>该示例演示了如何在一个 Flink Table 的 Streaming Job 中将数据写入到 IoTDB 中:</p><ul><li>通过 <code>datagen connector</code> 生成一张源数据表。</li><li>通过 <code>IoTDB connector</code> 注册一个输出表。</li><li>将数据源表的数据插入到输出表中。</li></ul><div class="language-java line-numbers-mode" data-ext="java" data-title="java"><pre class="language-java"><code><span class="token keyword">import</span> <span class="token import"><span class="token namespace">org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>flink<span class="token punctuation">.</span>table<span class="token punctuation">.</span>api<span class="token punctuation">.</span></span><span class="token class-name">DataTypes</span></span><span class="token punctuation">;</span>
<span class="token keyword">import</span> <span class="token import"><span class="token namespace">org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>flink<span class="token punctuation">.</span>table<span class="token punctuation">.</span>api<span class="token punctuation">.</span></span><span class="token class-name">EnvironmentSettings</span></span><span class="token punctuation">;</span>
<span class="token keyword">import</span> <span class="token import"><span class="token namespace">org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>flink<span class="token punctuation">.</span>table<span class="token punctuation">.</span>api<span class="token punctuation">.</span></span><span class="token class-name">Schema</span></span><span class="token punctuation">;</span>
<span class="token keyword">import</span> <span class="token import"><span class="token namespace">org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>flink<span class="token punctuation">.</span>table<span class="token punctuation">.</span>api<span class="token punctuation">.</span></span><span class="token class-name">Table</span></span><span class="token punctuation">;</span>
<span class="token keyword">import</span> <span class="token import"><span class="token namespace">org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>flink<span class="token punctuation">.</span>table<span class="token punctuation">.</span>api<span class="token punctuation">.</span></span><span class="token class-name">TableDescriptor</span></span><span class="token punctuation">;</span>
<span class="token keyword">import</span> <span class="token import"><span class="token namespace">org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>flink<span class="token punctuation">.</span>table<span class="token punctuation">.</span>api<span class="token punctuation">.</span></span><span class="token class-name">TableEnvironment</span></span><span class="token punctuation">;</span>
<span class="token keyword">public</span> <span class="token keyword">class</span> <span class="token class-name">StreamingSinkTest</span> <span class="token punctuation">{</span>
<span class="token keyword">public</span> <span class="token keyword">static</span> <span class="token keyword">void</span> <span class="token function">main</span><span class="token punctuation">(</span><span class="token class-name">String</span><span class="token punctuation">[</span><span class="token punctuation">]</span> args<span class="token punctuation">)</span> <span class="token punctuation">{</span>
<span class="token comment">// setup environment</span>
<span class="token class-name">EnvironmentSettings</span> settings <span class="token operator">=</span> <span class="token class-name">EnvironmentSettings</span>
<span class="token punctuation">.</span><span class="token function">newInstance</span><span class="token punctuation">(</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">inStreamingMode</span><span class="token punctuation">(</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">build</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token class-name">TableEnvironment</span> tableEnv <span class="token operator">=</span> <span class="token class-name">TableEnvironment</span><span class="token punctuation">.</span><span class="token function">create</span><span class="token punctuation">(</span>settings<span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token comment">// create data source table</span>
<span class="token class-name">Schema</span> dataGenTableSchema <span class="token operator">=</span> <span class="token class-name">Schema</span>
<span class="token punctuation">.</span><span class="token function">newBuilder</span><span class="token punctuation">(</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">column</span><span class="token punctuation">(</span><span class="token string">&quot;Time_&quot;</span><span class="token punctuation">,</span> <span class="token class-name">DataTypes</span><span class="token punctuation">.</span><span class="token function">BIGINT</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">column</span><span class="token punctuation">(</span><span class="token string">&quot;root.sg.d0.s0&quot;</span><span class="token punctuation">,</span> <span class="token class-name">DataTypes</span><span class="token punctuation">.</span><span class="token function">FLOAT</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">column</span><span class="token punctuation">(</span><span class="token string">&quot;root.sg.d1.s0&quot;</span><span class="token punctuation">,</span> <span class="token class-name">DataTypes</span><span class="token punctuation">.</span><span class="token function">FLOAT</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">column</span><span class="token punctuation">(</span><span class="token string">&quot;root.sg.d1.s1&quot;</span><span class="token punctuation">,</span> <span class="token class-name">DataTypes</span><span class="token punctuation">.</span><span class="token function">FLOAT</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">build</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token class-name">TableDescriptor</span> descriptor <span class="token operator">=</span> <span class="token class-name">TableDescriptor</span>
<span class="token punctuation">.</span><span class="token function">forConnector</span><span class="token punctuation">(</span><span class="token string">&quot;datagen&quot;</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">schema</span><span class="token punctuation">(</span>dataGenTableSchema<span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">option</span><span class="token punctuation">(</span><span class="token string">&quot;rows-per-second&quot;</span><span class="token punctuation">,</span> <span class="token string">&quot;1&quot;</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">option</span><span class="token punctuation">(</span><span class="token string">&quot;fields.Time_.kind&quot;</span><span class="token punctuation">,</span> <span class="token string">&quot;sequence&quot;</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">option</span><span class="token punctuation">(</span><span class="token string">&quot;fields.Time_.start&quot;</span><span class="token punctuation">,</span> <span class="token string">&quot;1&quot;</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">option</span><span class="token punctuation">(</span><span class="token string">&quot;fields.Time_.end&quot;</span><span class="token punctuation">,</span> <span class="token string">&quot;5&quot;</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">option</span><span class="token punctuation">(</span><span class="token string">&quot;fields.root.sg.d0.s0.min&quot;</span><span class="token punctuation">,</span> <span class="token string">&quot;1&quot;</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">option</span><span class="token punctuation">(</span><span class="token string">&quot;fields.root.sg.d0.s0.max&quot;</span><span class="token punctuation">,</span> <span class="token string">&quot;5&quot;</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">option</span><span class="token punctuation">(</span><span class="token string">&quot;fields.root.sg.d1.s0.min&quot;</span><span class="token punctuation">,</span> <span class="token string">&quot;1&quot;</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">option</span><span class="token punctuation">(</span><span class="token string">&quot;fields.root.sg.d1.s0.max&quot;</span><span class="token punctuation">,</span> <span class="token string">&quot;5&quot;</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">option</span><span class="token punctuation">(</span><span class="token string">&quot;fields.root.sg.d1.s1.min&quot;</span><span class="token punctuation">,</span> <span class="token string">&quot;1&quot;</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">option</span><span class="token punctuation">(</span><span class="token string">&quot;fields.root.sg.d1.s1.max&quot;</span><span class="token punctuation">,</span> <span class="token string">&quot;5&quot;</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">build</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token comment">// register source table</span>
tableEnv<span class="token punctuation">.</span><span class="token function">createTemporaryTable</span><span class="token punctuation">(</span><span class="token string">&quot;dataGenTable&quot;</span><span class="token punctuation">,</span> descriptor<span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token class-name">Table</span> dataGenTable <span class="token operator">=</span> tableEnv<span class="token punctuation">.</span><span class="token function">from</span><span class="token punctuation">(</span><span class="token string">&quot;dataGenTable&quot;</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token comment">// create iotdb sink table</span>
<span class="token class-name">TableDescriptor</span> iotdbDescriptor <span class="token operator">=</span> <span class="token class-name">TableDescriptor</span>
<span class="token punctuation">.</span><span class="token function">forConnector</span><span class="token punctuation">(</span><span class="token string">&quot;IoTDB&quot;</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">schema</span><span class="token punctuation">(</span>dataGenTableSchema<span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">build</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
tableEnv<span class="token punctuation">.</span><span class="token function">createTemporaryTable</span><span class="token punctuation">(</span><span class="token string">&quot;iotdbSinkTable&quot;</span><span class="token punctuation">,</span> iotdbDescriptor<span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token comment">// insert data</span>
dataGenTable<span class="token punctuation">.</span><span class="token function">executeInsert</span><span class="token punctuation">(</span><span class="token string">&quot;iotdbSinkTable&quot;</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token function">print</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token punctuation">}</span>
<span class="token punctuation">}</span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><p>上述任务执行完成后,在 IoTDB 的 cli 中查询结果如下:</p><div class="language-text line-numbers-mode" data-ext="text" data-title="text"><pre class="language-text"><code>IoTDB&gt; select ** from root;
+-----------------------------+-------------+-------------+-------------+
| Time|root.sg.d0.s0|root.sg.d1.s0|root.sg.d1.s1|
+-----------------------------+-------------+-------------+-------------+
|1970-01-01T08:00:00.001+08:00| 1.0833644| 2.34874| 1.2414109|
|1970-01-01T08:00:00.002+08:00| 4.929185| 3.1885583| 4.6980085|
|1970-01-01T08:00:00.003+08:00| 3.5206156| 3.5600138| 4.8080945|
|1970-01-01T08:00:00.004+08:00| 1.3449302| 2.8781595| 3.3195343|
|1970-01-01T08:00:00.005+08:00| 3.3079383| 3.3840187| 3.7278645|
+-----------------------------+-------------+-------------+-------------+
Total line number = 5
It costs 0.054s
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><h3 id="batch-sink" tabindex="-1"><a class="header-anchor" href="#batch-sink"><span>Batch Sink</span></a></h3><h4 id="参数-4" tabindex="-1"><a class="header-anchor" href="#参数-4"><span>参数</span></a></h4><table><thead><tr><th>参数</th><th>必填</th><th>默认</th><th>类型</th><th>描述</th></tr></thead><tbody><tr><td>nodeUrls</td><td>否</td><td>127.0.0.1:6667</td><td>String</td><td>用来指定 IoTDB 的 datanode 地址,如果 IoTDB 是用集群模式搭建的话,可以指定多个地址,每个节点用逗号隔开。</td></tr><tr><td>user</td><td>否</td><td>root</td><td>String</td><td>IoTDB 用户名</td></tr><tr><td>password</td><td>否</td><td>root</td><td>String</td><td>IoTDB 密码</td></tr><tr><td>aligned</td><td>否</td><td>false</td><td>Boolean</td><td>向 IoTDB 写入数据时是否调用<code>aligned</code>接口。</td></tr></tbody></table><h4 id="示例-4" tabindex="-1"><a class="header-anchor" href="#示例-4"><span>示例</span></a></h4><p>该示例演示了如何在一个 Flink Table 的 Batch Job 中将数据写入到 IoTDB 中:</p><ul><li>通过 <code>IoTDB connector</code> 生成一张源数据表。</li><li>通过 <code>IoTDB connector</code> 注册一个输出表。</li><li>将源数据表中的列重命名后写回 IoTDB。</li></ul><div class="language-java line-numbers-mode" data-ext="java" data-title="java"><pre class="language-java"><code><span class="token keyword">import</span> <span class="token import"><span class="token namespace">org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>flink<span class="token punctuation">.</span>table<span class="token punctuation">.</span>api<span class="token punctuation">.</span></span><span class="token class-name">DataTypes</span></span><span class="token punctuation">;</span>
<span class="token keyword">import</span> <span class="token import"><span class="token namespace">org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>flink<span class="token punctuation">.</span>table<span class="token punctuation">.</span>api<span class="token punctuation">.</span></span><span class="token class-name">EnvironmentSettings</span></span><span class="token punctuation">;</span>
<span class="token keyword">import</span> <span class="token import"><span class="token namespace">org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>flink<span class="token punctuation">.</span>table<span class="token punctuation">.</span>api<span class="token punctuation">.</span></span><span class="token class-name">Schema</span></span><span class="token punctuation">;</span>
<span class="token keyword">import</span> <span class="token import"><span class="token namespace">org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>flink<span class="token punctuation">.</span>table<span class="token punctuation">.</span>api<span class="token punctuation">.</span></span><span class="token class-name">Table</span></span><span class="token punctuation">;</span>
<span class="token keyword">import</span> <span class="token import"><span class="token namespace">org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>flink<span class="token punctuation">.</span>table<span class="token punctuation">.</span>api<span class="token punctuation">.</span></span><span class="token class-name">TableDescriptor</span></span><span class="token punctuation">;</span>
<span class="token keyword">import</span> <span class="token import"><span class="token namespace">org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>flink<span class="token punctuation">.</span>table<span class="token punctuation">.</span>api<span class="token punctuation">.</span></span><span class="token class-name">TableEnvironment</span></span><span class="token punctuation">;</span>
<span class="token keyword">import</span> <span class="token keyword">static</span> <span class="token namespace">org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>flink<span class="token punctuation">.</span>table<span class="token punctuation">.</span>api<span class="token punctuation">.</span></span><span class="token class-name">Expressions</span><span class="token punctuation">.</span>$<span class="token punctuation">;</span>
<span class="token keyword">public</span> <span class="token keyword">class</span> <span class="token class-name">BatchSinkTest</span> <span class="token punctuation">{</span>
<span class="token keyword">public</span> <span class="token keyword">static</span> <span class="token keyword">void</span> <span class="token function">main</span><span class="token punctuation">(</span><span class="token class-name">String</span><span class="token punctuation">[</span><span class="token punctuation">]</span> args<span class="token punctuation">)</span> <span class="token punctuation">{</span>
<span class="token comment">// setup environment</span>
<span class="token class-name">EnvironmentSettings</span> settings <span class="token operator">=</span> <span class="token class-name">EnvironmentSettings</span>
<span class="token punctuation">.</span><span class="token function">newInstance</span><span class="token punctuation">(</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">inBatchMode</span><span class="token punctuation">(</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">build</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token class-name">TableEnvironment</span> tableEnv <span class="token operator">=</span> <span class="token class-name">TableEnvironment</span><span class="token punctuation">.</span><span class="token function">create</span><span class="token punctuation">(</span>settings<span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token comment">// create source table</span>
<span class="token class-name">Schema</span> sourceTableSchema <span class="token operator">=</span> <span class="token class-name">Schema</span>
<span class="token punctuation">.</span><span class="token function">newBuilder</span><span class="token punctuation">(</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">column</span><span class="token punctuation">(</span><span class="token string">&quot;Time_&quot;</span><span class="token punctuation">,</span> <span class="token class-name">DataTypes</span><span class="token punctuation">.</span><span class="token function">BIGINT</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">column</span><span class="token punctuation">(</span><span class="token string">&quot;root.sg.d0.s0&quot;</span><span class="token punctuation">,</span> <span class="token class-name">DataTypes</span><span class="token punctuation">.</span><span class="token function">FLOAT</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">column</span><span class="token punctuation">(</span><span class="token string">&quot;root.sg.d1.s0&quot;</span><span class="token punctuation">,</span> <span class="token class-name">DataTypes</span><span class="token punctuation">.</span><span class="token function">FLOAT</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">column</span><span class="token punctuation">(</span><span class="token string">&quot;root.sg.d1.s1&quot;</span><span class="token punctuation">,</span> <span class="token class-name">DataTypes</span><span class="token punctuation">.</span><span class="token function">FLOAT</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">build</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token class-name">TableDescriptor</span> sourceTableDescriptor <span class="token operator">=</span> <span class="token class-name">TableDescriptor</span>
<span class="token punctuation">.</span><span class="token function">forConnector</span><span class="token punctuation">(</span><span class="token string">&quot;IoTDB&quot;</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">schema</span><span class="token punctuation">(</span>sourceTableSchema<span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">option</span><span class="token punctuation">(</span><span class="token string">&quot;sql&quot;</span><span class="token punctuation">,</span> <span class="token string">&quot;select ** from root.sg.d0,root.sg.d1&quot;</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">build</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
tableEnv<span class="token punctuation">.</span><span class="token function">createTemporaryTable</span><span class="token punctuation">(</span><span class="token string">&quot;sourceTable&quot;</span><span class="token punctuation">,</span> sourceTableDescriptor<span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token class-name">Table</span> sourceTable <span class="token operator">=</span> tableEnv<span class="token punctuation">.</span><span class="token function">from</span><span class="token punctuation">(</span><span class="token string">&quot;sourceTable&quot;</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token comment">// register sink table</span>
<span class="token class-name">Schema</span> sinkTableSchema <span class="token operator">=</span> <span class="token class-name">Schema</span>
<span class="token punctuation">.</span><span class="token function">newBuilder</span><span class="token punctuation">(</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">column</span><span class="token punctuation">(</span><span class="token string">&quot;Time_&quot;</span><span class="token punctuation">,</span> <span class="token class-name">DataTypes</span><span class="token punctuation">.</span><span class="token function">BIGINT</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">column</span><span class="token punctuation">(</span><span class="token string">&quot;root.sg.d2.s0&quot;</span><span class="token punctuation">,</span> <span class="token class-name">DataTypes</span><span class="token punctuation">.</span><span class="token function">FLOAT</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">column</span><span class="token punctuation">(</span><span class="token string">&quot;root.sg.d3.s0&quot;</span><span class="token punctuation">,</span> <span class="token class-name">DataTypes</span><span class="token punctuation">.</span><span class="token function">FLOAT</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">column</span><span class="token punctuation">(</span><span class="token string">&quot;root.sg.d3.s1&quot;</span><span class="token punctuation">,</span> <span class="token class-name">DataTypes</span><span class="token punctuation">.</span><span class="token function">FLOAT</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">build</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token class-name">TableDescriptor</span> sinkTableDescriptor <span class="token operator">=</span> <span class="token class-name">TableDescriptor</span>
<span class="token punctuation">.</span><span class="token function">forConnector</span><span class="token punctuation">(</span><span class="token string">&quot;IoTDB&quot;</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">schema</span><span class="token punctuation">(</span>sinkTableSchema<span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">build</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
tableEnv<span class="token punctuation">.</span><span class="token function">createTemporaryTable</span><span class="token punctuation">(</span><span class="token string">&quot;sinkTable&quot;</span><span class="token punctuation">,</span> sinkTableDescriptor<span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token comment">// insert data</span>
sourceTable<span class="token punctuation">.</span><span class="token function">renameColumns</span><span class="token punctuation">(</span>
$<span class="token punctuation">(</span><span class="token string">&quot;root.sg.d0.s0&quot;</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token function">as</span><span class="token punctuation">(</span><span class="token string">&quot;root.sg.d2.s0&quot;</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
$<span class="token punctuation">(</span><span class="token string">&quot;root.sg.d1.s0&quot;</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token function">as</span><span class="token punctuation">(</span><span class="token string">&quot;root.sg.d3.s0&quot;</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
$<span class="token punctuation">(</span><span class="token string">&quot;root.sg.d1.s1&quot;</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token function">as</span><span class="token punctuation">(</span><span class="token string">&quot;root.sg.d3.s1&quot;</span><span class="token punctuation">)</span>
<span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token function">insertInto</span><span class="token punctuation">(</span><span class="token string">&quot;sinkTable&quot;</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token function">execute</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token function">print</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token punctuation">}</span>
<span class="token punctuation">}</span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><p>上述任务执行完成后,在 IoTDB 的 cli 中查询结果如下:</p><div class="language-text line-numbers-mode" data-ext="text" data-title="text"><pre class="language-text"><code>IoTDB&gt; select ** from root;
+-----------------------------+-------------+-------------+-------------+-------------+-------------+-------------+
| Time|root.sg.d0.s0|root.sg.d1.s0|root.sg.d1.s1|root.sg.d2.s0|root.sg.d3.s0|root.sg.d3.s1|
+-----------------------------+-------------+-------------+-------------+-------------+-------------+-------------+
|1970-01-01T08:00:00.001+08:00| 1.0833644| 2.34874| 1.2414109| 1.0833644| 2.34874| 1.2414109|
|1970-01-01T08:00:00.002+08:00| 4.929185| 3.1885583| 4.6980085| 4.929185| 3.1885583| 4.6980085|
|1970-01-01T08:00:00.003+08:00| 3.5206156| 3.5600138| 4.8080945| 3.5206156| 3.5600138| 4.8080945|
|1970-01-01T08:00:00.004+08:00| 1.3449302| 2.8781595| 3.3195343| 1.3449302| 2.8781595| 3.3195343|
|1970-01-01T08:00:00.005+08:00| 3.3079383| 3.3840187| 3.7278645| 3.3079383| 3.3840187| 3.7278645|
+-----------------------------+-------------+-------------+-------------+-------------+-------------+-------------+
Total line number = 5
It costs 0.015s
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div>`,27);function E(L,F){const t=e("ExternalLinkIcon");return c(),l("div",null,[u,n("ol",null,[n("li",null,[n("p",null,[s("在 "),n("a",d,[s("官网"),p(t)]),s(" 下载带依赖的 flink-sql-iotdb-connector 的 jar 包。")])]),k,r,m]),v,n("table",null,[b,n("tbody",null,[g,h,T,f,q,D,n("tr",null,[n("td",null,[n("a",I,[s("cdc.task.name"),p(t)])]),y,S,B,x]),w])]),_])}const O=o(i,[["render",E],["__file","Flink-SQL-IoTDB.html.vue"]]),N=JSON.parse('{"path":"/zh/UserGuide/V1.2.x/Ecosystem-Integration/Flink-SQL-IoTDB.html","title":"flink-sql-iotdb-connector","lang":"zh-CN","frontmatter":{"description":"flink-sql-iotdb-connector flink-sql-iotdb-connector 将 Flink SQL 或者 Flink Table 与 IoTDB 无缝衔接了起来,使得在 Flink 的任务中可以对 IoTDB 进行实时读写,具体可以应用到如下场景中: 实时数据同步:将数据从一个数据库实时同步到另一个数据库。 实时数据管道:构...","head":[["link",{"rel":"alternate","hreflang":"en-us","href":"https://iotdb.apache.org/UserGuide/V1.2.x/Ecosystem-Integration/Flink-SQL-IoTDB.html"}],["meta",{"property":"og:url","content":"https://iotdb.apache.org/zh/UserGuide/V1.2.x/Ecosystem-Integration/Flink-SQL-IoTDB.html"}],["meta",{"property":"og:site_name","content":"IoTDB Website"}],["meta",{"property":"og:title","content":"flink-sql-iotdb-connector"}],["meta",{"property":"og:description","content":"flink-sql-iotdb-connector flink-sql-iotdb-connector 将 Flink SQL 或者 Flink Table 与 IoTDB 无缝衔接了起来,使得在 Flink 的任务中可以对 IoTDB 进行实时读写,具体可以应用到如下场景中: 实时数据同步:将数据从一个数据库实时同步到另一个数据库。 
实时数据管道:构..."}],["meta",{"property":"og:type","content":"article"}],["meta",{"property":"og:locale","content":"zh-CN"}],["meta",{"property":"og:locale:alternate","content":"en-US"}],["meta",{"property":"og:updated_time","content":"2024-01-10T02:47:40.000Z"}],["meta",{"property":"article:modified_time","content":"2024-01-10T02:47:40.000Z"}],["script",{"type":"application/ld+json"},"{\\"@context\\":\\"https://schema.org\\",\\"@type\\":\\"Article\\",\\"headline\\":\\"flink-sql-iotdb-connector\\",\\"image\\":[\\"\\"],\\"dateModified\\":\\"2024-01-10T02:47:40.000Z\\",\\"author\\":[]}"]]},"headers":[{"level":2,"title":"读写模式","slug":"读写模式","link":"#读写模式","children":[{"level":3,"title":"读模式(Source)","slug":"读模式-source","link":"#读模式-source","children":[]},{"level":3,"title":"写模式(Sink)","slug":"写模式-sink","link":"#写模式-sink","children":[]}]},{"level":2,"title":"使用方式","slug":"使用方式","link":"#使用方式","children":[{"level":3,"title":"Maven","slug":"maven","link":"#maven","children":[]},{"level":3,"title":"sql-client","slug":"sql-client","link":"#sql-client","children":[]}]},{"level":2,"title":"表结构规范","slug":"表结构规范","link":"#表结构规范","children":[]},{"level":2,"title":"读模式(Source)","slug":"读模式-source-1","link":"#读模式-source-1","children":[{"level":3,"title":"Scan Table (Bounded)","slug":"scan-table-bounded","link":"#scan-table-bounded","children":[]},{"level":3,"title":"Lookup Point","slug":"lookup-point","link":"#lookup-point","children":[]},{"level":3,"title":"CDC","slug":"cdc","link":"#cdc","children":[]}]},{"level":2,"title":"写模式(Sink)","slug":"写模式-sink-1","link":"#写模式-sink-1","children":[{"level":3,"title":"Streaming Sink","slug":"streaming-sink","link":"#streaming-sink","children":[]},{"level":3,"title":"Batch Sink","slug":"batch-sink","link":"#batch-sink","children":[]}]}],"git":{"createdTime":1694658523000,"updatedTime":1704854860000,"contributors":[{"name":"Xuan 
Ronaldo","email":"xuanronaldo@qq.com","commits":2},{"name":"wanghui42","email":"105700158+wanghui42@users.noreply.github.com","commits":1}]},"readingTime":{"minutes":10.18,"words":3054},"filePathRelative":"zh/UserGuide/V1.2.x/Ecosystem-Integration/Flink-SQL-IoTDB.md","localizedDate":"2023年9月14日","autoDesc":true}');export{O as comp,N as data};