| <!doctype html> |
| <html lang="zh-CN" data-theme="light"> |
| <head> |
| <meta charset="utf-8" /> |
| <meta name="viewport" content="width=device-width,initial-scale=1" /> |
| <meta name="generator" content="VuePress 2.0.0-rc.26" /> |
| <meta name="theme" content="VuePress Theme Hope 2.0.0-rc.99" /> |
| <style> |
| :root { |
| --vp-c-bg: #fff; |
| } |
| |
| [data-theme="dark"] { |
| --vp-c-bg: #1b1b1f; |
| } |
| |
| html, |
| body { |
| background: var(--vp-c-bg); |
| } |
| </style> |
| <script> |
| const userMode = localStorage.getItem("vuepress-theme-hope-scheme"); |
| const systemDarkMode = |
| window.matchMedia && |
| window.matchMedia("(prefers-color-scheme: dark)").matches; |
| |
| if (userMode === "dark" || (userMode !== "light" && systemDarkMode)) { |
| document.documentElement.setAttribute("data-theme", "dark"); |
| } |
| </script> |
| <script type="application/ld+json">{"@context":"https://schema.org","@type":"Article","headline":"流计算框架","image":["https://iotdb.apache.org/img/1706697228308.jpg","https://iotdb.apache.org/img/%E7%8A%B6%E6%80%81%E8%BF%81%E7%A7%BB%E5%9B%BE.png"],"dateModified":"2025-04-03T09:29:12.000Z","author":[]}</script><meta property="og:url" content="https://iotdb.apache.org/zh/UserGuide/dev-1.3/User-Manual/Streaming_apache.html"><meta property="og:site_name" content="IoTDB Website"><meta property="og:title" content="流计算框架"><meta property="og:description" content="流计算框架 IoTDB 流处理框架允许用户实现自定义的流处理逻辑,可以实现对存储引擎变更的监听和捕获、实现对变更数据的变形、实现对变形后数据的向外推送等逻辑。 我们将。一个流处理任务(Pipe)包含三个子任务: 抽取(Source) 处理(Process) 发送(Sink) 流处理框架允许用户使用 Java 语言自定义编写三个子任务的处理逻辑,通过类似..."><meta property="og:type" content="article"><meta property="og:image" content="https://iotdb.apache.org/img/1706697228308.jpg"><meta property="og:locale" content="zh-CN"><meta property="og:locale:alternate" content="en-US"><meta property="og:updated_time" content="2025-04-03T09:29:12.000Z"><meta property="article:modified_time" content="2025-04-03T09:29:12.000Z"><link rel="alternate" hreflang="en-us" href="https://iotdb.apache.org/UserGuide/dev-1.3/User-Manual/Streaming_apache.html"><script async src="https://widget.kapa.ai/kapa-widget.bundle.js" data-website-id="2d37bfdd-8d98-40ba-9223-9d4f81bfb327" data-project-name="Apache IoTDB" data-project-color="#FFFFFF" data-button-z-index="1999" data-button-padding="4px" data-button-border-radius="4px" data-button-image-height="24px" data-button-image-width="20px" data-button-text-color="#9E2878" data-project-logo="https://iotdb.apache.org/img/logo.svg" data-button-position-right="16px" data-button-position-bottom="8px" data-button-height="56px" data-button-width="48px" data-button-text="Ask" data-modal-override-open-selector="#custom-ask-ai-button" data-modal-image-width="150px" data-modal-title="AI Docs" data-modal-title-color="#9E2878" data-deep-thinking-button-active-bg-color="#F6F7F8" data-deep-thinking-button-active-text-color="#9E2878" data-deep-thinking-button-active-hover-text-color="#9E2878" data-modal-disclaimer="这是一个针对 Apache IoTDB 的定制化大型语言模型,能够访问所有[文档](iotdb.apache.org/docs/)、[GitHub 公开问题、PR 和自述文件](github.com/apache/iotdb)。 |
| |
| 如果您遇到 <Error in verifying browser for feedback submission. Captcha token could not be obtained.> 请确保您能够顺畅访问 Google 服务。" data-user-analytics-fingerprint-enabled="true" data-consent-required="true" data-consent-screen-disclaimer="点击<I agree, let's chat>即表示您同意按照 kapa.ai 的[隐私政策](https://www.kapa.ai/content/privacy-policy)使用 AI 助手。本服务使用 reCAPTCHA,您需要同意 Google 的[隐私政策](https://policies.google.com/privacy)和[服务条款](https://policies.google.com/terms)。继续操作即表示您明确同意 kapa.ai 和 Google 的隐私政策。" data-language="zh"></script><link rel="icon" href="/favicon.ico"><meta name="Description" content="Apache IoTDB: Time Series Database for IoT"><meta name="Keywords" content="TSDB, time series, time series database, IoTDB, IoT database, IoT data management, 时序数据库, 时间序列管理, IoTDB, 物联网数据库, 实时数据库, 物联网数据管理, 物联网数据"><meta name="baidu-site-verification" content="wfKETzB3OT"><meta name="google-site-verification" content="mZWAoRY0yj_HAr-s47zHCGHzx5Ju-RVm5wDbPnwQYFo"><script type="text/javascript">var _paq = window._paq = window._paq || []; |
| /* tracker methods like "setCustomDimension" should be called before "trackPageView" */ |
| _paq.push(["setDoNotTrack", true]); |
| _paq.push(["disableCookies"]); |
| _paq.push(['trackPageView']); |
| _paq.push(['enableLinkTracking']); |
| (function() { |
| var u="https://analytics.apache.org/"; |
| _paq.push(['setTrackerUrl', u+'matomo.php']); |
| _paq.push(['setSiteId', '56']); |
| var d=document, g=d.createElement('script'), s=d.getElementsByTagName('script')[0]; |
| g.async=true; g.src=u+'matomo.js'; s.parentNode.insertBefore(g,s); |
| })(); |
| </script><title>流计算框架 | IoTDB Website</title><meta name="description" content="流计算框架 IoTDB 流处理框架允许用户实现自定义的流处理逻辑,可以实现对存储引擎变更的监听和捕获、实现对变更数据的变形、实现对变形后数据的向外推送等逻辑。 我们将。一个流处理任务(Pipe)包含三个子任务: 抽取(Source) 处理(Process) 发送(Sink) 流处理框架允许用户使用 Java 语言自定义编写三个子任务的处理逻辑,通过类似..."> |
| <link rel="preload" href="/assets/style-JraSdRX4.css" as="style"><link rel="stylesheet" href="/assets/style-JraSdRX4.css"> |
| <link rel="modulepreload" href="/assets/app-DLe9S4ys.js"><link rel="modulepreload" href="/assets/Streaming_apache.html-CJwFdAhS.js"><link rel="modulepreload" href="/assets/1706697228308-D3eVnV2A.js"><link rel="modulepreload" href="/assets/状态迁移图-CzBV9LTE.js"> |
| |
| </head> |
| <body> |
| <div id="app"><!--[--><!--[--><!--[--><span tabindex="-1"></span><a href="#main-content" class="vp-skip-link sr-only">跳至主要內容</a><!--]--><!--[--><div class="theme-container external-link-icon has-toc" vp-container><!--[--><header id="navbar" class="vp-navbar" vp-navbar><div class="vp-navbar-start"><button type="button" class="vp-toggle-sidebar-button" title="Toggle Sidebar"><span class="icon"></span></button><!--[--><a class="route-link vp-brand" href="/zh/" aria-label="带我回家"><img class="vp-nav-logo" src="/logo.png" alt><!----><!----></a><!--]--></div><div class="vp-navbar-center"><!--[--><!--]--></div><div class="vp-navbar-end"><!--[--><!--[--><div id="docsearch-container" style="display:none;"></div><div class="docsearch-placeholder"><button type="button" aria-label="搜索文档" aria-keyshortcuts="Control+k" class="DocSearch DocSearch-Button"><span class="DocSearch-Button-Container"><svg width="20" height="20" viewBox="0 0 24 24" aria-hidden="true" class="DocSearch-Search-Icon"><circle cx="11" cy="11" r="8" stroke="currentColor" fill="none" stroke-width="1.4"></circle><path d="m21 21-4.3-4.3" stroke="currentColor" fill="none" stroke-linecap="round" stroke-linejoin="round"></path></svg><span class="DocSearch-Button-Placeholder">搜索文档</span></span><span class="DocSearch-Button-Keys"><kbd class="DocSearch-Button-Key DocSearch-Button-Key--ctrl">Ctrl</kbd><kbd class="DocSearch-Button-Key">K</kbd></span></button></div><!--]--><div><button id="custom-ask-ai-button"> ✨ Ask AI </button></div><nav class="vp-nav-links"><div class="vp-nav-item hide-in-mobile"><div class="vp-dropdown-wrapper"><button type="button" class="vp-dropdown-title" aria-label="文档"><!--[--><!---->文档<!--]--><span class="arrow"></span><ul class="vp-dropdown"><li class="vp-dropdown-item"><a class="route-link auto-link" href="/zh/UserGuide/latest/QuickStart/QuickStart_apache.html" aria-label="v2.0.x"><!---->v2.0.x<!----></a></li><li class="vp-dropdown-item"><a class="route-link auto-link" href="/zh/UserGuide/V1.3.x/QuickStart/QuickStart_apache.html" aria-label="v1.3.x"><!---->v1.3.x<!----></a></li><li class="vp-dropdown-item"><a class="route-link auto-link" href="/zh/UserGuide/V1.2.x/QuickStart/QuickStart.html" aria-label="v1.2.x"><!---->v1.2.x<!----></a></li><li class="vp-dropdown-item"><a class="route-link auto-link" href="/zh/UserGuide/V0.13.x/QuickStart/QuickStart.html" aria-label="v0.13.x"><!---->v0.13.x<!----></a></li></ul></button></div></div><div class="vp-nav-item hide-in-mobile"><a class="auto-link external-link" href="https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=177051872" aria-label="系统设计" rel="noopener noreferrer" target="_blank"><!---->系统设计<!----></a></div><div class="vp-nav-item hide-in-mobile"><a class="route-link auto-link" href="/zh/Download/" aria-label="下载"><!---->下载<!----></a></div><div class="vp-nav-item hide-in-mobile"><div class="vp-dropdown-wrapper"><button type="button" class="vp-dropdown-title" aria-label="社区"><!--[--><!---->社区<!--]--><span class="arrow"></span><ul class="vp-dropdown"><li class="vp-dropdown-item"><a class="route-link auto-link" href="/zh/Community/About.html" aria-label="关于社区"><!---->关于社区<!----></a></li><li class="vp-dropdown-item"><a class="route-link auto-link" href="/zh/Community/Development-Guide.html" aria-label="贡献指南"><!---->贡献指南<!----></a></li><li class="vp-dropdown-item"><a class="route-link auto-link" href="/zh/Community/Community-Partners.html" aria-label="社区伙伴"><!---->社区伙伴<!----></a></li><li class="vp-dropdown-item"><a class="route-link auto-link" href="/zh/Community/Feedback.html" aria-label="交流与反馈"><!---->交流与反馈<!----></a></li><li class="vp-dropdown-item"><a class="route-link auto-link" href="/zh/Community/Events-and-Reports.html" aria-label="活动与报告"><!---->活动与报告<!----></a></li><li class="vp-dropdown-item"><a class="route-link auto-link" href="/zh/Community/Committers.html" aria-label="Committers"><!---->Committers<!----></a></li></ul></button></div></div><div class="vp-nav-item hide-in-mobile"><div class="vp-dropdown-wrapper"><button type="button" class="vp-dropdown-title" aria-label="ASF"><!--[--><!---->ASF<!--]--><span class="arrow"></span><ul class="vp-dropdown"><li class="vp-dropdown-item"><a class="auto-link external-link" href="https://www.apache.org/" aria-label="基金会" rel="noopener noreferrer" target="_blank"><!---->基金会<!----></a></li><li class="vp-dropdown-item"><a class="auto-link external-link" href="https://www.apache.org/licenses/" aria-label="许可证" rel="noopener noreferrer" target="_blank"><!---->许可证<!----></a></li><li class="vp-dropdown-item"><a class="auto-link external-link" href="https://www.apache.org/security/" aria-label="安全" rel="noopener noreferrer" target="_blank"><!---->安全<!----></a></li><li class="vp-dropdown-item"><a class="auto-link external-link" href="https://www.apache.org/foundation/sponsorship.html" aria-label="赞助" rel="noopener noreferrer" target="_blank"><!---->赞助<!----></a></li><li class="vp-dropdown-item"><a class="auto-link external-link" href="https://www.apache.org/foundation/thanks.html" aria-label="致谢" rel="noopener noreferrer" target="_blank"><!---->致谢<!----></a></li><li class="vp-dropdown-item"><a class="auto-link external-link" href="https://www.apache.org/events/current-event" aria-label="活动" rel="noopener noreferrer" target="_blank"><!---->活动<!----></a></li><li class="vp-dropdown-item"><a class="auto-link external-link" href="https://privacy.apache.org/policies/privacy-policy-public.html" aria-label="隐私" rel="noopener noreferrer" target="_blank"><!---->隐私<!----></a></li></ul></button></div></div></nav><div class="vp-nav-item"><div class="vp-dropdown-wrapper"><button type="button" class="vp-dropdown-title" aria-label="选择语言"><!--[--><svg xmlns="http://www.w3.org/2000/svg" class="icon i18n-icon" viewBox="0 0 1024 1024" fill="currentColor" aria-label="i18n icon" name="i18n" style="width:1rem;height:1rem;vertical-align:middle;"><path d="M379.392 460.8 494.08 575.488l-42.496 102.4L307.2 532.48 138.24 701.44l-71.68-72.704L234.496 460.8l-45.056-45.056c-27.136-27.136-51.2-66.56-66.56-108.544h112.64c7.68 14.336 16.896 27.136 26.112 35.84l45.568 46.08 45.056-45.056C382.976 312.32 409.6 247.808 409.6 204.8H0V102.4h256V0h102.4v102.4h256v102.4H512c0 70.144-37.888 161.28-87.04 210.944L378.88 460.8zM576 870.4 512 1024H409.6l256-614.4H768l256 614.4H921.6l-64-153.6H576zM618.496 768h196.608L716.8 532.48 618.496 768z"></path></svg><!--]--><span class="arrow"></span><ul class="vp-dropdown"><li class="vp-dropdown-item"><a class="route-link auto-link" href="/UserGuide/dev-1.3/User-Manual/Streaming_apache.html" aria-label="English"><!---->English<!----></a></li><li class="vp-dropdown-item"><a class="route-link route-link-active auto-link" href="/zh/UserGuide/dev-1.3/User-Manual/Streaming_apache.html" aria-label="简体中文"><!---->简体中文<!----></a></li></ul></button></div></div><div class="vp-nav-item hide-in-mobile"><button type="button" class="vp-color-mode-switch" id="color-mode-switch"><svg xmlns="http://www.w3.org/2000/svg" class="icon auto-icon" viewBox="0 0 1024 1024" fill="currentColor" aria-label="auto icon" name="auto" style="display:none;"><path d="M512 992C246.92 992 32 777.08 32 512S246.92 32 512 32s480 214.92 480 480-214.92 480-480 480zm0-840c-198.78 0-360 161.22-360 360 0 198.84 161.22 360 360 360s360-161.16 360-360c0-198.78-161.22-360-360-360zm0 660V212c165.72 0 300 134.34 300 300 0 165.72-134.28 300-300 300z"></path></svg><svg xmlns="http://www.w3.org/2000/svg" class="icon dark-icon" viewBox="0 0 1024 1024" fill="currentColor" aria-label="dark icon" name="dark" style="display:none;"><path d="M524.8 938.667h-4.267a439.893 439.893 0 0 1-313.173-134.4 446.293 446.293 0 0 1-11.093-597.334A432.213 432.213 0 0 1 366.933 90.027a42.667 42.667 0 0 1 45.227 9.386 42.667 42.667 0 0 1 10.24 42.667 358.4 358.4 0 0 0 82.773 375.893 361.387 361.387 0 0 0 376.747 82.774 42.667 42.667 0 0 1 54.187 55.04 433.493 433.493 0 0 1-99.84 154.88 438.613 438.613 0 0 1-311.467 128z"></path></svg><svg xmlns="http://www.w3.org/2000/svg" class="icon light-icon" viewBox="0 0 1024 1024" fill="currentColor" aria-label="light icon" name="light" style="display:block;"><path d="M952 552h-80a40 40 0 0 1 0-80h80a40 40 0 0 1 0 80zM801.88 280.08a41 41 0 0 1-57.96-57.96l57.96-58a41.04 41.04 0 0 1 58 58l-58 57.96zM512 752a240 240 0 1 1 0-480 240 240 0 0 1 0 480zm0-560a40 40 0 0 1-40-40V72a40 40 0 0 1 80 0v80a40 40 0 0 1-40 40zm-289.88 88.08-58-57.96a41.04 41.04 0 0 1 58-58l57.96 58a41 41 0 0 1-57.96 57.96zM192 512a40 40 0 0 1-40 40H72a40 40 0 0 1 0-80h80a40 40 0 0 1 40 40zm30.12 231.92a41 41 0 0 1 57.96 57.96l-57.96 58a41.04 41.04 0 0 1-58-58l58-57.96zM512 832a40 40 0 0 1 40 40v80a40 40 0 0 1-80 0v-80a40 40 0 0 1 40-40zm289.88-88.08 58 57.96a41.04 41.04 0 0 1-58 58l-57.96-58a41 41 0 0 1 57.96-57.96z"></path></svg></button></div><div class="vp-nav-item vp-action"><a class="vp-action-link" href="https://github.com/apache/iotdb" target="_blank" rel="noopener noreferrer" aria-label="GitHub"><svg xmlns="http://www.w3.org/2000/svg" class="icon github-icon" viewBox="0 0 1024 1024" fill="currentColor" aria-label="github icon" name="github" style="width:1.25rem;height:1.25rem;vertical-align:middle;"><path d="M511.957 21.333C241.024 21.333 21.333 240.981 21.333 512c0 216.832 140.544 400.725 335.574 465.664 24.49 4.395 32.256-10.07 32.256-23.083 0-11.69.256-44.245 0-85.205-136.448 29.61-164.736-64.64-164.736-64.64-22.315-56.704-54.4-71.765-54.4-71.765-44.587-30.464 3.285-29.824 3.285-29.824 49.195 3.413 75.179 50.517 75.179 50.517 43.776 75.008 114.816 53.333 142.762 40.79 4.523-31.66 17.152-53.377 31.19-65.537-108.971-12.458-223.488-54.485-223.488-242.602 0-53.547 19.114-97.323 50.517-131.67-5.035-12.33-21.93-62.293 4.779-129.834 0 0 41.258-13.184 134.912 50.346a469.803 469.803 0 0 1 122.88-16.554c41.642.213 83.626 5.632 122.88 16.554 93.653-63.488 134.784-50.346 134.784-50.346 26.752 67.541 9.898 117.504 4.864 129.834 31.402 34.347 50.474 78.123 50.474 131.67 0 188.586-114.73 230.016-224.042 242.09 17.578 15.232 33.578 44.672 33.578 90.454v135.85c0 13.142 7.936 27.606 32.854 22.87C862.25 912.597 1002.667 728.747 1002.667 512c0-271.019-219.648-490.667-490.71-490.667z"></path></svg></a></div><!--]--><button type="button" class="vp-toggle-navbar-button" aria-label="Toggle Navbar" aria-expanded="false" aria-controls="nav-screen"><span><span class="vp-top"></span><span class="vp-middle"></span><span class="vp-bottom"></span></span></button></div></header><!----><!--]--><!----><div class="toggle-sidebar-wrapper"><span class="arrow start"></span></div><aside id="sidebar" class="vp-sidebar" vp-sidebar><!--[--><p class="vp-sidebar-header iotdb-sidebar-header"><span class="vp-sidebar-title"></span></p><!----><!--]--><ul class="vp-sidebar-links"></ul><!----></aside><!--[--><main id="main-content" class="vp-page"><!--[--><!----><!----><nav class="vp-breadcrumb disable"></nav><div class="vp-page-title"><h1><!---->流计算框架</h1><div class="page-info"><!----><!----><span class="page-date-info" aria-label="写作日期📅" data-balloon-pos="up"><svg xmlns="http://www.w3.org/2000/svg" class="icon calendar-icon" viewBox="0 0 1024 1024" fill="currentColor" aria-label="calendar icon" name="calendar"><path d="M716.4 110.137c0-18.753-14.72-33.473-33.472-33.473-18.753 0-33.473 14.72-33.473 33.473v33.473h66.993v-33.473zm-334.87 0c0-18.753-14.72-33.473-33.473-33.473s-33.52 14.72-33.52 33.473v33.473h66.993v-33.473zm468.81 33.52H716.4v100.465c0 18.753-14.72 33.473-33.472 33.473a33.145 33.145 0 01-33.473-33.473V143.657H381.53v100.465c0 18.753-14.72 33.473-33.473 33.473a33.145 33.145 0 01-33.473-33.473V143.657H180.6A134.314 134.314 0 0046.66 277.595v535.756A134.314 134.314 0 00180.6 947.289h669.74a134.36 134.36 0 00133.94-133.938V277.595a134.314 134.314 0 00-133.94-133.938zm33.473 267.877H147.126a33.145 33.145 0 01-33.473-33.473c0-18.752 14.72-33.473 33.473-33.473h736.687c18.752 0 33.472 14.72 33.472 33.473a33.145 33.145 0 01-33.472 33.473z"></path></svg><span data-allow-mismatch="text">2023/7/13</span><meta property="datePublished" content="2023-07-13T09:54:11.000Z"></span><!----><span class="page-reading-time-info" aria-label="阅读时间⌛" data-balloon-pos="up"><svg xmlns="http://www.w3.org/2000/svg" class="icon timer-icon" viewBox="0 0 1024 1024" fill="currentColor" aria-label="timer icon" name="timer"><path d="M799.387 122.15c4.402-2.978 7.38-7.897 7.38-13.463v-1.165c0-8.933-7.38-16.312-16.312-16.312H256.33c-8.933 0-16.311 7.38-16.311 16.312v1.165c0 5.825 2.977 10.874 7.637 13.592 4.143 194.44 97.22 354.963 220.201 392.763-122.204 37.542-214.893 196.511-220.2 389.397-4.661 5.049-7.638 11.651-7.638 19.03v5.825h566.49v-5.825c0-7.379-2.849-13.981-7.509-18.9-5.049-193.016-97.867-351.985-220.2-389.527 123.24-37.67 216.446-198.453 220.588-392.892zM531.16 450.445v352.632c117.674 1.553 211.787 40.778 211.787 88.676H304.097c0-48.286 95.149-87.382 213.728-88.676V450.445c-93.077-3.107-167.901-81.297-167.901-177.093 0-8.803 6.99-15.793 15.793-15.793 8.803 0 15.794 6.99 15.794 15.793 0 80.261 63.69 145.635 142.01 145.635s142.011-65.374 142.011-145.635c0-8.803 6.99-15.793 15.794-15.793s15.793 6.99 15.793 15.793c0 95.019-73.789 172.82-165.96 177.093z"></path></svg><span>大约 23 分钟</span><meta property="timeRequired" content="PT23M"></span><!----><!----></div><hr></div><!----><div class="" vp-content><!----><div id="markdown-content"><h1 id="流计算框架" tabindex="-1"><a class="header-anchor" href="#流计算框架"><span>流计算框架</span></a></h1><p>IoTDB 流处理框架允许用户实现自定义的流处理逻辑,可以实现对存储引擎变更的监听和捕获、实现对变更数据的变形、实现对变形后数据的向外推送等逻辑。</p><p>我们将<!---->。一个流处理任务(Pipe)包含三个子任务:</p><ul><li>抽取(Source)</li><li>处理(Process)</li><li>发送(Sink)</li></ul><p>流处理框架允许用户使用 Java 语言自定义编写三个子任务的处理逻辑,通过类似 UDF 的方式处理数据。<br> 在一个 Pipe 中,上述的三个子任务分别由三种插件执行实现,数据会依次经过这三个插件进行处理:<br> Pipe Source 用于抽取数据,Pipe Processor 用于处理数据,Pipe Sink 用于发送数据,最终数据将被发至外部系统。</p><p><strong>Pipe 任务的模型如下:</strong></p><figure><img src="/img/1706697228308.jpg" alt="任务模型图" tabindex="0" loading="lazy"><figcaption>任务模型图</figcaption></figure><p>描述一个数据流处理任务,本质就是描述 Pipe Source、Pipe Processor 和 Pipe Sink 插件的属性。<br> 用户可以通过 SQL 语句声明式地配置三个子任务的具体属性,通过组合不同的属性,实现灵活的数据 ETL 能力。</p><p>利用流处理框架,可以搭建完整的数据链路来满足端<em>边云同步、异地灾备、读写负载分库</em>等需求。</p><h2 id="自定义流处理插件开发" tabindex="-1"><a class="header-anchor" href="#自定义流处理插件开发"><span>自定义流处理插件开发</span></a></h2><h3 id="编程开发依赖" tabindex="-1"><a class="header-anchor" href="#编程开发依赖"><span>编程开发依赖</span></a></h3><p>推荐采用 maven 构建项目,在<code>pom.xml</code>中添加以下依赖。请注意选择和 IoTDB 服务器版本相同的依赖版本。</p><div class="language-xml line-numbers-mode" data-highlighter="shiki" data-ext="xml" style="background-color:#282c34;color:#abb2bf;"><pre class="shiki one-dark-pro vp-code"><code class="language-xml"><span class="line"><span style="color:#ABB2BF;"><</span><span style="color:#E06C75;">dependency</span><span style="color:#ABB2BF;">></span></span> |
| <span class="line"><span style="color:#ABB2BF;"> <</span><span style="color:#E06C75;">groupId</span><span style="color:#ABB2BF;">>org.apache.iotdb</</span><span style="color:#E06C75;">groupId</span><span style="color:#ABB2BF;">></span></span> |
| <span class="line"><span style="color:#ABB2BF;"> <</span><span style="color:#E06C75;">artifactId</span><span style="color:#ABB2BF;">>pipe-api</</span><span style="color:#E06C75;">artifactId</span><span style="color:#ABB2BF;">></span></span> |
| <span class="line"><span style="color:#ABB2BF;"> <</span><span style="color:#E06C75;">version</span><span style="color:#ABB2BF;">>1.3.1</</span><span style="color:#E06C75;">version</span><span style="color:#ABB2BF;">></span></span> |
| <span class="line"><span style="color:#ABB2BF;"> <</span><span style="color:#E06C75;">scope</span><span style="color:#ABB2BF;">>provided</</span><span style="color:#E06C75;">scope</span><span style="color:#ABB2BF;">></span></span> |
| <span class="line"><span style="color:#ABB2BF;"></</span><span style="color:#E06C75;">dependency</span><span style="color:#ABB2BF;">></span></span></code></pre><div class="line-numbers" aria-hidden="true" style="counter-reset:line-number 0;"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><h3 id="事件驱动编程模型" tabindex="-1"><a class="header-anchor" href="#事件驱动编程模型"><span>事件驱动编程模型</span></a></h3><p>流处理插件的用户编程接口设计,参考了事件驱动编程模型的通用设计理念。事件(Event)是用户编程接口中的数据抽象,而编程接口与具体的执行方式解耦,只需要专注于描述事件(数据)到达系统后,系统期望的处理方式即可。</p><p>在流处理插件的用户编程接口中,事件是数据库数据写入操作的抽象。事件由单机流处理引擎捕获,按照流处理三个阶段的流程,依次传递至 PipeSource 插件,PipeProcessor 插件和 PipeSink 插件,并依次在三个插件中触发用户逻辑的执行。</p><p>为了兼顾端侧低负载场景下的流处理低延迟和端侧高负载场景下的流处理高吞吐,流处理引擎会动态地在操作日志和数据文件中选择处理对象,因此,流处理的用户编程接口要求用户提供下列两类事件的处理逻辑:操作日志写入事件 TabletInsertionEvent 和数据文件写入事件 TsFileInsertionEvent。</p><h4 id="操作日志写入事件-tabletinsertionevent" tabindex="-1"><a class="header-anchor" href="#操作日志写入事件-tabletinsertionevent"><span><strong>操作日志写入事件(TabletInsertionEvent)</strong></span></a></h4><p>操作日志写入事件(TabletInsertionEvent)是对用户写入请求的高层数据抽象,它通过提供统一的操作接口,为用户提供了操纵写入请求底层数据的能力。</p><p>对于不同的数据库部署方式,操作日志写入事件对应的底层存储结构是不一样的。对于单机部署的场景,操作日志写入事件是对写前日志(WAL)条目的封装;对于分布式部署的场景,操作日志写入事件是对单个节点共识协议操作日志条目的封装。</p><p>对于数据库不同写入请求接口生成的写入操作,操作日志写入事件对应的请求结构体的数据结构也是不一样的。IoTDB 提供了 InsertRecord、InsertRecords、InsertTablet、InsertTablets 等众多的写入接口,每一种写入请求都使用了完全不同的序列化方式,生成的二进制条目也不尽相同。</p><p>操作日志写入事件的存在,为用户提供了一种统一的数据操作视图,它屏蔽了底层数据结构的实现差异,极大地降低了用户的编程门槛,提升了功能的易用性。</p><div class="language-java line-numbers-mode" data-highlighter="shiki" data-ext="java" style="background-color:#282c34;color:#abb2bf;"><pre class="shiki one-dark-pro vp-code"><code class="language-java"><span class="line"><span style="color:#7F848E;font-style:italic;">/** TabletInsertionEvent is used to define the event of data insertion. */</span></span> |
| <span class="line"><span style="color:#C678DD;">public</span><span style="color:#C678DD;"> interface</span><span style="color:#E5C07B;"> TabletInsertionEvent</span><span style="color:#C678DD;"> extends</span><span style="color:#E5C07B;"> Event</span><span style="color:#ABB2BF;"> {</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> /**</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * The consumer processes the data row by row and collects the results by RowCollector.</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> *</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * </span><span style="color:#C678DD;font-style:italic;">@return</span><span style="color:#7F848E;font-style:italic;"> {@code Iterable<TabletInsertionEvent>} a list of new TabletInsertionEvent contains the</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * results collected by the RowCollector</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> */</span></span> |
| <span class="line"><span style="color:#E5C07B;"> Iterable</span><span style="color:#ABB2BF;"><</span><span style="color:#E5C07B;">TabletInsertionEvent</span><span style="color:#ABB2BF;">></span><span style="color:#61AFEF;"> processRowByRow</span><span style="color:#ABB2BF;">(</span><span style="color:#E5C07B;">BiConsumer</span><span style="color:#ABB2BF;"><</span><span style="color:#E5C07B;">Row</span><span style="color:#ABB2BF;">, </span><span style="color:#E5C07B;">RowCollector</span><span style="color:#ABB2BF;">> </span><span style="color:#E06C75;font-style:italic;">consumer</span><span style="color:#ABB2BF;">);</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> /**</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * The consumer processes the Tablet directly and collects the results by RowCollector.</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> *</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * </span><span style="color:#C678DD;font-style:italic;">@return</span><span style="color:#7F848E;font-style:italic;"> {@code Iterable<TabletInsertionEvent>} a list of new TabletInsertionEvent contains the</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * results collected by the RowCollector</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> */</span></span> |
| <span class="line"><span style="color:#E5C07B;"> Iterable</span><span style="color:#ABB2BF;"><</span><span style="color:#E5C07B;">TabletInsertionEvent</span><span style="color:#ABB2BF;">></span><span style="color:#61AFEF;"> processTablet</span><span style="color:#ABB2BF;">(</span><span style="color:#E5C07B;">BiConsumer</span><span style="color:#ABB2BF;"><</span><span style="color:#E5C07B;">Tablet</span><span style="color:#ABB2BF;">, </span><span style="color:#E5C07B;">RowCollector</span><span style="color:#ABB2BF;">> </span><span style="color:#E06C75;font-style:italic;">consumer</span><span style="color:#ABB2BF;">);</span></span> |
| <span class="line"><span style="color:#ABB2BF;">}</span></span></code></pre><div class="line-numbers" aria-hidden="true" style="counter-reset:line-number 0;"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><h4 id="数据文件写入事件-tsfileinsertionevent" tabindex="-1"><a class="header-anchor" href="#数据文件写入事件-tsfileinsertionevent"><span><strong>数据文件写入事件(TsFileInsertionEvent)</strong></span></a></h4><p>数据文件写入事件(TsFileInsertionEvent) 是对数据库文件落盘操作的高层抽象,它是若干操作日志写入事件(TabletInsertionEvent)的数据集合。</p><p>IoTDB 的存储引擎是 LSM 结构的。数据写入时会先将写入操作落盘到日志结构的文件里,同时将写入数据保存在内存里。当内存达到控制上限,则会触发刷盘行为,即将内存中的数据转换为数据库文件,同时删除之前预写的操作日志。当内存中的数据转换为数据库文件中的数据时,会经过编码压缩和通用压缩两次压缩处理,因此数据库文件的数据相比内存中的原始数据占用的空间更少。</p><p>在极端的网络情况下,直接传输数据文件相比传输数据写入的操作要更加经济,它会占用更低的网络带宽,能实现更快的传输速度。当然,天下没有免费的午餐,对文件中的数据进行计算处理,相比直接对内存中的数据进行计算处理时,需要额外付出文件 I/O 的代价。但是,正是磁盘数据文件和内存写入操作两种结构各有优劣的存在,给了系统做动态权衡调整的机会,也正是基于这样的观察,插件的事件模型中才引入了数据文件写入事件。</p><p>综上,数据文件写入事件出现在流处理插件的事件流中,存在下面两种情况:</p><p>(1)历史数据抽取:一个流处理任务开始前,所有已经落盘的写入数据都会以 TsFile 的形式存在。一个流处理任务开始后,采集历史数据时,历史数据将以 TsFileInsertionEvent 作为抽象;</p><p>(2)实时数据抽取:一个流处理任务进行时,当数据流中实时处理操作日志写入事件的速度慢于写入请求速度一定进度之后,未来得及处理的操作日志写入事件会被被持久化至磁盘,以 TsFile 的形式存在,这一些数据被流处理引擎抽取到后,会以 TsFileInsertionEvent 作为抽象。</p><div class="language-java line-numbers-mode" data-highlighter="shiki" data-ext="java" style="background-color:#282c34;color:#abb2bf;"><pre class="shiki one-dark-pro vp-code"><code class="language-java"><span class="line"><span style="color:#7F848E;font-style:italic;">/**</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * TsFileInsertionEvent is used to define the event of writing TsFile. Event data stores in disks,</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * which is compressed and encoded, and requires IO cost for computational processing.</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> */</span></span> |
| <span class="line"><span style="color:#C678DD;">public</span><span style="color:#C678DD;"> interface</span><span style="color:#E5C07B;"> TsFileInsertionEvent</span><span style="color:#C678DD;"> extends</span><span style="color:#E5C07B;"> Event</span><span style="color:#ABB2BF;"> {</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> /**</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * The method is used to convert the TsFileInsertionEvent into several TabletInsertionEvents.</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> *</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * </span><span style="color:#C678DD;font-style:italic;">@return</span><span style="color:#7F848E;font-style:italic;"> {@code Iterable<TabletInsertionEvent>} the list of TabletInsertionEvent</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> */</span></span> |
| <span class="line"><span style="color:#E5C07B;"> Iterable</span><span style="color:#ABB2BF;"><</span><span style="color:#E5C07B;">TabletInsertionEvent</span><span style="color:#ABB2BF;">></span><span style="color:#61AFEF;"> toTabletInsertionEvents</span><span style="color:#ABB2BF;">();</span></span> |
| <span class="line"><span style="color:#ABB2BF;">}</span></span></code></pre><div class="line-numbers" aria-hidden="true" style="counter-reset:line-number 0;"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><h3 id="自定义流处理插件编程接口定义" tabindex="-1"><a class="header-anchor" href="#自定义流处理插件编程接口定义"><span>自定义流处理插件编程接口定义</span></a></h3><p>基于自定义流处理插件编程接口,用户可以轻松编写数据抽取插件、数据处理插件和数据发送插件,从而使得流处理功能灵活适配各种工业场景。</p><h4 id="数据抽取插件接口" tabindex="-1"><a class="header-anchor" href="#数据抽取插件接口"><span>数据抽取插件接口</span></a></h4><p>数据抽取是流处理数据从数据抽取到数据发送三阶段的第一阶段。数据抽取插件(PipeSource)是流处理引擎和存储引擎的桥梁,它通过监听存储引擎的行为,<br> 捕获各种数据写入事件。</p><div class="language-java line-numbers-mode" data-highlighter="shiki" data-ext="java" style="background-color:#282c34;color:#abb2bf;"><pre class="shiki one-dark-pro vp-code"><code class="language-java"><span class="line"><span style="color:#7F848E;font-style:italic;">/**</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * PipeSource</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> *</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * <p>PipeSource is responsible for capturing events from sources.</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> *</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * <p>Various data sources can be supported by implementing different PipeSource classes.</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> *</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * <p>The lifecycle of a PipeSource is as follows:</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> *</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * <ul></span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * <li>When a collaboration task is created, the KV pairs of `WITH SOURCE` clause in SQL are</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * parsed and the validation method {</span><span style="color:#C678DD;font-style:italic;">@link</span><span style="color:#E5C07B;font-style:italic;"> PipeSource</span><span style="color:#7F848E;font-style:italic;">#</span><span style="color:#E06C75;font-style:italic;">validate(PipeParameterValidator)</span><span style="color:#7F848E;font-style:italic;">} will</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * be called to validate the parameters.</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * <li>Before the collaboration task starts, the method {@link</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * PipeSource#customize(PipeParameters, PipeSourceRuntimeConfiguration)} will be called to</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * config the runtime behavior of the PipeSource.</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * <li>Then the method {</span><span style="color:#C678DD;font-style:italic;">@link</span><span style="color:#E5C07B;font-style:italic;"> PipeSource</span><span style="color:#7F848E;font-style:italic;">#</span><span style="color:#E06C75;font-style:italic;">start()</span><span style="color:#7F848E;font-style:italic;">} will be called to start the PipeSource.</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * <li>While the collaboration task is in progress, the method {</span><span style="color:#C678DD;font-style:italic;">@link</span><span style="color:#E5C07B;font-style:italic;"> PipeSource</span><span style="color:#7F848E;font-style:italic;">#</span><span style="color:#E06C75;font-style:italic;">supply()</span><span style="color:#7F848E;font-style:italic;">} will be</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * called to capture events from sources and then the events will be passed to the</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * PipeProcessor.</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * <li>The method {</span><span style="color:#C678DD;font-style:italic;">@link</span><span style="color:#E5C07B;font-style:italic;"> PipeSource</span><span style="color:#7F848E;font-style:italic;">#</span><span style="color:#E06C75;font-style:italic;">close()</span><span style="color:#7F848E;font-style:italic;">} will be called when the collaboration task is</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * cancelled (the `DROP PIPE` command is executed).</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * </ul></span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> */</span></span> |
| <span class="line"><span style="color:#C678DD;">public</span><span style="color:#C678DD;"> interface</span><span style="color:#E5C07B;"> PipeSource</span><span style="color:#C678DD;"> extends</span><span style="color:#E5C07B;"> PipePlugin</span><span style="color:#ABB2BF;"> {</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> /**</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * This method is mainly used to validate {@link PipeParameters} and it is executed before {@link</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * PipeSource#customize(PipeParameters, PipeSourceRuntimeConfiguration)} is called.</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> *</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * </span><span style="color:#C678DD;font-style:italic;">@param</span><span style="color:#E06C75;font-style:italic;"> validator</span><span style="color:#7F848E;font-style:italic;"> the validator used to validate {@link PipeParameters}</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * </span><span style="color:#C678DD;font-style:italic;">@throws</span><span style="color:#E5C07B;font-style:italic;"> Exception</span><span style="color:#7F848E;font-style:italic;"> if any parameter is not valid</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> */</span></span> |
| <span class="line"><span style="color:#C678DD;"> void</span><span style="color:#61AFEF;"> validate</span><span style="color:#ABB2BF;">(</span><span style="color:#E5C07B;">PipeParameterValidator</span><span style="color:#E06C75;font-style:italic;"> validator</span><span style="color:#ABB2BF;">)</span><span style="color:#C678DD;"> throws</span><span style="color:#E5C07B;"> Exception</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> /**</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * This method is mainly used to customize PipeSource. In this method, the user can do the</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * following things:</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> *</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * <ul></span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * <li>Use PipeParameters to parse key-value pair attributes entered by the user.</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * <li>Set the running configurations in PipeSourceRuntimeConfiguration.</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * </ul></span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> *</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * <p>This method is called after the method {</span><span style="color:#C678DD;font-style:italic;">@link</span><span style="color:#E5C07B;font-style:italic;"> PipeSource</span><span style="color:#7F848E;font-style:italic;">#</span><span style="color:#E06C75;font-style:italic;">validate(PipeParameterValidator)</span><span style="color:#7F848E;font-style:italic;">}</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * is called.</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> *</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * </span><span style="color:#C678DD;font-style:italic;">@param</span><span style="color:#E06C75;font-style:italic;"> parameters</span><span style="color:#7F848E;font-style:italic;"> used to parse the input parameters entered by the user</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * </span><span style="color:#C678DD;font-style:italic;">@param</span><span style="color:#E06C75;font-style:italic;"> configuration</span><span style="color:#7F848E;font-style:italic;"> used to set the required properties of the running PipeSource</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * </span><span style="color:#C678DD;font-style:italic;">@throws</span><span style="color:#E5C07B;font-style:italic;"> Exception</span><span style="color:#7F848E;font-style:italic;"> the user can throw errors if necessary</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> */</span></span> |
| <span class="line"><span style="color:#C678DD;"> void</span><span style="color:#61AFEF;"> customize</span><span style="color:#ABB2BF;">(</span><span style="color:#E5C07B;">PipeParameters</span><span style="color:#E06C75;font-style:italic;"> parameters</span><span style="color:#ABB2BF;">, </span><span style="color:#E5C07B;">PipeSourceRuntimeConfiguration</span><span style="color:#E06C75;font-style:italic;"> configuration</span><span style="color:#ABB2BF;">)</span></span> |
| <span class="line"><span style="color:#C678DD;"> throws</span><span style="color:#E5C07B;"> Exception</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> /**</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * Start the Source. After this method is called, events should be ready to be supplied by</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * {</span><span style="color:#C678DD;font-style:italic;">@link</span><span style="color:#E5C07B;font-style:italic;"> PipeSource</span><span style="color:#7F848E;font-style:italic;">#</span><span style="color:#E06C75;font-style:italic;">supply()</span><span style="color:#7F848E;font-style:italic;">}. This method is called after {@link</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * PipeSource#customize(PipeParameters, PipeSourceRuntimeConfiguration)} is called.</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> *</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * </span><span style="color:#C678DD;font-style:italic;">@throws</span><span style="color:#E5C07B;font-style:italic;"> Exception</span><span style="color:#7F848E;font-style:italic;"> the user can throw errors if necessary</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> */</span></span> |
| <span class="line"><span style="color:#C678DD;"> void</span><span style="color:#61AFEF;"> start</span><span style="color:#ABB2BF;">()</span><span style="color:#C678DD;"> throws</span><span style="color:#E5C07B;"> Exception</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> /**</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * Supply single event from the Source and the caller will send the event to the processor.</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * This method is called after {</span><span style="color:#C678DD;font-style:italic;">@link</span><span style="color:#E5C07B;font-style:italic;"> PipeSource</span><span style="color:#7F848E;font-style:italic;">#</span><span style="color:#E06C75;font-style:italic;">start()</span><span style="color:#7F848E;font-style:italic;">} is called.</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> *</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * </span><span style="color:#C678DD;font-style:italic;">@return</span><span style="color:#7F848E;font-style:italic;"> the event to be supplied. the event may be null if the Source has no more events at</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * the moment, but the Source is still running for more events.</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * </span><span style="color:#C678DD;font-style:italic;">@throws</span><span style="color:#E5C07B;font-style:italic;"> Exception</span><span style="color:#7F848E;font-style:italic;"> the user can throw errors if necessary</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> */</span></span> |
| <span class="line"><span style="color:#E5C07B;"> Event</span><span style="color:#61AFEF;"> supply</span><span style="color:#ABB2BF;">()</span><span style="color:#C678DD;"> throws</span><span style="color:#E5C07B;"> Exception</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"><span style="color:#ABB2BF;">}</span></span></code></pre><div class="line-numbers" aria-hidden="true" style="counter-reset:line-number 0;"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><h4 id="数据处理插件接口" tabindex="-1"><a class="header-anchor" href="#数据处理插件接口"><span>数据处理插件接口</span></a></h4><p>数据处理是流处理数据从数据抽取到数据发送三阶段的第二阶段。数据处理插件(PipeProcessor)主要用于过滤和转换由数据抽取插件(PipeSource)捕获的<br> 各种事件。</p><div class="language-java line-numbers-mode" data-highlighter="shiki" data-ext="java" style="background-color:#282c34;color:#abb2bf;"><pre class="shiki one-dark-pro vp-code"><code class="language-java"><span class="line"><span style="color:#7F848E;font-style:italic;">/**</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * PipeProcessor</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> *</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * <p>PipeProcessor is used to filter and transform the Event formed by the PipeSource.</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> *</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * <p>The lifecycle of a PipeProcessor is as follows:</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> *</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * <ul></span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * <li>When a collaboration task is created, the KV pairs of `WITH PROCESSOR` clause in SQL are</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * parsed and the validation method {</span><span style="color:#C678DD;font-style:italic;">@link</span><span style="color:#E5C07B;font-style:italic;"> PipeProcessor</span><span style="color:#7F848E;font-style:italic;">#</span><span style="color:#E06C75;font-style:italic;">validate(PipeParameterValidator)</span><span style="color:#7F848E;font-style:italic;">}</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * will be called to validate the parameters.</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * <li>Before the collaboration task starts, the method {@link</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * PipeProcessor#customize(PipeParameters, PipeProcessorRuntimeConfiguration)} will be called</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * to config the runtime behavior of the PipeProcessor.</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * <li>While the collaboration task is in progress:</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * <ul></span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * <li>PipeSource captures the events and wraps them into three types of Event instances.</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * <li>PipeProcessor processes the event and then passes them to the PipeSink. The</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * following 3 methods will be called: {@link</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * PipeProcessor#process(TabletInsertionEvent, EventCollector)}, {@link</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * PipeProcessor#process(TsFileInsertionEvent, EventCollector)} and {@link</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * PipeProcessor#process(Event, EventCollector)}.</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * <li>PipeSink serializes the events into binaries and send them to sinks.</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * </ul></span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * <li>When the collaboration task is cancelled (the `DROP PIPE` command is executed), the {@link</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * PipeProcessor#close() } method will be called.</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * </ul></span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> */</span></span> |
| <span class="line"><span style="color:#C678DD;">public</span><span style="color:#C678DD;"> interface</span><span style="color:#E5C07B;"> PipeProcessor</span><span style="color:#C678DD;"> extends</span><span style="color:#E5C07B;"> PipePlugin</span><span style="color:#ABB2BF;"> {</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> /**</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * This method is mainly used to validate {@link PipeParameters} and it is executed before {@link</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * PipeProcessor#customize(PipeParameters, PipeProcessorRuntimeConfiguration)} is called.</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> *</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * </span><span style="color:#C678DD;font-style:italic;">@param</span><span style="color:#E06C75;font-style:italic;"> validator</span><span style="color:#7F848E;font-style:italic;"> the validator used to validate {@link PipeParameters}</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * </span><span style="color:#C678DD;font-style:italic;">@throws</span><span style="color:#E5C07B;font-style:italic;"> Exception</span><span style="color:#7F848E;font-style:italic;"> if any parameter is not valid</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> */</span></span> |
| <span class="line"><span style="color:#C678DD;"> void</span><span style="color:#61AFEF;"> validate</span><span style="color:#ABB2BF;">(</span><span style="color:#E5C07B;">PipeParameterValidator</span><span style="color:#E06C75;font-style:italic;"> validator</span><span style="color:#ABB2BF;">)</span><span style="color:#C678DD;"> throws</span><span style="color:#E5C07B;"> Exception</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> /**</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * This method is mainly used to customize PipeProcessor. In this method, the user can do the</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * following things:</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> *</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * <ul></span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * <li>Use PipeParameters to parse key-value pair attributes entered by the user.</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * <li>Set the running configurations in PipeProcessorRuntimeConfiguration.</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * </ul></span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> *</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * <p>This method is called after the method {@link</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * PipeProcessor#validate(PipeParameterValidator)} is called and before the beginning of the</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * events processing.</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> *</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * </span><span style="color:#C678DD;font-style:italic;">@param</span><span style="color:#E06C75;font-style:italic;"> parameters</span><span style="color:#7F848E;font-style:italic;"> used to parse the input parameters entered by the user</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * </span><span style="color:#C678DD;font-style:italic;">@param</span><span style="color:#E06C75;font-style:italic;"> configuration</span><span style="color:#7F848E;font-style:italic;"> used to set the required properties of the running PipeProcessor</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * </span><span style="color:#C678DD;font-style:italic;">@throws</span><span style="color:#E5C07B;font-style:italic;"> Exception</span><span style="color:#7F848E;font-style:italic;"> the user can throw errors if necessary</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> */</span></span> |
| <span class="line"><span style="color:#C678DD;"> void</span><span style="color:#61AFEF;"> customize</span><span style="color:#ABB2BF;">(</span><span style="color:#E5C07B;">PipeParameters</span><span style="color:#E06C75;font-style:italic;"> parameters</span><span style="color:#ABB2BF;">, </span><span style="color:#E5C07B;">PipeProcessorRuntimeConfiguration</span><span style="color:#E06C75;font-style:italic;"> configuration</span><span style="color:#ABB2BF;">)</span></span> |
| <span class="line"><span style="color:#C678DD;"> throws</span><span style="color:#E5C07B;"> Exception</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> /**</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * This method is called to process the TabletInsertionEvent.</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> *</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * </span><span style="color:#C678DD;font-style:italic;">@param</span><span style="color:#E06C75;font-style:italic;"> tabletInsertionEvent</span><span style="color:#7F848E;font-style:italic;"> TabletInsertionEvent to be processed</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * </span><span style="color:#C678DD;font-style:italic;">@param</span><span style="color:#E06C75;font-style:italic;"> eventCollector</span><span style="color:#7F848E;font-style:italic;"> used to collect result events after processing</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * </span><span style="color:#C678DD;font-style:italic;">@throws</span><span style="color:#E5C07B;font-style:italic;"> Exception</span><span style="color:#7F848E;font-style:italic;"> the user can throw errors if necessary</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> */</span></span> |
| <span class="line"><span style="color:#C678DD;"> void</span><span style="color:#61AFEF;"> process</span><span style="color:#ABB2BF;">(</span><span style="color:#E5C07B;">TabletInsertionEvent</span><span style="color:#E06C75;font-style:italic;"> tabletInsertionEvent</span><span style="color:#ABB2BF;">, </span><span style="color:#E5C07B;">EventCollector</span><span style="color:#E06C75;font-style:italic;"> eventCollector</span><span style="color:#ABB2BF;">)</span></span> |
| <span class="line"><span style="color:#C678DD;"> throws</span><span style="color:#E5C07B;"> Exception</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> /**</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * This method is called to process the TsFileInsertionEvent.</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> *</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * </span><span style="color:#C678DD;font-style:italic;">@param</span><span style="color:#E06C75;font-style:italic;"> tsFileInsertionEvent</span><span style="color:#7F848E;font-style:italic;"> TsFileInsertionEvent to be processed</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * </span><span style="color:#C678DD;font-style:italic;">@param</span><span style="color:#E06C75;font-style:italic;"> eventCollector</span><span style="color:#7F848E;font-style:italic;"> used to collect result events after processing</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * </span><span style="color:#C678DD;font-style:italic;">@throws</span><span style="color:#E5C07B;font-style:italic;"> Exception</span><span style="color:#7F848E;font-style:italic;"> the user can throw errors if necessary</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> */</span></span> |
| <span class="line"><span style="color:#C678DD;"> default</span><span style="color:#C678DD;"> void</span><span style="color:#61AFEF;"> process</span><span style="color:#ABB2BF;">(</span><span style="color:#E5C07B;">TsFileInsertionEvent</span><span style="color:#E06C75;font-style:italic;"> tsFileInsertionEvent</span><span style="color:#ABB2BF;">, </span><span style="color:#E5C07B;">EventCollector</span><span style="color:#E06C75;font-style:italic;"> eventCollector</span><span style="color:#ABB2BF;">)</span></span> |
| <span class="line"><span style="color:#C678DD;"> throws</span><span style="color:#E5C07B;"> Exception</span><span style="color:#ABB2BF;"> {</span></span> |
| <span class="line"><span style="color:#C678DD;"> for</span><span style="color:#ABB2BF;"> (</span><span style="color:#C678DD;">final</span><span style="color:#E5C07B;"> TabletInsertionEvent</span><span style="color:#E06C75;"> tabletInsertionEvent</span><span style="color:#C678DD;"> :</span></span> |
| <span class="line"><span style="color:#E5C07B;"> tsFileInsertionEvent</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">toTabletInsertionEvents</span><span style="color:#ABB2BF;">()) {</span></span> |
| <span class="line"><span style="color:#61AFEF;"> process</span><span style="color:#ABB2BF;">(tabletInsertionEvent, eventCollector);</span></span> |
| <span class="line"><span style="color:#ABB2BF;"> }</span></span> |
| <span class="line"><span style="color:#ABB2BF;"> }</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> /**</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * This method is called to process the Event.</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> *</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * </span><span style="color:#C678DD;font-style:italic;">@param</span><span style="color:#E06C75;font-style:italic;"> event</span><span style="color:#7F848E;font-style:italic;"> Event to be processed</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * </span><span style="color:#C678DD;font-style:italic;">@param</span><span style="color:#E06C75;font-style:italic;"> eventCollector</span><span style="color:#7F848E;font-style:italic;"> used to collect result events after processing</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * </span><span style="color:#C678DD;font-style:italic;">@throws</span><span style="color:#E5C07B;font-style:italic;"> Exception</span><span style="color:#7F848E;font-style:italic;"> the user can throw errors if necessary</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> */</span></span> |
| <span class="line"><span style="color:#C678DD;"> void</span><span style="color:#61AFEF;"> process</span><span style="color:#ABB2BF;">(</span><span style="color:#E5C07B;">Event</span><span style="color:#E06C75;font-style:italic;"> event</span><span style="color:#ABB2BF;">, </span><span style="color:#E5C07B;">EventCollector</span><span style="color:#E06C75;font-style:italic;"> eventCollector</span><span style="color:#ABB2BF;">)</span><span style="color:#C678DD;"> throws</span><span style="color:#E5C07B;"> Exception</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"><span style="color:#ABB2BF;">}</span></span></code></pre><div class="line-numbers" aria-hidden="true" style="counter-reset:line-number 0;"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><h4 id="数据发送插件接口" tabindex="-1"><a class="header-anchor" href="#数据发送插件接口"><span>数据发送插件接口</span></a></h4><p>数据发送是流处理数据从数据抽取到数据发送三阶段的第三阶段。数据发送插件(PipeSink)主要用于发送经由数据处理插件(PipeProcessor)处理过后的<br> 各种事件,它作为流处理框架的网络实现层,接口上应允许接入多种实时通信协议和多种连接器。</p><div class="language-java line-numbers-mode" data-highlighter="shiki" data-ext="java" style="background-color:#282c34;color:#abb2bf;"><pre class="shiki one-dark-pro vp-code"><code class="language-java"><span class="line"><span style="color:#7F848E;font-style:italic;">/**</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * PipeSink</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> *</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * <p>PipeSink is responsible for sending events to sinks.</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> *</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * <p>Various network protocols can be supported by implementing different PipeSink classes.</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> *</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * <p>The lifecycle of a PipeSink is as follows:</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> *</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * <ul></span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * <li>When a collaboration task is created, the KV pairs of `WITH SINK` clause in SQL are</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * parsed and the validation method {</span><span style="color:#C678DD;font-style:italic;">@link</span><span style="color:#E5C07B;font-style:italic;"> PipeSink</span><span style="color:#7F848E;font-style:italic;">#</span><span style="color:#E06C75;font-style:italic;">validate(PipeParameterValidator)</span><span style="color:#7F848E;font-style:italic;">} will be</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * called to validate the parameters.</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * <li>Before the collaboration task starts, the method {@link PipeSink#customize(PipeParameters,</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * PipeSinkRuntimeConfiguration)} will be called to config the runtime behavior of the</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * PipeSink and the method {</span><span style="color:#C678DD;font-style:italic;">@link</span><span style="color:#E5C07B;font-style:italic;"> PipeSink</span><span style="color:#7F848E;font-style:italic;">#</span><span style="color:#E06C75;font-style:italic;">handshake()</span><span style="color:#7F848E;font-style:italic;">} will be called to create a connection</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * with sink.</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * <li>While the collaboration task is in progress:</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * <ul></span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * <li>PipeSource captures the events and wraps them into three types of Event instances.</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * <li>PipeProcessor processes the event and then passes them to the PipeSink.</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * <li>PipeSink serializes the events into binaries and send them to sinks. The following 3</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * methods will be called: {</span><span style="color:#C678DD;font-style:italic;">@link</span><span style="color:#E5C07B;font-style:italic;"> PipeSink</span><span style="color:#7F848E;font-style:italic;">#</span><span style="color:#E06C75;font-style:italic;">transfer(TabletInsertionEvent)</span><span style="color:#7F848E;font-style:italic;">}, {@link</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * PipeSink#transfer(TsFileInsertionEvent)} and {</span><span style="color:#C678DD;font-style:italic;">@link</span><span style="color:#E5C07B;font-style:italic;"> PipeSink</span><span style="color:#7F848E;font-style:italic;">#</span><span style="color:#E06C75;font-style:italic;">transfer(Event)</span><span style="color:#7F848E;font-style:italic;">}.</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * </ul></span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * <li>When the collaboration task is cancelled (the `DROP PIPE` command is executed), the {@link</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * PipeSink#close() } method will be called.</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * </ul></span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> *</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * <p>In addition, the method {</span><span style="color:#C678DD;font-style:italic;">@link</span><span style="color:#E5C07B;font-style:italic;"> PipeSink</span><span style="color:#7F848E;font-style:italic;">#</span><span style="color:#E06C75;font-style:italic;">heartbeat()</span><span style="color:#7F848E;font-style:italic;">} will be called periodically to check</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * whether the connection with sink is still alive. The method {</span><span style="color:#C678DD;font-style:italic;">@link</span><span style="color:#E5C07B;font-style:italic;"> PipeSink</span><span style="color:#7F848E;font-style:italic;">#</span><span style="color:#E06C75;font-style:italic;">handshake()</span><span style="color:#7F848E;font-style:italic;">} will be</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * called to create a new connection with the sink when the method {</span><span style="color:#C678DD;font-style:italic;">@link</span><span style="color:#E5C07B;font-style:italic;"> PipeSink</span><span style="color:#7F848E;font-style:italic;">#</span><span style="color:#E06C75;font-style:italic;">heartbeat()</span><span style="color:#7F848E;font-style:italic;">}</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * throws exceptions.</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> */</span></span> |
| <span class="line"><span style="color:#C678DD;">public</span><span style="color:#C678DD;"> interface</span><span style="color:#E5C07B;"> PipeSink</span><span style="color:#C678DD;"> extends</span><span style="color:#E5C07B;"> PipePlugin</span><span style="color:#ABB2BF;"> {</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> /**</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * This method is mainly used to validate {@link PipeParameters} and it is executed before {@link</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * PipeSink#customize(PipeParameters, PipeSinkRuntimeConfiguration)} is called.</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> *</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * </span><span style="color:#C678DD;font-style:italic;">@param</span><span style="color:#E06C75;font-style:italic;"> validator</span><span style="color:#7F848E;font-style:italic;"> the validator used to validate {@link PipeParameters}</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * </span><span style="color:#C678DD;font-style:italic;">@throws</span><span style="color:#E5C07B;font-style:italic;"> Exception</span><span style="color:#7F848E;font-style:italic;"> if any parameter is not valid</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> */</span></span> |
| <span class="line"><span style="color:#C678DD;"> void</span><span style="color:#61AFEF;"> validate</span><span style="color:#ABB2BF;">(</span><span style="color:#E5C07B;">PipeParameterValidator</span><span style="color:#E06C75;font-style:italic;"> validator</span><span style="color:#ABB2BF;">)</span><span style="color:#C678DD;"> throws</span><span style="color:#E5C07B;"> Exception</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> /**</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * This method is mainly used to customize PipeSink. In this method, the user can do the following</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * things:</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> *</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * <ul></span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * <li>Use PipeParameters to parse key-value pair attributes entered by the user.</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * <li>Set the running configurations in PipeSinkRuntimeConfiguration.</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * </ul></span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> *</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * <p>This method is called after the method {</span><span style="color:#C678DD;font-style:italic;">@link</span><span style="color:#E5C07B;font-style:italic;"> PipeSink</span><span style="color:#7F848E;font-style:italic;">#</span><span style="color:#E06C75;font-style:italic;">validate(PipeParameterValidator)</span><span style="color:#7F848E;font-style:italic;">} is</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * called and before the method {</span><span style="color:#C678DD;font-style:italic;">@link</span><span style="color:#E5C07B;font-style:italic;"> PipeSink</span><span style="color:#7F848E;font-style:italic;">#</span><span style="color:#E06C75;font-style:italic;">handshake()</span><span style="color:#7F848E;font-style:italic;">} is called.</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> *</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * </span><span style="color:#C678DD;font-style:italic;">@param</span><span style="color:#E06C75;font-style:italic;"> parameters</span><span style="color:#7F848E;font-style:italic;"> used to parse the input parameters entered by the user</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * </span><span style="color:#C678DD;font-style:italic;">@param</span><span style="color:#E06C75;font-style:italic;"> configuration</span><span style="color:#7F848E;font-style:italic;"> used to set the required properties of the running PipeSink</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * </span><span style="color:#C678DD;font-style:italic;">@throws</span><span style="color:#E5C07B;font-style:italic;"> Exception</span><span style="color:#7F848E;font-style:italic;"> the user can throw errors if necessary</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> */</span></span> |
| <span class="line"><span style="color:#C678DD;"> void</span><span style="color:#61AFEF;"> customize</span><span style="color:#ABB2BF;">(</span><span style="color:#E5C07B;">PipeParameters</span><span style="color:#E06C75;font-style:italic;"> parameters</span><span style="color:#ABB2BF;">, </span><span style="color:#E5C07B;">PipeSinkRuntimeConfiguration</span><span style="color:#E06C75;font-style:italic;"> configuration</span><span style="color:#ABB2BF;">)</span></span> |
| <span class="line"><span style="color:#C678DD;"> throws</span><span style="color:#E5C07B;"> Exception</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> /**</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * This method is used to create a connection with sink. This method will be called after the</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * method {</span><span style="color:#C678DD;font-style:italic;">@link</span><span style="color:#E5C07B;font-style:italic;"> PipeSink</span><span style="color:#7F848E;font-style:italic;">#</span><span style="color:#E06C75;font-style:italic;">customize(PipeParameters, PipeSinkRuntimeConfiguration)</span><span style="color:#7F848E;font-style:italic;">} is called or</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * will be called when the method {</span><span style="color:#C678DD;font-style:italic;">@link</span><span style="color:#E5C07B;font-style:italic;"> PipeSink</span><span style="color:#7F848E;font-style:italic;">#</span><span style="color:#E06C75;font-style:italic;">heartbeat()</span><span style="color:#7F848E;font-style:italic;">} throws exceptions.</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> *</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * </span><span style="color:#C678DD;font-style:italic;">@throws</span><span style="color:#E5C07B;font-style:italic;"> Exception</span><span style="color:#7F848E;font-style:italic;"> if the connection is failed to be created</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> */</span></span> |
| <span class="line"><span style="color:#C678DD;"> void</span><span style="color:#61AFEF;"> handshake</span><span style="color:#ABB2BF;">()</span><span style="color:#C678DD;"> throws</span><span style="color:#E5C07B;"> Exception</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> /**</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * This method will be called periodically to check whether the connection with sink is still</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * alive.</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> *</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * </span><span style="color:#C678DD;font-style:italic;">@throws</span><span style="color:#E5C07B;font-style:italic;"> Exception</span><span style="color:#7F848E;font-style:italic;"> if the connection dies</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> */</span></span> |
| <span class="line"><span style="color:#C678DD;"> void</span><span style="color:#61AFEF;"> heartbeat</span><span style="color:#ABB2BF;">()</span><span style="color:#C678DD;"> throws</span><span style="color:#E5C07B;"> Exception</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> /**</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * This method is used to transfer the TabletInsertionEvent.</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> *</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * </span><span style="color:#C678DD;font-style:italic;">@param</span><span style="color:#E06C75;font-style:italic;"> tabletInsertionEvent</span><span style="color:#7F848E;font-style:italic;"> TabletInsertionEvent to be transferred</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * </span><span style="color:#C678DD;font-style:italic;">@throws</span><span style="color:#E5C07B;font-style:italic;"> PipeConnectionException</span><span style="color:#7F848E;font-style:italic;"> if the connection is broken</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * </span><span style="color:#C678DD;font-style:italic;">@throws</span><span style="color:#E5C07B;font-style:italic;"> Exception</span><span style="color:#7F848E;font-style:italic;"> the user can throw errors if necessary</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> */</span></span> |
| <span class="line"><span style="color:#C678DD;"> void</span><span style="color:#61AFEF;"> transfer</span><span style="color:#ABB2BF;">(</span><span style="color:#E5C07B;">TabletInsertionEvent</span><span style="color:#E06C75;font-style:italic;"> tabletInsertionEvent</span><span style="color:#ABB2BF;">)</span><span style="color:#C678DD;"> throws</span><span style="color:#E5C07B;"> Exception</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> /**</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * This method is used to transfer the TsFileInsertionEvent.</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> *</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * </span><span style="color:#C678DD;font-style:italic;">@param</span><span style="color:#E06C75;font-style:italic;"> tsFileInsertionEvent</span><span style="color:#7F848E;font-style:italic;"> TsFileInsertionEvent to be transferred</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * </span><span style="color:#C678DD;font-style:italic;">@throws</span><span style="color:#E5C07B;font-style:italic;"> PipeConnectionException</span><span style="color:#7F848E;font-style:italic;"> if the connection is broken</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * </span><span style="color:#C678DD;font-style:italic;">@throws</span><span style="color:#E5C07B;font-style:italic;"> Exception</span><span style="color:#7F848E;font-style:italic;"> the user can throw errors if necessary</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> */</span></span> |
| <span class="line"><span style="color:#C678DD;"> default</span><span style="color:#C678DD;"> void</span><span style="color:#61AFEF;"> transfer</span><span style="color:#ABB2BF;">(</span><span style="color:#E5C07B;">TsFileInsertionEvent</span><span style="color:#E06C75;font-style:italic;"> tsFileInsertionEvent</span><span style="color:#ABB2BF;">)</span><span style="color:#C678DD;"> throws</span><span style="color:#E5C07B;"> Exception</span><span style="color:#ABB2BF;"> {</span></span> |
| <span class="line"><span style="color:#C678DD;"> try</span><span style="color:#ABB2BF;"> {</span></span> |
| <span class="line"><span style="color:#C678DD;"> for</span><span style="color:#ABB2BF;"> (</span><span style="color:#C678DD;">final</span><span style="color:#E5C07B;"> TabletInsertionEvent</span><span style="color:#E06C75;"> tabletInsertionEvent</span><span style="color:#C678DD;"> :</span></span> |
| <span class="line"><span style="color:#E5C07B;"> tsFileInsertionEvent</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">toTabletInsertionEvents</span><span style="color:#ABB2BF;">()) {</span></span> |
| <span class="line"><span style="color:#61AFEF;"> transfer</span><span style="color:#ABB2BF;">(tabletInsertionEvent);</span></span> |
| <span class="line"><span style="color:#ABB2BF;"> }</span></span> |
| <span class="line"><span style="color:#ABB2BF;"> } </span><span style="color:#C678DD;">finally</span><span style="color:#ABB2BF;"> {</span></span> |
| <span class="line"><span style="color:#E5C07B;"> tsFileInsertionEvent</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">close</span><span style="color:#ABB2BF;">();</span></span> |
| <span class="line"><span style="color:#ABB2BF;"> }</span></span> |
| <span class="line"><span style="color:#ABB2BF;"> }</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> /**</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * This method is used to transfer the generic events, including HeartbeatEvent.</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> *</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * </span><span style="color:#C678DD;font-style:italic;">@param</span><span style="color:#E06C75;font-style:italic;"> event</span><span style="color:#7F848E;font-style:italic;"> Event to be transferred</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * </span><span style="color:#C678DD;font-style:italic;">@throws</span><span style="color:#E5C07B;font-style:italic;"> PipeConnectionException</span><span style="color:#7F848E;font-style:italic;"> if the connection is broken</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> * </span><span style="color:#C678DD;font-style:italic;">@throws</span><span style="color:#E5C07B;font-style:italic;"> Exception</span><span style="color:#7F848E;font-style:italic;"> the user can throw errors if necessary</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> */</span></span> |
| <span class="line"><span style="color:#C678DD;"> void</span><span style="color:#61AFEF;"> transfer</span><span style="color:#ABB2BF;">(</span><span style="color:#E5C07B;">Event</span><span style="color:#E06C75;font-style:italic;"> event</span><span style="color:#ABB2BF;">)</span><span style="color:#C678DD;"> throws</span><span style="color:#E5C07B;"> Exception</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"><span style="color:#ABB2BF;">}</span></span></code></pre><div class="line-numbers" aria-hidden="true" style="counter-reset:line-number 0;"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><h2 id="自定义流处理插件管理" tabindex="-1"><a class="header-anchor" href="#自定义流处理插件管理"><span>自定义流处理插件管理</span></a></h2><p>为了保证用户自定义插件在实际生产中的灵活性和易用性,系统还需要提供对插件进行动态统一管理的能力。<br> 本章节介绍的流处理插件管理语句提供了对插件进行动态统一管理的入口。</p><h3 id="加载插件语句" tabindex="-1"><a class="header-anchor" href="#加载插件语句"><span>加载插件语句</span></a></h3><p>在 IoTDB 中,若要在系统中动态载入一个用户自定义插件,则首先需要基于 PipeSource、 PipeProcessor 或者 PipeSink 实现一个具体的插件类,<br> 然后需要将插件类编译打包成 jar 可执行文件,最后使用加载插件的管理语句将插件载入 IoTDB。</p><p>加载插件的管理语句的语法如图所示。</p><div class="language-sql line-numbers-mode" data-highlighter="shiki" data-ext="sql" style="background-color:#282c34;color:#abb2bf;"><pre class="shiki one-dark-pro vp-code"><code class="language-sql"><span class="line"><span style="color:#C678DD;">CREATE</span><span style="color:#ABB2BF;"> PIPEPLUGIN </span><span style="color:#E06C75;">[IF NOT EXISTS]</span><span style="color:#56B6C2;"> <</span><span style="color:#ABB2BF;">别名</span><span style="color:#56B6C2;">></span></span> |
| <span class="line"><span style="color:#C678DD;">AS</span><span style="color:#56B6C2;"> <</span><span style="color:#ABB2BF;">全类名</span><span style="color:#56B6C2;">></span></span> |
| <span class="line"><span style="color:#C678DD;">USING</span><span style="color:#56B6C2;"> <</span><span style="color:#ABB2BF;">JAR 包的 URI</span><span style="color:#56B6C2;">></span></span></code></pre><div class="line-numbers" aria-hidden="true" style="counter-reset:line-number 0;"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><p><strong>IF NOT EXISTS 语义</strong>:用于创建操作中,确保当指定 Pipe Plugin 不存在时,执行创建命令,防止因尝试创建已存在的 Pipe Plugin 而导致报错。</p><p>例如,用户实现了一个全类名为 edu.tsinghua.iotdb.pipe.ExampleProcessor 的数据处理插件,<br> 打包后的 jar 资源包存放到了 <a href="https://example.com:8080/iotdb/pipe-plugin.jar" target="_blank" rel="noopener noreferrer">https://example.com:8080/iotdb/pipe-plugin.jar</a> 上,用户希望在流处理引擎中使用这个插件,<br> 将插件标记为 example。那么,这个数据处理插件的创建语句如图所示。</p><div class="language-sql line-numbers-mode" data-highlighter="shiki" data-ext="sql" style="background-color:#282c34;color:#abb2bf;"><pre class="shiki one-dark-pro vp-code"><code class="language-sql"><span class="line"><span style="color:#C678DD;">CREATE</span><span style="color:#ABB2BF;"> PIPEPLUGIN </span><span style="color:#C678DD;">IF</span><span style="color:#C678DD;"> NOT</span><span style="color:#C678DD;"> EXISTS</span><span style="color:#ABB2BF;"> example</span></span> |
| <span class="line"><span style="color:#C678DD;">AS</span><span style="color:#98C379;"> 'edu.tsinghua.iotdb.pipe.ExampleProcessor'</span></span> |
| <span class="line"><span style="color:#C678DD;">USING</span><span style="color:#ABB2BF;"> URI </span><span style="color:#56B6C2;"><</span><span style="color:#ABB2BF;">https://</span><span style="color:#D19A66;">example</span><span style="color:#ABB2BF;">.</span><span style="color:#D19A66;">com</span><span style="color:#ABB2BF;">:</span><span style="color:#D19A66;">8080</span><span style="color:#ABB2BF;">/iotdb/pipe-</span><span style="color:#D19A66;">plugin</span><span style="color:#ABB2BF;">.</span><span style="color:#D19A66;">jar</span><span style="color:#56B6C2;">></span></span></code></pre><div class="line-numbers" aria-hidden="true" style="counter-reset:line-number 0;"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><h3 id="删除插件语句" tabindex="-1"><a class="header-anchor" href="#删除插件语句"><span>删除插件语句</span></a></h3><p>当用户不再想使用一个插件,需要将插件从系统中卸载时,可以使用如图所示的删除插件语句。</p><div class="language-sql line-numbers-mode" data-highlighter="shiki" data-ext="sql" style="background-color:#282c34;color:#abb2bf;"><pre class="shiki one-dark-pro vp-code"><code class="language-sql"><span class="line"><span style="color:#C678DD;">DROP</span><span style="color:#ABB2BF;"> PIPEPLUGIN </span><span style="color:#E06C75;">[IF EXISTS]</span><span style="color:#56B6C2;"> <</span><span style="color:#ABB2BF;">别名</span><span style="color:#56B6C2;">></span></span></code></pre><div class="line-numbers" aria-hidden="true" style="counter-reset:line-number 0;"><div class="line-number"></div></div></div><p><strong>IF EXISTS 语义</strong>:用于删除操作中,确保当指定 Pipe Plugin 存在时,执行删除命令,防止因尝试删除不存在的 Pipe Plugin 而导致报错。</p><h3 id="查看插件语句" tabindex="-1"><a class="header-anchor" href="#查看插件语句"><span>查看插件语句</span></a></h3><p>用户也可以按需查看系统中的插件。查看插件的语句如图所示。</p><div class="language-sql line-numbers-mode" data-highlighter="shiki" data-ext="sql" style="background-color:#282c34;color:#abb2bf;"><pre class="shiki one-dark-pro vp-code"><code class="language-sql"><span class="line"><span style="color:#ABB2BF;">SHOW PIPEPLUGINS</span></span></code></pre><div class="line-numbers" aria-hidden="true" style="counter-reset:line-number 0;"><div class="line-number"></div></div></div><h2 id="系统预置的流处理插件" tabindex="-1"><a class="header-anchor" href="#系统预置的流处理插件"><span>系统预置的流处理插件</span></a></h2><h3 id="预置-source-插件" tabindex="-1"><a class="header-anchor" href="#预置-source-插件"><span>预置 source 插件</span></a></h3><h4 id="iotdb-source" tabindex="-1"><a class="header-anchor" href="#iotdb-source"><span>iotdb-source</span></a></h4><p>作用:抽取 IoTDB 内部的历史或实时数据进入 pipe。</p><table><thead><tr><th>key</th><th>value</th><th>value 取值范围</th><th>required or optional with default</th></tr></thead><tbody><tr><td>source</td><td>iotdb-source</td><td>String: iotdb-source</td><td>required</td></tr><tr><td>source.pattern</td><td>用于筛选时间序列的路径前缀</td><td>String: 任意的时间序列前缀</td><td>optional: root</td></tr><tr><td>source.history.start-time</td><td>抽取的历史数据的开始 event time,包含 start-time</td><td>Long: [Long.MIN_VALUE, Long.MAX_VALUE]</td><td>optional: Long.MIN_VALUE</td></tr><tr><td>source.history.end-time</td><td>抽取的历史数据的结束 event time,包含 end-time</td><td>Long: [Long.MIN_VALUE, Long.MAX_VALUE]</td><td>optional: Long.MAX_VALUE</td></tr><tr><td>start-time(V1.3.1+)</td><td>start of synchronizing all data event time,including start-time. Will disable "history.start-time" "history.end-time" if configured</td><td>Long: [Long.MIN_VALUE, Long.MAX_VALUE]</td><td>optional: Long.MIN_VALUE</td></tr><tr><td>end-time(V1.3.1+)</td><td>end of synchronizing all data event time,including end-time. Will disable "history.start-time" "history.end-time" if configured</td><td>Long: [Long.MIN_VALUE, Long.MAX_VALUE]</td><td>optional: Long.MAX_VALUE</td></tr></tbody></table><blockquote><p>🚫 <strong>source.pattern 参数说明</strong></p><ul><li><p>Pattern 需用反引号修饰不合法字符或者是不合法路径节点,例如如果希望筛选 root.`a@b` 或者 root.`123`,应设置 pattern 为 root.`a@b` 或者 root.`123`(具体参考 <a href="https://iotdb.apache.org/zh/Download/#_1-0-%E7%89%88%E6%9C%AC%E4%B8%8D%E5%85%BC%E5%AE%B9%E7%9A%84%E8%AF%AD%E6%B3%95%E8%AF%A6%E7%BB%86%E8%AF%B4%E6%98%8E" target="_blank" rel="noopener noreferrer">单双引号和反引号的使用时机</a>)</p></li><li><p>在底层实现中,当检测到 pattern 为 root(默认值)时,抽取效率较高,其他任意格式都将降低性能</p></li><li><p>路径前缀不需要能够构成完整的路径。例如,当创建一个包含参数为 'source.pattern'='root.aligned.1' 的 pipe 时:</p><ul><li>root.aligned.1TS</li><li>root.aligned.1TS.`1`</li><li>root.aligned.100T</li></ul><p>的数据会被抽取;</p><ul><li>root.aligned.`1`</li><li>root.aligned.`123`</li></ul><p>的数据不会被抽取。</p></li></ul></blockquote><blockquote><p>❗️<strong>source.history 的 start-time,end-time 参数说明</strong></p><ul><li>start-time,end-time 应为 ISO 格式,例如 2011-12-03T10:15:30 或 2011-12-03T10:15:30+01:00</li></ul></blockquote><blockquote><p>✅ <strong>一条数据从生产到落库 IoTDB,包含两个关键的时间概念</strong></p><ul><li><strong>event time:</strong> 数据实际生产时的时间(或者数据生产系统给数据赋予的生成时间,是数据点中的时间项),也称为事件时间。</li><li><strong>arrival time:</strong> 数据到达 IoTDB 系统内的时间。</li></ul><p>我们常说的乱序数据,指的是数据到达时,其 <strong>event time</strong> 远落后于当前系统时间(或者已经落库的最大 <strong>event time</strong>)的数据。另一方面,不论是乱序数据还是顺序数据,只要它们是新到达系统的,那它们的 <strong>arrival time</strong> 都是会随着数据到达 IoTDB 的顺序递增的。</p></blockquote><blockquote><p>💎 <strong>iotdb-source 的工作可以拆分成两个阶段</strong></p><ol><li>历史数据抽取:所有 <strong>arrival time</strong> < 创建 pipe 时<strong>当前系统时间</strong>的数据称为历史数据</li><li>实时数据抽取:所有 <strong>arrival time</strong> >= 创建 pipe 时<strong>当前系统时间</strong>的数据称为实时数据</li></ol><p>历史数据传输阶段和实时数据传输阶段,<strong>两阶段串行执行,只有当历史数据传输阶段完成后,才执行实时数据传输阶段。</strong></p></blockquote><h3 id="预置-processor-插件" tabindex="-1"><a class="header-anchor" href="#预置-processor-插件"><span>预置 processor 插件</span></a></h3><h4 id="do-nothing-processor" tabindex="-1"><a class="header-anchor" href="#do-nothing-processor"><span>do-nothing-processor</span></a></h4><p>作用:不对 source 传入的事件做任何的处理。</p><table><thead><tr><th>key</th><th>value</th><th>value 取值范围</th><th>required or optional with default</th></tr></thead><tbody><tr><td>processor</td><td>do-nothing-processor</td><td>String: do-nothing-processor</td><td>required</td></tr></tbody></table><h3 id="预置-sink-插件" tabindex="-1"><a class="header-anchor" href="#预置-sink-插件"><span>预置 sink 插件</span></a></h3><h4 id="do-nothing-sink" tabindex="-1"><a class="header-anchor" href="#do-nothing-sink"><span>do-nothing-sink</span></a></h4><p>作用:不对 processor 传入的事件做任何的处理。</p><table><thead><tr><th>key</th><th>value</th><th>value 取值范围</th><th>required or optional with default</th></tr></thead><tbody><tr><td>sink</td><td>do-nothing-sink</td><td>String: do-nothing-sink</td><td>required</td></tr></tbody></table><h2 id="流处理任务管理" tabindex="-1"><a class="header-anchor" href="#流处理任务管理"><span>流处理任务管理</span></a></h2><h3 id="创建流处理任务" tabindex="-1"><a class="header-anchor" href="#创建流处理任务"><span>创建流处理任务</span></a></h3><p>使用 <code>CREATE PIPE</code> 语句来创建流处理任务。以数据同步流处理任务的创建为例,示例 SQL 语句如下:</p><div class="language-sql line-numbers-mode" data-highlighter="shiki" data-ext="sql" style="background-color:#282c34;color:#abb2bf;"><pre class="shiki one-dark-pro vp-code"><code class="language-sql"><span class="line"><span style="color:#C678DD;">CREATE</span><span style="color:#ABB2BF;"> PIPE </span><span style="color:#56B6C2;"><</span><span style="color:#ABB2BF;">PipeId</span><span style="color:#56B6C2;">></span><span style="color:#7F848E;font-style:italic;"> -- PipeId 是能够唯一标定流处理任务的名字</span></span> |
| <span class="line"><span style="color:#C678DD;">WITH</span><span style="color:#ABB2BF;"> SOURCE (</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> -- 默认的 IoTDB 数据抽取插件</span></span> |
| <span class="line"><span style="color:#98C379;"> 'source'</span><span style="color:#56B6C2;"> =</span><span style="color:#98C379;"> 'iotdb-source'</span><span style="color:#ABB2BF;">,</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> -- 路径前缀,只有能够匹配该路径前缀的数据才会被抽取,用作后续的处理和发送</span></span> |
| <span class="line"><span style="color:#98C379;"> 'source.pattern'</span><span style="color:#56B6C2;"> =</span><span style="color:#98C379;"> 'root.timecho'</span><span style="color:#ABB2BF;">,</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> -- 描述被抽取的历史数据的时间范围,表示最早时间</span></span> |
| <span class="line"><span style="color:#98C379;"> 'source.history.start-time'</span><span style="color:#56B6C2;"> =</span><span style="color:#98C379;"> '2011.12.03T10:15:30+01:00'</span><span style="color:#ABB2BF;">,</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> -- 描述被抽取的历史数据的时间范围,表示最晚时间</span></span> |
| <span class="line"><span style="color:#98C379;"> 'source.history.end-time'</span><span style="color:#56B6C2;"> =</span><span style="color:#98C379;"> '2022.12.03T10:15:30+01:00'</span><span style="color:#ABB2BF;">,</span></span> |
| <span class="line"><span style="color:#ABB2BF;">)</span></span> |
| <span class="line"><span style="color:#C678DD;">WITH</span><span style="color:#ABB2BF;"> PROCESSOR (</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> -- 默认的数据处理插件,即不做任何处理</span></span> |
| <span class="line"><span style="color:#98C379;"> 'processor'</span><span style="color:#56B6C2;"> =</span><span style="color:#98C379;"> 'do-nothing-processor'</span><span style="color:#ABB2BF;">,</span></span> |
| <span class="line"><span style="color:#ABB2BF;">)</span></span> |
| <span class="line"><span style="color:#C678DD;">WITH</span><span style="color:#ABB2BF;"> SINK (</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> -- IoTDB 数据发送插件,目标端为 IoTDB</span></span> |
| <span class="line"><span style="color:#98C379;"> 'sink'</span><span style="color:#56B6C2;"> =</span><span style="color:#98C379;"> 'iotdb-thrift-sink'</span><span style="color:#ABB2BF;">,</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> -- 目标端 IoTDB 其中一个 DataNode 节点的数据服务 ip</span></span> |
| <span class="line"><span style="color:#98C379;"> 'sink.ip'</span><span style="color:#56B6C2;"> =</span><span style="color:#98C379;"> '127.0.0.1'</span><span style="color:#ABB2BF;">,</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> -- 目标端 IoTDB 其中一个 DataNode 节点的数据服务 port</span></span> |
| <span class="line"><span style="color:#98C379;"> 'sink.port'</span><span style="color:#56B6C2;"> =</span><span style="color:#98C379;"> '6667'</span><span style="color:#ABB2BF;">,</span></span> |
| <span class="line"><span style="color:#ABB2BF;">)</span></span></code></pre><div class="line-numbers" aria-hidden="true" style="counter-reset:line-number 0;"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><p><strong>创建流处理任务时需要配置 PipeId 以及三个插件部分的参数:</strong></p><table><thead><tr><th>配置项</th><th>说明</th><th>是否必填</th><th>默认实现</th><th>默认实现说明</th><th>是否允许自定义实现</th></tr></thead><tbody><tr><td>PipeId</td><td>全局唯一标定一个流处理任务的名称</td><td><!----></td><td>-</td><td>-</td><td>-</td></tr><tr><td>source</td><td>Pipe Source 插件,负责在数据库底层抽取流处理数据</td><td>选填</td><td>iotdb-source</td><td>将数据库的全量历史数据和后续到达的实时数据接入流处理任务</td><td>否</td></tr><tr><td>processor</td><td>Pipe Processor 插件,负责处理数据</td><td>选填</td><td>do-nothing-processor</td><td>对传入的数据不做任何处理</td><td><!----></td></tr><tr><td>sink</td><td>Pipe Sink 插件,负责发送数据</td><td><!----></td><td>-</td><td>-</td><td><!----></td></tr></tbody></table><p>示例中,使用了 iotdb-source、do-nothing-processor 和 iotdb-thrift-sink 插件构建数据流处理任务。IoTDB 还内置了其他的流处理插件,<strong>请查看“系统预置流处理插件”一节</strong>。</p><p><strong>一个最简的 CREATE PIPE 语句示例如下:</strong></p><div class="language-sql line-numbers-mode" data-highlighter="shiki" data-ext="sql" style="background-color:#282c34;color:#abb2bf;"><pre class="shiki one-dark-pro vp-code"><code class="language-sql"><span class="line"><span style="color:#C678DD;">CREATE</span><span style="color:#ABB2BF;"> PIPE </span><span style="color:#56B6C2;"><</span><span style="color:#ABB2BF;">PipeId</span><span style="color:#56B6C2;">></span><span style="color:#7F848E;font-style:italic;"> -- PipeId 是能够唯一标定流处理任务的名字</span></span> |
| <span class="line"><span style="color:#C678DD;">WITH</span><span style="color:#ABB2BF;"> SINK (</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> -- IoTDB 数据发送插件,目标端为 IoTDB</span></span> |
| <span class="line"><span style="color:#98C379;"> 'sink'</span><span style="color:#56B6C2;"> =</span><span style="color:#98C379;"> 'iotdb-thrift-sink'</span><span style="color:#ABB2BF;">,</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> -- 目标端 IoTDB 其中一个 DataNode 节点的数据服务 ip</span></span> |
| <span class="line"><span style="color:#98C379;"> 'sink.ip'</span><span style="color:#56B6C2;"> =</span><span style="color:#98C379;"> '127.0.0.1'</span><span style="color:#ABB2BF;">,</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> -- 目标端 IoTDB 其中一个 DataNode 节点的数据服务 port</span></span> |
| <span class="line"><span style="color:#98C379;"> 'sink.port'</span><span style="color:#56B6C2;"> =</span><span style="color:#98C379;"> '6667'</span><span style="color:#ABB2BF;">,</span></span> |
| <span class="line"><span style="color:#ABB2BF;">)</span></span></code></pre><div class="line-numbers" aria-hidden="true" style="counter-reset:line-number 0;"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><p>其表达的语义是:将本数据库实例中的全量历史数据和后续到达的实时数据,同步到目标为 127.0.0.1:6667 的 IoTDB 实例上。</p><p><strong>注意:</strong></p><ul><li><p>SOURCE 和 PROCESSOR 为选填配置,若不填写配置参数,系统则会采用相应的默认实现</p></li><li><p>SINK 为必填配置,需要在 CREATE PIPE 语句中声明式配置</p></li><li><p>SINK 具备自复用能力。对于不同的流处理任务,如果他们的 SINK 具备完全相同 KV 属性的(所有属性的 key 对应的 value 都相同),<strong>那么系统最终只会创建一个 SINK 实例</strong>,以实现对连接资源的复用。</p><ul><li>例如,有下面 pipe1, pipe2 两个流处理任务的声明:</li></ul><div class="language-sql line-numbers-mode" data-highlighter="shiki" data-ext="sql" style="background-color:#282c34;color:#abb2bf;"><pre class="shiki one-dark-pro vp-code"><code class="language-sql"><span class="line"><span style="color:#C678DD;">CREATE</span><span style="color:#ABB2BF;"> PIPE pipe1</span></span> |
| <span class="line"><span style="color:#C678DD;">WITH</span><span style="color:#ABB2BF;"> SINK (</span></span> |
| <span class="line"><span style="color:#98C379;"> 'sink'</span><span style="color:#56B6C2;"> =</span><span style="color:#98C379;"> 'iotdb-thrift-sink'</span><span style="color:#ABB2BF;">,</span></span> |
| <span class="line"><span style="color:#98C379;"> 'sink.ip'</span><span style="color:#56B6C2;"> =</span><span style="color:#98C379;"> 'localhost'</span><span style="color:#ABB2BF;">,</span></span> |
| <span class="line"><span style="color:#98C379;"> 'sink.port'</span><span style="color:#56B6C2;"> =</span><span style="color:#98C379;"> '9999'</span><span style="color:#ABB2BF;">,</span></span> |
| <span class="line"><span style="color:#ABB2BF;">)</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#C678DD;">CREATE</span><span style="color:#ABB2BF;"> PIPE pipe2</span></span> |
| <span class="line"><span style="color:#C678DD;">WITH</span><span style="color:#ABB2BF;"> SINK (</span></span> |
| <span class="line"><span style="color:#98C379;"> 'sink'</span><span style="color:#56B6C2;"> =</span><span style="color:#98C379;"> 'iotdb-thrift-sink'</span><span style="color:#ABB2BF;">,</span></span> |
| <span class="line"><span style="color:#98C379;"> 'sink.port'</span><span style="color:#56B6C2;"> =</span><span style="color:#98C379;"> '9999'</span><span style="color:#ABB2BF;">,</span></span> |
| <span class="line"><span style="color:#98C379;"> 'sink.ip'</span><span style="color:#56B6C2;"> =</span><span style="color:#98C379;"> 'localhost'</span><span style="color:#ABB2BF;">,</span></span> |
| <span class="line"><span style="color:#ABB2BF;">)</span></span></code></pre><div class="line-numbers" aria-hidden="true" style="counter-reset:line-number 0;"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><ul><li>因为它们对 SINK 的声明完全相同(<strong>即使某些属性声明时的顺序不同</strong>),所以框架会自动对它们声明的 SINK 进行复用,最终 pipe1, pipe2 的 SINK 将会是同一个实例。</li></ul></li><li><p>请不要构建出包含数据循环同步的应用场景(会导致无限循环):</p><ul><li>IoTDB A -> IoTDB B -> IoTDB A</li><li>IoTDB A -> IoTDB A</li></ul></li></ul><h3 id="启动流处理任务" tabindex="-1"><a class="header-anchor" href="#启动流处理任务"><span>启动流处理任务</span></a></h3><p>CREATE PIPE 语句成功执行后,流处理任务相关实例会被创建,但整个流处理任务的运行状态会被置为 STOPPED(V1.3.0),即流处理任务不会立刻处理数据。在 1.3.1 及以上的版本,流处理任务的运行状态在创建后将被立即置为 RUNNING。</p><p>可以使用 START PIPE 语句使流处理任务开始处理数据:</p><div class="language-sql line-numbers-mode" data-highlighter="shiki" data-ext="sql" style="background-color:#282c34;color:#abb2bf;"><pre class="shiki one-dark-pro vp-code"><code class="language-sql"><span class="line"><span style="color:#C678DD;">START</span><span style="color:#ABB2BF;"> PIPE </span><span style="color:#56B6C2;"><</span><span style="color:#ABB2BF;">PipeId</span><span style="color:#56B6C2;">></span></span></code></pre><div class="line-numbers" aria-hidden="true" style="counter-reset:line-number 0;"><div class="line-number"></div></div></div><h3 id="停止流处理任务" tabindex="-1"><a class="header-anchor" href="#停止流处理任务"><span>停止流处理任务</span></a></h3><p>使用 STOP PIPE 语句使流处理任务停止处理数据:</p><div class="language-sql line-numbers-mode" data-highlighter="shiki" data-ext="sql" style="background-color:#282c34;color:#abb2bf;"><pre class="shiki one-dark-pro vp-code"><code class="language-sql"><span class="line"><span style="color:#C678DD;">STOP</span><span style="color:#ABB2BF;"> PIPE </span><span style="color:#56B6C2;"><</span><span style="color:#ABB2BF;">PipeId</span><span style="color:#56B6C2;">></span></span></code></pre><div class="line-numbers" aria-hidden="true" style="counter-reset:line-number 0;"><div class="line-number"></div></div></div><h3 id="删除流处理任务" tabindex="-1"><a class="header-anchor" href="#删除流处理任务"><span>删除流处理任务</span></a></h3><p>使用 DROP PIPE 语句使流处理任务停止处理数据(当流处理任务状态为 RUNNING 时),然后删除整个流处理任务流处理任务:</p><div class="language-sql line-numbers-mode" data-highlighter="shiki" data-ext="sql" style="background-color:#282c34;color:#abb2bf;"><pre class="shiki one-dark-pro vp-code"><code class="language-sql"><span class="line"><span style="color:#C678DD;">DROP</span><span style="color:#ABB2BF;"> PIPE </span><span style="color:#56B6C2;"><</span><span style="color:#ABB2BF;">PipeId</span><span style="color:#56B6C2;">></span></span></code></pre><div class="line-numbers" aria-hidden="true" style="counter-reset:line-number 0;"><div class="line-number"></div></div></div><p>用户在删除流处理任务前,不需要执行 STOP 操作。</p><h3 id="展示流处理任务" tabindex="-1"><a class="header-anchor" href="#展示流处理任务"><span>展示流处理任务</span></a></h3><p>使用 SHOW PIPES 语句查看所有流处理任务:</p><div class="language-sql line-numbers-mode" data-highlighter="shiki" data-ext="sql" style="background-color:#282c34;color:#abb2bf;"><pre class="shiki one-dark-pro vp-code"><code class="language-sql"><span class="line"><span style="color:#ABB2BF;">SHOW PIPES</span></span></code></pre><div class="line-numbers" aria-hidden="true" style="counter-reset:line-number 0;"><div class="line-number"></div></div></div><p>查询结果如下:</p><div class="language-sql line-numbers-mode" data-highlighter="shiki" data-ext="sql" style="background-color:#282c34;color:#abb2bf;"><pre class="shiki one-dark-pro vp-code"><code class="language-sql"><span class="line"><span style="color:#ABB2BF;">+</span><span style="color:#7F848E;font-style:italic;">-----------+-----------------------+-------+----------+-------------+--------+----------------+</span></span> |
| <span class="line"><span style="color:#ABB2BF;">| ID| CreationTime| </span><span style="color:#C678DD;">State</span><span style="color:#ABB2BF;">|PipeSource|PipeProcessor|PipeSink|ExceptionMessage|</span></span> |
| <span class="line"><span style="color:#ABB2BF;">+</span><span style="color:#7F848E;font-style:italic;">-----------+-----------------------+-------+----------+-------------+--------+----------------+</span></span> |
| <span class="line"><span style="color:#ABB2BF;">|iotdb-kafka|</span><span style="color:#D19A66;">2022</span><span style="color:#ABB2BF;">-</span><span style="color:#D19A66;">03</span><span style="color:#ABB2BF;">-30T20:</span><span style="color:#D19A66;">58</span><span style="color:#ABB2BF;">:</span><span style="color:#D19A66;">30</span><span style="color:#ABB2BF;">.</span><span style="color:#D19A66;">689</span><span style="color:#ABB2BF;">|RUNNING| ...| ...| ...| {}|</span></span> |
| <span class="line"><span style="color:#ABB2BF;">+</span><span style="color:#7F848E;font-style:italic;">-----------+-----------------------+-------+----------+-------------+--------+----------------+</span></span> |
| <span class="line"><span style="color:#ABB2BF;">|iotdb-iotdb|</span><span style="color:#D19A66;">2022</span><span style="color:#ABB2BF;">-</span><span style="color:#D19A66;">03</span><span style="color:#ABB2BF;">-31T12:</span><span style="color:#D19A66;">55</span><span style="color:#ABB2BF;">:</span><span style="color:#D19A66;">28</span><span style="color:#ABB2BF;">.</span><span style="color:#D19A66;">129</span><span style="color:#ABB2BF;">|</span><span style="color:#C678DD;">STOPPED</span><span style="color:#ABB2BF;">| ...| ...| ...| TException: ...|</span></span> |
| <span class="line"><span style="color:#ABB2BF;">+</span><span style="color:#7F848E;font-style:italic;">-----------+-----------------------+-------+----------+-------------+--------+----------------+</span></span></code></pre><div class="line-numbers" aria-hidden="true" style="counter-reset:line-number 0;"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><p>可以使用 <code><PipeId></code> 指定想看的某个流处理任务状态:</p><div class="language-sql line-numbers-mode" data-highlighter="shiki" data-ext="sql" style="background-color:#282c34;color:#abb2bf;"><pre class="shiki one-dark-pro vp-code"><code class="language-sql"><span class="line"><span style="color:#ABB2BF;">SHOW PIPE </span><span style="color:#56B6C2;"><</span><span style="color:#ABB2BF;">PipeId</span><span style="color:#56B6C2;">></span></span></code></pre><div class="line-numbers" aria-hidden="true" style="counter-reset:line-number 0;"><div class="line-number"></div></div></div><p>您也可以通过 where 子句,判断某个 <PipeId> 使用的 Pipe Sink 被复用的情况。</p><div class="language-sql line-numbers-mode" data-highlighter="shiki" data-ext="sql" style="background-color:#282c34;color:#abb2bf;"><pre class="shiki one-dark-pro vp-code"><code class="language-sql"><span class="line"><span style="color:#ABB2BF;">SHOW PIPES</span></span> |
| <span class="line"><span style="color:#C678DD;">WHERE</span><span style="color:#ABB2BF;"> SINK USED </span><span style="color:#C678DD;">BY</span><span style="color:#56B6C2;"> <</span><span style="color:#ABB2BF;">PipeId</span><span style="color:#56B6C2;">></span></span></code></pre><div class="line-numbers" aria-hidden="true" style="counter-reset:line-number 0;"><div class="line-number"></div><div class="line-number"></div></div></div><h3 id="流处理任务运行状态迁移" tabindex="-1"><a class="header-anchor" href="#流处理任务运行状态迁移"><span>流处理任务运行状态迁移</span></a></h3><p>一个流处理 pipe 在其生命周期中会经过多种状态:</p><ul><li><strong>RUNNING:</strong> pipe 正在正常工作 <ul><li>当一个 pipe 被成功创建之后,其初始状态为工作状态(V1.3.1+)</li></ul></li><li><strong>STOPPED:</strong> pipe 处于停止运行状态。当管道处于该状态时,有如下几种可能: <ul><li>当一个 pipe 被成功创建之后,其初始状态为暂停状态(V1.3.0)</li><li>用户手动将一个处于正常运行状态的 pipe 暂停,其状态会被动从 RUNNING 变为 STOPPED</li><li>当一个 pipe 运行过程中出现无法恢复的错误时,其状态会自动从 RUNNING 变为 STOPPED</li></ul></li><li><strong>DROPPED:</strong> pipe 任务被永久删除</li></ul><p>下图表明了所有状态以及状态的迁移:</p><figure><img src="/img/%E7%8A%B6%E6%80%81%E8%BF%81%E7%A7%BB%E5%9B%BE.png" alt="状态迁移图" tabindex="0" loading="lazy"><figcaption>状态迁移图</figcaption></figure><h2 id="权限管理" tabindex="-1"><a class="header-anchor" href="#权限管理"><span>权限管理</span></a></h2><h3 id="流处理任务" tabindex="-1"><a class="header-anchor" href="#流处理任务"><span>流处理任务</span></a></h3><table><thead><tr><th>权限名称</th><th>描述</th></tr></thead><tbody><tr><td>USE_PIPE</td><td>注册流处理任务。路径无关。</td></tr><tr><td>USE_PIPE</td><td>开启流处理任务。路径无关。</td></tr><tr><td>USE_PIPE</td><td>停止流处理任务。路径无关。</td></tr><tr><td>USE_PIPE</td><td>卸载流处理任务。路径无关。</td></tr><tr><td>USE_PIPE</td><td>查询流处理任务。路径无关。</td></tr></tbody></table><h3 id="流处理任务插件" tabindex="-1"><a class="header-anchor" href="#流处理任务插件"><span>流处理任务插件</span></a></h3><table><thead><tr><th style="text-align:left;">权限名称</th><th>描述</th></tr></thead><tbody><tr><td style="text-align:left;">USE_PIPE</td><td>注册流处理任务插件。路径无关。</td></tr><tr><td style="text-align:left;">USE_PIPE</td><td>卸载流处理任务插件。路径无关。</td></tr><tr><td style="text-align:left;">USE_PIPE</td><td>查询流处理任务插件。路径无关。</td></tr></tbody></table><h2 id="配置参数" tabindex="-1"><a class="header-anchor" href="#配置参数"><span>配置参数</span></a></h2><p>在 iotdb-system.properties 中:</p><p>V1.3.0:</p><div class="language-properties line-numbers-mode" data-highlighter="shiki" data-ext="properties" style="background-color:#282c34;color:#abb2bf;"><pre class="shiki one-dark-pro vp-code"><code class="language-properties"><span class="line"><span style="color:#7F848E;font-style:italic;">####################</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;">### Pipe Configuration</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;">####################</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"># Uncomment the following field to configure the pipe lib directory.</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"># For Windows platform</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"># If its prefix is a drive specifier followed by "\\", or if its prefix is "\\\\", then the path is</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"># absolute. Otherwise, it is relative.</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"># pipe_lib_dir=ext\\pipe</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"># For Linux platform</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"># If its prefix is "/", then the path is absolute. Otherwise, it is relative.</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"># pipe_lib_dir=ext/pipe</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"># The maximum number of threads that can be used to execute the pipe subtasks in PipeSubtaskExecutor.</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"># The actual value will be min(pipe_subtask_executor_max_thread_num, max(1, CPU core number / 2)).</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"># pipe_subtask_executor_max_thread_num=5</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"># The connection timeout (in milliseconds) for the thrift client.</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"># pipe_connector_timeout_ms=900000</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"># The maximum number of selectors that can be used in the async connector.</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"># pipe_async_connector_selector_number=1</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"># The core number of clients that can be used in the async connector.</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"># pipe_async_connector_core_client_number=8</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"># The maximum number of clients that can be used in the async connector.</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"># pipe_async_connector_max_client_number=16</span></span></code></pre><div class="line-numbers" aria-hidden="true" style="counter-reset:line-number 0;"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><p>V1.3.1+:</p><div class="language-properties line-numbers-mode" data-highlighter="shiki" data-ext="properties" style="background-color:#282c34;color:#abb2bf;"><pre class="shiki one-dark-pro vp-code"><code class="language-properties"><span class="line"><span style="color:#7F848E;font-style:italic;">####################</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;">### Pipe Configuration</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;">####################</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"># Uncomment the following field to configure the pipe lib directory.</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"># For Windows platform</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"># If its prefix is a drive specifier followed by "\\", or if its prefix is "\\\\", then the path is</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"># absolute. Otherwise, it is relative.</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"># pipe_lib_dir=ext\\pipe</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"># For Linux platform</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"># If its prefix is "/", then the path is absolute. Otherwise, it is relative.</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"># pipe_lib_dir=ext/pipe</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"># The maximum number of threads that can be used to execute the pipe subtasks in PipeSubtaskExecutor.</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"># The actual value will be min(pipe_subtask_executor_max_thread_num, max(1, CPU core number / 2)).</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"># pipe_subtask_executor_max_thread_num=5</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"># The connection timeout (in milliseconds) for the thrift client.</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"># pipe_sink_timeout_ms=900000</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"># The maximum number of selectors that can be used in the sink.</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"># Recommend to set this value to less than or equal to pipe_sink_max_client_number.</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"># pipe_sink_selector_number=4</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"># The maximum number of clients that can be used in the sink.</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"># pipe_sink_max_client_number=16</span></span></code></pre><div class="line-numbers" aria-hidden="true" style="counter-reset:line-number 0;"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div></div><!----><!----><!----></div><footer class="vp-page-meta"><div class="vp-meta-item edit-link"><a class="auto-link external-link vp-meta-label" href="https://github.com/apache/iotdb-docs/edit/main/src/zh/UserGuide/dev-1.3/User-Manual/Streaming_apache.md" aria-label="发现错误?在 GitHub 上编辑此页" rel="noopener noreferrer" target="_blank"><!--[--><svg xmlns="http://www.w3.org/2000/svg" class="icon edit-icon" viewBox="0 0 1024 1024" fill="currentColor" aria-label="edit icon" name="edit"><path d="M430.818 653.65a60.46 60.46 0 0 1-50.96-93.281l71.69-114.012 7.773-10.365L816.038 80.138A60.46 60.46 0 0 1 859.225 62a60.46 60.46 0 0 1 43.186 18.138l43.186 43.186a60.46 60.46 0 0 1 0 86.373L588.879 565.55l-8.637 8.637-117.466 68.234a60.46 60.46 0 0 1-31.958 11.229z"></path><path d="M728.802 962H252.891A190.883 190.883 0 0 1 62.008 771.98V296.934a190.883 190.883 0 0 1 190.883-192.61h267.754a60.46 60.46 0 0 1 0 120.92H252.891a69.962 69.962 0 0 0-69.098 69.099V771.98a69.962 69.962 0 0 0 69.098 69.098h475.911A69.962 69.962 0 0 0 797.9 771.98V503.363a60.46 60.46 0 1 1 120.922 0V771.98A190.883 190.883 0 0 1 728.802 962z"></path></svg><!--]-->发现错误?在 GitHub 上编辑此页<!----></a></div><div class="vp-meta-item git-info"><div class="update-time"><span class="vp-meta-label">最近更新: </span><time class="vp-meta-info" datetime="2025-04-03T09:29:12.000Z" data-allow-mismatch>2025/4/3 09:29</time></div><!----></div></footer><!----><!----><!----><!--]--></main><!--]--><footer class="site-footer"><span id="doc-version" style="display:none;">dev-1.3</span><p class="copyright-text">版权所有 © 2026 Apache软件基金会。 |
| Apache IoTDB,IoTDB,Apache,Apache 羽毛标志和 Apache IoTDB 项目标志是 Apache 软件基金会在所有国家的注册商标或商标</p><p style="text-align:center;margin-top:10px;color:#909399;font-size:12px;margin:0 30px;"><strong>有问题吗?</strong> 在 QQ、微信或 Slack 上联系我们。 <a href="https://github.com/apache/iotdb/issues/1995">立即加入社区</a></p></footer></div><!--]--><!--]--><!--[--><!----><!--]--><!--]--></div> |
| <script type="module" src="/assets/app-DLe9S4ys.js" defer></script> |
| </body> |
| </html> |