| import{_ as n,c as a,b as l,o}from"./app-C-fAkKj6.js";const e={};function p(t,s){return o(),a("div",null,s[0]||(s[0]=[l(`<h1 id="apache-flink-iotdb" tabindex="-1"><a class="header-anchor" href="#apache-flink-iotdb"><span>Apache Flink(IoTDB)</span></a></h1><p>IoTDB integration for <a href="https://flink.apache.org/" target="_blank" rel="noopener noreferrer">Apache Flink</a>. This module includes the IoTDB sink that allows a Flink job to write events into timeseries, and the IoTDB source that allows reading data from IoTDB.</p><h2 id="iotdbsink" tabindex="-1"><a class="header-anchor" href="#iotdbsink"><span>IoTDBSink</span></a></h2><p>To use the <code>IoTDBSink</code>, you need to construct an instance of it by specifying <code>IoTDBSinkOptions</code> and <code>IoTSerializationSchema</code> instances.<br> The <code>IoTDBSink</code> sends events one at a time by default, but you can switch to batching by invoking <code>withBatchSize(int)</code>.</p><h3 id="example" tabindex="-1"><a class="header-anchor" href="#example"><span>Example</span></a></h3><p>This example shows a case where data is sent to an IoTDB server from a Flink job:</p><ul><li>A simulated Source <code>SensorSource</code> generates one data point per second.</li><li>Flink uses <code>IoTDBSink</code> to consume the generated data points and write the data into IoTDB.</li></ul><p>It is noteworthy that to use IoTDBSink, schema auto-creation in IoTDB should be enabled.</p><div class="language-java line-numbers-mode" data-highlighter="shiki" data-ext="java" data-title="java" style="background-color:#282c34;color:#abb2bf;"><pre class="shiki one-dark-pro vp-code"><code><span class="line"><span style="color:#C678DD;">import</span><span style="color:#E5C07B;"> org.apache.iotdb.flink.options.IoTDBSinkOptions</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"><span style="color:#C678DD;">import</span><span style="color:#E5C07B;"> org.apache.iotdb.tsfile.file.metadata.enums.CompressionType</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"><span style="color:#C678DD;">import</span><span style="color:#E5C07B;"> org.apache.iotdb.tsfile.file.metadata.enums.TSDataType</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"><span style="color:#C678DD;">import</span><span style="color:#E5C07B;"> org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#C678DD;">import</span><span style="color:#E5C07B;"> com.google.common.collect.Lists</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"><span style="color:#C678DD;">import</span><span style="color:#E5C07B;"> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"><span style="color:#C678DD;">import</span><span style="color:#E5C07B;"> org.apache.flink.streaming.api.functions.source.SourceFunction</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#C678DD;">import</span><span style="color:#E5C07B;"> java.security.SecureRandom</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"><span style="color:#C678DD;">import</span><span style="color:#E5C07B;"> java.util.HashMap</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"><span style="color:#C678DD;">import</span><span style="color:#E5C07B;"> java.util.Map</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"><span style="color:#C678DD;">import</span><span style="color:#E5C07B;"> java.util.Random</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#C678DD;">public</span><span style="color:#C678DD;"> class</span><span style="color:#E5C07B;"> FlinkIoTDBSink</span><span style="color:#ABB2BF;"> {</span></span> |
| <span class="line"><span style="color:#C678DD;"> public</span><span style="color:#C678DD;"> static</span><span style="color:#C678DD;"> void</span><span style="color:#61AFEF;"> main</span><span style="color:#ABB2BF;">(</span><span style="color:#E5C07B;">String</span><span style="color:#ABB2BF;">[] </span><span style="color:#E06C75;font-style:italic;">args</span><span style="color:#ABB2BF;">)</span><span style="color:#C678DD;"> throws</span><span style="color:#E5C07B;"> Exception</span><span style="color:#ABB2BF;"> {</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> // run the flink job on local mini cluster</span></span> |
| <span class="line"><span style="color:#E5C07B;"> StreamExecutionEnvironment</span><span style="color:#E06C75;"> env</span><span style="color:#56B6C2;"> =</span><span style="color:#E5C07B;"> StreamExecutionEnvironment</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">getExecutionEnvironment</span><span style="color:#ABB2BF;">();</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#E5C07B;"> IoTDBSinkOptions</span><span style="color:#E06C75;"> options</span><span style="color:#56B6C2;"> =</span><span style="color:#C678DD;"> new</span><span style="color:#61AFEF;"> IoTDBSinkOptions</span><span style="color:#ABB2BF;">();</span></span> |
| <span class="line"><span style="color:#E5C07B;"> options</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">setHost</span><span style="color:#ABB2BF;">(</span><span style="color:#98C379;">"127.0.0.1"</span><span style="color:#ABB2BF;">);</span></span> |
| <span class="line"><span style="color:#E5C07B;"> options</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">setPort</span><span style="color:#ABB2BF;">(</span><span style="color:#D19A66;">6667</span><span style="color:#ABB2BF;">);</span></span> |
| <span class="line"><span style="color:#E5C07B;"> options</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">setUser</span><span style="color:#ABB2BF;">(</span><span style="color:#98C379;">"root"</span><span style="color:#ABB2BF;">);</span></span> |
| <span class="line"><span style="color:#E5C07B;"> options</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">setPassword</span><span style="color:#ABB2BF;">(</span><span style="color:#98C379;">"root"</span><span style="color:#ABB2BF;">);</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> // If the server enables auto_create_schema, then we do not need to register all timeseries</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> // here.</span></span> |
| <span class="line"><span style="color:#E5C07B;"> options</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">setTimeseriesOptionList</span><span style="color:#ABB2BF;">(</span></span> |
| <span class="line"><span style="color:#E5C07B;"> Lists</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">newArrayList</span><span style="color:#ABB2BF;">(</span></span> |
| <span class="line"><span style="color:#C678DD;"> new</span><span style="color:#ABB2BF;"> IoTDBSinkOptions.</span><span style="color:#61AFEF;">TimeseriesOption</span><span style="color:#ABB2BF;">(</span></span> |
| <span class="line"><span style="color:#98C379;"> "root.sg.d1.s1"</span><span style="color:#ABB2BF;">, </span><span style="color:#E5C07B;">TSDataType</span><span style="color:#ABB2BF;">.</span><span style="color:#E5C07B;">DOUBLE</span><span style="color:#ABB2BF;">, </span><span style="color:#E5C07B;">TSEncoding</span><span style="color:#ABB2BF;">.</span><span style="color:#E5C07B;">GORILLA</span><span style="color:#ABB2BF;">, </span><span style="color:#E5C07B;">CompressionType</span><span style="color:#ABB2BF;">.</span><span style="color:#E5C07B;">SNAPPY</span><span style="color:#ABB2BF;">)));</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#E5C07B;"> IoTSerializationSchema</span><span style="color:#E06C75;"> serializationSchema</span><span style="color:#56B6C2;"> =</span><span style="color:#C678DD;"> new</span><span style="color:#61AFEF;"> DefaultIoTSerializationSchema</span><span style="color:#ABB2BF;">();</span></span> |
| <span class="line"><span style="color:#E5C07B;"> IoTDBSink</span><span style="color:#E06C75;"> ioTDBSink</span><span style="color:#56B6C2;"> =</span></span> |
| <span class="line"><span style="color:#C678DD;"> new</span><span style="color:#61AFEF;"> IoTDBSink</span><span style="color:#ABB2BF;">(options, serializationSchema)</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> // enable batching</span></span> |
| <span class="line"><span style="color:#ABB2BF;"> .</span><span style="color:#61AFEF;">withBatchSize</span><span style="color:#ABB2BF;">(</span><span style="color:#D19A66;">10</span><span style="color:#ABB2BF;">)</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> // how many connections to the server will be created for each parallelism</span></span> |
| <span class="line"><span style="color:#ABB2BF;"> .</span><span style="color:#61AFEF;">withSessionPoolSize</span><span style="color:#ABB2BF;">(</span><span style="color:#D19A66;">3</span><span style="color:#ABB2BF;">);</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#E5C07B;"> env</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">addSource</span><span style="color:#ABB2BF;">(</span><span style="color:#C678DD;">new</span><span style="color:#61AFEF;"> SensorSource</span><span style="color:#ABB2BF;">())</span></span> |
| <span class="line"><span style="color:#ABB2BF;"> .</span><span style="color:#61AFEF;">name</span><span style="color:#ABB2BF;">(</span><span style="color:#98C379;">"sensor-source"</span><span style="color:#ABB2BF;">)</span></span> |
| <span class="line"><span style="color:#ABB2BF;"> .</span><span style="color:#61AFEF;">setParallelism</span><span style="color:#ABB2BF;">(</span><span style="color:#D19A66;">1</span><span style="color:#ABB2BF;">)</span></span> |
| <span class="line"><span style="color:#ABB2BF;"> .</span><span style="color:#61AFEF;">addSink</span><span style="color:#ABB2BF;">(ioTDBSink)</span></span> |
| <span class="line"><span style="color:#ABB2BF;"> .</span><span style="color:#61AFEF;">name</span><span style="color:#ABB2BF;">(</span><span style="color:#98C379;">"iotdb-sink"</span><span style="color:#ABB2BF;">);</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#E5C07B;"> env</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">execute</span><span style="color:#ABB2BF;">(</span><span style="color:#98C379;">"iotdb-flink-example"</span><span style="color:#ABB2BF;">);</span></span> |
| <span class="line"><span style="color:#ABB2BF;"> }</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#C678DD;"> private</span><span style="color:#C678DD;"> static</span><span style="color:#C678DD;"> class</span><span style="color:#E5C07B;"> SensorSource</span><span style="color:#C678DD;"> implements</span><span style="color:#E5C07B;"> SourceFunction</span><span style="color:#ABB2BF;"><</span><span style="color:#E5C07B;">Map</span><span style="color:#ABB2BF;"><</span><span style="color:#E5C07B;">String</span><span style="color:#ABB2BF;">,</span><span style="color:#E5C07B;"> String</span><span style="color:#ABB2BF;">>></span><span style="color:#ABB2BF;"> {</span></span> |
| <span class="line"><span style="color:#C678DD;"> boolean</span><span style="color:#E06C75;"> running </span><span style="color:#56B6C2;">=</span><span style="color:#D19A66;"> true</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"><span style="color:#E5C07B;"> Random</span><span style="color:#E06C75;"> random </span><span style="color:#56B6C2;">=</span><span style="color:#C678DD;"> new</span><span style="color:#61AFEF;"> SecureRandom</span><span style="color:#E06C75;">()</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#ABB2BF;"> @</span><span style="color:#E5C07B;">Override</span></span> |
| <span class="line"><span style="color:#C678DD;"> public</span><span style="color:#C678DD;"> void</span><span style="color:#61AFEF;"> run</span><span style="color:#ABB2BF;">(</span><span style="color:#E5C07B;">SourceContext</span><span style="color:#E06C75;font-style:italic;"> context</span><span style="color:#ABB2BF;">)</span><span style="color:#C678DD;"> throws</span><span style="color:#E5C07B;"> Exception</span><span style="color:#ABB2BF;"> {</span></span> |
| <span class="line"><span style="color:#C678DD;"> while</span><span style="color:#ABB2BF;"> (running) {</span></span> |
| <span class="line"><span style="color:#E5C07B;"> Map</span><span style="color:#ABB2BF;"><</span><span style="color:#E5C07B;">String</span><span style="color:#ABB2BF;">, </span><span style="color:#E5C07B;">String</span><span style="color:#ABB2BF;">> </span><span style="color:#E06C75;">tuple</span><span style="color:#56B6C2;"> =</span><span style="color:#C678DD;"> new</span><span style="color:#61AFEF;"> HashMap</span><span style="color:#ABB2BF;">();</span></span> |
| <span class="line"><span style="color:#E5C07B;"> tuple</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">put</span><span style="color:#ABB2BF;">(</span><span style="color:#98C379;">"device"</span><span style="color:#ABB2BF;">, </span><span style="color:#98C379;">"root.sg.d1"</span><span style="color:#ABB2BF;">);</span></span> |
| <span class="line"><span style="color:#E5C07B;"> tuple</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">put</span><span style="color:#ABB2BF;">(</span><span style="color:#98C379;">"timestamp"</span><span style="color:#ABB2BF;">, </span><span style="color:#E5C07B;">String</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">valueOf</span><span style="color:#ABB2BF;">(</span><span style="color:#E5C07B;">System</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">currentTimeMillis</span><span style="color:#ABB2BF;">()));</span></span> |
| <span class="line"><span style="color:#E5C07B;"> tuple</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">put</span><span style="color:#ABB2BF;">(</span><span style="color:#98C379;">"measurements"</span><span style="color:#ABB2BF;">, </span><span style="color:#98C379;">"s1"</span><span style="color:#ABB2BF;">);</span></span> |
| <span class="line"><span style="color:#E5C07B;"> tuple</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">put</span><span style="color:#ABB2BF;">(</span><span style="color:#98C379;">"types"</span><span style="color:#ABB2BF;">, </span><span style="color:#98C379;">"DOUBLE"</span><span style="color:#ABB2BF;">);</span></span> |
| <span class="line"><span style="color:#E5C07B;"> tuple</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">put</span><span style="color:#ABB2BF;">(</span><span style="color:#98C379;">"values"</span><span style="color:#ABB2BF;">, </span><span style="color:#E5C07B;">String</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">valueOf</span><span style="color:#ABB2BF;">(</span><span style="color:#E5C07B;">random</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">nextDouble</span><span style="color:#ABB2BF;">()));</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#E5C07B;"> context</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">collect</span><span style="color:#ABB2BF;">(tuple);</span></span> |
| <span class="line"><span style="color:#E5C07B;"> Thread</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">sleep</span><span style="color:#ABB2BF;">(</span><span style="color:#D19A66;">1000</span><span style="color:#ABB2BF;">);</span></span> |
| <span class="line"><span style="color:#ABB2BF;"> }</span></span> |
| <span class="line"><span style="color:#ABB2BF;"> }</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#ABB2BF;"> @</span><span style="color:#E5C07B;">Override</span></span> |
| <span class="line"><span style="color:#C678DD;"> public</span><span style="color:#C678DD;"> void</span><span style="color:#61AFEF;"> cancel</span><span style="color:#ABB2BF;">()</span><span style="color:#ABB2BF;"> {</span></span> |
| <span class="line"><span style="color:#ABB2BF;"> running </span><span style="color:#56B6C2;">=</span><span style="color:#D19A66;"> false</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"><span style="color:#ABB2BF;"> }</span></span> |
| <span class="line"><span style="color:#ABB2BF;"> }</span></span> |
| <span class="line"><span style="color:#ABB2BF;">}</span></span></code></pre><div class="line-numbers" aria-hidden="true" style="counter-reset:line-number 0;"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div 
class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><h3 id="usage" tabindex="-1"><a class="header-anchor" href="#usage"><span>Usage</span></a></h3><ul><li>Launch the IoTDB server.</li><li>Run <code>org.apache.iotdb.flink.FlinkIoTDBSink.java</code> to run the Flink job on a local mini cluster.</li></ul><h2 id="iotdbsource" tabindex="-1"><a class="header-anchor" href="#iotdbsource"><span>IoTDBSource</span></a></h2><p>To use the <code>IoTDBSource</code>, you need to construct an instance of <code>IoTDBSource</code> by specifying <code>IoTDBSourceOptions</code><br> and implementing the abstract method <code>convert()</code> in <code>IoTDBSource</code>. The <code>convert</code> method defines how<br> you want the row data to be transformed.</p><h3 id="example-1" tabindex="-1"><a class="header-anchor" href="#example-1"><span>Example</span></a></h3><p>This example shows a case where data are read from IoTDB.</p><div class="language-java line-numbers-mode" data-highlighter="shiki" data-ext="java" data-title="java" style="background-color:#282c34;color:#abb2bf;"><pre class="shiki one-dark-pro vp-code"><code><span class="line"><span style="color:#C678DD;">import</span><span style="color:#E5C07B;"> org.apache.iotdb.flink.options.IoTDBSourceOptions</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"><span style="color:#C678DD;">import</span><span style="color:#E5C07B;"> org.apache.iotdb.rpc.IoTDBConnectionException</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"><span style="color:#C678DD;">import</span><span style="color:#E5C07B;"> org.apache.iotdb.rpc.StatementExecutionException</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"><span style="color:#C678DD;">import</span><span style="color:#E5C07B;"> org.apache.iotdb.rpc.TSStatusCode</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"><span style="color:#C678DD;">import</span><span style="color:#E5C07B;"> org.apache.iotdb.session.Session</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"><span style="color:#C678DD;">import</span><span style="color:#E5C07B;"> org.apache.iotdb.tsfile.file.metadata.enums.CompressionType</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"><span style="color:#C678DD;">import</span><span style="color:#E5C07B;"> org.apache.iotdb.tsfile.file.metadata.enums.TSDataType</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"><span style="color:#C678DD;">import</span><span style="color:#E5C07B;"> org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"><span style="color:#C678DD;">import</span><span style="color:#E5C07B;"> org.apache.iotdb.tsfile.read.common.RowRecord</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#C678DD;">import</span><span style="color:#E5C07B;"> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#C678DD;">import</span><span style="color:#E5C07B;"> java.util.ArrayList</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"><span style="color:#C678DD;">import</span><span style="color:#E5C07B;"> java.util.List</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#C678DD;">public</span><span style="color:#C678DD;"> class</span><span style="color:#E5C07B;"> FlinkIoTDBSource</span><span style="color:#ABB2BF;"> {</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#C678DD;"> static</span><span style="color:#C678DD;"> final</span><span style="color:#E5C07B;"> String</span><span style="color:#E06C75;"> LOCAL_HOST </span><span style="color:#56B6C2;">=</span><span style="color:#98C379;"> "127.0.0.1"</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"><span style="color:#C678DD;"> static</span><span style="color:#C678DD;"> final</span><span style="color:#E5C07B;"> String</span><span style="color:#E06C75;"> ROOT_SG1_D1_S1 </span><span style="color:#56B6C2;">=</span><span style="color:#98C379;"> "root.sg1.d1.s1"</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"><span style="color:#C678DD;"> static</span><span style="color:#C678DD;"> final</span><span style="color:#E5C07B;"> String</span><span style="color:#E06C75;"> ROOT_SG1_D1 </span><span style="color:#56B6C2;">=</span><span style="color:#98C379;"> "root.sg1.d1"</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#C678DD;"> public</span><span style="color:#C678DD;"> static</span><span style="color:#C678DD;"> void</span><span style="color:#61AFEF;"> main</span><span style="color:#ABB2BF;">(</span><span style="color:#E5C07B;">String</span><span style="color:#ABB2BF;">[] </span><span style="color:#E06C75;font-style:italic;">args</span><span style="color:#ABB2BF;">)</span><span style="color:#C678DD;"> throws</span><span style="color:#E5C07B;"> Exception</span><span style="color:#ABB2BF;"> {</span></span> |
| <span class="line"><span style="color:#61AFEF;"> prepareData</span><span style="color:#ABB2BF;">();</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> // run the flink job on local mini cluster</span></span> |
| <span class="line"><span style="color:#E5C07B;"> StreamExecutionEnvironment</span><span style="color:#E06C75;"> env</span><span style="color:#56B6C2;"> =</span><span style="color:#E5C07B;"> StreamExecutionEnvironment</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">getExecutionEnvironment</span><span style="color:#ABB2BF;">();</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#E5C07B;"> IoTDBSourceOptions</span><span style="color:#E06C75;"> ioTDBSourceOptions</span><span style="color:#56B6C2;"> =</span></span> |
| <span class="line"><span style="color:#C678DD;"> new</span><span style="color:#61AFEF;"> IoTDBSourceOptions</span><span style="color:#ABB2BF;">(</span><span style="color:#98C379;">"127.0.0.1"</span><span style="color:#ABB2BF;">, </span><span style="color:#D19A66;">6667</span><span style="color:#ABB2BF;">, </span><span style="color:#98C379;">"root"</span><span style="color:#ABB2BF;">, </span><span style="color:#98C379;">"root"</span><span style="color:#ABB2BF;">,</span></span> |
| <span class="line"><span style="color:#98C379;"> "select s1 from "</span><span style="color:#56B6C2;"> +</span><span style="color:#ABB2BF;"> ROOT_SG1_D1 </span><span style="color:#56B6C2;">+</span><span style="color:#98C379;"> " align by device"</span><span style="color:#ABB2BF;">);</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#E5C07B;"> env</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">addSource</span><span style="color:#ABB2BF;">(</span></span> |
| <span class="line"><span style="color:#C678DD;"> new</span><span style="color:#E5C07B;"> IoTDBSource</span><span style="color:#ABB2BF;"><</span><span style="color:#E5C07B;">RowRecord</span><span style="color:#ABB2BF;">>(ioTDBSourceOptions) {</span></span> |
| <span class="line"><span style="color:#ABB2BF;"> @</span><span style="color:#E5C07B;">Override</span></span> |
| <span class="line"><span style="color:#C678DD;"> public</span><span style="color:#E5C07B;"> RowRecord</span><span style="color:#61AFEF;"> convert</span><span style="color:#ABB2BF;">(</span><span style="color:#E5C07B;">RowRecord</span><span style="color:#E06C75;font-style:italic;"> rowRecord</span><span style="color:#ABB2BF;">)</span><span style="color:#ABB2BF;"> {</span></span> |
| <span class="line"><span style="color:#C678DD;"> return</span><span style="color:#ABB2BF;"> rowRecord;</span></span> |
| <span class="line"><span style="color:#ABB2BF;"> }</span></span> |
| <span class="line"><span style="color:#ABB2BF;"> })</span></span> |
| <span class="line"><span style="color:#ABB2BF;"> .</span><span style="color:#61AFEF;">name</span><span style="color:#ABB2BF;">(</span><span style="color:#98C379;">"sensor-source"</span><span style="color:#ABB2BF;">)</span></span> |
| <span class="line"><span style="color:#ABB2BF;"> .</span><span style="color:#61AFEF;">print</span><span style="color:#ABB2BF;">()</span></span> |
| <span class="line"><span style="color:#ABB2BF;"> .</span><span style="color:#61AFEF;">setParallelism</span><span style="color:#ABB2BF;">(</span><span style="color:#D19A66;">2</span><span style="color:#ABB2BF;">);</span></span> |
| <span class="line"><span style="color:#E5C07B;"> env</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">execute</span><span style="color:#ABB2BF;">();</span></span> |
| <span class="line"><span style="color:#ABB2BF;"> }</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> /**</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * Write some data to IoTDB</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> */</span></span> |
| <span class="line"><span style="color:#C678DD;"> private</span><span style="color:#C678DD;"> static</span><span style="color:#C678DD;"> void</span><span style="color:#61AFEF;"> prepareData</span><span style="color:#ABB2BF;">()</span><span style="color:#C678DD;"> throws</span><span style="color:#E5C07B;"> IoTDBConnectionException</span><span style="color:#ABB2BF;">,</span><span style="color:#E5C07B;"> StatementExecutionException</span><span style="color:#ABB2BF;"> {</span></span> |
| <span class="line"><span style="color:#E5C07B;"> Session</span><span style="color:#E06C75;"> session</span><span style="color:#56B6C2;"> =</span><span style="color:#C678DD;"> new</span><span style="color:#61AFEF;"> Session</span><span style="color:#ABB2BF;">(LOCAL_HOST, </span><span style="color:#D19A66;">6667</span><span style="color:#ABB2BF;">, </span><span style="color:#98C379;">"root"</span><span style="color:#ABB2BF;">, </span><span style="color:#98C379;">"root"</span><span style="color:#ABB2BF;">);</span></span> |
| <span class="line"><span style="color:#E5C07B;"> session</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">open</span><span style="color:#ABB2BF;">(</span><span style="color:#D19A66;">false</span><span style="color:#ABB2BF;">);</span></span> |
| <span class="line"><span style="color:#C678DD;"> try</span><span style="color:#ABB2BF;"> {</span></span> |
| <span class="line"><span style="color:#E5C07B;"> session</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">setStorageGroup</span><span style="color:#ABB2BF;">(</span><span style="color:#98C379;">"root.sg1"</span><span style="color:#ABB2BF;">);</span></span> |
| <span class="line"><span style="color:#C678DD;"> if</span><span style="color:#ABB2BF;"> (</span><span style="color:#56B6C2;">!</span><span style="color:#E5C07B;">session</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">checkTimeseriesExists</span><span style="color:#ABB2BF;">(ROOT_SG1_D1_S1)) {</span></span> |
| <span class="line"><span style="color:#E5C07B;"> session</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">createTimeseries</span><span style="color:#ABB2BF;">(</span></span> |
| <span class="line"><span style="color:#ABB2BF;"> ROOT_SG1_D1_S1, </span><span style="color:#E5C07B;">TSDataType</span><span style="color:#ABB2BF;">.</span><span style="color:#E5C07B;">INT64</span><span style="color:#ABB2BF;">, </span><span style="color:#E5C07B;">TSEncoding</span><span style="color:#ABB2BF;">.</span><span style="color:#E5C07B;">RLE</span><span style="color:#ABB2BF;">, </span><span style="color:#E5C07B;">CompressionType</span><span style="color:#ABB2BF;">.</span><span style="color:#E5C07B;">SNAPPY</span><span style="color:#ABB2BF;">);</span></span> |
| <span class="line"><span style="color:#E5C07B;"> List</span><span style="color:#ABB2BF;"><</span><span style="color:#E5C07B;">String</span><span style="color:#ABB2BF;">> </span><span style="color:#E06C75;">measurements</span><span style="color:#56B6C2;"> =</span><span style="color:#C678DD;"> new</span><span style="color:#E5C07B;"> ArrayList</span><span style="color:#ABB2BF;"><>();</span></span> |
| <span class="line"><span style="color:#E5C07B;"> List</span><span style="color:#ABB2BF;"><</span><span style="color:#E5C07B;">TSDataType</span><span style="color:#ABB2BF;">> </span><span style="color:#E06C75;">types</span><span style="color:#56B6C2;"> =</span><span style="color:#C678DD;"> new</span><span style="color:#E5C07B;"> ArrayList</span><span style="color:#ABB2BF;"><>();</span></span> |
| <span class="line"><span style="color:#E5C07B;"> measurements</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">add</span><span style="color:#ABB2BF;">(</span><span style="color:#98C379;">"s1"</span><span style="color:#ABB2BF;">);</span></span> |
| <span class="line"><span style="color:#E5C07B;"> measurements</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">add</span><span style="color:#ABB2BF;">(</span><span style="color:#98C379;">"s2"</span><span style="color:#ABB2BF;">);</span></span> |
| <span class="line"><span style="color:#E5C07B;"> measurements</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">add</span><span style="color:#ABB2BF;">(</span><span style="color:#98C379;">"s3"</span><span style="color:#ABB2BF;">);</span></span> |
| <span class="line"><span style="color:#E5C07B;"> types</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">add</span><span style="color:#ABB2BF;">(</span><span style="color:#E5C07B;">TSDataType</span><span style="color:#ABB2BF;">.</span><span style="color:#E5C07B;">INT64</span><span style="color:#ABB2BF;">);</span></span> |
| <span class="line"><span style="color:#E5C07B;"> types</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">add</span><span style="color:#ABB2BF;">(</span><span style="color:#E5C07B;">TSDataType</span><span style="color:#ABB2BF;">.</span><span style="color:#E5C07B;">INT64</span><span style="color:#ABB2BF;">);</span></span> |
| <span class="line"><span style="color:#E5C07B;"> types</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">add</span><span style="color:#ABB2BF;">(</span><span style="color:#E5C07B;">TSDataType</span><span style="color:#ABB2BF;">.</span><span style="color:#E5C07B;">INT64</span><span style="color:#ABB2BF;">);</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#C678DD;"> for</span><span style="color:#ABB2BF;"> (</span><span style="color:#C678DD;">long</span><span style="color:#E06C75;"> time</span><span style="color:#56B6C2;"> =</span><span style="color:#D19A66;"> 0</span><span style="color:#ABB2BF;">; time </span><span style="color:#56B6C2;"><</span><span style="color:#D19A66;"> 100</span><span style="color:#ABB2BF;">; time++) {</span></span> |
| <span class="line"><span style="color:#E5C07B;"> List</span><span style="color:#ABB2BF;"><</span><span style="color:#E5C07B;">Object</span><span style="color:#ABB2BF;">> </span><span style="color:#E06C75;">values</span><span style="color:#56B6C2;"> =</span><span style="color:#C678DD;"> new</span><span style="color:#E5C07B;"> ArrayList</span><span style="color:#ABB2BF;"><>();</span></span> |
| <span class="line"><span style="color:#E5C07B;"> values</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">add</span><span style="color:#ABB2BF;">(</span><span style="color:#D19A66;">1L</span><span style="color:#ABB2BF;">);</span></span> |
| <span class="line"><span style="color:#E5C07B;"> values</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">add</span><span style="color:#ABB2BF;">(</span><span style="color:#D19A66;">2L</span><span style="color:#ABB2BF;">);</span></span> |
| <span class="line"><span style="color:#E5C07B;"> values</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">add</span><span style="color:#ABB2BF;">(</span><span style="color:#D19A66;">3L</span><span style="color:#ABB2BF;">);</span></span> |
| <span class="line"><span style="color:#E5C07B;"> session</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">insertRecord</span><span style="color:#ABB2BF;">(ROOT_SG1_D1, time, measurements, types, values);</span></span> |
| <span class="line"><span style="color:#ABB2BF;"> }</span></span> |
| <span class="line"><span style="color:#ABB2BF;"> }</span></span> |
| <span class="line"><span style="color:#ABB2BF;"> } </span><span style="color:#C678DD;">catch</span><span style="color:#ABB2BF;"> (</span><span style="color:#E5C07B;">StatementExecutionException</span><span style="color:#E06C75;font-style:italic;"> e</span><span style="color:#ABB2BF;">) {</span></span> |
| <span class="line"><span style="color:#C678DD;"> if</span><span style="color:#ABB2BF;"> (</span><span style="color:#E5C07B;">e</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">getStatusCode</span><span style="color:#ABB2BF;">() </span><span style="color:#56B6C2;">!=</span><span style="color:#E5C07B;"> TSStatusCode</span><span style="color:#ABB2BF;">.</span><span style="color:#E5C07B;">PATH_ALREADY_EXIST_ERROR</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">getStatusCode</span><span style="color:#ABB2BF;">()) {</span></span> |
| <span class="line"><span style="color:#C678DD;"> throw</span><span style="color:#ABB2BF;"> e;</span></span> |
| <span class="line"><span style="color:#ABB2BF;"> }</span></span> |
| <span class="line"><span style="color:#ABB2BF;"> }</span></span> |
| <span class="line"><span style="color:#ABB2BF;"> }</span></span> |
| <span class="line"><span style="color:#ABB2BF;">}</span></span></code></pre><div class="line-numbers" aria-hidden="true" style="counter-reset:line-number 0;"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div 
class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><h3 id="usage-1" tabindex="-1"><a class="header-anchor" href="#usage-1"><span>Usage</span></a></h3><p>Launch the IoTDB server.<br> Run org.apache.iotdb.flink.FlinkIoTDBSource.java to run the flink job on local mini cluster.</p>`,18)]))}const r=n(e,[["render",p],["__file","Flink-IoTDB.html.vue"]]),i=JSON.parse('{"path":"/UserGuide/V1.2.x/Ecosystem-Integration/Flink-IoTDB.html","title":"Apache Flink(IoTDB)","lang":"en-US","frontmatter":{"description":"Apache Flink(IoTDB) IoTDB integration for Apache Flink. This module includes the IoTDB sink that allows a flink job to write events into timeseries, and the IoTDB source allowin...","head":[["link",{"rel":"alternate","hreflang":"zh-cn","href":"https://iotdb.apache.org/zh/UserGuide/V1.2.x/Ecosystem-Integration/Flink-IoTDB.html"}],["meta",{"property":"og:url","content":"https://iotdb.apache.org/UserGuide/V1.2.x/Ecosystem-Integration/Flink-IoTDB.html"}],["meta",{"property":"og:site_name","content":"IoTDB Website"}],["meta",{"property":"og:title","content":"Apache Flink(IoTDB)"}],["meta",{"property":"og:description","content":"Apache Flink(IoTDB) IoTDB integration for Apache Flink. 
This module includes the IoTDB sink that allows a flink job to write events into timeseries, and the IoTDB source allowin..."}],["meta",{"property":"og:type","content":"article"}],["meta",{"property":"og:locale","content":"en-US"}],["meta",{"property":"og:locale:alternate","content":"zh-CN"}],["meta",{"property":"og:updated_time","content":"2024-08-12T04:20:54.000Z"}],["meta",{"property":"article:modified_time","content":"2024-08-12T04:20:54.000Z"}],["script",{"type":"application/ld+json"},"{\\"@context\\":\\"https://schema.org\\",\\"@type\\":\\"Article\\",\\"headline\\":\\"Apache Flink(IoTDB)\\",\\"image\\":[\\"\\"],\\"dateModified\\":\\"2024-08-12T04:20:54.000Z\\",\\"author\\":[]}"]]},"headers":[{"level":2,"title":"IoTDBSink","slug":"iotdbsink","link":"#iotdbsink","children":[{"level":3,"title":"Example","slug":"example","link":"#example","children":[]},{"level":3,"title":"Usage","slug":"usage","link":"#usage","children":[]}]},{"level":2,"title":"IoTDBSource","slug":"iotdbsource","link":"#iotdbsource","children":[{"level":3,"title":"Example","slug":"example-1","link":"#example-1","children":[]},{"level":3,"title":"Usage","slug":"usage-1","link":"#usage-1","children":[]}]}],"git":{"createdTime":1688958677000,"updatedTime":1723436454000,"contributors":[{"name":"CritasWang","username":"CritasWang","email":"critas@outlook.com","commits":1,"url":"https://github.com/CritasWang"},{"name":"Lei","username":"Lei","email":"33376433+LeiRui@users.noreply.github.com","commits":1,"url":"https://github.com/Lei"},{"name":"W1y1r","username":"W1y1r","email":"150988475+W1y1r@users.noreply.github.com","commits":1,"url":"https://github.com/W1y1r"}]},"readingTime":{"minutes":2.35,"words":705},"filePathRelative":"UserGuide/V1.2.x/Ecosystem-Integration/Flink-IoTDB.md","localizedDate":"July 10, 2023","autoDesc":true}');export{r as comp,i as data}; |