blob: b56b84fd980d38b215782b7d82a63b2a6556ab1b [file] [log] [blame]
import{_ as l,r as o,o as c,c as r,b as n,d as s,a,w as t,e as p}from"./app-Bp5kEZWW.js";const u={},d=n("h1",{id:"iotdb-流处理框架",tabindex:"-1"},[n("a",{class:"header-anchor",href:"#iotdb-流处理框架"},[n("span",null,"IoTDB 流处理框架")])],-1),k=n("p",null,"IoTDB 流处理框架允许用户实现自定义的流处理逻辑,可以实现对存储引擎变更的监听和捕获、实现对变更数据的变形、实现对变形后数据的向外推送等逻辑。",-1),v=p(`<ul><li>抽取(Source)</li><li>处理(Process)</li><li>发送(Sink)</li></ul><p>流处理框架允许用户使用 Java 语言自定义编写三个子任务的处理逻辑,通过类似 UDF 的方式处理数据。<br> 在一个 Pipe 中,上述的三个子任务分别由三种插件执行实现,数据会依次经过这三个插件进行处理:<br> Pipe Source 用于抽取数据,Pipe Processor 用于处理数据,Pipe Sink 用于发送数据,最终数据将被发至外部系统。</p><p><strong>Pipe 任务的模型如下:</strong></p><figure><img src="https://alioss.timecho.com/docs/img/1706697228308.jpg" alt="任务模型图" tabindex="0" loading="lazy"><figcaption>任务模型图</figcaption></figure><p>描述一个数据流处理任务,本质就是描述 Pipe Source、Pipe Processor 和 Pipe Sink 插件的属性。<br> 用户可以通过 SQL 语句声明式地配置三个子任务的具体属性,通过组合不同的属性,实现灵活的数据 ETL 能力。</p><p>利用流处理框架,可以搭建完整的数据链路来满足端<em>边云同步、异地灾备、读写负载分库</em>等需求。</p><h2 id="自定义流处理插件开发" tabindex="-1"><a class="header-anchor" href="#自定义流处理插件开发"><span>自定义流处理插件开发</span></a></h2><h3 id="编程开发依赖" tabindex="-1"><a class="header-anchor" href="#编程开发依赖"><span>编程开发依赖</span></a></h3><p>推荐采用 maven 构建项目,在<code>pom.xml</code>中添加以下依赖。请注意选择和 IoTDB 服务器版本相同的依赖版本。</p><div class="language-xml line-numbers-mode" data-ext="xml" data-title="xml"><pre class="language-xml"><code><span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>dependency</span><span class="token punctuation">&gt;</span></span>
<span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>groupId</span><span class="token punctuation">&gt;</span></span>org.apache.iotdb<span class="token tag"><span class="token tag"><span class="token punctuation">&lt;/</span>groupId</span><span class="token punctuation">&gt;</span></span>
<span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>artifactId</span><span class="token punctuation">&gt;</span></span>pipe-api<span class="token tag"><span class="token tag"><span class="token punctuation">&lt;/</span>artifactId</span><span class="token punctuation">&gt;</span></span>
<span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>version</span><span class="token punctuation">&gt;</span></span>1.3.1<span class="token tag"><span class="token tag"><span class="token punctuation">&lt;/</span>version</span><span class="token punctuation">&gt;</span></span>
<span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>scope</span><span class="token punctuation">&gt;</span></span>provided<span class="token tag"><span class="token tag"><span class="token punctuation">&lt;/</span>scope</span><span class="token punctuation">&gt;</span></span>
<span class="token tag"><span class="token tag"><span class="token punctuation">&lt;/</span>dependency</span><span class="token punctuation">&gt;</span></span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><h3 id="事件驱动编程模型" tabindex="-1"><a class="header-anchor" href="#事件驱动编程模型"><span>事件驱动编程模型</span></a></h3><p>流处理插件的用户编程接口设计,参考了事件驱动编程模型的通用设计理念。事件(Event)是用户编程接口中的数据抽象,而编程接口与具体的执行方式解耦,只需要专注于描述事件(数据)到达系统后,系统期望的处理方式即可。</p><p>在流处理插件的用户编程接口中,事件是数据库数据写入操作的抽象。事件由单机流处理引擎捕获,按照流处理三个阶段的流程,依次传递至 PipeSource 插件,PipeProcessor 插件和 PipeSink 插件,并依次在三个插件中触发用户逻辑的执行。</p><p>为了兼顾端侧低负载场景下的流处理低延迟和端侧高负载场景下的流处理高吞吐,流处理引擎会动态地在操作日志和数据文件中选择处理对象,因此,流处理的用户编程接口要求用户提供下列两类事件的处理逻辑:操作日志写入事件 TabletInsertionEvent 和数据文件写入事件 TsFileInsertionEvent。</p><h4 id="操作日志写入事件-tabletinsertionevent" tabindex="-1"><a class="header-anchor" href="#操作日志写入事件-tabletinsertionevent"><span><strong>操作日志写入事件(TabletInsertionEvent)</strong></span></a></h4><p>操作日志写入事件(TabletInsertionEvent)是对用户写入请求的高层数据抽象,它通过提供统一的操作接口,为用户提供了操纵写入请求底层数据的能力。</p><p>对于不同的数据库部署方式,操作日志写入事件对应的底层存储结构是不一样的。对于单机部署的场景,操作日志写入事件是对写前日志(WAL)条目的封装;对于分布式部署的场景,操作日志写入事件是对单个节点共识协议操作日志条目的封装。</p><p>对于数据库不同写入请求接口生成的写入操作,操作日志写入事件对应的请求结构体的数据结构也是不一样的。IoTDB 提供了 InsertRecordInsertRecordsInsertTabletInsertTablets 等众多的写入接口,每一种写入请求都使用了完全不同的序列化方式,生成的二进制条目也不尽相同。</p><p>操作日志写入事件的存在,为用户提供了一种统一的数据操作视图,它屏蔽了底层数据结构的实现差异,极大地降低了用户的编程门槛,提升了功能的易用性。</p><div class="language-java line-numbers-mode" data-ext="java" data-title="java"><pre class="language-java"><code><span class="token doc-comment comment">/** TabletInsertionEvent is used to define the event of data insertion. */</span>
<span class="token keyword">public</span> <span class="token keyword">interface</span> <span class="token class-name">TabletInsertionEvent</span> <span class="token keyword">extends</span> <span class="token class-name">Event</span> <span class="token punctuation">{</span>
<span class="token doc-comment comment">/**
* The consumer processes the data row by row and collects the results by RowCollector.
*
* <span class="token keyword">@return</span> <span class="token punctuation">{</span><span class="token keyword">@code</span> <span class="token code-section"><span class="token code language-java"><span class="token class-name">Iterable</span><span class="token generics"><span class="token punctuation">&lt;</span><span class="token class-name">TabletInsertionEvent</span><span class="token punctuation">&gt;</span></span></span></span><span class="token punctuation">}</span> a list of new TabletInsertionEvent contains the
* results collected by the RowCollector
*/</span>
<span class="token class-name">Iterable</span><span class="token generics"><span class="token punctuation">&lt;</span><span class="token class-name">TabletInsertionEvent</span><span class="token punctuation">&gt;</span></span> <span class="token function">processRowByRow</span><span class="token punctuation">(</span><span class="token class-name">BiConsumer</span><span class="token generics"><span class="token punctuation">&lt;</span><span class="token class-name">Row</span><span class="token punctuation">,</span> <span class="token class-name">RowCollector</span><span class="token punctuation">&gt;</span></span> consumer<span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token doc-comment comment">/**
* The consumer processes the Tablet directly and collects the results by RowCollector.
*
* <span class="token keyword">@return</span> <span class="token punctuation">{</span><span class="token keyword">@code</span> <span class="token code-section"><span class="token code language-java"><span class="token class-name">Iterable</span><span class="token generics"><span class="token punctuation">&lt;</span><span class="token class-name">TabletInsertionEvent</span><span class="token punctuation">&gt;</span></span></span></span><span class="token punctuation">}</span> a list of new TabletInsertionEvent contains the
* results collected by the RowCollector
*/</span>
<span class="token class-name">Iterable</span><span class="token generics"><span class="token punctuation">&lt;</span><span class="token class-name">TabletInsertionEvent</span><span class="token punctuation">&gt;</span></span> <span class="token function">processTablet</span><span class="token punctuation">(</span><span class="token class-name">BiConsumer</span><span class="token generics"><span class="token punctuation">&lt;</span><span class="token class-name">Tablet</span><span class="token punctuation">,</span> <span class="token class-name">RowCollector</span><span class="token punctuation">&gt;</span></span> consumer<span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token punctuation">}</span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><h4 id="数据文件写入事件-tsfileinsertionevent" tabindex="-1"><a class="header-anchor" href="#数据文件写入事件-tsfileinsertionevent"><span><strong>数据文件写入事件(TsFileInsertionEvent)</strong></span></a></h4><p>数据文件写入事件(TsFileInsertionEvent) 是对数据库文件落盘操作的高层抽象,它是若干操作日志写入事件(TabletInsertionEvent)的数据集合。</p><p>IoTDB 的存储引擎是 LSM 结构的。数据写入时会先将写入操作落盘到日志结构的文件里,同时将写入数据保存在内存里。当内存达到控制上限,则会触发刷盘行为,即将内存中的数据转换为数据库文件,同时删除之前预写的操作日志。当内存中的数据转换为数据库文件中的数据时,会经过编码压缩和通用压缩两次压缩处理,因此数据库文件的数据相比内存中的原始数据占用的空间更少。</p><p>在极端的网络情况下,直接传输数据文件相比传输数据写入的操作要更加经济,它会占用更低的网络带宽,能实现更快的传输速度。当然,天下没有免费的午餐,对文件中的数据进行计算处理,相比直接对内存中的数据进行计算处理时,需要额外付出文件 I/O 的代价。但是,正是磁盘数据文件和内存写入操作两种结构各有优劣的存在,给了系统做动态权衡调整的机会,也正是基于这样的观察,插件的事件模型中才引入了数据文件写入事件。</p><p>综上,数据文件写入事件出现在流处理插件的事件流中,存在下面两种情况:</p><p>(1)历史数据抽取:一个流处理任务开始前,所有已经落盘的写入数据都会以 TsFile 的形式存在。一个流处理任务开始后,采集历史数据时,历史数据将以 TsFileInsertionEvent 作为抽象;</p><p>(2)实时数据抽取:一个流处理任务进行时,当数据流中实时处理操作日志写入事件的速度慢于写入请求速度一定进度之后,未来得及处理的操作日志写入事件会被被持久化至磁盘,以 TsFile 的形式存在,这一些数据被流处理引擎抽取到后,会以 TsFileInsertionEvent 作为抽象。</p><div class="language-java line-numbers-mode" data-ext="java" data-title="java"><pre class="language-java"><code><span class="token doc-comment comment">/**
* TsFileInsertionEvent is used to define the event of writing TsFile. Event data stores in disks,
* which is compressed and encoded, and requires IO cost for computational processing.
*/</span>
<span class="token keyword">public</span> <span class="token keyword">interface</span> <span class="token class-name">TsFileInsertionEvent</span> <span class="token keyword">extends</span> <span class="token class-name">Event</span> <span class="token punctuation">{</span>
<span class="token doc-comment comment">/**
* The method is used to convert the TsFileInsertionEvent into several TabletInsertionEvents.
*
* <span class="token keyword">@return</span> <span class="token punctuation">{</span><span class="token keyword">@code</span> <span class="token code-section"><span class="token code language-java"><span class="token class-name">Iterable</span><span class="token generics"><span class="token punctuation">&lt;</span><span class="token class-name">TabletInsertionEvent</span><span class="token punctuation">&gt;</span></span></span></span><span class="token punctuation">}</span> the list of TabletInsertionEvent
*/</span>
<span class="token class-name">Iterable</span><span class="token generics"><span class="token punctuation">&lt;</span><span class="token class-name">TabletInsertionEvent</span><span class="token punctuation">&gt;</span></span> <span class="token function">toTabletInsertionEvents</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token punctuation">}</span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><h3 id="自定义流处理插件编程接口定义" tabindex="-1"><a class="header-anchor" href="#自定义流处理插件编程接口定义"><span>自定义流处理插件编程接口定义</span></a></h3><p>基于自定义流处理插件编程接口,用户可以轻松编写数据抽取插件、数据处理插件和数据发送插件,从而使得流处理功能灵活适配各种工业场景。</p><h4 id="数据抽取插件接口" tabindex="-1"><a class="header-anchor" href="#数据抽取插件接口"><span>数据抽取插件接口</span></a></h4><p>数据抽取是流处理数据从数据抽取到数据发送三阶段的第一阶段。数据抽取插件(PipeSource)是流处理引擎和存储引擎的桥梁,它通过监听存储引擎的行为,<br> 捕获各种数据写入事件。</p><div class="language-java line-numbers-mode" data-ext="java" data-title="java"><pre class="language-java"><code><span class="token doc-comment comment">/**
* PipeSource
*
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>p</span><span class="token punctuation">&gt;</span></span>PipeSource is responsible for capturing events from sources.
*
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>p</span><span class="token punctuation">&gt;</span></span>Various data sources can be supported by implementing different PipeSource classes.
*
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>p</span><span class="token punctuation">&gt;</span></span>The lifecycle of a PipeSource is as follows:
*
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>ul</span><span class="token punctuation">&gt;</span></span>
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>When a collaboration task is created, the KV pairs of \`WITH SOURCE\` clause in SQL are
* parsed and the validation method <span class="token punctuation">{</span><span class="token keyword">@link</span> <span class="token reference"><span class="token class-name">PipeSource</span><span class="token punctuation">#</span><span class="token function">validate</span><span class="token punctuation">(</span><span class="token class-name">PipeParameterValidator</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> will
* be called to validate the parameters.
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>Before the collaboration task starts, the method <span class="token punctuation">{</span><span class="token keyword">@link</span>
* <span class="token reference"><span class="token class-name">PipeSource</span><span class="token punctuation">#</span><span class="token function">customize</span><span class="token punctuation">(</span><span class="token class-name">PipeParameters</span><span class="token punctuation">,</span> <span class="token class-name">PipeSourceRuntimeConfiguration</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> will be called to
* config the runtime behavior of the PipeSource.
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>Then the method <span class="token punctuation">{</span><span class="token keyword">@link</span> <span class="token reference"><span class="token class-name">PipeSource</span><span class="token punctuation">#</span><span class="token function">start</span><span class="token punctuation">(</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> will be called to start the PipeSource.
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>While the collaboration task is in progress, the method <span class="token punctuation">{</span><span class="token keyword">@link</span> <span class="token reference"><span class="token class-name">PipeSource</span><span class="token punctuation">#</span><span class="token function">supply</span><span class="token punctuation">(</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> will be
* called to capture events from sources and then the events will be passed to the
* PipeProcessor.
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>The method <span class="token punctuation">{</span><span class="token keyword">@link</span> <span class="token reference"><span class="token class-name">PipeSource</span><span class="token punctuation">#</span><span class="token function">close</span><span class="token punctuation">(</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> will be called when the collaboration task is
* cancelled (the \`DROP PIPE\` command is executed).
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;/</span>ul</span><span class="token punctuation">&gt;</span></span>
*/</span>
<span class="token keyword">public</span> <span class="token keyword">interface</span> <span class="token class-name">PipeSource</span> <span class="token keyword">extends</span> <span class="token class-name">PipePlugin</span> <span class="token punctuation">{</span>
<span class="token doc-comment comment">/**
* This method is mainly used to validate <span class="token punctuation">{</span><span class="token keyword">@link</span> <span class="token reference"><span class="token class-name">PipeParameters</span></span><span class="token punctuation">}</span> and it is executed before <span class="token punctuation">{</span><span class="token keyword">@link</span>
* <span class="token reference"><span class="token class-name">PipeSource</span><span class="token punctuation">#</span><span class="token function">customize</span><span class="token punctuation">(</span><span class="token class-name">PipeParameters</span><span class="token punctuation">,</span> <span class="token class-name">PipeSourceRuntimeConfiguration</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> is called.
*
* <span class="token keyword">@param</span> <span class="token parameter">validator</span> the validator used to validate <span class="token punctuation">{</span><span class="token keyword">@link</span> <span class="token reference"><span class="token class-name">PipeParameters</span></span><span class="token punctuation">}</span>
* <span class="token keyword">@throws</span> <span class="token reference"><span class="token class-name">Exception</span></span> if any parameter is not valid
*/</span>
<span class="token keyword">void</span> <span class="token function">validate</span><span class="token punctuation">(</span><span class="token class-name">PipeParameterValidator</span> validator<span class="token punctuation">)</span> <span class="token keyword">throws</span> <span class="token class-name">Exception</span><span class="token punctuation">;</span>
<span class="token doc-comment comment">/**
* This method is mainly used to customize PipeSource. In this method, the user can do the
* following things:
*
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>ul</span><span class="token punctuation">&gt;</span></span>
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>Use PipeParameters to parse key-value pair attributes entered by the user.
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>Set the running configurations in PipeSourceRuntimeConfiguration.
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;/</span>ul</span><span class="token punctuation">&gt;</span></span>
*
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>p</span><span class="token punctuation">&gt;</span></span>This method is called after the method <span class="token punctuation">{</span><span class="token keyword">@link</span> <span class="token reference"><span class="token class-name">PipeSource</span><span class="token punctuation">#</span><span class="token function">validate</span><span class="token punctuation">(</span><span class="token class-name">PipeParameterValidator</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span>
* is called.
*
* <span class="token keyword">@param</span> <span class="token parameter">parameters</span> used to parse the input parameters entered by the user
* <span class="token keyword">@param</span> <span class="token parameter">configuration</span> used to set the required properties of the running PipeSource
* <span class="token keyword">@throws</span> <span class="token reference"><span class="token class-name">Exception</span></span> the user can throw errors if necessary
*/</span>
<span class="token keyword">void</span> <span class="token function">customize</span><span class="token punctuation">(</span><span class="token class-name">PipeParameters</span> parameters<span class="token punctuation">,</span> <span class="token class-name">PipeSourceRuntimeConfiguration</span> configuration<span class="token punctuation">)</span>
<span class="token keyword">throws</span> <span class="token class-name">Exception</span><span class="token punctuation">;</span>
<span class="token doc-comment comment">/**
* Start the Source. After this method is called, events should be ready to be supplied by
* <span class="token punctuation">{</span><span class="token keyword">@link</span> <span class="token reference"><span class="token class-name">PipeSource</span><span class="token punctuation">#</span><span class="token function">supply</span><span class="token punctuation">(</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span>. This method is called after <span class="token punctuation">{</span><span class="token keyword">@link</span>
* <span class="token reference"><span class="token class-name">PipeSource</span><span class="token punctuation">#</span><span class="token function">customize</span><span class="token punctuation">(</span><span class="token class-name">PipeParameters</span><span class="token punctuation">,</span> <span class="token class-name">PipeSourceRuntimeConfiguration</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> is called.
*
* <span class="token keyword">@throws</span> <span class="token reference"><span class="token class-name">Exception</span></span> the user can throw errors if necessary
*/</span>
<span class="token keyword">void</span> <span class="token function">start</span><span class="token punctuation">(</span><span class="token punctuation">)</span> <span class="token keyword">throws</span> <span class="token class-name">Exception</span><span class="token punctuation">;</span>
<span class="token doc-comment comment">/**
* Supply single event from the Source and the caller will send the event to the processor.
* This method is called after <span class="token punctuation">{</span><span class="token keyword">@link</span> <span class="token reference"><span class="token class-name">PipeSource</span><span class="token punctuation">#</span><span class="token function">start</span><span class="token punctuation">(</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> is called.
*
* <span class="token keyword">@return</span> the event to be supplied. the event may be null if the Source has no more events at
* the moment, but the Source is still running for more events.
* <span class="token keyword">@throws</span> <span class="token reference"><span class="token class-name">Exception</span></span> the user can throw errors if necessary
*/</span>
<span class="token class-name">Event</span> <span class="token function">supply</span><span class="token punctuation">(</span><span class="token punctuation">)</span> <span class="token keyword">throws</span> <span class="token class-name">Exception</span><span class="token punctuation">;</span>
<span class="token punctuation">}</span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><h4 id="数据处理插件接口" tabindex="-1"><a class="header-anchor" href="#数据处理插件接口"><span>数据处理插件接口</span></a></h4><p>数据处理是流处理数据从数据抽取到数据发送三阶段的第二阶段。数据处理插件(PipeProcessor)主要用于过滤和转换由数据抽取插件(PipeSource)捕获的<br> 各种事件。</p><div class="language-java line-numbers-mode" data-ext="java" data-title="java"><pre class="language-java"><code><span class="token doc-comment comment">/**
* PipeProcessor
*
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>p</span><span class="token punctuation">&gt;</span></span>PipeProcessor is used to filter and transform the Event formed by the PipeSource.
*
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>p</span><span class="token punctuation">&gt;</span></span>The lifecycle of a PipeProcessor is as follows:
*
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>ul</span><span class="token punctuation">&gt;</span></span>
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>When a collaboration task is created, the KV pairs of \`WITH PROCESSOR\` clause in SQL are
* parsed and the validation method <span class="token punctuation">{</span><span class="token keyword">@link</span> <span class="token reference"><span class="token class-name">PipeProcessor</span><span class="token punctuation">#</span><span class="token function">validate</span><span class="token punctuation">(</span><span class="token class-name">PipeParameterValidator</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span>
* will be called to validate the parameters.
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>Before the collaboration task starts, the method <span class="token punctuation">{</span><span class="token keyword">@link</span>
* <span class="token reference"><span class="token class-name">PipeProcessor</span><span class="token punctuation">#</span><span class="token function">customize</span><span class="token punctuation">(</span><span class="token class-name">PipeParameters</span><span class="token punctuation">,</span> <span class="token class-name">PipeProcessorRuntimeConfiguration</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> will be called
* to config the runtime behavior of the PipeProcessor.
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>While the collaboration task is in progress:
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>ul</span><span class="token punctuation">&gt;</span></span>
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>PipeSource captures the events and wraps them into three types of Event instances.
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>PipeProcessor processes the event and then passes them to the PipeSink. The
* following 3 methods will be called: <span class="token punctuation">{</span><span class="token keyword">@link</span>
* <span class="token reference"><span class="token class-name">PipeProcessor</span><span class="token punctuation">#</span><span class="token function">process</span><span class="token punctuation">(</span><span class="token class-name">TabletInsertionEvent</span><span class="token punctuation">,</span> <span class="token class-name">EventCollector</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span>, <span class="token punctuation">{</span><span class="token keyword">@link</span>
* <span class="token reference"><span class="token class-name">PipeProcessor</span><span class="token punctuation">#</span><span class="token function">process</span><span class="token punctuation">(</span><span class="token class-name">TsFileInsertionEvent</span><span class="token punctuation">,</span> <span class="token class-name">EventCollector</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> and <span class="token punctuation">{</span><span class="token keyword">@link</span>
* <span class="token reference"><span class="token class-name">PipeProcessor</span><span class="token punctuation">#</span><span class="token function">process</span><span class="token punctuation">(</span><span class="token class-name">Event</span><span class="token punctuation">,</span> <span class="token class-name">EventCollector</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span>.
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>PipeSink serializes the events into binaries and send them to sinks.
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;/</span>ul</span><span class="token punctuation">&gt;</span></span>
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>When the collaboration task is cancelled (the \`DROP PIPE\` command is executed), the <span class="token punctuation">{</span><span class="token keyword">@link</span>
* <span class="token reference"><span class="token class-name">PipeProcessor</span><span class="token punctuation">#</span><span class="token function">close</span><span class="token punctuation">(</span><span class="token punctuation">)</span></span> <span class="token punctuation">}</span> method will be called.
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;/</span>ul</span><span class="token punctuation">&gt;</span></span>
*/</span>
<span class="token keyword">public</span> <span class="token keyword">interface</span> <span class="token class-name">PipeProcessor</span> <span class="token keyword">extends</span> <span class="token class-name">PipePlugin</span> <span class="token punctuation">{</span>
<span class="token doc-comment comment">/**
* This method is mainly used to validate <span class="token punctuation">{</span><span class="token keyword">@link</span> <span class="token reference"><span class="token class-name">PipeParameters</span></span><span class="token punctuation">}</span> and it is executed before <span class="token punctuation">{</span><span class="token keyword">@link</span>
* <span class="token reference"><span class="token class-name">PipeProcessor</span><span class="token punctuation">#</span><span class="token function">customize</span><span class="token punctuation">(</span><span class="token class-name">PipeParameters</span><span class="token punctuation">,</span> <span class="token class-name">PipeProcessorRuntimeConfiguration</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> is called.
*
* <span class="token keyword">@param</span> <span class="token parameter">validator</span> the validator used to validate <span class="token punctuation">{</span><span class="token keyword">@link</span> <span class="token reference"><span class="token class-name">PipeParameters</span></span><span class="token punctuation">}</span>
* <span class="token keyword">@throws</span> <span class="token reference"><span class="token class-name">Exception</span></span> if any parameter is not valid
*/</span>
<span class="token keyword">void</span> <span class="token function">validate</span><span class="token punctuation">(</span><span class="token class-name">PipeParameterValidator</span> validator<span class="token punctuation">)</span> <span class="token keyword">throws</span> <span class="token class-name">Exception</span><span class="token punctuation">;</span>
<span class="token doc-comment comment">/**
* This method is mainly used to customize PipeProcessor. In this method, the user can do the
* following things:
*
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>ul</span><span class="token punctuation">&gt;</span></span>
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>Use PipeParameters to parse key-value pair attributes entered by the user.
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>Set the running configurations in PipeProcessorRuntimeConfiguration.
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;/</span>ul</span><span class="token punctuation">&gt;</span></span>
*
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>p</span><span class="token punctuation">&gt;</span></span>This method is called after the method <span class="token punctuation">{</span><span class="token keyword">@link</span>
* <span class="token reference"><span class="token class-name">PipeProcessor</span><span class="token punctuation">#</span><span class="token function">validate</span><span class="token punctuation">(</span><span class="token class-name">PipeParameterValidator</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> is called and before the beginning of the
* events processing.
*
* <span class="token keyword">@param</span> <span class="token parameter">parameters</span> used to parse the input parameters entered by the user
* <span class="token keyword">@param</span> <span class="token parameter">configuration</span> used to set the required properties of the running PipeProcessor
* <span class="token keyword">@throws</span> <span class="token reference"><span class="token class-name">Exception</span></span> the user can throw errors if necessary
*/</span>
<span class="token keyword">void</span> <span class="token function">customize</span><span class="token punctuation">(</span><span class="token class-name">PipeParameters</span> parameters<span class="token punctuation">,</span> <span class="token class-name">PipeProcessorRuntimeConfiguration</span> configuration<span class="token punctuation">)</span>
<span class="token keyword">throws</span> <span class="token class-name">Exception</span><span class="token punctuation">;</span>
<span class="token doc-comment comment">/**
* This method is called to process the TabletInsertionEvent.
*
* <span class="token keyword">@param</span> <span class="token parameter">tabletInsertionEvent</span> TabletInsertionEvent to be processed
* <span class="token keyword">@param</span> <span class="token parameter">eventCollector</span> used to collect result events after processing
* <span class="token keyword">@throws</span> <span class="token reference"><span class="token class-name">Exception</span></span> the user can throw errors if necessary
*/</span>
<span class="token keyword">void</span> <span class="token function">process</span><span class="token punctuation">(</span><span class="token class-name">TabletInsertionEvent</span> tabletInsertionEvent<span class="token punctuation">,</span> <span class="token class-name">EventCollector</span> eventCollector<span class="token punctuation">)</span>
<span class="token keyword">throws</span> <span class="token class-name">Exception</span><span class="token punctuation">;</span>
<span class="token doc-comment comment">/**
* This method is called to process the TsFileInsertionEvent.
*
* <span class="token keyword">@param</span> <span class="token parameter">tsFileInsertionEvent</span> TsFileInsertionEvent to be processed
* <span class="token keyword">@param</span> <span class="token parameter">eventCollector</span> used to collect result events after processing
* <span class="token keyword">@throws</span> <span class="token reference"><span class="token class-name">Exception</span></span> the user can throw errors if necessary
*/</span>
<span class="token keyword">default</span> <span class="token keyword">void</span> <span class="token function">process</span><span class="token punctuation">(</span><span class="token class-name">TsFileInsertionEvent</span> tsFileInsertionEvent<span class="token punctuation">,</span> <span class="token class-name">EventCollector</span> eventCollector<span class="token punctuation">)</span>
<span class="token keyword">throws</span> <span class="token class-name">Exception</span> <span class="token punctuation">{</span>
<span class="token keyword">for</span> <span class="token punctuation">(</span><span class="token keyword">final</span> <span class="token class-name">TabletInsertionEvent</span> tabletInsertionEvent <span class="token operator">:</span>
tsFileInsertionEvent<span class="token punctuation">.</span><span class="token function">toTabletInsertionEvents</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span> <span class="token punctuation">{</span>
<span class="token function">process</span><span class="token punctuation">(</span>tabletInsertionEvent<span class="token punctuation">,</span> eventCollector<span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token punctuation">}</span>
<span class="token punctuation">}</span>
<span class="token doc-comment comment">/**
* This method is called to process the Event.
*
* <span class="token keyword">@param</span> <span class="token parameter">event</span> Event to be processed
* <span class="token keyword">@param</span> <span class="token parameter">eventCollector</span> used to collect result events after processing
* <span class="token keyword">@throws</span> <span class="token reference"><span class="token class-name">Exception</span></span> the user can throw errors if necessary
*/</span>
<span class="token keyword">void</span> <span class="token function">process</span><span class="token punctuation">(</span><span class="token class-name">Event</span> event<span class="token punctuation">,</span> <span class="token class-name">EventCollector</span> eventCollector<span class="token punctuation">)</span> <span class="token keyword">throws</span> <span class="token class-name">Exception</span><span class="token punctuation">;</span>
<span class="token punctuation">}</span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><h4 id="数据发送插件接口" tabindex="-1"><a class="header-anchor" href="#数据发送插件接口"><span>数据发送插件接口</span></a></h4><p>数据发送是流处理数据从数据抽取到数据发送三阶段的第三阶段。数据发送插件(PipeSink)主要用于发送经由数据处理插件(PipeProcessor)处理过后的<br> 各种事件,它作为流处理框架的网络实现层,接口上应允许接入多种实时通信协议和多种连接器。</p><div class="language-java line-numbers-mode" data-ext="java" data-title="java"><pre class="language-java"><code><span class="token doc-comment comment">/**
* PipeSink
*
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>p</span><span class="token punctuation">&gt;</span></span>PipeSink is responsible for sending events to sinks.
*
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>p</span><span class="token punctuation">&gt;</span></span>Various network protocols can be supported by implementing different PipeSink classes.
*
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>p</span><span class="token punctuation">&gt;</span></span>The lifecycle of a PipeSink is as follows:
*
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>ul</span><span class="token punctuation">&gt;</span></span>
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>When a collaboration task is created, the KV pairs of \`WITH SINK\` clause in SQL are
* parsed and the validation method <span class="token punctuation">{</span><span class="token keyword">@link</span> <span class="token reference"><span class="token class-name">PipeSink</span><span class="token punctuation">#</span><span class="token function">validate</span><span class="token punctuation">(</span><span class="token class-name">PipeParameterValidator</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> will be
* called to validate the parameters.
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>Before the collaboration task starts, the method <span class="token punctuation">{</span><span class="token keyword">@link</span> <span class="token reference"><span class="token class-name">PipeSink</span><span class="token punctuation">#</span><span class="token function">customize</span><span class="token punctuation">(</span><span class="token class-name">PipeParameters</span><span class="token punctuation">,</span>
* <span class="token class-name">PipeSinkRuntimeConfiguration</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> will be called to config the runtime behavior of the
* PipeSink and the method <span class="token punctuation">{</span><span class="token keyword">@link</span> <span class="token reference"><span class="token class-name">PipeSink</span><span class="token punctuation">#</span><span class="token function">handshake</span><span class="token punctuation">(</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> will be called to create a connection
* with sink.
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>While the collaboration task is in progress:
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>ul</span><span class="token punctuation">&gt;</span></span>
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>PipeSource captures the events and wraps them into three types of Event instances.
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>PipeProcessor processes the event and then passes them to the PipeSink.
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>PipeSink serializes the events into binaries and send them to sinks. The following 3
* methods will be called: <span class="token punctuation">{</span><span class="token keyword">@link</span> <span class="token reference"><span class="token class-name">PipeSink</span><span class="token punctuation">#</span><span class="token function">transfer</span><span class="token punctuation">(</span><span class="token class-name">TabletInsertionEvent</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span>, <span class="token punctuation">{</span><span class="token keyword">@link</span>
* <span class="token reference"><span class="token class-name">PipeSink</span><span class="token punctuation">#</span><span class="token function">transfer</span><span class="token punctuation">(</span><span class="token class-name">TsFileInsertionEvent</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> and <span class="token punctuation">{</span><span class="token keyword">@link</span> <span class="token reference"><span class="token class-name">PipeSink</span><span class="token punctuation">#</span><span class="token function">transfer</span><span class="token punctuation">(</span><span class="token class-name">Event</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span>.
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;/</span>ul</span><span class="token punctuation">&gt;</span></span>
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>When the collaboration task is cancelled (the \`DROP PIPE\` command is executed), the <span class="token punctuation">{</span><span class="token keyword">@link</span>
* <span class="token reference"><span class="token class-name">PipeSink</span><span class="token punctuation">#</span><span class="token function">close</span><span class="token punctuation">(</span><span class="token punctuation">)</span></span> <span class="token punctuation">}</span> method will be called.
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;/</span>ul</span><span class="token punctuation">&gt;</span></span>
*
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>p</span><span class="token punctuation">&gt;</span></span>In addition, the method <span class="token punctuation">{</span><span class="token keyword">@link</span> <span class="token reference"><span class="token class-name">PipeSink</span><span class="token punctuation">#</span><span class="token function">heartbeat</span><span class="token punctuation">(</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> will be called periodically to check
* whether the connection with sink is still alive. The method <span class="token punctuation">{</span><span class="token keyword">@link</span> <span class="token reference"><span class="token class-name">PipeSink</span><span class="token punctuation">#</span><span class="token function">handshake</span><span class="token punctuation">(</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> will be
* called to create a new connection with the sink when the method <span class="token punctuation">{</span><span class="token keyword">@link</span> <span class="token reference"><span class="token class-name">PipeSink</span><span class="token punctuation">#</span><span class="token function">heartbeat</span><span class="token punctuation">(</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span>
* throws exceptions.
*/</span>
<span class="token keyword">public</span> <span class="token keyword">interface</span> <span class="token class-name">PipeSink</span> <span class="token keyword">extends</span> <span class="token class-name">PipePlugin</span> <span class="token punctuation">{</span>
<span class="token doc-comment comment">/**
* This method is mainly used to validate <span class="token punctuation">{</span><span class="token keyword">@link</span> <span class="token reference"><span class="token class-name">PipeParameters</span></span><span class="token punctuation">}</span> and it is executed before <span class="token punctuation">{</span><span class="token keyword">@link</span>
* <span class="token reference"><span class="token class-name">PipeSink</span><span class="token punctuation">#</span><span class="token function">customize</span><span class="token punctuation">(</span><span class="token class-name">PipeParameters</span><span class="token punctuation">,</span> <span class="token class-name">PipeSinkRuntimeConfiguration</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> is called.
*
* <span class="token keyword">@param</span> <span class="token parameter">validator</span> the validator used to validate <span class="token punctuation">{</span><span class="token keyword">@link</span> <span class="token reference"><span class="token class-name">PipeParameters</span></span><span class="token punctuation">}</span>
* <span class="token keyword">@throws</span> <span class="token reference"><span class="token class-name">Exception</span></span> if any parameter is not valid
*/</span>
<span class="token keyword">void</span> <span class="token function">validate</span><span class="token punctuation">(</span><span class="token class-name">PipeParameterValidator</span> validator<span class="token punctuation">)</span> <span class="token keyword">throws</span> <span class="token class-name">Exception</span><span class="token punctuation">;</span>
<span class="token doc-comment comment">/**
* This method is mainly used to customize PipeSink. In this method, the user can do the following
* things:
*
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>ul</span><span class="token punctuation">&gt;</span></span>
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>Use PipeParameters to parse key-value pair attributes entered by the user.
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>Set the running configurations in PipeSinkRuntimeConfiguration.
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;/</span>ul</span><span class="token punctuation">&gt;</span></span>
*
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>p</span><span class="token punctuation">&gt;</span></span>This method is called after the method <span class="token punctuation">{</span><span class="token keyword">@link</span> <span class="token reference"><span class="token class-name">PipeSink</span><span class="token punctuation">#</span><span class="token function">validate</span><span class="token punctuation">(</span><span class="token class-name">PipeParameterValidator</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> is
* called and before the method <span class="token punctuation">{</span><span class="token keyword">@link</span> <span class="token reference"><span class="token class-name">PipeSink</span><span class="token punctuation">#</span><span class="token function">handshake</span><span class="token punctuation">(</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> is called.
*
* <span class="token keyword">@param</span> <span class="token parameter">parameters</span> used to parse the input parameters entered by the user
* <span class="token keyword">@param</span> <span class="token parameter">configuration</span> used to set the required properties of the running PipeSink
* <span class="token keyword">@throws</span> <span class="token reference"><span class="token class-name">Exception</span></span> the user can throw errors if necessary
*/</span>
<span class="token keyword">void</span> <span class="token function">customize</span><span class="token punctuation">(</span><span class="token class-name">PipeParameters</span> parameters<span class="token punctuation">,</span> <span class="token class-name">PipeSinkRuntimeConfiguration</span> configuration<span class="token punctuation">)</span>
<span class="token keyword">throws</span> <span class="token class-name">Exception</span><span class="token punctuation">;</span>
<span class="token doc-comment comment">/**
* This method is used to create a connection with sink. This method will be called after the
* method <span class="token punctuation">{</span><span class="token keyword">@link</span> <span class="token reference"><span class="token class-name">PipeSink</span><span class="token punctuation">#</span><span class="token function">customize</span><span class="token punctuation">(</span><span class="token class-name">PipeParameters</span><span class="token punctuation">,</span> <span class="token class-name">PipeSinkRuntimeConfiguration</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> is called or
* will be called when the method <span class="token punctuation">{</span><span class="token keyword">@link</span> <span class="token reference"><span class="token class-name">PipeSink</span><span class="token punctuation">#</span><span class="token function">heartbeat</span><span class="token punctuation">(</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> throws exceptions.
*
* <span class="token keyword">@throws</span> <span class="token reference"><span class="token class-name">Exception</span></span> if the connection is failed to be created
*/</span>
<span class="token keyword">void</span> <span class="token function">handshake</span><span class="token punctuation">(</span><span class="token punctuation">)</span> <span class="token keyword">throws</span> <span class="token class-name">Exception</span><span class="token punctuation">;</span>
<span class="token doc-comment comment">/**
* This method will be called periodically to check whether the connection with sink is still
* alive.
*
* <span class="token keyword">@throws</span> <span class="token reference"><span class="token class-name">Exception</span></span> if the connection dies
*/</span>
<span class="token keyword">void</span> <span class="token function">heartbeat</span><span class="token punctuation">(</span><span class="token punctuation">)</span> <span class="token keyword">throws</span> <span class="token class-name">Exception</span><span class="token punctuation">;</span>
<span class="token doc-comment comment">/**
* This method is used to transfer the TabletInsertionEvent.
*
* <span class="token keyword">@param</span> <span class="token parameter">tabletInsertionEvent</span> TabletInsertionEvent to be transferred
* <span class="token keyword">@throws</span> <span class="token reference"><span class="token class-name">PipeConnectionException</span></span> if the connection is broken
* <span class="token keyword">@throws</span> <span class="token reference"><span class="token class-name">Exception</span></span> the user can throw errors if necessary
*/</span>
<span class="token keyword">void</span> <span class="token function">transfer</span><span class="token punctuation">(</span><span class="token class-name">TabletInsertionEvent</span> tabletInsertionEvent<span class="token punctuation">)</span> <span class="token keyword">throws</span> <span class="token class-name">Exception</span><span class="token punctuation">;</span>
<span class="token doc-comment comment">/**
* This method is used to transfer the TsFileInsertionEvent.
*
* <span class="token keyword">@param</span> <span class="token parameter">tsFileInsertionEvent</span> TsFileInsertionEvent to be transferred
* <span class="token keyword">@throws</span> <span class="token reference"><span class="token class-name">PipeConnectionException</span></span> if the connection is broken
* <span class="token keyword">@throws</span> <span class="token reference"><span class="token class-name">Exception</span></span> the user can throw errors if necessary
*/</span>
<span class="token keyword">default</span> <span class="token keyword">void</span> <span class="token function">transfer</span><span class="token punctuation">(</span><span class="token class-name">TsFileInsertionEvent</span> tsFileInsertionEvent<span class="token punctuation">)</span> <span class="token keyword">throws</span> <span class="token class-name">Exception</span> <span class="token punctuation">{</span>
<span class="token keyword">try</span> <span class="token punctuation">{</span>
<span class="token keyword">for</span> <span class="token punctuation">(</span><span class="token keyword">final</span> <span class="token class-name">TabletInsertionEvent</span> tabletInsertionEvent <span class="token operator">:</span>
tsFileInsertionEvent<span class="token punctuation">.</span><span class="token function">toTabletInsertionEvents</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span> <span class="token punctuation">{</span>
<span class="token function">transfer</span><span class="token punctuation">(</span>tabletInsertionEvent<span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token punctuation">}</span>
<span class="token punctuation">}</span> <span class="token keyword">finally</span> <span class="token punctuation">{</span>
tsFileInsertionEvent<span class="token punctuation">.</span><span class="token function">close</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token punctuation">}</span>
<span class="token punctuation">}</span>
<span class="token doc-comment comment">/**
* This method is used to transfer the generic events, including HeartbeatEvent.
*
* <span class="token keyword">@param</span> <span class="token parameter">event</span> Event to be transferred
* <span class="token keyword">@throws</span> <span class="token reference"><span class="token class-name">PipeConnectionException</span></span> if the connection is broken
* <span class="token keyword">@throws</span> <span class="token reference"><span class="token class-name">Exception</span></span> the user can throw errors if necessary
*/</span>
<span class="token keyword">void</span> <span class="token function">transfer</span><span class="token punctuation">(</span><span class="token class-name">Event</span> event<span class="token punctuation">)</span> <span class="token keyword">throws</span> <span class="token class-name">Exception</span><span class="token punctuation">;</span>
<span class="token punctuation">}</span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><h2 id="自定义流处理插件管理" tabindex="-1"><a class="header-anchor" href="#自定义流处理插件管理"><span>自定义流处理插件管理</span></a></h2><p>为了保证用户自定义插件在实际生产中的灵活性和易用性,系统还需要提供对插件进行动态统一管理的能力。<br> 本章节介绍的流处理插件管理语句提供了对插件进行动态统一管理的入口。</p><h3 id="加载插件语句" tabindex="-1"><a class="header-anchor" href="#加载插件语句"><span>加载插件语句</span></a></h3><p>在 IoTDB 中,若要在系统中动态载入一个用户自定义插件,则首先需要基于 PipeSource PipeProcessor 或者 PipeSink 实现一个具体的插件类,<br> 然后需要将插件类编译打包成 jar 可执行文件,最后使用加载插件的管理语句将插件载入 IoTDB。</p><p>加载插件的管理语句的语法如图所示。</p><div class="language-sql line-numbers-mode" data-ext="sql" data-title="sql"><pre class="language-sql"><code><span class="token keyword">CREATE</span> PIPEPLUGIN <span class="token operator">&lt;</span>别名<span class="token operator">&gt;</span>
<span class="token keyword">AS</span> <span class="token operator">&lt;</span>全类名<span class="token operator">&gt;</span>
<span class="token keyword">USING</span> <span class="token operator">&lt;</span>JAR 包的 URI<span class="token operator">&gt;</span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><p>示例:假如用户实现了一个全类名为edu.tsinghua.iotdb.pipe.ExampleProcessor 的数据处理插件,打包后的jar包为 pipe-plugin.jar ,用户希望在流处理引擎中使用这个插件,将插件标记为 example。插件包有两种使用方式,一种为上传到URI服务器,一种为上传到集群本地目录,两种方法任选一种即可。</p><p>【方式一】上传到URI服务器</p>`,47),m={href:"https://example.com:8080/iotdb/pipe-plugin.jar",target:"_blank",rel:"noopener noreferrer"},b=p(`<p>创建语句:</p><div class="language-sql line-numbers-mode" data-ext="sql" data-title="sql"><pre class="language-sql"><code><span class="token keyword">SQL</span> <span class="token keyword">CREATE</span> PIPEPLUGIN example
<span class="token keyword">AS</span> <span class="token string">&#39;edu.tsinghua.iotdb.pipe.ExampleProcessor&#39;</span>
<span class="token keyword">USING</span> URI <span class="token string">&#39;&lt;https://example.com:8080/iotdb/pipe-plugin.jar&gt;&#39;</span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><p>【方式二】上传到集群本地目录</p><p>准备工作:使用该种方式注册,您需要提前将 JAR 包放置到DataNode节点所在机器的任意路径下,推荐您将JAR包放在IoTDB安装路径的/ext/pipe目录下(安装包中已有,无需新建)。例如:iotdb-1.x.x-bin/ext/pipe/pipe-plugin.jar。(<strong>注意:如果您使用的是集群,那么需要将 JAR 包放置到每个 DataNode 节点所在机器的该路径下)</strong></p><p>创建语句:</p><div class="language-sql line-numbers-mode" data-ext="sql" data-title="sql"><pre class="language-sql"><code><span class="token keyword">SQL</span> <span class="token keyword">CREATE</span> PIPEPLUGIN example
<span class="token keyword">AS</span> <span class="token string">&#39;edu.tsinghua.iotdb.pipe.ExampleProcessor&#39;</span>
<span class="token keyword">USING</span> URI <span class="token string">&#39;&lt;file:/iotdb安装路径/iotdb-1.x.x-bin/ext/pipe/pipe-plugin.jar&gt;&#39;</span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><h3 id="删除插件语句" tabindex="-1"><a class="header-anchor" href="#删除插件语句"><span>删除插件语句</span></a></h3><p>当用户不再想使用一个插件,需要将插件从系统中卸载时,可以使用如图所示的删除插件语句。</p><div class="language-sql line-numbers-mode" data-ext="sql" data-title="sql"><pre class="language-sql"><code><span class="token keyword">DROP</span> PIPEPLUGIN <span class="token operator">&lt;</span>别名<span class="token operator">&gt;</span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div></div></div><h3 id="查看插件语句" tabindex="-1"><a class="header-anchor" href="#查看插件语句"><span>查看插件语句</span></a></h3><p>用户也可以按需查看系统中的插件。查看插件的语句如图所示。</p><div class="language-sql line-numbers-mode" data-ext="sql" data-title="sql"><pre class="language-sql"><code><span class="token keyword">SHOW</span> PIPEPLUGINS
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div></div></div><h2 id="系统预置的流处理插件" tabindex="-1"><a class="header-anchor" href="#系统预置的流处理插件"><span>系统预置的流处理插件</span></a></h2><h3 id="预置-source-插件" tabindex="-1"><a class="header-anchor" href="#预置-source-插件"><span>预置 source 插件</span></a></h3><h4 id="iotdb-source" tabindex="-1"><a class="header-anchor" href="#iotdb-source"><span>iotdb-source</span></a></h4><p>作用:抽取 IoTDB 内部的历史或实时数据进入 pipe。</p><table><thead><tr><th>key</th><th>value</th><th>value 取值范围</th><th>required or optional with default</th></tr></thead><tbody><tr><td>source</td><td>iotdb-source</td><td>String: iotdb-source</td><td>required</td></tr><tr><td>source.pattern</td><td>用于筛选时间序列的路径前缀</td><td>String: 任意的时间序列前缀</td><td>optional: root</td></tr><tr><td>source.history.start-time</td><td>抽取的历史数据的开始 event time,包含 start-time</td><td>Long: [Long.MIN_VALUE, Long.MAX_VALUE]</td><td>optional: Long.MIN_VALUE</td></tr><tr><td>source.history.end-time</td><td>抽取的历史数据的结束 event time,包含 end-time</td><td>Long: [Long.MIN_VALUE, Long.MAX_VALUE]</td><td>optional: Long.MAX_VALUE</td></tr><tr><td>start-time(V1.3.1+)</td><td>start of synchronizing all data event timeincluding start-time. Will disable &quot;history.start-time&quot; &quot;history.end-time&quot; if configured</td><td>Long: [Long.MIN_VALUE, Long.MAX_VALUE]</td><td>optional: Long.MIN_VALUE</td></tr><tr><td>end-time(V1.3.1+)</td><td>end of synchronizing all data event time,including end-time. Will disable &quot;history.start-time&quot; &quot;history.end-time&quot; if configured</td><td>Long: [Long.MIN_VALUE, Long.MAX_VALUE]</td><td>optional: Long.MAX_VALUE</td></tr><tr><td>source.realtime.mode</td><td>实时数据的抽取模式</td><td>String: hybrid, log, file</td><td>optional: hybrid</td></tr><tr><td>source.forwarding-pipe-requests</td><td>是否抽取由其他 Pipe (通常是数据同步)写入的数据</td><td>Boolean: true, false</td><td>optional: true</td></tr></tbody></table>`,17),h=n("p",null,[s("🚫 "),n("strong",null,"source.pattern 参数说明")],-1),g={href:"https://iotdb.apache.org/zh/Download/#_1-0-%E7%89%88%E6%9C%AC%E4%B8%8D%E5%85%BC%E5%AE%B9%E7%9A%84%E8%AF%AD%E6%B3%95%E8%AF%A6%E7%BB%86%E8%AF%B4%E6%98%8E",target:"_blank",rel:"noopener noreferrer"},f=p("<li><p>在底层实现中,当检测到 pattern root(默认值)时,抽取效率较高,其他任意格式都将降低性能</p></li><li><p>路径前缀不需要能够构成完整的路径。例如,当创建一个包含参数为 &#39;source.pattern&#39;=&#39;root.aligned.1&#39; pipe 时:</p><ul><li>root.aligned.1TS</li><li>root.aligned.1TS.`1`</li><li>root.aligned.100T</li></ul><p>的数据会被抽取;</p><ul><li>root.aligned.`1`</li><li>root.aligned.`123`</li></ul><p>的数据不会被抽取。</p></li>",2),P=p(`<blockquote><p>❗️<strong>source.history 的 start-time,end-time 参数说明</strong></p><ul><li>start-time,end-time 应为 ISO 格式,例如 2011-12-03T10:15:30 或 2011-12-03T10:15:30+01:00</li></ul></blockquote><blockquote><p>✅ <strong>一条数据从生产到落库 IoTDB,包含两个关键的时间概念</strong></p><ul><li><strong>event time:</strong> 数据实际生产时的时间(或者数据生产系统给数据赋予的生成时间,是数据点中的时间项),也称为事件时间。</li><li><strong>arrival time:</strong> 数据到达 IoTDB 系统内的时间。</li></ul><p>我们常说的乱序数据,指的是数据到达时,其 <strong>event time</strong> 远落后于当前系统时间(或者已经落库的最大 <strong>event time</strong>)的数据。另一方面,不论是乱序数据还是顺序数据,只要它们是新到达系统的,那它们的 <strong>arrival time</strong> 都是会随着数据到达 IoTDB 的顺序递增的。</p></blockquote><blockquote><p>💎 <strong>iotdb-source 的工作可以拆分成两个阶段</strong></p><ol><li>历史数据抽取:所有 <strong>arrival time</strong> &lt; 创建 pipe 时<strong>当前系统时间</strong>的数据称为历史数据</li><li>实时数据抽取:所有 <strong>arrival time</strong> &gt;= 创建 pipe 时<strong>当前系统时间</strong>的数据称为实时数据</li></ol><p>历史数据传输阶段和实时数据传输阶段,<strong>两阶段串行执行,只有当历史数据传输阶段完成后,才执行实时数据传输阶段。</strong></p></blockquote><blockquote><p>📌 <strong>source.realtime.mode:数据抽取的模式</strong></p><ul><li>log:该模式下,任务仅使用操作日志进行数据处理、发送</li><li>file:该模式下,任务仅使用数据文件进行数据处理、发送</li><li>hybrid:该模式,考虑了按操作日志逐条目发送数据时延迟低但吞吐低的特点,以及按数据文件批量发送时发送吞吐高但延迟高的特点,能够在不同的写入负载下自动切换适合的数据抽取方式,首先采取基于操作日志的数据抽取方式以保证低发送延迟,当产生数据积压时自动切换成基于数据文件的数据抽取方式以保证高发送吞吐,积压消除时自动切换回基于操作日志的数据抽取方式,避免了采用单一数据抽取算法难以平衡数据发送延迟或吞吐的问题。</li></ul></blockquote><blockquote><p>🍕 <strong>source.forwarding-pipe-requests:是否允许转发从另一 pipe 传输而来的数据</strong></p><ul><li>如果要使用 pipe 构建 A -&gt; B -&gt; C 的数据同步,那么 B -&gt; C 的 pipe 需要将该参数为 true 后,A -&gt; B 中 A 通过 pipe 写入 B 的数据才能被正确转发到 C</li><li>如果要使用 pipe 构建 A &lt;-&gt; B 的双向数据同步(双活),那么 A -&gt; B B -&gt; A pipe 都需要将该参数设置为 false,否则将会造成数据无休止的集群间循环转发</li></ul></blockquote><h3 id="预置-processor-插件" tabindex="-1"><a class="header-anchor" href="#预置-processor-插件"><span>预置 processor 插件</span></a></h3><h4 id="do-nothing-processor" tabindex="-1"><a class="header-anchor" href="#do-nothing-processor"><span>do-nothing-processor</span></a></h4><p>作用:不对 source 传入的事件做任何的处理。</p><table><thead><tr><th>key</th><th>value</th><th>value 取值范围</th><th>required or optional with default</th></tr></thead><tbody><tr><td>processor</td><td>do-nothing-processor</td><td>String: do-nothing-processor</td><td>required</td></tr></tbody></table><h3 id="预置-sink-插件" tabindex="-1"><a class="header-anchor" href="#预置-sink-插件"><span>预置 sink 插件</span></a></h3><h4 id="do-nothing-sink" tabindex="-1"><a class="header-anchor" href="#do-nothing-sink"><span>do-nothing-sink</span></a></h4><p>作用:不对 processor 传入的事件做任何的处理。</p><table><thead><tr><th>key</th><th>value</th><th>value 取值范围</th><th>required or optional with default</th></tr></thead><tbody><tr><td>sink</td><td>do-nothing-sink</td><td>String: do-nothing-sink</td><td>required</td></tr></tbody></table><h2 id="流处理任务管理" tabindex="-1"><a class="header-anchor" href="#流处理任务管理"><span>流处理任务管理</span></a></h2><h3 id="创建流处理任务" tabindex="-1"><a class="header-anchor" href="#创建流处理任务"><span>创建流处理任务</span></a></h3><p>使用 <code>CREATE PIPE</code> 语句来创建流处理任务。以数据同步流处理任务的创建为例,示例 SQL 语句如下:</p><div class="language-sql line-numbers-mode" data-ext="sql" data-title="sql"><pre class="language-sql"><code><span class="token keyword">CREATE</span> PIPE <span class="token operator">&lt;</span>PipeId<span class="token operator">&gt;</span> <span class="token comment">-- PipeId 是能够唯一标定流处理任务的名字</span>
<span class="token keyword">WITH</span> SOURCE <span class="token punctuation">(</span>
<span class="token comment">-- 默认的 IoTDB 数据抽取插件</span>
<span class="token string">&#39;source&#39;</span> <span class="token operator">=</span> <span class="token string">&#39;iotdb-source&#39;</span><span class="token punctuation">,</span>
<span class="token comment">-- 路径前缀,只有能够匹配该路径前缀的数据才会被抽取,用作后续的处理和发送</span>
<span class="token string">&#39;source.pattern&#39;</span> <span class="token operator">=</span> <span class="token string">&#39;root.timecho&#39;</span><span class="token punctuation">,</span>
<span class="token comment">-- 是否抽取历史数据</span>
<span class="token string">&#39;source.history.enable&#39;</span> <span class="token operator">=</span> <span class="token string">&#39;true&#39;</span><span class="token punctuation">,</span>
<span class="token comment">-- 描述被抽取的历史数据的时间范围,表示最早时间</span>
<span class="token string">&#39;source.history.start-time&#39;</span> <span class="token operator">=</span> <span class="token string">&#39;2011.12.03T10:15:30+01:00&#39;</span><span class="token punctuation">,</span>
<span class="token comment">-- 描述被抽取的历史数据的时间范围,表示最晚时间</span>
<span class="token string">&#39;source.history.end-time&#39;</span> <span class="token operator">=</span> <span class="token string">&#39;2022.12.03T10:15:30+01:00&#39;</span><span class="token punctuation">,</span>
<span class="token comment">-- 是否抽取实时数据</span>
<span class="token string">&#39;source.realtime.enable&#39;</span> <span class="token operator">=</span> <span class="token string">&#39;true&#39;</span><span class="token punctuation">,</span>
<span class="token comment">-- 描述实时数据的抽取方式</span>
<span class="token string">&#39;source.realtime.mode&#39;</span> <span class="token operator">=</span> <span class="token string">&#39;hybrid&#39;</span><span class="token punctuation">,</span>
<span class="token punctuation">)</span>
<span class="token keyword">WITH</span> PROCESSOR <span class="token punctuation">(</span>
<span class="token comment">-- 默认的数据处理插件,即不做任何处理</span>
<span class="token string">&#39;processor&#39;</span> <span class="token operator">=</span> <span class="token string">&#39;do-nothing-processor&#39;</span><span class="token punctuation">,</span>
<span class="token punctuation">)</span>
<span class="token keyword">WITH</span> SINK <span class="token punctuation">(</span>
<span class="token comment">-- IoTDB 数据发送插件,目标端为 IoTDB</span>
<span class="token string">&#39;sink&#39;</span> <span class="token operator">=</span> <span class="token string">&#39;iotdb-thrift-sink&#39;</span><span class="token punctuation">,</span>
<span class="token comment">-- 目标端 IoTDB 其中一个 DataNode 节点的数据服务 ip</span>
<span class="token string">&#39;sink.ip&#39;</span> <span class="token operator">=</span> <span class="token string">&#39;127.0.0.1&#39;</span><span class="token punctuation">,</span>
<span class="token comment">-- 目标端 IoTDB 其中一个 DataNode 节点的数据服务 port</span>
<span class="token string">&#39;sink.port&#39;</span> <span class="token operator">=</span> <span class="token string">&#39;6667&#39;</span><span class="token punctuation">,</span>
<span class="token punctuation">)</span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><p><strong>创建流处理任务时需要配置 PipeId 以及三个插件部分的参数:</strong></p>`,18),w=n("thead",null,[n("tr",null,[n("th",null,"配置项"),n("th",null,"说明"),n("th",null,"是否必填"),n("th",null,"默认实现"),n("th",null,"默认实现说明"),n("th",null,"是否允许自定义实现")])],-1),y=n("td",null,"PipeId",-1),E=n("td",null,"全局唯一标定一个流处理任务的名称",-1),I=n("td",null,"-",-1),T=n("td",null,"-",-1),_=n("td",null,"-",-1),x=n("tr",null,[n("td",null,"source"),n("td",null,"Pipe Source 插件,负责在数据库底层抽取流处理数据"),n("td",null,"选填"),n("td",null,"iotdb-source"),n("td",null,"将数据库的全量历史数据和后续到达的实时数据接入流处理任务"),n("td",null,"否")],-1),S=n("td",null,"processor",-1),q=n("td",null,"Pipe Processor 插件,负责处理数据",-1),R=n("td",null,"选填",-1),C=n("td",null,"do-nothing-processor",-1),B=n("td",null,"对传入的数据不做任何处理",-1),D=n("td",null,"sink",-1),A=n("td",null,"Pipe Sink 插件,负责发送数据",-1),U=n("td",null,"-",-1),N=n("td",null,"-",-1),L=p(`<p>示例中,使用了 iotdb-source、do-nothing-processor 和 iotdb-thrift-sink 插件构建数据流处理任务。IoTDB 还内置了其他的流处理插件,<strong>请查看“系统预置流处理插件”一节</strong>。</p><p><strong>一个最简的 CREATE PIPE 语句示例如下:</strong></p><div class="language-sql line-numbers-mode" data-ext="sql" data-title="sql"><pre class="language-sql"><code><span class="token keyword">CREATE</span> PIPE <span class="token operator">&lt;</span>PipeId<span class="token operator">&gt;</span> <span class="token comment">-- PipeId 是能够唯一标定流处理任务的名字</span>
<span class="token keyword">WITH</span> SINK <span class="token punctuation">(</span>
<span class="token comment">-- IoTDB 数据发送插件,目标端为 IoTDB</span>
<span class="token string">&#39;sink&#39;</span> <span class="token operator">=</span> <span class="token string">&#39;iotdb-thrift-sink&#39;</span><span class="token punctuation">,</span>
<span class="token comment">-- 目标端 IoTDB 其中一个 DataNode 节点的数据服务 ip</span>
<span class="token string">&#39;sink.ip&#39;</span> <span class="token operator">=</span> <span class="token string">&#39;127.0.0.1&#39;</span><span class="token punctuation">,</span>
<span class="token comment">-- 目标端 IoTDB 其中一个 DataNode 节点的数据服务 port</span>
<span class="token string">&#39;sink.port&#39;</span> <span class="token operator">=</span> <span class="token string">&#39;6667&#39;</span><span class="token punctuation">,</span>
<span class="token punctuation">)</span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><p>其表达的语义是:将本数据库实例中的全量历史数据和后续到达的实时数据,同步到目标为 127.0.0.1:6667 的 IoTDB 实例上。</p><p><strong>注意:</strong></p><ul><li><p>SOURCE PROCESSOR 为选填配置,若不填写配置参数,系统则会采用相应的默认实现</p></li><li><p>SINK 为必填配置,需要在 CREATE PIPE 语句中声明式配置</p></li><li><p>SINK 具备自复用能力。对于不同的流处理任务,如果他们的 SINK 具备完全相同 KV 属性的(所有属性的 key 对应的 value 都相同),<strong>那么系统最终只会创建一个 SINK 实例</strong>,以实现对连接资源的复用。</p><ul><li>例如,有下面 pipe1, pipe2 两个流处理任务的声明:</li></ul><div class="language-sql line-numbers-mode" data-ext="sql" data-title="sql"><pre class="language-sql"><code><span class="token keyword">CREATE</span> PIPE pipe1
<span class="token keyword">WITH</span> SINK <span class="token punctuation">(</span>
<span class="token string">&#39;sink&#39;</span> <span class="token operator">=</span> <span class="token string">&#39;iotdb-thrift-sink&#39;</span><span class="token punctuation">,</span>
<span class="token string">&#39;sink.ip&#39;</span> <span class="token operator">=</span> <span class="token string">&#39;localhost&#39;</span><span class="token punctuation">,</span>
<span class="token string">&#39;sink.port&#39;</span> <span class="token operator">=</span> <span class="token string">&#39;9999&#39;</span><span class="token punctuation">,</span>
<span class="token punctuation">)</span>
<span class="token keyword">CREATE</span> PIPE pipe2
<span class="token keyword">WITH</span> SINK <span class="token punctuation">(</span>
<span class="token string">&#39;sink&#39;</span> <span class="token operator">=</span> <span class="token string">&#39;iotdb-thrift-sink&#39;</span><span class="token punctuation">,</span>
<span class="token string">&#39;sink.port&#39;</span> <span class="token operator">=</span> <span class="token string">&#39;9999&#39;</span><span class="token punctuation">,</span>
<span class="token string">&#39;sink.ip&#39;</span> <span class="token operator">=</span> <span class="token string">&#39;localhost&#39;</span><span class="token punctuation">,</span>
<span class="token punctuation">)</span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><ul><li>因为它们对 SINK 的声明完全相同(<strong>即使某些属性声明时的顺序不同</strong>),所以框架会自动对它们声明的 SINK 进行复用,最终 pipe1, pipe2 SINK 将会是同一个实例。</li></ul></li><li><p>在 source 为默认的 iotdb-source,且 source.forwarding-pipe-requests 为默认值 true 时,请不要构建出包含数据循环同步的应用场景(会导致无限循环):</p><ul><li>IoTDB A -&gt; IoTDB B -&gt; IoTDB A</li><li>IoTDB A -&gt; IoTDB A</li></ul></li></ul><h3 id="启动流处理任务" tabindex="-1"><a class="header-anchor" href="#启动流处理任务"><span>启动流处理任务</span></a></h3><p>CREATE PIPE 语句成功执行后,流处理任务相关实例会被创建,但整个流处理任务的运行状态会被置为 STOPPED,即流处理任务不会立刻处理数据(V1.3.0)。在 1.3.1 及以上的版本,流处理任务的运行状态在创建后将被立即置为 RUNNING。</p><p>可以使用 START PIPE 语句使流处理任务开始处理数据:</p><div class="language-sql line-numbers-mode" data-ext="sql" data-title="sql"><pre class="language-sql"><code><span class="token keyword">START</span> PIPE <span class="token operator">&lt;</span>PipeId<span class="token operator">&gt;</span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div></div></div><h3 id="停止流处理任务" tabindex="-1"><a class="header-anchor" href="#停止流处理任务"><span>停止流处理任务</span></a></h3><p>使用 STOP PIPE 语句使流处理任务停止处理数据:</p><div class="language-sql line-numbers-mode" data-ext="sql" data-title="sql"><pre class="language-sql"><code>STOP PIPE <span class="token operator">&lt;</span>PipeId<span class="token operator">&gt;</span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div></div></div><h3 id="删除流处理任务" tabindex="-1"><a class="header-anchor" href="#删除流处理任务"><span>删除流处理任务</span></a></h3><p>使用 DROP PIPE 语句使流处理任务停止处理数据(当流处理任务状态为 RUNNING 时),然后删除整个流处理任务流处理任务:</p><div class="language-sql line-numbers-mode" data-ext="sql" data-title="sql"><pre class="language-sql"><code><span class="token keyword">DROP</span> PIPE <span class="token operator">&lt;</span>PipeId<span class="token operator">&gt;</span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div></div></div><p>用户在删除流处理任务前,不需要执行 STOP 操作。</p><h3 id="展示流处理任务" tabindex="-1"><a class="header-anchor" href="#展示流处理任务"><span>展示流处理任务</span></a></h3><p>使用 SHOW PIPES 语句查看所有流处理任务:</p><div class="language-sql line-numbers-mode" data-ext="sql" data-title="sql"><pre class="language-sql"><code><span class="token keyword">SHOW</span> PIPES
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div></div></div><p>查询结果如下:</p><div class="language-sql line-numbers-mode" data-ext="sql" data-title="sql"><pre class="language-sql"><code><span class="token operator">+</span><span class="token comment">-----------+-----------------------+-------+----------+-------------+--------+----------------+</span>
<span class="token operator">|</span> ID<span class="token operator">|</span> CreationTime <span class="token operator">|</span> State<span class="token operator">|</span>PipeSource<span class="token operator">|</span>PipeProcessor<span class="token operator">|</span>PipeSink<span class="token operator">|</span>ExceptionMessage<span class="token operator">|</span>
<span class="token operator">+</span><span class="token comment">-----------+-----------------------+-------+----------+-------------+--------+----------------+</span>
<span class="token operator">|</span>iotdb<span class="token operator">-</span>kafka<span class="token operator">|</span><span class="token number">2022</span><span class="token operator">-</span><span class="token number">03</span><span class="token operator">-</span><span class="token number">30</span>T20:<span class="token number">58</span>:<span class="token number">30.689</span><span class="token operator">|</span>RUNNING<span class="token operator">|</span> <span class="token punctuation">.</span><span class="token punctuation">.</span><span class="token punctuation">.</span><span class="token operator">|</span> <span class="token punctuation">.</span><span class="token punctuation">.</span><span class="token punctuation">.</span><span class="token operator">|</span> <span class="token punctuation">.</span><span class="token punctuation">.</span><span class="token punctuation">.</span><span class="token operator">|</span> {}<span class="token operator">|</span>
<span class="token operator">+</span><span class="token comment">-----------+-----------------------+-------+----------+-------------+--------+----------------+</span>
<span class="token operator">|</span>iotdb<span class="token operator">-</span>iotdb<span class="token operator">|</span><span class="token number">2022</span><span class="token operator">-</span><span class="token number">03</span><span class="token operator">-</span><span class="token number">31</span>T12:<span class="token number">55</span>:<span class="token number">28.129</span><span class="token operator">|</span>STOPPED<span class="token operator">|</span> <span class="token punctuation">.</span><span class="token punctuation">.</span><span class="token punctuation">.</span><span class="token operator">|</span> <span class="token punctuation">.</span><span class="token punctuation">.</span><span class="token punctuation">.</span><span class="token operator">|</span> <span class="token punctuation">.</span><span class="token punctuation">.</span><span class="token punctuation">.</span><span class="token operator">|</span> TException: <span class="token punctuation">.</span><span class="token punctuation">.</span><span class="token punctuation">.</span><span class="token operator">|</span>
<span class="token operator">+</span><span class="token comment">-----------+-----------------------+-------+----------+-------------+--------+----------------+</span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><p>可以使用 <code>&lt;PipeId&gt;</code> 指定想看的某个流处理任务状态:</p><div class="language-sql line-numbers-mode" data-ext="sql" data-title="sql"><pre class="language-sql"><code><span class="token keyword">SHOW</span> PIPE <span class="token operator">&lt;</span>PipeId<span class="token operator">&gt;</span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div></div></div><p>您也可以通过 where 子句,判断某个 &lt;PipeId&gt; 使用的 Pipe Sink 被复用的情况。</p><div class="language-sql line-numbers-mode" data-ext="sql" data-title="sql"><pre class="language-sql"><code><span class="token keyword">SHOW</span> PIPES
<span class="token keyword">WHERE</span> SINK USED <span class="token keyword">BY</span> <span class="token operator">&lt;</span>PipeId<span class="token operator">&gt;</span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div></div></div><h3 id="流处理任务运行状态迁移" tabindex="-1"><a class="header-anchor" href="#流处理任务运行状态迁移"><span>流处理任务运行状态迁移</span></a></h3><p>一个流处理 pipe 在其的生命周期中会经过多种状态:</p><ul><li><strong>RUNNING:</strong> pipe 正在正常工作 <ul><li>当一个 pipe 被成功创建之后,其初始状态为工作状态(V1.3.1+)</li></ul></li><li><strong>STOPPED:</strong> pipe 处于停止运行状态。当管道处于该状态时,有如下几种可能: <ul><li>当一个 pipe 被成功创建之后,其初始状态为暂停状态(V1.3.0)</li><li>用户手动将一个处于正常运行状态的 pipe 暂停,其状态会被动从 RUNNING 变为 STOPPED</li><li>当一个 pipe 运行过程中出现无法恢复的错误时,其状态会自动从 RUNNING 变为 STOPPED</li></ul></li><li><strong>DROPPED:</strong> pipe 任务被永久删除</li></ul><p>下图表明了所有状态以及状态的迁移:</p><figure><img src="https://alioss.timecho.com/docs/img/状态迁移图.png" alt="状态迁移图" tabindex="0" loading="lazy"><figcaption>状态迁移图</figcaption></figure><h2 id="权限管理" tabindex="-1"><a class="header-anchor" href="#权限管理"><span>权限管理</span></a></h2><h3 id="流处理任务" tabindex="-1"><a class="header-anchor" href="#流处理任务"><span>流处理任务</span></a></h3><table><thead><tr><th>权限名称</th><th>描述</th></tr></thead><tbody><tr><td>USE_PIPE</td><td>注册流处理任务。路径无关。</td></tr><tr><td>USE_PIPE</td><td>开启流处理任务。路径无关。</td></tr><tr><td>USE_PIPE</td><td>停止流处理任务。路径无关。</td></tr><tr><td>USE_PIPE</td><td>卸载流处理任务。路径无关。</td></tr><tr><td>USE_PIPE</td><td>查询流处理任务。路径无关。</td></tr></tbody></table><h3 id="流处理任务插件" tabindex="-1"><a class="header-anchor" href="#流处理任务插件"><span>流处理任务插件</span></a></h3><table><thead><tr><th>权限名称</th><th>描述</th></tr></thead><tbody><tr><td>USE_PIPE</td><td>注册流处理任务插件。路径无关。</td></tr><tr><td>USE_PIPE</td><td>卸载流处理任务插件。路径无关。</td></tr><tr><td>USE_PIPE</td><td>查询流处理任务插件。路径无关。</td></tr></tbody></table><h2 id="配置参数" tabindex="-1"><a class="header-anchor" href="#配置参数"><span>配置参数</span></a></h2><p>在 iotdb-common.properties 中:</p><p>V1.3.0+:</p><div class="language-Properties line-numbers-mode" data-ext="Properties" data-title="Properties"><pre class="language-Properties"><code>####################
### Pipe Configuration
####################
# Uncomment the following field to configure the pipe lib directory.
# For Windows platform
# If its prefix is a drive specifier followed by &quot;\\\\&quot;, or if its prefix is &quot;\\\\\\\\&quot;, then the path is
# absolute. Otherwise, it is relative.
# pipe_lib_dir=ext\\\\pipe
# For Linux platform
# If its prefix is &quot;/&quot;, then the path is absolute. Otherwise, it is relative.
# pipe_lib_dir=ext/pipe
# The maximum number of threads that can be used to execute the pipe subtasks in PipeSubtaskExecutor.
# The actual value will be min(pipe_subtask_executor_max_thread_num, max(1, CPU core number / 2)).
# pipe_subtask_executor_max_thread_num=5
# The connection timeout (in milliseconds) for the thrift client.
# pipe_connector_timeout_ms=900000
# The maximum number of selectors that can be used in the async connector.
# pipe_async_connector_selector_number=1
# The core number of clients that can be used in the async connector.
# pipe_async_connector_core_client_number=8
# The maximum number of clients that can be used in the async connector.
# pipe_async_connector_max_client_number=16
# Whether to enable receiving pipe data through air gap.
# The receiver can only return 0 or 1 in tcp mode to indicate whether the data is received successfully.
# pipe_air_gap_receiver_enabled=false
# The port for the server to receive pipe data through air gap.
# pipe_air_gap_receiver_port=9780
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><p>V1.3.1+:</p><div class="language-Properties line-numbers-mode" data-ext="Properties" data-title="Properties"><pre class="language-Properties"><code># Uncomment the following field to configure the pipe lib directory.
# For Windows platform
# If its prefix is a drive specifier followed by &quot;\\\\&quot;, or if its prefix is &quot;\\\\\\\\&quot;, then the path is
# absolute. Otherwise, it is relative.
# pipe_lib_dir=ext\\\\pipe
# For Linux platform
# If its prefix is &quot;/&quot;, then the path is absolute. Otherwise, it is relative.
# pipe_lib_dir=ext/pipe
# The maximum number of threads that can be used to execute the pipe subtasks in PipeSubtaskExecutor.
# The actual value will be min(pipe_subtask_executor_max_thread_num, max(1, CPU core number / 2)).
# pipe_subtask_executor_max_thread_num=5
# The connection timeout (in milliseconds) for the thrift client.
# pipe_sink_timeout_ms=900000
# The maximum number of selectors that can be used in the sink.
# Recommend to set this value to less than or equal to pipe_sink_max_client_number.
# pipe_sink_selector_number=4
# The maximum number of clients that can be used in the sink.
# pipe_sink_max_client_number=16
# Whether to enable receiving pipe data through air gap.
# The receiver can only return 0 or 1 in tcp mode to indicate whether the data is received successfully.
# pipe_air_gap_receiver_enabled=false
# The port for the server to receive pipe data through air gap.
# pipe_air_gap_receiver_port=9780
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div>`,42);function V(O,j){const e=o("font"),i=o("ExternalLinkIcon");return c(),r("div",null,[d,k,n("p",null,[s("我们将"),a(e,{color:"RED"},{default:t(()=>[s("一个数据流处理任务称为 Pipe")]),_:1}),s("。一个流处理任务(Pipe)包含三个子任务:")]),v,n("p",null,[s("准备工作:使用该种方式注册,您需要提前将 JAR 包上传到 URI 服务器上并确保执行注册语句的IoTDB实例能够访问该 URI 服务器。例如 "),n("a",m,[s("https://example.com:8080/iotdb/pipe-plugin.jar"),a(i)]),s(" 。")]),b,n("blockquote",null,[h,n("ul",null,[n("li",null,[n("p",null,[s("Pattern 需用反引号修饰不合法字符或者是不合法路径节点,例如如果希望筛选 root.`a@b` 或者 root.`123`,应设置 pattern 为 root.`a@b` 或者 root.`123`(具体参考 "),n("a",g,[s("单双引号和反引号的使用时机"),a(i)]),s(")")])]),f])]),P,n("table",null,[w,n("tbody",null,[n("tr",null,[y,E,n("td",null,[a(e,{color:"red"},{default:t(()=>[s("必填")]),_:1})]),I,T,_]),x,n("tr",null,[S,q,R,C,B,n("td",null,[a(e,{color:"red"},{default:t(()=>[s("是")]),_:1})])]),n("tr",null,[D,A,n("td",null,[a(e,{color:"red"},{default:t(()=>[s("必填")]),_:1})]),U,N,n("td",null,[a(e,{color:"red"},{default:t(()=>[s("是")]),_:1})])])])]),L])}const W=l(u,[["render",V],["__file","Streaming_timecho.html.vue"]]),z=JSON.parse('{"path":"/zh/UserGuide/latest/User-Manual/Streaming_timecho.html","title":"IoTDB 流处理框架","lang":"zh-CN","frontmatter":{"description":"IoTDB 流处理框架 IoTDB 流处理框架允许用户实现自定义的流处理逻辑,可以实现对存储引擎变更的监听和捕获、实现对变更数据的变形、实现对变形后数据的向外推送等逻辑。 我们将。一个流处理任务(Pipe)包含三个子任务: 抽取(Source 处理(Process 发送(Sink 流处理框架允许用户使用 Java 语言自定义编写三个子任务的处理逻...","head":[["link",{"rel":"alternate","hreflang":"en-us","href":"https://iotdb.apache.org/UserGuide/latest/User-Manual/Streaming_timecho.html"}],["meta",{"property":"og:url","content":"https://iotdb.apache.org/zh/UserGuide/latest/User-Manual/Streaming_timecho.html"}],["meta",{"property":"og:site_name","content":"IoTDB Website"}],["meta",{"property":"og:title","content":"IoTDB 流处理框架"}],["meta",{"property":"og:description","content":"IoTDB 流处理框架 IoTDB 流处理框架允许用户实现自定义的流处理逻辑,可以实现对存储引擎变更的监听和捕获、实现对变更数据的变形、实现对变形后数据的向外推送等逻辑。 我们将。一个流处理任务(Pipe)包含三个子任务: 抽取(Source) 处理(Process) 发送(Sink) 流处理框架允许用户使用 Java 语言自定义编写三个子任务的处理逻..."}],["meta",{"property":"og:type","content":"article"}],["meta",{"property":"og:image","content":"https://alioss.timecho.com/docs/img/1706697228308.jpg"}],["meta",{"property":"og:locale","content":"zh-CN"}],["meta",{"property":"og:locale:alternate","content":"en-US"}],["meta",{"property":"og:updated_time","content":"2024-04-08T07:45:44.000Z"}],["meta",{"property":"article:modified_time","content":"2024-04-08T07:45:44.000Z"}],["script",{"type":"application/ld+json"},"{\\"@context\\":\\"https://schema.org\\",\\"@type\\":\\"Article\\",\\"headline\\":\\"IoTDB 流处理框架\\",\\"image\\":[\\"https://alioss.timecho.com/docs/img/1706697228308.jpg\\",\\"https://alioss.timecho.com/docs/img/%E7%8A%B6%E6%80%81%E8%BF%81%E7%A7%BB%E5%9B%BE.png\\"],\\"dateModified\\":\\"2024-04-08T07:45:44.000Z\\",\\"author\\":[]}"]]},"headers":[{"level":2,"title":"自定义流处理插件开发","slug":"自定义流处理插件开发","link":"#自定义流处理插件开发","children":[{"level":3,"title":"编程开发依赖","slug":"编程开发依赖","link":"#编程开发依赖","children":[]},{"level":3,"title":"事件驱动编程模型","slug":"事件驱动编程模型","link":"#事件驱动编程模型","children":[]},{"level":3,"title":"自定义流处理插件编程接口定义","slug":"自定义流处理插件编程接口定义","link":"#自定义流处理插件编程接口定义","children":[]}]},{"level":2,"title":"自定义流处理插件管理","slug":"自定义流处理插件管理","link":"#自定义流处理插件管理","children":[{"level":3,"title":"加载插件语句","slug":"加载插件语句","link":"#加载插件语句","children":[]},{"level":3,"title":"删除插件语句","slug":"删除插件语句","link":"#删除插件语句","children":[]},{"level":3,"title":"查看插件语句","slug":"查看插件语句","link":"#查看插件语句","children":[]}]},{"level":2,"title":"系统预置的流处理插件","slug":"系统预置的流处理插件","link":"#系统预置的流处理插件","children":[{"level":3,"title":"预置 source 插件","slug":"预置-source-插件","link":"#预置-source-插件","children":[]},{"level":3,"title":"预置 processor 插件","slug":"预置-processor-插件","link":"#预置-processor-插件","children":[]},{"level":3,"title":"预置 sink 插件","slug":"预置-sink-插件","link":"#预置-sink-插件","children":[]}]},{"level":2,"title":"流处理任务管理","slug":"流处理任务管理","link":"#流处理任务管理","children":[{"level":3,"title":"创建流处理任务","slug":"创建流处理任务","link":"#创建流处理任务","children":[]},{"level":3,"title":"启动流处理任务","slug":"启动流处理任务","link":"#启动流处理任务","children":[]},{"level":3,"title":"停止流处理任务","slug":"停止流处理任务","link":"#停止流处理任务","children":[]},{"level":3,"title":"删除流处理任务","slug":"删除流处理任务","link":"#删除流处理任务","children":[]},{"level":3,"title":"展示流处理任务","slug":"展示流处理任务","link":"#展示流处理任务","children":[]},{"level":3,"title":"流处理任务运行状态迁移","slug":"流处理任务运行状态迁移","link":"#流处理任务运行状态迁移","children":[]}]},{"level":2,"title":"权限管理","slug":"权限管理","link":"#权限管理","children":[{"level":3,"title":"流处理任务","slug":"流处理任务","link":"#流处理任务","children":[]},{"level":3,"title":"流处理任务插件","slug":"流处理任务插件","link":"#流处理任务插件","children":[]}]},{"level":2,"title":"配置参数","slug":"配置参数","link":"#配置参数","children":[]}],"git":{"createdTime":1689242051000,"updatedTime":1712562344000,"contributors":[{"name":"Caideyipi","email":"87789683+Caideyipi@users.noreply.github.com","commits":1},{"name":"CritasWang","email":"critas@outlook.com","commits":1},{"name":"Tansgr","email":"101696091+tanxilo@users.noreply.github.com","commits":1},{"name":"Tanxilo","email":"101696091+tanxilo@users.noreply.github.com","commits":1}]},"readingTime":{"minutes":25.17,"words":7550},"filePathRelative":"zh/UserGuide/latest/User-Manual/Streaming_timecho.md","localizedDate":"2023年7月13日","autoDesc":true}');export{W as comp,z as data};