<!-- blob: efc3a764956d6f1f9521919aef0aa50eabc8f3de [file] [log] [blame] -->
<!doctype html>
<html lang="zh-CN" dir="ltr" class="docs-wrapper docs-doc-page docs-version-1.10.0 plugin-docs plugin-id-default docs-doc-id-design_and_concept/how_to_extend_data_node_for_sort">
<head>
<meta charset="UTF-8">
<meta name="generator" content="Docusaurus v2.3.1">
<title data-rh="true">Sort 插件 | Apache InLong</title><meta data-rh="true" name="viewport" content="width=device-width,initial-scale=1"><meta data-rh="true" name="twitter:card" content="summary_large_image"><meta data-rh="true" property="og:url" content="https://inlong.apache.org/zh-CN/docs/1.10.0/design_and_concept/how_to_extend_data_node_for_sort"><meta data-rh="true" name="docusaurus_locale" content="zh-CN"><meta data-rh="true" name="docsearch:language" content="zh-CN"><meta data-rh="true" name="docusaurus_version" content="1.10.0"><meta data-rh="true" name="docusaurus_tag" content="docs-default-1.10.0"><meta data-rh="true" name="docsearch:version" content="1.10.0"><meta data-rh="true" name="docsearch:docusaurus_tag" content="docs-default-1.10.0"><meta data-rh="true" property="og:title" content="Sort 插件 | Apache InLong"><meta data-rh="true" name="description" content="总览"><meta data-rh="true" property="og:description" content="总览"><link data-rh="true" rel="icon" href="/zh-CN/img/logo.svg"><link data-rh="true" rel="canonical" href="https://inlong.apache.org/zh-CN/docs/1.10.0/design_and_concept/how_to_extend_data_node_for_sort"><link data-rh="true" rel="alternate" href="https://inlong.apache.org/docs/1.10.0/design_and_concept/how_to_extend_data_node_for_sort" hreflang="en"><link data-rh="true" rel="alternate" href="https://inlong.apache.org/zh-CN/docs/1.10.0/design_and_concept/how_to_extend_data_node_for_sort" hreflang="zh-CN"><link data-rh="true" rel="alternate" href="https://inlong.apache.org/docs/1.10.0/design_and_concept/how_to_extend_data_node_for_sort" hreflang="x-default"><link data-rh="true" rel="preconnect" href="https://YUW9QEL53E-dsn.algolia.net" crossorigin="anonymous"><link rel="alternate" type="application/rss+xml" href="/zh-CN/blog/rss.xml" title="Apache InLong RSS Feed">
<link rel="alternate" type="application/atom+xml" href="/zh-CN/blog/atom.xml" title="Apache InLong Atom Feed">
<link rel="search" type="application/opensearchdescription+xml" title="Apache InLong" href="/zh-CN/opensearch.xml">
<script src="https://www.apachecon.com/event-images/snippet.js" async></script><link rel="stylesheet" href="/zh-CN/assets/css/styles.09deabdb.css">
<link rel="preload" href="/zh-CN/assets/js/runtime~main.933f32cb.js" as="script">
<link rel="preload" href="/zh-CN/assets/js/main.32851df5.js" as="script">
</head>
<body class="navigation-with-keyboard">
<script>
/*
 * Inline theme bootstrap: runs as soon as the parser reaches it and stamps
 * the chosen colour theme onto <html data-theme="...">.
 *
 * Resolution order:
 *   1. the "theme" value stored in localStorage, if one is present;
 *   2. "dark" when the (prefers-color-scheme: dark) media query matches;
 *   3. "light" otherwise.
 */
(function () {
  // Write the theme name onto the root element.
  function applyTheme(themeName) {
    document.documentElement.setAttribute("data-theme", themeName);
  }

  // Return the stored theme, or null when nothing is stored or storage throws.
  function readStoredTheme() {
    var stored = null;
    try {
      stored = localStorage.getItem("theme");
    } catch (storageError) {
      // Deliberately ignored: fall through with null.
    }
    return stored;
  }

  var savedTheme = readStoredTheme();
  if (savedTheme !== null) {
    applyTheme(savedTheme);
    return;
  }

  if (window.matchMedia("(prefers-color-scheme: dark)").matches) {
    applyTheme("dark");
    return;
  }

  // The light-scheme query is evaluated and its result discarded, exactly as
  // in the minified original; "light" is applied regardless of the outcome.
  window.matchMedia("(prefers-color-scheme: light)").matches;
  applyTheme("light");
})();
</script><div id="__docusaurus">
<div role="region" aria-label="Skip to main content"><a class="skipToContent_fXgn" href="#docusaurus_skipToContent_fallback">Skip to main content</a></div><nav aria-label="主导航" class="navbar navbar--fixed-top"><div class="navbar__inner"><div class="navbar__items"><button aria-label="切换导航栏" aria-expanded="false" class="navbar__toggle clean-btn" type="button"><svg width="30" height="30" viewBox="0 0 30 30" aria-hidden="true"><path stroke="currentColor" stroke-linecap="round" stroke-miterlimit="10" stroke-width="2" d="M4 7h22M4 15h22M4 23h22"></path></svg></button><a class="navbar__brand" href="/zh-CN/"><div class="navbar__logo"><img src="/zh-CN/img/logo.svg" alt="Apache" class="themedImage_ToTc themedImage--light_HNdA"><img src="/zh-CN/img/logo.svg" alt="Apache" class="themedImage_ToTc themedImage--dark_i4oU"></div><b class="navbar__title text--truncate">Apache InLong</b></a></div><div class="navbar__items navbar__items--right"><div class="navbar__item dropdown dropdown--hoverable dropdown--right"><a class="navbar__link" aria-haspopup="true" aria-expanded="false" role="button" href="/zh-CN/docs/introduction">文档</a><ul class="dropdown__menu"><li><a class="dropdown__link" href="/zh-CN/docs/next/introduction">Next</a></li><li><a class="dropdown__link" href="/zh-CN/docs/introduction">1.11.0</a></li><li><a class="dropdown__link" href="/zh-CN/docs/1.10.0/introduction">1.10.0</a></li><li><a class="dropdown__link" href="/zh-CN/docs/1.9.0/introduction">1.9.0</a></li><li><a class="dropdown__link" href="/zh-CN/docs/1.8.0/introduction">1.8.0</a></li><li><a class="dropdown__link" href="/zh-CN/versions/">All versions</a></li></ul></div><a class="navbar__item navbar__link" href="/zh-CN/downloads">下载</a><a class="navbar__item navbar__link" href="/zh-CN/community/how-to-contribute">社区</a><a class="navbar__item navbar__link" href="/zh-CN/blog">博客</a><a class="navbar__item navbar__link" href="/zh-CN/team">团队</a><div class="navbar__item dropdown dropdown--hoverable dropdown--right"><a 
href="#" aria-haspopup="true" aria-expanded="false" role="button" class="navbar__link">Apache</a><ul class="dropdown__menu"><li><a href="https://www.apache.org/" target="_blank" rel="noopener noreferrer" class="dropdown__link">Apache Software Foundation</a></li><li><a href="https://www.apache.org/licenses/" target="_blank" rel="noopener noreferrer" class="dropdown__link">证书</a></li><li><a href="https://www.apache.org/events/current-event" target="_blank" rel="noopener noreferrer" class="dropdown__link">事件</a></li><li><a href="https://www.apache.org/security/" target="_blank" rel="noopener noreferrer" class="dropdown__link">安全</a></li><li><a href="https://www.apache.org/foundation/sponsorship.html" target="_blank" rel="noopener noreferrer" class="dropdown__link">赞助</a></li><li><a href="https://www.apache.org/foundation/policies/privacy.html" target="_blank" rel="noopener noreferrer" class="dropdown__link">Privacy</a></li><li><a href="https://www.apache.org/foundation/thanks.html" target="_blank" rel="noopener noreferrer" class="dropdown__link">致谢</a></li></ul></div><div class="navbar__item dropdown dropdown--hoverable dropdown--right"><a href="#" aria-haspopup="true" aria-expanded="false" role="button" class="navbar__link"><svg viewBox="0 0 24 24" width="20" height="20" aria-hidden="true" class="iconLanguage_nlXk"><path fill="currentColor" d="M12.87 15.07l-2.54-2.51.03-.03c1.74-1.94 2.98-4.17 3.71-6.53H17V4h-7V2H8v2H1v1.99h11.17C11.5 7.92 10.44 9.75 9 11.35 8.07 10.32 7.3 9.19 6.69 8h-2c.73 1.63 1.73 3.17 2.98 4.56l-5.09 5.02L4 19l5-5 3.11 3.11.76-2.04zM18.5 10h-2L12 22h2l1.12-3h4.75L21 22h2l-4.5-12zm-2.62 7l1.62-4.33L19.12 17h-3.24z"></path></svg>简体中文</a><ul class="dropdown__menu"><li><a href="/docs/1.10.0/design_and_concept/how_to_extend_data_node_for_sort" target="_self" rel="noopener noreferrer" class="dropdown__link" lang="en">English</a></li><li><a href="/zh-CN/docs/1.10.0/design_and_concept/how_to_extend_data_node_for_sort" target="_self" rel="noopener 
noreferrer" class="dropdown__link dropdown__link--active" lang="zh-CN">简体中文</a></li></ul></div><a href="https://github.com/apache/inlong" target="_blank" rel="noopener noreferrer" class="navbar__item navbar__link">GitHub<svg width="13.5" height="13.5" aria-hidden="true" viewBox="0 0 24 24" class="iconExternalLink_nPIU"><path fill="currentColor" d="M21 13v10h-21v-19h12v2h-10v15h17v-8h2zm3-12h-10.988l4.035 4-6.977 7.07 2.828 2.828 6.977-7.07 4.125 4.172v-11z"></path></svg></a><div class="toggle_vylO colorModeToggle_DEke"><button class="clean-btn toggleButton_gllP toggleButtonDisabled_aARS" type="button" disabled="" title="切换浅色/暗黑模式(当前为浅色模式)" aria-label="切换浅色/暗黑模式(当前为浅色模式)" aria-live="polite"><svg viewBox="0 0 24 24" width="24" height="24" class="lightToggleIcon_pyhR"><path fill="currentColor" d="M12,9c1.65,0,3,1.35,3,3s-1.35,3-3,3s-3-1.35-3-3S10.35,9,12,9 M12,7c-2.76,0-5,2.24-5,5s2.24,5,5,5s5-2.24,5-5 S14.76,7,12,7L12,7z M2,13l2,0c0.55,0,1-0.45,1-1s-0.45-1-1-1l-2,0c-0.55,0-1,0.45-1,1S1.45,13,2,13z M20,13l2,0c0.55,0,1-0.45,1-1 s-0.45-1-1-1l-2,0c-0.55,0-1,0.45-1,1S19.45,13,20,13z M11,2v2c0,0.55,0.45,1,1,1s1-0.45,1-1V2c0-0.55-0.45-1-1-1S11,1.45,11,2z M11,20v2c0,0.55,0.45,1,1,1s1-0.45,1-1v-2c0-0.55-0.45-1-1-1C11.45,19,11,19.45,11,20z M5.99,4.58c-0.39-0.39-1.03-0.39-1.41,0 c-0.39,0.39-0.39,1.03,0,1.41l1.06,1.06c0.39,0.39,1.03,0.39,1.41,0s0.39-1.03,0-1.41L5.99,4.58z M18.36,16.95 c-0.39-0.39-1.03-0.39-1.41,0c-0.39,0.39-0.39,1.03,0,1.41l1.06,1.06c0.39,0.39,1.03,0.39,1.41,0c0.39-0.39,0.39-1.03,0-1.41 L18.36,16.95z M19.42,5.99c0.39-0.39,0.39-1.03,0-1.41c-0.39-0.39-1.03-0.39-1.41,0l-1.06,1.06c-0.39,0.39-0.39,1.03,0,1.41 s1.03,0.39,1.41,0L19.42,5.99z M7.05,18.36c0.39-0.39,0.39-1.03,0-1.41c-0.39-0.39-1.03-0.39-1.41,0l-1.06,1.06 c-0.39,0.39-0.39,1.03,0,1.41s1.03,0.39,1.41,0L7.05,18.36z"></path></svg><svg viewBox="0 0 24 24" width="24" height="24" class="darkToggleIcon_wfgR"><path fill="currentColor" 
d="M9.37,5.51C9.19,6.15,9.1,6.82,9.1,7.5c0,4.08,3.32,7.4,7.4,7.4c0.68,0,1.35-0.09,1.99-0.27C17.45,17.19,14.93,19,12,19 c-3.86,0-7-3.14-7-7C5,9.07,6.81,6.55,9.37,5.51z M12,3c-4.97,0-9,4.03-9,9s4.03,9,9,9s9-4.03,9-9c0-0.46-0.04-0.92-0.1-1.36 c-0.98,1.37-2.58,2.26-4.4,2.26c-2.98,0-5.4-2.42-5.4-5.4c0-1.81,0.89-3.42,2.26-4.4C12.92,3.04,12.46,3,12,3L12,3z"></path></svg></button></div><div class="searchBox_ZlJk"><button type="button" class="DocSearch DocSearch-Button" aria-label="搜索"><span class="DocSearch-Button-Container"><svg width="20" height="20" class="DocSearch-Search-Icon" viewBox="0 0 20 20"><path d="M14.386 14.386l4.0877 4.0877-4.0877-4.0877c-2.9418 2.9419-7.7115 2.9419-10.6533 0-2.9419-2.9418-2.9419-7.7115 0-10.6533 2.9418-2.9419 7.7115-2.9419 10.6533 0 2.9419 2.9418 2.9419 7.7115 0 10.6533z" stroke="currentColor" fill="none" fill-rule="evenodd" stroke-linecap="round" stroke-linejoin="round"></path></svg><span class="DocSearch-Button-Placeholder">搜索</span></span><span class="DocSearch-Button-Keys"></span></button></div></div></div><div role="presentation" class="navbar-sidebar__backdrop"></div></nav><div id="docusaurus_skipToContent_fallback" class="main-wrapper mainWrapper_z2l0 docsWrapper_BCFX"><button aria-label="回到顶部" class="clean-btn theme-back-to-top-button backToTopButton_sjWU" type="button"></button><div class="docPage__5DB"><aside class="theme-doc-sidebar-container docSidebarContainer_b6E3"><div class="sidebarViewport_Xe31"><div class="sidebar_njMd"><nav aria-label="文档侧边栏" class="menu thin-scrollbar menu_SIkG"><ul class="theme-doc-sidebar-menu menu__list"><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-1 menu__list-item"><a class="menu__link" href="/zh-CN/docs/1.10.0/introduction">简介</a></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-1 menu__list-item"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret menu__link--active" 
aria-expanded="true" href="/zh-CN/docs/1.10.0/design_and_concept/basic_concept">设计和概念</a></div><ul style="display:block;overflow:visible;height:auto" class="menu__list"><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-2 menu__list-item"><a class="menu__link" tabindex="0" href="/zh-CN/docs/1.10.0/design_and_concept/basic_concept">基本概念</a></li><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-2 menu__list-item"><a class="menu__link" tabindex="0" href="/zh-CN/docs/1.10.0/design_and_concept/the_format_in_inlong">Format</a></li><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-2 menu__list-item"><a class="menu__link" tabindex="0" href="/zh-CN/docs/1.10.0/design_and_concept/how_to_write_plugin_agent">Agent 插件</a></li><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-2 menu__list-item"><a class="menu__link menu__link--active" aria-current="page" tabindex="0" href="/zh-CN/docs/1.10.0/design_and_concept/how_to_extend_data_node_for_sort">Sort 插件</a></li><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-2 menu__list-item"><a class="menu__link" tabindex="0" href="/zh-CN/docs/1.10.0/design_and_concept/how_to_write_plugin_dashboard">Dashboard 插件</a></li><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-2 menu__list-item"><a class="menu__link" tabindex="0" href="/zh-CN/docs/1.10.0/design_and_concept/how_to_extend_data_node_for_manager">Manager 插件</a></li><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-2 menu__list-item"><a class="menu__link" tabindex="0" href="/zh-CN/docs/1.10.0/design_and_concept/how_to_write_plugin_dataproxy">DataProxy 插件</a></li></ul></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-1 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" 
href="/zh-CN/docs/1.10.0/quick_start/how_to_build">快速开始</a></div></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-1 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" href="/zh-CN/docs/1.10.0/deployment/standalone">安装部署</a></div></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-1 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" href="/zh-CN/docs/1.10.0/modules/agent/overview">组件介绍</a></div></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-1 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" href="/zh-CN/docs/1.10.0/data_node/extract_node/overview">数据节点</a></div></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-1 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" href="/zh-CN/docs/1.10.0/sdk/dataproxy-sdk/cpp">SDK</a></div></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-1 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" href="/zh-CN/docs/1.10.0/user_guide/dashboard_usage">用户指引</a></div></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-1 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" 
href="/zh-CN/docs/1.10.0/development/inlong_msg">开发指引</a></div></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-1 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" href="/zh-CN/docs/1.10.0/administration/user_management">Administration</a></div></li><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-1 menu__list-item"><a class="menu__link" href="/zh-CN/docs/1.10.0/contact">联系我们</a></li></ul></nav></div></div></aside><main class="docMainContainer_gTbr"><div class="container padding-top--md padding-bottom--lg"><div class="row"><div class="col docItemCol_VOVn"><div class="theme-doc-version-banner alert alert--warning margin-bottom--md" role="alert"><div>This is documentation for <!-- -->Apache InLong<!-- --> <b>1.10.0</b>, which is no longer actively maintained.</div><div class="margin-top--md">For up-to-date documentation, see the <b><a href="/zh-CN/docs/design_and_concept/how_to_extend_data_node_for_sort">latest version</a></b> (<!-- -->1.11.0<!-- -->).</div></div><div class="docItemContainer_Djhp"><article><nav class="theme-doc-breadcrumbs breadcrumbsContainer_Z_bl" aria-label="页面路径"><ul class="breadcrumbs" itemscope="" itemtype="https://schema.org/BreadcrumbList"><li class="breadcrumbs__item"><a aria-label="主页面" class="breadcrumbs__link" href="/zh-CN/"><svg viewBox="0 0 24 24" class="breadcrumbHomeIcon_YNFT"><path d="M10 19v-5h4v5c0 .55.45 1 1 1h3c.55 0 1-.45 1-1v-7h1.7c.46 0 .68-.57.33-.87L12.67 3.6c-.38-.34-.96-.34-1.34 0l-8.36 7.53c-.34.3-.13.87.33.87H5v7c0 .55.45 1 1 1h3c.55 0 1-.45 1-1z" fill="currentColor"></path></svg></a></li><li class="breadcrumbs__item"><span class="breadcrumbs__link">设计和概念</span><meta itemprop="position" content="1"></li><li itemscope="" itemprop="itemListElement" itemtype="https://schema.org/ListItem" class="breadcrumbs__item 
breadcrumbs__item--active"><span class="breadcrumbs__link" itemprop="name">Sort 插件</span><meta itemprop="position" content="2"></li></ul></nav><span class="theme-doc-version-badge badge badge--secondary">版本:1.10.0</span><div class="tocCollapsible_ETCw theme-doc-toc-mobile tocMobile_ITEo"><button type="button" class="clean-btn tocCollapsibleButton_TO0P">On this page</button></div><div class="theme-doc-markdown markdown"><header><h1>Sort 插件</h1></header><h2 class="anchor anchorWithStickyNavbar_LWe7" id="总览">总览<a href="#总览" class="hash-link" aria-label="Direct link to heading" title="Direct link to heading"></a></h2><p>InLong Sort 是一个基于 Apache Flink SQL 的 ETL 服务。Flink SQL 强大的表达能力带来了高可扩展性和灵活性,Flink SQL 支持的语义,InLong Sort 基本上都支持。
当 Flink SQL 内置的函数不满足需求时,还可通过 UDF 来扩展。这对于曾经使用过 SQL 尤其是 Flink SQL 的开发者非常友好。</p><p>本文介绍如何在 InLong Sort 中扩展一个新的 source(在 InLong 中抽象为 Extract Node)或一个新的 sink(在 InLong 中抽象为 Load Node)。
InLong Sort 架构的 UML 对象关系图如下:</p><p><img loading="lazy" alt="sort_uml" src="/zh-CN/assets/images/sort_uml-d90bb6f0835781e064b7417f266b7b30.png" width="2576" height="869" class="img_ev3q"></p><p>其中各个组件的概念为:</p><table><thead><tr><th><strong>名称</strong></th><th><strong>描述</strong></th></tr></thead><tbody><tr><td>Group</td><td>数据流组,包含多个数据流,一个 Group 代表一个数据接入</td></tr><tr><td>Stream</td><td>数据流,一个数据流有具体的流向</td></tr><tr><td>GroupInfo</td><td>Sort 中对数据流向的封装,一个 GroupInfo 可包含多个 StreamInfo</td></tr><tr><td>StreamInfo</td><td>Sort 中数据流向的抽象,包含该数据流的各种来源、转换、去向等</td></tr><tr><td>Node</td><td>数据同步中数据源、数据转换、数据去向的抽象</td></tr><tr><td>ExtractNode</td><td>数据同步的来源端抽象</td></tr><tr><td>TransformNode</td><td>数据同步的转换过程抽象</td></tr><tr><td>LoadNode</td><td>数据同步的去向端抽象</td></tr><tr><td>NodeRelationShip</td><td>数据同步中各个节点关系抽象</td></tr><tr><td>FieldRelationShip</td><td>数据同步中上下游节点字段间关系的抽象</td></tr><tr><td>FieldInfo</td><td>节点字段</td></tr><tr><td>MetaFieldInfo</td><td>节点 Meta 字段</td></tr><tr><td>Function</td><td>转换函数的抽象</td></tr><tr><td>FunctionParam</td><td>函数的入参抽象</td></tr><tr><td>ConstantParam</td><td>常量参数</td></tr></tbody></table><p>扩展 Extract Node 或 Load Node 需要做的工作是:</p><ul><li>继承 Node 类(例如 MyExtractNode),构建具体的 extract 或 load 使用逻辑;</li><li>在具体的 Node 类(例如 MyExtractNode)中,指定对应 Flink connector;</li><li>在具体的 ETL 实现逻辑中使用具体的 Node 类(例如 MyExtractNode)。</li></ul><p>其中第二步中可以使用已有的 Flink Connector,或者用户自己扩展,如何扩展 Flink Connector 请参考 Flink 官方文档 <a href="https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/overview/#datastream-connectors" target="_blank" rel="noopener noreferrer">DataStream Connectors</a>。</p><h2 class="anchor anchorWithStickyNavbar_LWe7" id="扩展-extract-node">扩展 Extract Node<a href="#扩展-extract-node" class="hash-link" aria-label="Direct link to heading" title="Direct link to heading"></a></h2><p>扩展一个 ExtractNode 分为三个步骤:</p><p><strong>第一步</strong>:继承 ExtractNode 类,类的位置在:</p><div class="language-bash codeBlockContainer_Ckt0 theme-code-block" 
style="--prism-color:#F8F8F2;--prism-background-color:#282A36"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-bash codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#F8F8F2"><span class="token plain">inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="Copy code to clipboard" title="Copy" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg class="copyButtonIcon_y97N" viewBox="0 0 24 24"><path d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg class="copyButtonSuccessIcon_LjdS" viewBox="0 0 24 24"><path d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><p>在实现的 ExtractNode 中指定 connector;</p><div class="language-Java codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#F8F8F2;--prism-background-color:#282A36"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-Java codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#F8F8F2"><span class="token plain">// 继承 ExtractNode 类,实现具体的类,例如 MongoExtractNode</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">@EqualsAndHashCode(callSuper = true)</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">@JsonTypeName(&quot;MongoExtract&quot;)</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">@Data</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">public class MongoExtractNode extends ExtractNode implements Serializable {</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token 
plain"> @JsonInclude(Include.NON_NULL)</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> @JsonProperty(&quot;primaryKey&quot;)</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> private String primaryKey;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> ...</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain" style="display:inline-block"></span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> @JsonCreator</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> public MongoExtractNode(@JsonProperty(&quot;id&quot;) String id, ...) { ... }</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain" style="display:inline-block"></span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> @Override</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> public Map&lt;String, String&gt; tableOptions() {</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> Map&lt;String, String&gt; options = super.tableOptions();</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> // 配置指定的 connector,这里指定的是 mongodb-cdc</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> options.put(&quot;connector&quot;, &quot;mongodb-cdc&quot;);</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> ...</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> return options;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> }</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">}</span><br></span></code></pre><div 
class="buttonGroup__atx"><button type="button" aria-label="Copy code to clipboard" title="Copy" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg class="copyButtonIcon_y97N" viewBox="0 0 24 24"><path d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg class="copyButtonSuccessIcon_LjdS" viewBox="0 0 24 24"><path d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><p><strong>第二步</strong>:在 ExtractNode 和 Node 中的 JsonSubTypes 添加该 Extract</p><div class="language-java codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#F8F8F2;--prism-background-color:#282A36"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-java codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#F8F8F2"><span class="token plain">// 在 ExtractNode 和 Node 的 JsonSubTypes 中添加字段</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">...</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">@JsonSubTypes({</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> @JsonSubTypes.Type(value = MongoExtractNode.class, name = &quot;mongoExtract&quot;)</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">})</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">...</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">public abstract class ExtractNode implements Node{...}</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain" style="display:inline-block"></span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">...</span><br></span><span class="token-line" 
style="color:#F8F8F2"><span class="token plain">@JsonSubTypes({</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> @JsonSubTypes.Type(value = MongoExtractNode.class, name = &quot;mongoExtract&quot;)</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">})</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">public interface Node {...}</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="Copy code to clipboard" title="Copy" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg class="copyButtonIcon_y97N" viewBox="0 0 24 24"><path d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg class="copyButtonSuccessIcon_LjdS" viewBox="0 0 24 24"><path d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><p><strong>第三步</strong>:扩展 Sort Connector,查看此(<code>inlong-sort/sort-connectors/mongodb-cdc</code>)目录下是否已经存在对应的 connector。如果没有,则需要参考 Flink 官方文档 <a href="https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/overview/#datastream-connectors" target="_blank" rel="noopener noreferrer">DataStream Connectors </a> 来扩展,
调用已有的 Flink-connector(例如<code>inlong-sort/sort-connectors/mongodb-cdc</code>)或自行实现相关的 connector 均可。</p><h2 class="anchor anchorWithStickyNavbar_LWe7" id="扩展-load-node">扩展 Load Node<a href="#扩展-load-node" class="hash-link" aria-label="Direct link to heading" title="Direct link to heading"></a></h2><p>扩展一个 LoadNode 分为三个步骤:</p><p><strong>第一步</strong>:继承 LoadNode 类,类的位置在:</p><div class="language-bash codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#F8F8F2;--prism-background-color:#282A36"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-bash codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#F8F8F2"><span class="token plain">inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="Copy code to clipboard" title="Copy" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg class="copyButtonIcon_y97N" viewBox="0 0 24 24"><path d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg class="copyButtonSuccessIcon_LjdS" viewBox="0 0 24 24"><path d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><p>在实现的LoadNode 中指定 connector;</p><div class="language-java codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#F8F8F2;--prism-background-color:#282A36"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-java codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#F8F8F2"><span class="token plain">// 继承 LoadNode 类,实现具体的类,例如 KafkaLoadNode</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">@EqualsAndHashCode(callSuper = true)</span><br></span><span class="token-line" 
style="color:#F8F8F2"><span class="token plain">@JsonTypeName(&quot;kafkaLoad&quot;)</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">@Data</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">@NoArgsConstructor</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">public class KafkaLoadNode extends LoadNode implements Serializable {</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> @Nonnull</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> @JsonProperty(&quot;topic&quot;)</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> private String topic;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> ...</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain" style="display:inline-block"></span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> @JsonCreator</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> public KafkaLoadNode(@Nonnull @JsonProperty(&quot;topic&quot;) String topic, ...) 
{...}</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain" style="display:inline-block"></span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> // 根据不同的条件配置使用不同的 connector</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> @Override</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> public Map&lt;String, String&gt; tableOptions() {</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> ...</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> if (format instanceof JsonFormat || format instanceof AvroFormat || format instanceof CsvFormat) {</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> if (StringUtils.isEmpty(this.primaryKey)) {</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> // kafka connector</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> options.put(&quot;connector&quot;, &quot;kafka&quot;);</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> options.putAll(format.generateOptions(false));</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> } else {</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> // upsert-kafka connector</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> options.put(&quot;connector&quot;, &quot;upsert-kafka&quot;);</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> options.putAll(format.generateOptions(true));</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> }</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> } else 
if (format instanceof CanalJsonFormat || format instanceof DebeziumJsonFormat) {</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> // kafka-inlong connector</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> options.put(&quot;connector&quot;, &quot;kafka-inlong&quot;);</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> options.putAll(format.generateOptions(false));</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> } else {</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> throw new IllegalArgumentException(&quot;kafka load Node format is IllegalArgument&quot;);</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> }</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> return options;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> }</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">}</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="Copy code to clipboard" title="Copy" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg class="copyButtonIcon_y97N" viewBox="0 0 24 24"><path d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg class="copyButtonSuccessIcon_LjdS" viewBox="0 0 24 24"><path d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><p><strong>第二步</strong>:在 LoadNode 和 Node 中的 JsonSubTypes 添加该 Load</p><div class="language-java codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#F8F8F2;--prism-background-color:#282A36"><div class="codeBlockContent_biex"><pre tabindex="0" 
class="prism-code language-java codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#F8F8F2"><span class="token plain">// 在 LoadNode 和 Node 的 JsonSubTypes 中添加字段</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">...</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">@JsonSubTypes({</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> @JsonSubTypes.Type(value = KafkaLoadNode.class, name = &quot;kafkaLoad&quot;)</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">})</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">...</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">public abstract class LoadNode implements Node{...}</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain" style="display:inline-block"></span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">...</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">@JsonSubTypes({</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> @JsonSubTypes.Type(value = KafkaLoadNode.class, name = &quot;kafkaLoad&quot;)</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">})</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">public interface Node {...}</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="Copy code to clipboard" title="Copy" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg class="copyButtonIcon_y97N" viewBox="0 0 24 24"><path d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 
2,3V17H4V3H16V1Z"></path></svg><svg class="copyButtonSuccessIcon_LjdS" viewBox="0 0 24 24"><path d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><p><strong>第三步</strong>:扩展 Sort Connector,Kafka 的 sort connector 在 <code>inlong-sort/sort-connectors/kafka</code> 目录下。</p><h2 class="anchor anchorWithStickyNavbar_LWe7" id="集成到-entrance">集成到 Entrance<a href="#集成到-entrance" class="hash-link" aria-label="Direct link to heading" title="Direct link to heading"></a></h2><p>将 Extract 和 Load 集成到 InLong Sort 主流程中,需要构建总览小节中提到的语义:Group、Stream、Node 等。
InLong Sort 的入口类在:</p><div class="language-bash codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#F8F8F2;--prism-background-color:#282A36"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-bash codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#F8F8F2"><span class="token plain">inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/Entrance.java</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="Copy code to clipboard" title="Copy" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg class="copyButtonIcon_y97N" viewBox="0 0 24 24"><path d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg class="copyButtonSuccessIcon_LjdS" viewBox="0 0 24 24"><path d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><p>Extract 和 Load 如何集成至 InLong Sort,可参考下面的 UT,首先构建对应的 ExtractNode、LoadNode,再构建 NodeRelation、StreamInfo、GroupInfo,最后通过 FlinkSqlParser 执行。</p><div class="language-java codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#F8F8F2;--prism-background-color:#282A36"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-java codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#F8F8F2"><span class="token plain">public class MongoExtractToKafkaLoad extends AbstractTestBase {</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain" style="display:inline-block"></span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> // 构建 MongoExtractNode</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> private MongoExtractNode buildMongoNode() {</span><br></span><span class="token-line" 
style="color:#F8F8F2"><span class="token plain"> List&lt;FieldInfo&gt; fields = Arrays.asList(new FieldInfo(&quot;name&quot;, new StringFormatInfo()), ...);</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> return new MongoExtractNode(..., fields, ...);</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> }</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain" style="display:inline-block"></span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> // 构建 KafkaLoadNode</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> private KafkaLoadNode buildAllMigrateKafkaNode() {</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> List&lt;FieldInfo&gt; fields = Arrays.asList(new FieldInfo(&quot;name&quot;, new StringFormatInfo()), ...);</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> List&lt;FieldRelation&gt; relations = Arrays.asList(new FieldRelation(new FieldInfo(&quot;name&quot;, new StringFormatInfo()), ...), ...);</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> CsvFormat csvFormat = new CsvFormat();</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> return new KafkaLoadNode(..., fields, relations, csvFormat, ...);</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> }</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain" style="display:inline-block"></span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> // 构建 NodeRelation</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> private NodeRelation buildNodeRelation(List&lt;Node&gt; inputs, List&lt;Node&gt; outputs) 
{</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> List&lt;String&gt; inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> List&lt;String&gt; outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> return new NodeRelation(inputIds, outputIds);</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> }</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain" style="display:inline-block"></span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> // 测试主流程 MongoDB to Kafka</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> @Test</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> public void testMongoDbToKafka() throws Exception {</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> EnvironmentSettings settings = EnvironmentSettings. ... 
.build();</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> ...</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> Node inputNode = buildMongoNode();</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> Node outputNode = buildAllMigrateKafkaNode();</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> StreamInfo streamInfo = new StreamInfo(&quot;1&quot;, Arrays.asList(inputNode, outputNode), ...);</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> GroupInfo groupInfo = new GroupInfo(&quot;1&quot;, Collections.singletonList(streamInfo));</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> FlinkSqlParser parser = FlinkSqlParser.getInstance(tableEnv, groupInfo);</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> ParseResult result = parser.parse();</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> Assert.assertTrue(result.tryExecute());</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> }</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">}</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="Copy code to clipboard" title="Copy" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg class="copyButtonIcon_y97N" viewBox="0 0 24 24"><path d="M19,21H8V7H19M19,5H8A2,2 0 0,0 
6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg class="copyButtonSuccessIcon_LjdS" viewBox="0 0 24 24"><path d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div></div><footer class="theme-doc-footer docusaurus-mt-lg"><div class="theme-doc-footer-edit-meta-row row"><div class="col"><a href="https://github.com/apache/inlong-website/edit/master/versioned_docs/version-1.10.0/design_and_concept/how_to_extend_data_node_for_sort.md" target="_blank" rel="noreferrer noopener" class="theme-edit-this-page"><svg fill="currentColor" height="20" width="20" viewBox="0 0 40 40" class="iconEdit_Z9Sw" aria-hidden="true"><g><path d="m34.5 11.7l-3 3.1-6.3-6.3 3.1-3q0.5-0.5 1.2-0.5t1.1 0.5l3.9 3.9q0.5 0.4 0.5 1.1t-0.5 1.2z m-29.5 17.1l18.4-18.5 6.3 6.3-18.4 18.4h-6.3v-6.2z"></path></g></svg>Edit this page</a></div><div class="col lastUpdated_vwxv"></div></div></footer></article><nav class="pagination-nav docusaurus-mt-lg" aria-label="Docs pages navigation"><a class="pagination-nav__link pagination-nav__link--prev" href="/zh-CN/docs/1.10.0/design_and_concept/how_to_write_plugin_agent"><div class="pagination-nav__sublabel">Previous</div><div class="pagination-nav__label">Agent 插件</div></a><a class="pagination-nav__link pagination-nav__link--next" href="/zh-CN/docs/1.10.0/design_and_concept/how_to_write_plugin_dashboard"><div class="pagination-nav__sublabel">Next</div><div class="pagination-nav__label">Dashboard 插件</div></a></nav></div></div><div class="col col--3"><div class="tableOfContents_bqdL thin-scrollbar theme-doc-toc-desktop"><ul class="table-of-contents table-of-contents__left-border"><li><a href="#总览" class="table-of-contents__link toc-highlight">总览</a></li><li><a href="#扩展-extract-node" class="table-of-contents__link toc-highlight">扩展 Extract Node</a></li><li><a href="#扩展-load-node" class="table-of-contents__link toc-highlight">扩展 Load Node</a></li><li><a 
href="#集成到-entrance" class="table-of-contents__link toc-highlight">集成到 Entrance</a></li></ul></div></div></div></div></main></div></div><footer class="footer"><div class="container container-fluid"><div class="row footer__links"><div class="col footer__col"><div class="footer__title">事件</div><ul class="footer__items clean-list"><li class="footer__item"><a href="https://www.apachecon.com/" target="_blank" rel="noopener noreferrer" class="footer__link-item">ApacheCon<svg width="13.5" height="13.5" aria-hidden="true" viewBox="0 0 24 24" class="iconExternalLink_nPIU"><path fill="currentColor" d="M21 13v10h-21v-19h12v2h-10v15h17v-8h2zm3-12h-10.988l4.035 4-6.977 7.07 2.828 2.828 6.977-7.07 4.125 4.172v-11z"></path></svg></a></li><li class="footer__item"><a class="acevent" data-format="square" data-mode="dark" data-event="random"></a></li></ul></div><div class="col footer__col"><div class="footer__title">社区</div><ul class="footer__items clean-list"><li class="footer__item"><a href="https://twitter.com/ApacheInlong" target="_blank" rel="noopener noreferrer" class="footer__link-item">Twitter<svg width="13.5" height="13.5" aria-hidden="true" viewBox="0 0 24 24" class="iconExternalLink_nPIU"><path fill="currentColor" d="M21 13v10h-21v-19h12v2h-10v15h17v-8h2zm3-12h-10.988l4.035 4-6.977 7.07 2.828 2.828 6.977-7.07 4.125 4.172v-11z"></path></svg></a></li><li class="footer__item"><a href="https://inlong.apache.org/img/apache-inlong-wechat.jpg" target="_blank" rel="noopener noreferrer" class="footer__link-item">WeChat<svg width="13.5" height="13.5" aria-hidden="true" viewBox="0 0 24 24" class="iconExternalLink_nPIU"><path fill="currentColor" d="M21 13v10h-21v-19h12v2h-10v15h17v-8h2zm3-12h-10.988l4.035 4-6.977 7.07 2.828 2.828 6.977-7.07 4.125 4.172v-11z"></path></svg></a></li><li class="footer__item"><a href="mailto:dev@inlong.apache.org" target="_blank" rel="noopener noreferrer" class="footer__link-item">Email</a></li></ul></div><div class="col footer__col"><div 
class="footer__title">更多</div><ul class="footer__items clean-list"><li class="footer__item"><a class="footer__link-item" href="/zh-CN/blog">Blog</a></li><li class="footer__item"><a href="https://github.com/apache/inlong" target="_blank" rel="noopener noreferrer" class="footer__link-item">GitHub<svg width="13.5" height="13.5" aria-hidden="true" viewBox="0 0 24 24" class="iconExternalLink_nPIU"><path fill="currentColor" d="M21 13v10h-21v-19h12v2h-10v15h17v-8h2zm3-12h-10.988l4.035 4-6.977 7.07 2.828 2.828 6.977-7.07 4.125 4.172v-11z"></path></svg></a></li></ul></div></div><div class="footer__bottom text--center"><div class="margin-bottom--sm"><img src="/zh-CN/img/asf_logo.svg" alt="Apache InLong" class="themedImage_ToTc themedImage--light_HNdA footer__logo"><img src="/zh-CN/img/asf_logo.svg" alt="Apache InLong" class="themedImage_ToTc themedImage--dark_i4oU footer__logo"></div><div class="footer__copyright"><div style="font-family: Avenir-Medium;font-size: 14px;color: #999;">
<div>Copyright © 2020-2024 The Apache Software Foundation. Licensed under the Apache License, Version 2.0.</div>
<div style="margin-top: 20px; padding-top: 20px; border-top: 1px solid #666;line-height: 20px;">The Apache Software Foundation, Apache InLong, InLong, Apache, the Apache feather, and the Apache InLong project logo are either registered trademarks or trademarks of the Apache Software Foundation.</div>
</div></div></div></div></footer></div>
<script src="/zh-CN/assets/js/runtime~main.933f32cb.js"></script>
<script src="/zh-CN/assets/js/main.32851df5.js"></script>
</body>
</html>