blob: 528b8f41320d3ddb373227b82fd747223b9770c6 [file] [log] [blame]
import{_ as t,r as p,o,c as e,b as n,d as s,a as c,e as i}from"./app-Bp5kEZWW.js";const l={},u=n("h2",{id:"flink-iotdb-connector",tabindex:"-1"},[n("a",{class:"header-anchor",href:"#flink-iotdb-connector"},[n("span",null,"Flink-IoTDB-Connector")])],-1),k={href:"https://flink.apache.org/",target:"_blank",rel:"noopener noreferrer"},r=i(`<h3 id="iotdbsink" tabindex="-1"><a class="header-anchor" href="#iotdbsink"><span>IoTDBSink</span></a></h3><p>To use the <code>IoTDBSink</code>, you need construct an instance of it by specifying <code>IoTDBSinkOptions</code> and <code>IoTSerializationSchema</code> instances.<br> The <code>IoTDBSink</code> send only one event after another by default, but you can change to batch by invoking <code>withBatchSize(int)</code>.</p><h4 id="example" tabindex="-1"><a class="header-anchor" href="#example"><span>Example</span></a></h4><p>This example shows a case that sends data to a IoTDB server from a Flink job:</p><ul><li>A simulated Source <code>SensorSource</code> generates data points per 1 second.</li><li>Flink uses <code>IoTDBSink</code> to consume the generated data points and write the data into IoTDB.</li></ul><p>It is noteworthy that to use IoTDBSink, schema auto-creation in IoTDB should be enabled.</p><div class="language-java line-numbers-mode" data-ext="java" data-title="java"><pre class="language-java"><code><span class="token keyword">import</span> <span class="token import"><span class="token namespace">org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>iotdb<span class="token punctuation">.</span>tsfile<span class="token punctuation">.</span>file<span class="token punctuation">.</span>metadata<span class="token punctuation">.</span>enums<span class="token punctuation">.</span></span><span class="token class-name">CompressionType</span></span><span class="token punctuation">;</span>
<span class="token keyword">import</span> <span class="token import"><span class="token namespace">org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>iotdb<span class="token punctuation">.</span>tsfile<span class="token punctuation">.</span>file<span class="token punctuation">.</span>metadata<span class="token punctuation">.</span>enums<span class="token punctuation">.</span></span><span class="token class-name">TSDataType</span></span><span class="token punctuation">;</span>
<span class="token keyword">import</span> <span class="token import"><span class="token namespace">org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>iotdb<span class="token punctuation">.</span>tsfile<span class="token punctuation">.</span>file<span class="token punctuation">.</span>metadata<span class="token punctuation">.</span>enums<span class="token punctuation">.</span></span><span class="token class-name">TSEncoding</span></span><span class="token punctuation">;</span>
<span class="token keyword">import</span> <span class="token import"><span class="token namespace">com<span class="token punctuation">.</span>google<span class="token punctuation">.</span>common<span class="token punctuation">.</span>collect<span class="token punctuation">.</span></span><span class="token class-name">Lists</span></span><span class="token punctuation">;</span>
<span class="token keyword">import</span> <span class="token import"><span class="token namespace">org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>flink<span class="token punctuation">.</span>streaming<span class="token punctuation">.</span>api<span class="token punctuation">.</span>environment<span class="token punctuation">.</span></span><span class="token class-name">StreamExecutionEnvironment</span></span><span class="token punctuation">;</span>
<span class="token keyword">import</span> <span class="token import"><span class="token namespace">org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>flink<span class="token punctuation">.</span>streaming<span class="token punctuation">.</span>api<span class="token punctuation">.</span>functions<span class="token punctuation">.</span>source<span class="token punctuation">.</span></span><span class="token class-name">SourceFunction</span></span><span class="token punctuation">;</span>
<span class="token keyword">import</span> <span class="token import"><span class="token namespace">java<span class="token punctuation">.</span>security<span class="token punctuation">.</span></span><span class="token class-name">SecureRandom</span></span><span class="token punctuation">;</span>
<span class="token keyword">import</span> <span class="token import"><span class="token namespace">java<span class="token punctuation">.</span>util<span class="token punctuation">.</span></span><span class="token class-name">HashMap</span></span><span class="token punctuation">;</span>
<span class="token keyword">import</span> <span class="token import"><span class="token namespace">java<span class="token punctuation">.</span>util<span class="token punctuation">.</span></span><span class="token class-name">Map</span></span><span class="token punctuation">;</span>
<span class="token keyword">import</span> <span class="token import"><span class="token namespace">java<span class="token punctuation">.</span>util<span class="token punctuation">.</span></span><span class="token class-name">Random</span></span><span class="token punctuation">;</span>
<span class="token keyword">public</span> <span class="token keyword">class</span> <span class="token class-name">FlinkIoTDBSink</span> <span class="token punctuation">{</span>
<span class="token keyword">public</span> <span class="token keyword">static</span> <span class="token keyword">void</span> <span class="token function">main</span><span class="token punctuation">(</span><span class="token class-name">String</span><span class="token punctuation">[</span><span class="token punctuation">]</span> args<span class="token punctuation">)</span> <span class="token keyword">throws</span> <span class="token class-name">Exception</span> <span class="token punctuation">{</span>
<span class="token comment">// run the flink job on local mini cluster</span>
<span class="token class-name">StreamExecutionEnvironment</span> env <span class="token operator">=</span> <span class="token class-name">StreamExecutionEnvironment</span><span class="token punctuation">.</span><span class="token function">getExecutionEnvironment</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token class-name">IoTDBSinkOptions</span> options <span class="token operator">=</span> <span class="token keyword">new</span> <span class="token class-name">IoTDBSinkOptions</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
options<span class="token punctuation">.</span><span class="token function">setHost</span><span class="token punctuation">(</span><span class="token string">&quot;127.0.0.1&quot;</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
options<span class="token punctuation">.</span><span class="token function">setPort</span><span class="token punctuation">(</span><span class="token number">6667</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
options<span class="token punctuation">.</span><span class="token function">setUser</span><span class="token punctuation">(</span><span class="token string">&quot;root&quot;</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
options<span class="token punctuation">.</span><span class="token function">setPassword</span><span class="token punctuation">(</span><span class="token string">&quot;root&quot;</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token comment">// If the server enables auto_create_schema, then we do not need to register all timeseries</span>
<span class="token comment">// here.</span>
options<span class="token punctuation">.</span><span class="token function">setTimeseriesOptionList</span><span class="token punctuation">(</span>
<span class="token class-name">Lists</span><span class="token punctuation">.</span><span class="token function">newArrayList</span><span class="token punctuation">(</span>
<span class="token keyword">new</span> <span class="token class-name">IoTDBSinkOptions<span class="token punctuation">.</span>TimeseriesOption</span><span class="token punctuation">(</span>
<span class="token string">&quot;root.sg.d1.s1&quot;</span><span class="token punctuation">,</span> <span class="token class-name">TSDataType</span><span class="token punctuation">.</span><span class="token constant">DOUBLE</span><span class="token punctuation">,</span> <span class="token class-name">TSEncoding</span><span class="token punctuation">.</span><span class="token constant">GORILLA</span><span class="token punctuation">,</span> <span class="token class-name">CompressionType</span><span class="token punctuation">.</span><span class="token constant">SNAPPY</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token class-name">IoTSerializationSchema</span> serializationSchema <span class="token operator">=</span> <span class="token keyword">new</span> <span class="token class-name">DefaultIoTSerializationSchema</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token class-name">IoTDBSink</span> ioTDBSink <span class="token operator">=</span>
<span class="token keyword">new</span> <span class="token class-name">IoTDBSink</span><span class="token punctuation">(</span>options<span class="token punctuation">,</span> serializationSchema<span class="token punctuation">)</span>
<span class="token comment">// enable batching</span>
<span class="token punctuation">.</span><span class="token function">withBatchSize</span><span class="token punctuation">(</span><span class="token number">10</span><span class="token punctuation">)</span>
<span class="token comment">// how many connectons to the server will be created for each parallelism</span>
<span class="token punctuation">.</span><span class="token function">withSessionPoolSize</span><span class="token punctuation">(</span><span class="token number">3</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
env<span class="token punctuation">.</span><span class="token function">addSource</span><span class="token punctuation">(</span><span class="token keyword">new</span> <span class="token class-name">SensorSource</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">name</span><span class="token punctuation">(</span><span class="token string">&quot;sensor-source&quot;</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">setParallelism</span><span class="token punctuation">(</span><span class="token number">1</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">addSink</span><span class="token punctuation">(</span>ioTDBSink<span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">name</span><span class="token punctuation">(</span><span class="token string">&quot;iotdb-sink&quot;</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
env<span class="token punctuation">.</span><span class="token function">execute</span><span class="token punctuation">(</span><span class="token string">&quot;iotdb-flink-example&quot;</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token punctuation">}</span>
<span class="token keyword">private</span> <span class="token keyword">static</span> <span class="token keyword">class</span> <span class="token class-name">SensorSource</span> <span class="token keyword">implements</span> <span class="token class-name">SourceFunction</span><span class="token generics"><span class="token punctuation">&lt;</span><span class="token class-name">Map</span><span class="token punctuation">&lt;</span><span class="token class-name">String</span><span class="token punctuation">,</span> <span class="token class-name">String</span><span class="token punctuation">&gt;</span><span class="token punctuation">&gt;</span></span> <span class="token punctuation">{</span>
<span class="token keyword">boolean</span> running <span class="token operator">=</span> <span class="token boolean">true</span><span class="token punctuation">;</span>
<span class="token class-name">Random</span> random <span class="token operator">=</span> <span class="token keyword">new</span> <span class="token class-name">SecureRandom</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token annotation punctuation">@Override</span>
<span class="token keyword">public</span> <span class="token keyword">void</span> <span class="token function">run</span><span class="token punctuation">(</span><span class="token class-name">SourceContext</span> context<span class="token punctuation">)</span> <span class="token keyword">throws</span> <span class="token class-name">Exception</span> <span class="token punctuation">{</span>
<span class="token keyword">while</span> <span class="token punctuation">(</span>running<span class="token punctuation">)</span> <span class="token punctuation">{</span>
<span class="token class-name">Map</span><span class="token generics"><span class="token punctuation">&lt;</span><span class="token class-name">String</span><span class="token punctuation">,</span> <span class="token class-name">String</span><span class="token punctuation">&gt;</span></span> tuple <span class="token operator">=</span> <span class="token keyword">new</span> <span class="token class-name">HashMap</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
tuple<span class="token punctuation">.</span><span class="token function">put</span><span class="token punctuation">(</span><span class="token string">&quot;device&quot;</span><span class="token punctuation">,</span> <span class="token string">&quot;root.sg.d1&quot;</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
tuple<span class="token punctuation">.</span><span class="token function">put</span><span class="token punctuation">(</span><span class="token string">&quot;timestamp&quot;</span><span class="token punctuation">,</span> <span class="token class-name">String</span><span class="token punctuation">.</span><span class="token function">valueOf</span><span class="token punctuation">(</span><span class="token class-name">System</span><span class="token punctuation">.</span><span class="token function">currentTimeMillis</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
tuple<span class="token punctuation">.</span><span class="token function">put</span><span class="token punctuation">(</span><span class="token string">&quot;measurements&quot;</span><span class="token punctuation">,</span> <span class="token string">&quot;s1&quot;</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
tuple<span class="token punctuation">.</span><span class="token function">put</span><span class="token punctuation">(</span><span class="token string">&quot;types&quot;</span><span class="token punctuation">,</span> <span class="token string">&quot;DOUBLE&quot;</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
tuple<span class="token punctuation">.</span><span class="token function">put</span><span class="token punctuation">(</span><span class="token string">&quot;values&quot;</span><span class="token punctuation">,</span> <span class="token class-name">String</span><span class="token punctuation">.</span><span class="token function">valueOf</span><span class="token punctuation">(</span>random<span class="token punctuation">.</span><span class="token function">nextDouble</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
context<span class="token punctuation">.</span><span class="token function">collect</span><span class="token punctuation">(</span>tuple<span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token class-name">Thread</span><span class="token punctuation">.</span><span class="token function">sleep</span><span class="token punctuation">(</span><span class="token number">1000</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token punctuation">}</span>
<span class="token punctuation">}</span>
<span class="token annotation punctuation">@Override</span>
<span class="token keyword">public</span> <span class="token keyword">void</span> <span class="token function">cancel</span><span class="token punctuation">(</span><span class="token punctuation">)</span> <span class="token punctuation">{</span>
running <span class="token operator">=</span> <span class="token boolean">false</span><span class="token punctuation">;</span>
<span class="token punctuation">}</span>
<span class="token punctuation">}</span>
<span class="token punctuation">}</span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><h4 id="usage" tabindex="-1"><a class="header-anchor" href="#usage"><span>Usage</span></a></h4><ul><li>Launch the IoTDB server.</li><li>Run <code>org.apache.iotdb.flink.FlinkIoTDBSink.java</code> to run the flink job on local mini cluster.</li></ul><h3 id="iotdbsource" tabindex="-1"><a class="header-anchor" href="#iotdbsource"><span>IoTDBSource</span></a></h3><p>To use the <code>IoTDBSource</code>, you need to construct an instance of <code>IoTDBSource</code> by specifying <code>IoTDBSourceOptions</code><br> and implementing the abstract method <code>convert()</code> in <code>IoTDBSource</code>. The <code>convert</code> methods defines how<br> you want the row data to be transformed.</p><h4 id="example-1" tabindex="-1"><a class="header-anchor" href="#example-1"><span>Example</span></a></h4><p>This example shows a case where data are read from IoTDB.</p><div class="language-java line-numbers-mode" data-ext="java" data-title="java"><pre class="language-java"><code><span class="token keyword">import</span> <span class="token import"><span class="token namespace">org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>iotdb<span class="token punctuation">.</span>flink<span class="token punctuation">.</span>options<span class="token punctuation">.</span></span><span class="token class-name">IoTDBSourceOptions</span></span><span class="token punctuation">;</span>
<span class="token keyword">import</span> <span class="token import"><span class="token namespace">org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>iotdb<span class="token punctuation">.</span>rpc<span class="token punctuation">.</span></span><span class="token class-name">IoTDBConnectionException</span></span><span class="token punctuation">;</span>
<span class="token keyword">import</span> <span class="token import"><span class="token namespace">org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>iotdb<span class="token punctuation">.</span>rpc<span class="token punctuation">.</span></span><span class="token class-name">StatementExecutionException</span></span><span class="token punctuation">;</span>
<span class="token keyword">import</span> <span class="token import"><span class="token namespace">org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>iotdb<span class="token punctuation">.</span>rpc<span class="token punctuation">.</span></span><span class="token class-name">TSStatusCode</span></span><span class="token punctuation">;</span>
<span class="token keyword">import</span> <span class="token import"><span class="token namespace">org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>iotdb<span class="token punctuation">.</span>session<span class="token punctuation">.</span></span><span class="token class-name">Session</span></span><span class="token punctuation">;</span>
<span class="token keyword">import</span> <span class="token import"><span class="token namespace">org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>iotdb<span class="token punctuation">.</span>tsfile<span class="token punctuation">.</span>file<span class="token punctuation">.</span>metadata<span class="token punctuation">.</span>enums<span class="token punctuation">.</span></span><span class="token class-name">CompressionType</span></span><span class="token punctuation">;</span>
<span class="token keyword">import</span> <span class="token import"><span class="token namespace">org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>iotdb<span class="token punctuation">.</span>tsfile<span class="token punctuation">.</span>file<span class="token punctuation">.</span>metadata<span class="token punctuation">.</span>enums<span class="token punctuation">.</span></span><span class="token class-name">TSDataType</span></span><span class="token punctuation">;</span>
<span class="token keyword">import</span> <span class="token import"><span class="token namespace">org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>iotdb<span class="token punctuation">.</span>tsfile<span class="token punctuation">.</span>file<span class="token punctuation">.</span>metadata<span class="token punctuation">.</span>enums<span class="token punctuation">.</span></span><span class="token class-name">TSEncoding</span></span><span class="token punctuation">;</span>
<span class="token keyword">import</span> <span class="token import"><span class="token namespace">org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>iotdb<span class="token punctuation">.</span>tsfile<span class="token punctuation">.</span>read<span class="token punctuation">.</span>common<span class="token punctuation">.</span></span><span class="token class-name">RowRecord</span></span><span class="token punctuation">;</span>
<span class="token keyword">import</span> <span class="token import"><span class="token namespace">org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>flink<span class="token punctuation">.</span>streaming<span class="token punctuation">.</span>api<span class="token punctuation">.</span>environment<span class="token punctuation">.</span></span><span class="token class-name">StreamExecutionEnvironment</span></span><span class="token punctuation">;</span>
<span class="token keyword">import</span> <span class="token import"><span class="token namespace">java<span class="token punctuation">.</span>util<span class="token punctuation">.</span></span><span class="token class-name">ArrayList</span></span><span class="token punctuation">;</span>
<span class="token keyword">import</span> <span class="token import"><span class="token namespace">java<span class="token punctuation">.</span>util<span class="token punctuation">.</span></span><span class="token class-name">List</span></span><span class="token punctuation">;</span>
<span class="token keyword">public</span> <span class="token keyword">class</span> <span class="token class-name">FlinkIoTDBSource</span> <span class="token punctuation">{</span>
<span class="token keyword">static</span> <span class="token keyword">final</span> <span class="token class-name">String</span> <span class="token constant">LOCAL_HOST</span> <span class="token operator">=</span> <span class="token string">&quot;127.0.0.1&quot;</span><span class="token punctuation">;</span>
<span class="token keyword">static</span> <span class="token keyword">final</span> <span class="token class-name">String</span> <span class="token constant">ROOT_SG1_D1_S1</span> <span class="token operator">=</span> <span class="token string">&quot;root.sg1.d1.s1&quot;</span><span class="token punctuation">;</span>
<span class="token keyword">static</span> <span class="token keyword">final</span> <span class="token class-name">String</span> <span class="token constant">ROOT_SG1_D1</span> <span class="token operator">=</span> <span class="token string">&quot;root.sg1.d1&quot;</span><span class="token punctuation">;</span>
<span class="token keyword">public</span> <span class="token keyword">static</span> <span class="token keyword">void</span> <span class="token function">main</span><span class="token punctuation">(</span><span class="token class-name">String</span><span class="token punctuation">[</span><span class="token punctuation">]</span> args<span class="token punctuation">)</span> <span class="token keyword">throws</span> <span class="token class-name">Exception</span> <span class="token punctuation">{</span>
<span class="token function">prepareData</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token comment">// run the flink job on local mini cluster</span>
<span class="token class-name">StreamExecutionEnvironment</span> env <span class="token operator">=</span> <span class="token class-name">StreamExecutionEnvironment</span><span class="token punctuation">.</span><span class="token function">getExecutionEnvironment</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token class-name">IoTDBSourceOptions</span> ioTDBSourceOptions <span class="token operator">=</span>
<span class="token keyword">new</span> <span class="token class-name">IoTDBSourceOptions</span><span class="token punctuation">(</span><span class="token string">&quot;127.0.0.1&quot;</span><span class="token punctuation">,</span> <span class="token number">6667</span><span class="token punctuation">,</span> <span class="token string">&quot;root&quot;</span><span class="token punctuation">,</span> <span class="token string">&quot;root&quot;</span><span class="token punctuation">,</span>
<span class="token string">&quot;select s1 from &quot;</span> <span class="token operator">+</span> <span class="token constant">ROOT_SG1_D1</span> <span class="token operator">+</span> <span class="token string">&quot; align by device&quot;</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
env<span class="token punctuation">.</span><span class="token function">addSource</span><span class="token punctuation">(</span>
<span class="token keyword">new</span> <span class="token class-name">IoTDBSource</span><span class="token generics"><span class="token punctuation">&lt;</span><span class="token class-name">RowRecord</span><span class="token punctuation">&gt;</span></span><span class="token punctuation">(</span>ioTDBSourceOptions<span class="token punctuation">)</span> <span class="token punctuation">{</span>
<span class="token annotation punctuation">@Override</span>
<span class="token keyword">public</span> <span class="token class-name">RowRecord</span> <span class="token function">convert</span><span class="token punctuation">(</span><span class="token class-name">RowRecord</span> rowRecord<span class="token punctuation">)</span> <span class="token punctuation">{</span>
<span class="token keyword">return</span> rowRecord<span class="token punctuation">;</span>
<span class="token punctuation">}</span>
<span class="token punctuation">}</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">name</span><span class="token punctuation">(</span><span class="token string">&quot;sensor-source&quot;</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">print</span><span class="token punctuation">(</span><span class="token punctuation">)</span>
<span class="token punctuation">.</span><span class="token function">setParallelism</span><span class="token punctuation">(</span><span class="token number">2</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
env<span class="token punctuation">.</span><span class="token function">execute</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token punctuation">}</span>
<span class="token doc-comment comment">/**
* Write some data to IoTDB
*/</span>
<span class="token keyword">private</span> <span class="token keyword">static</span> <span class="token keyword">void</span> <span class="token function">prepareData</span><span class="token punctuation">(</span><span class="token punctuation">)</span> <span class="token keyword">throws</span> <span class="token class-name">IoTDBConnectionException</span><span class="token punctuation">,</span> <span class="token class-name">StatementExecutionException</span> <span class="token punctuation">{</span>
<span class="token class-name">Session</span> session <span class="token operator">=</span> <span class="token keyword">new</span> <span class="token class-name">Session</span><span class="token punctuation">(</span><span class="token constant">LOCAL_HOST</span><span class="token punctuation">,</span> <span class="token number">6667</span><span class="token punctuation">,</span> <span class="token string">&quot;root&quot;</span><span class="token punctuation">,</span> <span class="token string">&quot;root&quot;</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
session<span class="token punctuation">.</span><span class="token keyword">open</span><span class="token punctuation">(</span><span class="token boolean">false</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token keyword">try</span> <span class="token punctuation">{</span>
session<span class="token punctuation">.</span><span class="token function">setStorageGroup</span><span class="token punctuation">(</span><span class="token string">&quot;root.sg1&quot;</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token keyword">if</span> <span class="token punctuation">(</span><span class="token operator">!</span>session<span class="token punctuation">.</span><span class="token function">checkTimeseriesExists</span><span class="token punctuation">(</span><span class="token constant">ROOT_SG1_D1_S1</span><span class="token punctuation">)</span><span class="token punctuation">)</span> <span class="token punctuation">{</span>
session<span class="token punctuation">.</span><span class="token function">createTimeseries</span><span class="token punctuation">(</span>
<span class="token constant">ROOT_SG1_D1_S1</span><span class="token punctuation">,</span> <span class="token class-name">TSDataType</span><span class="token punctuation">.</span><span class="token constant">INT64</span><span class="token punctuation">,</span> <span class="token class-name">TSEncoding</span><span class="token punctuation">.</span><span class="token constant">RLE</span><span class="token punctuation">,</span> <span class="token class-name">CompressionType</span><span class="token punctuation">.</span><span class="token constant">SNAPPY</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token class-name">List</span><span class="token generics"><span class="token punctuation">&lt;</span><span class="token class-name">String</span><span class="token punctuation">&gt;</span></span> measurements <span class="token operator">=</span> <span class="token keyword">new</span> <span class="token class-name">ArrayList</span><span class="token generics"><span class="token punctuation">&lt;</span><span class="token punctuation">&gt;</span></span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token class-name">List</span><span class="token generics"><span class="token punctuation">&lt;</span><span class="token class-name">TSDataType</span><span class="token punctuation">&gt;</span></span> types <span class="token operator">=</span> <span class="token keyword">new</span> <span class="token class-name">ArrayList</span><span class="token generics"><span class="token punctuation">&lt;</span><span class="token punctuation">&gt;</span></span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
measurements<span class="token punctuation">.</span><span class="token function">add</span><span class="token punctuation">(</span><span class="token string">&quot;s1&quot;</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
measurements<span class="token punctuation">.</span><span class="token function">add</span><span class="token punctuation">(</span><span class="token string">&quot;s2&quot;</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
measurements<span class="token punctuation">.</span><span class="token function">add</span><span class="token punctuation">(</span><span class="token string">&quot;s3&quot;</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
types<span class="token punctuation">.</span><span class="token function">add</span><span class="token punctuation">(</span><span class="token class-name">TSDataType</span><span class="token punctuation">.</span><span class="token constant">INT64</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
types<span class="token punctuation">.</span><span class="token function">add</span><span class="token punctuation">(</span><span class="token class-name">TSDataType</span><span class="token punctuation">.</span><span class="token constant">INT64</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
types<span class="token punctuation">.</span><span class="token function">add</span><span class="token punctuation">(</span><span class="token class-name">TSDataType</span><span class="token punctuation">.</span><span class="token constant">INT64</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token keyword">for</span> <span class="token punctuation">(</span><span class="token keyword">long</span> time <span class="token operator">=</span> <span class="token number">0</span><span class="token punctuation">;</span> time <span class="token operator">&lt;</span> <span class="token number">100</span><span class="token punctuation">;</span> time<span class="token operator">++</span><span class="token punctuation">)</span> <span class="token punctuation">{</span>
<span class="token class-name">List</span><span class="token generics"><span class="token punctuation">&lt;</span><span class="token class-name">Object</span><span class="token punctuation">&gt;</span></span> values <span class="token operator">=</span> <span class="token keyword">new</span> <span class="token class-name">ArrayList</span><span class="token generics"><span class="token punctuation">&lt;</span><span class="token punctuation">&gt;</span></span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
values<span class="token punctuation">.</span><span class="token function">add</span><span class="token punctuation">(</span><span class="token number">1L</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
values<span class="token punctuation">.</span><span class="token function">add</span><span class="token punctuation">(</span><span class="token number">2L</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
values<span class="token punctuation">.</span><span class="token function">add</span><span class="token punctuation">(</span><span class="token number">3L</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
session<span class="token punctuation">.</span><span class="token function">insertRecord</span><span class="token punctuation">(</span><span class="token constant">ROOT_SG1_D1</span><span class="token punctuation">,</span> time<span class="token punctuation">,</span> measurements<span class="token punctuation">,</span> types<span class="token punctuation">,</span> values<span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token punctuation">}</span>
<span class="token punctuation">}</span>
<span class="token punctuation">}</span> <span class="token keyword">catch</span> <span class="token punctuation">(</span><span class="token class-name">StatementExecutionException</span> e<span class="token punctuation">)</span> <span class="token punctuation">{</span>
<span class="token keyword">if</span> <span class="token punctuation">(</span>e<span class="token punctuation">.</span><span class="token function">getStatusCode</span><span class="token punctuation">(</span><span class="token punctuation">)</span> <span class="token operator">!=</span> <span class="token class-name">TSStatusCode</span><span class="token punctuation">.</span><span class="token constant">PATH_ALREADY_EXIST_ERROR</span><span class="token punctuation">.</span><span class="token function">getStatusCode</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span> <span class="token punctuation">{</span>
<span class="token keyword">throw</span> e<span class="token punctuation">;</span>
<span class="token punctuation">}</span>
<span class="token punctuation">}</span>
<span class="token punctuation">}</span>
<span class="token punctuation">}</span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><h4 id="usage-1" tabindex="-1"><a class="header-anchor" href="#usage-1"><span>Usage</span></a></h4><p>Launch the IoTDB server.<br> Run org.apache.iotdb.flink.FlinkIoTDBSource.java to run the flink job on local mini cluster.</p>`,16);function d(m,v){const a=p("ExternalLinkIcon");return o(),e("div",null,[u,n("p",null,[s("IoTDB integration for "),n("a",k,[s("Apache Flink"),c(a)]),s(". This module includes the IoTDB sink that allows a flink job to write events into timeseries, and the IoTDB source allowing reading data from IoTDB.")]),r])}const h=t(l,[["render",d],["__file","Flink-IoTDB.html.vue"]]),g=JSON.parse('{"path":"/UserGuide/V0.13.x/Ecosystem-Integration/Flink-IoTDB.html","title":"","lang":"en-US","frontmatter":{"description":"Flink-IoTDB-Connector IoTDB integration for Apache Flink. This module includes the IoTDB sink that allows a flink job to write events into timeseries, and the IoTDB source allow...","head":[["link",{"rel":"alternate","hreflang":"zh-cn","href":"https://iotdb.apache.org/zh/UserGuide/V0.13.x/Ecosystem-Integration/Flink-IoTDB.html"}],["meta",{"property":"og:url","content":"https://iotdb.apache.org/UserGuide/V0.13.x/Ecosystem-Integration/Flink-IoTDB.html"}],["meta",{"property":"og:site_name","content":"IoTDB Website"}],["meta",{"property":"og:description","content":"Flink-IoTDB-Connector IoTDB integration for Apache Flink. This module includes the IoTDB sink that allows a flink job to write events into timeseries, and the IoTDB source allow..."}],["meta",{"property":"og:type","content":"article"}],["meta",{"property":"og:locale","content":"en-US"}],["meta",{"property":"og:locale:alternate","content":"zh-CN"}],["meta",{"property":"og:updated_time","content":"2023-07-10T03:11:17.000Z"}],["meta",{"property":"article:modified_time","content":"2023-07-10T03:11:17.000Z"}],["script",{"type":"application/ld+json"},"{\\"@context\\":\\"https://schema.org\\",\\"@type\\":\\"Article\\",\\"headline\\":\\"\\",\\"image\\":[\\"\\"],\\"dateModified\\":\\"2023-07-10T03:11:17.000Z\\",\\"author\\":[]}"]]},"headers":[{"level":2,"title":"Flink-IoTDB-Connector","slug":"flink-iotdb-connector","link":"#flink-iotdb-connector","children":[{"level":3,"title":"IoTDBSink","slug":"iotdbsink","link":"#iotdbsink","children":[]},{"level":3,"title":"IoTDBSource","slug":"iotdbsource","link":"#iotdbsource","children":[]}]}],"git":{"createdTime":1688958677000,"updatedTime":1688958677000,"contributors":[{"name":"CritasWang","email":"critas@outlook.com","commits":1}]},"readingTime":{"minutes":2.34,"words":703},"filePathRelative":"UserGuide/V0.13.x/Ecosystem-Integration/Flink-IoTDB.md","localizedDate":"July 10, 2023","autoDesc":true}');export{h as comp,g as data};