<!doctype html>
<html lang="zh-CN" data-theme="light">
  <head>
    <meta charset="utf-8" />
    <meta name="viewport" content="width=device-width,initial-scale=1" />
    <meta name="generator" content="VuePress 2.0.0-rc.0" />
    <meta name="theme" content="VuePress Theme Hope 2.0.0-rc.2" />
    <style>
      /*
       * Critical inline background colours, declared in the document head
       * ahead of the external stylesheet link.
       */

      /* Default theme: use --bg-color when it is defined, otherwise white. */
      html {
        background: var(--bg-color, #fff);
      }

      /*
       * Dark theme: matches when <html> carries data-theme="dark" (set by the
       * inline script that follows); falls back to a near-black colour when
       * --bg-color is not defined.
       */
      html[data-theme="dark"] {
        background: var(--bg-color, #1d1e1f);
      }

      /* No fallback value here: body only gets a colour once --bg-color is defined. */
      body {
        background: var(--bg-color);
      }
    </style>
    <script>
      // Decide the colour scheme and, when it is dark, switch the data-theme
      // attribute on <html> to "dark".
      //
      // Reading localStorage can throw (e.g. a SecurityError when the browser
      // blocks site data or the page runs in a sandboxed frame). Unguarded, that
      // exception would abort this script and the system dark-mode preference
      // would be ignored, so a failed read is treated as "no stored preference".
      // userMode and systemDarkMode remain top-level bindings as before.
      const userMode = (() => {
        try {
          return localStorage.getItem("vuepress-theme-hope-scheme");
        } catch (err) {
          return null;
        }
      })();

      // Falsy when matchMedia is unavailable; otherwise whether the OS/browser
      // reports a dark colour-scheme preference.
      const systemDarkMode =
        window.matchMedia &&
        window.matchMedia("(prefers-color-scheme: dark)").matches;

      // An explicit "dark" always wins; an explicit "light" always blocks dark;
      // any other stored value (or none) defers to the system preference.
      if (userMode === "dark" || (userMode !== "light" && systemDarkMode)) {
        document.documentElement.setAttribute("data-theme", "dark");
      }
    </script>
    <link rel="alternate" hreflang="en-us" href="https://iotdb.apache.org/UserGuide/latest/User-Manual/IoTDB-Data-Pipe_timecho.html"><meta property="og:url" content="https://iotdb.apache.org/zh/UserGuide/latest/User-Manual/IoTDB-Data-Pipe_timecho.html"><meta property="og:site_name" content="IoTDB Website"><meta property="og:title" content="IoTDB 数据订阅"><meta property="og:description" content="IoTDB 数据订阅功能可以将 IoTDB 的数据传输到另一个数据平台，我们将一个数据订阅任务称为 Pipe。 一个 Pipe 包含三个子任务（插件）： 抽取（Extract）; 处理（Process）; 发送（Connect）; Pipe 允许用户自定义三个子任务的处理逻辑，通过类似 UDF 的方式处理数据。在一个 Pipe 中，上述的子任务分别由三..."><meta property="og:type" content="article"><meta property="og:locale" content="zh-CN"><meta property="og:locale:alternate" content="en-US"><meta property="og:updated_time" content="2024-03-27T07:22:10.000Z"><meta property="article:modified_time" content="2024-03-27T07:22:10.000Z"><script type="application/ld+json">{"@context":"https://schema.org","@type":"Article","headline":"IoTDB 数据订阅","image":[""],"dateModified":"2024-03-27T07:22:10.000Z","author":[]}</script><link rel="icon" href="/favicon.ico"><meta name="Description" content="Apache IoTDB: Time Series Database for IoT"><meta name="Keywords" content="TSDB, time series, time series database, IoTDB, IoT database, IoT data management,时序数据库, 时间序列管理, IoTDB, 物联网数据库, 实时数据库, 物联网数据管理, 物联网数据"><meta name="baidu-site-verification" content="wfKETzB3OT"><meta name="google-site-verification" content="mZWAoRY0yj_HAr-s47zHCGHzx5Ju-RVm5wDbPnwQYFo"><script type="text/javascript">
var _paq = window._paq = window._paq || [];
/*
 * Matomo command queue. Commands are queued in this exact order; tracker
 * methods such as "setCustomDimension" have to be queued before "trackPageView".
 */
[
  ["setDoNotTrack", true],
  ["disableCookies"],
  ["trackPageView"],
  ["enableLinkTracking"]
].forEach(function (command) {
  _paq.push(command);
});
/* Point the tracker at the analytics host and inject its script asynchronously. */
(function (doc) {
  var baseUrl = "https://analytics.apache.org/";
  _paq.push(["setTrackerUrl", baseUrl + "matomo.php"]);
  _paq.push(["setSiteId", "56"]);
  var loader = doc.createElement("script");
  var firstScript = doc.getElementsByTagName("script")[0];
  loader.async = true;
  loader.src = baseUrl + "matomo.js";
  /* Insert ahead of the first <script> element found in the document. */
  firstScript.parentNode.insertBefore(loader, firstScript);
})(document);
       </script><title>IoTDB 数据订阅 | IoTDB Website</title><meta name="description" content="IoTDB 数据订阅功能可以将 IoTDB 的数据传输到另一个数据平台，我们将一个数据订阅任务称为 Pipe。 一个 Pipe 包含三个子任务（插件）： 抽取（Extract）; 处理（Process）; 发送（Connect）; Pipe 允许用户自定义三个子任务的处理逻辑，通过类似 UDF 的方式处理数据。在一个 Pipe 中，上述的子任务分别由三...">
    <link rel="preload" href="/assets/style-vuIfcoxv.css" as="style"><link rel="stylesheet" href="/assets/style-vuIfcoxv.css">
    <link rel="modulepreload" href="/assets/app-LsTKUu1f.js"><link rel="modulepreload" href="/assets/IoTDB-Data-Pipe_timecho.html-B4D42xqm.js"><link rel="modulepreload" href="/assets/IoTDB-Data-Pipe_timecho.html-PkxG5F0x.js">
    
  </head>
  <body>
    <div id="app"><!--[--><!--[--><!--[--><span tabindex="-1"></span><a href="#main-content" class="vp-skip-link sr-only">跳至主要內容</a><!--]--><!--[--><div class="theme-container has-toc"><!--[--><header id="navbar" class="vp-navbar hide-icon"><div class="vp-navbar-start"><button type="button" class="vp-toggle-sidebar-button" title="Toggle Sidebar"><span class="icon"></span></button><!--[--><!----><!--]--><!--[--><a class="vp-link vp-brand vp-brand" href="/zh/"><img class="vp-nav-logo" src="/logo.png" alt="IoTDB Website"><!----><span class="vp-site-name hide-in-pad">IoTDB Website</span></a><!--]--><!--[--><!----><!--]--></div><div class="vp-navbar-center"><!--[--><!----><!--]--><!--[--><!--]--><!--[--><!----><!--]--></div><div class="vp-navbar-end"><!--[--><!----><!--]--><!--[--><div id="docsearch-container"></div><nav class="vp-nav-links"><div class="nav-item hide-in-mobile"><div class="dropdown-wrapper"><button type="button" class="dropdown-title" aria-label="文档"><span class="title"><!---->文档</span><span class="arrow"></span><ul class="nav-dropdown"><li class="dropdown-item"><a aria-label="v1.3.x" class="vp-link nav-link nav-link" href="/zh/UserGuide/latest/QuickStart/QuickStart.html"><!---->v1.3.x<!----></a></li><li class="dropdown-item"><a aria-label="v1.2.x" class="vp-link nav-link nav-link" href="/zh/UserGuide/V1.2.x/QuickStart/QuickStart.html"><!---->v1.2.x<!----></a></li><li class="dropdown-item"><a aria-label="v1.1.x" class="vp-link nav-link nav-link" href="/zh/UserGuide/V1.1.x/QuickStart/QuickStart.html"><!---->v1.1.x<!----></a></li><li class="dropdown-item"><a aria-label="v1.0.x" class="vp-link nav-link nav-link" href="/zh/UserGuide/V1.0.x/QuickStart/QuickStart.html"><!---->v1.0.x<!----></a></li><li class="dropdown-item"><a aria-label="v0.13.x" class="vp-link nav-link nav-link" href="/zh/UserGuide/V0.13.x/QuickStart/QuickStart.html"><!---->v0.13.x<!----></a></li></ul></button></div></div><div class="nav-item hide-in-mobile"><a 
href="https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=177051872" rel="noopener noreferrer" target="_blank" aria-label="系统设计" class="nav-link"><!---->系统设计<span><svg class="external-link-icon" xmlns="http://www.w3.org/2000/svg" aria-hidden="true" focusable="false" x="0px" y="0px" viewBox="0 0 100 100" width="15" height="15"><path fill="currentColor" d="M18.8,85.1h56l0,0c2.2,0,4-1.8,4-4v-32h-8v28h-48v-48h28v-8h-32l0,0c-2.2,0-4,1.8-4,4v56C14.8,83.3,16.6,85.1,18.8,85.1z"></path><polygon fill="currentColor" points="45.7,48.7 51.3,54.3 77.2,28.5 77.2,37.2 85.2,37.2 85.2,14.9 62.8,14.9 62.8,22.9 71.5,22.9"></polygon></svg><span class="external-link-icon-sr-only">open in new window</span></span><!----></a></div><div class="nav-item hide-in-mobile"><a aria-label="下载" class="vp-link nav-link nav-link" href="/zh/Download/"><!---->下载<!----></a></div><div class="nav-item hide-in-mobile"><div class="dropdown-wrapper"><button type="button" class="dropdown-title" aria-label="社区"><span class="title"><!---->社区</span><span class="arrow"></span><ul class="nav-dropdown"><li class="dropdown-item"><a aria-label="关于社区" class="vp-link nav-link nav-link" href="/zh/Community/About.html"><!---->关于社区<!----></a></li><li class="dropdown-item"><a aria-label="交流与反馈" class="vp-link nav-link nav-link" href="/zh/Community/Feedback.html"><!---->交流与反馈<!----></a></li><li class="dropdown-item"><a aria-label="活动与报告" class="vp-link nav-link nav-link" href="/zh/Community/Materials.html"><!---->活动与报告<!----></a></li></ul></button></div></div><div class="nav-item hide-in-mobile"><div class="dropdown-wrapper"><button type="button" class="dropdown-title" aria-label="开发"><span class="title"><!---->开发</span><span class="arrow"></span><ul class="nav-dropdown"><li class="dropdown-item"><a aria-label="成为开发者" class="vp-link nav-link nav-link" href="/zh/Development/Community-Project-Committers.html"><!---->成为开发者<!----></a></li><li class="dropdown-item"><a aria-label="开发指南" class="vp-link nav-link 
nav-link" href="/zh/Development/Development-Guide.html"><!---->开发指南<!----></a></li><li class="dropdown-item"><a aria-label="Power by" class="vp-link nav-link nav-link" href="/zh/Development/Powered-By.html"><!---->Power by<!----></a></li></ul></button></div></div><div class="nav-item hide-in-mobile"><div class="dropdown-wrapper"><button type="button" class="dropdown-title" aria-label="ASF"><span class="title"><!---->ASF</span><span class="arrow"></span><ul class="nav-dropdown"><li class="dropdown-item"><a href="https://www.apache.org/" rel="noopener noreferrer" target="_blank" aria-label="基金会" class="nav-link"><!---->基金会<span><svg class="external-link-icon" xmlns="http://www.w3.org/2000/svg" aria-hidden="true" focusable="false" x="0px" y="0px" viewBox="0 0 100 100" width="15" height="15"><path fill="currentColor" d="M18.8,85.1h56l0,0c2.2,0,4-1.8,4-4v-32h-8v28h-48v-48h28v-8h-32l0,0c-2.2,0-4,1.8-4,4v56C14.8,83.3,16.6,85.1,18.8,85.1z"></path><polygon fill="currentColor" points="45.7,48.7 51.3,54.3 77.2,28.5 77.2,37.2 85.2,37.2 85.2,14.9 62.8,14.9 62.8,22.9 71.5,22.9"></polygon></svg><span class="external-link-icon-sr-only">open in new window</span></span><!----></a></li><li class="dropdown-item"><a href="https://www.apache.org/licenses/" rel="noopener noreferrer" target="_blank" aria-label="许可证" class="nav-link"><!---->许可证<span><svg class="external-link-icon" xmlns="http://www.w3.org/2000/svg" aria-hidden="true" focusable="false" x="0px" y="0px" viewBox="0 0 100 100" width="15" height="15"><path fill="currentColor" d="M18.8,85.1h56l0,0c2.2,0,4-1.8,4-4v-32h-8v28h-48v-48h28v-8h-32l0,0c-2.2,0-4,1.8-4,4v56C14.8,83.3,16.6,85.1,18.8,85.1z"></path><polygon fill="currentColor" points="45.7,48.7 51.3,54.3 77.2,28.5 77.2,37.2 85.2,37.2 85.2,14.9 62.8,14.9 62.8,22.9 71.5,22.9"></polygon></svg><span class="external-link-icon-sr-only">open in new window</span></span><!----></a></li><li class="dropdown-item"><a href="https://www.apache.org/security/" rel="noopener noreferrer" 
target="_blank" aria-label="安全" class="nav-link"><!---->安全<span><svg class="external-link-icon" xmlns="http://www.w3.org/2000/svg" aria-hidden="true" focusable="false" x="0px" y="0px" viewBox="0 0 100 100" width="15" height="15"><path fill="currentColor" d="M18.8,85.1h56l0,0c2.2,0,4-1.8,4-4v-32h-8v28h-48v-48h28v-8h-32l0,0c-2.2,0-4,1.8-4,4v56C14.8,83.3,16.6,85.1,18.8,85.1z"></path><polygon fill="currentColor" points="45.7,48.7 51.3,54.3 77.2,28.5 77.2,37.2 85.2,37.2 85.2,14.9 62.8,14.9 62.8,22.9 71.5,22.9"></polygon></svg><span class="external-link-icon-sr-only">open in new window</span></span><!----></a></li><li class="dropdown-item"><a href="https://www.apache.org/foundation/sponsorship.html" rel="noopener noreferrer" target="_blank" aria-label="赞助" class="nav-link"><!---->赞助<span><svg class="external-link-icon" xmlns="http://www.w3.org/2000/svg" aria-hidden="true" focusable="false" x="0px" y="0px" viewBox="0 0 100 100" width="15" height="15"><path fill="currentColor" d="M18.8,85.1h56l0,0c2.2,0,4-1.8,4-4v-32h-8v28h-48v-48h28v-8h-32l0,0c-2.2,0-4,1.8-4,4v56C14.8,83.3,16.6,85.1,18.8,85.1z"></path><polygon fill="currentColor" points="45.7,48.7 51.3,54.3 77.2,28.5 77.2,37.2 85.2,37.2 85.2,14.9 62.8,14.9 62.8,22.9 71.5,22.9"></polygon></svg><span class="external-link-icon-sr-only">open in new window</span></span><!----></a></li><li class="dropdown-item"><a href="https://www.apache.org/foundation/thanks.html" rel="noopener noreferrer" target="_blank" aria-label="致谢" class="nav-link"><!---->致谢<span><svg class="external-link-icon" xmlns="http://www.w3.org/2000/svg" aria-hidden="true" focusable="false" x="0px" y="0px" viewBox="0 0 100 100" width="15" height="15"><path fill="currentColor" d="M18.8,85.1h56l0,0c2.2,0,4-1.8,4-4v-32h-8v28h-48v-48h28v-8h-32l0,0c-2.2,0-4,1.8-4,4v56C14.8,83.3,16.6,85.1,18.8,85.1z"></path><polygon fill="currentColor" points="45.7,48.7 51.3,54.3 77.2,28.5 77.2,37.2 85.2,37.2 85.2,14.9 62.8,14.9 62.8,22.9 71.5,22.9"></polygon></svg><span 
class="external-link-icon-sr-only">open in new window</span></span><!----></a></li><li class="dropdown-item"><a href="https://www.apache.org/events/current-event" rel="noopener noreferrer" target="_blank" aria-label="活动" class="nav-link"><!---->活动<span><svg class="external-link-icon" xmlns="http://www.w3.org/2000/svg" aria-hidden="true" focusable="false" x="0px" y="0px" viewBox="0 0 100 100" width="15" height="15"><path fill="currentColor" d="M18.8,85.1h56l0,0c2.2,0,4-1.8,4-4v-32h-8v28h-48v-48h28v-8h-32l0,0c-2.2,0-4,1.8-4,4v56C14.8,83.3,16.6,85.1,18.8,85.1z"></path><polygon fill="currentColor" points="45.7,48.7 51.3,54.3 77.2,28.5 77.2,37.2 85.2,37.2 85.2,14.9 62.8,14.9 62.8,22.9 71.5,22.9"></polygon></svg><span class="external-link-icon-sr-only">open in new window</span></span><!----></a></li><li class="dropdown-item"><a href="https://privacy.apache.org/policies/privacy-policy-public.html" rel="noopener noreferrer" target="_blank" aria-label="隐私" class="nav-link"><!---->隐私<span><svg class="external-link-icon" xmlns="http://www.w3.org/2000/svg" aria-hidden="true" focusable="false" x="0px" y="0px" viewBox="0 0 100 100" width="15" height="15"><path fill="currentColor" d="M18.8,85.1h56l0,0c2.2,0,4-1.8,4-4v-32h-8v28h-48v-48h28v-8h-32l0,0c-2.2,0-4,1.8-4,4v56C14.8,83.3,16.6,85.1,18.8,85.1z"></path><polygon fill="currentColor" points="45.7,48.7 51.3,54.3 77.2,28.5 77.2,37.2 85.2,37.2 85.2,14.9 62.8,14.9 62.8,22.9 71.5,22.9"></polygon></svg><span class="external-link-icon-sr-only">open in new window</span></span><!----></a></li></ul></button></div></div></nav><div class="nav-item"><div class="dropdown-wrapper i18n-dropdown"><button type="button" class="dropdown-title" aria-label="选择语言"><!--[--><svg xmlns="http://www.w3.org/2000/svg" class="icon i18n-icon" viewBox="0 0 1024 1024" fill="currentColor" aria-label="i18n icon" style="width:1rem;height:1rem;vertical-align:middle;"><path d="M379.392 460.8 494.08 575.488l-42.496 102.4L307.2 532.48 138.24 
701.44l-71.68-72.704L234.496 460.8l-45.056-45.056c-27.136-27.136-51.2-66.56-66.56-108.544h112.64c7.68 14.336 16.896 27.136 26.112 35.84l45.568 46.08 45.056-45.056C382.976 312.32 409.6 247.808 409.6 204.8H0V102.4h256V0h102.4v102.4h256v102.4H512c0 70.144-37.888 161.28-87.04 210.944L378.88 460.8zM576 870.4 512 1024H409.6l256-614.4H768l256 614.4H921.6l-64-153.6H576zM618.496 768h196.608L716.8 532.48 618.496 768z"></path></svg><!--]--><span class="arrow"></span><ul class="nav-dropdown"><li class="dropdown-item"><a aria-label="English" class="vp-link nav-link nav-link" href="/UserGuide/latest/User-Manual/IoTDB-Data-Pipe_timecho.html"><!---->English<!----></a></li><li class="dropdown-item"><a aria-label="简体中文" class="vp-link nav-link active nav-link active" href="/zh/UserGuide/latest/User-Manual/IoTDB-Data-Pipe_timecho.html"><!---->简体中文<!----></a></li></ul></button></div></div><div class="nav-item hide-in-mobile"><button type="button" id="appearance-switch"><svg xmlns="http://www.w3.org/2000/svg" class="icon auto-icon" viewBox="0 0 1024 1024" fill="currentColor" aria-label="auto icon" style="display:block;"><path d="M512 992C246.92 992 32 777.08 32 512S246.92 32 512 32s480 214.92 480 480-214.92 480-480 480zm0-840c-198.78 0-360 161.22-360 360 0 198.84 161.22 360 360 360s360-161.16 360-360c0-198.78-161.22-360-360-360zm0 660V212c165.72 0 300 134.34 300 300 0 165.72-134.28 300-300 300z"></path></svg><svg xmlns="http://www.w3.org/2000/svg" class="icon dark-icon" viewBox="0 0 1024 1024" fill="currentColor" aria-label="dark icon" style="display:none;"><path d="M524.8 938.667h-4.267a439.893 439.893 0 0 1-313.173-134.4 446.293 446.293 0 0 1-11.093-597.334A432.213 432.213 0 0 1 366.933 90.027a42.667 42.667 0 0 1 45.227 9.386 42.667 42.667 0 0 1 10.24 42.667 358.4 358.4 0 0 0 82.773 375.893 361.387 361.387 0 0 0 376.747 82.774 42.667 42.667 0 0 1 54.187 55.04 433.493 433.493 0 0 1-99.84 154.88 438.613 438.613 0 0 1-311.467 128z"></path></svg><svg xmlns="http://www.w3.org/2000/svg" 
class="icon light-icon" viewBox="0 0 1024 1024" fill="currentColor" aria-label="light icon" style="display:none;"><path d="M952 552h-80a40 40 0 0 1 0-80h80a40 40 0 0 1 0 80zM801.88 280.08a41 41 0 0 1-57.96-57.96l57.96-58a41.04 41.04 0 0 1 58 58l-58 57.96zM512 752a240 240 0 1 1 0-480 240 240 0 0 1 0 480zm0-560a40 40 0 0 1-40-40V72a40 40 0 0 1 80 0v80a40 40 0 0 1-40 40zm-289.88 88.08-58-57.96a41.04 41.04 0 0 1 58-58l57.96 58a41 41 0 0 1-57.96 57.96zM192 512a40 40 0 0 1-40 40H72a40 40 0 0 1 0-80h80a40 40 0 0 1 40 40zm30.12 231.92a41 41 0 0 1 57.96 57.96l-57.96 58a41.04 41.04 0 0 1-58-58l58-57.96zM512 832a40 40 0 0 1 40 40v80a40 40 0 0 1-80 0v-80a40 40 0 0 1 40-40zm289.88-88.08 58 57.96a41.04 41.04 0 0 1-58 58l-57.96-58a41 41 0 0 1 57.96-57.96z"></path></svg></button></div><div class="nav-item vp-repo"><a class="vp-repo-link" href="https://github.com/apache/iotdb" target="_blank" rel="noopener noreferrer" aria-label="GitHub"><svg xmlns="http://www.w3.org/2000/svg" class="icon github-icon" viewBox="0 0 1024 1024" fill="currentColor" aria-label="github icon" style="width:1.25rem;height:1.25rem;vertical-align:middle;"><path d="M511.957 21.333C241.024 21.333 21.333 240.981 21.333 512c0 216.832 140.544 400.725 335.574 465.664 24.49 4.395 32.256-10.07 32.256-23.083 0-11.69.256-44.245 0-85.205-136.448 29.61-164.736-64.64-164.736-64.64-22.315-56.704-54.4-71.765-54.4-71.765-44.587-30.464 3.285-29.824 3.285-29.824 49.195 3.413 75.179 50.517 75.179 50.517 43.776 75.008 114.816 53.333 142.762 40.79 4.523-31.66 17.152-53.377 31.19-65.537-108.971-12.458-223.488-54.485-223.488-242.602 0-53.547 19.114-97.323 50.517-131.67-5.035-12.33-21.93-62.293 4.779-129.834 0 0 41.258-13.184 134.912 50.346a469.803 469.803 0 0 1 122.88-16.554c41.642.213 83.626 5.632 122.88 16.554 93.653-63.488 134.784-50.346 134.784-50.346 26.752 67.541 9.898 117.504 4.864 129.834 31.402 34.347 50.474 78.123 50.474 131.67 0 188.586-114.73 230.016-224.042 242.09 17.578 15.232 33.578 44.672 33.578 90.454v135.85c0 
13.142 7.936 27.606 32.854 22.87C862.25 912.597 1002.667 728.747 1002.667 512c0-271.019-219.648-490.667-490.71-490.667z"></path></svg></a></div><!--]--><!--[--><!----><!--]--><button type="button" class="vp-toggle-navbar-button" aria-label="Toggle Navbar" aria-expanded="false" aria-controls="nav-screen"><span><span class="vp-top"></span><span class="vp-middle"></span><span class="vp-bottom"></span></span></button></div></header><!----><!--]--><!----><div class="toggle-sidebar-wrapper"><span class="arrow start"></span></div><aside id="sidebar" class="vp-sidebar"><!--[--><!----><!--]--><ul class="vp-sidebar-links"><li><section class="vp-sidebar-group"><p class="vp-sidebar-heading"><!----><span class="vp-sidebar-title">IoTDB用户手册 (V1.3.x)</span><!----></p><ul class="vp-sidebar-links"></ul></section></li><li><section class="vp-sidebar-group"><button class="vp-sidebar-heading clickable" type="button"><!----><span class="vp-sidebar-title">关于IoTDB</span><span class="vp-arrow end"></span></button><!----></section></li><li><section class="vp-sidebar-group"><button class="vp-sidebar-heading clickable" type="button"><!----><span class="vp-sidebar-title">快速上手</span><span class="vp-arrow end"></span></button><!----></section></li><li><section class="vp-sidebar-group"><button class="vp-sidebar-heading clickable" type="button"><!----><span class="vp-sidebar-title">基础概念</span><span class="vp-arrow end"></span></button><!----></section></li><li><section class="vp-sidebar-group"><button class="vp-sidebar-heading clickable" type="button"><!----><span class="vp-sidebar-title">部署与运维</span><span class="vp-arrow end"></span></button><!----></section></li><li><section class="vp-sidebar-group"><button class="vp-sidebar-heading clickable" type="button"><!----><span class="vp-sidebar-title">使用手册</span><span class="vp-arrow end"></span></button><!----></section></li><li><section class="vp-sidebar-group"><button class="vp-sidebar-heading clickable" type="button"><!----><span 
class="vp-sidebar-title">工具体系</span><span class="vp-arrow end"></span></button><!----></section></li><li><section class="vp-sidebar-group"><button class="vp-sidebar-heading clickable" type="button"><!----><span class="vp-sidebar-title">应用编程接口</span><span class="vp-arrow end"></span></button><!----></section></li><li><section class="vp-sidebar-group"><button class="vp-sidebar-heading clickable" type="button"><!----><span class="vp-sidebar-title">系统集成</span><span class="vp-arrow end"></span></button><!----></section></li><li><section class="vp-sidebar-group"><button class="vp-sidebar-heading clickable" type="button"><!----><span class="vp-sidebar-title">SQL手册</span><span class="vp-arrow end"></span></button><!----></section></li><li><section class="vp-sidebar-group"><button class="vp-sidebar-heading clickable" type="button"><!----><span class="vp-sidebar-title">FAQ</span><span class="vp-arrow end"></span></button><!----></section></li><li><section class="vp-sidebar-group"><button class="vp-sidebar-heading clickable" type="button"><!----><span class="vp-sidebar-title">参考</span><span class="vp-arrow end"></span></button><!----></section></li></ul><!--[--><!----><!--]--></aside><!--[--><main id="main-content" class="vp-page"><!--[--><!--[--><!----><!--]--><!----><nav class="vp-breadcrumb disable"></nav><div class="vp-page-title"><h1><!---->IoTDB 数据订阅</h1><div class="page-info"><!----><!----><span class="page-date-info" aria-label="写作日期"><svg xmlns="http://www.w3.org/2000/svg" class="icon calendar-icon" viewBox="0 0 1024 1024" fill="currentColor" aria-label="calendar icon"><path d="M716.4 110.137c0-18.753-14.72-33.473-33.472-33.473-18.753 0-33.473 14.72-33.473 33.473v33.473h66.993v-33.473zm-334.87 0c0-18.753-14.72-33.473-33.473-33.473s-33.52 14.72-33.52 33.473v33.473h66.993v-33.473zm468.81 33.52H716.4v100.465c0 18.753-14.72 33.473-33.472 33.473a33.145 33.145 0 01-33.473-33.473V143.657H381.53v100.465c0 18.753-14.72 33.473-33.473 33.473a33.145 33.145 0 
01-33.473-33.473V143.657H180.6A134.314 134.314 0 0046.66 277.595v535.756A134.314 134.314 0 00180.6 947.289h669.74a134.36 134.36 0 00133.94-133.938V277.595a134.314 134.314 0 00-133.94-133.938zm33.473 267.877H147.126a33.145 33.145 0 01-33.473-33.473c0-18.752 14.72-33.473 33.473-33.473h736.687c18.752 0 33.472 14.72 33.472 33.473a33.145 33.145 0 01-33.472 33.473z"></path></svg><span><!----></span><meta property="datePublished" content="2024-03-27T07:22:10.000Z"></span><!----><span class="page-reading-time-info" aria-label="阅读时间"><svg xmlns="http://www.w3.org/2000/svg" class="icon timer-icon" viewBox="0 0 1024 1024" fill="currentColor" aria-label="timer icon"><path d="M799.387 122.15c4.402-2.978 7.38-7.897 7.38-13.463v-1.165c0-8.933-7.38-16.312-16.312-16.312H256.33c-8.933 0-16.311 7.38-16.311 16.312v1.165c0 5.825 2.977 10.874 7.637 13.592 4.143 194.44 97.22 354.963 220.201 392.763-122.204 37.542-214.893 196.511-220.2 389.397-4.661 5.049-7.638 11.651-7.638 19.03v5.825h566.49v-5.825c0-7.379-2.849-13.981-7.509-18.9-5.049-193.016-97.867-351.985-220.2-389.527 123.24-37.67 216.446-198.453 220.588-392.892zM531.16 450.445v352.632c117.674 1.553 211.787 40.778 211.787 88.676H304.097c0-48.286 95.149-87.382 213.728-88.676V450.445c-93.077-3.107-167.901-81.297-167.901-177.093 0-8.803 6.99-15.793 15.793-15.793 8.803 0 15.794 6.99 15.794 15.793 0 80.261 63.69 145.635 142.01 145.635s142.011-65.374 142.011-145.635c0-8.803 6.99-15.793 15.794-15.793s15.793 6.99 15.793 15.793c0 95.019-73.789 172.82-165.96 177.093z"></path></svg><span>大约 29 分钟</span><meta property="timeRequired" content="PT29M"></span><!----><!----></div><hr></div><div class="toc-place-holder"><aside id="toc"><!--[--><!----><!--]--><div class="toc-header">此页内容<button type="button" class="print-button" title="打印"><svg xmlns="http://www.w3.org/2000/svg" class="icon print-icon" viewBox="0 0 1024 1024" fill="currentColor" aria-label="print icon"><path d="M819.2 364.8h-44.8V128c0-17.067-14.933-32-32-32H281.6c-17.067 0-32 
14.933-32 32v236.8h-44.8C145.067 364.8 96 413.867 96 473.6v192c0 59.733 49.067 108.8 108.8 108.8h44.8V896c0 17.067 14.933 32 32 32h460.8c17.067 0 32-14.933 32-32V774.4h44.8c59.733 0 108.8-49.067 108.8-108.8v-192c0-59.733-49.067-108.8-108.8-108.8zM313.6 160h396.8v204.8H313.6V160zm396.8 704H313.6V620.8h396.8V864zM864 665.6c0 25.6-19.2 44.8-44.8 44.8h-44.8V588.8c0-17.067-14.933-32-32-32H281.6c-17.067 0-32 14.933-32 32v121.6h-44.8c-25.6 0-44.8-19.2-44.8-44.8v-192c0-25.6 19.2-44.8 44.8-44.8h614.4c25.6 0 44.8 19.2 44.8 44.8v192z"></path></svg></button></div><div class="toc-wrapper"><ul class="toc-list"><!--[--><li class="toc-item"><a class="vp-link toc-link level2 toc-link level2" href="#创建流水线">创建流水线</a></li><!----><!--]--><!--[--><li class="toc-item"><a class="vp-link toc-link level2 toc-link level2" href="#启动流水线">启动流水线</a></li><!----><!--]--><!--[--><li class="toc-item"><a class="vp-link toc-link level2 toc-link level2" href="#停止流水线">停止流水线</a></li><!----><!--]--><!--[--><li class="toc-item"><a class="vp-link toc-link level2 toc-link level2" href="#删除流水线">删除流水线</a></li><!----><!--]--><!--[--><li class="toc-item"><a class="vp-link toc-link level2 toc-link level2" href="#展示流水线">展示流水线</a></li><!----><!--]--><!--[--><li class="toc-item"><a class="vp-link toc-link level2 toc-link level2" href="#流水线运行状态迁移">流水线运行状态迁移</a></li><!----><!--]--><!--[--><li class="toc-item"><a class="vp-link toc-link level2 toc-link level2" href="#预置-extractor">预置 extractor</a></li><li><ul class="toc-list"><!--[--><li class="toc-item"><a class="vp-link toc-link level3 toc-link level3" href="#iotdb-extractor">iotdb-extractor</a></li><!----><!--]--></ul></li><!--]--><!--[--><li class="toc-item"><a class="vp-link toc-link level2 toc-link level2" href="#预置-processor">预置 processor</a></li><li><ul class="toc-list"><!--[--><li class="toc-item"><a class="vp-link toc-link level3 toc-link level3" href="#do-nothing-processor">do-nothing-processor</a></li><!----><!--]--></ul></li><!--]--><!--[--><li 
class="toc-item"><a class="vp-link toc-link level2 toc-link level2" href="#预置-connector">预置 connector</a></li><li><ul class="toc-list"><!--[--><li class="toc-item"><a class="vp-link toc-link level3 toc-link level3" href="#iotdb-thrift-connector-v1-别名-iotdb-thrift-connector">iotdb-thrift-connector-v1（别名：iotdb-thrift-connector）</a></li><!----><!--]--><!--[--><li class="toc-item"><a class="vp-link toc-link level3 toc-link level3" href="#iotdb-thrift-connector-v2">iotdb-thrift-connector-v2</a></li><!----><!--]--><!--[--><li class="toc-item"><a class="vp-link toc-link level3 toc-link level3" href="#iotdb-sync-connector">iotdb-sync-connector</a></li><!----><!--]--><!--[--><li class="toc-item"><a class="vp-link toc-link level3 toc-link level3" href="#do-nothing-connector">do-nothing-connector</a></li><!----><!--]--></ul></li><!--]--><!--[--><li class="toc-item"><a class="vp-link toc-link level2 toc-link level2" href="#编程开发依赖">编程开发依赖</a></li><!----><!--]--><!--[--><li class="toc-item"><a class="vp-link toc-link level2 toc-link level2" href="#事件驱动编程模型">事件驱动编程模型</a></li><li><ul class="toc-list"><!--[--><li class="toc-item"><a class="vp-link toc-link level3 toc-link level3" href="#操作日志写入事件-tabletinsertionevent">操作日志写入事件（TabletInsertionEvent）</a></li><!----><!--]--><!--[--><li class="toc-item"><a class="vp-link toc-link level3 toc-link level3" href="#数据文件写入事件-tsfileinsertionevent">数据文件写入事件（TsFileInsertionEvent）</a></li><!----><!--]--></ul></li><!--]--><!--[--><li class="toc-item"><a class="vp-link toc-link level2 toc-link level2" href="#自定义数据订阅插件编程接口定义">自定义数据订阅插件编程接口定义</a></li><li><ul class="toc-list"><!--[--><li class="toc-item"><a class="vp-link toc-link level3 toc-link level3" href="#数据抽取插件接口">数据抽取插件接口</a></li><!----><!--]--><!--[--><li class="toc-item"><a class="vp-link toc-link level3 toc-link level3" href="#数据处理插件接口">数据处理插件接口</a></li><!----><!--]--><!--[--><li class="toc-item"><a class="vp-link toc-link level3 toc-link level3" 
href="#数据发送插件接口">数据发送插件接口</a></li><!----><!--]--></ul></li><!--]--><!--[--><li class="toc-item"><a class="vp-link toc-link level2 toc-link level2" href="#加载插件语句">加载插件语句</a></li><!----><!--]--><!--[--><li class="toc-item"><a class="vp-link toc-link level2 toc-link level2" href="#删除插件语句">删除插件语句</a></li><!----><!--]--><!--[--><li class="toc-item"><a class="vp-link toc-link level2 toc-link level2" href="#查看插件语句">查看插件语句</a></li><!----><!--]--><!--[--><li class="toc-item"><a class="vp-link toc-link level2 toc-link level2" href="#pipe-任务">Pipe 任务</a></li><!----><!--]--><!--[--><li class="toc-item"><a class="vp-link toc-link level2 toc-link level2" href="#pipe-插件">Pipe 插件</a></li><!----><!--]--><!--[--><li class="toc-item"><a class="vp-link toc-link level2 toc-link level2" href="#最少一次语义保证-at-least-once">最少一次语义保证 at-least-once</a></li><!----><!--]--><!--[--><li class="toc-item"><a class="vp-link toc-link level2 toc-link level2" href="#源端-数据写入与-pipe-处理、发送数据异步解耦">源端：数据写入与 Pipe 处理、发送数据异步解耦</a></li><!----><!--]--><!--[--><li class="toc-item"><a class="vp-link toc-link level2 toc-link level2" href="#源端-可自适应数据写入负载的数据传输策略">源端：可自适应数据写入负载的数据传输策略</a></li><!----><!--]--><!--[--><li class="toc-item"><a class="vp-link toc-link level2 toc-link level2" href="#源端-高可用集群部署时-pipe-服务高可用">源端：高可用集群部署时，Pipe 服务高可用</a></li><!----><!--]--></ul><div class="toc-marker" style="top:-1.7rem;"></div></div><!--[--><!----><!--]--></aside></div><!--[--><!----><!--]--><div class="theme-hope-content"><!--

    Licensed to the Apache Software Foundation (ASF) under one
    or more contributor license agreements.  See the NOTICE file
    distributed with this work for additional information
    regarding copyright ownership.  The ASF licenses this file
    to you under the Apache License, Version 2.0 (the
    "License"); you may not use this file except in compliance
    with the License.  You may obtain a copy of the License at
    
        http://www.apache.org/licenses/LICENSE-2.0
    
    Unless required by applicable law or agreed to in writing,
    software distributed under the License is distributed on an
    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    KIND, either express or implied.  See the License for the
    specific language governing permissions and limitations
    under the License.

--><h1 id="iotdb-数据订阅" tabindex="-1"><a class="header-anchor" href="#iotdb-数据订阅" aria-hidden="true">#</a> IoTDB 数据订阅</h1><p><strong>IoTDB 数据订阅功能可以将 IoTDB 的数据传输到另一个数据平台，我们将一个数据订阅任务称为 Pipe。</strong></p><p><strong>一个 Pipe 包含三个子任务（插件）：</strong></p><ul><li>抽取（Extract）</li><li>处理（Process）</li><li>发送（Connect）</li></ul><p><strong>Pipe 允许用户自定义三个子任务的处理逻辑，通过类似 UDF 的方式处理数据。</strong>在一个 Pipe 中，上述的子任务分别由三种插件执行实现，数据会依次经过这三个插件进行处理：Pipe Extractor 用于抽取数据，Pipe Processor 用于处理数据，Pipe Connector 用于发送数据，最终数据将被发至外部系统。</p><p><strong>Pipe 任务的模型如下：</strong></p><figure><img src="https://alioss.timecho.com/docs/img/任务模型图.png" alt="任务模型图" tabindex="0" loading="lazy"><figcaption>任务模型图</figcaption></figure><p>描述一个数据订阅任务，本质就是描述 Pipe Extractor、Pipe Processor 和 Pipe Connector 插件的属性。用户可以通过 SQL 语句声明式地配置三个子任务的具体属性，通过组合不同的属性，实现灵活的数据 ETL 能力。</p><p>利用数据订阅功能，可以搭建完整的数据链路来满足<em>端边云同步、异地灾备、读写负载分库</em>等需求。</p><h1 id="快速开始" tabindex="-1"><a class="header-anchor" href="#快速开始" aria-hidden="true">#</a> 快速开始</h1><p><strong>🎯 目标：实现 IoTDB A -&gt; IoTDB B 的全量数据订阅</strong></p><ul><li><p>启动两个 IoTDB，A（datanode -&gt; 127.0.0.1:6667） B（datanode -&gt; 127.0.0.1:6668）</p></li><li><p>创建 A -&gt; B 的 Pipe，在 A 上执行</p><div class="language-sql line-numbers-mode" data-ext="sql"><pre class="language-sql"><code><span class="token keyword">create</span> pipe a2b
<span class="token keyword">with</span> connector <span class="token punctuation">(</span>
  <span class="token string">&#39;connector&#39;</span><span class="token operator">=</span><span class="token string">&#39;iotdb-thrift-connector&#39;</span><span class="token punctuation">,</span>
  <span class="token string">&#39;connector.ip&#39;</span><span class="token operator">=</span><span class="token string">&#39;127.0.0.1&#39;</span><span class="token punctuation">,</span>
  <span class="token string">&#39;connector.port&#39;</span><span class="token operator">=</span><span class="token string">&#39;6668&#39;</span>
<span class="token punctuation">)</span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div></li><li><p>启动 A -&gt; B 的 Pipe，在 A 上执行</p><div class="language-sql line-numbers-mode" data-ext="sql"><pre class="language-sql"><code><span class="token keyword">start</span> pipe a2b
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div></div></div></li><li><p>向 A 写入数据</p><div class="language-sql line-numbers-mode" data-ext="sql"><pre class="language-sql"><code><span class="token keyword">INSERT</span> <span class="token keyword">INTO</span> root<span class="token punctuation">.</span>db<span class="token punctuation">.</span>d<span class="token punctuation">(</span><span class="token keyword">time</span><span class="token punctuation">,</span> m<span class="token punctuation">)</span> <span class="token keyword">values</span> <span class="token punctuation">(</span><span class="token number">1</span><span class="token punctuation">,</span> <span class="token number">1</span><span class="token punctuation">)</span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div></div></div></li><li><p>在 B 检查由 A 同步过来的数据</p><div class="language-sql line-numbers-mode" data-ext="sql"><pre class="language-sql"><code><span class="token keyword">SELECT</span> <span class="token operator">*</span><span class="token operator">*</span> <span class="token keyword">FROM</span> root
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div></div></div></li></ul><blockquote><p>❗️<strong>注：目前的 IoTDB -&gt; IoTDB 的数据订阅实现并不支持 DDL 同步</strong></p><p>即：不支持 ttl，trigger，别名，模板，视图，创建/删除序列，创建/删除数据库等操作。</p><p><strong>IoTDB -&gt; IoTDB 的数据订阅要求目标端 IoTDB：</strong></p><ul><li>开启自动创建元数据：需要人工配置数据类型的编码和压缩与发送端保持一致</li><li>不开启自动创建元数据：手工创建与源端一致的元数据</li></ul></blockquote><h1 id="pipe-同步任务管理" tabindex="-1"><a class="header-anchor" href="#pipe-同步任务管理" aria-hidden="true">#</a> Pipe 同步任务管理</h1><h2 id="创建流水线" tabindex="-1"><a class="header-anchor" href="#创建流水线" aria-hidden="true">#</a> 创建流水线</h2><p>可以使用 <code>CREATE PIPE</code> 语句来创建一条数据订阅任务，SQL 语句如下所示：</p><div class="language-sql line-numbers-mode" data-ext="sql"><pre class="language-sql"><code><span class="token keyword">CREATE</span> PIPE <span class="token operator">&lt;</span>PipeId<span class="token operator">&gt;</span> <span class="token comment">-- PipeId 是能够唯一标定流水线任务的名字</span>
<span class="token keyword">WITH</span> EXTRACTOR <span class="token punctuation">(</span>
  <span class="token comment">-- 默认的 IoTDB 数据抽取插件</span>
  <span class="token string">&#39;extractor&#39;</span>                    <span class="token operator">=</span> <span class="token string">&#39;iotdb-extractor&#39;</span><span class="token punctuation">,</span>
  <span class="token comment">-- 路径前缀，只有能够匹配该路径前缀的数据才会被抽取，用作后续的处理和发送</span>
  <span class="token string">&#39;extractor.pattern&#39;</span>            <span class="token operator">=</span> <span class="token string">&#39;root.timecho&#39;</span><span class="token punctuation">,</span>
  <span class="token comment">-- 是否抽取历史数据</span>
  <span class="token string">&#39;extractor.history.enable&#39;</span>     <span class="token operator">=</span> <span class="token string">&#39;true&#39;</span><span class="token punctuation">,</span>
  <span class="token comment">-- 描述被抽取的历史数据的时间范围，表示最早时间</span>
  <span class="token string">&#39;extractor.history.start-time&#39;</span> <span class="token operator">=</span> <span class="token string">&#39;2011.12.03T10:15:30+01:00&#39;</span><span class="token punctuation">,</span>
  <span class="token comment">-- 描述被抽取的历史数据的时间范围，表示最晚时间</span>
  <span class="token string">&#39;extractor.history.end-time&#39;</span>   <span class="token operator">=</span> <span class="token string">&#39;2022.12.03T10:15:30+01:00&#39;</span><span class="token punctuation">,</span>
  <span class="token comment">-- 是否抽取实时数据</span>
  <span class="token string">&#39;extractor.realtime.enable&#39;</span>    <span class="token operator">=</span> <span class="token string">&#39;true&#39;</span><span class="token punctuation">,</span>
  <span class="token comment">-- 描述实时数据的抽取方式</span>
  <span class="token string">&#39;extractor.realtime.mode&#39;</span>      <span class="token operator">=</span> <span class="token string">&#39;hybrid&#39;</span><span class="token punctuation">,</span>
<span class="token punctuation">)</span>
<span class="token keyword">WITH</span> PROCESSOR <span class="token punctuation">(</span>
  <span class="token comment">-- 默认的数据处理插件，即不做任何处理</span>
  <span class="token string">&#39;processor&#39;</span>                    <span class="token operator">=</span> <span class="token string">&#39;do-nothing-processor&#39;</span><span class="token punctuation">,</span>
<span class="token punctuation">)</span>
<span class="token keyword">WITH</span> CONNECTOR <span class="token punctuation">(</span>
  <span class="token comment">-- IoTDB 数据发送插件，目标端为 IoTDB</span>
  <span class="token string">&#39;connector&#39;</span>                    <span class="token operator">=</span> <span class="token string">&#39;iotdb-thrift-connector&#39;</span><span class="token punctuation">,</span>
  <span class="token comment">-- 目标端 IoTDB 其中一个 DataNode 节点的数据服务 ip</span>
  <span class="token string">&#39;connector.ip&#39;</span>                 <span class="token operator">=</span> <span class="token string">&#39;127.0.0.1&#39;</span><span class="token punctuation">,</span>
  <span class="token comment">-- 目标端 IoTDB 其中一个 DataNode 节点的数据服务 port</span>
  <span class="token string">&#39;connector.port&#39;</span>               <span class="token operator">=</span> <span class="token string">&#39;6667&#39;</span><span class="token punctuation">,</span>
<span class="token punctuation">)</span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><p><strong>创建流水线时需要配置 PipeId 以及三个插件部分的参数：</strong></p><table><thead><tr><th>配置项</th><th>说明</th><th>是否必填</th><th>默认实现</th><th>默认实现说明</th><th>是否允许自定义实现</th></tr></thead><tbody><tr><td>PipeId</td><td>全局唯一标定一个同步流水线的名称</td><td>必填</td><td>-</td><td>-</td><td>-</td></tr><tr><td>extractor</td><td>Pipe Extractor 插件，负责在数据库底层抽取同步数据</td><td>选填</td><td>iotdb-extractor</td><td>将数据库的全量历史数据和后续到达的实时数据接入同步流水线</td><td>否</td></tr><tr><td>processor</td><td>Pipe Processor 插件，负责处理数据</td><td>选填</td><td>do-nothing-processor</td><td>对传入的数据不做任何处理</td><td>是</td></tr><tr><td>connector</td><td>Pipe Connector 插件，负责发送数据</td><td>必填</td><td>-</td><td>-</td><td>是</td></tr></tbody></table><p>示例中，使用了 iotdb-extractor、do-nothing-processor 和 iotdb-thrift-connector 插件构建数据订阅任务。IoTDB 还内置了其他的数据订阅插件，<strong>请查看“系统预置数据订阅插件”一节</strong>。</p><p><strong>一个最简的 CREATE PIPE 语句示例如下：</strong></p><div class="language-sql line-numbers-mode" data-ext="sql"><pre class="language-sql"><code><span class="token keyword">CREATE</span> PIPE <span 
class="token operator">&lt;</span>PipeId<span class="token operator">&gt;</span> <span class="token comment">-- PipeId 是能够唯一标定流水线任务的名字</span>
<span class="token keyword">WITH</span> CONNECTOR <span class="token punctuation">(</span>
  <span class="token comment">-- IoTDB 数据发送插件，目标端为 IoTDB</span>
  <span class="token string">&#39;connector&#39;</span>      <span class="token operator">=</span> <span class="token string">&#39;iotdb-thrift-connector&#39;</span><span class="token punctuation">,</span>
  <span class="token comment">-- 目标端 IoTDB 其中一个 DataNode 节点的数据服务 ip</span>
  <span class="token string">&#39;connector.ip&#39;</span>   <span class="token operator">=</span> <span class="token string">&#39;127.0.0.1&#39;</span><span class="token punctuation">,</span>
  <span class="token comment">-- 目标端 IoTDB 其中一个 DataNode 节点的数据服务 port</span>
  <span class="token string">&#39;connector.port&#39;</span> <span class="token operator">=</span> <span class="token string">&#39;6667&#39;</span><span class="token punctuation">,</span>
<span class="token punctuation">)</span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><p>其表达的语义是：将本数据库实例中的全量历史数据和后续到达的实时数据，同步到目标为 127.0.0.1:6667 的 IoTDB 实例上。</p><p><strong>注意：</strong></p><ul><li><p>EXTRACTOR 和 PROCESSOR 为选填配置，若不填写配置参数，系统则会采用相应的默认实现</p></li><li><p>CONNECTOR 为必填配置，需要在 CREATE PIPE 语句中声明式配置</p></li><li><p>CONNECTOR 具备自复用能力。对于不同的流水线，如果他们的 CONNECTOR 具备完全相同 KV 属性的（所有属性的 key 对应的 value 都相同），<strong>那么系统最终只会创建一个 CONNECTOR 实例</strong>，以实现对连接资源的复用。</p><ul><li>例如，有下面 pipe1, pipe2 两个流水线的声明：</li></ul><div class="language-sql line-numbers-mode" data-ext="sql"><pre class="language-sql"><code><span class="token keyword">CREATE</span> PIPE pipe1
<span class="token keyword">WITH</span> CONNECTOR <span class="token punctuation">(</span>
  <span class="token string">&#39;connector&#39;</span> <span class="token operator">=</span> <span class="token string">&#39;iotdb-thrift-connector&#39;</span><span class="token punctuation">,</span>
  <span class="token string">&#39;connector.thrift.host&#39;</span> <span class="token operator">=</span> <span class="token string">&#39;localhost&#39;</span><span class="token punctuation">,</span>
  <span class="token string">&#39;connector.thrift.port&#39;</span> <span class="token operator">=</span> <span class="token string">&#39;9999&#39;</span><span class="token punctuation">,</span>
<span class="token punctuation">)</span>

<span class="token keyword">CREATE</span> PIPE pipe2
<span class="token keyword">WITH</span> CONNECTOR <span class="token punctuation">(</span>
  <span class="token string">&#39;connector&#39;</span> <span class="token operator">=</span> <span class="token string">&#39;iotdb-thrift-connector&#39;</span><span class="token punctuation">,</span>
  <span class="token string">&#39;connector.thrift.port&#39;</span> <span class="token operator">=</span> <span class="token string">&#39;9999&#39;</span><span class="token punctuation">,</span>
  <span class="token string">&#39;connector.thrift.host&#39;</span> <span class="token operator">=</span> <span class="token string">&#39;localhost&#39;</span><span class="token punctuation">,</span>
  <span class="token string">&#39;connector.id&#39;</span> <span class="token operator">=</span> <span class="token string">&#39;1&#39;</span><span class="token punctuation">,</span>
<span class="token punctuation">)</span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><ul><li>因为它们对 CONNECTOR 的声明完全相同（<strong>即使某些属性声明时的顺序不同</strong>），所以框架会自动对它们声明的 CONNECTOR 进行复用，最终 pipe1, pipe2 的CONNECTOR 将会是同一个实例。</li></ul></li><li><p>请不要构建出包含数据循环同步的应用场景（会导致无限循环）：</p><ul><li>IoTDB A -&gt; IoTDB B -&gt; IoTDB A</li><li>IoTDB A -&gt; IoTDB A</li></ul></li></ul><h2 id="启动流水线" tabindex="-1"><a class="header-anchor" href="#启动流水线" aria-hidden="true">#</a> 启动流水线</h2><p>CREATE PIPE 语句成功执行后，流水线相关实例会被创建，但整个流水线的运行状态会被置为 STOPPED，即流水线不会立刻处理数据。</p><p>可以使用 START PIPE 语句使流水线开始处理数据：</p><div class="language-sql line-numbers-mode" data-ext="sql"><pre class="language-sql"><code><span class="token keyword">START</span> PIPE <span class="token operator">&lt;</span>PipeId<span class="token operator">&gt;</span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div></div></div><h2 id="停止流水线" tabindex="-1"><a class="header-anchor" href="#停止流水线" aria-hidden="true">#</a> 停止流水线</h2><p>使用 STOP PIPE 语句使流水线停止处理数据：</p><div class="language-sql line-numbers-mode" data-ext="sql"><pre class="language-sql"><code>STOP PIPE <span class="token operator">&lt;</span>PipeId<span class="token operator">&gt;</span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div></div></div><h2 id="删除流水线" tabindex="-1"><a class="header-anchor" href="#删除流水线" aria-hidden="true">#</a> 删除流水线</h2><p>使用 DROP PIPE 语句使流水线停止处理数据（当流水线状态为 RUNNING 时），然后删除整个流水线同步任务：</p><div class="language-sql line-numbers-mode" data-ext="sql"><pre class="language-sql"><code><span class="token keyword">DROP</span> PIPE <span class="token operator">&lt;</span>PipeId<span class="token operator">&gt;</span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div></div></div><p>用户在删除流水线前，不需要执行 STOP 操作。</p><h2 id="展示流水线" tabindex="-1"><a class="header-anchor" href="#展示流水线" aria-hidden="true">#</a> 展示流水线</h2><p>使用 SHOW PIPES 语句查看所有流水线：</p><div class="language-sql line-numbers-mode" data-ext="sql"><pre class="language-sql"><code><span class="token keyword">SHOW</span> PIPES
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div></div></div><p>查询结果如下：</p><div class="language-sql line-numbers-mode" data-ext="sql"><pre class="language-sql"><code><span class="token operator">+</span><span class="token comment">-----------+-----------------------+-------+-------------+-------------+-------------+----------------+</span>
<span class="token operator">|</span>         ID<span class="token operator">|</span>          CreationTime <span class="token operator">|</span>  State<span class="token operator">|</span>PipeExtractor<span class="token operator">|</span>PipeProcessor<span class="token operator">|</span>PipeConnector<span class="token operator">|</span>ExceptionMessage<span class="token operator">|</span>
<span class="token operator">+</span><span class="token comment">-----------+-----------------------+-------+-------------+-------------+-------------+----------------+</span>
<span class="token operator">|</span>iotdb<span class="token operator">-</span>kafka<span class="token operator">|</span><span class="token number">2022</span><span class="token operator">-</span><span class="token number">03</span><span class="token operator">-</span><span class="token number">30</span>T20:<span class="token number">58</span>:<span class="token number">30.689</span><span class="token operator">|</span>RUNNING<span class="token operator">|</span>          <span class="token punctuation">.</span><span class="token punctuation">.</span><span class="token punctuation">.</span><span class="token operator">|</span>          <span class="token punctuation">.</span><span class="token punctuation">.</span><span class="token punctuation">.</span><span class="token operator">|</span>          <span class="token punctuation">.</span><span class="token punctuation">.</span><span class="token punctuation">.</span><span class="token operator">|</span>            None<span class="token operator">|</span>
<span class="token operator">+</span><span class="token comment">-----------+-----------------------+-------+-------------+-------------+-------------+----------------+</span>
<span class="token operator">|</span>iotdb<span class="token operator">-</span>iotdb<span class="token operator">|</span><span class="token number">2022</span><span class="token operator">-</span><span class="token number">03</span><span class="token operator">-</span><span class="token number">31</span>T12:<span class="token number">55</span>:<span class="token number">28.129</span><span class="token operator">|</span>STOPPED<span class="token operator">|</span>          <span class="token punctuation">.</span><span class="token punctuation">.</span><span class="token punctuation">.</span><span class="token operator">|</span>          <span class="token punctuation">.</span><span class="token punctuation">.</span><span class="token punctuation">.</span><span class="token operator">|</span>          <span class="token punctuation">.</span><span class="token punctuation">.</span><span class="token punctuation">.</span><span class="token operator">|</span> TException: <span class="token punctuation">.</span><span class="token punctuation">.</span><span class="token punctuation">.</span><span class="token operator">|</span>
<span class="token operator">+</span><span class="token comment">-----------+-----------------------+-------+-------------+-------------+-------------+----------------+</span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><p>可以使用 <code>&lt;PipeId&gt;</code> 指定想看的某个同步任务状态：</p><div class="language-sql line-numbers-mode" data-ext="sql"><pre class="language-sql"><code><span class="token keyword">SHOW</span> PIPE <span class="token operator">&lt;</span>PipeId<span class="token operator">&gt;</span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div></div></div><p>您也可以通过 where 子句，判断某个 &lt;PipeId&gt; 使用的 Pipe Connector 被复用的情况。</p><div class="language-sql line-numbers-mode" data-ext="sql"><pre class="language-sql"><code><span class="token keyword">SHOW</span> PIPES
<span class="token keyword">WHERE</span> CONNECTOR USED <span class="token keyword">BY</span> <span class="token operator">&lt;</span>PipeId<span class="token operator">&gt;</span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div></div></div><h2 id="流水线运行状态迁移" tabindex="-1"><a class="header-anchor" href="#流水线运行状态迁移" aria-hidden="true">#</a> 流水线运行状态迁移</h2><p>一个数据订阅 pipe 在其被管理的生命周期中会经过多种状态：</p><ul><li><strong>STOPPED：</strong>pipe 处于停止运行状态。当管道处于该状态时，有如下几种可能： <ul><li>当一个 pipe 被成功创建之后，其初始状态为暂停状态</li><li>用户手动将一个处于正常运行状态的 pipe 暂停，其状态会被动从 RUNNING 变为 STOPPED</li><li>当一个 pipe 运行过程中出现无法恢复的错误时，其状态会自动从 RUNNING 变为 STOPPED</li></ul></li><li><strong>RUNNING：</strong>pipe 正在正常工作</li><li><strong>DROPPED：</strong>pipe 任务被永久删除</li></ul><p>下图表明了所有状态以及状态的迁移：</p><figure><img src="https://alioss.timecho.com/docs/img/状态迁移图.png" alt="状态迁移图" tabindex="0" loading="lazy"><figcaption>状态迁移图</figcaption></figure><h1 id="系统预置数据订阅插件" tabindex="-1"><a class="header-anchor" href="#系统预置数据订阅插件" aria-hidden="true">#</a> <strong>系统预置数据订阅插件</strong></h1><h2 id="预置-extractor" tabindex="-1"><a class="header-anchor" href="#预置-extractor" aria-hidden="true">#</a> 预置 extractor</h2><h3 id="iotdb-extractor" tabindex="-1"><a class="header-anchor" href="#iotdb-extractor" aria-hidden="true">#</a> iotdb-extractor</h3><p>作用：抽取 IoTDB 内部的历史或实时数据进入流水线。</p><table><thead><tr><th>key</th><th>value</th><th>value 取值范围</th><th>required or optional with default</th></tr></thead><tbody><tr><td>extractor</td><td>iotdb-extractor</td><td>String: iotdb-extractor</td><td>required</td></tr><tr><td>extractor.pattern</td><td>用于筛选时间序列的路径前缀</td><td>String: 任意的时间序列前缀</td><td>optional: root</td></tr><tr><td>extractor.history.enable</td><td>是否同步历史数据</td><td>Boolean: true, false</td><td>optional: true</td></tr><tr><td>extractor.history.start-time</td><td>同步历史数据的开始 event time，包含 start-time</td><td>Long: [Long.MIN_VALUE, Long.MAX_VALUE]</td><td>optional: Long.MIN_VALUE</td></tr><tr><td>extractor.history.end-time</td><td>同步历史数据的结束 event time，包含 end-time</td><td>Long: [Long.MIN_VALUE, Long.MAX_VALUE]</td><td>optional: 
Long.MAX_VALUE</td></tr><tr><td>extractor.realtime.enable</td><td>是否同步实时数据</td><td>Boolean: true, false</td><td>optional: true</td></tr><tr><td>extractor.realtime.mode</td><td>实时数据的抽取模式</td><td>String: hybrid, log, file</td><td>optional: hybrid</td></tr></tbody></table><blockquote><p>🚫 <strong>extractor.pattern 参数说明</strong></p><ul><li><p>Pattern 需用反引号修饰不合法字符或者是不合法路径节点，例如如果希望筛选 root.`a@b` 或者 root.`123`，应设置 pattern 为 root.`a@b` 或者 root.`123`（具体参考 <a href="https://iotdb.apache.org/zh/Download/#_1-0-%E7%89%88%E6%9C%AC%E4%B8%8D%E5%85%BC%E5%AE%B9%E7%9A%84%E8%AF%AD%E6%B3%95%E8%AF%A6%E7%BB%86%E8%AF%B4%E6%98%8E" target="_blank" rel="noopener noreferrer">单双引号和反引号的使用时机<span><svg class="external-link-icon" xmlns="http://www.w3.org/2000/svg" aria-hidden="true" focusable="false" x="0px" y="0px" viewBox="0 0 100 100" width="15" height="15"><path fill="currentColor" d="M18.8,85.1h56l0,0c2.2,0,4-1.8,4-4v-32h-8v28h-48v-48h28v-8h-32l0,0c-2.2,0-4,1.8-4,4v56C14.8,83.3,16.6,85.1,18.8,85.1z"></path><polygon fill="currentColor" points="45.7,48.7 51.3,54.3 77.2,28.5 77.2,37.2 85.2,37.2 85.2,14.9 62.8,14.9 62.8,22.9 71.5,22.9"></polygon></svg><span class="external-link-icon-sr-only">open in new window</span></span></a>）</p></li><li><p>在底层实现中，当检测到 pattern 为 root（默认值）时，同步效率较高，其他任意格式都将降低性能</p></li><li><p>路径前缀不需要能够构成完整的路径。例如，当创建一个包含参数为 &#39;extractor.pattern&#39;=&#39;root.aligned.1&#39; 的 pipe 时：</p><ul><li>root.aligned.1TS</li><li>root.aligned.1TS.`1`</li><li>root.aligned.100TS</li></ul><p>的数据会被同步；</p><ul><li>root.aligned.`1`</li><li>root.aligned.`123`</li></ul><p>的数据不会被同步。</p></li><li><p>root.__system 的数据不会被 pipe 抽取，即不会被同步到目标端。用户虽然可以在 extractor.pattern 中包含任意前缀，包括带有（或覆盖） root.__system 的前缀，但是 root.__system 下的数据总是会被 pipe 忽略的</p></li></ul></blockquote><blockquote><p>❗️<strong>extractor.history 的 start-time，end-time 参数说明</strong></p><ul><li>start-time，end-time 应为 ISO 格式，例如 2011-12-03T10:15:30 或 2011-12-03T10:15:30+01:00</li></ul></blockquote><blockquote><p>✅ <strong>一条数据从生产到落库 
IoTDB，包含两个关键的时间概念</strong></p><ul><li><strong>event time：</strong>数据实际生产时的时间（或者数据生产系统给数据赋予的生成时间，是数据点中的时间项），也称为事件时间。</li><li><strong>arrival time：</strong>数据到达 IoTDB 系统内的时间。</li></ul><p>我们常说的乱序数据，指的是数据到达时，其 <strong>event time</strong> 远落后于当前系统时间（或者已经落库的最大 <strong>event time</strong>）的数据。另一方面，不论是乱序数据还是顺序数据，只要它们是新到达系统的，那它们的 <strong>arrival time</strong> 都是会随着数据到达 IoTDB 的顺序递增的。</p></blockquote><blockquote><p>💎 <strong>iotdb-extractor 的工作可以拆分成两个阶段</strong></p><ol><li>历史数据抽取：所有 <strong>arrival time</strong> &lt; 创建 pipe 时<strong>当前系统时间</strong>的数据称为历史数据</li><li>实时数据抽取：所有 <strong>arrival time</strong> &gt;= 创建 pipe 时<strong>当前系统时间</strong>的数据称为实时数据</li></ol><p>历史数据传输阶段和实时数据传输阶段，<strong>两阶段串行执行，只有当历史数据传输阶段完成后，才执行实时数据传输阶段。</strong></p><p>用户可以指定 iotdb-extractor 进行：</p><ul><li>历史数据抽取（<code>&#39;extractor.history.enable&#39; = &#39;true&#39;</code>, <code>&#39;extractor.realtime.enable&#39; = &#39;false&#39;</code> ）</li><li>实时数据抽取（<code>&#39;extractor.history.enable&#39; = &#39;false&#39;</code>, <code>&#39;extractor.realtime.enable&#39; = &#39;true&#39;</code> ）</li><li>全量数据抽取（<code>&#39;extractor.history.enable&#39; = &#39;true&#39;</code>, <code>&#39;extractor.realtime.enable&#39; = &#39;true&#39;</code> ）</li><li>禁止同时设置 extractor.history.enable 和 extractor.realtime.enable 为 false</li></ul></blockquote><blockquote><p>📌 <strong>extractor.realtime.mode：数据抽取的模式</strong></p><ul><li>log：该模式下，流水线仅使用操作日志进行数据处理、发送</li><li>file：该模式下，流水线仅使用数据文件进行数据处理、发送</li><li>hybrid：该模式，考虑了按操作日志逐条目发送数据时延迟低但吞吐低的特点，以及按数据文件批量发送时发送吞吐高但延迟高的特点，能够在不同的写入负载下自动切换适合的数据抽取方式，首先采取基于操作日志的数据抽取方式以保证低发送延迟，当产生数据积压时自动切换成基于数据文件的数据抽取方式以保证高发送吞吐，积压消除时自动切换回基于操作日志的数据抽取方式，避免了采用单一数据抽取算法难以平衡数据发送延迟或吞吐的问题。</li></ul></blockquote><h2 id="预置-processor" tabindex="-1"><a class="header-anchor" href="#预置-processor" aria-hidden="true">#</a> 预置 processor</h2><h3 id="do-nothing-processor" tabindex="-1"><a class="header-anchor" href="#do-nothing-processor" aria-hidden="true">#</a> do-nothing-processor</h3><p>作用：不对 extractor 
传入的事件做任何的处理。</p><table><thead><tr><th>key</th><th>value</th><th>value 取值范围</th><th>required or optional with default</th></tr></thead><tbody><tr><td>processor</td><td>do-nothing-processor</td><td>String: do-nothing-processor</td><td>required</td></tr></tbody></table><h2 id="预置-connector" tabindex="-1"><a class="header-anchor" href="#预置-connector" aria-hidden="true">#</a> 预置 connector</h2><h3 id="iotdb-thrift-connector-v1-别名-iotdb-thrift-connector" tabindex="-1"><a class="header-anchor" href="#iotdb-thrift-connector-v1-别名-iotdb-thrift-connector" aria-hidden="true">#</a> iotdb-thrift-connector-v1（别名：iotdb-thrift-connector）</h3><p>作用：主要用于 IoTDB（v1.2.0+）与 IoTDB（v1.2.0+）之间的数据传输。使用 Thrift RPC 框架传输数据，单线程 blocking IO 模型。保证接收端 apply 数据的顺序与发送端接受写入请求的顺序一致。</p><p>限制：源端 IoTDB 与 目标端 IoTDB 版本都需要在 v1.2.0+。</p><table><thead><tr><th>key</th><th>value</th><th>value 取值范围</th><th>required or optional with default</th></tr></thead><tbody><tr><td>connector</td><td>iotdb-thrift-connector 或 iotdb-thrift-connector-v1</td><td>String: iotdb-thrift-connector 或 iotdb-thrift-connector-v1</td><td>required</td></tr><tr><td>connector.ip</td><td>目标端 IoTDB 其中一个 DataNode 节点的数据服务 ip</td><td>String</td><td>required</td></tr><tr><td>connector.port</td><td>目标端 IoTDB 其中一个 DataNode 节点的数据服务 port</td><td>Integer</td><td>required</td></tr></tbody></table><blockquote><p>📌 请确保接收端已经创建了发送端的所有时间序列，或是开启了自动创建元数据，否则将会导致 pipe 运行失败。</p></blockquote><h3 id="iotdb-thrift-connector-v2" tabindex="-1"><a class="header-anchor" href="#iotdb-thrift-connector-v2" aria-hidden="true">#</a> iotdb-thrift-connector-v2</h3><p>作用：主要用于 IoTDB（v1.2.0+）与 IoTDB（v1.2.0+）之间的数据传输。使用 Thrift RPC 框架传输数据，多线程 async non-blocking IO 模型，传输性能高，尤其适用于目标端为分布式时的场景。不保证接收端 apply 数据的顺序与发送端接受写入请求的顺序一致，但是保证数据发送的完整性（at-least-once）。</p><p>限制：源端 IoTDB 与 目标端 IoTDB 版本都需要在 v1.2.0+。</p><table><thead><tr><th>key</th><th>value</th><th>value 取值范围</th><th>required or optional with 
default</th></tr></thead><tbody><tr><td>connector</td><td>iotdb-thrift-connector-v2</td><td>String: iotdb-thrift-connector-v2</td><td>required</td></tr><tr><td>connector.node-urls</td><td>目标端 IoTDB 任意多个 DataNode 节点的数据服务端口的 url</td><td>String。例：&#39;127.0.0.1:6667,127.0.0.1:6668,127.0.0.1:6669&#39;, &#39;127.0.0.1:6667&#39;</td><td>required</td></tr></tbody></table><blockquote><p>📌 请确保接收端已经创建了发送端的所有时间序列，或是开启了自动创建元数据，否则将会导致 pipe 运行失败。</p></blockquote><h3 id="iotdb-sync-connector" tabindex="-1"><a class="header-anchor" href="#iotdb-sync-connector" aria-hidden="true">#</a> iotdb-sync-connector</h3><p>作用：主要用于 IoTDB（v1.2.0+）向更低版本的 IoTDB 传输数据，使用 v1.2.0 版本前的数据同步（Sync）协议。使用 Thrift RPC 框架传输数据。单线程 sync blocking IO 模型，传输性能较弱。</p><p>限制：源端 IoTDB 版本需要在 v1.2.0+，目标端 IoTDB 版本可以是 v1.2.0+、v1.1.x（更低版本的 IoTDB 理论上也支持，但是未经测试）。</p><p>注意：理论上 v1.2.0+ IoTDB 可作为 v1.2.0 版本前的任意版本的数据同步（Sync）接收端。</p><table><thead><tr><th>key</th><th>value</th><th>value 取值范围</th><th>required or optional with default</th></tr></thead><tbody><tr><td>connector</td><td>iotdb-sync-connector</td><td>String: iotdb-sync-connector</td><td>required</td></tr><tr><td>connector.ip</td><td>目标端 IoTDB 其中一个 DataNode 节点的数据服务 ip</td><td>String</td><td>required</td></tr><tr><td>connector.port</td><td>目标端 IoTDB 其中一个 DataNode 节点的数据服务 port</td><td>Integer</td><td>required</td></tr><tr><td>connector.user</td><td>目标端 IoTDB 的用户名，注意该用户需要支持数据写入、TsFile Load 的权限</td><td>String</td><td>optional: root</td></tr><tr><td>connector.password</td><td>目标端 IoTDB 的密码，注意该用户需要支持数据写入、TsFile Load 的权限</td><td>String</td><td>optional: root</td></tr><tr><td>connector.version</td><td>目标端 IoTDB 的版本，用于伪装自身实际版本，绕过目标端的版本一致性检查</td><td>String</td><td>optional: 1.1</td></tr></tbody></table><blockquote><p>📌 请确保接收端已经创建了发送端的所有时间序列，或是开启了自动创建元数据，否则将会导致 pipe 运行失败。</p></blockquote><h3 id="do-nothing-connector" tabindex="-1"><a class="header-anchor" href="#do-nothing-connector" aria-hidden="true">#</a> do-nothing-connector</h3><p>作用：不对 processor 
传入的事件做任何的处理。</p><table><thead><tr><th>key</th><th>value</th><th>value 取值范围</th><th>required or optional with default</th></tr></thead><tbody><tr><td>connector</td><td>do-nothing-connector</td><td>String: do-nothing-connector</td><td>required</td></tr></tbody></table><h1 id="自定义数据订阅插件开发" tabindex="-1"><a class="header-anchor" href="#自定义数据订阅插件开发" aria-hidden="true">#</a> 自定义数据订阅插件开发</h1><h2 id="编程开发依赖" tabindex="-1"><a class="header-anchor" href="#编程开发依赖" aria-hidden="true">#</a> 编程开发依赖</h2><p>推荐采用 maven 构建项目，在<code>pom.xml</code>中添加以下依赖。请注意选择和 IoTDB 服务器版本相同的依赖版本。</p><div class="language-xml line-numbers-mode" data-ext="xml"><pre class="language-xml"><code><span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>dependency</span><span class="token punctuation">&gt;</span></span>
    <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>groupId</span><span class="token punctuation">&gt;</span></span>org.apache.iotdb<span class="token tag"><span class="token tag"><span class="token punctuation">&lt;/</span>groupId</span><span class="token punctuation">&gt;</span></span>
    <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>artifactId</span><span class="token punctuation">&gt;</span></span>pipe-api<span class="token tag"><span class="token tag"><span class="token punctuation">&lt;/</span>artifactId</span><span class="token punctuation">&gt;</span></span>
    <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>version</span><span class="token punctuation">&gt;</span></span>1.2.0<span class="token tag"><span class="token tag"><span class="token punctuation">&lt;/</span>version</span><span class="token punctuation">&gt;</span></span>
    <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>scope</span><span class="token punctuation">&gt;</span></span>provided<span class="token tag"><span class="token tag"><span class="token punctuation">&lt;/</span>scope</span><span class="token punctuation">&gt;</span></span>
<span class="token tag"><span class="token tag"><span class="token punctuation">&lt;/</span>dependency</span><span class="token punctuation">&gt;</span></span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><h2 id="事件驱动编程模型" tabindex="-1"><a class="header-anchor" href="#事件驱动编程模型" aria-hidden="true">#</a> 事件驱动编程模型</h2><p>数据订阅插件的用户编程接口设计，参考了事件驱动编程模型的通用设计理念。事件（Event）是用户编程接口中的数据抽象，而编程接口与具体的执行方式解耦，只需要专注于描述事件（数据）到达系统后，系统期望的处理方式即可。</p><p>在数据订阅插件的用户编程接口中，事件是数据库数据写入操作的抽象。事件由单机同步引擎捕获，按照同步三个阶段的流程，依次传递至 PipeExtractor 插件，PipeProcessor 插件和 PipeConnector 插件，并依次在三个插件中触发用户逻辑的执行。</p><p>为了兼顾端侧低负载场景下的同步低延迟和端侧高负载场景下的同步高吞吐，同步引擎会动态地在操作日志和数据文件中选择处理对象，因此，同步的用户编程接口要求用户提供下列两类事件的处理逻辑：操作日志写入事件 TabletInsertionEvent 和数据文件写入事件 TsFileInsertionEvent。</p><h3 id="操作日志写入事件-tabletinsertionevent" tabindex="-1"><a class="header-anchor" href="#操作日志写入事件-tabletinsertionevent" aria-hidden="true">#</a> <strong>操作日志写入事件（TabletInsertionEvent）</strong></h3><p>操作日志写入事件（TabletInsertionEvent）是对用户写入请求的高层数据抽象，它通过提供统一的操作接口，为用户提供了操纵写入请求底层数据的能力。</p><p>对于不同的数据库部署方式，操作日志写入事件对应的底层存储结构是不一样的。对于单机部署的场景，操作日志写入事件是对写前日志（WAL）条目的封装；对于分布式部署的场景，操作日志写入事件是对单个节点共识协议操作日志条目的封装。</p><p>对于数据库不同写入请求接口生成的写入操作，操作日志写入事件对应的请求结构体的数据结构也是不一样的。IoTDB 提供了 InsertRecord、InsertRecords、InsertTablet、InsertTablets 等众多的写入接口，每一种写入请求都使用了完全不同的序列化方式，生成的二进制条目也不尽相同。</p><p>操作日志写入事件的存在，为用户提供了一种统一的数据操作视图，它屏蔽了底层数据结构的实现差异，极大地降低了用户的编程门槛，提升了功能的易用性。</p><div class="language-java line-numbers-mode" data-ext="java"><pre class="language-java"><code><span class="token doc-comment comment">/** TabletInsertionEvent is used to define the event of data insertion. */</span>
<span class="token keyword">public</span> <span class="token keyword">interface</span> <span class="token class-name">TabletInsertionEvent</span> <span class="token keyword">extends</span> <span class="token class-name">Event</span> <span class="token punctuation">{</span>

  <span class="token doc-comment comment">/**
   * The consumer processes the data row by row and collects the results by RowCollector.
   *
   * <span class="token keyword">@return</span> Iterable<span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>TabletInsertionEvent</span><span class="token punctuation">&gt;</span></span> a list of new TabletInsertionEvents containing the results
   *     collected by the RowCollector
   */</span>
  <span class="token class-name">Iterable</span><span class="token generics"><span class="token punctuation">&lt;</span><span class="token class-name">TabletInsertionEvent</span><span class="token punctuation">&gt;</span></span> <span class="token function">processRowByRow</span><span class="token punctuation">(</span><span class="token class-name">BiConsumer</span><span class="token generics"><span class="token punctuation">&lt;</span><span class="token class-name">Row</span><span class="token punctuation">,</span> <span class="token class-name">RowCollector</span><span class="token punctuation">&gt;</span></span> consumer<span class="token punctuation">)</span><span class="token punctuation">;</span>

  <span class="token doc-comment comment">/**
   * The consumer processes the Tablet directly and collects the results by RowCollector.
   *
   * <span class="token keyword">@return</span> Iterable<span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>TabletInsertionEvent</span><span class="token punctuation">&gt;</span></span> a list of new TabletInsertionEvents containing the results
   *     collected by the RowCollector
   */</span>
  <span class="token class-name">Iterable</span><span class="token generics"><span class="token punctuation">&lt;</span><span class="token class-name">TabletInsertionEvent</span><span class="token punctuation">&gt;</span></span> <span class="token function">processTablet</span><span class="token punctuation">(</span><span class="token class-name">BiConsumer</span><span class="token generics"><span class="token punctuation">&lt;</span><span class="token class-name">Tablet</span><span class="token punctuation">,</span> <span class="token class-name">RowCollector</span><span class="token punctuation">&gt;</span></span> consumer<span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token punctuation">}</span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><h3 id="数据文件写入事件-tsfileinsertionevent" tabindex="-1"><a class="header-anchor" href="#数据文件写入事件-tsfileinsertionevent" aria-hidden="true">#</a> <strong>数据文件写入事件（TsFileInsertionEvent）</strong></h3><p>数据文件写入事件（TsFileInsertionEvent） 是对数据库文件落盘操作的高层抽象，它是若干操作日志写入事件（TabletInsertionEvent）的数据集合。</p><p>IoTDB 的存储引擎是 LSM 结构的。数据写入时会先将写入操作落盘到日志结构的文件里，同时将写入数据保存在内存里。当内存达到控制上限，则会触发刷盘行为，即将内存中的数据转换为数据库文件，同时删除之前预写的操作日志。当内存中的数据转换为数据库文件中的数据时，会经过编码压缩和通用压缩两次压缩处理，因此数据库文件的数据相比内存中的原始数据占用的空间更少。</p><p>在极端的网络情况下，直接传输数据文件相比传输数据写入的操作要更加经济，它会占用更低的网络带宽，能实现更快的传输速度。当然，天下没有免费的午餐，对文件中的数据进行计算处理，相比直接对内存中的数据进行计算处理时，需要额外付出文件 I/O 的代价。但是，正是磁盘数据文件和内存写入操作两种结构各有优劣的存在，给了系统做动态权衡调整的机会，也正是基于这样的观察，插件的事件模型中才引入了数据文件写入事件。</p><p>综上，数据文件写入事件出现在同步插件的事件流中，存在下面两种情况：</p><p>（1）历史数据抽取：一个同步任务开始前，所有已经落盘的写入数据都会以 TsFile 的形式存在。一个同步任务开始后，采集历史数据时，历史数据将以 TsFileInsertionEvent 作为抽象；</p><p>（2）实时数据抽取：一个同步任务进行时，当数据流中实时处理操作日志写入事件的速度慢于写入请求速度一定进度之后，未来得及处理的操作日志写入事件会被持久化至磁盘，以 TsFile 的形式存在，这一些数据被同步引擎采集到后，会以 TsFileInsertionEvent 作为抽象。</p><div class="language-java line-numbers-mode" data-ext="java"><pre class="language-java"><code><span class="token doc-comment comment">/**
 * TsFileInsertionEvent is used to define the event of writing TsFile. Event data stores in disks,
 * which is compressed and encoded, and requires IO cost for computational processing.
 */</span>
<span class="token keyword">public</span> <span class="token keyword">interface</span> <span class="token class-name">TsFileInsertionEvent</span> <span class="token keyword">extends</span> <span class="token class-name">Event</span> <span class="token punctuation">{</span>

  <span class="token doc-comment comment">/**
   * The method is used to convert the TsFileInsertionEvent into several TabletInsertionEvents.
   *
   * <span class="token keyword">@return</span> the list of TabletInsertionEvents
   */</span>
  <span class="token class-name">Iterable</span><span class="token generics"><span class="token punctuation">&lt;</span><span class="token class-name">TabletInsertionEvent</span><span class="token punctuation">&gt;</span></span> <span class="token function">toTabletInsertionEvents</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token punctuation">}</span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><h2 id="自定义数据订阅插件编程接口定义" tabindex="-1"><a class="header-anchor" href="#自定义数据订阅插件编程接口定义" aria-hidden="true">#</a> 自定义数据订阅插件编程接口定义</h2><p>基于自定义数据订阅插件编程接口，用户可以轻松编写数据抽取插件、 数据处理插件和数据发送插件，从而使得同步功能灵活适配各种工业场景。</p><h3 id="数据抽取插件接口" tabindex="-1"><a class="header-anchor" href="#数据抽取插件接口" aria-hidden="true">#</a> 数据抽取插件接口</h3><p>数据抽取是同步数据从数据抽取到数据发送三阶段的第一阶段。数据抽取插件（PipeExtractor）是同步引擎和存储引擎的桥梁，它通过监听存储引擎的行为，捕获各种数据写入事件。</p><div class="language-java line-numbers-mode" data-ext="java"><pre class="language-java"><code><span class="token doc-comment comment">/**
 * PipeExtractor
 *
 * <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>p</span><span class="token punctuation">&gt;</span></span>PipeExtractor is responsible for capturing events from sources.
 *
 * <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>p</span><span class="token punctuation">&gt;</span></span>Various data sources can be supported by implementing different PipeExtractor classes.
 *
 * <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>p</span><span class="token punctuation">&gt;</span></span>The lifecycle of a PipeExtractor is as follows:
 *
 * <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>ul</span><span class="token punctuation">&gt;</span></span>
 *   <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>When a collaboration task is created, the KV pairs of `WITH EXTRACTOR` clause in SQL are
 *       parsed and the validation method <span class="token punctuation">{</span><span class="token keyword">@link</span> <span class="token reference"><span class="token class-name">PipeExtractor</span><span class="token punctuation">#</span><span class="token function">validate</span><span class="token punctuation">(</span><span class="token class-name">PipeParameterValidator</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span>
 *       will be called to validate the parameters.
 *   <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>Before the collaboration task starts, the method <span class="token punctuation">{</span><span class="token keyword">@link</span>
 *       <span class="token reference"><span class="token class-name">PipeExtractor</span><span class="token punctuation">#</span><span class="token function">customize</span><span class="token punctuation">(</span><span class="token class-name">PipeParameters</span><span class="token punctuation">,</span> <span class="token class-name">PipeExtractorRuntimeConfiguration</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> will be called
 *       to configure the runtime behavior of the PipeExtractor.
 *   <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>Then the method <span class="token punctuation">{</span><span class="token keyword">@link</span> <span class="token reference"><span class="token class-name">PipeExtractor</span><span class="token punctuation">#</span><span class="token function">start</span><span class="token punctuation">(</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> will be called to start the PipeExtractor.
 *   <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>While the collaboration task is in progress, the method <span class="token punctuation">{</span><span class="token keyword">@link</span> <span class="token reference"><span class="token class-name">PipeExtractor</span><span class="token punctuation">#</span><span class="token function">supply</span><span class="token punctuation">(</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> will
 *       be called to capture events from sources and then the events will be passed to the
 *       PipeProcessor.
 *   <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>The method <span class="token punctuation">{</span><span class="token keyword">@link</span> <span class="token reference"><span class="token class-name">PipeExtractor</span><span class="token punctuation">#</span><span class="token function">close</span><span class="token punctuation">(</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> will be called when the collaboration task is
 *       cancelled (the `DROP PIPE` command is executed).
 * <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;/</span>ul</span><span class="token punctuation">&gt;</span></span>
 */</span>
<span class="token keyword">public</span> <span class="token keyword">interface</span> <span class="token class-name">PipeExtractor</span> <span class="token keyword">extends</span> <span class="token class-name">PipePlugin</span> <span class="token punctuation">{</span>

  <span class="token doc-comment comment">/**
   * This method is mainly used to validate <span class="token punctuation">{</span><span class="token keyword">@link</span> <span class="token reference"><span class="token class-name">PipeParameters</span></span><span class="token punctuation">}</span> and it is executed before <span class="token punctuation">{</span><span class="token keyword">@link</span>
   * <span class="token reference"><span class="token class-name">PipeExtractor</span><span class="token punctuation">#</span><span class="token function">customize</span><span class="token punctuation">(</span><span class="token class-name">PipeParameters</span><span class="token punctuation">,</span> <span class="token class-name">PipeExtractorRuntimeConfiguration</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> is called.
   *
   * <span class="token keyword">@param</span> <span class="token parameter">validator</span> the validator used to validate <span class="token punctuation">{</span><span class="token keyword">@link</span> <span class="token reference"><span class="token class-name">PipeParameters</span></span><span class="token punctuation">}</span>
   * <span class="token keyword">@throws</span> <span class="token reference"><span class="token class-name">Exception</span></span> if any parameter is not valid
   */</span>
  <span class="token keyword">void</span> <span class="token function">validate</span><span class="token punctuation">(</span><span class="token class-name">PipeParameterValidator</span> validator<span class="token punctuation">)</span> <span class="token keyword">throws</span> <span class="token class-name">Exception</span><span class="token punctuation">;</span>

  <span class="token doc-comment comment">/**
   * This method is mainly used to customize PipeExtractor. In this method, the user can do the
   * following things:
   *
   * <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>ul</span><span class="token punctuation">&gt;</span></span>
   *   <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>Use PipeParameters to parse key-value pair attributes entered by the user.
   *   <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>Set the running configurations in PipeExtractorRuntimeConfiguration.
   * <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;/</span>ul</span><span class="token punctuation">&gt;</span></span>
   *
   * <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>p</span><span class="token punctuation">&gt;</span></span>This method is called after the method <span class="token punctuation">{</span><span class="token keyword">@link</span>
   * <span class="token reference"><span class="token class-name">PipeExtractor</span><span class="token punctuation">#</span><span class="token function">validate</span><span class="token punctuation">(</span><span class="token class-name">PipeParameterValidator</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> is called.
   *
   * <span class="token keyword">@param</span> <span class="token parameter">parameters</span> used to parse the input parameters entered by the user
   * <span class="token keyword">@param</span> <span class="token parameter">configuration</span> used to set the required properties of the running PipeExtractor
   * <span class="token keyword">@throws</span> <span class="token reference"><span class="token class-name">Exception</span></span> the user can throw errors if necessary
   */</span>
  <span class="token keyword">void</span> <span class="token function">customize</span><span class="token punctuation">(</span><span class="token class-name">PipeParameters</span> parameters<span class="token punctuation">,</span> <span class="token class-name">PipeExtractorRuntimeConfiguration</span> configuration<span class="token punctuation">)</span>
      <span class="token keyword">throws</span> <span class="token class-name">Exception</span><span class="token punctuation">;</span>

  <span class="token doc-comment comment">/**
   * Start the extractor. After this method is called, events should be ready to be supplied by
   * <span class="token punctuation">{</span><span class="token keyword">@link</span> <span class="token reference"><span class="token class-name">PipeExtractor</span><span class="token punctuation">#</span><span class="token function">supply</span><span class="token punctuation">(</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span>. This method is called after <span class="token punctuation">{</span><span class="token keyword">@link</span>
   * <span class="token reference"><span class="token class-name">PipeExtractor</span><span class="token punctuation">#</span><span class="token function">customize</span><span class="token punctuation">(</span><span class="token class-name">PipeParameters</span><span class="token punctuation">,</span> <span class="token class-name">PipeExtractorRuntimeConfiguration</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> is called.
   *
   * <span class="token keyword">@throws</span> <span class="token reference"><span class="token class-name">Exception</span></span> the user can throw errors if necessary
   */</span>
  <span class="token keyword">void</span> <span class="token function">start</span><span class="token punctuation">(</span><span class="token punctuation">)</span> <span class="token keyword">throws</span> <span class="token class-name">Exception</span><span class="token punctuation">;</span>

  <span class="token doc-comment comment">/**
   * Supply a single event from the extractor and the caller will send the event to the processor.
   * This method is called after <span class="token punctuation">{</span><span class="token keyword">@link</span> <span class="token reference"><span class="token class-name">PipeExtractor</span><span class="token punctuation">#</span><span class="token function">start</span><span class="token punctuation">(</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> is called.
   *
   * <span class="token keyword">@return</span> the event to be supplied. The event may be null if the extractor has no more events at
   *     the moment, but the extractor is still running for more events.
   * <span class="token keyword">@throws</span> <span class="token reference"><span class="token class-name">Exception</span></span> the user can throw errors if necessary
   */</span>
  <span class="token class-name">Event</span> <span class="token function">supply</span><span class="token punctuation">(</span><span class="token punctuation">)</span> <span class="token keyword">throws</span> <span class="token class-name">Exception</span><span class="token punctuation">;</span>
<span class="token punctuation">}</span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div 
class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><h3 id="数据处理插件接口" tabindex="-1"><a class="header-anchor" href="#数据处理插件接口" aria-hidden="true">#</a> 数据处理插件接口</h3><p>数据处理是同步数据从数据抽取到数据发送三阶段的第二阶段。数据处理插件（PipeProcessor）主要用于过滤和转换由数据抽取插件（PipeExtractor）捕获的各种事件。</p><div class="language-java line-numbers-mode" data-ext="java"><pre class="language-java"><code><span class="token doc-comment comment">/**
 * PipeProcessor
 *
 * <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>p</span><span class="token punctuation">&gt;</span></span>PipeProcessor is used to filter and transform the Event formed by the PipeExtractor.
 *
 * <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>p</span><span class="token punctuation">&gt;</span></span>The lifecycle of a PipeProcessor is as follows:
 *
 * <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>ul</span><span class="token punctuation">&gt;</span></span>
 *   <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>When a collaboration task is created, the KV pairs of `WITH PROCESSOR` clause in SQL are
 *       parsed and the validation method <span class="token punctuation">{</span><span class="token keyword">@link</span> <span class="token reference"><span class="token class-name">PipeProcessor</span><span class="token punctuation">#</span><span class="token function">validate</span><span class="token punctuation">(</span><span class="token class-name">PipeParameterValidator</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span>
 *       will be called to validate the parameters.
 *   <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>Before the collaboration task starts, the method <span class="token punctuation">{</span><span class="token keyword">@link</span>
 *       <span class="token reference"><span class="token class-name">PipeProcessor</span><span class="token punctuation">#</span><span class="token function">customize</span><span class="token punctuation">(</span><span class="token class-name">PipeParameters</span><span class="token punctuation">,</span> <span class="token class-name">PipeProcessorRuntimeConfiguration</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> will be called
 *       to configure the runtime behavior of the PipeProcessor.
 *   <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>While the collaboration task is in progress:
 *       <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>ul</span><span class="token punctuation">&gt;</span></span>
 *         <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>PipeExtractor captures the events and wraps them into three types of Event instances.
 *         <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>PipeProcessor processes the events and then passes them to the PipeConnector. The
 *             following 3 methods will be called: <span class="token punctuation">{</span><span class="token keyword">@link</span>
 *             <span class="token reference"><span class="token class-name">PipeProcessor</span><span class="token punctuation">#</span><span class="token function">process</span><span class="token punctuation">(</span><span class="token class-name">TabletInsertionEvent</span><span class="token punctuation">,</span> <span class="token class-name">EventCollector</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span>, <span class="token punctuation">{</span><span class="token keyword">@link</span>
 *             <span class="token reference"><span class="token class-name">PipeProcessor</span><span class="token punctuation">#</span><span class="token function">process</span><span class="token punctuation">(</span><span class="token class-name">TsFileInsertionEvent</span><span class="token punctuation">,</span> <span class="token class-name">EventCollector</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> and <span class="token punctuation">{</span><span class="token keyword">@link</span>
 *             <span class="token reference"><span class="token class-name">PipeProcessor</span><span class="token punctuation">#</span><span class="token function">process</span><span class="token punctuation">(</span><span class="token class-name">Event</span><span class="token punctuation">,</span> <span class="token class-name">EventCollector</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span>.
 *         <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>PipeConnector serializes the events into binaries and sends them to sinks.
 *       <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;/</span>ul</span><span class="token punctuation">&gt;</span></span>
 *   <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>When the collaboration task is cancelled (the `DROP PIPE` command is executed), the <span class="token punctuation">{</span><span class="token keyword">@link</span>
 *       <span class="token reference"><span class="token class-name">PipeProcessor</span><span class="token punctuation">#</span><span class="token function">close</span><span class="token punctuation">(</span><span class="token punctuation">)</span></span> <span class="token punctuation">}</span> method will be called.
 * <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;/</span>ul</span><span class="token punctuation">&gt;</span></span>
 */</span>
<span class="token keyword">public</span> <span class="token keyword">interface</span> <span class="token class-name">PipeProcessor</span> <span class="token keyword">extends</span> <span class="token class-name">PipePlugin</span> <span class="token punctuation">{</span>

  <span class="token doc-comment comment">/**
   * This method is mainly used to validate <span class="token punctuation">{</span><span class="token keyword">@link</span> <span class="token reference"><span class="token class-name">PipeParameters</span></span><span class="token punctuation">}</span> and it is executed before <span class="token punctuation">{</span><span class="token keyword">@link</span>
   * <span class="token reference"><span class="token class-name">PipeProcessor</span><span class="token punctuation">#</span><span class="token function">customize</span><span class="token punctuation">(</span><span class="token class-name">PipeParameters</span><span class="token punctuation">,</span> <span class="token class-name">PipeProcessorRuntimeConfiguration</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> is called.
   *
   * <span class="token keyword">@param</span> <span class="token parameter">validator</span> the validator used to validate <span class="token punctuation">{</span><span class="token keyword">@link</span> <span class="token reference"><span class="token class-name">PipeParameters</span></span><span class="token punctuation">}</span>
   * <span class="token keyword">@throws</span> <span class="token reference"><span class="token class-name">Exception</span></span> if any parameter is not valid
   */</span>
  <span class="token keyword">void</span> <span class="token function">validate</span><span class="token punctuation">(</span><span class="token class-name">PipeParameterValidator</span> validator<span class="token punctuation">)</span> <span class="token keyword">throws</span> <span class="token class-name">Exception</span><span class="token punctuation">;</span>

  <span class="token doc-comment comment">/**
   * This method is mainly used to customize PipeProcessor. In this method, the user can do the
   * following things:
   *
   * <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>ul</span><span class="token punctuation">&gt;</span></span>
   *   <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>Use PipeParameters to parse key-value pair attributes entered by the user.
   *   <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>Set the running configurations in PipeProcessorRuntimeConfiguration.
   * <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;/</span>ul</span><span class="token punctuation">&gt;</span></span>
   *
   * <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>p</span><span class="token punctuation">&gt;</span></span>This method is called after the method <span class="token punctuation">{</span><span class="token keyword">@link</span>
   * <span class="token reference"><span class="token class-name">PipeProcessor</span><span class="token punctuation">#</span><span class="token function">validate</span><span class="token punctuation">(</span><span class="token class-name">PipeParameterValidator</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> is called and before the beginning of the
   * events processing.
   *
   * <span class="token keyword">@param</span> <span class="token parameter">parameters</span> used to parse the input parameters entered by the user
   * <span class="token keyword">@param</span> <span class="token parameter">configuration</span> used to set the required properties of the running PipeProcessor
   * <span class="token keyword">@throws</span> <span class="token reference"><span class="token class-name">Exception</span></span> the user can throw errors if necessary
   */</span>
  <span class="token keyword">void</span> <span class="token function">customize</span><span class="token punctuation">(</span><span class="token class-name">PipeParameters</span> parameters<span class="token punctuation">,</span> <span class="token class-name">PipeProcessorRuntimeConfiguration</span> configuration<span class="token punctuation">)</span>
      <span class="token keyword">throws</span> <span class="token class-name">Exception</span><span class="token punctuation">;</span>

  <span class="token doc-comment comment">/**
   * This method is called to process the TabletInsertionEvent.
   *
   * <span class="token keyword">@param</span> <span class="token parameter">tabletInsertionEvent</span> TabletInsertionEvent to be processed
   * <span class="token keyword">@param</span> <span class="token parameter">eventCollector</span> used to collect result events after processing
   * <span class="token keyword">@throws</span> <span class="token reference"><span class="token class-name">Exception</span></span> the user can throw errors if necessary
   */</span>
  <span class="token keyword">void</span> <span class="token function">process</span><span class="token punctuation">(</span><span class="token class-name">TabletInsertionEvent</span> tabletInsertionEvent<span class="token punctuation">,</span> <span class="token class-name">EventCollector</span> eventCollector<span class="token punctuation">)</span>
      <span class="token keyword">throws</span> <span class="token class-name">Exception</span><span class="token punctuation">;</span>

  <span class="token doc-comment comment">/**
   * This method is called to process the TsFileInsertionEvent.
   *
   * <span class="token keyword">@param</span> <span class="token parameter">tsFileInsertionEvent</span> TsFileInsertionEvent to be processed
   * <span class="token keyword">@param</span> <span class="token parameter">eventCollector</span> used to collect result events after processing
   * <span class="token keyword">@throws</span> <span class="token reference"><span class="token class-name">Exception</span></span> the user can throw errors if necessary
   */</span>
  <span class="token keyword">void</span> <span class="token function">process</span><span class="token punctuation">(</span><span class="token class-name">TsFileInsertionEvent</span> tsFileInsertionEvent<span class="token punctuation">,</span> <span class="token class-name">EventCollector</span> eventCollector<span class="token punctuation">)</span>
      <span class="token keyword">throws</span> <span class="token class-name">Exception</span><span class="token punctuation">;</span>

  <span class="token doc-comment comment">/**
   * This method is called to process the Event.
   *
   * <span class="token keyword">@param</span> <span class="token parameter">event</span> Event to be processed
   * <span class="token keyword">@param</span> <span class="token parameter">eventCollector</span> used to collect result events after processing
   * <span class="token keyword">@throws</span> <span class="token reference"><span class="token class-name">Exception</span></span> the user can throw errors if necessary
   */</span>
  <span class="token keyword">void</span> <span class="token function">process</span><span class="token punctuation">(</span><span class="token class-name">Event</span> event<span class="token punctuation">,</span> <span class="token class-name">EventCollector</span> eventCollector<span class="token punctuation">)</span> <span class="token keyword">throws</span> <span class="token class-name">Exception</span><span class="token punctuation">;</span>
<span class="token punctuation">}</span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div 
class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><h3 id="数据发送插件接口" tabindex="-1"><a class="header-anchor" href="#数据发送插件接口" aria-hidden="true">#</a> 数据发送插件接口</h3><p>数据发送是同步数据从数据抽取到数据发送三阶段的第三阶段。数据发送插件（PipeConnector）主要用于发送经由数据处理插件（PipeProcessor）处理过后的各种事件，它作为数据订阅框架的网络实现层，接口上应允许接入多种实时通信协议和多种连接器。</p><div class="language-java line-numbers-mode" data-ext="java"><pre class="language-java"><code><span class="token doc-comment comment">/**
 * PipeConnector
 *
 * <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>p</span><span class="token punctuation">&gt;</span></span>PipeConnector is responsible for sending events to sinks.
 *
 * <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>p</span><span class="token punctuation">&gt;</span></span>Various network protocols can be supported by implementing different PipeConnector classes.
 *
 * <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>p</span><span class="token punctuation">&gt;</span></span>The lifecycle of a PipeConnector is as follows:
 *
 * <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>ul</span><span class="token punctuation">&gt;</span></span>
 *   <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>When a collaboration task is created, the KV pairs of `WITH CONNECTOR` clause in SQL are
 *       parsed and the validation method <span class="token punctuation">{</span><span class="token keyword">@link</span> <span class="token reference"><span class="token class-name">PipeConnector</span><span class="token punctuation">#</span><span class="token function">validate</span><span class="token punctuation">(</span><span class="token class-name">PipeParameterValidator</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span>
 *       will be called to validate the parameters.
 *   <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>Before the collaboration task starts, the method <span class="token punctuation">{</span><span class="token keyword">@link</span>
 *       <span class="token reference"><span class="token class-name">PipeConnector</span><span class="token punctuation">#</span><span class="token function">customize</span><span class="token punctuation">(</span><span class="token class-name">PipeParameters</span><span class="token punctuation">,</span> <span class="token class-name">PipeConnectorRuntimeConfiguration</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> will be called
 *       to configure the runtime behavior of the PipeConnector and the method <span class="token punctuation">{</span><span class="token keyword">@link</span>
 *       <span class="token reference"><span class="token class-name">PipeConnector</span><span class="token punctuation">#</span><span class="token function">handshake</span><span class="token punctuation">(</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> will be called to create a connection with sink.
 *   <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>While the collaboration task is in progress:
 *       <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>ul</span><span class="token punctuation">&gt;</span></span>
 *         <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>PipeExtractor captures the events and wraps them into three types of Event instances.
 *         <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>PipeProcessor processes the events and then passes them to the PipeConnector.
 *         <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>PipeConnector serializes the events into binaries and sends them to sinks. The
 *             following 3 methods will be called: <span class="token punctuation">{</span><span class="token keyword">@link</span>
 *             <span class="token reference"><span class="token class-name">PipeConnector</span><span class="token punctuation">#</span><span class="token function">transfer</span><span class="token punctuation">(</span><span class="token class-name">TabletInsertionEvent</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span>, <span class="token punctuation">{</span><span class="token keyword">@link</span>
 *             <span class="token reference"><span class="token class-name">PipeConnector</span><span class="token punctuation">#</span><span class="token function">transfer</span><span class="token punctuation">(</span><span class="token class-name">TsFileInsertionEvent</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> and <span class="token punctuation">{</span><span class="token keyword">@link</span>
 *             <span class="token reference"><span class="token class-name">PipeConnector</span><span class="token punctuation">#</span><span class="token function">transfer</span><span class="token punctuation">(</span><span class="token class-name">Event</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span>.
 *       <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;/</span>ul</span><span class="token punctuation">&gt;</span></span>
 *   <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>When the collaboration task is cancelled (the `DROP PIPE` command is executed), the <span class="token punctuation">{</span><span class="token keyword">@link</span>
 *       <span class="token reference"><span class="token class-name">PipeConnector</span><span class="token punctuation">#</span><span class="token function">close</span><span class="token punctuation">(</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> method will be called.
 * <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;/</span>ul</span><span class="token punctuation">&gt;</span></span>
 *
 * <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>p</span><span class="token punctuation">&gt;</span></span>In addition, the method <span class="token punctuation">{</span><span class="token keyword">@link</span> <span class="token reference"><span class="token class-name">PipeConnector</span><span class="token punctuation">#</span><span class="token function">heartbeat</span><span class="token punctuation">(</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> will be called periodically to check
 * whether the connection with sink is still alive. The method <span class="token punctuation">{</span><span class="token keyword">@link</span> <span class="token reference"><span class="token class-name">PipeConnector</span><span class="token punctuation">#</span><span class="token function">handshake</span><span class="token punctuation">(</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span>
 * will be called to create a new connection with the sink when the method <span class="token punctuation">{</span><span class="token keyword">@link</span>
 * <span class="token reference"><span class="token class-name">PipeConnector</span><span class="token punctuation">#</span><span class="token function">heartbeat</span><span class="token punctuation">(</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> throws exceptions.
 */</span>
<span class="token keyword">public</span> <span class="token keyword">interface</span> <span class="token class-name">PipeConnector</span> <span class="token keyword">extends</span> <span class="token class-name">PipePlugin</span> <span class="token punctuation">{</span>

  <span class="token doc-comment comment">/**
   * This method is mainly used to validate <span class="token punctuation">{</span><span class="token keyword">@link</span> <span class="token reference"><span class="token class-name">PipeParameters</span></span><span class="token punctuation">}</span> and it is executed before <span class="token punctuation">{</span><span class="token keyword">@link</span>
   * <span class="token reference"><span class="token class-name">PipeConnector</span><span class="token punctuation">#</span><span class="token function">customize</span><span class="token punctuation">(</span><span class="token class-name">PipeParameters</span><span class="token punctuation">,</span> <span class="token class-name">PipeConnectorRuntimeConfiguration</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> is called.
   *
   * <span class="token keyword">@param</span> <span class="token parameter">validator</span> the validator used to validate <span class="token punctuation">{</span><span class="token keyword">@link</span> <span class="token reference"><span class="token class-name">PipeParameters</span></span><span class="token punctuation">}</span>
   * <span class="token keyword">@throws</span> <span class="token reference"><span class="token class-name">Exception</span></span> if any parameter is not valid
   */</span>
  <span class="token keyword">void</span> <span class="token function">validate</span><span class="token punctuation">(</span><span class="token class-name">PipeParameterValidator</span> validator<span class="token punctuation">)</span> <span class="token keyword">throws</span> <span class="token class-name">Exception</span><span class="token punctuation">;</span>

  <span class="token doc-comment comment">/**
   * This method is mainly used to customize PipeConnector. In this method, the user can do the
   * following things:
   *
   * <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>ul</span><span class="token punctuation">&gt;</span></span>
   *   <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>Use PipeParameters to parse key-value pair attributes entered by the user.
   *   <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>li</span><span class="token punctuation">&gt;</span></span>Set the running configurations in PipeConnectorRuntimeConfiguration.
   * <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;/</span>ul</span><span class="token punctuation">&gt;</span></span>
   *
   * <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>p</span><span class="token punctuation">&gt;</span></span>This method is called after the method <span class="token punctuation">{</span><span class="token keyword">@link</span>
   * <span class="token reference"><span class="token class-name">PipeConnector</span><span class="token punctuation">#</span><span class="token function">validate</span><span class="token punctuation">(</span><span class="token class-name">PipeParameterValidator</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> is called and before the method <span class="token punctuation">{</span><span class="token keyword">@link</span>
   * <span class="token reference"><span class="token class-name">PipeConnector</span><span class="token punctuation">#</span><span class="token function">handshake</span><span class="token punctuation">(</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> is called.
   *
   * <span class="token keyword">@param</span> <span class="token parameter">parameters</span> used to parse the input parameters entered by the user
   * <span class="token keyword">@param</span> <span class="token parameter">configuration</span> used to set the required properties of the running PipeConnector
   * <span class="token keyword">@throws</span> <span class="token reference"><span class="token class-name">Exception</span></span> the user can throw errors if necessary
   */</span>
  <span class="token keyword">void</span> <span class="token function">customize</span><span class="token punctuation">(</span><span class="token class-name">PipeParameters</span> parameters<span class="token punctuation">,</span> <span class="token class-name">PipeConnectorRuntimeConfiguration</span> configuration<span class="token punctuation">)</span>
      <span class="token keyword">throws</span> <span class="token class-name">Exception</span><span class="token punctuation">;</span>

  <span class="token doc-comment comment">/**
   * This method is used to create a connection with sink. This method will be called after the
   * method <span class="token punctuation">{</span><span class="token keyword">@link</span> <span class="token reference"><span class="token class-name">PipeConnector</span><span class="token punctuation">#</span><span class="token function">customize</span><span class="token punctuation">(</span><span class="token class-name">PipeParameters</span><span class="token punctuation">,</span> <span class="token class-name">PipeConnectorRuntimeConfiguration</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> is
   * called or will be called when the method <span class="token punctuation">{</span><span class="token keyword">@link</span> <span class="token reference"><span class="token class-name">PipeConnector</span><span class="token punctuation">#</span><span class="token function">heartbeat</span><span class="token punctuation">(</span><span class="token punctuation">)</span></span><span class="token punctuation">}</span> throws exceptions.
   *
   * <span class="token keyword">@throws</span> <span class="token reference"><span class="token class-name">Exception</span></span> if the connection fails to be created
   */</span>
  <span class="token keyword">void</span> <span class="token function">handshake</span><span class="token punctuation">(</span><span class="token punctuation">)</span> <span class="token keyword">throws</span> <span class="token class-name">Exception</span><span class="token punctuation">;</span>

  <span class="token doc-comment comment">/**
   * This method will be called periodically to check whether the connection with sink is still
   * alive.
   *
   * <span class="token keyword">@throws</span> <span class="token reference"><span class="token class-name">Exception</span></span> if the connection dies
   */</span>
  <span class="token keyword">void</span> <span class="token function">heartbeat</span><span class="token punctuation">(</span><span class="token punctuation">)</span> <span class="token keyword">throws</span> <span class="token class-name">Exception</span><span class="token punctuation">;</span>

  <span class="token doc-comment comment">/**
   * This method is used to transfer the TabletInsertionEvent.
   *
   * <span class="token keyword">@param</span> <span class="token parameter">tabletInsertionEvent</span> TabletInsertionEvent to be transferred
   * <span class="token keyword">@throws</span> <span class="token reference"><span class="token class-name">PipeConnectionException</span></span> if the connection is broken
   * <span class="token keyword">@throws</span> <span class="token reference"><span class="token class-name">Exception</span></span> the user can throw errors if necessary
   */</span>
  <span class="token keyword">void</span> <span class="token function">transfer</span><span class="token punctuation">(</span><span class="token class-name">TabletInsertionEvent</span> tabletInsertionEvent<span class="token punctuation">)</span> <span class="token keyword">throws</span> <span class="token class-name">Exception</span><span class="token punctuation">;</span>

  <span class="token doc-comment comment">/**
   * This method is used to transfer the TsFileInsertionEvent.
   *
   * <span class="token keyword">@param</span> <span class="token parameter">tsFileInsertionEvent</span> TsFileInsertionEvent to be transferred
   * <span class="token keyword">@throws</span> <span class="token reference"><span class="token class-name">PipeConnectionException</span></span> if the connection is broken
   * <span class="token keyword">@throws</span> <span class="token reference"><span class="token class-name">Exception</span></span> the user can throw errors if necessary
   */</span>
  <span class="token keyword">void</span> <span class="token function">transfer</span><span class="token punctuation">(</span><span class="token class-name">TsFileInsertionEvent</span> tsFileInsertionEvent<span class="token punctuation">)</span> <span class="token keyword">throws</span> <span class="token class-name">Exception</span><span class="token punctuation">;</span>

  <span class="token doc-comment comment">/**
   * This method is used to transfer the Event.
   *
   * <span class="token keyword">@param</span> <span class="token parameter">event</span> Event to be transferred
   * <span class="token keyword">@throws</span> <span class="token reference"><span class="token class-name">PipeConnectionException</span></span> if the connection is broken
   * <span class="token keyword">@throws</span> <span class="token reference"><span class="token class-name">Exception</span></span> the user can throw errors if necessary
   */</span>
  <span class="token keyword">void</span> <span class="token function">transfer</span><span class="token punctuation">(</span><span class="token class-name">Event</span> event<span class="token punctuation">)</span> <span class="token keyword">throws</span> <span class="token class-name">Exception</span><span class="token punctuation">;</span>
<span class="token punctuation">}</span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div 
class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><h1 id="自定义数据订阅插件管理" tabindex="-1"><a class="header-anchor" href="#自定义数据订阅插件管理" aria-hidden="true">#</a> 自定义数据订阅插件管理</h1><p>为了保证用户自定义插件在实际生产中的灵活性和易用性，系统还需要提供对插件进行动态统一管理的能力。本章节介绍的数据订阅插件管理语句提供了对插件进行动态统一管理的入口。</p><h2 id="加载插件语句" tabindex="-1"><a class="header-anchor" href="#加载插件语句" aria-hidden="true">#</a> 加载插件语句</h2><p>在 IoTDB 中，若要在系统中动态载入一个用户自定义插件，则首先需要基于 PipeExtractor、 PipeProcessor 或者 PipeConnector 实现一个具体的插件类，然后需要将插件类编译打包成 jar 可执行文件，最后使用加载插件的管理语句将插件载入 
IoTDB。</p><p>加载插件的管理语句的语法如图所示。</p><div class="language-sql line-numbers-mode" data-ext="sql"><pre class="language-sql"><code><span class="token keyword">CREATE</span> PIPEPLUGIN <span class="token operator">&lt;</span>别名<span class="token operator">&gt;</span>
<span class="token keyword">AS</span> <span class="token operator">&lt;</span>全类名<span class="token operator">&gt;</span>
<span class="token keyword">USING</span> URI <span class="token operator">&lt;</span>JAR 包的 URI<span class="token operator">&gt;</span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><p>例如，用户实现了一个全类名为 edu.tsinghua.iotdb.pipe.ExampleProcessor 的数据处理插件，打包后的 jar 资源包存放到了 <a href="https://example.com:8080/iotdb/pipe-plugin.jar" target="_blank" rel="noopener noreferrer">https://example.com:8080/iotdb/pipe-plugin.jar<span><svg class="external-link-icon" xmlns="http://www.w3.org/2000/svg" aria-hidden="true" focusable="false" x="0px" y="0px" viewBox="0 0 100 100" width="15" height="15"><path fill="currentColor" d="M18.8,85.1h56l0,0c2.2,0,4-1.8,4-4v-32h-8v28h-48v-48h28v-8h-32l0,0c-2.2,0-4,1.8-4,4v56C14.8,83.3,16.6,85.1,18.8,85.1z"></path><polygon fill="currentColor" points="45.7,48.7 51.3,54.3 77.2,28.5 77.2,37.2 85.2,37.2 85.2,14.9 62.8,14.9 62.8,22.9 71.5,22.9"></polygon></svg><span class="external-link-icon-sr-only">open in new window</span></span></a> 上，用户希望在同步引擎中使用这个插件，将插件标记为 example。那么，这个数据处理插件的创建语句如图所示。</p><div class="language-sql line-numbers-mode" data-ext="sql"><pre class="language-sql"><code><span class="token keyword">CREATE</span> PIPEPLUGIN example
<span class="token keyword">AS</span> <span class="token string">&#39;edu.tsinghua.iotdb.pipe.ExampleProcessor&#39;</span>
<span class="token keyword">USING</span> URI <span class="token string">&#39;https://example.com:8080/iotdb/pipe-plugin.jar&#39;</span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><h2 id="删除插件语句" tabindex="-1"><a class="header-anchor" href="#删除插件语句" aria-hidden="true">#</a> 删除插件语句</h2><p>当用户不再想使用一个插件，需要将插件从系统中卸载时，可以使用如图所示的删除插件语句。</p><div class="language-sql line-numbers-mode" data-ext="sql"><pre class="language-sql"><code><span class="token keyword">DROP</span> PIPEPLUGIN <span class="token operator">&lt;</span>别名<span class="token operator">&gt;</span>
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div></div></div><h2 id="查看插件语句" tabindex="-1"><a class="header-anchor" href="#查看插件语句" aria-hidden="true">#</a> 查看插件语句</h2><p>用户也可以按需查看系统中的插件。查看插件的语句如图所示。</p><div class="language-sql line-numbers-mode" data-ext="sql"><pre class="language-sql"><code><span class="token keyword">SHOW</span> PIPEPLUGINS
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div></div></div><h1 id="权限管理" tabindex="-1"><a class="header-anchor" href="#权限管理" aria-hidden="true">#</a> 权限管理</h1><h2 id="pipe-任务" tabindex="-1"><a class="header-anchor" href="#pipe-任务" aria-hidden="true">#</a> Pipe 任务</h2><table><thead><tr><th>权限名称</th><th>描述</th></tr></thead><tbody><tr><td>CREATE_PIPE</td><td>注册流水线。路径无关。</td></tr><tr><td>START_PIPE</td><td>开启流水线。路径无关。</td></tr><tr><td>STOP_PIPE</td><td>停止流水线。路径无关。</td></tr><tr><td>DROP_PIPE</td><td>卸载流水线。路径无关。</td></tr><tr><td>SHOW_PIPES</td><td>查询流水线。路径无关。</td></tr></tbody></table><h2 id="pipe-插件" tabindex="-1"><a class="header-anchor" href="#pipe-插件" aria-hidden="true">#</a> Pipe 插件</h2><table><thead><tr><th>权限名称</th><th>描述</th></tr></thead><tbody><tr><td>CREATE_PIPEPLUGIN</td><td>注册流水线插件。路径无关。</td></tr><tr><td>DROP_PIPEPLUGIN</td><td>卸载流水线插件。路径无关。</td></tr><tr><td>SHOW_PIPEPLUGINS</td><td>查询流水线插件。路径无关。</td></tr></tbody></table><h1 id="功能特性" tabindex="-1"><a class="header-anchor" href="#功能特性" aria-hidden="true">#</a> 功能特性</h1><h2 id="最少一次语义保证-at-least-once" tabindex="-1"><a class="header-anchor" href="#最少一次语义保证-at-least-once" aria-hidden="true">#</a> 最少一次语义保证 <strong>at-least-once</strong></h2><p>数据订阅功能向外部系统传输数据时，提供 at-least-once 的传输语义。在大部分场景下，同步功能可提供 exactly-once 保证，即所有数据被恰好同步一次。</p><p>但是在以下场景中，可能存在部分数据被同步多次<strong>（断点续传）</strong>的情况：</p><ul><li>临时的网络故障：某次数据传输请求失败后，系统会进行重试发送，直至到达最大尝试次数</li><li>Pipe 插件逻辑实现异常：插件运行中抛出错误，系统会进行重试发送，直至到达最大尝试次数</li><li>数据节点宕机、重启等导致的数据分区切主：分区变更完成后，受影响的数据会被重新传输</li><li>集群不可用：集群可用后，受影响的数据会重新传输</li></ul><h2 id="源端-数据写入与-pipe-处理、发送数据异步解耦" tabindex="-1"><a class="header-anchor" href="#源端-数据写入与-pipe-处理、发送数据异步解耦" aria-hidden="true">#</a> 源端：数据写入与 Pipe 处理、发送数据异步解耦</h2><p>数据订阅功能中，数据传输采用的是异步复制模式。</p><p>数据订阅与写入操作完全脱钩，不存在对写入关键路径的影响。该机制允许框架在保证持续数据订阅的前提下，保持时序数据库的写入速度。</p><h2 id="源端-可自适应数据写入负载的数据传输策略" tabindex="-1"><a class="header-anchor" href="#源端-可自适应数据写入负载的数据传输策略" aria-hidden="true">#</a> 
源端：可自适应数据写入负载的数据传输策略</h2><p>支持根据写入负载，动态调整数据传输方式，同步默认使用 TsFile 文件与操作流动态混合传输（<code>&#39;extractor.realtime.mode&#39;=&#39;hybrid&#39;</code>）。</p><p>在数据写入负载高时，优先选择 TsFile 传输的方式。TsFile 压缩比高，节省网络带宽。</p><p>在数据写入负载低时，优先选择操作流同步传输的方式。操作流传输实时性高。</p><h2 id="源端-高可用集群部署时-pipe-服务高可用" tabindex="-1"><a class="header-anchor" href="#源端-高可用集群部署时-pipe-服务高可用" aria-hidden="true">#</a> 源端：高可用集群部署时，Pipe 服务高可用</h2><p>当发送端 IoTDB 为高可用集群部署模式时，数据订阅服务也将是高可用的。 数据订阅框架将监控每个数据节点的数据订阅进度，并定期做轻量级的分布式一致性快照以保存同步状态。</p><ul><li>当发送端集群某数据节点宕机时，数据订阅框架可以利用一致性快照以及保存在副本上的数据快速恢复同步，以此实现数据订阅服务的高可用。</li><li>当发送端集群整体宕机并重启时，数据订阅框架也能使用快照恢复同步服务。</li></ul><h1 id="配置参数" tabindex="-1"><a class="header-anchor" href="#配置参数" aria-hidden="true">#</a> 配置参数</h1><p>在 iotdb-common.properties 中：</p><div class="language-Properties line-numbers-mode" data-ext="Properties"><pre class="language-Properties"><code>####################
### Pipe Configuration
####################

# Uncomment the following field to configure the pipe lib directory.
# For Windows platform
# If its prefix is a drive specifier followed by &quot;\\&quot;, or if its prefix is &quot;\\\\&quot;, then the path is
# absolute. Otherwise, it is relative.
# pipe_lib_dir=ext\\pipe
# For Linux platform
# If its prefix is &quot;/&quot;, then the path is absolute. Otherwise, it is relative.
pipe_lib_dir=ext/pipe

# The name of the directory that stores the tsfiles temporarily held or generated by the pipe module.
# The directory is located in the data directory of IoTDB.
pipe_hardlink_tsfile_dir_name=pipe

# The maximum number of threads that can be used to execute the pipe subtasks in PipeSubtaskExecutor.
pipe_subtask_executor_max_thread_num=5

# The number of events that need to be consumed before a checkpoint is triggered.
pipe_subtask_executor_basic_check_point_interval_by_consumed_event_count=10000

# The time duration (in milliseconds) between checkpoints.
pipe_subtask_executor_basic_check_point_interval_by_time_duration=10000

# The maximum blocking time (in milliseconds) for the pending queue.
pipe_subtask_executor_pending_queue_max_blocking_time_ms=1000

# The default size of ring buffer in the realtime extractor&#39;s disruptor queue.
pipe_extractor_assigner_disruptor_ring_buffer_size=65536

# The maximum number of entries the deviceToExtractorsCache can hold.
pipe_extractor_matcher_cache_size=1024

# The capacity for the number of tablet events that can be stored in the pending queue of the hybrid realtime extractor.
pipe_extractor_pending_queue_capacity=128

# The limit for the number of tablet events that can be held in the pending queue of the hybrid realtime extractor.
# Note that this should be less than or equal to realtimeExtractorPendingQueueCapacity
pipe_extractor_pending_queue_tablet_limit=64

# The buffer size used for reading file during file transfer.
pipe_connector_read_file_buffer_size=8388608

# The delay period (in milliseconds) between each retry when a connection failure occurs.
pipe_connector_retry_interval_ms=1000

# The size of the pending queue for the PipeConnector to store the events.
pipe_connector_pending_queue_size=1024

# The number of heartbeat loop cycles before collecting pipe meta once.
pipe_heartbeat_loop_cycles_for_collecting_pipe_meta=100

# The initial delay (in minutes) before starting the PipeMetaSyncer service.
pipe_meta_syncer_initial_sync_delay_minutes=3

# The sync regular interval (in minutes) for the PipeMetaSyncer service.
pipe_meta_syncer_sync_interval_minutes=3
</code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div></div><!--[--><!----><!--]--><footer class="page-meta"><div class="meta-item edit-link"><a 
href="https://github.com/apache/iotdb-docs/edit/main/src/zh/UserGuide/latest/User-Manual/IoTDB-Data-Pipe_timecho.md" rel="noopener noreferrer" target="_blank" aria-label="发现错误？在 GitHub 上编辑此页" class="nav-link label"><!--[--><svg xmlns="http://www.w3.org/2000/svg" class="icon edit-icon" viewBox="0 0 1024 1024" fill="currentColor" aria-label="edit icon"><path d="M430.818 653.65a60.46 60.46 0 0 1-50.96-93.281l71.69-114.012 7.773-10.365L816.038 80.138A60.46 60.46 0 0 1 859.225 62a60.46 60.46 0 0 1 43.186 18.138l43.186 43.186a60.46 60.46 0 0 1 0 86.373L588.879 565.55l-8.637 8.637-117.466 68.234a60.46 60.46 0 0 1-31.958 11.229z"></path><path d="M728.802 962H252.891A190.883 190.883 0 0 1 62.008 771.98V296.934a190.883 190.883 0 0 1 190.883-192.61h267.754a60.46 60.46 0 0 1 0 120.92H252.891a69.962 69.962 0 0 0-69.098 69.099V771.98a69.962 69.962 0 0 0 69.098 69.098h475.911A69.962 69.962 0 0 0 797.9 771.98V503.363a60.46 60.46 0 1 1 120.922 0V771.98A190.883 190.883 0 0 1 728.802 962z"></path></svg><!--]-->发现错误？在 GitHub 上编辑此页<span><svg class="external-link-icon" xmlns="http://www.w3.org/2000/svg" aria-hidden="true" focusable="false" x="0px" y="0px" viewBox="0 0 100 100" width="15" height="15"><path fill="currentColor" d="M18.8,85.1h56l0,0c2.2,0,4-1.8,4-4v-32h-8v28h-48v-48h28v-8h-32l0,0c-2.2,0-4,1.8-4,4v56C14.8,83.3,16.6,85.1,18.8,85.1z"></path><polygon fill="currentColor" points="45.7,48.7 51.3,54.3 77.2,28.5 77.2,37.2 85.2,37.2 85.2,14.9 62.8,14.9 62.8,22.9 71.5,22.9"></polygon></svg><span class="external-link-icon-sr-only">open in new window</span></span><!----></a></div><div class="meta-item git-info"><div class="update-time"><span class="label">上次编辑于: </span><!----></div><div class="contributors"><span class="label">贡献者: </span><!--[--><!--[--><span class="contributor" title="email: 163960898+SihanLiu2024@users.noreply.github.com">SihanLiu2024</span><!--]--><!--]--></div></div></footer><!----><!----><!--[--><!----><!--]--><!--]--></main><!--]--><footer 
style="padding-bottom:2rem;"><span id="doc-version" style="display:none;">latest</span><p style="text-align:center;color:#909399;font-size:12px;margin:0 30px;">Copyright © 2024 The Apache Software Foundation.<br> Apache and the Apache feather logo are trademarks of The Apache Software Foundation</p><p style="text-align:center;margin-top:10px;color:#909399;font-size:12px;margin:0 30px;"><strong>Have a question?</strong> Connect with us on QQ, WeChat, or Slack. <a href="https://github.com/apache/iotdb/issues/1995">Join the community</a> now.</p></footer></div><!--]--><!--]--><!----><!--]--></div>
    <script type="module" src="/assets/app-LsTKUu1f.js" defer></script>
  </body>
</html>
