blob: cde018e3451701b77f106e08d0590e21b8376c09 [file] [log] [blame]
import{_ as l,r as p,o as c,c as r,b as n,d as s,a,w as t,e as o}from"./app-Bp5kEZWW.js";const d={},u=n("h1",{id:"iotdb-stream-processing-framework",tabindex:"-1"},[n("a",{class:"header-anchor",href:"#iotdb-stream-processing-framework"},[n("span",null,"IoTDB Stream Processing Framework")])],-1),m=n("p",null,"The IoTDB stream processing framework allows users to implement customized stream processing logic, which can monitor and capture storage engine changes, transform changed data, and push transformed data outward.",-1),k=o(`<ul><li>Extract</li><li>Process</li><li>Send (Connect)</li></ul><p>The stream processing framework allows users to customize the processing logic of three subtasks using Java language and process data in a UDF-like manner.<br> In a Pipe, the three subtasks mentioned above are executed and implemented by three types of plugins. Data flows through these three plugins sequentially for processing:<br> Pipe Extractor is used to extract data, Pipe Processor is used to process data, Pipe Connector is used to send data, and the final data will be sent to an external system.</p><p><strong>The model for a Pipe task is as follows:</strong></p><p><img src="https://alioss.timecho.com/docs/img/pipe.png" alt="pipe.png" loading="lazy"><br> A data stream processing task essentially describes the attributes of the Pipe Extractor, Pipe Processor, and Pipe Connector plugins.</p><p>Users can configure the specific attributes of these three subtasks declaratively using SQL statements. 
By combining different attributes, flexible data ETL (Extract, Transform, Load) capabilities can be achieved.</p><p>Using the stream processing framework, it is possible to build a complete data pipeline to fulfill various requirements such as <em>edge-to-cloud synchronization, remote disaster recovery, and read/write load balancing across multiple databases</em>.</p><h2 id="custom-stream-processing-plugin-development" tabindex="-1"><a class="header-anchor" href="#custom-stream-processing-plugin-development"><span>Custom Stream Processing Plugin Development</span></a></h2><h3 id="programming-development-dependencies" tabindex="-1"><a class="header-anchor" href="#programming-development-dependencies"><span>Programming development dependencies</span></a></h3><p>It is recommended to use Maven to build the project. Add the following dependencies in the <code>pom.xml</code> file. Please make sure to choose dependencies with the same version as the IoTDB server version.</p><div class="language-xml line-numbers-mode" data-ext="xml" data-title="xml"><pre class="language-xml"><code><span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>dependency</span><span class="token punctuation">&gt;</span></span>
<span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>groupId</span><span class="token punctuation">&gt;</span></span>org.apache.iotdb<span class="token tag"><span class="token tag"><span class="token punctuation">&lt;/</span>groupId</span><span class="token punctuation">&gt;</span></span>
<span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>artifactId</span><span class="token punctuation">&gt;</span></span>pipe-api<span class="token tag"><span class="token tag"><span class="token punctuation">&lt;/</span>artifactId</span><span class="token punctuation">&gt;</span></span>
<span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>version</span><span class="token punctuation">&gt;</span></span>1.2.1<span class="token tag"><span class="token tag"><span class="token punctuation">&lt;/</span>version</span><span class="token punctuation">&gt;</span></span>
<span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>scope</span><span class="token punctuation">&gt;</span></span>provided<span class="token tag"><span class="token tag"><span class="token punctuation">&lt;/</span>scope</span><span class="token punctuation">&gt;</span></span>
<span class="token tag"><span class="token tag"><span class="token punctuation">&lt;/</span>dependency</span><span class="token punctuation">&gt;</span></span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><h3 id="event-driven-programming-model" tabindex="-1"><a class="header-anchor" href="#event-driven-programming-model"><span>Event-Driven Programming Model</span></a></h3><p>The design of user programming interfaces for stream processing plugins follows the principles of the event-driven programming model. In this model, events serve as the abstraction of data in the user programming interface. The programming interface is decoupled from the specific execution method, allowing the focus to be on describing how the system expects events (data) to be processed upon arrival.</p><p>In the user programming interface of stream processing plugins, events abstract the write operations of database data. Events are captured by the local stream processing engine and passed sequentially through the three stages of stream processing, namely Pipe Extractor, Pipe Processor, and Pipe Connector plugins. User logic is triggered and executed within these three plugins.</p><p>To accommodate both low-latency stream processing in low-load scenarios and high-throughput stream processing in high-load scenarios at the edge, the stream processing engine dynamically chooses the processing objects from operation logs and data files. 
Therefore, the user programming interface for stream processing requires the user to provide the handling logic for two types of events: TabletInsertionEvent for operation log write events and TsFileInsertionEvent for data file write events.</p><h4 id="tabletinsertionevent" tabindex="-1"><a class="header-anchor" href="#tabletinsertionevent"><span><strong>TabletInsertionEvent</strong></span></a></h4><p>The TabletInsertionEvent is a high-level data abstraction for user write requests, which provides the ability to manipulate the underlying data of the write request by providing a unified operation interface.</p><p>For different database deployments, the underlying storage structure corresponding to the operation log write event is different. For stand-alone deployment scenarios, the operation log write event is an encapsulation of write-ahead log (WAL) entries; for distributed deployment scenarios, the operation log write event is an encapsulation of individual node consensus protocol operation log entries.</p><p>For write operations generated by different write request interfaces of the database, the request data structure corresponding to the operation log write event is also different. IoTDB provides many write interfaces such as InsertRecord, InsertRecords, InsertTablet, InsertTablets, and so on, and each kind of write request uses a
completely different serialisation method and generates different binary entries.</p><p>The existence of operation log write events provides users with a unified view of data operations, which shields the implementation differences of the underlying data structures, greatly reduces the programming threshold for users, and improves the ease of use of the functionality.</p><div class="language-java line-numbers-mode" data-ext="java" data-title="java"><pre class="language-java"><code><span class="token doc-comment comment">/** TabletInsertionEvent is used to define the event of data insertion. */</span>
<span class="token keyword">public</span> <span class="token keyword">interface</span> <span class="token class-name">TabletInsertionEvent</span> <span class="token keyword">extends</span> <span class="token class-name">Event</span> <span class="token punctuation">{</span>
<span class="token doc-comment comment">/**
* The consumer processes the data row by row and collects the results by RowCollector.
*
* <span class="token keyword">@return</span> <span class="token punctuation">{</span><span class="token keyword">@code</span> <span class="token code-section"><span class="token code language-java"><span class="token class-name">Iterable</span><span class="token generics"><span class="token punctuation">&lt;</span><span class="token class-name">TabletInsertionEvent</span><span class="token punctuation">&gt;</span></span></span></span><span class="token punctuation">}</span> a list of new TabletInsertionEvent contains the
* results collected by the RowCollector
*/</span>
<span class="token class-name">Iterable</span><span class="token generics"><span class="token punctuation">&lt;</span><span class="token class-name">TabletInsertionEvent</span><span class="token punctuation">&gt;</span></span> <span class="token function">processRowByRow</span><span class="token punctuation">(</span><span class="token class-name">BiConsumer</span><span class="token generics"><span class="token punctuation">&lt;</span><span class="token class-name">Row</span><span class="token punctuation">,</span> <span class="token class-name">RowCollector</span><span class="token punctuation">&gt;</span></span> consumer<span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token doc-comment comment">/**
* The consumer processes the Tablet directly and collects the results by RowCollector.
*
* <span class="token keyword">@return</span> <span class="token punctuation">{</span><span class="token keyword">@code</span> <span class="token code-section"><span class="token code language-java"><span class="token class-name">Iterable</span><span class="token generics"><span class="token punctuation">&lt;</span><span class="token class-name">TabletInsertionEvent</span><span class="token punctuation">&gt;</span></span></span></span><span class="token punctuation">}</span> a list of new TabletInsertionEvent contains the
* results collected by the RowCollector
*/</span>
<span class="token class-name">Iterable</span><span class="token generics"><span class="token punctuation">&lt;</span><span class="token class-name">TabletInsertionEvent</span><span class="token punctuation">&gt;</span></span> <span class="token function">processTablet</span><span class="token punctuation">(</span><span class="token class-name">BiConsumer</span><span class="token generics"><span class="token punctuation">&lt;</span><span class="token class-name">Tablet</span><span class="token punctuation">,</span> <span class="token class-name">RowCollector</span><span class="token punctuation">&gt;</span></span> consumer<span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token punctuation">}</span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><h4 id="tsfileinsertionevent" tabindex="-1"><a class="header-anchor" href="#tsfileinsertionevent"><span><strong>TsFileInsertionEvent</strong></span></a></h4><p>The TsFileInsertionEvent represents a high-level abstraction of the database&#39;s disk flush operation and is a collection of multiple TabletInsertionEvents.</p><p>IoTDB&#39;s storage engine is based on the LSM (Log-Structured Merge) structure. When data is written, the write operations are first flushed to log-structured files, while the written data is also stored in memory. When the memory reaches its capacity limit, a flush operation is triggered, converting the data in memory into a database file while deleting the previously written log entries. During the conversion from memory data to database file data, two compression processes, encoding compression and universal compression, are applied. As a result, the data in the database file occupies less space compared to the original data in memory.</p><p>In extreme network conditions, directly transferring data files is more cost-effective than transmitting individual write operations. It consumes lower network bandwidth and achieves faster transmission speed. However, there is no such thing as a free lunch. 
Performing calculations on data in the disk file incurs additional costs for file I/O compared to performing calculations directly on data in memory. Nevertheless, the coexistence of disk data files and memory write operations permits dynamic trade-offs and adjustments. It is based on this observation that the data file write event is introduced into the event model of the plugin.</p><p>In summary, the data file write event appears in the event stream of stream processing plugins in the following two scenarios:</p><ol><li><p>Historical data extraction: Before a stream processing task starts, all persisted write data exists in the form of TsFiles. When collecting historical data at the beginning of a stream processing task, the historical data is abstracted as TsFileInsertionEvent.</p></li><li><p>Real-time data extraction: During the execution of a stream processing task, if the speed of processing the log entries representing real-time operations is slower than the rate of write requests, the unprocessed log entries will be persisted to disk in the form of TsFiles. When these data are extracted by the stream processing engine, they are abstracted as TsFileInsertionEvent.</p></li></ol><div class="language-java line-numbers-mode" data-ext="java" data-title="java"><pre class="language-java"><code><span class="token doc-comment comment">/**
* TsFileInsertionEvent is used to define the event of writing TsFile. Event data stores in disks,
* which is compressed and encoded, and requires IO cost for computational processing.
*/</span>
<span class="token keyword">public</span> <span class="token keyword">interface</span> <span class="token class-name">TsFileInsertionEvent</span> <span class="token keyword">extends</span> <span class="token class-name">Event</span> <span class="token punctuation">{</span>
<span class="token doc-comment comment">/**
* The method is used to convert the TsFileInsertionEvent into several TabletInsertionEvents.
*
* <span class="token keyword">@return</span> <span class="token punctuation">{</span><span class="token keyword">@code</span> <span class="token code-section"><span class="token code language-java"><span class="token class-name">Iterable</span><span class="token generics"><span class="token punctuation">&lt;</span><span class="token class-name">TabletInsertionEvent</span><span class="token punctuation">&gt;</span></span></span></span><span class="token punctuation">}</span> the list of TabletInsertionEvent
*/</span>
<span class="token class-name">Iterable</span><span class="token generics"><span class="token punctuation">&lt;</span><span class="token class-name">TabletInsertionEvent</span><span class="token punctuation">&gt;</span></span> <span class="token function">toTabletInsertionEvents</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token punctuation">}</span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><h3 id="custom-stream-processing-plugin-programming-interface-definition" tabindex="-1"><a class="header-anchor" href="#custom-stream-processing-plugin-programming-interface-definition"><span>Custom Stream Processing Plugin Programming Interface Definition</span></a></h3><p>Based on the custom stream processing plugin programming interface, users can easily write data extraction plugins, data processing plugins, and data sending plugins, allowing the stream processing functionality to adapt flexibly to various industrial scenarios.</p><h4 id="data-extraction-plugin-interface" tabindex="-1"><a class="header-anchor" href="#data-extraction-plugin-interface"><span>Data Extraction Plugin Interface</span></a></h4><p>Data extraction is the first stage of the three-stage process of stream processing, which includes data extraction, data processing, and data sending. The data extraction plugin (PipeExtractor) serves as a bridge between the stream processing engine and the storage engine. It captures various data write events by listening to the behavior of the storage engine.</p><div class="language-java line-numbers-mode" data-ext="java" data-title="java"><pre class="language-java"><code><span class="token doc-comment comment">/**
* PipeExtractor
*
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>p</span><span class="token punctuation">&gt;</span></span>PipeExtractor is responsible for capturing events from sources.
*
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>p</span><span class="token punctuation">&gt;</span></span>Various data sources can be supported by implementing different PipeExtractor classes.
*
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>p</span><span class="token punctuation">&gt;</span></span>The lifecycle of a PipeExtractor is as follows:
*
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>ul</span><span class="token punctuation">&gt;</span></span>
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>When a collaboration task is created, the KV pairs of \`WITH EXTRACTOR\` clause in SQL are
* parsed and the validation method <span class="token punctuation">{</span><span class="token keyword">@link</span> <span class="token reference"><span class="token class-name">PipeExtractor</span><span class="token punctuation">#</span><span class="token function">validate</span><span class="token punctuation">(</span><span class="token class-name">PipeParameterValidator</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span>
* will be called to validate the parameters.
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>Before the collaboration task starts, the method <span class="token punctuation">{</span><span class="token keyword">@link</span>
* <span class="token reference"><span class="token class-name">PipeExtractor</span><span class="token punctuation">#</span><span class="token function">customize</span><span class="token punctuation">(</span><span class="token class-name">PipeParameters</span><span class="token punctuation">,</span> <span class="token class-name">PipeExtractorRuntimeConfiguration</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> will be called
* to config the runtime behavior of the PipeExtractor.
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>Then the method <span class="token punctuation">{</span><span class="token keyword">@link</span> <span class="token reference"><span class="token class-name">PipeExtractor</span><span class="token punctuation">#</span><span class="token function">start</span><span class="token punctuation">(</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> will be called to start the PipeExtractor.
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>While the collaboration task is in progress, the method <span class="token punctuation">{</span><span class="token keyword">@link</span> <span class="token reference"><span class="token class-name">PipeExtractor</span><span class="token punctuation">#</span><span class="token function">supply</span><span class="token punctuation">(</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> will
* be called to capture events from sources and then the events will be passed to the
* PipeProcessor.
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>The method <span class="token punctuation">{</span><span class="token keyword">@link</span> <span class="token reference"><span class="token class-name">PipeExtractor</span><span class="token punctuation">#</span><span class="token function">close</span><span class="token punctuation">(</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> will be called when the collaboration task is
* cancelled (the \`DROP PIPE\` command is executed).
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;/</span>ul</span><span class="token punctuation">&gt;</span></span>
*/</span>
<span class="token keyword">public</span> <span class="token keyword">interface</span> <span class="token class-name">PipeExtractor</span> <span class="token keyword">extends</span> <span class="token class-name">PipePlugin</span> <span class="token punctuation">{</span>
<span class="token doc-comment comment">/**
* This method is mainly used to validate <span class="token punctuation">{</span><span class="token keyword">@link</span> <span class="token reference"><span class="token class-name">PipeParameters</span></span><span class="token punctuation">}</span> and it is executed before <span class="token punctuation">{</span><span class="token keyword">@link</span>
* <span class="token reference"><span class="token class-name">PipeExtractor</span><span class="token punctuation">#</span><span class="token function">customize</span><span class="token punctuation">(</span><span class="token class-name">PipeParameters</span><span class="token punctuation">,</span> <span class="token class-name">PipeExtractorRuntimeConfiguration</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> is called.
*
* <span class="token keyword">@param</span> <span class="token parameter">validator</span> the validator used to validate <span class="token punctuation">{</span><span class="token keyword">@link</span> <span class="token reference"><span class="token class-name">PipeParameters</span></span><span class="token punctuation">}</span>
* <span class="token keyword">@throws</span> <span class="token reference"><span class="token class-name">Exception</span></span> if any parameter is not valid
*/</span>
<span class="token keyword">void</span> <span class="token function">validate</span><span class="token punctuation">(</span><span class="token class-name">PipeParameterValidator</span> validator<span class="token punctuation">)</span> <span class="token keyword">throws</span> <span class="token class-name">Exception</span><span class="token punctuation">;</span>
<span class="token doc-comment comment">/**
* This method is mainly used to customize PipeExtractor. In this method, the user can do the
* following things:
*
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>ul</span><span class="token punctuation">&gt;</span></span>
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>Use PipeParameters to parse key-value pair attributes entered by the user.
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>Set the running configurations in PipeExtractorRuntimeConfiguration.
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;/</span>ul</span><span class="token punctuation">&gt;</span></span>
*
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>p</span><span class="token punctuation">&gt;</span></span>This method is called after the method <span class="token punctuation">{</span><span class="token keyword">@link</span>
* <span class="token reference"><span class="token class-name">PipeExtractor</span><span class="token punctuation">#</span><span class="token function">validate</span><span class="token punctuation">(</span><span class="token class-name">PipeParameterValidator</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> is called.
*
* <span class="token keyword">@param</span> <span class="token parameter">parameters</span> used to parse the input parameters entered by the user
* <span class="token keyword">@param</span> <span class="token parameter">configuration</span> used to set the required properties of the running PipeExtractor
* <span class="token keyword">@throws</span> <span class="token reference"><span class="token class-name">Exception</span></span> the user can throw errors if necessary
*/</span>
<span class="token keyword">void</span> <span class="token function">customize</span><span class="token punctuation">(</span><span class="token class-name">PipeParameters</span> parameters<span class="token punctuation">,</span> <span class="token class-name">PipeExtractorRuntimeConfiguration</span> configuration<span class="token punctuation">)</span>
<span class="token keyword">throws</span> <span class="token class-name">Exception</span><span class="token punctuation">;</span>
<span class="token doc-comment comment">/**
* Start the extractor. After this method is called, events should be ready to be supplied by
* <span class="token punctuation">{</span><span class="token keyword">@link</span> <span class="token reference"><span class="token class-name">PipeExtractor</span><span class="token punctuation">#</span><span class="token function">supply</span><span class="token punctuation">(</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span>. This method is called after <span class="token punctuation">{</span><span class="token keyword">@link</span>
* <span class="token reference"><span class="token class-name">PipeExtractor</span><span class="token punctuation">#</span><span class="token function">customize</span><span class="token punctuation">(</span><span class="token class-name">PipeParameters</span><span class="token punctuation">,</span> <span class="token class-name">PipeExtractorRuntimeConfiguration</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> is called.
*
* <span class="token keyword">@throws</span> <span class="token reference"><span class="token class-name">Exception</span></span> the user can throw errors if necessary
*/</span>
<span class="token keyword">void</span> <span class="token function">start</span><span class="token punctuation">(</span><span class="token punctuation">)</span> <span class="token keyword">throws</span> <span class="token class-name">Exception</span><span class="token punctuation">;</span>
<span class="token doc-comment comment">/**
* Supply single event from the extractor and the caller will send the event to the processor.
* This method is called after <span class="token punctuation">{</span><span class="token keyword">@link</span> <span class="token reference"><span class="token class-name">PipeExtractor</span><span class="token punctuation">#</span><span class="token function">start</span><span class="token punctuation">(</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> is called.
*
* <span class="token keyword">@return</span> the event to be supplied. the event may be null if the extractor has no more events at
* the moment, but the extractor is still running for more events.
* <span class="token keyword">@throws</span> <span class="token reference"><span class="token class-name">Exception</span></span> the user can throw errors if necessary
*/</span>
<span class="token class-name">Event</span> <span class="token function">supply</span><span class="token punctuation">(</span><span class="token punctuation">)</span> <span class="token keyword">throws</span> <span class="token class-name">Exception</span><span class="token punctuation">;</span>
<span class="token punctuation">}</span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div 
class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><h4 id="data-processing-plugin-interface" tabindex="-1"><a class="header-anchor" href="#data-processing-plugin-interface"><span>Data Processing Plugin Interface</span></a></h4><p>Data processing is the second stage of the three-stage process of stream processing, which includes data extraction, data processing, and data sending. The data processing plugin (PipeProcessor) is primarily used for filtering and transforming the various events captured by the data extraction plugin (PipeExtractor).</p><div class="language-java line-numbers-mode" data-ext="java" data-title="java"><pre class="language-java"><code><span class="token doc-comment comment">/**
* PipeProcessor
*
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>p</span><span class="token punctuation">&gt;</span></span>PipeProcessor is used to filter and transform the Event formed by the PipeExtractor.
*
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>p</span><span class="token punctuation">&gt;</span></span>The lifecycle of a PipeProcessor is as follows:
*
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>ul</span><span class="token punctuation">&gt;</span></span>
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>When a collaboration task is created, the KV pairs of the \`WITH PROCESSOR\` clause in SQL are
* parsed and the validation method <span class="token punctuation">{</span><span class="token keyword">@link</span> <span class="token reference"><span class="token class-name">PipeProcessor</span><span class="token punctuation">#</span><span class="token function">validate</span><span class="token punctuation">(</span><span class="token class-name">PipeParameterValidator</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span>
* will be called to validate the parameters.
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>Before the collaboration task starts, the method <span class="token punctuation">{</span><span class="token keyword">@link</span>
* <span class="token reference"><span class="token class-name">PipeProcessor</span><span class="token punctuation">#</span><span class="token function">customize</span><span class="token punctuation">(</span><span class="token class-name">PipeParameters</span><span class="token punctuation">,</span> <span class="token class-name">PipeProcessorRuntimeConfiguration</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> will be called
* to configure the runtime behavior of the PipeProcessor.
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>While the collaboration task is in progress:
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>ul</span><span class="token punctuation">&gt;</span></span>
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>PipeExtractor captures the events and wraps them into three types of Event instances.
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>PipeProcessor processes the events and then passes them to the PipeConnector. The
* following 3 methods will be called: <span class="token punctuation">{</span><span class="token keyword">@link</span>
* <span class="token reference"><span class="token class-name">PipeProcessor</span><span class="token punctuation">#</span><span class="token function">process</span><span class="token punctuation">(</span><span class="token class-name">TabletInsertionEvent</span><span class="token punctuation">,</span> <span class="token class-name">EventCollector</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span>, <span class="token punctuation">{</span><span class="token keyword">@link</span>
* <span class="token reference"><span class="token class-name">PipeProcessor</span><span class="token punctuation">#</span><span class="token function">process</span><span class="token punctuation">(</span><span class="token class-name">TsFileInsertionEvent</span><span class="token punctuation">,</span> <span class="token class-name">EventCollector</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> and <span class="token punctuation">{</span><span class="token keyword">@link</span>
* <span class="token reference"><span class="token class-name">PipeProcessor</span><span class="token punctuation">#</span><span class="token function">process</span><span class="token punctuation">(</span><span class="token class-name">Event</span><span class="token punctuation">,</span> <span class="token class-name">EventCollector</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span>.
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>PipeConnector serializes the events into binaries and sends them to sinks.
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;/</span>ul</span><span class="token punctuation">&gt;</span></span>
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>When the collaboration task is cancelled (the \`DROP PIPE\` command is executed), the <span class="token punctuation">{</span><span class="token keyword">@link</span>
* <span class="token reference"><span class="token class-name">PipeProcessor</span><span class="token punctuation">#</span><span class="token function">close</span><span class="token punctuation">(</span><span class="token punctuation">)</span></span> <span class="token punctuation">}</span> method will be called.
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;/</span>ul</span><span class="token punctuation">&gt;</span></span>
*/</span>
<span class="token keyword">public</span> <span class="token keyword">interface</span> <span class="token class-name">PipeProcessor</span> <span class="token keyword">extends</span> <span class="token class-name">PipePlugin</span> <span class="token punctuation">{</span>
<span class="token doc-comment comment">/**
* This method is mainly used to validate <span class="token punctuation">{</span><span class="token keyword">@link</span> <span class="token reference"><span class="token class-name">PipeParameters</span></span><span class="token punctuation">}</span> and it is executed before <span class="token punctuation">{</span><span class="token keyword">@link</span>
* <span class="token reference"><span class="token class-name">PipeProcessor</span><span class="token punctuation">#</span><span class="token function">customize</span><span class="token punctuation">(</span><span class="token class-name">PipeParameters</span><span class="token punctuation">,</span> <span class="token class-name">PipeProcessorRuntimeConfiguration</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> is called.
*
* <span class="token keyword">@param</span> <span class="token parameter">validator</span> the validator used to validate <span class="token punctuation">{</span><span class="token keyword">@link</span> <span class="token reference"><span class="token class-name">PipeParameters</span></span><span class="token punctuation">}</span>
* <span class="token keyword">@throws</span> <span class="token reference"><span class="token class-name">Exception</span></span> if any parameter is not valid
*/</span>
<span class="token keyword">void</span> <span class="token function">validate</span><span class="token punctuation">(</span><span class="token class-name">PipeParameterValidator</span> validator<span class="token punctuation">)</span> <span class="token keyword">throws</span> <span class="token class-name">Exception</span><span class="token punctuation">;</span>
<span class="token doc-comment comment">/**
* This method is mainly used to customize PipeProcessor. In this method, the user can do the
* following things:
*
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>ul</span><span class="token punctuation">&gt;</span></span>
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>Use PipeParameters to parse key-value pair attributes entered by the user.
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>Set the running configurations in PipeProcessorRuntimeConfiguration.
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;/</span>ul</span><span class="token punctuation">&gt;</span></span>
*
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>p</span><span class="token punctuation">&gt;</span></span>This method is called after the method <span class="token punctuation">{</span><span class="token keyword">@link</span>
* <span class="token reference"><span class="token class-name">PipeProcessor</span><span class="token punctuation">#</span><span class="token function">validate</span><span class="token punctuation">(</span><span class="token class-name">PipeParameterValidator</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> is called and before the beginning of the
* event processing.
*
* <span class="token keyword">@param</span> <span class="token parameter">parameters</span> used to parse the input parameters entered by the user
* <span class="token keyword">@param</span> <span class="token parameter">configuration</span> used to set the required properties of the running PipeProcessor
* <span class="token keyword">@throws</span> <span class="token reference"><span class="token class-name">Exception</span></span> the user can throw errors if necessary
*/</span>
<span class="token keyword">void</span> <span class="token function">customize</span><span class="token punctuation">(</span><span class="token class-name">PipeParameters</span> parameters<span class="token punctuation">,</span> <span class="token class-name">PipeProcessorRuntimeConfiguration</span> configuration<span class="token punctuation">)</span>
<span class="token keyword">throws</span> <span class="token class-name">Exception</span><span class="token punctuation">;</span>
<span class="token doc-comment comment">/**
* This method is called to process the TabletInsertionEvent.
*
* <span class="token keyword">@param</span> <span class="token parameter">tabletInsertionEvent</span> TabletInsertionEvent to be processed
* <span class="token keyword">@param</span> <span class="token parameter">eventCollector</span> used to collect result events after processing
* <span class="token keyword">@throws</span> <span class="token reference"><span class="token class-name">Exception</span></span> the user can throw errors if necessary
*/</span>
<span class="token keyword">void</span> <span class="token function">process</span><span class="token punctuation">(</span><span class="token class-name">TabletInsertionEvent</span> tabletInsertionEvent<span class="token punctuation">,</span> <span class="token class-name">EventCollector</span> eventCollector<span class="token punctuation">)</span>
<span class="token keyword">throws</span> <span class="token class-name">Exception</span><span class="token punctuation">;</span>
<span class="token doc-comment comment">/**
* This method is called to process the TsFileInsertionEvent.
*
* <span class="token keyword">@param</span> <span class="token parameter">tsFileInsertionEvent</span> TsFileInsertionEvent to be processed
* <span class="token keyword">@param</span> <span class="token parameter">eventCollector</span> used to collect result events after processing
* <span class="token keyword">@throws</span> <span class="token reference"><span class="token class-name">Exception</span></span> the user can throw errors if necessary
*/</span>
<span class="token keyword">default</span> <span class="token keyword">void</span> <span class="token function">process</span><span class="token punctuation">(</span><span class="token class-name">TsFileInsertionEvent</span> tsFileInsertionEvent<span class="token punctuation">,</span> <span class="token class-name">EventCollector</span> eventCollector<span class="token punctuation">)</span>
<span class="token keyword">throws</span> <span class="token class-name">Exception</span> <span class="token punctuation">{</span>
<span class="token keyword">for</span> <span class="token punctuation">(</span><span class="token keyword">final</span> <span class="token class-name">TabletInsertionEvent</span> tabletInsertionEvent <span class="token operator">:</span>
tsFileInsertionEvent<span class="token punctuation">.</span><span class="token function">toTabletInsertionEvents</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span> <span class="token punctuation">{</span>
<span class="token function">process</span><span class="token punctuation">(</span>tabletInsertionEvent<span class="token punctuation">,</span> eventCollector<span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token punctuation">}</span>
<span class="token punctuation">}</span>
<span class="token doc-comment comment">/**
* This method is called to process the Event.
*
* <span class="token keyword">@param</span> <span class="token parameter">event</span> Event to be processed
* <span class="token keyword">@param</span> <span class="token parameter">eventCollector</span> used to collect result events after processing
* <span class="token keyword">@throws</span> <span class="token reference"><span class="token class-name">Exception</span></span> the user can throw errors if necessary
*/</span>
<span class="token keyword">void</span> <span class="token function">process</span><span class="token punctuation">(</span><span class="token class-name">Event</span> event<span class="token punctuation">,</span> <span class="token class-name">EventCollector</span> eventCollector<span class="token punctuation">)</span> <span class="token keyword">throws</span> <span class="token class-name">Exception</span><span class="token punctuation">;</span>
<span class="token punctuation">}</span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div 
class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><h4 id="data-sending-plugin-interface" tabindex="-1"><a class="header-anchor" href="#data-sending-plugin-interface"><span>Data Sending Plugin Interface</span></a></h4><p>Data sending is the third stage of the three-stage process of stream processing, which includes data extraction, data processing, and data sending. The data sending plugin (PipeConnector) is responsible for sending the various events processed by the data processing plugin (PipeProcessor). It serves as the network implementation layer of the stream processing framework and should support multiple real-time communication protocols and connectors in its interface.</p><div class="language-java line-numbers-mode" data-ext="java" data-title="java"><pre class="language-java"><code><span class="token doc-comment comment">/**
* PipeConnector
*
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>p</span><span class="token punctuation">&gt;</span></span>PipeConnector is responsible for sending events to sinks.
*
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>p</span><span class="token punctuation">&gt;</span></span>Various network protocols can be supported by implementing different PipeConnector classes.
*
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>p</span><span class="token punctuation">&gt;</span></span>The lifecycle of a PipeConnector is as follows:
*
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>ul</span><span class="token punctuation">&gt;</span></span>
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>When a collaboration task is created, the KV pairs of the \`WITH CONNECTOR\` clause in SQL are
* parsed and the validation method <span class="token punctuation">{</span><span class="token keyword">@link</span> <span class="token reference"><span class="token class-name">PipeConnector</span><span class="token punctuation">#</span><span class="token function">validate</span><span class="token punctuation">(</span><span class="token class-name">PipeParameterValidator</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span>
* will be called to validate the parameters.
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>Before the collaboration task starts, the method <span class="token punctuation">{</span><span class="token keyword">@link</span>
* <span class="token reference"><span class="token class-name">PipeConnector</span><span class="token punctuation">#</span><span class="token function">customize</span><span class="token punctuation">(</span><span class="token class-name">PipeParameters</span><span class="token punctuation">,</span> <span class="token class-name">PipeConnectorRuntimeConfiguration</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> will be called
* to configure the runtime behavior of the PipeConnector and the method <span class="token punctuation">{</span><span class="token keyword">@link</span>
* <span class="token reference"><span class="token class-name">PipeConnector</span><span class="token punctuation">#</span><span class="token function">handshake</span><span class="token punctuation">(</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> will be called to create a connection with the sink.
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>While the collaboration task is in progress:
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>ul</span><span class="token punctuation">&gt;</span></span>
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>PipeExtractor captures the events and wraps them into three types of Event instances.
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>PipeProcessor processes the events and then passes them to the PipeConnector.
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>PipeConnector serializes the events into binaries and sends them to sinks. The
* following 3 methods will be called: <span class="token punctuation">{</span><span class="token keyword">@link</span>
* <span class="token reference"><span class="token class-name">PipeConnector</span><span class="token punctuation">#</span><span class="token function">transfer</span><span class="token punctuation">(</span><span class="token class-name">TabletInsertionEvent</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span>, <span class="token punctuation">{</span><span class="token keyword">@link</span>
* <span class="token reference"><span class="token class-name">PipeConnector</span><span class="token punctuation">#</span><span class="token function">transfer</span><span class="token punctuation">(</span><span class="token class-name">TsFileInsertionEvent</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> and <span class="token punctuation">{</span><span class="token keyword">@link</span>
* <span class="token reference"><span class="token class-name">PipeConnector</span><span class="token punctuation">#</span><span class="token function">transfer</span><span class="token punctuation">(</span><span class="token class-name">Event</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span>.
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;/</span>ul</span><span class="token punctuation">&gt;</span></span>
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>When the collaboration task is cancelled (the \`DROP PIPE\` command is executed), the <span class="token punctuation">{</span><span class="token keyword">@link</span>
* <span class="token reference"><span class="token class-name">PipeConnector</span><span class="token punctuation">#</span><span class="token function">close</span><span class="token punctuation">(</span><span class="token punctuation">)</span></span> <span class="token punctuation">}</span> method will be called.
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;/</span>ul</span><span class="token punctuation">&gt;</span></span>
*
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>p</span><span class="token punctuation">&gt;</span></span>In addition, the method <span class="token punctuation">{</span><span class="token keyword">@link</span> <span class="token reference"><span class="token class-name">PipeConnector</span><span class="token punctuation">#</span><span class="token function">heartbeat</span><span class="token punctuation">(</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> will be called periodically to check
* whether the connection with the sink is still alive. The method <span class="token punctuation">{</span><span class="token keyword">@link</span> <span class="token reference"><span class="token class-name">PipeConnector</span><span class="token punctuation">#</span><span class="token function">handshake</span><span class="token punctuation">(</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span>
* will be called to create a new connection with the sink when the method <span class="token punctuation">{</span><span class="token keyword">@link</span>
* <span class="token reference"><span class="token class-name">PipeConnector</span><span class="token punctuation">#</span><span class="token function">heartbeat</span><span class="token punctuation">(</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> throws exceptions.
*/</span>
<span class="token keyword">public</span> <span class="token keyword">interface</span> <span class="token class-name">PipeConnector</span> <span class="token keyword">extends</span> <span class="token class-name">PipePlugin</span> <span class="token punctuation">{</span>
<span class="token doc-comment comment">/**
* This method is mainly used to validate <span class="token punctuation">{</span><span class="token keyword">@link</span> <span class="token reference"><span class="token class-name">PipeParameters</span></span><span class="token punctuation">}</span> and it is executed before <span class="token punctuation">{</span><span class="token keyword">@link</span>
* <span class="token reference"><span class="token class-name">PipeConnector</span><span class="token punctuation">#</span><span class="token function">customize</span><span class="token punctuation">(</span><span class="token class-name">PipeParameters</span><span class="token punctuation">,</span> <span class="token class-name">PipeConnectorRuntimeConfiguration</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> is called.
*
* <span class="token keyword">@param</span> <span class="token parameter">validator</span> the validator used to validate <span class="token punctuation">{</span><span class="token keyword">@link</span> <span class="token reference"><span class="token class-name">PipeParameters</span></span><span class="token punctuation">}</span>
* <span class="token keyword">@throws</span> <span class="token reference"><span class="token class-name">Exception</span></span> if any parameter is not valid
*/</span>
<span class="token keyword">void</span> <span class="token function">validate</span><span class="token punctuation">(</span><span class="token class-name">PipeParameterValidator</span> validator<span class="token punctuation">)</span> <span class="token keyword">throws</span> <span class="token class-name">Exception</span><span class="token punctuation">;</span>
<span class="token doc-comment comment">/**
* This method is mainly used to customize PipeConnector. In this method, the user can do the
* following things:
*
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>ul</span><span class="token punctuation">&gt;</span></span>
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>Use PipeParameters to parse key-value pair attributes entered by the user.
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>Set the running configurations in PipeConnectorRuntimeConfiguration.
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;/</span>ul</span><span class="token punctuation">&gt;</span></span>
*
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>p</span><span class="token punctuation">&gt;</span></span>This method is called after the method <span class="token punctuation">{</span><span class="token keyword">@link</span>
* <span class="token reference"><span class="token class-name">PipeConnector</span><span class="token punctuation">#</span><span class="token function">validate</span><span class="token punctuation">(</span><span class="token class-name">PipeParameterValidator</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> is called and before the method <span class="token punctuation">{</span><span class="token keyword">@link</span>
* <span class="token reference"><span class="token class-name">PipeConnector</span><span class="token punctuation">#</span><span class="token function">handshake</span><span class="token punctuation">(</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> is called.
*
* <span class="token keyword">@param</span> <span class="token parameter">parameters</span> used to parse the input parameters entered by the user
* <span class="token keyword">@param</span> <span class="token parameter">configuration</span> used to set the required properties of the running PipeConnector
* <span class="token keyword">@throws</span> <span class="token reference"><span class="token class-name">Exception</span></span> the user can throw errors if necessary
*/</span>
<span class="token keyword">void</span> <span class="token function">customize</span><span class="token punctuation">(</span><span class="token class-name">PipeParameters</span> parameters<span class="token punctuation">,</span> <span class="token class-name">PipeConnectorRuntimeConfiguration</span> configuration<span class="token punctuation">)</span>
<span class="token keyword">throws</span> <span class="token class-name">Exception</span><span class="token punctuation">;</span>
<span class="token doc-comment comment">/**
* This method is used to create a connection with the sink. This method will be called after the
* method <span class="token punctuation">{</span><span class="token keyword">@link</span> <span class="token reference"><span class="token class-name">PipeConnector</span><span class="token punctuation">#</span><span class="token function">customize</span><span class="token punctuation">(</span><span class="token class-name">PipeParameters</span><span class="token punctuation">,</span> <span class="token class-name">PipeConnectorRuntimeConfiguration</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> is
* called or will be called when the method <span class="token punctuation">{</span><span class="token keyword">@link</span> <span class="token reference"><span class="token class-name">PipeConnector</span><span class="token punctuation">#</span><span class="token function">heartbeat</span><span class="token punctuation">(</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> throws exceptions.
*
* <span class="token keyword">@throws</span> <span class="token reference"><span class="token class-name">Exception</span></span> if the connection fails to be created
*/</span>
<span class="token keyword">void</span> <span class="token function">handshake</span><span class="token punctuation">(</span><span class="token punctuation">)</span> <span class="token keyword">throws</span> <span class="token class-name">Exception</span><span class="token punctuation">;</span>
<span class="token doc-comment comment">/**
* This method will be called periodically to check whether the connection with sink is still
* alive.
*
* <span class="token keyword">@throws</span> <span class="token reference"><span class="token class-name">Exception</span></span> if the connection dies
*/</span>
<span class="token keyword">void</span> <span class="token function">heartbeat</span><span class="token punctuation">(</span><span class="token punctuation">)</span> <span class="token keyword">throws</span> <span class="token class-name">Exception</span><span class="token punctuation">;</span>
<span class="token doc-comment comment">/**
* This method is used to transfer the TabletInsertionEvent.
*
* <span class="token keyword">@param</span> <span class="token parameter">tabletInsertionEvent</span> TabletInsertionEvent to be transferred
* <span class="token keyword">@throws</span> <span class="token reference"><span class="token class-name">PipeConnectionException</span></span> if the connection is broken
* <span class="token keyword">@throws</span> <span class="token reference"><span class="token class-name">Exception</span></span> the user can throw errors if necessary
*/</span>
<span class="token keyword">void</span> <span class="token function">transfer</span><span class="token punctuation">(</span><span class="token class-name">TabletInsertionEvent</span> tabletInsertionEvent<span class="token punctuation">)</span> <span class="token keyword">throws</span> <span class="token class-name">Exception</span><span class="token punctuation">;</span>
<span class="token doc-comment comment">/**
* This method is used to transfer the TsFileInsertionEvent.
*
* <span class="token keyword">@param</span> <span class="token parameter">tsFileInsertionEvent</span> TsFileInsertionEvent to be transferred
* <span class="token keyword">@throws</span> <span class="token reference"><span class="token class-name">PipeConnectionException</span></span> if the connection is broken
* <span class="token keyword">@throws</span> <span class="token reference"><span class="token class-name">Exception</span></span> the user can throw errors if necessary
*/</span>
<span class="token keyword">default</span> <span class="token keyword">void</span> <span class="token function">transfer</span><span class="token punctuation">(</span><span class="token class-name">TsFileInsertionEvent</span> tsFileInsertionEvent<span class="token punctuation">)</span> <span class="token keyword">throws</span> <span class="token class-name">Exception</span> <span class="token punctuation">{</span>
<span class="token keyword">for</span> <span class="token punctuation">(</span><span class="token keyword">final</span> <span class="token class-name">TabletInsertionEvent</span> tabletInsertionEvent <span class="token operator">:</span>
tsFileInsertionEvent<span class="token punctuation">.</span><span class="token function">toTabletInsertionEvents</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span> <span class="token punctuation">{</span>
<span class="token function">transfer</span><span class="token punctuation">(</span>tabletInsertionEvent<span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token punctuation">}</span>
<span class="token punctuation">}</span>
<span class="token doc-comment comment">/**
* This method is used to transfer the Event.
*
* <span class="token keyword">@param</span> <span class="token parameter">event</span> Event to be transferred
* <span class="token keyword">@throws</span> <span class="token reference"><span class="token class-name">PipeConnectionException</span></span> if the connection is broken
* <span class="token keyword">@throws</span> <span class="token reference"><span class="token class-name">Exception</span></span> the user can throw errors if necessary
*/</span>
<span class="token keyword">void</span> <span class="token function">transfer</span><span class="token punctuation">(</span><span class="token class-name">Event</span> event<span class="token punctuation">)</span> <span class="token keyword">throws</span> <span class="token class-name">Exception</span><span class="token punctuation">;</span>
<span class="token punctuation">}</span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div 
class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><h2 id="custom-stream-processing-plugin-management" tabindex="-1"><a class="header-anchor" href="#custom-stream-processing-plugin-management"><span>Custom Stream Processing Plugin Management</span></a></h2><p>To ensure the flexibility and usability of user-defined plugins in production environments, the system needs 
to provide the capability to dynamically manage plugins. This section introduces the management statements for stream processing plugins, which enable the dynamic and unified management of plugins.</p><h3 id="load-plugin-statement" tabindex="-1"><a class="header-anchor" href="#load-plugin-statement"><span>Load Plugin Statement</span></a></h3><p>In IoTDB, to dynamically load a user-defined plugin into the system, you first need to implement a specific plugin class based on PipeExtractor, PipeProcessor, or PipeConnector. Then, you need to compile and package the plugin class into an executable jar file. Finally, you can use the loading plugin management statement to load the plugin into IoTDB.</p><p>The syntax of the loading plugin management statement is as follows:</p><div class="language-sql line-numbers-mode" data-ext="sql" data-title="sql"><pre class="language-sql"><code><span class="token keyword">CREATE</span> PIPEPLUGIN <span class="token operator">&lt;</span>alias<span class="token operator">&gt;</span>
<span class="token keyword">AS</span> <span class="token operator">&lt;</span><span class="token keyword">Full</span> class name<span class="token operator">&gt;</span>
<span class="token keyword">USING</span> <span class="token operator">&lt;</span>URL <span class="token keyword">of</span> jar <span class="token keyword">file</span><span class="token operator">&gt;</span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div>`,44),v={href:"https://example.com:8080/iotdb/pipe-plugin.jar",target:"_blank",rel:"noopener noreferrer"},h=o(`<div class="language-sql line-numbers-mode" data-ext="sql" data-title="sql"><pre class="language-sql"><code><span class="token keyword">CREATE</span> PIPEPLUGIN example
<span class="token keyword">AS</span> <span class="token string">&#39;edu.tsinghua.iotdb.pipe.ExampleProcessor&#39;</span>
<span class="token keyword">USING</span> URI <span class="token string">&#39;https://example.com:8080/iotdb/pipe-plugin.jar&#39;</span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><h3 id="delete-plugin-statement" tabindex="-1"><a class="header-anchor" href="#delete-plugin-statement"><span>Delete Plugin Statement</span></a></h3><p>When you no longer want to use a plugin and need to uninstall it from the system, you can use the delete plugin statement shown below.</p><div class="language-sql line-numbers-mode" data-ext="sql" data-title="sql"><pre class="language-sql"><code><span class="token keyword">DROP</span> PIPEPLUGIN <span class="token operator">&lt;</span>alias<span class="token operator">&gt;</span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div></div></div><h3 id="show-plugin-statement" tabindex="-1"><a class="header-anchor" href="#show-plugin-statement"><span>Show Plugin Statement</span></a></h3><p>Users can also view the plugins in the system as needed. The statement to view plugins is as follows.</p><div class="language-sql line-numbers-mode" data-ext="sql" data-title="sql"><pre class="language-sql"><code><span class="token keyword">SHOW</span> PIPEPLUGINS
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div></div></div><h2 id="system-pre-installed-stream-processing-plugin" tabindex="-1"><a class="header-anchor" href="#system-pre-installed-stream-processing-plugin"><span>System Pre-installed Stream Processing Plugin</span></a></h2><h3 id="pre-built-extractor-plugin" tabindex="-1"><a class="header-anchor" href="#pre-built-extractor-plugin"><span>Pre-built Extractor Plugin</span></a></h3><h4 id="iotdb-extractor" tabindex="-1"><a class="header-anchor" href="#iotdb-extractor"><span>iotdb-extractor</span></a></h4><p>Function: Extract historical or realtime data inside IoTDB into pipe.</p><table><thead><tr><th>key</th><th>value</th><th>value range</th><th>required or optional with default</th></tr></thead><tbody><tr><td>extractor</td><td>iotdb-extractor</td><td>String: iotdb-extractor</td><td>required</td></tr><tr><td>extractor.pattern</td><td>path prefix for filtering time series</td><td>String: any time series prefix</td><td>optional: root</td></tr><tr><td>extractor.history.enable</td><td>whether to sync historical data</td><td>Boolean: true, false</td><td>optional: true</td></tr><tr><td>extractor.history.start-time</td><td>start event time of the historical data to synchronize (inclusive)</td><td>Long: [Long.MIN_VALUE, Long.MAX_VALUE]</td><td>optional: Long.MIN_VALUE</td></tr><tr><td>extractor.history.end-time</td><td>end event time of the historical data to synchronize (inclusive)</td><td>Long: [Long.MIN_VALUE, Long.MAX_VALUE]</td><td>optional: Long.MAX_VALUE</td></tr><tr><td>extractor.realtime.enable</td><td>whether to sync realtime data</td><td>Boolean: true, false</td><td>optional: true</td></tr></tbody></table>`,12),g=n("p",null,[s("🚫 "),n("strong",null,"extractor.pattern Parameter 
Description")],-1),b={href:"https://iotdb.apache.org/zh/Download/#_1-0-%E7%89%88%E6%9C%AC%E4%B8%8D%E5%85%BC%E5%AE%B9%E7%9A%84%E8%AF%AD%E6%B3%95%E8%AF%A6%E7%BB%86%E8%AF%B4%E6%98%8E",target:"_blank",rel:"noopener noreferrer"},f=o("<li><p>In the underlying implementation, when pattern is detected as root (default value), synchronization efficiency is higher, and any other format will reduce performance.</p></li><li><p>The path prefix does not need to form a complete path. For example, when creating a pipe with the parameter &#39;extractor.pattern&#39;=&#39;root.aligned.1&#39;:</p><ul><li>root.aligned.1TS</li><li>root.aligned.1TS.`1`</li><li>root.aligned.100TS</li></ul><p>the data will be synchronized;</p><ul><li>root.aligned.`1`</li><li>root.aligned.`123`</li></ul><p>the data will not be synchronized.</p></li>",2),w=o(`<blockquote><p>❗️<strong>start-time, end-time parameter description of extractor.history</strong></p><ul><li>start-time, end-time should be in ISO format, such as 2011-12-03T10:15:30 or 2011-12-03T10:15:30+01:00</li></ul></blockquote><blockquote><p>✅ <strong>a piece of data from production to IoTDB contains two key concepts of time</strong></p><ul><li><strong>event time:</strong> the time when the data is actually produced (or the generation time assigned to the data by the data production system, which is a time item in the data point), also called the event time.</li><li><strong>arrival time:</strong> the time the data arrived in the IoTDB system.</li></ul><p>The out-of-order data we often refer to refers to data whose <strong>event time</strong> is far behind the current system time (or the maximum <strong>event time</strong> that has been dropped) when the data arrives. 
On the other hand, whether it is out-of-order data or sequential data, as long as they arrive newly in the system, their <strong>arrival time</strong> will increase with the order in which the data arrives at IoTDB.</p></blockquote><blockquote><p>💎 <strong>the work of iotdb-extractor can be split into two stages</strong></p><ol><li>Historical data extraction: All data with <strong>arrival time</strong> &lt; <strong>current system time</strong> when creating the pipe is called historical data</li><li>Realtime data extraction: All data with <strong>arrival time</strong> &gt;= <strong>current system time</strong> when the pipe is created is called realtime data</li></ol><p>The historical data transmission phase and the realtime data transmission phase are executed serially. The realtime data transmission phase starts only after the historical data transmission phase is completed.</p><p>Users can configure iotdb-extractor to perform:</p><ul><li>Historical data extraction (<code>&#39;extractor.history.enable&#39; = &#39;true&#39;</code>, <code>&#39;extractor.realtime.enable&#39; = &#39;false&#39;</code>)</li><li>Realtime data extraction (<code>&#39;extractor.history.enable&#39; = &#39;false&#39;</code>, <code>&#39;extractor.realtime.enable&#39; = &#39;true&#39;</code>)</li><li>Full data extraction (<code>&#39;extractor.history.enable&#39; = &#39;true&#39;</code>, <code>&#39;extractor.realtime.enable&#39; = &#39;true&#39;</code>)</li><li>Setting both <code>extractor.history.enable</code> and <code>extractor.realtime.enable</code> to <code>false</code> at the same time is not allowed</li></ul></blockquote><h3 id="pre-built-processor-plugin" tabindex="-1"><a class="header-anchor" href="#pre-built-processor-plugin"><span>Pre-built Processor Plugin</span></a></h3><h4 id="do-nothing-processor" tabindex="-1"><a class="header-anchor" href="#do-nothing-processor"><span>do-nothing-processor</span></a></h4><p>Function: Does not do anything with the events passed in by the 
extractor.</p><table><thead><tr><th>key</th><th>value</th><th>value range</th><th>required or optional with default</th></tr></thead><tbody><tr><td>processor</td><td>do-nothing-processor</td><td>String: do-nothing-processor</td><td>required</td></tr></tbody></table><h3 id="pre-built-connector-plugin" tabindex="-1"><a class="header-anchor" href="#pre-built-connector-plugin"><span>Pre-built Connector Plugin</span></a></h3><h4 id="do-nothing-connector" tabindex="-1"><a class="header-anchor" href="#do-nothing-connector"><span>do-nothing-connector</span></a></h4><p>Function: Does not do anything with the events passed in by the processor.</p><table><thead><tr><th>key</th><th>value</th><th>value range</th><th>required or optional with default</th></tr></thead><tbody><tr><td>connector</td><td>do-nothing-connector</td><td>String: do-nothing-connector</td><td>required</td></tr></tbody></table><h2 id="stream-processing-task-management" tabindex="-1"><a class="header-anchor" href="#stream-processing-task-management"><span>Stream Processing Task Management</span></a></h2><h3 id="create-stream-processing-task" tabindex="-1"><a class="header-anchor" href="#create-stream-processing-task"><span>Create Stream Processing Task</span></a></h3><p>A stream processing task can be created using the <code>CREATE PIPE</code> statement, a sample SQL statement is shown below:</p><div class="language-sql line-numbers-mode" data-ext="sql" data-title="sql"><pre class="language-sql"><code><span class="token keyword">CREATE</span> PIPE <span class="token operator">&lt;</span>PipeId<span class="token operator">&gt;</span> <span class="token comment">-- PipeId is the name that uniquely identifies the sync task</span>
<span class="token keyword">WITH</span> EXTRACTOR <span class="token punctuation">(</span>
<span class="token comment">-- Default IoTDB Data Extraction Plugin</span>
<span class="token string">&#39;extractor&#39;</span> <span class="token operator">=</span> <span class="token string">&#39;iotdb-extractor&#39;</span><span class="token punctuation">,</span>
<span class="token comment">-- Path prefix, only data that can match the path prefix will be extracted for subsequent processing and delivery</span>
<span class="token string">&#39;extractor.pattern&#39;</span> <span class="token operator">=</span> <span class="token string">&#39;root.timecho&#39;</span><span class="token punctuation">,</span>
<span class="token comment">-- Whether to extract historical data</span>
<span class="token string">&#39;extractor.history.enable&#39;</span> <span class="token operator">=</span> <span class="token string">&#39;true&#39;</span><span class="token punctuation">,</span>
<span class="token comment">-- Describes the time range of the historical data being extracted, indicating the earliest possible time</span>
<span class="token string">&#39;extractor.history.start-time&#39;</span> <span class="token operator">=</span> <span class="token string">&#39;2011-12-03T10:15:30+01:00&#39;</span><span class="token punctuation">,</span>
<span class="token comment">-- Describes the time range of the extracted historical data, indicating the latest time</span>
<span class="token string">&#39;extractor.history.end-time&#39;</span> <span class="token operator">=</span> <span class="token string">&#39;2022-12-03T10:15:30+01:00&#39;</span><span class="token punctuation">,</span>
<span class="token comment">-- Whether to extract realtime data</span>
<span class="token string">&#39;extractor.realtime.enable&#39;</span> <span class="token operator">=</span> <span class="token string">&#39;true&#39;</span><span class="token punctuation">,</span>
<span class="token punctuation">)</span>
<span class="token keyword">WITH</span> PROCESSOR <span class="token punctuation">(</span>
<span class="token comment">-- Default data processing plugin, means no processing</span>
<span class="token string">&#39;processor&#39;</span> <span class="token operator">=</span> <span class="token string">&#39;do-nothing-processor&#39;</span><span class="token punctuation">,</span>
<span class="token punctuation">)</span>
<span class="token keyword">WITH</span> CONNECTOR <span class="token punctuation">(</span>
<span class="token comment">-- IoTDB data sending plugin with target IoTDB</span>
<span class="token string">&#39;connector&#39;</span> <span class="token operator">=</span> <span class="token string">&#39;iotdb-thrift-connector&#39;</span><span class="token punctuation">,</span>
<span class="token comment">-- IP address of the data service of one of the DataNode nodes of the target IoTDB</span>
<span class="token string">&#39;connector.ip&#39;</span> <span class="token operator">=</span> <span class="token string">&#39;127.0.0.1&#39;</span><span class="token punctuation">,</span>
<span class="token comment">-- Data service port of one of the DataNode nodes of the target IoTDB</span>
<span class="token string">&#39;connector.port&#39;</span> <span class="token operator">=</span> <span class="token string">&#39;6667&#39;</span><span class="token punctuation">,</span>
<span class="token punctuation">)</span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><p><strong>To create a stream processing task it is necessary to configure the PipeId and the parameters of the three plugin sections:</strong></p>`,16),y=n("thead",null,[n("tr",null,[n("th",null,"configuration item"),n("th",null,"description"),n("th",null,"Required or not"),n("th",null,"default implementation"),n("th",null,"Default implementation description"),n("th",null,"Whether to allow custom implementations")])],-1),P=n("td",null,"pipeId",-1),x=n("td",null,"Globally uniquely identifies the name of a sync task",-1),E=n("td",null,"-",-1),T=n("td",null,"-",-1),I=n("td",null,"-",-1),C=n("tr",null,[n("td",null,"extractor"),n("td",null,"pipe Extractor plugin, for extracting synchronized data at the bottom of the database"),n("td",null,"Optional"),n("td",null,"iotdb-extractor"),n("td",null,"Integrate all historical data of the database and subsequent realtime data into the sync task"),n("td",null,"no")],-1),S=n("td",null,"processor",-1),_=n("td",null,"Pipe Processor plugin, for processing 
data",-1),q=n("td",null,"Optional",-1),D=n("td",null,"do-nothing-processor",-1),R=n("td",null,"no processing of incoming data",-1),O=n("td",null,"connector",-1),N=n("td",null,"Pipe Connector plugin, for sending data",-1),B=n("td",null,"-",-1),A=n("td",null,"-",-1),F=o(`<p>In the example, the iotdb-extractor, do-nothing-processor, and iotdb-thrift-connector plugins are used to build the data synchronisation task. IoTDB has other built-in data synchronisation plugins; <strong>see the &quot;System Pre-installed Stream Processing Plugin&quot; section</strong>.</p><p><strong>An example of a minimalist CREATE PIPE statement is as follows:</strong></p><div class="language-sql line-numbers-mode" data-ext="sql" data-title="sql"><pre class="language-sql"><code><span class="token keyword">CREATE</span> PIPE <span class="token operator">&lt;</span>PipeId<span class="token operator">&gt;</span> <span class="token comment">-- PipeId is a name that uniquely identifies the task.</span>
<span class="token keyword">WITH</span> CONNECTOR <span class="token punctuation">(</span>
<span class="token comment">-- IoTDB data sending plugin with target IoTDB</span>
<span class="token string">&#39;connector&#39;</span> <span class="token operator">=</span> <span class="token string">&#39;iotdb-thrift-connector&#39;</span><span class="token punctuation">,</span>
<span class="token comment">-- IP address of the data service of one of the DataNode nodes of the target IoTDB</span>
<span class="token string">&#39;connector.ip&#39;</span> <span class="token operator">=</span> <span class="token string">&#39;127.0.0.1&#39;</span><span class="token punctuation">,</span>
<span class="token comment">-- Data service port of one of the DataNode nodes of the target IoTDB</span>
<span class="token string">&#39;connector.port&#39;</span> <span class="token operator">=</span> <span class="token string">&#39;6667&#39;</span><span class="token punctuation">,</span>
<span class="token punctuation">)</span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><p>The expressed semantics are: synchronise the full amount of historical data and subsequent arrivals of realtime data from this database instance to the IoTDB instance with target 127.0.0.1:6667.</p><p><strong>Note:</strong></p><ul><li><p>EXTRACTOR and PROCESSOR are optional, if no configuration parameters are filled in, the system will use the corresponding default implementation.</p></li><li><p>The CONNECTOR is a mandatory configuration that needs to be declared in the CREATE PIPE statement for configuring purposes.</p></li><li><p>The CONNECTOR exhibits self-reusability. For different tasks, if their CONNECTOR possesses identical KV properties (where the value corresponds to every key), <strong>the system will ultimately create only one instance of the CONNECTOR</strong> to achieve resource reuse for connections.</p><ul><li>For example, there are the following pipe1, pipe2 task declarations:</li></ul><div class="language-sql line-numbers-mode" data-ext="sql" data-title="sql"><pre class="language-sql"><code><span class="token keyword">CREATE</span> PIPE pipe1
<span class="token keyword">WITH</span> CONNECTOR <span class="token punctuation">(</span>
<span class="token string">&#39;connector&#39;</span> <span class="token operator">=</span> <span class="token string">&#39;iotdb-thrift-connector&#39;</span><span class="token punctuation">,</span>
<span class="token string">&#39;connector.thrift.host&#39;</span> <span class="token operator">=</span> <span class="token string">&#39;localhost&#39;</span><span class="token punctuation">,</span>
<span class="token string">&#39;connector.thrift.port&#39;</span> <span class="token operator">=</span> <span class="token string">&#39;9999&#39;</span><span class="token punctuation">,</span>
<span class="token punctuation">)</span>
<span class="token keyword">CREATE</span> PIPE pipe2
<span class="token keyword">WITH</span> CONNECTOR <span class="token punctuation">(</span>
<span class="token string">&#39;connector&#39;</span> <span class="token operator">=</span> <span class="token string">&#39;iotdb-thrift-connector&#39;</span><span class="token punctuation">,</span>
<span class="token string">&#39;connector.thrift.port&#39;</span> <span class="token operator">=</span> <span class="token string">&#39;9999&#39;</span><span class="token punctuation">,</span>
<span class="token string">&#39;connector.thrift.host&#39;</span> <span class="token operator">=</span> <span class="token string">&#39;localhost&#39;</span><span class="token punctuation">,</span>
<span class="token punctuation">)</span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><ul><li>Since they have identical CONNECTOR declarations (<strong>even if the order of some properties is different</strong>), the framework will automatically reuse the CONNECTOR declared by them. Hence, the CONNECTOR instances for pipe1 and pipe2 will be the same.</li></ul></li><li><p>Please note that we should avoid constructing application scenarios that involve data cycle sync (as it can result in an infinite loop):</p><ul><li>IoTDB A -&gt; IoTDB B -&gt; IoTDB A</li><li>IoTDB A -&gt; IoTDB A</li></ul></li></ul><h3 id="start-stream-processing-task" tabindex="-1"><a class="header-anchor" href="#start-stream-processing-task"><span>Start Stream Processing Task</span></a></h3><p>After the successful execution of the CREATE PIPE statement, an instance of the stream processing task is created, but the overall task&#39;s running status will be set to STOPPED, meaning the task will not immediately process data.</p><p>You can use the START PIPE statement to make the stream processing task start processing data:</p><div class="language-sql line-numbers-mode" data-ext="sql" data-title="sql"><pre class="language-sql"><code><span class="token keyword">START</span> PIPE <span class="token operator">&lt;</span>PipeId<span class="token operator">&gt;</span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div></div></div><h3 id="stop-stream-processing-task" tabindex="-1"><a class="header-anchor" href="#stop-stream-processing-task"><span>Stop Stream Processing Task</span></a></h3><p>Use the STOP PIPE statement to stop the stream processing task from processing data:</p><div class="language-sql line-numbers-mode" data-ext="sql" data-title="sql"><pre class="language-sql"><code>STOP PIPE <span class="token operator">&lt;</span>PipeId<span class="token operator">&gt;</span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div></div></div><h3 id="delete-stream-processing-task" tabindex="-1"><a class="header-anchor" href="#delete-stream-processing-task"><span>Delete Stream Processing Task</span></a></h3><p>If a stream processing task is in the RUNNING state, you can use the DROP PIPE statement to stop it and delete the entire task:</p><div class="language-sql line-numbers-mode" data-ext="sql" data-title="sql"><pre class="language-sql"><code><span class="token keyword">DROP</span> PIPE <span class="token operator">&lt;</span>PipeId<span class="token operator">&gt;</span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div></div></div><p>Before deleting a stream processing task, there is no need to execute the STOP operation.</p><h3 id="show-stream-processing-task" tabindex="-1"><a class="header-anchor" href="#show-stream-processing-task"><span>Show Stream Processing Task</span></a></h3><p>Use the SHOW PIPES statement to view all stream processing tasks:</p><div class="language-sql line-numbers-mode" data-ext="sql" data-title="sql"><pre class="language-sql"><code><span class="token keyword">SHOW</span> PIPES
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div></div></div><p>The query results are as follows:</p><div class="language-sql line-numbers-mode" data-ext="sql" data-title="sql"><pre class="language-sql"><code><span class="token operator">+</span><span class="token comment">-----------+-----------------------+-------+-------------+-------------+-------------+----------------+</span>
<span class="token operator">|</span> ID<span class="token operator">|</span> CreationTime <span class="token operator">|</span> State<span class="token operator">|</span>PipeExtractor<span class="token operator">|</span>PipeProcessor<span class="token operator">|</span>PipeConnector<span class="token operator">|</span>ExceptionMessage<span class="token operator">|</span>
<span class="token operator">+</span><span class="token comment">-----------+-----------------------+-------+-------------+-------------+-------------+----------------+</span>
<span class="token operator">|</span>iotdb<span class="token operator">-</span>kafka<span class="token operator">|</span><span class="token number">2022</span><span class="token operator">-</span><span class="token number">03</span><span class="token operator">-</span><span class="token number">30</span>T20:<span class="token number">58</span>:<span class="token number">30.689</span><span class="token operator">|</span>RUNNING<span class="token operator">|</span> <span class="token punctuation">.</span><span class="token punctuation">.</span><span class="token punctuation">.</span><span class="token operator">|</span> <span class="token punctuation">.</span><span class="token punctuation">.</span><span class="token punctuation">.</span><span class="token operator">|</span> <span class="token punctuation">.</span><span class="token punctuation">.</span><span class="token punctuation">.</span><span class="token operator">|</span> None<span class="token operator">|</span>
<span class="token operator">+</span><span class="token comment">-----------+-----------------------+-------+-------------+-------------+-------------+----------------+</span>
<span class="token operator">|</span>iotdb<span class="token operator">-</span>iotdb<span class="token operator">|</span><span class="token number">2022</span><span class="token operator">-</span><span class="token number">03</span><span class="token operator">-</span><span class="token number">31</span>T12:<span class="token number">55</span>:<span class="token number">28.129</span><span class="token operator">|</span>STOPPED<span class="token operator">|</span> <span class="token punctuation">.</span><span class="token punctuation">.</span><span class="token punctuation">.</span><span class="token operator">|</span> <span class="token punctuation">.</span><span class="token punctuation">.</span><span class="token punctuation">.</span><span class="token operator">|</span> <span class="token punctuation">.</span><span class="token punctuation">.</span><span class="token punctuation">.</span><span class="token operator">|</span> TException: <span class="token punctuation">.</span><span class="token punctuation">.</span><span class="token punctuation">.</span><span class="token operator">|</span>
<span class="token operator">+</span><span class="token comment">-----------+-----------------------+-------+-------------+-------------+-------------+----------------+</span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><p>To view the status of a single stream processing task, specify its <code>&lt;PipeId&gt;</code>:</p><div class="language-sql line-numbers-mode" data-ext="sql" data-title="sql"><pre class="language-sql"><code><span class="token keyword">SHOW</span> PIPE <span class="token operator">&lt;</span>PipeId<span class="token operator">&gt;</span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div></div></div><p>Additionally, the WHERE clause can be used to determine if the Pipe Connector used by a specific &lt;PipeId&gt; is being reused.</p><div class="language-sql line-numbers-mode" data-ext="sql" data-title="sql"><pre class="language-sql"><code><span class="token keyword">SHOW</span> PIPES
<span class="token keyword">WHERE</span> CONNECTOR USED <span class="token keyword">BY</span> <span class="token operator">&lt;</span>PipeId<span class="token operator">&gt;</span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div></div></div><h3 id="stream-processing-task-running-status-migration" tabindex="-1"><a class="header-anchor" href="#stream-processing-task-running-status-migration"><span>Stream Processing Task Running Status Migration</span></a></h3><p>A stream processing task status can transition through several states during the lifecycle of a data synchronization pipe:</p><ul><li><strong>STOPPED:</strong> The pipe is in a stopped state. It can have the following possibilities: <ul><li>After the successful creation of a pipe, its initial state is set to stopped</li><li>The user manually pauses a pipe that is in normal running state, transitioning its status from RUNNING to STOPPED</li><li>If a pipe encounters an unrecoverable error during execution, its status automatically changes from RUNNING to STOPPED.</li></ul></li><li><strong>RUNNING:</strong> The pipe is actively processing data</li><li><strong>DROPPED:</strong> The pipe is permanently deleted</li></ul><p>The following diagram illustrates the different states and their transitions:</p><figure><img src="https://alioss.timecho.com/docs/img/状态迁移图.png" alt="state migration diagram" tabindex="0" loading="lazy"><figcaption>state migration diagram</figcaption></figure><h2 id="authority-management" tabindex="-1"><a class="header-anchor" href="#authority-management"><span>Authority Management</span></a></h2><h3 id="stream-processing-task" tabindex="-1"><a class="header-anchor" href="#stream-processing-task"><span>Stream Processing Task</span></a></h3><table><thead><tr><th>Authority Name</th><th>Description</th></tr></thead><tbody><tr><td>CREATE_PIPE</td><td>Register task, path-independent</td></tr><tr><td>START_PIPE</td><td>Start task, path-independent</td></tr><tr><td>STOP_PIPE</td><td>Stop task, path-independent</td></tr><tr><td>DROP_PIPE</td><td>Uninstall 
task, path-independent</td></tr><tr><td>SHOW_PIPES</td><td>Query task, path-independent</td></tr></tbody></table><h3 id="stream-processing-task-plugin" tabindex="-1"><a class="header-anchor" href="#stream-processing-task-plugin"><span>Stream Processing Task Plugin</span></a></h3><table><thead><tr><th>Authority Name</th><th>Description</th></tr></thead><tbody><tr><td>CREATE_PIPEPLUGIN</td><td>Register stream processing task plugin, path-independent</td></tr><tr><td>DROP_PIPEPLUGIN</td><td>Delete stream processing task plugin, path-independent</td></tr><tr><td>SHOW_PIPEPLUGINS</td><td>Query stream processing task plugin, path-independent</td></tr></tbody></table><h2 id="configure-parameters" tabindex="-1"><a class="header-anchor" href="#configure-parameters"><span>Configure Parameters</span></a></h2><p>In iotdb-common.properties:</p><div class="language-Properties line-numbers-mode" data-ext="Properties" data-title="Properties"><pre class="language-Properties"><code>####################
### Pipe Configuration
####################
# Uncomment the following field to configure the pipe lib directory.
# For Windows platform
# If its prefix is a drive specifier followed by &quot;\\\\&quot;, or if its prefix is &quot;\\\\\\\\&quot;, then the path is
# absolute. Otherwise, it is relative.
# pipe_lib_dir=ext\\\\pipe
# For Linux platform
# If its prefix is &quot;/&quot;, then the path is absolute. Otherwise, it is relative.
# pipe_lib_dir=ext/pipe
# The maximum number of threads that can be used to execute the pipe subtasks in PipeSubtaskExecutor.
# The actual value will be min(pipe_subtask_executor_max_thread_num, max(1, CPU core number / 2)).
# pipe_subtask_executor_max_thread_num=5
# The connection timeout (in milliseconds) for the thrift client.
# pipe_connector_timeout_ms=900000
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div>`,39);function U(W,L){const e=p("font"),i=p("ExternalLinkIcon");return c(),r("div",null,[u,m,n("p",null,[s("We call "),a(e,{color:"RED"},{default:t(()=>[s("a data flow processing task a Pipe")]),_:1}),s(". A stream processing task (Pipe) contains three subtasks:")]),k,n("p",null,[s('For example, if a user implements a data processing plugin with the fully qualified class name "edu.tsinghua.iotdb.pipe.ExampleProcessor" and packages it into a jar file, which is stored at "'),n("a",v,[s("https://example.com:8080/iotdb/pipe-plugin.jar"),a(i)]),s('", and the user wants to use this plugin in the stream processing engine, marking the plugin as "example". 
The creation statement for this data processing plugin is as follows:')]),h,n("blockquote",null,[g,n("ul",null,[n("li",null,[n("p",null,[s("Pattern should use backquotes to modify illegal characters or illegal path nodes, for example, if you want to filter root.`a@b` or root.`123`, you should set the pattern to root.`a@b` or root.`123`(Refer specifically to "),n("a",b,[s("Timing of single and double quotes and backquotes"),a(i)]),s(")")])]),f])]),w,n("table",null,[y,n("tbody",null,[n("tr",null,[P,x,n("td",null,[a(e,{color:"red"},{default:t(()=>[s("required")]),_:1})]),E,T,I]),C,n("tr",null,[S,_,q,D,R,n("td",null,[a(e,{color:"red"},{default:t(()=>[s("yes")]),_:1})])]),n("tr",null,[O,N,n("td",null,[a(e,{color:"red"},{default:t(()=>[s("required")]),_:1})]),B,A,n("td",null,[a(e,{color:"red"},{default:t(()=>[s("yes")]),_:1})])])])]),F])}const j=l(d,[["render",U],["__file","Streaming.html.vue"]]),V=JSON.parse('{"path":"/UserGuide/V1.2.x/User-Manual/Streaming.html","title":"IoTDB Stream Processing Framework","lang":"en-US","frontmatter":{"description":"IoTDB Stream Processing Framework The IoTDB stream processing framework allows users to implement customized stream processing logic, which can monitor and capture storage engin...","head":[["link",{"rel":"alternate","hreflang":"zh-cn","href":"https://iotdb.apache.org/zh/UserGuide/V1.2.x/User-Manual/Streaming.html"}],["meta",{"property":"og:url","content":"https://iotdb.apache.org/UserGuide/V1.2.x/User-Manual/Streaming.html"}],["meta",{"property":"og:site_name","content":"IoTDB Website"}],["meta",{"property":"og:title","content":"IoTDB Stream Processing Framework"}],["meta",{"property":"og:description","content":"IoTDB Stream Processing Framework The IoTDB stream processing framework allows users to implement customized stream processing logic, which can monitor and capture storage 
engin..."}],["meta",{"property":"og:type","content":"article"}],["meta",{"property":"og:image","content":"https://alioss.timecho.com/docs/img/pipe.png"}],["meta",{"property":"og:locale","content":"en-US"}],["meta",{"property":"og:locale:alternate","content":"zh-CN"}],["meta",{"property":"og:updated_time","content":"2024-04-08T07:45:44.000Z"}],["meta",{"property":"article:modified_time","content":"2024-04-08T07:45:44.000Z"}],["script",{"type":"application/ld+json"},"{\\"@context\\":\\"https://schema.org\\",\\"@type\\":\\"Article\\",\\"headline\\":\\"IoTDB Stream Processing Framework\\",\\"image\\":[\\"https://alioss.timecho.com/docs/img/pipe.png\\",\\"https://alioss.timecho.com/docs/img/%E7%8A%B6%E6%80%81%E8%BF%81%E7%A7%BB%E5%9B%BE.png\\"],\\"dateModified\\":\\"2024-04-08T07:45:44.000Z\\",\\"author\\":[]}"]]},"headers":[{"level":2,"title":"Custom Stream Processing Plugin Development","slug":"custom-stream-processing-plugin-development","link":"#custom-stream-processing-plugin-development","children":[{"level":3,"title":"Programming development dependencies","slug":"programming-development-dependencies","link":"#programming-development-dependencies","children":[]},{"level":3,"title":"Event-Driven Programming Model","slug":"event-driven-programming-model","link":"#event-driven-programming-model","children":[]},{"level":3,"title":"Custom Stream Processing Plugin Programming Interface Definition","slug":"custom-stream-processing-plugin-programming-interface-definition","link":"#custom-stream-processing-plugin-programming-interface-definition","children":[]}]},{"level":2,"title":"Custom Stream Processing Plugin Management","slug":"custom-stream-processing-plugin-management","link":"#custom-stream-processing-plugin-management","children":[{"level":3,"title":"Load Plugin Statement","slug":"load-plugin-statement","link":"#load-plugin-statement","children":[]},{"level":3,"title":"Delete Plugin 
Statement","slug":"delete-plugin-statement","link":"#delete-plugin-statement","children":[]},{"level":3,"title":"Show Plugin Statement","slug":"show-plugin-statement","link":"#show-plugin-statement","children":[]}]},{"level":2,"title":"System Pre-installed Stream Processing Plugin","slug":"system-pre-installed-stream-processing-plugin","link":"#system-pre-installed-stream-processing-plugin","children":[{"level":3,"title":"Pre-built extractor Plugin","slug":"pre-built-extractor-plugin","link":"#pre-built-extractor-plugin","children":[]},{"level":3,"title":"Pre-built Processor Plugin","slug":"pre-built-processor-plugin","link":"#pre-built-processor-plugin","children":[]},{"level":3,"title":"Pre-built Connector Plugin","slug":"pre-built-connector-plugin","link":"#pre-built-connector-plugin","children":[]}]},{"level":2,"title":"Stream Processing Task Management","slug":"stream-processing-task-management","link":"#stream-processing-task-management","children":[{"level":3,"title":"Create Stream Processing Task","slug":"create-stream-processing-task","link":"#create-stream-processing-task","children":[]},{"level":3,"title":"Start Stream Processing Task","slug":"start-stream-processing-task","link":"#start-stream-processing-task","children":[]},{"level":3,"title":"Stop Stream Processing Task","slug":"stop-stream-processing-task","link":"#stop-stream-processing-task","children":[]},{"level":3,"title":"Delete Stream Processing Task","slug":"delete-stream-processing-task","link":"#delete-stream-processing-task","children":[]},{"level":3,"title":"Show Stream Processing Task","slug":"show-stream-processing-task","link":"#show-stream-processing-task","children":[]},{"level":3,"title":"Stream Processing Task Running Status Migration","slug":"stream-processing-task-running-status-migration","link":"#stream-processing-task-running-status-migration","children":[]}]},{"level":2,"title":"Authority 
Management","slug":"authority-management","link":"#authority-management","children":[{"level":3,"title":"Stream Processing Task","slug":"stream-processing-task","link":"#stream-processing-task","children":[]},{"level":3,"title":"Stream Processing Task Plugin","slug":"stream-processing-task-plugin","link":"#stream-processing-task-plugin","children":[]}]},{"level":2,"title":"Configure Parameters","slug":"configure-parameters","link":"#configure-parameters","children":[]}],"git":{"createdTime":1696932526000,"updatedTime":1712562344000,"contributors":[{"name":"wanghui42","email":"105700158+wanghui42@users.noreply.github.com","commits":2},{"name":"Caideyipi","email":"87789683+Caideyipi@users.noreply.github.com","commits":1}]},"readingTime":{"minutes":16.88,"words":5064},"filePathRelative":"UserGuide/V1.2.x/User-Manual/Streaming.md","localizedDate":"October 10, 2023","autoDesc":true}');export{j as comp,V as data};