blob: e1fae26bfd4cb952e5ffd4476d7740508003bc94 [file] [log] [blame]
<!doctype html>
<html lang="en" dir="ltr" class="docs-wrapper docs-doc-page docs-version-1.5.0 plugin-docs plugin-id-default docs-doc-id-data_node/load_node/kafka">
<head>
<meta charset="UTF-8">
<meta name="generator" content="Docusaurus v2.3.1">
<title data-rh="true">Kafka | Apache InLong</title><meta data-rh="true" name="viewport" content="width=device-width,initial-scale=1"><meta data-rh="true" name="twitter:card" content="summary_large_image"><meta data-rh="true" property="og:url" content="https://inlong.apache.org/docs/1.5.0/data_node/load_node/kafka"><meta data-rh="true" name="docusaurus_locale" content="en"><meta data-rh="true" name="docsearch:language" content="en"><meta data-rh="true" name="docusaurus_version" content="1.5.0"><meta data-rh="true" name="docusaurus_tag" content="docs-default-1.5.0"><meta data-rh="true" name="docsearch:version" content="1.5.0"><meta data-rh="true" name="docsearch:docusaurus_tag" content="docs-default-1.5.0"><meta data-rh="true" property="og:title" content="Kafka | Apache InLong"><meta data-rh="true" name="description" content="Overview"><meta data-rh="true" property="og:description" content="Overview"><link data-rh="true" rel="icon" href="/img/logo.svg"><link data-rh="true" rel="canonical" href="https://inlong.apache.org/docs/1.5.0/data_node/load_node/kafka"><link data-rh="true" rel="alternate" href="https://inlong.apache.org/docs/1.5.0/data_node/load_node/kafka" hreflang="en"><link data-rh="true" rel="alternate" href="https://inlong.apache.org/zh-CN/docs/1.5.0/data_node/load_node/kafka" hreflang="zh-CN"><link data-rh="true" rel="alternate" href="https://inlong.apache.org/docs/1.5.0/data_node/load_node/kafka" hreflang="x-default"><link data-rh="true" rel="preconnect" href="https://YUW9QEL53E-dsn.algolia.net" crossorigin="anonymous"><link rel="alternate" type="application/rss+xml" href="/blog/rss.xml" title="Apache InLong RSS Feed">
<link rel="alternate" type="application/atom+xml" href="/blog/atom.xml" title="Apache InLong Atom Feed">
<link rel="search" type="application/opensearchdescription+xml" title="Apache InLong" href="/opensearch.xml">
<script src="https://www.apachecon.com/event-images/snippet.js" async></script><link rel="stylesheet" href="/assets/css/styles.c64edd51.css">
<link rel="preload" href="/assets/js/runtime~main.63c98e82.js" as="script">
<link rel="preload" href="/assets/js/main.070aef2a.js" as="script">
</head>
<body class="navigation-with-keyboard">
<script>!function(){function e(e){document.documentElement.setAttribute("data-theme",e)}var t=function(){var e=null;try{e=localStorage.getItem("theme")}catch(e){}return e}();null!==t?e(t):window.matchMedia("(prefers-color-scheme: dark)").matches?e("dark"):(window.matchMedia("(prefers-color-scheme: light)").matches,e("light"))}()</script><div id="__docusaurus">
<div role="region" aria-label="Skip to main content"><a class="skipToContent_fXgn" href="#docusaurus_skipToContent_fallback">Skip to main content</a></div><nav aria-label="Main" class="navbar navbar--fixed-top"><div class="navbar__inner"><div class="navbar__items"><button aria-label="Toggle navigation bar" aria-expanded="false" class="navbar__toggle clean-btn" type="button"><svg width="30" height="30" viewBox="0 0 30 30" aria-hidden="true"><path stroke="currentColor" stroke-linecap="round" stroke-miterlimit="10" stroke-width="2" d="M4 7h22M4 15h22M4 23h22"></path></svg></button><a class="navbar__brand" href="/"><div class="navbar__logo"><img src="/img/logo.svg" alt="Apache" class="themedImage_ToTc themedImage--light_HNdA"><img src="/img/logo.svg" alt="Apache" class="themedImage_ToTc themedImage--dark_i4oU"></div><b class="navbar__title text--truncate">Apache InLong</b></a></div><div class="navbar__items navbar__items--right"><div class="navbar__item dropdown dropdown--hoverable dropdown--right"><a class="navbar__link" aria-haspopup="true" aria-expanded="false" role="button" href="/docs/introduction">Docs</a><ul class="dropdown__menu"><li><a class="dropdown__link" href="/docs/next/introduction">Next</a></li><li><a class="dropdown__link" href="/docs/introduction">1.11.0</a></li><li><a class="dropdown__link" href="/docs/1.10.0/introduction">1.10.0</a></li><li><a class="dropdown__link" href="/docs/1.9.0/introduction">1.9.0</a></li><li><a class="dropdown__link" href="/docs/1.8.0/introduction">1.8.0</a></li><li><a class="dropdown__link" href="/versions/">All versions</a></li></ul></div><a class="navbar__item navbar__link" href="/downloads">Download</a><a class="navbar__item navbar__link" href="/community/how-to-contribute">Community</a><a class="navbar__item navbar__link" href="/blog">Blog</a><a class="navbar__item navbar__link" href="/team">Team</a><div class="navbar__item dropdown dropdown--hoverable dropdown--right"><a href="#" aria-haspopup="true" aria-expanded="false" role="button" class="navbar__link">ASF</a><ul class="dropdown__menu"><li><a href="https://www.apache.org/" target="_blank" rel="noopener noreferrer" class="dropdown__link">Apache Software Foundation</a></li><li><a href="https://www.apache.org/licenses/" target="_blank" rel="noopener noreferrer" class="dropdown__link">License</a></li><li><a href="https://www.apache.org/events/current-event" target="_blank" rel="noopener noreferrer" class="dropdown__link">Events</a></li><li><a href="https://www.apache.org/security/" target="_blank" rel="noopener noreferrer" class="dropdown__link">Security</a></li><li><a href="https://www.apache.org/foundation/sponsorship.html" target="_blank" rel="noopener noreferrer" class="dropdown__link">Sponsorship</a></li><li><a href="https://www.apache.org/foundation/policies/privacy.html" target="_blank" rel="noopener noreferrer" class="dropdown__link">Privacy</a></li><li><a href="https://www.apache.org/foundation/thanks.html" target="_blank" rel="noopener noreferrer" class="dropdown__link">Thanks</a></li></ul></div><div class="navbar__item dropdown dropdown--hoverable dropdown--right"><a href="#" aria-haspopup="true" aria-expanded="false" role="button" class="navbar__link"><svg viewBox="0 0 24 24" width="20" height="20" aria-hidden="true" class="iconLanguage_nlXk"><path fill="currentColor" d="M12.87 15.07l-2.54-2.51.03-.03c1.74-1.94 2.98-4.17 3.71-6.53H17V4h-7V2H8v2H1v1.99h11.17C11.5 7.92 10.44 9.75 9 11.35 8.07 10.32 7.3 9.19 6.69 8h-2c.73 1.63 1.73 3.17 2.98 4.56l-5.09 5.02L4 19l5-5 3.11 3.11.76-2.04zM18.5 10h-2L12 22h2l1.12-3h4.75L21 22h2l-4.5-12zm-2.62 7l1.62-4.33L19.12 17h-3.24z"></path></svg>English</a><ul class="dropdown__menu"><li><a href="/docs/1.5.0/data_node/load_node/kafka" target="_self" rel="noopener noreferrer" class="dropdown__link dropdown__link--active" lang="en">English</a></li><li><a href="/zh-CN/docs/1.5.0/data_node/load_node/kafka" target="_self" rel="noopener noreferrer" class="dropdown__link" lang="zh-CN">简体中文</a></li></ul></div><a href="https://github.com/apache/inlong" target="_blank" rel="noopener noreferrer" class="navbar__item navbar__link">GitHub<svg width="13.5" height="13.5" aria-hidden="true" viewBox="0 0 24 24" class="iconExternalLink_nPIU"><path fill="currentColor" d="M21 13v10h-21v-19h12v2h-10v15h17v-8h2zm3-12h-10.988l4.035 4-6.977 7.07 2.828 2.828 6.977-7.07 4.125 4.172v-11z"></path></svg></a><div class="toggle_vylO colorModeToggle_DEke"><button class="clean-btn toggleButton_gllP toggleButtonDisabled_aARS" type="button" disabled="" title="Switch between dark and light mode (currently light mode)" aria-label="Switch between dark and light mode (currently light mode)" aria-live="polite"><svg viewBox="0 0 24 24" width="24" height="24" class="lightToggleIcon_pyhR"><path fill="currentColor" d="M12,9c1.65,0,3,1.35,3,3s-1.35,3-3,3s-3-1.35-3-3S10.35,9,12,9 M12,7c-2.76,0-5,2.24-5,5s2.24,5,5,5s5-2.24,5-5 S14.76,7,12,7L12,7z M2,13l2,0c0.55,0,1-0.45,1-1s-0.45-1-1-1l-2,0c-0.55,0-1,0.45-1,1S1.45,13,2,13z M20,13l2,0c0.55,0,1-0.45,1-1 s-0.45-1-1-1l-2,0c-0.55,0-1,0.45-1,1S19.45,13,20,13z M11,2v2c0,0.55,0.45,1,1,1s1-0.45,1-1V2c0-0.55-0.45-1-1-1S11,1.45,11,2z M11,20v2c0,0.55,0.45,1,1,1s1-0.45,1-1v-2c0-0.55-0.45-1-1-1C11.45,19,11,19.45,11,20z M5.99,4.58c-0.39-0.39-1.03-0.39-1.41,0 c-0.39,0.39-0.39,1.03,0,1.41l1.06,1.06c0.39,0.39,1.03,0.39,1.41,0s0.39-1.03,0-1.41L5.99,4.58z M18.36,16.95 c-0.39-0.39-1.03-0.39-1.41,0c-0.39,0.39-0.39,1.03,0,1.41l1.06,1.06c0.39,0.39,1.03,0.39,1.41,0c0.39-0.39,0.39-1.03,0-1.41 L18.36,16.95z M19.42,5.99c0.39-0.39,0.39-1.03,0-1.41c-0.39-0.39-1.03-0.39-1.41,0l-1.06,1.06c-0.39,0.39-0.39,1.03,0,1.41 s1.03,0.39,1.41,0L19.42,5.99z M7.05,18.36c0.39-0.39,0.39-1.03,0-1.41c-0.39-0.39-1.03-0.39-1.41,0l-1.06,1.06 c-0.39,0.39-0.39,1.03,0,1.41s1.03,0.39,1.41,0L7.05,18.36z"></path></svg><svg viewBox="0 0 24 24" width="24" height="24" class="darkToggleIcon_wfgR"><path fill="currentColor" d="M9.37,5.51C9.19,6.15,9.1,6.82,9.1,7.5c0,4.08,3.32,7.4,7.4,7.4c0.68,0,1.35-0.09,1.99-0.27C17.45,17.19,14.93,19,12,19 c-3.86,0-7-3.14-7-7C5,9.07,6.81,6.55,9.37,5.51z M12,3c-4.97,0-9,4.03-9,9s4.03,9,9,9s9-4.03,9-9c0-0.46-0.04-0.92-0.1-1.36 c-0.98,1.37-2.58,2.26-4.4,2.26c-2.98,0-5.4-2.42-5.4-5.4c0-1.81,0.89-3.42,2.26-4.4C12.92,3.04,12.46,3,12,3L12,3z"></path></svg></button></div><div class="searchBox_ZlJk"><button type="button" class="DocSearch DocSearch-Button" aria-label="Search"><span class="DocSearch-Button-Container"><svg width="20" height="20" class="DocSearch-Search-Icon" viewBox="0 0 20 20"><path d="M14.386 14.386l4.0877 4.0877-4.0877-4.0877c-2.9418 2.9419-7.7115 2.9419-10.6533 0-2.9419-2.9418-2.9419-7.7115 0-10.6533 2.9418-2.9419 7.7115-2.9419 10.6533 0 2.9419 2.9418 2.9419 7.7115 0 10.6533z" stroke="currentColor" fill="none" fill-rule="evenodd" stroke-linecap="round" stroke-linejoin="round"></path></svg><span class="DocSearch-Button-Placeholder">Search</span></span><span class="DocSearch-Button-Keys"></span></button></div></div></div><div role="presentation" class="navbar-sidebar__backdrop"></div></nav><div id="docusaurus_skipToContent_fallback" class="main-wrapper mainWrapper_z2l0 docsWrapper_BCFX"><button aria-label="Scroll back to top" class="clean-btn theme-back-to-top-button backToTopButton_sjWU" type="button"></button><div class="docPage__5DB"><aside class="theme-doc-sidebar-container docSidebarContainer_b6E3"><div class="sidebarViewport_Xe31"><div class="sidebar_njMd"><nav aria-label="Docs sidebar" class="menu thin-scrollbar menu_SIkG"><ul class="theme-doc-sidebar-menu menu__list"><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-1 menu__list-item"><a class="menu__link" href="/docs/1.5.0/introduction">Introduction</a></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-1 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" href="/docs/1.5.0/design_and_concept/basic_concept">Design and Concept</a></div></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-1 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" href="/docs/1.5.0/quick_start/how_to_build">Quick Start</a></div></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-1 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" href="/docs/1.5.0/deployment/standalone">Deployment</a></div></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-1 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" href="/docs/1.5.0/modules/agent/overview">Components</a></div></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-1 menu__list-item"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret menu__link--active" aria-expanded="true" href="/docs/1.5.0/data_node/extract_node/overview">Data Nodes</a></div><ul style="display:block;overflow:visible;height:auto" class="menu__list"><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-2 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" tabindex="0" href="/docs/1.5.0/data_node/extract_node/overview">Extract Nodes</a></div></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-2 menu__list-item"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret menu__link--active" aria-expanded="true" tabindex="0" href="/docs/1.5.0/data_node/load_node/overview">Load Nodes</a></div><ul style="display:block;overflow:visible;height:auto" class="menu__list"><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-3 menu__list-item"><a class="menu__link" tabindex="0" href="/docs/1.5.0/data_node/load_node/overview">Overview</a></li><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-3 menu__list-item"><a class="menu__link" tabindex="0" href="/docs/1.5.0/data_node/load_node/auto_consumption">Auto Consumption</a></li><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-3 menu__list-item"><a class="menu__link" tabindex="0" href="/docs/1.5.0/data_node/load_node/clickhouse">ClickHouse</a></li><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-3 menu__list-item"><a class="menu__link" tabindex="0" href="/docs/1.5.0/data_node/load_node/elasticsearch">Elasticsearch</a></li><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-3 menu__list-item"><a class="menu__link" tabindex="0" href="/docs/1.5.0/data_node/load_node/greenplum">Greenplum</a></li><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-3 menu__list-item"><a class="menu__link" tabindex="0" href="/docs/1.5.0/data_node/load_node/hbase">HBase</a></li><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-3 menu__list-item"><a class="menu__link" tabindex="0" href="/docs/1.5.0/data_node/load_node/hdfs">HDFS</a></li><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-3 menu__list-item"><a class="menu__link" tabindex="0" href="/docs/1.5.0/data_node/load_node/hive">Hive</a></li><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-3 menu__list-item"><a class="menu__link" tabindex="0" href="/docs/1.5.0/data_node/load_node/iceberg">Iceberg</a></li><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-3 menu__list-item"><a class="menu__link menu__link--active" aria-current="page" tabindex="0" href="/docs/1.5.0/data_node/load_node/kafka">Kafka</a></li><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-3 menu__list-item"><a class="menu__link" tabindex="0" href="/docs/1.5.0/data_node/load_node/mysql">MySQL</a></li><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-3 menu__list-item"><a class="menu__link" tabindex="0" href="/docs/1.5.0/data_node/load_node/oracle">Oracle</a></li><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-3 menu__list-item"><a class="menu__link" tabindex="0" href="/docs/1.5.0/data_node/load_node/postgresql">PostgreSQL</a></li><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-3 menu__list-item"><a class="menu__link" tabindex="0" href="/docs/1.5.0/data_node/load_node/sqlserver">SQLServer</a></li><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-3 menu__list-item"><a class="menu__link" tabindex="0" href="/docs/1.5.0/data_node/load_node/tdsql-postgresql">TDSQL-PostgreSQL</a></li><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-3 menu__list-item"><a class="menu__link" tabindex="0" href="/docs/1.5.0/data_node/load_node/doris">Doris</a></li><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-3 menu__list-item"><a class="menu__link" tabindex="0" href="/docs/1.5.0/data_node/load_node/starrocks">StarRocks</a></li><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-3 menu__list-item"><a class="menu__link" tabindex="0" href="/docs/1.5.0/data_node/load_node/hudi">Hudi</a></li></ul></li></ul></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-1 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" href="/docs/1.5.0/sdk/dataproxy-sdk/cpp">SDK</a></div></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-1 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" href="/docs/1.5.0/user_guide/dashboard_usage">User Guide</a></div></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-1 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" href="/docs/1.5.0/development/inlong_msg">Development</a></div></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-1 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" href="/docs/1.5.0/administration/user_management">Administration</a></div></li><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-1 menu__list-item"><a class="menu__link" href="/docs/1.5.0/contact">Contact Us</a></li></ul></nav></div></div></aside><main class="docMainContainer_gTbr"><div class="container padding-top--md padding-bottom--lg"><div class="row"><div class="col docItemCol_VOVn"><div class="theme-doc-version-banner alert alert--warning margin-bottom--md" role="alert"><div>This is documentation for <!-- -->Apache InLong<!-- --> <b>1.5.0</b>, which is no longer actively maintained.</div><div class="margin-top--md">For up-to-date documentation, see the <b><a href="/docs/data_node/load_node/kafka">latest version</a></b> (<!-- -->1.11.0<!-- -->).</div></div><div class="docItemContainer_Djhp"><article><nav class="theme-doc-breadcrumbs breadcrumbsContainer_Z_bl" aria-label="Breadcrumbs"><ul class="breadcrumbs" itemscope="" itemtype="https://schema.org/BreadcrumbList"><li class="breadcrumbs__item"><a aria-label="Home page" class="breadcrumbs__link" href="/"><svg viewBox="0 0 24 24" class="breadcrumbHomeIcon_YNFT"><path d="M10 19v-5h4v5c0 .55.45 1 1 1h3c.55 0 1-.45 1-1v-7h1.7c.46 0 .68-.57.33-.87L12.67 3.6c-.38-.34-.96-.34-1.34 0l-8.36 7.53c-.34.3-.13.87.33.87H5v7c0 .55.45 1 1 1h3c.55 0 1-.45 1-1z" fill="currentColor"></path></svg></a></li><li class="breadcrumbs__item"><span class="breadcrumbs__link">Data Nodes</span><meta itemprop="position" content="1"></li><li class="breadcrumbs__item"><span class="breadcrumbs__link">Load Nodes</span><meta itemprop="position" content="2"></li><li itemscope="" itemprop="itemListElement" itemtype="https://schema.org/ListItem" class="breadcrumbs__item breadcrumbs__item--active"><span class="breadcrumbs__link" itemprop="name">Kafka</span><meta itemprop="position" content="3"></li></ul></nav><span class="theme-doc-version-badge badge badge--secondary">Version: 1.5.0</span><div class="tocCollapsible_ETCw theme-doc-toc-mobile tocMobile_ITEo"><button type="button" class="clean-btn tocCollapsibleButton_TO0P">On this page</button></div><div class="theme-doc-markdown markdown"><header><h1>Kafka</h1></header><h2 class="anchor anchorWithStickyNavbar_LWe7" id="overview">Overview<a href="#overview" class="hash-link" aria-label="Direct link to Overview" title="Direct link to Overview"></a></h2><p>The <code>Kafka Load Node</code> supports to write data into Kafka topics. It can support to write data in the normal fashion and write data in the
upsert fashion. The <code>upsert-kafka</code> connector can consume a changelog stream. It will write INSERT/UPDATE_AFTER data as
normal Kafka messages value, and write DELETE data as Kafka messages with null values (indicate tombstone for the key).</p><h2 class="anchor anchorWithStickyNavbar_LWe7" id="supported-version">Supported Version<a href="#supported-version" class="hash-link" aria-label="Direct link to Supported Version" title="Direct link to Supported Version"></a></h2><table><thead><tr><th>Load Node</th><th>Kafka version</th></tr></thead><tbody><tr><td><a href="/docs/1.5.0/data_node/load_node/kafka">Kafka</a></td><td>0.10+</td></tr></tbody></table><h2 class="anchor anchorWithStickyNavbar_LWe7" id="dependencies">Dependencies<a href="#dependencies" class="hash-link" aria-label="Direct link to Dependencies" title="Direct link to Dependencies"></a></h2><p>In order to set up the <code>Kafka Load Node</code>, the following provides dependency information for both projects using a
build automation tool (such as Maven or SBT) and SQL Client with Sort Connectors JAR bundles.</p><h3 class="anchor anchorWithStickyNavbar_LWe7" id="maven-dependency">Maven dependency<a href="#maven-dependency" class="hash-link" aria-label="Direct link to Maven dependency" title="Direct link to Maven dependency"></a></h3><div class="codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#F8F8F2;--prism-background-color:#282A36"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-text codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#F8F8F2"><span class="token plain">&lt;dependency&gt;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &lt;groupId&gt;org.apache.inlong&lt;/groupId&gt;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &lt;artifactId&gt;sort-connector-kafka&lt;/artifactId&gt;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &lt;version&gt;1.5.0&lt;/version&gt;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">&lt;/dependency&gt;</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="Copy code to clipboard" title="Copy" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg class="copyButtonIcon_y97N" viewBox="0 0 24 24"><path d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg class="copyButtonSuccessIcon_LjdS" viewBox="0 0 24 24"><path d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><h2 class="anchor anchorWithStickyNavbar_LWe7" id="how-to-create-a-kafka-load-node">How to create a Kafka Load Node<a href="#how-to-create-a-kafka-load-node" class="hash-link" aria-label="Direct link to How to create a Kafka Load Node" title="Direct link to How to create a Kafka Load Node"></a></h2><h3 class="anchor anchorWithStickyNavbar_LWe7" id="usage-for-sql-api">Usage for SQL API<a href="#usage-for-sql-api" class="hash-link" aria-label="Direct link to Usage for SQL API" title="Direct link to Usage for SQL API"></a></h3><p>The example below shows how to create a Kafka Load Node with <code>Flink SQL</code> :</p><ul><li>connector is <code>kafka-inlong</code></li></ul><div class="language-sql codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#F8F8F2;--prism-background-color:#282A36"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-sql codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#F8F8F2"><span class="token comment" style="color:rgb(98, 114, 164)">-- Create a Kafka table &#x27;kafka_load_node&#x27; in Flink SQL</span><span class="token plain"></span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">Flink </span><span class="token keyword" style="color:rgb(189, 147, 249);font-style:italic">SQL</span><span class="token operator">&gt;</span><span class="token plain"> </span><span class="token keyword" style="color:rgb(189, 147, 249);font-style:italic">CREATE</span><span class="token plain"> </span><span class="token keyword" style="color:rgb(189, 147, 249);font-style:italic">TABLE</span><span class="token plain"> kafka_load_node </span><span class="token punctuation" style="color:rgb(248, 248, 242)">(</span><span class="token plain"></span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> </span><span class="token identifier punctuation" style="color:rgb(248, 248, 242)">`</span><span class="token identifier">id</span><span class="token identifier punctuation" style="color:rgb(248, 248, 242)">`</span><span class="token plain"> </span><span class="token keyword" style="color:rgb(189, 147, 249);font-style:italic">INT</span><span class="token punctuation" style="color:rgb(248, 248, 242)">,</span><span class="token plain"></span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> </span><span class="token identifier punctuation" style="color:rgb(248, 248, 242)">`</span><span class="token identifier">name</span><span class="token identifier punctuation" style="color:rgb(248, 248, 242)">`</span><span class="token plain"> STRINTG</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> </span><span class="token punctuation" style="color:rgb(248, 248, 242)">)</span><span class="token plain"> </span><span class="token keyword" style="color:rgb(189, 147, 249);font-style:italic">WITH</span><span class="token plain"> </span><span class="token punctuation" style="color:rgb(248, 248, 242)">(</span><span class="token plain"></span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> </span><span class="token string" style="color:rgb(255, 121, 198)">&#x27;connector&#x27;</span><span class="token plain"> </span><span class="token operator">=</span><span class="token plain"> </span><span class="token string" style="color:rgb(255, 121, 198)">&#x27;kafka-inlong&#x27;</span><span class="token punctuation" style="color:rgb(248, 248, 242)">,</span><span class="token plain"></span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> </span><span class="token string" style="color:rgb(255, 121, 198)">&#x27;topic&#x27;</span><span class="token plain"> </span><span class="token operator">=</span><span class="token plain"> </span><span class="token string" style="color:rgb(255, 121, 198)">&#x27;user&#x27;</span><span class="token punctuation" style="color:rgb(248, 248, 242)">,</span><span class="token plain"></span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> </span><span class="token string" style="color:rgb(255, 121, 198)">&#x27;properties.bootstrap.servers&#x27;</span><span class="token plain"> </span><span class="token operator">=</span><span class="token plain"> </span><span class="token string" style="color:rgb(255, 121, 198)">&#x27;localhost:9092&#x27;</span><span class="token punctuation" style="color:rgb(248, 248, 242)">,</span><span class="token plain"></span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> </span><span class="token string" style="color:rgb(255, 121, 198)">&#x27;properties.group.id&#x27;</span><span class="token plain"> </span><span class="token operator">=</span><span class="token plain"> </span><span class="token string" style="color:rgb(255, 121, 198)">&#x27;testGroup&#x27;</span><span class="token punctuation" style="color:rgb(248, 248, 242)">,</span><span class="token plain"></span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> </span><span class="token string" style="color:rgb(255, 121, 198)">&#x27;format&#x27;</span><span class="token plain"> </span><span class="token operator">=</span><span class="token plain"> </span><span class="token string" style="color:rgb(255, 121, 198)">&#x27;csv&#x27;</span><span class="token plain"></span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> </span><span class="token punctuation" style="color:rgb(248, 248, 242)">)</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="Copy code to clipboard" title="Copy" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg class="copyButtonIcon_y97N" viewBox="0 0 24 24"><path d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg class="copyButtonSuccessIcon_LjdS" viewBox="0 0 24 24"><path d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><ul><li>connector is <code>upsert-kafka</code></li></ul><div class="language-sql codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#F8F8F2;--prism-background-color:#282A36"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-sql codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#F8F8F2"><span class="token comment" style="color:rgb(98, 114, 164)">-- Create a Kafka table &#x27;kafka_load_node&#x27; in Flink SQL</span><span class="token plain"></span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">Flink </span><span class="token keyword" style="color:rgb(189, 147, 249);font-style:italic">SQL</span><span class="token operator">&gt;</span><span class="token plain"> </span><span class="token keyword" style="color:rgb(189, 147, 249);font-style:italic">CREATE</span><span class="token plain"> </span><span class="token keyword" style="color:rgb(189, 147, 249);font-style:italic">TABLE</span><span class="token plain"> kafka_load_node </span><span class="token punctuation" style="color:rgb(248, 248, 242)">(</span><span class="token plain"></span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> </span><span class="token identifier punctuation" style="color:rgb(248, 248, 242)">`</span><span class="token identifier">id</span><span class="token identifier punctuation" style="color:rgb(248, 248, 242)">`</span><span class="token plain"> </span><span class="token keyword" style="color:rgb(189, 147, 249);font-style:italic">INT</span><span class="token punctuation" style="color:rgb(248, 248, 242)">,</span><span class="token plain"></span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> </span><span class="token identifier punctuation" style="color:rgb(248, 248, 242)">`</span><span class="token identifier">name</span><span class="token identifier punctuation" style="color:rgb(248, 248, 242)">`</span><span class="token plain"> STRINTG</span><span class="token punctuation" style="color:rgb(248, 248, 242)">,</span><span class="token plain"></span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> </span><span class="token keyword" style="color:rgb(189, 147, 249);font-style:italic">PRIMARY</span><span class="token plain"> </span><span class="token keyword" style="color:rgb(189, 147, 249);font-style:italic">KEY</span><span class="token plain"> </span><span class="token punctuation" style="color:rgb(248, 248, 242)">(</span><span class="token identifier punctuation" style="color:rgb(248, 248, 242)">`</span><span class="token identifier">id</span><span class="token identifier punctuation" style="color:rgb(248, 248, 242)">`</span><span class="token punctuation" style="color:rgb(248, 248, 242)">)</span><span class="token plain"> </span><span class="token operator">NOT</span><span class="token plain"> ENFORCED</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> </span><span class="token punctuation" style="color:rgb(248, 248, 242)">)</span><span class="token plain"> </span><span class="token keyword" style="color:rgb(189, 147, 249);font-style:italic">WITH</span><span class="token plain"> </span><span class="token punctuation" style="color:rgb(248, 248, 242)">(</span><span class="token plain"></span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> </span><span class="token string" style="color:rgb(255, 121, 198)">&#x27;connector&#x27;</span><span class="token plain"> </span><span class="token operator">=</span><span class="token plain"> </span><span class="token string" style="color:rgb(255, 121, 198)">&#x27;upsert-kafka-inlong&#x27;</span><span class="token punctuation" style="color:rgb(248, 248, 242)">,</span><span class="token plain"></span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> </span><span class="token string" style="color:rgb(255, 121, 198)">&#x27;topic&#x27;</span><span class="token plain"> </span><span class="token operator">=</span><span class="token plain"> </span><span class="token string" style="color:rgb(255, 121, 198)">&#x27;user&#x27;</span><span class="token punctuation" style="color:rgb(248, 248, 242)">,</span><span class="token plain"></span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> </span><span class="token string" style="color:rgb(255, 121, 198)">&#x27;properties.bootstrap.servers&#x27;</span><span class="token plain"> </span><span class="token operator">=</span><span class="token plain"> </span><span class="token string" style="color:rgb(255, 121, 198)">&#x27;localhost:9092&#x27;</span><span class="token punctuation" style="color:rgb(248, 248, 242)">,</span><span class="token plain"></span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> </span><span class="token string" style="color:rgb(255, 121, 198)">&#x27;key.format&#x27;</span><span class="token plain"> </span><span class="token operator">=</span><span class="token plain"> </span><span class="token string" style="color:rgb(255, 121, 198)">&#x27;csv&#x27;</span><span class="token punctuation" style="color:rgb(248, 248, 242)">,</span><span class="token plain"></span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> </span><span class="token string" style="color:rgb(255, 121, 198)">&#x27;value.format&#x27;</span><span class="token plain"> </span><span class="token operator">=</span><span class="token plain"> </span><span class="token string" style="color:rgb(255, 121, 198)">&#x27;csv&#x27;</span><span class="token plain"></span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> </span><span class="token punctuation" style="color:rgb(248, 248, 242)">)</span><span class="token plain"> </span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="Copy code to clipboard" title="Copy" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg class="copyButtonIcon_y97N" viewBox="0 0 24 24"><path d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg class="copyButtonSuccessIcon_LjdS" viewBox="0 0 24 24"><path d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><h3 class="anchor anchorWithStickyNavbar_LWe7" id="usage-for-inlong-dashboard">Usage for InLong Dashboard<a href="#usage-for-inlong-dashboard" class="hash-link" aria-label="Direct link to Usage for InLong Dashboard" title="Direct link to Usage for InLong Dashboard"></a></h3><p>When creating a data flow, select <code>Kafka</code> for the data stream direction, and click &quot;Add&quot; to configure it.</p><p><img loading="lazy" alt="Kafka Configuration" src="/assets/images/kafka-71655dc487b667900ea52a6c279612b8.png" width="1488" height="661" class="img_ev3q"></p><h3 class="anchor anchorWithStickyNavbar_LWe7" id="usage-for-inlong-manager-client">Usage for InLong Manager Client<a href="#usage-for-inlong-manager-client" class="hash-link" aria-label="Direct link to Usage for InLong Manager Client" title="Direct link to Usage for InLong Manager Client"></a></h3><p>TODO: It will be supported in the future.</p><h2 class="anchor anchorWithStickyNavbar_LWe7" id="kafka-load-node-options">Kafka Load Node Options<a href="#kafka-load-node-options" class="hash-link" aria-label="Direct link to Kafka Load Node Options" title="Direct link to Kafka Load Node Options"></a></h2><table><thead><tr><th>Option</th><th>Required</th><th>Default</th><th>Type</th><th>Description</th></tr></thead><tbody><tr><td>connector</td><td>required</td><td>(none)</td><td>String</td><td>Specify which connector to use, valid values are: 1. for the Upsert Kafka use: <code>upsert-kafka-inlong</code> 2. for normal Kafka use: <code>kafka-inlong</code></td></tr><tr><td>topic</td><td>required</td><td>(none)</td><td>String</td><td>Topic name(s) to read data from when the table is used as source. It also supports topic list for source by separating topic by semicolon like <code>topic-1;topic-2</code>. Note, only one of <code>topic-pattern</code> and <code>topic</code> can be specified for sources.</td></tr><tr><td>topic-pattern</td><td>optional</td><td>(none)</td><td>String</td><td>Dynamic topic extraction pattern, like &#x27;${VARIABLE_NAME}&#x27;, which is only used in kafka multiple sink scenarios and is valid when &#x27;format&#x27; is &#x27;raw&#x27;.</td></tr><tr><td>sink.multiple.format</td><td>optional</td><td>(none)</td><td>String</td><td>Format of kafka raw data, currently only supports <!-- -->[canal-json<!-- -->|<!-- -->debezium-json]<!-- --> which is only used in kafka multiple sink scenarios and is valid when &#x27;format&#x27; is &#x27;raw&#x27;.</td></tr><tr><td>properties.bootstrap.servers</td><td>required</td><td>(none)</td><td>String</td><td>Comma separated list of Kafka brokers.</td></tr><tr><td>properties.*</td><td>optional</td><td>(none)</td><td>String</td><td>This can set and pass arbitrary Kafka configurations. Suffix names must match the configuration key defined in <a href="https://kafka.apache.org/documentation/#configuration" target="_blank" rel="noopener noreferrer">Kafka Configuration documentation</a>. Flink will remove the <code>properties.</code> key prefix and pass the transformed key and values to the underlying KafkaClient. For example, you can disable automatic topic creation via <code>properties.allow.auto.create.topics</code> = <code>false</code>. But there are some configurations that do not support to set, because Flink will override them, e.g. <code>key.deserializer</code> and <code>value.deserializer</code>.</td></tr><tr><td>format</td><td>required for normal Kafka</td><td>(none)</td><td>String</td><td>The format used to deserialize and serialize the value part of Kafka messages. Please refer to the formats page for more details and more <a href="https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/formats/overview/" target="_blank" rel="noopener noreferrer">format</a> options. Note: Either this option or the <code>value.format</code> option are required.</td></tr><tr><td>key.format</td><td>optional</td><td>(none)</td><td>String</td><td>The format used to deserialize and serialize the key part of Kafka messages. Please refer to the formats page for more details and more format options. Note: If a key format is defined, the &#x27;key.fields&#x27; option is required as well. Otherwise the Kafka records will have an empty key.</td></tr><tr><td>key.fields</td><td>optional</td><td>[]</td><td><code>List&lt;String&gt;</code></td><td>Defines an explicit list of physical columns from the table schema that configure the data type for the key format. By default, this list is empty and thus a key is undefined. The list should look like &#x27;field1;field2&#x27;.</td></tr><tr><td>key.fields-prefix</td><td>optional</td><td>(none)</td><td>String</td><td>Defines a custom prefix for all fields of the key format to avoid name clashes with fields of the value format. By default, the prefix is empty. If a custom prefix is defined, both the table schema and &#x27;key.fields&#x27; will work with prefixed names. When constructing the data type of the key format, the prefix will be removed and the non-prefixed names will be used within the key format. Please note that this option requires that &#x27;value.fields-include&#x27; must be set to &#x27;EXCEPT_KEY&#x27;.</td></tr><tr><td>value.format</td><td>required for upsert Kafka</td><td>(none)</td><td>String</td><td>The <a href="https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/formats/overview/" target="_blank" rel="noopener noreferrer">format</a> used to deserialize and serialize the value part of Kafka messages. Please refer to the formats page for more details and more format options.</td></tr><tr><td>value.fields-include</td><td>optional</td><td>ALL</td><td>Enum Possible values: <!-- -->[ALL, EXCEPT_KEY]</td><td>Defines a strategy how to deal with key columns in the data type of the value format. By default, &#x27;ALL&#x27; physical columns of the table schema will be included in the value format which means that key columns appear in the data type for both the key and value format</td></tr><tr><td>sink.partitioner</td><td>optional</td><td>&#x27;default&#x27;</td><td>String</td><td>Output partitioning from Flink&#x27;s partitions into Kafka&#x27;s partitions. Valid values are <br><code>default</code>: use the kafka default partitioner to partition records. <br><code>fixed</code>: each Flink partition ends up in at most one Kafka partition. <br><code>round-robin</code>: a Flink partition is distributed to Kafka partitions sticky round-robin. <br>raw-hash: Extract value based on &#x27;sink.multiple.partition-pattern&#x27; to &#x27;hash&#x27; as the final partition, which is only used in kafka multiple sink scenarios and is valid when &#x27;format&#x27; is &#x27;raw&#x27;. It only works when record&#x27;s keys are not specified. Custom FlinkKafkaPartitioner subclass: e.g. &#x27;org.mycompany.MyPartitioner&#x27;. See the following <a href="https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/kafka/#sink-partitioning" target="_blank" rel="noopener noreferrer">Sink Partitioning</a> for more details.</td></tr><tr><td>sink.multiple.partition-pattern</td><td>optional</td><td>(none)</td><td>String</td><td>Dynamic partition extraction pattern, like &#x27;${VARIABLE_NAME}&#x27; which is only used in kafka multiple sink scenarios and is valid when &#x27;format&#x27; is &#x27;raw&#x27;.</td></tr><tr><td>sink.semantic</td><td>optional</td><td>at-least-once</td><td>String</td><td>Defines the delivery semantic for the Kafka sink. Valid enumerationns are &#x27;at-least-once&#x27;, &#x27;exactly-once&#x27; and &#x27;none&#x27;. See <a href="https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/kafka/#consistency-guarantees" target="_blank" rel="noopener noreferrer">Consistency guarantees</a> for more details.</td></tr><tr><td>sink.parallelism</td><td>optional</td><td>(none)</td><td>Integer</td><td>Defines the parallelism of the Kafka sink operator. By default, the parallelism is determined by the framework using the same parallelism of the upstream chained operator.</td></tr><tr><td>inlong.metric.labels</td><td>optional</td><td>(none)</td><td>String</td><td>Inlong metric label, format of value is groupId=<code>{groupId}</code>&amp;streamId=<code>{streamId}</code>&amp;nodeId=<code>{nodeId}</code>.</td></tr></tbody></table><h2 class="anchor anchorWithStickyNavbar_LWe7" id="available-metadata">Available Metadata<a href="#available-metadata" class="hash-link" aria-label="Direct link to Available Metadata" title="Direct link to Available Metadata"></a></h2><p>It supports write metadata for format <code>canal-json-inlong</code>.</p><p>See the <a href="/docs/1.5.0/data_node/extract_node/kafka">Kafka Extract Node</a> for a list of all available metadata fields.</p><h2 class="anchor anchorWithStickyNavbar_LWe7" id="features">Features<a href="#features" class="hash-link" aria-label="Direct link to Features" title="Direct link to Features"></a></h2><h3 class="anchor anchorWithStickyNavbar_LWe7" id="support-dynamic-schema-writing">Support Dynamic Schema Writing<a href="#support-dynamic-schema-writing" class="hash-link" aria-label="Direct link to Support Dynamic Schema Writing" title="Direct link to Support Dynamic Schema Writing"></a></h3><p>Dynamic schema writing supports dynamic extraction of topic and partition from data and writes to the corresponding topic
and partition. In order to support dynamic schema writing, you need to set the format of Kafka to &#x27;raw&#x27;,
Also need to set the serialization format of the upstream data (via the option &#x27;sink.multiple.format&#x27;
to set, currently only supports <!-- -->[canal-json|debezium-json]<!-- -->).</p><h4 class="anchor anchorWithStickyNavbar_LWe7" id="dynamic-topic-extraction">Dynamic Topic Extraction<a href="#dynamic-topic-extraction" class="hash-link" aria-label="Direct link to Dynamic Topic Extraction" title="Direct link to Dynamic Topic Extraction"></a></h4><p>Dynamic topic extraction is by parsing the topic pattern and extracting the topic from the data.
In order to support dynamic extraction of topic, you need to set the option &#x27;topic-pattern&#x27;, Kafka Load Node will parse &#x27;topic-pattern&#x27; as the final topic,
If parsing fails, it will be written to the default topic set via &#x27;topic&#x27;. &#x27;topic-pattern&#x27; supports constants and variables, constants are string constants,
variables are strictly represented by &#x27;${VARIABLE_NAME}&#x27;, and the value of the variable comes from the data itself, that is, through &#x27;sink.multiple.format&#x27;
a metadata field of a specified Format, or a physical field in the data.</p><p>Examples of &#x27;topic-parttern&#x27; are as follows:</p><ul><li>&#x27;sink.multiple.format&#x27; is &#x27;canal-json&#x27;:</li></ul><p>The upstream data is:</p><div class="codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#F8F8F2;--prism-background-color:#282A36"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-text codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#F8F8F2"><span class="token plain">{</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &quot;data&quot;: [</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> {</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &quot;id&quot;: &quot;111&quot;,</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &quot;name&quot;: &quot;scooter&quot;,</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &quot;description&quot;: &quot;Big 2-wheel scooter&quot;,</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &quot;weight&quot;: &quot;5.18&quot;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> }</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> ],</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &quot;database&quot;: &quot;inventory&quot;,</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &quot;es&quot;: 1589373560000,</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &quot;id&quot;: 9,</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &quot;isDdl&quot;: false,</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &quot;mysqlType&quot;: {</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &quot;id&quot;: &quot;INTEGER&quot;,</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &quot;name&quot;: &quot;VARCHAR(255)&quot;,</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &quot;description&quot;: &quot;VARCHAR(512)&quot;,</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &quot;weight&quot;: &quot;FLOAT&quot;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> },</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &quot;old&quot;: [</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> {</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &quot;weight&quot;: &quot;5.15&quot;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> }</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> ],</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &quot;pkNames&quot;: [</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &quot;id&quot;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> ],</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &quot;sql&quot;: &quot;&quot;,</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &quot;sqlType&quot;: {</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &quot;id&quot;: 4,</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &quot;name&quot;: 12,</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &quot;description&quot;: 12,</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &quot;weight&quot;: 7</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> },</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &quot;table&quot;: &quot;products&quot;,</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &quot;ts&quot;: 1589373560798,</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &quot;type&quot;: &quot;UPDATE&quot;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">} </span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="Copy code to clipboard" title="Copy" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg class="copyButtonIcon_y97N" viewBox="0 0 24 24"><path d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg class="copyButtonSuccessIcon_LjdS" viewBox="0 0 24 24"><path d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><p>&#x27;topic-pattern&#x27; is &#x27;{database}_${table}&#x27;, and the extracted topic is &#x27;inventory_products&#x27; (&#x27;database&#x27;, &#x27;table&#x27; are metadata fields)</p><p>&#x27;topic-pattern&#x27; is &#x27;{database}<em>${table}</em>${id}&#x27;, and the extracted topic is &#x27;inventory_products_111&#x27; (&#x27;database&#x27;, &#x27;table&#x27; are metadata fields, and &#x27;id&#x27; are physical fields)</p><ul><li>&#x27;sink.multiple.format&#x27; is &#x27;debezium-json&#x27;:</li></ul><p>The upstream data is:</p><div class="codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#F8F8F2;--prism-background-color:#282A36"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-text codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#F8F8F2"><span class="token plain">{</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &quot;before&quot;: {</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &quot;id&quot;: 4,</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &quot;name&quot;: &quot;scooter&quot;,</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &quot;description&quot;: &quot;Big 2-wheel scooter&quot;,</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &quot;weight&quot;: 5.18</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> },</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &quot;after&quot;: {</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &quot;id&quot;: 4,</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &quot;name&quot;: &quot;scooter&quot;,</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &quot;description&quot;: &quot;Big 2-wheel scooter&quot;,</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &quot;weight&quot;: 5.15</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> },</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &quot;source&quot;: {</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &quot;db&quot;: &quot;inventory&quot;,</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &quot;table&quot;: &quot;products&quot;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> },</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &quot;op&quot;: &quot;u&quot;,</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &quot;ts_ms&quot;: 1589362330904,</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &quot;transaction&quot;: null</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">}</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="Copy code to clipboard" title="Copy" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg class="copyButtonIcon_y97N" viewBox="0 0 24 24"><path d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg class="copyButtonSuccessIcon_LjdS" viewBox="0 0 24 24"><path d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><p>&#x27;topic-pattern&#x27; is &#x27;{source.db}_${source.table}&#x27;, and the extracted topic is &#x27;inventory_products&#x27; (&#x27;source.db&#x27;, &#x27;source.table&#x27; are metadata fields)</p><p>&#x27;topic-pattern&#x27; is &#x27;{source.db}<em>${source.table}</em>${id}&#x27;, and the extracted topic is &#x27;inventory_products_4&#x27; (&#x27;source.db&#x27;, &#x27;source.table&#x27; are metadata fields, and &#x27;id&#x27; are physical fields)</p><h4 class="anchor anchorWithStickyNavbar_LWe7" id="dynamic-partition-extraction">Dynamic Partition Extraction<a href="#dynamic-partition-extraction" class="hash-link" aria-label="Direct link to Dynamic Partition Extraction" title="Direct link to Dynamic Partition Extraction"></a></h4><p>Dynamic partition extraction is to extract Partition from data by parsing partition pattern, which is similar to dynamic topic extraction.
To support dynamic extraction of topics, you need to set the option &#x27;sink.partitioner&#x27; to &#x27;raw-hash&#x27;
and option &#x27;sink.multiple.partition-pattern&#x27;, Kafka Load Node will parse &#x27;sink.multiple.partition-pattern&#x27;
as the partition key, hash the partition key and take the remainder of the partition size as the final partition,
If parsing fails, it will return null and execute Kafka&#x27;s default partitioning strategy. &#x27;sink.multiple.partition-pattern&#x27;
support constants, variables and primary keys. Constants are string constants. Variables are strictly represented by &#x27;${VARIABLE_NAME}&#x27;, the value of the variable comes from the data itself,
that is, it can be a metadata field of a format specified by &#x27;sink.multiple.format&#x27;, or it can be a physical field in the data.
The primary key is a special constant &#x27;PRIMARY_KEY&#x27;, which extracts the primary key value of the record based on a certain format data format.</p><p>Notes: Kafka dynamic partition extraction based on &#x27;PRIMARY_KEY&#x27; has a limitation that the primary key information needs to be specified in the data,
For example, if Format is &#x27;canal-json&#x27;, then its primary key Key is &#x27;pkNames&#x27;. In addition, because format &#x27;debezium-json&#x27; has no definition of primary key, here
we agree that the primary key of &#x27;debezium-json&#x27; is also &#x27;pkNames&#x27; and is included in &#x27;source&#x27; like other metadata fields such as &#x27;table&#x27; and &#x27;db&#x27;,
If partitioning by primary key is used, and the format is &#x27;debezium-json&#x27;, you need to ensure that the real data meets the above conventions.</p><h2 class="anchor anchorWithStickyNavbar_LWe7" id="data-type-mapping">Data Type Mapping<a href="#data-type-mapping" class="hash-link" aria-label="Direct link to Data Type Mapping" title="Direct link to Data Type Mapping"></a></h2><p>Kafka stores message keys and values as bytes, so Kafka doesn’t have schema or data types. The Kafka messages are deserialized and serialized by formats, e.g. csv, json, avro. Thus, the data type mapping is determined by specific formats. Please refer to <a href="https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/formats/overview/" target="_blank" rel="noopener noreferrer">Formats</a> pages for more details.</p></div><footer class="theme-doc-footer docusaurus-mt-lg"><div class="theme-doc-footer-edit-meta-row row"><div class="col"><a href="https://github.com/apache/inlong-website/edit/master/versioned_docs/version-1.5.0/data_node/load_node/kafka.md" target="_blank" rel="noreferrer noopener" class="theme-edit-this-page"><svg fill="currentColor" height="20" width="20" viewBox="0 0 40 40" class="iconEdit_Z9Sw" aria-hidden="true"><g><path d="m34.5 11.7l-3 3.1-6.3-6.3 3.1-3q0.5-0.5 1.2-0.5t1.1 0.5l3.9 3.9q0.5 0.4 0.5 1.1t-0.5 1.2z m-29.5 17.1l18.4-18.5 6.3 6.3-18.4 18.4h-6.3v-6.2z"></path></g></svg>Edit this page</a></div><div class="col lastUpdated_vwxv"></div></div></footer></article><nav class="pagination-nav docusaurus-mt-lg" aria-label="Docs pages navigation"><a class="pagination-nav__link pagination-nav__link--prev" href="/docs/1.5.0/data_node/load_node/iceberg"><div class="pagination-nav__sublabel">Previous</div><div class="pagination-nav__label">Iceberg</div></a><a class="pagination-nav__link pagination-nav__link--next" href="/docs/1.5.0/data_node/load_node/mysql"><div class="pagination-nav__sublabel">Next</div><div class="pagination-nav__label">MySQL</div></a></nav></div></div><div class="col col--3"><div class="tableOfContents_bqdL thin-scrollbar theme-doc-toc-desktop"><ul class="table-of-contents table-of-contents__left-border"><li><a href="#overview" class="table-of-contents__link toc-highlight">Overview</a></li><li><a href="#supported-version" class="table-of-contents__link toc-highlight">Supported Version</a></li><li><a href="#dependencies" class="table-of-contents__link toc-highlight">Dependencies</a><ul><li><a href="#maven-dependency" class="table-of-contents__link toc-highlight">Maven dependency</a></li></ul></li><li><a href="#how-to-create-a-kafka-load-node" class="table-of-contents__link toc-highlight">How to create a Kafka Load Node</a><ul><li><a href="#usage-for-sql-api" class="table-of-contents__link toc-highlight">Usage for SQL API</a></li><li><a href="#usage-for-inlong-dashboard" class="table-of-contents__link toc-highlight">Usage for InLong Dashboard</a></li><li><a href="#usage-for-inlong-manager-client" class="table-of-contents__link toc-highlight">Usage for InLong Manager Client</a></li></ul></li><li><a href="#kafka-load-node-options" class="table-of-contents__link toc-highlight">Kafka Load Node Options</a></li><li><a href="#available-metadata" class="table-of-contents__link toc-highlight">Available Metadata</a></li><li><a href="#features" class="table-of-contents__link toc-highlight">Features</a><ul><li><a href="#support-dynamic-schema-writing" class="table-of-contents__link toc-highlight">Support Dynamic Schema Writing</a></li></ul></li><li><a href="#data-type-mapping" class="table-of-contents__link toc-highlight">Data Type Mapping</a></li></ul></div></div></div></div></main></div></div><footer class="footer"><div class="container container-fluid"><div class="row footer__links"><div class="col footer__col"><div class="footer__title">Events</div><ul class="footer__items clean-list"><li class="footer__item"><a href="https://www.apachecon.com/" target="_blank" rel="noopener noreferrer" class="footer__link-item">ApacheCon<svg width="13.5" height="13.5" aria-hidden="true" viewBox="0 0 24 24" class="iconExternalLink_nPIU"><path fill="currentColor" d="M21 13v10h-21v-19h12v2h-10v15h17v-8h2zm3-12h-10.988l4.035 4-6.977 7.07 2.828 2.828 6.977-7.07 4.125 4.172v-11z"></path></svg></a></li><li class="footer__item"><a class="acevent" data-format="square" data-mode="dark" data-event="random"></a></li></ul></div><div class="col footer__col"><div class="footer__title">Community</div><ul class="footer__items clean-list"><li class="footer__item"><a href="https://twitter.com/ApacheInlong" target="_blank" rel="noopener noreferrer" class="footer__link-item">Twitter<svg width="13.5" height="13.5" aria-hidden="true" viewBox="0 0 24 24" class="iconExternalLink_nPIU"><path fill="currentColor" d="M21 13v10h-21v-19h12v2h-10v15h17v-8h2zm3-12h-10.988l4.035 4-6.977 7.07 2.828 2.828 6.977-7.07 4.125 4.172v-11z"></path></svg></a></li><li class="footer__item"><a href="https://inlong.apache.org/img/apache-inlong-wechat.jpg" target="_blank" rel="noopener noreferrer" class="footer__link-item">WeChat<svg width="13.5" height="13.5" aria-hidden="true" viewBox="0 0 24 24" class="iconExternalLink_nPIU"><path fill="currentColor" d="M21 13v10h-21v-19h12v2h-10v15h17v-8h2zm3-12h-10.988l4.035 4-6.977 7.07 2.828 2.828 6.977-7.07 4.125 4.172v-11z"></path></svg></a></li><li class="footer__item"><a href="mailto:dev@inlong.apache.org" target="_blank" rel="noopener noreferrer" class="footer__link-item">Email</a></li></ul></div><div class="col footer__col"><div class="footer__title">More</div><ul class="footer__items clean-list"><li class="footer__item"><a class="footer__link-item" href="/blog">Blog</a></li><li class="footer__item"><a href="https://github.com/apache/inlong" target="_blank" rel="noopener noreferrer" class="footer__link-item">GitHub<svg width="13.5" height="13.5" aria-hidden="true" viewBox="0 0 24 24" class="iconExternalLink_nPIU"><path fill="currentColor" d="M21 13v10h-21v-19h12v2h-10v15h17v-8h2zm3-12h-10.988l4.035 4-6.977 7.07 2.828 2.828 6.977-7.07 4.125 4.172v-11z"></path></svg></a></li></ul></div></div><div class="footer__bottom text--center"><div class="margin-bottom--sm"><img src="/img/asf_logo.svg" alt="Apache InLong" class="themedImage_ToTc themedImage--light_HNdA footer__logo"><img src="/img/asf_logo.svg" alt="Apache InLong" class="themedImage_ToTc themedImage--dark_i4oU footer__logo"></div><div class="footer__copyright"><div style="font-family: Avenir-Medium;font-size: 14px;color: #999;">
<div>Copyright © 2020-2024 The Apache Software Foundation. Licensed under the Apache License, Version 2.0.</div>
<div style="margin-top: 20px; padding-top: 20px; border-top: 1px solid #666;line-height: 20px;">The Apache Software Foundation Apache InLong, InLong, Apache, the Apache feather, and the Apache InLong project logo are either registered trademarks or trademarks of the Apache Software Foundation.</div>
</div></div></div></div></footer></div>
<script src="/assets/js/runtime~main.63c98e82.js"></script>
<script src="/assets/js/main.070aef2a.js"></script>
</body>
</html>