blob: 26c6d530410767209e2c0bc3ac299827d37d304d [file] [log] [blame]
<!doctype html>
<html lang="en" dir="ltr" class="docs-wrapper docs-doc-page docs-version-1.8.0 plugin-docs plugin-id-default docs-doc-id-design_and_concept/how_to_extend_data_node_for_sort">
<head>
<meta charset="UTF-8">
<meta name="generator" content="Docusaurus v2.3.1">
<title data-rh="true">Sort Plugin | Apache InLong</title><meta data-rh="true" name="viewport" content="width=device-width,initial-scale=1"><meta data-rh="true" name="twitter:card" content="summary_large_image"><meta data-rh="true" property="og:url" content="https://inlong.apache.org/docs/1.8.0/design_and_concept/how_to_extend_data_node_for_sort"><meta data-rh="true" name="docusaurus_locale" content="en"><meta data-rh="true" name="docsearch:language" content="en"><meta data-rh="true" name="docusaurus_version" content="1.8.0"><meta data-rh="true" name="docusaurus_tag" content="docs-default-1.8.0"><meta data-rh="true" name="docsearch:version" content="1.8.0"><meta data-rh="true" name="docsearch:docusaurus_tag" content="docs-default-1.8.0"><meta data-rh="true" property="og:title" content="Sort Plugin | Apache InLong"><meta data-rh="true" name="description" content="Overview"><meta data-rh="true" property="og:description" content="Overview"><link data-rh="true" rel="icon" href="/img/logo.svg"><link data-rh="true" rel="canonical" href="https://inlong.apache.org/docs/1.8.0/design_and_concept/how_to_extend_data_node_for_sort"><link data-rh="true" rel="alternate" href="https://inlong.apache.org/docs/1.8.0/design_and_concept/how_to_extend_data_node_for_sort" hreflang="en"><link data-rh="true" rel="alternate" href="https://inlong.apache.org/zh-CN/docs/1.8.0/design_and_concept/how_to_extend_data_node_for_sort" hreflang="zh-CN"><link data-rh="true" rel="alternate" href="https://inlong.apache.org/docs/1.8.0/design_and_concept/how_to_extend_data_node_for_sort" hreflang="x-default"><link data-rh="true" rel="preconnect" href="https://YUW9QEL53E-dsn.algolia.net" crossorigin="anonymous"><link rel="alternate" type="application/rss+xml" href="/blog/rss.xml" title="Apache InLong RSS Feed">
<link rel="alternate" type="application/atom+xml" href="/blog/atom.xml" title="Apache InLong Atom Feed">
<link rel="search" type="application/opensearchdescription+xml" title="Apache InLong" href="/opensearch.xml">
<script src="https://www.apachecon.com/event-images/snippet.js" async></script><link rel="stylesheet" href="/assets/css/styles.c64edd51.css">
<link rel="preload" href="/assets/js/runtime~main.63c98e82.js" as="script">
<link rel="preload" href="/assets/js/main.070aef2a.js" as="script">
</head>
<body class="navigation-with-keyboard">
<script>!function(){function e(e){document.documentElement.setAttribute("data-theme",e)}var t=function(){var e=null;try{e=localStorage.getItem("theme")}catch(e){}return e}();null!==t?e(t):window.matchMedia("(prefers-color-scheme: dark)").matches?e("dark"):(window.matchMedia("(prefers-color-scheme: light)").matches,e("light"))}()</script><div id="__docusaurus">
<div role="region" aria-label="Skip to main content"><a class="skipToContent_fXgn" href="#docusaurus_skipToContent_fallback">Skip to main content</a></div><nav aria-label="Main" class="navbar navbar--fixed-top"><div class="navbar__inner"><div class="navbar__items"><button aria-label="Toggle navigation bar" aria-expanded="false" class="navbar__toggle clean-btn" type="button"><svg width="30" height="30" viewBox="0 0 30 30" aria-hidden="true"><path stroke="currentColor" stroke-linecap="round" stroke-miterlimit="10" stroke-width="2" d="M4 7h22M4 15h22M4 23h22"></path></svg></button><a class="navbar__brand" href="/"><div class="navbar__logo"><img src="/img/logo.svg" alt="Apache" class="themedImage_ToTc themedImage--light_HNdA"><img src="/img/logo.svg" alt="Apache" class="themedImage_ToTc themedImage--dark_i4oU"></div><b class="navbar__title text--truncate">Apache InLong</b></a></div><div class="navbar__items navbar__items--right"><div class="navbar__item dropdown dropdown--hoverable dropdown--right"><a class="navbar__link" aria-haspopup="true" aria-expanded="false" role="button" href="/docs/introduction">Docs</a><ul class="dropdown__menu"><li><a class="dropdown__link" href="/docs/next/introduction">Next</a></li><li><a class="dropdown__link" href="/docs/introduction">1.11.0</a></li><li><a class="dropdown__link" href="/docs/1.10.0/introduction">1.10.0</a></li><li><a class="dropdown__link" href="/docs/1.9.0/introduction">1.9.0</a></li><li><a class="dropdown__link" href="/docs/1.8.0/introduction">1.8.0</a></li><li><a class="dropdown__link" href="/versions/">All versions</a></li></ul></div><a class="navbar__item navbar__link" href="/downloads">Download</a><a class="navbar__item navbar__link" href="/community/how-to-contribute">Community</a><a class="navbar__item navbar__link" href="/blog">Blog</a><a class="navbar__item navbar__link" href="/team">Team</a><div class="navbar__item dropdown dropdown--hoverable dropdown--right"><a href="#" aria-haspopup="true" 
aria-expanded="false" role="button" class="navbar__link">ASF</a><ul class="dropdown__menu"><li><a href="https://www.apache.org/" target="_blank" rel="noopener noreferrer" class="dropdown__link">Apache Software Foundation</a></li><li><a href="https://www.apache.org/licenses/" target="_blank" rel="noopener noreferrer" class="dropdown__link">License</a></li><li><a href="https://www.apache.org/events/current-event" target="_blank" rel="noopener noreferrer" class="dropdown__link">Events</a></li><li><a href="https://www.apache.org/security/" target="_blank" rel="noopener noreferrer" class="dropdown__link">Security</a></li><li><a href="https://www.apache.org/foundation/sponsorship.html" target="_blank" rel="noopener noreferrer" class="dropdown__link">Sponsorship</a></li><li><a href="https://www.apache.org/foundation/policies/privacy.html" target="_blank" rel="noopener noreferrer" class="dropdown__link">Privacy</a></li><li><a href="https://www.apache.org/foundation/thanks.html" target="_blank" rel="noopener noreferrer" class="dropdown__link">Thanks</a></li></ul></div><div class="navbar__item dropdown dropdown--hoverable dropdown--right"><a href="#" aria-haspopup="true" aria-expanded="false" role="button" class="navbar__link"><svg viewBox="0 0 24 24" width="20" height="20" aria-hidden="true" class="iconLanguage_nlXk"><path fill="currentColor" d="M12.87 15.07l-2.54-2.51.03-.03c1.74-1.94 2.98-4.17 3.71-6.53H17V4h-7V2H8v2H1v1.99h11.17C11.5 7.92 10.44 9.75 9 11.35 8.07 10.32 7.3 9.19 6.69 8h-2c.73 1.63 1.73 3.17 2.98 4.56l-5.09 5.02L4 19l5-5 3.11 3.11.76-2.04zM18.5 10h-2L12 22h2l1.12-3h4.75L21 22h2l-4.5-12zm-2.62 7l1.62-4.33L19.12 17h-3.24z"></path></svg>English</a><ul class="dropdown__menu"><li><a href="/docs/1.8.0/design_and_concept/how_to_extend_data_node_for_sort" target="_self" rel="noopener noreferrer" class="dropdown__link dropdown__link--active" lang="en">English</a></li><li><a href="/zh-CN/docs/1.8.0/design_and_concept/how_to_extend_data_node_for_sort" target="_self" 
rel="noopener noreferrer" class="dropdown__link" lang="zh-CN">简体中文</a></li></ul></div><a href="https://github.com/apache/inlong" target="_blank" rel="noopener noreferrer" class="navbar__item navbar__link">GitHub<svg width="13.5" height="13.5" aria-hidden="true" viewBox="0 0 24 24" class="iconExternalLink_nPIU"><path fill="currentColor" d="M21 13v10h-21v-19h12v2h-10v15h17v-8h2zm3-12h-10.988l4.035 4-6.977 7.07 2.828 2.828 6.977-7.07 4.125 4.172v-11z"></path></svg></a><div class="toggle_vylO colorModeToggle_DEke"><button class="clean-btn toggleButton_gllP toggleButtonDisabled_aARS" type="button" disabled="" title="Switch between dark and light mode (currently light mode)" aria-label="Switch between dark and light mode (currently light mode)" aria-live="polite"><svg viewBox="0 0 24 24" width="24" height="24" class="lightToggleIcon_pyhR"><path fill="currentColor" d="M12,9c1.65,0,3,1.35,3,3s-1.35,3-3,3s-3-1.35-3-3S10.35,9,12,9 M12,7c-2.76,0-5,2.24-5,5s2.24,5,5,5s5-2.24,5-5 S14.76,7,12,7L12,7z M2,13l2,0c0.55,0,1-0.45,1-1s-0.45-1-1-1l-2,0c-0.55,0-1,0.45-1,1S1.45,13,2,13z M20,13l2,0c0.55,0,1-0.45,1-1 s-0.45-1-1-1l-2,0c-0.55,0-1,0.45-1,1S19.45,13,20,13z M11,2v2c0,0.55,0.45,1,1,1s1-0.45,1-1V2c0-0.55-0.45-1-1-1S11,1.45,11,2z M11,20v2c0,0.55,0.45,1,1,1s1-0.45,1-1v-2c0-0.55-0.45-1-1-1C11.45,19,11,19.45,11,20z M5.99,4.58c-0.39-0.39-1.03-0.39-1.41,0 c-0.39,0.39-0.39,1.03,0,1.41l1.06,1.06c0.39,0.39,1.03,0.39,1.41,0s0.39-1.03,0-1.41L5.99,4.58z M18.36,16.95 c-0.39-0.39-1.03-0.39-1.41,0c-0.39,0.39-0.39,1.03,0,1.41l1.06,1.06c0.39,0.39,1.03,0.39,1.41,0c0.39-0.39,0.39-1.03,0-1.41 L18.36,16.95z M19.42,5.99c0.39-0.39,0.39-1.03,0-1.41c-0.39-0.39-1.03-0.39-1.41,0l-1.06,1.06c-0.39,0.39-0.39,1.03,0,1.41 s1.03,0.39,1.41,0L19.42,5.99z M7.05,18.36c0.39-0.39,0.39-1.03,0-1.41c-0.39-0.39-1.03-0.39-1.41,0l-1.06,1.06 c-0.39,0.39-0.39,1.03,0,1.41s1.03,0.39,1.41,0L7.05,18.36z"></path></svg><svg viewBox="0 0 24 24" width="24" height="24" class="darkToggleIcon_wfgR"><path fill="currentColor" 
d="M9.37,5.51C9.19,6.15,9.1,6.82,9.1,7.5c0,4.08,3.32,7.4,7.4,7.4c0.68,0,1.35-0.09,1.99-0.27C17.45,17.19,14.93,19,12,19 c-3.86,0-7-3.14-7-7C5,9.07,6.81,6.55,9.37,5.51z M12,3c-4.97,0-9,4.03-9,9s4.03,9,9,9s9-4.03,9-9c0-0.46-0.04-0.92-0.1-1.36 c-0.98,1.37-2.58,2.26-4.4,2.26c-2.98,0-5.4-2.42-5.4-5.4c0-1.81,0.89-3.42,2.26-4.4C12.92,3.04,12.46,3,12,3L12,3z"></path></svg></button></div><div class="searchBox_ZlJk"><button type="button" class="DocSearch DocSearch-Button" aria-label="Search"><span class="DocSearch-Button-Container"><svg width="20" height="20" class="DocSearch-Search-Icon" viewBox="0 0 20 20"><path d="M14.386 14.386l4.0877 4.0877-4.0877-4.0877c-2.9418 2.9419-7.7115 2.9419-10.6533 0-2.9419-2.9418-2.9419-7.7115 0-10.6533 2.9418-2.9419 7.7115-2.9419 10.6533 0 2.9419 2.9418 2.9419 7.7115 0 10.6533z" stroke="currentColor" fill="none" fill-rule="evenodd" stroke-linecap="round" stroke-linejoin="round"></path></svg><span class="DocSearch-Button-Placeholder">Search</span></span><span class="DocSearch-Button-Keys"></span></button></div></div></div><div role="presentation" class="navbar-sidebar__backdrop"></div></nav><div id="docusaurus_skipToContent_fallback" class="main-wrapper mainWrapper_z2l0 docsWrapper_BCFX"><button aria-label="Scroll back to top" class="clean-btn theme-back-to-top-button backToTopButton_sjWU" type="button"></button><div class="docPage__5DB"><aside class="theme-doc-sidebar-container docSidebarContainer_b6E3"><div class="sidebarViewport_Xe31"><div class="sidebar_njMd"><nav aria-label="Docs sidebar" class="menu thin-scrollbar menu_SIkG"><ul class="theme-doc-sidebar-menu menu__list"><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-1 menu__list-item"><a class="menu__link" href="/docs/1.8.0/introduction">Introduction</a></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-1 menu__list-item"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret 
menu__link--active" aria-expanded="true" href="/docs/1.8.0/design_and_concept/basic_concept">Design and Concept</a></div><ul style="display:block;overflow:visible;height:auto" class="menu__list"><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-2 menu__list-item"><a class="menu__link" tabindex="0" href="/docs/1.8.0/design_and_concept/basic_concept">Basic Concept</a></li><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-2 menu__list-item"><a class="menu__link" tabindex="0" href="/docs/1.8.0/design_and_concept/the_format_in_inlong">Format</a></li><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-2 menu__list-item"><a class="menu__link" tabindex="0" href="/docs/1.8.0/design_and_concept/how_to_write_plugin_agent">Agent Plugin</a></li><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-2 menu__list-item"><a class="menu__link menu__link--active" aria-current="page" tabindex="0" href="/docs/1.8.0/design_and_concept/how_to_extend_data_node_for_sort">Sort Plugin</a></li><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-2 menu__list-item"><a class="menu__link" tabindex="0" href="/docs/1.8.0/design_and_concept/how_to_write_plugin_dashboard">Dashboard Plugin</a></li><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-2 menu__list-item"><a class="menu__link" tabindex="0" href="/docs/1.8.0/design_and_concept/how_to_extend_data_node_for_manager">Manager Plugin</a></li><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-2 menu__list-item"><a class="menu__link" tabindex="0" href="/docs/1.8.0/design_and_concept/how_to_write_plugin_dataproxy">DataProxy Plugin</a></li></ul></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-1 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" 
href="/docs/1.8.0/quick_start/how_to_build">Quick Start</a></div></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-1 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" href="/docs/1.8.0/deployment/standalone">Deployment</a></div></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-1 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" href="/docs/1.8.0/modules/agent/overview">Components</a></div></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-1 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" href="/docs/1.8.0/data_node/extract_node/overview">Data Nodes</a></div></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-1 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" href="/docs/1.8.0/sdk/dataproxy-sdk/cpp">SDK</a></div></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-1 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" href="/docs/1.8.0/user_guide/dashboard_usage">User Guide</a></div></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-1 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" 
href="/docs/1.8.0/development/inlong_msg">Development</a></div></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-1 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" href="/docs/1.8.0/administration/user_management">Administration</a></div></li><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-1 menu__list-item"><a class="menu__link" href="/docs/1.8.0/contact">Contact Us</a></li></ul></nav></div></div></aside><main class="docMainContainer_gTbr"><div class="container padding-top--md padding-bottom--lg"><div class="row"><div class="col docItemCol_VOVn"><div class="theme-doc-version-banner alert alert--warning margin-bottom--md" role="alert"><div>This is documentation for <!-- -->Apache InLong<!-- --> <b>1.8.0</b>, which is no longer actively maintained.</div><div class="margin-top--md">For up-to-date documentation, see the <b><a href="/docs/design_and_concept/how_to_extend_data_node_for_sort">latest version</a></b> (<!-- -->1.11.0<!-- -->).</div></div><div class="docItemContainer_Djhp"><article><nav class="theme-doc-breadcrumbs breadcrumbsContainer_Z_bl" aria-label="Breadcrumbs"><ul class="breadcrumbs" itemscope="" itemtype="https://schema.org/BreadcrumbList"><li class="breadcrumbs__item"><a aria-label="Home page" class="breadcrumbs__link" href="/"><svg viewBox="0 0 24 24" class="breadcrumbHomeIcon_YNFT"><path d="M10 19v-5h4v5c0 .55.45 1 1 1h3c.55 0 1-.45 1-1v-7h1.7c.46 0 .68-.57.33-.87L12.67 3.6c-.38-.34-.96-.34-1.34 0l-8.36 7.53c-.34.3-.13.87.33.87H5v7c0 .55.45 1 1 1h3c.55 0 1-.45 1-1z" fill="currentColor"></path></svg></a></li><li class="breadcrumbs__item"><span class="breadcrumbs__link">Design and Concept</span><meta itemprop="position" content="1"></li><li itemscope="" itemprop="itemListElement" itemtype="https://schema.org/ListItem" class="breadcrumbs__item 
breadcrumbs__item--active"><span class="breadcrumbs__link" itemprop="name">Sort Plugin</span><meta itemprop="position" content="2"></li></ul></nav><span class="theme-doc-version-badge badge badge--secondary">Version: 1.8.0</span><div class="tocCollapsible_ETCw theme-doc-toc-mobile tocMobile_ITEo"><button type="button" class="clean-btn tocCollapsibleButton_TO0P">On this page</button></div><div class="theme-doc-markdown markdown"><header><h1>Sort Plugin</h1></header><h2 class="anchor anchorWithStickyNavbar_LWe7" id="overview">Overview<a href="#overview" class="hash-link" aria-label="Direct link to Overview" title="Direct link to Overview"></a></h2><p>InLong Sort is an ETL service based on Apache Flink SQL, the powerful expressive power of Flink SQL brings high scalability and flexibility.
Basically, the semantics supported by Flink SQL are supported by InLong Sort. In some scenarios, when the built-in functions of Flink SQL do not meet the requirements,
they can also be extended through various UDFs in InLong Sort. At the same time, it will be easier for those who have used SQL, especially Flink SQL, to get started.</p><p>This article describes how to extend InLong Sort with a new source (abstracted as an extract node in InLong) or a new sink (abstracted as a load node in InLong).
The architecture of InLong Sort can be represented by the following UML object relation diagram:</p><p><img loading="lazy" alt="UML object relation diagram of the InLong Sort architecture" src="/assets/images/sort_uml-d90bb6f0835781e064b7417f266b7b30.png" width="2576" height="869" class="img_ev3q"></p><p>The concepts of each component are:</p><table><thead><tr><th><strong>Name</strong></th><th><strong>Description</strong></th></tr></thead><tbody><tr><td><strong>Group</strong></td><td>data flow group, including multiple data flows, one group represents one data access</td></tr><tr><td><strong>Stream</strong></td><td>data flow, a data flow has a specific flow direction</td></tr><tr><td><strong>GroupInfo</strong></td><td>encapsulation of data flow in sort. a groupinfo can contain multiple dataflowinfo</td></tr><tr><td><strong>StreamInfo</strong></td><td>abstract of data flow in sort, including various sources, transformations, destinations, etc.</td></tr><tr><td><strong>Node</strong></td><td>abstraction of data source, data transformation and data destination in data synchronization</td></tr><tr><td><strong>ExtractNode</strong></td><td>source-side abstraction for data synchronization</td></tr><tr><td><strong>TransformNode</strong></td><td>transformation process abstraction of data synchronization</td></tr><tr><td><strong>LoadNode</strong></td><td>destination abstraction for data synchronization</td></tr><tr><td><strong>NodeRelationShip</strong></td><td>abstraction of each node relationship in data synchronization</td></tr><tr><td><strong>FieldRelationShip</strong></td><td>abstraction of the relationship between upstream and downstream node fields in data synchronization</td></tr><tr><td><strong>FieldInfo</strong></td><td>node field</td></tr><tr><td><strong>MetaFieldInfo</strong></td><td>node meta fields</td></tr><tr><td><strong>Function</strong></td><td>abstraction of transformation function</td></tr><tr><td><strong>FunctionParam</strong></td><td>input parameter abstraction of 
function</td></tr><tr><td><strong>ConstantParam</strong></td><td>constant parameters</td></tr></tbody></table><p>To extend the extract node or load node, you need to do the following:</p><ul><li>Inherit the node class (such as MyExtractNode) and build specific extract or load usage logic;</li><li>In a specific node class (such as MyExtractNode), specify the corresponding Flink connector;</li><li>Use specific node classes in specific ETL implementation logic (such as MyExtractNode)</li></ul><p>In the second step, you can use an existing Flink connector or implement your own. For how to extend a Flink connector, please refer to the official Flink documentation: <a href="https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/overview/#datastream-connectors" target="_blank" rel="noopener noreferrer">DataStream Connectors</a>.</p><h2 class="anchor anchorWithStickyNavbar_LWe7" id="extend-extract-node">Extend Extract Node<a href="#extend-extract-node" class="hash-link" aria-label="Direct link to Extend Extract Node" title="Direct link to Extend Extract Node"></a></h2><p>There are three steps to extend an ExtractNode: </p><p><strong>Step 1</strong>: Inherit the ExtractNode class, which is located at:</p><div class="language-bash codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#F8F8F2;--prism-background-color:#282A36"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-bash codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#F8F8F2"><span class="token plain">inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="Copy code to clipboard" title="Copy" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg class="copyButtonIcon_y97N" viewBox="0 0 24 24"><path d="M19,21H8V7H19M19,5H8A2,2 0 0,0 
6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg class="copyButtonSuccessIcon_LjdS" viewBox="0 0 24 24"><path d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><p>Specify the connector in the implemented ExtractNode.</p><div class="language-Java codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#F8F8F2;--prism-background-color:#282A36"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-Java codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#F8F8F2"><span class="token plain">// Inherit ExtractNode class and implement specific classes, such as MongoExtractNode</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">@EqualsAndHashCode(callSuper = true)</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">@JsonTypeName(&quot;MongoExtract&quot;)</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">@Data</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">public class MongoExtractNode extends ExtractNode implements Serializable {</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> @JsonInclude(Include.NON_NULL)</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> @JsonProperty(&quot;primaryKey&quot;)</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> private String primaryKey;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> ...</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain" style="display:inline-block"></span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> 
@JsonCreator</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> public MongoExtractNode(@JsonProperty(&quot;id&quot;) String id, ...) { ... }</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain" style="display:inline-block"></span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> @Override</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> public Map&lt;String, String&gt; tableOptions() {</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> Map&lt;String, String&gt; options = super.tableOptions();</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> // configure the specified connector, here is mongodb-cdc</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> options.put(&quot;connector&quot;, &quot;mongodb-cdc&quot;);</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> ...</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> return options;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> }</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">}</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="Copy code to clipboard" title="Copy" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg class="copyButtonIcon_y97N" viewBox="0 0 24 24"><path d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg class="copyButtonSuccessIcon_LjdS" viewBox="0 0 24 24"><path d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><p><strong>Step 2</strong>:add the Extract 
to JsonSubTypes in ExtractNode and Node</p><div class="language-java codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#F8F8F2;--prism-background-color:#282A36"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-java codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#F8F8F2"><span class="token plain">// add field in JsonSubTypes of ExtractNode and Node</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">...</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">@JsonSubTypes({</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> @JsonSubTypes.Type(value = MongoExtractNode.class, name = &quot;mongoExtract&quot;)</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">})</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">...</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">public abstract class ExtractNode implements Node{...}</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain" style="display:inline-block"></span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">...</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">@JsonSubTypes({</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> @JsonSubTypes.Type(value = MongoExtractNode.class, name = &quot;mongoExtract&quot;)</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">})</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">public interface Node {...}</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="Copy code to clipboard" title="Copy" 
class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg class="copyButtonIcon_y97N" viewBox="0 0 24 24"><path d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg class="copyButtonSuccessIcon_LjdS" viewBox="0 0 24 24"><path d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><p><strong>Step 3</strong>: Extend the Sort connector and check whether the corresponding connector already exists in the (<code>inlong-sort/sort-connectors/mongodb-cdc</code>) directory. If it does not exist yet,
you need to refer to the official flink documentation <a href="https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/overview/#datastream-connectors" target="_blank" rel="noopener noreferrer">DataStream Connectors</a> to extend,
directly call the existing flink-connector (such as <code>inlong-sort/sort-connectors/mongodb-cdc</code>) or implement the related connector by yourself.</p><h2 class="anchor anchorWithStickyNavbar_LWe7" id="extend-load-node">Extend Load Node<a href="#extend-load-node" class="hash-link" aria-label="Direct link to Extend Load Node" title="Direct link to Extend Load Node"></a></h2><p>There are three steps to extend a LoadNode: </p><p><strong>Step 1</strong>: Inherit the LoadNode class, the location of the class is:</p><div class="language-bash codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#F8F8F2;--prism-background-color:#282A36"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-bash codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#F8F8F2"><span class="token plain">inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="Copy code to clipboard" title="Copy" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg class="copyButtonIcon_y97N" viewBox="0 0 24 24"><path d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg class="copyButtonSuccessIcon_LjdS" viewBox="0 0 24 24"><path d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><p>Specify the connector in the implemented LoadNode.</p><div class="language-java codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#F8F8F2;--prism-background-color:#282A36"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-java codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#F8F8F2"><span class="token plain">// Inherit LoadNode class and implement specific 
classes, such as KafkaLoadNode</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">@EqualsAndHashCode(callSuper = true)</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">@JsonTypeName(&quot;kafkaLoad&quot;)</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">@Data</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">@NoArgsConstructor</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">public class KafkaLoadNode extends LoadNode implements Serializable {</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> @Nonnull</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> @JsonProperty(&quot;topic&quot;)</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> private String topic;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> ...</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain" style="display:inline-block"></span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> @JsonCreator</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> public KafkaLoadNode(@Nonnull @JsonProperty(&quot;topic&quot;) String topic, ...) 
{...}</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain" style="display:inline-block"></span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> // configure and use different connectors according to different conditions</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> @Override</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> public Map&lt;String, String&gt; tableOptions() {</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> ...</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> if (format instanceof JsonFormat || format instanceof AvroFormat || format instanceof CsvFormat) {</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> if (StringUtils.isEmpty(this.primaryKey)) {</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> // kafka connector</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> options.put(&quot;connector&quot;, &quot;kafka&quot;);</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> options.putAll(format.generateOptions(false));</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> } else {</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> options.put(&quot;connector&quot;, &quot;upsert-kafka&quot;); // upsert-kafka connector</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> options.putAll(format.generateOptions(true));</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> }</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> } else if (format instanceof CanalJsonFormat || 
format instanceof DebeziumJsonFormat) {</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> // kafka-inlong connector</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> options.put(&quot;connector&quot;, &quot;kafka-inlong&quot;);</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> options.putAll(format.generateOptions(false));</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> } else {</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> throw new IllegalArgumentException(&quot;kafka load Node format is IllegalArgument&quot;);</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> }</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> return options;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> }</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">}</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="Copy code to clipboard" title="Copy" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg class="copyButtonIcon_y97N" viewBox="0 0 24 24"><path d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg class="copyButtonSuccessIcon_LjdS" viewBox="0 0 24 24"><path d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><p><strong>Step 2</strong>: Add the Load to JsonSubTypes in LoadNode and Node</p><div class="language-java codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#F8F8F2;--prism-background-color:#282A36"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-java 
codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#F8F8F2"><span class="token plain">// add field in JsonSubTypes of LoadNode and Node</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">...</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">@JsonSubTypes({</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> @JsonSubTypes.Type(value = KafkaLoadNode.class, name = &quot;kafkaLoad&quot;)</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">})</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">...</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">public abstract class LoadNode implements Node{...}</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain" style="display:inline-block"></span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">...</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">@JsonSubTypes({</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> @JsonSubTypes.Type(value = KafkaLoadNode.class, name = &quot;kafkaLoad&quot;)</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">})</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">public interface Node {...}</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="Copy code to clipboard" title="Copy" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg class="copyButtonIcon_y97N" viewBox="0 0 24 24"><path d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg 
class="copyButtonSuccessIcon_LjdS" viewBox="0 0 24 24"><path d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><p><strong>Step 3</strong>: Extend the Sort connector. Kafka&#x27;s Sort connector is in <code>inlong-sort/sort-connectors/kafka</code>.</p><h2 class="anchor anchorWithStickyNavbar_LWe7" id="integrate-entrance">Integrate Entrance<a href="#integrate-entrance" class="hash-link" aria-label="Direct link to Integrate Entrance" title="Direct link to Integrate Entrance"></a></h2><p>To integrate extract and load into the InLong Sort main flow, you need to implement the semantics mentioned in the overview section: group, stream, node, etc. The entry class of InLong Sort is located at:</p><div class="language-bash codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#F8F8F2;--prism-background-color:#282A36"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-bash codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#F8F8F2"><span class="token plain">inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/Entrance.java</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="Copy code to clipboard" title="Copy" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg class="copyButtonIcon_y97N" viewBox="0 0 24 24"><path d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg class="copyButtonSuccessIcon_LjdS" viewBox="0 0 24 24"><path d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><p>To see how to integrate extract and load into InLong Sort, refer to the following unit test (UT). 
First, build the corresponding extractnode and loadnode, then build noderelation, streaminfo and groupinfo, and finally use FlinkSqlParser to execute.</p><div class="language-java codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#F8F8F2;--prism-background-color:#282A36"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-java codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#F8F8F2"><span class="token plain">public class MongoExtractToKafkaLoad extends AbstractTestBase {</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain" style="display:inline-block"></span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> // create MongoExtractNode</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> private MongoExtractNode buildMongoNode() {</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> List&lt;FieldInfo&gt; fields = Arrays.asList(new FieldInfo(&quot;name&quot;, new StringFormatInfo()), ...);</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> return new MongoExtractNode(..., fields, ...);</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> }</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain" style="display:inline-block"></span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> // create KafkaLoadNode</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> private KafkaLoadNode buildAllMigrateKafkaNode() {</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> List&lt;FieldInfo&gt; fields = Arrays.asList(new FieldInfo(&quot;name&quot;, new StringFormatInfo()), ...);</span><br></span><span class="token-line" 
style="color:#F8F8F2"><span class="token plain"> List&lt;FieldRelation&gt; relations = Arrays.asList(new FieldRelation(new FieldInfo(&quot;name&quot;, new StringFormatInfo()), ...), ...);</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> CsvFormat csvFormat = new CsvFormat();</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> return new KafkaLoadNode(..., fields, relations, csvFormat, ...);</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> }</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain" style="display:inline-block"></span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> // create NodeRelation</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> private NodeRelation buildNodeRelation(List&lt;Node&gt; inputs, List&lt;Node&gt; outputs) {</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> List&lt;String&gt; inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> List&lt;String&gt; outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> return new NodeRelation(inputIds, outputIds);</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> }</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain" style="display:inline-block"></span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> // test the main flow: mongodb to kafka</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> @Test</span><br></span><span class="token-line" 
style="color:#F8F8F2"><span class="token plain"> public void testMongoDbToKafka() throws Exception {</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> EnvironmentSettings settings = EnvironmentSettings. ... .build();</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> ...</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> Node inputNode = buildMongoNode();</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> Node outputNode = buildAllMigrateKafkaNode();</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> StreamInfo streamInfo = new StreamInfo(&quot;1&quot;, Arrays.asList(inputNode, outputNode), ...);</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> GroupInfo groupInfo = new GroupInfo(&quot;1&quot;, Collections.singletonList(streamInfo));</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> FlinkSqlParser parser = FlinkSqlParser.getInstance(tableEnv, groupInfo);</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> ParseResult result = parser.parse();</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> Assert.assertTrue(result.tryExecute());</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> }</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">}</span><br></span></code></pre><div 
class="buttonGroup__atx"><button type="button" aria-label="Copy code to clipboard" title="Copy" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg class="copyButtonIcon_y97N" viewBox="0 0 24 24"><path d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg class="copyButtonSuccessIcon_LjdS" viewBox="0 0 24 24"><path d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div></div><footer class="theme-doc-footer docusaurus-mt-lg"><div class="theme-doc-footer-edit-meta-row row"><div class="col"><a href="https://github.com/apache/inlong-website/edit/master/versioned_docs/version-1.8.0/design_and_concept/how_to_extend_data_node_for_sort.md" target="_blank" rel="noreferrer noopener" class="theme-edit-this-page"><svg fill="currentColor" height="20" width="20" viewBox="0 0 40 40" class="iconEdit_Z9Sw" aria-hidden="true"><g><path d="m34.5 11.7l-3 3.1-6.3-6.3 3.1-3q0.5-0.5 1.2-0.5t1.1 0.5l3.9 3.9q0.5 0.4 0.5 1.1t-0.5 1.2z m-29.5 17.1l18.4-18.5 6.3 6.3-18.4 18.4h-6.3v-6.2z"></path></g></svg>Edit this page</a></div><div class="col lastUpdated_vwxv"></div></div></footer></article><nav class="pagination-nav docusaurus-mt-lg" aria-label="Docs pages navigation"><a class="pagination-nav__link pagination-nav__link--prev" href="/docs/1.8.0/design_and_concept/how_to_write_plugin_agent"><div class="pagination-nav__sublabel">Previous</div><div class="pagination-nav__label">Agent Plugin</div></a><a class="pagination-nav__link pagination-nav__link--next" href="/docs/1.8.0/design_and_concept/how_to_write_plugin_dashboard"><div class="pagination-nav__sublabel">Next</div><div class="pagination-nav__label">Dashboard Plugin</div></a></nav></div></div><div class="col col--3"><div class="tableOfContents_bqdL thin-scrollbar theme-doc-toc-desktop"><ul class="table-of-contents table-of-contents__left-border"><li><a href="#overview" 
class="table-of-contents__link toc-highlight">Overview</a></li><li><a href="#extend-extract-node" class="table-of-contents__link toc-highlight">Extend Extract Node</a></li><li><a href="#extend-load-node" class="table-of-contents__link toc-highlight">Extend Load Node</a></li><li><a href="#integrate-entrance" class="table-of-contents__link toc-highlight">Integrate Entrance</a></li></ul></div></div></div></div></main></div></div><footer class="footer"><div class="container container-fluid"><div class="row footer__links"><div class="col footer__col"><div class="footer__title">Events</div><ul class="footer__items clean-list"><li class="footer__item"><a href="https://www.apachecon.com/" target="_blank" rel="noopener noreferrer" class="footer__link-item">ApacheCon<svg width="13.5" height="13.5" aria-hidden="true" viewBox="0 0 24 24" class="iconExternalLink_nPIU"><path fill="currentColor" d="M21 13v10h-21v-19h12v2h-10v15h17v-8h2zm3-12h-10.988l4.035 4-6.977 7.07 2.828 2.828 6.977-7.07 4.125 4.172v-11z"></path></svg></a></li><li class="footer__item"><a class="acevent" data-format="square" data-mode="dark" data-event="random"></a></li></ul></div><div class="col footer__col"><div class="footer__title">Community</div><ul class="footer__items clean-list"><li class="footer__item"><a href="https://twitter.com/ApacheInlong" target="_blank" rel="noopener noreferrer" class="footer__link-item">Twitter<svg width="13.5" height="13.5" aria-hidden="true" viewBox="0 0 24 24" class="iconExternalLink_nPIU"><path fill="currentColor" d="M21 13v10h-21v-19h12v2h-10v15h17v-8h2zm3-12h-10.988l4.035 4-6.977 7.07 2.828 2.828 6.977-7.07 4.125 4.172v-11z"></path></svg></a></li><li class="footer__item"><a href="https://inlong.apache.org/img/apache-inlong-wechat.jpg" target="_blank" rel="noopener noreferrer" class="footer__link-item">WeChat<svg width="13.5" height="13.5" aria-hidden="true" viewBox="0 0 24 24" class="iconExternalLink_nPIU"><path fill="currentColor" d="M21 
13v10h-21v-19h12v2h-10v15h17v-8h2zm3-12h-10.988l4.035 4-6.977 7.07 2.828 2.828 6.977-7.07 4.125 4.172v-11z"></path></svg></a></li><li class="footer__item"><a href="mailto:dev@inlong.apache.org" target="_blank" rel="noopener noreferrer" class="footer__link-item">Email</a></li></ul></div><div class="col footer__col"><div class="footer__title">More</div><ul class="footer__items clean-list"><li class="footer__item"><a class="footer__link-item" href="/blog">Blog</a></li><li class="footer__item"><a href="https://github.com/apache/inlong" target="_blank" rel="noopener noreferrer" class="footer__link-item">GitHub<svg width="13.5" height="13.5" aria-hidden="true" viewBox="0 0 24 24" class="iconExternalLink_nPIU"><path fill="currentColor" d="M21 13v10h-21v-19h12v2h-10v15h17v-8h2zm3-12h-10.988l4.035 4-6.977 7.07 2.828 2.828 6.977-7.07 4.125 4.172v-11z"></path></svg></a></li></ul></div></div><div class="footer__bottom text--center"><div class="margin-bottom--sm"><img src="/img/asf_logo.svg" alt="Apache InLong" class="themedImage_ToTc themedImage--light_HNdA footer__logo"><img src="/img/asf_logo.svg" alt="Apache InLong" class="themedImage_ToTc themedImage--dark_i4oU footer__logo"></div><div class="footer__copyright"><div style="font-family: Avenir-Medium;font-size: 14px;color: #999;">
<div>Copyright © 2020-2024 The Apache Software Foundation. Licensed under the Apache License, Version 2.0.</div>
<div style="margin-top: 20px; padding-top: 20px; border-top: 1px solid #666;line-height: 20px;">The Apache Software Foundation Apache InLong, InLong, Apache, the Apache feather, and the Apache InLong project logo are either registered trademarks or trademarks of the Apache Software Foundation.</div>
</div></div></div></div></footer></div>
<script src="/assets/js/runtime~main.63c98e82.js"></script>
<script src="/assets/js/main.070aef2a.js"></script>
</body>
</html>