<!-- blob: d639cc14d956a3ea16f7d5b268a925ddbbc67d93 [file] [log] [blame] -->
<!doctype html>
<html lang="en-US" data-theme="light">
<head>
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width,initial-scale=1" />
<meta name="generator" content="VuePress 2.0.0-rc.9" />
<meta name="theme" content="VuePress Theme Hope 2.0.0-rc.34" />
<style>
/* Page background: use the --bg-color custom property when it is defined,
   otherwise fall back to white.
   NOTE(review): presumably inlined so the background is correct before the
   main stylesheet loads - confirm. */
html {
background: var(--bg-color, #fff);
}
/* Dark theme: same property, but with a dark fallback. The data-theme="dark"
   attribute is set on <html> by the inline script that follows this style
   block. */
html[data-theme="dark"] {
background: var(--bg-color, #1d1e1f);
}
/* No fallback value here: the body background depends entirely on
   --bg-color being defined. */
body {
background: var(--bg-color);
}
</style>
<script>
// Early theme selection: switch <html> to the dark theme when the visitor
// stored an explicit "dark" choice, or stored no "light" choice and the
// system reports a dark colour-scheme preference.
//
// Reading localStorage can throw (e.g. a SecurityError when storage access is
// blocked for the document). An uncaught exception here would abort the whole
// script and skip the system-preference check, so a failed read is treated the
// same as "no stored preference" (null, which is also what getItem returns
// for a missing key).
const userMode = (function () {
  try {
    return localStorage.getItem("vuepress-theme-hope-scheme");
  } catch (err) {
    return null;
  }
})();
// matchMedia is feature-tested first; when it is missing this evaluates to a
// falsy value and only an explicit stored "dark" choice enables the dark theme.
const systemDarkMode =
  window.matchMedia &&
  window.matchMedia("(prefers-color-scheme: dark)").matches;
if (userMode === "dark" || (userMode !== "light" && systemDarkMode)) {
  document.documentElement.setAttribute("data-theme", "dark");
}
</script>
<link rel="alternate" hreflang="zh-cn" href="https://iotdb.apache.org/zh/UserGuide/latest/User-Manual/Streaming_timecho.html"><meta property="og:url" content="https://iotdb.apache.org/UserGuide/latest/User-Manual/Streaming_timecho.html"><meta property="og:site_name" content="IoTDB Website"><meta property="og:title" content="IoTDB stream processing framework"><meta property="og:description" content="IoTDB stream processing framework The IoTDB stream processing framework allows users to implement customized stream processing logic, which can monitor and capture storage engin..."><meta property="og:type" content="article"><meta property="og:image" content="https://alioss.timecho.com/docs/img/1706778988482.jpg"><meta property="og:locale" content="en-US"><meta property="og:locale:alternate" content="zh-CN"><meta property="og:updated_time" content="2024-04-08T07:45:44.000Z"><meta property="article:modified_time" content="2024-04-08T07:45:44.000Z"><script type="application/ld+json">{"@context":"https://schema.org","@type":"Article","headline":"IoTDB stream processing framework","image":["https://alioss.timecho.com/docs/img/1706778988482.jpg","https://alioss.timecho.com/docs/img/%E7%8A%B6%E6%80%81%E8%BF%81%E7%A7%BB%E5%9B%BE.png"],"dateModified":"2024-04-08T07:45:44.000Z","author":[]}</script><link rel="icon" href="/favicon.ico"><meta name="Description" content="Apache IoTDB: Time Series Database for IoT"><meta name="Keywords" content="TSDB, time series, time series database, IoTDB, IoT database, IoT data management,时序数据库, 时间序列管理, IoTDB, 物联网数据库, 实时数据库, 物联网数据管理, 物联网数据"><meta name="baidu-site-verification" content="wfKETzB3OT"><meta name="google-site-verification" content="mZWAoRY0yj_HAr-s47zHCGHzx5Ju-RVm5wDbPnwQYFo"><script type="text/javascript">
// Matomo command queue: reuse window._paq when it already exists, otherwise
// start with an empty array. Commands are pushed as [methodName, ...args].
var _paq = window._paq = window._paq || [];
/* Tracker methods like "setCustomDimension" should be queued before "trackPageView". */
_paq.push(["setDoNotTrack", true]);
_paq.push(["disableCookies"]);
_paq.push(["trackPageView"]);
_paq.push(["enableLinkTracking"]);
(function () {
  // Base URL shared by the tracking endpoint and the tracker script.
  var baseUrl = "https://analytics.apache.org/";
  _paq.push(["setTrackerUrl", baseUrl + "matomo.php"]);
  _paq.push(["setSiteId", "56"]);

  // Inject the tracker script asynchronously, placed before the first
  // <script> element currently in the document.
  var loader = document.createElement("script");
  var firstScript = document.getElementsByTagName("script")[0];
  loader.async = true;
  loader.src = baseUrl + "matomo.js";
  firstScript.parentNode.insertBefore(loader, firstScript);
})();
</script><title>IoTDB stream processing framework | IoTDB Website</title><meta name="description" content="IoTDB stream processing framework The IoTDB stream processing framework allows users to implement customized stream processing logic, which can monitor and capture storage engin...">
<link rel="preload" href="/assets/style-DnEHAOmf.css" as="style"><link rel="stylesheet" href="/assets/style-DnEHAOmf.css">
<link rel="modulepreload" href="/assets/app-DrPcRZG6.js"><link rel="modulepreload" href="/assets/Streaming_timecho.html-D616o0Vh.js">
</head>
<body>
<div id="app"><!--[--><!--[--><!--[--><span tabindex="-1"></span><a href="#main-content" class="vp-skip-link sr-only">Skip to main content</a><!--]--><!--[--><div class="theme-container has-toc"><!--[--><header id="navbar" class="vp-navbar hide-icon"><div class="vp-navbar-start"><button type="button" class="vp-toggle-sidebar-button" title="Toggle Sidebar"><span class="icon"></span></button><!--[--><!----><!--]--><!--[--><a class="route-link vp-brand" href="/"><img class="vp-nav-logo" src="/logo.png" alt><!----><span class="vp-site-name hide-in-pad">IoTDB Website</span></a><!--]--><!--[--><!----><!--]--></div><div class="vp-navbar-center"><!--[--><!----><!--]--><!--[--><!--]--><!--[--><!----><!--]--></div><div class="vp-navbar-end"><!--[--><!----><!--]--><!--[--><!--[--><div id="docsearch-container" style="display:none;"></div><div><button type="button" class="DocSearch DocSearch-Button" aria-label="Search"><span class="DocSearch-Button-Container"><svg width="20" height="20" class="DocSearch-Search-Icon" viewBox="0 0 20 20"><path d="M14.386 14.386l4.0877 4.0877-4.0877-4.0877c-2.9418 2.9419-7.7115 2.9419-10.6533 0-2.9419-2.9418-2.9419-7.7115 0-10.6533 2.9418-2.9419 7.7115-2.9419 10.6533 0 2.9419 2.9418 2.9419 7.7115 0 10.6533z" stroke="currentColor" fill="none" fill-rule="evenodd" stroke-linecap="round" stroke-linejoin="round"></path></svg><span class="DocSearch-Button-Placeholder">Search</span></span><span class="DocSearch-Button-Keys"><kbd class="DocSearch-Button-Key"><svg width="15" height="15" class="DocSearch-Control-Key-Icon"><path d="M4.505 4.496h2M5.505 5.496v5M8.216 4.496l.055 5.993M10 7.5c.333.333.5.667.5 1v2M12.326 4.5v5.996M8.384 4.496c1.674 0 2.116 0 2.116 1.5s-.442 1.5-2.116 1.5M3.205 9.303c-.09.448-.277 1.21-1.241 1.203C1 10.5.5 9.513.5 8V7c0-1.57.5-2.5 1.464-2.494.964.006 1.134.598 1.24 1.342M12.553 10.5h1.953" stroke-width="1.2" stroke="currentColor" fill="none" stroke-linecap="square"></path></svg></kbd><kbd 
class="DocSearch-Button-Key">K</kbd></span></button></div><!--]--><nav class="vp-nav-links"><div class="vp-nav-item hide-in-mobile"><div class="dropdown-wrapper"><button type="button" class="dropdown-title" aria-label="Documentation"><span class="title"><!---->Documentation</span><span class="arrow"></span><ul class="nav-dropdown"><li class="dropdown-item"><a class="route-link nav-link" href="/UserGuide/latest/QuickStart/QuickStart.html" aria-label="v1.3.x"><!---->v1.3.x<!----></a></li><li class="dropdown-item"><a class="route-link nav-link" href="/UserGuide/V1.2.x/QuickStart/QuickStart.html" aria-label="v1.2.x"><!---->v1.2.x<!----></a></li><li class="dropdown-item"><a class="route-link nav-link" href="/UserGuide/V1.1.x/QuickStart/QuickStart.html" aria-label="v1.1.x"><!---->v1.1.x<!----></a></li><li class="dropdown-item"><a class="route-link nav-link" href="/UserGuide/V1.0.x/QuickStart/QuickStart.html" aria-label="v1.0.x"><!---->v1.0.x<!----></a></li><li class="dropdown-item"><a class="route-link nav-link" href="/UserGuide/V0.13.x/QuickStart/QuickStart.html" aria-label="v0.13.x"><!---->v0.13.x<!----></a></li></ul></button></div></div><div class="vp-nav-item hide-in-mobile"><a href="https://cwiki.apache.org/confluence/display/IOTDB/System+Design" rel="noopener noreferrer" target="_blank" aria-label="Design" class="nav-link"><!---->Design<span><svg class="external-link-icon" xmlns="http://www.w3.org/2000/svg" aria-hidden="true" focusable="false" x="0px" y="0px" viewBox="0 0 100 100" width="15" height="15"><path fill="currentColor" d="M18.8,85.1h56l0,0c2.2,0,4-1.8,4-4v-32h-8v28h-48v-48h28v-8h-32l0,0c-2.2,0-4,1.8-4,4v56C14.8,83.3,16.6,85.1,18.8,85.1z"></path><polygon fill="currentColor" points="45.7,48.7 51.3,54.3 77.2,28.5 77.2,37.2 85.2,37.2 85.2,14.9 62.8,14.9 62.8,22.9 71.5,22.9"></polygon></svg><span class="external-link-icon-sr-only">open in new window</span></span><!----></a></div><div class="vp-nav-item hide-in-mobile"><a class="route-link nav-link" 
href="/Download/" aria-label="Download"><!---->Download<!----></a></div><div class="vp-nav-item hide-in-mobile"><div class="dropdown-wrapper"><button type="button" class="dropdown-title" aria-label="Community"><span class="title"><!---->Community</span><span class="arrow"></span><ul class="nav-dropdown"><li class="dropdown-item"><a class="route-link nav-link" href="/Community/About.html" aria-label="About"><!---->About<!----></a></li><li class="dropdown-item"><a href="https://cwiki.apache.org/confluence/display/iotdb" rel="noopener noreferrer" target="_blank" aria-label="Wiki" class="nav-link"><!---->Wiki<span><svg class="external-link-icon" xmlns="http://www.w3.org/2000/svg" aria-hidden="true" focusable="false" x="0px" y="0px" viewBox="0 0 100 100" width="15" height="15"><path fill="currentColor" d="M18.8,85.1h56l0,0c2.2,0,4-1.8,4-4v-32h-8v28h-48v-48h28v-8h-32l0,0c-2.2,0-4,1.8-4,4v56C14.8,83.3,16.6,85.1,18.8,85.1z"></path><polygon fill="currentColor" points="45.7,48.7 51.3,54.3 77.2,28.5 77.2,37.2 85.2,37.2 85.2,14.9 62.8,14.9 62.8,22.9 71.5,22.9"></polygon></svg><span class="external-link-icon-sr-only">open in new window</span></span><!----></a></li><li class="dropdown-item"><a class="route-link nav-link" href="/Community/Community-Project-Committers.html" aria-label="People"><!---->People<!----></a></li><li class="dropdown-item"><a class="route-link nav-link" href="/Community/Community-Powered-By.html" aria-label="Powered By"><!---->Powered By<!----></a></li><li class="dropdown-item"><a class="route-link nav-link" href="/Community/Materials.html" aria-label="Resources"><!---->Resources<!----></a></li><li class="dropdown-item"><a class="route-link nav-link" href="/Community/Feedback.html" aria-label="Feedback"><!---->Feedback<!----></a></li></ul></button></div></div><div class="vp-nav-item hide-in-mobile"><div class="dropdown-wrapper"><button type="button" class="dropdown-title" aria-label="Development"><span class="title"><!---->Development</span><span 
class="arrow"></span><ul class="nav-dropdown"><li class="dropdown-item"><a class="route-link nav-link" href="/Development/VoteRelease.html" aria-label="How to vote"><!---->How to vote<!----></a></li><li class="dropdown-item"><a class="route-link nav-link" href="/Development/HowToCommit.html" aria-label="How to Commit"><!---->How to Commit<!----></a></li><li class="dropdown-item"><a class="route-link nav-link" href="/Development/HowToJoin.html" aria-label="Become a Contributor"><!---->Become a Contributor<!----></a></li><li class="dropdown-item"><a class="route-link nav-link" href="/Development/Committer.html" aria-label="Become a Committer"><!---->Become a Committer<!----></a></li><li class="dropdown-item"><a class="route-link nav-link" href="/Development/ContributeGuide.html" aria-label="ContributeGuide"><!---->ContributeGuide<!----></a></li><li class="dropdown-item"><a class="route-link nav-link" href="/Development/HowtoContributeCode.html" aria-label="How to Contribute Code"><!---->How to Contribute Code<!----></a></li><li class="dropdown-item"><a class="route-link nav-link" href="/Development/format-changelist.html" aria-label="Changelist of TsFile"><!---->Changelist of TsFile<!----></a></li><li class="dropdown-item"><a class="route-link nav-link" href="/Development/rpc-changelist.html" aria-label="Changelist of RPC"><!---->Changelist of RPC<!----></a></li></ul></button></div></div><div class="vp-nav-item hide-in-mobile"><div class="dropdown-wrapper"><button type="button" class="dropdown-title" aria-label="ASF"><span class="title"><!---->ASF</span><span class="arrow"></span><ul class="nav-dropdown"><li class="dropdown-item"><a href="https://www.apache.org/" rel="noopener noreferrer" target="_blank" aria-label="Foundation" class="nav-link"><!---->Foundation<span><svg class="external-link-icon" xmlns="http://www.w3.org/2000/svg" aria-hidden="true" focusable="false" x="0px" y="0px" viewBox="0 0 100 100" width="15" height="15"><path fill="currentColor" 
d="M18.8,85.1h56l0,0c2.2,0,4-1.8,4-4v-32h-8v28h-48v-48h28v-8h-32l0,0c-2.2,0-4,1.8-4,4v56C14.8,83.3,16.6,85.1,18.8,85.1z"></path><polygon fill="currentColor" points="45.7,48.7 51.3,54.3 77.2,28.5 77.2,37.2 85.2,37.2 85.2,14.9 62.8,14.9 62.8,22.9 71.5,22.9"></polygon></svg><span class="external-link-icon-sr-only">open in new window</span></span><!----></a></li><li class="dropdown-item"><a href="https://www.apache.org/licenses/" rel="noopener noreferrer" target="_blank" aria-label="License" class="nav-link"><!---->License<span><svg class="external-link-icon" xmlns="http://www.w3.org/2000/svg" aria-hidden="true" focusable="false" x="0px" y="0px" viewBox="0 0 100 100" width="15" height="15"><path fill="currentColor" d="M18.8,85.1h56l0,0c2.2,0,4-1.8,4-4v-32h-8v28h-48v-48h28v-8h-32l0,0c-2.2,0-4,1.8-4,4v56C14.8,83.3,16.6,85.1,18.8,85.1z"></path><polygon fill="currentColor" points="45.7,48.7 51.3,54.3 77.2,28.5 77.2,37.2 85.2,37.2 85.2,14.9 62.8,14.9 62.8,22.9 71.5,22.9"></polygon></svg><span class="external-link-icon-sr-only">open in new window</span></span><!----></a></li><li class="dropdown-item"><a href="https://www.apache.org/security/" rel="noopener noreferrer" target="_blank" aria-label="Security" class="nav-link"><!---->Security<span><svg class="external-link-icon" xmlns="http://www.w3.org/2000/svg" aria-hidden="true" focusable="false" x="0px" y="0px" viewBox="0 0 100 100" width="15" height="15"><path fill="currentColor" d="M18.8,85.1h56l0,0c2.2,0,4-1.8,4-4v-32h-8v28h-48v-48h28v-8h-32l0,0c-2.2,0-4,1.8-4,4v56C14.8,83.3,16.6,85.1,18.8,85.1z"></path><polygon fill="currentColor" points="45.7,48.7 51.3,54.3 77.2,28.5 77.2,37.2 85.2,37.2 85.2,14.9 62.8,14.9 62.8,22.9 71.5,22.9"></polygon></svg><span class="external-link-icon-sr-only">open in new window</span></span><!----></a></li><li class="dropdown-item"><a href="https://www.apache.org/foundation/sponsorship.html" rel="noopener noreferrer" target="_blank" aria-label="Sponsorship" 
class="nav-link"><!---->Sponsorship<span><svg class="external-link-icon" xmlns="http://www.w3.org/2000/svg" aria-hidden="true" focusable="false" x="0px" y="0px" viewBox="0 0 100 100" width="15" height="15"><path fill="currentColor" d="M18.8,85.1h56l0,0c2.2,0,4-1.8,4-4v-32h-8v28h-48v-48h28v-8h-32l0,0c-2.2,0-4,1.8-4,4v56C14.8,83.3,16.6,85.1,18.8,85.1z"></path><polygon fill="currentColor" points="45.7,48.7 51.3,54.3 77.2,28.5 77.2,37.2 85.2,37.2 85.2,14.9 62.8,14.9 62.8,22.9 71.5,22.9"></polygon></svg><span class="external-link-icon-sr-only">open in new window</span></span><!----></a></li><li class="dropdown-item"><a href="https://www.apache.org/foundation/thanks.html" rel="noopener noreferrer" target="_blank" aria-label="Thanks" class="nav-link"><!---->Thanks<span><svg class="external-link-icon" xmlns="http://www.w3.org/2000/svg" aria-hidden="true" focusable="false" x="0px" y="0px" viewBox="0 0 100 100" width="15" height="15"><path fill="currentColor" d="M18.8,85.1h56l0,0c2.2,0,4-1.8,4-4v-32h-8v28h-48v-48h28v-8h-32l0,0c-2.2,0-4,1.8-4,4v56C14.8,83.3,16.6,85.1,18.8,85.1z"></path><polygon fill="currentColor" points="45.7,48.7 51.3,54.3 77.2,28.5 77.2,37.2 85.2,37.2 85.2,14.9 62.8,14.9 62.8,22.9 71.5,22.9"></polygon></svg><span class="external-link-icon-sr-only">open in new window</span></span><!----></a></li><li class="dropdown-item"><a href="https://www.apache.org/events/current-event" rel="noopener noreferrer" target="_blank" aria-label="Current Events" class="nav-link"><!---->Current Events<span><svg class="external-link-icon" xmlns="http://www.w3.org/2000/svg" aria-hidden="true" focusable="false" x="0px" y="0px" viewBox="0 0 100 100" width="15" height="15"><path fill="currentColor" d="M18.8,85.1h56l0,0c2.2,0,4-1.8,4-4v-32h-8v28h-48v-48h28v-8h-32l0,0c-2.2,0-4,1.8-4,4v56C14.8,83.3,16.6,85.1,18.8,85.1z"></path><polygon fill="currentColor" points="45.7,48.7 51.3,54.3 77.2,28.5 77.2,37.2 85.2,37.2 85.2,14.9 62.8,14.9 62.8,22.9 71.5,22.9"></polygon></svg><span 
class="external-link-icon-sr-only">open in new window</span></span><!----></a></li><li class="dropdown-item"><a href="https://privacy.apache.org/policies/privacy-policy-public.html" rel="noopener noreferrer" target="_blank" aria-label="Privacy" class="nav-link"><!---->Privacy<span><svg class="external-link-icon" xmlns="http://www.w3.org/2000/svg" aria-hidden="true" focusable="false" x="0px" y="0px" viewBox="0 0 100 100" width="15" height="15"><path fill="currentColor" d="M18.8,85.1h56l0,0c2.2,0,4-1.8,4-4v-32h-8v28h-48v-48h28v-8h-32l0,0c-2.2,0-4,1.8-4,4v56C14.8,83.3,16.6,85.1,18.8,85.1z"></path><polygon fill="currentColor" points="45.7,48.7 51.3,54.3 77.2,28.5 77.2,37.2 85.2,37.2 85.2,14.9 62.8,14.9 62.8,22.9 71.5,22.9"></polygon></svg><span class="external-link-icon-sr-only">open in new window</span></span><!----></a></li></ul></button></div></div></nav><div class="vp-nav-item"><div class="dropdown-wrapper"><button type="button" class="dropdown-title" aria-label="Select language"><!--[--><svg xmlns="http://www.w3.org/2000/svg" class="icon i18n-icon" viewBox="0 0 1024 1024" fill="currentColor" aria-label="i18n icon" style="width:1rem;height:1rem;vertical-align:middle;"><path d="M379.392 460.8 494.08 575.488l-42.496 102.4L307.2 532.48 138.24 701.44l-71.68-72.704L234.496 460.8l-45.056-45.056c-27.136-27.136-51.2-66.56-66.56-108.544h112.64c7.68 14.336 16.896 27.136 26.112 35.84l45.568 46.08 45.056-45.056C382.976 312.32 409.6 247.808 409.6 204.8H0V102.4h256V0h102.4v102.4h256v102.4H512c0 70.144-37.888 161.28-87.04 210.944L378.88 460.8zM576 870.4 512 1024H409.6l256-614.4H768l256 614.4H921.6l-64-153.6H576zM618.496 768h196.608L716.8 532.48 618.496 768z"></path></svg><!--]--><span class="arrow"></span><ul class="nav-dropdown"><li class="dropdown-item"><a class="route-link nav-link active" href="/UserGuide/latest/User-Manual/Streaming_timecho.html" aria-label="English"><!---->English<!----></a></li><li class="dropdown-item"><a class="route-link nav-link" 
href="/zh/UserGuide/latest/User-Manual/Streaming_timecho.html" aria-label="简体中文"><!---->简体中文<!----></a></li></ul></button></div></div><div class="vp-nav-item hide-in-mobile"><button type="button" id="appearance-switch"><svg xmlns="http://www.w3.org/2000/svg" class="icon auto-icon" viewBox="0 0 1024 1024" fill="currentColor" aria-label="auto icon" style="display:none;"><path d="M512 992C246.92 992 32 777.08 32 512S246.92 32 512 32s480 214.92 480 480-214.92 480-480 480zm0-840c-198.78 0-360 161.22-360 360 0 198.84 161.22 360 360 360s360-161.16 360-360c0-198.78-161.22-360-360-360zm0 660V212c165.72 0 300 134.34 300 300 0 165.72-134.28 300-300 300z"></path></svg><svg xmlns="http://www.w3.org/2000/svg" class="icon dark-icon" viewBox="0 0 1024 1024" fill="currentColor" aria-label="dark icon" style="display:none;"><path d="M524.8 938.667h-4.267a439.893 439.893 0 0 1-313.173-134.4 446.293 446.293 0 0 1-11.093-597.334A432.213 432.213 0 0 1 366.933 90.027a42.667 42.667 0 0 1 45.227 9.386 42.667 42.667 0 0 1 10.24 42.667 358.4 358.4 0 0 0 82.773 375.893 361.387 361.387 0 0 0 376.747 82.774 42.667 42.667 0 0 1 54.187 55.04 433.493 433.493 0 0 1-99.84 154.88 438.613 438.613 0 0 1-311.467 128z"></path></svg><svg xmlns="http://www.w3.org/2000/svg" class="icon light-icon" viewBox="0 0 1024 1024" fill="currentColor" aria-label="light icon" style="display:block;"><path d="M952 552h-80a40 40 0 0 1 0-80h80a40 40 0 0 1 0 80zM801.88 280.08a41 41 0 0 1-57.96-57.96l57.96-58a41.04 41.04 0 0 1 58 58l-58 57.96zM512 752a240 240 0 1 1 0-480 240 240 0 0 1 0 480zm0-560a40 40 0 0 1-40-40V72a40 40 0 0 1 80 0v80a40 40 0 0 1-40 40zm-289.88 88.08-58-57.96a41.04 41.04 0 0 1 58-58l57.96 58a41 41 0 0 1-57.96 57.96zM192 512a40 40 0 0 1-40 40H72a40 40 0 0 1 0-80h80a40 40 0 0 1 40 40zm30.12 231.92a41 41 0 0 1 57.96 57.96l-57.96 58a41.04 41.04 0 0 1-58-58l58-57.96zM512 832a40 40 0 0 1 40 40v80a40 40 0 0 1-80 0v-80a40 40 0 0 1 40-40zm289.88-88.08 58 57.96a41.04 41.04 0 0 1-58 58l-57.96-58a41 41 0 0 1 
57.96-57.96z"></path></svg></button></div><div class="vp-nav-item vp-action"><a class="vp-action-link" href="https://github.com/apache/iotdb" target="_blank" rel="noopener noreferrer" aria-label="GitHub"><svg xmlns="http://www.w3.org/2000/svg" class="icon github-icon" viewBox="0 0 1024 1024" fill="currentColor" aria-label="github icon" style="width:1.25rem;height:1.25rem;vertical-align:middle;"><path d="M511.957 21.333C241.024 21.333 21.333 240.981 21.333 512c0 216.832 140.544 400.725 335.574 465.664 24.49 4.395 32.256-10.07 32.256-23.083 0-11.69.256-44.245 0-85.205-136.448 29.61-164.736-64.64-164.736-64.64-22.315-56.704-54.4-71.765-54.4-71.765-44.587-30.464 3.285-29.824 3.285-29.824 49.195 3.413 75.179 50.517 75.179 50.517 43.776 75.008 114.816 53.333 142.762 40.79 4.523-31.66 17.152-53.377 31.19-65.537-108.971-12.458-223.488-54.485-223.488-242.602 0-53.547 19.114-97.323 50.517-131.67-5.035-12.33-21.93-62.293 4.779-129.834 0 0 41.258-13.184 134.912 50.346a469.803 469.803 0 0 1 122.88-16.554c41.642.213 83.626 5.632 122.88 16.554 93.653-63.488 134.784-50.346 134.784-50.346 26.752 67.541 9.898 117.504 4.864 129.834 31.402 34.347 50.474 78.123 50.474 131.67 0 188.586-114.73 230.016-224.042 242.09 17.578 15.232 33.578 44.672 33.578 90.454v135.85c0 13.142 7.936 27.606 32.854 22.87C862.25 912.597 1002.667 728.747 1002.667 512c0-271.019-219.648-490.667-490.71-490.667z"></path></svg></a></div><!--]--><!--[--><!----><!--]--><button type="button" class="vp-toggle-navbar-button" aria-label="Toggle Navbar" aria-expanded="false" aria-controls="nav-screen"><span><span class="vp-top"></span><span class="vp-middle"></span><span class="vp-bottom"></span></span></button></div></header><!----><!--]--><!----><div class="toggle-sidebar-wrapper"><span class="arrow start"></span></div><aside id="sidebar" class="vp-sidebar"><!--[--><!----><!--]--><ul class="vp-sidebar-links"><li><section class="vp-sidebar-group"><p class="vp-sidebar-header"><!----><span class="vp-sidebar-title">IoTDB User 
Guide (V1.3.x)</span><!----></p><ul class="vp-sidebar-links"></ul></section></li><li><section class="vp-sidebar-group"><button class="vp-sidebar-header clickable" type="button"><!----><span class="vp-sidebar-title">About IoTDB</span><span class="vp-arrow end"></span></button><!----></section></li><li><section class="vp-sidebar-group"><button class="vp-sidebar-header clickable" type="button"><!----><span class="vp-sidebar-title">Quick Start</span><span class="vp-arrow end"></span></button><!----></section></li><li><section class="vp-sidebar-group"><button class="vp-sidebar-header clickable" type="button"><!----><span class="vp-sidebar-title">Basic Concept</span><span class="vp-arrow end"></span></button><!----></section></li><li><section class="vp-sidebar-group"><button class="vp-sidebar-header clickable" type="button"><!----><span class="vp-sidebar-title">Deployment &amp; Maintenance</span><span class="vp-arrow end"></span></button><!----></section></li><li><section class="vp-sidebar-group"><button class="vp-sidebar-header clickable" type="button"><!----><span class="vp-sidebar-title">User Manual</span><span class="vp-arrow end"></span></button><!----></section></li><li><section class="vp-sidebar-group"><button class="vp-sidebar-header clickable" type="button"><!----><span class="vp-sidebar-title">Tools System</span><span class="vp-arrow end"></span></button><!----></section></li><li><section class="vp-sidebar-group"><button class="vp-sidebar-header clickable" type="button"><!----><span class="vp-sidebar-title">API</span><span class="vp-arrow end"></span></button><!----></section></li><li><section class="vp-sidebar-group"><button class="vp-sidebar-header clickable" type="button"><!----><span class="vp-sidebar-title">Ecosystem Integration</span><span class="vp-arrow end"></span></button><!----></section></li><li><section class="vp-sidebar-group"><button class="vp-sidebar-header clickable" type="button"><!----><span class="vp-sidebar-title">SQL Manual</span><span 
class="vp-arrow end"></span></button><!----></section></li><li><section class="vp-sidebar-group"><button class="vp-sidebar-header clickable" type="button"><!----><span class="vp-sidebar-title">FAQ</span><span class="vp-arrow end"></span></button><!----></section></li><li><section class="vp-sidebar-group"><button class="vp-sidebar-header clickable" type="button"><!----><span class="vp-sidebar-title">Reference</span><span class="vp-arrow end"></span></button><!----></section></li></ul><!--[--><!----><!--]--></aside><!--[--><main id="main-content" class="vp-page"><!--[--><!--[--><!----><!--]--><!----><nav class="vp-breadcrumb disable"></nav><div class="vp-page-title"><h1><!---->IoTDB stream processing framework</h1><div class="page-info"><!----><!----><span class="page-date-info" aria-label="Writing Date"><svg xmlns="http://www.w3.org/2000/svg" class="icon calendar-icon" viewBox="0 0 1024 1024" fill="currentColor" aria-label="calendar icon"><path d="M716.4 110.137c0-18.753-14.72-33.473-33.472-33.473-18.753 0-33.473 14.72-33.473 33.473v33.473h66.993v-33.473zm-334.87 0c0-18.753-14.72-33.473-33.473-33.473s-33.52 14.72-33.52 33.473v33.473h66.993v-33.473zm468.81 33.52H716.4v100.465c0 18.753-14.72 33.473-33.472 33.473a33.145 33.145 0 01-33.473-33.473V143.657H381.53v100.465c0 18.753-14.72 33.473-33.473 33.473a33.145 33.145 0 01-33.473-33.473V143.657H180.6A134.314 134.314 0 0046.66 277.595v535.756A134.314 134.314 0 00180.6 947.289h669.74a134.36 134.36 0 00133.94-133.938V277.595a134.314 134.314 0 00-133.94-133.938zm33.473 267.877H147.126a33.145 33.145 0 01-33.473-33.473c0-18.752 14.72-33.473 33.473-33.473h736.687c18.752 0 33.472 14.72 33.472 33.473a33.145 33.145 0 01-33.472 33.473z"></path></svg><span><!----></span><meta property="datePublished" content="2024-01-10T02:47:40.000Z"></span><span class="page-pageview-info" aria-label="Page views"><svg xmlns="http://www.w3.org/2000/svg" class="icon eye-icon" viewBox="0 0 1024 1024" fill="currentColor" aria-label="eye icon"><path 
d="M992 512.096c0-5.76-.992-10.592-1.28-11.136-.192-2.88-1.152-8.064-2.08-10.816-.256-.672-.544-1.376-.832-2.08-.48-1.568-1.024-3.104-1.6-4.32C897.664 290.112 707.104 160 512 160c-195.072 0-385.632 130.016-473.76 322.592-1.056 2.112-1.792 4.096-2.272 5.856a55.512 55.512 0 00-.64 1.6c-1.76 5.088-1.792 8.64-1.632 7.744-.832 3.744-1.568 11.168-1.568 11.168-.224 2.272-.224 4.032.032 6.304 0 0 .736 6.464 1.088 7.808.128 1.824.576 4.512 1.12 6.976h-.032c.448 2.08 1.12 4.096 1.984 6.08.48 1.536.992 2.976 1.472 4.032C126.432 733.856 316.992 864 512 864c195.136 0 385.696-130.048 473.216-321.696 1.376-2.496 2.24-4.832 2.848-6.912.256-.608.48-1.184.672-1.728 1.536-4.48 1.856-8.32 1.728-8.32l-.032.032c.608-3.104 1.568-7.744 1.568-13.28zM512 672c-88.224 0-160-71.776-160-160s71.776-160 160-160 160 71.776 160 160-71.776 160-160 160z"></path></svg><span id="ArtalkPV" class="vp-pageview waline-pageview-count" data-path="/UserGuide/latest/User-Manual/Streaming_timecho.html" data-page-key="/UserGuide/latest/User-Manual/Streaming_timecho.html">...</span></span><span class="page-reading-time-info" aria-label="Reading Time"><svg xmlns="http://www.w3.org/2000/svg" class="icon timer-icon" viewBox="0 0 1024 1024" fill="currentColor" aria-label="timer icon"><path d="M799.387 122.15c4.402-2.978 7.38-7.897 7.38-13.463v-1.165c0-8.933-7.38-16.312-16.312-16.312H256.33c-8.933 0-16.311 7.38-16.311 16.312v1.165c0 5.825 2.977 10.874 7.637 13.592 4.143 194.44 97.22 354.963 220.201 392.763-122.204 37.542-214.893 196.511-220.2 389.397-4.661 5.049-7.638 11.651-7.638 19.03v5.825h566.49v-5.825c0-7.379-2.849-13.981-7.509-18.9-5.049-193.016-97.867-351.985-220.2-389.527 123.24-37.67 216.446-198.453 220.588-392.892zM531.16 450.445v352.632c117.674 1.553 211.787 40.778 211.787 88.676H304.097c0-48.286 95.149-87.382 213.728-88.676V450.445c-93.077-3.107-167.901-81.297-167.901-177.093 0-8.803 6.99-15.793 15.793-15.793 8.803 0 15.794 6.99 15.794 15.793 0 80.261 63.69 145.635 142.01 145.635s142.011-65.374 
142.011-145.635c0-8.803 6.99-15.793 15.794-15.793s15.793 6.99 15.793 15.793c0 95.019-73.789 172.82-165.96 177.093z"></path></svg><span>About 20 min</span><meta property="timeRequired" content="PT20M"></span><!----><!----></div><hr></div><div class="vp-toc-placeholder"><aside id="toc"><!--[--><!----><!--]--><div class="vp-toc-header">On This Page<button type="button" class="print-button" title="Print"><svg xmlns="http://www.w3.org/2000/svg" class="icon print-icon" viewBox="0 0 1024 1024" fill="currentColor" aria-label="print icon"><path d="M819.2 364.8h-44.8V128c0-17.067-14.933-32-32-32H281.6c-17.067 0-32 14.933-32 32v236.8h-44.8C145.067 364.8 96 413.867 96 473.6v192c0 59.733 49.067 108.8 108.8 108.8h44.8V896c0 17.067 14.933 32 32 32h460.8c17.067 0 32-14.933 32-32V774.4h44.8c59.733 0 108.8-49.067 108.8-108.8v-192c0-59.733-49.067-108.8-108.8-108.8zM313.6 160h396.8v204.8H313.6V160zm396.8 704H313.6V620.8h396.8V864zM864 665.6c0 25.6-19.2 44.8-44.8 44.8h-44.8V588.8c0-17.067-14.933-32-32-32H281.6c-17.067 0-32 14.933-32 32v121.6h-44.8c-25.6 0-44.8-19.2-44.8-44.8v-192c0-25.6 19.2-44.8 44.8-44.8h614.4c25.6 0 44.8 19.2 44.8 44.8v192z"></path></svg></button><div class="arrow end"></div></div><div class="vp-toc-wrapper"><ul class="vp-toc-list"><!--[--><li class="vp-toc-item"><a class="route-link vp-toc-link level2" href="#custom-stream-processing-plugin-development">Custom stream processing plugin development</a></li><li><ul class="vp-toc-list"><!--[--><li class="vp-toc-item"><a class="route-link vp-toc-link level3" href="#programming-development-dependencies">Programming development dependencies</a></li><!----><!--]--><!--[--><li class="vp-toc-item"><a class="route-link vp-toc-link level3" href="#event-driven-programming-model">Event-driven programming model</a></li><!----><!--]--><!--[--><li class="vp-toc-item"><a class="route-link vp-toc-link level3" href="#custom-stream-processing-plugin-programming-interface-definition">Custom stream processing plugin programming interface 
definition</a></li><!----><!--]--></ul></li><!--]--><!--[--><li class="vp-toc-item"><a class="route-link vp-toc-link level2" href="#custom-stream-processing-plugin-management">Custom stream processing plugin management</a></li><li><ul class="vp-toc-list"><!--[--><li class="vp-toc-item"><a class="route-link vp-toc-link level3" href="#load-plugin-statement">Load plugin statement</a></li><!----><!--]--><!--[--><li class="vp-toc-item"><a class="route-link vp-toc-link level3" href="#delete-plugin-statement">Delete plugin statement</a></li><!----><!--]--><!--[--><li class="vp-toc-item"><a class="route-link vp-toc-link level3" href="#view-plugin-statements">View plugin statements</a></li><!----><!--]--></ul></li><!--]--><!--[--><li class="vp-toc-item"><a class="route-link vp-toc-link level2" href="#system-preset-stream-processing-plugin">System preset stream processing plugin</a></li><li><ul class="vp-toc-list"><!--[--><li class="vp-toc-item"><a class="route-link vp-toc-link level3" href="#pre-built-source-plugin">Pre-built Source Plugin</a></li><!----><!--]--><!--[--><li class="vp-toc-item"><a class="route-link vp-toc-link level3" href="#preset-processor-plugin">Preset processor plugin</a></li><!----><!--]--><!--[--><li class="vp-toc-item"><a class="route-link vp-toc-link level3" href="#preset-sink-plugin">Preset sink plugin</a></li><!----><!--]--></ul></li><!--]--><!--[--><li class="vp-toc-item"><a class="route-link vp-toc-link level2" href="#stream-processing-task-management">Stream processing task management</a></li><li><ul class="vp-toc-list"><!--[--><li class="vp-toc-item"><a class="route-link vp-toc-link level3" href="#create-a-stream-processing-task">Create a stream processing task</a></li><!----><!--]--><!--[--><li class="vp-toc-item"><a class="route-link vp-toc-link level3" href="#start-the-stream-processing-task">Start the stream processing task</a></li><!----><!--]--><!--[--><li class="vp-toc-item"><a class="route-link vp-toc-link level3" 
href="#stop-the-stream-processing-task">Stop the stream processing task</a></li><!----><!--]--><!--[--><li class="vp-toc-item"><a class="route-link vp-toc-link level3" href="#delete-stream-processing-tasks">Delete stream processing tasks</a></li><!----><!--]--><!--[--><li class="vp-toc-item"><a class="route-link vp-toc-link level3" href="#display-stream-processing-tasks">Display stream processing tasks</a></li><!----><!--]--><!--[--><li class="vp-toc-item"><a class="route-link vp-toc-link level3" href="#stream-processing-task-running-status-migration">Stream processing task running status migration</a></li><!----><!--]--></ul></li><!--]--><!--[--><li class="vp-toc-item"><a class="route-link vp-toc-link level2" href="#authority-management">authority management</a></li><li><ul class="vp-toc-list"><!--[--><li class="vp-toc-item"><a class="route-link vp-toc-link level3" href="#stream-processing-tasks">Stream processing tasks</a></li><!----><!--]--><!--[--><li class="vp-toc-item"><a class="route-link vp-toc-link level3" href="#stream-processing-task-plugin">Stream processing task plugin</a></li><!----><!--]--></ul></li><!--]--><!--[--><li class="vp-toc-item"><a class="route-link vp-toc-link level2" href="#configuration-parameters">Configuration parameters</a></li><!----><!--]--></ul><div class="vp-toc-marker" style="top:-1.7rem;"></div></div><!--[--><!----><!--]--></aside></div><!--[--><!----><!--]--><div class="theme-hope-content"><h1 id="iotdb-stream-processing-framework" tabindex="-1"><a class="header-anchor" href="#iotdb-stream-processing-framework"><span>IoTDB stream processing framework</span></a></h1><p>The IoTDB stream processing framework allows users to implement customized stream processing logic, which can monitor and capture storage engine changes, transform changed data, and push transformed data outward.</p><p>We call <!---->. 
A stream processing task (Pipe) contains three subtasks:</p><ul><li>Source task</li><li>Processor task</li><li>Sink task</li></ul><p>The stream processing framework allows users to customize the processing logic of three subtasks using the Java language and process data in a UDF-like manner.<br> In a Pipe, the above three subtasks are executed by three plugins respectively, and the data will be processed by these three plugins in turn:<br> Pipe Source is used to extract data, Pipe Processor is used to process data, Pipe Sink is used to send data, and the final data will be sent to an external system.</p><p><strong>The model of the Pipe task is as follows:</strong></p><figure><img src="https://alioss.timecho.com/docs/img/1706778988482.jpg" alt="pipe.png" tabindex="0" loading="lazy"><figcaption>pipe.png</figcaption></figure><p>Describing a data flow processing task essentially describes the properties of Pipe Source, Pipe Processor and Pipe Sink plugins.<br> Users can declaratively configure the specific attributes of the three subtasks through SQL statements, and achieve flexible data ETL capabilities by combining different attributes.</p><p>Using the stream processing framework, a complete data link can be built to meet the needs of device-edge-cloud synchronization, off-site disaster recovery, and read/write load separation across databases.</p><h2 id="custom-stream-processing-plugin-development" tabindex="-1"><a class="header-anchor" href="#custom-stream-processing-plugin-development"><span>Custom stream processing plugin development</span></a></h2><h3 id="programming-development-dependencies" tabindex="-1"><a class="header-anchor" href="#programming-development-dependencies"><span>Programming development dependencies</span></a></h3><p>It is recommended to use Maven to build the project and add the following dependencies in <code>pom.xml</code>. 
Please be careful to select the same dependency version as the IoTDB server version.</p><div class="language-xml line-numbers-mode" data-ext="xml" data-title="xml"><pre class="language-xml"><code><span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>dependency</span><span class="token punctuation">&gt;</span></span>
<span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>groupId</span><span class="token punctuation">&gt;</span></span>org.apache.iotdb<span class="token tag"><span class="token tag"><span class="token punctuation">&lt;/</span>groupId</span><span class="token punctuation">&gt;</span></span>
<span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>artifactId</span><span class="token punctuation">&gt;</span></span>pipe-api<span class="token tag"><span class="token tag"><span class="token punctuation">&lt;/</span>artifactId</span><span class="token punctuation">&gt;</span></span>
<span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>version</span><span class="token punctuation">&gt;</span></span>1.3.1<span class="token tag"><span class="token tag"><span class="token punctuation">&lt;/</span>version</span><span class="token punctuation">&gt;</span></span>
<span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>scope</span><span class="token punctuation">&gt;</span></span>provided<span class="token tag"><span class="token tag"><span class="token punctuation">&lt;/</span>scope</span><span class="token punctuation">&gt;</span></span>
<span class="token tag"><span class="token tag"><span class="token punctuation">&lt;/</span>dependency</span><span class="token punctuation">&gt;</span></span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><h3 id="event-driven-programming-model" tabindex="-1"><a class="header-anchor" href="#event-driven-programming-model"><span>Event-driven programming model</span></a></h3><p>The user programming interface design of the stream processing plugin refers to the general design concept of the event-driven programming model. Events are data abstractions in the user programming interface, and the programming interface is decoupled from the specific execution method. It only needs to focus on describing the processing method expected by the system after the event (data) reaches the system.</p><p>In the user programming interface of the stream processing plugin, events are an abstraction of database data writing operations. The event is captured by the stand-alone stream processing engine, and is passed to the PipeSource plugin, PipeProcessor plugin, and PipeSink plugin in sequence according to the three-stage stream processing process, and triggers the execution of user logic in the three plugins in turn.</p><p>In order to take into account the low latency of stream processing in low load scenarios on the end side and the high throughput of stream processing in high load scenarios on the end side, the stream processing engine will dynamically select processing objects in the operation logs and data files. 
Therefore, the user programming interface of stream processing requires users to provide processing logic for the following two types of events: the operation log writing event TabletInsertionEvent and the data file writing event TsFileInsertionEvent.</p><h4 id="operation-log-writing-event-tabletinsertionevent" tabindex="-1"><a class="header-anchor" href="#operation-log-writing-event-tabletinsertionevent"><span><strong>Operation log writing event (TabletInsertionEvent)</strong></span></a></h4><p>The operation log write event (TabletInsertionEvent) is a high-level data abstraction for user write requests. It provides users with the ability to manipulate the underlying data of write requests by providing a unified operation interface.</p><p>For different database deployment methods, the underlying storage structures corresponding to operation log writing events are different. For stand-alone deployment scenarios, the operation log writing event is an encapsulation of write-ahead log (WAL) entries; for a distributed deployment scenario, the operation log writing event is an encapsulation of a single node consensus protocol operation log entry.</p><p>For write operations generated by different write request interfaces in the database, the data structure of the request corresponding to the operation log write event is also different. IoTDB provides numerous writing interfaces such as InsertRecord, InsertRecords, InsertTablet, InsertTablets, etc. 
Each writing request uses a completely different serialization method, and the generated binary entries are also different.</p><p>The existence of operation log writing events provides users with a unified view of data operations, which shields the implementation differences of the underlying data structure, greatly reduces the user&#39;s programming threshold, and improves the ease of use of the function.</p><div class="language-java line-numbers-mode" data-ext="java" data-title="java"><pre class="language-java"><code><span class="token doc-comment comment">/** TabletInsertionEvent is used to define the event of data insertion. */</span>
<span class="token keyword">public</span> <span class="token keyword">interface</span> <span class="token class-name">TabletInsertionEvent</span> <span class="token keyword">extends</span> <span class="token class-name">Event</span> <span class="token punctuation">{</span>
<span class="token doc-comment comment">/**
* The consumer processes the data row by row and collects the results by RowCollector.
*
* <span class="token keyword">@return</span> <span class="token punctuation">{</span><span class="token keyword">@code</span> <span class="token code-section"><span class="token code language-java"><span class="token class-name">Iterable</span><span class="token generics"><span class="token punctuation">&lt;</span><span class="token class-name">TabletInsertionEvent</span><span class="token punctuation">&gt;</span></span></span></span><span class="token punctuation">}</span> a list of new TabletInsertionEvent contains the
* results collected by the RowCollector
*/</span>
<span class="token class-name">Iterable</span><span class="token generics"><span class="token punctuation">&lt;</span><span class="token class-name">TabletInsertionEvent</span><span class="token punctuation">&gt;</span></span> <span class="token function">processRowByRow</span><span class="token punctuation">(</span><span class="token class-name">BiConsumer</span><span class="token generics"><span class="token punctuation">&lt;</span><span class="token class-name">Row</span><span class="token punctuation">,</span> <span class="token class-name">RowCollector</span><span class="token punctuation">&gt;</span></span> consumer<span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token doc-comment comment">/**
* The consumer processes the Tablet directly and collects the results by RowCollector.
*
* <span class="token keyword">@return</span> <span class="token punctuation">{</span><span class="token keyword">@code</span> <span class="token code-section"><span class="token code language-java"><span class="token class-name">Iterable</span><span class="token generics"><span class="token punctuation">&lt;</span><span class="token class-name">TabletInsertionEvent</span><span class="token punctuation">&gt;</span></span></span></span><span class="token punctuation">}</span> a list of new TabletInsertionEvent contains the
* results collected by the RowCollector
*/</span>
<span class="token class-name">Iterable</span><span class="token generics"><span class="token punctuation">&lt;</span><span class="token class-name">TabletInsertionEvent</span><span class="token punctuation">&gt;</span></span> <span class="token function">processTablet</span><span class="token punctuation">(</span><span class="token class-name">BiConsumer</span><span class="token generics"><span class="token punctuation">&lt;</span><span class="token class-name">Tablet</span><span class="token punctuation">,</span> <span class="token class-name">RowCollector</span><span class="token punctuation">&gt;</span></span> consumer<span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token punctuation">}</span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><h4 id="data-file-writing-event-tsfileinsertionevent" tabindex="-1"><a class="header-anchor" href="#data-file-writing-event-tsfileinsertionevent"><span><strong>Data file writing event (TsFileInsertionEvent)</strong></span></a></h4><p>The data file writing event (TsFileInsertionEvent) is a high-level abstraction of the database file writing operation. It is a data collection of several operation log writing events (TabletInsertionEvent).</p><p>The storage engine of IoTDB is LSM structured. When data is written, the writing operation will first be placed into a log-structured file, and the written data will be stored in the memory at the same time. When the memory reaches the control upper limit, the disk flushing behavior will be triggered, that is, the data in the memory will be converted into a database file, and the previously prewritten operation log will be deleted. When the data in the memory is converted into the data in the database file, it will undergo two compression processes: encoding compression and general compression. Therefore, the data in the database file takes up less space than the original data in the memory.</p><p>In extreme network conditions, directly transmitting data files is more economical than transmitting data writing operations. 
It will occupy lower network bandwidth and achieve faster transmission speeds. Of course, there is no free lunch. Computing and processing data in files requires additional file I/O costs compared to directly computing and processing data in memory. However, it is precisely the existence of two structures, disk data files and memory write operations, with their own advantages and disadvantages, that gives the system the opportunity to make dynamic trade-offs and adjustments. It is based on this observation that the data file writing event is introduced into the plugin&#39;s event model.</p><p>To sum up, the data file writing event appears in the event stream of the stream processing plugin in two situations:</p><p>(1) Historical data extraction: Before a stream processing task starts, all written data that has been placed on the disk will exist in the form of TsFile. After a stream processing task starts, when collecting historical data, the historical data will be abstracted using TsFileInsertionEvent;</p><p>(2) Real-time data extraction: When a stream processing task is in progress, when the real-time processing speed of operation log write events in the data stream is slower than the write request speed, after a certain progress, the operation log write events that cannot be processed in the future will be persisted to disk and exist in the form of TsFile. After this data is extracted by the stream processing engine, TsFileInsertionEvent will be used as an abstraction.</p><div class="language-java line-numbers-mode" data-ext="java" data-title="java"><pre class="language-java"><code><span class="token doc-comment comment">/**
* TsFileInsertionEvent is used to define the event of writing TsFile. Event data stores in disks,
* which is compressed and encoded, and requires IO cost for computational processing.
*/</span>
<span class="token keyword">public</span> <span class="token keyword">interface</span> <span class="token class-name">TsFileInsertionEvent</span> <span class="token keyword">extends</span> <span class="token class-name">Event</span> <span class="token punctuation">{</span>
<span class="token doc-comment comment">/**
* The method is used to convert the TsFileInsertionEvent into several TabletInsertionEvents.
*
* <span class="token keyword">@return</span> <span class="token punctuation">{</span><span class="token keyword">@code</span> <span class="token code-section"><span class="token code language-java"><span class="token class-name">Iterable</span><span class="token generics"><span class="token punctuation">&lt;</span><span class="token class-name">TabletInsertionEvent</span><span class="token punctuation">&gt;</span></span></span></span><span class="token punctuation">}</span> the list of TabletInsertionEvent
*/</span>
<span class="token class-name">Iterable</span><span class="token generics"><span class="token punctuation">&lt;</span><span class="token class-name">TabletInsertionEvent</span><span class="token punctuation">&gt;</span></span> <span class="token function">toTabletInsertionEvents</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token punctuation">}</span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><h3 id="custom-stream-processing-plugin-programming-interface-definition" tabindex="-1"><a class="header-anchor" href="#custom-stream-processing-plugin-programming-interface-definition"><span>Custom stream processing plugin programming interface definition</span></a></h3><p>Based on the custom stream processing plugin programming interface, users can easily write data extraction plugins, data processing plugins and data sending plugins, so that the stream processing function can be flexibly adapted to various industrial scenarios.</p><h4 id="data-extraction-plugin-interface" tabindex="-1"><a class="header-anchor" href="#data-extraction-plugin-interface"><span>Data extraction plugin interface</span></a></h4><p>Data extraction is the first stage of the three stages of stream processing data from data extraction to data sending. The data extraction plugin (PipeSource) is the bridge between the stream processing engine and the storage engine. It monitors the behavior of the storage engine and captures various data write events.</p><div class="language-java line-numbers-mode" data-ext="java" data-title="java"><pre class="language-java"><code><span class="token doc-comment comment">/**
* PipeSource
*
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>p</span><span class="token punctuation">&gt;</span></span>PipeSource is responsible for capturing events from sources.
*
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>p</span><span class="token punctuation">&gt;</span></span>Various data sources can be supported by implementing different PipeSource classes.
*
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>p</span><span class="token punctuation">&gt;</span></span>The lifecycle of a PipeSource is as follows:
*
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>ul</span><span class="token punctuation">&gt;</span></span>
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>When a collaboration task is created, the KV pairs of `WITH Source` clause in SQL are
* parsed and the validation method <span class="token punctuation">{</span><span class="token keyword">@link</span> <span class="token reference"><span class="token class-name">PipeSource</span><span class="token punctuation">#</span><span class="token function">validate</span><span class="token punctuation">(</span><span class="token class-name">PipeParameterValidator</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> will
* be called to validate the parameters.
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>Before the collaboration task starts, the method <span class="token punctuation">{</span><span class="token keyword">@link</span>
* <span class="token reference"><span class="token class-name">PipeSource</span><span class="token punctuation">#</span><span class="token function">customize</span><span class="token punctuation">(</span><span class="token class-name">PipeParameters</span><span class="token punctuation">,</span> <span class="token class-name">PipeSourceRuntimeConfiguration</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> will be called to
* configure the runtime behavior of the PipeSource.
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>Then the method <span class="token punctuation">{</span><span class="token keyword">@link</span> <span class="token reference"><span class="token class-name">PipeSource</span><span class="token punctuation">#</span><span class="token function">start</span><span class="token punctuation">(</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> will be called to start the PipeSource.
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>While the collaboration task is in progress, the method <span class="token punctuation">{</span><span class="token keyword">@link</span> <span class="token reference"><span class="token class-name">PipeSource</span><span class="token punctuation">#</span><span class="token function">supply</span><span class="token punctuation">(</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> will be
* called to capture events from sources and then the events will be passed to the
* PipeProcessor.
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>The method <span class="token punctuation">{</span><span class="token keyword">@link</span> <span class="token reference"><span class="token class-name">PipeSource</span><span class="token punctuation">#</span><span class="token function">close</span><span class="token punctuation">(</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> will be called when the collaboration task is
* cancelled (the `DROP PIPE` command is executed).
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;/</span>ul</span><span class="token punctuation">&gt;</span></span>
*/</span>
<span class="token keyword">public</span> <span class="token keyword">interface</span> <span class="token class-name">PipeSource</span> <span class="token keyword">extends</span> <span class="token class-name">PipePlugin</span> <span class="token punctuation">{</span>
<span class="token doc-comment comment">/**
* This method is mainly used to validate <span class="token punctuation">{</span><span class="token keyword">@link</span> <span class="token reference"><span class="token class-name">PipeParameters</span></span><span class="token punctuation">}</span> and it is executed before <span class="token punctuation">{</span><span class="token keyword">@link</span>
* <span class="token reference"><span class="token class-name">PipeSource</span><span class="token punctuation">#</span><span class="token function">customize</span><span class="token punctuation">(</span><span class="token class-name">PipeParameters</span><span class="token punctuation">,</span> <span class="token class-name">PipeSourceRuntimeConfiguration</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> is called.
*
* <span class="token keyword">@param</span> <span class="token parameter">validator</span> the validator used to validate <span class="token punctuation">{</span><span class="token keyword">@link</span> <span class="token reference"><span class="token class-name">PipeParameters</span></span><span class="token punctuation">}</span>
* <span class="token keyword">@throws</span> <span class="token reference"><span class="token class-name">Exception</span></span> if any parameter is not valid
*/</span>
<span class="token keyword">void</span> <span class="token function">validate</span><span class="token punctuation">(</span><span class="token class-name">PipeParameterValidator</span> validator<span class="token punctuation">)</span> <span class="token keyword">throws</span> <span class="token class-name">Exception</span><span class="token punctuation">;</span>
<span class="token doc-comment comment">/**
* This method is mainly used to customize PipeSource. In this method, the user can do the
* following things:
*
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>ul</span><span class="token punctuation">&gt;</span></span>
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>Use PipeParameters to parse key-value pair attributes entered by the user.
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>Set the running configurations in PipeSourceRuntimeConfiguration.
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;/</span>ul</span><span class="token punctuation">&gt;</span></span>
*
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>p</span><span class="token punctuation">&gt;</span></span>This method is called after the method <span class="token punctuation">{</span><span class="token keyword">@link</span> <span class="token reference"><span class="token class-name">PipeSource</span><span class="token punctuation">#</span><span class="token function">validate</span><span class="token punctuation">(</span><span class="token class-name">PipeParameterValidator</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span>
* is called.
*
* <span class="token keyword">@param</span> <span class="token parameter">parameters</span> used to parse the input parameters entered by the user
* <span class="token keyword">@param</span> <span class="token parameter">configuration</span> used to set the required properties of the running PipeSource
* <span class="token keyword">@throws</span> <span class="token reference"><span class="token class-name">Exception</span></span> the user can throw errors if necessary
*/</span>
<span class="token keyword">void</span> <span class="token function">customize</span><span class="token punctuation">(</span><span class="token class-name">PipeParameters</span> parameters<span class="token punctuation">,</span> <span class="token class-name">PipeSourceRuntimeConfiguration</span> configuration<span class="token punctuation">)</span>
<span class="token keyword">throws</span> <span class="token class-name">Exception</span><span class="token punctuation">;</span>
<span class="token doc-comment comment">/**
* Start the Source. After this method is called, events should be ready to be supplied by
* <span class="token punctuation">{</span><span class="token keyword">@link</span> <span class="token reference"><span class="token class-name">PipeSource</span><span class="token punctuation">#</span><span class="token function">supply</span><span class="token punctuation">(</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span>. This method is called after <span class="token punctuation">{</span><span class="token keyword">@link</span>
* <span class="token reference"><span class="token class-name">PipeSource</span><span class="token punctuation">#</span><span class="token function">customize</span><span class="token punctuation">(</span><span class="token class-name">PipeParameters</span><span class="token punctuation">,</span> <span class="token class-name">PipeSourceRuntimeConfiguration</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> is called.
*
* <span class="token keyword">@throws</span> <span class="token reference"><span class="token class-name">Exception</span></span> the user can throw errors if necessary
*/</span>
<span class="token keyword">void</span> <span class="token function">start</span><span class="token punctuation">(</span><span class="token punctuation">)</span> <span class="token keyword">throws</span> <span class="token class-name">Exception</span><span class="token punctuation">;</span>
<span class="token doc-comment comment">/**
* Supply single event from the Source and the caller will send the event to the processor.
* This method is called after <span class="token punctuation">{</span><span class="token keyword">@link</span> <span class="token reference"><span class="token class-name">PipeSource</span><span class="token punctuation">#</span><span class="token function">start</span><span class="token punctuation">(</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> is called.
*
* <span class="token keyword">@return</span> the event to be supplied. the event may be null if the Source has no more events at
* the moment, but the Source is still running for more events.
* <span class="token keyword">@throws</span> <span class="token reference"><span class="token class-name">Exception</span></span> the user can throw errors if necessary
*/</span>
<span class="token class-name">Event</span> <span class="token function">supply</span><span class="token punctuation">(</span><span class="token punctuation">)</span> <span class="token keyword">throws</span> <span class="token class-name">Exception</span><span class="token punctuation">;</span>
<span class="token punctuation">}</span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div 
class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><h4 id="data-processing-plugin-interface" tabindex="-1"><a class="header-anchor" href="#data-processing-plugin-interface"><span>Data processing plugin interface</span></a></h4><p>Data processing is the second stage of the three stages of stream processing data from data extraction to data sending. The data processing plugin (PipeProcessor) is mainly used to filter and transform the various events captured by the data extraction plugin (PipeSource).</p><div class="language-java line-numbers-mode" data-ext="java" data-title="java"><pre class="language-java"><code><span class="token doc-comment comment">/**
* PipeProcessor
*
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>p</span><span class="token punctuation">&gt;</span></span>PipeProcessor is used to filter and transform the Event formed by the PipeSource.
*
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>p</span><span class="token punctuation">&gt;</span></span>The lifecycle of a PipeProcessor is as follows:
*
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>ul</span><span class="token punctuation">&gt;</span></span>
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>When a collaboration task is created, the KV pairs of `WITH PROCESSOR` clause in SQL are
* parsed and the validation method <span class="token punctuation">{</span><span class="token keyword">@link</span> <span class="token reference"><span class="token class-name">PipeProcessor</span><span class="token punctuation">#</span><span class="token function">validate</span><span class="token punctuation">(</span><span class="token class-name">PipeParameterValidator</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span>
* will be called to validate the parameters.
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>Before the collaboration task starts, the method <span class="token punctuation">{</span><span class="token keyword">@link</span>
* <span class="token reference"><span class="token class-name">PipeProcessor</span><span class="token punctuation">#</span><span class="token function">customize</span><span class="token punctuation">(</span><span class="token class-name">PipeParameters</span><span class="token punctuation">,</span> <span class="token class-name">PipeProcessorRuntimeConfiguration</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> will be called
* to configure the runtime behavior of the PipeProcessor.
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>While the collaboration task is in progress:
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>ul</span><span class="token punctuation">&gt;</span></span>
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>PipeSource captures the events and wraps them into three types of Event instances.
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>PipeProcessor processes the events and then passes them to the PipeSink. The
* following 3 methods will be called: <span class="token punctuation">{</span><span class="token keyword">@link</span>
* <span class="token reference"><span class="token class-name">PipeProcessor</span><span class="token punctuation">#</span><span class="token function">process</span><span class="token punctuation">(</span><span class="token class-name">TabletInsertionEvent</span><span class="token punctuation">,</span> <span class="token class-name">EventCollector</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span>, <span class="token punctuation">{</span><span class="token keyword">@link</span>
* <span class="token reference"><span class="token class-name">PipeProcessor</span><span class="token punctuation">#</span><span class="token function">process</span><span class="token punctuation">(</span><span class="token class-name">TsFileInsertionEvent</span><span class="token punctuation">,</span> <span class="token class-name">EventCollector</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> and <span class="token punctuation">{</span><span class="token keyword">@link</span>
* <span class="token reference"><span class="token class-name">PipeProcessor</span><span class="token punctuation">#</span><span class="token function">process</span><span class="token punctuation">(</span><span class="token class-name">Event</span><span class="token punctuation">,</span> <span class="token class-name">EventCollector</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span>.
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>PipeSink serializes the events into binaries and sends them to sinks.
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;/</span>ul</span><span class="token punctuation">&gt;</span></span>
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>When the collaboration task is cancelled (the `DROP PIPE` command is executed), the <span class="token punctuation">{</span><span class="token keyword">@link</span>
* <span class="token reference"><span class="token class-name">PipeProcessor</span><span class="token punctuation">#</span><span class="token function">close</span><span class="token punctuation">(</span><span class="token punctuation">)</span></span> <span class="token punctuation">}</span> method will be called.
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;/</span>ul</span><span class="token punctuation">&gt;</span></span>
*/</span>
<span class="token keyword">public</span> <span class="token keyword">interface</span> <span class="token class-name">PipeProcessor</span> <span class="token keyword">extends</span> <span class="token class-name">PipePlugin</span> <span class="token punctuation">{</span>
<span class="token doc-comment comment">/**
* This method is mainly used to validate <span class="token punctuation">{</span><span class="token keyword">@link</span> <span class="token reference"><span class="token class-name">PipeParameters</span></span><span class="token punctuation">}</span> and it is executed before <span class="token punctuation">{</span><span class="token keyword">@link</span>
* <span class="token reference"><span class="token class-name">PipeProcessor</span><span class="token punctuation">#</span><span class="token function">customize</span><span class="token punctuation">(</span><span class="token class-name">PipeParameters</span><span class="token punctuation">,</span> <span class="token class-name">PipeProcessorRuntimeConfiguration</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> is called.
*
* <span class="token keyword">@param</span> <span class="token parameter">validator</span> the validator used to validate <span class="token punctuation">{</span><span class="token keyword">@link</span> <span class="token reference"><span class="token class-name">PipeParameters</span></span><span class="token punctuation">}</span>
* <span class="token keyword">@throws</span> <span class="token reference"><span class="token class-name">Exception</span></span> if any parameter is not valid
*/</span>
<span class="token keyword">void</span> <span class="token function">validate</span><span class="token punctuation">(</span><span class="token class-name">PipeParameterValidator</span> validator<span class="token punctuation">)</span> <span class="token keyword">throws</span> <span class="token class-name">Exception</span><span class="token punctuation">;</span>
<span class="token doc-comment comment">/**
* This method is mainly used to customize PipeProcessor. In this method, the user can do the
* following things:
*
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>ul</span><span class="token punctuation">&gt;</span></span>
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>Use PipeParameters to parse key-value pair attributes entered by the user.
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>Set the running configurations in PipeProcessorRuntimeConfiguration.
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;/</span>ul</span><span class="token punctuation">&gt;</span></span>
*
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>p</span><span class="token punctuation">&gt;</span></span>This method is called after the method <span class="token punctuation">{</span><span class="token keyword">@link</span>
* <span class="token reference"><span class="token class-name">PipeProcessor</span><span class="token punctuation">#</span><span class="token function">validate</span><span class="token punctuation">(</span><span class="token class-name">PipeParameterValidator</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> is called and before the beginning of the
* events processing.
*
* <span class="token keyword">@param</span> <span class="token parameter">parameters</span> used to parse the input parameters entered by the user
* <span class="token keyword">@param</span> <span class="token parameter">configuration</span> used to set the required properties of the running PipeProcessor
* <span class="token keyword">@throws</span> <span class="token reference"><span class="token class-name">Exception</span></span> the user can throw errors if necessary
*/</span>
<span class="token keyword">void</span> <span class="token function">customize</span><span class="token punctuation">(</span><span class="token class-name">PipeParameters</span> parameters<span class="token punctuation">,</span> <span class="token class-name">PipeProcessorRuntimeConfiguration</span> configuration<span class="token punctuation">)</span>
<span class="token keyword">throws</span> <span class="token class-name">Exception</span><span class="token punctuation">;</span>
<span class="token doc-comment comment">/**
* This method is called to process the TabletInsertionEvent.
*
* <span class="token keyword">@param</span> <span class="token parameter">tabletInsertionEvent</span> TabletInsertionEvent to be processed
* <span class="token keyword">@param</span> <span class="token parameter">eventCollector</span> used to collect result events after processing
* <span class="token keyword">@throws</span> <span class="token reference"><span class="token class-name">Exception</span></span> the user can throw errors if necessary
*/</span>
<span class="token keyword">void</span> <span class="token function">process</span><span class="token punctuation">(</span><span class="token class-name">TabletInsertionEvent</span> tabletInsertionEvent<span class="token punctuation">,</span> <span class="token class-name">EventCollector</span> eventCollector<span class="token punctuation">)</span>
<span class="token keyword">throws</span> <span class="token class-name">Exception</span><span class="token punctuation">;</span>
<span class="token doc-comment comment">/**
* This method is called to process the TsFileInsertionEvent.
*
* <span class="token keyword">@param</span> <span class="token parameter">tsFileInsertionEvent</span> TsFileInsertionEvent to be processed
* <span class="token keyword">@param</span> <span class="token parameter">eventCollector</span> used to collect result events after processing
* <span class="token keyword">@throws</span> <span class="token reference"><span class="token class-name">Exception</span></span> the user can throw errors if necessary
*/</span>
<span class="token keyword">default</span> <span class="token keyword">void</span> <span class="token function">process</span><span class="token punctuation">(</span><span class="token class-name">TsFileInsertionEvent</span> tsFileInsertionEvent<span class="token punctuation">,</span> <span class="token class-name">EventCollector</span> eventCollector<span class="token punctuation">)</span>
<span class="token keyword">throws</span> <span class="token class-name">Exception</span> <span class="token punctuation">{</span>
<span class="token keyword">for</span> <span class="token punctuation">(</span><span class="token keyword">final</span> <span class="token class-name">TabletInsertionEvent</span> tabletInsertionEvent <span class="token operator">:</span>
tsFileInsertionEvent<span class="token punctuation">.</span><span class="token function">toTabletInsertionEvents</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span> <span class="token punctuation">{</span>
<span class="token function">process</span><span class="token punctuation">(</span>tabletInsertionEvent<span class="token punctuation">,</span> eventCollector<span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token punctuation">}</span>
<span class="token punctuation">}</span>
<span class="token doc-comment comment">/**
* This method is called to process the Event.
*
* <span class="token keyword">@param</span> <span class="token parameter">event</span> Event to be processed
* <span class="token keyword">@param</span> <span class="token parameter">eventCollector</span> used to collect result events after processing
* <span class="token keyword">@throws</span> <span class="token reference"><span class="token class-name">Exception</span></span> the user can throw errors if necessary
*/</span>
<span class="token keyword">void</span> <span class="token function">process</span><span class="token punctuation">(</span><span class="token class-name">Event</span> event<span class="token punctuation">,</span> <span class="token class-name">EventCollector</span> eventCollector<span class="token punctuation">)</span> <span class="token keyword">throws</span> <span class="token class-name">Exception</span><span class="token punctuation">;</span>
<span class="token punctuation">}</span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div 
class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><h4 id="data-sending-plugin-interface" tabindex="-1"><a class="header-anchor" href="#data-sending-plugin-interface"><span>Data sending plugin interface</span></a></h4><p>Data sending is the third of the three stages of stream processing, which run from data extraction to data sending. The data sending plugin (PipeSink) is mainly used to send the various events processed by the data processing plugin (PipeProcessor). It serves as the network implementation layer of the stream processing framework, and its interface should allow access to multiple real-time communication protocols and multiple sinks.</p><div class="language-java line-numbers-mode" data-ext="java" data-title="java"><pre class="language-java"><code><span class="token doc-comment comment">/**
* PipeSink
*
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>p</span><span class="token punctuation">&gt;</span></span>PipeSink is responsible for sending events to sinks.
*
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>p</span><span class="token punctuation">&gt;</span></span>Various network protocols can be supported by implementing different PipeSink classes.
*
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>p</span><span class="token punctuation">&gt;</span></span>The lifecycle of a PipeSink is as follows:
*
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>ul</span><span class="token punctuation">&gt;</span></span>
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>When a collaboration task is created, the KV pairs of `WITH SINK` clause in SQL are
* parsed and the validation method <span class="token punctuation">{</span><span class="token keyword">@link</span> <span class="token reference"><span class="token class-name">PipeSink</span><span class="token punctuation">#</span><span class="token function">validate</span><span class="token punctuation">(</span><span class="token class-name">PipeParameterValidator</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> will be
* called to validate the parameters.
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>Before the collaboration task starts, the method <span class="token punctuation">{</span><span class="token keyword">@link</span> <span class="token reference"><span class="token class-name">PipeSink</span><span class="token punctuation">#</span><span class="token function">customize</span><span class="token punctuation">(</span><span class="token class-name">PipeParameters</span><span class="token punctuation">,</span>
* <span class="token class-name">PipeSinkRuntimeConfiguration</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> will be called to configure the runtime behavior of the
* PipeSink and the method <span class="token punctuation">{</span><span class="token keyword">@link</span> <span class="token reference"><span class="token class-name">PipeSink</span><span class="token punctuation">#</span><span class="token function">handshake</span><span class="token punctuation">(</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> will be called to create a connection
* with sink.
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>While the collaboration task is in progress:
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>ul</span><span class="token punctuation">&gt;</span></span>
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>PipeSource captures the events and wraps them into three types of Event instances.
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>PipeProcessor processes the events and then passes them to the PipeSink.
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>PipeSink serializes the events into binaries and sends them to sinks. The following 3
* methods will be called: <span class="token punctuation">{</span><span class="token keyword">@link</span> <span class="token reference"><span class="token class-name">PipeSink</span><span class="token punctuation">#</span><span class="token function">transfer</span><span class="token punctuation">(</span><span class="token class-name">TabletInsertionEvent</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span>, <span class="token punctuation">{</span><span class="token keyword">@link</span>
* <span class="token reference"><span class="token class-name">PipeSink</span><span class="token punctuation">#</span><span class="token function">transfer</span><span class="token punctuation">(</span><span class="token class-name">TsFileInsertionEvent</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> and <span class="token punctuation">{</span><span class="token keyword">@link</span> <span class="token reference"><span class="token class-name">PipeSink</span><span class="token punctuation">#</span><span class="token function">transfer</span><span class="token punctuation">(</span><span class="token class-name">Event</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span>.
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;/</span>ul</span><span class="token punctuation">&gt;</span></span>
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>When the collaboration task is cancelled (the `DROP PIPE` command is executed), the <span class="token punctuation">{</span><span class="token keyword">@link</span>
* <span class="token reference"><span class="token class-name">PipeSink</span><span class="token punctuation">#</span><span class="token function">close</span><span class="token punctuation">(</span><span class="token punctuation">)</span></span> <span class="token punctuation">}</span> method will be called.
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;/</span>ul</span><span class="token punctuation">&gt;</span></span>
*
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>p</span><span class="token punctuation">&gt;</span></span>In addition, the method <span class="token punctuation">{</span><span class="token keyword">@link</span> <span class="token reference"><span class="token class-name">PipeSink</span><span class="token punctuation">#</span><span class="token function">heartbeat</span><span class="token punctuation">(</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> will be called periodically to check
* whether the connection with sink is still alive. The method <span class="token punctuation">{</span><span class="token keyword">@link</span> <span class="token reference"><span class="token class-name">PipeSink</span><span class="token punctuation">#</span><span class="token function">handshake</span><span class="token punctuation">(</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> will be
* called to create a new connection with the sink when the method <span class="token punctuation">{</span><span class="token keyword">@link</span> <span class="token reference"><span class="token class-name">PipeSink</span><span class="token punctuation">#</span><span class="token function">heartbeat</span><span class="token punctuation">(</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span>
* throws exceptions.
*/</span>
<span class="token keyword">public</span> <span class="token keyword">interface</span> <span class="token class-name">PipeSink</span> <span class="token keyword">extends</span> <span class="token class-name">PipePlugin</span> <span class="token punctuation">{</span>
<span class="token doc-comment comment">/**
* This method is mainly used to validate <span class="token punctuation">{</span><span class="token keyword">@link</span> <span class="token reference"><span class="token class-name">PipeParameters</span></span><span class="token punctuation">}</span> and it is executed before <span class="token punctuation">{</span><span class="token keyword">@link</span>
* <span class="token reference"><span class="token class-name">PipeSink</span><span class="token punctuation">#</span><span class="token function">customize</span><span class="token punctuation">(</span><span class="token class-name">PipeParameters</span><span class="token punctuation">,</span> <span class="token class-name">PipeSinkRuntimeConfiguration</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> is called.
*
* <span class="token keyword">@param</span> <span class="token parameter">validator</span> the validator used to validate <span class="token punctuation">{</span><span class="token keyword">@link</span> <span class="token reference"><span class="token class-name">PipeParameters</span></span><span class="token punctuation">}</span>
* <span class="token keyword">@throws</span> <span class="token reference"><span class="token class-name">Exception</span></span> if any parameter is not valid
*/</span>
<span class="token keyword">void</span> <span class="token function">validate</span><span class="token punctuation">(</span><span class="token class-name">PipeParameterValidator</span> validator<span class="token punctuation">)</span> <span class="token keyword">throws</span> <span class="token class-name">Exception</span><span class="token punctuation">;</span>
<span class="token doc-comment comment">/**
* This method is mainly used to customize PipeSink. In this method, the user can do the following
* things:
*
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>ul</span><span class="token punctuation">&gt;</span></span>
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>Use PipeParameters to parse key-value pair attributes entered by the user.
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>Set the running configurations in PipeSinkRuntimeConfiguration.
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;/</span>ul</span><span class="token punctuation">&gt;</span></span>
*
* <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>p</span><span class="token punctuation">&gt;</span></span>This method is called after the method <span class="token punctuation">{</span><span class="token keyword">@link</span> <span class="token reference"><span class="token class-name">PipeSink</span><span class="token punctuation">#</span><span class="token function">validate</span><span class="token punctuation">(</span><span class="token class-name">PipeParameterValidator</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> is
* called and before the method <span class="token punctuation">{</span><span class="token keyword">@link</span> <span class="token reference"><span class="token class-name">PipeSink</span><span class="token punctuation">#</span><span class="token function">handshake</span><span class="token punctuation">(</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> is called.
*
* <span class="token keyword">@param</span> <span class="token parameter">parameters</span> used to parse the input parameters entered by the user
* <span class="token keyword">@param</span> <span class="token parameter">configuration</span> used to set the required properties of the running PipeSink
* <span class="token keyword">@throws</span> <span class="token reference"><span class="token class-name">Exception</span></span> the user can throw errors if necessary
*/</span>
<span class="token keyword">void</span> <span class="token function">customize</span><span class="token punctuation">(</span><span class="token class-name">PipeParameters</span> parameters<span class="token punctuation">,</span> <span class="token class-name">PipeSinkRuntimeConfiguration</span> configuration<span class="token punctuation">)</span>
<span class="token keyword">throws</span> <span class="token class-name">Exception</span><span class="token punctuation">;</span>
<span class="token doc-comment comment">/**
* This method is used to create a connection with sink. This method will be called after the
* method <span class="token punctuation">{</span><span class="token keyword">@link</span> <span class="token reference"><span class="token class-name">PipeSink</span><span class="token punctuation">#</span><span class="token function">customize</span><span class="token punctuation">(</span><span class="token class-name">PipeParameters</span><span class="token punctuation">,</span> <span class="token class-name">PipeSinkRuntimeConfiguration</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> is called or
* will be called when the method <span class="token punctuation">{</span><span class="token keyword">@link</span> <span class="token reference"><span class="token class-name">PipeSink</span><span class="token punctuation">#</span><span class="token function">heartbeat</span><span class="token punctuation">(</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> throws exceptions.
*
* <span class="token keyword">@throws</span> <span class="token reference"><span class="token class-name">Exception</span></span> if the connection fails to be created
*/</span>
<span class="token keyword">void</span> <span class="token function">handshake</span><span class="token punctuation">(</span><span class="token punctuation">)</span> <span class="token keyword">throws</span> <span class="token class-name">Exception</span><span class="token punctuation">;</span>
<span class="token doc-comment comment">/**
* This method will be called periodically to check whether the connection with sink is still
* alive.
*
* <span class="token keyword">@throws</span> <span class="token reference"><span class="token class-name">Exception</span></span> if the connection dies
*/</span>
<span class="token keyword">void</span> <span class="token function">heartbeat</span><span class="token punctuation">(</span><span class="token punctuation">)</span> <span class="token keyword">throws</span> <span class="token class-name">Exception</span><span class="token punctuation">;</span>
<span class="token doc-comment comment">/**
* This method is used to transfer the TabletInsertionEvent.
*
* <span class="token keyword">@param</span> <span class="token parameter">tabletInsertionEvent</span> TabletInsertionEvent to be transferred
* <span class="token keyword">@throws</span> <span class="token reference"><span class="token class-name">PipeConnectionException</span></span> if the connection is broken
* <span class="token keyword">@throws</span> <span class="token reference"><span class="token class-name">Exception</span></span> the user can throw errors if necessary
*/</span>
<span class="token keyword">void</span> <span class="token function">transfer</span><span class="token punctuation">(</span><span class="token class-name">TabletInsertionEvent</span> tabletInsertionEvent<span class="token punctuation">)</span> <span class="token keyword">throws</span> <span class="token class-name">Exception</span><span class="token punctuation">;</span>
<span class="token doc-comment comment">/**
* This method is used to transfer the TsFileInsertionEvent.
*
* <span class="token keyword">@param</span> <span class="token parameter">tsFileInsertionEvent</span> TsFileInsertionEvent to be transferred
* <span class="token keyword">@throws</span> <span class="token reference"><span class="token class-name">PipeConnectionException</span></span> if the connection is broken
* <span class="token keyword">@throws</span> <span class="token reference"><span class="token class-name">Exception</span></span> the user can throw errors if necessary
*/</span>
<span class="token keyword">default</span> <span class="token keyword">void</span> <span class="token function">transfer</span><span class="token punctuation">(</span><span class="token class-name">TsFileInsertionEvent</span> tsFileInsertionEvent<span class="token punctuation">)</span> <span class="token keyword">throws</span> <span class="token class-name">Exception</span> <span class="token punctuation">{</span>
<span class="token keyword">try</span> <span class="token punctuation">{</span>
<span class="token keyword">for</span> <span class="token punctuation">(</span><span class="token keyword">final</span> <span class="token class-name">TabletInsertionEvent</span> tabletInsertionEvent <span class="token operator">:</span>
tsFileInsertionEvent<span class="token punctuation">.</span><span class="token function">toTabletInsertionEvents</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span> <span class="token punctuation">{</span>
<span class="token function">transfer</span><span class="token punctuation">(</span>tabletInsertionEvent<span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token punctuation">}</span>
<span class="token punctuation">}</span> <span class="token keyword">finally</span> <span class="token punctuation">{</span>
tsFileInsertionEvent<span class="token punctuation">.</span><span class="token function">close</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token punctuation">}</span>
<span class="token punctuation">}</span>
<span class="token doc-comment comment">/**
* This method is used to transfer the generic events, including HeartbeatEvent.
*
* <span class="token keyword">@param</span> <span class="token parameter">event</span> Event to be transferred
* <span class="token keyword">@throws</span> <span class="token reference"><span class="token class-name">PipeConnectionException</span></span> if the connection is broken
* <span class="token keyword">@throws</span> <span class="token reference"><span class="token class-name">Exception</span></span> the user can throw errors if necessary
*/</span>
<span class="token keyword">void</span> <span class="token function">transfer</span><span class="token punctuation">(</span><span class="token class-name">Event</span> event<span class="token punctuation">)</span> <span class="token keyword">throws</span> <span class="token class-name">Exception</span><span class="token punctuation">;</span>
<span class="token punctuation">}</span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div 
class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><h2 id="custom-stream-processing-plugin-management" tabindex="-1"><a class="header-anchor" href="#custom-stream-processing-plugin-management"><span>Custom stream processing plugin management</span></a></h2><p>In order to ensure the flexibility and ease of use of user-defined plugins in 
actual production, the system also needs to provide the ability to dynamically and uniformly manage plugins.<br> The stream processing plugin management statements introduced in this chapter provide an entry point for dynamic unified management of plugins.</p><h3 id="load-plugin-statement" tabindex="-1"><a class="header-anchor" href="#load-plugin-statement"><span>Load plugin statement</span></a></h3><p>In IoTDB, if you want to dynamically load a user-defined plugin in the system, you first need to implement a specific plugin class based on PipeSource, PipeProcessor or PipeSink.<br> Then the plugin class needs to be compiled and packaged into a jar executable file, and finally the plugin is loaded into IoTDB using the management statement for loading the plugin.</p><p>The syntax of the management statement for loading the plugin is shown in the figure.</p><div class="language-sql line-numbers-mode" data-ext="sql" data-title="sql"><pre class="language-sql"><code><span class="token keyword">CREATE</span> PIPEPLUGIN <span class="token operator">&lt;</span>alias<span class="token operator">&gt;</span>
<span class="token keyword">AS</span> <span class="token operator">&lt;</span><span class="token keyword">full</span> class name<span class="token operator">&gt;</span>
<span class="token keyword">USING</span> <span class="token operator">&lt;</span>URI <span class="token keyword">of</span> JAR package<span class="token operator">&gt;</span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><p>Example: Suppose you have implemented a data processing plugin named edu.tsinghua.iotdb.pipe.ExampleProcessor, packaged it as pipe-plugin.jar, and want to use it in the stream processing engine under the alias example. There are two ways to provide the plugin package: upload it to a URI server, or place it in a local directory of the cluster.</p><p>Method 1: Upload to the URI server</p><p>Preparation: To register in this way, you need to upload the JAR package to the URI server in advance and ensure that the IoTDB instance that executes the registration statement can access the URI server. For example: <a href="https://example.com:8080/iotdb/pipe-plugin.jar" target="_blank" rel="noopener noreferrer">https://example.com:8080/iotdb/pipe-plugin.jar<span><svg class="external-link-icon" xmlns="http://www.w3.org/2000/svg" aria-hidden="true" focusable="false" x="0px" y="0px" viewBox="0 0 100 100" width="15" height="15"><path fill="currentColor" d="M18.8,85.1h56l0,0c2.2,0,4-1.8,4-4v-32h-8v28h-48v-48h28v-8h-32l0,0c-2.2,0-4,1.8-4,4v56C14.8,83.3,16.6,85.1,18.8,85.1z"></path><polygon fill="currentColor" points="45.7,48.7 51.3,54.3 77.2,28.5 77.2,37.2 85.2,37.2 85.2,14.9 62.8,14.9 62.8,22.9 71.5,22.9"></polygon></svg><span class="external-link-icon-sr-only">open in new window</span></span></a>.</p><p>SQL:</p><div class="language-sql line-numbers-mode" data-ext="sql" data-title="sql"><pre class="language-sql"><code><span class="token keyword">CREATE</span> PIPEPLUGIN example
<span class="token keyword">AS</span> <span class="token string">&#39;edu.tsinghua.iotdb.pipe.ExampleProcessor&#39;</span>
<span class="token keyword">USING</span> URI <span class="token string">&#39;https://example.com:8080/iotdb/pipe-plugin.jar&#39;</span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><p>Method 2: Upload the JAR package to a local directory of the cluster</p><p>Preparation: To register in this way, you need to place the JAR package in any path on the machine where the DataNode node is located, and we recommend that you place the JAR package in the /ext/pipe directory of the IoTDB installation path (this directory already exists in the installation package, so you do not need to create it). For example: iotdb-1.x.x-bin/ext/pipe/pipe-plugin.jar. <strong>(Note: If you are using a cluster, you need to place the JAR package under the same path on the machine of every DataNode node)</strong></p><p>SQL:</p><div class="language-sql line-numbers-mode" data-ext="sql" data-title="sql"><pre class="language-sql"><code><span class="token keyword">CREATE</span> PIPEPLUGIN example
<span class="token keyword">AS</span> <span class="token string">&#39;edu.tsinghua.iotdb.pipe.ExampleProcessor&#39;</span>
<span class="token keyword">USING</span> URI <span class="token string">&#39;file:/IoTDB installation path/iotdb-1.x.x-bin/ext/pipe/pipe-plugin.jar&#39;</span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><h3 id="delete-plugin-statement" tabindex="-1"><a class="header-anchor" href="#delete-plugin-statement"><span>Delete plugin statement</span></a></h3><p>When the user no longer wants to use a plugin and needs to uninstall the plugin from the system, he can use the delete plugin statement as shown in the figure.</p><div class="language-sql line-numbers-mode" data-ext="sql" data-title="sql"><pre class="language-sql"><code><span class="token keyword">DROP</span> PIPEPLUGIN <span class="token operator">&lt;</span>alias<span class="token operator">&gt;</span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div></div></div><h3 id="view-plugin-statements" tabindex="-1"><a class="header-anchor" href="#view-plugin-statements"><span>View plugin statements</span></a></h3><p>Users can also view plugins in the system on demand. View the statement of the plugin as shown in the figure.</p><div class="language-sql line-numbers-mode" data-ext="sql" data-title="sql"><pre class="language-sql"><code><span class="token keyword">SHOW</span> PIPEPLUGINS
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div></div></div><h2 id="system-preset-stream-processing-plugin" tabindex="-1"><a class="header-anchor" href="#system-preset-stream-processing-plugin"><span>System preset stream processing plugin</span></a></h2><h3 id="pre-built-source-plugin" tabindex="-1"><a class="header-anchor" href="#pre-built-source-plugin"><span>Pre-built Source Plugin</span></a></h3><h4 id="iotdb-source" tabindex="-1"><a class="header-anchor" href="#iotdb-source"><span>iotdb-source</span></a></h4><p>Function: Extract historical or realtime data inside IoTDB into pipe.</p><table><thead><tr><th>key</th><th>value</th><th>value range</th><th>required or optional with default</th></tr></thead><tbody><tr><td>source</td><td>iotdb-source</td><td>String: iotdb-source</td><td>required</td></tr><tr><td>source.pattern</td><td>path prefix for filtering time series</td><td>String: any time series prefix</td><td>optional: root</td></tr><tr><td>source.history.start-time</td><td>start of synchronizing historical data event time,including start-time</td><td>Long: [Long.MIN_VALUE, Long.MAX_VALUE]</td><td>optional: Long.MIN_VALUE</td></tr><tr><td>source.history.end-time</td><td>end of synchronizing historical data event time,including end-time</td><td>Long: [Long.MIN_VALUE, Long.MAX_VALUE]</td><td>optional: Long.MAX_VALUE</td></tr><tr><td>source.forwarding-pipe-requests</td><td>Whether to forward data written by another Pipe (usually Data Sync)</td><td>Boolean: true, false</td><td>optional:true</td></tr><tr><td>start-time(V1.3.1+)</td><td>start of synchronizing all data event time,including start-time. Will disable &quot;history.start-time&quot; &quot;history.end-time&quot; if configured</td><td>Long: [Long.MIN_VALUE, Long.MAX_VALUE]</td><td>optional: Long.MIN_VALUE</td></tr><tr><td>end-time(V1.3.1+)</td><td>end of synchronizing all data event time,including end-time. 
Will disable &quot;history.start-time&quot; &quot;history.end-time&quot; if configured</td><td>Long: [Long.MIN_VALUE, Long.MAX_VALUE]</td><td>optional: Long.MAX_VALUE</td></tr><tr><td>source.realtime.mode</td><td>Extraction mode for real-time data</td><td>String: hybrid, stream, batch</td><td>optional:hybrid</td></tr><tr><td>source.forwarding-pipe-requests</td><td>Whether to forward data written by another Pipe (usually Data Sync)</td><td>Boolean: true, false</td><td>optional:true</td></tr></tbody></table><blockquote><p>🚫 <strong>source.pattern Parameter Description</strong></p><ul><li><p>Pattern should use backquotes to modify illegal characters or illegal path nodes, for example, if you want to filter root.`a@b` or root.`123`, you should set the pattern to root.`a@b` or root.`123`(Refer specifically to <a href="https://iotdb.apache.org/Download/" target="_blank" rel="noopener noreferrer">Timing of single and double quotes and backquotes<span><svg class="external-link-icon" xmlns="http://www.w3.org/2000/svg" aria-hidden="true" focusable="false" x="0px" y="0px" viewBox="0 0 100 100" width="15" height="15"><path fill="currentColor" d="M18.8,85.1h56l0,0c2.2,0,4-1.8,4-4v-32h-8v28h-48v-48h28v-8h-32l0,0c-2.2,0-4,1.8-4,4v56C14.8,83.3,16.6,85.1,18.8,85.1z"></path><polygon fill="currentColor" points="45.7,48.7 51.3,54.3 77.2,28.5 77.2,37.2 85.2,37.2 85.2,14.9 62.8,14.9 62.8,22.9 71.5,22.9"></polygon></svg><span class="external-link-icon-sr-only">open in new window</span></span></a></p></li><li><p>In the underlying implementation, when pattern is detected as root (default value) or a database name, synchronization efficiency is higher, and any other format will reduce performance.</p></li><li><p>The path prefix does not need to form a complete path. 
For example, when creating a pipe with the parameter &#39;source.pattern&#39;=&#39;root.aligned.1&#39;:</p><ul><li>root.aligned.1TS</li><li>root.aligned.1TS.`1`</li><li>root.aligned.100TS</li></ul><p>the data will be synchronized;</p><ul><li>root.aligned.`1`</li><li>root.aligned.`123`</li></ul><p>the data will not be synchronized.</p></li></ul></blockquote><blockquote><p>❗️<strong>start-time, end-time parameter description of source</strong></p><ul><li>start-time, end-time should be in ISO format, such as 2011-12-03T10:15:30 or 2011-12-03T10:15:30+01:00. However, version 1.3.1+ supports timeStamp format like 1706704494000.</li></ul></blockquote><blockquote><p><strong>A piece of data from production to IoTDB contains two key concepts of time</strong></p><ul><li><strong>event time:</strong> The time when the data is actually produced (or the generation time assigned to the data by the data production system, which is the time item in the data point), also called event time.</li><li><strong>arrival time:</strong> The time when data arrives in the IoTDB system.</li></ul><p>The out-of-order data we often refer to refers to data whose <strong>event time</strong> is far behind the current system time (or the maximum <strong>event time</strong> that has been dropped) when the data arrives. 
On the other hand, whether it is out-of-order data or sequential data, as long as they arrive newly in the system, their <strong>arrival time</strong> will increase with the order in which the data arrives at IoTDB.</p></blockquote><blockquote><p>💎 <strong>The work of iotdb-source can be split into two stages</strong></p><ol><li>Historical data extraction: All data with <strong>arrival time</strong> &lt; <strong>current system time</strong> when creating the pipe is called historical data</li><li>Realtime data extraction: All data with <strong>arrival time</strong> &gt;= <strong>current system time</strong> when the pipe is created is called realtime data</li></ol><p>The historical data transmission phase and the realtime data transmission phase are executed serially. Only when the historical data transmission phase is completed, the realtime data transmission phase is executed.</p></blockquote><blockquote><p>📌 <strong>source.realtime.mode: Data extraction mode</strong></p><ul><li>log: In this mode, the task only uses the operation log for data processing and sending</li><li>file: In this mode, the task only uses data files for data processing and sending.</li><li>hybrid: This mode takes into account both the low latency but low throughput of sending data one by one from the operation log, and the high throughput but high latency of sending data files in batches. It automatically switches to the appropriate data extraction method under different write loads. It first adopts the data extraction method based on operation logs to ensure low sending latency. When a data backlog occurs, it automatically switches to the data extraction method based on data files to ensure high sending throughput. When the backlog is eliminated, it automatically switches back to the data extraction method based on operation logs. 
This avoids the difficulty of balancing data sending latency and throughput with a single data extraction algorithm.</li></ul></blockquote><blockquote><p>🍕 <strong>source.forwarding-pipe-requests: Whether to allow forwarding data transmitted from another pipe</strong></p><ul><li>If you want to use pipe to build data synchronization of A -&gt; B -&gt; C, then the pipe of B -&gt; C needs to set this parameter to true, so that the data written by A to B through the pipe in A -&gt; B can be forwarded correctly to C</li><li>If you want to use pipe to build two-way data synchronization (dual-active) of A &lt;-&gt; B, then the pipes of A -&gt; B and B -&gt; A need to set this parameter to false, otherwise the data will be forwarded back and forth between the clusters endlessly</li></ul></blockquote><h3 id="preset-processor-plugin" tabindex="-1"><a class="header-anchor" href="#preset-processor-plugin"><span>Preset processor plugin</span></a></h3><h4 id="do-nothing-processor" tabindex="-1"><a class="header-anchor" href="#do-nothing-processor"><span>do-nothing-processor</span></a></h4><p>Function: No processing is done on the events passed in by the source.</p><table><thead><tr><th>key</th><th>value</th><th>value range</th><th>required or optional with default</th></tr></thead><tbody><tr><td>processor</td><td>do-nothing-processor</td><td>String: do-nothing-processor</td><td>required</td></tr></tbody></table><h3 id="preset-sink-plugin" tabindex="-1"><a class="header-anchor" href="#preset-sink-plugin"><span>Preset sink plugin</span></a></h3><h4 id="do-nothing-sink" tabindex="-1"><a class="header-anchor" href="#do-nothing-sink"><span>do-nothing-sink</span></a></h4><p>Function: No processing is done on the events passed in by the processor.</p><table><thead><tr><th>key</th><th>value</th><th>value range</th><th>required or optional with default</th></tr></thead><tbody><tr><td>sink</td><td>do-nothing-sink</td><td>String: 
do-nothing-sink</td><td>required</td></tr></tbody></table><h2 id="stream-processing-task-management" tabindex="-1"><a class="header-anchor" href="#stream-processing-task-management"><span>Stream processing task management</span></a></h2><h3 id="create-a-stream-processing-task" tabindex="-1"><a class="header-anchor" href="#create-a-stream-processing-task"><span>Create a stream processing task</span></a></h3><p>Use the <code>CREATE PIPE</code> statement to create a stream processing task. Taking the creation of a data synchronization stream processing task as an example, the sample SQL statement is as follows:</p><div class="language-sql line-numbers-mode" data-ext="sql" data-title="sql"><pre class="language-sql"><code><span class="token keyword">CREATE</span> PIPE <span class="token operator">&lt;</span>PipeId<span class="token operator">&gt;</span> <span class="token comment">-- PipeId is the name that uniquely identifies the sync task</span>
<span class="token keyword">WITH</span> SOURCE <span class="token punctuation">(</span>
<span class="token comment">-- Default IoTDB Data Extraction Plugin</span>
<span class="token string">&#39;source&#39;</span> <span class="token operator">=</span> <span class="token string">&#39;iotdb-source&#39;</span><span class="token punctuation">,</span>
<span class="token comment">-- Path prefix, only data that can match the path prefix will be extracted for subsequent processing and delivery</span>
<span class="token string">&#39;source.pattern&#39;</span> <span class="token operator">=</span> <span class="token string">&#39;root.timecho&#39;</span><span class="token punctuation">,</span>
<span class="token comment">-- Whether to extract historical data</span>
<span class="token string">&#39;source.history.enable&#39;</span> <span class="token operator">=</span> <span class="token string">&#39;true&#39;</span><span class="token punctuation">,</span>
<span class="token comment">-- Describes the time range of the historical data being extracted, indicating the earliest possible time</span>
<span class="token string">&#39;source.history.start-time&#39;</span> <span class="token operator">=</span> <span class="token string">&#39;2011-12-03T10:15:30+01:00&#39;</span><span class="token punctuation">,</span>
<span class="token comment">-- Describes the time range of the extracted historical data, indicating the latest time</span>
<span class="token string">&#39;source.history.end-time&#39;</span> <span class="token operator">=</span> <span class="token string">&#39;2022-12-03T10:15:30+01:00&#39;</span><span class="token punctuation">,</span>
<span class="token comment">-- Whether to extract realtime data</span>
<span class="token string">&#39;source.realtime.enable&#39;</span> <span class="token operator">=</span> <span class="token string">&#39;true&#39;</span><span class="token punctuation">,</span>
<span class="token punctuation">)</span>
<span class="token keyword">WITH</span> PROCESSOR <span class="token punctuation">(</span>
<span class="token comment">-- Default data processing plugin, means no processing</span>
<span class="token string">&#39;processor&#39;</span> <span class="token operator">=</span> <span class="token string">&#39;do-nothing-processor&#39;</span><span class="token punctuation">,</span>
<span class="token punctuation">)</span>
<span class="token keyword">WITH</span> SINK <span class="token punctuation">(</span>
<span class="token comment">-- IoTDB data sending plugin with target IoTDB</span>
<span class="token string">&#39;sink&#39;</span> <span class="token operator">=</span> <span class="token string">&#39;iotdb-thrift-sink&#39;</span><span class="token punctuation">,</span>
<span class="token comment">-- Data service for one of the DataNode nodes on the target IoTDB ip</span>
<span class="token string">&#39;sink.ip&#39;</span> <span class="token operator">=</span> <span class="token string">&#39;127.0.0.1&#39;</span><span class="token punctuation">,</span>
<span class="token comment">-- Data service port of one of the DataNode nodes of the target IoTDB</span>
<span class="token string">&#39;sink.port&#39;</span> <span class="token operator">=</span> <span class="token string">&#39;6667&#39;</span><span class="token punctuation">,</span>
<span class="token punctuation">)</span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><p><strong>When creating a stream processing task, you need to configure the PipeId and the parameters of the three plugin parts:</strong></p><table><thead><tr><th>Configuration</th><th>Description</th><th>Required or not</th><th>Default implementation</th><th>Default implementation description</th><th>Default implementation description</th></tr></thead><tbody><tr><td>PipeId</td><td>A globally unique name that identifies a stream processing</td><td><!----></td><td>-</td><td>-</td><td>-</td></tr><tr><td>source</td><td>Pipe Source plugin, responsible for extracting stream processing data at the bottom of the database</td><td>Optional</td><td>iotdb-source</td><td>Integrate the full historical data of the database and subsequent real-time data arriving into the stream processing task</td><td>No</td></tr><tr><td>processor</td><td>Pipe Processor plugin, responsible for processing data</td><td>Optional</td><td>do-nothing-processor</td><td>Does not do any processing on the incoming data</td><td><!----></td></tr><tr><td>sink</td><td>Pipe Sink plugin, responsible for sending 
data</td><td><!----></td><td>-</td><td>-</td><td><!----></td></tr></tbody></table><p>In the example, the iotdb-source, do-nothing-processor and iotdb-thrift-sink plugins are used to build the data flow processing task. IoTDB also has other built-in stream processing plugins, <strong>please check the &quot;System Preset Stream Processing plugin&quot; section</strong>.</p><p><strong>A simplest example of the CREATE PIPE statement is as follows:</strong></p><div class="language-sql line-numbers-mode" data-ext="sql" data-title="sql"><pre class="language-sql"><code><span class="token keyword">CREATE</span> PIPE <span class="token operator">&lt;</span>PipeId<span class="token operator">&gt;</span> <span class="token comment">-- PipeId is a name that uniquely identifies the stream processing task</span>
<span class="token keyword">WITH</span> SINK <span class="token punctuation">(</span>
<span class="token comment">-- IoTDB data sending plugin, the target is IoTDB</span>
<span class="token string">&#39;sink&#39;</span> <span class="token operator">=</span> <span class="token string">&#39;iotdb-thrift-sink&#39;</span><span class="token punctuation">,</span>
<span class="token comment">-- The data service IP of one of the DataNode nodes in the target IoTDB</span>
<span class="token string">&#39;sink.ip&#39;</span> <span class="token operator">=</span> <span class="token string">&#39;127.0.0.1&#39;</span><span class="token punctuation">,</span>
<span class="token comment">-- The data service port of one of the DataNode nodes in the target IoTDB</span>
<span class="token string">&#39;sink.port&#39;</span> <span class="token operator">=</span> <span class="token string">&#39;6667&#39;</span><span class="token punctuation">,</span>
<span class="token punctuation">)</span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><p>The semantics expressed are: synchronize all historical data in this database instance, as well as subsequent real-time data as it arrives, to the target IoTDB instance at 127.0.0.1:6667.</p><p><strong>Notice:</strong></p><ul><li><p>SOURCE and PROCESSOR are optional configurations. If you do not fill in the configuration parameters, the system will use the corresponding default implementation.</p></li><li><p>SINK is a required configuration and needs to be configured declaratively in the CREATE PIPE statement</p></li><li><p>SINK has self-reuse capability. For different stream processing tasks, if their SINKs have the same KV attributes (the values corresponding to the keys of all attributes are the same), then the system will only create one SINK instance in the end to realize the reuse of connection resources.</p><ul><li>For example, there are the following declarations of two stream processing tasks, pipe1 and pipe2:</li></ul><div class="language-sql line-numbers-mode" data-ext="sql" data-title="sql"><pre class="language-sql"><code><span class="token keyword">CREATE</span> PIPE pipe1
<span class="token keyword">WITH</span> SINK <span class="token punctuation">(</span>
<span class="token string">&#39;sink&#39;</span> <span class="token operator">=</span> <span class="token string">&#39;iotdb-thrift-sink&#39;</span><span class="token punctuation">,</span>
<span class="token string">&#39;sink.ip&#39;</span> <span class="token operator">=</span> <span class="token string">&#39;localhost&#39;</span><span class="token punctuation">,</span>
<span class="token string">&#39;sink.port&#39;</span> <span class="token operator">=</span> <span class="token string">&#39;9999&#39;</span><span class="token punctuation">,</span>
<span class="token punctuation">)</span>
<span class="token keyword">CREATE</span> PIPE pipe2
<span class="token keyword">WITH</span> SINK <span class="token punctuation">(</span>
<span class="token string">&#39;sink&#39;</span> <span class="token operator">=</span> <span class="token string">&#39;iotdb-thrift-sink&#39;</span><span class="token punctuation">,</span>
<span class="token string">&#39;sink.port&#39;</span> <span class="token operator">=</span> <span class="token string">&#39;9999&#39;</span><span class="token punctuation">,</span>
<span class="token string">&#39;sink.ip&#39;</span> <span class="token operator">=</span> <span class="token string">&#39;localhost&#39;</span><span class="token punctuation">,</span>
<span class="token punctuation">)</span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div></li><li><p>Because their declarations of SINK are exactly the same (<strong>even if the order of declaration of some attributes is different</strong>), the framework will automatically reuse the SINKs they declared, and ultimately the SINKs of pipe1 and pipe2 will be the same instance.</p></li><li><p>When the source is the default iotdb-source, and source.forwarding-pipe-requests is the default value true, please do not build an application scenario that includes cyclic data synchronization (it will cause an infinite loop):</p><ul><li>IoTDB A -&gt; IoTDB B -&gt; IoTDB A</li><li>IoTDB A -&gt; IoTDB A</li></ul></li></ul><h3 id="start-the-stream-processing-task" tabindex="-1"><a class="header-anchor" href="#start-the-stream-processing-task"><span>Start the stream processing task</span></a></h3><p>After the CREATE PIPE statement is successfully executed, the instances related to the stream processing task are created. In V1.3.0, the running status of the entire stream processing task is initially set to STOPPED, that is, the stream processing task will not process data immediately. In V1.3.1 and later, the status of the task is set to RUNNING right after creation.</p><p>You can use the START PIPE statement to make a stream processing task start processing data:</p><div class="language-sql line-numbers-mode" data-ext="sql" data-title="sql"><pre class="language-sql"><code><span class="token keyword">START</span> PIPE <span class="token operator">&lt;</span>PipeId<span class="token operator">&gt;</span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div></div></div><h3 id="stop-the-stream-processing-task" tabindex="-1"><a class="header-anchor" href="#stop-the-stream-processing-task"><span>Stop the stream processing task</span></a></h3><p>Use the STOP PIPE statement to stop the stream processing task from processing data:</p><div class="language-sql line-numbers-mode" data-ext="sql" data-title="sql"><pre class="language-sql"><code>STOP PIPE <span class="token operator">&lt;</span>PipeId<span class="token operator">&gt;</span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div></div></div><h3 id="delete-stream-processing-tasks" tabindex="-1"><a class="header-anchor" href="#delete-stream-processing-tasks"><span>Delete stream processing tasks</span></a></h3><p>Use the DROP PIPE statement to stop the stream processing task from processing data (when the stream processing task status is RUNNING), and then delete the entire stream processing task:</p><div class="language-sql line-numbers-mode" data-ext="sql" data-title="sql"><pre class="language-sql"><code><span class="token keyword">DROP</span> PIPE <span class="token operator">&lt;</span>PipeId<span class="token operator">&gt;</span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div></div></div><p>Users do not need to perform a STOP operation before deleting the stream processing task.</p><h3 id="display-stream-processing-tasks" tabindex="-1"><a class="header-anchor" href="#display-stream-processing-tasks"><span>Display stream processing tasks</span></a></h3><p>Use the SHOW PIPES statement to view all stream processing tasks:</p><div class="language-sql line-numbers-mode" data-ext="sql" data-title="sql"><pre class="language-sql"><code><span class="token keyword">SHOW</span> PIPES
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div></div></div><p>The query results are as follows:</p><div class="language-sql line-numbers-mode" data-ext="sql" data-title="sql"><pre class="language-sql"><code><span class="token operator">+</span><span class="token comment">-----------+-----------------------+-------+----------+-------------+--------+----------------+</span>
<span class="token operator">|</span> ID<span class="token operator">|</span> CreationTime<span class="token operator">|</span> State<span class="token operator">|</span>PipeSource<span class="token operator">|</span>PipeProcessor<span class="token operator">|</span>PipeSink<span class="token operator">|</span>ExceptionMessage<span class="token operator">|</span>
<span class="token operator">+</span><span class="token comment">-----------+-----------------------+-------+----------+-------------+--------+----------------+</span>
<span class="token operator">|</span>iotdb<span class="token operator">-</span>kafka<span class="token operator">|</span><span class="token number">2022</span><span class="token operator">-</span><span class="token number">03</span><span class="token operator">-</span><span class="token number">30</span>T20:<span class="token number">58</span>:<span class="token number">30.689</span><span class="token operator">|</span>RUNNING<span class="token operator">|</span> <span class="token punctuation">.</span><span class="token punctuation">.</span><span class="token punctuation">.</span><span class="token operator">|</span> <span class="token punctuation">.</span><span class="token punctuation">.</span><span class="token punctuation">.</span><span class="token operator">|</span> <span class="token punctuation">.</span><span class="token punctuation">.</span><span class="token punctuation">.</span><span class="token operator">|</span> {}<span class="token operator">|</span>
<span class="token operator">+</span><span class="token comment">-----------+-----------------------+-------+----------+-------------+--------+----------------+</span>
<span class="token operator">|</span>iotdb<span class="token operator">-</span>iotdb<span class="token operator">|</span><span class="token number">2022</span><span class="token operator">-</span><span class="token number">03</span><span class="token operator">-</span><span class="token number">31</span>T12:<span class="token number">55</span>:<span class="token number">28.129</span><span class="token operator">|</span>STOPPED<span class="token operator">|</span> <span class="token punctuation">.</span><span class="token punctuation">.</span><span class="token punctuation">.</span><span class="token operator">|</span> <span class="token punctuation">.</span><span class="token punctuation">.</span><span class="token punctuation">.</span><span class="token operator">|</span> <span class="token punctuation">.</span><span class="token punctuation">.</span><span class="token punctuation">.</span><span class="token operator">|</span> TException: <span class="token punctuation">.</span><span class="token punctuation">.</span><span class="token punctuation">.</span><span class="token operator">|</span>
<span class="token operator">+</span><span class="token comment">-----------+-----------------------+-------+----------+-------------+--------+----------------+</span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><p>You can specify a <code>&lt;PipeId&gt;</code> to view the status of a particular stream processing task:</p><div class="language-sql line-numbers-mode" data-ext="sql" data-title="sql"><pre class="language-sql"><code><span class="token keyword">SHOW</span> PIPE <span class="token operator">&lt;</span>PipeId<span class="token operator">&gt;</span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div></div></div><p>You can also use a WHERE clause to check whether the Pipe Sink used by a certain &lt;PipeId&gt; is being reused.</p><div class="language-sql line-numbers-mode" data-ext="sql" data-title="sql"><pre class="language-sql"><code><span class="token keyword">SHOW</span> PIPES
<span class="token keyword">WHERE</span> SINK USED <span class="token keyword">BY</span> <span class="token operator">&lt;</span>PipeId<span class="token operator">&gt;</span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div></div></div><h3 id="stream-processing-task-running-status-migration" tabindex="-1"><a class="header-anchor" href="#stream-processing-task-running-status-migration"><span>Stream processing task running status migration</span></a></h3><p>A stream processing pipe will pass through various states during its managed life cycle:</p><ul><li><strong>RUNNING:</strong> The pipe is working properly. <ul><li>When a pipe is successfully created, its initial state is RUNNING (V1.3.1+).</li></ul></li><li><strong>STOPPED:</strong> The pipe is stopped. When the pipe is in this state, there are several possibilities: <ul><li>When a pipe is successfully created, its initial state is STOPPED (V1.3.0).</li><li>The user manually pauses a pipe that is in normal running status, and its status will passively change from RUNNING to STOPPED.</li><li>When an unrecoverable error occurs during the running of a pipe, its status will automatically change from RUNNING to STOPPED.</li></ul></li><li><strong>DROPPED:</strong> The pipe task was permanently deleted.</li></ul><p>The following diagram shows all states and state transitions:</p><figure><img src="https://alioss.timecho.com/docs/img/状态迁移图.png" alt="State migration diagram" tabindex="0" loading="lazy"><figcaption>State migration diagram</figcaption></figure><h2 id="authority-management" tabindex="-1"><a class="header-anchor" href="#authority-management"><span>Authority management</span></a></h2><h3 id="stream-processing-tasks" tabindex="-1"><a class="header-anchor" href="#stream-processing-tasks"><span>Stream processing tasks</span></a></h3><table><thead><tr><th>Permission name</th><th>Description</th></tr></thead><tbody><tr><td>USE_PIPE</td><td>Register a stream processing task. The path is irrelevant.</td></tr><tr><td>USE_PIPE</td><td>Start the stream processing task. 
The path is irrelevant.</td></tr><tr><td>USE_PIPE</td><td>Stop the stream processing task. The path is irrelevant.</td></tr><tr><td>USE_PIPE</td><td>Delete stream processing tasks. The path is irrelevant.</td></tr><tr><td>USE_PIPE</td><td>Query stream processing tasks. The path is irrelevant.</td></tr></tbody></table><h3 id="stream-processing-task-plugin" tabindex="-1"><a class="header-anchor" href="#stream-processing-task-plugin"><span>Stream processing task plugin</span></a></h3><table><thead><tr><th>Permission name</th><th>Description</th></tr></thead><tbody><tr><td>USE_PIPE</td><td>Register a stream processing task plugin. The path is irrelevant.</td></tr><tr><td>USE_PIPE</td><td>Uninstall the stream processing task plugin. The path is irrelevant.</td></tr><tr><td>USE_PIPE</td><td>Query stream processing task plugins. The path is irrelevant.</td></tr></tbody></table><h2 id="configuration-parameters" tabindex="-1"><a class="header-anchor" href="#configuration-parameters"><span>Configuration parameters</span></a></h2><p>In iotdb-common.properties:</p><p>V1.3.0:</p><div class="language-Properties line-numbers-mode" data-ext="Properties" data-title="Properties"><pre class="language-Properties"><code>####################
### Pipe Configuration
####################
# Uncomment the following field to configure the pipe lib directory.
# For Windows platform
# If its prefix is a drive specifier followed by &quot;\\&quot;, or if its prefix is &quot;\\\\&quot;, then the path is
# absolute. Otherwise, it is relative.
# pipe_lib_dir=ext\\pipe
# For Linux platform
# If its prefix is &quot;/&quot;, then the path is absolute. Otherwise, it is relative.
# pipe_lib_dir=ext/pipe
# The maximum number of threads that can be used to execute the pipe subtasks in PipeSubtaskExecutor.
# The actual value will be min(pipe_subtask_executor_max_thread_num, max(1, CPU core number / 2)).
# pipe_subtask_executor_max_thread_num=5
# The connection timeout (in milliseconds) for the thrift client.
# pipe_connector_timeout_ms=900000
# The maximum number of selectors that can be used in the async connector.
# pipe_async_connector_selector_number=1
# The core number of clients that can be used in the async connector.
# pipe_async_connector_core_client_number=8
# The maximum number of clients that can be used in the async connector.
# pipe_async_connector_max_client_number=16
# Whether to enable receiving pipe data through air gap.
# The receiver can only return 0 or 1 in tcp mode to indicate whether the data is received successfully.
# pipe_air_gap_receiver_enabled=false
# The port for the server to receive pipe data through air gap.
# pipe_air_gap_receiver_port=9780
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><p>V1.3.1+:</p><div class="language-Properties line-numbers-mode" data-ext="Properties" data-title="Properties"><pre class="language-Properties"><code># Uncomment the following field to configure the pipe lib directory.
# For Windows platform
# If its prefix is a drive specifier followed by &quot;\\&quot;, or if its prefix is &quot;\\\\&quot;, then the path is
# absolute. Otherwise, it is relative.
# pipe_lib_dir=ext\\pipe
# For Linux platform
# If its prefix is &quot;/&quot;, then the path is absolute. Otherwise, it is relative.
# pipe_lib_dir=ext/pipe
# The maximum number of threads that can be used to execute the pipe subtasks in PipeSubtaskExecutor.
# The actual value will be min(pipe_subtask_executor_max_thread_num, max(1, CPU core number / 2)).
# pipe_subtask_executor_max_thread_num=5
# The connection timeout (in milliseconds) for the thrift client.
# pipe_sink_timeout_ms=900000
# The maximum number of selectors that can be used in the sink.
# Recommend to set this value to less than or equal to pipe_sink_max_client_number.
# pipe_sink_selector_number=4
# The maximum number of clients that can be used in the sink.
# pipe_sink_max_client_number=16
# Whether to enable receiving pipe data through air gap.
# The receiver can only return 0 or 1 in tcp mode to indicate whether the data is received successfully.
# pipe_air_gap_receiver_enabled=false
# The port for the server to receive pipe data through air gap.
# pipe_air_gap_receiver_port=9780
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div></div><!--[--><!----><!--]--><footer class="vp-page-meta"><div class="vp-meta-item edit-link"><a href="https://github.com/apache/iotdb-docs/edit/main/src/UserGuide/latest/User-Manual/Streaming_timecho.md" rel="noopener noreferrer" target="_blank" aria-label="Found Error? 
Edit this page on GitHub" class="nav-link vp-meta-label"><!--[--><svg xmlns="http://www.w3.org/2000/svg" class="icon edit-icon" viewBox="0 0 1024 1024" fill="currentColor" aria-label="edit icon"><path d="M430.818 653.65a60.46 60.46 0 0 1-50.96-93.281l71.69-114.012 7.773-10.365L816.038 80.138A60.46 60.46 0 0 1 859.225 62a60.46 60.46 0 0 1 43.186 18.138l43.186 43.186a60.46 60.46 0 0 1 0 86.373L588.879 565.55l-8.637 8.637-117.466 68.234a60.46 60.46 0 0 1-31.958 11.229z"></path><path d="M728.802 962H252.891A190.883 190.883 0 0 1 62.008 771.98V296.934a190.883 190.883 0 0 1 190.883-192.61h267.754a60.46 60.46 0 0 1 0 120.92H252.891a69.962 69.962 0 0 0-69.098 69.099V771.98a69.962 69.962 0 0 0 69.098 69.098h475.911A69.962 69.962 0 0 0 797.9 771.98V503.363a60.46 60.46 0 1 1 120.922 0V771.98A190.883 190.883 0 0 1 728.802 962z"></path></svg><!--]-->Found Error? Edit this page on GitHub<span><svg class="external-link-icon" xmlns="http://www.w3.org/2000/svg" aria-hidden="true" focusable="false" x="0px" y="0px" viewBox="0 0 100 100" width="15" height="15"><path fill="currentColor" d="M18.8,85.1h56l0,0c2.2,0,4-1.8,4-4v-32h-8v28h-48v-48h28v-8h-32l0,0c-2.2,0-4,1.8-4,4v56C14.8,83.3,16.6,85.1,18.8,85.1z"></path><polygon fill="currentColor" points="45.7,48.7 51.3,54.3 77.2,28.5 77.2,37.2 85.2,37.2 85.2,14.9 62.8,14.9 62.8,22.9 71.5,22.9"></polygon></svg><span class="external-link-icon-sr-only">open in new window</span></span><!----></a></div><div class="vp-meta-item git-info"><div class="update-time"><span class="vp-meta-label">Last update: </span><!----></div><!----></div></footer><!----><!----><!--[--><!----><!--]--><!--]--></main><!--]--><footer style="padding-bottom:2rem;"><span id="doc-version" style="display:none;">latest</span><p style="text-align:center;color:#909399;font-size:12px;margin:0 30px;">Copyright © 2024 The Apache Software Foundation.<br> Apache and the Apache feather logo are trademarks of The Apache Software Foundation</p><p 
style="text-align:center;margin-top:10px;color:#909399;font-size:12px;margin:0 30px;"><strong>Have a question?</strong> Connect with us on QQ, WeChat, or Slack. <a href="https://github.com/apache/iotdb/issues/1995">Join the community</a> now.</p></footer></div><!--]--><!--]--><!--[--><!----><!--]--><!--]--></div>
<script type="module" src="/assets/app-DrPcRZG6.js" defer></script>
</body>
</html>