blob: ba636a0dd0882ac229ccf568d0adbb2030b7d8e0 [file] [log] [blame]
import{_ as p,r as o,o as c,c as i,b as n,d as s,a as e,e as a}from"./app-Bx8hKGcu.js";const l={},u=a(`<h1 id="flink-sql-iotdb-connector" tabindex="-1"><a class="header-anchor" href="#flink-sql-iotdb-connector"><span>flink-sql-iotdb-connector</span></a></h1><p>The flink-sql-iotdb-connector seamlessly connects Flink SQL or Flink Table with IoTDB, enabling real-time read and write operations on IoTDB within Flink tasks. It can be applied to the following scenarios:</p><ol><li>Real-time data synchronization: Real-time synchronization of data from one database to another.</li><li>Real-time data pipeline: Building real-time data processing pipelines to process and analyze data in databases.</li><li>Real-time data analysis: Real-time analysis of data in databases, providing real-time business insights.</li><li>Real-time applications: Real-time application of database data in real-time applications such as real-time reporting and real-time recommendations.</li><li>Real-time monitoring: Real-time monitoring of database data, detecting anomalies and errors.</li></ol><h2 id="read-and-write-modes" tabindex="-1"><a class="header-anchor" href="#read-and-write-modes"><span>Read and Write Modes</span></a></h2><table><thead><tr><th>Read Modes (Source)</th><th>Write Modes (Sink)</th></tr></thead><tbody><tr><td>Bounded Scan, Lookup, CDC</td><td>Streaming Sink, Batch Sink</td></tr></tbody></table><h3 id="read-modes-source" tabindex="-1"><a class="header-anchor" href="#read-modes-source"><span>Read Modes (Source)</span></a></h3><ul><li><p><strong>Bounded Scan:</strong> Bounded scan is primarily implemented by specifying the <code>time series</code> and optional <code>upper and lower bounds of the query conditions</code> to query data, and the query result usually consists of multiple rows of data. This type of query cannot retrieve data that is updated after the query.</p></li><li><p><strong>Lookup:</strong> The lookup query mode differs from the scan query mode. 
While bounded scan queries data within a time range, the <code>lookup</code> query mode only queries data at a precise time point, resulting in a single row of data. Additionally, only the right table of a <code>lookup join</code> can use the lookup query mode.</p></li><li><p><strong>CDC:</strong> CDC is mainly used in Flink&#39;s ETL tasks. When data in IoTDB changes, Flink can detect it through our provided CDC connector, and we can forward the detected change data to other external data sources to achieve the purpose of ETL.</p></li></ul><h3 id="write-modes-sink" tabindex="-1"><a class="header-anchor" href="#write-modes-sink"><span>Write Modes (Sink)</span></a></h3><ul><li><p><strong>Streaming Sink:</strong> Used in Flink&#39;s streaming mode, it synchronizes the insert, update, and delete records of the Dynamic Table in Flink to IoTDB in real-time.</p></li><li><p><strong>Batch Sink:</strong> Used in Flink&#39;s batch mode, it writes the batch computation results from Flink to IoTDB in a single operation.</p></li></ul><h2 id="usage" tabindex="-1"><a class="header-anchor" href="#usage"><span>Usage</span></a></h2><p>We provide two ways to use the flink-sql-iotdb-connector. One is to reference it through Maven during project development, and the other is to use it in Flink&#39;s sql-client. We will introduce these two usage methods separately.</p><blockquote><p>📌 Note: flink version requires 1.17.0 and above.</p></blockquote><h3 id="maven" tabindex="-1"><a class="header-anchor" href="#maven"><span>Maven</span></a></h3><p>Simply add the following dependency to your project&#39;s pom file:</p><div class="language-xml line-numbers-mode" data-ext="xml" data-title="xml"><pre class="language-xml"><code><span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>dependency</span><span class="token punctuation">&gt;</span></span>
<span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>groupId</span><span class="token punctuation">&gt;</span></span>org.apache.iotdb<span class="token tag"><span class="token tag"><span class="token punctuation">&lt;/</span>groupId</span><span class="token punctuation">&gt;</span></span>
<span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>artifactId</span><span class="token punctuation">&gt;</span></span>flink-sql-iotdb-connector<span class="token tag"><span class="token tag"><span class="token punctuation">&lt;/</span>artifactId</span><span class="token punctuation">&gt;</span></span>
<span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>version</span><span class="token punctuation">&gt;</span></span>\${iotdb.version}<span class="token tag"><span class="token tag"><span class="token punctuation">&lt;/</span>version</span><span class="token punctuation">&gt;</span></span>
<span class="token tag"><span class="token tag"><span class="token punctuation">&lt;/</span>dependency</span><span class="token punctuation">&gt;</span></span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><h3 id="sql-client" tabindex="-1"><a class="header-anchor" href="#sql-client"><span>sql-client</span></a></h3><p>If you want to use the flink-sql-iotdb-connector in the sql-client, follow these steps to configure the environment:</p>`,17),d={href:"https://iotdb.apache.org/Download/",target:"_blank",rel:"noopener noreferrer"},r=n("li",null,[n("p",null,[s("Copy the jar file to the "),n("code",null,"$FLINK_HOME/lib"),s(" directory.")])],-1),k=n("li",null,[n("p",null,"Start the Flink cluster.")],-1),m=n("li",null,[n("p",null,"Start the sql-client.")],-1),v=a(`<p>You can now use the flink-sql-iotdb-connector in the sql-client.</p><h2 id="table-structure-specification" tabindex="-1"><a class="header-anchor" href="#table-structure-specification"><span>Table Structure Specification</span></a></h2><p>Regardless of the type of connector used, the following table structure specifications must be met:</p><ul><li>For all tables using the <code>IoTDB connector</code>, the first column must be named <code>Time_</code> and have a data type of <code>BIGINT</code>.</li><li>All column names, except for the <code>Time_</code> column, must start with <code>root.</code>. Additionally, any node in the column name cannot be purely numeric. If there are purely numeric or other illegal characters in the column name, they must be enclosed in backticks. For example, the path <code>root.sg.d0.123</code> is an illegal path, but <code>root.sg.d0.\`123\`</code> is a valid path.</li><li>When querying data from IoTDB using either <code>pattern</code> or <code>sql</code>, the time series names in the query result must include all column names in Flink, except for <code>Time_</code>. 
If there is no corresponding column name in the query result, that column will be filled with null.</li><li>The supported data types in flink-sql-iotdb-connector are: <code>INT</code>, <code>BIGINT</code>, <code>FLOAT</code>, <code>DOUBLE</code>, <code>BOOLEAN</code>, <code>STRING</code>. The data type of each column in Flink Table must match the corresponding time series type in IoTDB, otherwise an error will occur and the Flink task will exit.</li></ul><p>The following examples illustrate the mapping between time series in IoTDB and columns in Flink Table.</p><h2 id="read-mode-source" tabindex="-1"><a class="header-anchor" href="#read-mode-source"><span>Read Mode (Source)</span></a></h2><h3 id="scan-table-bounded" tabindex="-1"><a class="header-anchor" href="#scan-table-bounded"><span>Scan Table (Bounded)</span></a></h3><h4 id="parameters" tabindex="-1"><a class="header-anchor" href="#parameters"><span>Parameters</span></a></h4><table><thead><tr><th>Parameter</th><th>Required</th><th>Default</th><th>Type</th><th>Description</th></tr></thead><tbody><tr><td>nodeUrls</td><td>No</td><td>127.0.0.1:6667</td><td>String</td><td>Specifies the datanode addresses of IoTDB. If IoTDB is deployed in cluster mode, multiple addresses can be specified, separated by commas.</td></tr><tr><td>user</td><td>No</td><td>root</td><td>String</td><td>IoTDB username</td></tr><tr><td>password</td><td>No</td><td>root</td><td>String</td><td>IoTDB password</td></tr><tr><td>scan.bounded.lower-bound</td><td>No</td><td>-1L</td><td>Long</td><td>Lower bound (inclusive) of the timestamp for bounded scan queries. Valid when the parameter is greater than <code>0</code>.</td></tr><tr><td>scan.bounded.upper-bound</td><td>No</td><td>-1L</td><td>Long</td><td>Upper bound (inclusive) of the timestamp for bounded scan queries. 
Valid when the parameter is greater than <code>0</code>.</td></tr><tr><td>sql</td><td>Yes</td><td>None</td><td>String</td><td>Query to be executed in IoTDB.</td></tr></tbody></table><h4 id="example" tabindex="-1"><a class="header-anchor" href="#example"><span>Example</span></a></h4><p>This example demonstrates how to read data from IoTDB using the <code>scan table</code> method in a Flink Table Job:</p><p>Assume the data in IoTDB is as follows:</p><div class="language-text line-numbers-mode" data-ext="text" data-title="text"><pre class="language-text"><code>IoTDB&gt; select ** from root;
+-----------------------------+-------------+-------------+-------------+
| Time|root.sg.d0.s0|root.sg.d1.s0|root.sg.d1.s1|
+-----------------------------+-------------+-------------+-------------+
|1970-01-01T08:00:00.001+08:00| 1.0833644| 2.34874| 1.2414109|
|1970-01-01T08:00:00.002+08:00| 4.929185| 3.1885583| 4.6980085|
|1970-01-01T08:00:00.003+08:00| 3.5206156| 3.5600138| 4.8080945|
|1970-01-01T08:00:00.004+08:00| 1.3449302| 2.8781595| 3.3195343|
|1970-01-01T08:00:00.005+08:00| 3.3079383| 3.3840187| 3.7278645|
+-----------------------------+-------------+-------------+-------------+
Total line number = 5
It costs 0.028s
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><div class="language-java line-numbers-mode" data-ext="java" data-title="java"><pre class="language-java"><code><span class="token keyword">import</span> <span class="token import"><span class="token namespace">org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>flink<span class="token punctuation">.</span>table<span class="token punctuation">.</span>api<span class="token punctuation">.</span></span><span class="token operator">*</span></span><span class="token punctuation">;</span>
<span class="token keyword">public</span> <span class="token keyword">class</span> <span class="token class-name">BoundedScanTest</span> <span class="token punctuation">{</span>
<span class="token keyword">public</span> <span class="token keyword">static</span> <span class="token keyword">void</span> <span class="token function">main</span><span class="token punctuation">(</span><span class="token class-name">String</span><span class="token punctuation">[</span><span class="token punctuation">]</span> args<span class="token punctuation">)</span> <span class="token keyword">throws</span> <span class="token class-name">Exception</span> <span class="token punctuation">{</span>
<span class="token comment">// setup table environment</span>
<span class="token class-name">EnvironmentSettings</span> settings <span class="token operator">=</span> <span class="token class-name">EnvironmentSettings</span>
<span class="token punctuation">.</span><span class="token function">newInstance</span><span class="token punctuation">(</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">inStreamingMode</span><span class="token punctuation">(</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">build</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token class-name">TableEnvironment</span> tableEnv <span class="token operator">=</span> <span class="token class-name">TableEnvironment</span><span class="token punctuation">.</span><span class="token function">create</span><span class="token punctuation">(</span>settings<span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token comment">// setup schema</span>
<span class="token class-name">Schema</span> iotdbTableSchema <span class="token operator">=</span>
<span class="token class-name">Schema</span><span class="token punctuation">.</span><span class="token function">newBuilder</span><span class="token punctuation">(</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">column</span><span class="token punctuation">(</span><span class="token string">&quot;Time_&quot;</span><span class="token punctuation">,</span> <span class="token class-name">DataTypes</span><span class="token punctuation">.</span><span class="token function">BIGINT</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">column</span><span class="token punctuation">(</span><span class="token string">&quot;root.sg.d0.s0&quot;</span><span class="token punctuation">,</span> <span class="token class-name">DataTypes</span><span class="token punctuation">.</span><span class="token function">FLOAT</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">column</span><span class="token punctuation">(</span><span class="token string">&quot;root.sg.d1.s0&quot;</span><span class="token punctuation">,</span> <span class="token class-name">DataTypes</span><span class="token punctuation">.</span><span class="token function">FLOAT</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">column</span><span class="token punctuation">(</span><span class="token string">&quot;root.sg.d1.s1&quot;</span><span class="token punctuation">,</span> <span class="token class-name">DataTypes</span><span class="token punctuation">.</span><span class="token function">FLOAT</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">build</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token comment">// register table</span>
<span class="token class-name">TableDescriptor</span> iotdbDescriptor <span class="token operator">=</span>
<span class="token class-name">TableDescriptor</span><span class="token punctuation">.</span><span class="token function">forConnector</span><span class="token punctuation">(</span><span class="token string">&quot;IoTDB&quot;</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">schema</span><span class="token punctuation">(</span>iotdbTableSchema<span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">option</span><span class="token punctuation">(</span><span class="token string">&quot;nodeUrls&quot;</span><span class="token punctuation">,</span> <span class="token string">&quot;127.0.0.1:6667&quot;</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">option</span><span class="token punctuation">(</span><span class="token string">&quot;sql&quot;</span><span class="token punctuation">,</span> <span class="token string">&quot;select ** from root&quot;</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">build</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
tableEnv<span class="token punctuation">.</span><span class="token function">createTemporaryTable</span><span class="token punctuation">(</span><span class="token string">&quot;iotdbTable&quot;</span><span class="token punctuation">,</span> iotdbDescriptor<span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token comment">// output table</span>
tableEnv<span class="token punctuation">.</span><span class="token function">from</span><span class="token punctuation">(</span><span class="token string">&quot;iotdbTable&quot;</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token function">execute</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token function">print</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token punctuation">}</span>
<span class="token punctuation">}</span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><p>After executing the above job, the output table in the Flink console is as follows:</p><div class="language-text line-numbers-mode" data-ext="text" data-title="text"><pre class="language-text"><code>+----+----------------------+--------------------------------+--------------------------------+--------------------------------+
| op | Time_ | root.sg.d0.s0 | root.sg.d1.s0 | root.sg.d1.s1 |
+----+----------------------+--------------------------------+--------------------------------+--------------------------------+
| +I | 1 | 1.0833644 | 2.34874 | 1.2414109 |
| +I | 2 | 4.929185 | 3.1885583 | 4.6980085 |
| +I | 3 | 3.5206156 | 3.5600138 | 4.8080945 |
| +I | 4 | 1.3449302 | 2.8781595 | 3.3195343 |
| +I | 5 | 3.3079383 | 3.3840187 | 3.7278645 |
+----+----------------------+--------------------------------+--------------------------------+--------------------------------+
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><h3 id="lookup-point" tabindex="-1"><a class="header-anchor" href="#lookup-point"><span>Lookup Point</span></a></h3><h4 id="parameters-1" tabindex="-1"><a class="header-anchor" href="#parameters-1"><span>Parameters</span></a></h4><table><thead><tr><th>Parameter</th><th>Required</th><th>Default</th><th>Type</th><th>Description</th></tr></thead><tbody><tr><td>nodeUrls</td><td>No</td><td>127.0.0.1:6667</td><td>String</td><td>Specifies the addresses of the IoTDB datanode. If IoTDB is deployed in cluster mode, multiple addresses can be specified, separated by commas.</td></tr><tr><td>user</td><td>No</td><td>root</td><td>String</td><td>IoTDB username</td></tr><tr><td>password</td><td>No</td><td>root</td><td>String</td><td>IoTDB password</td></tr><tr><td>lookup.cache.max-rows</td><td>No</td><td>-1</td><td>Integer</td><td>Maximum number of rows to cache for lookup queries. Effective when the parameter is greater than <code>0</code>.</td></tr><tr><td>lookup.cache.ttl-sec</td><td>No</td><td>-1</td><td>Integer</td><td>Time-to-live for cached data in lookup queries, in seconds.</td></tr><tr><td>sql</td><td>Yes</td><td>None</td><td>String</td><td>SQL query to execute in IoTDB.</td></tr></tbody></table><h4 id="example-1" tabindex="-1"><a class="header-anchor" href="#example-1"><span>Example</span></a></h4><p>This example demonstrates how to perform a <code>lookup</code> query using the <code>device</code> table in IoTDB as a dimension table:</p><ul><li>Use the <code>datagen connector</code> to generate two fields as the left table for <code>Lookup Join</code>. The first field is an incrementing field representing the timestamp. 
The second field is a random field representing a measurement time series.</li><li>Register a table using the <code>IoTDB connector</code> as the right table for <code>Lookup Join</code>.</li><li>Join the two tables together.</li></ul><p>The current data in IoTDB is as follows:</p><div class="language-text line-numbers-mode" data-ext="text" data-title="text"><pre class="language-text"><code>IoTDB&gt; select ** from root;
+-----------------------------+-------------+-------------+-------------+
| Time|root.sg.d0.s0|root.sg.d1.s0|root.sg.d1.s1|
+-----------------------------+-------------+-------------+-------------+
|1970-01-01T08:00:00.001+08:00| 1.0833644| 2.34874| 1.2414109|
|1970-01-01T08:00:00.002+08:00| 4.929185| 3.1885583| 4.6980085|
|1970-01-01T08:00:00.003+08:00| 3.5206156| 3.5600138| 4.8080945|
|1970-01-01T08:00:00.004+08:00| 1.3449302| 2.8781595| 3.3195343|
|1970-01-01T08:00:00.005+08:00| 3.3079383| 3.3840187| 3.7278645|
+-----------------------------+-------------+-------------+-------------+
Total line number = 5
It costs 0.028s
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><div class="language-java line-numbers-mode" data-ext="java" data-title="java"><pre class="language-java"><code><span class="token keyword">import</span> <span class="token import"><span class="token namespace">org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>flink<span class="token punctuation">.</span>table<span class="token punctuation">.</span>api<span class="token punctuation">.</span></span><span class="token class-name">DataTypes</span></span><span class="token punctuation">;</span>
<span class="token keyword">import</span> <span class="token import"><span class="token namespace">org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>flink<span class="token punctuation">.</span>table<span class="token punctuation">.</span>api<span class="token punctuation">.</span></span><span class="token class-name">EnvironmentSettings</span></span><span class="token punctuation">;</span>
<span class="token keyword">import</span> <span class="token import"><span class="token namespace">org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>flink<span class="token punctuation">.</span>table<span class="token punctuation">.</span>api<span class="token punctuation">.</span></span><span class="token class-name">Schema</span></span><span class="token punctuation">;</span>
<span class="token keyword">import</span> <span class="token import"><span class="token namespace">org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>flink<span class="token punctuation">.</span>table<span class="token punctuation">.</span>api<span class="token punctuation">.</span></span><span class="token class-name">TableDescriptor</span></span><span class="token punctuation">;</span>
<span class="token keyword">import</span> <span class="token import"><span class="token namespace">org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>flink<span class="token punctuation">.</span>table<span class="token punctuation">.</span>api<span class="token punctuation">.</span></span><span class="token class-name">TableEnvironment</span></span><span class="token punctuation">;</span>
<span class="token keyword">public</span> <span class="token keyword">class</span> <span class="token class-name">LookupTest</span> <span class="token punctuation">{</span>
<span class="token keyword">public</span> <span class="token keyword">static</span> <span class="token keyword">void</span> <span class="token function">main</span><span class="token punctuation">(</span><span class="token class-name">String</span><span class="token punctuation">[</span><span class="token punctuation">]</span> args<span class="token punctuation">)</span> <span class="token punctuation">{</span>
<span class="token comment">// Setup environment</span>
<span class="token class-name">EnvironmentSettings</span> settings <span class="token operator">=</span> <span class="token class-name">EnvironmentSettings</span>
<span class="token punctuation">.</span><span class="token function">newInstance</span><span class="token punctuation">(</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">inStreamingMode</span><span class="token punctuation">(</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">build</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token class-name">TableEnvironment</span> tableEnv <span class="token operator">=</span> <span class="token class-name">TableEnvironment</span><span class="token punctuation">.</span><span class="token function">create</span><span class="token punctuation">(</span>settings<span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token comment">// Register left table</span>
<span class="token class-name">Schema</span> dataGenTableSchema <span class="token operator">=</span>
<span class="token class-name">Schema</span><span class="token punctuation">.</span><span class="token function">newBuilder</span><span class="token punctuation">(</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">column</span><span class="token punctuation">(</span><span class="token string">&quot;Time_&quot;</span><span class="token punctuation">,</span> <span class="token class-name">DataTypes</span><span class="token punctuation">.</span><span class="token function">BIGINT</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">column</span><span class="token punctuation">(</span><span class="token string">&quot;s0&quot;</span><span class="token punctuation">,</span> <span class="token class-name">DataTypes</span><span class="token punctuation">.</span><span class="token function">INT</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">build</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token class-name">TableDescriptor</span> datagenDescriptor <span class="token operator">=</span>
<span class="token class-name">TableDescriptor</span><span class="token punctuation">.</span><span class="token function">forConnector</span><span class="token punctuation">(</span><span class="token string">&quot;datagen&quot;</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">schema</span><span class="token punctuation">(</span>dataGenTableSchema<span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">option</span><span class="token punctuation">(</span><span class="token string">&quot;fields.Time_.kind&quot;</span><span class="token punctuation">,</span> <span class="token string">&quot;sequence&quot;</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">option</span><span class="token punctuation">(</span><span class="token string">&quot;fields.Time_.start&quot;</span><span class="token punctuation">,</span> <span class="token string">&quot;1&quot;</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">option</span><span class="token punctuation">(</span><span class="token string">&quot;fields.Time_.end&quot;</span><span class="token punctuation">,</span> <span class="token string">&quot;5&quot;</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">option</span><span class="token punctuation">(</span><span class="token string">&quot;fields.s0.min&quot;</span><span class="token punctuation">,</span> <span class="token string">&quot;1&quot;</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">option</span><span class="token punctuation">(</span><span class="token string">&quot;fields.s0.max&quot;</span><span class="token punctuation">,</span> <span class="token string">&quot;1&quot;</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">build</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
tableEnv<span class="token punctuation">.</span><span class="token function">createTemporaryTable</span><span class="token punctuation">(</span><span class="token string">&quot;leftTable&quot;</span><span class="token punctuation">,</span> datagenDescriptor<span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token comment">// Register right table</span>
<span class="token class-name">Schema</span> iotdbTableSchema <span class="token operator">=</span>
<span class="token class-name">Schema</span><span class="token punctuation">.</span><span class="token function">newBuilder</span><span class="token punctuation">(</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">column</span><span class="token punctuation">(</span><span class="token string">&quot;Time_&quot;</span><span class="token punctuation">,</span> <span class="token class-name">DataTypes</span><span class="token punctuation">.</span><span class="token function">BIGINT</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">column</span><span class="token punctuation">(</span><span class="token string">&quot;root.sg.d0.s0&quot;</span><span class="token punctuation">,</span> <span class="token class-name">DataTypes</span><span class="token punctuation">.</span><span class="token function">FLOAT</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">column</span><span class="token punctuation">(</span><span class="token string">&quot;root.sg.d1.s0&quot;</span><span class="token punctuation">,</span> <span class="token class-name">DataTypes</span><span class="token punctuation">.</span><span class="token function">FLOAT</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">column</span><span class="token punctuation">(</span><span class="token string">&quot;root.sg.d1.s1&quot;</span><span class="token punctuation">,</span> <span class="token class-name">DataTypes</span><span class="token punctuation">.</span><span class="token function">FLOAT</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">build</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token class-name">TableDescriptor</span> iotdbDescriptor <span class="token operator">=</span>
<span class="token class-name">TableDescriptor</span><span class="token punctuation">.</span><span class="token function">forConnector</span><span class="token punctuation">(</span><span class="token string">&quot;IoTDB&quot;</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">schema</span><span class="token punctuation">(</span>iotdbTableSchema<span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">option</span><span class="token punctuation">(</span><span class="token string">&quot;sql&quot;</span><span class="token punctuation">,</span> <span class="token string">&quot;select ** from root&quot;</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">build</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
tableEnv<span class="token punctuation">.</span><span class="token function">createTemporaryTable</span><span class="token punctuation">(</span><span class="token string">&quot;rightTable&quot;</span><span class="token punctuation">,</span> iotdbDescriptor<span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token comment">// Join</span>
<span class="token class-name">String</span> sql <span class="token operator">=</span>
<span class="token string">&quot;SELECT l.Time_, l.s0, r.\`root.sg.d0.s0\`, r.\`root.sg.d1.s0\`, r.\`root.sg.d1.s1\` &quot;</span>
<span class="token operator">+</span> <span class="token string">&quot;FROM (SELECT *, PROCTIME() AS proc_time FROM leftTable) AS l &quot;</span>
<span class="token operator">+</span> <span class="token string">&quot;JOIN rightTable FOR SYSTEM_TIME AS OF l.proc_time AS r &quot;</span>
<span class="token operator">+</span> <span class="token string">&quot;ON l.Time_ = r.Time_&quot;</span><span class="token punctuation">;</span>
<span class="token comment">// Output table</span>
tableEnv<span class="token punctuation">.</span><span class="token function">sqlQuery</span><span class="token punctuation">(</span>sql<span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token function">execute</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token function">print</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token punctuation">}</span>
<span class="token punctuation">}</span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><p>After executing the above task, the output table in Flink&#39;s 
console is as follows:</p><div class="language-text line-numbers-mode" data-ext="text" data-title="text"><pre class="language-text"><code>+----+----------------------+-------------+---------------+----------------------+--------------------------------+
| op |                Time_ |          s0 | root.sg.d0.s0 |        root.sg.d1.s0 |                  root.sg.d1.s1 |
+----+----------------------+-------------+---------------+----------------------+--------------------------------+
| +I |                    5 |           1 |     3.3079383 |            3.3840187 |                      3.7278645 |
| +I |                    2 |           1 |      4.929185 |            3.1885583 |                      4.6980085 |
| +I |                    1 |           1 |     1.0833644 |              2.34874 |                      1.2414109 |
| +I |                    4 |           1 |     1.3449302 |            2.8781595 |                      3.3195343 |
| +I |                    3 |           1 |     3.5206156 |            3.5600138 |                      4.8080945 |
+----+----------------------+-------------+---------------+----------------------+--------------------------------+
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><h3 id="cdc" tabindex="-1"><a class="header-anchor" href="#cdc"><span>CDC</span></a></h3><h4 id="parameters-2" tabindex="-1"><a class="header-anchor" href="#parameters-2"><span>Parameters</span></a></h4>`,29),b=n("thead",null,[n("tr",null,[n("th",null,"Parameter"),n("th",null,"Required"),n("th",null,"Default"),n("th",null,"Type"),n("th",null,"Description")])],-1),h=n("tr",null,[n("td",null,"nodeUrls"),n("td",null,"No"),n("td",null,"127.0.0.1:6667"),n("td",null,"String"),n("td",null,"Specifies the datanode address of IoTDB. If IoTDB is deployed in cluster mode, multiple addresses can be specified, separated by commas.")],-1),g=n("tr",null,[n("td",null,"user"),n("td",null,"No"),n("td",null,"root"),n("td",null,"String"),n("td",null,"IoTDB username")],-1),f=n("tr",null,[n("td",null,"password"),n("td",null,"No"),n("td",null,"root"),n("td",null,"String"),n("td",null,"IoTDB password")],-1),T=n("tr",null,[n("td",null,"mode"),n("td",null,"Yes"),n("td",null,"BOUNDED"),n("td",null,"ENUM"),n("td",null,[n("strong",null,[s("This parameter must be set to "),n("code",null,"CDC"),s(" in order to start")])])],-1),q=n("tr",null,[n("td",null,"sql"),n("td",null,"Yes"),n("td",null,"None"),n("td",null,"String"),n("td",null,"SQL query to be executed in IoTDB")],-1),y=n("tr",null,[n("td",null,"cdc.port"),n("td",null,"No"),n("td",null,"8080"),n("td",null,"Integer"),n("td",null,"Port number for the CDC service in IoTDB")],-1),D={href:"http://cdc.task.name",target:"_blank",rel:"noopener noreferrer"},w=n("td",null,"Yes",-1),I=n("td",null,"None",-1),S=n("td",null,"String",-1),x=n("td",null,"Required when the mode parameter is set to CDC. 
Used to create a Pipe task in IoTDB.",-1),B=n("tr",null,[n("td",null,"cdc.pattern"),n("td",null,"Yes"),n("td",null,"None"),n("td",null,"String"),n("td",null,"Required when the mode parameter is set to CDC. Used as a filtering condition for sending data in IoTDB.")],-1),_=a(`<h4 id="example-2" tabindex="-1"><a class="header-anchor" href="#example-2"><span>Example</span></a></h4><p>This example demonstrates how to retrieve the changing data from a specific path in IoTDB using the <code>CDC Connector</code>:</p><ul><li>Create a <code>CDC</code> table using the <code>CDC Connector</code>.</li><li>Print the <code>CDC</code> table.</li></ul><div class="language-java line-numbers-mode" data-ext="java" data-title="java"><pre class="language-java"><code><span class="token keyword">import</span> <span class="token import"><span class="token namespace">org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>flink<span class="token punctuation">.</span>table<span class="token punctuation">.</span>api<span class="token punctuation">.</span></span><span class="token operator">*</span></span><span class="token punctuation">;</span>
<span class="token keyword">public</span> <span class="token keyword">class</span> <span class="token class-name">CDCTest</span> <span class="token punctuation">{</span>
<span class="token keyword">public</span> <span class="token keyword">static</span> <span class="token keyword">void</span> <span class="token function">main</span><span class="token punctuation">(</span><span class="token class-name">String</span><span class="token punctuation">[</span><span class="token punctuation">]</span> args<span class="token punctuation">)</span> <span class="token punctuation">{</span>
<span class="token comment">// setup environment</span>
<span class="token class-name">EnvironmentSettings</span> settings <span class="token operator">=</span> <span class="token class-name">EnvironmentSettings</span>
<span class="token punctuation">.</span><span class="token function">newInstance</span><span class="token punctuation">(</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">inStreamingMode</span><span class="token punctuation">(</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">build</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token class-name">TableEnvironment</span> tableEnv <span class="token operator">=</span> <span class="token class-name">TableEnvironment</span><span class="token punctuation">.</span><span class="token function">create</span><span class="token punctuation">(</span>settings<span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token comment">// setup schema</span>
<span class="token class-name">Schema</span> iotdbTableSchema <span class="token operator">=</span> <span class="token class-name">Schema</span>
<span class="token punctuation">.</span><span class="token function">newBuilder</span><span class="token punctuation">(</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">column</span><span class="token punctuation">(</span><span class="token string">&quot;Time_&quot;</span><span class="token punctuation">,</span> <span class="token class-name">DataTypes</span><span class="token punctuation">.</span><span class="token function">BIGINT</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">column</span><span class="token punctuation">(</span><span class="token string">&quot;root.sg.d0.s0&quot;</span><span class="token punctuation">,</span> <span class="token class-name">DataTypes</span><span class="token punctuation">.</span><span class="token function">FLOAT</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">column</span><span class="token punctuation">(</span><span class="token string">&quot;root.sg.d1.s0&quot;</span><span class="token punctuation">,</span> <span class="token class-name">DataTypes</span><span class="token punctuation">.</span><span class="token function">FLOAT</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">column</span><span class="token punctuation">(</span><span class="token string">&quot;root.sg.d1.s1&quot;</span><span class="token punctuation">,</span> <span class="token class-name">DataTypes</span><span class="token punctuation">.</span><span class="token function">FLOAT</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">build</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token comment">// register table</span>
<span class="token class-name">TableDescriptor</span> iotdbDescriptor <span class="token operator">=</span> <span class="token class-name">TableDescriptor</span>
<span class="token punctuation">.</span><span class="token function">forConnector</span><span class="token punctuation">(</span><span class="token string">&quot;IoTDB&quot;</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">schema</span><span class="token punctuation">(</span>iotdbTableSchema<span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">option</span><span class="token punctuation">(</span><span class="token string">&quot;mode&quot;</span><span class="token punctuation">,</span> <span class="token string">&quot;CDC&quot;</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">option</span><span class="token punctuation">(</span><span class="token string">&quot;cdc.task.name&quot;</span><span class="token punctuation">,</span> <span class="token string">&quot;test&quot;</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">option</span><span class="token punctuation">(</span><span class="token string">&quot;cdc.pattern&quot;</span><span class="token punctuation">,</span> <span class="token string">&quot;root.sg&quot;</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">build</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
tableEnv<span class="token punctuation">.</span><span class="token function">createTemporaryTable</span><span class="token punctuation">(</span><span class="token string">&quot;iotdbTable&quot;</span><span class="token punctuation">,</span> iotdbDescriptor<span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token comment">// output table</span>
tableEnv<span class="token punctuation">.</span><span class="token function">from</span><span class="token punctuation">(</span><span class="token string">&quot;iotdbTable&quot;</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token function">execute</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token function">print</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token punctuation">}</span>
<span class="token punctuation">}</span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><p>Run the above Flink CDC task and execute the following SQL in IoTDB-cli:</p><div class="language-sql line-numbers-mode" data-ext="sql" data-title="sql"><pre class="language-sql"><code><span class="token keyword">insert</span> <span class="token keyword">into</span> root<span class="token punctuation">.</span>sg<span class="token punctuation">.</span>d1<span class="token punctuation">(</span><span class="token keyword">timestamp</span><span class="token punctuation">,</span>s0<span class="token punctuation">,</span>s1<span class="token punctuation">)</span> <span class="token keyword">values</span><span class="token punctuation">(</span><span class="token number">6</span><span class="token punctuation">,</span><span class="token number">1.0</span><span class="token punctuation">,</span><span class="token number">1.0</span><span class="token punctuation">)</span><span class="token 
punctuation">;</span>
<span class="token keyword">insert</span> <span class="token keyword">into</span> root<span class="token punctuation">.</span>sg<span class="token punctuation">.</span>d1<span class="token punctuation">(</span><span class="token keyword">timestamp</span><span class="token punctuation">,</span>s0<span class="token punctuation">,</span>s1<span class="token punctuation">)</span> <span class="token keyword">values</span><span class="token punctuation">(</span><span class="token number">7</span><span class="token punctuation">,</span><span class="token number">1.0</span><span class="token punctuation">,</span><span class="token number">1.0</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token keyword">insert</span> <span class="token keyword">into</span> root<span class="token punctuation">.</span>sg<span class="token punctuation">.</span>d1<span class="token punctuation">(</span><span class="token keyword">timestamp</span><span class="token punctuation">,</span>s0<span class="token punctuation">,</span>s1<span class="token punctuation">)</span> <span class="token keyword">values</span><span class="token punctuation">(</span><span class="token number">6</span><span class="token punctuation">,</span><span class="token number">2.0</span><span class="token punctuation">,</span><span class="token number">1.0</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token keyword">insert</span> <span class="token keyword">into</span> root<span class="token punctuation">.</span>sg<span class="token punctuation">.</span>d0<span class="token punctuation">(</span><span class="token keyword">timestamp</span><span class="token punctuation">,</span>s0<span class="token punctuation">)</span> <span class="token keyword">values</span><span class="token punctuation">(</span><span class="token number">7</span><span class="token punctuation">,</span><span class="token number">2.0</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><p>The console of Flink will print the following data:</p><div class="language-text line-numbers-mode" data-ext="text" data-title="text"><pre class="language-text"><code>+----+----------------------+--------------------------------+--------------------------------+--------------------------------+
| op |                Time_ |                  root.sg.d0.s0 |                  root.sg.d1.s0 |                  root.sg.d1.s1 |
+----+----------------------+--------------------------------+--------------------------------+--------------------------------+
| +I |                    7 |                         &lt;NULL&gt; |                            1.0 |                            1.0 |
| +I |                    6 |                         &lt;NULL&gt; |                            1.0 |                            1.0 |
| +I |                    6 |                         &lt;NULL&gt; |                            2.0 |                            1.0 |
| +I |                    7 |                            2.0 |                         &lt;NULL&gt; |                         &lt;NULL&gt; |
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><h2 id="write-mode-sink" tabindex="-1"><a class="header-anchor" href="#write-mode-sink"><span>Write Mode (Sink)</span></a></h2><h3 id="streaming-sink" tabindex="-1"><a class="header-anchor" href="#streaming-sink"><span>Streaming Sink</span></a></h3><h4 id="parameters-3" tabindex="-1"><a class="header-anchor" href="#parameters-3"><span>Parameters</span></a></h4><table><thead><tr><th>Parameter</th><th>Required</th><th>Default</th><th>Type</th><th>Description</th></tr></thead><tbody><tr><td>nodeUrls</td><td>No</td><td>127.0.0.1:6667</td><td>String</td><td>Specifies the datanode address of IoTDB. If IoTDB is deployed in cluster mode, multiple addresses can be specified, separated by commas.</td></tr><tr><td>user</td><td>No</td><td>root</td><td>String</td><td>IoTDB username</td></tr><tr><td>password</td><td>No</td><td>root</td><td>String</td><td>IoTDB password</td></tr><tr><td>aligned</td><td>No</td><td>false</td><td>Boolean</td><td>Whether to call the <code>aligned</code> interface when writing data to IoTDB.</td></tr></tbody></table><h4 id="example-3" tabindex="-1"><a class="header-anchor" href="#example-3"><span>Example</span></a></h4><p>This example demonstrates how to write data to IoTDB in a Flink Table Streaming Job:</p><ul><li>Generate a source data table using the <code>datagen connector</code>.</li><li>Register an output table using the <code>IoTDB connector</code>.</li><li>Insert data from the source table into the output table.</li></ul><div class="language-java line-numbers-mode" data-ext="java" data-title="java"><pre class="language-java"><code><span class="token keyword">import</span> <span class="token import"><span class="token namespace">org<span class="token 
punctuation">.</span>apache<span class="token punctuation">.</span>flink<span class="token punctuation">.</span>table<span class="token punctuation">.</span>api<span class="token punctuation">.</span></span><span class="token class-name">DataTypes</span></span><span class="token punctuation">;</span>
<span class="token keyword">import</span> <span class="token import"><span class="token namespace">org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>flink<span class="token punctuation">.</span>table<span class="token punctuation">.</span>api<span class="token punctuation">.</span></span><span class="token class-name">EnvironmentSettings</span></span><span class="token punctuation">;</span>
<span class="token keyword">import</span> <span class="token import"><span class="token namespace">org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>flink<span class="token punctuation">.</span>table<span class="token punctuation">.</span>api<span class="token punctuation">.</span></span><span class="token class-name">Schema</span></span><span class="token punctuation">;</span>
<span class="token keyword">import</span> <span class="token import"><span class="token namespace">org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>flink<span class="token punctuation">.</span>table<span class="token punctuation">.</span>api<span class="token punctuation">.</span></span><span class="token class-name">Table</span></span><span class="token punctuation">;</span>
<span class="token keyword">import</span> <span class="token import"><span class="token namespace">org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>flink<span class="token punctuation">.</span>table<span class="token punctuation">.</span>api<span class="token punctuation">.</span></span><span class="token class-name">TableDescriptor</span></span><span class="token punctuation">;</span>
<span class="token keyword">import</span> <span class="token import"><span class="token namespace">org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>flink<span class="token punctuation">.</span>table<span class="token punctuation">.</span>api<span class="token punctuation">.</span></span><span class="token class-name">TableEnvironment</span></span><span class="token punctuation">;</span>
<span class="token keyword">public</span> <span class="token keyword">class</span> <span class="token class-name">StreamingSinkTest</span> <span class="token punctuation">{</span>
<span class="token keyword">public</span> <span class="token keyword">static</span> <span class="token keyword">void</span> <span class="token function">main</span><span class="token punctuation">(</span><span class="token class-name">String</span><span class="token punctuation">[</span><span class="token punctuation">]</span> args<span class="token punctuation">)</span> <span class="token punctuation">{</span>
<span class="token comment">// setup environment</span>
<span class="token class-name">EnvironmentSettings</span> settings <span class="token operator">=</span> <span class="token class-name">EnvironmentSettings</span>
<span class="token punctuation">.</span><span class="token function">newInstance</span><span class="token punctuation">(</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">inStreamingMode</span><span class="token punctuation">(</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">build</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token class-name">TableEnvironment</span> tableEnv <span class="token operator">=</span> <span class="token class-name">TableEnvironment</span><span class="token punctuation">.</span><span class="token function">create</span><span class="token punctuation">(</span>settings<span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token comment">// create data source table</span>
<span class="token class-name">Schema</span> dataGenTableSchema <span class="token operator">=</span> <span class="token class-name">Schema</span>
<span class="token punctuation">.</span><span class="token function">newBuilder</span><span class="token punctuation">(</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">column</span><span class="token punctuation">(</span><span class="token string">&quot;Time_&quot;</span><span class="token punctuation">,</span> <span class="token class-name">DataTypes</span><span class="token punctuation">.</span><span class="token function">BIGINT</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">column</span><span class="token punctuation">(</span><span class="token string">&quot;root.sg.d0.s0&quot;</span><span class="token punctuation">,</span> <span class="token class-name">DataTypes</span><span class="token punctuation">.</span><span class="token function">FLOAT</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">column</span><span class="token punctuation">(</span><span class="token string">&quot;root.sg.d1.s0&quot;</span><span class="token punctuation">,</span> <span class="token class-name">DataTypes</span><span class="token punctuation">.</span><span class="token function">FLOAT</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">column</span><span class="token punctuation">(</span><span class="token string">&quot;root.sg.d1.s1&quot;</span><span class="token punctuation">,</span> <span class="token class-name">DataTypes</span><span class="token punctuation">.</span><span class="token function">FLOAT</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">build</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token class-name">TableDescriptor</span> descriptor <span class="token operator">=</span> <span class="token class-name">TableDescriptor</span>
<span class="token punctuation">.</span><span class="token function">forConnector</span><span class="token punctuation">(</span><span class="token string">&quot;datagen&quot;</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">schema</span><span class="token punctuation">(</span>dataGenTableSchema<span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">option</span><span class="token punctuation">(</span><span class="token string">&quot;rows-per-second&quot;</span><span class="token punctuation">,</span> <span class="token string">&quot;1&quot;</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">option</span><span class="token punctuation">(</span><span class="token string">&quot;fields.Time_.kind&quot;</span><span class="token punctuation">,</span> <span class="token string">&quot;sequence&quot;</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">option</span><span class="token punctuation">(</span><span class="token string">&quot;fields.Time_.start&quot;</span><span class="token punctuation">,</span> <span class="token string">&quot;1&quot;</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">option</span><span class="token punctuation">(</span><span class="token string">&quot;fields.Time_.end&quot;</span><span class="token punctuation">,</span> <span class="token string">&quot;5&quot;</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">option</span><span class="token punctuation">(</span><span class="token string">&quot;fields.root.sg.d0.s0.min&quot;</span><span class="token punctuation">,</span> <span class="token string">&quot;1&quot;</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">option</span><span class="token punctuation">(</span><span class="token string">&quot;fields.root.sg.d0.s0.max&quot;</span><span class="token punctuation">,</span> <span class="token string">&quot;5&quot;</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">option</span><span class="token punctuation">(</span><span class="token string">&quot;fields.root.sg.d1.s0.min&quot;</span><span class="token punctuation">,</span> <span class="token string">&quot;1&quot;</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">option</span><span class="token punctuation">(</span><span class="token string">&quot;fields.root.sg.d1.s0.max&quot;</span><span class="token punctuation">,</span> <span class="token string">&quot;5&quot;</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">option</span><span class="token punctuation">(</span><span class="token string">&quot;fields.root.sg.d1.s1.min&quot;</span><span class="token punctuation">,</span> <span class="token string">&quot;1&quot;</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">option</span><span class="token punctuation">(</span><span class="token string">&quot;fields.root.sg.d1.s1.max&quot;</span><span class="token punctuation">,</span> <span class="token string">&quot;5&quot;</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">build</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token comment">// register source table</span>
tableEnv<span class="token punctuation">.</span><span class="token function">createTemporaryTable</span><span class="token punctuation">(</span><span class="token string">&quot;dataGenTable&quot;</span><span class="token punctuation">,</span> descriptor<span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token class-name">Table</span> dataGenTable <span class="token operator">=</span> tableEnv<span class="token punctuation">.</span><span class="token function">from</span><span class="token punctuation">(</span><span class="token string">&quot;dataGenTable&quot;</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token comment">// create iotdb sink table</span>
<span class="token class-name">TableDescriptor</span> iotdbDescriptor <span class="token operator">=</span> <span class="token class-name">TableDescriptor</span>
<span class="token punctuation">.</span><span class="token function">forConnector</span><span class="token punctuation">(</span><span class="token string">&quot;IoTDB&quot;</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">schema</span><span class="token punctuation">(</span>dataGenTableSchema<span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">build</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
tableEnv<span class="token punctuation">.</span><span class="token function">createTemporaryTable</span><span class="token punctuation">(</span><span class="token string">&quot;iotdbSinkTable&quot;</span><span class="token punctuation">,</span> iotdbDescriptor<span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token comment">// insert data</span>
dataGenTable<span class="token punctuation">.</span><span class="token function">executeInsert</span><span class="token punctuation">(</span><span class="token string">&quot;iotdbSinkTable&quot;</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token function">print</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token punctuation">}</span>
<span class="token punctuation">}</span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><p>After the above job is executed, the query result in the IoTDB CLI is as follows:</p><div class="language-text line-numbers-mode" data-ext="text" data-title="text"><pre class="language-text"><code>IoTDB&gt; select ** from root;
+-----------------------------+-------------+-------------+-------------+
| Time|root.sg.d0.s0|root.sg.d1.s0|root.sg.d1.s1|
+-----------------------------+-------------+-------------+-------------+
|1970-01-01T08:00:00.001+08:00| 1.0833644| 2.34874| 1.2414109|
|1970-01-01T08:00:00.002+08:00| 4.929185| 3.1885583| 4.6980085|
|1970-01-01T08:00:00.003+08:00| 3.5206156| 3.5600138| 4.8080945|
|1970-01-01T08:00:00.004+08:00| 1.3449302| 2.8781595| 3.3195343|
|1970-01-01T08:00:00.005+08:00| 3.3079383| 3.3840187| 3.7278645|
+-----------------------------+-------------+-------------+-------------+
Total line number = 5
It costs 0.054s
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><h3 id="batch-sink" tabindex="-1"><a class="header-anchor" href="#batch-sink"><span>Batch Sink</span></a></h3><h4 id="parameters-4" tabindex="-1"><a class="header-anchor" href="#parameters-4"><span>Parameters</span></a></h4><table><thead><tr><th>Parameter</th><th>Required</th><th>Default</th><th>Type</th><th>Description</th></tr></thead><tbody><tr><td>nodeUrls</td><td>No</td><td>127.0.0.1:6667</td><td>String</td><td>Specifies the addresses of datanodes in IoTDB. If IoTDB is deployed in cluster mode, multiple addresses can be specified, separated by commas.</td></tr><tr><td>user</td><td>No</td><td>root</td><td>String</td><td>IoTDB username</td></tr><tr><td>password</td><td>No</td><td>root</td><td>String</td><td>IoTDB password</td></tr><tr><td>aligned</td><td>No</td><td>false</td><td>Boolean</td><td>Whether to call the <code>aligned</code> interface when writing data to IoTDB.</td></tr></tbody></table><h4 id="example-4" tabindex="-1"><a class="header-anchor" href="#example-4"><span>Example</span></a></h4><p>This example demonstrates how to write data to IoTDB from a Flink Table API batch job:</p><ul><li>Register a source table using the <code>IoTDB connector</code>.</li><li>Register an output table using the <code>IoTDB connector</code>.</li><li>Write the renamed columns from the source table back to IoTDB.</li></ul><div class="language-java line-numbers-mode" data-ext="java" data-title="java"><pre class="language-java"><code><span class="token keyword">import</span> <span class="token import"><span class="token namespace">org<span
class="token punctuation">.</span>apache<span class="token punctuation">.</span>flink<span class="token punctuation">.</span>table<span class="token punctuation">.</span>api<span class="token punctuation">.</span></span><span class="token class-name">DataTypes</span></span><span class="token punctuation">;</span>
<span class="token keyword">import</span> <span class="token import"><span class="token namespace">org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>flink<span class="token punctuation">.</span>table<span class="token punctuation">.</span>api<span class="token punctuation">.</span></span><span class="token class-name">EnvironmentSettings</span></span><span class="token punctuation">;</span>
<span class="token keyword">import</span> <span class="token import"><span class="token namespace">org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>flink<span class="token punctuation">.</span>table<span class="token punctuation">.</span>api<span class="token punctuation">.</span></span><span class="token class-name">Schema</span></span><span class="token punctuation">;</span>
<span class="token keyword">import</span> <span class="token import"><span class="token namespace">org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>flink<span class="token punctuation">.</span>table<span class="token punctuation">.</span>api<span class="token punctuation">.</span></span><span class="token class-name">Table</span></span><span class="token punctuation">;</span>
<span class="token keyword">import</span> <span class="token import"><span class="token namespace">org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>flink<span class="token punctuation">.</span>table<span class="token punctuation">.</span>api<span class="token punctuation">.</span></span><span class="token class-name">TableDescriptor</span></span><span class="token punctuation">;</span>
<span class="token keyword">import</span> <span class="token import"><span class="token namespace">org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>flink<span class="token punctuation">.</span>table<span class="token punctuation">.</span>api<span class="token punctuation">.</span></span><span class="token class-name">TableEnvironment</span></span><span class="token punctuation">;</span>
<span class="token keyword">import</span> <span class="token keyword">static</span> <span class="token namespace">org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>flink<span class="token punctuation">.</span>table<span class="token punctuation">.</span>api<span class="token punctuation">.</span></span><span class="token class-name">Expressions</span><span class="token punctuation">.</span>$<span class="token punctuation">;</span>
<span class="token keyword">public</span> <span class="token keyword">class</span> <span class="token class-name">BatchSinkTest</span> <span class="token punctuation">{</span>
<span class="token keyword">public</span> <span class="token keyword">static</span> <span class="token keyword">void</span> <span class="token function">main</span><span class="token punctuation">(</span><span class="token class-name">String</span><span class="token punctuation">[</span><span class="token punctuation">]</span> args<span class="token punctuation">)</span> <span class="token punctuation">{</span>
<span class="token comment">// setup environment</span>
<span class="token class-name">EnvironmentSettings</span> settings <span class="token operator">=</span> <span class="token class-name">EnvironmentSettings</span>
<span class="token punctuation">.</span><span class="token function">newInstance</span><span class="token punctuation">(</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">inBatchMode</span><span class="token punctuation">(</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">build</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token class-name">TableEnvironment</span> tableEnv <span class="token operator">=</span> <span class="token class-name">TableEnvironment</span><span class="token punctuation">.</span><span class="token function">create</span><span class="token punctuation">(</span>settings<span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token comment">// create source table</span>
<span class="token class-name">Schema</span> sourceTableSchema <span class="token operator">=</span> <span class="token class-name">Schema</span>
<span class="token punctuation">.</span><span class="token function">newBuilder</span><span class="token punctuation">(</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">column</span><span class="token punctuation">(</span><span class="token string">&quot;Time_&quot;</span><span class="token punctuation">,</span> <span class="token class-name">DataTypes</span><span class="token punctuation">.</span><span class="token function">BIGINT</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">column</span><span class="token punctuation">(</span><span class="token string">&quot;root.sg.d0.s0&quot;</span><span class="token punctuation">,</span> <span class="token class-name">DataTypes</span><span class="token punctuation">.</span><span class="token function">FLOAT</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">column</span><span class="token punctuation">(</span><span class="token string">&quot;root.sg.d1.s0&quot;</span><span class="token punctuation">,</span> <span class="token class-name">DataTypes</span><span class="token punctuation">.</span><span class="token function">FLOAT</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">column</span><span class="token punctuation">(</span><span class="token string">&quot;root.sg.d1.s1&quot;</span><span class="token punctuation">,</span> <span class="token class-name">DataTypes</span><span class="token punctuation">.</span><span class="token function">FLOAT</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">build</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token class-name">TableDescriptor</span> sourceTableDescriptor <span class="token operator">=</span> <span class="token class-name">TableDescriptor</span>
<span class="token punctuation">.</span><span class="token function">forConnector</span><span class="token punctuation">(</span><span class="token string">&quot;IoTDB&quot;</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">schema</span><span class="token punctuation">(</span>sourceTableSchema<span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">option</span><span class="token punctuation">(</span><span class="token string">&quot;sql&quot;</span><span class="token punctuation">,</span> <span class="token string">&quot;select ** from root.sg.d0,root.sg.d1&quot;</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">build</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
tableEnv<span class="token punctuation">.</span><span class="token function">createTemporaryTable</span><span class="token punctuation">(</span><span class="token string">&quot;sourceTable&quot;</span><span class="token punctuation">,</span> sourceTableDescriptor<span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token class-name">Table</span> sourceTable <span class="token operator">=</span> tableEnv<span class="token punctuation">.</span><span class="token function">from</span><span class="token punctuation">(</span><span class="token string">&quot;sourceTable&quot;</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token comment">// register sink table</span>
<span class="token class-name">Schema</span> sinkTableSchema <span class="token operator">=</span> <span class="token class-name">Schema</span>
<span class="token punctuation">.</span><span class="token function">newBuilder</span><span class="token punctuation">(</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">column</span><span class="token punctuation">(</span><span class="token string">&quot;Time_&quot;</span><span class="token punctuation">,</span> <span class="token class-name">DataTypes</span><span class="token punctuation">.</span><span class="token function">BIGINT</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">column</span><span class="token punctuation">(</span><span class="token string">&quot;root.sg.d2.s0&quot;</span><span class="token punctuation">,</span> <span class="token class-name">DataTypes</span><span class="token punctuation">.</span><span class="token function">FLOAT</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">column</span><span class="token punctuation">(</span><span class="token string">&quot;root.sg.d3.s0&quot;</span><span class="token punctuation">,</span> <span class="token class-name">DataTypes</span><span class="token punctuation">.</span><span class="token function">FLOAT</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">column</span><span class="token punctuation">(</span><span class="token string">&quot;root.sg.d3.s1&quot;</span><span class="token punctuation">,</span> <span class="token class-name">DataTypes</span><span class="token punctuation">.</span><span class="token function">FLOAT</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">build</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token class-name">TableDescriptor</span> sinkTableDescriptor <span class="token operator">=</span> <span class="token class-name">TableDescriptor</span>
<span class="token punctuation">.</span><span class="token function">forConnector</span><span class="token punctuation">(</span><span class="token string">&quot;IoTDB&quot;</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">schema</span><span class="token punctuation">(</span>sinkTableSchema<span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">build</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
tableEnv<span class="token punctuation">.</span><span class="token function">createTemporaryTable</span><span class="token punctuation">(</span><span class="token string">&quot;sinkTable&quot;</span><span class="token punctuation">,</span> sinkTableDescriptor<span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token comment">// insert data</span>
sourceTable<span class="token punctuation">.</span><span class="token function">renameColumns</span><span class="token punctuation">(</span>
$<span class="token punctuation">(</span><span class="token string">&quot;root.sg.d0.s0&quot;</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token function">as</span><span class="token punctuation">(</span><span class="token string">&quot;root.sg.d2.s0&quot;</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
$<span class="token punctuation">(</span><span class="token string">&quot;root.sg.d1.s0&quot;</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token function">as</span><span class="token punctuation">(</span><span class="token string">&quot;root.sg.d3.s0&quot;</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
$<span class="token punctuation">(</span><span class="token string">&quot;root.sg.d1.s1&quot;</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token function">as</span><span class="token punctuation">(</span><span class="token string">&quot;root.sg.d3.s1&quot;</span><span class="token punctuation">)</span>
<span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token function">insertInto</span><span class="token punctuation">(</span><span class="token string">&quot;sinkTable&quot;</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token function">execute</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token function">print</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token punctuation">}</span>
<span class="token punctuation">}</span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><p>After the above task is executed, the query result in the IoTDB cli is as follows:</p><div class="language-text line-numbers-mode" data-ext="text" data-title="text"><pre 
class="language-text"><code>IoTDB&gt; select ** from root;
+-----------------------------+-------------+-------------+-------------+-------------+-------------+-------------+
| Time|root.sg.d0.s0|root.sg.d1.s0|root.sg.d1.s1|root.sg.d2.s0|root.sg.d3.s0|root.sg.d3.s1|
+-----------------------------+-------------+-------------+-------------+-------------+-------------+-------------+
|1970-01-01T08:00:00.001+08:00| 1.0833644| 2.34874| 1.2414109| 1.0833644| 2.34874| 1.2414109|
|1970-01-01T08:00:00.002+08:00| 4.929185| 3.1885583| 4.6980085| 4.929185| 3.1885583| 4.6980085|
|1970-01-01T08:00:00.003+08:00| 3.5206156| 3.5600138| 4.8080945| 3.5206156| 3.5600138| 4.8080945|
|1970-01-01T08:00:00.004+08:00| 1.3449302| 2.8781595| 3.3195343| 1.3449302| 2.8781595| 3.3195343|
|1970-01-01T08:00:00.005+08:00| 3.3079383| 3.3840187| 3.7278645| 3.3079383| 3.3840187| 3.7278645|
+-----------------------------+-------------+-------------+-------------+-------------+-------------+-------------+
Total line number = 5
It costs 0.015s
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div>`,27);function E(L,F){const t=o("ExternalLinkIcon");return c(),i("div",null,[u,n("ol",null,[n("li",null,[n("p",null,[s("Download the flink-sql-iotdb-connector jar file with dependencies from the "),n("a",d,[s("official website"),e(t)]),s(".")])]),r,k,m]),v,n("table",null,[b,n("tbody",null,[h,g,f,T,q,y,n("tr",null,[n("td",null,[n("a",D,[s("cdc.task.name"),e(t)])]),w,I,S,x]),B])]),_])}const C=p(l,[["render",E],["__file","Flink-SQL-IoTDB.html.vue"]]),R=JSON.parse('{"path":"/UserGuide/latest/Ecosystem-Integration/Flink-SQL-IoTDB.html","title":"flink-sql-iotdb-connector","lang":"en-US","frontmatter":{"description":"flink-sql-iotdb-connector The flink-sql-iotdb-connector seamlessly connects Flink SQL or Flink Table with IoTDB, enabling real-time read and write operations on IoTDB within Fli...","head":[["link",{"rel":"alternate","hreflang":"zh-cn","href":"https://iotdb.apache.org/zh/UserGuide/latest/Ecosystem-Integration/Flink-SQL-IoTDB.html"}],["meta",{"property":"og:url","content":"https://iotdb.apache.org/UserGuide/latest/Ecosystem-Integration/Flink-SQL-IoTDB.html"}],["meta",{"property":"og:site_name","content":"IoTDB Website"}],["meta",{"property":"og:title","content":"flink-sql-iotdb-connector"}],["meta",{"property":"og:description","content":"flink-sql-iotdb-connector The flink-sql-iotdb-connector seamlessly connects Flink SQL or Flink Table with IoTDB, enabling real-time read and write operations on IoTDB within 
Fli..."}],["meta",{"property":"og:type","content":"article"}],["meta",{"property":"og:locale","content":"en-US"}],["meta",{"property":"og:locale:alternate","content":"zh-CN"}],["meta",{"property":"og:updated_time","content":"2024-01-17T06:56:46.000Z"}],["meta",{"property":"article:modified_time","content":"2024-01-17T06:56:46.000Z"}],["script",{"type":"application/ld+json"},"{\\"@context\\":\\"https://schema.org\\",\\"@type\\":\\"Article\\",\\"headline\\":\\"flink-sql-iotdb-connector\\",\\"image\\":[\\"\\"],\\"dateModified\\":\\"2024-01-17T06:56:46.000Z\\",\\"author\\":[]}"]]},"headers":[{"level":2,"title":"Read and Write Modes","slug":"read-and-write-modes","link":"#read-and-write-modes","children":[{"level":3,"title":"Read Modes (Source)","slug":"read-modes-source","link":"#read-modes-source","children":[]},{"level":3,"title":"Write Modes (Sink)","slug":"write-modes-sink","link":"#write-modes-sink","children":[]}]},{"level":2,"title":"Usage","slug":"usage","link":"#usage","children":[{"level":3,"title":"Maven","slug":"maven","link":"#maven","children":[]},{"level":3,"title":"sql-client","slug":"sql-client","link":"#sql-client","children":[]}]},{"level":2,"title":"Table Structure Specification","slug":"table-structure-specification","link":"#table-structure-specification","children":[]},{"level":2,"title":"Read Mode (Source)","slug":"read-mode-source","link":"#read-mode-source","children":[{"level":3,"title":"Scan Table (Bounded)","slug":"scan-table-bounded","link":"#scan-table-bounded","children":[]},{"level":3,"title":"Lookup Point","slug":"lookup-point","link":"#lookup-point","children":[]},{"level":3,"title":"CDC","slug":"cdc","link":"#cdc","children":[]}]},{"level":2,"title":"Write Mode (Sink)","slug":"write-mode-sink","link":"#write-mode-sink","children":[{"level":3,"title":"Streaming Sink","slug":"streaming-sink","link":"#streaming-sink","children":[]},{"level":3,"title":"Batch 
Sink","slug":"batch-sink","link":"#batch-sink","children":[]}]}],"git":{"createdTime":1694658523000,"updatedTime":1705474606000,"contributors":[{"name":"CritasWang","email":"critas@outlook.com","commits":1}]},"readingTime":{"minutes":8.18,"words":2455},"filePathRelative":"UserGuide/latest/Ecosystem-Integration/Flink-SQL-IoTDB.md","localizedDate":"September 14, 2023","autoDesc":true}');export{C as comp,R as data};